Browse Source

avformat/fifo: add timeshift option to delay output

Signed-off-by: Marton Balint <cus@passwd.hu>
tags/n4.4
Marton Balint 5 years ago
parent
commit
81975cd24b
3 changed files with 64 additions and 2 deletions
  1. +5
    -0
      doc/muxers.texi
  2. +58
    -1
      libavformat/fifo.c
  3. +1
    -1
      libavformat/version.h

+ 5
- 0
doc/muxers.texi View File

@@ -2275,6 +2275,11 @@ certain (usually permanent) errors the recovery is not attempted even when
Specify whether to wait for the keyframe after recovering from Specify whether to wait for the keyframe after recovering from
queue overflow or failure. This option is set to 0 (false) by default. queue overflow or failure. This option is set to 0 (false) by default.


@item timeshift @var{duration}
Buffer the specified amount of packets and delay writing the output. Note that
@var{queue_size} must be big enough to store the packets for timeshift. At the
end of the input the fifo buffer is flushed at realtime speed.

@end table @end table


@subsection Examples @subsection Examples


+ 58
- 1
libavformat/fifo.c View File

@@ -19,6 +19,8 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/ */


#include <stdatomic.h>

#include "libavutil/avassert.h" #include "libavutil/avassert.h"
#include "libavutil/opt.h" #include "libavutil/opt.h"
#include "libavutil/time.h" #include "libavutil/time.h"
@@ -77,6 +79,9 @@ typedef struct FifoContext {
/* Value > 0 signals queue overflow */ /* Value > 0 signals queue overflow */
volatile uint8_t overflow_flag; volatile uint8_t overflow_flag;


atomic_int_least64_t queue_duration;
int64_t last_sent_dts;
int64_t timeshift;
} FifoContext; } FifoContext;


typedef struct FifoThreadContext { typedef struct FifoThreadContext {
@@ -98,9 +103,12 @@ typedef struct FifoThreadContext {
* so finalization by calling write_trailer and ff_io_close must be done * so finalization by calling write_trailer and ff_io_close must be done
* before exiting / reinitialization of underlying muxer */ * before exiting / reinitialization of underlying muxer */
uint8_t header_written; uint8_t header_written;

int64_t last_received_dts;
} FifoThreadContext; } FifoThreadContext;


typedef enum FifoMessageType { typedef enum FifoMessageType {
FIFO_NOOP,
FIFO_WRITE_HEADER, FIFO_WRITE_HEADER,
FIFO_WRITE_PACKET, FIFO_WRITE_PACKET,
FIFO_FLUSH_OUTPUT FIFO_FLUSH_OUTPUT
@@ -159,6 +167,15 @@ static int fifo_thread_flush_output(FifoThreadContext *ctx)
return av_write_frame(avf2, NULL); return av_write_frame(avf2, NULL);
} }


static int64_t next_duration(AVFormatContext *avf, AVPacket *pkt, int64_t *last_dts)
{
AVStream *st = avf->streams[pkt->stream_index];
int64_t dts = av_rescale_q(pkt->dts, st->time_base, AV_TIME_BASE_Q);
int64_t duration = (*last_dts == AV_NOPTS_VALUE ? 0 : dts - *last_dts);
*last_dts = dts;
return duration;
}

static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt) static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
{ {
AVFormatContext *avf = ctx->avf; AVFormatContext *avf = ctx->avf;
@@ -167,6 +184,9 @@ static int fifo_thread_write_packet(FifoThreadContext *ctx, AVPacket *pkt)
AVRational src_tb, dst_tb; AVRational src_tb, dst_tb;
int ret, s_idx; int ret, s_idx;


if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
atomic_fetch_sub_explicit(&fifo->queue_duration, next_duration(avf, pkt, &ctx->last_received_dts), memory_order_relaxed);

if (ctx->drop_until_keyframe) { if (ctx->drop_until_keyframe) {
if (pkt->flags & AV_PKT_FLAG_KEY) { if (pkt->flags & AV_PKT_FLAG_KEY) {
ctx->drop_until_keyframe = 0; ctx->drop_until_keyframe = 0;
@@ -209,6 +229,9 @@ static int fifo_thread_dispatch_message(FifoThreadContext *ctx, FifoMessage *msg
{ {
int ret = AVERROR(EINVAL); int ret = AVERROR(EINVAL);


if (msg->type == FIFO_NOOP)
return 0;

if (!ctx->header_written) { if (!ctx->header_written) {
ret = fifo_thread_write_header(ctx); ret = fifo_thread_write_header(ctx);
if (ret < 0) if (ret < 0)
@@ -390,12 +413,13 @@ static void *fifo_consumer_thread(void *data)
AVFormatContext *avf = data; AVFormatContext *avf = data;
FifoContext *fifo = avf->priv_data; FifoContext *fifo = avf->priv_data;
AVThreadMessageQueue *queue = fifo->queue; AVThreadMessageQueue *queue = fifo->queue;
FifoMessage msg = {FIFO_WRITE_HEADER, {0}};
FifoMessage msg = {fifo->timeshift ? FIFO_NOOP : FIFO_WRITE_HEADER, {0}};
int ret; int ret;


FifoThreadContext fifo_thread_ctx; FifoThreadContext fifo_thread_ctx;
memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext)); memset(&fifo_thread_ctx, 0, sizeof(FifoThreadContext));
fifo_thread_ctx.avf = avf; fifo_thread_ctx.avf = avf;
fifo_thread_ctx.last_received_dts = AV_NOPTS_VALUE;


while (1) { while (1) {
uint8_t just_flushed = 0; uint8_t just_flushed = 0;
@@ -429,6 +453,10 @@ static void *fifo_consumer_thread(void *data)
if (just_flushed) if (just_flushed)
av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n"); av_log(avf, AV_LOG_INFO, "FIFO queue flushed\n");


if (fifo->timeshift)
while (atomic_load_explicit(&fifo->queue_duration, memory_order_relaxed) < fifo->timeshift)
av_usleep(10000);

ret = av_thread_message_queue_recv(queue, &msg, 0); ret = av_thread_message_queue_recv(queue, &msg, 0);
if (ret < 0) { if (ret < 0) {
av_thread_message_queue_set_err_send(queue, ret); av_thread_message_queue_set_err_send(queue, ret);
@@ -488,6 +516,8 @@ static int fifo_init(AVFormatContext *avf)
" only when drop_pkts_on_overflow is also turned on\n"); " only when drop_pkts_on_overflow is also turned on\n");
return AVERROR(EINVAL); return AVERROR(EINVAL);
} }
atomic_init(&fifo->queue_duration, 0);
fifo->last_sent_dts = AV_NOPTS_VALUE;


oformat = av_guess_format(fifo->format, avf->url, NULL); oformat = av_guess_format(fifo->format, avf->url, NULL);
if (!oformat) { if (!oformat) {
@@ -563,6 +593,9 @@ static int fifo_write_packet(AVFormatContext *avf, AVPacket *pkt)
goto fail; goto fail;
} }


if (fifo->timeshift && pkt->dts != AV_NOPTS_VALUE)
atomic_fetch_add_explicit(&fifo->queue_duration, next_duration(avf, pkt, &fifo->last_sent_dts), memory_order_relaxed);

return ret; return ret;
fail: fail:
if (pkt) if (pkt)
@@ -576,6 +609,27 @@ static int fifo_write_trailer(AVFormatContext *avf)
int ret; int ret;


av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF); av_thread_message_queue_set_err_recv(fifo->queue, AVERROR_EOF);
if (fifo->timeshift) {
int64_t now = av_gettime_relative();
int64_t elapsed = 0;
FifoMessage msg = {FIFO_NOOP};
do {
int64_t delay = av_gettime_relative() - now;
if (delay < 0) { // Discontinuity?
delay = 10000;
now = av_gettime_relative();
} else {
now += delay;
}
atomic_fetch_add_explicit(&fifo->queue_duration, delay, memory_order_relaxed);
elapsed += delay;
if (elapsed > fifo->timeshift)
break;
av_usleep(10000);
ret = av_thread_message_queue_send(fifo->queue, &msg, AV_THREAD_MESSAGE_NONBLOCK);
} while (ret >= 0 || ret == AVERROR(EAGAIN));
atomic_store(&fifo->queue_duration, INT64_MAX);
}


ret = pthread_join(fifo->writer_thread, NULL); ret = pthread_join(fifo->writer_thread, NULL);
if (ret < 0) { if (ret < 0) {
@@ -630,6 +684,9 @@ static const AVOption options[] = {
{"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error), {"recover_any_error", "Attempt recovery regardless of type of the error", OFFSET(recover_any_error),
AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM}, AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},


{"timeshift", "Delay fifo output", OFFSET(timeshift),
AV_OPT_TYPE_DURATION, {.i64 = 0}, 0, INT64_MAX, AV_OPT_FLAG_ENCODING_PARAM},

{NULL}, {NULL},
}; };




+ 1
- 1
libavformat/version.h View File

@@ -33,7 +33,7 @@
// Also please add any ticket numbers that you believe might be affected here // Also please add any ticket numbers that you believe might be affected here
#define LIBAVFORMAT_VERSION_MAJOR 58 #define LIBAVFORMAT_VERSION_MAJOR 58
#define LIBAVFORMAT_VERSION_MINOR 46 #define LIBAVFORMAT_VERSION_MINOR 46
#define LIBAVFORMAT_VERSION_MICRO 100
#define LIBAVFORMAT_VERSION_MICRO 101


#define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \ #define LIBAVFORMAT_VERSION_INT AV_VERSION_INT(LIBAVFORMAT_VERSION_MAJOR, \
LIBAVFORMAT_VERSION_MINOR, \ LIBAVFORMAT_VERSION_MINOR, \


Loading…
Cancel
Save