From f109b8b095b3b642c04140da3a9174a4df481761 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Wed, 8 Apr 2026 15:11:58 +0800 Subject: [PATCH] feat: support range format for xhttp `sc-max-each-post-bytes` --- adapter/outbound/vless.go | 4 +- listener/config/vless.go | 2 +- listener/inbound/vless.go | 2 +- listener/sing_vless/server.go | 5 +- transport/xhttp/client.go | 39 +++++---- transport/xhttp/config.go | 139 +++++++++++++-------------------- transport/xhttp/reuse.go | 41 +++++----- transport/xhttp/server.go | 69 +++++++--------- transport/xhttp/server_test.go | 5 +- 9 files changed, 143 insertions(+), 163 deletions(-) diff --git a/adapter/outbound/vless.go b/adapter/outbound/vless.go index 7ff6e6c1..2c71ebb1 100644 --- a/adapter/outbound/vless.go +++ b/adapter/outbound/vless.go @@ -82,7 +82,7 @@ type XHTTPOptions struct { Headers map[string]string `proxy:"headers,omitempty"` NoGRPCHeader bool `proxy:"no-grpc-header,omitempty"` XPaddingBytes string `proxy:"x-padding-bytes,omitempty"` - ScMaxEachPostBytes int `proxy:"sc-max-each-post-bytes,omitempty"` + ScMaxEachPostBytes string `proxy:"sc-max-each-post-bytes,omitempty"` ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX DownloadSettings *XHTTPDownloadSettings `proxy:"download-settings,omitempty"` } @@ -102,7 +102,7 @@ type XHTTPDownloadSettings struct { Headers *map[string]string `proxy:"headers,omitempty"` NoGRPCHeader *bool `proxy:"no-grpc-header,omitempty"` XPaddingBytes *string `proxy:"x-padding-bytes,omitempty"` - ScMaxEachPostBytes *int `proxy:"sc-max-each-post-bytes,omitempty"` + ScMaxEachPostBytes *string `proxy:"sc-max-each-post-bytes,omitempty"` ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX // proxy part Server *string `proxy:"server,omitempty"` diff --git a/listener/config/vless.go b/listener/config/vless.go index 884bc53a..9246cb99 100644 --- a/listener/config/vless.go +++ b/listener/config/vless.go @@ -36,7 +36,7 @@ type XHTTPConfig struct { Mode string NoSSEHeader bool ScStreamUpServerSecs string - ScMaxEachPostBytes int + ScMaxEachPostBytes string } func (t VlessServer) String() string { diff --git a/listener/inbound/vless.go b/listener/inbound/vless.go index 43beb119..653dc8c5 100644 --- a/listener/inbound/vless.go +++ b/listener/inbound/vless.go @@ -37,7 +37,7 @@ type XHTTPConfig struct { Mode string `inbound:"mode,omitempty"` NoSSEHeader bool `inbound:"no-sse-header,omitempty"` ScStreamUpServerSecs string `inbound:"sc-stream-up-server-secs,omitempty"` - ScMaxEachPostBytes int `inbound:"sc-max-each-post-bytes,omitempty"` + ScMaxEachPostBytes string `inbound:"sc-max-each-post-bytes,omitempty"` } func (o XHTTPConfig) Build() LC.XHTTPConfig { diff --git a/listener/sing_vless/server.go b/listener/sing_vless/server.go index 00930579..5c609632 100644 --- a/listener/sing_vless/server.go +++ b/listener/sing_vless/server.go @@ -154,7 +154,7 @@ func New(config LC.VlessServer, tunnel C.Tunnel, additions ...inbound.Addition) } } if config.XHTTPConfig.Path != "" || config.XHTTPConfig.Host != "" || config.XHTTPConfig.Mode != "" { - httpServer.Handler = xhttp.NewServerHandler(xhttp.ServerOption{ + httpServer.Handler, err = xhttp.NewServerHandler(xhttp.ServerOption{ Config: xhttp.Config{ Host: config.XHTTPConfig.Host, Path: config.XHTTPConfig.Path, @@ -168,6 +168,9 @@ func New(config LC.VlessServer, tunnel C.Tunnel, additions ...inbound.Addition) }, HttpHandler: httpServer.Handler, }) + if err != nil { + return nil, err + } if !slices.Contains(tlsConfig.NextProtos, "http/1.1") { tlsConfig.NextProtos = append([]string{"http/1.1"}, tlsConfig.NextProtos...) } diff --git a/transport/xhttp/client.go b/transport/xhttp/client.go index 8bf36da3..f1eae90f 100644 --- a/transport/xhttp/client.go +++ b/transport/xhttp/client.go @@ -24,19 +24,20 @@ type WrapTLSFunc func(ctx context.Context, conn net.Conn, isH2 bool) (net.Conn, type TransportMaker func() http.RoundTripper type PacketUpWriter struct { - ctx context.Context - cancel context.CancelFunc - cfg *Config - sessionID string - transport http.RoundTripper - writeMu sync.Mutex - seq uint64 + ctx context.Context + cancel context.CancelFunc + cfg *Config + scMaxEachPostBytes Range + sessionID string + transport http.RoundTripper + writeMu sync.Mutex + seq uint64 } func (c *PacketUpWriter) Write(b []byte) (int, error) { c.writeMu.Lock() defer c.writeMu.Unlock() - scMaxEachPostBytes := c.cfg.GetNormalizedScMaxEachPostBytes() + scMaxEachPostBytes := c.scMaxEachPostBytes.Rand() if len(b) < scMaxEachPostBytes { return c.write(b) } @@ -117,6 +118,7 @@ type Client struct { cancel context.CancelFunc mode string cfg *Config + scMaxEachPostBytes Range makeTransport TransportMaker makeDownloadTransport TransportMaker uploadManager *ReuseManager @@ -130,11 +132,19 @@ func NewClient(cfg *Config, makeTransport TransportMaker, makeDownloadTransport default: return nil, fmt.Errorf("xhttp mode %s is not implemented yet", mode) } + scMaxEachPostBytes, err := cfg.GetNormalizedScMaxEachPostBytes() + if err != nil { + return nil, err + } + if scMaxEachPostBytes.Max == 0 { // default to 1MB + scMaxEachPostBytes.Max = 1000000 + } ctx, cancel := context.WithCancel(context.Background()) client := &Client{ mode: mode, cfg: cfg, + scMaxEachPostBytes: scMaxEachPostBytes, makeTransport: makeTransport, makeDownloadTransport: makeDownloadTransport, ctx: ctx, @@ -403,12 +413,13 @@ func (c *Client) DialPacketUp() (net.Conn, error) { writerCtx, writerCancel := context.WithCancel(c.ctx) writer := &PacketUpWriter{ - ctx: writerCtx, - cancel: writerCancel, - cfg: c.cfg, - sessionID: sessionID, - transport: uploadTransport, - seq: 0, + ctx: writerCtx, + cancel: writerCancel, + cfg: c.cfg, + scMaxEachPostBytes: c.scMaxEachPostBytes, + sessionID: sessionID, + transport: uploadTransport, + seq: 0, } conn := &Conn{writer: writer} diff --git a/transport/xhttp/config.go b/transport/xhttp/config.go index d528c420..0c80b663 100644 --- a/transport/xhttp/config.go +++ b/transport/xhttp/config.go @@ -20,7 +20,7 @@ type Config struct { XPaddingBytes string NoSSEHeader bool // server only ScStreamUpServerSecs string // server only - ScMaxEachPostBytes int + ScMaxEachPostBytes string ReuseConfig *ReuseConfig DownloadConfig *Config } @@ -94,142 +94,111 @@ func (c *Config) RequestHeader() http.Header { } func (c *Config) RandomPadding() (string, error) { - paddingRange := c.XPaddingBytes - if paddingRange == "" { - paddingRange = "100-1000" - } - - minVal, maxVal, err := parseRange(paddingRange) + r, err := ParseRange(c.XPaddingBytes, "100-1000") if err != nil { - return "", err + return "", fmt.Errorf("invalid x-padding-bytes: %w", err) } - if minVal < 0 || maxVal < minVal { - return "", fmt.Errorf("invalid x-padding-bytes range: %s", paddingRange) - } - if maxVal == 0 { - return "", nil - } - - n := minVal - if maxVal > minVal { - n = minVal + rand.Intn(maxVal-minVal+1) - } - - return strings.Repeat("X", n), nil + return strings.Repeat("X", r.Rand()), nil } -func (c *Config) GetNormalizedScStreamUpServerSecs() (int, error) { - scStreamUpServerSecs := c.ScStreamUpServerSecs - if scStreamUpServerSecs == "" { - scStreamUpServerSecs = "20-80" - } - - minVal, maxVal, err := parseRange(scStreamUpServerSecs) +func (c *Config) GetNormalizedScStreamUpServerSecs() (Range, error) { + r, err := ParseRange(c.ScStreamUpServerSecs, "20-80") if err != nil { - return 0, err + return Range{}, fmt.Errorf("invalid sc-stream-up-server-secs: %w", err) } - if minVal < 0 || maxVal < minVal { - return 0, fmt.Errorf("invalid sc-stream-up-server-secs range: %s", scStreamUpServerSecs) - } - if maxVal == 0 { - return 0, nil - } - - n := minVal - if maxVal > minVal { - n = minVal + rand.Intn(maxVal-minVal+1) - } - - return n, nil + return r, nil } -func (c *Config) GetNormalizedScMaxEachPostBytes() int { - if c.ScMaxEachPostBytes == 0 { - return 1000000 +func (c *Config) GetNormalizedScMaxEachPostBytes() (Range, error) { + r, err := ParseRange(c.ScStreamUpServerSecs, "1000000") + if err != nil { + return Range{}, fmt.Errorf("invalid sc-max-each-post-bytes: %w", err) } - return c.ScMaxEachPostBytes + return r, nil } -func parseRange(s string) (int, int, error) { +type Range struct { + Min int + Max int +} + +func (r Range) Rand() int { + if r.Min == r.Max { + return r.Min + } + return r.Min + rand.Intn(r.Max-r.Min+1) +} + +func ParseRange(s string, fallback string) (Range, error) { + if strings.TrimSpace(s) == "" { + return parseRange(fallback) + } + return parseRange(s) +} + +func parseRange(s string) (Range, error) { parts := strings.Split(strings.TrimSpace(s), "-") if len(parts) == 1 { v, err := strconv.Atoi(parts[0]) if err != nil { - return 0, 0, err + return Range{}, err } - return v, v, nil + return Range{v, v}, nil } if len(parts) != 2 { - return 0, 0, fmt.Errorf("invalid range: %s", s) + return Range{}, fmt.Errorf("invalid range: %s", s) } minVal, err := strconv.Atoi(strings.TrimSpace(parts[0])) if err != nil { - return 0, 0, err + return Range{}, err } maxVal, err := strconv.Atoi(strings.TrimSpace(parts[1])) if err != nil { - return 0, 0, err - } - return minVal, maxVal, nil -} - -func resolveRangeValue(s string, fallback int) (int, error) { - if strings.TrimSpace(s) == "" { - return fallback, nil - } - - minVal, maxVal, err := parseRange(s) - if err != nil { - return 0, err + return Range{}, err } if minVal < 0 || maxVal < minVal { - return 0, fmt.Errorf("invalid range: %s", s) + return Range{}, fmt.Errorf("invalid range: %s", s) } - - if minVal == maxVal { - return minVal, nil - } - - return minVal + rand.Intn(maxVal-minVal+1), nil + return Range{minVal, maxVal}, nil } -func (c *ReuseConfig) ResolveManagerConfig() (int, int, error) { +func (c *ReuseConfig) ResolveManagerConfig() (Range, Range, error) { if c == nil { - return 0, 0, nil + return Range{}, Range{}, nil } - maxConnections, err := resolveRangeValue(c.MaxConnections, 0) + maxConnections, err := ParseRange(c.MaxConnections, "0") if err != nil { - return 0, 0, fmt.Errorf("invalid max-connections: %w", err) + return Range{}, Range{}, fmt.Errorf("invalid max-connections: %w", err) } - maxConcurrency, err := resolveRangeValue(c.MaxConcurrency, 0) + maxConcurrency, err := ParseRange(c.MaxConcurrency, "0") if err != nil { - return 0, 0, fmt.Errorf("invalid max-concurrency: %w", err) + return Range{}, Range{}, fmt.Errorf("invalid max-concurrency: %w", err) } return maxConnections, maxConcurrency, nil } -func (c *ReuseConfig) ResolveEntryConfig() (int, int, int, error) { +func (c *ReuseConfig) ResolveEntryConfig() (Range, Range, Range, error) { if c == nil { - return 0, 0, 0, nil + return Range{}, Range{}, Range{}, nil } - hMaxRequestTimes, err := resolveRangeValue(c.HMaxRequestTimes, 0) + hMaxRequestTimes, err := ParseRange(c.HMaxRequestTimes, "0") if err != nil { - return 0, 0, 0, fmt.Errorf("invalid h-max-request-times: %w", err) + return Range{}, Range{}, Range{}, fmt.Errorf("invalid h-max-request-times: %w", err) } - hMaxReusableSecs, err := resolveRangeValue(c.HMaxReusableSecs, 0) + hMaxReusableSecs, err := ParseRange(c.HMaxReusableSecs, "0") if err != nil { - return 0, 0, 0, fmt.Errorf("invalid h-max-reusable-secs: %w", err) + return Range{}, Range{}, Range{}, fmt.Errorf("invalid h-max-reusable-secs: %w", err) } - cMaxReuseTimes, err := resolveRangeValue(c.CMaxReuseTimes, 0) + cMaxReuseTimes, err := ParseRange(c.CMaxReuseTimes, "0") if err != nil { - return 0, 0, 0, fmt.Errorf("invalid c-max-reuse-times: %w", err) + return Range{}, Range{}, Range{}, fmt.Errorf("invalid c-max-reuse-times: %w", err) } return hMaxRequestTimes, hMaxReusableSecs, cMaxReuseTimes, nil diff --git a/transport/xhttp/reuse.go b/transport/xhttp/reuse.go index 132b18a1..006056cd 100644 --- a/transport/xhttp/reuse.go +++ b/transport/xhttp/reuse.go @@ -55,12 +55,14 @@ func (rt *ReuseTransport) Close() error { var _ http.RoundTripper = (*ReuseTransport)(nil) type ReuseManager struct { - cfg *ReuseConfig - maxConnections int - maxConcurrency int - maker TransportMaker - mu sync.Mutex - entries []*reuseEntry + maxConnections int + maxConcurrency int + hMaxRequestTimes Range + hMaxReusableSecs Range + cMaxReuseTimes Range + maker TransportMaker + mu sync.Mutex + entries []*reuseEntry } func NewReuseManager(cfg *ReuseConfig, makeTransport TransportMaker) (*ReuseManager, error) { @@ -71,16 +73,18 @@ func NewReuseManager(cfg *ReuseConfig, makeTransport TransportMaker) (*ReuseMana if err != nil { return nil, err } - _, _, _, err = cfg.ResolveEntryConfig() // check if config is valid + hMaxRequestTimes, hMaxReusableSecs, cMaxReuseTimes, err := cfg.ResolveEntryConfig() if err != nil { return nil, err } return &ReuseManager{ - cfg: cfg, - maxConnections: connections, - maxConcurrency: concurrency, - maker: makeTransport, - entries: make([]*reuseEntry, 0), + maxConnections: connections.Rand(), + maxConcurrency: concurrency.Rand(), + hMaxRequestTimes: hMaxRequestTimes, + hMaxReusableSecs: hMaxReusableSecs, + cMaxReuseTimes: cMaxReuseTimes, + maker: makeTransport, + entries: make([]*reuseEntry, 0), }, nil } @@ -169,17 +173,16 @@ func (m *ReuseManager) canCreateLocked() bool { func (m *ReuseManager) newEntryLocked(transport http.RoundTripper, now time.Time) *reuseEntry { entry := &reuseEntry{transport: transport} - hMaxRequestTimes, hMaxReusableSecs, cMaxReuseTimes, _ := m.cfg.ResolveEntryConfig() // error already checked in [NewReuseManager] - if hMaxRequestTimes > 0 { - entry.leftRequests.Store(int32(hMaxRequestTimes)) + if m.hMaxRequestTimes.Max > 0 { + entry.leftRequests.Store(int32(m.hMaxRequestTimes.Rand())) } else { entry.leftRequests.Store(1<<30 - 1) } - if hMaxReusableSecs > 0 { - entry.unreusableAt = now.Add(time.Duration(hMaxReusableSecs) * time.Second) + if m.hMaxReusableSecs.Max > 0 { + entry.unreusableAt = now.Add(time.Duration(m.hMaxReusableSecs.Rand()) * time.Second) } - if cMaxReuseTimes > 0 { - entry.maxReuseTimes = int32(cMaxReuseTimes) + if m.cMaxReuseTimes.Max > 0 { + entry.maxReuseTimes = int32(m.cMaxReuseTimes.Rand()) } m.entries = append(m.entries, entry) diff --git a/transport/xhttp/server.go b/transport/xhttp/server.go index ab3bd786..301e8687 100644 --- a/transport/xhttp/server.go +++ b/transport/xhttp/server.go @@ -98,21 +98,37 @@ type requestHandler struct { connHandler func(net.Conn) httpHandler http.Handler + scMaxEachPostBytes Range + scStreamUpServerSecs Range + mu sync.Mutex sessions map[string]*httpSession } -func NewServerHandler(opt ServerOption) http.Handler { +func NewServerHandler(opt ServerOption) (http.Handler, error) { + scMaxEachPostBytes, err := opt.Config.GetNormalizedScMaxEachPostBytes() + if err != nil { + return nil, err + } + if scMaxEachPostBytes.Max == 0 { // default to 1MB + scMaxEachPostBytes.Max = 1000000 + } + scStreamUpServerSecs, err := opt.Config.GetNormalizedScStreamUpServerSecs() + if err != nil { + return nil, err + } // using h2c.NewHandler to ensure we can work in plain http2 // and some tls conn is not *tls.Conn (like *reality.Conn) return h2c.NewHandler(&requestHandler{ - config: opt.Config, - connHandler: opt.ConnHandler, - httpHandler: opt.HttpHandler, - sessions: map[string]*httpSession{}, + config: opt.Config, + connHandler: opt.ConnHandler, + httpHandler: opt.HttpHandler, + scMaxEachPostBytes: scMaxEachPostBytes, + scStreamUpServerSecs: scStreamUpServerSecs, + sessions: map[string]*httpSession{}, }, &http.Http2Server{ IdleTimeout: 30 * time.Second, - }) + }), nil } func (h *requestHandler) getOrCreateSession(sessionID string) *httpSession { @@ -209,12 +225,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { parts := splitNonEmpty(rest) // stream-one: POST /path - if r.Method == http.MethodPost && len(parts) == 0 { - if !h.allowStreamOne() { - http.NotFound(w, r) - return - } - + if r.Method == http.MethodPost && len(parts) == 0 && h.allowStreamOne() { w.Header().Set("X-Accel-Buffering", "no") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(http.StatusOK) @@ -241,12 +252,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // stream-up/packet-up download: GET /path/{session} - if r.Method == http.MethodGet && len(parts) == 1 { - if !h.allowSessionDownload() { - http.NotFound(w, r) - return - } - + if r.Method == http.MethodGet && len(parts) == 1 && h.allowSessionDownload() { sessionID := parts[0] session := h.getOrCreateSession(sessionID) session.markConnected() @@ -288,12 +294,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // stream-up upload: POST /path/{session} - if r.Method == http.MethodPost && len(parts) == 1 { - if !h.allowStreamUpUpload() { - http.NotFound(w, r) - return - } - + if r.Method == http.MethodPost && len(parts) == 1 && h.allowStreamUpUpload() { sessionID := parts[0] session := h.getSession(sessionID) if session == nil { @@ -322,13 +323,9 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) referrer := r.Header.Get("Referer") - if referrer != "" { + if referrer != "" && h.scStreamUpServerSecs.Max > 0 { go func() { for { - scStreamUpServerSecs, _ := h.config.GetNormalizedScStreamUpServerSecs() - if scStreamUpServerSecs == 0 { - break - } paddingValue, _ := h.config.RandomPadding() if paddingValue == "" { break @@ -337,7 +334,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if err != nil { break } - time.Sleep(time.Duration(scStreamUpServerSecs) * time.Second) + time.Sleep(time.Duration(h.scStreamUpServerSecs.Rand()) * time.Second) } }() } @@ -352,12 +349,7 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // packet-up upload: POST /path/{session}/{seq} - if r.Method == http.MethodPost && len(parts) == 2 { - if !h.allowPacketUpUpload() { - http.NotFound(w, r) - return - } - + if r.Method == http.MethodPost && len(parts) == 2 && h.allowPacketUpUpload() { sessionID := parts[0] seq, err := strconv.ParseUint(parts[1], 10, 64) if err != nil { @@ -371,13 +363,12 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - scMaxEachPostBytes := int64(h.config.GetNormalizedScMaxEachPostBytes()) - if r.ContentLength > scMaxEachPostBytes { + if r.ContentLength > int64(h.scMaxEachPostBytes.Max) { http.Error(w, "body too large", http.StatusRequestEntityTooLarge) return } - body, err := io.ReadAll(io.LimitReader(r.Body, scMaxEachPostBytes+1)) + body, err := io.ReadAll(io.LimitReader(r.Body, int64(h.scMaxEachPostBytes.Max)+1)) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/transport/xhttp/server_test.go b/transport/xhttp/server_test.go index bb9565e9..f7d951d8 100644 --- a/transport/xhttp/server_test.go +++ b/transport/xhttp/server_test.go @@ -78,7 +78,7 @@ func TestServerHandlerModeRestrictions(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - handler := NewServerHandler(ServerOption{ + handler, err := NewServerHandler(ServerOption{ Config: Config{ Path: "/xhttp", Mode: testCase.mode, @@ -87,6 +87,9 @@ func TestServerHandlerModeRestrictions(t *testing.T) { _ = conn.Close() }, }) + if err != nil { + panic(err) + } req := httptest.NewRequest(testCase.method, testCase.target, io.NopCloser(http.NoBody)) recorder := httptest.NewRecorder()