Subversion Repositories Kolibri OS

Rev

Go to most recent revision | Blame | Last modification | View Log | RSS feed

  1. /*
  2.  *
  3.  * This file is part of FFmpeg.
  4.  *
  5.  * FFmpeg is free software; you can redistribute it and/or
  6.  * modify it under the terms of the GNU Lesser General Public
  7.  * License as published by the Free Software Foundation; either
  8.  * version 2.1 of the License, or (at your option) any later version.
  9.  *
  10.  * FFmpeg is distributed in the hope that it will be useful,
  11.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  13.  * Lesser General Public License for more details.
  14.  *
  15.  * You should have received a copy of the GNU Lesser General Public
  16.  * License along with FFmpeg; if not, write to the Free Software
  17.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18.  */
  19.  
  20. /**
  21.  * @file
  22.  * Libavfilter multithreading support
  23.  */
  24.  
  25. #include "config.h"
  26.  
  27. #include "libavutil/common.h"
  28. #include "libavutil/cpu.h"
  29. #include "libavutil/mem.h"
  30.  
  31. #include "avfilter.h"
  32. #include "internal.h"
  33. #include "thread.h"
  34.  
  35. #if HAVE_PTHREADS
  36. #include <pthread.h>
  37. #elif HAVE_OS2THREADS
  38. #include "compat/os2threads.h"
  39. #elif HAVE_W32THREADS
  40. #include "compat/w32pthreads.h"
  41. #endif
  42.  
  43. typedef struct ThreadContext {
  44.     AVFilterGraph *graph;
  45.  
  46.     int nb_threads;
  47.     pthread_t *workers;
  48.     avfilter_action_func *func;
  49.  
  50.     /* per-execute perameters */
  51.     AVFilterContext *ctx;
  52.     void *arg;
  53.     int   *rets;
  54.     int nb_rets;
  55.     int nb_jobs;
  56.  
  57.     pthread_cond_t last_job_cond;
  58.     pthread_cond_t current_job_cond;
  59.     pthread_mutex_t current_job_lock;
  60.     int current_job;
  61.     unsigned int current_execute;
  62.     int done;
  63. } ThreadContext;
  64.  
  65. static void* attribute_align_arg worker(void *v)
  66. {
  67.     ThreadContext *c = v;
  68.     int our_job      = c->nb_jobs;
  69.     int nb_threads   = c->nb_threads;
  70.     unsigned int last_execute = 0;
  71.     int self_id;
  72.  
  73.     pthread_mutex_lock(&c->current_job_lock);
  74.     self_id = c->current_job++;
  75.     for (;;) {
  76.         while (our_job >= c->nb_jobs) {
  77.             if (c->current_job == nb_threads + c->nb_jobs)
  78.                 pthread_cond_signal(&c->last_job_cond);
  79.  
  80.             while (last_execute == c->current_execute && !c->done)
  81.                 pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
  82.             last_execute = c->current_execute;
  83.             our_job = self_id;
  84.  
  85.             if (c->done) {
  86.                 pthread_mutex_unlock(&c->current_job_lock);
  87.                 return NULL;
  88.             }
  89.         }
  90.         pthread_mutex_unlock(&c->current_job_lock);
  91.  
  92.         c->rets[our_job % c->nb_rets] = c->func(c->ctx, c->arg, our_job, c->nb_jobs);
  93.  
  94.         pthread_mutex_lock(&c->current_job_lock);
  95.         our_job = c->current_job++;
  96.     }
  97. }
  98.  
  99. static void slice_thread_uninit(ThreadContext *c)
  100. {
  101.     int i;
  102.  
  103.     pthread_mutex_lock(&c->current_job_lock);
  104.     c->done = 1;
  105.     pthread_cond_broadcast(&c->current_job_cond);
  106.     pthread_mutex_unlock(&c->current_job_lock);
  107.  
  108.     for (i = 0; i < c->nb_threads; i++)
  109.          pthread_join(c->workers[i], NULL);
  110.  
  111.     pthread_mutex_destroy(&c->current_job_lock);
  112.     pthread_cond_destroy(&c->current_job_cond);
  113.     pthread_cond_destroy(&c->last_job_cond);
  114.     av_freep(&c->workers);
  115. }
  116.  
  117. static void slice_thread_park_workers(ThreadContext *c)
  118. {
  119.     while (c->current_job != c->nb_threads + c->nb_jobs)
  120.         pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
  121.     pthread_mutex_unlock(&c->current_job_lock);
  122. }
  123.  
  124. static int thread_execute(AVFilterContext *ctx, avfilter_action_func *func,
  125.                           void *arg, int *ret, int nb_jobs)
  126. {
  127.     ThreadContext *c = ctx->graph->internal->thread;
  128.     int dummy_ret;
  129.  
  130.     if (nb_jobs <= 0)
  131.         return 0;
  132.  
  133.     pthread_mutex_lock(&c->current_job_lock);
  134.  
  135.     c->current_job = c->nb_threads;
  136.     c->nb_jobs     = nb_jobs;
  137.     c->ctx         = ctx;
  138.     c->arg         = arg;
  139.     c->func        = func;
  140.     if (ret) {
  141.         c->rets    = ret;
  142.         c->nb_rets = nb_jobs;
  143.     } else {
  144.         c->rets    = &dummy_ret;
  145.         c->nb_rets = 1;
  146.     }
  147.     c->current_execute++;
  148.  
  149.     pthread_cond_broadcast(&c->current_job_cond);
  150.  
  151.     slice_thread_park_workers(c);
  152.  
  153.     return 0;
  154. }
  155.  
  156. static int thread_init_internal(ThreadContext *c, int nb_threads)
  157. {
  158.     int i, ret;
  159.  
  160.     if (!nb_threads) {
  161.         int nb_cpus = av_cpu_count();
  162.         // use number of cores + 1 as thread count if there is more than one
  163.         if (nb_cpus > 1)
  164.             nb_threads = nb_cpus + 1;
  165.         else
  166.             nb_threads = 1;
  167.     }
  168.  
  169.     if (nb_threads <= 1)
  170.         return 1;
  171.  
  172.     c->nb_threads = nb_threads;
  173.     c->workers = av_mallocz(sizeof(*c->workers) * nb_threads);
  174.     if (!c->workers)
  175.         return AVERROR(ENOMEM);
  176.  
  177.     c->current_job = 0;
  178.     c->nb_jobs     = 0;
  179.     c->done        = 0;
  180.  
  181.     pthread_cond_init(&c->current_job_cond, NULL);
  182.     pthread_cond_init(&c->last_job_cond,    NULL);
  183.  
  184.     pthread_mutex_init(&c->current_job_lock, NULL);
  185.     pthread_mutex_lock(&c->current_job_lock);
  186.     for (i = 0; i < nb_threads; i++) {
  187.         ret = pthread_create(&c->workers[i], NULL, worker, c);
  188.         if (ret) {
  189.            pthread_mutex_unlock(&c->current_job_lock);
  190.            c->nb_threads = i;
  191.            slice_thread_uninit(c);
  192.            return AVERROR(ret);
  193.         }
  194.     }
  195.  
  196.     slice_thread_park_workers(c);
  197.  
  198.     return c->nb_threads;
  199. }
  200.  
  201. int ff_graph_thread_init(AVFilterGraph *graph)
  202. {
  203.     int ret;
  204.  
  205. #if HAVE_W32THREADS
  206.     w32thread_init();
  207. #endif
  208.  
  209.     if (graph->nb_threads == 1) {
  210.         graph->thread_type = 0;
  211.         return 0;
  212.     }
  213.  
  214.     graph->internal->thread = av_mallocz(sizeof(ThreadContext));
  215.     if (!graph->internal->thread)
  216.         return AVERROR(ENOMEM);
  217.  
  218.     ret = thread_init_internal(graph->internal->thread, graph->nb_threads);
  219.     if (ret <= 1) {
  220.         av_freep(&graph->internal->thread);
  221.         graph->thread_type = 0;
  222.         graph->nb_threads  = 1;
  223.         return (ret < 0) ? ret : 0;
  224.     }
  225.     graph->nb_threads = ret;
  226.  
  227.     graph->internal->thread_execute = thread_execute;
  228.  
  229.     return 0;
  230. }
  231.  
  232. void ff_graph_thread_free(AVFilterGraph *graph)
  233. {
  234.     if (graph->internal->thread)
  235.         slice_thread_uninit(graph->internal->thread);
  236.     av_freep(&graph->internal->thread);
  237. }
  238.