Update On Mon Nov 3 19:37:21 CET 2025

This commit is contained in:
github-action[bot]
2025-11-03 19:37:22 +01:00
parent e385f0040d
commit a96af7ed12
58 changed files with 584 additions and 612 deletions
+42 -31
View File
@@ -44,21 +44,22 @@ const (
// Mux manages the sessions and underlays.
type Mux struct {
// ---- common fields ----
isClient bool
endpoints []UnderlayProperties
underlays []Underlay
dialer apicommon.Dialer
resolver apicommon.DNSResolver
listenerFactory apicommon.ListenerFactory
chAccept chan net.Conn
acceptHasErr atomic.Bool
acceptErr chan error // this channel is closed when accept has error
used bool
done chan struct{}
ctx context.Context // mux master context
ctxCancelFunc context.CancelFunc // function to cancel master context when mux is closed
mu sync.Mutex
cleaner *time.Ticker
isClient bool
endpoints []UnderlayProperties
underlays []Underlay
dialer apicommon.Dialer
resolver apicommon.DNSResolver
streamListenerFactory apicommon.StreamListenerFactory
packetListenerFactory apicommon.PacketListenerFactory
chAccept chan net.Conn
acceptHasErr atomic.Bool
acceptErr chan error // this channel is closed when accept has error
used bool
done chan struct{}
ctx context.Context // mux master context
ctxCancelFunc context.CancelFunc // function to cancel master context when mux is closed
mu sync.Mutex
cleaner *time.Ticker
// ---- client only fields ----
username string
@@ -79,15 +80,16 @@ func NewMux(isClinet bool) *Mux {
log.Infof("Initializing server multiplexer")
}
mux := &Mux{
isClient: isClinet,
underlays: make([]Underlay, 0),
dialer: &net.Dialer{Timeout: 10 * time.Second, Control: sockopts.DefaultDialerControl()},
resolver: &net.Resolver{},
listenerFactory: &net.ListenConfig{Control: sockopts.DefaultListenerControl()},
chAccept: make(chan net.Conn, sessionChanCapacity),
acceptErr: make(chan error),
done: make(chan struct{}),
cleaner: time.NewTicker(underlayCleanInterval),
isClient: isClinet,
underlays: make([]Underlay, 0),
dialer: &net.Dialer{Timeout: 10 * time.Second, Control: sockopts.DefaultDialerControl()},
resolver: &net.Resolver{},
streamListenerFactory: &net.ListenConfig{Control: sockopts.DefaultListenerControl()},
packetListenerFactory: &net.ListenConfig{Control: sockopts.DefaultListenerControl()},
chAccept: make(chan net.Conn, sessionChanCapacity),
acceptErr: make(chan error),
done: make(chan struct{}),
cleaner: time.NewTicker(underlayCleanInterval),
}
mux.ctx, mux.ctxCancelFunc = context.WithCancel(context.Background())
@@ -157,12 +159,21 @@ func (m *Mux) SetResolver(resolver apicommon.DNSResolver) *Mux {
return m
}
// SetListenerFactory updates the network listener factory used by the mux.
func (m *Mux) SetListenerFactory(listenerFactory apicommon.ListenerFactory) *Mux {
// SetStreamListenerFactory updates the stream-oriented network listener factory used by the mux.
func (m *Mux) SetStreamListenerFactory(listenerFactory apicommon.StreamListenerFactory) *Mux {
m.mu.Lock()
defer m.mu.Unlock()
m.listenerFactory = listenerFactory
log.Infof("Mux listener factory has been updated")
m.streamListenerFactory = listenerFactory
log.Infof("Mux stream listener factory has been updated")
return m
}
// SetPacketListenerFactory updates the packet-oriented network listener factory used by the mux.
func (m *Mux) SetPacketListenerFactory(listenerFactory apicommon.PacketListenerFactory) *Mux {
m.mu.Lock()
defer m.mu.Unlock()
m.packetListenerFactory = listenerFactory
log.Infof("Mux packet listener factory has been updated")
return m
}
@@ -389,7 +400,7 @@ func (m *Mux) acceptUnderlayLoop(ctx context.Context, properties UnderlayPropert
}
return
}
rawListener, err := m.listenerFactory.Listen(ctx, tcpAddr.Network(), tcpAddr.String())
rawListener, err := m.streamListenerFactory.Listen(ctx, tcpAddr.Network(), tcpAddr.String())
if err != nil {
log.Errorf("Listen() failed: %v", err)
if m.acceptHasErr.CompareAndSwap(false, true) {
@@ -462,7 +473,7 @@ func (m *Mux) acceptUnderlayLoop(ctx context.Context, properties UnderlayPropert
}
return
}
conn, err := m.listenerFactory.ListenPacket(ctx, udpAddr.Network(), udpAddr.String())
conn, err := m.packetListenerFactory.ListenPacket(ctx, udpAddr.Network(), udpAddr.String())
if err != nil {
log.Errorf("ListenPacket() failed: %v", err)
if m.acceptHasErr.CompareAndSwap(false, true) {
@@ -592,7 +603,7 @@ func (m *Mux) newUnderlay(ctx context.Context) (Underlay, error) {
block.SetBlockContext(cipher.BlockContext{
UserName: m.username,
})
underlay, err = NewPacketUnderlay(ctx, p.RemoteAddr().Network(), p.RemoteAddr().String(), p.MTU(), block, m.resolver)
underlay, err = NewPacketUnderlay(ctx, m.packetListenerFactory, p.RemoteAddr().Network(), p.RemoteAddr().String(), p.MTU(), block, m.resolver)
if err != nil {
return nil, fmt.Errorf("NewUDPUnderlay() failed: %v", err)
}