Improve: config convergent and add log-level

This commit is contained in:
Dreamacro
2018-07-26 00:04:59 +08:00
parent 7357d2d0c2
commit 8389150318
31 changed files with 757 additions and 484 deletions

View File

@ -1,16 +1,14 @@
package tunnel
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/Dreamacro/clash/adapters"
LocalAdapter "github.com/Dreamacro/clash/adapters/local"
RemoteAdapter "github.com/Dreamacro/clash/adapters/remote"
cfg "github.com/Dreamacro/clash/config"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/observable"
R "github.com/Dreamacro/clash/rules"
"gopkg.in/eapache/channels.v1"
)
@ -20,168 +18,61 @@ var (
once sync.Once
)
// Tunnel handle proxy socket and HTTP/SOCKS socket
type Tunnel struct {
queue *channels.InfiniteChannel
rules []C.Rule
proxies map[string]C.Proxy
observable *observable.Observable
logCh chan interface{}
configLock *sync.RWMutex
traffic *C.Traffic
mode Mode
selector *adapters.Selector
// Outbound Rule
mode cfg.Mode
selector *RemoteAdapter.Selector
// Log
logCh chan interface{}
observable *observable.Observable
logLevel C.LogLevel
}
// Add request to queue
func (t *Tunnel) Add(req C.ServerAdapter) {
t.queue.In() <- req
}
// Traffic return traffic of all connections
func (t *Tunnel) Traffic() *C.Traffic {
return t.traffic
}
func (t *Tunnel) Config() ([]C.Rule, map[string]C.Proxy) {
return t.rules, t.proxies
}
// Log return clash log stream
func (t *Tunnel) Log() *observable.Observable {
return t.observable
}
func (t *Tunnel) SetMode(mode Mode) {
t.mode = mode
}
func (t *Tunnel) GetMode() Mode {
return t.mode
}
func (t *Tunnel) UpdateConfig() (err error) {
cfg, err := C.GetConfig()
if err != nil {
return
}
// empty proxies and rules
proxies := make(map[string]C.Proxy)
rules := []C.Rule{}
proxiesConfig := cfg.Section("Proxy")
rulesConfig := cfg.Section("Rule")
groupsConfig := cfg.Section("Proxy Group")
// parse proxy
for _, key := range proxiesConfig.Keys() {
proxy := key.Strings(",")
if len(proxy) == 0 {
continue
}
switch proxy[0] {
// ss, server, port, cipter, password
case "ss":
if len(proxy) < 5 {
continue
}
ssURL := fmt.Sprintf("ss://%s:%s@%s:%s", proxy[3], proxy[4], proxy[1], proxy[2])
ss, err := adapters.NewShadowSocks(key.Name(), ssURL, t.traffic)
if err != nil {
return err
}
proxies[key.Name()] = ss
func (t *Tunnel) configMonitor(signal chan<- struct{}) {
sub := cfg.Instance().Subscribe()
signal <- struct{}{}
for elm := range sub {
event := elm.(*cfg.Event)
switch event.Type {
case "proxies":
proxies := event.Payload.(map[string]C.Proxy)
t.configLock.Lock()
t.proxies = proxies
t.configLock.Unlock()
case "rules":
rules := event.Payload.([]C.Rule)
t.configLock.Lock()
t.rules = rules
t.configLock.Unlock()
case "mode":
t.mode = event.Payload.(cfg.Mode)
case "log-level":
t.logLevel = event.Payload.(C.LogLevel)
}
}
// parse rules
for _, key := range rulesConfig.Keys() {
rule := strings.Split(key.Name(), ",")
if len(rule) < 3 {
continue
}
rule = trimArr(rule)
switch rule[0] {
case "DOMAIN-SUFFIX":
rules = append(rules, R.NewDomainSuffix(rule[1], rule[2]))
case "DOMAIN-KEYWORD":
rules = append(rules, R.NewDomainKeyword(rule[1], rule[2]))
case "GEOIP":
rules = append(rules, R.NewGEOIP(rule[1], rule[2]))
case "IP-CIDR", "IP-CIDR6":
rules = append(rules, R.NewIPCIDR(rule[1], rule[2]))
case "FINAL":
rules = append(rules, R.NewFinal(rule[2]))
}
}
// parse proxy groups
for _, key := range groupsConfig.Keys() {
rule := strings.Split(key.Value(), ",")
rule = trimArr(rule)
switch rule[0] {
case "url-test":
if len(rule) < 4 {
return fmt.Errorf("URLTest need more than 4 param")
}
proxyNames := rule[1 : len(rule)-2]
delay, _ := strconv.Atoi(rule[len(rule)-1])
url := rule[len(rule)-2]
var ps []C.Proxy
for _, name := range proxyNames {
if p, ok := proxies[name]; ok {
ps = append(ps, p)
}
}
adapter, err := adapters.NewURLTest(key.Name(), ps, url, time.Duration(delay)*time.Second)
if err != nil {
return fmt.Errorf("Config error: %s", err.Error())
}
proxies[key.Name()] = adapter
case "select":
if len(rule) < 3 {
return fmt.Errorf("Selector need more than 3 param")
}
proxyNames := rule[1:]
selectProxy := make(map[string]C.Proxy)
for _, name := range proxyNames {
proxy, exist := proxies[name]
if !exist {
return fmt.Errorf("Proxy %s not exist", name)
}
selectProxy[name] = proxy
}
selector, err := adapters.NewSelector(key.Name(), selectProxy)
if err != nil {
return fmt.Errorf("Selector create error: %s", err.Error())
}
proxies[key.Name()] = selector
}
}
// init proxy
proxies["DIRECT"] = adapters.NewDirect(t.traffic)
proxies["REJECT"] = adapters.NewReject()
t.configLock.Lock()
defer t.configLock.Unlock()
// stop url-test
for _, elm := range t.proxies {
urlTest, ok := elm.(*adapters.URLTest)
if ok {
urlTest.Close()
}
}
s, err := adapters.NewSelector("Proxy", proxies)
if err != nil {
return err
}
t.proxies = proxies
t.rules = rules
t.selector = s
return nil
}
func (t *Tunnel) process() {
@ -199,9 +90,9 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) {
var proxy C.Proxy
switch t.mode {
case Direct:
case cfg.Direct:
proxy = t.proxies["DIRECT"]
case Global:
case cfg.Global:
proxy = t.selector
// Rule
default:
@ -209,12 +100,22 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) {
}
remoConn, err := proxy.Generator(addr)
if err != nil {
t.logCh <- newLog(WARNING, "Proxy connect error: %s", err.Error())
t.logCh <- newLog(C.WARNING, "Proxy connect error: %s", err.Error())
return
}
defer remoConn.Close()
localConn.Connect(remoConn)
switch adapter := localConn.(type) {
case *LocalAdapter.HttpAdapter:
t.handleHTTP(adapter, remoConn)
break
case *LocalAdapter.HttpsAdapter:
t.handleHTTPS(adapter, remoConn)
break
case *LocalAdapter.SocksAdapter:
t.handleSOCKS(adapter, remoConn)
break
}
}
func (t *Tunnel) match(addr *C.Addr) C.Proxy {
@ -227,31 +128,39 @@ func (t *Tunnel) match(addr *C.Addr) C.Proxy {
if !ok {
continue
}
t.logCh <- newLog(INFO, "%v match %s using %s", addr.String(), rule.RuleType().String(), rule.Adapter())
t.logCh <- newLog(C.INFO, "%v match %s using %s", addr.String(), rule.RuleType().String(), rule.Adapter())
return a
}
}
t.logCh <- newLog(INFO, "%v doesn't match any rule using DIRECT", addr.String())
t.logCh <- newLog(C.INFO, "%v doesn't match any rule using DIRECT", addr.String())
return t.proxies["DIRECT"]
}
// Run initial task
func (t *Tunnel) Run() {
go t.process()
go t.subscribeLogs()
signal := make(chan struct{})
go t.configMonitor(signal)
<-signal
}
func newTunnel() *Tunnel {
logCh := make(chan interface{})
tunnel := &Tunnel{
return &Tunnel{
queue: channels.NewInfiniteChannel(),
proxies: make(map[string]C.Proxy),
observable: observable.NewObservable(logCh),
logCh: logCh,
configLock: &sync.RWMutex{},
traffic: C.NewTraffic(time.Second),
mode: Rule,
mode: cfg.Rule,
logLevel: C.INFO,
}
go tunnel.process()
go tunnel.subscribeLogs()
return tunnel
}
func GetInstance() *Tunnel {
// Instance return singleton instance of Tunnel
func Instance() *Tunnel {
once.Do(func() {
tunnel = newTunnel()
})