prefinal commit; added direct rtsp input, updated transcoder, few minor ui bugs

This commit is contained in:
harshabose
2026-01-19 14:34:06 +05:30
parent cd56141da4
commit 4d4bdcd8e4
21 changed files with 464 additions and 661 deletions
+10 -5
View File
@@ -15,6 +15,11 @@ import (
"github.com/harshabose/tools/pkg/multierr"
)
type estimator struct {
e cc.BandwidthEstimator
interval time.Duration
}
type Client struct {
pcs map[string]*PeerConnection
mediaEngine *webrtc.MediaEngine
@@ -22,8 +27,8 @@ type Client struct {
interceptorRegistry *interceptor.Registry
api *webrtc.API
estimatorChan chan cc.BandwidthEstimator
getterChan chan stats.Getter
estimator chan estimator
getterChan chan stats.Getter
mux sync.RWMutex
ctx context.Context
@@ -51,7 +56,7 @@ func NewClient(
interceptorRegistry: interceptorRegistry,
settingsEngine: settings,
pcs: make(map[string]*PeerConnection),
estimatorChan: make(chan cc.BandwidthEstimator, 10),
estimator: make(chan estimator, 10),
ctx: ctx,
}
@@ -94,8 +99,8 @@ func (c *Client) CreatePeerConnectionWithBWEstimator(label string, config webrtc
// TODO: THE WEIRD DESIGN OF CC INTERCEPTOR IN PION. TRACK THE ISSUE WITH "https://github.com/pion/webrtc/issues/3053"
if pc.bwc != nil {
select {
case estimator := <-c.estimatorChan:
pc.bwc.set(estimator, 500*time.Millisecond)
case e := <-c.estimator:
pc.bwc.set(e.e, e.interval)
}
}
+2 -2
View File
@@ -230,8 +230,8 @@ func WithBandwidthControlInterceptor(initialBitrate, minimumBitrate, maximumBitr
return err
}
congestionController.OnNewPeerConnection(func(id string, estimator cc.BandwidthEstimator) {
client.estimatorChan <- estimator
congestionController.OnNewPeerConnection(func(id string, e cc.BandwidthEstimator) {
client.estimator <- estimator{e: e, interval: interval}
})
client.interceptorRegistry.Add(congestionController)
+93 -88
View File
@@ -5,6 +5,7 @@ package transcode
import (
"context"
"errors"
"sync"
"time"
"github.com/asticode/go-astiav"
@@ -17,19 +18,17 @@ type GeneralDecoder struct {
decoderContext *astiav.CodecContext
codec *astiav.Codec
buffer buffer.BufferWithGenerator[*astiav.Frame]
ctx context.Context
cancel context.CancelFunc
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func CreateGeneralDecoder(ctx context.Context, canProduceMediaType CanProduceMediaPacket, options ...DecoderOption) (*GeneralDecoder, error) {
var (
err error
contextOption DecoderOption
decoder *GeneralDecoder
)
ctx2, cancel := context.WithCancel(ctx)
decoder = &GeneralDecoder{
decoder := &GeneralDecoder{
demuxer: canProduceMediaType,
ctx: ctx2,
cancel: cancel,
@@ -40,17 +39,19 @@ func CreateGeneralDecoder(ctx context.Context, canProduceMediaType CanProduceMed
return nil, ErrorInterfaceMismatch
}
if canDescribeMediaPacket.MediaType() == astiav.MediaTypeVideo {
contextOption = withVideoSetDecoderContext(canDescribeMediaPacket)
}
if canDescribeMediaPacket.MediaType() == astiav.MediaTypeAudio {
contextOption = withAudioSetDecoderContext(canDescribeMediaPacket)
if canDescribeMediaPacket.MediaType() != astiav.MediaTypeVideo && canDescribeMediaPacket.MediaType() != astiav.MediaTypeAudio {
return nil, ErrorUnsupportedMedia
}
options = append([]DecoderOption{contextOption}, options...)
o := withVideoSetDecoderContext(canDescribeMediaPacket)
if canDescribeMediaPacket.MediaType() == astiav.MediaTypeAudio {
o = withAudioSetDecoderContext(canDescribeMediaPacket)
}
options = append([]DecoderOption{o}, options...)
for _, option := range options {
if err = option(decoder); err != nil {
if err := option(decoder); err != nil {
return nil, err
}
}
@@ -59,171 +60,175 @@ func CreateGeneralDecoder(ctx context.Context, canProduceMediaType CanProduceMed
decoder.buffer = buffer.NewChannelBufferWithGenerator(ctx, buffer.CreateFramePool(), 256, 1)
}
if err = decoder.decoderContext.Open(decoder.codec, nil); err != nil {
if err := decoder.decoderContext.Open(decoder.codec, nil); err != nil {
return nil, err
}
return decoder, nil
}
func (decoder *GeneralDecoder) Ctx() context.Context {
return decoder.ctx
func (d *GeneralDecoder) Start() {
go d.loop()
}
func (decoder *GeneralDecoder) Start() {
go decoder.loop()
func (d *GeneralDecoder) Close() {
d.once.Do(func() {
if d.cancel != nil {
d.cancel()
}
d.wg.Wait()
d.close()
})
}
func (decoder *GeneralDecoder) Stop() {
decoder.cancel()
}
func (decoder *GeneralDecoder) loop() {
defer decoder.close()
func (d *GeneralDecoder) loop() {
d.wg.Add(1)
defer d.wg.Done()
loop1:
for {
select {
case <-decoder.ctx.Done():
case <-d.ctx.Done():
return
default:
packet, err := decoder.getPacket()
packet, err := d.getPacket()
if err != nil {
// fmt.Println("unable to get packet from demuxer; err:", err.Error())
continue
}
if err := decoder.decoderContext.SendPacket(packet); err != nil {
decoder.demuxer.PutBack(packet)
if err := d.decoderContext.SendPacket(packet); err != nil {
d.demuxer.PutBack(packet)
if !errors.Is(err, astiav.ErrEagain) {
continue loop1
}
}
loop2:
for {
frame := decoder.buffer.Get()
if err := decoder.decoderContext.ReceiveFrame(frame); err != nil {
decoder.buffer.Put(frame)
frame := d.buffer.Get()
if err := d.decoderContext.ReceiveFrame(frame); err != nil {
d.buffer.Put(frame)
break loop2
}
frame.SetPictureType(astiav.PictureTypeNone)
frame.SetPictureType(astiav.PictureTypeNone) // this is needed as the ffmpeg decoder picture type is different
if err := decoder.pushFrame(frame); err != nil {
decoder.buffer.Put(frame)
if err := d.pushFrame(frame); err != nil {
d.buffer.Put(frame)
continue loop2
}
}
decoder.demuxer.PutBack(packet)
d.demuxer.PutBack(packet)
}
}
}
func (decoder *GeneralDecoder) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(decoder.ctx, 50*time.Millisecond)
func (d *GeneralDecoder) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(d.ctx, 50*time.Millisecond)
defer cancel()
return decoder.buffer.Push(ctx, frame)
return d.buffer.Push(ctx, frame)
}
func (decoder *GeneralDecoder) getPacket() (*astiav.Packet, error) {
ctx, cancel := context.WithTimeout(decoder.ctx, 50*time.Millisecond)
func (d *GeneralDecoder) getPacket() (*astiav.Packet, error) {
ctx, cancel := context.WithTimeout(d.ctx, 50*time.Millisecond)
defer cancel()
return decoder.demuxer.GetPacket(ctx)
return d.demuxer.GetPacket(ctx)
}
func (decoder *GeneralDecoder) GetFrame(ctx context.Context) (*astiav.Frame, error) {
return decoder.buffer.Pop(ctx)
func (d *GeneralDecoder) GetFrame(ctx context.Context) (*astiav.Frame, error) {
return d.buffer.Pop(ctx)
}
func (decoder *GeneralDecoder) PutBack(frame *astiav.Frame) {
decoder.buffer.Put(frame)
func (d *GeneralDecoder) PutBack(frame *astiav.Frame) {
d.buffer.Put(frame)
}
func (decoder *GeneralDecoder) close() {
if decoder.decoderContext != nil {
decoder.decoderContext.Free()
func (d *GeneralDecoder) close() {
if d.decoderContext != nil {
d.decoderContext.Free()
}
}
func (decoder *GeneralDecoder) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Frame]) {
decoder.buffer = buffer
func (d *GeneralDecoder) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Frame]) {
d.buffer = buffer
}
func (decoder *GeneralDecoder) SetCodec(producer CanDescribeMediaPacket) error {
if decoder.codec = astiav.FindDecoder(producer.CodecID()); decoder.codec == nil {
func (d *GeneralDecoder) SetCodec(producer CanDescribeMediaPacket) error {
if d.codec = astiav.FindDecoder(producer.CodecID()); d.codec == nil {
return ErrorNoCodecFound
}
decoder.decoderContext = astiav.AllocCodecContext(decoder.codec)
if decoder.decoderContext == nil {
d.decoderContext = astiav.AllocCodecContext(d.codec)
if d.decoderContext == nil {
return ErrorAllocateCodecContext
}
return nil
}
func (decoder *GeneralDecoder) FillContextContent(producer CanDescribeMediaPacket) error {
return producer.GetCodecParameters().ToCodecContext(decoder.decoderContext)
func (d *GeneralDecoder) FillContextContent(producer CanDescribeMediaPacket) error {
return producer.GetCodecParameters().ToCodecContext(d.decoderContext)
}
func (decoder *GeneralDecoder) SetFrameRate(producer CanDescribeFrameRate) {
decoder.decoderContext.SetFramerate(producer.FrameRate())
func (d *GeneralDecoder) SetFrameRate(producer CanDescribeFrameRate) {
d.decoderContext.SetFramerate(producer.FrameRate())
}
func (decoder *GeneralDecoder) SetTimeBase(producer CanDescribeTimeBase) {
decoder.decoderContext.SetTimeBase(producer.TimeBase())
func (d *GeneralDecoder) SetTimeBase(producer CanDescribeTimeBase) {
d.decoderContext.SetTimeBase(producer.TimeBase())
}
// ### IMPLEMENTS CanDescribeMediaVideoFrame
func (decoder *GeneralDecoder) FrameRate() astiav.Rational {
return decoder.decoderContext.Framerate()
func (d *GeneralDecoder) FrameRate() astiav.Rational {
return d.decoderContext.Framerate()
}
func (decoder *GeneralDecoder) TimeBase() astiav.Rational {
return decoder.decoderContext.TimeBase()
func (d *GeneralDecoder) TimeBase() astiav.Rational {
return d.decoderContext.TimeBase()
}
func (decoder *GeneralDecoder) Height() int {
return decoder.decoderContext.Height()
func (d *GeneralDecoder) Height() int {
return d.decoderContext.Height()
}
func (decoder *GeneralDecoder) Width() int {
return decoder.decoderContext.Width()
func (d *GeneralDecoder) Width() int {
return d.decoderContext.Width()
}
func (decoder *GeneralDecoder) PixelFormat() astiav.PixelFormat {
return decoder.decoderContext.PixelFormat()
func (d *GeneralDecoder) PixelFormat() astiav.PixelFormat {
return d.decoderContext.PixelFormat()
}
func (decoder *GeneralDecoder) SampleAspectRatio() astiav.Rational {
return decoder.decoderContext.SampleAspectRatio()
func (d *GeneralDecoder) SampleAspectRatio() astiav.Rational {
return d.decoderContext.SampleAspectRatio()
}
func (decoder *GeneralDecoder) ColorSpace() astiav.ColorSpace {
return decoder.decoderContext.ColorSpace()
func (d *GeneralDecoder) ColorSpace() astiav.ColorSpace {
return d.decoderContext.ColorSpace()
}
func (decoder *GeneralDecoder) ColorRange() astiav.ColorRange {
return decoder.decoderContext.ColorRange()
func (d *GeneralDecoder) ColorRange() astiav.ColorRange {
return d.decoderContext.ColorRange()
}
// ## CanDescribeMediaAudioFrame
func (decoder *GeneralDecoder) SampleRate() int {
return decoder.decoderContext.SampleRate()
func (d *GeneralDecoder) SampleRate() int {
return d.decoderContext.SampleRate()
}
func (decoder *GeneralDecoder) SampleFormat() astiav.SampleFormat {
return decoder.decoderContext.SampleFormat()
func (d *GeneralDecoder) SampleFormat() astiav.SampleFormat {
return d.decoderContext.SampleFormat()
}
func (decoder *GeneralDecoder) ChannelLayout() astiav.ChannelLayout {
return decoder.decoderContext.ChannelLayout()
func (d *GeneralDecoder) ChannelLayout() astiav.ChannelLayout {
return d.decoderContext.ChannelLayout()
}
// ## CanDescribeMediaFrame
func (decoder *GeneralDecoder) MediaType() astiav.MediaType {
return decoder.decoderContext.MediaType()
func (d *GeneralDecoder) MediaType() astiav.MediaType {
return d.decoderContext.MediaType()
}
+5 -2
View File
@@ -3,6 +3,8 @@
package transcode
import (
"context"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
@@ -49,13 +51,14 @@ func withAudioSetDecoderContext(demuxer CanDescribeMediaPacket) DecoderOption {
}
}
func WithDecoderBuffer(size int, pool buffer.Pool[*astiav.Frame]) DecoderOption {
func WithDecoderBuffer(ctx context.Context, size int, pool buffer.Pool[*astiav.Frame]) DecoderOption {
return func(decoder Decoder) error {
s, ok := decoder.(CanSetBuffer[*astiav.Frame])
if !ok {
return ErrorInterfaceMismatch
}
s.SetBuffer(buffer.NewChannelBufferWithGenerator(decoder.Ctx(), pool, uint(size), 1))
s.SetBuffer(buffer.NewChannelBufferWithGenerator(ctx, pool, uint(size), 1))
return nil
}
}
+15 -7
View File
@@ -5,6 +5,7 @@ package transcode
import (
"context"
"fmt"
"sync"
"time"
"github.com/asticode/go-astiav"
@@ -21,6 +22,8 @@ type GeneralDemuxer struct {
buffer buffer.BufferWithGenerator[*astiav.Packet]
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
@@ -76,20 +79,25 @@ func CreateGeneralDemuxer(ctx context.Context, containerAddress string, options
return demuxer, nil
}
func (d *GeneralDemuxer) Ctx() context.Context {
return d.ctx
}
func (d *GeneralDemuxer) Start() {
go d.loop()
}
func (d *GeneralDemuxer) Stop() {
d.cancel()
func (d *GeneralDemuxer) Close() {
d.once.Do(func() {
if d.cancel != nil {
d.cancel()
d.wg.Wait()
d.close()
}
})
}
func (d *GeneralDemuxer) loop() {
defer d.close()
d.wg.Add(1)
defer d.wg.Done()
loop1:
for {
+4 -2
View File
@@ -3,6 +3,8 @@
package transcode
import (
"context"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
@@ -92,13 +94,13 @@ func WithAvFoundationInputFormatOption(demuxer Demuxer) error {
return nil
}
func WithDemuxerBuffer(size int, pool buffer.Pool[*astiav.Packet]) DemuxerOption {
func WithDemuxerBuffer(ctx context.Context, size int, pool buffer.Pool[*astiav.Packet]) DemuxerOption {
return func(demuxer Demuxer) error {
s, ok := demuxer.(CanSetBuffer[*astiav.Packet])
if !ok {
return ErrorInterfaceMismatch
}
s.SetBuffer(buffer.NewChannelBufferWithGenerator(demuxer.Ctx(), pool, uint(size), 1))
s.SetBuffer(buffer.NewChannelBufferWithGenerator(ctx, pool, uint(size), 1))
return nil
}
}
+61 -60
View File
@@ -4,7 +4,6 @@ package transcode
import (
"context"
"encoding/base64"
"errors"
"fmt"
"sync"
@@ -27,6 +26,7 @@ type GeneralEncoder struct {
pps []byte
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
@@ -81,101 +81,102 @@ func CreateGeneralEncoder(ctx context.Context, codecID astiav.CodecID, canProduc
return encoder, nil
}
func (encoder *GeneralEncoder) Ctx() context.Context {
return encoder.ctx
func (e *GeneralEncoder) Start() {
go e.loop()
}
func (encoder *GeneralEncoder) Start() {
go encoder.loop()
func (e *GeneralEncoder) GetParameterSets() ([]byte, []byte, error) {
e.findParameterSets(e.encoderContext.ExtraData())
return e.sps, e.pps, nil
}
func (encoder *GeneralEncoder) GetParameterSets() ([]byte, []byte, error) {
encoder.findParameterSets(encoder.encoderContext.ExtraData())
return encoder.sps, encoder.pps, nil
func (e *GeneralEncoder) TimeBase() astiav.Rational {
return e.encoderContext.TimeBase()
}
func (encoder *GeneralEncoder) TimeBase() astiav.Rational {
return encoder.encoderContext.TimeBase()
}
func (encoder *GeneralEncoder) loop() {
defer encoder.close()
func (e *GeneralEncoder) loop() {
e.wg.Add(1)
defer e.wg.Done()
loop1:
for {
select {
case <-encoder.ctx.Done():
case <-e.ctx.Done():
return
default:
frame, err := encoder.getFrame()
frame, err := e.getFrame()
if err != nil {
continue
}
if err := encoder.encoderContext.SendFrame(frame); err != nil {
encoder.producer.PutBack(frame)
if err := e.encoderContext.SendFrame(frame); err != nil {
e.producer.PutBack(frame)
if !errors.Is(err, astiav.ErrEagain) {
continue loop1
}
}
loop2:
for {
packet := encoder.buffer.Get()
if err = encoder.encoderContext.ReceivePacket(packet); err != nil {
encoder.buffer.Put(packet)
packet := e.buffer.Get()
if err = e.encoderContext.ReceivePacket(packet); err != nil {
e.buffer.Put(packet)
break loop2
}
if err := encoder.pushPacket(packet); err != nil {
encoder.buffer.Put(packet)
if err := e.pushPacket(packet); err != nil {
e.buffer.Put(packet)
continue loop2
}
}
encoder.producer.PutBack(frame)
e.producer.PutBack(frame)
}
}
}
func (encoder *GeneralEncoder) getFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(encoder.ctx, 100*time.Millisecond)
func (e *GeneralEncoder) getFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(e.ctx, 100*time.Millisecond)
defer cancel()
return encoder.producer.GetFrame(ctx)
return e.producer.GetFrame(ctx)
}
func (encoder *GeneralEncoder) GetPacket(ctx context.Context) (*astiav.Packet, error) {
return encoder.buffer.Pop(ctx)
func (e *GeneralEncoder) GetPacket(ctx context.Context) (*astiav.Packet, error) {
return e.buffer.Pop(ctx)
}
func (encoder *GeneralEncoder) pushPacket(packet *astiav.Packet) error {
ctx, cancel := context.WithTimeout(encoder.ctx, 100*time.Millisecond)
func (e *GeneralEncoder) pushPacket(packet *astiav.Packet) error {
ctx, cancel := context.WithTimeout(e.ctx, 100*time.Millisecond)
defer cancel()
return encoder.buffer.Push(ctx, packet)
return e.buffer.Push(ctx, packet)
}
func (encoder *GeneralEncoder) PutBack(packet *astiav.Packet) {
encoder.buffer.Put(packet)
func (e *GeneralEncoder) PutBack(packet *astiav.Packet) {
e.buffer.Put(packet)
}
func (encoder *GeneralEncoder) Stop() {
encoder.once.Do(func() {
if encoder.cancel != nil {
encoder.cancel()
func (e *GeneralEncoder) Close() {
e.once.Do(func() {
if e.cancel != nil {
e.cancel()
}
e.wg.Wait()
e.close()
})
}
func (encoder *GeneralEncoder) close() {
if encoder.encoderContext != nil {
encoder.encoderContext.Free()
func (e *GeneralEncoder) close() {
if e.encoderContext != nil {
e.encoderContext.Free()
}
if encoder.codecFlags != nil {
encoder.codecFlags.Free()
if e.codecFlags != nil {
e.codecFlags.Free()
}
}
func (encoder *GeneralEncoder) findParameterSets(extraData []byte) {
func (e *GeneralEncoder) findParameterSets(extraData []byte) {
if len(extraData) > 0 {
// Find the first start code (0x00000001)
for i := 0; i < len(extraData)-4; i++ {
@@ -193,39 +194,39 @@ func (encoder *GeneralEncoder) findParameterSets(extraData []byte) {
}
if nalType == 7 { // SPS
encoder.sps = make([]byte, nextStart-i)
copy(encoder.sps, extraData[i:nextStart])
e.sps = make([]byte, nextStart-i)
copy(e.sps, extraData[i:nextStart])
} else if nalType == 8 { // PPS
encoder.pps = make([]byte, len(extraData)-i)
copy(encoder.pps, extraData[i:])
e.pps = make([]byte, len(extraData)-i)
copy(e.pps, extraData[i:])
}
i = nextStart - 1
}
}
fmt.Println("SPS for current encoder: ", encoder.sps)
fmt.Println("\tSPS for current encoder in Base64:", base64.StdEncoding.EncodeToString(encoder.sps))
fmt.Println("PPS for current encoder: ", encoder.pps)
fmt.Println("\tPPS for current encoder in Base64:", base64.StdEncoding.EncodeToString(encoder.pps))
// fmt.Println("SPS for current encoder: ", e.sps)
// fmt.Println("\tSPS for current encoder in Base64:", base64.StdEncoding.EncodeToString(e.sps))
// fmt.Println("PPS for current encoder: ", e.pps)
// fmt.Println("\tPPS for current encoder in Base64:", base64.StdEncoding.EncodeToString(e.pps))
}
}
func (encoder *GeneralEncoder) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Packet]) {
encoder.buffer = buffer
func (e *GeneralEncoder) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Packet]) {
e.buffer = buffer
}
func (encoder *GeneralEncoder) SetEncoderCodecSettings(settings codecSettings) error {
encoder.encoderSettings = settings
return encoder.encoderSettings.ForEach(func(key string, value string) error {
func (e *GeneralEncoder) SetEncoderCodecSettings(settings codecSettings) error {
e.encoderSettings = settings
return e.encoderSettings.ForEach(func(key string, value string) error {
if value == "" {
return nil
}
return encoder.codecFlags.Set(key, value, 0)
return e.codecFlags.Set(key, value, 0)
})
}
func (encoder *GeneralEncoder) GetCurrentBitrate() (int64, error) {
g, ok := encoder.encoderSettings.(CanGetCurrentBitrate)
func (e *GeneralEncoder) GetCurrentBitrate() (int64, error) {
g, ok := e.encoderSettings.(CanGetCurrentBitrate)
if !ok {
return 0, ErrorInterfaceMismatch
}
+11 -61
View File
@@ -6,25 +6,22 @@ import (
"context"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
)
type GeneralEncoderBuilder struct {
codecID astiav.CodecID
bufferSize int
pool buffer.Pool[*astiav.Packet]
settings codecSettings
producer CanProduceMediaFrame
codecID astiav.CodecID
producer CanProduceMediaFrame
options []EncoderOption
settings codecSettings
}
func NewEncoderBuilder(codecID astiav.CodecID, settings codecSettings, producer CanProduceMediaFrame, bufferSize int, pool buffer.Pool[*astiav.Packet]) *GeneralEncoderBuilder {
func NewEncoderBuilder(codecID astiav.CodecID, settings codecSettings, producer CanProduceMediaFrame, options ...EncoderOption) *GeneralEncoderBuilder {
return &GeneralEncoderBuilder{
bufferSize: bufferSize,
pool: pool,
codecID: codecID,
settings: settings,
producer: producer,
codecID: codecID,
producer: producer,
options: options,
settings: settings,
}
}
@@ -43,54 +40,7 @@ func (b *GeneralEncoderBuilder) BuildWithProducer(ctx context.Context, producer
}
func (b *GeneralEncoderBuilder) Build(ctx context.Context) (Encoder, error) {
codec := astiav.FindEncoder(b.codecID)
if codec == nil {
return nil, ErrorNoCodecFound
}
ctx2, cancel := context.WithCancel(ctx)
encoder := &GeneralEncoder{
producer: b.producer,
codec: codec,
codecFlags: astiav.NewDictionary(),
ctx: ctx2,
cancel: cancel,
}
encoder.encoderContext = astiav.AllocCodecContext(codec)
if encoder.encoderContext == nil {
return nil, ErrorAllocateCodecContext
}
canDescribeMediaFrame, ok := encoder.producer.(CanDescribeMediaFrame)
if !ok {
return nil, ErrorInterfaceMismatch
}
if canDescribeMediaFrame.MediaType() == astiav.MediaTypeAudio {
withAudioSetEncoderContextParameters(canDescribeMediaFrame, encoder.encoderContext)
}
if canDescribeMediaFrame.MediaType() == astiav.MediaTypeVideo {
withVideoSetEncoderContextParameter(canDescribeMediaFrame, encoder.encoderContext)
}
if err := encoder.SetEncoderCodecSettings(b.settings); err != nil {
return nil, err
}
if err := WithEncoderBufferSize(b.bufferSize, b.pool)(encoder); err != nil {
return nil, err
}
encoder.encoderContext.SetFlags(astiav.NewCodecContextFlags(astiav.CodecContextFlagGlobalHeader))
if err := encoder.encoderContext.Open(encoder.codec, encoder.codecFlags); err != nil {
return nil, err
}
encoder.findParameterSets(encoder.encoderContext.ExtraData())
return encoder, nil
return CreateGeneralEncoder(ctx, b.codecID, b.producer, append(b.options, WithCodecSettings(b.settings))...)
}
func (b *GeneralEncoderBuilder) GetCurrentBitrate() (int64, error) {
+3 -2
View File
@@ -3,6 +3,7 @@
package transcode
import (
"context"
"fmt"
"reflect"
"strings"
@@ -320,13 +321,13 @@ func withVideoSetEncoderContextParameter(filter CanDescribeMediaVideoFrame, eCtx
eCtx.SetFramerate(filter.FrameRate())
}
func WithEncoderBufferSize(size int, pool buffer.Pool[*astiav.Packet]) EncoderOption {
func WithEncoderBufferSize(ctx context.Context, size int, pool buffer.Pool[*astiav.Packet]) EncoderOption {
return func(encoder Encoder) error {
s, ok := encoder.(CanSetBuffer[*astiav.Packet])
if !ok {
return ErrorInterfaceMismatch
}
s.SetBuffer(buffer.NewChannelBufferWithGenerator(encoder.Ctx(), pool, uint(size), 1))
s.SetBuffer(buffer.NewChannelBufferWithGenerator(ctx, pool, uint(size), 1))
return nil
}
}
+1
View File
@@ -7,6 +7,7 @@ import "errors"
var (
ErrorAllocateFormatContext = errors.New("error allocate format context")
ErrorOpenInputContainer = errors.New("error opening container")
ErrorUnsupportedMedia = errors.New("unsupported media type")
ErrorNoStreamFound = errors.New("error no stream found")
ErrorGeneralAllocate = errors.New("error allocating general object")
ErrorNoVideoStreamFound = errors.New("no video stream found")
+108 -98
View File
@@ -5,6 +5,7 @@ package transcode
import (
"context"
"fmt"
"sync"
"time"
"github.com/asticode/go-astiav"
@@ -22,12 +23,15 @@ type GeneralFilter struct {
srcContext *astiav.BuffersrcFilterContext
sinkContext *astiav.BuffersinkFilterContext
srcContextParams *astiav.BuffersrcFilterContextParameters // NOTE: THIS BECOMES NIL AFTER INITIALISATION
ctx context.Context
cancel context.CancelFunc
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func CreateGeneralFilter(ctx context.Context, canProduceMediaFrame CanProduceMediaFrame, filterConfig FilterConfig, options ...FilterOption) (*GeneralFilter, error) {
ctx2, cancel := context.WithCancel(ctx)
ctx2, cancel2 := context.WithCancel(ctx)
filter := &GeneralFilter{
graph: astiav.AllocFilterGraph(),
decoder: canProduceMediaFrame,
@@ -35,11 +39,9 @@ func CreateGeneralFilter(ctx context.Context, canProduceMediaFrame CanProduceMed
output: astiav.AllocFilterInOut(),
srcContextParams: astiav.AllocBuffersrcFilterContextParameters(),
ctx: ctx2,
cancel: cancel,
cancel: cancel2,
}
// TODO: CHECK IF ALL ATTRIBUTES ARE ALLOCATED PROPERLY
filterSrc := astiav.FindFilterByName(filterConfig.Source.String())
if filterSrc == nil {
return nil, ErrorNoFilterName
@@ -66,16 +68,20 @@ func CreateGeneralFilter(ctx context.Context, canProduceMediaFrame CanProduceMed
if !ok {
return nil, ErrorInterfaceMismatch
}
if canDescribeMediaFrame.MediaType() == astiav.MediaTypeVideo {
options = append([]FilterOption{withVideoSetFilterContextParameters(canDescribeMediaFrame)}, options...)
if canDescribeMediaFrame.MediaType() != astiav.MediaTypeVideo && canDescribeMediaFrame.MediaType() != astiav.MediaTypeAudio {
return nil, ErrorUnsupportedMedia
}
o := withVideoSetFilterContextParameters(canDescribeMediaFrame)
if canDescribeMediaFrame.MediaType() == astiav.MediaTypeAudio {
options = append([]FilterOption{withAudioSetFilterContextParameters(canDescribeMediaFrame)}, options...)
o = withAudioSetFilterContextParameters(canDescribeMediaFrame)
}
options = append([]FilterOption{o}, options...)
for _, option := range options {
if err = option(filter); err != nil {
// TODO: SET CONTENT HERE
return nil, err
}
}
@@ -121,184 +127,188 @@ func CreateGeneralFilter(ctx context.Context, canProduceMediaFrame CanProduceMed
return filter, nil
}
func (filter *GeneralFilter) Ctx() context.Context {
return filter.ctx
func (f *GeneralFilter) Start() {
go f.loop()
}
func (filter *GeneralFilter) Start() {
go filter.loop()
func (f *GeneralFilter) Close() {
f.once.Do(func() {
if f.cancel != nil {
f.cancel()
}
f.wg.Wait()
f.close()
})
}
func (filter *GeneralFilter) Stop() {
filter.cancel()
}
func (filter *GeneralFilter) loop() {
defer filter.close()
func (f *GeneralFilter) loop() {
f.wg.Add(1)
defer f.wg.Done()
loop1:
for {
select {
case <-filter.ctx.Done():
case <-f.ctx.Done():
return
default:
srcFrame, err := filter.getFrame()
srcFrame, err := f.getFrame()
if err != nil {
// fmt.Println("unable to get frame from decoder; err:", err.Error())
continue
}
if err := filter.srcContext.AddFrame(srcFrame, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
filter.buffer.Put(srcFrame)
if err := f.srcContext.AddFrame(srcFrame, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
f.buffer.Put(srcFrame)
continue loop1
}
loop2:
for {
sinkFrame := filter.buffer.Get()
if err = filter.sinkContext.GetFrame(sinkFrame, astiav.NewBuffersinkFlags()); err != nil {
filter.buffer.Put(sinkFrame)
sinkFrame := f.buffer.Get()
if err = f.sinkContext.GetFrame(sinkFrame, astiav.NewBuffersinkFlags()); err != nil {
f.buffer.Put(sinkFrame)
break loop2
}
if err := filter.pushFrame(sinkFrame); err != nil {
filter.buffer.Put(sinkFrame)
if err := f.pushFrame(sinkFrame); err != nil {
f.buffer.Put(sinkFrame)
continue loop2
}
}
filter.decoder.PutBack(srcFrame)
f.decoder.PutBack(srcFrame)
}
}
}
func (filter *GeneralFilter) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(filter.ctx, 50*time.Millisecond)
func (f *GeneralFilter) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(f.ctx, 50*time.Millisecond)
defer cancel()
return filter.buffer.Push(ctx, frame)
return f.buffer.Push(ctx, frame)
}
func (filter *GeneralFilter) getFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(filter.ctx, 50*time.Millisecond)
func (f *GeneralFilter) getFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(f.ctx, 50*time.Millisecond)
defer cancel()
return filter.decoder.GetFrame(ctx)
return f.decoder.GetFrame(ctx)
}
func (filter *GeneralFilter) PutBack(frame *astiav.Frame) {
filter.buffer.Put(frame)
func (f *GeneralFilter) PutBack(frame *astiav.Frame) {
f.buffer.Put(frame)
}
func (filter *GeneralFilter) GetFrame(ctx context.Context) (*astiav.Frame, error) {
return filter.buffer.Pop(ctx)
func (f *GeneralFilter) GetFrame(ctx context.Context) (*astiav.Frame, error) {
return f.buffer.Pop(ctx)
}
func (filter *GeneralFilter) close() {
if filter.graph != nil {
filter.graph.Free()
func (f *GeneralFilter) close() {
if f.graph != nil {
f.graph.Free()
}
if filter.input != nil {
filter.input.Free()
if f.input != nil {
f.input.Free()
}
if filter.output != nil {
filter.output.Free()
if f.output != nil {
f.output.Free()
}
}
func (filter *GeneralFilter) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Frame]) {
filter.buffer = buffer
func (f *GeneralFilter) SetBuffer(buffer buffer.BufferWithGenerator[*astiav.Frame]) {
f.buffer = buffer
}
func (filter *GeneralFilter) AddToFilterContent(content string) {
filter.content += content
func (f *GeneralFilter) AddToFilterContent(content string) {
f.content += content
}
func (filter *GeneralFilter) SetFrameRate(describe CanDescribeFrameRate) {
filter.srcContextParams.SetFramerate(describe.FrameRate())
func (f *GeneralFilter) SetFrameRate(describe CanDescribeFrameRate) {
f.srcContextParams.SetFramerate(describe.FrameRate())
}
func (filter *GeneralFilter) SetTimeBase(describe CanDescribeTimeBase) {
filter.srcContextParams.SetTimeBase(describe.TimeBase())
func (f *GeneralFilter) SetTimeBase(describe CanDescribeTimeBase) {
f.srcContextParams.SetTimeBase(describe.TimeBase())
}
func (filter *GeneralFilter) SetHeight(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetHeight(describe.Height())
func (f *GeneralFilter) SetHeight(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetHeight(describe.Height())
}
func (filter *GeneralFilter) SetWidth(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetWidth(describe.Width())
func (f *GeneralFilter) SetWidth(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetWidth(describe.Width())
}
func (filter *GeneralFilter) SetPixelFormat(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetPixelFormat(describe.PixelFormat())
func (f *GeneralFilter) SetPixelFormat(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetPixelFormat(describe.PixelFormat())
}
func (filter *GeneralFilter) SetSampleAspectRatio(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetSampleAspectRatio(describe.SampleAspectRatio())
func (f *GeneralFilter) SetSampleAspectRatio(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetSampleAspectRatio(describe.SampleAspectRatio())
}
func (filter *GeneralFilter) SetColorSpace(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetColorSpace(describe.ColorSpace())
func (f *GeneralFilter) SetColorSpace(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetColorSpace(describe.ColorSpace())
}
func (filter *GeneralFilter) SetColorRange(describe CanDescribeMediaVideoFrame) {
filter.srcContextParams.SetColorRange(describe.ColorRange())
func (f *GeneralFilter) SetColorRange(describe CanDescribeMediaVideoFrame) {
f.srcContextParams.SetColorRange(describe.ColorRange())
}
func (filter *GeneralFilter) SetSampleRate(describe CanDescribeMediaAudioFrame) {
filter.srcContextParams.SetSampleRate(describe.SampleRate())
func (f *GeneralFilter) SetSampleRate(describe CanDescribeMediaAudioFrame) {
f.srcContextParams.SetSampleRate(describe.SampleRate())
}
func (filter *GeneralFilter) SetSampleFormat(describe CanDescribeMediaAudioFrame) {
filter.srcContextParams.SetSampleFormat(describe.SampleFormat())
func (f *GeneralFilter) SetSampleFormat(describe CanDescribeMediaAudioFrame) {
f.srcContextParams.SetSampleFormat(describe.SampleFormat())
}
func (filter *GeneralFilter) SetChannelLayout(describe CanDescribeMediaAudioFrame) {
filter.srcContextParams.SetChannelLayout(describe.ChannelLayout())
func (f *GeneralFilter) SetChannelLayout(describe CanDescribeMediaAudioFrame) {
f.srcContextParams.SetChannelLayout(describe.ChannelLayout())
}
func (filter *GeneralFilter) MediaType() astiav.MediaType {
return filter.sinkContext.MediaType()
func (f *GeneralFilter) MediaType() astiav.MediaType {
return f.sinkContext.MediaType()
}
func (filter *GeneralFilter) FrameRate() astiav.Rational {
return filter.sinkContext.FrameRate()
func (f *GeneralFilter) FrameRate() astiav.Rational {
return f.sinkContext.FrameRate()
}
func (filter *GeneralFilter) TimeBase() astiav.Rational {
return filter.sinkContext.TimeBase()
func (f *GeneralFilter) TimeBase() astiav.Rational {
return f.sinkContext.TimeBase()
}
func (filter *GeneralFilter) Height() int {
return filter.sinkContext.Height()
func (f *GeneralFilter) Height() int {
return f.sinkContext.Height()
}
func (filter *GeneralFilter) Width() int {
return filter.sinkContext.Width()
func (f *GeneralFilter) Width() int {
return f.sinkContext.Width()
}
func (filter *GeneralFilter) PixelFormat() astiav.PixelFormat {
return filter.sinkContext.PixelFormat()
func (f *GeneralFilter) PixelFormat() astiav.PixelFormat {
return f.sinkContext.PixelFormat()
}
func (filter *GeneralFilter) SampleAspectRatio() astiav.Rational {
return filter.sinkContext.SampleAspectRatio()
func (f *GeneralFilter) SampleAspectRatio() astiav.Rational {
return f.sinkContext.SampleAspectRatio()
}
func (filter *GeneralFilter) ColorSpace() astiav.ColorSpace {
return filter.sinkContext.ColorSpace()
func (f *GeneralFilter) ColorSpace() astiav.ColorSpace {
return f.sinkContext.ColorSpace()
}
func (filter *GeneralFilter) ColorRange() astiav.ColorRange {
return filter.sinkContext.ColorRange()
func (f *GeneralFilter) ColorRange() astiav.ColorRange {
return f.sinkContext.ColorRange()
}
func (filter *GeneralFilter) SampleRate() int {
return filter.sinkContext.SampleRate()
func (f *GeneralFilter) SampleRate() int {
return f.sinkContext.SampleRate()
}
func (filter *GeneralFilter) SampleFormat() astiav.SampleFormat {
return filter.sinkContext.SampleFormat()
func (f *GeneralFilter) SampleFormat() astiav.SampleFormat {
return f.sinkContext.SampleFormat()
}
func (filter *GeneralFilter) ChannelLayout() astiav.ChannelLayout {
return filter.sinkContext.ChannelLayout()
func (f *GeneralFilter) ChannelLayout() astiav.ChannelLayout {
return f.sinkContext.ChannelLayout()
}
+1 -9
View File
@@ -4,29 +4,21 @@ package transcode
import (
"context"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
)
type GeneralFilterBuilder struct {
producer CanProduceMediaFrame
config FilterConfig
bufsize int
pool buffer.Pool[*astiav.Frame]
options []FilterOption
fps uint8
fpsOption FilterOption
}
func NewGeneralFilterBuilder(config FilterConfig, producer CanProduceMediaFrame, bufsize int, pool buffer.Pool[*astiav.Frame], options ...FilterOption) *GeneralFilterBuilder {
func NewGeneralFilterBuilder(config FilterConfig, producer CanProduceMediaFrame, options ...FilterOption) *GeneralFilterBuilder {
return &GeneralFilterBuilder{
producer: producer,
config: config,
bufsize: bufsize,
pool: pool,
options: options,
}
}
+4 -2
View File
@@ -3,6 +3,7 @@
package transcode
import (
"context"
"fmt"
"strings"
@@ -42,13 +43,14 @@ var (
}
)
func WithFilterBuffer(size int, pool buffer.Pool[*astiav.Frame]) FilterOption {
func WithFilterBuffer(ctx context.Context, size int, pool buffer.Pool[*astiav.Frame]) FilterOption {
return func(filter Filter) error {
s, ok := filter.(CanSetBuffer[*astiav.Frame])
if !ok {
return ErrorInterfaceMismatch
}
s.SetBuffer(buffer.NewChannelBufferWithGenerator(filter.Ctx(), pool, uint(size), 1))
s.SetBuffer(buffer.NewChannelBufferWithGenerator(ctx, pool, uint(size), 1))
return nil
}
}
-5
View File
@@ -12,11 +12,6 @@ type CanAddToFilterContent interface {
AddToFilterContent(string)
}
type CanPauseUnPauseEncoder interface {
PauseEncoding() error
UnPauseEncoding() error
}
type CanGetParameterSets interface {
GetParameterSets() (sps, pps []byte, err error)
}
+4 -8
View File
@@ -100,29 +100,25 @@ type CanSetMediaPacket interface {
}
type Demuxer interface {
Ctx() context.Context
Start()
Stop()
Close()
CanProduceMediaPacket
}
type Decoder interface {
Ctx() context.Context
Start()
Stop()
Close()
CanProduceMediaFrame
}
type Filter interface {
Ctx() context.Context
Start()
Stop()
Close()
CanProduceMediaFrame
}
type Encoder interface {
Ctx() context.Context
Start()
Stop()
Close()
CanProduceMediaPacket
}
+18 -54
View File
@@ -93,18 +93,18 @@ func newSplitEncoder(encoder *GeneralEncoder, producer *dummyMediaFrameProducer)
}
}
// MultiUpdateEncoder is deprecated
type MultiUpdateEncoder struct {
encoders []*splitEncoder
active atomic.Pointer[splitEncoder]
config MultiConfig
bitrates []int64
producer CanProduceMediaFrame
ctx context.Context
cancel context.CancelFunc
paused atomic.Bool
resume chan struct{}
pauseMux sync.Mutex
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewMultiUpdateEncoder(ctx context.Context, config MultiConfig, builder *GeneralEncoderBuilder) (*MultiUpdateEncoder, error) {
@@ -112,15 +112,14 @@ func NewMultiUpdateEncoder(ctx context.Context, config MultiConfig, builder *Gen
return nil, err
}
ctx2, cancel := context.WithCancel(ctx)
ctx2, cancel2 := context.WithCancel(ctx)
encoder := &MultiUpdateEncoder{
encoders: make([]*splitEncoder, 0),
config: config,
bitrates: config.getBitrates(),
producer: builder.producer,
ctx: ctx2,
cancel: cancel,
resume: make(chan struct{}),
cancel: cancel2,
}
describer, ok := encoder.producer.(CanDescribeMediaFrame)
@@ -155,10 +154,6 @@ func NewMultiUpdateEncoder(ctx context.Context, config MultiConfig, builder *Gen
return encoder, nil
}
func (u *MultiUpdateEncoder) Ctx() context.Context {
return u.ctx
}
func (u *MultiUpdateEncoder) Start() {
for _, encoder := range u.encoders {
encoder.encoder.Start()
@@ -175,15 +170,17 @@ func (u *MultiUpdateEncoder) PutBack(packet *astiav.Packet) {
u.active.Load().encoder.PutBack(packet)
}
func (u *MultiUpdateEncoder) Stop() {
u.cancel()
func (u *MultiUpdateEncoder) Close() {
u.once.Do(func() {
if u.cancel != nil {
u.cancel()
}
u.wg.Wait()
})
}
func (u *MultiUpdateEncoder) AdaptBitrate(bps int64) error {
if err := u.checkPause(bps); err != nil {
return err
}
bps = u.cutoff(bps)
bestIndex := u.findBestEncoderIndex(bps)
@@ -223,42 +220,13 @@ func (u *MultiUpdateEncoder) cutoff(bps int64) int64 {
return bps
}
func (u *MultiUpdateEncoder) shouldPause(bps int64) bool {
return bps <= u.config.MinBitrate && u.config.CutVideoBelowMinBitrate
}
func (u *MultiUpdateEncoder) checkPause(bps int64) error {
shouldPause := u.shouldPause(bps)
if shouldPause {
fmt.Println("pausing video...")
return u.PauseEncoding()
}
return u.UnPauseEncoding()
}
func (u *MultiUpdateEncoder) PauseEncoding() error {
u.paused.Store(true)
return nil
}
func (u *MultiUpdateEncoder) UnPauseEncoding() error {
u.pauseMux.Lock()
defer u.pauseMux.Unlock()
if u.paused.Swap(false) {
close(u.resume)
u.resume = make(chan struct{})
}
return nil
}
func (u *MultiUpdateEncoder) GetParameterSets() (sps []byte, pps []byte, err error) {
return u.active.Load().encoder.GetParameterSets()
}
func (u *MultiUpdateEncoder) loop() {
defer u.close()
u.wg.Add(1)
defer u.wg.Done()
for {
select {
@@ -282,7 +250,7 @@ func (u *MultiUpdateEncoder) loop() {
}
func (u *MultiUpdateEncoder) getFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(u.ctx, 50*time.Millisecond)
ctx, cancel := context.WithTimeout(u.ctx, 100*time.Millisecond)
defer cancel()
return u.producer.GetFrame(ctx)
@@ -308,7 +276,3 @@ func (u *MultiUpdateEncoder) pushFrame(encoder *splitEncoder, frame *astiav.Fram
// PUT IN BUFFER
return encoder.producer.pushFrame(ctx, refFrame)
}
func (u *MultiUpdateEncoder) close() {
}
+9 -25
View File
@@ -48,11 +48,11 @@ func (t *Transcoder) Start() {
t.encoder.Start()
}
func (t *Transcoder) Stop() {
t.encoder.Stop()
t.filter.Stop()
t.decoder.Stop()
t.demuxer.Stop()
func (t *Transcoder) Close() {
t.encoder.Close()
t.filter.Close()
t.decoder.Close()
t.demuxer.Close()
}
func (t *Transcoder) GetPacket(ctx context.Context) (*astiav.Packet, error) {
@@ -63,7 +63,7 @@ func (t *Transcoder) PutBack(packet *astiav.Packet) {
t.encoder.PutBack(packet)
}
// Generate method is to satisfy mediapipe.CanGenerate interface. TODO: but I would prefer to integrate with PutBack
// Generate method is to satisfy mediapipe.CanGenerate interface.
func (t *Transcoder) Generate(ctx context.Context) (*astiav.Packet, error) {
packet, err := t.encoder.GetPacket(ctx)
if err != nil {
@@ -72,24 +72,6 @@ func (t *Transcoder) Generate(ctx context.Context) (*astiav.Packet, error) {
return packet, nil
}
func (t *Transcoder) PauseEncoding() error {
p, ok := t.encoder.(CanPauseUnPauseEncoder)
if !ok {
return ErrorInterfaceMismatch
}
return p.PauseEncoding()
}
func (t *Transcoder) UnPauseEncoding() error {
p, ok := t.encoder.(CanPauseUnPauseEncoder)
if !ok {
return ErrorInterfaceMismatch
}
return p.UnPauseEncoding()
}
func (t *Transcoder) GetParameterSets() (sps, pps []byte, err error) {
p, ok := t.encoder.(CanGetParameterSets)
if !ok {
@@ -112,9 +94,11 @@ func (t *Transcoder) AdaptBitrate(bps int64) error {
if err := u.AdaptBitrate(bps); err != nil {
return err
}
return nil
}
return nil
return ErrorInterfaceMismatch
}
func (t *Transcoder) GetCurrentBitrate() (int64, error) {
+7 -8
View File
@@ -6,8 +6,6 @@ import (
"context"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
)
func WithGeneralDemuxer(ctx context.Context, containerAddress string, options ...DemuxerOption) TranscoderOption {
@@ -46,9 +44,9 @@ func WithGeneralFilter(ctx context.Context, filterConfig FilterConfig, options .
}
}
func WithFPSControlFilter(ctx context.Context, config FilterConfig, config2 UpdateFilterConfig, bufsize int, pool buffer.Pool[*astiav.Frame], options ...FilterOption) TranscoderOption {
func WithFPSControlFilter(ctx context.Context, config FilterConfig, config2 UpdateFilterConfig, options ...FilterOption) TranscoderOption {
return func(transcoder *Transcoder) error {
builder := NewGeneralFilterBuilder(config, transcoder.decoder, bufsize, pool, options...)
builder := NewGeneralFilterBuilder(config, transcoder.decoder, options...)
f, err := NewUpdateFilter(ctx, config2, builder, config2.InitialFPS)
if err != nil {
return err
@@ -71,9 +69,9 @@ func WithGeneralEncoder(ctx context.Context, codecID astiav.CodecID, options ...
}
}
func WithBitrateControlEncoder(ctx context.Context, codecID astiav.CodecID, bitrateControlConfig UpdateEncoderConfig, settings codecSettings, bufferSize int, pool buffer.Pool[*astiav.Packet]) TranscoderOption {
func WithBitrateControlEncoder(ctx context.Context, codecID astiav.CodecID, bitrateControlConfig UpdateEncoderConfig, settings codecSettings, options ...EncoderOption) TranscoderOption {
return func(transcoder *Transcoder) error {
builder := NewEncoderBuilder(codecID, settings, transcoder.filter, bufferSize, pool)
builder := NewEncoderBuilder(codecID, settings, transcoder.filter, options...)
updateEncoder, err := NewUpdateEncoder(ctx, bitrateControlConfig, builder)
if err != nil {
return err
@@ -84,9 +82,10 @@ func WithBitrateControlEncoder(ctx context.Context, codecID astiav.CodecID, bitr
}
}
func WithMultiEncoderBitrateControl(ctx context.Context, codecID astiav.CodecID, config MultiConfig, settings codecSettings, bufferSize int, pool buffer.Pool[*astiav.Packet]) TranscoderOption {
// WithMultiEncoderBitrateControl deprecated
func WithMultiEncoderBitrateControl(ctx context.Context, codecID astiav.CodecID, config MultiConfig, settings codecSettings, options ...EncoderOption) TranscoderOption {
return func(transcoder *Transcoder) error {
builder := NewEncoderBuilder(codecID, settings, transcoder.filter, bufferSize, pool)
builder := NewEncoderBuilder(codecID, settings, transcoder.filter, options...)
multiEncoder, err := NewMultiUpdateEncoder(ctx, config, builder)
if err != nil {
return err
+43 -118
View File
@@ -7,17 +7,19 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
"github.com/harshabose/tools/pkg/cond"
)
var (
ErrUpdateEncoderNotReady = errors.New("update encoder not in ready state")
)
type UpdateEncoderConfig struct {
MaxBitrate, MinBitrate int64
CutVideoBelowMinBitrate bool
MaxBitrate, MinBitrate int64
MinBitrateChangePercentage float64
}
func (c UpdateEncoderConfig) validate() error {
@@ -32,21 +34,16 @@ type UpdateEncoder struct {
encoder Encoder
config UpdateEncoderConfig
builder *GeneralEncoderBuilder
buffer buffer.BufferWithGenerator[*astiav.Packet]
mux sync.RWMutex
ctx context.Context
paused atomic.Bool
resume chan struct{}
pauseMux sync.Mutex
cond *cond.ContextCond
ctx context.Context
}
func NewUpdateEncoder(ctx context.Context, config UpdateEncoderConfig, builder *GeneralEncoderBuilder) (*UpdateEncoder, error) {
updater := &UpdateEncoder{
config: config,
builder: builder,
resume: make(chan struct{}),
buffer: buffer.NewChannelBufferWithGenerator(ctx, buffer.CreatePacketPool(), 30, 1),
cond: cond.NewContextCond(&sync.Mutex{}),
ctx: ctx,
}
@@ -61,50 +58,52 @@ func NewUpdateEncoder(ctx context.Context, config UpdateEncoderConfig, builder *
updater.encoder = encoder
go updater.loop()
return updater, nil
}
func (u *UpdateEncoder) Ctx() context.Context {
u.mux.Lock()
defer u.mux.Unlock()
return u.encoder.Ctx()
}
func (u *UpdateEncoder) Start() {
u.mux.Lock()
defer u.mux.Unlock()
u.cond.L.Lock()
defer u.cond.L.Unlock()
u.encoder.Start()
}
func (u *UpdateEncoder) GetPacket(ctx context.Context) (*astiav.Packet, error) {
return u.buffer.Pop(ctx)
u.cond.L.Lock()
defer u.cond.L.Unlock()
for {
if u.encoder == nil {
if err := u.cond.Wait(ctx); err != nil {
return nil, ErrUpdateEncoderNotReady
}
continue
}
p, err := u.encoder.GetPacket(ctx)
if err != nil {
return nil, err
}
return p, nil
}
}
func (u *UpdateEncoder) PutBack(packet *astiav.Packet) {
u.mux.RLock()
defer u.mux.RUnlock()
u.cond.L.Lock()
defer u.cond.L.Unlock()
u.encoder.PutBack(packet)
}
func (u *UpdateEncoder) Stop() {
u.mux.Lock()
defer u.mux.Unlock()
func (u *UpdateEncoder) Close() {
u.cond.L.Lock()
defer u.cond.L.Unlock()
u.encoder.Stop()
u.encoder.Close()
}
// AdaptToBitrate modifies the encoder's target bitrate to the specified value in bits per second.
// Returns an error if the update fails.
func (u *UpdateEncoder) AdaptBitrate(bps int64) error {
if err := u.checkPause(bps); err != nil {
return err
}
bps = u.cutoff(bps)
g, ok := u.encoder.(CanGetCurrentBitrate)
@@ -118,7 +117,7 @@ func (u *UpdateEncoder) AdaptBitrate(bps int64) error {
}
_, change := calculateBitrateChange(current, bps)
if change < 5 {
if change < u.config.MinBitrateChangePercentage {
return nil
}
@@ -128,18 +127,20 @@ func (u *UpdateEncoder) AdaptBitrate(bps int64) error {
newEncoder, err := u.builder.Build(u.ctx)
if err != nil {
return fmt.Errorf("build new encoder: %w", err)
return err
}
newEncoder.Start()
u.mux.Lock()
u.cond.L.Lock()
oldEncoder := u.encoder
u.encoder = newEncoder
u.mux.Unlock()
u.cond.L.Unlock()
u.cond.Broadcast()
if oldEncoder != nil {
oldEncoder.Stop()
oldEncoder.Close()
}
return nil
@@ -157,36 +158,6 @@ func (u *UpdateEncoder) cutoff(bps int64) int64 {
return bps
}
func (u *UpdateEncoder) shouldPause(bps int64) bool {
return bps <= u.config.MinBitrate && u.config.CutVideoBelowMinBitrate
}
func (u *UpdateEncoder) checkPause(bps int64) error {
shouldPause := u.shouldPause(bps)
if shouldPause {
fmt.Println("pausing video...")
return u.PauseEncoding()
}
return u.UnPauseEncoding()
}
func (u *UpdateEncoder) PauseEncoding() error {
u.paused.Store(true)
return nil
}
func (u *UpdateEncoder) UnPauseEncoding() error {
u.pauseMux.Lock()
defer u.pauseMux.Unlock()
if u.paused.Swap(false) {
close(u.resume)
u.resume = make(chan struct{})
}
return nil
}
func (u *UpdateEncoder) GetParameterSets() (sps []byte, pps []byte, err error) {
p, ok := u.encoder.(CanGetParameterSets)
if !ok {
@@ -208,49 +179,3 @@ func calculateBitrateChange(currentBps, newBps int64) (absoluteChange int64, per
return absoluteChange, percentageChange
}
func (u *UpdateEncoder) getPacket() (*astiav.Packet, error) {
u.mux.RLock()
defer u.mux.RUnlock()
if u.encoder != nil {
ctx, cancel := context.WithTimeout(u.ctx, 100*time.Millisecond) // approx 2*fps
defer cancel()
p, err := u.encoder.GetPacket(ctx)
if err != nil {
return nil, err
}
return p, nil
}
return nil, errors.New("encoder is nil")
}
func (u *UpdateEncoder) pushPacket(p *astiav.Packet) error {
if p == nil {
return nil
}
ctx, cancel := context.WithTimeout(u.ctx, 100*time.Millisecond)
defer cancel()
return u.buffer.Push(ctx, p)
}
func (u *UpdateEncoder) loop() {
for {
select {
case <-u.ctx.Done():
return
default:
p, err := u.getPacket()
if err != nil {
fmt.Println("error getting packet from encoder; err:", err.Error())
continue
}
if err := u.pushPacket(p); err != nil {
fmt.Println(err.Error())
}
}
}
}
+64 -102
View File
@@ -7,11 +7,14 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/asticode/go-astiav"
"github.com/harshabose/tools/pkg/buffer"
"github.com/harshabose/tools/pkg/cond"
)
var (
ErrUpdateFilterNotReady = errors.New("update filter not in ready state")
)
type UpdateFilterConfig struct {
@@ -33,94 +36,90 @@ type UpdateFilter struct {
config UpdateFilterConfig
builder *GeneralFilterBuilder
buffer buffer.BufferWithGenerator[*astiav.Frame]
mux sync.RWMutex
// wg sync.WaitGroup // TODO: ADD CLOSE METHODS ON TRANSCODER ELEMENTS
ctx context.Context
// cancel context.CancelFunc
cond *cond.ContextCond
ctx context.Context
}
func (f *UpdateFilter) MediaType() astiav.MediaType {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).MediaType()
}
func (f *UpdateFilter) FrameRate() astiav.Rational {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).FrameRate()
}
func (f *UpdateFilter) TimeBase() astiav.Rational {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).TimeBase()
}
func (f *UpdateFilter) Height() int {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).Height()
}
func (f *UpdateFilter) Width() int {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).Width()
}
func (f *UpdateFilter) PixelFormat() astiav.PixelFormat {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).PixelFormat()
}
func (f *UpdateFilter) SampleAspectRatio() astiav.Rational {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).SampleAspectRatio()
}
func (f *UpdateFilter) ColorSpace() astiav.ColorSpace {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).ColorSpace()
}
func (f *UpdateFilter) ColorRange() astiav.ColorRange {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).ColorRange()
}
func (f *UpdateFilter) SampleRate() int {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).SampleRate()
}
func (f *UpdateFilter) SampleFormat() astiav.SampleFormat {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).SampleFormat()
}
func (f *UpdateFilter) ChannelLayout() astiav.ChannelLayout {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
return f.filter.(CanDescribeMediaFrame).ChannelLayout()
}
@@ -143,43 +142,54 @@ func NewUpdateFilter(ctx context.Context, config UpdateFilterConfig, builder *Ge
filter: f,
config: config,
builder: builder,
buffer: buffer.NewChannelBufferWithGenerator(ctx, buffer.CreateFramePool(), 30, 1), // TODO: CHANGE THIS ASAP
mux: sync.RWMutex{},
cond: cond.NewContextCond(&sync.Mutex{}),
ctx: ctx,
}
go filter.loop()
return filter, nil
}
func (f *UpdateFilter) Ctx() context.Context {
return f.ctx
}
func (f *UpdateFilter) Start() {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.filter.Start()
}
func (f *UpdateFilter) GetFrame(ctx context.Context) (*astiav.Frame, error) {
return f.buffer.Pop(ctx)
f.cond.L.Lock()
defer f.cond.L.Unlock()
for {
if f.filter == nil {
if err := f.cond.Wait(ctx); err != nil {
return nil, ErrUpdateFilterNotReady
}
continue
}
frame, err := f.filter.GetFrame(ctx)
if err != nil {
return nil, err
}
return frame, nil
}
}
func (f *UpdateFilter) PutBack(frame *astiav.Frame) {
f.mux.RLock()
defer f.mux.RUnlock()
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.filter.PutBack(frame)
}
func (f *UpdateFilter) Stop() {
f.mux.Lock()
defer f.mux.Unlock()
func (f *UpdateFilter) Close() {
f.cond.L.Lock()
defer f.cond.L.Unlock()
f.filter.Stop()
f.filter.Close()
}
func (f *UpdateFilter) AdaptBitrate(bps int64) error {
@@ -199,13 +209,15 @@ func (f *UpdateFilter) AdaptBitrate(bps int64) error {
nf.Start()
f.mux.Lock()
f.cond.L.Lock()
old := f.filter
f.filter = nf
f.mux.Unlock()
f.cond.L.Unlock()
f.cond.Broadcast()
if old != nil {
old.Stop()
old.Close()
}
return nil
@@ -214,53 +226,3 @@ func (f *UpdateFilter) AdaptBitrate(bps int64) error {
func (f *UpdateFilter) GetCurrentFPS() (uint8, error) {
return f.builder.GetCurrentFPS()
}
func (f *UpdateFilter) getFrame() (*astiav.Frame, error) {
f.mux.RLock()
defer f.mux.RUnlock()
if f.filter != nil {
ctx2, cancel2 := context.WithTimeout(f.ctx, 100*time.Millisecond)
defer cancel2()
frame, err := f.filter.GetFrame(ctx2)
if err != nil {
return nil, err
}
return frame, nil
}
return nil, errors.New("filter is nil")
}
func (f *UpdateFilter) pushFrame(frame *astiav.Frame) error {
if frame == nil {
return nil
}
ctx2, cancel2 := context.WithTimeout(f.ctx, 100*time.Millisecond)
defer cancel2()
return f.buffer.Push(ctx2, frame)
}
func (f *UpdateFilter) loop() {
for {
select {
case <-f.ctx.Done():
return
default:
frame, err := f.getFrame()
if err != nil {
fmt.Printf("error getting frame from update filter; err=%v\n", err)
continue
}
if err := f.pushFrame(frame); err != nil {
fmt.Printf("error pushing frame in update filter; err=%v\n", err)
continue
}
}
}
}
+1 -3
View File
@@ -68,7 +68,7 @@ func (g *StatsGetter) loop1(interval time.Duration) {
}
}
func (g *StatsGetter) Close() error {
func (g *StatsGetter) Close() {
g.once.Do(func() {
if g.cancel != nil {
g.cancel()
@@ -76,8 +76,6 @@ func (g *StatsGetter) Close() error {
g.wg.Wait()
})
return nil
}
func (g *StatsGetter) Generate(pc *PeerConnection) Stat {