@@ -18,17 +18,100 @@ import NIOPosix
1818
1919#if canImport(Darwin)
2020import Darwin
21+ #elseif canImport(Glibc)
22+ import Glibc
2123#endif
2224
2325extension NIOIPProtocol {
2426 static let reservedForTesting = Self ( rawValue: 253 )
2527}
2628
29+ func XCTSkipIfUserHasNotEnoughRightsForRawSocketAPI( file: StaticString = #filePath, line: UInt = #line) throws {
30+ try XCTSkipIf ( geteuid ( ) != 0 , " Raw Socket API requires root privileges " , file: file, line: line)
31+ }
32+
33+ struct IPv4Address : Hashable {
34+ var rawValue : UInt32
35+ }
36+
37+ extension IPv4Address {
38+ init ( _ v1: UInt8 , _ v2: UInt8 , _ v3: UInt8 , _ v4: UInt8 ) {
39+ rawValue = UInt32 ( v1) << 24 | UInt32 ( v2) << 16 | UInt32 ( v3) << 8 | UInt32 ( v4)
40+ }
41+ }
42+
43+ struct IPv4Header {
44+ static let size : Int = 20
45+
46+ private let versionAndIhl : UInt8
47+ var version : UInt8 {
48+ versionAndIhl >> 4
49+ }
50+ var internetHeaderLength : UInt8 {
51+ versionAndIhl & 0b0000_1111
52+ }
53+ private let dscpAndEcn : UInt8
54+ var differentiatedServicesCodePoint : UInt8 {
55+ dscpAndEcn >> 2
56+ }
57+ var explicitCongestionNotification : UInt8 {
58+ dscpAndEcn & 0b0000_0011
59+ }
60+ let totalLength : UInt16
61+ let identification : UInt16
62+ let flagsAndFragmentOffset : UInt16
63+ var flags : UInt8 {
64+ UInt8 ( flagsAndFragmentOffset >> 13 )
65+ }
66+ var fragmentOffset : UInt16 {
67+ flagsAndFragmentOffset & 0b0001_1111_1111_1111
68+ }
69+ let timeToLive : UInt8
70+ let `protocol` : NIOIPProtocol
71+ let headerChecksum : UInt16
72+ let sourceIpAdress : IPv4Address
73+ let destinationIpAddress : IPv4Address
74+
75+ init ? ( buffer: inout ByteBuffer ) {
76+ guard let (
77+ versionAndIhl,
78+ dscpAndEcn,
79+ totalLength,
80+ identification,
81+ flagsAndFragmentOffset,
82+ timeToLive,
83+ `protocol`,
84+ headerChecksum,
85+ sourceIpAdress,
86+ destinationIpAddress
87+ ) = buffer. readMultipleIntegers ( as: (
88+ UInt8,
89+ UInt8,
90+ UInt16,
91+ UInt16,
92+ UInt16,
93+ UInt8,
94+ UInt8,
95+ UInt16,
96+ UInt32,
97+ UInt32
98+ ) . self) else { return nil }
99+ self . versionAndIhl = versionAndIhl
100+ self . dscpAndEcn = dscpAndEcn
101+ self . totalLength = totalLength
102+ self . identification = identification
103+ self . flagsAndFragmentOffset = flagsAndFragmentOffset
104+ self . timeToLive = timeToLive
105+ self . `protocol` = . init( rawValue: `protocol`)
106+ self . headerChecksum = headerChecksum
107+ self . sourceIpAdress = . init( rawValue: sourceIpAdress)
108+ self . destinationIpAddress = . init( rawValue: destinationIpAddress)
109+ }
110+ }
111+
27112final class RawSocketBootstrapTests : XCTestCase {
28- func testWriteAndRead( ) throws {
29- #if canImport(Darwin)
30- try XCTSkipIf ( geteuid ( ) != 0 , " Raw Socket API requires root privileges on Darwin " )
31- #endif
113+ func testBindWithRecevMmsg( ) throws {
114+ try XCTSkipIfUserHasNotEnoughRightsForRawSocketAPI ( )
32115
33116 let elg = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
34117 defer { XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) ) }
@@ -48,13 +131,60 @@ final class RawSocketBootstrapTests: XCTestCase {
48131 }
49132 channel. flush ( )
50133
51- let receivedMessages = Set ( try channel. waitForDatagrams ( count: 10 ) . map ( \. data) . map { buffer in
52- String (
53- decoding: buffer. readableBytesView. dropFirst ( 4 * 5 ) , // skip the IPv4 header
54- as: UTF8 . self
55- )
134+ let receivedMessages = Set ( try channel. waitForDatagrams ( count: 10 ) . map { envelop -> String in
135+ var data = envelop. data
136+ let header = try XCTUnwrap ( IPv4Header ( buffer: & data) )
137+ XCTAssertEqual ( header. version, 4 )
138+ XCTAssertEqual ( header. protocol, . reservedForTesting)
139+ XCTAssertEqual ( Int ( header. totalLength) , IPv4Header . size + data. readableBytes)
140+ XCTAssertEqual ( header. sourceIpAdress, . init( 127 , 0 , 0 , 1 ) )
141+ XCTAssertEqual ( header. destinationIpAddress, . init( 127 , 0 , 0 , 1 ) )
142+ return String ( buffer: data)
143+ } )
144+
145+ XCTAssertEqual ( receivedMessages, Set ( expectedMessages) )
146+ }
147+
148+ func testConnect( ) throws {
149+ try XCTSkipIfUserHasNotEnoughRightsForRawSocketAPI ( )
150+
151+ let elg = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
152+ defer { XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) ) }
153+ let readChannel = try RawSocketBootstrap ( group: elg)
154+ . channelInitializer {
155+ $0. pipeline. addHandler ( DatagramReadRecorder < ByteBuffer > ( ) , name: " ByteReadRecorder " )
156+ }
157+ . bind ( host: " 127.0.0.1 " , ipProtocol: . reservedForTesting) . wait ( )
158+ defer { XCTAssertNoThrow ( try readChannel. close ( ) . wait ( ) ) }
159+
160+ let writeChannel = try RawSocketBootstrap ( group: elg)
161+ . channelInitializer {
162+ $0. pipeline. addHandler ( DatagramReadRecorder < ByteBuffer > ( ) , name: " ByteReadRecorder " )
163+ }
164+ . bind ( host: " 127.0.0.1 " , ipProtocol: . reservedForTesting) . wait ( )
165+ defer { XCTAssertNoThrow ( try writeChannel. close ( ) . wait ( ) ) }
166+
167+ let expectedMessages = ( 0 ..< 10 ) . map { " Hello World \( $0) " }
168+ for message in expectedMessages {
169+ _ = try writeChannel. write ( AddressedEnvelope (
170+ remoteAddress: SocketAddress ( ipAddress: " 127.0.0.1 " , port: 0 ) ,
171+ data: ByteBuffer ( string: message)
172+ ) )
173+ }
174+ writeChannel. flush ( )
175+
176+ let receivedMessages = Set ( try readChannel. waitForDatagrams ( count: 10 ) . map { envelop -> String in
177+ var data = envelop. data
178+ let header = try XCTUnwrap ( IPv4Header ( buffer: & data) )
179+ XCTAssertEqual ( header. version, 4 )
180+ XCTAssertEqual ( header. protocol, . reservedForTesting)
181+ XCTAssertEqual ( Int ( header. totalLength) , IPv4Header . size + data. readableBytes)
182+ XCTAssertEqual ( header. sourceIpAdress, . init( 127 , 0 , 0 , 1 ) )
183+ XCTAssertEqual ( header. destinationIpAddress, . init( 127 , 0 , 0 , 1 ) )
184+ return String ( buffer: data)
56185 } )
57186
58187 XCTAssertEqual ( receivedMessages, Set ( expectedMessages) )
59188 }
189+
60190}
0 commit comments