OutputQueue for output

Output-side counterpart to the StreamBuffer. It is not exactly the
same solution, because input and output characteristics are quite
different (for one, on the input side the stream is just a series of
bytes, whereas on the output side we are able to tell packets
apart, assign timestamps, packet types, etc.).

There is simple Golang code for writing the data into files,
so that some tests will pass with this code. Note that the big
problem with queue output is that some muxers (for example
mp4) need to be able to seek() in the output, and the queue does not allow that.
This commit is contained in:
Michal Adamczak
2022-07-24 13:30:17 +02:00
parent 8a2f6e7773
commit c74c5c69a3
8 changed files with 345 additions and 9 deletions
+42 -5
View File
@@ -252,12 +252,16 @@ video_encoder_err:
static void free_output_no_trailer(struct output_ctx *octx, enum FreeOutputPolicy policy)
{
if (octx->oc) {
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && octx->oc->pb) {
// we check against AVFMT_FLAG_CUSTOM_IO to avoid trying to close file
// in case we are using custom i/o - this would cause crash
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) &&
!(octx->oc->flags & AVFMT_FLAG_CUSTOM_IO) && octx->oc->pb) {
avio_closep(&octx->oc->pb);
}
avformat_free_context(octx->oc);
octx->oc = NULL;
}
queue_push_staging(&octx->write_context, END_OF_OUTPUT, -1);
if (octx->vc &&
((octx->hw_type == AV_HWDEVICE_TYPE_NONE) || (FORCE_CLOSE_HW_ENCODER == policy))) {
avcodec_free_context(&octx->vc);
@@ -295,7 +299,7 @@ void free_output(struct output_ctx *octx, enum FreeOutputPolicy policy)
free_output_no_trailer(octx, policy);
}
int open_output(struct output_ctx *octx, struct input_ctx *ictx)
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue)
{
int ret = 0;
@@ -351,8 +355,27 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
// Muxer headers can be written now once streams were added
if (!(fmt->flags & AVFMT_NOFILE)) {
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
if (queue) {
// output through queue
ret = queue_setup_as_output(queue, &octx->write_context, octx->oc);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting up output queue");
// make sure muxer options are compatible with queue output
// TODO: not sure if that is the best option for detecting a container
// type but it is surprisingly hard to find guidance on that
if (fmt->mime_type && !strcmp("video/mp4", fmt->mime_type)) {
// Default configuration of MP4 muxer needs seekable output, which
// the queue is not able to provide. Passing the following flags removes
// seekable requirement. This is also configuration recommended for
// streaming purposes, so it seems better suited anyway (the whole point
// with queues is to provide Low Latency/streaming support)
ret = av_dict_set(&octx->muxer->opts, "movflags", "frag_keyframe+empty_moov", 0);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting movflags for fragmented output");
}
} else {
// normal file output
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
}
}
// IMPORTANT: notice how up to and including this point open_output_err is
@@ -364,6 +387,10 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
// call free_output_no_trailer() exclusively!
ret = avformat_write_header(octx->oc, &octx->muxer->opts);
if (ret < 0) LPMS_ERR(open_output_err, "Error writing header");
// flush headers
// ret = av_interleaved_write_frame(octx->oc, NULL);
if (ret < 0) LPMS_ERR(open_output_err, "Error flushing headers");
queue_push_staging(&octx->write_context, BEGIN_OF_OUTPUT, 0);
// From now on it is normal free_output(), hence after_header error label
if(octx->sfilters != NULL && needs_decoder(octx->video->name) && octx->sf.active == 0) {
@@ -431,6 +458,8 @@ encode_cleanup:
int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
{
int ret;
int64_t pts = pkt->pts;
pkt->stream_index = ost->index;
if (av_cmp_q(tb, ost->time_base)) {
av_packet_rescale_ts(pkt, tb, ost->time_base);
@@ -481,7 +510,15 @@ int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
octx->last_video_dts = pkt->dts;
}
return av_interleaved_write_frame(octx->oc, pkt);
// make sure correct timestamp will get carried through to output_queue
ret = av_interleaved_write_frame(octx->oc, pkt);
if (0 > ret) return ret;
// this means "flush output", we want to do it so that output_queue will get
// properly associated packets and timestamps
ret = av_interleaved_write_frame(octx->oc, NULL);
if (0 > ret) return ret;
queue_push_staging(&octx->write_context, PACKET_OUTPUT, pts);
return 0;
}
static int getmetadatainf(AVFrame *inf, struct output_ctx *octx)
+2 -1
View File
@@ -4,13 +4,14 @@
#include "decoder.h"
#include "transcoder.h"
#include "filter.h"
#include "output_queue.h"
enum FreeOutputPolicy {
FORCE_CLOSE_HW_ENCODER,
PRESERVE_HW_ENCODER
};
int open_output(struct output_ctx *octx, struct input_ctx *ictx);
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue);
void free_output(struct output_ctx *octx, enum FreeOutputPolicy);
int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext *encoder, AVStream *ost,
struct filter_ctx *filter, AVFrame *inf);
+51
View File
@@ -636,6 +636,47 @@ func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) {
}
}
// storeOutputQueue drains the transcoder's output queue into files, one file
// per entry of outputs (created at outputs[i].Oname). Every data packet is
// appended to the file selected by packet.index; draining stops at the packet
// flagged END_OF_ALL_OUTPUTS, which is popped as well.
//
// Returns the first error hit while creating, writing or closing a file, or
// os.ErrInvalid when the queue hands back no packet or a packet whose index
// does not match any output. All files that were opened are closed on every
// exit path.
//
// NOTE(review): on an error return the remaining packets are left in the
// queue; the caller is expected to reset the queue before reusing it.
func storeOutputQueue(t *Transcoder, outputs []TranscodeOptions) (err error) {
	fds := make([]*os.File, 0, len(outputs))
	// Close whatever got opened, also on early returns; a Close failure is
	// reported only when nothing else went wrong before it.
	defer func() {
		for _, f := range fds {
			if cerr := f.Close(); cerr != nil && err == nil {
				err = cerr
			}
		}
	}()
	for i := range outputs {
		file, cerr := os.Create(outputs[i].Oname)
		if cerr != nil {
			return cerr
		}
		fds = append(fds, file)
	}

	for {
		// get next output packet (blocks until one is available)
		packet := C.lpms_transcode_peek_packet(t.handle)
		if packet == nil {
			// peek returns NULL only when there is no transcoder handle
			return os.ErrInvalid
		}
		if packet.flags == C.END_OF_ALL_OUTPUTS {
			break
		}
		// this is a data packet, write it to the file of its output
		idx := int(packet.index)
		if idx < 0 || idx >= len(fds) {
			return os.ErrInvalid
		}
		data := C.GoBytes(unsafe.Pointer(packet.data), packet.size)
		// Write returns a non-nil error whenever it writes fewer bytes than
		// requested, so no separate short-write check is needed.
		if _, werr := fds[idx].Write(data); werr != nil {
			return werr
		}
		// the bytes were copied by GoBytes, the C packet can be released
		C.lpms_transcode_pop_packet(t.handle)
	}

	// Only the terminating packet is left; it carries no data and merely
	// marks the end of all outputs, so just remove it.
	C.lpms_transcode_pop_packet(t.handle)
	return nil
}
// create C output params array and return it along with corresponding finalizer
// function that makes sure there are no C memory leaks
@@ -961,6 +1002,16 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
}
loadInputBuffer(t, input)
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
// be careful to use storeOutputQueue to fake lpms_transcode so that the test
// reacts properly
if (ret == 0) {
err = storeOutputQueue(t, ps)
if nil != err {
// fake output error
ret = C.AVERROR_STREAM_NOT_FOUND
}
}
if ret != 0 {
if LogTranscodeErrors {
glog.Error("Transcoder Return : ", ErrorMap[ret])
+4
View File
@@ -3,6 +3,7 @@
#include <libavfilter/avfilter.h>
#include "decoder.h"
#include "output_queue.h"
struct filter_ctx {
int active;
@@ -36,6 +37,7 @@ struct filter_ctx {
int flushing;
};
// TODO move this away, this ain't filter
struct output_ctx {
char *fname; // required output file name
char *vfilters; // required output video filters
@@ -74,6 +76,8 @@ struct output_ctx {
output_results *res; // data to return for this output
char *xcoderParams;
WriteContext write_context;
};
int init_video_filters(struct input_ctx *ictx, struct output_ctx *octx);
+158
View File
@@ -0,0 +1,158 @@
#include "output_queue.h"
// Prepares a queue for first use: the packet list starts out empty and the
// mutex and condition variable are created with default attributes.
// Not thread safe - call before the queue is shared with other threads.
void queue_create(OutputQueue *queue)
{
    queue->front = NULL;
    queue->back = NULL;
    pthread_cond_init(&queue->condition, NULL);
    pthread_mutex_init(&queue->mutex, NULL);
}
// Tears the queue down: frees every packet still linked into it and destroys
// the synchronization primitives. Not thread safe - no other thread may be
// using the queue at this point.
void queue_destroy(OutputQueue *queue)
{
    // queue_reset() takes no lock, so it does not matter whether it runs
    // before or after the primitives are destroyed
    queue_reset(queue);
    pthread_cond_destroy(&queue->condition);
    pthread_mutex_destroy(&queue->mutex);
}
static int queue_write_function(void *user_data, uint8_t *buf, int buf_size)
{
WriteContext *wctx = (WriteContext *)user_data;
// Prepare packet
OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket));
if (!packet) return -1;
packet->data = (uint8_t *)malloc(buf_size);
if (!packet->data) {
free(packet);
return -1;
}
memcpy(packet->data, buf, buf_size);
packet->size = buf_size;
packet->index = wctx->index;
packet->next = NULL;
// Important - we are not adding to the queue now. This is because we don't
// know which flags to assign yet - for example, we can't assign END_OF_OUTPUT
// because we don't know if we are at last packet or not. Instead, packets are
// added to the staging area and queue_push_staging will be used to add them
// to the queue after muxing operation finishes.
if (wctx->staging_back) {
// not the first packet
wctx->staging_back->next = packet;
wctx->staging_back = packet;
} else {
// first packet
wctx->staging_front = wctx->staging_back = packet;
}
return buf_size;
}
// Wires ctx up so that everything the muxer writes ends up in the staging area
// of wctx (and from there, via queue_push_staging(), in queue).
//
// Resets the staging list of wctx, allocates an AVIO context with a write
// callback only (no read, no seek) and marks ctx as using custom I/O with
// flushing after each packet.
//
// Returns 0 on success, AVERROR(ENOMEM) if the I/O buffer or the AVIO context
// cannot be allocated.
int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx)
{
    // size of the intermediate buffer handed to the AVIO context
    enum { IO_BUFFER_SIZE = 4096 };

    wctx->queue = queue;
    wctx->staging_front = wctx->staging_back = NULL;

    // IMPORTANT (original author's observation, not confirmed by the FFmpeg
    // documentation): the memory of ctx->pb as well as its io_buffer seem to
    // be released when ctx gets closed; releasing them explicitly resulted in
    // "double free" errors.
    unsigned char *io_buffer = av_malloc(IO_BUFFER_SIZE);
    if (!io_buffer) return AVERROR(ENOMEM);

    ctx->pb = avio_alloc_context(
        io_buffer, IO_BUFFER_SIZE,  // buffer and size
        1,                          // write allowed
        wctx,                       // pass write context as user data
        NULL,                       // no read function supplied
        queue_write_function,
        NULL);                      // no seek function supplied
    if (!ctx->pb) {
        // the AVIO context never took ownership of the buffer, release it here
        av_free(io_buffer);
        return AVERROR(ENOMEM);
    }

    ctx->flags |= AVFMT_FLAG_CUSTOM_IO | AVFMT_FLAG_FLUSH_PACKETS;
    return 0;
}
// Empties the queue, freeing every packet together with its data buffer.
// Takes no lock - the caller must make sure no other thread uses the queue.
void queue_reset(OutputQueue *queue)
{
    OutputPacket *node = queue->front;
    while (node) {
        OutputPacket *following = node->next;
        free(node->data);   // free(NULL) is a no-op, data may be NULL
        free(node);
        node = following;
    }
    queue->front = NULL;
    queue->back = NULL;
}
// Returns the oldest packet in the queue without removing it, blocking on the
// queue's condition variable for as long as the queue is empty. The packet
// remains owned by the queue until queue_pop_front() releases it.
const OutputPacket *queue_peek_front(OutputQueue *queue)
{
    pthread_mutex_lock(&queue->mutex);
    for (;;) {
        if (queue->front) break;
        // nothing queued yet - sleep until a producer signals
        pthread_cond_wait(&queue->condition, &queue->mutex);
    }
    const OutputPacket *head = queue->front;
    pthread_mutex_unlock(&queue->mutex);
    return head;
}
// Removes the oldest packet from the queue and frees it together with its
// data. Blocks on the queue's condition variable while the queue is empty.
void queue_pop_front(OutputQueue *queue)
{
    pthread_mutex_lock(&queue->mutex);
    for (;;) {
        if (queue->front) break;
        // nothing queued yet - sleep until a producer signals
        pthread_cond_wait(&queue->condition, &queue->mutex);
    }
    // unlink the head; if it was the only packet the queue becomes empty
    OutputPacket *head = queue->front;
    queue->front = head->next;
    if (NULL == queue->front) {
        queue->back = NULL;
    }
    pthread_mutex_unlock(&queue->mutex);

    // the packet is private to this thread now, release it outside the lock
    free(head->data);
    free(head);
}
// Publishes everything collected in the staging area of wctx: each staged
// packet gets the given timestamp and flags, then the whole staged list is
// appended to the queue under the queue mutex and one waiter is signalled.
// Does nothing when the staging area is empty.
void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp)
{
    OutputPacket *first = wctx->staging_front;
    if (!first) return; // nothing to do

    // END_OF_OUTPUT may only appear on the very last packet: the caller knows
    // that all packets were emitted, but not how many of them there are
    PacketFlags inner_flags = flags & ~END_OF_OUTPUT;
    for (OutputPacket *cur = first; cur; cur = cur->next) {
        cur->timestamp = timestamp;
        if (cur->next) {
            cur->flags = inner_flags;
        } else {
            cur->flags = flags;
        }
    }

    // splice the staged list onto the tail of the queue
    OutputQueue *target = wctx->queue;
    pthread_mutex_lock(&target->mutex);
    if (target->back) {
        // queue already holds packets
        target->back->next = first;
    } else {
        // queue is empty
        target->front = first;
    }
    target->back = wctx->staging_back;
    wctx->staging_back = NULL;
    wctx->staging_front = NULL;
    pthread_mutex_unlock(&target->mutex);
    pthread_cond_signal(&target->condition);
}
// Appends the terminating packet to the queue: no data, size 0, timestamp -1
// and the END_OF_ALL_OUTPUTS flag. Wakes up one thread waiting on the queue.
//
// Returns 0 on success, -1 if the packet cannot be allocated.
int queue_push_end(OutputQueue *queue)
{
    OutputPacket *packet = malloc(sizeof *packet);
    if (!packet) return -1;
    // The terminator becomes the tail of the list, so its link MUST be NULL;
    // otherwise popping or resetting the queue follows a garbage pointer.
    packet->next = NULL;
    packet->data = NULL;
    packet->size = 0;
    // terminator does not belong to any particular output
    packet->index = -1;
    packet->flags = END_OF_ALL_OUTPUTS;
    packet->timestamp = -1;

    pthread_mutex_lock(&queue->mutex);
    if (queue->back) {
        queue->back->next = packet;
    } else {
        queue->front = packet;
    }
    queue->back = packet;
    pthread_mutex_unlock(&queue->mutex);
    pthread_cond_signal(&queue->condition);
    return 0;
}
+57
View File
@@ -0,0 +1,57 @@
#ifndef _LPMS_OUTPUT_QUEUE_H_
#define _LPMS_OUTPUT_QUEUE_H_
#include <libavformat/avformat.h>
#include <libavutil/thread.h>
// Flags stored in OutputPacket.flags. The values are distinct bits;
// queue_push_staging() masks END_OF_OUTPUT out of all but the last packet of
// a staged batch.
typedef enum {
BEGIN_OF_OUTPUT = 0x1, // before first packet is muxed - headers, etc
// (open_output pushes these with a timestamp of 0)
PACKET_OUTPUT = 0x2, // data packet - has valid timestamp
END_OF_OUTPUT = 0x4, // end of current stream (trailers, also ts == -1)
END_OF_ALL_OUTPUTS = 0x8 // very last packet, no data beyond
} PacketFlags;
// One chunk of muxer output; node of a singly linked list (staging area or
// queue).
typedef struct _OutputPacket {
// next packet in the list, NULL for the last one
struct _OutputPacket *next;
// malloc'ed copy of the bytes written by the muxer; NULL in the terminating
// packet created by queue_push_end()
uint8_t *data;
// number of bytes in data
int size;
// copied from WriteContext.index when the packet is created by the write
// callback
int index;
// assigned by queue_push_staging() / queue_push_end()
PacketFlags flags;
// timestamp passed to queue_push_staging(), -1 in the terminating packet
int64_t timestamp;
} OutputPacket;
// Linked list of OutputPackets shared between the code pushing packets and
// the code peeking/popping them.
typedef struct {
// These are called "pthread", but FFmpeg also has Windows implementation,
// so we should be safe on all reasonable platforms
// signalled whenever packets are appended to the queue
pthread_cond_t condition;
// held by the peek/pop/push functions while they touch front and back
pthread_mutex_t mutex;
// oldest packet (next to be peeked/popped), NULL when the queue is empty
OutputPacket *front;
// newest packet, NULL when the queue is empty
OutputPacket *back;
} OutputQueue;
// State handed to the AVIO write callback as its user data; set up by
// queue_setup_as_output().
typedef struct {
// Queue to use
OutputQueue *queue;
// value stamped into OutputPacket.index of every packet written through
// this context (lpms_transcode sets it to the position of the output)
int index;
// Staging area for packets
// (filled by the write callback, moved to the queue by queue_push_staging();
// accessed without taking the queue mutex)
OutputPacket *staging_front;
OutputPacket *staging_back;
} WriteContext;
// NOT THREAD SAFE
// (none of the functions in this group takes the queue mutex)
// initialize an empty queue along with its mutex and condition variable
void queue_create(OutputQueue *queue);
// destroy the mutex and condition variable and free all remaining packets
void queue_destroy(OutputQueue *queue);
// setup glue logic to allow ctx to use queue as output
// returns 0 on success, negative value when allocation fails
int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx);
// free all packets in the queue, leaving it empty
void queue_reset(OutputQueue *queue);
// THREAD SAFE
// (queue access is done under the queue mutex)
// return the oldest packet without removing it; blocks while queue is empty
const OutputPacket *queue_peek_front(OutputQueue *queue);
// remove and free the oldest packet; blocks while queue is empty
void queue_pop_front(OutputQueue *queue);
// assign flags and timestamp to all staged packets of wctx and append them to
// the queue; no-op if nothing is staged. Note that the staging area itself is
// modified outside of the mutex.
void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp);
// append the data-less END_OF_ALL_OUTPUTS packet; returns 0 on success, -1
// when the packet cannot be allocated
int queue_push_end(OutputQueue *queue);
#endif
+26 -3
View File
@@ -4,6 +4,7 @@
#include "encoder.h"
#include "logging.h"
#include "stream_buffer.h"
#include "output_queue.h"
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
@@ -81,7 +82,9 @@ struct transcode_thread {
// Input buffer - when I/O is done outside of transcoder, for example in
// Low Latency scenarios
StreamBuffer input_buffer;
int use_buffer_for_input;
int use_buffer_for_input; // TODO: name it "use custom output" or some such
// Output
OutputQueue output_queue;
};
// TODO: this feels like it belongs elsewhere, not in the top-level transcoder
@@ -647,7 +650,7 @@ int lpms_transcode(input_params *inp, output_params *params,
if (!needs_decoder(params[i].audio.name)) h->ictx.da = ++decode_a == nb_outputs;
}
ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : 0);
ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : NULL);
if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open input");
// populate output contexts
@@ -680,8 +683,9 @@ int lpms_transcode(input_params *inp, output_params *params,
octx->dv = h->ictx.vi < 0 || is_drop(octx->video->name);
octx->da = h->ictx.ai < 0 || is_drop(octx->audio->name);
octx->res = &results[i];
octx->write_context.index = i;
ret = open_output(octx, &h->ictx);
ret = open_output(octx, &h->ictx, h->use_buffer_for_input ? &h->output_queue : NULL);
if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open output");
}
@@ -695,6 +699,10 @@ int lpms_transcode(input_params *inp, output_params *params,
// Part IV: shutdown
transcode_cleanup:
if (h->use_buffer_for_input) {
// terminate output queue
queue_push_end(&h->output_queue);
}
// IMPORTANT: note that this is the only place when PRESERVE_HW_ENCODER and
// PRESERVE_HW_DECODER are used. This is done to retain HW encoder and decoder
@@ -754,11 +762,14 @@ struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts)
free(h);
return NULL;
}
queue_create(&h->output_queue);
// handle dnn filter graph creation
if (dnn_opts) {
AVFilterGraph *filtergraph = create_dnn_filtergraph(dnn_opts);
if (!filtergraph) {
buffer_destroy(&h->input_buffer);
queue_destroy(&h->output_queue);
free(h);
h = NULL;
} else {
@@ -780,6 +791,7 @@ void lpms_transcode_push_reset(struct transcode_thread *handle, int on)
{
if (!handle) return;
buffer_reset(&handle->input_buffer);
queue_reset(&handle->output_queue);
handle->use_buffer_for_input = on;
}
@@ -801,3 +813,14 @@ void lpms_transcode_push_error(struct transcode_thread *handle, int code)
buffer_error(&handle->input_buffer, code);
}
// LL output interface: returns the oldest packet of the transcoder's output
// queue without removing it (blocking while the queue is empty), or NULL when
// no handle is given.
const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle)
{
    return handle ? queue_peek_front(&handle->output_queue) : NULL;
}
// LL output interface: removes and frees the oldest packet of the
// transcoder's output queue (blocking while the queue is empty). Does nothing
// when no handle is given.
void lpms_transcode_pop_packet(struct transcode_thread *handle)
{
    if (!handle) return;
    // plain call: a void function must not "return" an expression
    queue_pop_front(&handle->output_queue);
}
+5
View File
@@ -7,6 +7,7 @@
#include <libavformat/avformat.h>
#include <libavfilter/avfilter.h>
#include "logging.h"
#include "output_queue.h"
// LPMS specific errors
extern const int lpms_ERR_INPUT_PIXFMT;
@@ -101,9 +102,13 @@ int lpms_transcode(input_params *inp, output_params *params, output_results *res
struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts);
void lpms_transcode_stop(struct transcode_thread* handle);
void lpms_transcode_discontinuity(struct transcode_thread *handle);
// LL interface for input
void lpms_transcode_push_reset(struct transcode_thread *handle, int on);
void lpms_transcode_push_bytes(struct transcode_thread* handle, uint8_t *bytes, int size);
void lpms_transcode_push_eof(struct transcode_thread *handle);
void lpms_transcode_push_error(struct transcode_thread *handle, int code);
// LL interface for output
const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle);
void lpms_transcode_pop_packet(struct transcode_thread *handle);
#endif // _LPMS_TRANSCODER_H_