diff --git a/ffmpeg/encoder.c b/ffmpeg/encoder.c
index f951766f03..5ba1dffbbb 100755
--- a/ffmpeg/encoder.c
+++ b/ffmpeg/encoder.c
@@ -252,12 +252,16 @@ video_encoder_err:
 static void free_output_no_trailer(struct output_ctx *octx, enum FreeOutputPolicy policy)
 {
   if (octx->oc) {
-    if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && octx->oc->pb) {
+    // we check against AVFMT_FLAG_CUSTOM_IO to avoid trying to close file
+    // in case we are using custom i/o - this would cause crash
+    if (!(octx->oc->oformat->flags & AVFMT_NOFILE) &&
+        !(octx->oc->flags & AVFMT_FLAG_CUSTOM_IO) && octx->oc->pb) {
       avio_closep(&octx->oc->pb);
     }
     avformat_free_context(octx->oc);
     octx->oc = NULL;
   }
+  queue_push_staging(&octx->write_context, END_OF_OUTPUT, -1);
   if (octx->vc && ((octx->hw_type == AV_HWDEVICE_TYPE_NONE) ||
                    (FORCE_CLOSE_HW_ENCODER == policy))) {
     avcodec_free_context(&octx->vc);
@@ -295,7 +299,7 @@ void free_output(struct output_ctx *octx, enum FreeOutputPolicy policy)
   free_output_no_trailer(octx, policy);
 }
 
-int open_output(struct output_ctx *octx, struct input_ctx *ictx)
+int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue)
 {
   int ret = 0;
 
@@ -351,8 +355,27 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
 
   // Muxer headers can be written now once streams were added
   if (!(fmt->flags & AVFMT_NOFILE)) {
-    ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
-    if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
+    if (queue) {
+      // output through queue
+      ret = queue_setup_as_output(queue, &octx->write_context, octx->oc);
+      if (ret < 0) LPMS_ERR(open_output_err, "Error setting up output queue");
+      // make sure muxer options are compatible with queue output
+      // TODO: not sure if that is the best option for detecting a container
+      // type but it is surprisingly hard to find guidance on that
+      if (fmt->mime_type && !strcmp("video/mp4", fmt->mime_type)) {
+        // Default configuration of MP4 muxer needs seekable output, which
+        // the queue is not able to provide. Passing the following flags removes
+        // seekable requirement. This is also configuration recommended for
+        // streaming purposes, so it seems better suited anyway (the whole point
+        // with queues is to provide Low Latency/streaming support)
+        ret = av_dict_set(&octx->muxer->opts, "movflags", "frag_keyframe+empty_moov", 0);
+        if (ret < 0) LPMS_ERR(open_output_err, "Error setting movflags for fragmented output");
+      }
+    } else {
+      // normal file output
+      ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
+      if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
+    }
   }
 
   // IMPORTANT: notice how up to and including this point open_output_err is
@@ -364,6 +387,9 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
   // call free_output_no_trailer() exclusively!
   ret = avformat_write_header(octx->oc, &octx->muxer->opts);
   if (ret < 0) LPMS_ERR(open_output_err, "Error writing header");
+  // publish whatever avformat_write_header() left in the staging area as
+  // header data; header packets carry no valid timestamp, hence -1
+  queue_push_staging(&octx->write_context, BEGIN_OF_OUTPUT, -1);
 
   // From now on it is normal free_output(), hence after_header error label
   if(octx->sfilters != NULL && needs_decoder(octx->video->name) && octx->sf.active == 0) {
@@ -431,6 +457,8 @@ encode_cleanup:
 
 int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
 {
+  int ret;
+  int64_t pts = pkt->pts;
   pkt->stream_index = ost->index;
   if (av_cmp_q(tb, ost->time_base)) {
     av_packet_rescale_ts(pkt, tb, ost->time_base);
@@ -481,7 +509,15 @@ int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
     octx->last_video_dts = pkt->dts;
   }
 
-  return av_interleaved_write_frame(octx->oc, pkt);
+  // make sure correct timestamp will get carried through to output_queue
+  ret = av_interleaved_write_frame(octx->oc, pkt);
+  if (0 > ret) return ret;
+  // this means "flush output", we want to do it so that output_queue will get
+  // properly associated packets and timestamps
+  ret = av_interleaved_write_frame(octx->oc, NULL);
+  if (0 > ret) return ret;
+  queue_push_staging(&octx->write_context, PACKET_OUTPUT, pts);
+  return 0;
 }
 
 static int getmetadatainf(AVFrame *inf, struct output_ctx *octx)
diff --git a/ffmpeg/encoder.h b/ffmpeg/encoder.h
index 55b202749f..3466547b06 100644
--- a/ffmpeg/encoder.h
+++ b/ffmpeg/encoder.h
@@ -4,13 +4,14 @@
 #include "decoder.h"
 #include "transcoder.h"
 #include "filter.h"
+#include "output_queue.h"
 
 enum FreeOutputPolicy {
   FORCE_CLOSE_HW_ENCODER,
   PRESERVE_HW_ENCODER
 };
 
-int open_output(struct output_ctx *octx, struct input_ctx *ictx);
+int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue);
 void free_output(struct output_ctx *octx, enum FreeOutputPolicy);
 int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext *encoder, AVStream *ost,
   struct filter_ctx *filter, AVFrame *inf);
diff --git a/ffmpeg/ffmpeg.go b/ffmpeg/ffmpeg.go
index 4c875664a4..afd247aa76 100755
--- a/ffmpeg/ffmpeg.go
+++ b/ffmpeg/ffmpeg.go
@@ -636,6 +636,47 @@ func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) {
 	}
 }
 
+// storeOutputQueue drains the transcoder's output queue, appending every data
+// packet to the file of the output it belongs to, until the terminating
+// END_OF_ALL_OUTPUTS packet shows up. It blocks while the queue is empty.
+func storeOutputQueue(t *Transcoder, outputs []TranscodeOptions) error {
+	fds := make([]*os.File, 0, len(outputs))
+	// close whatever got opened on every return path, not only on success
+	defer func() {
+		for _, f := range fds {
+			f.Close()
+		}
+	}()
+	for i := range outputs {
+		file, err := os.Create(outputs[i].Oname)
+		if nil != err {
+			return err
+		}
+		fds = append(fds, file)
+	}
+	for {
+		// get next output packet
+		packet := C.lpms_transcode_peek_packet(t.handle)
+		if C.END_OF_ALL_OUTPUTS == packet.flags {
+			break
+		}
+		// this is a data packet, write it to file; os.File.Write returns a
+		// non-nil error on a short write, so no separate length check is needed
+		data := C.GoBytes(unsafe.Pointer(packet.data), packet.size)
+		if _, err := fds[packet.index].Write(data); nil != err {
+			return err
+		}
+		// pop data packet
+		C.lpms_transcode_pop_packet(t.handle)
+	}
+	// if we are here, we just have terminating packet, remove it
+	// (terminating packet carries no data, it is added there to signify
+	// the end of all outputs)
+	C.lpms_transcode_pop_packet(t.handle)
+	// Success
+	return nil
+}
+
 // create C output params array and return it along with corresponding finalizer
 // function that makes sure there are no C memory leaks
 
@@ -961,6 +1002,16 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
 	}
 	loadInputBuffer(t, input)
 	ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
+	// be careful to use storeOutputQueue to fake lpms_transcode so that the test
+	// reacts properly
+	if (ret == 0) {
+		err = storeOutputQueue(t, ps)
+		if nil != err {
+			// fake output error
+			ret = C.AVERROR_STREAM_NOT_FOUND
+		}
+	}
+
 	if ret != 0 {
 		if LogTranscodeErrors {
 			glog.Error("Transcoder Return : ", ErrorMap[ret])
diff --git a/ffmpeg/filter.h b/ffmpeg/filter.h
index 5e6995c493..15a37da656 100755
--- a/ffmpeg/filter.h
+++ b/ffmpeg/filter.h
@@ -3,6 +3,7 @@
 
 #include <libavfilter/avfilter.h>
 #include "decoder.h"
+#include "output_queue.h"
 
 struct filter_ctx {
   int active;
@@ -36,6 +37,7 @@ struct filter_ctx {
   int flushing;
 };
 
+// TODO move this away, this ain't filter
 struct output_ctx {
   char *fname;         // required output file name
   char *vfilters;      // required output video filters
@@ -74,6 +76,8 @@ struct output_ctx {
   output_results *res; // data to return for this output
 
   char *xcoderParams;
+
+  WriteContext write_context;
 };
 
 int init_video_filters(struct input_ctx *ictx, struct output_ctx *octx);
diff --git a/ffmpeg/output_queue.c b/ffmpeg/output_queue.c
new file mode 100644
index 0000000000..1700bd6e01
--- /dev/null
+++ b/ffmpeg/output_queue.c
@@ -0,0 +1,173 @@
+#include "output_queue.h"
+
+// size of the scratch buffer handed over to the custom AVIOContext
+enum { QUEUE_IO_BUFFER_SIZE = 4096 };
+
+void queue_create(OutputQueue *queue)
+{
+  pthread_mutex_init(&queue->mutex, NULL);
+  pthread_cond_init(&queue->condition, NULL);
+  queue->front = queue->back = NULL;
+}
+
+void queue_destroy(OutputQueue *queue)
+{
+  pthread_mutex_destroy(&queue->mutex);
+  pthread_cond_destroy(&queue->condition);
+  queue_reset(queue);
+}
+
+// AVIOContext write callback: copies the muxer output into a new packet and
+// appends it to the staging list of the WriteContext passed as user_data.
+// Returns buf_size on success, -1 when out of memory.
+static int queue_write_function(void *user_data, uint8_t *buf, int buf_size)
+{
+  WriteContext *wctx = (WriteContext *)user_data;
+  // Prepare packet
+  OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket));
+  if (!packet) return -1;
+  packet->data = (uint8_t *)malloc(buf_size);
+  if (!packet->data) {
+    free(packet);
+    return -1;
+  }
+  memcpy(packet->data, buf, buf_size);
+  packet->size = buf_size;
+  packet->index = wctx->index;
+  packet->next = NULL;
+  // Important - we are not adding to the queue now. This is because we don't
+  // know which flags to assign yet - for example, we can't assign END_OF_OUTPUT
+  // because we don't know if we are at last packet or not. Instead, packets are
+  // added to the staging area and queue_push_staging will be used to add them
+  // to the queue after muxing operation finishes.
+  if (wctx->staging_back) {
+    // not the first packet
+    wctx->staging_back->next = packet;
+    wctx->staging_back = packet;
+  } else {
+    // first packet
+    wctx->staging_front = wctx->staging_back = packet;
+  }
+  return buf_size;
+}
+
+int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx)
+{
+  wctx->queue = queue;
+  wctx->staging_front = wctx->staging_back = NULL;
+  // IMPORTANT: I am not sure if ffmpeg documentation states that explicitly,
+  // but the memory of ctx->pb as well as its io_buffer seem to be released when
+  // ctx will get closed. I tried otherwise and got "double free" errors
+  // NOTE(review): FFmpeg's AVFormatContext.pb docs say the caller owns a custom
+  // muxing pb - re-check this path with a leak checker before relying on it
+  void *io_buffer = av_malloc(QUEUE_IO_BUFFER_SIZE);
+  if (!io_buffer) return -1;
+  ctx->pb = avio_alloc_context(
+    io_buffer, QUEUE_IO_BUFFER_SIZE, // buffer and size
+    1,                               // write allowed
+    wctx,                            // pass write context as user data
+    NULL,                            // no read function supplied
+    queue_write_function,
+    NULL);                           // no seek function supplied
+  if (!ctx->pb) {
+    // the context never took ownership of the buffer, release it here
+    av_free(io_buffer);
+    return -1;
+  }
+  ctx->flags |= AVFMT_FLAG_CUSTOM_IO | AVFMT_FLAG_FLUSH_PACKETS;
+  return 0;
+}
+
+void queue_reset(OutputQueue *queue)
+{
+  while (queue->front) {
+    OutputPacket *tmp = queue->front;
+    queue->front = queue->front->next;
+    free(tmp->data);
+    free(tmp);
+  }
+  queue->back = NULL;
+}
+
+const OutputPacket *queue_peek_front(OutputQueue *queue)
+{
+  OutputPacket *tmp;
+  pthread_mutex_lock(&queue->mutex);
+  while (!queue->front) {
+    // wait until there is packet in the buffer
+    pthread_cond_wait(&queue->condition, &queue->mutex);
+  }
+  tmp = queue->front;
+  pthread_mutex_unlock(&queue->mutex);
+  return tmp;
+}
+
+void queue_pop_front(OutputQueue *queue)
+{
+  OutputPacket *tmp;
+  pthread_mutex_lock(&queue->mutex);
+  while (!queue->front) {
+    // wait until there is packet in the buffer
+    pthread_cond_wait(&queue->condition, &queue->mutex);
+  }
+  tmp = queue->front;
+  queue->front = queue->front->next;
+  if (!queue->front) queue->back = NULL;
+  pthread_mutex_unlock(&queue->mutex);
+  free(tmp->data);
+  free(tmp);
+}
+
+void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp)
+{
+  // iterate over staging area setting flags and timestamps
+  OutputPacket *packet = wctx->staging_front;
+  // Make sure that END_OF_OUTPUT only gets assigned to the last packet
+  // this is because the caller knows all packets are emitted, but it
+  // doesn't know how many of them
+  PacketFlags safe_flags = flags & ~END_OF_OUTPUT;
+  if (!packet) return; // nothing to do
+  while (packet) {
+    packet->flags = packet->next ? safe_flags : flags;
+    packet->timestamp = timestamp;
+    packet = packet->next;
+  }
+  // move staging area into queue
+  pthread_mutex_lock(&wctx->queue->mutex);
+  if (wctx->queue->back) {
+    // not empty queue
+    wctx->queue->back->next = wctx->staging_front;
+    wctx->queue->back = wctx->staging_back;
+  } else {
+    // empty queue
+    wctx->queue->front = wctx->staging_front;
+    wctx->queue->back = wctx->staging_back;
+  }
+  wctx->staging_front = wctx->staging_back = NULL;
+  pthread_mutex_unlock(&wctx->queue->mutex);
+  pthread_cond_signal(&wctx->queue->condition);
+}
+
+int queue_push_end(OutputQueue *queue)
+{
+  OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket));
+  if (!packet) return -1;
+  // initialize every field - the consumer follows ->next when popping, so
+  // leaving it as malloc garbage would corrupt the queue
+  packet->next = NULL;
+  packet->size = 0;
+  packet->data = NULL;
+  packet->index = -1;
+  packet->timestamp = -1;
+  packet->flags = END_OF_ALL_OUTPUTS;
+  pthread_mutex_lock(&queue->mutex);
+  if (queue->back) {
+    queue->back->next = packet;
+    queue->back = packet;
+  } else {
+    queue->front = queue->back = packet;
+  }
+  pthread_mutex_unlock(&queue->mutex);
+  pthread_cond_signal(&queue->condition);
+  return 0;
+}
diff --git a/ffmpeg/output_queue.h b/ffmpeg/output_queue.h
new file mode 100644
index 0000000000..33f49064e6
--- /dev/null
+++ b/ffmpeg/output_queue.h
@@ -0,0 +1,55 @@
+#ifndef _LPMS_OUTPUT_QUEUE_H_
+#define _LPMS_OUTPUT_QUEUE_H_
+
+#include <pthread.h>
+#include <libavformat/avformat.h>
+
+typedef enum {
+  BEGIN_OF_OUTPUT = 0x1,    // before first packet is muxed - headers, etc
+                            // (these packets will have timestamps of -1)
+  PACKET_OUTPUT = 0x2,      // data packet - has valid timestamp
+  END_OF_OUTPUT = 0x4,      // end of current stream (trailers, also ts == -1)
+  END_OF_ALL_OUTPUTS = 0x8  // very last packet, no data beyond
+} PacketFlags;
+
+typedef struct _OutputPacket {
+  struct _OutputPacket *next;
+  uint8_t *data;
+  int size;
+  int index;          // output that produced the packet, -1 for the terminator
+  PacketFlags flags;
+  int64_t timestamp;
+} OutputPacket;
+
+typedef struct {
+  // These are called "pthread", but FFmpeg also has Windows implementation,
+  // so we should be safe on all reasonable platforms
+  pthread_cond_t condition;
+  pthread_mutex_t mutex;
+  OutputPacket *front;
+  OutputPacket *back;
+} OutputQueue;
+
+typedef struct {
+  // Queue to use
+  OutputQueue *queue;
+  int index;
+  // Staging area for packets
+  OutputPacket *staging_front;
+  OutputPacket *staging_back;
+} WriteContext;
+
+// NOT THREAD SAFE
+void queue_create(OutputQueue *queue);
+void queue_destroy(OutputQueue *queue);
+// setup glue logic to allow ctx to use queue as output
+int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx);
+void queue_reset(OutputQueue *queue);
+
+// THREAD SAFE
+const OutputPacket *queue_peek_front(OutputQueue *queue);
+void queue_pop_front(OutputQueue *queue);
+void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp);
+int queue_push_end(OutputQueue *queue);
+
+#endif
diff --git a/ffmpeg/transcoder.c b/ffmpeg/transcoder.c
index 423e1dbf61..faad8fb3cd 100755
--- a/ffmpeg/transcoder.c
+++ b/ffmpeg/transcoder.c
@@ -4,6 +4,7 @@
 #include "encoder.h"
 #include "logging.h"
 #include "stream_buffer.h"
+#include "output_queue.h"
 
 #include <libavcodec/avcodec.h>
 #include <libavformat/avformat.h>
@@ -81,7 +82,9 @@ struct transcode_thread {
   // Input buffer - when I/O is done outside of transcoder, for example in
   // Low Latency scenarios
   StreamBuffer input_buffer;
-  int use_buffer_for_input;
+  int use_buffer_for_input; // TODO: name it "use custom output" or some such
+  // Output
+  OutputQueue output_queue;
 };
 
 // TODO: this feels like it belongs elsewhere, not in the top-level transcoder
@@ -647,7 +650,7 @@ int lpms_transcode(input_params *inp, output_params *params,
     if (!needs_decoder(params[i].audio.name)) h->ictx.da = ++decode_a == nb_outputs;
   }
 
-  ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : 0);
+  ret = open_input(inp, &h->ictx, h->use_buffer_for_input ? &h->input_buffer : NULL);
   if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open input");
 
   // populate output contexts
@@ -680,8 +683,9 @@ int lpms_transcode(input_params *inp, output_params *params,
     octx->dv = h->ictx.vi < 0 || is_drop(octx->video->name);
     octx->da = h->ictx.ai < 0 || is_drop(octx->audio->name);
     octx->res = &results[i];
+    octx->write_context.index = i;
 
-    ret = open_output(octx, &h->ictx);
+    ret = open_output(octx, &h->ictx, h->use_buffer_for_input ? &h->output_queue : NULL);
     if (ret < 0) LPMS_ERR(transcode_cleanup, "Unable to open output");
   }
 
@@ -695,6 +699,10 @@ int lpms_transcode(input_params *inp, output_params *params,
 
   // Part IV: shutdown
 transcode_cleanup:
+  if (h->use_buffer_for_input) {
+    // terminate output queue
+    queue_push_end(&h->output_queue);
+  }
 
   // IMPORTANT: note that this is the only place when PRESERVE_HW_ENCODER and
   // PRESERVE_HW_DECODER are used. This is done to retain HW encoder and decoder
@@ -754,11 +762,14 @@ struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts)
     free(h);
     return NULL;
   }
+  queue_create(&h->output_queue);
+
   // handle dnn filter graph creation
   if (dnn_opts) {
     AVFilterGraph *filtergraph = create_dnn_filtergraph(dnn_opts);
     if (!filtergraph) {
       buffer_destroy(&h->input_buffer);
+      queue_destroy(&h->output_queue);
       free(h);
       h = NULL;
     } else {
@@ -780,6 +791,7 @@ void lpms_transcode_push_reset(struct transcode_thread *handle, int on)
 {
   if (!handle) return;
   buffer_reset(&handle->input_buffer);
+  queue_reset(&handle->output_queue);
   handle->use_buffer_for_input = on;
 }
 
@@ -801,3 +813,14 @@ void lpms_transcode_push_error(struct transcode_thread *handle, int code)
   buffer_error(&handle->input_buffer, code);
 }
 
+const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle)
+{
+  if (!handle) return NULL;
+  return queue_peek_front(&handle->output_queue);
+}
+
+void lpms_transcode_pop_packet(struct transcode_thread *handle)
+{
+  if (!handle) return;
+  queue_pop_front(&handle->output_queue);
+}
diff --git a/ffmpeg/transcoder.h b/ffmpeg/transcoder.h
index 83ebce44d8..bc23269973 100755
--- a/ffmpeg/transcoder.h
+++ b/ffmpeg/transcoder.h
@@ -7,6 +7,7 @@
 #include <libavcodec/avcodec.h>
 #include <libavformat/avformat.h>
 #include "logging.h"
+#include "output_queue.h"
 
 // LPMS specific errors
 extern const int lpms_ERR_INPUT_PIXFMT;
@@ -101,9 +102,13 @@ int lpms_transcode(input_params *inp, output_params *params, output_results *res
 struct transcode_thread* lpms_transcode_new(lvpdnn_opts *dnn_opts);
 void lpms_transcode_stop(struct transcode_thread* handle);
 void lpms_transcode_discontinuity(struct transcode_thread *handle);
+// LL interface for input
 void lpms_transcode_push_reset(struct transcode_thread *handle, int on);
 void lpms_transcode_push_bytes(struct transcode_thread* handle, uint8_t *bytes, int size);
 void lpms_transcode_push_eof(struct transcode_thread *handle);
 void lpms_transcode_push_error(struct transcode_thread *handle, int code);
+// LL interface for output
+const OutputPacket *lpms_transcode_peek_packet(struct transcode_thread *handle);
+void lpms_transcode_pop_packet(struct transcode_thread *handle);
 
 #endif // _LPMS_TRANSCODER_H_