implemented restart and video update features

This commit is contained in:
harshabose
2025-10-21 20:36:28 +05:30
parent e70a3ba762
commit c6d7e502c3
11 changed files with 251 additions and 46 deletions
+7 -7
View File
@@ -136,10 +136,7 @@ func (bwc *BWEController) sendBitrateUpdate(id string, callback UpdateBitrateCal
case err := <-done:
if err != nil {
fmt.Printf("bitrate update callback (id=%s) failed: %v. Unsubscribing...\n", id, err)
if err := bwc.Unsubscribe(id); err != nil {
fmt.Printf(err.Error())
}
return
bwc.Unsubscribe(id)
}
case <-bwc.ctx.Done():
return
@@ -153,16 +150,15 @@ func (bwc *BWEController) getBitrate() (int, error) {
return bwc.estimator.GetTargetBitrate(), nil
}
func (bwc *BWEController) Unsubscribe(id string) error {
func (bwc *BWEController) Unsubscribe(id string) {
bwc.mux.Lock()
defer bwc.mux.Unlock()
if _, exists := bwc.subs[id]; !exists {
return errors.New("subscriber not found")
return
}
delete(bwc.subs, id)
return nil
}
func (bwc *BWEController) Close() error {
@@ -178,6 +174,10 @@ func (bwc *BWEController) Close() error {
bwc.mux.Lock()
defer bwc.mux.Unlock()
if bwc.estimator == nil {
return
}
if err := bwc.estimator.Close(); err != nil {
merr = multierr.Append(merr, err)
}
+5 -5
View File
@@ -22,7 +22,7 @@ type AnswerSignal struct {
ctx context.Context
}
func CreateFirebaseAnswerSignal(ctx context.Context) *AnswerSignal {
func CreateFirebaseAnswerSignal(ctx context.Context) (*AnswerSignal, error) {
var (
configuration option.ClientOption
app *firebase.App
@@ -31,20 +31,20 @@ func CreateFirebaseAnswerSignal(ctx context.Context) *AnswerSignal {
)
if configuration, err = GetFirebaseConfiguration(); err != nil {
panic(err)
return nil, err
}
if app, err = firebase.NewApp(ctx, nil, configuration); err != nil {
panic(err)
return nil, err
}
if client, err = app.Firestore(ctx); err != nil {
panic(err)
return nil, err
}
return &AnswerSignal{
app: app,
client: client,
ctx: ctx,
}
}, nil
}
func (signal *AnswerSignal) Connect(category string, pc *PeerConnection) error {
+5 -4
View File
@@ -24,10 +24,11 @@ type firebaseConfig struct {
func GetFirebaseConfiguration() (option.ClientOption, error) {
config := firebaseConfig{
Type: os.Getenv("FIREBASE_TYPE"),
ProjectID: os.Getenv("FIREBASE_PROJECT_ID"),
PrivateKeyID: os.Getenv("FIREBASE_PRIVATE_KEY_ID"),
PrivateKey: strings.ReplaceAll(os.Getenv("FIREBASE_PRIVATE_KEY"), "\\n", "\n"),
Type: os.Getenv("FIREBASE_TYPE"),
ProjectID: os.Getenv("FIREBASE_PROJECT_ID"),
PrivateKeyID: os.Getenv("FIREBASE_PRIVATE_KEY_ID"),
PrivateKey: strings.ReplaceAll(os.Getenv("FIREBASE_PRIVATE_KEY"), "\\n", "\n"),
// PrivateKey: os.Getenv("FIREBASE_PRIVATE_KEY"),
ClientEmail: os.Getenv("FIREBASE_CLIENT_EMAIL"),
ClientID: os.Getenv("FIREBASE_CLIENT_ID"),
AuthURI: os.Getenv("FIREBASE_AUTH_URI"),
+11 -12
View File
@@ -14,14 +14,14 @@ import (
"google.golang.org/grpc/status"
)
type OfferSignal struct {
type FirebaseOfferSignal struct {
app *firebase.App
firebaseClient *firestore.Client
docRef *firestore.DocumentRef
ctx context.Context
}
func CreateFirebaseOfferSignal(ctx context.Context) *OfferSignal {
func CreateFirebaseOfferSignal(ctx context.Context) (*FirebaseOfferSignal, error) {
var (
configuration option.ClientOption
app *firebase.App
@@ -29,25 +29,24 @@ func CreateFirebaseOfferSignal(ctx context.Context) *OfferSignal {
err error
)
// TODO: I DO NOT LIKE THE 'PANIC' HERE. REMOVE AND RETURN A ERROR
if configuration, err = GetFirebaseConfiguration(); err != nil {
panic(err)
return nil, err
}
if app, err = firebase.NewApp(ctx, nil, configuration); err != nil {
panic(err)
return nil, err
}
if firebaseClient, err = app.Firestore(ctx); err != nil {
panic(err)
return nil, err
}
return &OfferSignal{
return &FirebaseOfferSignal{
app: app,
firebaseClient: firebaseClient,
ctx: ctx,
}
}, nil
}
func (signal *OfferSignal) Connect(category string, pc *PeerConnection) error {
func (signal *FirebaseOfferSignal) Connect(category string, pc *PeerConnection) error {
signal.docRef = signal.firebaseClient.Collection(category).Doc(pc.label)
_, err := signal.docRef.Get(signal.ctx)
@@ -97,7 +96,7 @@ func (signal *OfferSignal) Connect(category string, pc *PeerConnection) error {
return signal.offer(pc)
}
func (signal *OfferSignal) offer(pc *PeerConnection) error {
func (signal *FirebaseOfferSignal) offer(pc *PeerConnection) error {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
@@ -126,7 +125,7 @@ loop:
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
fmt.Printf("error while setting remote description: %s", err.Error())
fmt.Printf("error while setting remote description: %s\n", err.Error())
continue loop
}
@@ -137,7 +136,7 @@ loop:
return nil
}
func (signal *OfferSignal) Close() error {
func (signal *FirebaseOfferSignal) Close() error {
if err := signal.firebaseClient.Close(); err != nil {
fmt.Printf("failed to close firebase client with error: %v\n", err)
return err
+82
View File
@@ -0,0 +1,82 @@
package client
import (
"context"
"errors"
"fmt"
"github.com/pion/webrtc/v4"
)
type GenericAnswerSignal struct {
ctx context.Context
forOffer ForOffer
onAnswer OnAnswer
}
func NewGenericAnswerSignal(ctx context.Context, onAnswer OnAnswer, forOffer ForOffer) *GenericAnswerSignal {
return &GenericAnswerSignal{
ctx: ctx,
forOffer: forOffer,
onAnswer: onAnswer,
}
}
func (s *GenericAnswerSignal) Connect(_ string, pc *PeerConnection) error {
if s.onAnswer == nil || s.forOffer == nil {
return errors.New("connect method cannot be used. use offer and answer methods instead")
}
offerSDP, err := s.forOffer(s.ctx)
if err != nil {
return err
}
if err := s.Offer(pc, offerSDP); err != nil {
return err
}
answerSDP, err := s.Answer(pc)
if err != nil {
return err
}
return s.onAnswer(s.ctx, answerSDP)
}
// Offer sets the remote offer SDP on the PeerConnection.
func (s *GenericAnswerSignal) Offer(pc *PeerConnection, sdp string) error {
if err := pc.peerConnection.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description (pc=%s); err: %w", pc.GetLabel(), err)
}
return nil
}
// Answer creates and sets the local answer, waits for ICE gathering to complete, and returns the local SDP.
func (s *GenericAnswerSignal) Answer(pc *PeerConnection) (string, error) {
answer, err := pc.peerConnection.CreateAnswer(nil)
if err != nil {
return "", fmt.Errorf("error while creating answer: %w", err)
}
if err := pc.peerConnection.SetLocalDescription(answer); err != nil {
return "", fmt.Errorf("error while setting local sdp: %w", err)
}
select {
case <-s.ctx.Done():
return "", fmt.Errorf("failed to gather ICE candidates within context deadline; err: %w", s.ctx.Err())
case <-webrtc.GatheringCompletePromise(pc.peerConnection):
}
return pc.peerConnection.LocalDescription().SDP, nil
}
func (s *GenericAnswerSignal) Close() error {
// NOTE: INTENTIONALLY EMPTY
return nil
}
+86
View File
@@ -0,0 +1,86 @@
package client
import (
"context"
"errors"
"fmt"
"github.com/pion/webrtc/v4"
)
type GenericOfferSignal struct {
ctx context.Context
onOffer OnOffer
forAnswer ForAnswer
}
func NewGenericOfferSignal(ctx context.Context, onOffer OnOffer, forAnswer ForAnswer) *GenericOfferSignal {
return &GenericOfferSignal{
ctx: ctx,
onOffer: onOffer,
forAnswer: forAnswer,
}
}
func (s *GenericOfferSignal) Connect(_ string, pc *PeerConnection) error {
if s.onOffer == nil || s.forAnswer == nil {
return errors.New("connect method cannot be used. use offer and answer methods instead")
}
offer, err := s.Offer(pc)
if err != nil {
return err
}
if err := s.onOffer(s.ctx, offer); err != nil {
return err
}
answer, err := s.forAnswer(s.ctx)
if err != nil {
return err
}
if err := s.Answer(pc, answer); err != nil {
return err
}
return nil
}
func (s *GenericOfferSignal) Offer(pc *PeerConnection) (string, error) {
offer, err := pc.peerConnection.CreateOffer(nil)
if err != nil {
return "", fmt.Errorf("error while creating offer: %w", err)
}
if err := pc.peerConnection.SetLocalDescription(offer); err != nil {
return "", fmt.Errorf("error while setting local sdp: %w", err)
}
select {
case <-s.ctx.Done():
return "", fmt.Errorf("failed to gather ICE candidates within context deadline; err: %w", s.ctx.Err())
case <-webrtc.GatheringCompletePromise(pc.peerConnection):
}
return pc.peerConnection.LocalDescription().SDP, nil
}
func (s *GenericOfferSignal) Answer(pc *PeerConnection, sdp string) error {
if err := pc.peerConnection.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description (pc=%s); err: %w", pc.GetLabel(), err)
}
return nil
}
func (s *GenericOfferSignal) Close() error {
// NOTE: INTENTIONALLY EMPTY
return nil
}
+23 -7
View File
@@ -310,23 +310,39 @@ func (pc *PeerConnection) GetBWEstimator() (*BWEController, error) {
}
func (pc *PeerConnection) Close() error {
fmt.Printf("[PeerConnection] Starting close for peer connection: %s\n", pc.label)
var merr error
pc.once.Do(func() {
fmt.Printf("[PeerConnection] Canceling context for peer: %s\n", pc.label)
if pc.cancel != nil {
pc.cancel()
}
if pc.dataChannels != nil {
merr = multierr.Append(merr, pc.dataChannels.Close())
fmt.Printf("[PeerConnection] Closing underlying WebRTC peer connection for: %s\n", pc.label)
if err := pc.peerConnection.Close(); err != nil {
fmt.Printf("[PeerConnection] ERROR: Failed to close WebRTC peer connection for %s: %v\n", pc.label, err)
merr = multierr.Append(merr, err)
} else {
fmt.Printf("[PeerConnection] ✓ WebRTC peer connection closed for: %s\n", pc.label)
}
if pc.bwc != nil {
merr = multierr.Append(merr, pc.bwc.Close())
fmt.Printf("[PeerConnection] Closing bandwidth controller for: %s\n", pc.label)
if err := pc.bwc.Close(); err != nil {
fmt.Printf("[PeerConnection] ERROR: Failed to close bandwidth controller for %s: %v\n", pc.label, err)
merr = multierr.Append(merr, err)
} else {
fmt.Printf("[PeerConnection] ✓ Bandwidth controller closed for: %s\n", pc.label)
}
}
if merr == nil {
fmt.Printf("[PeerConnection] ✓ Successfully closed peer connection: %s\n", pc.label)
} else {
fmt.Printf("[PeerConnection] ⚠ Peer connection closed with errors for %s: %v\n", pc.label, merr)
}
})
// clear tracks if any
// clear sinks if any
// clear bwc ??
return pc.peerConnection.Close()
return merr
}
+8 -4
View File
@@ -54,18 +54,20 @@ func (s *Sink) setRTPReceiver(receiver *webrtc.RTPReceiver) {
s.rtpReceiver = receiver
}
func (s *Sink) readRTPReceiver(rtcpBuf []byte) {
func (s *Sink) readRTPReceiver(rtcpBuf []byte) error {
s.mux.RLock()
defer s.mux.RUnlock()
if s.rtpReceiver == nil {
time.Sleep(10 * time.Millisecond)
return
return nil
}
if _, _, err := s.rtpReceiver.Read(rtcpBuf); err != nil {
fmt.Printf("error while reading rtcp packets")
fmt.Printf("error while reading rtcp packets (err=%v)\n", err)
return err
}
return nil
}
func (s *Sink) rtpReceiverLoop() {
@@ -76,7 +78,9 @@ func (s *Sink) rtpReceiverLoop() {
return
default:
rtcpBuf := make([]byte, 1500)
s.readRTPReceiver(rtcpBuf)
if err := s.readRTPReceiver(rtcpBuf); err != nil {
return
}
}
}
}
+1 -2
View File
@@ -3,7 +3,6 @@ package mediasource
import (
"context"
"errors"
"fmt"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
@@ -63,7 +62,7 @@ func (track *Track) rtpSenderLoop() {
default:
rtcpBuf := make([]byte, 1500)
if _, _, err := track.rtpSender.Read(rtcpBuf); err != nil {
fmt.Println("error while reading rtcp packets")
// fmt.Println("error while reading rtcp packets")
}
}
}
+11 -1
View File
@@ -6,7 +6,7 @@ import (
"github.com/pion/webrtc/v4"
)
func GetRTCConfiguration() webrtc.Configuration {
func GetFullRTCConfiguration() webrtc.Configuration {
return webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
@@ -33,3 +33,13 @@ func GetRTCConfiguration() webrtc.Configuration {
},
}
}
func GetSTUNOnlyRTCConfiguration() webrtc.Configuration {
return webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{os.Getenv("STUN_SERVER_URL")},
},
},
}
}
+12 -4
View File
@@ -1,5 +1,7 @@
package client
import "context"
const (
FieldOffer = "offer"
FieldAnswer = "answer"
@@ -11,7 +13,13 @@ const (
FieldCreatedAt = "created-at"
)
type BaseSignal interface {
Connect(string, *PeerConnection) error
Close() error
}
type (
BaseSignal interface {
Connect(string, *PeerConnection) error
Close() error
}
ForOffer func(ctx context.Context) (string, error)
OnAnswer func(ctx context.Context, sdp string) error
OnOffer func(ctx context.Context, sdp string) error
ForAnswer func(ctx context.Context) (string, error)
)