1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00

Merge pull request #17353 from benpicco/tests/gnrc_rpl

tests/gnrc_rpl: add automated test for gnrc_rpl
This commit is contained in:
benpicco 2022-03-22 12:10:34 +01:00 committed by GitHub
commit dd8b3fc593
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 283 additions and 0 deletions

31
tests/gnrc_rpl/Makefile Normal file
View File

@ -0,0 +1,31 @@
include ../Makefile.tests_common
USEMODULE += auto_init_gnrc_netif
USEMODULE += auto_init_gnrc_rpl
USEMODULE += gnrc_ipv6_router_default
USEMODULE += gnrc_icmpv6_echo
USEMODULE += gnrc_rpl
USEMODULE += shell
USEMODULE += shell_commands
# automated test only works on native
TEST_ON_CI_WHITELIST += native
ifeq (native, $(BOARD))
USEMODULE += socket_zep
USEMODULE += socket_zep_hello
USEMODULE += netdev
TERMFLAGS += -z 127.0.0.1:17754 # Murdock has no IPv6 support
else
USEMODULE += netdev_default
endif
.PHONY: host-tools
host-tools:
$(Q)env -u CC -u CFLAGS $(MAKE) -C $(RIOTTOOLS)
TERMDEPS += host-tools
include $(RIOTBASE)/Makefile.include

View File

@ -0,0 +1,41 @@
BOARD_INSUFFICIENT_MEMORY := \
arduino-duemilanove \
arduino-leonardo \
arduino-mega2560 \
arduino-nano \
arduino-uno \
atmega328p \
atmega328p-xplained-mini \
atmega1284p \
atxmega-a3bu-xplained \
bluepill-stm32f030c8 \
derfmega128 \
i-nucleo-lrwan1 \
mega-xplained \
microduino-corerf \
msb-430 \
msb-430h \
nucleo-f030r8 \
nucleo-f031k6 \
nucleo-f042k6 \
nucleo-f070rb \
nucleo-f072rb \
nucleo-f103rb \
nucleo-f302r8 \
nucleo-f303k8 \
nucleo-f334r8 \
nucleo-l011k4 \
nucleo-l031k6 \
nucleo-l053r8 \
samd10-xmini \
slstk3400a \
stk3200 \
stm32f030f4-demo \
stm32f0discovery \
stm32g0316-disco \
stm32l0538-disco \
telosb \
waspmote-pro \
z1 \
zigduino \
#

35
tests/gnrc_rpl/main.c Normal file
View File

@ -0,0 +1,35 @@
/*
* Copyright (C) 2020 HAW
*
* This file is subject to the terms and conditions of the GNU Lesser
* General Public License v2.1. See the file LICENSE in the top level
* directory for more details.
*/
/**
* @ingroup tests
* @{
*
* @file
* @brief Compile test for gnrc_rpl
*
* @author Cenk Gündoğan <mail+dev@gundogan.net>
*
* @}
*/
#include "shell.h"
#include "msg.h"
#define MAIN_QUEUE_SIZE (8)
static msg_t _main_msg_queue[MAIN_QUEUE_SIZE];
int main(void)
{
char line_buf[SHELL_DEFAULT_BUFSIZE];
msg_init_queue(_main_msg_queue, MAIN_QUEUE_SIZE);
shell_run(NULL, line_buf, SHELL_DEFAULT_BUFSIZE);
return 0;
}

176
tests/gnrc_rpl/tests/01-run.py Executable file
View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
# Copyright (C) 2021 Benjamin Valentin
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import subprocess
import time
from subprocess import Popen
from riotctrl_shell.gnrc import GNRCICMPv6Echo, GNRCICMPv6EchoParser
from riotctrl_shell.netif import Ifconfig
from riotctrl.ctrl import RIOTCtrlBoardFactory
from riotctrl_ctrl import native
from riotctrl_shell.netif import IfconfigListParser
PARSERS = {
"ping6": GNRCICMPv6EchoParser(),
"ifconfig": IfconfigListParser(),
}
class RIOTCtrlAppFactory(RIOTCtrlBoardFactory):
def __init__(self):
super().__init__(board_cls={
'native': native.NativeRIOTCtrl,
})
self.ctrl_list = list()
def __enter__(self):
return self
def __exit__(self, *exc):
for ctrl in self.ctrl_list:
ctrl.stop_term()
def get_shell(self, application_directory='.', env={'BOARD': 'native'}):
# retrieve a RIOTCtrl Object
ctrl = super().get_ctrl(
env=env,
application_directory=application_directory
)
# append ctrl to list
self.ctrl_list.append(ctrl)
# start terminal
ctrl.start_term()
# return ctrl with started terminal
return Shell(ctrl)
def get_shells(self, num=1):
terms = []
for i in range(num):
terms.append(self.get_shell())
return terms
class Shell(Ifconfig, GNRCICMPv6Echo):
pass
def first_netif_and_addr_by_scope(ifconfig_out, scope):
netifs = PARSERS["ifconfig"].parse(ifconfig_out)
key = next(iter(netifs))
netif = netifs[key]
return (
key,
[addr["addr"] for addr in netif["ipv6_addrs"] if addr["scope"] == scope][0],
)
def global_addr(ifconfig_out):
return first_netif_and_addr_by_scope(ifconfig_out, "global")
def test_linear_topology(factory, zep_dispatch):
# linear topology with 4 nodes
topology = ("A B\n"
"B C\n"
"C D\n")
zep_dispatch.stdin.write(topology.encode())
zep_dispatch.stdin.close()
# create native instances
nodes = factory.get_shells(4)
A = nodes[0]
D = nodes[3]
# add prefix to root node
A.cmd("nib prefix add 5 2001:db8::/32")
A.cmd("ifconfig 5 add 2001:db8::1/32")
A.cmd("rpl root 0 2001:db8::1")
root_addr = global_addr(A.ifconfig_list())[1]
# wait for the creation of the DODAG
time.sleep(10)
# ping root node from last node
parser = GNRCICMPv6EchoParser()
result = parser.parse(D.ping6(root_addr))
# assert packetloss is under 10%"))
assert result['stats']['packet_loss'] < 10
# assert at least one responder
assert result['stats']['rx'] > 0
# 2 intermediate hops, 64 - 2
assert result['replies'][0]['ttl'] == 62
# terminate nodes
for n in nodes:
n.stop_term()
def test_alternative_route(factory, zep_dispatch):
# topology with 5 nodes with alternative routes
# A
# / \
# B ― C
# | |
# D ― E
topology = ("A B\n"
"A C\n"
"B C\n"
"B D\n"
"C E\n"
"D E\n")
zep_dispatch.stdin.write(topology.encode())
zep_dispatch.stdin.close()
# create native instances
nodes = factory.get_shells(5)
A = nodes[0]
D = nodes[3]
# add prefix to root node
A.cmd("nib prefix add 5 2001:db8::/32")
A.cmd("ifconfig 5 add 2001:db8::1/32")
A.cmd("rpl root 0 2001:db8::1")
root_addr = global_addr(A.ifconfig_list())[1]
# wait for the creation of the DODAG
time.sleep(10)
# ping root node from last node
parser = GNRCICMPv6EchoParser()
result = parser.parse(D.ping6(root_addr))
# assert packetloss is under 10%"))
assert result['stats']['packet_loss'] < 10
# assert at least one responder
assert result['stats']['rx'] > 0
# 1 intermediate hop, 64 - 1
assert result['replies'][0]['ttl'] == 63
# terminate nodes
for n in nodes:
n.stop_term()
def run_test(func, factory):
with Popen(['../../dist/tools/zep_dispatch/bin/zep_dispatch',
'-t', '-', '127.0.0.1', '17754'], stdin=subprocess.PIPE) as zep_dispatch:
try:
func(factory, zep_dispatch)
finally:
zep_dispatch.terminate()
if __name__ == "__main__":
with RIOTCtrlAppFactory() as factory:
run_test(test_linear_topology, factory)
run_test(test_alternative_route, factory)