feat: multicast backend (wip)

Signed-off-by: Adam Rizkalla <ajarizzo@gmail.com>
This commit is contained in:
Adam Rizkalla
2025-03-04 18:39:03 +00:00
committed by Steffen Vogel
parent 0fb041dedb
commit b0ea76d023
10 changed files with 239 additions and 19 deletions
+1
View File
@@ -17,4 +17,5 @@ import (
// Signaling backends.
_ "cunicu.li/cunicu/pkg/signaling/grpc"
_ "cunicu.li/cunicu/pkg/signaling/inprocess"
_ "cunicu.li/cunicu/pkg/signaling/mcast"
)
+1
View File
@@ -67,6 +67,7 @@ cunīcu can support multiple backends for signaling session information such as
### Available backends
- gRPC
- Multicast
Checkout the [`Backend`](https://github.com/cunicu/cunicu/blob/main/pkg/signaling/backend.go) interface for implementing your own backend.
+1
View File
@@ -38,6 +38,7 @@ buildGo124Module {
excludedPackages = [
"pkg/config"
"pkg/selfupdate"
"pkg/signaling/mcast"
"pkg/tty"
"scripts"
];
+21 -17
View File
@@ -33,6 +33,7 @@ const (
BackendType_MULTI BackendType = 0
BackendType_GRPC BackendType = 1
BackendType_INPROCESS BackendType = 2
BackendType_MCAST BackendType = 3
)
// Enum value maps for BackendType.
@@ -41,11 +42,13 @@ var (
0: "MULTI",
1: "GRPC",
2: "INPROCESS",
3: "MCAST",
}
BackendType_value = map[string]int32{
"MULTI": 0,
"GRPC": 1,
"INPROCESS": 2,
"MCAST": 3,
}
)
@@ -343,25 +346,26 @@ var file_signaling_signaling_proto_rawDesc = []byte{
0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x22, 0x23, 0x0a, 0x0f,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x12,
0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65,
0x79, 0x2a, 0x31, 0x0a, 0x0b, 0x42, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65,
0x79, 0x2a, 0x3c, 0x0a, 0x0b, 0x42, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x54, 0x79, 0x70, 0x65,
0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x47,
0x52, 0x50, 0x43, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x49, 0x4e, 0x50, 0x52, 0x4f, 0x43, 0x45,
0x53, 0x53, 0x10, 0x02, 0x32, 0xc7, 0x01, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69,
0x6e, 0x67, 0x12, 0x32, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e,
0x66, 0x6f, 0x12, 0x0d, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x1a, 0x11, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64,
0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x12, 0x21, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x50, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x1a, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e,
0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f,
0x70, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73,
0x68, 0x12, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x1a, 0x0d, 0x2e,
0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x26,
0x5a, 0x24, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x6c, 0x69, 0x2f, 0x63, 0x75, 0x6e, 0x69,
0x63, 0x75, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x53, 0x53, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x43, 0x41, 0x53, 0x54, 0x10, 0x03, 0x32,
0xc7, 0x01, 0x0a, 0x09, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x12, 0x32, 0x0a,
0x0c, 0x47, 0x65, 0x74, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0d, 0x2e,
0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x11, 0x2e, 0x63,
0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22,
0x00, 0x12, 0x4e, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x21,
0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e,
0x67, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x50, 0x61, 0x72, 0x61, 0x6d,
0x73, 0x1a, 0x1a, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x69, 0x6e, 0x67, 0x2e, 0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x22, 0x00, 0x30,
0x01, 0x12, 0x36, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1a, 0x2e, 0x63,
0x75, 0x6e, 0x69, 0x63, 0x75, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e, 0x67, 0x2e,
0x45, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x1a, 0x0d, 0x2e, 0x63, 0x75, 0x6e, 0x69, 0x63,
0x75, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x26, 0x5a, 0x24, 0x63, 0x75, 0x6e,
0x69, 0x63, 0x75, 0x2e, 0x6c, 0x69, 0x2f, 0x63, 0x75, 0x6e, 0x69, 0x63, 0x75, 0x2f, 0x70, 0x6b,
0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x69, 0x6e,
0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
+1 -1
View File
@@ -23,7 +23,7 @@ func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) (err error) {
c.Target, c.Options, err = ParseURL(c.BackendConfig.URI.String())
if err != nil {
return fmt.Errorf("failed to parse gRPC URL:%w", err)
return fmt.Errorf("failed to parse gRPC URL: %w", err)
}
return nil
+152
View File
@@ -0,0 +1,152 @@
// SPDX-FileCopyrightText: 2025 Adam Rizkalla <ajarizzo@gmail.com>
// SPDX-License-Identifier: Apache-2.0
package mcast
import (
"context"
"fmt"
"net"
"syscall"
"cunicu.li/cunicu/pkg/crypto"
"cunicu.li/cunicu/pkg/log"
signalingproto "cunicu.li/cunicu/pkg/proto/signaling"
"cunicu.li/cunicu/pkg/signaling"
"go.uber.org/zap"
"golang.org/x/net/ipv4"
"google.golang.org/protobuf/proto"
)
func init() { //nolint:gochecknoinits
signaling.Backends["multicast"] = &signaling.BackendPlugin{
New: NewBackend,
Description: "Multicast",
}
}
type Backend struct {
signaling.SubscriptionsRegistry
send_conn net.PacketConn
recv_conn *net.UDPConn
mcast_addr *net.UDPAddr
config BackendConfig
logger *log.Logger
}
func NewBackend(cfg *signaling.BackendConfig, logger *log.Logger) (signaling.Backend, error) {
b := &Backend{
SubscriptionsRegistry: signaling.NewSubscriptionsRegistry(),
logger: logger,
}
//if err := b.config.Parse(cfg); err != nil {
// return nil, fmt.Errorf("failed to parse backend configuration: %w", err)
//}
var err error
// Parse multicast group
if b.mcast_addr, err = net.ResolveUDPAddr("udp", "224.0.0.1:9999"); err != nil {
return nil, fmt.Errorf("Error parsing multicast address: %w", err)
}
// Bind to any available local UDP port for sending to multicast group
if b.send_conn, err = net.ListenPacket("udp", ":0"); err != nil {
return nil, fmt.Errorf("Error binding to local address: %w", err)
}
p := ipv4.NewPacketConn(b.send_conn)
if err := p.JoinGroup(nil, b.mcast_addr); err != nil {
return nil, fmt.Errorf("Error joining multicast group: %w", err)
}
// Add listener for multicast group
if b.recv_conn, err = net.ListenMulticastUDP("udp", nil, b.mcast_addr); err != nil {
return nil, fmt.Errorf("Error adding multicast listener: %w", err)
}
// Enable multicast loopback
fd, _ := b.recv_conn.File()
syscall.SetsockoptInt(int(fd.Fd()), syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, 1)
//syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, 1)
go func() {
buf := make([]byte, 4096)
for {
n, _, err := b.recv_conn.ReadFrom(buf)
if err != nil {
if err == net.ErrClosed {
break
}
b.logger.Error("Error reading from UDPConn", zap.Error(err))
break
//continue
}
var env signalingproto.Envelope
err = proto.Unmarshal(buf[:n], &env)
if err != nil {
b.logger.Error("Error unmarshaling protobuf", zap.Error(err))
continue
}
if err := b.SubscriptionsRegistry.NewMessage(&env); err != nil {
if err == signaling.ErrNotSubscribed {
// Message wasn't for us but we will get everything over multicast, just ignore it
} else {
b.logger.Error("Failed to decrypt message", zap.Error(err))
}
continue
}
}
}()
for _, h := range cfg.OnReady {
h.OnSignalingBackendReady(b)
}
return b, nil
}
func (b *Backend) Type() signalingproto.BackendType {
return signalingproto.BackendType_MCAST
}
func (b *Backend) Subscribe(ctx context.Context, kp *crypto.KeyPair, h signaling.MessageHandler) (bool, error) {
return b.SubscriptionsRegistry.Subscribe(kp, h)
}
func (b *Backend) Unsubscribe(ctx context.Context, kp *crypto.KeyPair, h signaling.MessageHandler) (bool, error) {
return b.SubscriptionsRegistry.Unsubscribe(kp, h)
}
func (b *Backend) Publish(ctx context.Context, kp *crypto.KeyPair, msg *signaling.Message) error {
env, err := msg.Encrypt(kp)
if err != nil {
return fmt.Errorf("failed to encrypt message: %w", err)
}
data, err := proto.Marshal(env)
if err != nil {
return fmt.Errorf("Error marshaling protobuf: %w", err)
}
if _, err = b.send_conn.WriteTo(data, b.mcast_addr); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}
return nil
}
func (b *Backend) Close() error {
//return fmt.Errorf("Close() called")
//if err := b.conn.Close(); err != nil {
// return fmt.Errorf("failed to close multicast connection: %w", err)
//}
return nil
}
+29
View File
@@ -0,0 +1,29 @@
// SPDX-FileCopyrightText: 2025 Adam Rizkalla <ajarizzo@gmail.com>
// SPDX-License-Identifier: Apache-2.0
package mcast_test
import (
"net/url"
"testing"
_ "cunicu.li/cunicu/pkg/signaling/mcast"
"cunicu.li/cunicu/test"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSuite(t *testing.T) {
test.SetupLogging()
RegisterFailHandler(Fail)
RunSpecs(t, "Multicast Backend Suite")
}
var _ = Describe("Multicast backend", func() {
u := url.URL{
Scheme: "multicast",
}
test.BackendTest(&u, 10)
})
+26
View File
@@ -0,0 +1,26 @@
// SPDX-FileCopyrightText: 2025 Adam Rizkalla <ajarizzo@gmail.com>
// SPDX-License-Identifier: Apache-2.0
package mcast
import (
"cunicu.li/cunicu/pkg/signaling"
)
type BackendConfig struct {
signaling.BackendConfig
Target string
Loopback bool
}
func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) (err error) {
c.BackendConfig = *cfg
//c.Target, c.Loopback, err = ParseURL(c.BackendConfig.URI.String())
//if err != nil {
// return fmt.Errorf("failed to parse multicast URL: %w", err)
//}
return nil
}
+5
View File
@@ -0,0 +1,5 @@
// SPDX-FileCopyrightText: 2025 Adam Rizkalla <ajarizzo@gmail.com>
// SPDX-License-Identifier: Apache-2.0
// Package mcast implements a signaling backend using multicast
package mcast
+2 -1
View File
@@ -16,6 +16,7 @@ enum BackendType {
MULTI = 0;
GRPC = 1;
INPROCESS = 2;
MCAST = 3;
}
message Envelope {
@@ -47,4 +48,4 @@ service Signaling {
rpc Subscribe(SubscribeParams) returns (stream Envelope) {}
rpc Publish(Envelope) returns (Empty) {}
}
}