chore: async xhttp RoundTrip to let some CDN/reverse proxy happy (#2719)

This commit is contained in:
kyber1024
2026-04-17 22:07:32 +10:00
committed by GitHub
parent f425c81a85
commit 2247c0ebb4
2 changed files with 202 additions and 41 deletions
+185 -31
View File
@@ -12,11 +12,13 @@ import (
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/metacubex/mihomo/common/httputils"
"github.com/metacubex/http"
"github.com/metacubex/http/httptrace"
"github.com/metacubex/quic-go"
"github.com/metacubex/quic-go/http3"
"github.com/metacubex/tls"
@@ -344,7 +346,24 @@ func (c *Client) DialStreamOne() (net.Conn, error) {
conn := &Conn{writer: pw}
req, err := http.NewRequestWithContext(httputils.NewAddrContext(&conn.NetAddr, c.ctx), http.MethodPost, requestURL.String(), pr)
// Use gotConn to detect when TCP connection is established, so we can
// return the conn immediately without waiting for the HTTP response.
// This breaks the deadlock where CDN buffers response headers until the
// server sends body data, but the server waits for our request body,
// which can't be sent because we haven't returned the conn yet.
gotConn := make(chan struct{})
var gotConnOnce sync.Once
var tcpConnected atomic.Bool
addrCtx := httputils.NewAddrContext(&conn.NetAddr, c.ctx)
ctx := httptrace.WithClientTrace(addrCtx, &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
tcpConnected.Store(true)
gotConnOnce.Do(func() { close(gotConn) })
},
})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, requestURL.String(), pr)
if err != nil {
_ = pr.Close()
_ = pw.Close()
@@ -360,21 +379,37 @@ func (c *Client) DialStreamOne() (net.Conn, error) {
return nil, err
}
resp, err := transport.RoundTrip(req)
if err != nil {
wrc := NewWaitReadCloser()
go func() {
resp, err := transport.RoundTrip(req)
if err != nil {
wrc.CloseWithError(err)
gotConnOnce.Do(func() { close(gotConn) })
return
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
_ = resp.Body.Close()
wrc.CloseWithError(fmt.Errorf("xhttp stream-one bad status: %s", resp.Status))
gotConnOnce.Do(func() { close(gotConn) })
return
}
wrc.Set(resp.Body)
}()
<-gotConn
if !tcpConnected.Load() {
// RoundTrip failed before TCP connected (e.g. DNS failure)
_ = pr.Close()
_ = pw.Close()
httputils.CloseTransport(transport)
var buf [0]byte
_, err = wrc.Read(buf[:])
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
_ = resp.Body.Close()
_ = pr.Close()
_ = pw.Close()
httputils.CloseTransport(transport)
return nil, fmt.Errorf("xhttp stream-one bad status: %s", resp.Status)
}
conn.reader = resp.Body
conn.reader = wrc
conn.onClose = func() {
_ = pr.Close()
httputils.CloseTransport(transport)
@@ -411,8 +446,21 @@ func (c *Client) DialStreamUp() (net.Conn, error) {
sessionID := newSessionID()
// Async download: avoid blocking on CDN response header buffering
gotConn := make(chan struct{})
var gotConnOnce sync.Once
var tcpConnected atomic.Bool
addrCtx := httputils.NewAddrContext(&conn.NetAddr, c.ctx)
downloadCtx := httptrace.WithClientTrace(addrCtx, &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
tcpConnected.Store(true)
gotConnOnce.Do(func() { close(gotConn) })
},
})
downloadReq, err := http.NewRequestWithContext(
httputils.NewAddrContext(&conn.NetAddr, c.ctx),
downloadCtx,
http.MethodGet,
downloadURL.String(),
nil,
@@ -449,19 +497,39 @@ func (c *Client) DialStreamUp() (net.Conn, error) {
}
uploadReq.Host = c.cfg.Host
downloadResp, err := downloadTransport.RoundTrip(downloadReq)
if err != nil {
wrc := NewWaitReadCloser()
go func() {
resp, err := downloadTransport.RoundTrip(downloadReq)
if err != nil {
wrc.CloseWithError(err)
gotConnOnce.Do(func() { close(gotConn) })
return
}
if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
wrc.CloseWithError(fmt.Errorf("xhttp stream-up download bad status: %s", resp.Status))
gotConnOnce.Do(func() { close(gotConn) })
return
}
wrc.Set(resp.Body)
}()
<-gotConn
if !tcpConnected.Load() {
_ = pr.Close()
_ = pw.Close()
httputils.CloseTransport(uploadTransport)
httputils.CloseTransport(downloadTransport)
var buf [0]byte
_, err = wrc.Read(buf[:])
return nil, err
}
if downloadResp.StatusCode != http.StatusOK {
_ = downloadResp.Body.Close()
httputils.CloseTransport(uploadTransport)
httputils.CloseTransport(downloadTransport)
return nil, fmt.Errorf("xhttp stream-up download bad status: %s", downloadResp.Status)
}
// Start upload after download TCP is connected, so the server has likely
// already processed the GET and created the session. This preserves the
// original ordering (download before upload) while still being async.
go func() {
resp, err := uploadTransport.RoundTrip(uploadReq)
if err != nil {
@@ -476,7 +544,7 @@ func (c *Client) DialStreamUp() (net.Conn, error) {
}
}()
conn.reader = downloadResp.Body
conn.reader = wrc
conn.onClose = func() {
_ = pr.Close()
httputils.CloseTransport(uploadTransport)
@@ -518,8 +586,21 @@ func (c *Client) DialPacketUp() (net.Conn, error) {
writer.writeCond = sync.Cond{L: &writer.writeMu}
conn := &Conn{writer: writer}
// Async download: avoid blocking on CDN response header buffering
gotConn := make(chan struct{})
var gotConnOnce sync.Once
var tcpConnected atomic.Bool
addrCtx := httputils.NewAddrContext(&conn.NetAddr, c.ctx)
downloadCtx := httptrace.WithClientTrace(addrCtx, &httptrace.ClientTrace{
GotConn: func(info httptrace.GotConnInfo) {
tcpConnected.Store(true)
gotConnOnce.Do(func() { close(gotConn) })
},
})
downloadReq, err := http.NewRequestWithContext(
httputils.NewAddrContext(&conn.NetAddr, c.ctx),
downloadCtx,
http.MethodGet,
downloadURL.String(),
nil,
@@ -536,20 +617,35 @@ func (c *Client) DialPacketUp() (net.Conn, error) {
}
downloadReq.Host = downloadCfg.Host
resp, err := downloadTransport.RoundTrip(downloadReq)
if err != nil {
wrc := NewWaitReadCloser()
go func() {
resp, err := downloadTransport.RoundTrip(downloadReq)
if err != nil {
wrc.CloseWithError(err)
gotConnOnce.Do(func() { close(gotConn) })
return
}
if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
wrc.CloseWithError(fmt.Errorf("xhttp packet-up download bad status: %s", resp.Status))
gotConnOnce.Do(func() { close(gotConn) })
return
}
wrc.Set(resp.Body)
}()
<-gotConn
if !tcpConnected.Load() {
httputils.CloseTransport(uploadTransport)
httputils.CloseTransport(downloadTransport)
var buf [0]byte
_, err = wrc.Read(buf[:])
return nil, err
}
if resp.StatusCode != http.StatusOK {
_ = resp.Body.Close()
httputils.CloseTransport(uploadTransport)
httputils.CloseTransport(downloadTransport)
return nil, fmt.Errorf("xhttp packet-up download bad status: %s", resp.Status)
}
conn.reader = resp.Body
conn.reader = wrc
conn.onClose = func() {
// uploadTransport already closed by writer
httputils.CloseTransport(downloadTransport)
@@ -563,3 +659,61 @@ func newSessionID() string {
_, _ = rand.Read(b[:])
return hex.EncodeToString(b[:])
}
// WaitReadCloser is an io.ReadCloser that blocks on Read() until the underlying
// ReadCloser is provided via Set(). This enables returning a reader immediately
// while the actual HTTP response body is obtained asynchronously in a goroutine,
// breaking the synchronous RoundTrip deadlock with CDN header buffering.
type WaitReadCloser struct {
wait chan struct{}
once sync.Once
rc io.ReadCloser
err error
}
func NewWaitReadCloser() *WaitReadCloser {
return &WaitReadCloser{wait: make(chan struct{})}
}
// Set provides the underlying ReadCloser and unblocks any pending Read calls.
// Must be called at most once. If Close was already called, rc is closed to
// prevent leaks.
func (w *WaitReadCloser) Set(rc io.ReadCloser) {
set := false
w.once.Do(func() {
w.rc = rc
set = true
close(w.wait)
})
if !set {
rc.Close()
}
}
// CloseWithError records an error and unblocks any pending Read calls.
func (w *WaitReadCloser) CloseWithError(err error) {
w.once.Do(func() {
w.err = err
close(w.wait)
})
}
func (w *WaitReadCloser) Read(b []byte) (int, error) {
<-w.wait
if w.rc != nil {
return w.rc.Read(b)
}
if w.err != nil {
return 0, w.err
}
return 0, io.ErrClosedPipe
}
func (w *WaitReadCloser) Close() error {
w.once.Do(func() { close(w.wait) })
<-w.wait
if w.rc != nil {
return w.rc.Close()
}
return nil
}
+17 -10
View File
@@ -145,9 +145,24 @@ func (h *requestHandler) getOrCreateSession(sessionID string) *httpSession {
s = newHTTPSession(h.scMaxBufferedPosts.Max)
h.sessions[sessionID] = s
// Reap orphan sessions that never become fully connected (e.g. from probing).
// Matches Xray-core's 30-second reaper in upsertSession.
go func() {
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
select {
case <-timer.C:
h.deleteSession(sessionID)
case <-s.connected:
}
}()
return s
}
func (h *requestHandler) deleteSession(sessionID string) {
h.mu.Lock()
defer h.mu.Unlock()
@@ -299,11 +314,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// stream-up upload: POST /path/{session}
if r.Method == http.MethodPost && len(parts) == 1 && h.allowStreamUpUpload() {
sessionID := parts[0]
session := h.getSession(sessionID)
if session == nil {
http.Error(w, "unknown xhttp session", http.StatusBadRequest)
return
}
session := h.getOrCreateSession(sessionID)
httpSC := newHTTPServerConn(w, r.Body)
err := session.uploadQueue.Push(Packet{
@@ -360,11 +371,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
session := h.getSession(sessionID)
if session == nil {
http.Error(w, "unknown xhttp session", http.StatusBadRequest)
return
}
session := h.getOrCreateSession(sessionID)
if r.ContentLength > int64(h.scMaxEachPostBytes.Max) {
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)