1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00

Merge pull request https://github.com/RIOT-OS/applications/pull/53 from gebart/pr/spectrum-plot

spectrum-scanner: Improved plotting script
This commit is contained in:
Sebastian Meiling 2019-11-28 11:50:10 +01:00 committed by chrysn
commit 3d2c230696

View File

@ -20,78 +20,84 @@ import re
import time
import logging
import argparse
from serial import Serial
import serial
import numpy as np
import matplotlib.pylab as plt
import matplotlib.pyplot as plt
import matplotlib.animation as animation
class rssi_plot(object):
class SpectrumEmitter(object):
def plot_rssi(self, port):
self.count = 0
self.dt = 0.5
self.tlen = 120
# Generate mesh for plotting
Y, X = np.mgrid[slice(0 - .5, 26 + 1.5, 1), slice(0 - self.dt / 2, self.tlen + 1 - self.dt / 2, self.dt)]
Z = np.zeros_like(X)
# X and Y are bounds, so Z should be the value *inside* those bounds.
# Therefore, remove the last value from the Z array.
Z = Z[:, :-1]
logging.debug("Creating figure")
plt.figure()
pcm = plt.pcolormesh(X, Y, Z, vmin=-128, vmax=0, cmap=plt.cm.get_cmap('jet'))
plt.xlim([0, self.tlen])
plt.ylim([0, 26])
plt.colorbar(label="Measured signal level [dB]")
plt.ylabel("Channel number")
plt.xlabel("Time [s]")
plt.ion()
logging.debug("Show plot window")
plt.show()
ch_min = 26
ch_max = 0
last_update = time.time()
def __init__(self, port):
self.port = port
def data_gen(self):
logging.info("Begin collecting data from serial port")
while True:
line = port.readline().rstrip()
# Read one line from the spectrum device
line = self.port.readline().rstrip()
pkt_data = re.match(r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)", line.decode(errors='replace'))
if pkt_data:
now = time.time()
ed = {}
try:
iface_id = int(pkt_data.group(1))
timestamp = int(pkt_data.group(2))
count = int(pkt_data.group(3))
tidx = int(timestamp / (self.dt * 1000000)) % (Z.shape[1])
except ValueError:
# Incorrect data received, probably UART noise or debugging
# messages from the device, not much else we can do other
# than try again with the next line
continue
logging.debug("data: tidx=%d if=%d cnt=%d t=%d", tidx, iface_id, count, timestamp)
logging.debug("data: if=%d cnt=%d t=%d", iface_id, count, timestamp)
raw = pkt_data.group(4)
resize = False
for ch_ed in raw.split(","):
try:
pair = ch_ed.split(":")
ch = int(pair[0])
ed = float(pair[1])
if ch < ch_min:
ch_min = ch
resize = True
if ch > ch_max:
ch_max = ch
resize = True
Z[ch, tidx] = ed
ed[ch] = float(pair[1])
except (ValueError, IndexError):
continue
if resize:
logging.debug("resize: %d %d" % (ch_min, ch_max))
plt.ylim([ch_min - .5, ch_max + .5])
if now > last_update + 1:
last_update = now
pcm.set_array(Z.ravel())
pcm.autoscale()
pcm.changed()
plt.pause(0.01)
yield ed
class RSSIPlot(object):
def __init__(self, ax, *args, tlen=120, dt=0.5, nchannels=27):
self.ax = ax
self.count = 0
self.dt = dt
self.tlen = tlen
# Generate mesh for plotting, this creates a grid of nchannel rows and
# (tlen / dt) columns
self.Y, self.X = np.mgrid[slice(0 - .5, nchannels + 0.5, 1), slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt)]
Z = np.zeros_like(self.X)
# X and Y are the bounds, so Z should be the value *inside* those bounds.
# Therefore, remove the last row and column from the Z array.
self.Z = Z[:-1, :-1]
self.pcm = self.ax.pcolormesh(self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap=plt.cm.get_cmap('jet'))
self.ax.get_figure().colorbar(self.pcm, label="Measured signal level [dB]")
self.ax.set_ylabel("Channel number")
self.ax.set_xlabel("Time [s]")
self.ch_min = nchannels
self.ch_max = 0
def update(self, ed):
resize = False
for ch in ed.keys():
if ch < self.ch_min:
self.ch_min = ch
resize = True
if ch > self.ch_max:
self.ch_max = ch
resize = True
col = np.zeros((self.Z.shape[0], 1))
for ch in ed.keys():
col[ch, 0] = ed[ch]
self.Z = np.hstack((self.Z[:, 1:], col))
if resize:
self.ax.set_ylim([self.ch_min - .5, self.ch_max + 0.5])
self.ax.set_yticks(range(self.ch_min, self.ch_max + 1))
self.pcm.set_array(self.Z.ravel())
return self.pcm,
def main(argv):
@ -110,14 +116,22 @@ def main(argv):
# open serial port
try:
logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
port = Serial(args.tty, args.baudrate, dsrdtr=0, rtscts=0, timeout=0.3)
port = serial.Serial(port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3)
# This baudrate reconfiguration is necessary for certain USB to serial
# adapters, the Linux cdc_acm driver will keep repeating stale buffer
# contents otherwise. No idea about the cause, but this fixes the symptom.
port.baudrate = args.baudrate
except IOError:
logging.critical("error opening serial port", file=sys.stderr)
sys.exit(2)
try:
app = rssi_plot()
app.plot_rssi(port)
logging.debug("Creating figure")
fig, ax = plt.subplots()
graph = RSSIPlot(ax)
emitter = SpectrumEmitter(port)
ani = animation.FuncAnimation(fig, graph.update, emitter.data_gen, interval=10, blit=True)
plt.show()
except KeyboardInterrupt:
port.close()
sys.exit(2)