Skip to content

Commit 94f0582

Browse files
committed
udpnat2: Add SetHandler
1 parent cd17884 commit 94f0582

File tree

6 files changed

+84
-59
lines changed

6 files changed

+84
-59
lines changed

common/bufio/cache.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,12 @@ func (c *CachedPacketConn) ReadCachedPacket() *N.PacketBuffer {
184184
if buffer != nil {
185185
buffer.DecRef()
186186
}
187-
return &N.PacketBuffer{
187+
packet := N.NewPacketBuffer()
188+
*packet = N.PacketBuffer{
188189
Buffer: buffer,
189190
Destination: c.destination,
190191
}
192+
return packet
191193
}
192194

193195
func (c *CachedPacketConn) Upstream() any {

common/network/conn.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type UDPHandler interface {
124124
}
125125

126126
type UDPHandlerEx interface {
127-
NewPacketEx(buffer *buf.Buffer, source M.Socksaddr, destination M.Socksaddr)
127+
NewPacketEx(buffer *buf.Buffer, source M.Socksaddr)
128128
}
129129

130130
// Deprecated: Use UDPConnectionHandlerEx instead.
@@ -146,11 +146,6 @@ type CachedPacketReader interface {
146146
ReadCachedPacket() *PacketBuffer
147147
}
148148

149-
type PacketBuffer struct {
150-
Buffer *buf.Buffer
151-
Destination M.Socksaddr
152-
}
153-
154149
type WithUpstreamReader interface {
155150
UpstreamReader() any
156151
}

common/network/packet.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package network
2+
3+
import (
4+
"sync"
5+
6+
"github.com/sagernet/sing/common/buf"
7+
M "github.com/sagernet/sing/common/metadata"
8+
)
9+
10+
type PacketBuffer struct {
11+
Buffer *buf.Buffer
12+
Destination M.Socksaddr
13+
}
14+
15+
var packetPool = sync.Pool{
16+
New: func() any {
17+
return new(PacketBuffer)
18+
},
19+
}
20+
21+
func NewPacketBuffer() *PacketBuffer {
22+
return packetPool.Get().(*PacketBuffer)
23+
}
24+
25+
func PutPacketBuffer(packet *PacketBuffer) {
26+
*packet = PacketBuffer{}
27+
packetPool.Put(packet)
28+
}
29+
30+
func ReleaseMultiPacketBuffer(packetBuffers []*PacketBuffer) {
31+
for _, packet := range packetBuffers {
32+
packet.Buffer.Release()
33+
PutPacketBuffer(packet)
34+
}
35+
}

common/udpnat2/conn.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,23 @@ import (
1212
"github.com/sagernet/sing/common/pipe"
1313
)
1414

15-
type natConn struct {
15+
type Conn struct {
1616
writer N.PacketWriter
1717
localAddr M.Socksaddr
18-
packetChan chan *Packet
18+
handler N.UDPHandlerEx
19+
packetChan chan *N.PacketBuffer
1920
doneChan chan struct{}
2021
readDeadline pipe.Deadline
2122
readWaitOptions N.ReadWaitOptions
2223
}
2324

24-
func (c *natConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
25+
func (c *Conn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
2526
select {
2627
case p := <-c.packetChan:
2728
_, err = buffer.ReadOnceFrom(p.Buffer)
2829
destination := p.Destination
2930
p.Buffer.Release()
30-
PutPacket(p)
31+
N.PutPacketBuffer(p)
3132
return destination, err
3233
case <-c.doneChan:
3334
return M.Socksaddr{}, io.ErrClosedPipe
@@ -36,21 +37,36 @@ func (c *natConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
3637
}
3738
}
3839

39-
func (c *natConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
40+
func (c *Conn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
4041
return c.writer.WritePacket(buffer, destination)
4142
}
4243

43-
func (c *natConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
44+
func (c *Conn) SetHandler(handler N.UDPHandlerEx) {
45+
c.handler = handler
46+
fetch:
47+
for {
48+
select {
49+
case packet := <-c.packetChan:
50+
c.handler.NewPacketEx(packet.Buffer, packet.Destination)
51+
N.PutPacketBuffer(packet)
52+
continue fetch
53+
default:
54+
break fetch
55+
}
56+
}
57+
}
58+
59+
func (c *Conn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
4460
c.readWaitOptions = options
4561
return false
4662
}
4763

48-
func (c *natConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
64+
func (c *Conn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
4965
select {
5066
case packet := <-c.packetChan:
5167
buffer = c.readWaitOptions.Copy(packet.Buffer)
5268
destination = packet.Destination
53-
PutPacket(packet)
69+
N.PutPacketBuffer(packet)
5470
return
5571
case <-c.doneChan:
5672
return nil, M.Socksaddr{}, io.ErrClosedPipe
@@ -59,7 +75,7 @@ func (c *natConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr,
5975
}
6076
}
6177

62-
func (c *natConn) Close() error {
78+
func (c *Conn) Close() error {
6379
select {
6480
case <-c.doneChan:
6581
default:
@@ -68,23 +84,23 @@ func (c *natConn) Close() error {
6884
return nil
6985
}
7086

71-
func (c *natConn) LocalAddr() net.Addr {
87+
func (c *Conn) LocalAddr() net.Addr {
7288
return c.localAddr
7389
}
7490

75-
func (c *natConn) RemoteAddr() net.Addr {
91+
func (c *Conn) RemoteAddr() net.Addr {
7692
return M.Socksaddr{}
7793
}
7894

79-
func (c *natConn) SetDeadline(t time.Time) error {
95+
func (c *Conn) SetDeadline(t time.Time) error {
8096
return os.ErrInvalid
8197
}
8298

83-
func (c *natConn) SetReadDeadline(t time.Time) error {
99+
func (c *Conn) SetReadDeadline(t time.Time) error {
84100
c.readDeadline.Set(t)
85101
return nil
86102
}
87103

88-
func (c *natConn) SetWriteDeadline(t time.Time) error {
104+
func (c *Conn) SetWriteDeadline(t time.Time) error {
89105
return os.ErrInvalid
90106
}

common/udpnat2/packet.go

Lines changed: 0 additions & 28 deletions
This file was deleted.

common/udpnat2/service.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
type Service struct {
17-
nat *freelru.LRU[netip.AddrPort, *natConn]
17+
nat *freelru.LRU[netip.AddrPort, *Conn]
1818
handler N.UDPConnectionHandlerEx
1919
prepare PrepareFunc
2020
metrics Metrics
@@ -30,17 +30,17 @@ type Metrics struct {
3030
}
3131

3232
func New(handler N.UDPConnectionHandlerEx, prepare PrepareFunc, timeout time.Duration) *Service {
33-
nat := common.Must1(freelru.New[netip.AddrPort, *natConn](1024, maphash.NewHasher[netip.AddrPort]().Hash32))
33+
nat := common.Must1(freelru.New[netip.AddrPort, *Conn](1024, maphash.NewHasher[netip.AddrPort]().Hash32))
3434
nat.SetLifetime(timeout)
35-
nat.SetHealthCheck(func(port netip.AddrPort, conn *natConn) bool {
35+
nat.SetHealthCheck(func(port netip.AddrPort, conn *Conn) bool {
3636
select {
3737
case <-conn.doneChan:
3838
return false
3939
default:
4040
return true
4141
}
4242
})
43-
nat.SetOnEvict(func(_ netip.AddrPort, conn *natConn) {
43+
nat.SetOnEvict(func(_ netip.AddrPort, conn *Conn) {
4444
conn.Close()
4545
})
4646
return &Service{
@@ -55,26 +55,31 @@ func (s *Service) NewPacket(bufferSlices [][]byte, source M.Socksaddr, destinati
5555
if !loaded {
5656
ok, ctx, writer, onClose := s.prepare(source, destination, userData)
5757
if !ok {
58+
println(2)
5859
s.metrics.Rejects++
5960
return
6061
}
61-
conn = &natConn{
62+
conn = &Conn{
6263
writer: writer,
6364
localAddr: source,
64-
packetChan: make(chan *Packet, 64),
65+
packetChan: make(chan *N.PacketBuffer, 64),
6566
doneChan: make(chan struct{}),
6667
readDeadline: pipe.MakeDeadline(),
6768
}
6869
s.nat.Add(source.AddrPort(), conn)
69-
s.handler.NewPacketConnectionEx(ctx, conn, source, destination, onClose)
70+
go s.handler.NewPacketConnectionEx(ctx, conn, source, destination, onClose)
7071
s.metrics.Creates++
7172
}
72-
packet := NewPacket()
7373
buffer := conn.readWaitOptions.NewPacketBuffer()
7474
for _, bufferSlice := range bufferSlices {
7575
buffer.Write(bufferSlice)
7676
}
77-
*packet = Packet{
77+
if conn.handler != nil {
78+
conn.handler.NewPacketEx(buffer, destination)
79+
return
80+
}
81+
packet := N.NewPacketBuffer()
82+
*packet = N.PacketBuffer{
7883
Buffer: buffer,
7984
Destination: destination,
8085
}
@@ -83,7 +88,7 @@ func (s *Service) NewPacket(bufferSlices [][]byte, source M.Socksaddr, destinati
8388
s.metrics.Inputs++
8489
default:
8590
packet.Buffer.Release()
86-
PutPacket(packet)
91+
N.PutPacketBuffer(packet)
8792
s.metrics.Drops++
8893
}
8994
}

0 commit comments

Comments
 (0)