mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2024-12-29 04:50:03 +01:00
Merge pull request #11406 from miri64/tools/enh/test-mixins
riotctrl_shell: initial import of shell interaction spawn
This commit is contained in:
commit
0ddb539d97
0
dist/pythonlibs/riotctrl_shell/__init__.py
vendored
Normal file
0
dist/pythonlibs/riotctrl_shell/__init__.py
vendored
Normal file
189
dist/pythonlibs/riotctrl_shell/gnrc.py
vendored
Normal file
189
dist/pythonlibs/riotctrl_shell/gnrc.py
vendored
Normal file
@ -0,0 +1,189 @@
|
||||
# Copyright (C) 2019-20 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
"""
|
||||
GNRC-related shell interactions
|
||||
|
||||
Defines GNRC-related shell command interactions
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
from riotctrl.shell import ShellInteraction, ShellInteractionParser
|
||||
|
||||
|
||||
# ==== Parsers ====
|
||||
|
||||
class GNRCICMPv6EchoParser(ShellInteractionParser):
|
||||
@staticmethod
|
||||
def _add_reply(res, reply):
|
||||
reply["seq"] = int(reply["seq"])
|
||||
reply["ttl"] = int(reply["ttl"])
|
||||
reply["rtt"] = float(reply["rtt"])
|
||||
if reply.get("rssi") is not None:
|
||||
reply["rssi"] = int(reply["rssi"])
|
||||
if "replies" in res:
|
||||
res["replies"].append(reply)
|
||||
else:
|
||||
res["replies"] = [reply]
|
||||
|
||||
@staticmethod
|
||||
def _set_stats(res, stats):
|
||||
stats["packet_loss"] = int(stats["packet_loss"])
|
||||
stats["rx"] = int(stats["rx"])
|
||||
stats["tx"] = int(stats["tx"])
|
||||
res["stats"] = stats
|
||||
|
||||
@staticmethod
|
||||
def _set_rtts(res, rtts):
|
||||
rtts["min"] = float(rtts["min"])
|
||||
rtts["avg"] = float(rtts["avg"])
|
||||
rtts["max"] = float(rtts["max"])
|
||||
res["rtts"] = rtts
|
||||
|
||||
def parse(self, cmd_output):
|
||||
res = {}
|
||||
c_reply = re.compile(r"\d+ bytes from (?P<source>[0-9a-f:]+): "
|
||||
r"icmp_seq=(?P<seq>\d+) ttl=(?P<ttl>\d+) "
|
||||
r"(rssi=(?P<rssi>-?\d+) dBm )?"
|
||||
r"time=(?P<rtt>\d+.\d+) ms")
|
||||
c_stats = re.compile(r"(?P<tx>\d+) packets transmitted, "
|
||||
r"(?P<rx>\d+) packets received, "
|
||||
r"(?P<packet_loss>\d+)% packet loss")
|
||||
c_rtts = re.compile(r"round-trip min/avg/max = (?P<min>\d+.\d+)/"
|
||||
r"(?P<avg>\d+.\d+)/(?P<max>\d+.\d+) ms")
|
||||
for line in cmd_output.splitlines():
|
||||
if "stats" not in res: # If final stats were not found yet
|
||||
m = c_reply.match(line)
|
||||
if m is not None:
|
||||
self._add_reply(res, m.groupdict())
|
||||
continue
|
||||
m = c_stats.match(line)
|
||||
if m is not None:
|
||||
self._set_stats(res, m.groupdict())
|
||||
continue
|
||||
else:
|
||||
m = c_rtts.match(line)
|
||||
if m is not None:
|
||||
self._set_rtts(res, m.groupdict())
|
||||
return res
|
||||
# Unable to parse RTTs, so something went wrong.
|
||||
return None
|
||||
|
||||
|
||||
class GNRCPktbufStatsResults(dict):
|
||||
def is_empty(self):
|
||||
"""
|
||||
Returns true if the packet buffer stats indicate that the packet buffer
|
||||
is empty
|
||||
"""
|
||||
if "first_byte" not in self or \
|
||||
"first_unused" not in self or \
|
||||
"start" not in self["first_unused"] or \
|
||||
"size" not in self["first_unused"]:
|
||||
raise ValueError("{} has no items 'first_byte' or 'first_unused' "
|
||||
"or 'first_unused' has no items 'start' or 'size'"
|
||||
.format(self))
|
||||
else:
|
||||
return (self["first_byte"] == self["first_unused"]["start"]) and \
|
||||
(self["size"] == self["first_unused"]["size"])
|
||||
|
||||
def fullest_capacity(self):
|
||||
"""
|
||||
Returns the packet buffer usage at its fullest capacity when the
|
||||
command was called
|
||||
"""
|
||||
if "last_byte_used" not in self or "size" not in self:
|
||||
raise ValueError("{} has no items 'last_byte_used' or 'size'"
|
||||
.format(self))
|
||||
else:
|
||||
return self["last_byte_used"] / self["size"]
|
||||
|
||||
|
||||
class GNRCPktbufStatsParser(ShellInteractionParser):
|
||||
@staticmethod
|
||||
def _init_res(first_byte, last_byte, size):
|
||||
return GNRCPktbufStatsResults((
|
||||
("first_byte", int(first_byte, base=16)),
|
||||
("last_byte", int(last_byte, base=16)),
|
||||
("size", int(size)),
|
||||
))
|
||||
|
||||
@staticmethod
|
||||
def _set_last_byte_used(res, last_byte_used):
|
||||
res["last_byte_used"] = int(last_byte_used)
|
||||
|
||||
@staticmethod
|
||||
def _set_first_unused(res, first_unused):
|
||||
res["first_unused"] = {
|
||||
"start": int(first_unused["start"], base=16)
|
||||
}
|
||||
if first_unused["next"] is not None:
|
||||
res["first_unused"]["next"] = int(first_unused["next"], base=16)
|
||||
if "next" not in res["first_unused"] or \
|
||||
not res["first_unused"]["next"]:
|
||||
res["first_unused"]["next"] = None
|
||||
res["first_unused"]["size"] = int(first_unused["size"])
|
||||
|
||||
def parse(self, cmd_output):
|
||||
c_init1 = re.compile(r"packet buffer: "
|
||||
r"first byte: 0x(?P<first_byte>[0-9A-Fa-f]+), "
|
||||
r"last byte: 0x(?P<last_byte>[0-9A-Fa-f]+) "
|
||||
r"\(size: +(?P<size>\d+)\)")
|
||||
c_init2 = re.compile(r" position of last byte used: (\d+)")
|
||||
c_unused = re.compile(r"~ unused: 0x(?P<start>[0-9A-Fa-f]+) "
|
||||
r"\(next: (0x(?P<next>[0-9A-Fa-f]+)|\(nil\)), "
|
||||
r"size: +(?P<size>\d+)\) ~")
|
||||
res = None
|
||||
for line in cmd_output.splitlines():
|
||||
if res is None:
|
||||
m = c_init1.match(line)
|
||||
if m is not None:
|
||||
res = self._init_res(**m.groupdict())
|
||||
# no sense in further parsing if we did not find the first line
|
||||
# yet. If we found it just continue parsing with next line
|
||||
continue
|
||||
elif "last_byte_used" not in res:
|
||||
m = c_init2.match(line)
|
||||
if m is not None:
|
||||
self._set_last_byte_used(res, m.group(1))
|
||||
continue
|
||||
elif "first_unused" not in res:
|
||||
m = c_unused.match(line)
|
||||
if m is not None:
|
||||
self._set_first_unused(res, m.groupdict())
|
||||
if res is not None and "last_byte_used" not in res:
|
||||
# Could not parse second line of header => something went wrong
|
||||
return None
|
||||
else:
|
||||
# Just return res (might be also None if first line of header was
|
||||
# not found)
|
||||
return res
|
||||
|
||||
|
||||
# ==== ShellInteractions ====
|
||||
|
||||
class GNRCICMPv6Echo(ShellInteraction):
|
||||
@ShellInteraction.check_term
|
||||
def ping6(self, hostname, count=3, interval=1000, packet_size=4,
|
||||
hop_limit=None, timeout=1000, async_=False):
|
||||
cmd = "ping6 {hostname} -c {count} -i {interval} " \
|
||||
"-s {packet_size} -W {timeout}" \
|
||||
.format(hostname=hostname, count=count, interval=interval,
|
||||
packet_size=packet_size, timeout=timeout)
|
||||
|
||||
if hop_limit is not None:
|
||||
cmd += " -h {hop_limit}".format(hop_limit=hop_limit)
|
||||
|
||||
# wait a second longer than all pings
|
||||
cmd_timeout = ((timeout / 1000) * count) + 1
|
||||
return self.cmd(cmd, timeout=cmd_timeout, async_=async_)
|
||||
|
||||
|
||||
class GNRCPktbufStats(ShellInteraction):
|
||||
@ShellInteraction.check_term
|
||||
def pktbuf_stats(self, timeout=-1, async_=False):
|
||||
return self.cmd("pktbuf", timeout=timeout, async_=async_)
|
1
dist/pythonlibs/riotctrl_shell/requirements.txt
vendored
Normal file
1
dist/pythonlibs/riotctrl_shell/requirements.txt
vendored
Normal file
@ -0,0 +1 @@
|
||||
git+ssh://git@github.com/RIOT-OS/riotctrl
|
37
dist/pythonlibs/riotctrl_shell/sys.py
vendored
Normal file
37
dist/pythonlibs/riotctrl_shell/sys.py
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
# Copyright (C) 20 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
"""
|
||||
sys-related shell interactions
|
||||
|
||||
Defines sys-related shell command interactions
|
||||
"""
|
||||
|
||||
from riotctrl.shell import ShellInteraction
|
||||
|
||||
|
||||
class Help(ShellInteraction):
|
||||
"""Help ShellInteraction"""
|
||||
@ShellInteraction.check_term
|
||||
def help(self, timeout=-1, async_=False):
|
||||
"""Sends the reboot command via the terminal"""
|
||||
return self.cmd("help", timeout, async_)
|
||||
|
||||
|
||||
class Reboot(ShellInteraction):
|
||||
"""Reboot ShellInteraction"""
|
||||
@ShellInteraction.check_term
|
||||
def reboot(self, timeout=-1, async_=False):
|
||||
"""Sends the reboot command via the terminal"""
|
||||
return self.cmd("reboot", timeout, async_)
|
||||
|
||||
|
||||
class Version(ShellInteraction):
|
||||
"""Version ShellInteraction"""
|
||||
@ShellInteraction.check_term
|
||||
def version(self, timeout=-1, async_=False):
|
||||
"""Sends the reboot command via the terminal"""
|
||||
return self.cmd("version", timeout, async_)
|
0
dist/pythonlibs/riotctrl_shell/tests/__init__.py
vendored
Normal file
0
dist/pythonlibs/riotctrl_shell/tests/__init__.py
vendored
Normal file
34
dist/pythonlibs/riotctrl_shell/tests/common.py
vendored
Normal file
34
dist/pythonlibs/riotctrl_shell/tests/common.py
vendored
Normal file
@ -0,0 +1,34 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
|
||||
class MockSpawn():
|
||||
def __init__(self, *args, **kwargs):
|
||||
# set some expected attributes
|
||||
self.before = None
|
||||
self.echo = False
|
||||
|
||||
def sendline(self, line, *args, **kwargs):
|
||||
# just echo last input for before (what replwrap is assembling output
|
||||
# from)
|
||||
self.before = line
|
||||
|
||||
def expect_exact(self, *args, **kwargs):
|
||||
# always match on prompt with replwrap
|
||||
return 0
|
||||
|
||||
|
||||
class MockRIOTCtrl():
|
||||
"""
|
||||
Mock RIOT ctrl
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.term = MockSpawn()
|
||||
|
||||
|
||||
def init_ctrl():
|
||||
rc = MockRIOTCtrl("foobar", env={"BOARD": "native"})
|
||||
return rc
|
103
dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py
vendored
Normal file
103
dist/pythonlibs/riotctrl_shell/tests/test_gnrc.py
vendored
Normal file
@ -0,0 +1,103 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import logging
|
||||
import riotctrl_shell.gnrc
|
||||
|
||||
from .common import init_ctrl
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_ping6():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.gnrc.GNRCICMPv6Echo(rc)
|
||||
res = si.ping6("::1", interval=100)
|
||||
# mock just returns last input
|
||||
assert "ping6" in res
|
||||
assert " ::1 " in res
|
||||
assert " -i 100 " in res
|
||||
|
||||
|
||||
def test_ping6_parser_success():
|
||||
parser = riotctrl_shell.gnrc.GNRCICMPv6EchoParser()
|
||||
ping_res = parser.parse("""
|
||||
12 bytes from ::1: icmp_seq=0 ttl=64 time=0.435 ms
|
||||
12 bytes from ::1: icmp_seq=1 ttl=64 time=0.433 ms
|
||||
12 bytes from ::1: icmp_seq=2 ttl=64 time=0.432 ms
|
||||
|
||||
--- ::1 PING statistics ---
|
||||
3 packets transmitted, 3 packets received, 0% packet loss
|
||||
round-trip min/avg/max = 0.432/0.433/0.435 ms""")
|
||||
assert ping_res is not None
|
||||
assert "rtts" in ping_res
|
||||
assert "avg" in ping_res["rtts"]
|
||||
|
||||
|
||||
def test_ping6_parser_empty():
|
||||
parser = riotctrl_shell.gnrc.GNRCICMPv6EchoParser()
|
||||
ping_res = parser.parse("")
|
||||
assert ping_res is None
|
||||
|
||||
|
||||
def test_ping6_parser_missing_rtts():
|
||||
parser = riotctrl_shell.gnrc.GNRCICMPv6EchoParser()
|
||||
ping_res = parser.parse("""
|
||||
12 bytes from ::1: icmp_seq=0 ttl=64 time=0.553 ms
|
||||
12 bytes from ::1: icmp_seq=1 ttl=64 time=0.496 ms
|
||||
12 bytes from ::1: icmp_seq=2 ttl=64 time=0.496 ms
|
||||
|
||||
--- ::1 PING statistics ---
|
||||
3 packets transmitted, 3 packets received, 0% packet loss""")
|
||||
assert ping_res is None
|
||||
|
||||
|
||||
def test_pktbuf():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.gnrc.GNRCPktbufStats(rc)
|
||||
res = si.pktbuf_stats()
|
||||
# mock just returns last input
|
||||
assert res == "pktbuf"
|
||||
|
||||
|
||||
def test_pktbuf_parser_success_empty():
|
||||
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
|
||||
pktbuf_res = parser.parse("""
|
||||
packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192)
|
||||
position of last byte used: 1792
|
||||
~ unused: 0x5660dce0 (next: (nil), size: 8192) ~""")
|
||||
assert pktbuf_res is not None
|
||||
assert pktbuf_res["first_byte"] > 0
|
||||
assert "start" in pktbuf_res["first_unused"]
|
||||
assert pktbuf_res.is_empty()
|
||||
|
||||
|
||||
def test_pktbuf_parser_success_not_empty():
|
||||
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
|
||||
pktbuf_res = parser.parse("""
|
||||
packet buffer: first byte: 0x5660dce0, last byte: 0x5660fce0 (size: 8192)
|
||||
position of last byte used: 1792
|
||||
~ unused: 0x5660de00 (next: (nil), size: 7904) ~""")
|
||||
assert pktbuf_res is not None
|
||||
assert pktbuf_res["first_byte"] > 0
|
||||
assert "start" in pktbuf_res["first_unused"]
|
||||
assert not pktbuf_res.is_empty()
|
||||
|
||||
|
||||
def test_pktbuf_parser_empty():
|
||||
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
|
||||
pktbuf_res = parser.parse("")
|
||||
assert pktbuf_res is None
|
||||
|
||||
|
||||
def test_pktbuf_parser_2nd_header_not_found():
|
||||
parser = riotctrl_shell.gnrc.GNRCPktbufStatsParser()
|
||||
pktbuf_res = parser.parse(
|
||||
"packet buffer: first byte: 0x5668ace0, last byte: 0x5668cce0 "
|
||||
"(size: 8192)"
|
||||
)
|
||||
assert pktbuf_res is None
|
33
dist/pythonlibs/riotctrl_shell/tests/test_sys.py
vendored
Normal file
33
dist/pythonlibs/riotctrl_shell/tests/test_sys.py
vendored
Normal file
@ -0,0 +1,33 @@
|
||||
# Copyright (C) 2020 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import riotctrl_shell.sys
|
||||
|
||||
from .common import init_ctrl
|
||||
|
||||
|
||||
def test_help():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.sys.Help(rc)
|
||||
res = si.help()
|
||||
# mock just returns last input
|
||||
assert res == "help"
|
||||
|
||||
|
||||
def test_reboot():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.sys.Reboot(rc)
|
||||
res = si.reboot()
|
||||
# mock just returns last input
|
||||
assert res == "reboot"
|
||||
|
||||
|
||||
def test_version():
|
||||
rc = init_ctrl()
|
||||
si = riotctrl_shell.sys.Version(rc)
|
||||
res = si.version()
|
||||
# mock just returns last input
|
||||
assert res == "version"
|
20
dist/pythonlibs/riotctrl_shell/tox.ini
vendored
Normal file
20
dist/pythonlibs/riotctrl_shell/tox.ini
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
[tox]
|
||||
envlist = test,flake8
|
||||
skipsdist = True
|
||||
|
||||
[testenv]
|
||||
commands =
|
||||
test: {[testenv:test]commands}
|
||||
flake8: {[testenv:flake8]commands}
|
||||
|
||||
[testenv:test]
|
||||
deps =
|
||||
pytest
|
||||
-rrequirements.txt
|
||||
commands =
|
||||
pytest -v
|
||||
|
||||
[testenv:flake8]
|
||||
deps = flake8
|
||||
commands =
|
||||
flake8 .
|
Loading…
Reference in New Issue
Block a user