You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

618 lines
17KB

  1. /*
  2. * Input async protocol.
  3. * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * Based on libavformat/cache.c by Michael Niedermayer
  22. */
  23. /**
  24. * @TODO
  25. * support timeout
  26. * support backward short seek
  27. * support work with concatdec, hls
  28. */
  29. #include "libavutil/avassert.h"
  30. #include "libavutil/avstring.h"
  31. #include "libavutil/error.h"
  32. #include "libavutil/fifo.h"
  33. #include "libavutil/log.h"
  34. #include "libavutil/opt.h"
  35. #include "libavutil/thread.h"
  36. #include "url.h"
  37. #include <stdint.h>
  38. #if HAVE_UNISTD_H
  39. #include <unistd.h>
  40. #endif
  41. #define BUFFER_CAPACITY (4 * 1024 * 1024)
  42. #define SHORT_SEEK_THRESHOLD (256 * 1024)
  43. typedef struct Context {
  44. AVClass *class;
  45. URLContext *inner;
  46. int seek_request;
  47. int64_t seek_pos;
  48. int seek_whence;
  49. int seek_completed;
  50. int64_t seek_ret;
  51. int inner_io_error;
  52. int io_error;
  53. int io_eof_reached;
  54. int64_t logical_pos;
  55. int64_t logical_size;
  56. AVFifoBuffer *fifo;
  57. pthread_cond_t cond_wakeup_main;
  58. pthread_cond_t cond_wakeup_background;
  59. pthread_mutex_t mutex;
  60. pthread_t async_buffer_thread;
  61. int abort_request;
  62. AVIOInterruptCB interrupt_callback;
  63. } Context;
  64. static int async_check_interrupt(void *arg)
  65. {
  66. URLContext *h = arg;
  67. Context *c = h->priv_data;
  68. if (c->abort_request)
  69. return 1;
  70. if (ff_check_interrupt(&c->interrupt_callback))
  71. c->abort_request = 1;
  72. return c->abort_request;
  73. }
  74. static int wrapped_url_read(void *src, void *dst, int size)
  75. {
  76. URLContext *h = src;
  77. Context *c = h->priv_data;
  78. int ret;
  79. ret = ffurl_read(c->inner, dst, size);
  80. c->inner_io_error = ret < 0 ? ret : 0;
  81. return ret;
  82. }
  83. static void *async_buffer_task(void *arg)
  84. {
  85. URLContext *h = arg;
  86. Context *c = h->priv_data;
  87. AVFifoBuffer *fifo = c->fifo;
  88. int ret = 0;
  89. int64_t seek_ret;
  90. while (1) {
  91. int fifo_space, to_copy;
  92. pthread_mutex_lock(&c->mutex);
  93. if (async_check_interrupt(h)) {
  94. c->io_eof_reached = 1;
  95. c->io_error = AVERROR_EXIT;
  96. pthread_cond_signal(&c->cond_wakeup_main);
  97. pthread_mutex_unlock(&c->mutex);
  98. break;
  99. }
  100. if (c->seek_request) {
  101. seek_ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
  102. if (seek_ret < 0) {
  103. c->io_eof_reached = 1;
  104. c->io_error = (int)seek_ret;
  105. } else {
  106. c->io_eof_reached = 0;
  107. c->io_error = 0;
  108. }
  109. c->seek_completed = 1;
  110. c->seek_ret = seek_ret;
  111. c->seek_request = 0;
  112. av_fifo_reset(fifo);
  113. pthread_cond_signal(&c->cond_wakeup_main);
  114. pthread_mutex_unlock(&c->mutex);
  115. continue;
  116. }
  117. fifo_space = av_fifo_space(fifo);
  118. if (c->io_eof_reached || fifo_space <= 0) {
  119. pthread_cond_signal(&c->cond_wakeup_main);
  120. pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
  121. pthread_mutex_unlock(&c->mutex);
  122. continue;
  123. }
  124. pthread_mutex_unlock(&c->mutex);
  125. to_copy = FFMIN(4096, fifo_space);
  126. ret = av_fifo_generic_write(fifo, (void *)h, to_copy, (void *)wrapped_url_read);
  127. pthread_mutex_lock(&c->mutex);
  128. if (ret <= 0) {
  129. c->io_eof_reached = 1;
  130. if (c->inner_io_error < 0)
  131. c->io_error = c->inner_io_error;
  132. }
  133. pthread_cond_signal(&c->cond_wakeup_main);
  134. pthread_mutex_unlock(&c->mutex);
  135. }
  136. return NULL;
  137. }
  138. static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  139. {
  140. Context *c = h->priv_data;
  141. int ret;
  142. AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
  143. av_strstart(arg, "async:", &arg);
  144. c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
  145. if (!c->fifo) {
  146. ret = AVERROR(ENOMEM);
  147. goto fifo_fail;
  148. }
  149. /* wrap interrupt callback */
  150. c->interrupt_callback = h->interrupt_callback;
  151. ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
  152. if (ret != 0) {
  153. av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
  154. goto url_fail;
  155. }
  156. c->logical_size = ffurl_size(c->inner);
  157. h->is_streamed = c->inner->is_streamed;
  158. ret = pthread_mutex_init(&c->mutex, NULL);
  159. if (ret != 0) {
  160. av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
  161. goto mutex_fail;
  162. }
  163. ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
  164. if (ret != 0) {
  165. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
  166. goto cond_wakeup_main_fail;
  167. }
  168. ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
  169. if (ret != 0) {
  170. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
  171. goto cond_wakeup_background_fail;
  172. }
  173. ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
  174. if (ret) {
  175. av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
  176. goto thread_fail;
  177. }
  178. return 0;
  179. thread_fail:
  180. pthread_cond_destroy(&c->cond_wakeup_background);
  181. cond_wakeup_background_fail:
  182. pthread_cond_destroy(&c->cond_wakeup_main);
  183. cond_wakeup_main_fail:
  184. pthread_mutex_destroy(&c->mutex);
  185. mutex_fail:
  186. ffurl_close(c->inner);
  187. url_fail:
  188. av_fifo_freep(&c->fifo);
  189. fifo_fail:
  190. return ret;
  191. }
  192. static int async_close(URLContext *h)
  193. {
  194. Context *c = h->priv_data;
  195. int ret;
  196. pthread_mutex_lock(&c->mutex);
  197. c->abort_request = 1;
  198. pthread_cond_signal(&c->cond_wakeup_background);
  199. pthread_mutex_unlock(&c->mutex);
  200. ret = pthread_join(c->async_buffer_thread, NULL);
  201. if (ret != 0)
  202. av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
  203. pthread_cond_destroy(&c->cond_wakeup_background);
  204. pthread_cond_destroy(&c->cond_wakeup_main);
  205. pthread_mutex_destroy(&c->mutex);
  206. ffurl_close(c->inner);
  207. av_fifo_freep(&c->fifo);
  208. return 0;
  209. }
  210. static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
  211. void (*func)(void*, void*, int))
  212. {
  213. Context *c = h->priv_data;
  214. AVFifoBuffer *fifo = c->fifo;
  215. int to_read = size;
  216. int ret = 0;
  217. pthread_mutex_lock(&c->mutex);
  218. while (to_read > 0) {
  219. int fifo_size, to_copy;
  220. if (async_check_interrupt(h)) {
  221. ret = AVERROR_EXIT;
  222. break;
  223. }
  224. fifo_size = av_fifo_size(fifo);
  225. to_copy = FFMIN(to_read, fifo_size);
  226. if (to_copy > 0) {
  227. av_fifo_generic_read(fifo, dest, to_copy, func);
  228. if (!func)
  229. dest = (uint8_t *)dest + to_copy;
  230. c->logical_pos += to_copy;
  231. to_read -= to_copy;
  232. ret = size - to_read;
  233. if (to_read <= 0 || !read_complete)
  234. break;
  235. } else if (c->io_eof_reached) {
  236. if (ret <= 0) {
  237. if (c->io_error)
  238. ret = c->io_error;
  239. else
  240. ret = AVERROR_EOF;
  241. }
  242. break;
  243. }
  244. pthread_cond_signal(&c->cond_wakeup_background);
  245. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  246. }
  247. pthread_cond_signal(&c->cond_wakeup_background);
  248. pthread_mutex_unlock(&c->mutex);
  249. return ret;
  250. }
  251. static int async_read(URLContext *h, unsigned char *buf, int size)
  252. {
  253. return async_read_internal(h, buf, size, 0, NULL);
  254. }
  255. static void fifo_do_not_copy_func(void* dest, void* src, int size) {
  256. // do not copy
  257. }
  258. static int64_t async_seek(URLContext *h, int64_t pos, int whence)
  259. {
  260. Context *c = h->priv_data;
  261. AVFifoBuffer *fifo = c->fifo;
  262. int64_t ret;
  263. int64_t new_logical_pos;
  264. int fifo_size;
  265. if (whence == AVSEEK_SIZE) {
  266. av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
  267. return c->logical_size;
  268. } else if (whence == SEEK_CUR) {
  269. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  270. new_logical_pos = pos + c->logical_pos;
  271. } else if (whence == SEEK_SET){
  272. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  273. new_logical_pos = pos;
  274. } else {
  275. return AVERROR(EINVAL);
  276. }
  277. if (new_logical_pos < 0)
  278. return AVERROR(EINVAL);
  279. fifo_size = av_fifo_size(fifo);
  280. if (new_logical_pos == c->logical_pos) {
  281. /* current position */
  282. return c->logical_pos;
  283. } else if ((new_logical_pos > c->logical_pos) &&
  284. (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
  285. /* fast seek */
  286. av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
  287. new_logical_pos, (int)c->logical_pos,
  288. (int)(new_logical_pos - c->logical_pos), fifo_size);
  289. async_read_internal(h, NULL, (int)(new_logical_pos - c->logical_pos), 1, fifo_do_not_copy_func);
  290. return c->logical_pos;
  291. } else if (c->logical_size <= 0) {
  292. /* can not seek */
  293. return AVERROR(EINVAL);
  294. } else if (new_logical_pos > c->logical_size) {
  295. /* beyond end */
  296. return AVERROR(EINVAL);
  297. }
  298. pthread_mutex_lock(&c->mutex);
  299. c->seek_request = 1;
  300. c->seek_pos = new_logical_pos;
  301. c->seek_whence = SEEK_SET;
  302. c->seek_completed = 0;
  303. c->seek_ret = 0;
  304. while (1) {
  305. if (async_check_interrupt(h)) {
  306. ret = AVERROR_EXIT;
  307. break;
  308. }
  309. if (c->seek_completed) {
  310. if (c->seek_ret >= 0)
  311. c->logical_pos = c->seek_ret;
  312. ret = c->seek_ret;
  313. break;
  314. }
  315. pthread_cond_signal(&c->cond_wakeup_background);
  316. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  317. }
  318. pthread_mutex_unlock(&c->mutex);
  319. return ret;
  320. }
  321. #define OFFSET(x) offsetof(Context, x)
  322. #define D AV_OPT_FLAG_DECODING_PARAM
  323. static const AVOption options[] = {
  324. {NULL},
  325. };
  326. #undef D
  327. #undef OFFSET
  328. static const AVClass async_context_class = {
  329. .class_name = "Async",
  330. .item_name = av_default_item_name,
  331. .option = options,
  332. .version = LIBAVUTIL_VERSION_INT,
  333. };
  334. URLProtocol ff_async_protocol = {
  335. .name = "async",
  336. .url_open2 = async_open,
  337. .url_read = async_read,
  338. .url_seek = async_seek,
  339. .url_close = async_close,
  340. .priv_data_size = sizeof(Context),
  341. .priv_data_class = &async_context_class,
  342. };
  343. #ifdef TEST
  344. #define TEST_SEEK_POS (1536)
  345. #define TEST_STREAM_SIZE (2048)
  346. typedef struct TestContext {
  347. AVClass *class;
  348. int64_t logical_pos;
  349. int64_t logical_size;
  350. /* options */
  351. int opt_read_error;
  352. } TestContext;
  353. static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  354. {
  355. TestContext *c = h->priv_data;
  356. c->logical_pos = 0;
  357. c->logical_size = TEST_STREAM_SIZE;
  358. return 0;
  359. }
  360. static int async_test_close(URLContext *h)
  361. {
  362. return 0;
  363. }
  364. static int async_test_read(URLContext *h, unsigned char *buf, int size)
  365. {
  366. TestContext *c = h->priv_data;
  367. int i;
  368. int read_len = 0;
  369. if (c->opt_read_error)
  370. return c->opt_read_error;
  371. if (c->logical_pos >= c->logical_size)
  372. return AVERROR_EOF;
  373. for (i = 0; i < size; ++i) {
  374. buf[i] = c->logical_pos & 0xFF;
  375. c->logical_pos++;
  376. read_len++;
  377. if (c->logical_pos >= c->logical_size)
  378. break;
  379. }
  380. return read_len;
  381. }
  382. static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
  383. {
  384. TestContext *c = h->priv_data;
  385. int64_t new_logical_pos;
  386. if (whence == AVSEEK_SIZE) {
  387. return c->logical_size;
  388. } else if (whence == SEEK_CUR) {
  389. new_logical_pos = pos + c->logical_pos;
  390. } else if (whence == SEEK_SET){
  391. new_logical_pos = pos;
  392. } else {
  393. return AVERROR(EINVAL);
  394. }
  395. if (new_logical_pos < 0)
  396. return AVERROR(EINVAL);
  397. c->logical_pos = new_logical_pos;
  398. return new_logical_pos;
  399. }
  400. #define OFFSET(x) offsetof(TestContext, x)
  401. #define D AV_OPT_FLAG_DECODING_PARAM
  402. static const AVOption async_test_options[] = {
  403. { "async-test-read-error", "cause read fail",
  404. OFFSET(opt_read_error), AV_OPT_TYPE_INT, { .i64 = 0 }, INT_MIN, INT_MAX, .flags = D },
  405. {NULL},
  406. };
  407. #undef D
  408. #undef OFFSET
  409. static const AVClass async_test_context_class = {
  410. .class_name = "Async-Test",
  411. .item_name = av_default_item_name,
  412. .option = async_test_options,
  413. .version = LIBAVUTIL_VERSION_INT,
  414. };
  415. URLProtocol ff_async_test_protocol = {
  416. .name = "async-test",
  417. .url_open2 = async_test_open,
  418. .url_read = async_test_read,
  419. .url_seek = async_test_seek,
  420. .url_close = async_test_close,
  421. .priv_data_size = sizeof(TestContext),
  422. .priv_data_class = &async_test_context_class,
  423. };
  424. int main(void)
  425. {
  426. URLContext *h = NULL;
  427. int i;
  428. int ret;
  429. int64_t size;
  430. int64_t pos;
  431. int64_t read_len;
  432. unsigned char buf[4096];
  433. AVDictionary *opts = NULL;
  434. ffurl_register_protocol(&ff_async_protocol);
  435. ffurl_register_protocol(&ff_async_test_protocol);
  436. /*
  437. * test normal read
  438. */
  439. ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
  440. printf("open: %d\n", ret);
  441. size = ffurl_size(h);
  442. printf("size: %"PRId64"\n", size);
  443. pos = ffurl_seek(h, 0, SEEK_CUR);
  444. read_len = 0;
  445. while (1) {
  446. ret = ffurl_read(h, buf, sizeof(buf));
  447. if (ret == AVERROR_EOF) {
  448. printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
  449. break;
  450. }
  451. else if (ret == 0)
  452. break;
  453. else if (ret < 0) {
  454. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  455. goto fail;
  456. } else {
  457. for (i = 0; i < ret; ++i) {
  458. if (buf[i] != (pos & 0xFF)) {
  459. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  460. (int)buf[i], (int)(pos & 0xFF), pos);
  461. break;
  462. }
  463. pos++;
  464. }
  465. }
  466. read_len += ret;
  467. }
  468. printf("read: %"PRId64"\n", read_len);
  469. /*
  470. * test normal seek
  471. */
  472. ret = ffurl_read(h, buf, 1);
  473. printf("read: %d\n", ret);
  474. pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
  475. printf("seek: %"PRId64"\n", pos);
  476. read_len = 0;
  477. while (1) {
  478. ret = ffurl_read(h, buf, sizeof(buf));
  479. if (ret == AVERROR_EOF)
  480. break;
  481. else if (ret == 0)
  482. break;
  483. else if (ret < 0) {
  484. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  485. goto fail;
  486. } else {
  487. for (i = 0; i < ret; ++i) {
  488. if (buf[i] != (pos & 0xFF)) {
  489. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  490. (int)buf[i], (int)(pos & 0xFF), pos);
  491. break;
  492. }
  493. pos++;
  494. }
  495. }
  496. read_len += ret;
  497. }
  498. printf("read: %"PRId64"\n", read_len);
  499. ret = ffurl_read(h, buf, 1);
  500. printf("read: %d\n", ret);
  501. /*
  502. * test read error
  503. */
  504. ffurl_close(h);
  505. av_dict_set_int(&opts, "async-test-read-error", -10000, 0);
  506. ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, &opts);
  507. printf("open: %d\n", ret);
  508. ret = ffurl_read(h, buf, 1);
  509. printf("read: %d\n", ret);
  510. fail:
  511. av_dict_free(&opts);
  512. ffurl_close(h);
  513. return 0;
  514. }
  515. #endif