You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

558 lines
15KB

  1. /*
  2. * Input async protocol.
  3. * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * Based on libavformat/cache.c by Michael Niedermayer
  22. */
  23. /**
  24. * @TODO
  25. * support timeout
  26. * support backward short seek
  27. * support working with concatdec and hls
  28. */
  29. #include "libavutil/avassert.h"
  30. #include "libavutil/avstring.h"
  31. #include "libavutil/error.h"
  32. #include "libavutil/fifo.h"
  33. #include "libavutil/log.h"
  34. #include "libavutil/opt.h"
  35. #include "url.h"
  36. #include <stdint.h>
  37. #include <pthread.h>
  38. #if HAVE_UNISTD_H
  39. #include <unistd.h>
  40. #endif
  41. #define BUFFER_CAPACITY (4 * 1024 * 1024)
  42. #define SHORT_SEEK_THRESHOLD (256 * 1024)
  43. typedef struct Context {
  44. AVClass *class;
  45. URLContext *inner;
  46. int seek_request;
  47. size_t seek_pos;
  48. int seek_whence;
  49. int seek_completed;
  50. int64_t seek_ret;
  51. int io_error;
  52. int io_eof_reached;
  53. size_t logical_pos;
  54. size_t logical_size;
  55. AVFifoBuffer *fifo;
  56. pthread_cond_t cond_wakeup_main;
  57. pthread_cond_t cond_wakeup_background;
  58. pthread_mutex_t mutex;
  59. pthread_t async_buffer_thread;
  60. int abort_request;
  61. AVIOInterruptCB interrupt_callback;
  62. } Context;
  63. static int async_check_interrupt(void *arg)
  64. {
  65. URLContext *h = arg;
  66. Context *c = h->priv_data;
  67. if (c->abort_request)
  68. return 1;
  69. if (ff_check_interrupt(&c->interrupt_callback))
  70. c->abort_request = 1;
  71. return c->abort_request;
  72. }
  73. static void *async_buffer_task(void *arg)
  74. {
  75. URLContext *h = arg;
  76. Context *c = h->priv_data;
  77. AVFifoBuffer *fifo = c->fifo;
  78. int ret = 0;
  79. while (1) {
  80. int fifo_space, to_copy;
  81. if (async_check_interrupt(h)) {
  82. c->io_eof_reached = 1;
  83. c->io_error = AVERROR_EXIT;
  84. break;
  85. }
  86. if (c->seek_request) {
  87. pthread_mutex_lock(&c->mutex);
  88. ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
  89. if (ret < 0) {
  90. c->io_eof_reached = 1;
  91. c->io_error = ret;
  92. } else {
  93. c->io_eof_reached = 0;
  94. c->io_error = 0;
  95. }
  96. c->seek_completed = 1;
  97. c->seek_ret = ret;
  98. c->seek_request = 0;
  99. av_fifo_reset(fifo);
  100. pthread_cond_signal(&c->cond_wakeup_main);
  101. pthread_mutex_unlock(&c->mutex);
  102. continue;
  103. }
  104. fifo_space = av_fifo_space(fifo);
  105. if (c->io_eof_reached || fifo_space <= 0) {
  106. pthread_mutex_lock(&c->mutex);
  107. pthread_cond_signal(&c->cond_wakeup_main);
  108. pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
  109. pthread_mutex_unlock(&c->mutex);
  110. continue;
  111. }
  112. to_copy = FFMIN(4096, fifo_space);
  113. ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
  114. if (ret <= 0) {
  115. c->io_eof_reached = 1;
  116. if (ret < 0) {
  117. c->io_error = ret;
  118. }
  119. }
  120. pthread_mutex_lock(&c->mutex);
  121. pthread_cond_signal(&c->cond_wakeup_main);
  122. pthread_mutex_unlock(&c->mutex);
  123. }
  124. return NULL;
  125. }
  126. static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  127. {
  128. Context *c = h->priv_data;
  129. int ret;
  130. AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
  131. av_strstart(arg, "async:", &arg);
  132. c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
  133. if (!c->fifo) {
  134. ret = AVERROR(ENOMEM);
  135. goto fifo_fail;
  136. }
  137. /* wrap interrupt callback */
  138. c->interrupt_callback = h->interrupt_callback;
  139. ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
  140. if (ret != 0) {
  141. av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", strerror(ret), arg);
  142. goto url_fail;
  143. }
  144. c->logical_size = ffurl_size(c->inner);
  145. h->is_streamed = c->inner->is_streamed;
  146. ret = pthread_mutex_init(&c->mutex, NULL);
  147. if (ret != 0) {
  148. av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
  149. goto mutex_fail;
  150. }
  151. ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
  152. if (ret != 0) {
  153. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  154. goto cond_wakeup_main_fail;
  155. }
  156. ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
  157. if (ret != 0) {
  158. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  159. goto cond_wakeup_background_fail;
  160. }
  161. ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
  162. if (ret) {
  163. av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
  164. goto thread_fail;
  165. }
  166. return 0;
  167. thread_fail:
  168. pthread_cond_destroy(&c->cond_wakeup_background);
  169. cond_wakeup_background_fail:
  170. pthread_cond_destroy(&c->cond_wakeup_main);
  171. cond_wakeup_main_fail:
  172. pthread_mutex_destroy(&c->mutex);
  173. mutex_fail:
  174. ffurl_close(c->inner);
  175. url_fail:
  176. av_fifo_freep(&c->fifo);
  177. fifo_fail:
  178. return ret;
  179. }
  180. static int async_close(URLContext *h)
  181. {
  182. Context *c = h->priv_data;
  183. int ret;
  184. pthread_mutex_lock(&c->mutex);
  185. c->abort_request = 1;
  186. pthread_cond_signal(&c->cond_wakeup_background);
  187. pthread_mutex_unlock(&c->mutex);
  188. ret = pthread_join(c->async_buffer_thread, NULL);
  189. if (ret != 0)
  190. av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
  191. pthread_cond_destroy(&c->cond_wakeup_background);
  192. pthread_cond_destroy(&c->cond_wakeup_main);
  193. pthread_mutex_destroy(&c->mutex);
  194. ffurl_close(c->inner);
  195. av_fifo_freep(&c->fifo);
  196. return 0;
  197. }
  198. static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
  199. void (*func)(void*, void*, int))
  200. {
  201. Context *c = h->priv_data;
  202. AVFifoBuffer *fifo = c->fifo;
  203. int to_read = size;
  204. int ret = 0;
  205. pthread_mutex_lock(&c->mutex);
  206. while (to_read > 0) {
  207. int fifo_size, to_copy;
  208. if (async_check_interrupt(h)) {
  209. ret = AVERROR_EXIT;
  210. break;
  211. }
  212. fifo_size = av_fifo_size(fifo);
  213. to_copy = FFMIN(to_read, fifo_size);
  214. if (to_copy > 0) {
  215. av_fifo_generic_read(fifo, dest, to_copy, func);
  216. if (!func)
  217. dest = (uint8_t *)dest + to_copy;
  218. c->logical_pos += to_copy;
  219. to_read -= to_copy;
  220. ret = size - to_read;
  221. if (to_read <= 0 || !read_complete)
  222. break;
  223. } else if (c->io_eof_reached) {
  224. if (ret <= 0)
  225. ret = AVERROR_EOF;
  226. break;
  227. }
  228. pthread_cond_signal(&c->cond_wakeup_background);
  229. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  230. }
  231. pthread_cond_signal(&c->cond_wakeup_background);
  232. pthread_mutex_unlock(&c->mutex);
  233. return ret;
  234. }
  235. static int async_read(URLContext *h, unsigned char *buf, int size)
  236. {
  237. return async_read_internal(h, buf, size, 0, NULL);
  238. }
  239. static void fifo_do_not_copy_func(void* dest, void* src, int size) {
  240. // do not copy
  241. }
  242. static int64_t async_seek(URLContext *h, int64_t pos, int whence)
  243. {
  244. Context *c = h->priv_data;
  245. AVFifoBuffer *fifo = c->fifo;
  246. int64_t ret;
  247. int64_t new_logical_pos;
  248. int fifo_size;
  249. if (whence == AVSEEK_SIZE) {
  250. av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
  251. return c->logical_size;
  252. } else if (whence == SEEK_CUR) {
  253. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  254. new_logical_pos = pos + c->logical_pos;
  255. } else if (whence == SEEK_SET){
  256. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  257. new_logical_pos = pos;
  258. } else {
  259. return AVERROR(EINVAL);
  260. }
  261. if (new_logical_pos < 0)
  262. return AVERROR(EINVAL);
  263. fifo_size = av_fifo_size(fifo);
  264. if (new_logical_pos == c->logical_pos) {
  265. /* current position */
  266. return c->logical_pos;
  267. } else if ((new_logical_pos > c->logical_pos) &&
  268. (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
  269. /* fast seek */
  270. av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
  271. new_logical_pos, (int)c->logical_pos,
  272. (int)(new_logical_pos - c->logical_pos), fifo_size);
  273. async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
  274. return c->logical_pos;
  275. } else if (c->logical_size <= 0) {
  276. /* can not seek */
  277. return AVERROR(EINVAL);
  278. } else if (new_logical_pos > c->logical_size) {
  279. /* beyond end */
  280. return AVERROR(EINVAL);
  281. }
  282. pthread_mutex_lock(&c->mutex);
  283. c->seek_request = 1;
  284. c->seek_pos = new_logical_pos;
  285. c->seek_whence = SEEK_SET;
  286. c->seek_completed = 0;
  287. c->seek_ret = 0;
  288. while (1) {
  289. if (async_check_interrupt(h)) {
  290. ret = AVERROR_EXIT;
  291. break;
  292. }
  293. if (c->seek_completed) {
  294. if (c->seek_ret >= 0)
  295. c->logical_pos = c->seek_ret;
  296. ret = c->seek_ret;
  297. break;
  298. }
  299. pthread_cond_signal(&c->cond_wakeup_background);
  300. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  301. }
  302. pthread_mutex_unlock(&c->mutex);
  303. return ret;
  304. }
  305. #define OFFSET(x) offsetof(Context, x)
  306. #define D AV_OPT_FLAG_DECODING_PARAM
  307. static const AVOption options[] = {
  308. {NULL},
  309. };
  310. static const AVClass async_context_class = {
  311. .class_name = "Async",
  312. .item_name = av_default_item_name,
  313. .option = options,
  314. .version = LIBAVUTIL_VERSION_INT,
  315. };
  316. URLProtocol ff_async_protocol = {
  317. .name = "async",
  318. .url_open2 = async_open,
  319. .url_read = async_read,
  320. .url_seek = async_seek,
  321. .url_close = async_close,
  322. .priv_data_size = sizeof(Context),
  323. .priv_data_class = &async_context_class,
  324. };
  325. #ifdef TEST
  326. #define TEST_SEEK_POS (1536)
  327. #define TEST_STREAM_SIZE (2048)
  328. typedef struct TestContext {
  329. AVClass *class;
  330. size_t logical_pos;
  331. size_t logical_size;
  332. } TestContext;
  333. static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  334. {
  335. TestContext *c = h->priv_data;
  336. c->logical_pos = 0;
  337. c->logical_size = TEST_STREAM_SIZE;
  338. return 0;
  339. }
  340. static int async_test_close(URLContext *h)
  341. {
  342. return 0;
  343. }
  344. static int async_test_read(URLContext *h, unsigned char *buf, int size)
  345. {
  346. TestContext *c = h->priv_data;
  347. int i;
  348. int read_len = 0;
  349. if (c->logical_pos >= c->logical_size)
  350. return AVERROR_EOF;
  351. for (i = 0; i < size; ++i) {
  352. buf[i] = c->logical_pos & 0xFF;
  353. c->logical_pos++;
  354. read_len++;
  355. if (c->logical_pos >= c->logical_size)
  356. break;
  357. }
  358. return read_len;
  359. }
  360. static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
  361. {
  362. TestContext *c = h->priv_data;
  363. int64_t new_logical_pos;
  364. if (whence == AVSEEK_SIZE) {
  365. return c->logical_size;
  366. } else if (whence == SEEK_CUR) {
  367. new_logical_pos = pos + c->logical_pos;
  368. } else if (whence == SEEK_SET){
  369. new_logical_pos = pos;
  370. } else {
  371. return AVERROR(EINVAL);
  372. }
  373. if (new_logical_pos < 0)
  374. return AVERROR(EINVAL);
  375. c->logical_pos = new_logical_pos;
  376. return new_logical_pos;
  377. }
  378. static const AVClass async_test_context_class = {
  379. .class_name = "Async-Test",
  380. .item_name = av_default_item_name,
  381. .version = LIBAVUTIL_VERSION_INT,
  382. };
  383. URLProtocol ff_async_test_protocol = {
  384. .name = "async-test",
  385. .url_open2 = async_test_open,
  386. .url_read = async_test_read,
  387. .url_seek = async_test_seek,
  388. .url_close = async_test_close,
  389. .priv_data_size = sizeof(TestContext),
  390. .priv_data_class = &async_test_context_class,
  391. };
  392. int main(void)
  393. {
  394. URLContext *h = NULL;
  395. int i;
  396. int ret;
  397. int64_t size;
  398. int64_t pos;
  399. int64_t read_len;
  400. unsigned char buf[4096];
  401. ffurl_register_protocol(&ff_async_protocol);
  402. ffurl_register_protocol(&ff_async_test_protocol);
  403. ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
  404. printf("open: %d\n", ret);
  405. size = ffurl_size(h);
  406. printf("size: %"PRId64"\n", size);
  407. pos = ffurl_seek(h, 0, SEEK_CUR);
  408. read_len = 0;
  409. while (1) {
  410. ret = ffurl_read(h, buf, sizeof(buf));
  411. if (ret == AVERROR_EOF) {
  412. printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
  413. break;
  414. }
  415. else if (ret == 0)
  416. break;
  417. else if (ret < 0) {
  418. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  419. goto fail;
  420. } else {
  421. for (i = 0; i < ret; ++i) {
  422. if (buf[i] != (pos & 0xFF)) {
  423. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  424. (int)buf[i], (int)(pos & 0xFF), pos);
  425. break;
  426. }
  427. pos++;
  428. }
  429. }
  430. read_len += ret;
  431. }
  432. printf("read: %"PRId64"\n", read_len);
  433. ret = ffurl_read(h, buf, 1);
  434. printf("read: %d\n", ret);
  435. pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
  436. printf("seek: %"PRId64"\n", pos);
  437. read_len = 0;
  438. while (1) {
  439. ret = ffurl_read(h, buf, sizeof(buf));
  440. if (ret == AVERROR_EOF)
  441. break;
  442. else if (ret == 0)
  443. break;
  444. else if (ret < 0) {
  445. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  446. goto fail;
  447. } else {
  448. for (i = 0; i < ret; ++i) {
  449. if (buf[i] != (pos & 0xFF)) {
  450. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  451. (int)buf[i], (int)(pos & 0xFF), pos);
  452. break;
  453. }
  454. pos++;
  455. }
  456. }
  457. read_len += ret;
  458. }
  459. printf("read: %"PRId64"\n", read_len);
  460. ret = ffurl_read(h, buf, 1);
  461. printf("read: %d\n", ret);
  462. fail:
  463. ffurl_close(h);
  464. return 0;
  465. }
  466. #endif