dual-sim integration v1.0

This commit is contained in:
harshabose
2026-02-16 00:04:04 +05:30
parent 4d4bdcd8e4
commit b707de69cd
7 changed files with 71 additions and 49 deletions
+6 -9
View File
@@ -175,9 +175,7 @@ func (bwc *BWEController) Unsubscribe(id string) {
delete(bwc.subs, id)
}
func (bwc *BWEController) Close() error {
var merr error = nil
func (bwc *BWEController) Close() {
bwc.once.Do(func() {
if bwc.cancel != nil {
bwc.cancel()
@@ -188,9 +186,11 @@ func (bwc *BWEController) Close() error {
bwc.mux.Lock()
defer bwc.mux.Unlock()
if bwc.estimator == nil {
return
}
// if bwc.estimator != nil {
// if err := bwc.estimator.Close(); err != nil {
// return
// }
// }
// NOTE: CLOSED BY PC
// if err := bwc.estimator.Close(); err != nil {
@@ -198,8 +198,5 @@ func (bwc *BWEController) Close() error {
// }
bwc.subs = nil
return
})
return merr
}
+5 -4
View File
@@ -3,6 +3,7 @@ package client
import (
"context"
"errors"
"fmt"
"iter"
"sync"
"time"
@@ -48,7 +49,7 @@ func NewClient(
if settings == nil {
settings = &webrtc.SettingEngine{}
}
// settings.SetFireOnTrackBeforeFirstRTP(true)
settings.DetachDataChannels()
c := &Client{
@@ -101,6 +102,7 @@ func (c *Client) CreatePeerConnectionWithBWEstimator(label string, config webrtc
select {
case e := <-c.estimator:
pc.bwc.set(e.e, e.interval)
pc.bwc.Start()
}
}
@@ -176,10 +178,10 @@ func (c *Client) ClosePeerConnectionIfExists(label string) error {
return nil
}
func (c *Client) Close() error {
func (c *Client) Close() {
for _, pc := range c.PeerConnections() {
if err := pc.Close(); err != nil {
return err
fmt.Printf("Client: error while closing err=%v\n", err)
}
}
@@ -187,5 +189,4 @@ func (c *Client) Close() error {
defer c.mux.Unlock()
c.pcs = make(map[string]*PeerConnection)
return nil
}
+1 -4
View File
@@ -387,10 +387,7 @@ func (pc *PeerConnection) Close() error {
}
if pc.bwc != nil {
if err := pc.bwc.Close(); err != nil {
merr = multierr.Append(merr, err)
}
return
pc.bwc.Close()
}
})
+31 -27
View File
@@ -7,25 +7,26 @@ import (
"iter"
"reflect"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"github.com/harshabose/mediapipe/pkg/generators"
"github.com/harshabose/tools/pkg/cond"
)
type Sink struct {
generator generators.CanGeneratePionRTPPacket
generator *webrtc.TrackRemote
codecCapability *webrtc.RTPCodecParameters
rtpReceiver *webrtc.RTPReceiver
mux sync.RWMutex
cond *cond.ContextCond
ctx context.Context
}
func CreateSink(ctx context.Context, options ...SinkOption) (*Sink, error) {
sink := &Sink{ctx: ctx}
sink.cond = cond.NewContextCond(&(sink.mux))
for _, option := range options {
if err := option(sink); err != nil {
@@ -40,30 +41,30 @@ func CreateSink(ctx context.Context, options ...SinkOption) (*Sink, error) {
return sink, nil
}
func (s *Sink) setGenerator(generator generators.CanGeneratePionRTPPacket) {
func (s *Sink) setGenerator(generator *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
s.mux.Lock()
defer s.mux.Unlock()
s.generator = generator
}
func (s *Sink) setRTPReceiver(receiver *webrtc.RTPReceiver) {
s.mux.Lock()
defer s.mux.Unlock()
s.rtpReceiver = receiver
s.cond.Broadcast()
}
func (s *Sink) readRTPReceiver(rtcpBuf []byte) error {
s.mux.RLock()
defer s.mux.RUnlock()
func (s *Sink) readRTPReceiver(ctx context.Context, rtcpBuf []byte) error {
s.mux.Lock()
if s.rtpReceiver == nil {
time.Sleep(10 * time.Millisecond)
return nil
for s.rtpReceiver == nil {
if err := s.cond.Wait(ctx); err != nil {
s.mux.Unlock()
return err
}
}
reader := s.rtpReceiver
if _, _, err := s.rtpReceiver.Read(rtcpBuf); err != nil {
s.mux.Unlock()
if _, _, err := reader.Read(rtcpBuf); err != nil {
fmt.Printf("error while reading rtcp packets (err=%v)\n", err)
return err
}
@@ -78,22 +79,27 @@ func (s *Sink) rtpReceiverLoop() {
return
default:
rtcpBuf := make([]byte, 1500)
if err := s.readRTPReceiver(rtcpBuf); err != nil {
if err := s.readRTPReceiver(s.ctx, rtcpBuf); err != nil {
return
}
}
}
}
func (s *Sink) ReadRTP() (*rtp.Packet, interceptor.Attributes, error) {
s.mux.RLock()
defer s.mux.RUnlock()
func (s *Sink) ReadRTP(ctx context.Context) (*rtp.Packet, interceptor.Attributes, error) {
s.cond.L.Lock()
if s.generator == nil {
return nil, interceptor.Attributes{}, nil
for s.generator == nil {
if err := s.cond.Wait(ctx); err != nil {
s.cond.L.Unlock()
return nil, nil, err
}
}
reader := s.generator
return s.generator.ReadRTP()
s.cond.L.Unlock()
return reader.ReadRTP()
}
type Sinks struct {
@@ -117,7 +123,6 @@ func (s *Sinks) onTrack(pc *webrtc.PeerConnection) {
sink, err := s.GetSink(remote.ID())
if err != nil {
fmt.Printf("failed to trigger on track callback with err: %v\n", err)
// TODO: MAYBE SET A DEFAULT SINK?
return
}
@@ -126,8 +131,7 @@ func (s *Sinks) onTrack(pc *webrtc.PeerConnection) {
return
}
sink.setRTPReceiver(receiver)
sink.setGenerator(remote)
sink.setGenerator(remote, receiver)
go sink.rtpReceiverLoop()
})
+1
View File
@@ -63,6 +63,7 @@ func (track *Track) rtpSenderLoop() {
rtcpBuf := make([]byte, 1500)
if _, _, err := track.rtpSender.Read(rtcpBuf); err != nil {
// fmt.Println("error while reading rtcp packets")
continue
}
}
}
+5 -1
View File
@@ -67,13 +67,17 @@ func (track *RTPTrack) rtpSenderLoop() {
default:
rtcpBuf := make([]byte, 1500)
if _, _, err := track.rtpSender.Read(rtcpBuf); err != nil {
fmt.Println("error while reading rtcp packets")
// fmt.Println("error while reading rtcp packets")
continue
}
}
}
}
func (track *RTPTrack) WriteRTP(packet *rtp.Packet) error {
if packet == nil {
return nil
}
if err := track.consumer.WriteRTP(packet); err != nil {
fmt.Printf("error while writing samples to track (id: ); err; %v. Continuing...", err)
}
+22 -4
View File
@@ -18,8 +18,11 @@ var (
)
type UpdateEncoderConfig struct {
MaxBitrate, MinBitrate int64
MinBitrateChangePercentage float64
EnableAdaptiveEncoding bool
MaxBitrate, MinBitrate int64
MinBitrateIncrementChangePercentage float64
MinBitrateDecrementChangePercentage float64
}
func (c UpdateEncoderConfig) validate() error {
@@ -104,6 +107,10 @@ func (u *UpdateEncoder) Close() {
}
func (u *UpdateEncoder) AdaptBitrate(bps int64) error {
if !u.config.EnableAdaptiveEncoding {
return nil
}
bps = u.cutoff(bps)
g, ok := u.encoder.(CanGetCurrentBitrate)
@@ -116,9 +123,20 @@ func (u *UpdateEncoder) AdaptBitrate(bps int64) error {
return err
}
increase := bps > current
_, change := calculateBitrateChange(current, bps)
if change < u.config.MinBitrateChangePercentage {
return nil
if increase {
if change < u.config.MinBitrateIncrementChangePercentage {
return nil
}
}
if !increase {
if change < u.config.MinBitrateDecrementChangePercentage {
return nil
}
}
if err := u.builder.AdaptBitrate(bps); err != nil {