You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

619 lines
19KB

  1. /*
  2. * Tee pseudo-muxer
  3. * Copyright (c) 2012 Nicolas George
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public License
  9. * as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public License
  18. * along with FFmpeg; if not, write to the Free Software Foundation, Inc.,
  19. * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. */
  21. #include "libavutil/avutil.h"
  22. #include "libavutil/avstring.h"
  23. #include "libavutil/opt.h"
  24. #include "internal.h"
  25. #include "avformat.h"
  26. #include "avio_internal.h"
  27. #include "tee_common.h"
  28. typedef enum {
  29. ON_SLAVE_FAILURE_ABORT = 1,
  30. ON_SLAVE_FAILURE_IGNORE = 2
  31. } SlaveFailurePolicy;
  32. #define DEFAULT_SLAVE_FAILURE_POLICY ON_SLAVE_FAILURE_ABORT
  33. typedef struct {
  34. AVFormatContext *avf;
  35. AVBSFContext **bsfs; ///< bitstream filters per stream
  36. SlaveFailurePolicy on_fail;
  37. int use_fifo;
  38. AVDictionary *fifo_options;
  39. /** map from input to output streams indexes,
  40. * disabled output streams are set to -1 */
  41. int *stream_map;
  42. int header_written;
  43. } TeeSlave;
  44. typedef struct TeeContext {
  45. const AVClass *class;
  46. unsigned nb_slaves;
  47. unsigned nb_alive;
  48. TeeSlave *slaves;
  49. int use_fifo;
  50. AVDictionary *fifo_options;
  51. } TeeContext;
  52. static const char *const slave_delim = "|";
  53. static const char *const slave_bsfs_spec_sep = "/";
  54. static const char *const slave_select_sep = ",";
  55. #define OFFSET(x) offsetof(TeeContext, x)
  56. static const AVOption options[] = {
  57. {"use_fifo", "Use fifo pseudo-muxer to separate actual muxers from encoder",
  58. OFFSET(use_fifo), AV_OPT_TYPE_BOOL, {.i64 = 0}, 0, 1, AV_OPT_FLAG_ENCODING_PARAM},
  59. {"fifo_options", "fifo pseudo-muxer options", OFFSET(fifo_options),
  60. AV_OPT_TYPE_DICT, {.str = NULL}, 0, 0, AV_OPT_FLAG_ENCODING_PARAM},
  61. {NULL}
  62. };
  63. static const AVClass tee_muxer_class = {
  64. .class_name = "Tee muxer",
  65. .item_name = av_default_item_name,
  66. .option = options,
  67. .version = LIBAVUTIL_VERSION_INT,
  68. };
  69. static inline int parse_slave_failure_policy_option(const char *opt, TeeSlave *tee_slave)
  70. {
  71. if (!opt) {
  72. tee_slave->on_fail = DEFAULT_SLAVE_FAILURE_POLICY;
  73. return 0;
  74. } else if (!av_strcasecmp("abort", opt)) {
  75. tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
  76. return 0;
  77. } else if (!av_strcasecmp("ignore", opt)) {
  78. tee_slave->on_fail = ON_SLAVE_FAILURE_IGNORE;
  79. return 0;
  80. }
  81. /* Set failure behaviour to abort, so invalid option error will not be ignored */
  82. tee_slave->on_fail = ON_SLAVE_FAILURE_ABORT;
  83. return AVERROR(EINVAL);
  84. }
  85. static int parse_slave_fifo_options(const char *use_fifo,
  86. const char *fifo_options, TeeSlave *tee_slave)
  87. {
  88. int ret = 0;
  89. if (use_fifo) {
  90. /*TODO - change this to use proper function for parsing boolean
  91. * options when there is one */
  92. if (av_match_name(use_fifo, "true,y,yes,enable,enabled,on,1")) {
  93. tee_slave->use_fifo = 1;
  94. } else if (av_match_name(use_fifo, "false,n,no,disable,disabled,off,0")) {
  95. tee_slave->use_fifo = 0;
  96. } else {
  97. return AVERROR(EINVAL);
  98. }
  99. }
  100. if (fifo_options)
  101. ret = av_dict_parse_string(&tee_slave->fifo_options, fifo_options, "=", ":", 0);
  102. return ret;
  103. }
  104. static int close_slave(TeeSlave *tee_slave)
  105. {
  106. AVFormatContext *avf;
  107. unsigned i;
  108. int ret = 0;
  109. avf = tee_slave->avf;
  110. if (!avf)
  111. return 0;
  112. if (tee_slave->header_written)
  113. ret = av_write_trailer(avf);
  114. if (tee_slave->bsfs) {
  115. for (i = 0; i < avf->nb_streams; ++i)
  116. av_bsf_free(&tee_slave->bsfs[i]);
  117. }
  118. av_freep(&tee_slave->stream_map);
  119. av_freep(&tee_slave->bsfs);
  120. ff_format_io_close(avf, &avf->pb);
  121. avformat_free_context(avf);
  122. tee_slave->avf = NULL;
  123. return ret;
  124. }
  125. static void close_slaves(AVFormatContext *avf)
  126. {
  127. TeeContext *tee = avf->priv_data;
  128. unsigned i;
  129. for (i = 0; i < tee->nb_slaves; i++) {
  130. close_slave(&tee->slaves[i]);
  131. }
  132. av_freep(&tee->slaves);
  133. }
  134. static int open_slave(AVFormatContext *avf, char *slave, TeeSlave *tee_slave)
  135. {
  136. int i, ret;
  137. AVDictionary *options = NULL, *bsf_options = NULL;
  138. AVDictionaryEntry *entry;
  139. char *filename;
  140. char *format = NULL, *select = NULL, *on_fail = NULL;
  141. char *use_fifo = NULL, *fifo_options_str = NULL;
  142. AVFormatContext *avf2 = NULL;
  143. AVStream *st, *st2;
  144. int stream_count;
  145. int fullret;
  146. char *subselect = NULL, *next_subselect = NULL, *first_subselect = NULL, *tmp_select = NULL;
  147. if ((ret = ff_tee_parse_slave_options(avf, slave, &options, &filename)) < 0)
  148. return ret;
  149. #define STEAL_OPTION(option, field) do { \
  150. if ((entry = av_dict_get(options, option, NULL, 0))) { \
  151. field = entry->value; \
  152. entry->value = NULL; /* prevent it from being freed */ \
  153. av_dict_set(&options, option, NULL, 0); \
  154. } \
  155. } while (0)
  156. STEAL_OPTION("f", format);
  157. STEAL_OPTION("select", select);
  158. STEAL_OPTION("onfail", on_fail);
  159. STEAL_OPTION("use_fifo", use_fifo);
  160. STEAL_OPTION("fifo_options", fifo_options_str);
  161. entry = NULL;
  162. while ((entry = av_dict_get(options, "bsfs", entry, AV_DICT_IGNORE_SUFFIX))) {
  163. /* trim out strlen("bsfs") characters from key */
  164. av_dict_set(&bsf_options, entry->key + 4, entry->value, 0);
  165. av_dict_set(&options, entry->key, NULL, 0);
  166. }
  167. ret = parse_slave_failure_policy_option(on_fail, tee_slave);
  168. if (ret < 0) {
  169. av_log(avf, AV_LOG_ERROR,
  170. "Invalid onfail option value, valid options are 'abort' and 'ignore'\n");
  171. goto end;
  172. }
  173. ret = parse_slave_fifo_options(use_fifo, fifo_options_str, tee_slave);
  174. if (ret < 0) {
  175. av_log(avf, AV_LOG_ERROR, "Error parsing fifo options: %s\n", av_err2str(ret));
  176. goto end;
  177. }
  178. if (tee_slave->use_fifo) {
  179. if (options) {
  180. char *format_options_str = NULL;
  181. ret = av_dict_get_string(options, &format_options_str, '=', ':');
  182. if (ret < 0)
  183. goto end;
  184. ret = av_dict_set(&tee_slave->fifo_options, "format_opts", format_options_str,
  185. AV_DICT_DONT_STRDUP_VAL);
  186. if (ret < 0)
  187. goto end;
  188. }
  189. if (format) {
  190. ret = av_dict_set(&tee_slave->fifo_options, "fifo_format", format,
  191. AV_DICT_DONT_STRDUP_VAL);
  192. format = NULL;
  193. if (ret < 0)
  194. goto end;
  195. }
  196. av_dict_free(&options);
  197. options = tee_slave->fifo_options;
  198. }
  199. ret = avformat_alloc_output_context2(&avf2, NULL,
  200. tee_slave->use_fifo ? "fifo" :format, filename);
  201. if (ret < 0)
  202. goto end;
  203. tee_slave->avf = avf2;
  204. av_dict_copy(&avf2->metadata, avf->metadata, 0);
  205. avf2->opaque = avf->opaque;
  206. avf2->io_open = avf->io_open;
  207. avf2->io_close = avf->io_close;
  208. avf2->interrupt_callback = avf->interrupt_callback;
  209. avf2->flags = avf->flags;
  210. avf2->strict_std_compliance = avf->strict_std_compliance;
  211. tee_slave->stream_map = av_calloc(avf->nb_streams, sizeof(*tee_slave->stream_map));
  212. if (!tee_slave->stream_map) {
  213. ret = AVERROR(ENOMEM);
  214. goto end;
  215. }
  216. stream_count = 0;
  217. for (i = 0; i < avf->nb_streams; i++) {
  218. st = avf->streams[i];
  219. if (select) {
  220. tmp_select = av_strdup(select); // av_strtok is destructive so we regenerate it in each loop
  221. if (!tmp_select) {
  222. ret = AVERROR(ENOMEM);
  223. goto end;
  224. }
  225. fullret = 0;
  226. first_subselect = tmp_select;
  227. next_subselect = NULL;
  228. while (subselect = av_strtok(first_subselect, slave_select_sep, &next_subselect)) {
  229. first_subselect = NULL;
  230. ret = avformat_match_stream_specifier(avf, avf->streams[i], subselect);
  231. if (ret < 0) {
  232. av_log(avf, AV_LOG_ERROR,
  233. "Invalid stream specifier '%s' for output '%s'\n",
  234. subselect, slave);
  235. goto end;
  236. }
  237. if (ret != 0) {
  238. fullret = 1; // match
  239. break;
  240. }
  241. }
  242. av_freep(&tmp_select);
  243. if (fullret == 0) { /* no match */
  244. tee_slave->stream_map[i] = -1;
  245. continue;
  246. }
  247. }
  248. tee_slave->stream_map[i] = stream_count++;
  249. if (!(st2 = avformat_new_stream(avf2, NULL))) {
  250. ret = AVERROR(ENOMEM);
  251. goto end;
  252. }
  253. ret = ff_stream_encode_params_copy(st2, st);
  254. if (ret < 0)
  255. goto end;
  256. }
  257. ret = ff_format_output_open(avf2, filename, NULL);
  258. if (ret < 0) {
  259. av_log(avf, AV_LOG_ERROR, "Slave '%s': error opening: %s\n", slave,
  260. av_err2str(ret));
  261. goto end;
  262. }
  263. if ((ret = avformat_write_header(avf2, &options)) < 0) {
  264. av_log(avf, AV_LOG_ERROR, "Slave '%s': error writing header: %s\n",
  265. slave, av_err2str(ret));
  266. goto end;
  267. }
  268. tee_slave->header_written = 1;
  269. tee_slave->bsfs = av_calloc(avf2->nb_streams, sizeof(*tee_slave->bsfs));
  270. if (!tee_slave->bsfs) {
  271. ret = AVERROR(ENOMEM);
  272. goto end;
  273. }
  274. entry = NULL;
  275. while (entry = av_dict_get(bsf_options, "", NULL, AV_DICT_IGNORE_SUFFIX)) {
  276. const char *spec = entry->key;
  277. if (*spec) {
  278. if (strspn(spec, slave_bsfs_spec_sep) != 1) {
  279. av_log(avf, AV_LOG_ERROR,
  280. "Specifier separator in '%s' is '%c', but only characters '%s' "
  281. "are allowed\n", entry->key, *spec, slave_bsfs_spec_sep);
  282. ret = AVERROR(EINVAL);
  283. goto end;
  284. }
  285. spec++; /* consume separator */
  286. }
  287. for (i = 0; i < avf2->nb_streams; i++) {
  288. ret = avformat_match_stream_specifier(avf2, avf2->streams[i], spec);
  289. if (ret < 0) {
  290. av_log(avf, AV_LOG_ERROR,
  291. "Invalid stream specifier '%s' in bsfs option '%s' for slave "
  292. "output '%s'\n", spec, entry->key, filename);
  293. goto end;
  294. }
  295. if (ret > 0) {
  296. av_log(avf, AV_LOG_DEBUG, "spec:%s bsfs:%s matches stream %d of slave "
  297. "output '%s'\n", spec, entry->value, i, filename);
  298. if (tee_slave->bsfs[i]) {
  299. av_log(avf, AV_LOG_WARNING,
  300. "Duplicate bsfs specification associated to stream %d of slave "
  301. "output '%s', filters will be ignored\n", i, filename);
  302. continue;
  303. }
  304. ret = av_bsf_list_parse_str(entry->value, &tee_slave->bsfs[i]);
  305. if (ret < 0) {
  306. av_log(avf, AV_LOG_ERROR,
  307. "Error parsing bitstream filter sequence '%s' associated to "
  308. "stream %d of slave output '%s'\n", entry->value, i, filename);
  309. goto end;
  310. }
  311. }
  312. }
  313. av_dict_set(&bsf_options, entry->key, NULL, 0);
  314. }
  315. for (i = 0; i < avf->nb_streams; i++){
  316. int target_stream = tee_slave->stream_map[i];
  317. if (target_stream < 0)
  318. continue;
  319. if (!tee_slave->bsfs[target_stream]) {
  320. /* Add pass-through bitstream filter */
  321. ret = av_bsf_get_null_filter(&tee_slave->bsfs[target_stream]);
  322. if (ret < 0) {
  323. av_log(avf, AV_LOG_ERROR,
  324. "Failed to create pass-through bitstream filter: %s\n",
  325. av_err2str(ret));
  326. goto end;
  327. }
  328. }
  329. tee_slave->bsfs[target_stream]->time_base_in = avf->streams[i]->time_base;
  330. ret = avcodec_parameters_copy(tee_slave->bsfs[target_stream]->par_in,
  331. avf->streams[i]->codecpar);
  332. if (ret < 0)
  333. goto end;
  334. ret = av_bsf_init(tee_slave->bsfs[target_stream]);
  335. if (ret < 0) {
  336. av_log(avf, AV_LOG_ERROR,
  337. "Failed to initialize bitstream filter(s): %s\n",
  338. av_err2str(ret));
  339. goto end;
  340. }
  341. }
  342. if (options) {
  343. entry = NULL;
  344. while ((entry = av_dict_get(options, "", entry, AV_DICT_IGNORE_SUFFIX)))
  345. av_log(avf2, AV_LOG_ERROR, "Unknown option '%s'\n", entry->key);
  346. ret = AVERROR_OPTION_NOT_FOUND;
  347. goto end;
  348. }
  349. end:
  350. av_free(format);
  351. av_free(select);
  352. av_free(on_fail);
  353. av_dict_free(&options);
  354. av_dict_free(&bsf_options);
  355. av_freep(&tmp_select);
  356. return ret;
  357. }
  358. static void log_slave(TeeSlave *slave, void *log_ctx, int log_level)
  359. {
  360. int i;
  361. av_log(log_ctx, log_level, "filename:'%s' format:%s\n",
  362. slave->avf->url, slave->avf->oformat->name);
  363. for (i = 0; i < slave->avf->nb_streams; i++) {
  364. AVStream *st = slave->avf->streams[i];
  365. AVBSFContext *bsf = slave->bsfs[i];
  366. const char *bsf_name;
  367. av_log(log_ctx, log_level, " stream:%d codec:%s type:%s",
  368. i, avcodec_get_name(st->codecpar->codec_id),
  369. av_get_media_type_string(st->codecpar->codec_type));
  370. bsf_name = bsf->filter->priv_class ?
  371. bsf->filter->priv_class->item_name(bsf) : bsf->filter->name;
  372. av_log(log_ctx, log_level, " bsfs: %s\n", bsf_name);
  373. }
  374. }
  375. static int tee_process_slave_failure(AVFormatContext *avf, unsigned slave_idx, int err_n)
  376. {
  377. TeeContext *tee = avf->priv_data;
  378. TeeSlave *tee_slave = &tee->slaves[slave_idx];
  379. tee->nb_alive--;
  380. close_slave(tee_slave);
  381. if (!tee->nb_alive) {
  382. av_log(avf, AV_LOG_ERROR, "All tee outputs failed.\n");
  383. return err_n;
  384. } else if (tee_slave->on_fail == ON_SLAVE_FAILURE_ABORT) {
  385. av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed, aborting.\n", slave_idx);
  386. return err_n;
  387. } else {
  388. av_log(avf, AV_LOG_ERROR, "Slave muxer #%u failed: %s, continuing with %u/%u slaves.\n",
  389. slave_idx, av_err2str(err_n), tee->nb_alive, tee->nb_slaves);
  390. return 0;
  391. }
  392. }
  393. static int tee_write_header(AVFormatContext *avf)
  394. {
  395. TeeContext *tee = avf->priv_data;
  396. unsigned nb_slaves = 0, i;
  397. const char *filename = avf->url;
  398. char **slaves = NULL;
  399. int ret;
  400. while (*filename) {
  401. char *slave = av_get_token(&filename, slave_delim);
  402. if (!slave) {
  403. ret = AVERROR(ENOMEM);
  404. goto fail;
  405. }
  406. ret = av_dynarray_add_nofree(&slaves, &nb_slaves, slave);
  407. if (ret < 0) {
  408. av_free(slave);
  409. goto fail;
  410. }
  411. if (strspn(filename, slave_delim))
  412. filename++;
  413. }
  414. if (!(tee->slaves = av_mallocz_array(nb_slaves, sizeof(*tee->slaves)))) {
  415. ret = AVERROR(ENOMEM);
  416. goto fail;
  417. }
  418. tee->nb_slaves = tee->nb_alive = nb_slaves;
  419. for (i = 0; i < nb_slaves; i++) {
  420. tee->slaves[i].use_fifo = tee->use_fifo;
  421. ret = av_dict_copy(&tee->slaves[i].fifo_options, tee->fifo_options, 0);
  422. if (ret < 0)
  423. goto fail;
  424. if ((ret = open_slave(avf, slaves[i], &tee->slaves[i])) < 0) {
  425. ret = tee_process_slave_failure(avf, i, ret);
  426. if (ret < 0)
  427. goto fail;
  428. } else {
  429. log_slave(&tee->slaves[i], avf, AV_LOG_VERBOSE);
  430. }
  431. av_freep(&slaves[i]);
  432. }
  433. for (i = 0; i < avf->nb_streams; i++) {
  434. int j, mapped = 0;
  435. for (j = 0; j < tee->nb_slaves; j++)
  436. if (tee->slaves[j].avf)
  437. mapped += tee->slaves[j].stream_map[i] >= 0;
  438. if (!mapped)
  439. av_log(avf, AV_LOG_WARNING, "Input stream #%d is not mapped "
  440. "to any slave.\n", i);
  441. }
  442. av_free(slaves);
  443. return 0;
  444. fail:
  445. for (i = 0; i < nb_slaves; i++)
  446. av_freep(&slaves[i]);
  447. close_slaves(avf);
  448. av_free(slaves);
  449. return ret;
  450. }
  451. static int tee_write_trailer(AVFormatContext *avf)
  452. {
  453. TeeContext *tee = avf->priv_data;
  454. int ret_all = 0, ret;
  455. unsigned i;
  456. for (i = 0; i < tee->nb_slaves; i++) {
  457. if ((ret = close_slave(&tee->slaves[i])) < 0) {
  458. ret = tee_process_slave_failure(avf, i, ret);
  459. if (!ret_all && ret < 0)
  460. ret_all = ret;
  461. }
  462. }
  463. av_freep(&tee->slaves);
  464. return ret_all;
  465. }
  466. static int tee_write_packet(AVFormatContext *avf, AVPacket *pkt)
  467. {
  468. TeeContext *tee = avf->priv_data;
  469. AVFormatContext *avf2;
  470. AVBSFContext *bsfs;
  471. AVPacket pkt2;
  472. int ret_all = 0, ret;
  473. unsigned i, s;
  474. int s2;
  475. for (i = 0; i < tee->nb_slaves; i++) {
  476. if (!(avf2 = tee->slaves[i].avf))
  477. continue;
  478. /* Flush slave if pkt is NULL*/
  479. if (!pkt) {
  480. ret = av_interleaved_write_frame(avf2, NULL);
  481. if (ret < 0) {
  482. ret = tee_process_slave_failure(avf, i, ret);
  483. if (!ret_all && ret < 0)
  484. ret_all = ret;
  485. }
  486. continue;
  487. }
  488. s = pkt->stream_index;
  489. s2 = tee->slaves[i].stream_map[s];
  490. if (s2 < 0)
  491. continue;
  492. if ((ret = av_packet_ref(&pkt2, pkt)) < 0)
  493. if (!ret_all) {
  494. ret_all = ret;
  495. continue;
  496. }
  497. bsfs = tee->slaves[i].bsfs[s2];
  498. pkt2.stream_index = s2;
  499. ret = av_bsf_send_packet(bsfs, &pkt2);
  500. if (ret < 0) {
  501. av_log(avf, AV_LOG_ERROR, "Error while sending packet to bitstream filter: %s\n",
  502. av_err2str(ret));
  503. ret = tee_process_slave_failure(avf, i, ret);
  504. if (!ret_all && ret < 0)
  505. ret_all = ret;
  506. }
  507. while(1) {
  508. ret = av_bsf_receive_packet(bsfs, &pkt2);
  509. if (ret == AVERROR(EAGAIN)) {
  510. ret = 0;
  511. break;
  512. } else if (ret < 0) {
  513. break;
  514. }
  515. av_packet_rescale_ts(&pkt2, bsfs->time_base_out,
  516. avf2->streams[s2]->time_base);
  517. ret = av_interleaved_write_frame(avf2, &pkt2);
  518. if (ret < 0)
  519. break;
  520. };
  521. if (ret < 0) {
  522. ret = tee_process_slave_failure(avf, i, ret);
  523. if (!ret_all && ret < 0)
  524. ret_all = ret;
  525. }
  526. }
  527. return ret_all;
  528. }
  529. AVOutputFormat ff_tee_muxer = {
  530. .name = "tee",
  531. .long_name = NULL_IF_CONFIG_SMALL("Multiple muxer tee"),
  532. .priv_data_size = sizeof(TeeContext),
  533. .write_header = tee_write_header,
  534. .write_trailer = tee_write_trailer,
  535. .write_packet = tee_write_packet,
  536. .priv_class = &tee_muxer_class,
  537. .flags = AVFMT_NOFILE | AVFMT_ALLOW_FLUSH,
  538. };