mirror of
https://github.com/RIOT-OS/RIOT.git
synced 2024-12-29 04:50:03 +01:00
tests: provide tests for emcute
This commit is contained in:
parent
3e7ddcdd4c
commit
5507b1b4a2
38
tests/emcute/Makefile
Normal file
38
tests/emcute/Makefile
Normal file
@ -0,0 +1,38 @@
|
||||
include ../Makefile.tests_common
|
||||
|
||||
RIOTBASE ?= $(CURDIR)/../..
|
||||
|
||||
export TAP ?= tap0
|
||||
|
||||
# use Ethernet as link-layer protocol
|
||||
ifeq (native,$(BOARD))
|
||||
TERMFLAGS ?= $(TAP)
|
||||
else
|
||||
ETHOS_BAUDRATE ?= 115200
|
||||
CFLAGS += -DETHOS_BAUDRATE=$(ETHOS_BAUDRATE)
|
||||
TERMDEPS += ethos
|
||||
TERMPROG ?= sudo $(RIOTTOOLS)/ethos/ethos
|
||||
TERMFLAGS ?= $(TAP) $(PORT) $(ETHOS_BAUDRATE)
|
||||
endif
|
||||
USEMODULE += auto_init_gnrc_netif
|
||||
USEMODULE += gnrc_ipv6_default
|
||||
USEMODULE += gnrc_sock_udp
|
||||
USEMODULE += emcute
|
||||
USEMODULE += od
|
||||
USEMODULE += shell
|
||||
USEMODULE += shell_commands
|
||||
USEMODULE += sock_util
|
||||
|
||||
CFLAGS += -DEMCUTE_TOPIC_MAXLEN="249" # 256 - 7
|
||||
CFLAGS += -DSTDIO_UART_RX_BUFSIZE="512" # Adapt to SHELL_BUFSIZE in app
|
||||
|
||||
# The test requires some setup and to be run as root
|
||||
# So it cannot currently be run
|
||||
TEST_ON_CI_BLACKLIST += all
|
||||
|
||||
.PHONY: ethos
|
||||
|
||||
ethos:
|
||||
$(Q)env -u CC -u CFLAGS make -C $(RIOTTOOLS)/ethos
|
||||
|
||||
include $(RIOTBASE)/Makefile.include
|
6
tests/emcute/Makefile.board.dep
Normal file
6
tests/emcute/Makefile.board.dep
Normal file
@ -0,0 +1,6 @@
|
||||
# Put board specific dependencies here
|
||||
ifeq (native,$(BOARD))
|
||||
USEMODULE += netdev_tap
|
||||
else
|
||||
USEMODULE += stdio_ethos
|
||||
endif
|
56
tests/emcute/Makefile.ci
Normal file
56
tests/emcute/Makefile.ci
Normal file
@ -0,0 +1,56 @@
|
||||
BOARD_INSUFFICIENT_MEMORY := \
|
||||
airfy-beacon \
|
||||
arduino-duemilanove \
|
||||
arduino-leonardo \
|
||||
arduino-mega2560 \
|
||||
arduino-nano \
|
||||
arduino-uno \
|
||||
atmega1284p \
|
||||
atmega328p \
|
||||
b-l072z-lrwan1 \
|
||||
blackpill-128kib \
|
||||
blackpill \
|
||||
bluepill-128kib \
|
||||
bluepill \
|
||||
calliope-mini \
|
||||
cc2650-launchpad \
|
||||
cc2650stk \
|
||||
chronos \
|
||||
derfmega128 \
|
||||
hifive1 \
|
||||
hifive1b \
|
||||
i-nucleo-lrwan1 \
|
||||
lsn50 \
|
||||
maple-mini \
|
||||
mega-xplained \
|
||||
microbit \
|
||||
microduino-corerf \
|
||||
msb-430 \
|
||||
msb-430h \
|
||||
nrf51dongle \
|
||||
nrf6310 \
|
||||
nucleo-f030r8 \
|
||||
nucleo-f031k6 \
|
||||
nucleo-f042k6 \
|
||||
nucleo-f070rb \
|
||||
nucleo-f072rb \
|
||||
nucleo-f103rb \
|
||||
nucleo-f303k8 \
|
||||
nucleo-f334r8 \
|
||||
nucleo-l031k6 \
|
||||
nucleo-l053r8 \
|
||||
nucleo-l073rz \
|
||||
opencm904 \
|
||||
saml10-xpro \
|
||||
saml11-xpro \
|
||||
spark-core \
|
||||
stm32f030f4-demo \
|
||||
stm32f0discovery \
|
||||
stm32l0538-disco \
|
||||
telosb \
|
||||
waspmote-pro \
|
||||
wsn430-v1_3b \
|
||||
wsn430-v1_4 \
|
||||
yunjia-nrf51822 \
|
||||
z1 \
|
||||
#
|
8
tests/emcute/README.md
Normal file
8
tests/emcute/README.md
Normal file
@ -0,0 +1,8 @@
|
||||
# Overview
|
||||
|
||||
This is a test application for emcute. It is supposed to be run with the test
|
||||
scripts in `tests/`:
|
||||
|
||||
```
|
||||
BOARD="<your choice> make flash test"
|
||||
```
|
367
tests/emcute/main.c
Normal file
367
tests/emcute/main.c
Normal file
@ -0,0 +1,367 @@
|
||||
/*
|
||||
* Copyright (C) 2019 Freie Universität Berlin
|
||||
*
|
||||
* This file is subject to the terms and conditions of the GNU Lesser
|
||||
* General Public License v2.1. See the file LICENSE in the top level
|
||||
* directory for more details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @ingroup tests
|
||||
* @{
|
||||
*
|
||||
* @file
|
||||
* @brief emcute MQTT-SN test application
|
||||
*
|
||||
* @author Martine Sophie Lenders <m.lenders@fu-berlin.de>
|
||||
*
|
||||
* @}
|
||||
*/
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "net/emcute.h"
|
||||
#include "net/mqttsn.h"
|
||||
#include "net/ipv6/addr.h"
|
||||
#include "od.h"
|
||||
#include "shell.h"
|
||||
#include "thread.h"
|
||||
#include "net/sock/util.h"
|
||||
|
||||
/* get to maximum length for client ID ;-)*/
|
||||
#define EMCUTE_ID "emcute test app ......."
|
||||
#define EMCUTE_PRIO (THREAD_PRIORITY_MAIN - 1)
|
||||
|
||||
#define NUMOFTOPS (4U)
|
||||
#define SHELL_BUFSIZE (512U) /* for sub with long topic */
|
||||
|
||||
static char _emcute_stack[THREAD_STACKSIZE_DEFAULT];
|
||||
static char _shell_buffer[SHELL_BUFSIZE];
|
||||
static uint8_t _pub_buf[EMCUTE_BUFSIZE];
|
||||
|
||||
static emcute_topic_t _topics[NUMOFTOPS];
|
||||
static emcute_sub_t _subscriptions[NUMOFTOPS];
|
||||
static char _topic_names[NUMOFTOPS][EMCUTE_TOPIC_MAXLEN + 1];
|
||||
static char _addr_str[IPV6_ADDR_MAX_STR_LEN];
|
||||
|
||||
static sock_udp_ep_t _gw = { .family = AF_INET6 };
|
||||
|
||||
static int _con(int argc, char **argv);
|
||||
static int _discon(int argc, char **argv);
|
||||
static int _reg(int argc, char **argv);
|
||||
static int _pub(int argc, char **argv);
|
||||
static int _sub(int argc, char **argv);
|
||||
static int _unsub(int argc, char **argv);
|
||||
static int _will(int argc, char **argv);
|
||||
static int _info(int argc, char **argv);
|
||||
|
||||
static const shell_command_t _shell_commands[] = {
|
||||
{ "con", "connect to a MQTT-SN broker", _con },
|
||||
{ "discon", "disconnect from current broker", _discon },
|
||||
{ "reg", "register to a topic", _reg },
|
||||
{ "pub", "publish a number of bytes under a topic", _pub },
|
||||
{ "sub", "subscribe to a topic", _sub },
|
||||
{ "unsub", "unsubscribe from a topic", _unsub },
|
||||
{ "will", "register a last will", _will },
|
||||
{ "info", "print client state", _info },
|
||||
{ NULL, NULL, NULL },
|
||||
};
|
||||
|
||||
static void *_emcute_thread(void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
emcute_run(MQTTSN_DEFAULT_PORT, EMCUTE_ID);
|
||||
return NULL; /* should never be reached */
|
||||
}
|
||||
|
||||
static unsigned _get_qos(const char *str)
|
||||
{
|
||||
int qos = atoi(str);
|
||||
switch (qos) {
|
||||
case 1: return EMCUTE_QOS_1;
|
||||
case 2: return EMCUTE_QOS_2;
|
||||
default: return EMCUTE_QOS_0;
|
||||
}
|
||||
}
|
||||
|
||||
static void _on_pub(const emcute_topic_t *topic, void *data, size_t len)
|
||||
{
|
||||
(void)data;
|
||||
printf("### got publication of %u bytes for topic '%s' [%d] ###\n",
|
||||
(unsigned)len, topic->name, (int)topic->id);
|
||||
}
|
||||
|
||||
static int _con(int argc, char **argv)
|
||||
{
|
||||
char *topic = NULL;
|
||||
char *message = NULL;
|
||||
size_t len = 0;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("usage %s <addr> [<will topic> <will msg>]\n",
|
||||
argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (sock_udp_str2ep(&_gw, argv[1]) != 0) {
|
||||
puts("error: unable to parse gateway address");
|
||||
_gw.port = 0;
|
||||
return 1;
|
||||
}
|
||||
if (_gw.port == 0) {
|
||||
_gw.port = MQTTSN_DEFAULT_PORT;
|
||||
}
|
||||
if (argc >= 4) {
|
||||
topic = argv[2];
|
||||
message = argv[3];
|
||||
len = strlen(message);
|
||||
}
|
||||
|
||||
if (emcute_con(&_gw, true, topic, message, len, 0) != EMCUTE_OK) {
|
||||
printf("error: unable to connect to %s\n", argv[1]);
|
||||
_gw.port = 0;
|
||||
return 1;
|
||||
}
|
||||
printf("success: connected to gateway at %s\n", argv[1]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _discon(int argc, char **argv)
|
||||
{
|
||||
(void)argc;
|
||||
(void)argv;
|
||||
|
||||
int res = emcute_discon();
|
||||
if (res == EMCUTE_NOGW) {
|
||||
puts("error: not connected to any broker");
|
||||
return 1;
|
||||
}
|
||||
else if (res != EMCUTE_OK) {
|
||||
puts("error: unable to disconnect");
|
||||
return 1;
|
||||
}
|
||||
puts("success: disconnect successful");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _topic_name_find(const char *name)
|
||||
{
|
||||
int res = -1;
|
||||
|
||||
for (unsigned i = 0; i < NUMOFTOPS; i++) {
|
||||
if ((_topic_names[i][0] == '\0') && (res < 0)) {
|
||||
res = i;
|
||||
}
|
||||
else if (strncmp(name, _topic_names[i], EMCUTE_TOPIC_MAXLEN) == 0) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static int _reg(int argc, char **argv)
|
||||
{
|
||||
emcute_topic_t *t;
|
||||
int idx;
|
||||
bool was_set = false;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("usage: %s <topic name>\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
idx = _topic_name_find(argv[1]);
|
||||
if (idx < 0) {
|
||||
puts("error: no space left to register");
|
||||
return 1;
|
||||
}
|
||||
if (_topic_names[idx][0] != '\0') {
|
||||
was_set = true;
|
||||
}
|
||||
else {
|
||||
strncpy(_topic_names[idx], argv[1], EMCUTE_TOPIC_MAXLEN);
|
||||
}
|
||||
t = &_topics[idx];
|
||||
t->name = _topic_names[idx];
|
||||
if (emcute_reg(t) != EMCUTE_OK) {
|
||||
puts("error: unable to obtain topic ID");
|
||||
if (was_set) {
|
||||
_topic_names[idx][0] = '\0';
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf("success: registered to topic '%s [%d]'\n", t->name, t->id);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _pub(int argc, char **argv)
|
||||
{
|
||||
unsigned flags = EMCUTE_QOS_0;
|
||||
int len;
|
||||
emcute_topic_t *t;
|
||||
int idx;
|
||||
|
||||
if (argc < 3) {
|
||||
printf("usage: %s <topic name> <data_len> [QoS level]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (argc >= 4) {
|
||||
flags |= _get_qos(argv[3]);
|
||||
}
|
||||
|
||||
idx = _topic_name_find(argv[1]);
|
||||
if ((idx < 0) || !(_topics[idx].name)) {
|
||||
puts("error: topic not registered");
|
||||
return 1;
|
||||
}
|
||||
t = &_topics[idx];
|
||||
len = atoi(argv[2]);
|
||||
if ((unsigned)len > sizeof(_pub_buf)) {
|
||||
printf("error: len %d > %lu\n", len, (unsigned long)sizeof(_pub_buf));
|
||||
return 1;
|
||||
}
|
||||
memset(_pub_buf, 92, len);
|
||||
if (emcute_pub(t, _pub_buf, len, flags) != EMCUTE_OK) {
|
||||
printf("error: unable to publish data to topic '%s [%d]'\n",
|
||||
t->name, (int)t->id);
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf("success: published %d bytes to topic '%s [%d]'\n",
|
||||
(int)len, t->name, t->id);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _sub(int argc, char **argv)
|
||||
{
|
||||
unsigned flags = EMCUTE_QOS_0;
|
||||
int idx;
|
||||
bool was_set = false;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("usage: %s <topic name> [QoS level]\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (strlen(argv[1]) > EMCUTE_TOPIC_MAXLEN) {
|
||||
puts("error: topic name exceeds maximum possible size");
|
||||
return 1;
|
||||
}
|
||||
if (argc >= 3) {
|
||||
flags |= _get_qos(argv[2]);
|
||||
}
|
||||
|
||||
idx = _topic_name_find(argv[1]);
|
||||
if (idx < 0) {
|
||||
puts("error: no space to subscribe");
|
||||
}
|
||||
|
||||
_subscriptions[idx].cb = _on_pub;
|
||||
if (_topic_names[idx][0] != '\0') {
|
||||
was_set = true;
|
||||
}
|
||||
else {
|
||||
strncpy(_topic_names[idx], argv[1], EMCUTE_TOPIC_MAXLEN);
|
||||
}
|
||||
_subscriptions[idx].topic.name = _topic_names[idx];
|
||||
if (emcute_sub(&_subscriptions[idx], flags) != EMCUTE_OK) {
|
||||
printf("error: unable to subscribe to %s\n", argv[1]);
|
||||
if (was_set) {
|
||||
_topic_names[idx][0] = '\0';
|
||||
}
|
||||
memset(&_subscriptions[idx], 0, sizeof(emcute_sub_t));
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf("success: now subscribed to %s\n", argv[1]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _unsub(int argc, char **argv)
|
||||
{
|
||||
int idx;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("usage %s <topic name>\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
idx = _topic_name_find(argv[1]);
|
||||
if ((idx < 0) || !_subscriptions[idx].topic.name) {
|
||||
printf("error: no subscription for topic '%s' found\n", argv[1]);
|
||||
}
|
||||
else if (emcute_unsub(&_subscriptions[idx]) != EMCUTE_OK) {
|
||||
printf("error: Unsubscription form '%s' failed\n", argv[1]);
|
||||
}
|
||||
else {
|
||||
memset(&_subscriptions[idx], 0, sizeof(emcute_sub_t));
|
||||
printf("success: unsubscribed from '%s'\n", argv[1]);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int _will(int argc, char **argv)
|
||||
{
|
||||
if (argc < 3) {
|
||||
printf("usage %s <will topic name> <will message content>\n", argv[0]);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (emcute_willupd_topic(argv[1], 0) != EMCUTE_OK) {
|
||||
puts("error: unable to update the last will topic");
|
||||
return 1;
|
||||
}
|
||||
if (emcute_willupd_msg(argv[2], strlen(argv[2])) != EMCUTE_OK) {
|
||||
puts("error: unable to update the last will message");
|
||||
return 1;
|
||||
}
|
||||
|
||||
puts("success: updated last will topic and message");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int _info(int argc, char **argv)
|
||||
{
|
||||
(void)argc;
|
||||
(void)argv;
|
||||
if (_gw.port > 0) {
|
||||
printf("Broker: '[%s]:%u'\n",
|
||||
ipv6_addr_to_str(_addr_str, (ipv6_addr_t *)_gw.addr.ipv6,
|
||||
sizeof(_addr_str)), _gw.port);
|
||||
puts("- Topics:");
|
||||
for (unsigned i = 0; i < NUMOFTOPS; i++) {
|
||||
if (_topics[i].name) {
|
||||
printf(" %2u: %s\n", _topics[i].id,
|
||||
_topics[i].name);
|
||||
}
|
||||
}
|
||||
puts("- Subscriptions:");
|
||||
for (unsigned i = 0; i < NUMOFTOPS; i++) {
|
||||
if (_subscriptions[i].topic.name) {
|
||||
printf(" %2u: %s\n", _subscriptions[i].topic.id,
|
||||
_subscriptions[i].topic.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
puts("success: starting test application");
|
||||
/* start the emcute thread */
|
||||
thread_create(_emcute_stack, sizeof(_emcute_stack), EMCUTE_PRIO, 0,
|
||||
_emcute_thread, NULL, "emcute");
|
||||
/* start shell */
|
||||
shell_run(_shell_commands, _shell_buffer, sizeof(_shell_buffer));
|
||||
return 0;
|
||||
}
|
482
tests/emcute/tests/01-run.py
Executable file
482
tests/emcute/tests/01-run.py
Executable file
@ -0,0 +1,482 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Copyright (C) 2018 Freie Universität Berlin
|
||||
#
|
||||
# This file is subject to the terms and conditions of the GNU Lesser
|
||||
# General Public License v2.1. See the file LICENSE in the top level
|
||||
# directory for more details.
|
||||
|
||||
import os
|
||||
import pprint
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from scapy.all import Automaton, ATMT, log_runtime, MTU, raw, SimpleSocket
|
||||
from scapy.contrib import mqttsn
|
||||
from testrunner import run, utils
|
||||
|
||||
TEST_INTERACTIVE_RETRIES = int(os.environ.get('TEST_INTERACTIVE_RETRIES') or 5)
|
||||
TEST_INTERACTIVE_DELAY = int(os.environ.get('TEST_INTERACTIVE_DELAY') or 1)
|
||||
|
||||
SERVER_PORT = 1883
|
||||
MODES = set(["pub", "sub", "sub_w_reg"])
|
||||
TIMEOUT = 1
|
||||
|
||||
|
||||
class MQTTSNServer(Automaton):
|
||||
class MQTTSNServerSocket(SimpleSocket):
|
||||
def __init__(self, server, *args, **kwargs):
|
||||
super(MQTTSNServer.MQTTSNServerSocket, self)\
|
||||
.__init__(*args, **kwargs)
|
||||
self.server = server
|
||||
|
||||
def recv(self, x=MTU):
|
||||
pkt, sa = self.ins.recvfrom(x)
|
||||
self.server.last_remote = sa
|
||||
return mqttsn.MQTTSN(pkt)
|
||||
|
||||
def send(self, x):
|
||||
assert self.server.last_remote is not None
|
||||
try:
|
||||
sx = raw(x)
|
||||
x.sent_time = time.time()
|
||||
self.outs.sendto(sx, self.server.last_remote)
|
||||
except socket.error as msg:
|
||||
log_runtime.error(msg)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
kwargs["ll"] = MQTTSNServer.MQTTSNServerSocket
|
||||
kwargs["recvsock"] = MQTTSNServer.MQTTSNServerSocket
|
||||
self.last_remote = None
|
||||
super(MQTTSNServer, self).__init__(*args, **kwargs)
|
||||
|
||||
def parse_args(self, spawn, bind_addr, topic_name, mode, pub_interval,
|
||||
qos_level=0,
|
||||
data_len_start=1, data_len_end=1000, data_len_step=1,
|
||||
bind_port=SERVER_PORT, family=socket.AF_INET,
|
||||
type=socket.SOCK_DGRAM, proto=0, *args, **kwargs):
|
||||
assert mode in MODES
|
||||
super(MQTTSNServer, self).parse_args(*args, **kwargs)
|
||||
self.spawn = spawn
|
||||
self.topic_name = topic_name
|
||||
self.mode = mode
|
||||
self.pub_interval = pub_interval
|
||||
self.qos_level = qos_level
|
||||
self.data_len = data_len_start
|
||||
self.data_len_end = data_len_end
|
||||
self.data_len_step = data_len_step
|
||||
self.last_mid = random.randint(0, 0xffff)
|
||||
self.topics = []
|
||||
self.registered_topics = []
|
||||
self.subscriptions = []
|
||||
self.res = ""
|
||||
|
||||
sock = socket.socket(family, type, proto)
|
||||
res = socket.getaddrinfo(bind_addr, bind_port)
|
||||
sockaddr = res[0][4]
|
||||
sock.bind(sockaddr)
|
||||
self.gw_addr = "[{}]:{}".format(sockaddr[0], sockaddr[1])
|
||||
self.socket_kargs = {"sock": sock, "server": self}
|
||||
|
||||
# >>> private properties and methods <<< #
|
||||
@property
|
||||
def _qos_flags(self):
|
||||
qos = min(self.qos_level, 2)
|
||||
if qos < 0:
|
||||
qos = mqttsn.QOS_NEG1
|
||||
return qos
|
||||
|
||||
def _check_pkt_qos(self, pkt):
|
||||
qos_types = [mqttsn.PUBLISH, mqttsn.SUBSCRIBE]
|
||||
return (pkt.type not in qos_types) or (pkt.qos == self._qos_flags)
|
||||
|
||||
def _get_tid(self, topic_name):
|
||||
if topic_name not in self.topics:
|
||||
self.topics.append(topic_name)
|
||||
return self.topics.index(topic_name) + 1
|
||||
|
||||
def _get_topic_name(self, tid):
|
||||
return self.topics[tid - 1]
|
||||
|
||||
# >>> automaton states <<< #
|
||||
@ATMT.state(initial=1)
|
||||
def BEGIN(self):
|
||||
utils.test_utils_interactive_sync(self.spawn,
|
||||
TEST_INTERACTIVE_RETRIES,
|
||||
TEST_INTERACTIVE_DELAY)
|
||||
raise self.CONNECT_FROM_NODE()
|
||||
|
||||
@ATMT.state()
|
||||
def CONNECT_FROM_NODE(self):
|
||||
self.spawn.sendline("con {}".format(self.gw_addr))
|
||||
raise self.WAITING(mqttsn.CONNECT)
|
||||
|
||||
@ATMT.state()
|
||||
def REGISTER_FROM_NODE(self):
|
||||
self.spawn.sendline("reg {}".format(self.topic_name))
|
||||
raise self.WAITING(mqttsn.REGISTER)
|
||||
|
||||
@ATMT.state()
|
||||
def PUBLISH_FROM_NODE(self, topic_name):
|
||||
if self.data_len < self.data_len_end:
|
||||
self.spawn.sendline("pub {} {:d} {:d}" .format(topic_name,
|
||||
self.data_len,
|
||||
self.qos_level))
|
||||
raise self.WAITING(mqttsn.PUBLISH)
|
||||
else:
|
||||
raise self.END()
|
||||
|
||||
@ATMT.state()
|
||||
def SUBSCRIBE_FROM_NODE(self):
|
||||
self.spawn.sendline("sub {} {}".format(self.topic_name,
|
||||
self.qos_level))
|
||||
raise self.WAITING(mqttsn.SUBSCRIBE)
|
||||
|
||||
@ATMT.state()
|
||||
def PUBLISH_TO_NODE(self, subscription):
|
||||
tid = subscription["tid"]
|
||||
self.last_mid += 1
|
||||
mid = self.last_mid
|
||||
if self.data_len == 0:
|
||||
# send deliberately broken length packets
|
||||
# (to small payload, len field < 256)
|
||||
self.last_packet = mqttsn.MQTTSN(len=128) / mqttsn.MQTTSNPublish(
|
||||
qos=self._qos_flags, tid=tid, mid=mid, data="128"
|
||||
)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets
|
||||
# (to small payload, len field >= 256)
|
||||
self.last_packet = mqttsn.MQTTSN(len=400) / mqttsn.MQTTSNPublish(
|
||||
qos=self._qos_flags, tid=tid, mid=mid, data="400"
|
||||
)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (too large payload)
|
||||
self.last_packet = mqttsn.MQTTSN(len=10) / mqttsn.MQTTSNPublish(
|
||||
qos=self._qos_flags, tid=tid, mid=mid, data="X" * 20
|
||||
)
|
||||
self.send(self.last_packet)
|
||||
return subscription, mid
|
||||
if self.data_len < self.data_len_end:
|
||||
self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNPublish(
|
||||
qos=self._qos_flags, tid=tid, mid=mid, data="X" * self.data_len
|
||||
)
|
||||
self.send(self.last_packet)
|
||||
return subscription, mid
|
||||
else:
|
||||
raise self.END()
|
||||
|
||||
@ATMT.state()
|
||||
def WAITING(self, exp_type, tid=None, mid=None):
|
||||
return exp_type, mid, tid
|
||||
|
||||
@ATMT.state(final=1)
|
||||
def END(self):
|
||||
self.spawn.sendline("info")
|
||||
self.spawn.expect_exact("Broker: '{}'".format(self.gw_addr))
|
||||
self.spawn.expect_exact("- Topics")
|
||||
for tid, topic_name in enumerate(self.registered_topics, 1):
|
||||
self.spawn.expect_exact(" {:2d}: {}".format(tid, topic_name))
|
||||
self.spawn.expect_exact("- Subscriptions")
|
||||
for sub in self.subscriptions:
|
||||
self.spawn.expect_exact(" {:2d}: {}".format(
|
||||
sub["tid"], sub["topic_name"].decode())
|
||||
)
|
||||
self.spawn.sendline("reboot")
|
||||
return self.res
|
||||
|
||||
@ATMT.state(error=1)
|
||||
def UNEXPECTED_MESSAGE_TYPE(self, type, qos=None):
|
||||
self.res += "\nUnexpected message type {} [{}]".format(
|
||||
mqttsn.PACKET_TYPE[type],
|
||||
mqttsn.QOS_LEVELS[qos] if qos is not None else "-",
|
||||
)
|
||||
return self.res
|
||||
|
||||
@ATMT.state(error=1)
|
||||
def UNEXPECTED_PARAMETERS(self, pkt):
|
||||
self.res += "\nUnexpected parameters \n" \
|
||||
" {}".format(repr(pkt))
|
||||
return self.res
|
||||
|
||||
@ATMT.state(error=1)
|
||||
def MESSAGE_TIMEOUT(self, exp_type):
|
||||
self.res += "\n{} timed out".format(mqttsn.PACKET_TYPE[exp_type])
|
||||
return self.res
|
||||
|
||||
# >>> automaton timeouts, conditions and actions <<< #
|
||||
@ATMT.timeout(WAITING, TIMEOUT)
|
||||
def timeout_message(self, args):
|
||||
raise self.MESSAGE_TIMEOUT(args[0])
|
||||
|
||||
@ATMT.condition(PUBLISH_TO_NODE, prio=1)
|
||||
def PUBLISH_asks_for_PUBACK(self, args):
|
||||
subscription = args[0]
|
||||
tid = subscription["tid"]
|
||||
mid = args[1]
|
||||
if self.last_packet.qos in [mqttsn.QOS_1, mqttsn.QOS_2]:
|
||||
raise self.WAITING(mqttsn.PUBACK, tid, mid)
|
||||
|
||||
@ATMT.condition(PUBLISH_TO_NODE, prio=2)
|
||||
def wait_for_PUBLISH_on_node(self, args):
|
||||
subscription = args[0]
|
||||
if self.data_len > 0:
|
||||
self.spawn.expect_exact(
|
||||
"### got publication of {:d} bytes for topic "
|
||||
"'{}' [{:d}] ###"
|
||||
.format(self.data_len, subscription["topic_name"].decode(),
|
||||
subscription["tid"]))
|
||||
self.data_len += self.data_len_step
|
||||
time.sleep(self.pub_interval)
|
||||
raise self.PUBLISH_TO_NODE(subscription)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=1)
|
||||
def receive_wrong_message(self, pkt, args):
|
||||
exp_type = args[0]
|
||||
if pkt.type != exp_type or not self._check_pkt_qos(pkt):
|
||||
raise self.UNEXPECTED_MESSAGE_TYPE(pkt.type, pkt.qos)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_unexpected_parameters(self, pkt, args):
|
||||
if pkt.type == mqttsn.PUBLISH:
|
||||
if self.data_len != len(pkt.data):
|
||||
raise self.UNEXPECTED_PARAMETERS(pkt)
|
||||
elif pkt.type == mqttsn.PUBACK:
|
||||
exp_mid = args[1]
|
||||
exp_tid = args[2]
|
||||
if (exp_tid != pkt.tid) or (exp_mid != pkt.mid) or \
|
||||
(mqttsn.ACCEPTED != pkt.return_code):
|
||||
raise self.UNEXPECTED_PARAMETERS(pkt)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_CONNECT_mode_sub(self, pkt, args):
|
||||
if pkt.type == mqttsn.CONNECT and self.mode == "sub":
|
||||
raise self.SUBSCRIBE_FROM_NODE()
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_CONNECT_mode_pub_or_sub_w_reg(self, pkt, args):
|
||||
if pkt.type == mqttsn.CONNECT and self.mode in ["pub", "sub_w_reg"]:
|
||||
raise self.REGISTER_FROM_NODE()
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_REGISTER_mode_pub(self, pkt, args):
|
||||
if pkt.type == mqttsn.REGISTER:
|
||||
topic_name = pkt.topic_name.decode()
|
||||
if self.mode in ["pub"]:
|
||||
raise self.PUBLISH_FROM_NODE(topic_name) \
|
||||
.action_parameters(topic_name=topic_name,
|
||||
mid=pkt.mid)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=3)
|
||||
def receive_REGISTER_mode_sub_w_reg(self, pkt, args):
|
||||
if pkt.type == mqttsn.REGISTER:
|
||||
topic_name = pkt.topic_name.decode()
|
||||
if self.mode in ["sub_w_reg"]:
|
||||
raise self.SUBSCRIBE_FROM_NODE() \
|
||||
.action_parameters(topic_name=topic_name,
|
||||
mid=pkt.mid)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=3)
|
||||
def receive_PUBLISH(self, pkt, args):
|
||||
if pkt.type == mqttsn.PUBLISH:
|
||||
topic_name = self._get_topic_name(pkt.tid)
|
||||
self.res += ":".join("{:02x}".format(c) for c in pkt.data)
|
||||
self.data_len += self.data_len_step
|
||||
raise self.PUBLISH_FROM_NODE(topic_name) \
|
||||
.action_parameters(topic_name=topic_name,
|
||||
qos=pkt.qos, mid=pkt.mid, tid=pkt.tid)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_SUBSCRIBE(self, pkt, args):
|
||||
if pkt.type == mqttsn.SUBSCRIBE:
|
||||
if pkt.tid_type in [mqttsn.TID_NORMAL, mqttsn.TID_SHORT]:
|
||||
topic_name = pkt.topic_name
|
||||
tid = self._get_tid(pkt.topic_name)
|
||||
elif pkt.tid_type == mqttsn.TID_PREDEF:
|
||||
tid = pkt.tid
|
||||
topic_name = self._get_topic_name(tid)
|
||||
else:
|
||||
assert(False)
|
||||
subscription = {"tid": tid, "topic_name": topic_name}
|
||||
if subscription not in self.subscriptions:
|
||||
self.subscriptions.append(subscription)
|
||||
raise self.PUBLISH_TO_NODE(subscription) \
|
||||
.action_parameters(mid=pkt.mid, tid=tid)
|
||||
|
||||
@ATMT.receive_condition(WAITING, prio=2)
|
||||
def receive_PUBACK(self, pkt, args):
|
||||
if pkt.type == mqttsn.PUBACK:
|
||||
self.data_len += self.data_len_step
|
||||
time.sleep(self.pub_interval)
|
||||
raise self.PUBLISH_TO_NODE({
|
||||
"tid": pkt.tid,
|
||||
"topic_name": self._get_topic_name(pkt.tid)
|
||||
})
|
||||
|
||||
@ATMT.action(receive_CONNECT_mode_sub)
|
||||
@ATMT.action(receive_CONNECT_mode_pub_or_sub_w_reg)
|
||||
def send_CONNACK(self):
|
||||
# send too large packet for reception buffer
|
||||
# see https://github.com/RIOT-OS/RIOT/pull/12382
|
||||
self.last_packet = mqttsn.MQTTSN() / \
|
||||
mqttsn.MQTTSNConnack() / ("X" * 525)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (too small len)
|
||||
self.last_packet = mqttsn.MQTTSN(len=2) / mqttsn.MQTTSNConnack()
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (too large len)
|
||||
self.last_packet = mqttsn.MQTTSN(len=3, type=mqttsn.CONNACK)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (garbage payload)
|
||||
self.last_packet = mqttsn.MQTTSN(len=128) / \
|
||||
mqttsn.MQTTSNConnack() / b"this is garbage"
|
||||
self.send(self.last_packet)
|
||||
self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNConnack()
|
||||
self.send(self.last_packet)
|
||||
self.spawn.expect_exact("success: connected to gateway at {}"
|
||||
.format(self.gw_addr))
|
||||
|
||||
@ATMT.action(receive_REGISTER_mode_pub)
|
||||
@ATMT.action(receive_REGISTER_mode_sub_w_reg)
|
||||
def send_REGACK(self, topic_name, mid):
|
||||
tid = self._get_tid(topic_name)
|
||||
if topic_name not in self.registered_topics:
|
||||
self.registered_topics.append(topic_name)
|
||||
# send deliberately broken length packets (too small len)
|
||||
self.last_packet = mqttsn.MQTTSN(len=4) / \
|
||||
mqttsn.MQTTSNRegack(mid=mid, tid=tid)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (too large len)
|
||||
# include valid MID for extra confusion
|
||||
self.last_packet = mqttsn.MQTTSN(len=7, type=mqttsn.REGACK) / \
|
||||
bytes([tid >> 8, tid & 0xff, mid >> 8, mid & 0xff])
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (garbage payload)
|
||||
self.last_packet = mqttsn.MQTTSN(len=128) / \
|
||||
mqttsn.MQTTSNRegack(mid=mid, tid=tid) / b"this is garbage"
|
||||
self.send(self.last_packet)
|
||||
self.last_packet = mqttsn.MQTTSN() / \
|
||||
mqttsn.MQTTSNRegack(mid=mid, tid=tid)
|
||||
self.send(self.last_packet)
|
||||
self.spawn.expect_exact("success: registered to topic '{} [{:d}]'"
|
||||
.format(topic_name, tid))
|
||||
|
||||
@ATMT.action(receive_PUBLISH)
|
||||
def send_PUBACK_if_required(self, qos, topic_name, mid, tid):
|
||||
if qos in (mqttsn.QOS_1, mqttsn.QOS_2):
|
||||
# send deliberately broken length packets (too small len)
|
||||
self.last_packet = mqttsn.MQTTSN(len=4) / \
|
||||
mqttsn.MQTTSNPuback(mid=mid, tid=tid)
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (too large len)
|
||||
# include valid MID for extra confusion
|
||||
self.last_packet = mqttsn.MQTTSN(len=7, type=mqttsn.PUBACK) / \
|
||||
bytes([tid >> 8, tid & 0xff, mid >> 8, mid & 0xff])
|
||||
self.send(self.last_packet)
|
||||
# send deliberately broken length packets (garbage payload)
|
||||
self.last_packet = mqttsn.MQTTSN(len=128) / \
|
||||
mqttsn.MQTTSNPuback(mid=mid, tid=tid) / b"this is garbage"
|
||||
self.send(self.last_packet)
|
||||
self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNPuback(mid=mid,
|
||||
tid=tid)
|
||||
self.send(self.last_packet)
|
||||
self.spawn.expect_exact(
|
||||
"success: published {:d} bytes to topic '{} [{:d}]'"
|
||||
.format(self.data_len - self.data_len_step, topic_name, tid)
|
||||
)
|
||||
time.sleep(self.pub_interval)
|
||||
|
||||
@ATMT.action(receive_SUBSCRIBE)
|
||||
def send_SUBACK(self, mid, tid):
|
||||
self.last_packet = mqttsn.MQTTSN() / mqttsn.MQTTSNSuback(
|
||||
tid=tid, mid=mid
|
||||
)
|
||||
self.send(self.last_packet)
|
||||
self.spawn.expect_exact("success: now subscribed to {}"
|
||||
.format(self._get_topic_name(tid).decode()))
|
||||
|
||||
|
||||
def check_and_search_output(cmd, pattern, res_group, *args, **kwargs):
|
||||
if isinstance(cmd, str):
|
||||
kwargs["shell"] = True
|
||||
output = subprocess.check_output(cmd, *args, **kwargs).decode("utf-8")
|
||||
for line in output.splitlines():
|
||||
m = re.search(pattern, line)
|
||||
if m is not None:
|
||||
return m.group(res_group)
|
||||
return None
|
||||
|
||||
|
||||
def get_bridge(tap):
|
||||
res = check_and_search_output(
|
||||
"command -v bridge", "^(.*bridge)", 1)
|
||||
if res is not None:
|
||||
res = check_and_search_output(
|
||||
["bridge", "link"],
|
||||
r"{}.+master\s+(?P<master>[^\s]+)".format(tap),
|
||||
"master"
|
||||
)
|
||||
return tap if res is None else res
|
||||
|
||||
|
||||
def get_host_lladdr(tap):
|
||||
res = check_and_search_output(
|
||||
["ip", "addr", "show", "dev", tap, "scope", "link"],
|
||||
r"inet6\s+(?P<lladdr>[0-9A-Fa-f:]+)/\d+",
|
||||
"lladdr"
|
||||
)
|
||||
if res is None:
|
||||
raise AssertionError(
|
||||
"Can't find host link-local address on interface {}"
|
||||
.format(tap)
|
||||
)
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def testfunc(child):
|
||||
tap = get_bridge(os.environ["TAP"])
|
||||
lladdr = get_host_lladdr(tap)
|
||||
|
||||
time.sleep(1)
|
||||
DATA_MAX_LEN = 512 - 9 # PUBLISH + 2 byte extra for length
|
||||
TOPIC_MAX_LEN = 249 # see Makefile
|
||||
for test_params in [
|
||||
{"qos_level": 0, "mode": "sub", "topic_name": "/test",
|
||||
"data_len_start": 0, "data_len_end": DATA_MAX_LEN,
|
||||
"data_len_step": 50},
|
||||
{"qos_level": 1, "mode": "sub", "topic_name": "/test",
|
||||
"data_len_start": 0, "data_len_end": DATA_MAX_LEN,
|
||||
"data_len_step": 50},
|
||||
{"qos_level": 1, "mode": "sub",
|
||||
"topic_name": "/" + ("x" * (TOPIC_MAX_LEN - 1)),
|
||||
"data_len_start": 8, "data_len_end": 9},
|
||||
{"qos_level": 1, "mode": "sub_w_reg", "topic_name": "/test",
|
||||
"data_len_start": 8, "data_len_end": 9},
|
||||
{"qos_level": 0, "mode": "pub", "topic_name": "/test",
|
||||
"data_len_start": 1, "data_len_end": DATA_MAX_LEN,
|
||||
"data_len_step": 50},
|
||||
{"qos_level": 1, "mode": "pub", "topic_name": "/test",
|
||||
"data_len_start": 1, "data_len_end": DATA_MAX_LEN,
|
||||
"data_len_step": 50}
|
||||
]:
|
||||
print("Run test case")
|
||||
pprint.pprint(test_params, compact=False)
|
||||
server = MQTTSNServer(child, pub_interval=.001,
|
||||
family=socket.AF_INET6,
|
||||
bind_addr=lladdr + "%" + tap,
|
||||
bind_port=SERVER_PORT, **test_params)
|
||||
try:
|
||||
server.run()
|
||||
finally:
|
||||
server.stop()
|
||||
server.socket_kargs["sock"].close()
|
||||
time.sleep(1)
|
||||
print("SUCCESS")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(run(testfunc, timeout=TIMEOUT, echo=False))
|
Loading…
Reference in New Issue
Block a user