mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2025-01-18 01:32:44 +01:00
163 lines
5.5 KiB
Python
Executable File
163 lines
5.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
# Copyright (C) 2017 Eistec AB
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import sys
|
|
import re
|
|
import logging
|
|
import argparse
|
|
import serial
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.animation as animation
|
|
|
|
|
|
class SpectrumEmitter(object):
    """Parse per-channel measurement reports read from a serial port.

    A report line has the form::

        [<iface_id>, <timestamp>, <count>] <ch>: <value>, <ch>: <value>, ...

    Lines that do not start with the bracketed integer triple are ignored.
    """

    # Compiled once for the whole class instead of being handed to re.match()
    # again for every line that arrives on the port.
    _PKT_RE = re.compile(r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)")

    def __init__(self, port):
        """Remember the port to read from.

        :param port: object providing ``readline()`` that returns ``bytes``
        """
        self.port = port

    def data_gen(self):
        """Yield one ``{channel: value}`` dict per well-formed report line.

        This generator never terminates on its own: unparsable lines
        (including empty reads) are skipped and the next line is read.
        Channel entries that are not of the form ``<int>:<float>`` are
        dropped individually, so a report may yield an empty dict.
        """
        logging.info("Begin collecting data from serial port")
        while True:
            # Read one line from the spectrum device
            line = self.port.readline().rstrip()
            pkt_data = self._PKT_RE.match(line.decode(errors="replace"))
            if not pkt_data:
                continue
            try:
                iface_id, timestamp, count = (
                    int(field) for field in pkt_data.group(1, 2, 3)
                )
            except ValueError:
                # Incorrect data received, probably UART noise or debugging
                # messages from the device, not much else we can do other
                # than try again with the next line
                continue
            logging.debug("data: if=%d cnt=%d t=%d", iface_id, count, timestamp)
            ed = {}
            for ch_ed in pkt_data.group(4).split(","):
                try:
                    pair = ch_ed.split(":")
                    ch = int(pair[0])
                    ed[ch] = float(pair[1])
                except (ValueError, IndexError):
                    # Malformed "<ch>: <value>" item, skip just this one
                    continue
            yield ed
|
|
|
|
|
|
class RSSIPlot(object):
    """Scrolling channel-versus-time colour mesh of measured signal levels."""

    def __init__(self, ax, *args, tlen=120, dt=0.5, nchannels=27):
        """Create the mesh on *ax* and attach a colour bar to its figure.

        :param ax: Matplotlib axes to draw on
        :param args: ignored; present so the remaining options are
            keyword-only
        :param tlen: extent of the time axis (labelled in seconds)
        :param dt: step between two columns of the time axis
        :param nchannels: number of channel rows in the mesh
        """
        self.ax = ax
        self.count = 0
        self.dt = dt
        self.tlen = tlen
        # Generate mesh for plotting, this creates a grid of nchannel rows and
        # (tlen / dt) columns
        self.Y, self.X = np.mgrid[
            slice(0 - 0.5, nchannels + 0.5, 1),
            slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt),
        ]
        Z = np.zeros_like(self.X)
        # X and Y are the bounds, so Z should be the value *inside* those bounds.
        # Therefore, remove the last row and column from the Z array.
        self.Z = Z[:-1, :-1]
        # The colormap is given by name: plt.cm.get_cmap() no longer exists in
        # current Matplotlib releases, while a name string is accepted by all.
        self.pcm = self.ax.pcolormesh(
            self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap="jet"
        )
        self.ax.get_figure().colorbar(self.pcm, label="Measured signal level [dB]")
        self.ax.set_ylabel("Channel number")
        self.ax.set_xlabel("Time [s]")
        # Start with an inverted range so the first sample sets both limits.
        self.ch_min = nchannels
        self.ch_max = 0

    def update(self, ed):
        """Scroll the mesh one column to the left and append a new sample.

        :param ed: mapping of channel number to measured value
        :returns: one-element tuple holding the mesh artist (the shape
            ``FuncAnimation`` expects when blitting)

        Channels outside the rows of the mesh are ignored; indexing with them
        would either raise ``IndexError`` or, for negative numbers, silently
        write into the wrong row. Channels missing from *ed* are stored as 0.
        """
        nrows = self.Z.shape[0]
        samples = {}
        for ch, level in ed.items():
            if 0 <= ch < nrows:
                samples[ch] = level
            else:
                logging.debug("ignoring out-of-range channel %r", ch)

        resize = False
        for ch in samples:
            if ch < self.ch_min:
                self.ch_min = ch
                resize = True
            if ch > self.ch_max:
                self.ch_max = ch
                resize = True

        col = np.zeros((nrows, 1))
        for ch, level in samples.items():
            col[ch, 0] = level
        # Drop the oldest column and append the newest on the right.
        self.Z = np.hstack((self.Z[:, 1:], col))

        if resize:
            # Zoom the channel axis to the range of channels seen so far.
            self.ax.set_ylim([self.ch_min - 0.5, self.ch_max + 0.5])
            self.ax.set_yticks(range(self.ch_min, self.ch_max + 1))
        self.pcm.set_array(self.Z.ravel())
        return (self.pcm,)
|
|
|
|
|
|
def main(argv):
    """Plot spectrum measurements read from a serial port until interrupted.

    :param argv: full argument vector, program name first (``sys.argv``)

    Exits with status 2 when the command line is invalid, when the serial
    port cannot be opened, or when interrupted from the keyboard.
    """
    loglevels = [
        logging.CRITICAL,
        logging.ERROR,
        logging.WARN,
        logging.INFO,
        logging.DEBUG,
    ]
    # The first positional parameter of ArgumentParser is `prog`, not the
    # argument list, so argv must be handed to parse_args() instead.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-v",
        "--verbosity",
        type=int,
        default=4,
        help="set logging verbosity, 1=CRITICAL, 5=DEBUG",
    )
    parser.add_argument("tty", help="Serial port device file name")
    parser.add_argument(
        "-b", "--baudrate", default=115200, type=int, help="Serial port baudrate"
    )
    args = parser.parse_args(argv[1:])

    # logging setup; clamp so that out-of-range values neither wrap around
    # (0 would select DEBUG) nor raise IndexError (6 and above)
    level_index = min(max(args.verbosity, 1), len(loglevels)) - 1
    logging.basicConfig(level=loglevels[level_index])

    # open serial port
    try:
        logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
        port = serial.Serial(
            port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3
        )
        # This baudrate reconfiguration is necessary for certain USB to serial
        # adapters, the Linux cdc_acm driver will keep repeating stale buffer
        # contents otherwise. No idea about the cause, but this fixes the symptom.
        port.baudrate = args.baudrate
    except IOError:
        # logging calls take no `file` keyword; passing one raises TypeError
        logging.critical("error opening serial port")
        sys.exit(2)

    try:
        logging.debug("Creating figure")
        fig, ax = plt.subplots()
        graph = RSSIPlot(ax)
        emitter = SpectrumEmitter(port)
        # Keep a reference for as long as the window is shown: an unreferenced
        # animation object may be garbage-collected, which stops the updates.
        anim = animation.FuncAnimation(  # noqa: F841
            fig, graph.update, emitter.data_gen, interval=10, blit=True
        )
        plt.show()
    except KeyboardInterrupt:
        sys.exit(2)
    finally:
        # Release the port on every exit path, not only on Ctrl-C
        port.close()
|
|
|
|
|
|
# Script entry point: hand the complete argument vector, program name
# included, to main() when this file is executed directly.
if __name__ == "__main__":
    main(sys.argv)
|