1
0
mirror of https://github.com/RIOT-OS/RIOT.git synced 2025-01-18 12:52:44 +01:00

Merge pull request #15856 from miri64/tests/fix/gnrc_ipv6_ext_frag-dynamical-iface

tests/gnrc_ipv6_ext_frag: fetch Interface IDs dynamically
This commit is contained in:
Martine Lenders 2021-01-26 16:26:28 +01:00 committed by GitHub
commit 0882dc99af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -58,8 +58,8 @@ def stop_udp_server(child):
"Error: server was not running"]) "Error: server was not running"])
def udp_send(child, addr, port, length, num=1, delay=1000000): def udp_send(child, addr, iface, port, length, num=1, delay=1000000):
child.sendline("udp send {addr}%6 {port} {length} {num} {delay}" child.sendline("udp send {addr}%{iface} {port} {length} {num} {delay}"
.format(**vars())) .format(**vars()))
child.expect(r"Success: send {length} byte to \[[0-9a-f:]+\]:{port}" child.expect(r"Success: send {length} byte to \[[0-9a-f:]+\]:{port}"
.format(**vars())) .format(**vars()))
@ -173,10 +173,36 @@ def test_ipv6_ext_frag_shell_test_1(child, s, iface, ll_dst):
pktbuf_empty(child) pktbuf_empty(child)
def _check_iface(child):
# get TAP MAC address
child.sendline("ifconfig")
ethos_id = None
mock_id = None
hwaddr = None
for _ in range(2):
child.expect(r"Iface\s+(\d+)\s+.*")
match = re.search(r"HWaddr:\s+([0-9A-F:]{17})\s+",
child.match.group(0))
if match is not None:
# interface has a hardware address
ethos_id = int(child.match.group(1))
hwaddr = match.group(1)
else:
mock_id = int(child.match.group(1))
# consume MTU for later calls of `ifconfig {mock_id}`
child.expect(r"MTU:(\d+)")
# check if interface is configured properly
assert ethos_id is not None
assert mock_id is not None
assert hwaddr is not None
return ethos_id, mock_id, hwaddr
def test_ipv6_ext_frag_send_success(child, s, iface, ll_dst): def test_ipv6_ext_frag_send_success(child, s, iface, ll_dst):
length = get_host_mtu(iface) length = get_host_mtu(iface)
port = s.getsockname()[1] port = s.getsockname()[1]
udp_send(child, ll_dst, port, length) ethos_id, _, _ = _check_iface(child)
udp_send(child, ll_dst, ethos_id, port, length)
data, _ = s.recvfrom(length) data, _ = s.recvfrom(length)
assert len(data) == length assert len(data) == length
pktbuf_empty(child) pktbuf_empty(child)
@ -192,7 +218,8 @@ def test_ipv6_ext_frag_send_last_fragment_filled(child, s, iface, ll_dst):
# second fragment fills the whole available MTU # second fragment fills the whole available MTU
length += mtu length += mtu
port = s.getsockname()[1] port = s.getsockname()[1]
udp_send(child, ll_dst, port, length) ethos_id, _, _ = _check_iface(child)
udp_send(child, ll_dst, ethos_id, port, length)
data, _ = s.recvfrom(length) data, _ = s.recvfrom(length)
assert len(data) == length assert len(data) == length
pktbuf_empty(child) pktbuf_empty(child)
@ -205,7 +232,8 @@ def test_ipv6_ext_frag_send_last_fragment_only_one_byte(child, s,
length = (mtu - len(IPv6() / UDP())) length = (mtu - len(IPv6() / UDP()))
length += 1 length += 1
port = s.getsockname()[1] port = s.getsockname()[1]
udp_send(child, ll_dst, port, length) ethos_id, _, _ = _check_iface(child)
udp_send(child, ll_dst, ethos_id, port, length)
data, _ = s.recvfrom(length) data, _ = s.recvfrom(length)
assert len(data) == length assert len(data) == length
pktbuf_empty(child) pktbuf_empty(child)
@ -217,18 +245,19 @@ def test_ipv6_ext_frag_send_full_pktbuf(child, s, iface, ll_dst):
length -= (len(IPv6() / IPv6ExtHdrFragment() / UDP()) + length -= (len(IPv6() / IPv6ExtHdrFragment() / UDP()) +
(len(IPv6() / IPv6ExtHdrFragment())) + 96) (len(IPv6() / IPv6ExtHdrFragment())) + 96)
port = s.getsockname()[1] port = s.getsockname()[1]
ethos_id, _, _ = _check_iface(child)
# trigger neighbor discovery so it doesn't fill the packet buffer # trigger neighbor discovery so it doesn't fill the packet buffer
udp_send(child, ll_dst, port, 1) udp_send(child, ll_dst, ethos_id, port, 1)
data, _ = s.recvfrom(1) data, _ = s.recvfrom(1)
last_nd = time.time() last_nd = time.time()
count = 0 count = 0
while True: while True:
if (time.time() - last_nd) > 5: if (time.time() - last_nd) > 5:
# trigger neighbor discovery so it doesn't fill the packet buffer # trigger neighbor discovery so it doesn't fill the packet buffer
udp_send(child, ll_dst, port, 1) udp_send(child, ll_dst, ethos_id, port, 1)
data, _ = s.recvfrom(1) data, _ = s.recvfrom(1)
last_nd = time.time() last_nd = time.time()
udp_send(child, ll_dst, port, length) udp_send(child, ll_dst, ethos_id, port, length)
count += 1 count += 1
try: try:
data, _ = s.recvfrom(length) data, _ = s.recvfrom(length)
@ -245,36 +274,31 @@ def test_ipv6_ext_frag_send_full_pktbuf(child, s, iface, ll_dst):
def _fwd_setup(child, ll_dst, g_src, g_dst): def _fwd_setup(child, ll_dst, g_src, g_dst):
# check if interface is configured properly ethos_id, mock_id, hwaddr = _check_iface(child)
child.sendline("ifconfig 7") child.sendline("ifconfig {}".format(mock_id))
child.expect(r"MTU:(\d+)") child.expect(r"MTU:(\d+)")
mtu = int(child.match.group(1)) mtu = int(child.match.group(1))
# configure routes # configure routes
child.sendline("nib route add 7 {}/128 fe80::1".format(g_dst)) child.sendline("nib route add {} {}/128 fe80::1".format(mock_id, g_dst))
child.sendline("nib route add 6 {}/128 {}".format(g_src, ll_dst)) child.sendline("nib route add {} {}/128 {}"
.format(ethos_id, g_src, ll_dst))
child.sendline("nib route") child.sendline("nib route")
child.expect(r"{}/128 via fe80::1 dev #7".format(g_dst)) child.expect(r"{}/128 via fe80::1 dev #{}".format(g_dst, mock_id))
child.expect(r"{}/128 via {} dev #6".format(g_src, ll_dst)) child.expect(r"{}/128 via {} dev #{}".format(g_src, ll_dst, ethos_id))
child.sendline("nib neigh add 7 fe80::1") child.sendline("nib neigh add {} fe80::1".format(mock_id))
child.sendline("nib neigh") child.sendline("nib neigh")
child.expect(r"fe80::1 dev #7 lladdr\s+-") child.expect(r"fe80::1 dev #{} lladdr\s+-".format(mock_id))
# get TAP MAC address return mock_id, mtu, hwaddr
child.sendline("ifconfig 6")
child.expect(r"HWaddr: ([0-9A-F:]+)\s")
hwaddr = child.match.group(1)
# consume MTU for later calls of `ifconfig 7`
child.expect(r"MTU:(\d+)")
return mtu, hwaddr
def _fwd_teardown(child): def _fwd_teardown(child, mock_id):
# remove route # remove route
child.sendline("nib neigh del 7 fe80::1") child.sendline("nib neigh del {} fe80::1".format(mock_id))
child.sendline("nib route del 7 affe::/64") child.sendline("nib route del {} affe::/64".format(mock_id))
def test_ipv6_ext_frag_fwd_success(child, s, iface, ll_dst): def test_ipv6_ext_frag_fwd_success(child, s, iface, ll_dst):
mtu, dst_mac = _fwd_setup(child, ll_dst, "beef::1", "affe::1") mock_id, mtu, dst_mac = _fwd_setup(child, ll_dst, "beef::1", "affe::1")
payload_fit = mtu - len(IPv6() / IPv6ExtHdrFragment() / UDP()) payload_fit = mtu - len(IPv6() / IPv6ExtHdrFragment() / UDP())
pkt = Ether(dst=dst_mac) / IPv6(src="beef::1", dst="affe::1") / \ pkt = Ether(dst=dst_mac) / IPv6(src="beef::1", dst="affe::1") / \
IPv6ExtHdrFragment(m=True, id=0x477384a9) / \ IPv6ExtHdrFragment(m=True, id=0x477384a9) / \
@ -294,11 +318,11 @@ def test_ipv6_ext_frag_fwd_success(child, s, iface, ll_dst):
exp_str = ("{:08X}" + (" {:02X}") * len(bs)).format(addr, *bs) exp_str = ("{:08X}" + (" {:02X}") * len(bs)).format(addr, *bs)
child.expect_exact(exp_str) child.expect_exact(exp_str)
addr += 16 addr += 16
_fwd_teardown(child) _fwd_teardown(child, mock_id)
def test_ipv6_ext_frag_fwd_too_big(child, s, iface, ll_dst): def test_ipv6_ext_frag_fwd_too_big(child, s, iface, ll_dst):
mtu, dst_mac = _fwd_setup(child, ll_dst, "beef::1", "affe::1") mock_id, mtu, dst_mac = _fwd_setup(child, ll_dst, "beef::1", "affe::1")
assert(get_host_mtu(iface) > mtu) assert(get_host_mtu(iface) > mtu)
payload_fit = get_host_mtu(iface) - len(IPv6() / IPv6ExtHdrFragment() / payload_fit = get_host_mtu(iface) - len(IPv6() / IPv6ExtHdrFragment() /
UDP()) UDP())
@ -312,7 +336,7 @@ def test_ipv6_ext_frag_fwd_too_big(child, s, iface, ll_dst):
assert(ICMPv6PacketTooBig in pkt) assert(ICMPv6PacketTooBig in pkt)
assert(IPv6ExtHdrFragment in pkt) assert(IPv6ExtHdrFragment in pkt)
assert(pkt[IPv6ExtHdrFragment].id == 0x477384a9) assert(pkt[IPv6ExtHdrFragment].id == 0x477384a9)
_fwd_teardown(child) _fwd_teardown(child, mock_id)
def testfunc(child): def testfunc(child):