From 04094631815b5f6e043c8ca66dfefe5d9fb7f042 Mon Sep 17 00:00:00 2001 From: harshabose Date: Sat, 8 Mar 2025 02:18:29 +0530 Subject: [PATCH] first commit --- .gitignore | 0 internal/client/client.go | 8 +++ internal/user/user.go | 1 + pkg/interceptor/chain.go | 42 ++++++++++++++++ pkg/interceptor/errors.go | 52 ++++++++++++++++++++ pkg/interceptor/interceptor.go | 64 ++++++++++++++++++++++++ pkg/interceptor/no_interceptor.go | 19 ++++++++ pkg/message/message.go | 14 ++++++ pkg/socket/client.go | 1 + pkg/socket/options.go | 3 ++ pkg/socket/settings.go | 8 +++ pkg/socket/socket.go | 81 +++++++++++++++++++++++++++++++ 12 files changed, 293 insertions(+) create mode 100644 .gitignore create mode 100644 internal/client/client.go create mode 100644 internal/user/user.go create mode 100644 pkg/interceptor/chain.go create mode 100644 pkg/interceptor/errors.go create mode 100644 pkg/interceptor/interceptor.go create mode 100644 pkg/interceptor/no_interceptor.go create mode 100644 pkg/message/message.go create mode 100644 pkg/socket/client.go create mode 100644 pkg/socket/options.go create mode 100644 pkg/socket/settings.go create mode 100644 pkg/socket/socket.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000..e8c04ff --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,8 @@ +package client + +import "github.com/coder/websocket" + +type Client struct { + id string + connection *websocket.Conn +} diff --git a/internal/user/user.go b/internal/user/user.go new file mode 100644 index 0000000..a00006b --- /dev/null +++ b/internal/user/user.go @@ -0,0 +1 @@ +package user diff --git a/pkg/interceptor/chain.go b/pkg/interceptor/chain.go new file mode 100644 index 0000000..1a045e9 --- /dev/null +++ b/pkg/interceptor/chain.go @@ -0,0 +1,42 @@ +package interceptor + +type Chain struct { + interceptors []Interceptor +} + +func CreateChain(interceptors []Interceptor) *Chain { + return &Chain{interceptors: interceptors} +} + +func (chain *Chain) BindIncoming(reader IncomingReader) IncomingReader { + for _, interceptor := range chain.interceptors { + interceptor.BindIncoming(reader) + } + + return reader +} + +func (chain *Chain) BindOutgoing(writer OutgoingWriter) OutgoingWriter { + for _, interceptor := range chain.interceptors { + interceptor.BindOutgoing(writer) + } + + return writer +} + +func (chain *Chain) BindConnection(connection Connection) Connection { + for _, interceptor := range chain.interceptors { + interceptor.BindConnection(connection) + } + + return connection +} + +func (chain *Chain) Close() error { + var errs []error + for _, interceptor := range chain.interceptors { + errs = append(errs, interceptor.Close()) + } + + return flattenErrs(errs) +} diff --git a/pkg/interceptor/errors.go b/pkg/interceptor/errors.go new file mode 100644 index 0000000..fb1a506 --- /dev/null +++ b/pkg/interceptor/errors.go @@ -0,0 +1,52 @@ +package interceptor + +import ( + "errors" + "strings" +) + +func flattenErrs(errs []error) error { + var errs2 []error + for _, e := range errs { + if e != nil { + errs2 = append(errs2, e) + } + } + if len(errs2) == 0 { + return nil + } + return multiError(errs2) +} + +type multiError []error + +func (me multiError) Error() string { + var errstrings []string + + for _, err := range me { + if err != nil { + errstrings = append(errstrings, err.Error()) + } + } + + if len(errstrings) == 0 { + return "multiError must contain multiple error but is empty" + } + + return strings.Join(errstrings, "\n") +} + +func (me multiError) Is(err error) bool { + for _, e := range me { + if errors.Is(e, err) { + return true + } + var me2 multiError + if errors.As(e, &me2) { + if me2.Is(err) { + return true + } + } + } + return false +} diff --git a/pkg/interceptor/interceptor.go b/pkg/interceptor/interceptor.go new file mode 100644 index 0000000..3972aa2 --- /dev/null +++ b/pkg/interceptor/interceptor.go @@ -0,0 +1,64 @@ +package interceptor + +import ( + "io" + + "github.com/harshabose/skyline_sonata/serve/pkg/message" +) + +type Registry struct { + factories []Factory +} + +func (registry *Registry) Register(factory Factory) { + registry.factories = append(registry.factories, factory) +} + +func (registry *Registry) Build(id string) (Interceptor, error) { + if len(registry.factories) == 0 { + return &NoInterceptor{}, nil + } + + interceptors := make([]Interceptor, 0) + for _, factory := range registry.factories { + interceptor, err := factory.NewInterceptor(id) + if err != nil { + return nil, err + } + + interceptors = append(interceptors, interceptor) + } + + return CreateChain(interceptors), nil +} + +// Factory provides an interface for constructing interceptors +type Factory interface { + NewInterceptor(id string) (Interceptor, error) +} + +// Interceptor are transformers which bind to incoming, outgoing and connection of a client of the websocket. This can +// be used to add functionalities to the websocket connection. +type Interceptor interface { + // BindIncoming binds to incoming messages to a client + BindIncoming(IncomingReader) IncomingReader + + // BindOutgoing binds to outgoing messages from a client + BindOutgoing(OutgoingWriter) OutgoingWriter + + // BindConnection binds to the websocket connection itself + BindConnection(Connection) Connection + + io.Closer +} + +type IncomingReader interface { + Read([]byte) (int, error) +} + +type OutgoingWriter interface { + Write(message message.BaseMessage) (int, error) +} + +type Connection interface { +} diff --git a/pkg/interceptor/no_interceptor.go b/pkg/interceptor/no_interceptor.go new file mode 100644 index 0000000..82e5f63 --- /dev/null +++ b/pkg/interceptor/no_interceptor.go @@ -0,0 +1,19 @@ +package interceptor + +type NoInterceptor struct{} + +func (interceptor *NoInterceptor) BindIncoming(reader IncomingReader) IncomingReader { + return reader +} + +func (interceptor *NoInterceptor) BindOutgoing(writer OutgoingWriter) OutgoingWriter { + return writer +} + +func (interceptor *NoInterceptor) BindConnection(connection Connection) Connection { + return connection +} + +func (interceptor *NoInterceptor) Close() error { + return nil +} diff --git a/pkg/message/message.go b/pkg/message/message.go new file mode 100644 index 0000000..6a9d915 --- /dev/null +++ b/pkg/message/message.go @@ -0,0 +1,14 @@ +package message + +type BaseMessage interface { + Marshal() ([]byte, error) +} + +type Header struct { + SourceID string `json:"source_id"` + DestinationID string `json:"destination_id"` +} + +type Message struct { + Header +} diff --git a/pkg/socket/client.go b/pkg/socket/client.go new file mode 100644 index 0000000..69513dc --- /dev/null +++ b/pkg/socket/client.go @@ -0,0 +1 @@ +package socket diff --git a/pkg/socket/options.go b/pkg/socket/options.go new file mode 100644 index 0000000..e51ec1d --- /dev/null +++ b/pkg/socket/options.go @@ -0,0 +1,3 @@ +package socket + +type Option = func(*Socket) error diff --git a/pkg/socket/settings.go b/pkg/socket/settings.go new file mode 100644 index 0000000..e69e75a --- /dev/null +++ b/pkg/socket/settings.go @@ -0,0 +1,8 @@ +package socket + +type Settings struct { +} + +func RegisterDefaultSettings(settings *Settings) error { + return nil +} diff --git a/pkg/socket/socket.go b/pkg/socket/socket.go new file mode 100644 index 0000000..b912702 --- /dev/null +++ b/pkg/socket/socket.go @@ -0,0 +1,81 @@ +package socket + +import ( + "github.com/coder/websocket" + + "github.com/harshabose/skyline_sonata/serve/pkg/interceptor" +) + +type API struct { + settings *Settings + interceptorRegistry *interceptor.Registry +} + +type APIOption = func(*API) error + +func WithSocketSettings(settings *Settings) APIOption { + return func(api *API) error { + api.settings = settings + return nil + } +} + +func WithInterceptorRegistry(registry *interceptor.Registry) APIOption { + return func(api *API) error { + api.interceptorRegistry = registry + return nil + } +} + +func CreateSocketFactory(options ...APIOption) (*API, error) { + api := &API{ + settings: nil, + interceptorRegistry: nil, + } + + for _, option := range options { + if err := option(api); err != nil { + return nil, err + } + } + + if api.settings == nil { + api.settings = &Settings{} + if err := RegisterDefaultSettings(api.settings); err != nil { + return nil, err + } + } + + return api, nil +} + +func (api *API) CreateWebSocket(id string, options ...Option) (*Socket, error) { + socket := &Socket{ + id: id, + } + + interceptors, err := api.interceptorRegistry.Build(id) + if err != nil { + return nil, err + } + + socket.interceptor = interceptors + + for _, option := range options { + if err := option(socket); err != nil { + return nil, err + } + } + + return socket, nil +} + +type Socket struct { + id string + connections map[string]*websocket.Conn + interceptor interceptor.Interceptor +} + +func (socket *Socket) Serve() { + +}