feat: support upload packet merging in packet-up mode and sc-min-posts-interval-ms settings for xhttp transport

This commit is contained in:
wwqgtxx
2026-04-14 18:19:47 +08:00
parent cac48917f1
commit ad91303f99
4 changed files with 131 additions and 60 deletions
+36 -32
View File
@@ -79,15 +79,16 @@ type VlessOption struct {
}
type XHTTPOptions struct {
Path string `proxy:"path,omitempty"`
Host string `proxy:"host,omitempty"`
Mode string `proxy:"mode,omitempty"`
Headers map[string]string `proxy:"headers,omitempty"`
NoGRPCHeader bool `proxy:"no-grpc-header,omitempty"`
XPaddingBytes string `proxy:"x-padding-bytes,omitempty"`
ScMaxEachPostBytes string `proxy:"sc-max-each-post-bytes,omitempty"`
ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX
DownloadSettings *XHTTPDownloadSettings `proxy:"download-settings,omitempty"`
Path string `proxy:"path,omitempty"`
Host string `proxy:"host,omitempty"`
Mode string `proxy:"mode,omitempty"`
Headers map[string]string `proxy:"headers,omitempty"`
NoGRPCHeader bool `proxy:"no-grpc-header,omitempty"`
XPaddingBytes string `proxy:"x-padding-bytes,omitempty"`
ScMaxEachPostBytes string `proxy:"sc-max-each-post-bytes,omitempty"`
ScMinPostsIntervalMs string `proxy:"sc-min-posts-interval-ms,omitempty"`
ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX
DownloadSettings *XHTTPDownloadSettings `proxy:"download-settings,omitempty"`
}
type XHTTPReuseSettings struct {
@@ -101,13 +102,14 @@ type XHTTPReuseSettings struct {
type XHTTPDownloadSettings struct {
// xhttp part
Path *string `proxy:"path,omitempty"`
Host *string `proxy:"host,omitempty"`
Headers *map[string]string `proxy:"headers,omitempty"`
NoGRPCHeader *bool `proxy:"no-grpc-header,omitempty"`
XPaddingBytes *string `proxy:"x-padding-bytes,omitempty"`
ScMaxEachPostBytes *string `proxy:"sc-max-each-post-bytes,omitempty"`
ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX
Path *string `proxy:"path,omitempty"`
Host *string `proxy:"host,omitempty"`
Headers *map[string]string `proxy:"headers,omitempty"`
NoGRPCHeader *bool `proxy:"no-grpc-header,omitempty"`
XPaddingBytes *string `proxy:"x-padding-bytes,omitempty"`
ScMaxEachPostBytes *string `proxy:"sc-max-each-post-bytes,omitempty"`
ScMinPostsIntervalMs *string `proxy:"sc-min-posts-interval-ms,omitempty"`
ReuseSettings *XHTTPReuseSettings `proxy:"reuse-settings,omitempty"` // aka XMUX
// proxy part
Server *string `proxy:"server,omitempty"`
Port *int `proxy:"port,omitempty"`
@@ -544,14 +546,15 @@ func NewVless(option VlessOption) (*Vless, error) {
}
cfg := &xhttp.Config{
Host: requestHost,
Path: v.option.XHTTPOpts.Path,
Mode: v.option.XHTTPOpts.Mode,
Headers: v.option.XHTTPOpts.Headers,
NoGRPCHeader: v.option.XHTTPOpts.NoGRPCHeader,
XPaddingBytes: v.option.XHTTPOpts.XPaddingBytes,
ScMaxEachPostBytes: v.option.XHTTPOpts.ScMaxEachPostBytes,
ReuseConfig: reuseCfg,
Host: requestHost,
Path: v.option.XHTTPOpts.Path,
Mode: v.option.XHTTPOpts.Mode,
Headers: v.option.XHTTPOpts.Headers,
NoGRPCHeader: v.option.XHTTPOpts.NoGRPCHeader,
XPaddingBytes: v.option.XHTTPOpts.XPaddingBytes,
ScMaxEachPostBytes: v.option.XHTTPOpts.ScMaxEachPostBytes,
ScMinPostsIntervalMs: v.option.XHTTPOpts.ScMinPostsIntervalMs,
ReuseConfig: reuseCfg,
}
makeTransport := func() http.RoundTripper {
@@ -658,14 +661,15 @@ func NewVless(option VlessOption) (*Vless, error) {
}
cfg.DownloadConfig = &xhttp.Config{
Host: downloadHost,
Path: lo.FromPtrOr(ds.Path, v.option.XHTTPOpts.Path),
Mode: v.option.XHTTPOpts.Mode,
Headers: lo.FromPtrOr(ds.Headers, v.option.XHTTPOpts.Headers),
NoGRPCHeader: lo.FromPtrOr(ds.NoGRPCHeader, v.option.XHTTPOpts.NoGRPCHeader),
XPaddingBytes: lo.FromPtrOr(ds.XPaddingBytes, v.option.XHTTPOpts.XPaddingBytes),
ScMaxEachPostBytes: lo.FromPtrOr(ds.ScMaxEachPostBytes, v.option.XHTTPOpts.ScMaxEachPostBytes),
ReuseConfig: downloadReuseCfg,
Host: downloadHost,
Path: lo.FromPtrOr(ds.Path, v.option.XHTTPOpts.Path),
Mode: v.option.XHTTPOpts.Mode,
Headers: lo.FromPtrOr(ds.Headers, v.option.XHTTPOpts.Headers),
NoGRPCHeader: lo.FromPtrOr(ds.NoGRPCHeader, v.option.XHTTPOpts.NoGRPCHeader),
XPaddingBytes: lo.FromPtrOr(ds.XPaddingBytes, v.option.XHTTPOpts.XPaddingBytes),
ScMaxEachPostBytes: lo.FromPtrOr(ds.ScMaxEachPostBytes, v.option.XHTTPOpts.ScMaxEachPostBytes),
ScMinPostsIntervalMs: lo.FromPtrOr(ds.ScMinPostsIntervalMs, v.option.XHTTPOpts.ScMinPostsIntervalMs),
ReuseConfig: downloadReuseCfg,
}
makeDownloadTransport = func() http.RoundTripper {
+2
View File
@@ -826,6 +826,7 @@ proxies: # socks5
# no-grpc-header: false
# x-padding-bytes: "100-1000"
# sc-max-each-post-bytes: 1000000
# sc-min-posts-interval-ms: 30
# reuse-settings: # aka XMUX
# max-concurrency: "16-32"
# max-connections: "0"
@@ -842,6 +843,7 @@ proxies: # socks5
# no-grpc-header: false
# x-padding-bytes: "100-1000"
# sc-max-each-post-bytes: 1000000
# sc-min-posts-interval-ms: 30
# reuse-settings: # aka XMUX
# max-concurrency: "16-32"
# max-connections: "0"
+81 -28
View File
@@ -1,6 +1,7 @@
package xhttp
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
@@ -39,36 +40,71 @@ type DialQUICFunc func(ctx context.Context, cfg *quic.Config) (*quic.Conn, error
type TransportMaker func() http.RoundTripper
type PacketUpWriter struct {
ctx context.Context
cancel context.CancelFunc
cfg *Config
scMaxEachPostBytes Range
sessionID string
transport http.RoundTripper
writeMu sync.Mutex
seq uint64
ctx context.Context
cancel context.CancelFunc
cfg *Config
scMaxEachPostBytes int
scMinPostsIntervalMs Range
sessionID string
transport http.RoundTripper
writeMu sync.Mutex
writeCond sync.Cond
seq uint64
buf []byte
timer *time.Timer
flushErr error
}
func (c *PacketUpWriter) Write(b []byte) (int, error) {
c.writeMu.Lock()
defer c.writeMu.Unlock()
scMaxEachPostBytes := c.scMaxEachPostBytes.Rand()
if len(b) < scMaxEachPostBytes {
return c.write(b)
if err := c.flushErr; err != nil {
return 0, err
}
var n int
for start := 0; start < len(b); start += scMaxEachPostBytes {
end := start + scMaxEachPostBytes
if end > len(b) {
end = len(b)
data := bytes.NewBuffer(b)
for data.Len() > 0 {
if c.timer == nil { // start a timer to flush the buffer
c.timer = time.AfterFunc(time.Duration(c.scMinPostsIntervalMs.Rand())*time.Millisecond, c.flush)
}
_n, err := c.write(b[start:end])
n += _n
if err != nil {
return n, err
c.buf = append(c.buf, data.Next(c.scMaxEachPostBytes-len(c.buf))...) // let buffer fill up to scMaxEachPostBytes
if len(c.buf) >= c.scMaxEachPostBytes { // too much data in buffer, wait the flush complete
c.writeCond.Wait()
if err := c.flushErr; err != nil {
return 0, err
}
}
}
return n, nil
return len(b), nil
}
func (c *PacketUpWriter) flush() {
c.writeMu.Lock()
defer c.writeMu.Unlock()
defer c.writeCond.Broadcast() // wake up the waited Write() call
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
if c.flushErr != nil {
return
}
if len(c.buf) == 0 {
return
}
_, err := c.write(c.buf)
c.buf = c.buf[:0] // reset buffer
if err != nil {
c.flushErr = err
return
}
}
func (c *PacketUpWriter) write(b []byte) (int, error) {
@@ -106,6 +142,15 @@ func (c *PacketUpWriter) write(b []byte) (int, error) {
}
func (c *PacketUpWriter) Close() error {
ch := make(chan struct{})
go func() { // flush in the background
defer close(ch)
c.flush()
}()
select {
case <-ch:
case <-time.After(time.Second):
}
c.cancel()
httputils.CloseTransport(c.transport)
return nil
@@ -185,6 +230,7 @@ type Client struct {
mode string
cfg *Config
scMaxEachPostBytes Range
scMinPostsIntervalMs Range
makeTransport TransportMaker
makeDownloadTransport TransportMaker
uploadManager *ReuseManager
@@ -202,12 +248,17 @@ func NewClient(cfg *Config, makeTransport TransportMaker, makeDownloadTransport
if err != nil {
return nil, err
}
scMinPostsIntervalMs, err := cfg.GetNormalizedScMinPostsIntervalMs()
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
client := &Client{
mode: mode,
cfg: cfg,
scMaxEachPostBytes: scMaxEachPostBytes,
scMinPostsIntervalMs: scMinPostsIntervalMs,
makeTransport: makeTransport,
makeDownloadTransport: makeDownloadTransport,
ctx: ctx,
@@ -455,14 +506,16 @@ func (c *Client) DialPacketUp() (net.Conn, error) {
writerCtx, writerCancel := context.WithCancel(c.ctx)
writer := &PacketUpWriter{
ctx: writerCtx,
cancel: writerCancel,
cfg: c.cfg,
scMaxEachPostBytes: c.scMaxEachPostBytes,
sessionID: sessionID,
transport: uploadTransport,
seq: 0,
ctx: writerCtx,
cancel: writerCancel,
cfg: c.cfg,
scMaxEachPostBytes: c.scMaxEachPostBytes.Rand(),
scMinPostsIntervalMs: c.scMinPostsIntervalMs,
sessionID: sessionID,
transport: uploadTransport,
seq: 0,
}
writer.writeCond = sync.Cond{L: &writer.writeMu}
conn := &Conn{writer: writer}
downloadReq, err := http.NewRequestWithContext(
+12
View File
@@ -21,6 +21,7 @@ type Config struct {
NoSSEHeader bool // server only
ScStreamUpServerSecs string // server only
ScMaxEachPostBytes string
ScMinPostsIntervalMs string
ReuseConfig *ReuseConfig
DownloadConfig *Config
}
@@ -120,6 +121,17 @@ func (c *Config) GetNormalizedScMaxEachPostBytes() (Range, error) {
return r, nil
}
func (c *Config) GetNormalizedScMinPostsIntervalMs() (Range, error) {
r, err := ParseRange(c.ScMinPostsIntervalMs, "30")
if err != nil {
return Range{}, fmt.Errorf("invalid sc-min-posts-interval-ms: %w", err)
}
if r.Max == 0 {
return Range{}, fmt.Errorf("invalid sc-min-posts-interval-ms: must be greater than zero")
}
return r, nil
}
type Range struct {
Min int
Max int