1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

Merge pull request https://github.com/RIOT-OS/applications/pull/82 from miri64/scripts/enh/black

sniffer / spectrum-scanner: reformat using black
This commit is contained in:
chrysn 2022-09-20 11:13:06 +02:00
commit a54501fa21
2 changed files with 84 additions and 54 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
"""
(C) 2012, Mariano Alvira <mar@devl.org>
(C) 2014, Oliver Hahm <oliver.hahm@inria.fr>
(C) 2015, Hauke Petersen <hauke.petersen@fu-berlin.de>
@ -30,7 +30,7 @@ HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
'''
"""
from __future__ import print_function
import argparse
@ -42,13 +42,13 @@ from struct import pack
from serial import Serial
# PCAP setup
MAGIC = 0xa1b2c3d4
MAGIC = 0xA1B2C3D4
MAJOR = 2
MINOR = 4
ZONE = 0
SIG = 0
SNAPLEN = 0xffff
NETWORK = 230 # 802.15.4 no FCS
SNAPLEN = 0xFFFF
NETWORK = 230 # 802.15.4 no FCS
DEFAULT_BAUDRATE = 115200
@ -56,45 +56,44 @@ DEFAULT_BAUDRATE = 115200
def configure_interface(port, channel):
line = ""
iface = 0
port.write('ifconfig\n'.encode())
port.write("ifconfig\n".encode())
while True:
line = port.readline()
if line == b'':
print("Application has no network interface defined",
file=sys.stderr)
if line == b"":
print("Application has no network interface defined", file=sys.stderr)
sys.exit(2)
match = re.search(r'^Iface +(\d+)', line.decode(errors="ignore"))
match = re.search(r"^Iface +(\d+)", line.decode(errors="ignore"))
if match is not None:
iface = int(match.group(1))
break
# set channel, raw mode, and promiscuous mode
print('ifconfig %d set chan %d' % (iface, channel), file=sys.stderr)
print('ifconfig %d raw' % iface, file=sys.stderr)
print('ifconfig %d promisc' % iface, file=sys.stderr)
port.write(('ifconfig %d set chan %d\n' % (iface, channel)).encode())
port.write(('ifconfig %d raw\n' % iface).encode())
port.write(('ifconfig %d promisc\n' % iface).encode())
print("ifconfig %d set chan %d" % (iface, channel), file=sys.stderr)
print("ifconfig %d raw" % iface, file=sys.stderr)
print("ifconfig %d promisc" % iface, file=sys.stderr)
port.write(("ifconfig %d set chan %d\n" % (iface, channel)).encode())
port.write(("ifconfig %d raw\n" % iface).encode())
port.write(("ifconfig %d promisc\n" % iface).encode())
def generate_pcap(port, out):
# count incoming packets
count = 0
# output overall PCAP header
out.write(pack('<LHHLLLL', MAGIC, MAJOR, MINOR, ZONE, SIG, SNAPLEN,
NETWORK))
out.write(pack("<LHHLLLL", MAGIC, MAJOR, MINOR, ZONE, SIG, SNAPLEN, NETWORK))
sys.stderr.write("RX: %i\r" % count)
while True:
line = port.readline().rstrip()
pkt_header = re.match(r">? *rftest-rx --- len (\w+).*",
line.decode(errors="ignore"))
pkt_header = re.match(
r">? *rftest-rx --- len (\w+).*", line.decode(errors="ignore")
)
if pkt_header:
now = time()
sec = int(now)
usec = int((now - sec) * 1000000)
length = int(pkt_header.group(1), 16)
out.write(pack('<LLLL', sec, usec, length, length))
out.write(pack("<LLLL", sec, usec, length, length))
out.flush()
count += 1
sys.stderr.write("RX: %i\r" % count)
@ -102,10 +101,10 @@ def generate_pcap(port, out):
pkt_data = re.match(r"(\w\w )+", line.decode(errors="ignore"))
if pkt_data:
for part in line.decode(errors="ignore").split(' '):
for part in line.decode(errors="ignore").split(" "):
byte = re.match(r"(\w\w)", part)
if byte:
out.write(pack('<B', int(byte.group(1), 16)))
out.write(pack("<B", int(byte.group(1), 16)))
out.flush()
@ -114,15 +113,14 @@ def connect(args):
if args.conn.startswith("/dev/tty") or args.conn.startswith("COM"):
# open serial port
try:
conn = Serial(args.conn, args.baudrate, dsrdtr=0, rtscts=0,
timeout=1)
conn = Serial(args.conn, args.baudrate, dsrdtr=0, rtscts=0, timeout=1)
except IOError:
print("error opening serial port %s" % args.conn, file=sys.stderr)
sys.exit(2)
else:
try:
port = args.conn.split(":")[-1]
host = args.conn[:-(len(port)+1)]
host = args.conn[: -(len(port) + 1)]
port = int(port)
except (IndexError, ValueError):
print("Can't parse host:port pair %s" % args.conn, file=sys.stderr)
@ -143,17 +141,29 @@ def main():
else:
default_outfile = sys.stdout
p = argparse.ArgumentParser()
p.add_argument("-b", "--baudrate", type=int, default=DEFAULT_BAUDRATE,
help="Baudrate of the serial port (only evaluated "
"for non TCP-terminal, default: %d)" %
DEFAULT_BAUDRATE)
p.add_argument("conn", metavar="tty/host:port", type=str,
help="Serial port or TCP (host, port) tuple to "
"terminal with sniffer application")
p.add_argument(
"-b",
"--baudrate",
type=int,
default=DEFAULT_BAUDRATE,
help="Baudrate of the serial port (only evaluated "
"for non TCP-terminal, default: %d)" % DEFAULT_BAUDRATE,
)
p.add_argument(
"conn",
metavar="tty/host:port",
type=str,
help="Serial port or TCP (host, port) tuple to "
"terminal with sniffer application",
)
p.add_argument("channel", type=int, help="Channel to sniff on")
p.add_argument("outfile", type=argparse.FileType("w+b"),
default=default_outfile, nargs="?",
help="PCAP file to output to (default: stdout)")
p.add_argument(
"outfile",
type=argparse.FileType("w+b"),
default=default_outfile,
nargs="?",
help="PCAP file to output to (default: stdout)",
)
args = p.parse_args()
conn = connect(args)

View File

@ -26,7 +26,6 @@ import matplotlib.animation as animation
class SpectrumEmitter(object):
def __init__(self, port):
self.port = port
@ -35,7 +34,10 @@ class SpectrumEmitter(object):
while True:
# Read one line from the spectrum device
line = self.port.readline().rstrip()
pkt_data = re.match(r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)", line.decode(errors='replace'))
pkt_data = re.match(
r"\[([-+]?\d+),\s*([-+]?\d+),\s*([-+]?\d+)\]\s*(.*)",
line.decode(errors="replace"),
)
if pkt_data:
ed = {}
try:
@ -60,7 +62,6 @@ class SpectrumEmitter(object):
class RSSIPlot(object):
def __init__(self, ax, *args, tlen=120, dt=0.5, nchannels=27):
self.ax = ax
self.count = 0
@ -68,13 +69,17 @@ class RSSIPlot(object):
self.tlen = tlen
# Generate mesh for plotting, this creates a grid of nchannel rows and
# (tlen / dt) columns
self.Y, self.X = np.mgrid[slice(0 - .5, nchannels + 0.5, 1),
slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt)]
self.Y, self.X = np.mgrid[
slice(0 - 0.5, nchannels + 0.5, 1),
slice(-self.tlen - self.dt / 2, 0 + 1 - self.dt / 2, self.dt),
]
Z = np.zeros_like(self.X)
# X and Y are the bounds, so Z should be the value *inside* those bounds.
# Therefore, remove the last row and column from the Z array.
self.Z = Z[:-1, :-1]
self.pcm = self.ax.pcolormesh(self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap=plt.cm.get_cmap('jet'))
self.pcm = self.ax.pcolormesh(
self.X, self.Y, self.Z, vmin=-100, vmax=-20, cmap=plt.cm.get_cmap("jet")
)
self.ax.get_figure().colorbar(self.pcm, label="Measured signal level [dB]")
self.ax.set_ylabel("Channel number")
self.ax.set_xlabel("Time [s]")
@ -95,29 +100,42 @@ class RSSIPlot(object):
col[ch, 0] = ed[ch]
self.Z = np.hstack((self.Z[:, 1:], col))
if resize:
self.ax.set_ylim([self.ch_min - .5, self.ch_max + 0.5])
self.ax.set_ylim([self.ch_min - 0.5, self.ch_max + 0.5])
self.ax.set_yticks(range(self.ch_min, self.ch_max + 1))
self.pcm.set_array(self.Z.ravel())
return self.pcm,
return (self.pcm,)
def main(argv):
loglevels = [logging.CRITICAL, logging.ERROR, logging.WARN, logging.INFO, logging.DEBUG]
loglevels = [
logging.CRITICAL,
logging.ERROR,
logging.WARN,
logging.INFO,
logging.DEBUG,
]
parser = argparse.ArgumentParser(argv)
parser.add_argument('-v', '--verbosity', type=int, default=4,
help='set logging verbosity, 1=CRITICAL, 5=DEBUG')
parser.add_argument('tty',
help='Serial port device file name')
parser.add_argument('-b', '--baudrate', default=115200, type=int,
help='Serial port baudrate')
parser.add_argument(
"-v",
"--verbosity",
type=int,
default=4,
help="set logging verbosity, 1=CRITICAL, 5=DEBUG",
)
parser.add_argument("tty", help="Serial port device file name")
parser.add_argument(
"-b", "--baudrate", default=115200, type=int, help="Serial port baudrate"
)
args = parser.parse_args()
# logging setup
logging.basicConfig(level=loglevels[args.verbosity-1])
logging.basicConfig(level=loglevels[args.verbosity - 1])
# open serial port
try:
logging.debug("Open serial port %s, baud=%d", args.tty, args.baudrate)
port = serial.Serial(port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3)
port = serial.Serial(
port=args.tty, baudrate=9600, dsrdtr=0, rtscts=0, timeout=0.3
)
# This baudrate reconfiguration is necessary for certain USB to serial
# adapters, the Linux cdc_acm driver will keep repeating stale buffer
# contents otherwise. No idea about the cause, but this fixes the symptom.
@ -131,7 +149,9 @@ def main(argv):
fig, ax = plt.subplots()
graph = RSSIPlot(ax)
emitter = SpectrumEmitter(port)
animation.FuncAnimation(fig, graph.update, emitter.data_gen, interval=10, blit=True)
animation.FuncAnimation(
fig, graph.update, emitter.data_gen, interval=10, blit=True
)
plt.show()
except KeyboardInterrupt:
port.close()