diff --git a/bwestimator.go b/bwestimator.go index c8a0735..778da8f 100644 --- a/bwestimator.go +++ b/bwestimator.go @@ -136,10 +136,7 @@ func (bwc *BWEController) sendBitrateUpdate(id string, callback UpdateBitrateCal case err := <-done: if err != nil { fmt.Printf("bitrate update callback (id=%s) failed: %v. Unsubscribing...\n", id, err) - if err := bwc.Unsubscribe(id); err != nil { - fmt.Printf(err.Error()) - } - return + bwc.Unsubscribe(id) } case <-bwc.ctx.Done(): return @@ -153,16 +150,15 @@ func (bwc *BWEController) getBitrate() (int, error) { return bwc.estimator.GetTargetBitrate(), nil } -func (bwc *BWEController) Unsubscribe(id string) error { +func (bwc *BWEController) Unsubscribe(id string) { bwc.mux.Lock() defer bwc.mux.Unlock() if _, exists := bwc.subs[id]; !exists { - return errors.New("subscriber not found") + return } delete(bwc.subs, id) - return nil } func (bwc *BWEController) Close() error { @@ -178,6 +174,10 @@ func (bwc *BWEController) Close() error { bwc.mux.Lock() defer bwc.mux.Unlock() + if bwc.estimator == nil { + return + } + if err := bwc.estimator.Close(); err != nil { merr = multierr.Append(merr, err) } diff --git a/firebase_answer.go b/firebase_answer.go index bba25e2..68b39a6 100644 --- a/firebase_answer.go +++ b/firebase_answer.go @@ -22,7 +22,7 @@ type AnswerSignal struct { ctx context.Context } -func CreateFirebaseAnswerSignal(ctx context.Context) *AnswerSignal { +func CreateFirebaseAnswerSignal(ctx context.Context) (*AnswerSignal, error) { var ( configuration option.ClientOption app *firebase.App @@ -31,20 +31,20 @@ func CreateFirebaseAnswerSignal(ctx context.Context) *AnswerSignal { ) if configuration, err = GetFirebaseConfiguration(); err != nil { - panic(err) + return nil, err } if app, err = firebase.NewApp(ctx, nil, configuration); err != nil { - panic(err) + return nil, err } if client, err = app.Firestore(ctx); err != nil { - panic(err) + return nil, err } return &AnswerSignal{ app: app, client: client, ctx: ctx, - } + }, nil } func (signal *AnswerSignal) Connect(category string, pc *PeerConnection) error { diff --git a/firebase_config.go b/firebase_config.go index d291f9d..34d1af2 100644 --- a/firebase_config.go +++ b/firebase_config.go @@ -24,10 +24,11 @@ type firebaseConfig struct { func GetFirebaseConfiguration() (option.ClientOption, error) { config := firebaseConfig{ - Type: os.Getenv("FIREBASE_TYPE"), - ProjectID: os.Getenv("FIREBASE_PROJECT_ID"), - PrivateKeyID: os.Getenv("FIREBASE_PRIVATE_KEY_ID"), - PrivateKey: strings.ReplaceAll(os.Getenv("FIREBASE_PRIVATE_KEY"), "\\n", "\n"), + Type: os.Getenv("FIREBASE_TYPE"), + ProjectID: os.Getenv("FIREBASE_PROJECT_ID"), + PrivateKeyID: os.Getenv("FIREBASE_PRIVATE_KEY_ID"), + PrivateKey: strings.ReplaceAll(os.Getenv("FIREBASE_PRIVATE_KEY"), "\\n", "\n"), + // PrivateKey: os.Getenv("FIREBASE_PRIVATE_KEY"), ClientEmail: os.Getenv("FIREBASE_CLIENT_EMAIL"), ClientID: os.Getenv("FIREBASE_CLIENT_ID"), AuthURI: os.Getenv("FIREBASE_AUTH_URI"), diff --git a/firebase_offer.go b/firebase_offer.go index 24fedc3..a700ad8 100644 --- a/firebase_offer.go +++ b/firebase_offer.go @@ -14,14 +14,14 @@ import ( "google.golang.org/grpc/status" ) -type OfferSignal struct { +type FirebaseOfferSignal struct { app *firebase.App firebaseClient *firestore.Client docRef *firestore.DocumentRef ctx context.Context } -func CreateFirebaseOfferSignal(ctx context.Context) *OfferSignal { +func CreateFirebaseOfferSignal(ctx context.Context) (*FirebaseOfferSignal, error) { var ( configuration option.ClientOption app *firebase.App @@ -29,25 +29,24 @@ func CreateFirebaseOfferSignal(ctx context.Context) *OfferSignal { err error ) - // TODO: I DO NOT LIKE THE 'PANIC' HERE. REMOVE AND RETURN A ERROR if configuration, err = GetFirebaseConfiguration(); err != nil { - panic(err) + return nil, err } if app, err = firebase.NewApp(ctx, nil, configuration); err != nil { - panic(err) + return nil, err } if firebaseClient, err = app.Firestore(ctx); err != nil { - panic(err) + return nil, err } - return &OfferSignal{ + return &FirebaseOfferSignal{ app: app, firebaseClient: firebaseClient, ctx: ctx, - } + }, nil } -func (signal *OfferSignal) Connect(category string, pc *PeerConnection) error { +func (signal *FirebaseOfferSignal) Connect(category string, pc *PeerConnection) error { signal.docRef = signal.firebaseClient.Collection(category).Doc(pc.label) _, err := signal.docRef.Get(signal.ctx) @@ -97,7 +96,7 @@ func (signal *OfferSignal) Connect(category string, pc *PeerConnection) error { return signal.offer(pc) } -func (signal *OfferSignal) offer(pc *PeerConnection) error { +func (signal *FirebaseOfferSignal) offer(pc *PeerConnection) error { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -126,7 +125,7 @@ loop: Type: webrtc.SDPTypeAnswer, SDP: sdp, }); err != nil { - fmt.Printf("error while setting remote description: %s", err.Error()) + fmt.Printf("error while setting remote description: %s\n", err.Error()) continue loop } @@ -137,7 +136,7 @@ loop: return nil } -func (signal *OfferSignal) Close() error { +func (signal *FirebaseOfferSignal) Close() error { if err := signal.firebaseClient.Close(); err != nil { fmt.Printf("failed to close firebase client with error: %v\n", err) return err diff --git a/generic_answer.go b/generic_answer.go new file mode 100644 index 0000000..16be8bb --- /dev/null +++ b/generic_answer.go @@ -0,0 +1,82 @@ +package client + +import ( + "context" + "errors" + "fmt" + + "github.com/pion/webrtc/v4" +) + +type GenericAnswerSignal struct { + ctx context.Context + + forOffer ForOffer + onAnswer OnAnswer +} + +func NewGenericAnswerSignal(ctx context.Context, onAnswer OnAnswer, forOffer ForOffer) *GenericAnswerSignal { + return &GenericAnswerSignal{ + ctx: ctx, + forOffer: forOffer, + onAnswer: onAnswer, + } +} + +func (s *GenericAnswerSignal) Connect(_ string, pc *PeerConnection) error { + if s.onAnswer == nil || s.forOffer == nil { + return errors.New("connect method cannot be used. use offer and answer methods instead") + } + + offerSDP, err := s.forOffer(s.ctx) + if err != nil { + return err + } + + if err := s.Offer(pc, offerSDP); err != nil { + return err + } + + answerSDP, err := s.Answer(pc) + if err != nil { + return err + } + + return s.onAnswer(s.ctx, answerSDP) +} + +// Offer sets the remote offer SDP on the PeerConnection. +func (s *GenericAnswerSignal) Offer(pc *PeerConnection, sdp string) error { + if err := pc.peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + return fmt.Errorf("failed to set remote description (pc=%s); err: %w", pc.GetLabel(), err) + } + return nil +} + +// Answer creates and sets the local answer, waits for ICE gathering to complete, and returns the local SDP. +func (s *GenericAnswerSignal) Answer(pc *PeerConnection) (string, error) { + answer, err := pc.peerConnection.CreateAnswer(nil) + if err != nil { + return "", fmt.Errorf("error while creating answer: %w", err) + } + + if err := pc.peerConnection.SetLocalDescription(answer); err != nil { + return "", fmt.Errorf("error while setting local sdp: %w", err) + } + + select { + case <-s.ctx.Done(): + return "", fmt.Errorf("failed to gather ICE candidates within context deadline; err: %w", s.ctx.Err()) + case <-webrtc.GatheringCompletePromise(pc.peerConnection): + } + + return pc.peerConnection.LocalDescription().SDP, nil +} + +func (s *GenericAnswerSignal) Close() error { + // NOTE: INTENTIONALLY EMPTY + return nil +} diff --git a/generic_offer.go b/generic_offer.go new file mode 100644 index 0000000..a1afdcc --- /dev/null +++ b/generic_offer.go @@ -0,0 +1,86 @@ +package client + +import ( + "context" + "errors" + "fmt" + + "github.com/pion/webrtc/v4" +) + +type GenericOfferSignal struct { + ctx context.Context + + onOffer OnOffer + forAnswer ForAnswer +} + +func NewGenericOfferSignal(ctx context.Context, onOffer OnOffer, forAnswer ForAnswer) *GenericOfferSignal { + return &GenericOfferSignal{ + ctx: ctx, + onOffer: onOffer, + forAnswer: forAnswer, + } +} + +func (s *GenericOfferSignal) Connect(_ string, pc *PeerConnection) error { + if s.onOffer == nil || s.forAnswer == nil { + return errors.New("connect method cannot be used. use offer and answer methods instead") + } + + offer, err := s.Offer(pc) + if err != nil { + return err + } + + if err := s.onOffer(s.ctx, offer); err != nil { + return err + } + + answer, err := s.forAnswer(s.ctx) + if err != nil { + return err + } + + if err := s.Answer(pc, answer); err != nil { + return err + } + + return nil +} + +func (s *GenericOfferSignal) Offer(pc *PeerConnection) (string, error) { + offer, err := pc.peerConnection.CreateOffer(nil) + if err != nil { + return "", fmt.Errorf("error while creating offer: %w", err) + } + + if err := pc.peerConnection.SetLocalDescription(offer); err != nil { + return "", fmt.Errorf("error while setting local sdp: %w", err) + + } + + select { + case <-s.ctx.Done(): + return "", fmt.Errorf("failed to gather ICE candidates within context deadline; err: %w", s.ctx.Err()) + case <-webrtc.GatheringCompletePromise(pc.peerConnection): + } + + return pc.peerConnection.LocalDescription().SDP, nil +} + +func (s *GenericOfferSignal) Answer(pc *PeerConnection, sdp string) error { + if err := pc.peerConnection.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + return fmt.Errorf("failed to set remote description (pc=%s); err: %w", pc.GetLabel(), err) + } + + return nil +} + +func (s *GenericOfferSignal) Close() error { + // NOTE: INTENTIONALLY EMPTY + return nil +} diff --git a/peerconnection.go b/peerconnection.go index cd93a9f..cad02fb 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -310,23 +310,39 @@ func (pc *PeerConnection) GetBWEstimator() (*BWEController, error) { } func (pc *PeerConnection) Close() error { + fmt.Printf("[PeerConnection] Starting close for peer connection: %s\n", pc.label) + var merr error pc.once.Do(func() { + fmt.Printf("[PeerConnection] Canceling context for peer: %s\n", pc.label) if pc.cancel != nil { pc.cancel() } - if pc.dataChannels != nil { - merr = multierr.Append(merr, pc.dataChannels.Close()) + fmt.Printf("[PeerConnection] Closing underlying WebRTC peer connection for: %s\n", pc.label) + if err := pc.peerConnection.Close(); err != nil { + fmt.Printf("[PeerConnection] ERROR: Failed to close WebRTC peer connection for %s: %v\n", pc.label, err) + merr = multierr.Append(merr, err) + } else { + fmt.Printf("[PeerConnection] ✓ WebRTC peer connection closed for: %s\n", pc.label) } if pc.bwc != nil { - merr = multierr.Append(merr, pc.bwc.Close()) + fmt.Printf("[PeerConnection] Closing bandwidth controller for: %s\n", pc.label) + if err := pc.bwc.Close(); err != nil { + fmt.Printf("[PeerConnection] ERROR: Failed to close bandwidth controller for %s: %v\n", pc.label, err) + merr = multierr.Append(merr, err) + } else { + fmt.Printf("[PeerConnection] ✓ Bandwidth controller closed for: %s\n", pc.label) + } + } + + if merr == nil { + fmt.Printf("[PeerConnection] ✓ Successfully closed peer connection: %s\n", pc.label) + } else { + fmt.Printf("[PeerConnection] ⚠ Peer connection closed with errors for %s: %v\n", pc.label, merr) } }) - // clear tracks if any - // clear sinks if any - // clear bwc ?? - return pc.peerConnection.Close() + return merr } diff --git a/pkg/mediasink/sinks.go b/pkg/mediasink/sinks.go index e6e8450..53b5ba2 100644 --- a/pkg/mediasink/sinks.go +++ b/pkg/mediasink/sinks.go @@ -54,18 +54,20 @@ func (s *Sink) setRTPReceiver(receiver *webrtc.RTPReceiver) { s.rtpReceiver = receiver } -func (s *Sink) readRTPReceiver(rtcpBuf []byte) { +func (s *Sink) readRTPReceiver(rtcpBuf []byte) error { s.mux.RLock() defer s.mux.RUnlock() if s.rtpReceiver == nil { time.Sleep(10 * time.Millisecond) - return + return nil } if _, _, err := s.rtpReceiver.Read(rtcpBuf); err != nil { - fmt.Printf("error while reading rtcp packets") + fmt.Printf("error while reading rtcp packets (err=%v)\n", err) + return err } + return nil } func (s *Sink) rtpReceiverLoop() { @@ -76,7 +78,9 @@ func (s *Sink) rtpReceiverLoop() { return default: rtcpBuf := make([]byte, 1500) - s.readRTPReceiver(rtcpBuf) + if err := s.readRTPReceiver(rtcpBuf); err != nil { + return + } } } } diff --git a/pkg/mediasource/track.go b/pkg/mediasource/track.go index 7419145..614e766 100644 --- a/pkg/mediasource/track.go +++ b/pkg/mediasource/track.go @@ -3,7 +3,6 @@ package mediasource import ( "context" "errors" - "fmt" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" @@ -63,7 +62,7 @@ func (track *Track) rtpSenderLoop() { default: rtcpBuf := make([]byte, 1500) if _, _, err := track.rtpSender.Read(rtcpBuf); err != nil { - fmt.Println("error while reading rtcp packets") + // fmt.Println("error while reading rtcp packets") } } } diff --git a/rtc_config.go b/rtc_config.go index ad04ed0..b469dcf 100644 --- a/rtc_config.go +++ b/rtc_config.go @@ -6,7 +6,7 @@ import ( "github.com/pion/webrtc/v4" ) -func GetRTCConfiguration() webrtc.Configuration { +func GetFullRTCConfiguration() webrtc.Configuration { return webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ { @@ -33,3 +33,13 @@ func GetRTCConfiguration() webrtc.Configuration { }, } } + +func GetSTUNOnlyRTCConfiguration() webrtc.Configuration { + return webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{os.Getenv("STUN_SERVER_URL")}, + }, + }, + } +} diff --git a/signal.go b/signal.go index 3314ad6..c4a6fe9 100644 --- a/signal.go +++ b/signal.go @@ -1,5 +1,7 @@ package client +import "context" + const ( FieldOffer = "offer" FieldAnswer = "answer" @@ -11,7 +13,13 @@ const ( FieldCreatedAt = "created-at" ) -type BaseSignal interface { - Connect(string, *PeerConnection) error - Close() error -} +type ( + BaseSignal interface { + Connect(string, *PeerConnection) error + Close() error + } + ForOffer func(ctx context.Context) (string, error) + OnAnswer func(ctx context.Context, sdp string) error + OnOffer func(ctx context.Context, sdp string) error + ForAnswer func(ctx context.Context) (string, error) +)