Feature: add experimental connections API
This commit is contained in:
@ -13,12 +13,11 @@ import (
|
||||
)
|
||||
|
||||
func (t *Tunnel) handleHTTP(request *adapters.HTTPAdapter, outbound net.Conn) {
|
||||
conn := newTrafficTrack(outbound, t.traffic)
|
||||
req := request.R
|
||||
host := req.Host
|
||||
|
||||
inboundReeder := bufio.NewReader(request)
|
||||
outboundReeder := bufio.NewReader(conn)
|
||||
outboundReeder := bufio.NewReader(outbound)
|
||||
|
||||
for {
|
||||
keepAlive := strings.TrimSpace(strings.ToLower(req.Header.Get("Proxy-Connection"))) == "keep-alive"
|
||||
@ -26,7 +25,7 @@ func (t *Tunnel) handleHTTP(request *adapters.HTTPAdapter, outbound net.Conn) {
|
||||
req.Header.Set("Connection", "close")
|
||||
req.RequestURI = ""
|
||||
adapters.RemoveHopByHopHeaders(req.Header)
|
||||
err := req.Write(conn)
|
||||
err := req.Write(outbound)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
@ -91,7 +90,7 @@ func (t *Tunnel) handleUDPToRemote(conn net.Conn, pc net.PacketConn, addr net.Ad
|
||||
if _, err = pc.WriteTo(buf[:n], addr); err != nil {
|
||||
return
|
||||
}
|
||||
t.traffic.Up() <- int64(n)
|
||||
DefaultManager.Upload() <- int64(n)
|
||||
}
|
||||
|
||||
func (t *Tunnel) handleUDPToLocal(conn net.Conn, pc net.PacketConn, key string, timeout time.Duration) {
|
||||
@ -111,13 +110,12 @@ func (t *Tunnel) handleUDPToLocal(conn net.Conn, pc net.PacketConn, key string,
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
t.traffic.Down() <- int64(n)
|
||||
DefaultManager.Download() <- int64(n)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tunnel) handleSocket(request *adapters.SocketAdapter, outbound net.Conn) {
|
||||
conn := newTrafficTrack(outbound, t.traffic)
|
||||
relay(request, conn)
|
||||
relay(request, outbound)
|
||||
}
|
||||
|
||||
// relay copies between left and right bidirectionally.
|
||||
|
87
tunnel/manager.go
Normal file
87
tunnel/manager.go
Normal file
@ -0,0 +1,87 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var DefaultManager *Manager
|
||||
|
||||
func init() {
|
||||
DefaultManager = &Manager{
|
||||
upload: make(chan int64),
|
||||
download: make(chan int64),
|
||||
}
|
||||
DefaultManager.handle()
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
connections sync.Map
|
||||
upload chan int64
|
||||
download chan int64
|
||||
uploadTemp int64
|
||||
downloadTemp int64
|
||||
uploadBlip int64
|
||||
downloadBlip int64
|
||||
uploadTotal int64
|
||||
downloadTotal int64
|
||||
}
|
||||
|
||||
func (m *Manager) Join(c tracker) {
|
||||
m.connections.Store(c.ID(), c)
|
||||
}
|
||||
|
||||
func (m *Manager) Leave(c tracker) {
|
||||
m.connections.Delete(c.ID())
|
||||
}
|
||||
|
||||
func (m *Manager) Upload() chan<- int64 {
|
||||
return m.upload
|
||||
}
|
||||
|
||||
func (m *Manager) Download() chan<- int64 {
|
||||
return m.download
|
||||
}
|
||||
|
||||
func (m *Manager) Now() (up int64, down int64) {
|
||||
return m.uploadBlip, m.downloadBlip
|
||||
}
|
||||
|
||||
func (m *Manager) Snapshot() *Snapshot {
|
||||
connections := []tracker{}
|
||||
m.connections.Range(func(key, value interface{}) bool {
|
||||
connections = append(connections, value.(tracker))
|
||||
return true
|
||||
})
|
||||
|
||||
return &Snapshot{
|
||||
UploadTotal: m.uploadTotal,
|
||||
DownloadTotal: m.downloadTotal,
|
||||
Connections: connections,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) handle() {
|
||||
go m.handleCh(m.upload, &m.uploadTemp, &m.uploadBlip, &m.uploadTotal)
|
||||
go m.handleCh(m.download, &m.downloadTemp, &m.downloadBlip, &m.downloadTotal)
|
||||
}
|
||||
|
||||
func (m *Manager) handleCh(ch <-chan int64, temp *int64, blip *int64, total *int64) {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
for {
|
||||
select {
|
||||
case n := <-ch:
|
||||
*temp += n
|
||||
*total += n
|
||||
case <-ticker.C:
|
||||
*blip = *temp
|
||||
*temp = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Snapshot struct {
|
||||
DownloadTotal int64 `json:"downloadTotal"`
|
||||
UploadTotal int64 `json:"uploadTotal"`
|
||||
Connections []tracker `json:"connections"`
|
||||
}
|
122
tunnel/tracker.go
Normal file
122
tunnel/tracker.go
Normal file
@ -0,0 +1,122 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
C "github.com/Dreamacro/clash/constant"
|
||||
"github.com/gofrs/uuid"
|
||||
)
|
||||
|
||||
type tracker interface {
|
||||
ID() string
|
||||
Close() error
|
||||
}
|
||||
|
||||
type trackerInfo struct {
|
||||
UUID uuid.UUID `json:"id"`
|
||||
Metadata *C.Metadata `json:"metadata"`
|
||||
UploadTotal int64 `json:"upload"`
|
||||
DownloadTotal int64 `json:"download"`
|
||||
Start time.Time `json:"start"`
|
||||
Chain C.Chain `json:"chains"`
|
||||
Rule string `json:"rule"`
|
||||
}
|
||||
|
||||
type tcpTracker struct {
|
||||
C.Conn `json:"-"`
|
||||
*trackerInfo
|
||||
manager *Manager
|
||||
}
|
||||
|
||||
func (tt *tcpTracker) ID() string {
|
||||
return tt.UUID.String()
|
||||
}
|
||||
|
||||
func (tt *tcpTracker) Read(b []byte) (int, error) {
|
||||
n, err := tt.Conn.Read(b)
|
||||
download := int64(n)
|
||||
tt.manager.Download() <- download
|
||||
tt.DownloadTotal += download
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (tt *tcpTracker) Write(b []byte) (int, error) {
|
||||
n, err := tt.Conn.Write(b)
|
||||
upload := int64(n)
|
||||
tt.manager.Upload() <- upload
|
||||
tt.UploadTotal += upload
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (tt *tcpTracker) Close() error {
|
||||
tt.manager.Leave(tt)
|
||||
return tt.Conn.Close()
|
||||
}
|
||||
|
||||
func newTCPTracker(conn C.Conn, manager *Manager, metadata *C.Metadata, rule C.Rule) *tcpTracker {
|
||||
uuid, _ := uuid.NewV4()
|
||||
t := &tcpTracker{
|
||||
Conn: conn,
|
||||
manager: manager,
|
||||
trackerInfo: &trackerInfo{
|
||||
UUID: uuid,
|
||||
Start: time.Now(),
|
||||
Metadata: metadata,
|
||||
Chain: conn.Chains(),
|
||||
Rule: rule.RuleType().String(),
|
||||
},
|
||||
}
|
||||
|
||||
manager.Join(t)
|
||||
return t
|
||||
}
|
||||
|
||||
type udpTracker struct {
|
||||
C.PacketConn `json:"-"`
|
||||
*trackerInfo
|
||||
manager *Manager
|
||||
}
|
||||
|
||||
func (ut *udpTracker) ID() string {
|
||||
return ut.UUID.String()
|
||||
}
|
||||
|
||||
func (ut *udpTracker) ReadFrom(b []byte) (int, net.Addr, error) {
|
||||
n, addr, err := ut.PacketConn.ReadFrom(b)
|
||||
download := int64(n)
|
||||
ut.manager.Download() <- download
|
||||
ut.DownloadTotal += download
|
||||
return n, addr, err
|
||||
}
|
||||
|
||||
func (ut *udpTracker) WriteTo(b []byte, addr net.Addr) (int, error) {
|
||||
n, err := ut.PacketConn.WriteTo(b, addr)
|
||||
upload := int64(n)
|
||||
ut.manager.Upload() <- upload
|
||||
ut.UploadTotal += upload
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (ut *udpTracker) Close() error {
|
||||
ut.manager.Leave(ut)
|
||||
return ut.PacketConn.Close()
|
||||
}
|
||||
|
||||
func newUDPTracker(conn C.PacketConn, manager *Manager, metadata *C.Metadata, rule C.Rule) *udpTracker {
|
||||
uuid, _ := uuid.NewV4()
|
||||
ut := &udpTracker{
|
||||
PacketConn: conn,
|
||||
manager: manager,
|
||||
trackerInfo: &trackerInfo{
|
||||
UUID: uuid,
|
||||
Start: time.Now(),
|
||||
Metadata: metadata,
|
||||
Chain: conn.Chains(),
|
||||
Rule: rule.RuleType().String(),
|
||||
},
|
||||
}
|
||||
|
||||
manager.Join(ut)
|
||||
return ut
|
||||
}
|
@ -30,8 +30,7 @@ type Tunnel struct {
|
||||
natTable *nat.Table
|
||||
rules []C.Rule
|
||||
proxies map[string]C.Proxy
|
||||
configMux *sync.RWMutex
|
||||
traffic *C.Traffic
|
||||
configMux sync.RWMutex
|
||||
|
||||
// experimental features
|
||||
ignoreResolveFail bool
|
||||
@ -50,11 +49,6 @@ func (t *Tunnel) Add(req C.ServerAdapter) {
|
||||
}
|
||||
}
|
||||
|
||||
// Traffic return traffic of all connections
|
||||
func (t *Tunnel) Traffic() *C.Traffic {
|
||||
return t.traffic
|
||||
}
|
||||
|
||||
// Rules return all rules
|
||||
func (t *Tunnel) Rules() []C.Rule {
|
||||
return t.rules
|
||||
@ -123,7 +117,7 @@ func (t *Tunnel) needLookupIP(metadata *C.Metadata) bool {
|
||||
func (t *Tunnel) resolveMetadata(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
|
||||
// preprocess enhanced-mode metadata
|
||||
if t.needLookupIP(metadata) {
|
||||
host, exist := dns.DefaultResolver.IPToHost(*metadata.DstIP)
|
||||
host, exist := dns.DefaultResolver.IPToHost(metadata.DstIP)
|
||||
if exist {
|
||||
metadata.Host = host
|
||||
metadata.AddrType = C.AtypDomainName
|
||||
@ -188,8 +182,8 @@ func (t *Tunnel) handleUDPConn(localConn C.ServerAdapter) {
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
pc = rawPc
|
||||
addr = nAddr
|
||||
pc = newUDPTracker(rawPc, DefaultManager, metadata, rule)
|
||||
|
||||
if rule != nil {
|
||||
log.Infoln("%s --> %v match %s using %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), rawPc.Chains().String())
|
||||
@ -231,6 +225,7 @@ func (t *Tunnel) handleTCPConn(localConn C.ServerAdapter) {
|
||||
log.Warnln("dial %s error: %s", proxy.Name(), err.Error())
|
||||
return
|
||||
}
|
||||
remoteConn = newTCPTracker(remoteConn, DefaultManager, metadata, rule)
|
||||
defer remoteConn.Close()
|
||||
|
||||
if rule != nil {
|
||||
@ -259,7 +254,7 @@ func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
|
||||
|
||||
if node := dns.DefaultHosts.Search(metadata.Host); node != nil {
|
||||
ip := node.Data.(net.IP)
|
||||
metadata.DstIP = &ip
|
||||
metadata.DstIP = ip
|
||||
resolved = true
|
||||
}
|
||||
|
||||
@ -273,7 +268,7 @@ func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
|
||||
log.Debugln("[DNS] resolve %s error: %s", metadata.Host, err.Error())
|
||||
} else {
|
||||
log.Debugln("[DNS] %s --> %s", metadata.Host, ip.String())
|
||||
metadata.DstIP = &ip
|
||||
metadata.DstIP = ip
|
||||
}
|
||||
resolved = true
|
||||
}
|
||||
@ -296,13 +291,11 @@ func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, C.Rule, error) {
|
||||
|
||||
func newTunnel() *Tunnel {
|
||||
return &Tunnel{
|
||||
tcpQueue: channels.NewInfiniteChannel(),
|
||||
udpQueue: channels.NewInfiniteChannel(),
|
||||
natTable: nat.New(),
|
||||
proxies: make(map[string]C.Proxy),
|
||||
configMux: &sync.RWMutex{},
|
||||
traffic: C.NewTraffic(time.Second),
|
||||
mode: Rule,
|
||||
tcpQueue: channels.NewInfiniteChannel(),
|
||||
udpQueue: channels.NewInfiniteChannel(),
|
||||
natTable: nat.New(),
|
||||
proxies: make(map[string]C.Proxy),
|
||||
mode: Rule,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,29 +0,0 @@
|
||||
package tunnel
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
C "github.com/Dreamacro/clash/constant"
|
||||
)
|
||||
|
||||
// TrafficTrack record traffic of net.Conn
|
||||
type TrafficTrack struct {
|
||||
net.Conn
|
||||
traffic *C.Traffic
|
||||
}
|
||||
|
||||
func (tt *TrafficTrack) Read(b []byte) (int, error) {
|
||||
n, err := tt.Conn.Read(b)
|
||||
tt.traffic.Down() <- int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (tt *TrafficTrack) Write(b []byte) (int, error) {
|
||||
n, err := tt.Conn.Write(b)
|
||||
tt.traffic.Up() <- int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func newTrafficTrack(conn net.Conn, traffic *C.Traffic) *TrafficTrack {
|
||||
return &TrafficTrack{traffic: traffic, Conn: conn}
|
||||
}
|
Reference in New Issue
Block a user