1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00

riotctrl_shell.gnrc: move compile object init to parser constructors

This commit is contained in:
Martine S. Lenders 2020-07-08 12:21:20 +02:00
parent 0a86086967
commit c3e6d89bcb
No known key found for this signature in database
GPG Key ID: CCD317364F63286F

View File

@ -18,6 +18,20 @@ from riotctrl.shell import ShellInteraction, ShellInteractionParser
# ==== Parsers ==== # ==== Parsers ====
class GNRCICMPv6EchoParser(ShellInteractionParser): class GNRCICMPv6EchoParser(ShellInteractionParser):
def __init__(self):
self.c_reply = re.compile(r"\d+ bytes from "
r"(?P<source>[0-9a-f:]+(%\S+)?): "
r"icmp_seq=(?P<seq>\d+) ttl=(?P<ttl>\d+)"
r"( rssi=(?P<rssi>-?\d+) dBm)?"
r"( time=(?P<rtt>\d+.\d+) ms)?"
r"(?P<dup> \(DUP\))?")
self.c_stats = re.compile(r"(?P<tx>\d+) packets transmitted, "
r"(?P<rx>\d+) packets received, "
r"((?P<dup>\d+) duplicates, )?"
r"(?P<packet_loss>\d+)% packet loss")
self.c_rtts = re.compile(r"round-trip min/avg/max = (?P<min>\d+.\d+)/"
r"(?P<avg>\d+.\d+)/(?P<max>\d+.\d+) ms")
@staticmethod @staticmethod
def _add_reply(res, reply): def _add_reply(res, reply):
reply["seq"] = int(reply["seq"]) reply["seq"] = int(reply["seq"])
@ -106,29 +120,18 @@ class GNRCICMPv6EchoParser(ShellInteractionParser):
8.839 8.839
""" """
res = {} res = {}
c_reply = re.compile(r"\d+ bytes from (?P<source>[0-9a-f:]+(%\S+)?): "
r"icmp_seq=(?P<seq>\d+) ttl=(?P<ttl>\d+)"
r"( rssi=(?P<rssi>-?\d+) dBm)?"
r"( time=(?P<rtt>\d+.\d+) ms)?"
r"(?P<dup> \(DUP\))?")
c_stats = re.compile(r"(?P<tx>\d+) packets transmitted, "
r"(?P<rx>\d+) packets received, "
r"((?P<dup>\d+) duplicates, )?"
r"(?P<packet_loss>\d+)% packet loss")
c_rtts = re.compile(r"round-trip min/avg/max = (?P<min>\d+.\d+)/"
r"(?P<avg>\d+.\d+)/(?P<max>\d+.\d+) ms")
for line in cmd_output.splitlines(): for line in cmd_output.splitlines():
if "stats" not in res: # If final stats were not found yet if "stats" not in res: # If final stats were not found yet
m = c_reply.match(line) m = self.c_reply.match(line)
if m is not None: if m is not None:
self._add_reply(res, m.groupdict()) self._add_reply(res, m.groupdict())
continue continue
m = c_stats.match(line) m = self.c_stats.match(line)
if m is not None: if m is not None:
self._set_stats(res, m.groupdict()) self._set_stats(res, m.groupdict())
continue continue
else: else:
m = c_rtts.match(line) m = self.c_rtts.match(line)
if m is not None: if m is not None:
self._set_rtts(res, m.groupdict()) self._set_rtts(res, m.groupdict())
return res return res
@ -164,6 +167,21 @@ class GNRCPktbufStatsResults(dict):
class GNRCPktbufStatsParser(ShellInteractionParser): class GNRCPktbufStatsParser(ShellInteractionParser):
def __init__(self):
self.c_init1 = re.compile(
r"packet buffer: first byte: 0x(?P<first_byte>[0-9A-Fa-f]+), "
r"last byte: 0x(?P<last_byte>[0-9A-Fa-f]+) "
r"\(size: +(?P<size>\d+)\)"
)
self.c_init2 = re.compile(r" position of last byte used: (\d+)")
self.c_unused = re.compile(
r"~ unused: 0x(?P<start>[0-9A-Fa-f]+) "
# flake reports r'\(', r'\)' as invalid escape sequence
# false positively
r"\(next: ""(0x(?P<next>[0-9A-Fa-f]+)|\(nil\)), " # noqa W605
r"size: +(?P<size>\d+)\) ~"
)
@staticmethod @staticmethod
def _init_res(first_byte, last_byte, size): def _init_res(first_byte, last_byte, size):
return GNRCPktbufStatsResults(( return GNRCPktbufStatsResults((
@ -220,30 +238,22 @@ class GNRCPktbufStatsParser(ShellInteractionParser):
>>> res["first_unused"]["size"] >>> res["first_unused"]["size"]
8192 8192
""" """
c_init1 = re.compile(r"packet buffer: "
r"first byte: 0x(?P<first_byte>[0-9A-Fa-f]+), "
r"last byte: 0x(?P<last_byte>[0-9A-Fa-f]+) "
r"\(size: +(?P<size>\d+)\)")
c_init2 = re.compile(r" position of last byte used: (\d+)")
c_unused = re.compile(r"~ unused: 0x(?P<start>[0-9A-Fa-f]+) "
r"\(next: (0x(?P<next>[0-9A-Fa-f]+)|\(nil\)), "
r"size: +(?P<size>\d+)\) ~")
res = None res = None
for line in cmd_output.splitlines(): for line in cmd_output.splitlines():
if res is None: if res is None:
m = c_init1.match(line) m = self.c_init1.match(line)
if m is not None: if m is not None:
res = self._init_res(**m.groupdict()) res = self._init_res(**m.groupdict())
# no sense in further parsing if we did not find the first line # no sense in further parsing if we did not find the first line
# yet. If we found it just continue parsing with next line # yet. If we found it just continue parsing with next line
continue continue
elif "last_byte_used" not in res: elif "last_byte_used" not in res:
m = c_init2.match(line) m = self.c_init2.match(line)
if m is not None: if m is not None:
self._set_last_byte_used(res, m.group(1)) self._set_last_byte_used(res, m.group(1))
continue continue
elif "first_unused" not in res: elif "first_unused" not in res:
m = c_unused.match(line) m = self.c_unused.match(line)
if m is not None: if m is not None:
self._set_first_unused(res, m.groupdict()) self._set_first_unused(res, m.groupdict())
if res is not None and "last_byte_used" not in res: if res is not None and "last_byte_used" not in res: