p2p: add OnPeerLeave option

This commit is contained in:
rkonfj
2024-11-30 16:40:37 +08:00
parent 2341432d21
commit 843064e735
5 changed files with 85 additions and 58 deletions
+3
View File
@@ -30,6 +30,8 @@ func (code ControlCode) String() string {
return "UPDATE_NAT_INFO"
case CONTROL_UPDATE_META:
return "UPDATE_PEER"
case CONTROL_PEER_LEAVE:
return "PEER_LEAVE"
case CONTROL_CONN:
return "CONTROL_CONN"
default:
@@ -49,6 +51,7 @@ const (
CONTROL_UPDATE_NETWORK_SECRET ControlCode = 20
CONTROL_UPDATE_NAT_INFO ControlCode = 21
CONTROL_UPDATE_META ControlCode = 22
CONTROL_PEER_LEAVE ControlCode = 25
CONTROL_CONN ControlCode = 30
)
+28 -31
View File
@@ -28,6 +28,11 @@ var (
_ disco.ControllerManager = (*WSConn)(nil)
)
type Event struct {
ControlCode disco.ControlCode
Data any
}
type WSConn struct {
rawConn atomic.Pointer[websocket.Conn]
server *disco.Server
@@ -37,9 +42,7 @@ type WSConn struct {
closedSig chan int
closed atomic.Bool
datagrams chan *disco.Datagram
peers chan *disco.Peer
peersMeta chan *disco.Peer
peersUDPAddrs chan *disco.PeerUDPAddr
events chan Event
nonce byte
stuns []string
activeTime atomic.Int64
@@ -97,8 +100,7 @@ func (c *WSConn) Close() error {
c.closed.Store(true)
close(c.closedSig)
close(c.datagrams)
close(c.peers)
close(c.peersUDPAddrs)
close(c.events)
close(c.connData)
close(c.connEOF)
if conn := c.rawConn.Load(); conn != nil {
@@ -159,16 +161,8 @@ func (c *WSConn) Datagrams() <-chan *disco.Datagram {
return c.datagrams
}
func (c *WSConn) Peers() <-chan *disco.Peer {
return c.peers
}
func (c *WSConn) PeersMeta() <-chan *disco.Peer {
return c.peersMeta
}
func (c *WSConn) PeersUDPAddrs() <-chan *disco.PeerUDPAddr {
return c.peersUDPAddrs
func (c *WSConn) Events() <-chan Event {
return c.events
}
func (c *WSConn) STUNs() []string {
@@ -401,19 +395,23 @@ func (c *WSConn) runEventsReadLoop() {
}
func (c *WSConn) handleEvents(b []byte) {
logger := slog.With("code", disco.ControlCode(b[0]))
logger := slog.With("type", disco.ControlCode(b[0]))
switch disco.ControlCode(b[0]) {
case disco.CONTROL_RELAY:
c.datagrams <- &disco.Datagram{PeerID: disco.PeerID(b[2 : b[1]+2]), Data: b[b[1]+2:]}
case disco.CONTROL_NEW_PEER:
meta, _ := url.ParseQuery(string(b[b[1]+2:]))
event := disco.Peer{ID: disco.PeerID(b[2 : b[1]+2]), Metadata: meta}
c.peers <- &event
c.events <- Event{ControlCode: disco.ControlCode(b[0]), Data: &event}
logger.Log(context.Background(), -2, "[WS] Event", "peer", event.ID, "meta", event.Metadata.Encode())
case disco.CONTROL_PEER_LEAVE:
peerID := disco.PeerID(b[2 : b[1]+2])
c.events <- Event{ControlCode: disco.ControlCode(b[0]), Data: peerID}
logger.Log(context.Background(), -2, "[WS] Event", "peer", peerID)
case disco.CONTROL_UPDATE_META:
meta, _ := url.ParseQuery(string(b[b[1]+2:]))
event := disco.Peer{ID: disco.PeerID(b[2 : b[1]+2]), Metadata: meta}
c.peersMeta <- &event
c.events <- Event{ControlCode: disco.ControlCode(b[0]), Data: &event}
logger.Log(context.Background(), -2, "[WS] Event", "peer", event.ID, "meta", event.Metadata.Encode())
case disco.CONTROL_NEW_PEER_UDP_ADDR:
if b[b[1]+2] != 'a' { // old version without nat type
@@ -423,7 +421,7 @@ func (c *WSConn) handleEvents(b []byte) {
slog.Error("Resolve udp addr error", "err", err)
break
}
c.peersUDPAddrs <- &disco.PeerUDPAddr{ID: disco.PeerID(b[2 : b[1]+2]), Addr: addr}
c.events <- Event{ControlCode: disco.ControlCode(b[0]), Data: disco.PeerUDPAddr{ID: disco.PeerID(b[2 : b[1]+2]), Addr: addr}}
return
}
addrLen := b[b[1]+3]
@@ -433,7 +431,8 @@ func (c *WSConn) handleEvents(b []byte) {
slog.Error("Resolve udp addr error", "err", err)
break
}
c.peersUDPAddrs <- &disco.PeerUDPAddr{ID: disco.PeerID(b[2 : b[1]+2]), Addr: addr, Type: disco.NATType(b[s+addrLen:])}
udpAddr := disco.PeerUDPAddr{ID: disco.PeerID(b[2 : b[1]+2]), Addr: addr, Type: disco.NATType(b[s+addrLen:])}
c.events <- Event{ControlCode: disco.ControlCode(b[0]), Data: udpAddr}
case disco.CONTROL_UPDATE_NETWORK_SECRET:
var secret disco.NetworkSecret
if err := json.Unmarshal(b[1:], &secret); err != nil {
@@ -487,17 +486,15 @@ func (c *WSConn) updateNetworkSecret(secret disco.NetworkSecret) {
func DialPeermap(ctx context.Context, server *disco.Server, peerID disco.PeerID, metadata url.Values) (*WSConn, error) {
wsConn := &WSConn{
server: server,
peerID: peerID,
metadata: metadata,
closedSig: make(chan int),
datagrams: make(chan *disco.Datagram, 50),
peers: make(chan *disco.Peer, 20),
peersMeta: make(chan *disco.Peer, 20),
peersUDPAddrs: make(chan *disco.PeerUDPAddr, 20),
connData: make(chan []byte, 128),
connEOF: make(chan struct{}),
controllers: make(map[uint8][]disco.Controller),
server: server,
peerID: peerID,
metadata: metadata,
closedSig: make(chan int),
datagrams: make(chan *disco.Datagram, 50),
events: make(chan Event, 100),
connData: make(chan []byte, 128),
connEOF: make(chan struct{}),
controllers: make(map[uint8][]disco.Controller),
}
if err := wsConn.dial(ctx, ""); err != nil {
return nil, err
+9
View File
@@ -24,12 +24,14 @@ type Config struct {
SymmAlgo secure.SymmAlgo
Metadata url.Values
OnPeer OnPeer
OnPeerLeave OnPeerLeave
KeepAlivePeriod time.Duration
MinDiscoPeriod time.Duration
}
type Option func(cfg *Config) error
type OnPeer func(disco.PeerID, url.Values)
type OnPeerLeave func(disco.PeerID)
var (
OptionNoOp Option = func(cfg *Config) error { return nil }
@@ -103,6 +105,13 @@ func ListenPeerUp(onPeer OnPeer) Option {
}
}
func ListenPeerLeave(onPeerLeave OnPeerLeave) Option {
return func(cfg *Config) error {
cfg.OnPeerLeave = onPeerLeave
return nil
}
}
func FileSecretStore(storeFilePath string) disco.SecretStore {
return &disco.FileSecretStore{StoreFilePath: storeFilePath}
}
+25 -19
View File
@@ -333,35 +333,41 @@ func (c *PacketConn) runNetworkChangeDetectLoop() {
// runControlEventLoop events control loop
func (c *PacketConn) runControlEventLoop() {
handleEvent := func(e ws.Event) {
switch e.ControlCode {
case disco.CONTROL_NEW_PEER:
peer := e.Data.(*disco.Peer)
c.udpConn.GenerateLocalAddrsSends(peer.ID, c.wsConn.STUNs())
c.peerMap.Put(peer.ID, peer.Metadata)
if onPeer := c.cfg.OnPeer; onPeer != nil {
go onPeer(peer.ID, peer.Metadata)
}
case disco.CONTROL_PEER_LEAVE:
if onLeave := c.cfg.OnPeerLeave; onLeave != nil {
go onLeave(e.Data.(disco.PeerID))
}
case disco.CONTROL_UPDATE_META:
peer := e.Data.(*disco.Peer)
c.peerMap.Put(peer.ID, peer.Metadata)
if onPeer := c.cfg.OnPeer; onPeer != nil {
onPeer(peer.ID, peer.Metadata)
}
case disco.CONTROL_NEW_PEER_UDP_ADDR:
c.udpConn.RunDiscoMessageSendLoop(e.Data.(disco.PeerUDPAddr))
}
}
for {
select {
case peer, ok := <-c.wsConn.Peers():
case e, ok := <-c.wsConn.Events():
if !ok {
return
}
go c.udpConn.GenerateLocalAddrsSends(peer.ID, c.wsConn.STUNs())
c.peerMap.Put(peer.ID, peer.Metadata)
if onPeer := c.cfg.OnPeer; onPeer != nil {
go onPeer(peer.ID, peer.Metadata)
}
case peer, ok := <-c.wsConn.PeersMeta():
if !ok {
return
}
c.peerMap.Put(peer.ID, peer.Metadata)
if onPeer := c.cfg.OnPeer; onPeer != nil {
go onPeer(peer.ID, peer.Metadata)
}
go handleEvent(e)
case natEvent, ok := <-c.udpConn.NATEvents():
if !ok {
return
}
go c.wsConn.UpdateNATInfo(*natEvent)
case revcUDPAddr, ok := <-c.wsConn.PeersUDPAddrs():
if !ok {
return
}
go c.udpConn.RunDiscoMessageSendLoop(*revcUDPAddr)
case sendUDPAddr, ok := <-c.udpConn.UDPAddrSends():
if !ok {
return
+20 -8
View File
@@ -108,6 +108,7 @@ func (p *peerConn) Close() error {
p.conn.Close()
close(p.exitSig)
close(p.connData)
p.broadcastLeave()
})
return nil
}
@@ -180,8 +181,7 @@ func (p *peerConn) leadDisco(target *peerConn) {
p.write(b1)
}
func (p *peerConn) broadcastMeta() {
myMeta := []byte(p.metadata.Encode())
func (p *peerConn) broadcast(b []byte) {
ctx, _ := p.peerMap.getNetwork(p.networkSecret.Network)
var peers []*peerConn
ctx.peersMutex.RLock()
@@ -192,16 +192,28 @@ func (p *peerConn) broadcastMeta() {
peers = append(peers, v)
}
ctx.peersMutex.RUnlock()
slog.Debug("BroadcastMeta", "count", len(peers), "meta", p.metadata.Encode())
slog.Debug("Broadcast", "count", len(peers), "type", disco.ControlCode(b[0]))
for _, target := range peers {
b := append([]byte(nil), disco.CONTROL_UPDATE_META.Byte())
b = append(b, p.id.Len())
b = append(b, p.id.Bytes()...)
b = append(b, myMeta...)
target.write(b)
target.write(append([]byte(nil), b...))
}
}
func (p *peerConn) broadcastMeta() {
myMeta := []byte(p.metadata.Encode())
b := append([]byte(nil), disco.CONTROL_UPDATE_META.Byte())
b = append(b, p.id.Len())
b = append(b, p.id.Bytes()...)
b = append(b, myMeta...)
p.broadcast(b)
}
func (p *peerConn) broadcastLeave() {
b := append([]byte(nil), disco.CONTROL_PEER_LEAVE.Byte())
b = append(b, p.id.Len())
b = append(b, p.id.Bytes()...)
p.broadcast(b)
}
func (p *peerConn) readMessageLoop() {
for {
select {