Chore: deprecated eapache/channels

This commit is contained in:
Dreamacro
2020-10-20 17:44:39 +08:00
parent baabf21340
commit 2321e9139d
5 changed files with 57 additions and 38 deletions

View File

@ -113,3 +113,34 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
_, more := <-list[0]
assert.False(t, more)
}
func Benchmark_Observable_1000(b *testing.B) {
ch := make(chan interface{})
o := NewObservable(ch)
num := 1000
subs := []Subscription{}
for i := 0; i < num; i++ {
sub, _ := o.Subscribe()
subs = append(subs, sub)
}
wg := sync.WaitGroup{}
wg.Add(num)
b.ResetTimer()
for _, sub := range subs {
go func(s Subscription) {
for range s {
}
wg.Done()
}(sub)
}
for i := 0; i < b.N; i++ {
ch <- i
}
close(ch)
wg.Wait()
}

View File

@ -2,34 +2,32 @@ package observable
import (
"sync"
"gopkg.in/eapache/channels.v1"
)
type Subscription <-chan interface{}
type Subscriber struct {
buffer *channels.InfiniteChannel
buffer chan interface{}
once sync.Once
}
func (s *Subscriber) Emit(item interface{}) {
s.buffer.In() <- item
s.buffer <- item
}
func (s *Subscriber) Out() Subscription {
return s.buffer.Out()
return s.buffer
}
func (s *Subscriber) Close() {
s.once.Do(func() {
s.buffer.Close()
close(s.buffer)
})
}
func newSubscriber() *Subscriber {
sub := &Subscriber{
buffer: channels.NewInfiniteChannel(),
buffer: make(chan interface{}, 200),
}
return sub
}