Fix: grpc transport concurrent write

This commit is contained in:
Dreamacro
2021-04-14 21:46:05 +08:00
parent 936b7012ba
commit 4e9e4b6cde
2 changed files with 13 additions and 42 deletions

View File

@ -5,6 +5,7 @@ package gun
import (
"bufio"
"bytes"
"crypto/tls"
"encoding/binary"
"errors"
@ -30,6 +31,7 @@ var (
"content-type": []string{"application/grpc"},
"user-agent": []string{"grpc-go/1.36.0"},
}
bufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}
)
type DialFn = func(network, addr string) (net.Conn, error)
@ -119,13 +121,20 @@ func (g *Conn) Read(b []byte) (n int, err error) {
}
func (g *Conn) Write(b []byte) (n int, err error) {
protobufHeader := appendUleb128([]byte{0x0A}, uint64(len(b)))
protobufHeader := [binary.MaxVarintLen64 + 1]byte{0x0A}
varuintSize := binary.PutUvarint(protobufHeader[1:], uint64(len(b)))
grpcHeader := make([]byte, 5)
grpcPayloadLen := uint32(len(protobufHeader) + len(b))
grpcPayloadLen := uint32(varuintSize + 1 + len(b))
binary.BigEndian.PutUint32(grpcHeader[1:5], grpcPayloadLen)
buffers := net.Buffers{grpcHeader, protobufHeader, b}
_, err = buffers.WriteTo(g.writer)
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
defer buf.Reset()
buf.Write(grpcHeader)
buf.Write(protobufHeader[:varuintSize+1])
buf.Write(b)
_, err = g.writer.Write(buf.Bytes())
if err == io.ErrClosedPipe && g.err != nil {
err = g.err
}