1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2024-12-29 04:50:03 +01:00

pythonlib/riotctrl_shell: reformat with black

This commit is contained in:
Alexandre Abadie 2021-09-29 13:15:46 +02:00
parent f2bb9242bb
commit b44652c489
No known key found for this signature in database
GPG Key ID: 1C919A403CAE1405
6 changed files with 263 additions and 199 deletions

View File

@ -16,46 +16,50 @@ from riotctrl.shell import ShellInteraction
class CongureTest(ShellInteraction):
@ShellInteraction.check_term
def setup(self, ident=0, timeout=-1, async_=False):
return self.cmd('cong_setup {ident}'.format(ident=ident),
timeout=timeout, async_=async_)
return self.cmd(
"cong_setup {ident}".format(ident=ident), timeout=timeout, async_=async_
)
@ShellInteraction.check_term
def clear(self, timeout=-1, async_=False):
return self.cmd('cong_clear', timeout=timeout, async_=async_)
return self.cmd("cong_clear", timeout=timeout, async_=async_)
@ShellInteraction.check_term
def init(self, ctx, timeout=-1, async_=False):
return self.cmd('cong_init 0x{ctx:x}'.format(ctx=ctx),
timeout=timeout, async_=async_)
return self.cmd(
"cong_init 0x{ctx:x}".format(ctx=ctx), timeout=timeout, async_=async_
)
@ShellInteraction.check_term
def inter_msg_interval(self, msg_size, timeout=-1, async_=False):
return self.cmd('cong_imi {msg_size}'.format(msg_size=msg_size),
timeout=timeout, async_=async_)
return self.cmd(
"cong_imi {msg_size}".format(msg_size=msg_size),
timeout=timeout,
async_=async_,
)
@ShellInteraction.check_term
def add_msg(self, send_time, size, resends, timeout=-1, async_=False):
return self.cmd(
'cong_add_msg {send_time} {size} {resends}'
.format(send_time=send_time, size=size, resends=resends)
"cong_add_msg {send_time} {size} {resends}".format(
send_time=send_time, size=size, resends=resends
)
)
@ShellInteraction.check_term
def msgs_reset(self, timeout=-1, async_=False):
return self.cmd('cong_msgs_reset')
return self.cmd("cong_msgs_reset")
@ShellInteraction.check_term
def report(self, cmd, *args, timeout=-1, async_=False):
args = ' '.join(str(a) for a in args)
return self.cmd('cong_report {cmd} {args}'.format(cmd=cmd, args=args))
args = " ".join(str(a) for a in args)
return self.cmd("cong_report {cmd} {args}".format(cmd=cmd, args=args))
def report_msg_sent(self, msg_size, timeout=-1, async_=False):
return self.report('msg_sent', msg_size,
timeout=timeout, async_=async_)
return self.report("msg_sent", msg_size, timeout=timeout, async_=async_)
def report_msg_discarded(self, msg_size, timeout=-1, async_=False):
return self.report('msg_discarded', msg_size,
timeout=timeout, async_=async_)
return self.report("msg_discarded", msg_size, timeout=timeout, async_=async_)
def _report_msgs_timeout_lost_base(self, cmd, timeout=-1, async_=False):
return self.report(cmd, timeout=timeout, async_=async_)
@ -64,48 +68,64 @@ class CongureTest(ShellInteraction):
tmp = None
for msg in msgs:
tmp = self.add_msg(**msg)
assert 'success' in tmp
res = self._report_msgs_timeout_lost_base(
cmd, timeout=timeout, async_=async_
)
assert "success" in tmp
res = self._report_msgs_timeout_lost_base(cmd, timeout=timeout, async_=async_)
return res
def report_msgs_timeout_base(self, timeout=-1, async_=False):
return self._report_msgs_timeout_lost_base(
'msgs_timeout', timeout=timeout, async_=async_
"msgs_timeout", timeout=timeout, async_=async_
)
def report_msgs_timeout(self, msgs, timeout=-1, async_=False):
return self._report_msgs_timeout_lost(
'msgs_timeout', msgs, timeout=timeout, async_=async_
"msgs_timeout", msgs, timeout=timeout, async_=async_
)
def report_msgs_lost_base(self, timeout=-1, async_=False):
return self._report_msgs_timeout_lost_base(
'msgs_lost', timeout=timeout, async_=async_
"msgs_lost", timeout=timeout, async_=async_
)
def report_msgs_lost(self, msgs, timeout=-1, async_=False):
return self._report_msgs_timeout_lost(
'msgs_lost', msgs, timeout=timeout, async_=async_
"msgs_lost", msgs, timeout=timeout, async_=async_
)
def report_msg_acked_base(self, ack_recv_time, ack_id, ack_size, ack_clean,
ack_wnd, ack_delay, timeout=-1, async_=False):
def report_msg_acked_base(
self,
ack_recv_time,
ack_id,
ack_size,
ack_clean,
ack_wnd,
ack_delay,
timeout=-1,
async_=False,
):
if isinstance(ack_clean, bool):
ack_clean = int(ack_clean)
return self.report('msg_acked', ack_recv_time, ack_id, ack_size,
ack_clean, ack_wnd, ack_delay,
timeout=-1, async_=False)
return self.report(
"msg_acked",
ack_recv_time,
ack_id,
ack_size,
ack_clean,
ack_wnd,
ack_delay,
timeout=-1,
async_=False,
)
def report_msg_acked(self, msg, ack, timeout=-1, async_=False):
tmp = self.add_msg(**msg)
assert 'success' in tmp
assert "success" in tmp
res = self.report_msg_acked_base(
**{'ack_{}'.format(k): v for k, v in ack.items()},
timeout=timeout, async_=async_
**{"ack_{}".format(k): v for k, v in ack.items()},
timeout=timeout,
async_=async_
)
return res
def report_ecn_ce(self, time, timeout=-1, async_=False):
return self.report('ecn_ce', time, timeout=timeout, async_=async_)
return self.report("ecn_ce", time, timeout=timeout, async_=async_)

View File

@ -17,13 +17,15 @@ from riotctrl.shell import ShellInteraction, ShellInteractionParser
# ==== Parsers ====
class CordEpRegistrationInfoParser(ShellInteractionParser):
class CordEpRegistrationInfoParser(ShellInteractionParser):
def __init__(self):
self.comps = {
"rdaddr": re.compile(r"RD address:\s+"
r"(?P<rdaddr>coaps?://"
r"\[[0-9a-f:]+(%\S+)?\](:\d+)?)"),
"rdaddr": re.compile(
r"RD address:\s+"
r"(?P<rdaddr>coaps?://"
r"\[[0-9a-f:]+(%\S+)?\](:\d+)?)"
),
"epname": re.compile(r"ep name:\s+(?P<epname>.+)$"),
"ltime": re.compile(r"lifetime:\s+(?P<ltime>\d+)s$"),
"regif": re.compile(r"reg if:\s+(?P<regif>\S+)$"),
@ -73,10 +75,8 @@ class CordEpRegistrationInfoParser(ShellInteractionParser):
class CordEpDiscoverParser(ShellInteractionParser):
def __init__(self):
self.comp = re.compile(r"the registration interface is"
r"\s+'(?P<regif>\S+)'$")
self.comp = re.compile(r"the registration interface is" r"\s+'(?P<regif>\S+)'$")
def parse(self, cmd_output):
"""
@ -117,8 +117,7 @@ class CordEp(ShellInteraction):
def cord_ep_info(self, timeout=-1, async_=False):
return self.cord_ep_cmd(self.INFO, None, timeout, async_)
def cord_ep_register(self, uri, regif=None,
timeout=-1, async_=False):
def cord_ep_register(self, uri, regif=None, timeout=-1, async_=False):
args = [uri]
if regif is not None:
args.append(regif)

View File

@ -17,22 +17,29 @@ from riotctrl.shell import ShellInteraction, ShellInteractionParser
# ==== Parsers ====
class GNRCICMPv6EchoParser(ShellInteractionParser):
def __init__(self):
self.c_reply = re.compile(r"\d+ bytes from "
r"(?P<source>[0-9a-f:]+(%\S+)?): "
r"icmp_seq=(?P<seq>\d+) ttl=(?P<ttl>\d+)"
r"( corrupted at offset (?P<corrupted>\d+))?"
r"( truncated by (?P<truncated>\d+) byte)?"
r"( rssi=(?P<rssi>-?\d+) dBm)?"
r"( time=(?P<rtt>\d+.\d+) ms)?"
r"(?P<dup> \(DUP\))?")
self.c_stats = re.compile(r"(?P<tx>\d+) packets transmitted, "
r"(?P<rx>\d+) packets received, "
r"((?P<dup>\d+) duplicates, )?"
r"(?P<packet_loss>\d+)% packet loss")
self.c_rtts = re.compile(r"round-trip min/avg/max = (?P<min>\d+.\d+)/"
r"(?P<avg>\d+.\d+)/(?P<max>\d+.\d+) ms")
self.c_reply = re.compile(
r"\d+ bytes from "
r"(?P<source>[0-9a-f:]+(%\S+)?): "
r"icmp_seq=(?P<seq>\d+) ttl=(?P<ttl>\d+)"
r"( corrupted at offset (?P<corrupted>\d+))?"
r"( truncated by (?P<truncated>\d+) byte)?"
r"( rssi=(?P<rssi>-?\d+) dBm)?"
r"( time=(?P<rtt>\d+.\d+) ms)?"
r"(?P<dup> \(DUP\))?"
)
self.c_stats = re.compile(
r"(?P<tx>\d+) packets transmitted, "
r"(?P<rx>\d+) packets received, "
r"((?P<dup>\d+) duplicates, )?"
r"(?P<packet_loss>\d+)% packet loss"
)
self.c_rtts = re.compile(
r"round-trip min/avg/max = (?P<min>\d+.\d+)/"
r"(?P<avg>\d+.\d+)/(?P<max>\d+.\d+) ms"
)
@staticmethod
def _add_reply(res, reply):
@ -153,12 +160,14 @@ class GNRCICMPv6EchoParser(ShellInteractionParser):
class GNRCIPv6NIBNeighShowParser(ShellInteractionParser):
def __init__(self):
self.c_neigh = re.compile(r"(?P<ipv6_addr>[0-9a-f:]+)\s+"
r"dev\s+#(?P<iface>\d+)\s+"
r"lladdr\s+(?P<l2addr>[0-9A-F:]+)?"
r"(\s+(?P<router>router))?"
r"(\s+((?P<nud_state>[A-Z_]+)|-))?"
r"(\s+(?P<ar_state>[A-Z_]+))?$")
self.c_neigh = re.compile(
r"(?P<ipv6_addr>[0-9a-f:]+)\s+"
r"dev\s+#(?P<iface>\d+)\s+"
r"lladdr\s+(?P<l2addr>[0-9A-F:]+)?"
r"(\s+(?P<router>router))?"
r"(\s+((?P<nud_state>[A-Z_]+)|-))?"
r"(\s+(?P<ar_state>[A-Z_]+))?$"
)
def parse(self, cmd_output):
"""
@ -195,8 +204,7 @@ class GNRCIPv6NIBNeighShowParser(ShellInteractionParser):
for line in cmd_output.splitlines():
m = self.c_neigh.search(line)
if m is not None:
res.append({k: v for k, v in m.groupdict().items()
if v is not None})
res.append({k: v for k, v in m.groupdict().items() if v is not None})
res[-1]["iface"] = int(res[-1]["iface"])
if "router" in res[-1]:
res[-1]["router"] = True
@ -205,11 +213,13 @@ class GNRCIPv6NIBNeighShowParser(ShellInteractionParser):
class GNRCIPv6NIBPrefixShowParser(ShellInteractionParser):
def __init__(self):
self.c_prefix = re.compile(r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+)\s+"
r"dev\s+#(?P<iface>\d+)"
r"(\s+expires (?P<valid_sec>\d+) sec)?"
r"(\s+deprecates (?P<pref_sec>\d+) sec)?$")
self.c_prefix = re.compile(
r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+)\s+"
r"dev\s+#(?P<iface>\d+)"
r"(\s+expires (?P<valid_sec>\d+) sec)?"
r"(\s+deprecates (?P<pref_sec>\d+) sec)?$"
)
def parse(self, cmd_output):
"""
@ -247,11 +257,13 @@ class GNRCIPv6NIBPrefixShowParser(ShellInteractionParser):
class GNRCIPv6NIBRouteShowParser(ShellInteractionParser):
def __init__(self):
self.c_route = re.compile(r"(?P<route>default(?P<primary>\*)?|"
r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+))\s+"
r"(via\s+(?P<next_hop>[0-9a-f:]+)\s+)?"
r"dev\s+#(?P<iface>\d+)$")
self.c_route = re.compile(
r"(?P<route>default(?P<primary>\*)?|"
r"(?P<prefix>[0-9a-f:]+)/"
r"(?P<prefix_len>\d+))\s+"
r"(via\s+(?P<next_hop>[0-9a-f:]+)\s+)?"
r"dev\s+#(?P<iface>\d+)$"
)
def parse(self, cmd_output):
"""
@ -312,11 +324,17 @@ class GNRCIPv6NIBRouteShowParser(ShellInteractionParser):
m = self.c_route.search(line)
if m is not None:
fte = {k: v for k, v in m.groupdict().items() if v is not None}
fte['iface'] = int(fte['iface'])
if "prefix" in fte and fte["prefix"] is not None and \
"prefix_len" in fte and fte["prefix_len"] is not None:
fte["route"] = {"prefix": fte["prefix"],
"prefix_len": int(fte["prefix_len"])}
fte["iface"] = int(fte["iface"])
if (
"prefix" in fte
and fte["prefix"] is not None
and "prefix_len" in fte
and fte["prefix_len"] is not None
):
fte["route"] = {
"prefix": fte["prefix"],
"prefix_len": int(fte["prefix_len"]),
}
elif fte["route"].startswith("default"):
fte["route"] = {"default": True}
else:
@ -334,9 +352,11 @@ class GNRCIPv6NIBRouteShowParser(ShellInteractionParser):
class GNRCIPv6NIBABRShowParser(ShellInteractionParser):
def __init__(self):
self.c_abr = re.compile(r"(?P<addr>[0-9a-f:]+)\s+"
r"v(?P<version>\d+)\s+expires\s+"
r"(?P<valid_min>\d+)min$")
self.c_abr = re.compile(
r"(?P<addr>[0-9a-f:]+)\s+"
r"v(?P<version>\d+)\s+expires\s+"
r"(?P<valid_min>\d+)min$"
)
def parse(self, cmd_output):
"""
@ -372,16 +392,20 @@ class GNRCPktbufStatsResults(dict):
Returns true if the packet buffer stats indicate that the packet buffer
is empty
"""
if "first_byte" not in self or \
"first_unused" not in self or \
"start" not in self["first_unused"] or \
"size" not in self["first_unused"]:
raise ValueError("{} has no items 'first_byte' or 'first_unused' "
"or 'first_unused' has no items 'start' or 'size'"
.format(self))
if (
"first_byte" not in self
or "first_unused" not in self
or "start" not in self["first_unused"]
or "size" not in self["first_unused"]
):
raise ValueError(
"{} has no items 'first_byte' or 'first_unused' "
"or 'first_unused' has no items 'start' or 'size'".format(self)
)
else:
return (self["first_byte"] == self["first_unused"]["start"]) and \
(self["size"] == self["first_unused"]["size"])
return (self["first_byte"] == self["first_unused"]["start"]) and (
self["size"] == self["first_unused"]["size"]
)
def fullest_capacity(self):
"""
@ -389,8 +413,7 @@ class GNRCPktbufStatsResults(dict):
command was called
"""
if "last_byte_used" not in self or "size" not in self:
raise ValueError("{} has no items 'last_byte_used' or 'size'"
.format(self))
raise ValueError("{} has no items 'last_byte_used' or 'size'".format(self))
else:
return self["last_byte_used"] / self["size"]
@ -413,11 +436,13 @@ class GNRCPktbufStatsParser(ShellInteractionParser):
@staticmethod
def _init_res(first_byte, last_byte, size):
return GNRCPktbufStatsResults((
("first_byte", int(first_byte, base=16)),
("last_byte", int(last_byte, base=16)),
("size", int(size)),
))
return GNRCPktbufStatsResults(
(
("first_byte", int(first_byte, base=16)),
("last_byte", int(last_byte, base=16)),
("size", int(size)),
)
)
@staticmethod
def _set_last_byte_used(res, last_byte_used):
@ -425,13 +450,10 @@ class GNRCPktbufStatsParser(ShellInteractionParser):
@staticmethod
def _set_first_unused(res, first_unused):
res["first_unused"] = {
"start": int(first_unused["start"], base=16)
}
res["first_unused"] = {"start": int(first_unused["start"], base=16)}
if first_unused["next"] is not None:
res["first_unused"]["next"] = int(first_unused["next"], base=16)
if "next" not in res["first_unused"] or \
not res["first_unused"]["next"]:
if "next" not in res["first_unused"] or not res["first_unused"]["next"]:
res["first_unused"]["next"] = None
res["first_unused"]["size"] = int(first_unused["size"])
@ -496,14 +518,29 @@ class GNRCPktbufStatsParser(ShellInteractionParser):
# ==== ShellInteractions ====
class GNRCICMPv6Echo(ShellInteraction):
@ShellInteraction.check_term
def ping6(self, hostname, count=3, interval=1000, packet_size=4,
hop_limit=None, timeout=1000, async_=False):
cmd = "ping6 {hostname} -c {count} -i {interval} " \
"-s {packet_size} -W {timeout}" \
.format(hostname=hostname, count=count, interval=interval,
packet_size=packet_size, timeout=timeout)
def ping6(
self,
hostname,
count=3,
interval=1000,
packet_size=4,
hop_limit=None,
timeout=1000,
async_=False,
):
cmd = (
"ping6 {hostname} -c {count} -i {interval} "
"-s {packet_size} -W {timeout}".format(
hostname=hostname,
count=count,
interval=interval,
packet_size=packet_size,
timeout=timeout,
)
)
if hop_limit is not None:
cmd += " -h {hop_limit}".format(hop_limit=hop_limit)
@ -521,14 +558,12 @@ class GNRCIPv6NIB(ShellInteraction):
@ShellInteraction.check_term
def nib_cmd(self, cmd, args=None, timeout=-1, async_=False):
return self.cmd(self._create_cmd(cmd, args),
timeout=timeout, async_=async_)
return self.cmd(self._create_cmd(cmd, args), timeout=timeout, async_=async_)
def nib_neigh_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.NEIGH, iface, timeout, async_)
def nib_neigh_add(self, iface, ipv6_addr, l2addr=None,
timeout=-1, async_=False):
def nib_neigh_add(self, iface, ipv6_addr, l2addr=None, timeout=-1, async_=False):
args = [iface, ipv6_addr]
if l2addr:
args.append(l2addr)
@ -540,8 +575,9 @@ class GNRCIPv6NIB(ShellInteraction):
def nib_prefix_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.PREFIX, iface, timeout, async_)
def nib_prefix_add(self, iface, prefix, valid_sec=None, pref_sec=None,
timeout=-1, async_=False):
def nib_prefix_add(
self, iface, prefix, valid_sec=None, pref_sec=None, timeout=-1, async_=False
):
if valid_sec is None and pref_sec is not None:
raise ValueError("pref_sec provided with no valid_sec")
args = [iface, prefix]
@ -557,8 +593,9 @@ class GNRCIPv6NIB(ShellInteraction):
def nib_route_show(self, iface=None, timeout=-1, async_=False):
return self._nib_show(self.ROUTE, iface, timeout, async_)
def nib_route_add(self, iface, prefix, next_hop, ltime_sec=None,
timeout=-1, async_=False):
def nib_route_add(
self, iface, prefix, next_hop, ltime_sec=None, timeout=-1, async_=False
):
args = [iface, prefix, next_hop]
if ltime_sec:
args.append(int(ltime_sec))

View File

@ -17,22 +17,22 @@ from riotctrl.shell import ShellInteraction
# ==== Parsers ====
class LoramacHelpParser():
class LoramacHelpParser:
def __init__(self):
self.eeprom_c = re.compile(r'Usage: loramac \<([\w+\|?]+)\>')
self.eeprom_c = re.compile(r"Usage: loramac \<([\w+\|?]+)\>")
def has_eeprom(self, cmd_output):
for line in cmd_output.splitlines():
m = self.eeprom_c.search(line)
if m is not None:
if 'save' in m.group(1):
if "save" in m.group(1):
return True
return False
class LoramacUpLinkCounterParser():
class LoramacUpLinkCounterParser:
def __init__(self):
self.uplink_c = re.compile(r'Uplink Counter: (\d+)')
self.uplink_c = re.compile(r"Uplink Counter: (\d+)")
def uplink_count(self, cmd_output):
for line in cmd_output.splitlines():
@ -41,6 +41,7 @@ class LoramacUpLinkCounterParser():
return m.group(1)
raise RuntimeError
# ==== ShellInteractions ====
@ -53,32 +54,30 @@ class Loramac(ShellInteraction):
return self.cmd(cmd, timeout=timeout, async_=False)
def loramac_join(self, mode, timeout=-1, async_=False):
res = self.loramac_cmd(args=("join", mode),
timeout=timeout, async_=async_)
res = self.loramac_cmd(args=("join", mode), timeout=timeout, async_=async_)
if "succeeded" in res:
return res
raise RuntimeError(res)
def loramac_tx(self, payload, cnf=False, port=2, timeout=-1, async_=False):
payload = '\"' + payload + '\"'
payload = '"' + payload + '"'
res = self.loramac_cmd(
args=("tx", payload, 'cnf' if cnf else "uncnf", port),
timeout=timeout, async_=async_
)
args=("tx", payload, "cnf" if cnf else "uncnf", port),
timeout=timeout,
async_=async_,
)
if "success" in res:
return res
raise RuntimeError(res)
def loramac_set(self, key, value, timeout=-1, async_=False):
res = self.loramac_cmd(args=("set", key, value),
timeout=timeout, async_=async_)
res = self.loramac_cmd(args=("set", key, value), timeout=timeout, async_=async_)
if "Usage" in res:
raise RuntimeError(res)
return res
def loramac_get(self, key, timeout=-1, async_=False):
res = self.loramac_cmd(args=("get", key),
timeout=timeout, async_=async_)
res = self.loramac_cmd(args=("get", key), timeout=timeout, async_=async_)
if "Usage" in res:
raise RuntimeError(res)
return res
@ -87,8 +86,7 @@ class Loramac(ShellInteraction):
return self.loramac_cmd(args=("save",), timeout=timeout, async_=async_)
def loramac_eeprom_erase(self, timeout=-1, async_=False):
return self.loramac_cmd(args=("erase",), timeout=timeout,
async_=async_)
return self.loramac_cmd(args=("erase",), timeout=timeout, async_=async_)
def loramac_help(self, timeout=-1, async_=False):
return self.loramac_cmd(args=("help",), timeout=timeout, async_=async_)

View File

@ -17,6 +17,7 @@ from riotctrl.shell import ShellInteraction, ShellInteractionParser
# ==== Parsers ====
class IfconfigListParser(ShellInteractionParser):
def __init__(self):
self.iface_c = re.compile(r"Iface\s+(?P<name>\S+)\s")
@ -25,18 +26,20 @@ class IfconfigListParser(ShellInteractionParser):
# e.g. for MCS: 1 (BPSK, rate 1/2, 2x frequency repetition) MTU :1280
# "1 (BPSK, rate 1/2, 2x frequency repetition)" belongs to the option
# value, "MTU" does not
self.option_c = re.compile(r"^(?P<option>[^:]+):\s?"
r"(?P<value>\S+(\s\S+)*)$")
self.option_c = re.compile(r"^(?P<option>[^:]+):\s?" r"(?P<value>\S+(\s\S+)*)$")
# options are evaluated before flags, so all options that don't contain
# colons are flags
self.flag_c = re.compile(r"^(?P<flag>[^:]+)$")
self.ipv6_c = re.compile(r"inet6 (?P<type>addr|group): "
r"(?P<addr>[0-9a-f:]+)(\s+"
r"scope:\s+(?P<scope>\S+)"
r"(?P<anycast>\s+\[anycast\])?\s+"
r"(?P<state>\S+))?$")
self.bl_header_c = re.compile(r"(?P<mode>White|Black)-listed "
r"link layer addresses:")
self.ipv6_c = re.compile(
r"inet6 (?P<type>addr|group): "
r"(?P<addr>[0-9a-f:]+)(\s+"
r"scope:\s+(?P<scope>\S+)"
r"(?P<anycast>\s+\[anycast\])?\s+"
r"(?P<state>\S+))?$"
)
self.bl_header_c = re.compile(
r"(?P<mode>White|Black)-listed " r"link layer addresses:"
)
self.bl_c = re.compile(r"\d+: (?P<addr>[0-9a-f]{2}(:[0-9a-f]{2})*)$")
def parse(self, cmd_output):
@ -71,7 +74,7 @@ class IfconfigListParser(ShellInteractionParser):
netifs = {}
current = netifs[name] = {}
# Go ahead in line to not confuse options parser
line = line[m.end():]
line = line[m.end() :]
offset += m.end() + 1
if current is not None:
# XXX checking for IPv4 address might also go here
@ -90,9 +93,11 @@ class IfconfigListParser(ShellInteractionParser):
current["whitelist"].append(m.group("addr"))
else:
parse_blacklist = False
elif not parse_blacklist and \
"blacklist" not in current and \
"whitelist" not in current:
elif (
not parse_blacklist
and "blacklist" not in current
and "whitelist" not in current
):
m = self.bl_header_c.search(line)
if m is not None:
if m.group("mode") == "Black":
@ -184,7 +189,7 @@ class IfconfigListParser(ShellInteractionParser):
if m is not None:
addr = m.groupdict()
typ = addr.pop("type")
if typ == "addr": # unicast address
if typ == "addr": # unicast address
# reformat anycast item if existent
if addr.get("anycast") is None:
addr.pop("anycast", None)
@ -194,7 +199,7 @@ class IfconfigListParser(ShellInteractionParser):
netif["ipv6_addrs"].append(addr)
else:
netif["ipv6_addrs"] = [addr]
else: # multicast address
else: # multicast address
for key in set(addr):
# remove empty matches
if addr[key] is None:
@ -210,13 +215,17 @@ class IfconfigListParser(ShellInteractionParser):
class IfconfigStatsParser(ShellInteractionParser):
def __init__(self):
self.header_c = re.compile(r"Statistics for (?P<module>.+)$")
self.rx_c = re.compile(r"RX packets\s+(?P<packets>\d+)\s+"
r"bytes\s+(?P<bytes>\d+)$")
self.tx_c = re.compile(r"TX packets\s+(?P<packets>\d+)\s+"
r"\(Multicast:\s+(?P<multicast>\d+)\)\s+"
r"bytes\s+(?P<bytes>\d+)$")
self.tx_err_c = re.compile(r"TX succeeded\s+(?P<succeeded>\d+)\s+"
r"errors\s+(?P<errors>\d+)$")
self.rx_c = re.compile(
r"RX packets\s+(?P<packets>\d+)\s+" r"bytes\s+(?P<bytes>\d+)$"
)
self.tx_c = re.compile(
r"TX packets\s+(?P<packets>\d+)\s+"
r"\(Multicast:\s+(?P<multicast>\d+)\)\s+"
r"bytes\s+(?P<bytes>\d+)$"
)
self.tx_err_c = re.compile(
r"TX succeeded\s+(?P<succeeded>\d+)\s+" r"errors\s+(?P<errors>\d+)$"
)
def parse(self, cmd_output):
"""
@ -259,13 +268,11 @@ class IfconfigStatsParser(ShellInteractionParser):
if "rx" not in current:
m = self.rx_c.search(line)
if m is not None:
current["rx"] = {k: int(v)
for k, v in m.groupdict().items()}
current["rx"] = {k: int(v) for k, v in m.groupdict().items()}
elif "tx" not in current:
m = self.tx_c.search(line)
if m is not None:
current["tx"] = {k: int(v)
for k, v in m.groupdict().items()}
current["tx"] = {k: int(v) for k, v in m.groupdict().items()}
elif "tx" in current:
m = self.tx_err_c.search(line)
if m is not None:
@ -277,6 +284,7 @@ class IfconfigStatsParser(ShellInteractionParser):
# ==== ShellInteractions ====
class Ifconfig(ShellInteraction):
def ifconfig_list(self, netif=None, timeout=-1, async_=False):
return self.ifconfig_cmd(netif=netif, timeout=timeout, async_=async_)
@ -293,81 +301,82 @@ class Ifconfig(ShellInteraction):
return self.cmd(cmd, timeout=timeout, async_=False)
def ifconfig_help(self, netif, timeout=-1, async_=False):
return self.ifconfig_cmd(netif=netif, args=("help",),
timeout=timeout, async_=async_)
return self.ifconfig_cmd(
netif=netif, args=("help",), timeout=timeout, async_=async_
)
def ifconfig_set(self, netif, key, value, timeout=-1, async_=False):
return self._ifconfig_success_cmd(netif=netif,
args=("set", key, value),
timeout=timeout, async_=async_)
return self._ifconfig_success_cmd(
netif=netif, args=("set", key, value), timeout=timeout, async_=async_
)
def ifconfig_up(self, netif, timeout=-1, async_=False):
self._ifconfig_error_cmd(netif=netif, args=("up",),
timeout=timeout, async_=async_)
self._ifconfig_error_cmd(
netif=netif, args=("up",), timeout=timeout, async_=async_
)
def ifconfig_down(self, netif, timeout=-1, async_=False):
self._ifconfig_error_cmd(netif=netif, args=("down",),
timeout=timeout, async_=async_)
self._ifconfig_error_cmd(
netif=netif, args=("down",), timeout=timeout, async_=async_
)
def ifconfig_add(self, netif, addr, anycast=False,
timeout=-1, async_=False):
def ifconfig_add(self, netif, addr, anycast=False, timeout=-1, async_=False):
args = ["add", addr]
if anycast:
args.append("anycast")
return self._ifconfig_success_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)
return self._ifconfig_success_cmd(
netif=netif, args=args, timeout=timeout, async_=async_
)
def ifconfig_del(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(netif=netif, args=("del", addr),
timeout=timeout, async_=async_)
def ifconfig_flag(self, netif, flag, enable=True,
timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("{}{}".format("" if enable else "-", flag),),
timeout=timeout, async_=async_
netif=netif, args=("del", addr), timeout=timeout, async_=async_
)
def ifconfig_flag(self, netif, flag, enable=True, timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif,
args=("{}{}".format("" if enable else "-", flag),),
timeout=timeout,
async_=async_,
)
def ifconfig_l2filter_add(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("l2filter", "add", addr),
timeout=timeout, async_=async_
netif=netif, args=("l2filter", "add", addr), timeout=timeout, async_=async_
)
def ifconfig_l2filter_del(self, netif, addr, timeout=-1, async_=False):
return self._ifconfig_success_cmd(
netif=netif, args=("l2filter", "del", addr),
timeout=timeout, async_=async_
netif=netif, args=("l2filter", "del", addr), timeout=timeout, async_=async_
)
def ifconfig_stats(self, netif, module, timeout=-1, async_=False):
res = self.ifconfig_cmd(netif=netif, args=("stats", module),
timeout=timeout, async_=async_)
res = self.ifconfig_cmd(
netif=netif, args=("stats", module), timeout=timeout, async_=async_
)
if "Statistics for " in res:
return res
raise RuntimeError(res)
def ifconfig_stats_reset(self, netif, module, timeout=-1, async_=False):
res = self.ifconfig_cmd(netif=netif, args=("stats", module, "reset"),
timeout=timeout, async_=async_)
res = self.ifconfig_cmd(
netif=netif, args=("stats", module, "reset"), timeout=timeout, async_=async_
)
if "Reset statistics for module " in res:
return res
raise RuntimeError(res)
def _ifconfig_success_cmd(self, netif=None, args=None,
timeout=-1, async_=False):
def _ifconfig_success_cmd(self, netif=None, args=None, timeout=-1, async_=False):
"""For commands that have a success output"""
res = self.ifconfig_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)
res = self.ifconfig_cmd(netif=netif, args=args, timeout=timeout, async_=async_)
if "success" in res:
return res
raise RuntimeError(res)
def _ifconfig_error_cmd(self, netif=None, args=None,
timeout=-1, async_=False):
def _ifconfig_error_cmd(self, netif=None, args=None, timeout=-1, async_=False):
"""For commands that only have an error output"""
res = self.ifconfig_cmd(netif=netif, args=args,
timeout=timeout, async_=async_)
res = self.ifconfig_cmd(netif=netif, args=args, timeout=timeout, async_=async_)
if "error" in res:
raise RuntimeError(res)
@ -376,9 +385,7 @@ class TXTSnd(ShellInteraction):
@ShellInteraction.check_term
def netif_txtsnd(self, netif, target, data, timeout=-1, async_=False):
cmd = "txtsnd {netif} {target} {data}".format(
netif=netif,
target=target,
data=data
netif=netif, target=target, data=data
)
res = self.cmd(cmd)
if "error" in res or "usage" in res:

View File

@ -15,6 +15,7 @@ from riotctrl.shell import ShellInteraction
class Help(ShellInteraction):
"""Help ShellInteraction"""
@ShellInteraction.check_term
def help(self, timeout=-1, async_=False):
"""Sends the reboot command via the terminal"""
@ -23,6 +24,7 @@ class Help(ShellInteraction):
class Reboot(ShellInteraction):
"""Reboot ShellInteraction"""
@ShellInteraction.check_term
def reboot(self, timeout=-1, async_=False):
"""Sends the reboot command via the terminal"""
@ -31,6 +33,7 @@ class Reboot(ShellInteraction):
class Version(ShellInteraction):
"""Version ShellInteraction"""
@ShellInteraction.check_term
def version(self, timeout=-1, async_=False):
"""Sends the reboot command via the terminal"""