feat: add max-connections, min-streams and max-streams options to grpc-opts

This commit is contained in:
wwqgtxx
2026-04-05 19:43:10 +08:00
parent 78e4b844c2
commit 77095546ae
5 changed files with 142 additions and 21 deletions
+17 -7
View File
@@ -27,7 +27,7 @@ type Trojan struct {
hexPassword [trojan.KeyLength]byte
// for gun mux
gunTransport *gun.Transport
gunClient *gun.Client
realityConfig *tlsC.RealityConfig
echConfig *ech.Config
@@ -115,7 +115,7 @@ func (t *Trojan) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.
c, err = vmess.StreamWebsocketConn(ctx, c, wsOpts)
case "grpc":
break // already handle in gun transport
break // already handle in dialContext
default:
// default tcp network
// handle TLS
@@ -175,7 +175,7 @@ func (t *Trojan) writeHeaderContext(ctx context.Context, c net.Conn, metadata *C
func (t *Trojan) dialContext(ctx context.Context) (c net.Conn, err error) {
switch t.option.Network {
case "grpc": // gun transport
return t.gunTransport.Dial()
return t.gunClient.Dial()
default:
}
return t.dialer.DialContext(ctx, "tcp", t.addr)
@@ -236,10 +236,13 @@ func (t *Trojan) ProxyInfo() C.ProxyInfo {
// Close implements C.ProxyAdapter
func (t *Trojan) Close() error {
if t.gunTransport != nil {
return t.gunTransport.Close()
var errs []error
if t.gunClient != nil {
if err := t.gunClient.Close(); err != nil {
errs = append(errs, err)
}
}
return nil
return errors.Join(errs...)
}
func NewTrojan(option TrojanOption) (*Trojan, error) {
@@ -320,7 +323,14 @@ func NewTrojan(option TrojanOption) (*Trojan, error) {
PingInterval: option.GrpcOpts.PingInterval,
}
t.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig)
t.gunClient = gun.NewClient(
func() *gun.Transport {
return gun.NewTransport(dialFn, tlsConfig, gunConfig)
},
option.GrpcOpts.MaxConnections,
option.GrpcOpts.MinStreams,
option.GrpcOpts.MaxStreams,
)
}
return t, nil
+14 -7
View File
@@ -36,7 +36,7 @@ type Vless struct {
encryption *encryption.ClientInstance
// for gun mux
gunTransport *gun.Transport
gunClient *gun.Client
// for xhttp
xhttpClient *xhttp.Client
@@ -196,9 +196,9 @@ func (v *Vless) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
c, err = vmess.StreamH2Conn(ctx, c, h2Opts)
case "grpc":
break // already handle in gun transport
break // already handle in dialContext
case "xhttp":
break // already handle in xhttp client
break // already handle in dialContext
default:
// default tcp network
// handle TLS
@@ -280,7 +280,7 @@ func (v *Vless) streamTLSConn(ctx context.Context, conn net.Conn, isH2 bool) (ne
func (v *Vless) dialContext(ctx context.Context) (c net.Conn, err error) {
switch v.option.Network {
case "grpc": // gun transport
return v.gunTransport.Dial()
return v.gunClient.Dial()
case "xhttp":
return v.xhttpClient.Dial()
default:
@@ -358,8 +358,8 @@ func (v *Vless) ProxyInfo() C.ProxyInfo {
// Close implements C.ProxyAdapter
func (v *Vless) Close() error {
var errs []error
if v.gunTransport != nil {
if err := v.gunTransport.Close(); err != nil {
if v.gunClient != nil {
if err := v.gunClient.Close(); err != nil {
errs = append(errs, err)
}
}
@@ -505,7 +505,14 @@ func NewVless(option VlessOption) (*Vless, error) {
}
}
v.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig)
v.gunClient = gun.NewClient(
func() *gun.Transport {
return gun.NewTransport(dialFn, tlsConfig, gunConfig)
},
option.GrpcOpts.MaxConnections,
option.GrpcOpts.MinStreams,
option.GrpcOpts.MaxStreams,
)
case "xhttp":
requestHost := v.option.XHTTPOpts.Host
if requestHost == "" {
+20 -7
View File
@@ -34,7 +34,7 @@ type Vmess struct {
option *VmessOption
// for gun mux
gunTransport *gun.Transport
gunClient *gun.Client
realityConfig *tlsC.RealityConfig
echConfig *ech.Config
@@ -86,6 +86,9 @@ type GrpcOptions struct {
GrpcServiceName string `proxy:"grpc-service-name,omitempty"`
GrpcUserAgent string `proxy:"grpc-user-agent,omitempty"`
PingInterval int `proxy:"ping-interval,omitempty"`
MaxConnections int `proxy:"max-connections,omitempty"`
MinStreams int `proxy:"min-streams,omitempty"`
MaxStreams int `proxy:"max-streams,omitempty"`
}
type WSOptions struct {
@@ -172,7 +175,7 @@ func (v *Vmess) StreamConnContext(ctx context.Context, c net.Conn, metadata *C.M
c, err = mihomoVMess.StreamH2Conn(ctx, c, h2Opts)
case "grpc":
break // already handle in gun transport
break // already handle in dialContext
default:
// default tcp network
// handle TLS
@@ -274,7 +277,7 @@ func (v *Vmess) streamTLSConn(ctx context.Context, conn net.Conn, isH2 bool) (ne
func (v *Vmess) dialContext(ctx context.Context) (c net.Conn, err error) {
switch v.option.Network {
case "grpc": // gun transport
return v.gunTransport.Dial()
return v.gunClient.Dial()
default:
}
return v.dialer.DialContext(ctx, "tcp", v.addr)
@@ -331,10 +334,13 @@ func (v *Vmess) ProxyInfo() C.ProxyInfo {
// Close implements C.ProxyAdapter
func (v *Vmess) Close() error {
if v.gunTransport != nil {
return v.gunTransport.Close()
var errs []error
if v.gunClient != nil {
if err := v.gunClient.Close(); err != nil {
errs = append(errs, err)
}
}
return nil
return errors.Join(errs...)
}
// SupportUOT implements C.ProxyAdapter
@@ -438,7 +444,14 @@ func NewVmess(option VmessOption) (*Vmess, error) {
}
}
v.gunTransport = gun.NewTransport(dialFn, tlsConfig, gunConfig)
v.gunClient = gun.NewClient(
func() *gun.Transport {
return gun.NewTransport(dialFn, tlsConfig, gunConfig)
},
option.GrpcOpts.MaxConnections,
option.GrpcOpts.MinStreams,
option.GrpcOpts.MaxStreams,
)
}
return v, nil
+9
View File
@@ -670,6 +670,9 @@ proxies: # socks5
grpc-service-name: "example"
# grpc-user-agent: "grpc-go/1.36.0"
# ping-interval: 0 # 默认关闭,单位为秒
# max-connections: 1 # Maximum connections. Conflict with max-streams.
# min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams.
# max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams.
# ip-version: ipv4
# vless
@@ -761,6 +764,9 @@ proxies: # socks5
grpc-service-name: "grpc"
# grpc-user-agent: "grpc-go/1.36.0"
# ping-interval: 0 # 默认关闭,单位为秒
# max-connections: 1 # Maximum connections. Conflict with max-streams.
# min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams.
# max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams.
reality-opts:
public-key: CrrQSjAG_YkHLwvM2M-7XkKJilgL5upBKCp0od0tLhE
@@ -896,6 +902,9 @@ proxies: # socks5
grpc-service-name: "example"
# grpc-user-agent: "grpc-go/1.36.0"
# ping-interval: 0 # 默认关闭,单位为秒
# max-connections: 1 # Maximum connections. Conflict with max-streams.
# min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams.
# max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams.
- name: trojan-ws
server: server
+82
View File
@@ -13,6 +13,7 @@ import (
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/metacubex/mihomo/common/buf"
@@ -51,6 +52,7 @@ type Conn struct {
closeMutex sync.Mutex
closed bool
onClose func()
// deadlines
deadline *time.Timer
@@ -209,6 +211,10 @@ func (g *Conn) Close() error {
}
}
if g.onClose != nil {
g.onClose()
}
return errors.Join(errorArr...)
}
@@ -240,6 +246,7 @@ type Transport struct {
ctx context.Context
cancel context.CancelFunc
closeOnce sync.Once
count atomic.Int64
}
func (t *Transport) Close() error {
@@ -349,6 +356,9 @@ func (t *Transport) Dial() (net.Conn, error) {
writer: writer,
}
t.count.Add(1)
conn.onClose = func() { t.count.Add(-1) }
go conn.Init()
// ensure conn.initOnce.Do has been called before return
@@ -358,6 +368,78 @@ func (t *Transport) Dial() (net.Conn, error) {
return conn, nil
}
type Client struct {
mutex sync.Mutex
maxConnections int
minStreams int
maxStreams int
transports []*Transport
maker func() *Transport
}
func NewClient(maker func() *Transport, maxConnections, minStreams, maxStreams int) *Client {
if maxConnections == 0 && minStreams == 0 && maxStreams == 0 {
maxConnections = 1
}
return &Client{
maxConnections: maxConnections,
minStreams: minStreams,
maxStreams: maxStreams,
maker: maker,
}
}
func (c *Client) Dial() (net.Conn, error) {
return c.getTransport().Dial()
}
func (c *Client) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
var errs []error
for _, t := range c.transports {
if err := t.Close(); err != nil {
errs = append(errs, err)
}
}
c.transports = nil
return errors.Join(errs...)
}
func (c *Client) getTransport() *Transport {
c.mutex.Lock()
defer c.mutex.Unlock()
var transport *Transport
for _, t := range c.transports {
if transport == nil || t.count.Load() < transport.count.Load() {
transport = t
}
}
if transport == nil {
return c.newTransportLocked()
}
numStreams := int(transport.count.Load())
if numStreams == 0 {
return transport
}
if c.maxConnections > 0 {
if len(c.transports) >= c.maxConnections || numStreams < c.minStreams {
return transport
}
} else {
if c.maxStreams > 0 && numStreams < c.maxStreams {
return transport
}
}
return c.newTransportLocked()
}
func (c *Client) newTransportLocked() *Transport {
transport := c.maker()
c.transports = append(c.transports, transport)
return transport
}
func StreamGunWithConn(conn net.Conn, tlsConfig *vmess.TLSConfig, gunCfg *Config) (net.Conn, error) {
dialFn := func(ctx context.Context, network, addr string) (net.Conn, error) {
return conn, nil