fix: DoQ and HTTP/3 over proxy
This commit is contained in:
138
dns/doq.go
138
dns/doq.go
@ -88,9 +88,9 @@ func newDoQ(resolver *Resolver, addr string, adapter string) (dnsClient, error)
|
||||
}
|
||||
|
||||
// Address implements the Upstream interface for *dnsOverQUIC.
|
||||
func (p *dnsOverQUIC) Address() string { return p.addr }
|
||||
func (doq *dnsOverQUIC) Address() string { return doq.addr }
|
||||
|
||||
func (p *dnsOverQUIC) ExchangeContext(ctx context.Context, m *D.Msg) (msg *D.Msg, err error) {
|
||||
func (doq *dnsOverQUIC) ExchangeContext(ctx context.Context, m *D.Msg) (msg *D.Msg, err error) {
|
||||
// When sending queries over a QUIC connection, the DNS Message ID MUST be
|
||||
// set to zero.
|
||||
id := m.Id
|
||||
@ -105,49 +105,49 @@ func (p *dnsOverQUIC) ExchangeContext(ctx context.Context, m *D.Msg) (msg *D.Msg
|
||||
|
||||
// Check if there was already an active conn before sending the request.
|
||||
// We'll only attempt to re-connect if there was one.
|
||||
hasConnection := p.hasConnection()
|
||||
hasConnection := doq.hasConnection()
|
||||
|
||||
// Make the first attempt to send the DNS query.
|
||||
msg, err = p.exchangeQUIC(ctx, m)
|
||||
msg, err = doq.exchangeQUIC(ctx, m)
|
||||
|
||||
// Make up to 2 attempts to re-open the QUIC connection and send the request
|
||||
// again. There are several cases where this workaround is necessary to
|
||||
// make DoQ usable. We need to make 2 attempts in the case when the
|
||||
// connection was closed (due to inactivity for example) AND the server
|
||||
// refuses to open a 0-RTT connection.
|
||||
for i := 0; hasConnection && p.shouldRetry(err) && i < 2; i++ {
|
||||
for i := 0; hasConnection && doq.shouldRetry(err) && i < 2; i++ {
|
||||
log.Debugln("re-creating the QUIC connection and retrying due to %v", err)
|
||||
|
||||
// Close the active connection to make sure we'll try to re-connect.
|
||||
p.closeConnWithError(err)
|
||||
doq.closeConnWithError(err)
|
||||
|
||||
// Retry sending the request.
|
||||
msg, err = p.exchangeQUIC(ctx, m)
|
||||
msg, err = doq.exchangeQUIC(ctx, m)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// If we're unable to exchange messages, make sure the connection is
|
||||
// closed and signal about an internal error.
|
||||
p.closeConnWithError(err)
|
||||
doq.closeConnWithError(err)
|
||||
}
|
||||
|
||||
return msg, err
|
||||
}
|
||||
|
||||
// Exchange implements the Upstream interface for *dnsOverQUIC.
|
||||
func (p *dnsOverQUIC) Exchange(m *D.Msg) (msg *D.Msg, err error) {
|
||||
return p.ExchangeContext(context.Background(), m)
|
||||
func (doq *dnsOverQUIC) Exchange(m *D.Msg) (msg *D.Msg, err error) {
|
||||
return doq.ExchangeContext(context.Background(), m)
|
||||
}
|
||||
|
||||
// Close implements the Upstream interface for *dnsOverQUIC.
|
||||
func (p *dnsOverQUIC) Close() (err error) {
|
||||
p.connMu.Lock()
|
||||
defer p.connMu.Unlock()
|
||||
func (doq *dnsOverQUIC) Close() (err error) {
|
||||
doq.connMu.Lock()
|
||||
defer doq.connMu.Unlock()
|
||||
|
||||
runtime.SetFinalizer(p, nil)
|
||||
runtime.SetFinalizer(doq, nil)
|
||||
|
||||
if p.conn != nil {
|
||||
err = p.conn.CloseWithError(QUICCodeNoError, "")
|
||||
if doq.conn != nil {
|
||||
err = doq.conn.CloseWithError(QUICCodeNoError, "")
|
||||
}
|
||||
|
||||
return err
|
||||
@ -155,9 +155,9 @@ func (p *dnsOverQUIC) Close() (err error) {
|
||||
|
||||
// exchangeQUIC attempts to open a QUIC connection, send the DNS message
|
||||
// through it and return the response it got from the server.
|
||||
func (p *dnsOverQUIC) exchangeQUIC(ctx context.Context, msg *D.Msg) (resp *D.Msg, err error) {
|
||||
func (doq *dnsOverQUIC) exchangeQUIC(ctx context.Context, msg *D.Msg) (resp *D.Msg, err error) {
|
||||
var conn quic.Connection
|
||||
conn, err = p.getConnection(true)
|
||||
conn, err = doq.getConnection(true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -169,7 +169,7 @@ func (p *dnsOverQUIC) exchangeQUIC(ctx context.Context, msg *D.Msg) (resp *D.Msg
|
||||
}
|
||||
|
||||
var stream quic.Stream
|
||||
stream, err = p.openStream(ctx, conn)
|
||||
stream, err = doq.openStream(ctx, conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -185,7 +185,7 @@ func (p *dnsOverQUIC) exchangeQUIC(ctx context.Context, msg *D.Msg) (resp *D.Msg
|
||||
// write-direction of the stream, but does not prevent reading from it.
|
||||
_ = stream.Close()
|
||||
|
||||
return p.readMsg(stream)
|
||||
return doq.readMsg(stream)
|
||||
}
|
||||
|
||||
// AddPrefix adds a 2-byte prefix with the DNS message length.
|
||||
@ -199,17 +199,17 @@ func AddPrefix(b []byte) (m []byte) {
|
||||
|
||||
// shouldRetry checks what error we received and decides whether it is required
|
||||
// to re-open the connection and retry sending the request.
|
||||
func (p *dnsOverQUIC) shouldRetry(err error) (ok bool) {
|
||||
func (doq *dnsOverQUIC) shouldRetry(err error) (ok bool) {
|
||||
return isQUICRetryError(err)
|
||||
}
|
||||
|
||||
// getBytesPool returns (creates if needed) a pool we store byte buffers in.
|
||||
func (p *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
|
||||
p.bytesPoolGuard.Lock()
|
||||
defer p.bytesPoolGuard.Unlock()
|
||||
func (doq *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
|
||||
doq.bytesPoolGuard.Lock()
|
||||
defer doq.bytesPoolGuard.Unlock()
|
||||
|
||||
if p.bytesPool == nil {
|
||||
p.bytesPool = &sync.Pool{
|
||||
if doq.bytesPool == nil {
|
||||
doq.bytesPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
b := make([]byte, MaxMsgSize)
|
||||
|
||||
@ -218,19 +218,19 @@ func (p *dnsOverQUIC) getBytesPool() (pool *sync.Pool) {
|
||||
}
|
||||
}
|
||||
|
||||
return p.bytesPool
|
||||
return doq.bytesPool
|
||||
}
|
||||
|
||||
// getConnection opens or returns an existing quic.Connection. useCached
|
||||
// argument controls whether we should try to use the existing cached
|
||||
// connection. If it is false, we will forcibly create a new connection and
|
||||
// close the existing one if needed.
|
||||
func (p *dnsOverQUIC) getConnection(useCached bool) (quic.Connection, error) {
|
||||
func (doq *dnsOverQUIC) getConnection(useCached bool) (quic.Connection, error) {
|
||||
var conn quic.Connection
|
||||
p.connMu.RLock()
|
||||
conn = p.conn
|
||||
doq.connMu.RLock()
|
||||
conn = doq.conn
|
||||
if conn != nil && useCached {
|
||||
p.connMu.RUnlock()
|
||||
doq.connMu.RUnlock()
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
@ -238,50 +238,50 @@ func (p *dnsOverQUIC) getConnection(useCached bool) (quic.Connection, error) {
|
||||
// we're recreating the connection, let's create a new one.
|
||||
_ = conn.CloseWithError(QUICCodeNoError, "")
|
||||
}
|
||||
p.connMu.RUnlock()
|
||||
doq.connMu.RUnlock()
|
||||
|
||||
p.connMu.Lock()
|
||||
defer p.connMu.Unlock()
|
||||
doq.connMu.Lock()
|
||||
defer doq.connMu.Unlock()
|
||||
|
||||
var err error
|
||||
conn, err = p.openConnection()
|
||||
conn, err = doq.openConnection()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.conn = conn
|
||||
doq.conn = conn
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// hasConnection returns true if there's an active QUIC connection.
|
||||
func (p *dnsOverQUIC) hasConnection() (ok bool) {
|
||||
p.connMu.Lock()
|
||||
defer p.connMu.Unlock()
|
||||
func (doq *dnsOverQUIC) hasConnection() (ok bool) {
|
||||
doq.connMu.Lock()
|
||||
defer doq.connMu.Unlock()
|
||||
|
||||
return p.conn != nil
|
||||
return doq.conn != nil
|
||||
}
|
||||
|
||||
// getQUICConfig returns the QUIC config in a thread-safe manner. Note, that
|
||||
// this method returns a pointer, it is forbidden to change its properties.
|
||||
func (p *dnsOverQUIC) getQUICConfig() (c *quic.Config) {
|
||||
p.quicConfigGuard.Lock()
|
||||
defer p.quicConfigGuard.Unlock()
|
||||
func (doq *dnsOverQUIC) getQUICConfig() (c *quic.Config) {
|
||||
doq.quicConfigGuard.Lock()
|
||||
defer doq.quicConfigGuard.Unlock()
|
||||
|
||||
return p.quicConfig
|
||||
return doq.quicConfig
|
||||
}
|
||||
|
||||
// resetQUICConfig re-creates the tokens store as we may need to use a new one
|
||||
// if we failed to connect.
|
||||
func (p *dnsOverQUIC) resetQUICConfig() {
|
||||
p.quicConfigGuard.Lock()
|
||||
defer p.quicConfigGuard.Unlock()
|
||||
func (doq *dnsOverQUIC) resetQUICConfig() {
|
||||
doq.quicConfigGuard.Lock()
|
||||
defer doq.quicConfigGuard.Unlock()
|
||||
|
||||
p.quicConfig = p.quicConfig.Clone()
|
||||
p.quicConfig.TokenStore = newQUICTokenStore()
|
||||
doq.quicConfig = doq.quicConfig.Clone()
|
||||
doq.quicConfig.TokenStore = newQUICTokenStore()
|
||||
}
|
||||
|
||||
// openStream opens a new QUIC stream for the specified connection.
|
||||
func (p *dnsOverQUIC) openStream(ctx context.Context, conn quic.Connection) (quic.Stream, error) {
|
||||
func (doq *dnsOverQUIC) openStream(ctx context.Context, conn quic.Connection) (quic.Stream, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@ -292,7 +292,7 @@ func (p *dnsOverQUIC) openStream(ctx context.Context, conn quic.Connection) (qui
|
||||
|
||||
// We can get here if the old QUIC connection is not valid anymore. We
|
||||
// should try to re-create the connection again in this case.
|
||||
newConn, err := p.getConnection(false)
|
||||
newConn, err := doq.getConnection(false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -321,14 +321,18 @@ func (doq *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
|
||||
// It's never actually used
|
||||
_ = rawConn.Close()
|
||||
cancel()
|
||||
|
||||
var addr string
|
||||
udpConn, ok := rawConn.(*net.UDPConn)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to open connection to %s", doq.addr)
|
||||
if packetConn, ok := rawConn.(*wrapPacketConn); !ok {
|
||||
return nil, fmt.Errorf("failed to open connection to %s", doq.addr)
|
||||
} else {
|
||||
addr = packetConn.RemoteAddr().String()
|
||||
}
|
||||
} else {
|
||||
addr = udpConn.RemoteAddr().String()
|
||||
}
|
||||
|
||||
addr := udpConn.RemoteAddr().String()
|
||||
|
||||
ip, port, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -379,11 +383,11 @@ func (doq *dnsOverQUIC) openConnection() (conn quic.Connection, err error) {
|
||||
// closeConnWithError closes the active connection with error to make sure that
|
||||
// new queries were processed in another connection. We can do that in the case
|
||||
// of a fatal error.
|
||||
func (p *dnsOverQUIC) closeConnWithError(err error) {
|
||||
p.connMu.Lock()
|
||||
defer p.connMu.Unlock()
|
||||
func (doq *dnsOverQUIC) closeConnWithError(err error) {
|
||||
doq.connMu.Lock()
|
||||
defer doq.connMu.Unlock()
|
||||
|
||||
if p.conn == nil {
|
||||
if doq.conn == nil {
|
||||
// Do nothing, there's no active conn anyways.
|
||||
return
|
||||
}
|
||||
@ -395,19 +399,19 @@ func (p *dnsOverQUIC) closeConnWithError(err error) {
|
||||
|
||||
if errors.Is(err, quic.Err0RTTRejected) {
|
||||
// Reset the TokenStore only if 0-RTT was rejected.
|
||||
p.resetQUICConfig()
|
||||
doq.resetQUICConfig()
|
||||
}
|
||||
|
||||
err = p.conn.CloseWithError(code, "")
|
||||
err = doq.conn.CloseWithError(code, "")
|
||||
if err != nil {
|
||||
log.Errorln("failed to close the conn: %v", err)
|
||||
}
|
||||
p.conn = nil
|
||||
doq.conn = nil
|
||||
}
|
||||
|
||||
// readMsg reads the incoming DNS message from the QUIC stream.
|
||||
func (p *dnsOverQUIC) readMsg(stream quic.Stream) (m *D.Msg, err error) {
|
||||
pool := p.getBytesPool()
|
||||
func (doq *dnsOverQUIC) readMsg(stream quic.Stream) (m *D.Msg, err error) {
|
||||
pool := doq.getBytesPool()
|
||||
bufPtr := pool.Get().(*[]byte)
|
||||
|
||||
defer pool.Put(bufPtr)
|
||||
@ -415,7 +419,7 @@ func (p *dnsOverQUIC) readMsg(stream quic.Stream) (m *D.Msg, err error) {
|
||||
respBuf := *bufPtr
|
||||
n, err := stream.Read(respBuf)
|
||||
if err != nil && n == 0 {
|
||||
return nil, fmt.Errorf("reading response from %s: %w", p.Address(), err)
|
||||
return nil, fmt.Errorf("reading response from %s: %w", doq.Address(), err)
|
||||
}
|
||||
|
||||
// All DNS messages (queries and responses) sent over DoQ connections MUST
|
||||
@ -426,7 +430,7 @@ func (p *dnsOverQUIC) readMsg(stream quic.Stream) (m *D.Msg, err error) {
|
||||
m = new(D.Msg)
|
||||
err = m.Unpack(respBuf[2:])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unpacking response from %s: %w", p.Address(), err)
|
||||
return nil, fmt.Errorf("unpacking response from %s: %w", doq.Address(), err)
|
||||
}
|
||||
|
||||
return m, nil
|
||||
@ -512,7 +516,7 @@ func getDialHandler(r *Resolver, proxyAdapter string) dialHandler {
|
||||
if len(proxyAdapter) == 0 {
|
||||
return dialer.DialContext(ctx, network, net.JoinHostPort(ip.String(), port), dialer.WithDirect())
|
||||
} else {
|
||||
return dialContextExtra(ctx, proxyAdapter, network, ip.Unmap(), port, dialer.WithDirect())
|
||||
return dialContextExtra(ctx, proxyAdapter, network, ip.Unmap(), port)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user