refactor: udp

This commit is contained in:
世界
2022-06-09 21:16:42 +08:00
parent cd466f05d3
commit 637a8b6ed5
23 changed files with 231 additions and 127 deletions

View File

@ -4,11 +4,11 @@ import (
"net"
"github.com/Dreamacro/clash/adapter/inbound"
"github.com/Dreamacro/clash/common/pool"
"github.com/Dreamacro/clash/common/sockopt"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"github.com/Dreamacro/clash/transport/socks5"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type UDPListener struct {
@ -49,34 +49,35 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error)
}
go func() {
for {
buf := pool.Get(pool.UDPBufferSize)
n, remoteAddr, err := l.ReadFrom(buf)
buffer := buf.NewPacket()
n, remoteAddr, err := l.ReadFrom(buffer.FreeBytes())
if err != nil {
pool.Put(buf)
buffer.Release()
if sl.closed {
break
}
continue
}
handleSocksUDP(l, in, buf[:n], remoteAddr)
buffer.Extend(n)
handleSocksUDP(l, in, buffer, remoteAddr)
}
}()
return sl, nil
}
func handleSocksUDP(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf []byte, addr net.Addr) {
target, payload, err := socks5.DecodeUDPPacket(buf)
func handleSocksUDP(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buffer *buf.Buffer, addr net.Addr) {
buffer.Advance(3)
target, err := M.SocksaddrSerializer.ReadAddrPort(buffer)
if err != nil {
// Unresolved UDP packet, return buffer to the pool
pool.Put(buf)
buffer.Release()
return
}
packet := &packet{
pc: pc,
rAddr: addr,
payload: payload,
bufRef: buf,
payload: buffer,
}
select {
case in <- inbound.NewPacket(target, packet, C.SOCKS5):

View File

@ -3,18 +3,19 @@ package socks
import (
"net"
"github.com/Dreamacro/clash/common/pool"
"github.com/Dreamacro/clash/transport/socks5"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type packet struct {
pc net.PacketConn
rAddr net.Addr
payload []byte
bufRef []byte
payload *buf.Buffer
}
func (c *packet) Data() []byte {
func (c *packet) Data() *buf.Buffer {
return c.payload
}
@ -27,11 +28,15 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) {
return c.pc.WriteTo(packet, c.rAddr)
}
func (c *packet) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
defer buffer.Release()
header := buf.With(buffer.ExtendHeader(3 + M.SocksaddrSerializer.AddrPortLen(destination)))
common.Must(header.WriteZeroN(3))
common.Must(M.SocksaddrSerializer.WriteAddrPort(header, destination))
return common.Error(c.pc.WriteTo(buffer.Bytes(), c.rAddr))
}
// LocalAddr returns the source IP/Port of UDP Packet
func (c *packet) LocalAddr() net.Addr {
return c.rAddr
}
func (c *packet) Drop() {
pool.Put(c.bufRef)
}

View File

@ -3,15 +3,16 @@ package tproxy
import (
"net"
"github.com/Dreamacro/clash/common/pool"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type packet struct {
lAddr *net.UDPAddr
buf []byte
buf *buf.Buffer
}
func (c *packet) Data() []byte {
func (c *packet) Data() *buf.Buffer {
return c.buf
}
@ -27,11 +28,18 @@ func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) {
return
}
func (c *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error {
defer buffer.Release()
tc, err := dialUDP("udp", addr.UDPAddr(), c.lAddr)
defer tc.Close()
if err != nil {
return err
}
_, err = tc.Write(buffer.Bytes())
return nil
}
// LocalAddr returns the source IP/Port of UDP Packet
func (c *packet) LocalAddr() net.Addr {
return c.lAddr
}
func (c *packet) Drop() {
pool.Put(c.buf)
}

View File

@ -4,9 +4,9 @@ import (
"net"
"github.com/Dreamacro/clash/adapter/inbound"
"github.com/Dreamacro/clash/common/pool"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/transport/socks5"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type UDPListener struct {
@ -57,10 +57,10 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error)
go func() {
oob := make([]byte, 1024)
for {
buf := pool.Get(pool.UDPBufferSize)
n, oobn, _, lAddr, err := c.ReadMsgUDP(buf, oob)
buffer := buf.NewPacket()
n, oobn, _, lAddr, err := c.ReadMsgUDP(buffer.FreeBytes(), oob)
if err != nil {
pool.Put(buf)
buffer.Release()
if rl.closed {
break
}
@ -71,21 +71,21 @@ func NewUDP(addr string, in chan<- *inbound.PacketAdapter) (*UDPListener, error)
if err != nil {
continue
}
handlePacketConn(l, in, buf[:n], lAddr, rAddr)
buffer.Extend(n)
handlePacketConn(l, in, buffer, lAddr, rAddr)
}
}()
return rl, nil
}
func handlePacketConn(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf []byte, lAddr *net.UDPAddr, rAddr *net.UDPAddr) {
target := socks5.ParseAddrToSocksAddr(rAddr)
func handlePacketConn(pc net.PacketConn, in chan<- *inbound.PacketAdapter, buf *buf.Buffer, lAddr *net.UDPAddr, rAddr *net.UDPAddr) {
pkt := &packet{
lAddr: lAddr,
buf: buf,
}
select {
case in <- inbound.NewPacket(target, pkt, C.TPROXY):
case in <- inbound.NewPacket(M.SocksaddrFromNet(rAddr), pkt, C.TPROXY):
default:
}
}

View File

@ -15,6 +15,8 @@ import (
D "github.com/Dreamacro/clash/listener/tun/ipstack/commons"
"github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/adapter"
"github.com/Dreamacro/clash/transport/socks5"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
var _ adapter.Handler = (*gvHandler)(nil)
@ -96,27 +98,24 @@ func (gh *gvHandler) HandleUDP(tunConn adapter.UDPConn) {
return
}
target := socks5.ParseAddrToSocksAddr(rAddr)
target := M.SocksaddrFromNet(rAddr)
go func() {
for {
buf := pool.Get(pool.UDPBufferSize)
buffer := buf.NewPacket()
n, addr, err := tunConn.ReadFrom(buf)
n, addr, err := tunConn.ReadFrom(buffer.FreeBytes())
if err != nil {
_ = pool.Put(buf)
buffer.Release()
break
}
payload := buf[:n]
buffer.Truncate(n)
if D.ShouldHijackDns(gh.dnsHijack, rAddrPort) {
go func() {
defer func() {
_ = pool.Put(buf)
}()
defer buffer.Release()
msg, err1 := D.RelayDnsPacket(payload)
msg, err1 := D.RelayDnsPacket(buffer.Bytes())
if err1 != nil {
return
}
@ -130,7 +129,7 @@ func (gh *gvHandler) HandleUDP(tunConn adapter.UDPConn) {
gvPacket := &packet{
pc: tunConn,
rAddr: addr,
payload: payload,
payload: buffer,
}
select {

View File

@ -5,10 +5,12 @@ package gvisor
import (
"net"
"github.com/Dreamacro/clash/common/pool"
"github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/adapter"
"github.com/Dreamacro/clash/listener/tun/ipstack/gvisor/option"
"github.com/Dreamacro/clash/log"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
"gvisor.dev/gvisor/pkg/tcpip/stack"
"gvisor.dev/gvisor/pkg/tcpip/transport/udp"
@ -51,10 +53,10 @@ func (c *udpConn) ID() *stack.TransportEndpointID {
type packet struct {
pc adapter.UDPConn
rAddr net.Addr
payload []byte
payload *buf.Buffer
}
func (c *packet) Data() []byte {
func (c *packet) Data() *buf.Buffer {
return c.payload
}
@ -63,11 +65,11 @@ func (c *packet) WriteBack(b []byte, _ net.Addr) (n int, err error) {
return c.pc.WriteTo(b, c.rAddr)
}
func (c *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error {
return common.Error(c.pc.WriteTo(buffer.Bytes(), c.rAddr))
}
// LocalAddr returns the source IP/Port of UDP Packet
func (c *packet) LocalAddr() net.Addr {
return c.rAddr
}
func (c *packet) Drop() {
_ = pool.Put(c.payload)
}

View File

@ -21,7 +21,8 @@ import (
"github.com/Dreamacro/clash/listener/tun/ipstack/system/mars"
"github.com/Dreamacro/clash/listener/tun/ipstack/system/mars/nat"
"github.com/Dreamacro/clash/log"
"github.com/Dreamacro/clash/transport/socks5"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type sysStack struct {
@ -153,37 +154,37 @@ func New(device device.Device, dnsHijack []netip.AddrPort, tunAddress netip.Pref
}(stack.UDP())
for !ipStack.closed {
buf := pool.Get(pool.UDPBufferSize)
buffer := buf.NewPacket()
n, lRAddr, rRAddr, err := stack.UDP().ReadFrom(buf)
n, lRAddr, rRAddr, err := stack.UDP().ReadFrom(buffer.FreeBytes())
if err != nil {
_ = pool.Put(buf)
buffer.Release()
break
}
buffer.Truncate(n)
raw := buf[:n]
lAddr := lRAddr.(*net.UDPAddr)
rAddr := rRAddr.(*net.UDPAddr)
rAddrPort := netip.AddrPortFrom(nnip.IpToAddr(rAddr.IP), uint16(rAddr.Port))
if rAddrPort.Addr().IsLoopback() || rAddrPort.Addr() == gateway {
_ = pool.Put(buf)
buffer.Release()
continue
}
if D.ShouldHijackDns(dnsAddr, rAddrPort) {
go func() {
msg, err := D.RelayDnsPacket(raw)
msg, err := D.RelayDnsPacket(buffer.Bytes())
if err != nil {
_ = pool.Put(buf)
buffer.Release()
return
}
_, _ = stack.UDP().WriteTo(msg, rAddr, lAddr)
_ = pool.Put(buf)
buffer.Release()
}()
continue
@ -191,17 +192,14 @@ func New(device device.Device, dnsHijack []netip.AddrPort, tunAddress netip.Pref
pkt := &packet{
local: lAddr,
data: raw,
data: buffer,
writeBack: func(b []byte, addr net.Addr) (int, error) {
return stack.UDP().WriteTo(b, rAddr, lAddr)
},
drop: func() {
_ = pool.Put(buf)
},
}
select {
case udpIn <- inbound.NewPacket(socks5.ParseAddrToSocksAddr(rAddr), pkt, C.TUN):
case udpIn <- inbound.NewPacket(M.SocksaddrFromNet(rAddr), pkt, C.TUN):
default:
}
}

View File

@ -1,15 +1,20 @@
package system
import "net"
import (
"net"
"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
)
type packet struct {
local *net.UDPAddr
data []byte
data *buf.Buffer
writeBack func(b []byte, addr net.Addr) (int, error)
drop func()
}
func (pkt *packet) Data() []byte {
func (pkt *packet) Data() *buf.Buffer {
return pkt.data
}
@ -17,8 +22,9 @@ func (pkt *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) {
return pkt.writeBack(b, addr)
}
func (pkt *packet) Drop() {
pkt.drop()
func (pkt *packet) WritePacket(buffer *buf.Buffer, addr M.Socksaddr) error {
defer buffer.Release()
return common.Error(pkt.writeBack(buffer.Bytes(), addr.UDPAddr()))
}
func (pkt *packet) LocalAddr() net.Addr {