Subversion Repositories Kolibri OS

Rev

Blame | Last modification | View Log | RSS feed

  1. /*
  2.  * Input async protocol.
  3.  * Copyright (c) 2015 Zhang Rui <bbcallen@gmail.com>
  4.  *
  5.  * This file is part of FFmpeg.
  6.  *
  7.  * FFmpeg is free software; you can redistribute it and/or
  8.  * modify it under the terms of the GNU Lesser General Public
  9.  * License as published by the Free Software Foundation; either
  10.  * version 2.1 of the License, or (at your option) any later version.
  11.  *
  12.  * FFmpeg is distributed in the hope that it will be useful,
  13.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15.  * Lesser General Public License for more details.
  16.  *
  17.  * You should have received a copy of the GNU Lesser General Public
  18.  * License along with FFmpeg; if not, write to the Free Software
  19.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20.  *
  21.  * Based on libavformat/cache.c by Michael Niedermayer
  22.  */
  23.  
  24.  /**
  25.  * @TODO
  26.  *      support timeout
  27.  *      support backward short seek
  28.  *      support work with concatdec, hls
  29.  */
  30.  
  31. #include "libavutil/avassert.h"
  32. #include "libavutil/avstring.h"
  33. #include "libavutil/error.h"
  34. #include "libavutil/fifo.h"
  35. #include "libavutil/log.h"
  36. #include "libavutil/opt.h"
  37. #include "url.h"
  38. #include <stdint.h>
  39. #include <pthread.h>
  40.  
  41. #if HAVE_UNISTD_H
  42. #include <unistd.h>
  43. #endif
  44.  
  /* Size in bytes (4 MiB) of the FIFO allocated by async_open(). */
  #define BUFFER_CAPACITY         (4 << 20)
  /* Extra distance in bytes (256 KiB) beyond the buffered data within which
   * async_seek() serves a forward seek by reading ahead instead of seeking
   * the inner protocol. */
  #define SHORT_SEEK_THRESHOLD    (256 << 10)
  47.  
  48. typedef struct Context {
  49.     AVClass        *class;
  50.     URLContext     *inner;
  51.  
  52.     int             seek_request;
  53.     size_t          seek_pos;
  54.     int             seek_whence;
  55.     int             seek_completed;
  56.     int64_t         seek_ret;
  57.  
  58.     int             io_error;
  59.     int             io_eof_reached;
  60.  
  61.     size_t          logical_pos;
  62.     size_t          logical_size;
  63.     AVFifoBuffer   *fifo;
  64.  
  65.     pthread_cond_t  cond_wakeup_main;
  66.     pthread_cond_t  cond_wakeup_background;
  67.     pthread_mutex_t mutex;
  68.     pthread_t       async_buffer_thread;
  69.  
  70.     int             abort_request;
  71.     AVIOInterruptCB interrupt_callback;
  72. } Context;
  73.  
  74. static int async_check_interrupt(void *arg)
  75. {
  76.     URLContext *h   = arg;
  77.     Context    *c   = h->priv_data;
  78.  
  79.     if (c->abort_request)
  80.         return 1;
  81.  
  82.     if (ff_check_interrupt(&c->interrupt_callback))
  83.         c->abort_request = 1;
  84.  
  85.     return c->abort_request;
  86. }
  87.  
  88. static void *async_buffer_task(void *arg)
  89. {
  90.     URLContext   *h    = arg;
  91.     Context      *c    = h->priv_data;
  92.     AVFifoBuffer *fifo = c->fifo;
  93.     int           ret  = 0;
  94.  
  95.     while (1) {
  96.         int fifo_space, to_copy;
  97.  
  98.         pthread_mutex_lock(&c->mutex);
  99.         if (async_check_interrupt(h)) {
  100.             c->io_eof_reached = 1;
  101.             c->io_error       = AVERROR_EXIT;
  102.             pthread_cond_signal(&c->cond_wakeup_main);
  103.             pthread_mutex_unlock(&c->mutex);
  104.             break;
  105.         }
  106.  
  107.         if (c->seek_request) {
  108.             ret = ffurl_seek(c->inner, c->seek_pos, c->seek_whence);
  109.             if (ret < 0) {
  110.                 c->io_eof_reached = 1;
  111.                 c->io_error       = ret;
  112.             } else {
  113.                 c->io_eof_reached = 0;
  114.                 c->io_error       = 0;
  115.             }
  116.  
  117.             c->seek_completed = 1;
  118.             c->seek_ret       = ret;
  119.             c->seek_request   = 0;
  120.  
  121.             av_fifo_reset(fifo);
  122.  
  123.             pthread_cond_signal(&c->cond_wakeup_main);
  124.             pthread_mutex_unlock(&c->mutex);
  125.             continue;
  126.         }
  127.  
  128.         fifo_space = av_fifo_space(fifo);
  129.         if (c->io_eof_reached || fifo_space <= 0) {
  130.             pthread_cond_signal(&c->cond_wakeup_main);
  131.             pthread_cond_wait(&c->cond_wakeup_background, &c->mutex);
  132.             pthread_mutex_unlock(&c->mutex);
  133.             continue;
  134.         }
  135.         pthread_mutex_unlock(&c->mutex);
  136.  
  137.         to_copy = FFMIN(4096, fifo_space);
  138.         ret = av_fifo_generic_write(fifo, c->inner, to_copy, (void *)ffurl_read);
  139.  
  140.         pthread_mutex_lock(&c->mutex);
  141.         if (ret <= 0) {
  142.             c->io_eof_reached = 1;
  143.             if (ret < 0) {
  144.                 c->io_error = ret;
  145.             }
  146.         }
  147.  
  148.         pthread_cond_signal(&c->cond_wakeup_main);
  149.         pthread_mutex_unlock(&c->mutex);
  150.     }
  151.  
  152.     return NULL;
  153. }
  154.  
  155. static int async_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  156. {
  157.     Context         *c = h->priv_data;
  158.     int              ret;
  159.     AVIOInterruptCB  interrupt_callback = {.callback = async_check_interrupt, .opaque = h};
  160.  
  161.     av_strstart(arg, "async:", &arg);
  162.  
  163.     c->fifo = av_fifo_alloc(BUFFER_CAPACITY);
  164.     if (!c->fifo) {
  165.         ret = AVERROR(ENOMEM);
  166.         goto fifo_fail;
  167.     }
  168.  
  169.     /* wrap interrupt callback */
  170.     c->interrupt_callback = h->interrupt_callback;
  171.     ret = ffurl_open(&c->inner, arg, flags, &interrupt_callback, options);
  172.     if (ret != 0) {
  173.         av_log(h, AV_LOG_ERROR, "ffurl_open failed : %s, %s\n", av_err2str(ret), arg);
  174.         goto url_fail;
  175.     }
  176.  
  177.     c->logical_size = ffurl_size(c->inner);
  178.     h->is_streamed  = c->inner->is_streamed;
  179.  
  180.     ret = pthread_mutex_init(&c->mutex, NULL);
  181.     if (ret != 0) {
  182.         av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", av_err2str(ret));
  183.         goto mutex_fail;
  184.     }
  185.  
  186.     ret = pthread_cond_init(&c->cond_wakeup_main, NULL);
  187.     if (ret != 0) {
  188.         av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
  189.         goto cond_wakeup_main_fail;
  190.     }
  191.  
  192.     ret = pthread_cond_init(&c->cond_wakeup_background, NULL);
  193.     if (ret != 0) {
  194.         av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", av_err2str(ret));
  195.         goto cond_wakeup_background_fail;
  196.     }
  197.  
  198.     ret = pthread_create(&c->async_buffer_thread, NULL, async_buffer_task, h);
  199.     if (ret) {
  200.         av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", av_err2str(ret));
  201.         goto thread_fail;
  202.     }
  203.  
  204.     return 0;
  205.  
  206. thread_fail:
  207.     pthread_cond_destroy(&c->cond_wakeup_background);
  208. cond_wakeup_background_fail:
  209.     pthread_cond_destroy(&c->cond_wakeup_main);
  210. cond_wakeup_main_fail:
  211.     pthread_mutex_destroy(&c->mutex);
  212. mutex_fail:
  213.     ffurl_close(c->inner);
  214. url_fail:
  215.     av_fifo_freep(&c->fifo);
  216. fifo_fail:
  217.     return ret;
  218. }
  219.  
  220. static int async_close(URLContext *h)
  221. {
  222.     Context *c = h->priv_data;
  223.     int      ret;
  224.  
  225.     pthread_mutex_lock(&c->mutex);
  226.     c->abort_request = 1;
  227.     pthread_cond_signal(&c->cond_wakeup_background);
  228.     pthread_mutex_unlock(&c->mutex);
  229.  
  230.     ret = pthread_join(c->async_buffer_thread, NULL);
  231.     if (ret != 0)
  232.         av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", av_err2str(ret));
  233.  
  234.     pthread_cond_destroy(&c->cond_wakeup_background);
  235.     pthread_cond_destroy(&c->cond_wakeup_main);
  236.     pthread_mutex_destroy(&c->mutex);
  237.     ffurl_close(c->inner);
  238.     av_fifo_freep(&c->fifo);
  239.  
  240.     return 0;
  241. }
  242.  
  243. static int async_read_internal(URLContext *h, void *dest, int size, int read_complete,
  244.                                void (*func)(void*, void*, int))
  245. {
  246.     Context      *c       = h->priv_data;
  247.     AVFifoBuffer *fifo    = c->fifo;
  248.     int           to_read = size;
  249.     int           ret     = 0;
  250.  
  251.     pthread_mutex_lock(&c->mutex);
  252.  
  253.     while (to_read > 0) {
  254.         int fifo_size, to_copy;
  255.         if (async_check_interrupt(h)) {
  256.             ret = AVERROR_EXIT;
  257.             break;
  258.         }
  259.         fifo_size = av_fifo_size(fifo);
  260.         to_copy   = FFMIN(to_read, fifo_size);
  261.         if (to_copy > 0) {
  262.             av_fifo_generic_read(fifo, dest, to_copy, func);
  263.             if (!func)
  264.                 dest = (uint8_t *)dest + to_copy;
  265.             c->logical_pos += to_copy;
  266.             to_read        -= to_copy;
  267.             ret             = size - to_read;
  268.  
  269.             if (to_read <= 0 || !read_complete)
  270.                 break;
  271.         } else if (c->io_eof_reached) {
  272.             if (ret <= 0)
  273.                 ret = AVERROR_EOF;
  274.             break;
  275.         }
  276.         pthread_cond_signal(&c->cond_wakeup_background);
  277.         pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  278.     }
  279.  
  280.     pthread_cond_signal(&c->cond_wakeup_background);
  281.     pthread_mutex_unlock(&c->mutex);
  282.  
  283.     return ret;
  284. }
  285.  
  286. static int async_read(URLContext *h, unsigned char *buf, int size)
  287. {
  288.     return async_read_internal(h, buf, size, 0, NULL);
  289. }
  290.  
  /**
   * FIFO read callback that discards the data: async_seek() passes it to
   * async_read_internal() to skip forward without copying anything.
   */
  static void fifo_do_not_copy_func(void* dest, void* src, int size) {
      /* intentionally empty: neither buffer is touched */
      (void)dest;
      (void)src;
      (void)size;
  }
  294.  
  295. static int64_t async_seek(URLContext *h, int64_t pos, int whence)
  296. {
  297.     Context      *c    = h->priv_data;
  298.     AVFifoBuffer *fifo = c->fifo;
  299.     int64_t       ret;
  300.     int64_t       new_logical_pos;
  301.     int fifo_size;
  302.  
  303.     if (whence == AVSEEK_SIZE) {
  304.         av_log(h, AV_LOG_TRACE, "async_seek: AVSEEK_SIZE: %"PRId64"\n", (int64_t)c->logical_size);
  305.         return c->logical_size;
  306.     } else if (whence == SEEK_CUR) {
  307.         av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  308.         new_logical_pos = pos + c->logical_pos;
  309.     } else if (whence == SEEK_SET){
  310.         av_log(h, AV_LOG_TRACE, "async_seek: %"PRId64"\n", pos);
  311.         new_logical_pos = pos;
  312.     } else {
  313.         return AVERROR(EINVAL);
  314.     }
  315.     if (new_logical_pos < 0)
  316.         return AVERROR(EINVAL);
  317.  
  318.     fifo_size = av_fifo_size(fifo);
  319.     if (new_logical_pos == c->logical_pos) {
  320.         /* current position */
  321.         return c->logical_pos;
  322.     } else if ((new_logical_pos > c->logical_pos) &&
  323.                (new_logical_pos < (c->logical_pos + fifo_size + SHORT_SEEK_THRESHOLD))) {
  324.         /* fast seek */
  325.         av_log(h, AV_LOG_TRACE, "async_seek: fask_seek %"PRId64" from %d dist:%d/%d\n",
  326.                 new_logical_pos, (int)c->logical_pos,
  327.                 (int)(new_logical_pos - c->logical_pos), fifo_size);
  328.         async_read_internal(h, NULL, new_logical_pos - c->logical_pos, 1, fifo_do_not_copy_func);
  329.         return c->logical_pos;
  330.     } else if (c->logical_size <= 0) {
  331.         /* can not seek */
  332.         return AVERROR(EINVAL);
  333.     } else if (new_logical_pos > c->logical_size) {
  334.         /* beyond end */
  335.         return AVERROR(EINVAL);
  336.     }
  337.  
  338.     pthread_mutex_lock(&c->mutex);
  339.  
  340.     c->seek_request   = 1;
  341.     c->seek_pos       = new_logical_pos;
  342.     c->seek_whence    = SEEK_SET;
  343.     c->seek_completed = 0;
  344.     c->seek_ret       = 0;
  345.  
  346.     while (1) {
  347.         if (async_check_interrupt(h)) {
  348.             ret = AVERROR_EXIT;
  349.             break;
  350.         }
  351.         if (c->seek_completed) {
  352.             if (c->seek_ret >= 0)
  353.                 c->logical_pos  = c->seek_ret;
  354.             ret = c->seek_ret;
  355.             break;
  356.         }
  357.         pthread_cond_signal(&c->cond_wakeup_background);
  358.         pthread_cond_wait(&c->cond_wakeup_main, &c->mutex);
  359.     }
  360.  
  361.     pthread_mutex_unlock(&c->mutex);
  362.  
  363.     return ret;
  364. }
  365.  
  366. #define OFFSET(x) offsetof(Context, x)
  367. #define D AV_OPT_FLAG_DECODING_PARAM
  368.  
  369. static const AVOption options[] = {
  370.     {NULL},
  371. };
  372.  
  373. static const AVClass async_context_class = {
  374.     .class_name = "Async",
  375.     .item_name  = av_default_item_name,
  376.     .option     = options,
  377.     .version    = LIBAVUTIL_VERSION_INT,
  378. };
  379.  
  380. URLProtocol ff_async_protocol = {
  381.     .name                = "async",
  382.     .url_open2           = async_open,
  383.     .url_read            = async_read,
  384.     .url_seek            = async_seek,
  385.     .url_close           = async_close,
  386.     .priv_data_size      = sizeof(Context),
  387.     .priv_data_class     = &async_context_class,
  388. };
  389.  
  390. #ifdef TEST
  391.  
  /* Offset the self-test seeks to before its second read pass. */
  #define TEST_SEEK_POS    1536
  /* Number of bytes served by the synthetic "async-test" protocol. */
  #define TEST_STREAM_SIZE 2048
  394.  
  395. typedef struct TestContext {
  396.     AVClass        *class;
  397.     size_t          logical_pos;
  398.     size_t          logical_size;
  399. } TestContext;
  400.  
  401. static int async_test_open(URLContext *h, const char *arg, int flags, AVDictionary **options)
  402. {
  403.     TestContext *c = h->priv_data;
  404.     c->logical_pos  = 0;
  405.     c->logical_size = TEST_STREAM_SIZE;
  406.     return 0;
  407. }
  408.  
  409. static int async_test_close(URLContext *h)
  410. {
  411.     return 0;
  412. }
  413.  
  414. static int async_test_read(URLContext *h, unsigned char *buf, int size)
  415. {
  416.     TestContext *c = h->priv_data;
  417.     int          i;
  418.     int          read_len = 0;
  419.  
  420.     if (c->logical_pos >= c->logical_size)
  421.         return AVERROR_EOF;
  422.  
  423.     for (i = 0; i < size; ++i) {
  424.         buf[i] = c->logical_pos & 0xFF;
  425.  
  426.         c->logical_pos++;
  427.         read_len++;
  428.  
  429.         if (c->logical_pos >= c->logical_size)
  430.             break;
  431.     }
  432.  
  433.     return read_len;
  434. }
  435.  
  436. static int64_t async_test_seek(URLContext *h, int64_t pos, int whence)
  437. {
  438.     TestContext *c = h->priv_data;
  439.     int64_t      new_logical_pos;
  440.  
  441.     if (whence == AVSEEK_SIZE) {
  442.         return c->logical_size;
  443.     } else if (whence == SEEK_CUR) {
  444.         new_logical_pos = pos + c->logical_pos;
  445.     } else if (whence == SEEK_SET){
  446.         new_logical_pos = pos;
  447.     } else {
  448.         return AVERROR(EINVAL);
  449.     }
  450.     if (new_logical_pos < 0)
  451.         return AVERROR(EINVAL);
  452.  
  453.     c->logical_pos = new_logical_pos;
  454.     return new_logical_pos;
  455. }
  456.  
  457. static const AVClass async_test_context_class = {
  458.     .class_name = "Async-Test",
  459.     .item_name  = av_default_item_name,
  460.     .version    = LIBAVUTIL_VERSION_INT,
  461. };
  462.  
  463. URLProtocol ff_async_test_protocol = {
  464.     .name                = "async-test",
  465.     .url_open2           = async_test_open,
  466.     .url_read            = async_test_read,
  467.     .url_seek            = async_test_seek,
  468.     .url_close           = async_test_close,
  469.     .priv_data_size      = sizeof(TestContext),
  470.     .priv_data_class     = &async_test_context_class,
  471. };
  472.  
  473. int main(void)
  474. {
  475.     URLContext   *h = NULL;
  476.     int           i;
  477.     int           ret;
  478.     int64_t       size;
  479.     int64_t       pos;
  480.     int64_t       read_len;
  481.     unsigned char buf[4096];
  482.  
  483.     ffurl_register_protocol(&ff_async_protocol);
  484.     ffurl_register_protocol(&ff_async_test_protocol);
  485.  
  486.     ret = ffurl_open(&h, "async:async-test:", AVIO_FLAG_READ, NULL, NULL);
  487.     printf("open: %d\n", ret);
  488.  
  489.     size = ffurl_size(h);
  490.     printf("size: %"PRId64"\n", size);
  491.  
  492.     pos = ffurl_seek(h, 0, SEEK_CUR);
  493.     read_len = 0;
  494.     while (1) {
  495.         ret = ffurl_read(h, buf, sizeof(buf));
  496.         if (ret == AVERROR_EOF) {
  497.             printf("read-error: AVERROR_EOF at %"PRId64"\n", ffurl_seek(h, 0, SEEK_CUR));
  498.             break;
  499.         }
  500.         else if (ret == 0)
  501.             break;
  502.         else if (ret < 0) {
  503.             printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  504.             goto fail;
  505.         } else {
  506.             for (i = 0; i < ret; ++i) {
  507.                 if (buf[i] != (pos & 0xFF)) {
  508.                     printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  509.                            (int)buf[i], (int)(pos & 0xFF), pos);
  510.                     break;
  511.                 }
  512.                 pos++;
  513.             }
  514.         }
  515.  
  516.         read_len += ret;
  517.     }
  518.     printf("read: %"PRId64"\n", read_len);
  519.  
  520.     ret = ffurl_read(h, buf, 1);
  521.     printf("read: %d\n", ret);
  522.  
  523.     pos = ffurl_seek(h, TEST_SEEK_POS, SEEK_SET);
  524.     printf("seek: %"PRId64"\n", pos);
  525.  
  526.     read_len = 0;
  527.     while (1) {
  528.         ret = ffurl_read(h, buf, sizeof(buf));
  529.         if (ret == AVERROR_EOF)
  530.             break;
  531.         else if (ret == 0)
  532.             break;
  533.         else if (ret < 0) {
  534.             printf("read-error: %d at %"PRId64"\n", ret, ffurl_seek(h, 0, SEEK_CUR));
  535.             goto fail;
  536.         } else {
  537.             for (i = 0; i < ret; ++i) {
  538.                 if (buf[i] != (pos & 0xFF)) {
  539.                     printf("read-mismatch: actual %d, expecting %d, at %"PRId64"\n",
  540.                            (int)buf[i], (int)(pos & 0xFF), pos);
  541.                     break;
  542.                 }
  543.                 pos++;
  544.             }
  545.         }
  546.  
  547.         read_len += ret;
  548.     }
  549.     printf("read: %"PRId64"\n", read_len);
  550.  
  551.     ret = ffurl_read(h, buf, 1);
  552.     printf("read: %d\n", ret);
  553.  
  554. fail:
  555.     ffurl_close(h);
  556.     return 0;
  557. }
  558.  
  559. #endif
  560.