@@ -3,6 +3,7 @@ package libp2pwebrtc
33import (
44 "crypto/rand"
55 "errors"
6+ "fmt"
67 "io"
78 "os"
89 "sync/atomic"
@@ -148,8 +149,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
148149 client , server := getDetachedDataChannels (t )
149150
150151 var clientDone , serverDone atomic.Bool
151- clientStr := newStream (client .dc , client .rwc , func () { clientDone .Store (true ) })
152- serverStr := newStream (server .dc , server .rwc , func () { serverDone .Store (true ) })
152+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { clientDone .Store (true ) })
153+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () { serverDone .Store (true ) })
153154
154155 // send a foobar from the client
155156 n , err := clientStr .Write ([]byte ("foobar" ))
@@ -194,8 +195,8 @@ func TestStreamSimpleReadWriteClose(t *testing.T) {
194195func TestStreamPartialReads (t * testing.T ) {
195196 client , server := getDetachedDataChannels (t )
196197
197- clientStr := newStream (client .dc , client .rwc , func () {})
198- serverStr := newStream (server .dc , server .rwc , func () {})
198+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
199+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
199200
200201 _ , err := serverStr .Write ([]byte ("foobar" ))
201202 require .NoError (t , err )
@@ -217,8 +218,8 @@ func TestStreamPartialReads(t *testing.T) {
217218func TestStreamSkipEmptyFrames (t * testing.T ) {
218219 client , server := getDetachedDataChannels (t )
219220
220- clientStr := newStream (client .dc , client .rwc , func () {})
221- serverStr := newStream (server .dc , server .rwc , func () {})
221+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
222+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
222223
223224 for i := 0 ; i < 10 ; i ++ {
224225 require .NoError (t , serverStr .writer .WriteMsg (& pb.Message {}))
@@ -252,7 +253,7 @@ func TestStreamSkipEmptyFrames(t *testing.T) {
252253func TestStreamReadReturnsOnClose (t * testing.T ) {
253254 client , _ := getDetachedDataChannels (t )
254255
255- clientStr := newStream (client .dc , client .rwc , func () {})
256+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
256257 errChan := make (chan error , 1 )
257258 go func () {
258259 _ , err := clientStr .Read ([]byte {0 })
@@ -275,8 +276,8 @@ func TestStreamResets(t *testing.T) {
275276 client , server := getDetachedDataChannels (t )
276277
277278 var clientDone , serverDone atomic.Bool
278- clientStr := newStream (client .dc , client .rwc , func () { clientDone .Store (true ) })
279- serverStr := newStream (server .dc , server .rwc , func () { serverDone .Store (true ) })
279+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { clientDone .Store (true ) })
280+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () { serverDone .Store (true ) })
280281
281282 // send a foobar from the client
282283 _ , err := clientStr .Write ([]byte ("foobar" ))
@@ -311,8 +312,8 @@ func TestStreamResets(t *testing.T) {
311312func TestStreamReadDeadlineAsync (t * testing.T ) {
312313 client , server := getDetachedDataChannels (t )
313314
314- clientStr := newStream (client .dc , client .rwc , func () {})
315- serverStr := newStream (server .dc , server .rwc , func () {})
315+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
316+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
316317
317318 timeout := 100 * time .Millisecond
318319 if os .Getenv ("CI" ) != "" {
@@ -342,8 +343,8 @@ func TestStreamReadDeadlineAsync(t *testing.T) {
342343func TestStreamWriteDeadlineAsync (t * testing.T ) {
343344 client , server := getDetachedDataChannels (t )
344345
345- clientStr := newStream (client .dc , client .rwc , func () {})
346- serverStr := newStream (server .dc , server .rwc , func () {})
346+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
347+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
347348 _ = serverStr
348349
349350 b := make ([]byte , 1024 )
@@ -372,8 +373,8 @@ func TestStreamWriteDeadlineAsync(t *testing.T) {
372373func TestStreamReadAfterClose (t * testing.T ) {
373374 client , server := getDetachedDataChannels (t )
374375
375- clientStr := newStream (client .dc , client .rwc , func () {})
376- serverStr := newStream (server .dc , server .rwc , func () {})
376+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
377+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
377378
378379 serverStr .Close ()
379380 b := make ([]byte , 1 )
@@ -384,8 +385,8 @@ func TestStreamReadAfterClose(t *testing.T) {
384385
385386 client , server = getDetachedDataChannels (t )
386387
387- clientStr = newStream (client .dc , client .rwc , func () {})
388- serverStr = newStream (server .dc , server .rwc , func () {})
388+ clientStr = newStream (client .dc , client .rwc , maxSendMessageSize , func () {})
389+ serverStr = newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
389390
390391 serverStr .Reset ()
391392 b = make ([]byte , 1 )
@@ -399,8 +400,8 @@ func TestStreamCloseAfterFINACK(t *testing.T) {
399400 client , server := getDetachedDataChannels (t )
400401
401402 done := make (chan bool , 1 )
402- clientStr := newStream (client .dc , client .rwc , func () { done <- true })
403- serverStr := newStream (server .dc , server .rwc , func () {})
403+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { done <- true })
404+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
404405
405406 go func () {
406407 err := clientStr .Close ()
@@ -427,8 +428,8 @@ func TestStreamFinAckAfterStopSending(t *testing.T) {
427428 client , server := getDetachedDataChannels (t )
428429
429430 done := make (chan bool , 1 )
430- clientStr := newStream (client .dc , client .rwc , func () { done <- true })
431- serverStr := newStream (server .dc , server .rwc , func () {})
431+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { done <- true })
432+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () {})
432433
433434 go func () {
434435 clientStr .CloseRead ()
@@ -460,8 +461,8 @@ func TestStreamConcurrentClose(t *testing.T) {
460461
461462 start := make (chan bool , 2 )
462463 done := make (chan bool , 2 )
463- clientStr := newStream (client .dc , client .rwc , func () { done <- true })
464- serverStr := newStream (server .dc , server .rwc , func () { done <- true })
464+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { done <- true })
465+ serverStr := newStream (server .dc , server .rwc , maxSendMessageSize , func () { done <- true })
465466
466467 go func () {
467468 start <- true
@@ -495,7 +496,7 @@ func TestStreamResetAfterClose(t *testing.T) {
495496 client , server := getDetachedDataChannels (t )
496497
497498 done := make (chan bool , 2 )
498- clientStr := newStream (client .dc , client .rwc , func () { done <- true })
499+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { done <- true })
499500 clientStr .Close ()
500501
501502 select {
@@ -520,7 +521,7 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
520521 client , server := getDetachedDataChannels (t )
521522
522523 done := make (chan bool , 1 )
523- clientStr := newStream (client .dc , client .rwc , func () { done <- true })
524+ clientStr := newStream (client .dc , client .rwc , maxSendMessageSize , func () { done <- true })
524525
525526 clientStr .Close ()
526527
@@ -540,24 +541,34 @@ func TestStreamDataChannelCloseOnFINACK(t *testing.T) {
540541}
541542
542543func TestStreamChunking (t * testing.T ) {
543- client , server := getDetachedDataChannels (t )
544-
545- clientStr := newStream (client .dc , client .rwc , func () {})
546- serverStr := newStream (server .dc , server .rwc , func () {})
547-
548- const N = (16 << 10 ) + 1000
549- go func () {
550- data := make ([]byte , N )
551- _ , err := clientStr .Write (data )
552- require .NoError (t , err )
553- }()
554-
555- data := make ([]byte , N )
556- n , err := serverStr .Read (data )
557- require .NoError (t , err )
558- require .LessOrEqual (t , n , 16 << 10 )
544+ for _ , msgSize := range []int {16 << 10 , 32 << 10 , 64 << 10 , 128 << 10 , 256 << 10 } {
545+ t .Run (fmt .Sprintf ("msgSize=%d" , msgSize ), func (t * testing.T ) {
546+ client , server := getDetachedDataChannels (t )
547+ defer client .dc .Close ()
548+ defer server .dc .Close ()
549+
550+ clientStr := newStream (client .dc , client .rwc , msgSize , func () {})
551+ // server should read large messages even if it can only send 16 kB messages.
552+ serverStr := newStream (server .dc , server .rwc , 16 << 10 , func () {})
553+
554+ N := msgSize + 1000
555+ input := make ([]byte , N )
556+ _ , err := rand .Read (input )
557+ require .NoError (t , err )
558+ go func () {
559+ _ , err = clientStr .Write (input )
560+ require .NoError (t , err )
561+ }()
559562
560- nn , err := serverStr .Read (data )
561- require .NoError (t , err )
562- require .Equal (t , nn + n , N )
563+ data := make ([]byte , N )
564+ n , err := serverStr .Read (data )
565+ require .NoError (t , err )
566+ require .LessOrEqual (t , n , msgSize )
567+ // shouldn't be much less than msgSize
568+ require .GreaterOrEqual (t , n , msgSize - 100 )
569+ _ , err = serverStr .Read (data [n :])
570+ require .NoError (t , err )
571+ require .Equal (t , input , data )
572+ })
573+ }
563574}
0 commit comments