You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

559 lines
15KB

  1. /*
  2. * Input async protocol.
  3. * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * Based on libavformat/cache.c by Michael Niedermayer
  22. */
  23. /**
  24. * @TODO
  25. * support timeout
  26. * support backward short seek
  27. * support work with concatdec, hls
  28. */
  29. #include "libavutil/avassert.h"
  30. #include "libavutil/avstring.h"
  31. #include "libavutil/error.h"
  32. #include "libavutil/fifo.h"
  33. #include "libavutil/log.h"
  34. #include "libavutil/opt.h"
  35. #include "url.h"
  36. #include <stdint.h>
  37. #include <pthread.h>
  38. #if HAVE_UNISTD_H
  39. #include <unistd.h>
  40. #endif
  41. #define BUFFER_CAPACITY (4 * 1024 * 1024)
  42. #define SHORT_SEEK_THRESHOLD (256 * 1024)
  43. typedef struct Context {
  44. AVClass *class;
  45. URLContext *inner;
  46. int seek_request;
  47. size_t seek_pos;
  48. int seek_whence;
  49. int seek_completed;
  50. int64_t seek_ret;
  51. int io_error;
  52. int io_eof_reached;
  53. size_t logical_pos;
  54. size_t logical_size;
  55. AVFifoBuffer *fifo;
  56. pthread_cond_t cond_wakeup_main;
  57. pthread_cond_t cond_wakeup_background;
  58. pthread_mutex_t mutex;
  59. pthread_t async_buffer_thread;
  60. int abort_request;
  61. AVIOInterruptCB interrupt_callback;
  62. } Context;
  63. static int async_check_interrupt(void *arg)
  64. {
  65. URLContext *h = arg;
  66. Context *c = h->priv_data;
  67. if (c->abort_request)
  68. return 1;
  69. if (ff_check_interrupt(&c->interrupt_callback))
  70. c->abort_request = 1;
  71. return c->abort_request;
  72. }
  73. static void *async_buffer_task(void *arg)
  74. {
  75. URLContext *h = arg;
  76. Context *c = h->priv_data;
  77. AVFifoBuffer *fifo = c->fifo;
  78. int ret = 0;
  79. while (1) {
  80. int fifo_space, to_copy;
  81. pthread_mutex_lock(&c->mutex);
  82. if (async_check_interrupt(h)) {
  83. c->io_eof_reached = 1;
  84. c->io_error = AVERROR_EXIT;
  85. pthread_mutex_unlock(&c->mutex);
  86. break;
  87. }
  88. if (c->seek_request) {
  89. ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
  90. if (ret < 0) {
  91. c->io_eof_reached = 1;
  92. c->io_error = ret;
  93. } else {
  94. c->io_eof_reached = 0;
  95. c->io_error = 0;
  96. }
  97. c->seek_completed = 1;
  98. c->seek_ret = ret;
  99. c->seek_request = 0;
  100. av_fifo_reset(fifo);
  101. pthread_cond_signal(&c->cond_wakeup_main);
  102. pthread_mutex_unlock(&c->mutex);
  103. continue;
  104. }
  105. fifo_space = av_fifo_space(fifo);
  106. if (c->io_eof_reached || fifo_space <= 0) {
  107. pthread_cond_signal(&c->cond_wakeup_main);
  108. pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
  109. pthread_mutex_unlock(&c->mutex);
  110. continue;
  111. }
  112. pthread_mutex_unlock(&c->mutex);
  113. to_copy = FFMIN(4096, fifo_space);
  114. ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
  115. pthread_mutex_lock(&c->mutex);
  116. if (ret <= 0) {
  117. c->io_eof_reached = 1;
  118. if (ret < 0) {
  119. c->io_error = ret;
  120. }
  121. }
  122. pthread_cond_signal(&c->cond_wakeup_main);
  123. pthread_mutex_unlock(&c->mutex);
  124. }
  125. return NULL;
  126. }
  127. static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  128. {
  129. Context *c = h->priv_data;
  130. int ret;
  131. AVIOInterruptCB interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
  132. av_strstart(arg, "async:", &arg);
  133. c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
  134. if (!c->fifo) {
  135. ret = AVERROR(ENOMEM);
  136. goto fifo_fail;
  137. }
  138. /* wrap interrupt callback */
  139. c->interrupt_callback = h->interrupt_callback;
  140. ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
  141. if (ret != 0) {
  142. av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", strerror(ret), arg);
  143. goto url_fail;
  144. }
  145. c->logical_size = ffurl_size(c->inner);
  146. h->is_streamed = c->inner->is_streamed;
  147. ret = pthread_mutex_init(&c->mutex, NULL);
  148. if (ret != 0) {
  149. av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
  150. goto mutex_fail;
  151. }
  152. ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
  153. if (ret != 0) {
  154. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  155. goto cond_wakeup_main_fail;
  156. }
  157. ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
  158. if (ret != 0) {
  159. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  160. goto cond_wakeup_background_fail;
  161. }
  162. ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
  163. if (ret) {
  164. av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
  165. goto thread_fail;
  166. }
  167. return 0;
  168. thread_fail:
  169. pthread_cond_destroy(&c->cond_wakeup_background);
  170. cond_wakeup_background_fail:
  171. pthread_cond_destroy(&c->cond_wakeup_main);
  172. cond_wakeup_main_fail:
  173. pthread_mutex_destroy(&c->mutex);
  174. mutex_fail:
  175. ffurl_close(c->inner);
  176. url_fail:
  177. av_fifo_freep(&c->fifo);
  178. fifo_fail:
  179. return ret;
  180. }
  181. static int async_close(URLContext *h)
  182. {
  183. Context *c = h->priv_data;
  184. int ret;
  185. pthread_mutex_lock(&c->mutex);
  186. c->abort_request = 1;
  187. pthread_cond_signal(&c->cond_wakeup_background);
  188. pthread_mutex_unlock(&c->mutex);
  189. ret = pthread_join(c->async_buffer_thread, NULL);
  190. if (ret != 0)
  191. av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
  192. pthread_cond_destroy(&c->cond_wakeup_background);
  193. pthread_cond_destroy(&c->cond_wakeup_main);
  194. pthread_mutex_destroy(&c->mutex);
  195. ffurl_close(c->inner);
  196. av_fifo_freep(&c->fifo);
  197. return 0;
  198. }
  199. static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
  200. void (*func)(void*, void*, int))
  201. {
  202. Context *c = h->priv_data;
  203. AVFifoBuffer *fifo = c->fifo;
  204. int to_read = size;
  205. int ret = 0;
  206. pthread_mutex_lock(&c->mutex);
  207. while (to_read > 0) {
  208. int fifo_size, to_copy;
  209. if (async_check_interrupt(h)) {
  210. ret = AVERROR_EXIT;
  211. break;
  212. }
  213. fifo_size = av_fifo_size(fifo);
  214. to_copy = FFMIN(to_read, fifo_size);
  215. if (to_copy > 0) {
  216. av_fifo_generic_read(fifo, dest, to_copy, func);
  217. if (!func)
  218. dest = (uint8_t *)dest + to_copy;
  219. c->logical_pos += to_copy;
  220. to_read -= to_copy;
  221. ret = size - to_read;
  222. if (to_read <= 0 || !read_complete)
  223. break;
  224. } else if (c->io_eof_reached) {
  225. if (ret <= 0)
  226. ret = AVERROR_EOF;
  227. break;
  228. }
  229. pthread_cond_signal(&c->cond_wakeup_background);
  230. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  231. }
  232. pthread_cond_signal(&c->cond_wakeup_background);
  233. pthread_mutex_unlock(&c->mutex);
  234. return ret;
  235. }
  236. static int async_read(URLContext *h, unsigned char *buf, int size)
  237. {
  238. return async_read_internal(h, buf, size, 0, NULL);
  239. }
  240. static void fifo_do_not_copy_func(void* dest, void* src, int size) {
  241. // do not copy
  242. }
  243. static int64_t async_seek(URLContext *h, int64_t pos, int whence)
  244. {
  245. Context *c = h->priv_data;
  246. AVFifoBuffer *fifo = c->fifo;
  247. int64_t ret;
  248. int64_t new_logical_pos;
  249. int fifo_size;
  250. if (whence == AVSEEK_SIZE) {
  251. av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
  252. return c->logical_size;
  253. } else if (whence == SEEK_CUR) {
  254. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  255. new_logical_pos = pos + c->logical_pos;
  256. } else if (whence == SEEK_SET){
  257. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  258. new_logical_pos = pos;
  259. } else {
  260. return AVERROR(EINVAL);
  261. }
  262. if (new_logical_pos < 0)
  263. return AVERROR(EINVAL);
  264. fifo_size = av_fifo_size(fifo);
  265. if (new_logical_pos == c->logical_pos) {
  266. /* current position */
  267. return c->logical_pos;
  268. } else if ((new_logical_pos > c->logical_pos) &&
  269. (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
  270. /* fast seek */
  271. av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
  272. new_logical_pos, (int)c->logical_pos,
  273. (int)(new_logical_pos - c->logical_pos), fifo_size);
  274. async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
  275. return c->logical_pos;
  276. } else if (c->logical_size <= 0) {
  277. /* can not seek */
  278. return AVERROR(EINVAL);
  279. } else if (new_logical_pos > c->logical_size) {
  280. /* beyond end */
  281. return AVERROR(EINVAL);
  282. }
  283. pthread_mutex_lock(&c->mutex);
  284. c->seek_request = 1;
  285. c->seek_pos = new_logical_pos;
  286. c->seek_whence = SEEK_SET;
  287. c->seek_completed = 0;
  288. c->seek_ret = 0;
  289. while (1) {
  290. if (async_check_interrupt(h)) {
  291. ret = AVERROR_EXIT;
  292. break;
  293. }
  294. if (c->seek_completed) {
  295. if (c->seek_ret >= 0)
  296. c->logical_pos = c->seek_ret;
  297. ret = c->seek_ret;
  298. break;
  299. }
  300. pthread_cond_signal(&c->cond_wakeup_background);
  301. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  302. }
  303. pthread_mutex_unlock(&c->mutex);
  304. return ret;
  305. }
  306. #define OFFSET(x) offsetof(Context, x)
  307. #define D AV_OPT_FLAG_DECODING_PARAM
  308. static const AVOption options[] = {
  309. {NULL},
  310. };
  311. static const AVClass async_context_class = {
  312. .class_name = "Async",
  313. .item_name = av_default_item_name,
  314. .option = options,
  315. .version = LIBAVUTIL_VERSION_INT,
  316. };
  317. URLProtocol ff_async_protocol = {
  318. .name = "async",
  319. .url_open2 = async_open,
  320. .url_read = async_read,
  321. .url_seek = async_seek,
  322. .url_close = async_close,
  323. .priv_data_size = sizeof(Context),
  324. .priv_data_class = &async_context_class,
  325. };
  326. #ifdef TEST
  327. #define TEST_SEEK_POS (1536)
  328. #define TEST_STREAM_SIZE (2048)
  329. typedef struct TestContext {
  330. AVClass *class;
  331. size_t logical_pos;
  332. size_t logical_size;
  333. } TestContext;
  334. static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  335. {
  336. TestContext *c = h->priv_data;
  337. c->logical_pos = 0;
  338. c->logical_size = TEST_STREAM_SIZE;
  339. return 0;
  340. }
  341. static int async_test_close(URLContext *h)
  342. {
  343. return 0;
  344. }
  345. static int async_test_read(URLContext *h, unsigned char *buf, int size)
  346. {
  347. TestContext *c = h->priv_data;
  348. int i;
  349. int read_len = 0;
  350. if (c->logical_pos >= c->logical_size)
  351. return AVERROR_EOF;
  352. for (i = 0; i < size; ++i) {
  353. buf[i] = c->logical_pos & 0xFF;
  354. c->logical_pos++;
  355. read_len++;
  356. if (c->logical_pos >= c->logical_size)
  357. break;
  358. }
  359. return read_len;
  360. }
  361. static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
  362. {
  363. TestContext *c = h->priv_data;
  364. int64_t new_logical_pos;
  365. if (whence == AVSEEK_SIZE) {
  366. return c->logical_size;
  367. } else if (whence == SEEK_CUR) {
  368. new_logical_pos = pos + c->logical_pos;
  369. } else if (whence == SEEK_SET){
  370. new_logical_pos = pos;
  371. } else {
  372. return AVERROR(EINVAL);
  373. }
  374. if (new_logical_pos < 0)
  375. return AVERROR(EINVAL);
  376. c->logical_pos = new_logical_pos;
  377. return new_logical_pos;
  378. }
  379. static const AVClass async_test_context_class = {
  380. .class_name = "Async-Test",
  381. .item_name = av_default_item_name,
  382. .version = LIBAVUTIL_VERSION_INT,
  383. };
  384. URLProtocol ff_async_test_protocol = {
  385. .name = "async-test",
  386. .url_open2 = async_test_open,
  387. .url_read = async_test_read,
  388. .url_seek = async_test_seek,
  389. .url_close = async_test_close,
  390. .priv_data_size = sizeof(TestContext),
  391. .priv_data_class = &async_test_context_class,
  392. };
  393. int main(void)
  394. {
  395. URLContext *h = NULL;
  396. int i;
  397. int ret;
  398. int64_t size;
  399. int64_t pos;
  400. int64_t read_len;
  401. unsigned char buf[4096];
  402. ffurl_register_protocol(&ff_async_protocol);
  403. ffurl_register_protocol(&ff_async_test_protocol);
  404. ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
  405. printf("open: %d\n", ret);
  406. size = ffurl_size(h);
  407. printf("size: %"PRId64"\n", size);
  408. pos = ffurl_seek(h, 0, SEEK_CUR);
  409. read_len = 0;
  410. while (1) {
  411. ret = ffurl_read(h, buf, sizeof(buf));
  412. if (ret == AVERROR_EOF) {
  413. printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
  414. break;
  415. }
  416. else if (ret == 0)
  417. break;
  418. else if (ret < 0) {
  419. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  420. goto fail;
  421. } else {
  422. for (i = 0; i < ret; ++i) {
  423. if (buf[i] != (pos & 0xFF)) {
  424. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  425. (int)buf[i], (int)(pos & 0xFF), pos);
  426. break;
  427. }
  428. pos++;
  429. }
  430. }
  431. read_len += ret;
  432. }
  433. printf("read: %"PRId64"\n", read_len);
  434. ret = ffurl_read(h, buf, 1);
  435. printf("read: %d\n", ret);
  436. pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
  437. printf("seek: %"PRId64"\n", pos);
  438. read_len = 0;
  439. while (1) {
  440. ret = ffurl_read(h, buf, sizeof(buf));
  441. if (ret == AVERROR_EOF)
  442. break;
  443. else if (ret == 0)
  444. break;
  445. else if (ret < 0) {
  446. printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  447. goto fail;
  448. } else {
  449. for (i = 0; i < ret; ++i) {
  450. if (buf[i] != (pos & 0xFF)) {
  451. printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  452. (int)buf[i], (int)(pos & 0xFF), pos);
  453. break;
  454. }
  455. pos++;
  456. }
  457. }
  458. read_len += ret;
  459. }
  460. printf("read: %"PRId64"\n", read_len);
  461. ret = ffurl_read(h, buf, 1);
  462. printf("read: %d\n", ret);
  463. fail:
  464. ffurl_close(h);
  465. return 0;
  466. }
  467. #endif