Chore: improve code architecture

This commit is contained in:
Dreamacro
2018-11-21 13:47:46 +08:00
parent 91e35f2f6a
commit 01a477bd3d
37 changed files with 926 additions and 990 deletions

View File

@ -1,51 +0,0 @@
package tunnel
import (
"fmt"
C "github.com/Dreamacro/clash/constant"
log "github.com/sirupsen/logrus"
)
type Log struct {
LogLevel C.LogLevel
Payload string
}
func (l *Log) Type() string {
return l.LogLevel.String()
}
func print(data Log) {
switch data.LogLevel {
case C.INFO:
log.Infoln(data.Payload)
case C.WARNING:
log.Warnln(data.Payload)
case C.ERROR:
log.Errorln(data.Payload)
case C.DEBUG:
log.Debugln(data.Payload)
}
}
func (t *Tunnel) subscribeLogs() {
sub, err := t.observable.Subscribe()
if err != nil {
log.Fatalf("Can't subscribe tunnel log: %s", err.Error())
}
for elm := range sub {
data := elm.(Log)
if data.LogLevel <= t.logLevel {
print(data)
}
}
}
func newLog(logLevel C.LogLevel, format string, v ...interface{}) Log {
return Log{
LogLevel: logLevel,
Payload: fmt.Sprintf(format, v...),
}
}

53
tunnel/mode.go Normal file
View File

@ -0,0 +1,53 @@
package tunnel
import (
"encoding/json"
"errors"
)
type Mode int
var (
// ModeMapping is a mapping for Mode enum
ModeMapping = map[string]Mode{
"Global": Global,
"Rule": Rule,
"Direct": Direct,
}
)
const (
Global Mode = iota
Rule
Direct
)
// UnmarshalJSON unserialize Mode
func (m *Mode) UnmarshalJSON(data []byte) error {
var tp string
json.Unmarshal(data, tp)
mode, exist := ModeMapping[tp]
if !exist {
return errors.New("invalid mode")
}
*m = mode
return nil
}
// MarshalJSON serialize Mode
func (m Mode) MarshalJSON() ([]byte, error) {
return json.Marshal(m.String())
}
func (m Mode) String() string {
switch m {
case Global:
return "Global"
case Rule:
return "Rule"
case Direct:
return "Direct"
default:
return "Unknow"
}
}

View File

@ -5,9 +5,8 @@ import (
"time"
InboundAdapter "github.com/Dreamacro/clash/adapters/inbound"
"github.com/Dreamacro/clash/common/observable"
cfg "github.com/Dreamacro/clash/config"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"gopkg.in/eapache/channels.v1"
)
@ -26,12 +25,7 @@ type Tunnel struct {
traffic *C.Traffic
// Outbound Rule
mode cfg.Mode
// Log
logCh chan interface{}
observable *observable.Observable
logLevel C.LogLevel
mode Mode
}
// Add request to queue
@ -44,33 +38,38 @@ func (t *Tunnel) Traffic() *C.Traffic {
return t.traffic
}
// Log return clash log stream
func (t *Tunnel) Log() *observable.Observable {
return t.observable
// Rules return all rules
func (t *Tunnel) Rules() []C.Rule {
return t.rules
}
func (t *Tunnel) configMonitor(signal chan<- struct{}) {
sub := cfg.Instance().Subscribe()
signal <- struct{}{}
for elm := range sub {
event := elm.(*cfg.Event)
switch event.Type {
case "proxies":
proxies := event.Payload.(map[string]C.Proxy)
t.configLock.Lock()
t.proxies = proxies
t.configLock.Unlock()
case "rules":
rules := event.Payload.([]C.Rule)
t.configLock.Lock()
t.rules = rules
t.configLock.Unlock()
case "mode":
t.mode = event.Payload.(cfg.Mode)
case "log-level":
t.logLevel = event.Payload.(C.LogLevel)
}
}
// UpdateRules handle update rules
func (t *Tunnel) UpdateRules(rules []C.Rule) {
t.configLock.Lock()
t.rules = rules
t.configLock.Unlock()
}
// Proxies return all proxies
func (t *Tunnel) Proxies() map[string]C.Proxy {
return t.proxies
}
// UpdateProxies handle update proxies
func (t *Tunnel) UpdateProxies(proxies map[string]C.Proxy) {
t.configLock.Lock()
t.proxies = proxies
t.configLock.Unlock()
}
// Mode return current mode
func (t *Tunnel) Mode() Mode {
return t.mode
}
// SetMode change the mode of tunnel
func (t *Tunnel) SetMode(mode Mode) {
t.mode = mode
}
func (t *Tunnel) process() {
@ -88,9 +87,9 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) {
var proxy C.Proxy
switch t.mode {
case cfg.Direct:
case Direct:
proxy = t.proxies["DIRECT"]
case cfg.Global:
case Global:
proxy = t.proxies["GLOBAL"]
// Rule
default:
@ -98,7 +97,7 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) {
}
remoConn, err := proxy.Generator(metadata)
if err != nil {
t.logCh <- newLog(C.WARNING, "Proxy connect error: %s", err.Error())
log.Warnln("Proxy connect error: %s", err.Error())
return
}
defer remoConn.Close()
@ -121,34 +120,21 @@ func (t *Tunnel) match(metadata *C.Metadata) C.Proxy {
if !ok {
continue
}
t.logCh <- newLog(C.INFO, "%v match %s using %s", metadata.String(), rule.RuleType().String(), rule.Adapter())
log.Infoln("%v match %s using %s", metadata.String(), rule.RuleType().String(), rule.Adapter())
return a
}
}
t.logCh <- newLog(C.INFO, "%v doesn't match any rule using DIRECT", metadata.String())
log.Infoln("%v doesn't match any rule using DIRECT", metadata.String())
return t.proxies["DIRECT"]
}
// Run initial task
func (t *Tunnel) Run() {
go t.process()
go t.subscribeLogs()
signal := make(chan struct{})
go t.configMonitor(signal)
<-signal
}
func newTunnel() *Tunnel {
logCh := make(chan interface{})
return &Tunnel{
queue: channels.NewInfiniteChannel(),
proxies: make(map[string]C.Proxy),
observable: observable.NewObservable(logCh),
logCh: logCh,
configLock: &sync.RWMutex{},
traffic: C.NewTraffic(time.Second),
mode: cfg.Rule,
logLevel: C.INFO,
mode: Rule,
}
}
@ -156,6 +142,7 @@ func newTunnel() *Tunnel {
func Instance() *Tunnel {
once.Do(func() {
tunnel = newTunnel()
go tunnel.process()
})
return tunnel
}