diff --git a/adapter/outbound/trojan.go b/adapter/outbound/trojan.go index 2c04da12..fc3a6f82 100644 --- a/adapter/outbound/trojan.go +++ b/adapter/outbound/trojan.go @@ -27,7 +27,7 @@ type Trojan struct { hexPassword [trojan.KeyLength]byte // for gun mux - gunTransport *gun.Transport + gunClient *gun.Client realityConfig *tlsC.RealityConfig echConfig *ech.Config @@ -115,7 +115,7 @@ func (t *Trojan) StreamConnContext(ctx context.Context, c net.Conn, metadata *C. c, err = vmess.StreamWebsocketConn(ctx, c, wsOpts) case "grpc": - break // already handle in gun transport + break // already handle in dialContext default: // default tcp network // handle TLS @@ -175,7 +175,7 @@ func (t *Trojan) writeHeaderContext(ctx context.Context, c net.Conn, metadata *C func (t *Trojan) dialContext(ctx context.Context) (c net.Conn, err error) { switch t.option.Network { case "grpc": // gun transport - return t.gunTransport.Dial() + return t.gunClient.Dial() default: } return t.dialer.DialContext(ctx, "tcp", t.addr) @@ -236,10 +236,13 @@ func (t *Trojan) ProxyInfo() C.ProxyInfo { // Close implements C.ProxyAdapter func (t *Trojan) Close() error { - if t.gunTransport != nil { - return t.gunTransport.Close() + var errs []error + if t.gunClient != nil { + if err := t.gunClient.Close(); err != nil { + errs = append(errs, err) + } } - return nil + return errors.Join(errs...) } func NewTrojan(option TrojanOption) (*Trojan, error) { @@ -320,7 +323,14 @@ func NewTrojan(option TrojanOption) (*Trojan, error) { PingInterval: option.GrpcOpts.PingInterval, } - t.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig) + t.gunClient = gun.NewClient( + func() *gun.Transport { + return gun.NewTransport(dialFn, tlsConfig, gunConfig) + }, + option.GrpcOpts.MaxConnections, + option.GrpcOpts.MinStreams, + option.GrpcOpts.MaxStreams, + ) } return t, nil diff --git a/adapter/outbound/vless.go b/adapter/outbound/vless.go index 07526edd..93b10867 100644 --- a/adapter/outbound/vless.go +++ b/adapter/outbound/vless.go @@ -36,7 +36,7 @@ type Vless struct { encryption *encryption.ClientInstance // for gun mux - gunTransport *gun.Transport + gunClient *gun.Client // for xhttp xhttpClient *xhttp.Client @@ -196,9 +196,9 @@ func (v *Vless) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M c, err = vmess.StreamH2Conn(ctx, c, h2Opts) case "grpc": - break // already handle in gun transport + break // already handle in dialContext case "xhttp": - break // already handle in xhttp client + break // already handle in dialContext default: // default tcp network // handle TLS @@ -280,7 +280,7 @@ func (v *Vless) streamTLSConn(ctx context.Context, conn net.Conn, isH2 bool) (ne func (v *Vless) dialContext(ctx context.Context) (c net.Conn, err error) { switch v.option.Network { case "grpc": // gun transport - return v.gunTransport.Dial() + return v.gunClient.Dial() case "xhttp": return v.xhttpClient.Dial() default: @@ -358,8 +358,8 @@ func (v *Vless) ProxyInfo() C.ProxyInfo { // Close implements C.ProxyAdapter func (v *Vless) Close() error { var errs []error - if v.gunTransport != nil { - if err := v.gunTransport.Close(); err != nil { + if v.gunClient != nil { + if err := v.gunClient.Close(); err != nil { errs = append(errs, err) } } @@ -505,7 +505,14 @@ func NewVless(option VlessOption) (*Vless, error) { } } - v.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig) + v.gunClient = gun.NewClient( + func() *gun.Transport { + return gun.NewTransport(dialFn, tlsConfig, gunConfig) + }, + option.GrpcOpts.MaxConnections, + option.GrpcOpts.MinStreams, + option.GrpcOpts.MaxStreams, + ) case "xhttp": requestHost := v.option.XHTTPOpts.Host if requestHost == "" { diff --git a/adapter/outbound/vmess.go b/adapter/outbound/vmess.go index c12950d3..c2237986 100644 --- a/adapter/outbound/vmess.go +++ b/adapter/outbound/vmess.go @@ -34,7 +34,7 @@ type Vmess struct { option *VmessOption // for gun mux - gunTransport *gun.Transport + gunClient *gun.Client realityConfig *tlsC.RealityConfig echConfig *ech.Config @@ -86,6 +86,9 @@ type GrpcOptions struct { GrpcServiceName string `proxy:"grpc-service-name,omitempty"` GrpcUserAgent string `proxy:"grpc-user-agent,omitempty"` PingInterval int `proxy:"ping-interval,omitempty"` + MaxConnections int `proxy:"max-connections,omitempty"` + MinStreams int `proxy:"min-streams,omitempty"` + MaxStreams int `proxy:"max-streams,omitempty"` } type WSOptions struct { @@ -172,7 +175,7 @@ func (v *Vmess) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M c, err = mihomoVMess.StreamH2Conn(ctx, c, h2Opts) case "grpc": - break // already handle in gun transport + break // already handle in dialContext default: // default tcp network // handle TLS @@ -274,7 +277,7 @@ func (v *Vmess) streamTLSConn(ctx context.Context, conn net.Conn, isH2 bool) (ne func (v *Vmess) dialContext(ctx context.Context) (c net.Conn, err error) { switch v.option.Network { case "grpc": // gun transport - return v.gunTransport.Dial() + return v.gunClient.Dial() default: } return v.dialer.DialContext(ctx, "tcp", v.addr) @@ -331,10 +334,13 @@ func (v *Vmess) ProxyInfo() C.ProxyInfo { // Close implements C.ProxyAdapter func (v *Vmess) Close() error { - if v.gunTransport != nil { - return v.gunTransport.Close() + var errs []error + if v.gunClient != nil { + if err := v.gunClient.Close(); err != nil { + errs = append(errs, err) + } } - return nil + return errors.Join(errs...) } // SupportUOT implements C.ProxyAdapter @@ -438,7 +444,14 @@ func NewVmess(option VmessOption) (*Vmess, error) { } } - v.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig) + v.gunClient = gun.NewClient( + func() *gun.Transport { + return gun.NewTransport(dialFn, tlsConfig, gunConfig) + }, + option.GrpcOpts.MaxConnections, + option.GrpcOpts.MinStreams, + option.GrpcOpts.MaxStreams, + ) } return v, nil diff --git a/docs/config.yaml b/docs/config.yaml index 99fed80c..ea0fdb9a 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -670,6 +670,9 @@ proxies: # socks5 grpc-service-name: "example" # grpc-user-agent: "grpc-go/1.36.0" # ping-interval: 0 # 默认关闭,单位为秒 + # max-connections: 1 # Maximum connections. Conflict with max-streams. + # min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams. + # max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams. # ip-version: ipv4 # vless @@ -761,6 +764,9 @@ proxies: # socks5 grpc-service-name: "grpc" # grpc-user-agent: "grpc-go/1.36.0" # ping-interval: 0 # 默认关闭,单位为秒 + # max-connections: 1 # Maximum connections. Conflict with max-streams. + # min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams. + # max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams. reality-opts: public-key: CrrQSjAG_YkHLwvM2M-7XkKJilgL5upBKCp0od0tLhE @@ -896,6 +902,9 @@ proxies: # socks5 grpc-service-name: "example" # grpc-user-agent: "grpc-go/1.36.0" # ping-interval: 0 # 默认关闭,单位为秒 + # max-connections: 1 # Maximum connections. Conflict with max-streams. + # min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams. + # max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams. - name: trojan-ws server: server diff --git a/transport/gun/gun.go b/transport/gun/gun.go index aec7e4e0..da747f88 100644 --- a/transport/gun/gun.go +++ b/transport/gun/gun.go @@ -13,6 +13,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/metacubex/mihomo/common/buf" @@ -51,6 +52,7 @@ type Conn struct { closeMutex sync.Mutex closed bool + onClose func() // deadlines deadline *time.Timer @@ -209,6 +211,10 @@ func (g *Conn) Close() error { } } + if g.onClose != nil { + g.onClose() + } + return errors.Join(errorArr...) } @@ -240,6 +246,7 @@ type Transport struct { ctx context.Context cancel context.CancelFunc closeOnce sync.Once + count atomic.Int64 } func (t *Transport) Close() error { @@ -349,6 +356,9 @@ func (t *Transport) Dial() (net.Conn, error) { writer: writer, } + t.count.Add(1) + conn.onClose = func() { t.count.Add(-1) } + go conn.Init() // ensure conn.initOnce.Do has been called before return @@ -358,6 +368,78 @@ func (t *Transport) Dial() (net.Conn, error) { return conn, nil } +type Client struct { + mutex sync.Mutex + maxConnections int + minStreams int + maxStreams int + transports []*Transport + maker func() *Transport +} + +func NewClient(maker func() *Transport, maxConnections, minStreams, maxStreams int) *Client { + if maxConnections == 0 && minStreams == 0 && maxStreams == 0 { + maxConnections = 1 + } + return &Client{ + maxConnections: maxConnections, + minStreams: minStreams, + maxStreams: maxStreams, + maker: maker, + } +} + +func (c *Client) Dial() (net.Conn, error) { + return c.getTransport().Dial() +} + +func (c *Client) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + var errs []error + for _, t := range c.transports { + if err := t.Close(); err != nil { + errs = append(errs, err) + } + } + c.transports = nil + return errors.Join(errs...) +} + +func (c *Client) getTransport() *Transport { + c.mutex.Lock() + defer c.mutex.Unlock() + var transport *Transport + for _, t := range c.transports { + if transport == nil || t.count.Load() < transport.count.Load() { + transport = t + } + } + if transport == nil { + return c.newTransportLocked() + } + numStreams := int(transport.count.Load()) + if numStreams == 0 { + return transport + } + if c.maxConnections > 0 { + if len(c.transports) >= c.maxConnections || numStreams < c.minStreams { + return transport + } + } else { + if c.maxStreams > 0 && numStreams < c.maxStreams { + return transport + } + } + return c.newTransportLocked() +} + +func (c *Client) newTransportLocked() *Transport { + transport := c.maker() + c.transports = append(c.transports, transport) + return transport +} + func StreamGunWithConn(conn net.Conn, tlsConfig *vmess.TLSConfig, gunCfg *Config) (net.Conn, error) { dialFn := func(ctx context.Context, network, addr string) (net.Conn, error) { return conn, nil