diff --git a/disco/control.go b/disco/control.go new file mode 100644 index 0000000..e92f17e --- /dev/null +++ b/disco/control.go @@ -0,0 +1,12 @@ +package disco + +type Controller interface { + Handle(b []byte) + Name() string + Type() uint8 +} + +type ControllerManager interface { + Register(Controller) + Unregister(Controller) +} diff --git a/disco/disco.go b/disco/disco.go index 210cf6f..26659df 100644 --- a/disco/disco.go +++ b/disco/disco.go @@ -75,6 +75,7 @@ func (d *Disco) magic() []byte { type PeerStore interface { FindPeer(peer.ID) (*PeerContext, bool) + Peers() []PeerState } type PeerContext struct { diff --git a/disco/ws.go b/disco/ws.go index 146d25d..d2a89b6 100644 --- a/disco/ws.go +++ b/disco/ws.go @@ -22,24 +22,27 @@ import ( ) var ( - _ io.ReadWriter = (*WSConn)(nil) + _ io.ReadWriter = (*WSConn)(nil) + _ ControllerManager = (*WSConn)(nil) ) type WSConn struct { *websocket.Conn - server *peer.Peermap - connectedServer string - peerID peer.ID - metadata url.Values - closedSig chan int - datagrams chan *Datagram - peers chan *PeerFindEvent - peersUDPAddrs chan *PeerUDPAddrEvent - nonce byte - stuns []string - activeTime atomic.Int64 - writeMutex sync.Mutex - rateLimiter *rate.Limiter + server *peer.Peermap + connectedServer string + peerID peer.ID + metadata url.Values + closedSig chan int + datagrams chan *Datagram + peers chan *PeerFindEvent + peersUDPAddrs chan *PeerUDPAddrEvent + nonce byte + stuns []string + activeTime atomic.Int64 + writeMutex sync.Mutex + rateLimiter *rate.Limiter + controllersMutex sync.RWMutex + controllers map[uint8][]Controller connData chan []byte connBuf []byte @@ -135,6 +138,24 @@ func (c *WSConn) ServerURL() string { return c.connectedServer } +func (c *WSConn) Register(ctr Controller) { + c.controllersMutex.Lock() + defer c.controllersMutex.Unlock() + c.controllers[ctr.Type()] = append(c.controllers[ctr.Type()], ctr) +} + +func (c *WSConn) Unregister(ctr Controller) { + c.controllersMutex.Lock() + defer c.controllersMutex.Unlock() + var filterd []Controller + for _, ct := range c.controllers[ctr.Type()] { + if ct.Name() != ctr.Name() { + filterd = append(filterd, ct) + } + } + c.controllers[ctr.Type()] = filterd +} + func (c *WSConn) dial(ctx context.Context, server string) error { networkSecret, err := c.server.SecretStore().NetworkSecret() if err != nil { @@ -343,6 +364,13 @@ func (c *WSConn) handleEvents(b []byte) { go c.updateNetworkSecret(secret) case peer.CONTROL_CONN: c.connData <- b[1:] + default: + c.controllersMutex.RLock() + ctrs := c.controllers[b[0]] + c.controllersMutex.RUnlock() + for _, ctr := range ctrs { + ctr.Handle(b) + } } } @@ -384,6 +412,7 @@ func DialPeermap(ctx context.Context, server *peer.Peermap, peerID peer.ID, meta peers: make(chan *PeerFindEvent, 20), peersUDPAddrs: make(chan *PeerUDPAddrEvent, 20), connData: make(chan []byte, 128), + controllers: make(map[uint8][]Controller), } if err := wsConn.dial(ctx, ""); err != nil { return nil, err diff --git a/p2p/conn.go b/p2p/conn.go index 31c3920..d6e273b 100644 --- a/p2p/conn.go +++ b/p2p/conn.go @@ -185,11 +185,6 @@ func (c *PeerPacketConn) TryLeadDisco(peerID peer.ID) { } } -// UDPConn return the os udp socket -func (c *PeerPacketConn) UDPConn() net.PacketConn { - return c.udpConn -} - // ServerStream is the connection stream to the peermap server func (c *PeerPacketConn) ServerStream() io.ReadWriter { return c.wsConn @@ -200,9 +195,14 @@ func (c *PeerPacketConn) ServerURL() string { return c.wsConn.ServerURL() } -// Peers return the found peers -func (c *PeerPacketConn) Peers() []disco.PeerState { - return c.udpConn.Peers() +// ControllerManager makes changes attempting to move the current state towards the desired state +func (c *PeerPacketConn) ControllerManager() disco.ControllerManager { + return c.wsConn +} + +// PeerStore stores the found peers +func (c *PeerPacketConn) PeerStore() disco.PeerStore { + return c.udpConn } // runControlEventLoop events control loop