diff --git a/cmd/cunicu/cmd.go b/cmd/cunicu/cmd.go index 067d8b21..8e5ab34d 100644 --- a/cmd/cunicu/cmd.go +++ b/cmd/cunicu/cmd.go @@ -17,4 +17,5 @@ import ( // Signaling backends. _ "cunicu.li/cunicu/pkg/signaling/grpc" _ "cunicu.li/cunicu/pkg/signaling/inprocess" + _ "cunicu.li/cunicu/pkg/signaling/mcast" ) diff --git a/docs/dev/signaling.md b/docs/dev/signaling.md index bc6e31b9..111f09cf 100644 --- a/docs/dev/signaling.md +++ b/docs/dev/signaling.md @@ -67,6 +67,7 @@ cunīcu can support multiple backends for signaling session information such as ### Available backends - gRPC +- Multicast Checkout the [`Backend`](https://github.com/cunicu/cunicu/blob/main/pkg/signaling/backend.go) interface for implementing your own backend. diff --git a/nix/cunicu.nix b/nix/cunicu.nix index 6609e2a6..0d4e08df 100644 --- a/nix/cunicu.nix +++ b/nix/cunicu.nix @@ -38,6 +38,7 @@ buildGo124Module { excludedPackages = [ "pkg/config" "pkg/selfupdate" + "pkg/signaling/mcast" "pkg/tty" "scripts" ]; diff --git a/pkg/proto/signaling/signaling.pb.go b/pkg/proto/signaling/signaling.pb.go index e3e99515..42610311 100644 --- a/pkg/proto/signaling/signaling.pb.go +++ b/pkg/proto/signaling/signaling.pb.go @@ -33,6 +33,7 @@ const ( BackendType_MULTI BackendType = 0 BackendType_GRPC BackendType = 1 BackendType_INPROCESS BackendType = 2 + BackendType_MCAST BackendType = 3 ) // Enum value maps for BackendType. @@ -41,11 +42,13 @@ var ( 0: "MULTI", 1: "GRPC", 2: "INPROCESS", + 3: "MCAST", } BackendType_value = map[string]int32{ "MULTI": 0, "GRPC": 1, "INPROCESS": 2, + "MCAST": 3, } ) @@ -343,25 +346,26 @@ var file_signaling_signaling_proto_rawDesc = []byte{ 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x22, 0x23, 0x0a, 0x0f, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, - 0x79, 0x2a, 0x31, 0x0a, 0x0b, 0x42, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65, + 0x79, 0x2a, 0x3c, 0x0a, 0x0b, 0x42, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47, 0x52, 0x50, 0x43, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x50, 0x52, 0x4f, 0x43, 0x45, - 0x53, 0x53, 0x10, 0x02, 0x32, 0xc7, 0x01, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, - 0x6e, 0x67, 0x12, 0x32, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e, - 0x66, 0x6f, 0x12, 0x0d, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x1a, 0x11, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, - 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x12, 0x21, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, - 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x1a, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, - 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, - 0x70, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, - 0x68, 0x12, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x1a, 0x0d, 0x2e, - 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x26, - 0x5a, 0x24, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x6c, 0x69, 0x2f, 0x63, 0x75, 0x6e, 0x69, - 0x63, 0x75, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x67, - 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x53, 0x53, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x43, 0x41, 0x53, 0x54, 0x10, 0x03, 0x32, + 0xc7, 0x01, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x12, 0x32, 0x0a, + 0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0d, 0x2e, + 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, 0x63, + 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, + 0x00, 0x12, 0x4e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x21, + 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, + 0x67, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x61, 0x72, 0x61, 0x6d, + 0x73, 0x1a, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, + 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x22, 0x00, 0x30, + 0x01, 0x12, 0x36, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1a, 0x2e, 0x63, + 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, + 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x1a, 0x0d, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, + 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x63, 0x75, 0x6e, + 0x69, 0x63, 0x75, 0x2e, 0x6c, 0x69, 0x2f, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2f, 0x70, 0x6b, + 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, + 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/signaling/grpc/config.go b/pkg/signaling/grpc/config.go index 30f8e69a..ad2e6dcb 100644 --- a/pkg/signaling/grpc/config.go +++ b/pkg/signaling/grpc/config.go @@ -23,7 +23,7 @@ func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) (err error) { c.Target, c.Options, err = ParseURL(c.BackendConfig.URI.String()) if err != nil { - return fmt.Errorf("failed to parse gRPC URL:%w", err) + return fmt.Errorf("failed to parse gRPC URL: %w", err) } return nil diff --git a/pkg/signaling/mcast/backend.go b/pkg/signaling/mcast/backend.go new file mode 100644 index 00000000..3c4db676 --- /dev/null +++ b/pkg/signaling/mcast/backend.go @@ -0,0 +1,152 @@ +// SPDX-FileCopyrightText: 2025 Adam Rizkalla +// SPDX-License-Identifier: Apache-2.0 + +package mcast + +import ( + "context" + "fmt" + "net" + "syscall" + + "cunicu.li/cunicu/pkg/crypto" + "cunicu.li/cunicu/pkg/log" + signalingproto "cunicu.li/cunicu/pkg/proto/signaling" + "cunicu.li/cunicu/pkg/signaling" + "go.uber.org/zap" + "golang.org/x/net/ipv4" + "google.golang.org/protobuf/proto" +) + +func init() { //nolint:gochecknoinits + signaling.Backends["multicast"] = &signaling.BackendPlugin{ + New: NewBackend, + Description: "Multicast", + } +} + +type Backend struct { + signaling.SubscriptionsRegistry + + send_conn net.PacketConn + recv_conn *net.UDPConn + mcast_addr *net.UDPAddr + config BackendConfig + + logger *log.Logger +} + +func NewBackend(cfg *signaling.BackendConfig, logger *log.Logger) (signaling.Backend, error) { + b := &Backend{ + SubscriptionsRegistry: signaling.NewSubscriptionsRegistry(), + logger: logger, + } + + //if err := b.config.Parse(cfg); err != nil { + // return nil, fmt.Errorf("failed to parse backend configuration: %w", err) + //} + + var err error + + // Parse multicast group + if b.mcast_addr, err = net.ResolveUDPAddr("udp", "224.0.0.1:9999"); err != nil { + return nil, fmt.Errorf("Error parsing multicast address: %w", err) + } + + // Bind to any available local UDP port for sending to multicast group + if b.send_conn, err = net.ListenPacket("udp", ":0"); err != nil { + return nil, fmt.Errorf("Error binding to local address: %w", err) + } + + p := ipv4.NewPacketConn(b.send_conn) + + if err := p.JoinGroup(nil, b.mcast_addr); err != nil { + return nil, fmt.Errorf("Error joining multicast group: %w", err) + } + + // Add listener for multicast group + if b.recv_conn, err = net.ListenMulticastUDP("udp", nil, b.mcast_addr); err != nil { + return nil, fmt.Errorf("Error adding multicast listener: %w", err) + } + + // Enable multicast loopback + fd, _ := b.recv_conn.File() + syscall.SetsockoptInt(int(fd.Fd()), syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, 1) + //syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, 1) + + go func() { + buf := make([]byte, 4096) + for { + n, _, err := b.recv_conn.ReadFrom(buf) + if err != nil { + if err == net.ErrClosed { + break + } + b.logger.Error("Error reading from UDPConn", zap.Error(err)) + break + //continue + } + + var env signalingproto.Envelope + err = proto.Unmarshal(buf[:n], &env) + if err != nil { + b.logger.Error("Error unmarshaling protobuf", zap.Error(err)) + continue + } + + if err := b.SubscriptionsRegistry.NewMessage(&env); err != nil { + if err == signaling.ErrNotSubscribed { + // Message wasn't for us but we will get everything over multicast, just ignore it + } else { + b.logger.Error("Failed to decrypt message", zap.Error(err)) + } + continue + } + } + }() + + for _, h := range cfg.OnReady { + h.OnSignalingBackendReady(b) + } + + return b, nil +} + +func (b *Backend) Type() signalingproto.BackendType { + return signalingproto.BackendType_MCAST +} + +func (b *Backend) Subscribe(ctx context.Context, kp *crypto.KeyPair, h signaling.MessageHandler) (bool, error) { + return b.SubscriptionsRegistry.Subscribe(kp, h) +} + +func (b *Backend) Unsubscribe(ctx context.Context, kp *crypto.KeyPair, h signaling.MessageHandler) (bool, error) { + return b.SubscriptionsRegistry.Unsubscribe(kp, h) +} + +func (b *Backend) Publish(ctx context.Context, kp *crypto.KeyPair, msg *signaling.Message) error { + env, err := msg.Encrypt(kp) + if err != nil { + return fmt.Errorf("failed to encrypt message: %w", err) + } + + data, err := proto.Marshal(env) + if err != nil { + return fmt.Errorf("Error marshaling protobuf: %w", err) + } + + if _, err = b.send_conn.WriteTo(data, b.mcast_addr); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +func (b *Backend) Close() error { + //return fmt.Errorf("Close() called") + //if err := b.conn.Close(); err != nil { + // return fmt.Errorf("failed to close multicast connection: %w", err) + //} + + return nil +} diff --git a/pkg/signaling/mcast/backend_test.go b/pkg/signaling/mcast/backend_test.go new file mode 100644 index 00000000..9667e7dd --- /dev/null +++ b/pkg/signaling/mcast/backend_test.go @@ -0,0 +1,29 @@ +// SPDX-FileCopyrightText: 2025 Adam Rizkalla +// SPDX-License-Identifier: Apache-2.0 + +package mcast_test + +import ( + "net/url" + "testing" + + _ "cunicu.li/cunicu/pkg/signaling/mcast" + "cunicu.li/cunicu/test" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSuite(t *testing.T) { + test.SetupLogging() + RegisterFailHandler(Fail) + RunSpecs(t, "Multicast Backend Suite") +} + +var _ = Describe("Multicast backend", func() { + u := url.URL{ + Scheme: "multicast", + } + + test.BackendTest(&u, 10) +}) diff --git a/pkg/signaling/mcast/config.go b/pkg/signaling/mcast/config.go new file mode 100644 index 00000000..dd8a9a55 --- /dev/null +++ b/pkg/signaling/mcast/config.go @@ -0,0 +1,26 @@ +// SPDX-FileCopyrightText: 2025 Adam Rizkalla +// SPDX-License-Identifier: Apache-2.0 + +package mcast + +import ( + "cunicu.li/cunicu/pkg/signaling" +) + +type BackendConfig struct { + signaling.BackendConfig + + Target string + Loopback bool +} + +func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) (err error) { + c.BackendConfig = *cfg + + //c.Target, c.Loopback, err = ParseURL(c.BackendConfig.URI.String()) + //if err != nil { + // return fmt.Errorf("failed to parse multicast URL: %w", err) + //} + + return nil +} diff --git a/pkg/signaling/mcast/mcast.go b/pkg/signaling/mcast/mcast.go new file mode 100644 index 00000000..77b981e3 --- /dev/null +++ b/pkg/signaling/mcast/mcast.go @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2025 Adam Rizkalla +// SPDX-License-Identifier: Apache-2.0 + +// Package mcast implements a signaling backend using multicast +package mcast diff --git a/proto/signaling/signaling.proto b/proto/signaling/signaling.proto index 5dee98ac..718b1ec2 100644 --- a/proto/signaling/signaling.proto +++ b/proto/signaling/signaling.proto @@ -16,6 +16,7 @@ enum BackendType { MULTI = 0; GRPC = 1; INPROCESS = 2; + MCAST = 3; } message Envelope { @@ -47,4 +48,4 @@ service Signaling { rpc Subscribe(SubscribeParams) returns (stream Envelope) {} rpc Publish(Envelope) returns (Empty) {} -} \ No newline at end of file +}