Skip to content

Commit 50fa1ab

Browse files
committed
udpnat2: New synced udp nat service
1 parent b07fb48 commit 50fa1ab

File tree

12 files changed

+307
-40
lines changed

12 files changed

+307
-40
lines changed

common/bufio/conn.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,7 @@ func (w *ExtendedUDPConn) ReadPacket(buffer *buf.Buffer) (M.Socksaddr, error) {
3535

3636
func (w *ExtendedUDPConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
3737
defer buffer.Release()
38-
if destination.IsFqdn() {
39-
udpAddr, err := net.ResolveUDPAddr("udp", destination.String())
40-
if err != nil {
41-
return err
42-
}
43-
return common.Error(w.UDPConn.WriteTo(buffer.Bytes(), udpAddr))
44-
}
45-
return common.Error(w.UDPConn.WriteToUDP(buffer.Bytes(), destination.UDPAddr()))
38+
return common.Error(w.UDPConn.WriteToUDPAddrPort(buffer.Bytes(), destination.AddrPort()))
4639
}
4740

4841
func (w *ExtendedUDPConn) Upstream() any {

common/network/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type UDPHandler interface {
124124
}
125125

126126
type UDPHandlerEx interface {
127-
NewPacket(ctx context.Context, conn PacketConn, buffer *buf.Buffer, source M.Socksaddr, destination M.Socksaddr) error
127+
NewPacketEx(buffer *buf.Buffer, source M.Socksaddr, destination M.Socksaddr)
128128
}
129129

130130
// Deprecated: Use UDPConnectionHandlerEx instead.

common/network/direct.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,27 @@ func (o ReadWaitOptions) NeedHeadroom() bool {
1919
return o.FrontHeadroom > 0 || o.RearHeadroom > 0
2020
}
2121

22+
func (o ReadWaitOptions) Copy(buffer *buf.Buffer) *buf.Buffer {
23+
if o.FrontHeadroom > buffer.Start() ||
24+
o.RearHeadroom > buffer.FreeLen() {
25+
newBuffer := o.newBuffer(buf.UDPBufferSize, false)
26+
newBuffer.Write(buffer.Bytes())
27+
buffer.Release()
28+
return newBuffer
29+
} else {
30+
return buffer
31+
}
32+
}
33+
2234
func (o ReadWaitOptions) NewBuffer() *buf.Buffer {
23-
return o.newBuffer(buf.BufferSize)
35+
return o.newBuffer(buf.BufferSize, true)
2436
}
2537

2638
func (o ReadWaitOptions) NewPacketBuffer() *buf.Buffer {
27-
return o.newBuffer(buf.UDPBufferSize)
39+
return o.newBuffer(buf.UDPBufferSize, true)
2840
}
2941

30-
func (o ReadWaitOptions) newBuffer(defaultBufferSize int) *buf.Buffer {
42+
func (o ReadWaitOptions) newBuffer(defaultBufferSize int, reserve bool) *buf.Buffer {
3143
var bufferSize int
3244
if o.MTU > 0 {
3345
bufferSize = o.MTU + o.FrontHeadroom + o.RearHeadroom
@@ -36,9 +48,9 @@ func (o ReadWaitOptions) newBuffer(defaultBufferSize int) *buf.Buffer {
3648
}
3749
buffer := buf.NewSize(bufferSize)
3850
if o.FrontHeadroom > 0 {
39-
buffer.Resize(o.FrontHeadroom, 0)
51+
buffer.Advance(o.FrontHeadroom)
4052
}
41-
if o.RearHeadroom > 0 {
53+
if o.RearHeadroom > 0 && reserve {
4254
buffer.Reserve(o.RearHeadroom)
4355
}
4456
return buffer

common/udpnat/service.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,6 @@ func (s *Service[T]) NewContextPacketEx(ctx context.Context, key T, buffer *buf.
131131
s.nat.Delete(key)
132132
}
133133
}()
134-
} else {
135-
c.localAddr = source
136134
}
137135
if common.Done(c.ctx) {
138136
s.nat.Delete(key)
@@ -215,10 +213,6 @@ func (c *conn) SetWriteDeadline(t time.Time) error {
215213
return os.ErrInvalid
216214
}
217215

218-
func (c *conn) NeedAdditionalReadDeadline() bool {
219-
return true
220-
}
221-
222216
func (c *conn) Upstream() any {
223217
return c.source
224218
}

common/udpnat2/conn.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package udpnat
2+
3+
import (
4+
"io"
5+
"net"
6+
"os"
7+
"time"
8+
9+
"github.com/sagernet/sing/common/buf"
10+
M "github.com/sagernet/sing/common/metadata"
11+
N "github.com/sagernet/sing/common/network"
12+
"github.com/sagernet/sing/common/pipe"
13+
)
14+
15+
type natConn struct {
16+
writer N.PacketWriter
17+
localAddr M.Socksaddr
18+
packetChan chan *Packet
19+
doneChan chan struct{}
20+
readDeadline pipe.Deadline
21+
readWaitOptions N.ReadWaitOptions
22+
}
23+
24+
func (c *natConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
25+
select {
26+
case p := <-c.packetChan:
27+
_, err = buffer.ReadOnceFrom(p.Buffer)
28+
destination := p.Destination
29+
p.Buffer.Release()
30+
PutPacket(p)
31+
return destination, err
32+
case <-c.doneChan:
33+
return M.Socksaddr{}, io.ErrClosedPipe
34+
case <-c.readDeadline.Wait():
35+
return M.Socksaddr{}, os.ErrDeadlineExceeded
36+
}
37+
}
38+
39+
func (c *natConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
40+
return c.writer.WritePacket(buffer, destination)
41+
}
42+
43+
func (c *natConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
44+
c.readWaitOptions = options
45+
return false
46+
}
47+
48+
func (c *natConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
49+
select {
50+
case packet := <-c.packetChan:
51+
buffer = c.readWaitOptions.Copy(packet.Buffer)
52+
destination = packet.Destination
53+
PutPacket(packet)
54+
return
55+
case <-c.doneChan:
56+
return nil, M.Socksaddr{}, io.ErrClosedPipe
57+
case <-c.readDeadline.Wait():
58+
return nil, M.Socksaddr{}, os.ErrDeadlineExceeded
59+
}
60+
}
61+
62+
func (c *natConn) Close() error {
63+
select {
64+
case <-c.doneChan:
65+
default:
66+
close(c.doneChan)
67+
}
68+
return nil
69+
}
70+
71+
func (c *natConn) LocalAddr() net.Addr {
72+
return c.localAddr
73+
}
74+
75+
func (c *natConn) RemoteAddr() net.Addr {
76+
return M.Socksaddr{}
77+
}
78+
79+
func (c *natConn) SetDeadline(t time.Time) error {
80+
return os.ErrInvalid
81+
}
82+
83+
func (c *natConn) SetReadDeadline(t time.Time) error {
84+
c.readDeadline.Set(t)
85+
return nil
86+
}
87+
88+
func (c *natConn) SetWriteDeadline(t time.Time) error {
89+
return os.ErrInvalid
90+
}

common/udpnat2/packet.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package udpnat
2+
3+
import (
4+
M "github.com/sagernet/sing/common/metadata"
5+
"sync"
6+
7+
"github.com/sagernet/sing/common/buf"
8+
)
9+
10+
var packetPool = sync.Pool{
11+
New: func() any {
12+
return new(Packet)
13+
},
14+
}
15+
16+
type Packet struct {
17+
Buffer *buf.Buffer
18+
Destination M.Socksaddr
19+
}
20+
21+
func NewPacket() *Packet {
22+
return packetPool.Get().(*Packet)
23+
}
24+
25+
func PutPacket(packet *Packet) {
26+
*packet = Packet{}
27+
packetPool.Put(packet)
28+
}

common/udpnat2/service.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package udpnat
2+
3+
import (
4+
"context"
5+
"net/netip"
6+
"time"
7+
8+
"github.com/sagernet/sing/common"
9+
M "github.com/sagernet/sing/common/metadata"
10+
N "github.com/sagernet/sing/common/network"
11+
"github.com/sagernet/sing/common/pipe"
12+
"github.com/sagernet/sing/contrab/freelru"
13+
"github.com/sagernet/sing/contrab/maphash"
14+
)
15+
16+
type Service struct {
17+
nat *freelru.LRU[netip.AddrPort, *natConn]
18+
handler N.UDPConnectionHandlerEx
19+
prepare PrepareFunc
20+
metrics Metrics
21+
}
22+
23+
type PrepareFunc func(source M.Socksaddr, destination M.Socksaddr, userData any) (bool, context.Context, N.PacketWriter, N.CloseHandlerFunc)
24+
25+
type Metrics struct {
26+
Creates uint64
27+
Rejects uint64
28+
Inputs uint64
29+
Drops uint64
30+
}
31+
32+
func New(handler N.UDPConnectionHandlerEx, prepare PrepareFunc, timeout time.Duration) *Service {
33+
nat := common.Must1(freelru.New[netip.AddrPort, *natConn](1024, maphash.NewHasher[netip.AddrPort]().Hash32))
34+
nat.SetLifetime(timeout)
35+
nat.SetHealthCheck(func(port netip.AddrPort, conn *natConn) bool {
36+
select {
37+
case <-conn.doneChan:
38+
return false
39+
default:
40+
return true
41+
}
42+
})
43+
nat.SetOnEvict(func(_ netip.AddrPort, conn *natConn) {
44+
conn.Close()
45+
})
46+
return &Service{
47+
nat: nat,
48+
handler: handler,
49+
prepare: prepare,
50+
}
51+
}
52+
53+
func (s *Service) NewPacket(bufferSlices [][]byte, source M.Socksaddr, destination M.Socksaddr, userData any) {
54+
conn, loaded := s.nat.Get(source.AddrPort())
55+
if !loaded {
56+
ok, ctx, writer, onClose := s.prepare(source, destination, userData)
57+
if !ok {
58+
s.metrics.Rejects++
59+
return
60+
}
61+
conn = &natConn{
62+
writer: writer,
63+
localAddr: source,
64+
packetChan: make(chan *Packet, 64),
65+
doneChan: make(chan struct{}),
66+
readDeadline: pipe.MakeDeadline(),
67+
}
68+
s.nat.Add(source.AddrPort(), conn)
69+
s.handler.NewPacketConnectionEx(ctx, conn, source, destination, onClose)
70+
s.metrics.Creates++
71+
}
72+
packet := NewPacket()
73+
buffer := conn.readWaitOptions.NewPacketBuffer()
74+
for _, bufferSlice := range bufferSlices {
75+
buffer.Write(bufferSlice)
76+
}
77+
*packet = Packet{
78+
Buffer: buffer,
79+
Destination: destination,
80+
}
81+
select {
82+
case conn.packetChan <- packet:
83+
s.metrics.Inputs++
84+
default:
85+
packet.Buffer.Release()
86+
PutPacket(packet)
87+
s.metrics.Drops++
88+
}
89+
}
90+
91+
func (s *Service) Metrics() Metrics {
92+
return s.metrics
93+
}

0 commit comments

Comments
 (0)