Skip to content

Commit be33de8

Browse files
authored
Merge branch 'main' into Stream-lifts/PR
2 parents c33a1ac + fdaae89 commit be33de8

File tree

93 files changed

+3897
-1721
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+3897
-1721
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ jobs:
339339
name: Test I/O on macOS
340340
strategy:
341341
matrix:
342-
os: [macos-latest]
342+
os: [macos-14]
343343
java: [temurin@17]
344344
project: [ioJS, ioJVM, ioNative]
345345
runs-on: ${{ matrix.os }}

build.sbt

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ ThisBuild / githubWorkflowAddedJobs +=
3434
scalas = Nil,
3535
sbtStepPreamble = Nil,
3636
javas = List(githubWorkflowJavaVersions.value.head),
37-
oses = List("macos-latest"),
37+
oses = List(
38+
"macos-14"
39+
), // FIXME: macos-15 breaks sending multicast to local network - https://github.com/actions/runner-images/issues/10924
3840
matrixAdds = Map("project" -> List("ioJS", "ioJVM", "ioNative")),
3941
steps = githubWorkflowJobSetup.value.toList ++ List(
4042
WorkflowStep.Run(List("brew install s2n"), cond = Some("matrix.project == 'ioNative'")),
@@ -272,6 +274,93 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
272274
),
273275
ProblemFilters.exclude[MissingTypesProblem](
274276
"fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$"
277+
),
278+
// Network refactor: #3563
279+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.connect"),
280+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bind"),
281+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bindAndAccept"),
282+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.address"),
283+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.peerAddress"),
284+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.address"),
285+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.supportedOptions"),
286+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.getOption"),
287+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.setOption"),
288+
ProblemFilters.exclude[DirectMissingMethodProblem](
289+
"fs2.io.net.SocketCompanionPlatform#AsyncSocket.this"
290+
),
291+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroup$AbstractAsyncSocketGroup"),
292+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroupCompanionPlatform"),
293+
ProblemFilters.exclude[MissingClassProblem](
294+
"fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup"
295+
),
296+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.address"),
297+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
298+
"fs2.io.net.tls.TLSSocket.supportedOptions"
299+
),
300+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.getOption"),
301+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.setOption"),
302+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets"),
303+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets$"),
304+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSocketsImpl"),
305+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets"),
306+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets$"),
307+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSocketsImpl"),
308+
ProblemFilters.exclude[MissingClassProblem](
309+
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncSocket"
310+
),
311+
ProblemFilters.exclude[MissingClassProblem](
312+
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncUnixSockets"
313+
),
314+
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.SelectingSocket.apply"),
315+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SelectingSocketGroup"),
316+
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.net.Socket.forAsync"),
317+
ProblemFilters.exclude[ReversedMissingMethodProblem](
318+
"fs2.io.net.SocketOptionCompanionPlatform#Key.get"
319+
),
320+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
321+
"fs2.io.net.Network.openDatagramSocket"
322+
),
323+
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.FdPollingSocket.apply"),
324+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.FdPollingSocketGroup"),
325+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.FdPollingUnixSockets"),
326+
ProblemFilters.exclude[IncompatibleResultTypeProblem](
327+
"fs2.io.net.AsynchronousDatagramSocketGroup#WriterDatagram.remote"
328+
),
329+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
330+
"fs2.io.net.AsynchronousDatagramSocketGroup#WriterDatagram.this"
331+
),
332+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.address"),
333+
ProblemFilters.exclude[ReversedMissingMethodProblem](
334+
"fs2.io.net.DatagramSocket.supportedOptions"
335+
),
336+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.getOption"),
337+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.setOption"),
338+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.readGen"),
339+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.connect"),
340+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.disconnect"),
341+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.write"),
342+
ProblemFilters.exclude[MissingClassProblem](
343+
"fs2.io.net.DatagramSocketGroupCompanionPlatform$AsyncDatagramSocketGroup"
344+
),
345+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bindDatagramSocket"),
346+
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroup$"),
347+
ProblemFilters.exclude[ReversedMissingMethodProblem](
348+
"fs2.io.net.SocketOptionCompanionPlatform#Key.fs2$io$net$SocketOptionCompanionPlatform$Key$$$outer"
349+
),
350+
ProblemFilters.exclude[ReversedMissingMethodProblem](
351+
"fs2.io.net.DatagramSocketOption#Key.toSocketOption"
352+
),
353+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.DatagramSocket.join"),
354+
ProblemFilters.exclude[IncompatibleMethTypeProblem](
355+
"fs2.io.net.DatagramSocketOption.multicastInterface"
356+
),
357+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.dns"),
358+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.interfaces"),
359+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
360+
"fs2.io.net.tls.TLSContext#Builder.fromKeyStoreFile"
361+
),
362+
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
363+
"fs2.io.net.tls.TLSContext#Builder.fs2$io$net$tls$TLSContextCompanionPlatform$BuilderPlatform$$$outer"
275364
)
276365
)
277366

flake.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

io/js-jvm/src/main/scala/fs2/io/net/Network.scala

Lines changed: 0 additions & 65 deletions
This file was deleted.
File renamed without changes.

io/js-jvm/src/test/scala/fs2/io/net/udp/UdpSuite.scala

Lines changed: 83 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,29 @@ import com.comcast.ip4s._
3131

3232
import scala.concurrent.duration._
3333

34-
class UdpSuite extends Fs2Suite with UdpSuitePlatform {
35-
def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
34+
class UdpSuite extends Fs2Suite {
35+
private def sendAndReceive(socket: DatagramSocket[IO], toSend: Datagram): IO[Datagram] =
3636
socket
3737
.write(toSend) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceive(socket, toSend)))
3838

39+
private def sendAndReceiveBytes(socket: DatagramSocket[IO], bytes: Chunk[Byte]): IO[Datagram] =
40+
socket
41+
.write(bytes) >> socket.read.timeoutTo(1.second, IO.defer(sendAndReceiveBytes(socket, bytes)))
42+
3943
group("udp") {
4044
test("echo one") {
4145
val msg = Chunk.array("Hello, world!".getBytes)
4246
Stream
43-
.resource(Network[IO].openDatagramSocket())
47+
.resource(Network[IO].bindDatagramSocket(SocketAddress.Wildcard))
4448
.flatMap { serverSocket =>
45-
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
46-
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
47-
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
48-
val client = Stream.resource(Network[IO].openDatagramSocket()).evalMap { clientSocket =>
49-
sendAndReceive(clientSocket, Datagram(serverAddress, msg))
49+
val serverAddress = SocketAddress(ip"127.0.0.1", serverSocket.address.asIpUnsafe.port)
50+
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
51+
val client =
52+
Stream.resource(Network[IO].bindDatagramSocket(SocketAddress.Wildcard)).evalMap {
53+
clientSocket =>
54+
sendAndReceive(clientSocket, Datagram(serverAddress, msg))
5055
}
51-
client.concurrently(server)
52-
}
56+
client.concurrently(server)
5357
}
5458
.compile
5559
.lastOrError
@@ -67,55 +71,88 @@ class UdpSuite extends Fs2Suite with UdpSuitePlatform {
6771
.sorted
6872

6973
Stream
70-
.resource(Network[IO].openDatagramSocket())
74+
.resource(Network[IO].bindDatagramSocket())
7175
.flatMap { serverSocket =>
72-
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
73-
val serverAddress = SocketAddress(ip"127.0.0.1", serverPort)
74-
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
75-
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
76-
Stream
77-
.emits(msgs.map(msg => Datagram(serverAddress, msg)))
78-
.evalMap(msg => sendAndReceive(clientSocket, msg))
79-
}
80-
val clients = Stream
81-
.constant(client)
82-
.take(numClients.toLong)
83-
.parJoin(numParallelClients)
84-
clients.concurrently(server)
76+
val serverAddress = SocketAddress(ip"127.0.0.1", serverSocket.address.asIpUnsafe.port)
77+
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
78+
val client = Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
79+
Stream
80+
.emits(msgs.map(msg => Datagram(serverAddress, msg)))
81+
.evalMap(msg => sendAndReceive(clientSocket, msg))
8582
}
83+
val clients = Stream
84+
.constant(client)
85+
.take(numClients.toLong)
86+
.parJoin(numParallelClients)
87+
clients.concurrently(server)
8688
}
8789
.compile
8890
.toVector
8991
.map(_.map(p => new String(p.bytes.toArray)).sorted)
9092
.assertEquals(expected)
9193
}
9294

93-
test("multicast".ignore) {
94-
// Fails often based on routing table of host machine
95-
val group = mip"232.10.10.10"
96-
val groupJoin = MulticastJoin.asm(group)
95+
test("echo connected") {
9796
val msg = Chunk.array("Hello, world!".getBytes)
9897
Stream
99-
.resource(
100-
Network[IO].openDatagramSocket(
101-
options = List(DatagramSocketOption.multicastTtl(1)),
102-
protocolFamily = Some(v4ProtocolFamily)
103-
)
104-
)
98+
.resource(Network[IO].bindDatagramSocket())
10599
.flatMap { serverSocket =>
106-
Stream.eval(serverSocket.localAddress).map(_.port).flatMap { serverPort =>
107-
val server = Stream
108-
.exec(
109-
v4Interfaces.traverse_(interface => serverSocket.join(groupJoin, interface))
110-
) ++
111-
serverSocket.reads.foreach(packet => serverSocket.write(packet))
112-
val client = Stream.resource(Network[IO].openDatagramSocket()).flatMap { clientSocket =>
113-
Stream(Datagram(SocketAddress(group.address, serverPort), msg))
114-
.through(clientSocket.writes)
115-
.drain ++ Stream.eval(clientSocket.read)
116-
}
117-
client.concurrently(server)
100+
val serverAddress = serverSocket.address.asIpUnsafe
101+
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
102+
val client = Stream.resource(Network[IO].bindDatagramSocket()).evalMap { clientSocket =>
103+
clientSocket.connect(serverAddress) >> sendAndReceiveBytes(clientSocket, msg)
118104
}
105+
client.concurrently(server)
106+
}
107+
.compile
108+
.lastOrError
109+
.map(_.bytes)
110+
.assertEquals(msg)
111+
}
112+
113+
test("multicast") {
114+
val group = mip"239.10.10.10"
115+
val groupJoin = MulticastJoin.asm(group)
116+
val msg = Chunk.array("Hello, world!".getBytes)
117+
val outgoingInterface =
118+
// Get first non-loopback interface with an IPv4 address
119+
Network[IO].interfaces.getAll.map { interfaces =>
120+
interfaces.values
121+
.filterNot(_.isLoopback)
122+
.flatMap(iface =>
123+
iface.addresses.filter(_.address.fold(_ => true, _ => false)).as(iface)
124+
)
125+
.head
126+
}
127+
Stream
128+
.eval(outgoingInterface)
129+
.flatMap { out =>
130+
Stream
131+
.resource(
132+
Network[IO]
133+
.bindDatagramSocket(
134+
options = List(SocketOption.multicastTtl(1), SocketOption.multicastInterface(out))
135+
)
136+
.evalMap { serverSocket =>
137+
Network[IO].interfaces.getAll.flatMap { interfaces =>
138+
interfaces.values.toList
139+
.filter(iface =>
140+
iface.addresses.exists(_.address.fold(_ => true, _ => false))
141+
)
142+
.traverse_(iface => serverSocket.join(groupJoin, iface))
143+
.as(serverSocket)
144+
}
145+
}
146+
)
147+
.flatMap { serverSocket =>
148+
val server = serverSocket.reads.foreach(packet => serverSocket.write(packet))
149+
val client =
150+
Stream.resource(Network[IO].bindDatagramSocket()).flatMap { clientSocket =>
151+
val to = SocketAddress(group.address, serverSocket.address.asIpUnsafe.port)
152+
Stream.eval(clientSocket.write(msg, to) >> clientSocket.read)
153+
}
154+
client.concurrently(server)
155+
}
119156
}
120157
.compile
121158
.lastOrError

0 commit comments

Comments
 (0)