You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

388 lines
11KB

  1. /*
  2. * Input async protocol.
  3. * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * Based on libavformat/cache.c by Michael Niedermayer
  22. */
  23. /**
  24. * @TODO
  25. * support timeout
  26. * support backward short seek
  27. * support work with concatdec, hls
  28. */
  29. #include "libavutil/avassert.h"
  30. #include "libavutil/avstring.h"
  31. #include "libavutil/error.h"
  32. #include "libavutil/fifo.h"
  33. #include "libavutil/log.h"
  34. #include "libavutil/opt.h"
  35. #include "url.h"
  36. #include <stdint.h>
  37. #include <pthread.h>
  38. #if HAVE_UNISTD_H
  39. #include <unistd.h>
  40. #endif
  41. #define BUFFER_CAPACITY (4 * 1024 * 1024)
  42. #define SHORT_SEEK_THRESHOLD (256 * 1024)
  43. typedef struct Context {
  44. AVClass *class;
  45. URLContext *inner;
  46. int seek_request;
  47. size_t seek_pos;
  48. int seek_whence;
  49. int seek_completed;
  50. int64_t seek_ret;
  51. int io_error;
  52. int io_eof_reached;
  53. size_t logical_pos;
  54. size_t logical_size;
  55. AVFifoBuffer *fifo;
  56. pthread_cond_t cond_wakeup_main;
  57. pthread_cond_t cond_wakeup_background;
  58. pthread_mutex_t mutex;
  59. pthread_t async_buffer_thread;
  60. int abort_request;
  61. AVIOInterruptCB interrupt_callback;
  62. } Context;
  63. static int async_interrupt_callback(void *arg)
  64. {
  65. URLContext *h = arg;
  66. Context *c = h->priv_data;
  67. int ret = 0;
  68. if (c->interrupt_callback.callback) {
  69. ret = c->interrupt_callback.callback(c->interrupt_callback.opaque);
  70. if (!ret)
  71. return ret;
  72. }
  73. return c->abort_request;
  74. }
  75. static void *async_buffer_task(void *arg)
  76. {
  77. URLContext *h = arg;
  78. Context *c = h->priv_data;
  79. AVFifoBuffer *fifo = c->fifo;
  80. int ret = 0;
  81. while (1) {
  82. int fifo_space, to_copy;
  83. if (async_interrupt_callback(h)) {
  84. c->io_eof_reached = 1;
  85. c->io_error = AVERROR_EXIT;
  86. break;
  87. }
  88. if (c->seek_request) {
  89. pthread_mutex_lock(&c->mutex);
  90. ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
  91. if (ret < 0) {
  92. c->io_eof_reached = 1;
  93. c->io_error = ret;
  94. } else {
  95. c->io_eof_reached = 0;
  96. c->io_error = 0;
  97. }
  98. c->seek_completed = 1;
  99. c->seek_ret = ret;
  100. c->seek_request = 0;
  101. av_fifo_reset(fifo);
  102. pthread_cond_signal(&c->cond_wakeup_main);
  103. pthread_mutex_unlock(&c->mutex);
  104. continue;
  105. }
  106. fifo_space = av_fifo_space(fifo);
  107. if (c->io_eof_reached || fifo_space <= 0) {
  108. pthread_mutex_lock(&c->mutex);
  109. pthread_cond_signal(&c->cond_wakeup_main);
  110. pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
  111. pthread_mutex_unlock(&c->mutex);
  112. continue;
  113. }
  114. to_copy = FFMIN(4096, fifo_space);
  115. ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
  116. if (ret <= 0) {
  117. c->io_eof_reached = 1;
  118. if (ret < 0) {
  119. c->io_error = ret;
  120. }
  121. }
  122. pthread_mutex_lock(&c->mutex);
  123. pthread_cond_signal(&c->cond_wakeup_main);
  124. pthread_mutex_unlock(&c->mutex);
  125. }
  126. return NULL;
  127. }
  128. static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  129. {
  130. Context *c = h->priv_data;
  131. int ret;
  132. AVIOInterruptCB interrupt_callback = {.callback = async_interrupt_callback, .opaque = h};
  133. av_strstart(arg, "async:", &arg);
  134. c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
  135. if (!c->fifo) {
  136. ret = AVERROR(ENOMEM);
  137. goto fifo_fail;
  138. }
  139. /* wrap interrupt callback */
  140. c->interrupt_callback = h->interrupt_callback;
  141. ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
  142. if (ret != 0) {
  143. av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", strerror(ret), arg);
  144. goto url_fail;
  145. }
  146. c->logical_size = ffurl_size(c->inner);
  147. h->is_streamed = c->inner->is_streamed;
  148. ret = pthread_mutex_init(&c->mutex, NULL);
  149. if (ret != 0) {
  150. av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
  151. goto mutex_fail;
  152. }
  153. ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
  154. if (ret != 0) {
  155. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  156. goto cond_wakeup_main_fail;
  157. }
  158. ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
  159. if (ret != 0) {
  160. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  161. goto cond_wakeup_background_fail;
  162. }
  163. ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
  164. if (ret) {
  165. av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
  166. goto thread_fail;
  167. }
  168. return 0;
  169. thread_fail:
  170. pthread_cond_destroy(&c->cond_wakeup_background);
  171. cond_wakeup_background_fail:
  172. pthread_cond_destroy(&c->cond_wakeup_main);
  173. cond_wakeup_main_fail:
  174. pthread_mutex_destroy(&c->mutex);
  175. mutex_fail:
  176. ffurl_close(c->inner);
  177. url_fail:
  178. av_fifo_freep(&c->fifo);
  179. fifo_fail:
  180. return ret;
  181. }
  182. static int async_close(URLContext *h)
  183. {
  184. Context *c = h->priv_data;
  185. int ret;
  186. pthread_mutex_lock(&c->mutex);
  187. c->abort_request = 1;
  188. pthread_cond_signal(&c->cond_wakeup_background);
  189. pthread_mutex_unlock(&c->mutex);
  190. ret = pthread_join(c->async_buffer_thread, NULL);
  191. if (ret != 0)
  192. av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
  193. pthread_cond_destroy(&c->cond_wakeup_background);
  194. pthread_cond_destroy(&c->cond_wakeup_main);
  195. pthread_mutex_destroy(&c->mutex);
  196. ffurl_close(c->inner);
  197. av_fifo_freep(&c->fifo);
  198. return 0;
  199. }
  200. static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
  201. void (*func)(void*, void*, int))
  202. {
  203. Context *c = h->priv_data;
  204. AVFifoBuffer *fifo = c->fifo;
  205. int to_read = size;
  206. int ret = 0;
  207. pthread_mutex_lock(&c->mutex);
  208. while (to_read > 0) {
  209. int fifo_size, to_copy;
  210. if (async_interrupt_callback(h)) {
  211. ret = AVERROR_EXIT;
  212. break;
  213. }
  214. fifo_size = av_fifo_size(fifo);
  215. to_copy = FFMIN(to_read, fifo_size);
  216. if (to_copy > 0) {
  217. av_fifo_generic_read(fifo, dest, to_copy, func);
  218. if (!func)
  219. dest = (uint8_t *)dest + to_copy;
  220. c->logical_pos += to_copy;
  221. to_read -= to_copy;
  222. ret = size - to_read;
  223. if (to_read <= 0 || !read_complete)
  224. break;
  225. } else if (c->io_eof_reached) {
  226. if (ret <= 0)
  227. ret = AVERROR_EOF;
  228. break;
  229. }
  230. pthread_cond_signal(&c->cond_wakeup_background);
  231. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  232. }
  233. pthread_cond_signal(&c->cond_wakeup_background);
  234. pthread_mutex_unlock(&c->mutex);
  235. return ret;
  236. }
  237. static int async_read(URLContext *h, unsigned char *buf, int size)
  238. {
  239. return async_read_internal(h, buf, size, 0, NULL);
  240. }
  241. static void fifo_do_not_copy_func(void* dest, void* src, int size) {
  242. // do not copy
  243. }
  244. static int64_t async_seek(URLContext *h, int64_t pos, int whence)
  245. {
  246. Context *c = h->priv_data;
  247. AVFifoBuffer *fifo = c->fifo;
  248. int64_t ret;
  249. int64_t new_logical_pos;
  250. int fifo_size;
  251. if (whence == AVSEEK_SIZE) {
  252. av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
  253. return c->logical_size;
  254. } else if (whence == SEEK_CUR) {
  255. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  256. new_logical_pos = pos + c->logical_pos;
  257. } else if (whence == SEEK_SET){
  258. av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  259. new_logical_pos = pos;
  260. } else {
  261. return AVERROR(EINVAL);
  262. }
  263. if (new_logical_pos < 0)
  264. return AVERROR(EINVAL);
  265. fifo_size = av_fifo_size(fifo);
  266. if (new_logical_pos == c->logical_pos) {
  267. /* current position */
  268. return c->logical_pos;
  269. } else if ((new_logical_pos > c->logical_pos) &&
  270. (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
  271. /* fast seek */
  272. av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
  273. new_logical_pos, (int)c->logical_pos,
  274. (int)(new_logical_pos - c->logical_pos), fifo_size);
  275. async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
  276. return c->logical_pos;
  277. } else if (c->logical_size <= 0) {
  278. /* can not seek */
  279. return AVERROR(EINVAL);
  280. } else if (new_logical_pos > c->logical_size) {
  281. /* beyond end */
  282. return AVERROR(EINVAL);
  283. }
  284. pthread_mutex_lock(&c->mutex);
  285. c->seek_request = 1;
  286. c->seek_pos = new_logical_pos;
  287. c->seek_whence = SEEK_SET;
  288. c->seek_completed = 0;
  289. c->seek_ret = 0;
  290. while (1) {
  291. if (async_interrupt_callback(h)) {
  292. ret = AVERROR_EXIT;
  293. break;
  294. }
  295. if (c->seek_completed) {
  296. if (c->seek_ret >= 0)
  297. c->logical_pos = c->seek_ret;
  298. ret = c->seek_ret;
  299. break;
  300. }
  301. pthread_cond_signal(&c->cond_wakeup_background);
  302. pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  303. }
  304. pthread_mutex_unlock(&c->mutex);
  305. return ret;
  306. }
  307. #define OFFSET(x) offsetof(Context, x)
  308. #define D AV_OPT_FLAG_DECODING_PARAM
  309. static const AVOption options[] = {
  310. {NULL},
  311. };
  312. static const AVClass async_context_class = {
  313. .class_name = "Async",
  314. .item_name = av_default_item_name,
  315. .option = options,
  316. .version = LIBAVUTIL_VERSION_INT,
  317. };
  318. URLProtocol ff_async_protocol = {
  319. .name = "async",
  320. .url_open2 = async_open,
  321. .url_read = async_read,
  322. .url_seek = async_seek,
  323. .url_close = async_close,
  324. .priv_data_size = sizeof(Context),
  325. .priv_data_class = &async_context_class,
  326. };