1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00
RIOT/tests/net/gnrc_sock_tcp/tests-as-root/helpers.py

245 lines
7.4 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2018 Simon Brummer <simon.brummer@posteo.de>
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.
import sys
import os
import re
import socket
import random
import testrunner
class Runner:
    """Callable wrapper that runs a test function and prints its verdict.

    An instance is configured once (timeout, echo, skip) and then called
    with the test function to execute.
    """

    def __init__(self, timeout, echo=False, skip=False):
        """Store the run configuration.

        :param timeout: forwarded to ``testrunner.run`` as second argument
        :param echo:    forwarded to ``testrunner.run`` as third argument
        :param skip:    when true, the test function is never executed
        """
        self.timeout = timeout
        self.echo = echo
        self.skip = skip

    def __call__(self, fn):
        """Run ``fn`` via ``testrunner.run`` and report the outcome.

        :param fn: test function; its ``__name__`` is used in the report
        :return:   0 when skipped, otherwise the value returned by
                   ``testrunner.run``
        """
        if self.skip:
            print('\n- "{}": SKIPPED'.format(fn.__name__), end='')
            return 0

        # Stays at -1 if testrunner.run raises, so the report reads FAILED
        # while the exception keeps propagating to the caller.
        outcome = -1
        try:
            outcome = testrunner.run(fn, self.timeout, self.echo)
        finally:
            if outcome == 0:
                report = '- "{}": SUCCESS'
            else:
                report = '- "{}": FAILED'
            print(report.format(fn.__name__), end='')

        return outcome
class _HostTcpNode:
    """Common base for TCP endpoints that run on the host side of a test.

    Holds an IPv6 stream socket in ``self.sock`` together with the name of
    the host network interface and the link-local address found on it.
    """

    def __init__(self):
        """Resolve interface and address, then create the IPv6 TCP socket.

        :raises KeyError:     if the ``TAPDEV`` environment variable is unset
        :raises RuntimeError: if no ``inet6 .../64`` address is reported for
                              the interface
        """
        self.opened = False
        # Resolve interface and address first: if either lookup fails, no
        # socket has been created yet and nothing is leaked.
        self.interface = self._get_interface()
        self.address = self._get_ip_address(self.interface)
        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def send(self, payload_to_send):
        """Send ``payload_to_send`` (a str) UTF-8 encoded over the socket."""
        # sendall() keeps writing until the whole buffer is transmitted,
        # whereas send() is allowed to stop after a partial write.
        self.sock.sendall(payload_to_send.encode('utf-8'))

    def receive(self, sent_payload):
        """Receive data and assert that it equals ``sent_payload``.

        :param sent_payload: str that is expected to arrive
        :raises AssertionError: if the received bytes differ
        """
        # Compare on byte level: the number of bytes on the wire is the
        # length of the UTF-8 encoding, not the number of characters.
        expected = sent_payload.encode('utf-8')
        received = self.sock.recv(len(expected), socket.MSG_WAITALL)
        assert received == expected, \
            'received {!r}, expected {!r}'.format(received, expected)

    def close(self):
        """Close the socket and mark the node as not opened."""
        self.sock.close()
        self.opened = False

    def _get_interface(self):
        """Return the host interface name to use for the test.

        :raises KeyError: if the ``TAPDEV`` environment variable is unset
        """
        # Check if given tap device is part of a network bridge
        # if so use bridged interface instead of given tap device
        tap = os.environ["TAPDEV"]
        # The context manager closes the pipe once the output was read.
        with os.popen('bridge link show dev {}'.format(tap)) as pipe:
            output = pipe.read()
        bridge = re.search('master (.*) state', output)
        return bridge.group(1).strip() if bridge else tap

    def _get_ip_address(self, interface):
        """Return the link-scope IPv6 address reported for ``interface``.

        :raises RuntimeError: if the ``ip`` output has no ``inet6 .../64``
                              entry
        """
        command = 'ip addr show dev ' + interface + ' scope link'
        with os.popen(command) as pipe:
            output = pipe.read()
        found = re.search('inet6 (.*)/64', output)
        if found is None:
            raise RuntimeError(
                'no link-local IPv6 address found on interface '
                '{!r}'.format(interface)
            )
        return found.group(1).strip()
class HostTcpServer(_HostTcpNode):
    """Host-side TCP server listening on ``[::]:listen_port``.

    Usable as a context manager: entering starts listening, leaving stops
    listening. An accepted connection is stored in ``self.sock``.
    """

    def __init__(self, listen_port):
        """Create the listening socket; it is bound later by ``listen()``.

        :param listen_port: port number the server binds to
        """
        super().__init__()
        self.listening = False
        self.listen_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_port = listen_port

    def __enter__(self):
        if not self.listening:
            self.listen()
        return self

    def __exit__(self, _1, _2, _3):
        if self.listening:
            self.stop_listen()

    def listen(self):
        """Bind to the wildcard address and start listening (backlog 1)."""
        self.listen_sock.bind(('::', self.listen_port))
        self.listen_sock.listen(1)
        self.listening = True

    def accept(self):
        """Accept one connection and make it the node's data socket."""
        connection, _ = self.listen_sock.accept()
        # Release the socket held so far (the placeholder created by the
        # base class, or an earlier connection) instead of dropping it
        # unclosed. Done after accept() so a failing accept changes nothing.
        self.sock.close()
        self.sock = connection

    def stop_listen(self):
        """Close the listening socket and clear the listening flag."""
        self.listen_sock.close()
        self.listening = False
class HostTcpClient(_HostTcpNode):
    """Host-side TCP client that connects to a given target node.

    Usable as a context manager: entering opens the connection, leaving
    closes it again.
    """

    def __init__(self, target):
        """Remember where to connect to.

        :param target: object providing ``address`` and ``listen_port``
        """
        super().__init__()
        self.target_addr = str(target.address)
        self.target_port = str(target.listen_port)

    def __enter__(self):
        already_open = self.opened
        if not already_open:
            self.open()
        return self

    def __exit__(self, _1, _2, _3):
        if not self.opened:
            return
        self.close()

    def open(self):
        """Resolve the target on the local interface and connect to it."""
        # The address is suffixed with '%<interface>' before resolution.
        scoped_host = self.target_addr + '%' + self.interface
        candidates = socket.getaddrinfo(
            scoped_host,
            self.target_port,
            type=socket.SOCK_STREAM
        )
        # Last element of the first result is the sockaddr tuple.
        first_candidate = candidates[0]
        self.sock.connect(first_candidate[-1])
        self.opened = True
class _SockTcpNode:
    """Common base for TCP endpoints driven through a shell-like child.

    Every operation sends one command line to ``child`` and then waits for
    the matching output.
    """

    def __init__(self, child):
        """Store the child and query its interface id and fe80 address.

        :param child: object offering ``sendline``, ``expect``,
                      ``expect_exact`` and ``match`` (presumably a pexpect
                      spawn instance -- confirm against callers)
        """
        self.child = child
        self.connected = False
        self.interface = self._get_interface()
        self.address = self._get_ip_address()

    def write(self, payload):
        """Issue ``sock_tcp_write`` and wait for the sent-length report."""
        shell = self.child
        shell.sendline('sock_tcp_write {}'.format(str(payload)))
        sent_length = len(payload)
        shell.expect_exact('sock_tcp_write: sent {}'.format(sent_length))

    def read(self, payload, timeout_ms=0):
        """Issue ``sock_tcp_read`` and wait until ``payload`` is reported.

        :param payload:    data expected in the child's output
        :param timeout_ms: passed through as second command argument
        """
        expected_length = str(len(payload))
        shell = self.child
        shell.sendline(
            'sock_tcp_read {} {}'.format(expected_length, timeout_ms)
        )
        report = 'sock_tcp_read: received {} {}'.format(
            expected_length, payload
        )
        shell.expect_exact(report)

    def disconnect(self):
        """Issue ``sock_tcp_disconnect`` and clear the connected flag."""
        shell = self.child
        shell.sendline('sock_tcp_disconnect')
        shell.expect_exact('sock_tcp_disconnect: returns')
        self.connected = False

    def get_local(self):
        """Issue ``sock_tcp_get_local`` and wait for a 0 result."""
        shell = self.child
        shell.sendline('sock_tcp_get_local')
        shell.expect_exact('sock_tcp_get_local: returns 0')

    def get_remote(self):
        """Issue ``sock_tcp_get_remote`` and wait for a 0 result."""
        shell = self.child
        shell.sendline('sock_tcp_get_remote')
        shell.expect_exact('sock_tcp_get_remote: returns 0')

    def _ifconfig_capture(self, pattern):
        """Run ``ifconfig`` and return group 1 of ``pattern``, stripped."""
        self.child.sendline('ifconfig')
        self.child.expect(pattern)
        return self.child.match.group(1).strip()

    def _get_interface(self):
        """Return the number following ``Iface`` in the ifconfig output."""
        return self._ifconfig_capture(r'Iface\s+(\d+)\s')

    def _get_ip_address(self):
        """Return the first ``fe80:`` address in the ifconfig output."""
        return self._ifconfig_capture(r'(fe80:[0-9a-f:]+)\s')
class SockTcpServer(_SockTcpNode):
    """Server endpoint driven through the child's ``sock_tcp_*`` commands.

    Usable as a context manager: entering starts listening, leaving stops
    listening.
    """

    def __init__(self, child, listen_port, listen_addr='::'):
        """Store the listen endpoint; nothing is sent until ``listen()``.

        :param child:       shell-like object handed to the base class
        :param listen_port: port to listen on (stored as str)
        :param listen_addr: address to listen on (stored as str)
        """
        super().__init__(child)
        self.listening = False
        # Legacy flag set by accept(); initialised here so reading it before
        # the first accept() does not raise AttributeError.
        self.opened = False
        self.listen_port = str(listen_port)
        self.listen_addr = str(listen_addr)

    def __enter__(self):
        if not self.listening:
            self.listen()
        return self

    def __exit__(self, _1, _2, _3):
        if self.listening:
            self.stop_listen()

    def listen(self):
        """Issue ``sock_tcp_listen`` and wait for a 0 result."""
        self.child.sendline('sock_tcp_listen [{}]:{}'.format(
            self.listen_addr, self.listen_port)
        )
        self.child.expect_exact('sock_tcp_listen: returns 0')
        self.listening = True

    def accept(self, timeout_ms):
        """Issue ``sock_tcp_accept`` and wait for a 0 result.

        :param timeout_ms: passed through as command argument
        """
        self.child.sendline('sock_tcp_accept {}'.format(str(timeout_ms)))
        self.child.expect_exact('sock_tcp_accept: returns 0')
        # Track the connection in the flag that the base class initialises
        # and disconnect() resets; 'opened' is kept for existing users.
        self.connected = True
        self.opened = True

    def stop_listen(self):
        """Issue ``sock_tcp_stop_listen`` and clear the listening flag."""
        self.child.sendline('sock_tcp_stop_listen')
        self.child.expect_exact('sock_tcp_stop_listen: returns')
        self.listening = False

    def queue_get_local(self):
        """Issue ``sock_tcp_queue_get_local`` and wait for a 0 result."""
        self.child.sendline('sock_tcp_queue_get_local')
        self.child.expect_exact('sock_tcp_queue_get_local: returns 0')
class SockTcpClient(_SockTcpNode):
    """Client endpoint driven through the child's ``sock_tcp_*`` commands.

    Usable as a context manager: entering connects to the target, leaving
    disconnects again.
    """

    def __init__(self, child, target, local_port=0):
        """Remember the target endpoint and the local port to use.

        :param child:      shell-like object handed to the base class
        :param target:     object providing ``address`` and ``listen_port``
        :param local_port: passed through as last ``sock_tcp_connect``
                           argument
        """
        super().__init__(child)
        # The target address is suffixed with '%<own interface>'.
        peer_address = target.address
        self.target_addr = peer_address + '%' + self.interface
        self.target_port = str(target.listen_port)
        self.local_port = local_port

    def __enter__(self):
        already_connected = self.connected
        if not already_connected:
            self.connect()
        return self

    def __exit__(self, _1, _2, _3):
        if not self.connected:
            return
        self.disconnect()

    def connect(self):
        """Issue ``sock_tcp_connect`` and wait for a 0 result."""
        command = 'sock_tcp_connect [{}]:{} {}'.format(
            self.target_addr, self.target_port, self.local_port
        )
        shell = self.child
        shell.sendline(command)
        shell.expect_exact('sock_tcp_connect: returns 0')
        self.connected = True
def generate_port_number():
    """Return a random port number between 1024 and 65535 (inclusive)."""
    lowest_port, highest_port = 1024, 65535
    return random.randint(lowest_port, highest_port)
def sudo_guard(uses_scapy=False):
    """Exit with status 1 if root is required but the process is not root.

    Root is required when ``uses_scapy`` is true or when the ``BOARD``
    environment variable is anything other than ``native``. In that case
    a message is written to stderr before exiting.

    :param uses_scapy: whether the calling test relies on scapy
    """
    runs_on_native = os.environ.get("BOARD", "") == "native"
    needs_root = uses_scapy or not runs_on_native
    # Nothing to enforce, or already running with an effective uid of 0.
    if not needs_root or os.geteuid() == 0:
        return

    if uses_scapy:
        scapy_hint = " and it's constructing and sending Ethernet frames."
    else:
        scapy_hint = ""
    print("\x1b[1;31mThis test requires root privileges.\n"
          "It uses `./dist/tools/ethos/start_networking.sh` as term" +
          scapy_hint + "\x1b[0m\n",
          file=sys.stderr)
    sys.exit(1)