chore: decrease shadowsocks udp read memory used for no-windows platform

This commit is contained in:
wwqgtxx
2023-05-11 19:01:41 +08:00
parent e404695a0d
commit 234f7dbd3b
12 changed files with 361 additions and 86 deletions

View File

@ -3,34 +3,43 @@ package net
import "net"
type bindPacketConn struct {
net.PacketConn
EnhancePacketConn
rAddr net.Addr
}
func (wpc *bindPacketConn) Read(b []byte) (n int, err error) {
n, _, err = wpc.PacketConn.ReadFrom(b)
func (c *bindPacketConn) Read(b []byte) (n int, err error) {
n, _, err = c.EnhancePacketConn.ReadFrom(b)
return n, err
}
func (wpc *bindPacketConn) Write(b []byte) (n int, err error) {
return wpc.PacketConn.WriteTo(b, wpc.rAddr)
func (c *bindPacketConn) WaitRead() (data []byte, put func(), err error) {
data, put, _, err = c.EnhancePacketConn.WaitReadFrom()
return
}
func (wpc *bindPacketConn) RemoteAddr() net.Addr {
return wpc.rAddr
func (c *bindPacketConn) Write(b []byte) (n int, err error) {
return c.EnhancePacketConn.WriteTo(b, c.rAddr)
}
func (wpc *bindPacketConn) LocalAddr() net.Addr {
if wpc.PacketConn.LocalAddr() == nil {
func (c *bindPacketConn) RemoteAddr() net.Addr {
return c.rAddr
}
func (c *bindPacketConn) LocalAddr() net.Addr {
if c.EnhancePacketConn.LocalAddr() == nil {
return &net.UDPAddr{IP: net.IPv4zero, Port: 0}
} else {
return wpc.PacketConn.LocalAddr()
return c.EnhancePacketConn.LocalAddr()
}
}
func NewBindPacketConn(pc net.PacketConn, rAddr net.Addr) net.Conn {
func (c *bindPacketConn) Upstream() any {
return c.EnhancePacketConn
}
func NewBindPacketConn(pc EnhancePacketConn, rAddr net.Addr) net.Conn {
return &bindPacketConn{
PacketConn: pc,
rAddr: rAddr,
EnhancePacketConn: pc,
rAddr: rAddr,
}
}

View File

@ -0,0 +1,172 @@
package deadline
import (
"net"
"os"
"time"
"github.com/Dreamacro/clash/common/atomic"
"github.com/Dreamacro/clash/common/net/packet"
)
type readResult struct {
data []byte
put func()
addr net.Addr
err error
}
type PacketConn struct {
net.PacketConn
deadline atomic.TypedValue[time.Time]
pipeDeadline pipeDeadline
disablePipe atomic.Bool
inRead atomic.Bool
resultCh chan *readResult
}
func NewPacketConn(pc net.PacketConn) net.PacketConn {
c := &PacketConn{
PacketConn: pc,
pipeDeadline: makePipeDeadline(),
resultCh: make(chan *readResult, 1),
}
c.resultCh <- nil
if enhancePacketConn, isEnhance := pc.(packet.EnhancePacketConn); isEnhance {
return &EnhancePacketConn{
PacketConn: c,
enhancePacketConn: enhancePacketConn,
}
}
return c
}
func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
select {
case result := <-c.resultCh:
if result != nil {
n = copy(p, result.data)
addr = result.addr
err = result.err
c.resultCh <- nil // finish cache read
return
} else {
c.resultCh <- nil
break
}
case <-c.pipeDeadline.wait():
return 0, nil, os.ErrDeadlineExceeded
}
if c.disablePipe.Load() {
return c.PacketConn.ReadFrom(p)
} else if c.deadline.Load().IsZero() {
c.inRead.Store(true)
defer c.inRead.Store(false)
n, addr, err = c.PacketConn.ReadFrom(p)
return
}
<-c.resultCh
go c.pipeReadFrom(len(p))
return c.ReadFrom(p)
}
func (c *PacketConn) pipeReadFrom(size int) {
buffer := make([]byte, size)
n, addr, err := c.PacketConn.ReadFrom(buffer)
buffer = buffer[:n]
c.resultCh <- &readResult{
data: buffer,
addr: addr,
err: err,
}
}
func (c *PacketConn) SetReadDeadline(t time.Time) error {
if c.disablePipe.Load() {
return c.PacketConn.SetReadDeadline(t)
} else if c.inRead.Load() {
c.disablePipe.Store(true)
return c.PacketConn.SetReadDeadline(t)
}
c.deadline.Store(t)
c.pipeDeadline.set(t)
return nil
}
func (c *PacketConn) ReaderReplaceable() bool {
select {
case result := <-c.resultCh:
c.resultCh <- result
if result != nil {
return false // cache reading
} else {
break
}
default:
return false // pipe reading
}
return c.disablePipe.Load() || c.deadline.Load().IsZero()
}
func (c *PacketConn) WriterReplaceable() bool {
return true
}
func (c *PacketConn) Upstream() any {
return c.PacketConn
}
func (c *PacketConn) NeedAdditionalReadDeadline() bool {
return false
}
type EnhancePacketConn struct {
*PacketConn
enhancePacketConn packet.EnhancePacketConn
}
func (c *EnhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) {
select {
case result := <-c.resultCh:
if result != nil {
data = result.data
put = result.put
addr = result.addr
err = result.err
c.resultCh <- nil // finish cache read
return
} else {
c.resultCh <- nil
break
}
case <-c.pipeDeadline.wait():
return nil, nil, nil, os.ErrDeadlineExceeded
}
if c.disablePipe.Load() {
return c.enhancePacketConn.WaitReadFrom()
} else if c.deadline.Load().IsZero() {
c.inRead.Store(true)
defer c.inRead.Store(false)
data, put, addr, err = c.enhancePacketConn.WaitReadFrom()
return
}
<-c.resultCh
go c.pipeWaitReadFrom()
return c.WaitReadFrom()
}
func (c *EnhancePacketConn) pipeWaitReadFrom() {
data, put, addr, err := c.enhancePacketConn.WaitReadFrom()
c.resultCh <- &readResult{
data: data,
put: put,
addr: addr,
err: err,
}
}

View File

@ -0,0 +1,84 @@
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package deadline
import (
"sync"
"time"
)
// pipeDeadline is an abstraction for handling timeouts.
type pipeDeadline struct {
mu sync.Mutex // Guards timer and cancel
timer *time.Timer
cancel chan struct{} // Must be non-nil
}
func makePipeDeadline() pipeDeadline {
return pipeDeadline{cancel: make(chan struct{})}
}
// set sets the point in time when the deadline will time out.
// A timeout event is signaled by closing the channel returned by waiter.
// Once a timeout has occurred, the deadline can be refreshed by specifying a
// t value in the future.
//
// A zero value for t prevents timeout.
func (d *pipeDeadline) set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
if d.timer != nil && !d.timer.Stop() {
<-d.cancel // Wait for the timer callback to finish and close cancel
}
d.timer = nil
// Time is zero, then there is no deadline.
closed := isClosedChan(d.cancel)
if t.IsZero() {
if closed {
d.cancel = make(chan struct{})
}
return
}
// Time in the future, setup a timer to cancel in the future.
if dur := time.Until(t); dur > 0 {
if closed {
d.cancel = make(chan struct{})
}
d.timer = time.AfterFunc(dur, func() {
close(d.cancel)
})
return
}
// Time in the past, so close immediately.
if !closed {
close(d.cancel)
}
}
// wait returns a channel that is closed when the deadline is exceeded.
func (d *pipeDeadline) wait() chan struct{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.cancel
}
func isClosedChan(c <-chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}
func makeFilledChan() chan struct{} {
ch := make(chan struct{}, 1)
ch <- struct{}{}
return ch
}

View File

@ -4,69 +4,14 @@ import (
"net"
"sync"
"github.com/Dreamacro/clash/common/pool"
"github.com/Dreamacro/clash/common/net/deadline"
"github.com/Dreamacro/clash/common/net/packet"
)
type EnhancePacketConn interface {
net.PacketConn
WaitReadFrom() (data []byte, put func(), addr net.Addr, err error)
Upstream() any
}
type EnhancePacketConn = packet.EnhancePacketConn
func NewEnhancePacketConn(pc net.PacketConn) EnhancePacketConn {
if udpConn, isUDPConn := pc.(*net.UDPConn); isUDPConn {
return &enhanceUDPConn{UDPConn: udpConn}
}
return &enhancePacketConn{PacketConn: pc}
}
type enhancePacketConn struct {
net.PacketConn
}
func (c *enhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) {
return waitReadFrom(c.PacketConn)
}
func (c *enhancePacketConn) Upstream() any {
return c.PacketConn
}
func (c *enhancePacketConn) WriterReplaceable() bool {
return true
}
func (c *enhancePacketConn) ReaderReplaceable() bool {
return true
}
func (c *enhanceUDPConn) Upstream() any {
return c.UDPConn
}
func (c *enhanceUDPConn) WriterReplaceable() bool {
return true
}
func (c *enhanceUDPConn) ReaderReplaceable() bool {
return true
}
func waitReadFrom(pc net.PacketConn) (data []byte, put func(), addr net.Addr, err error) {
readBuf := pool.Get(pool.UDPBufferSize)
put = func() {
_ = pool.Put(readBuf)
}
var readN int
readN, addr, err = pc.ReadFrom(readBuf)
if readN > 0 {
data = readBuf[:readN]
} else {
put()
put = nil
}
return
}
var NewEnhancePacketConn = packet.NewEnhancePacketConn
var NewDeadlinePacketConn = deadline.NewPacketConn
type threadSafePacketConn struct {
net.PacketConn

View File

@ -0,0 +1,70 @@
package packet
import (
"net"
"github.com/Dreamacro/clash/common/pool"
)
type EnhancePacketConn interface {
net.PacketConn
WaitReadFrom() (data []byte, put func(), addr net.Addr, err error)
}
func NewEnhancePacketConn(pc net.PacketConn) EnhancePacketConn {
if udpConn, isUDPConn := pc.(*net.UDPConn); isUDPConn {
return &enhanceUDPConn{UDPConn: udpConn}
}
if enhancePC, isEnhancePC := pc.(EnhancePacketConn); isEnhancePC {
return enhancePC
}
return &enhancePacketConn{PacketConn: pc}
}
type enhancePacketConn struct {
net.PacketConn
}
func (c *enhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) {
return waitReadFrom(c.PacketConn)
}
func (c *enhancePacketConn) Upstream() any {
return c.PacketConn
}
func (c *enhancePacketConn) WriterReplaceable() bool {
return true
}
func (c *enhancePacketConn) ReaderReplaceable() bool {
return true
}
func (c *enhanceUDPConn) Upstream() any {
return c.UDPConn
}
func (c *enhanceUDPConn) WriterReplaceable() bool {
return true
}
func (c *enhanceUDPConn) ReaderReplaceable() bool {
return true
}
func waitReadFrom(pc net.PacketConn) (data []byte, put func(), addr net.Addr, err error) {
readBuf := pool.Get(pool.UDPBufferSize)
put = func() {
_ = pool.Put(readBuf)
}
var readN int
readN, addr, err = pc.ReadFrom(readBuf)
if readN > 0 {
data = readBuf[:readN]
} else {
put()
put = nil
}
return
}

View File

@ -1,6 +1,6 @@
//go:build !windows
package net
package packet
import (
"io"

View File

@ -1,6 +1,6 @@
//go:build windows
package net
package packet
import (
"net"

View File

@ -23,10 +23,6 @@ func NewDeadlineConn(conn net.Conn) ExtendedConn {
return deadline.NewFallbackConn(conn)
}
func NewDeadlinePacketConn(pc net.PacketConn) network.NetPacketConn {
return deadline.NewFallbackPacketConn(bufio.NewPacketConn(pc))
}
func NeedHandshake(conn any) bool {
if earlyConn, isEarlyConn := common.Cast[network.EarlyConn](conn); isEarlyConn && earlyConn.NeedHandshake() {
return true