1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00

dist/pythonlibs/riotctrl_shell: add loramac shell

This commit is contained in:
Francisco Molina 2021-04-27 17:13:10 +02:00
parent 1c3c2db812
commit 3fbd079bc3
No known key found for this signature in database
GPG Key ID: 3E94EAC3DBDEEDA8
2 changed files with 235 additions and 0 deletions

View File

@ -0,0 +1,94 @@
# Copyright (C) 2021 Inria
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
"""
Shell interactions related to Semtech Loramac package
Defines shell command interactions related to Semtech Loramac package
"""
import re
from riotctrl.shell import ShellInteraction
# ==== Parsers ====
class LoramacHelpParser():
def __init__(self):
self.eeprom_c = re.compile(r'Usage: loramac \<([\w+\|?]+)\>')
def has_eeprom(self, cmd_output):
for line in cmd_output.splitlines():
m = self.eeprom_c.search(line)
if m is not None:
if 'save' in m.group(1):
return True
return False
class LoramacUpLinkCounterParser():
def __init__(self):
self.uplink_c = re.compile(r'Uplink Counter: (\d+)')
def uplink_count(self, cmd_output):
for line in cmd_output.splitlines():
m = self.uplink_c.search(line)
if m is not None:
return m.group(1)
raise RuntimeError
# ==== ShellInteractions ====
class Loramac(ShellInteraction):
@ShellInteraction.check_term
def loramac_cmd(self, args=None, timeout=-1, async_=False):
cmd = "loramac"
if args is not None:
cmd += " {args}".format(args=" ".join(str(a) for a in args))
return self.cmd(cmd, timeout=timeout, async_=False)
def loramac_join(self, mode, timeout=-1, async_=False):
res = self.loramac_cmd(args=("join", mode),
timeout=timeout, async_=async_)
if "succeeded" in res:
return res
raise RuntimeError(res)
def loramac_tx(self, payload, cnf=False, port=2, timeout=-1, async_=False):
payload = '\"' + payload + '\"'
res = self.loramac_cmd(
args=("tx", payload, 'cnf' if cnf else "uncnf", port),
timeout=timeout, async_=async_
)
if "success" in res:
return res
raise RuntimeError(res)
def loramac_set(self, key, value, timeout=-1, async_=False):
res = self.loramac_cmd(args=("set", key, value),
timeout=timeout, async_=async_)
if "Usage" in res:
raise RuntimeError(res)
return res
def loramac_get(self, key, timeout=-1, async_=False):
res = self.loramac_cmd(args=("get", key),
timeout=timeout, async_=async_)
if "Usage" in res:
raise RuntimeError(res)
return res
def loramac_eeprom_save(self, timeout=-1, async_=False):
return self.loramac_cmd(args=("save",), timeout=timeout, async_=async_)
def loramac_eeprom_erase(self, timeout=-1, async_=False):
return self.loramac_cmd(args=("erase",), timeout=timeout,
async_=async_)
def loramac_help(self, timeout=-1, async_=False):
return self.loramac_cmd(args=("help",), timeout=timeout, async_=async_)

View File

@ -0,0 +1,141 @@
# Copyright (C) 2021 Inria
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import pytest
import riotctrl_shell.loramac
from .common import init_ctrl
def test_loramac_join_otaa_failed():
rc = init_ctrl(output="""
loramac join otaa
Join procedure failed!
""")
shell = riotctrl_shell.loramac.Loramac(rc)
with pytest.raises(RuntimeError) as error:
shell.loramac_join('otaa')
assert "Join procedure failed!" in str(error.value)
def test_loramac_join_otaa_succeed():
rc = init_ctrl(output="""
loramac join otaa
Join procedure succeeded!
""")
shell = riotctrl_shell.loramac.Loramac(rc)
res = shell.loramac_join('otaa')
assert "Join procedure succeeded!" in res
def test_loramac_join_abp_succeed():
rc = init_ctrl(output="""
loramac join abp
Join procedure succeeded!
""")
shell = riotctrl_shell.loramac.Loramac(rc)
res = shell.loramac_join('abp')
assert "Join procedure succeeded!" in res
def test_loramac_tx_not_joined():
rc = init_ctrl(output="""
loramac tx "Hello RIOT!"
Cannot send: not joined
""")
shell = riotctrl_shell.loramac.Loramac(rc)
with pytest.raises(RuntimeError) as error:
shell.loramac_tx("Hello RIOT!")
assert "Cannot send: not joined" in str(error.value)
def test_loramac_tx_succeed():
rc = init_ctrl(output="""
loramac tx "Hello RIOT!"
Message sent with success
""")
shell = riotctrl_shell.loramac.Loramac(rc)
res = shell.loramac_tx("Hello RIOT!")
assert "Message sent with success" in res
def test_loramac_help_parser_has_eeprom_no():
rc = init_ctrl(output="""
loramac help
Usage: loramac <get|set|join|tx|link_check>
""")
shell = riotctrl_shell.loramac.Loramac(rc)
parser = riotctrl_shell.loramac.LoramacHelpParser()
assert not parser.has_eeprom(shell.loramac_help())
def test_loramac_help_parser_has_eeprom_yes():
rc = init_ctrl(output="""
loramac help
Usage: loramac <get|set|join|tx|link_check|save|erase>
""")
shell = riotctrl_shell.loramac.Loramac(rc)
parser = riotctrl_shell.loramac.LoramacHelpParser()
assert parser.has_eeprom(shell.loramac_help())
def test_loramac_set_fail():
rc = init_ctrl(output="""
loramac set dr 123
Usage: loramac set dr <0..16>
""")
shell = riotctrl_shell.loramac.Loramac(rc)
with pytest.raises(RuntimeError) as error:
shell.loramac_set('dr', 123)
assert "Usage: loramac set dr <0..16>" in str(error.value)
def test_loramac_set_succeed():
rc = init_ctrl(output="loramac set dr 5")
shell = riotctrl_shell.loramac.Loramac(rc)
shell.loramac_set('dr', 5)
assert rc.term.last_command == "loramac set dr 5"
def test_loramac_get_fail():
rc = init_ctrl(output="""
loramac get dxe
Usage: loramac get <deveui|appeui|appkey|appskey|nwkskey|devaddr|class|dr|adr|public|netid|tx_power|rx2_freq|rx2_dr|ul_cnt>
""") # noqa: E501
shell = riotctrl_shell.loramac.Loramac(rc)
with pytest.raises(RuntimeError) as error:
shell.loramac_get('dxe')
assert "Usage: loramac get" in str(error.value)
def test_loramac_get_succeed():
rc = init_ctrl(output="""
loramac get dr
DATARATE: 5
""")
shell = riotctrl_shell.loramac.Loramac(rc)
res = shell.loramac_get('dr')
assert "Usage: loramac get" not in res
def test_loramac_uplink_counter_parser():
rc = init_ctrl(output="""
loramac get ul_cnt
Uplink Counter: 32
""")
shell = riotctrl_shell.loramac.Loramac(rc)
parser = riotctrl_shell.loramac.LoramacUpLinkCounterParser()
assert parser.uplink_count(shell.loramac_get('ul_cnt')) == '32'
def test_loramac_uplink_counter_parser_fail():
rc = init_ctrl(output="""
loramac get ul_cnt
Uplink Counter: asdfasdf
""")
shell = riotctrl_shell.loramac.Loramac(rc)
parser = riotctrl_shell.loramac.LoramacUpLinkCounterParser()
with pytest.raises(RuntimeError):
parser.uplink_count(shell.loramac_get('ul_cnt'))