feat: add max-connections, min-streams and max-streams options to trusttunnel client

This commit is contained in:
wwqgtxx
2026-04-06 21:07:04 +08:00
parent 8b37e99f35
commit caacf7a9cd
4 changed files with 134 additions and 3 deletions
+10 -3
View File
@@ -14,7 +14,7 @@ import (
type TrustTunnel struct {
*Base
client *trusttunnel.Client
client *trusttunnel.PoolClient
option *TrustTunnelOption
}
@@ -35,10 +35,14 @@ type TrustTunnelOption struct {
PrivateKey string `proxy:"private-key,omitempty"`
UDP bool `proxy:"udp,omitempty"`
HealthCheck bool `proxy:"health-check,omitempty"`
// quic options
Quic bool `proxy:"quic,omitempty"`
CongestionController string `proxy:"congestion-controller,omitempty"`
CWND int `proxy:"cwnd,omitempty"`
// reuse options
MaxConnections int `proxy:"max-connections,omitempty"`
MinStreams int `proxy:"min-streams,omitempty"`
MaxStreams int `proxy:"max-streams,omitempty"`
}
func (t *TrustTunnel) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) {
@@ -114,6 +118,9 @@ func NewTrustTunnel(option TrustTunnelOption) (*TrustTunnel, error) {
QUICCongestionControl: option.CongestionController,
QUICCwnd: option.CWND,
HealthCheck: option.HealthCheck,
MaxConnections: option.MaxConnections,
MinStreams: option.MinStreams,
MaxStreams: option.MaxStreams,
}
echConfig, err := option.ECHOpts.Parse()
if err != nil {
@@ -134,7 +141,7 @@ func NewTrustTunnel(option TrustTunnelOption) (*TrustTunnel, error) {
}
tOption.TLSConfig = tlsConfig
client, err := trusttunnel.NewClient(context.TODO(), tOption)
client, err := trusttunnel.NewPoolClient(context.TODO(), tOption)
if err != nil {
return nil, err
}
+5
View File
@@ -1210,8 +1210,13 @@ proxies: # socks5
# alpn:
# - h2
# skip-cert-verify: true
### quic options
# quic: true # 默认为false
# congestion-controller: bbr
### reuse options
# max-connections: 1 # Maximum connections. Conflict with max-streams.
# min-streams: 0 # Minimum multiplexed streams in a connection before opening a new connection. Conflict with max-streams.
# max-streams: 0 # Maximum multiplexed streams in a connection before opening a new connection. Conflict with max-connections and min-streams.
# dns 出站会将请求劫持到内部 dns 模块,所有请求均在内部处理
- name: "dns-out"
+115
View File
@@ -9,9 +9,11 @@ import (
"net/netip"
"net/url"
"sync"
"sync/atomic"
"time"
"github.com/metacubex/mihomo/common/httputils"
"github.com/metacubex/mihomo/common/once"
C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/transport/vmess"
@@ -33,6 +35,9 @@ type ClientOptions struct {
QUICCongestionControl string
QUICCwnd int
HealthCheck bool
MaxConnections int
MinStreams int
MaxStreams int
}
type Client struct {
@@ -45,6 +50,7 @@ type Client struct {
startOnce sync.Once
healthCheck bool
healthCheckTimer *time.Timer
count atomic.Int64
}
func NewClient(ctx context.Context, options ClientOptions) (client *Client, err error) {
@@ -134,6 +140,10 @@ func (c *Client) roundTrip(request *http.Request, conn *httpConn) {
writer: pipeWriter,
created: make(chan struct{}),
}
c.count.Add(1)
conn.closeFn = once.OnceFunc(func() {
c.count.Add(-1)
})
ctx, cancel := context.WithCancel(c.ctx) // requestCtx must alive during conn not closed
conn.cancelFn = cancel // cancel ctx when conn closed
go func() {
@@ -245,3 +255,108 @@ func (c *Client) HealthCheck(ctx context.Context) error {
}
return nil
}
type PoolClient struct {
mutex sync.Mutex
maxConnections int
minStreams int
maxStreams int
ctx context.Context
options ClientOptions
clients []*Client
}
func NewPoolClient(ctx context.Context, options ClientOptions) (*PoolClient, error) {
maxConnections := options.MaxConnections
minStreams := options.MinStreams
maxStreams := options.MaxStreams
if maxConnections == 0 && minStreams == 0 && maxStreams == 0 {
maxConnections = 1
}
client, err := NewClient(ctx, options) // reserve one client and verify the configuration
if err != nil {
return nil, err
}
return &PoolClient{
maxConnections: maxConnections,
minStreams: minStreams,
maxStreams: maxStreams,
ctx: ctx,
options: options,
clients: []*Client{client},
}, nil
}
func (c *PoolClient) Dial(ctx context.Context, host string) (net.Conn, error) {
transport, err := c.getClient()
if err != nil {
return nil, err
}
return transport.Dial(ctx, host)
}
func (c *PoolClient) ListenPacket(ctx context.Context) (net.PacketConn, error) {
transport, err := c.getClient()
if err != nil {
return nil, err
}
return transport.ListenPacket(ctx)
}
func (c *PoolClient) ListenICMP(ctx context.Context) (*IcmpConn, error) {
transport, err := c.getClient()
if err != nil {
return nil, err
}
return transport.ListenICMP(ctx)
}
func (c *PoolClient) Close() error {
c.mutex.Lock()
defer c.mutex.Unlock()
var errs []error
for _, t := range c.clients {
if err := t.Close(); err != nil {
errs = append(errs, err)
}
}
c.clients = nil
return errors.Join(errs...)
}
func (c *PoolClient) getClient() (*Client, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
var transport *Client
for _, t := range c.clients {
if transport == nil || t.count.Load() < transport.count.Load() {
transport = t
}
}
if transport == nil {
return c.newTransportLocked()
}
numStreams := int(transport.count.Load())
if numStreams == 0 {
return transport, nil
}
if c.maxConnections > 0 {
if len(c.clients) >= c.maxConnections || numStreams < c.minStreams {
return transport, nil
}
} else {
if c.maxStreams > 0 && numStreams < c.maxStreams {
return transport, nil
}
}
return c.newTransportLocked()
}
func (c *PoolClient) newTransportLocked() (*Client, error) {
transport, err := NewClient(c.ctx, c.options)
if err != nil {
return nil, err
}
c.clients = append(c.clients, transport)
return transport, nil
}
+4
View File
@@ -98,6 +98,7 @@ type httpConn struct {
created chan struct{}
createErr error
cancelFn func()
closeFn func()
httputils.NetAddr
// deadlines
@@ -129,6 +130,9 @@ func (h *httpConn) Close() error {
if h.cancelFn != nil {
h.cancelFn()
}
if h.closeFn != nil {
h.closeFn()
}
return errors.Join(errorArr...)
}