#!/usr/bin/env python3

# Copyright (C) 2018 Simon Brummer
#
# This file is subject to the terms and conditions of the GNU Lesser
# General Public License v2.1. See the file LICENSE in the top level
# directory for more details.

import sys
import os
import re
import socket
import random

import testrunner


class Runner:
    """Callable wrapper that runs a test function through ``testrunner``.

    An instance is called with the test function; it prints a
    SUCCESS / FAILED / SKIPPED line containing the function's name and
    returns the result code.
    """

    def __init__(self, timeout, echo=False, skip=False):
        """Store the settings forwarded to ``testrunner.run``.

        :param timeout: passed through to ``testrunner.run``.
        :param echo: passed through to ``testrunner.run``.
        :param skip: when true, the test is not run and 0 is returned.
        """
        self.timeout = timeout
        self.echo = echo
        self.skip = skip

    def __call__(self, fn):
        """Run ``fn`` (unless skipped) and report the outcome on stdout.

        :param fn: test function handed to ``testrunner.run``.
        :return: 0 when skipped, otherwise the value returned by
                 ``testrunner.run``.
        """
        if self.skip:
            print('\n- "{}": SKIPPED'.format(fn.__name__), end='')
            return 0

        # Stays -1 if testrunner.run raises, so the finally block reports
        # FAILED before the exception propagates to the caller.
        res = -1
        try:
            res = testrunner.run(fn, self.timeout, self.echo)
        finally:
            if res == 0:
                print('- "{}": SUCCESS'.format(fn.__name__), end='')
            else:
                print('- "{}": FAILED'.format(fn.__name__), end='')

        return res


class _HostTcpNode:
    """Common base for the host-side (Python socket) TCP endpoints."""

    def __init__(self):
        """Create an IPv6 stream socket and look up interface and address.

        Reads the ``TAPDEV`` environment variable (KeyError if unset) and
        queries the ``bridge`` and ``ip`` command line tools.
        """
        self.opened = False
        self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.interface = self._get_interface()
        self.address = self._get_ip_address(self.interface)

    def send(self, payload_to_send):
        """Send ``payload_to_send`` (a str) UTF-8 encoded over the socket."""
        # sendall() keeps writing until the whole payload is transmitted;
        # a plain send() is allowed to transmit only a part of it.
        self.sock.sendall(payload_to_send.encode('utf-8'))

    def receive(self, sent_payload):
        """Receive data and verify that it equals ``sent_payload``.

        :param sent_payload: the string expected to arrive.
        :raises AssertionError: if the received data differs.
        """
        # Size the read by the encoded length: for non-ASCII payloads the
        # number of bytes on the wire is larger than the character count.
        total_bytes = len(sent_payload.encode('utf-8'))
        received = self.sock.recv(total_bytes, socket.MSG_WAITALL)
        received = received.decode('utf-8')

        # Explicit raise instead of an assert statement: asserts are
        # stripped under `python -O`, which would also drop the recv().
        if received != sent_payload:
            raise AssertionError(
                'received {!r}, expected {!r}'.format(received, sent_payload)
            )

    def close(self):
        """Close the data socket and mark the node as not opened."""
        self.sock.close()
        self.opened = False

    def _get_interface(self):
        """Return the host interface name to use for the test.

        :return: the bridge the tap device is attached to, or the tap
                 device itself when no bridge is reported.
        """
        # Check if given tap device is part of a network bridge
        # if so use bridged interface instead of given tap device
        tap = os.environ["TAPDEV"]
        # The context manager closes the pipe once the output is read.
        with os.popen('bridge link show dev {}'.format(tap)) as result:
            output = result.read()
        bridge = re.search('master (.*) state', output)

        return bridge.group(1).strip() if bridge else tap

    def _get_ip_address(self, interface):
        """Return the link-scope IPv6 address (/64) of ``interface``.

        :raises AttributeError: if the ``ip`` output contains no match.
        """
        command = 'ip addr show dev ' + interface + ' scope link'
        with os.popen(command) as result:
            output = result.read()
        return re.search('inet6 (.*)/64', output).group(1).strip()


class HostTcpServer(_HostTcpNode):
    """Host-side TCP server; usable as a context manager.

    Entering the context starts listening, leaving it stops listening.
    """

    def __init__(self, listen_port):
        """Create the listening socket for ``listen_port`` (not yet bound)."""
        super().__init__()

        self.listening = False
        self.listen_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        self.listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_port = listen_port

    def __enter__(self):
        if not self.listening:
            self.listen()
        return self

    def __exit__(self, _1, _2, _3):
        if self.listening:
            self.stop_listen()

    def listen(self):
        """Bind to ``[::]:listen_port`` and listen with a backlog of 1."""
        self.listen_sock.bind(('::', self.listen_port))
        self.listen_sock.listen(1)
        self.listening = True

    def accept(self):
        """Accept one connection and use it as the data socket."""
        connection, _ = self.listen_sock.accept()

        # Release the socket held so far (initially the unconnected one
        # created by the base class) instead of leaving it to the garbage
        # collector, then track the accepted connection as opened.
        self.sock.close()
        self.sock = connection
        self.opened = True

    def stop_listen(self):
        """Close the listening socket."""
        self.listen_sock.close()
        self.listening = False


class HostTcpClient(_HostTcpNode):
    """Host-side TCP client; usable as a context manager.

    Entering the context connects to the target, leaving it closes the
    socket.
    """

    def __init__(self, target):
        """Remember address and port of ``target``.

        :param target: object exposing ``address`` and ``listen_port``.
        """
        super().__init__()

        self.target_addr = str(target.address)
        self.target_port = str(target.listen_port)

    def __enter__(self):
        if not self.opened:
            self.open()
        return self

    def __exit__(self, _1, _2, _3):
        if self.opened:
            self.close()

    def open(self):
        """Connect the socket to the target via the local interface."""
        # The '%<interface>' suffix scopes the address to this interface.
        addrinfo = socket.getaddrinfo(
            self.target_addr + '%' + self.interface,
            self.target_port,
            type=socket.SOCK_STREAM
        )

        self.sock.connect(addrinfo[0][-1])
        self.opened = True


class _SockTcpNode:
    """Common base for endpoints driven through a shell on ``child``.

    ``child`` must provide ``sendline``, ``expect``, ``expect_exact`` and
    ``match`` (presumably a pexpect spawn object -- confirm with callers).
    """

    def __init__(self, child):
        """Store ``child`` and query interface and address via ``ifconfig``."""
        self.child = child
        self.connected = False
        self.interface = self._get_interface()
        self.address = self._get_ip_address()

    def write(self, payload):
        """Issue ``sock_tcp_write`` and expect the sent length to be echoed."""
        self.child.sendline('sock_tcp_write {}'.format(str(payload)))
        self.child.expect_exact('sock_tcp_write: sent {}'.format(len(payload)))

    def read(self, payload, timeout_ms=0):
        """Issue ``sock_tcp_read`` and expect ``payload`` to be reported.

        :param payload: data expected in the command output.
        :param timeout_ms: timeout argument passed to the shell command.
        """
        total_bytes = str(len(payload))

        self.child.sendline(
            'sock_tcp_read {} {}'.format(total_bytes, timeout_ms)
        )
        self.child.expect_exact(
            'sock_tcp_read: received {} {}'.format(total_bytes, payload)
        )

    def disconnect(self):
        """Issue ``sock_tcp_disconnect`` and mark the node as disconnected."""
        self.child.sendline('sock_tcp_disconnect')
        self.child.expect_exact('sock_tcp_disconnect: returns')
        self.connected = False

    def get_local(self):
        """Issue ``sock_tcp_get_local`` and expect it to return 0."""
        self.child.sendline('sock_tcp_get_local')
        self.child.expect_exact('sock_tcp_get_local: returns 0')

    def get_remote(self):
        """Issue ``sock_tcp_get_remote`` and expect it to return 0."""
        self.child.sendline('sock_tcp_get_remote')
        self.child.expect_exact('sock_tcp_get_remote: returns 0')

    def _get_interface(self):
        """Return the interface number printed after ``Iface`` by ifconfig."""
        self.child.sendline('ifconfig')
        self.child.expect(r'Iface\s+(\d+)\s')
        return self.child.match.group(1).strip()

    def _get_ip_address(self):
        """Return the first ``fe80:`` address printed by ifconfig."""
        self.child.sendline('ifconfig')
        self.child.expect(r'(fe80:[0-9a-f:]+)\s')
        return self.child.match.group(1).strip()


class SockTcpServer(_SockTcpNode):
    """Shell-driven TCP server; usable as a context manager.

    Entering the context starts listening, leaving it stops listening.
    """

    def __init__(self, child, listen_port, listen_addr='::'):
        """Store the listen address and port as strings."""
        super().__init__(child)
        self.listening = False
        self.listen_port = str(listen_port)
        self.listen_addr = str(listen_addr)

    def __enter__(self):
        if not self.listening:
            self.listen()
        return self

    def __exit__(self, _1, _2, _3):
        if self.listening:
            self.stop_listen()

    def listen(self):
        """Issue ``sock_tcp_listen`` and expect it to return 0."""
        self.child.sendline(
            'sock_tcp_listen [{}]:{}'.format(self.listen_addr, self.listen_port)
        )
        self.child.expect_exact('sock_tcp_listen: returns 0')
        self.listening = True

    def accept(self, timeout_ms):
        """Issue ``sock_tcp_accept`` and expect it to return 0.

        :param timeout_ms: timeout argument passed to the shell command.
        """
        self.child.sendline('sock_tcp_accept {}'.format(str(timeout_ms)))
        self.child.expect_exact('sock_tcp_accept: returns 0')

        # The base class tracks the connection state in `connected` (and
        # disconnect() resets it), so record the accepted connection there.
        self.connected = True
        # Still set for any caller that reads the previously used name.
        self.opened = True

    def stop_listen(self):
        """Issue ``sock_tcp_stop_listen`` and mark the server as stopped."""
        self.child.sendline('sock_tcp_stop_listen')
        self.child.expect_exact('sock_tcp_stop_listen: returns')
        self.listening = False

    def queue_get_local(self):
        """Issue ``sock_tcp_queue_get_local`` and expect it to return 0."""
        self.child.sendline('sock_tcp_queue_get_local')
        self.child.expect_exact('sock_tcp_queue_get_local: returns 0')


class SockTcpClient(_SockTcpNode):
    """Shell-driven TCP client; usable as a context manager.

    Entering the context connects to the target, leaving it disconnects.
    """

    def __init__(self, child, target, local_port=0):
        """Remember the target endpoint and the local port.

        :param child: see ``_SockTcpNode``.
        :param target: object exposing ``address`` and ``listen_port``.
        :param local_port: passed as last argument to ``sock_tcp_connect``.
        """
        super().__init__(child)
        # Scope the target address to the interface of this node.
        self.target_addr = target.address + '%' + self.interface
        self.target_port = str(target.listen_port)
        self.local_port = local_port

    def __enter__(self):
        if not self.connected:
            self.connect()
        return self

    def __exit__(self, _1, _2, _3):
        if self.connected:
            self.disconnect()

    def connect(self):
        """Issue ``sock_tcp_connect`` and expect it to return 0."""
        self.child.sendline(
            'sock_tcp_connect [{}]:{} {}'.format(
                self.target_addr, self.target_port, self.local_port
            )
        )
        self.child.expect_exact('sock_tcp_connect: returns 0')
        self.connected = True


def generate_port_number():
    """Return a random port number between 1024 and 65535 (inclusive)."""
    return random.randint(1024, 65535)


def sudo_guard(uses_scapy=False):
    """Exit with status 1 if root privileges are required but missing.

    Root is required when ``uses_scapy`` is true or when the ``BOARD``
    environment variable is neither ``native`` nor ``native64``.

    :param uses_scapy: whether the test constructs and sends Ethernet
                       frames itself.
    """
    board = os.environ.get("BOARD", "")
    sudo_required = uses_scapy or board not in ("native", "native64")

    if sudo_required and os.geteuid() != 0:
        print("\x1b[1;31mThis test requires root privileges.\n"
              "It uses `./dist/tools/ethos/start_networking.sh` as term" +
              (" and it's constructing and sending Ethernet frames."
               if uses_scapy else "") +
              "\x1b[0m\n",
              file=sys.stderr)
        sys.exit(1)