diff --git a/docs/config.yaml b/docs/config.yaml index 67bcc405..42f8ef4e 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -1687,6 +1687,7 @@ listeners: # host: "" # mode: auto # Available: "stream-one", "stream-up" or "packet-up" # no-sse-header: false + # sc-max-buffered-posts: 30 # sc-stream-up-server-secs: "20-80" # sc-max-each-post-bytes: 1000000 # ------------------------- diff --git a/listener/config/vless.go b/listener/config/vless.go index 9246cb99..5e67e3f7 100644 --- a/listener/config/vless.go +++ b/listener/config/vless.go @@ -36,6 +36,7 @@ type XHTTPConfig struct { Mode string NoSSEHeader bool ScStreamUpServerSecs string + ScMaxBufferedPosts string ScMaxEachPostBytes string } diff --git a/listener/inbound/vless.go b/listener/inbound/vless.go index 653dc8c5..aad9e70e 100644 --- a/listener/inbound/vless.go +++ b/listener/inbound/vless.go @@ -37,6 +37,7 @@ type XHTTPConfig struct { Mode string `inbound:"mode,omitempty"` NoSSEHeader bool `inbound:"no-sse-header,omitempty"` ScStreamUpServerSecs string `inbound:"sc-stream-up-server-secs,omitempty"` + ScMaxBufferedPosts string `inbound:"sc-max-buffered-posts,omitempty"` ScMaxEachPostBytes string `inbound:"sc-max-each-post-bytes,omitempty"` } @@ -47,6 +48,7 @@ func (o XHTTPConfig) Build() LC.XHTTPConfig { Mode: o.Mode, NoSSEHeader: o.NoSSEHeader, ScStreamUpServerSecs: o.ScStreamUpServerSecs, + ScMaxBufferedPosts: o.ScMaxBufferedPosts, ScMaxEachPostBytes: o.ScMaxEachPostBytes, } } diff --git a/listener/sing_vless/server.go b/listener/sing_vless/server.go index 5c609632..d684056b 100644 --- a/listener/sing_vless/server.go +++ b/listener/sing_vless/server.go @@ -161,6 +161,7 @@ func New(config LC.VlessServer, tunnel C.Tunnel, additions ...inbound.Addition) Mode: config.XHTTPConfig.Mode, NoSSEHeader: config.XHTTPConfig.NoSSEHeader, ScStreamUpServerSecs: config.XHTTPConfig.ScStreamUpServerSecs, + ScMaxBufferedPosts: config.XHTTPConfig.ScMaxBufferedPosts, ScMaxEachPostBytes: config.XHTTPConfig.ScMaxEachPostBytes, }, ConnHandler: func(conn net.Conn) { diff --git a/transport/xhttp/config.go b/transport/xhttp/config.go index 6caed3fb..0c6d2a78 100644 --- a/transport/xhttp/config.go +++ b/transport/xhttp/config.go @@ -20,6 +20,7 @@ type Config struct { XPaddingBytes string NoSSEHeader bool // server only ScStreamUpServerSecs string // server only + ScMaxBufferedPosts string // server only ScMaxEachPostBytes string ScMinPostsIntervalMs string ReuseConfig *ReuseConfig @@ -110,8 +111,19 @@ func (c *Config) GetNormalizedScStreamUpServerSecs() (Range, error) { return r, nil } +func (c *Config) GetNormalizedScMaxBufferedPosts() (Range, error) { + r, err := ParseRange(c.ScMaxBufferedPosts, "30") + if err != nil { + return Range{}, fmt.Errorf("invalid sc-max-buffered-posts: %w", err) + } + if r.Max == 0 { + return Range{}, fmt.Errorf("invalid sc-max-buffered-posts: must be greater than zero") + } + return r, nil +} + func (c *Config) GetNormalizedScMaxEachPostBytes() (Range, error) { - r, err := ParseRange(c.ScStreamUpServerSecs, "1000000") + r, err := ParseRange(c.ScMaxEachPostBytes, "1000000") if err != nil { return Range{}, fmt.Errorf("invalid sc-max-each-post-bytes: %w", err) } diff --git a/transport/xhttp/server.go b/transport/xhttp/server.go index 2f32bb31..ce71bb1c 100644 --- a/transport/xhttp/server.go +++ b/transport/xhttp/server.go @@ -80,9 +80,9 @@ type httpSession struct { once sync.Once } -func newHTTPSession() *httpSession { +func newHTTPSession(maxPackets int) *httpSession { return &httpSession{ - uploadQueue: NewUploadQueue(), + uploadQueue: NewUploadQueue(maxPackets), connected: make(chan struct{}), } } @@ -100,6 +100,7 @@ type requestHandler struct { scMaxEachPostBytes Range scStreamUpServerSecs Range + scMaxBufferedPosts Range mu sync.Mutex sessions map[string]*httpSession @@ -114,6 +115,10 @@ func NewServerHandler(opt ServerOption) (http.Handler, error) { if err != nil { return nil, err } + scMaxBufferedPosts, err := opt.Config.GetNormalizedScMaxBufferedPosts() + if err != nil { + return nil, err + } // using h2c.NewHandler to ensure we can work in plain http2 // and some tls conn is not *tls.Conn (like *reality.Conn) return h2c.NewHandler(&requestHandler{ @@ -122,6 +127,7 @@ func NewServerHandler(opt ServerOption) (http.Handler, error) { httpHandler: opt.HttpHandler, scMaxEachPostBytes: scMaxEachPostBytes, scStreamUpServerSecs: scStreamUpServerSecs, + scMaxBufferedPosts: scMaxBufferedPosts, sessions: map[string]*httpSession{}, }, &http.Http2Server{ IdleTimeout: 30 * time.Second, @@ -137,7 +143,7 @@ func (h *requestHandler) getOrCreateSession(sessionID string) *httpSession { return s } - s = newHTTPSession() + s = newHTTPSession(h.scMaxBufferedPosts.Max) h.sessions[sessionID] = s return s } @@ -371,10 +377,11 @@ func (h *requestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if err := session.uploadQueue.Push(Packet{ + err = session.uploadQueue.Push(Packet{ Seq: seq, Payload: body, - }); err != nil { + }) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/transport/xhttp/upload_queue.go b/transport/xhttp/upload_queue.go index 731f85bd..1fac71bb 100644 --- a/transport/xhttp/upload_queue.go +++ b/transport/xhttp/upload_queue.go @@ -13,20 +13,22 @@ type Packet struct { } type UploadQueue struct { - mu sync.Mutex - cond *sync.Cond - packets map[uint64][]byte - nextSeq uint64 - buf []byte - closed bool - reader io.ReadCloser + mu sync.Mutex + cond sync.Cond + packets map[uint64][]byte + nextSeq uint64 + buf []byte + closed bool + maxPackets int + reader io.ReadCloser } -func NewUploadQueue() *UploadQueue { +func NewUploadQueue(maxPackets int) *UploadQueue { q := &UploadQueue{ - packets: make(map[uint64][]byte), + packets: make(map[uint64][]byte, maxPackets), + maxPackets: maxPackets, } - q.cond = sync.NewCond(&q.mu) + q.cond = sync.Cond{L: &q.mu} return q } @@ -48,6 +50,13 @@ func (q *UploadQueue) Push(p Packet) error { return nil } + if len(q.packets) > q.maxPackets { + // the "reassembly buffer" is too large, and we want to + // constrain memory usage somehow. let's tear down the + // connection, and hope the application retries. + return errors.New("packet queue is too large") + } + q.packets[p.Seq] = p.Payload q.cond.Broadcast() return nil