You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

260 lines
7.4KB

  1. /*
  2. * This file is part of FFmpeg.
  3. *
  4. * FFmpeg is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * FFmpeg is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with FFmpeg; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include <stdatomic.h>
  19. #include "slicethread.h"
  20. #include "mem.h"
  21. #include "thread.h"
  22. #include "avassert.h"
  23. #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
  24. typedef struct WorkerContext {
  25. AVSliceThread *ctx;
  26. pthread_mutex_t mutex;
  27. pthread_cond_t cond;
  28. pthread_t thread;
  29. int done;
  30. } WorkerContext;
  31. struct AVSliceThread {
  32. WorkerContext *workers;
  33. int nb_threads;
  34. int nb_active_threads;
  35. int nb_jobs;
  36. atomic_uint first_job;
  37. atomic_uint current_job;
  38. pthread_mutex_t done_mutex;
  39. pthread_cond_t done_cond;
  40. int done;
  41. int finished;
  42. void *priv;
  43. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
  44. void (*main_func)(void *priv);
  45. };
  46. static int run_jobs(AVSliceThread *ctx)
  47. {
  48. unsigned nb_jobs = ctx->nb_jobs;
  49. unsigned nb_active_threads = ctx->nb_active_threads;
  50. unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
  51. unsigned current_job = first_job;
  52. do {
  53. ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
  54. } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
  55. return current_job == nb_jobs + nb_active_threads - 1;
  56. }
  57. static void *attribute_align_arg thread_worker(void *v)
  58. {
  59. WorkerContext *w = v;
  60. AVSliceThread *ctx = w->ctx;
  61. pthread_mutex_lock(&w->mutex);
  62. pthread_cond_signal(&w->cond);
  63. while (1) {
  64. w->done = 1;
  65. while (w->done)
  66. pthread_cond_wait(&w->cond, &w->mutex);
  67. if (ctx->finished) {
  68. pthread_mutex_unlock(&w->mutex);
  69. return NULL;
  70. }
  71. if (run_jobs(ctx)) {
  72. pthread_mutex_lock(&ctx->done_mutex);
  73. ctx->done = 1;
  74. pthread_cond_signal(&ctx->done_cond);
  75. pthread_mutex_unlock(&ctx->done_mutex);
  76. }
  77. }
  78. }
  79. int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
  80. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
  81. void (*main_func)(void *priv),
  82. int nb_threads)
  83. {
  84. AVSliceThread *ctx;
  85. int nb_workers, i;
  86. #if HAVE_W32THREADS
  87. w32thread_init();
  88. #endif
  89. av_assert0(nb_threads >= 0);
  90. if (!nb_threads) {
  91. int nb_cpus = av_cpu_count();
  92. if (nb_cpus > 1)
  93. nb_threads = nb_cpus + 1;
  94. else
  95. nb_threads = 1;
  96. }
  97. nb_workers = nb_threads;
  98. if (!main_func)
  99. nb_workers--;
  100. *pctx = ctx = av_mallocz(sizeof(*ctx));
  101. if (!ctx)
  102. return AVERROR(ENOMEM);
  103. if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
  104. av_freep(pctx);
  105. return AVERROR(ENOMEM);
  106. }
  107. ctx->priv = priv;
  108. ctx->worker_func = worker_func;
  109. ctx->main_func = main_func;
  110. ctx->nb_threads = nb_threads;
  111. ctx->nb_active_threads = 0;
  112. ctx->nb_jobs = 0;
  113. ctx->finished = 0;
  114. atomic_init(&ctx->first_job, 0);
  115. atomic_init(&ctx->current_job, 0);
  116. pthread_mutex_init(&ctx->done_mutex, NULL);
  117. pthread_cond_init(&ctx->done_cond, NULL);
  118. ctx->done = 0;
  119. for (i = 0; i < nb_workers; i++) {
  120. WorkerContext *w = &ctx->workers[i];
  121. int ret;
  122. w->ctx = ctx;
  123. pthread_mutex_init(&w->mutex, NULL);
  124. pthread_cond_init(&w->cond, NULL);
  125. pthread_mutex_lock(&w->mutex);
  126. w->done = 0;
  127. if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
  128. ctx->nb_threads = main_func ? i : i + 1;
  129. pthread_mutex_unlock(&w->mutex);
  130. pthread_cond_destroy(&w->cond);
  131. pthread_mutex_destroy(&w->mutex);
  132. avpriv_slicethread_free(pctx);
  133. return AVERROR(ret);
  134. }
  135. while (!w->done)
  136. pthread_cond_wait(&w->cond, &w->mutex);
  137. pthread_mutex_unlock(&w->mutex);
  138. }
  139. return nb_threads;
  140. }
  141. void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
  142. {
  143. int nb_workers, i, is_last = 0;
  144. av_assert0(nb_jobs > 0);
  145. ctx->nb_jobs = nb_jobs;
  146. ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
  147. atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
  148. atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
  149. nb_workers = ctx->nb_active_threads;
  150. if (!ctx->main_func || !execute_main)
  151. nb_workers--;
  152. for (i = 0; i < nb_workers; i++) {
  153. WorkerContext *w = &ctx->workers[i];
  154. pthread_mutex_lock(&w->mutex);
  155. w->done = 0;
  156. pthread_cond_signal(&w->cond);
  157. pthread_mutex_unlock(&w->mutex);
  158. }
  159. if (ctx->main_func && execute_main)
  160. ctx->main_func(ctx->priv);
  161. else
  162. is_last = run_jobs(ctx);
  163. if (!is_last) {
  164. pthread_mutex_lock(&ctx->done_mutex);
  165. while (!ctx->done)
  166. pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
  167. ctx->done = 0;
  168. pthread_mutex_unlock(&ctx->done_mutex);
  169. }
  170. }
  171. void avpriv_slicethread_free(AVSliceThread **pctx)
  172. {
  173. AVSliceThread *ctx;
  174. int nb_workers, i;
  175. if (!pctx || !*pctx)
  176. return;
  177. ctx = *pctx;
  178. nb_workers = ctx->nb_threads;
  179. if (!ctx->main_func)
  180. nb_workers--;
  181. ctx->finished = 1;
  182. for (i = 0; i < nb_workers; i++) {
  183. WorkerContext *w = &ctx->workers[i];
  184. pthread_mutex_lock(&w->mutex);
  185. w->done = 0;
  186. pthread_cond_signal(&w->cond);
  187. pthread_mutex_unlock(&w->mutex);
  188. }
  189. for (i = 0; i < nb_workers; i++) {
  190. WorkerContext *w = &ctx->workers[i];
  191. pthread_join(w->thread, NULL);
  192. pthread_cond_destroy(&w->cond);
  193. pthread_mutex_destroy(&w->mutex);
  194. }
  195. pthread_cond_destroy(&ctx->done_cond);
  196. pthread_mutex_destroy(&ctx->done_mutex);
  197. av_freep(&ctx->workers);
  198. av_freep(pctx);
  199. }
  200. #else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
  201. int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
  202. void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
  203. void (*main_func)(void *priv),
  204. int nb_threads)
  205. {
  206. *pctx = NULL;
  207. return AVERROR(EINVAL);
  208. }
  209. void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
  210. {
  211. av_assert0(0);
  212. }
  213. void avpriv_slicethread_free(AVSliceThread **pctx)
  214. {
  215. av_assert0(!pctx || !*pctx);
  216. }
  217. #endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */