Skip to content

Commit 5cdf355

Browse files
committed
wip options
1 parent 473ed5a commit 5cdf355

File tree

5 files changed

+324
-43
lines changed

5 files changed

+324
-43
lines changed

association.go

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ type Association struct {
228228
// Congestion control parameters
229229
maxReceiveBufferSize uint32
230230
maxMessageSize uint32
231+
rtoMax float64
231232
cwnd uint32 // my congestion window size
232233
rwnd uint32 // calculated peer's receiver windows size
233234
ssthresh uint32 // slow start threshold
@@ -248,9 +249,9 @@ type Association struct {
248249
ackTimer *ackTimer
249250

250251
// RACK / TLP state
252+
rack rackSettings // rack configurable options
251253
rackReoWnd time.Duration // dynamic reordering window
252254
rackMinRTT time.Duration // min observed RTT
253-
rackMinRTTWnd *windowedMin // the window used to determine minRTT, defaults to 30s
254255
rackDeliveredTime time.Time // send time of most recently delivered original chunk
255256
rackHighestDeliveredOrigTSN uint32
256257
rackReorderingSeen bool // ever observed reordering for this association
@@ -266,9 +267,6 @@ type Association struct {
266267
rackDeadline time.Time
267268
ptoDeadline time.Time
268269

269-
rackWCDelAck time.Duration // 200ms default
270-
rackReoWndFloor time.Duration
271-
272270
// Chunks stored for retransmission
273271
storedInit *chunkInit
274272
storedCookieEcho *chunkCookieEcho
@@ -318,16 +316,17 @@ type Association struct {
318316
// Config collects the arguments to createAssociation construction into
319317
// a single structure.
320318
type Config struct {
321-
Name string
322-
NetConn net.Conn
323-
MaxReceiveBufferSize uint32
324-
MaxMessageSize uint32
325-
EnableZeroChecksum bool
326-
LoggerFactory logging.LoggerFactory
327-
BlockWrite bool
328-
MTU uint32
319+
LoggerFactory logging.LoggerFactory
320+
Name string
321+
NetConn net.Conn
322+
323+
BlockWrite bool
324+
EnableZeroChecksum bool
325+
MTU uint32
329326

330327
// congestion control configuration
328+
MaxReceiveBufferSize uint32
329+
MaxMessageSize uint32
331330
// RTOMax is the maximum retransmission timeout in milliseconds
332331
RTOMax float64
333332
// Minimum congestion window
@@ -337,16 +336,12 @@ type Config struct {
337336
// Step of congestion window increase at Congestion Avoidance
338337
CwndCAStep uint32
339338

340-
// The RACK configs are currently private as SCTP will be reworked to use the
341-
// modern options pattern in a future release.
342-
// Optional: size of window used to determine minimum RTT for RACK (defaults to 30s)
343-
rackMinRTTWnd time.Duration
344-
// Optional: cap the minimum reordering window: 0 = use quarter-RTT
345-
rackReoWndFloor time.Duration
346-
// Optional: receiver worst-case delayed-ACK for PTO when only one packet is in flight
347-
rackWCDelAck time.Duration
339+
// RACK config options
340+
rack rackSettings
348341
}
349342

343+
// TODO: should we deprecate this in favor of the WithOptions style?
344+
350345
// Server accepts a SCTP stream over a conn.
351346
func Server(config Config) (*Association, error) {
352347
a := createAssociation(config)
@@ -390,6 +385,21 @@ func createClientWithContext(ctx context.Context, config Config) (*Association,
390385
}
391386
}
392387

388+
// newAssociationWithOptions finalizes a pre-configured association with optional overrides.
389+
//
390+
//nolint:gocognit,cyclop
391+
func newAssociationWithOptions(assoc *Association, opts ...AssociationOption) (*Association, error) {
392+
var err error
393+
394+
for _, opt := range opts {
395+
if err = opt(assoc); err != nil {
396+
return nil, err
397+
}
398+
}
399+
400+
return assoc, nil
401+
}
402+
393403
func createAssociation(config Config) *Association {
394404
maxReceiveBufferSize := config.MaxReceiveBufferSize
395405
if maxReceiveBufferSize == 0 {
@@ -406,6 +416,8 @@ func createAssociation(config Config) *Association {
406416
mtu = initialMTU
407417
}
408418

419+
rtoMax := config.RTOMax
420+
409421
tsn := globalMathRandomGenerator.Uint32()
410422
assoc := &Association{
411423
netConn: config.NetConn,
@@ -459,18 +471,20 @@ func createAssociation(config Config) *Association {
459471
assoc.tlrBurstLaterRTTUnits = tlrBurstDefaultLaterRTT
460472

461473
// RACK defaults
462-
assoc.rackWCDelAck = config.rackWCDelAck
463-
if assoc.rackWCDelAck == 0 {
464-
assoc.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
474+
assoc.rack.rackWCDelAck = config.rack.rackWCDelAck
475+
if assoc.rack.rackWCDelAck == 0 {
476+
assoc.rack.rackWCDelAck = 200 * time.Millisecond // WCDelAckT, RACK for SCTP section 2C
465477
}
466478

467-
// defaults to 30s window to determine minRTT
468-
assoc.rackMinRTTWnd = newWindowedMin(config.rackMinRTTWnd)
479+
assoc.rack.rackMinRTTWnd = config.rack.rackMinRTTWnd
480+
if assoc.rack.rackMinRTTWnd == nil {
481+
assoc.rack.rackMinRTTWnd = newWindowedMin(30 * time.Second)
482+
}
469483

470484
assoc.timerUpdateCh = make(chan struct{}, 1)
471485
go assoc.timerLoop()
472486

473-
assoc.rackReoWndFloor = config.rackReoWndFloor // optional floor; usually 0
487+
assoc.rack.rackReoWndFloor = config.rack.rackReoWndFloor // optional floor; usually 0
474488
assoc.rackKeepInflatedRecoveries = 0
475489

476490
if assoc.name == "" {
@@ -486,11 +500,11 @@ func createAssociation(config Config) *Association {
486500
assoc.name, assoc.CWND(), assoc.ssthresh, assoc.inflightQueue.getNumBytes())
487501

488502
assoc.srtt.Store(float64(0))
489-
assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, config.RTOMax)
490-
assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, config.RTOMax)
491-
assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, config.RTOMax)
492-
assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, config.RTOMax)
493-
assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, config.RTOMax)
503+
assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, rtoMax)
504+
assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, rtoMax)
505+
assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, rtoMax)
506+
assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, rtoMax)
507+
assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, rtoMax)
494508
assoc.ackTimer = newAckTimer(assoc)
495509

496510
return assoc
@@ -1617,7 +1631,7 @@ func (a *Association) handleHeartbeatAck(c *chunkHeartbeatAck) {
16171631
srtt := a.rtoMgr.setNewRTT(rttMs)
16181632
a.srtt.Store(srtt)
16191633

1620-
a.rackMinRTTWnd.Push(now, now.Sub(sent))
1634+
a.rack.rackMinRTTWnd.Push(now, now.Sub(sent))
16211635

16221636
a.log.Tracef("[%s] HB RTT: measured=%.3fms srtt=%.3fms rto=%.3fms",
16231637
a.name, rttMs, srtt, a.rtoMgr.getRTO())
@@ -1955,7 +1969,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
19551969
// use a window to determine minRtt instead of a global min
19561970
// as the RTT can fluctuate, which can cause problems if going from a
19571971
// high RTT to a low RTT.
1958-
a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
1972+
a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
19591973

19601974
a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
19611975
a.name, rtt, srtt, a.rtoMgr.getRTO())
@@ -2012,7 +2026,7 @@ func (a *Association) processSelectiveAck(selectiveAckChunk *chunkSelectiveAck)
20122026
srtt := a.rtoMgr.setNewRTT(rtt)
20132027
a.srtt.Store(srtt)
20142028

2015-
a.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
2029+
a.rack.rackMinRTTWnd.Push(now, now.Sub(chunkPayload.since))
20162030

20172031
a.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
20182032
a.name, rtt, srtt, a.rtoMgr.getRTO())
@@ -3502,13 +3516,13 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo
35023516
}
35033517

35043518
// 2) Maintain ReoWND (RACK for SCTP section 2B)
3505-
if minRTT := a.rackMinRTTWnd.Min(currTime); minRTT > 0 {
3519+
if minRTT := a.rack.rackMinRTTWnd.Min(currTime); minRTT > 0 {
35063520
a.rackMinRTT = minRTT
35073521
}
35083522

35093523
var base time.Duration
35103524
if a.rackMinRTT > 0 {
3511-
base = max(a.rackMinRTT/4, a.rackReoWndFloor)
3525+
base = max(a.rackMinRTT/4, a.rack.rackReoWndFloor)
35123526
}
35133527

35143528
// Suppress during recovery if no reordering ever seen; else (re)initialize from base if zero.
@@ -3521,7 +3535,7 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo
35213535
// DSACK-style inflation using SCTP duplicate TSNs (RACK for SCTP section 3 noting SCTP
35223536
// natively reports duplicates + RACK for SCTP section 2B policy)
35233537
if len(sack.duplicateTSN) > 0 && a.rackMinRTT > 0 {
3524-
a.rackReoWnd += max(a.rackMinRTT/4, a.rackReoWndFloor)
3538+
a.rackReoWnd += max(a.rackMinRTT/4, a.rack.rackReoWndFloor)
35253539
// keep inflated for 16 loss recoveries before reset
35263540
a.rackKeepInflatedRecoveries = 16
35273541
a.log.Tracef("[%s] RACK: DSACK/dupTSN seen, inflate reoWnd to %v", a.name, a.rackReoWnd)
@@ -3619,7 +3633,7 @@ func (a *Association) onRackAfterSACK( // nolint:gocognit,cyclop,gocyclo
36193633
extra := 2 * time.Millisecond
36203634

36213635
if a.inflightQueue.size() == 1 {
3622-
extra = a.rackWCDelAck // 200ms for single outstanding, else 2ms
3636+
extra = a.rack.rackWCDelAck // 200ms for single outstanding, else 2ms
36233637
}
36243638

36253639
pto = 2*srtt + extra
@@ -3645,7 +3659,7 @@ func (a *Association) schedulePTOAfterSendLocked() {
36453659
extra := 2 * time.Millisecond
36463660

36473661
if a.inflightQueue.size() == 1 {
3648-
extra = a.rackWCDelAck
3662+
extra = a.rack.rackWCDelAck
36493663
}
36503664

36513665
pto = 2*srtt + extra

association_options.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
2+
// SPDX-License-Identifier: MIT
3+
4+
package sctp
5+
6+
import (
7+
"net"
8+
9+
"github.com/pion/logging"
10+
)
11+
12+
// AssociationOption represents a function that can be used to configure an Association.
13+
type AssociationOption func(*Association) error
14+
15+
// TODO: should these options be split into ServerOption and ClientOption?
16+
// im not sure what separate options they would have though...
17+
18+
// ServerOption configures a Server.
19+
type ServerOption interface {
20+
applyServer(*Config) error
21+
}
22+
23+
// ClientOption configures a Client.
24+
type ClientOption interface {
25+
applyClient(*Config) error
26+
}
27+
28+
// WithLoggerFactory sets the logger factory for the association.
29+
func WithLoggerFactory(loggerFactory logging.LoggerFactory) AssociationOption {
30+
return func(a *Association) error {
31+
a.log = loggerFactory.NewLogger("sctp")
32+
33+
return nil
34+
}
35+
}
36+
37+
// WithName sets the name of the association.
38+
func WithName(name string) AssociationOption {
39+
return func(a *Association) error {
40+
a.name = name
41+
42+
return nil
43+
}
44+
}
45+
46+
// WithNetConn sets the net.Conn used by the association.
47+
func WithNetConn(net net.Conn) AssociationOption {
48+
return func(a *Association) error {
49+
a.netConn = net
50+
51+
return nil
52+
}
53+
}
54+
55+
// WithBlockWrite sets whether the association should use blocking writes.
56+
// By default this is false.
57+
func WithBlockWrite(b bool) AssociationOption {
58+
return func(a *Association) error {
59+
a.blockWrite = b
60+
61+
return nil
62+
}
63+
}
64+
65+
// WithEnableZeroChecksum sets whether the association should accept zero as a valid checksum.
66+
// By default this is false.
67+
func WithEnableZeroChecksum(b bool) AssociationOption {
68+
return func(a *Association) error {
69+
a.recvZeroChecksum = b
70+
71+
return nil
72+
}
73+
}
74+
75+
// WithMTU sets the MTU size for the association.
76+
// By default this is 1228.
77+
func WithMTU(size uint32) AssociationOption {
78+
return func(a *Association) error {
79+
if size == 0 {
80+
return ErrZeroMTUOption
81+
}
82+
a.mtu = size
83+
84+
return nil
85+
}
86+
}
87+
88+
// Congestion control options //
89+
90+
// WithMaxReceiveBufferSize sets the maximum receive buffer size for the association.
91+
// By default this is 1024 * 1024 = 1048576.
92+
func WithMaxReceiveBufferSize(size uint32) AssociationOption {
93+
return func(a *Association) error {
94+
if size == 0 {
95+
return ErrZeroMaxReceiveBufferOption
96+
}
97+
a.maxReceiveBufferSize = size
98+
99+
return nil
100+
}
101+
}
102+
103+
// WithMaxMessageSize sets the maximum message size for the association.
104+
// By default this is 65536.
105+
func WithMaxMessageSize(size uint32) AssociationOption {
106+
return func(a *Association) error {
107+
if size == 0 {
108+
return ErrZeroMaxMessageSize
109+
}
110+
a.maxMessageSize = size
111+
112+
return nil
113+
}
114+
}
115+
116+
// WithRTOMax sets the max retransmission timeout in ms for the association.
117+
func WithRTOMax(rtoMax float64) AssociationOption {
118+
return func(a *Association) error {
119+
if rtoMax <= 0 {
120+
return ErrInvalidRTOMax
121+
}
122+
a.rtoMax = rtoMax
123+
124+
return nil
125+
}
126+
}
127+
128+
// WithMinCwnd sets the minimum congestion window for the association.
129+
func WithMinCwnd(minCwnd uint32) AssociationOption {
130+
return func(a *Association) error {
131+
a.minCwnd = minCwnd
132+
133+
return nil
134+
}
135+
}
136+
137+
// WithFastRtxWnd sets the fast retransmission window for the association.
138+
func WithFastRtxWnd(fastRtxWnd uint32) AssociationOption {
139+
return func(a *Association) error {
140+
a.fastRtxWnd = fastRtxWnd
141+
142+
return nil
143+
}
144+
}
145+
146+
// WithCwndCAStep sets the congestion window congestion avoidance step for the association.
147+
func WithCwndCAStep(cwndCAStep uint32) AssociationOption {
148+
return func(a *Association) error {
149+
a.cwndCAStep = cwndCAStep
150+
151+
return nil
152+
}
153+
}

0 commit comments

Comments
 (0)