|
4 | 4 | import logging |
5 | 5 | import os |
6 | 6 | import random |
| 7 | +import socket |
7 | 8 | import string |
8 | 9 | import unittest |
9 | 10 | import unittest.mock |
@@ -42,6 +43,13 @@ def _create_broadcast_test_packet(self, src) -> scapy.packet.Packet: |
42 | 43 | / scapy.packet.Raw(load=self._test_payload) |
43 | 44 | ) |
44 | 45 |
|
| 46 | + packet2 = ( |
| 47 | + scapy.layers.l2.Ether(dst="ff:ff:ff:ff:ff:ff") |
| 48 | + / scapy.layers.inet.IP(dst="255.255.255.255") |
| 49 | + / scapy.layers.inet.UDP(dport=5076) |
| 50 | + / scapy.packet.Raw(b"a" * 200) |
| 51 | + ) |
| 52 | + |
45 | 53 | return packet |
46 | 54 |
|
47 | 55 | async def test_main_runs(self): |
@@ -110,53 +118,100 @@ def test_discover_relays(self, *_): |
110 | 118 | class TestSnowSignalFragmented(unittest.IsolatedAsyncioTestCase): |
111 | 119 | """Test sending a valid fragmented UDP packet""" |
112 | 120 |
|
113 | | - maxDiff = None |
114 | | - |
115 | | - def _create_broadcast_test_packet_frag1(self) -> scapy.packet.Packet: |
116 | | - packet = scapy.layers.l2.Ether( |
117 | | - b"\xff\xff\xff\xff\xff\xff\x02B\n\x00\x03\xb5\x08\x00E\x00\x05\xa4\xd4Y \x00@\x11e<\n\x00\x03\xb5\n\x00\x03\xff\xa5\xa8\x13\xd4\x05\xa3?:\xca\x02\x00\x03\x93\x05\x00\x00\\b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x00\x00\xa8\xa5\x01\x03tcp4\x00m1 \x10\x15DEV:LADA:MRE:DOUBLE:0n1 \x10\x15DEV:LADA:MRE:DOUBLE:1o1 \x10\x15DEV:LADA:MRE:DOUBLE:2p1 \x10\x15DEV:LADA:MRE:DOUBLE:3q1 \x10\x15DEV:LADA:MRE:DOUBLE:4r1 \x10\x15DEV:LADA:MRE:DOUBLE:5s1 \x10\x15DEV:LADA:MRE:DOUBLE:6t1 \x10\x15DEV:LADA:MRE:DOUBLE:7u1 \x10\x15DEV:LADA:MRE:DOUBLE:8v1 \x10\x15DEV:LADA:MRE:DOUBLE:9w1 \x10\x16DEV:LADA:MRE:DOUBLE:10x1 \x10\x16DEV:LADA:MRE:DOUBLE:11y1 \x10\x16DEV:LADA:MRE:DOUBLE:12z1 \x10\x16DEV:LADA:MRE:DOUBLE:13{1 \x10\x16DEV:LADA:MRE:DOUBLE:14|1 \x10\x16DEV:LADA:MRE:DOUBLE:15}1 \x10\x16DEV:LADA:MRE:DOUBLE:16~1 \x10\x16DEV:LADA:MRE:DOUBLE:17\x7f1 \x10\x16DEV:LADA:MRE:DOUBLE:18\x801 \x10\x16DEV:LADA:MRE:DOUBLE:19\x811 \x10\x16DEV:LADA:MRE:DOUBLE:20\x821 \x10\x16DEV:LADA:MRE:DOUBLE:21\x831 \x10\x16DEV:LADA:MRE:DOUBLE:22\x841 \x10\x16DEV:LADA:MRE:DOUBLE:23\x851 \x10\x16DEV:LADA:MRE:DOUBLE:24\x861 \x10\x16DEV:LADA:MRE:DOUBLE:25\x871 \x10\x16DEV:LADA:MRE:DOUBLE:26\x881 \x10\x16DEV:LADA:MRE:DOUBLE:27\x891 \x10\x16DEV:LADA:MRE:DOUBLE:28\x8a1 \x10\x16DEV:LADA:MRE:DOUBLE:29\x8b1 \x10\x16DEV:LADA:MRE:DOUBLE:30\x8c1 \x10\x16DEV:LADA:MRE:DOUBLE:31\x8d1 \x10\x16DEV:LADA:MRE:DOUBLE:32\x8e1 \x10\x16DEV:LADA:MRE:DOUBLE:33\x8f1 \x10\x16DEV:LADA:MRE:DOUBLE:34\x901 \x10\x16DEV:LADA:MRE:DOUBLE:35\x911 \x10\x16DEV:LADA:MRE:DOUBLE:36\x921 \x10\x16DEV:LADA:MRE:DOUBLE:37\x931 \x10\x16DEV:LADA:MRE:DOUBLE:38\x941 \x10\x16DEV:LADA:MRE:DOUBLE:39\x951 \x10\x16DEV:LADA:MRE:DOUBLE:40\x961 \x10\x16DEV:LADA:MRE:DOUBLE:41\x971 \x10\x16DEV:LADA:MRE:DOUBLE:42\x981 \x10\x16DEV:LADA:MRE:DOUBLE:43\x991 \x10\x16DEV:LADA:MRE:DOUBLE:44\x9a1 \x10\x16DEV:LADA:MRE:DOUBLE:45\x9b1 \x10\x16DEV:LADA:MRE:DOUBLE:46\x9c1 \x10\x16DEV:LADA:MRE:DOUBLE:47\x9d1 \x10\x16DEV:LADA:MRE:DOUBLE:48\x9e1 \x10\x16DEV:LADA:MRE:DOUBLE:49\x9f1 \x10\x16DEV:LADA:MRE:DOUBLE:50\xa01 \x10\x16DEV" |
| 121 | + ### This is often needed to understand what the hell is going on in this complex integration test |
| 122 | + # logger = logging.getLogger(__name__) |
| 123 | + # logging.basicConfig( |
| 124 | + # format="%(asctime)s - %(levelname)s - %(name)s.%(funcName)s: %(message)s", |
| 125 | + # encoding="utf-8", |
| 126 | + # level=logging.INFO, |
| 127 | + # ) |
| 128 | + |
| 129 | + def send_udp_broadcast(self, message: bytes, port: int = 5076): |
| 130 | + """Send a UDP broadcast message""" |
| 131 | + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) |
| 132 | + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) |
| 133 | + sock.sendto(message, ("255.255.255.255", port)) |
| 134 | + sock.close() |
| 135 | + |
| 136 | + class UDPReceiveOnceProtocol(asyncio.DatagramProtocol): |
| 137 | + """Listen for a single UDP message""" |
| 138 | + |
| 139 | + def __init__(self): |
| 140 | + self.message = None |
| 141 | + |
| 142 | + def connection_made(self, transport): |
| 143 | + self.transport = transport |
| 144 | + |
| 145 | + def datagram_received(self, data, addr): |
| 146 | + self.message = data |
| 147 | + self.transport.close() |
| 148 | + |
| 149 | + async def test_fragmentation_sendreceive(self): |
| 150 | + """Simple test that we are sending and receiving""" |
| 151 | + |
| 152 | + # Start loop listening for UDP messages on port 5076 |
| 153 | + loop = asyncio.get_running_loop() |
| 154 | + transport, protocol = await loop.create_datagram_endpoint( |
| 155 | + self.UDPReceiveOnceProtocol, local_addr=("0.0.0.0", 5076) |
118 | 156 | ) |
119 | 157 |
|
120 | | - return packet |
121 | | - |
122 | | - def _create_broadcast_test_packet_frag2(self) -> scapy.packet.Packet: |
123 | | - packet = scapy.layers.l2.Ether( |
124 | | - b"\xff\xff\xff\xff\xff\xff\x02B\n\x00\x03\xb5\x08\x00E\x00\x00'\xd4Y\x00\xb2@\x11\x8a\x07\n\x00\x03\xb5\n\x00\x03\xff:LADA:MRE:DOUBLE:51" |
| 158 | + # Give it a little time to fully setup |
| 159 | + await asyncio.sleep(0.1) |
| 160 | + |
| 161 | + # Send a fragmented UDP message. We ensure fragmentation by making the message payload long |
| 162 | + toolong_msg = b"abcdefghij" * 500 |
| 163 | + self.send_udp_broadcast(toolong_msg) |
| 164 | + |
| 165 | + # Give them time to arrive |
| 166 | + await asyncio.sleep(0.1) |
| 167 | + |
| 168 | + self.assertEqual(protocol.message, toolong_msg) |
| 169 | + |
| 170 | + # Because this test is using UDP broadcast messages sourced from the same container, and thus |
| 171 | + # the same MAC address, we need to switch off the UDPRelayTransmit l2filter |
| 172 | + # Mocking out the UDPRelayReceive has a primary purpose of letting us test that the fragments |
| 173 | + # are transmitted as expected, but it also serves to disable rebroadcasts and thus mitigate |
| 174 | + # the risk of a mini packet storm |
| 175 | + @patch("snowsignal.udp_relay_transmit.UDPRelayTransmit.l2filter", return_value=True) |
| 176 | + @patch("snowsignal.udp_relay_receive.UDPRelayReceive.datagram_received") |
| 177 | + async def test_fragments_rebroadcast(self, mock_datagram_received: unittest.mock.AsyncMock, _): |
| 178 | + # Start main, note that we can't use the loopback interface as we won't see packet |
| 179 | + # fragmentation on that interface. That makes this test very brittle |
| 180 | + main_task = asyncio.create_task( |
| 181 | + snowsignal.main("--target-interface=eth0 --other-relays=172.21.0.3", loop_forever=True) |
125 | 182 | ) |
126 | 183 |
|
127 | | - return packet |
128 | | - |
129 | | - async def test_main_runs(self): |
130 | | - """See if main executes without any problems!""" |
131 | | - |
132 | | - await snowsignal.main("--log-level=error", loop_forever=False) |
133 | | - |
134 | | - @patch.object(snowsignal.UDPRelayTransmit, "_send_to_relays_packet") |
135 | | - async def test_integration( |
136 | | - self, |
137 | | - receive_datagram_mock: unittest.mock.AsyncMock, |
138 | | - ): |
139 | | - """Simple integration test""" |
140 | | - # Start main, note that we are using the loopback interface. This is |
141 | | - # important for CI/CD testing (and handy for keeping our test packets |
142 | | - # local). |
143 | | - main_task = asyncio.create_task(snowsignal.main("--target-interface=lo --log-level=error", loop_forever=True)) |
144 | | - |
145 | 184 | # Give time for setup to happen |
146 | | - await asyncio.sleep(0.5) |
147 | | - |
148 | | - # Send the broadcast fragments to the loopback interface |
149 | | - send_packet_frag1 = self._create_broadcast_test_packet_frag1() |
150 | | - scapy.sendrecv.sendp(send_packet_frag1, "lo") |
| 185 | + await asyncio.sleep(1) |
151 | 186 |
|
152 | | - send_packet_frag2 = self._create_broadcast_test_packet_frag2() |
153 | | - scapy.sendrecv.sendp(send_packet_frag2, "lo") |
| 187 | + # Send a fragmented UDP message. We ensure fragmentation by making the message payload long |
| 188 | + toolong_msg = b"" |
| 189 | + for i in range(500): |
| 190 | + toolong_msg += f"test{i:03d}".encode("utf-8") |
| 191 | + self.send_udp_broadcast(toolong_msg) |
154 | 192 |
|
155 | 193 | # And some time for packets to fly around |
156 | 194 | await asyncio.sleep(0.25) |
157 | 195 |
|
158 | | - # Then test if it all worked! |
159 | | - self.assertEqual(receive_datagram_mock.call_count, 2) |
| 196 | + # Then test if it all worked! We attempt to reassemble the packet payload from the fragments |
| 197 | + # by looping throuhg the calls to datagram_received and examining the data argument |
| 198 | + received_packet_payloads = b"" |
| 199 | + for call in mock_datagram_received.call_args_list: |
| 200 | + data = call[0][0] |
| 201 | + |
| 202 | + if data[0:2] == b"SS": |
| 203 | + data = data[2:] |
| 204 | + else: |
| 205 | + self.fail("Unexpected data format received; did not start with magic bytes 'SS'") |
| 206 | + |
| 207 | + # First fragment is UDP but later ones are not |
| 208 | + packet = scapy.layers.l2.Ether(data) |
| 209 | + try: |
| 210 | + received_packet_payloads += bytes(packet[scapy.layers.inet.UDP].payload) |
| 211 | + except IndexError: |
| 212 | + received_packet_payloads += bytes(packet[scapy.layers.inet.IP].payload) |
| 213 | + |
| 214 | + self.assertEqual(received_packet_payloads, toolong_msg) |
160 | 215 |
|
161 | 216 | # Quit main, though it probably quits anyway |
162 | 217 | main_task.cancel() |
0 commit comments