Subversion Repositories Kolibri OS

Rev

Go to most recent revision | Blame | Last modification | View Log | RSS feed

  1. /*
  2.  * Copyright (c) 2013 Stefano Sabatini
  3.  *
  4.  * This file is part of FFmpeg.
  5.  *
  6.  * FFmpeg is free software; you can redistribute it and/or
  7.  * modify it under the terms of the GNU Lesser General Public
  8.  * License as published by the Free Software Foundation; either
  9.  * version 2.1 of the License, or (at your option) any later version.
  10.  *
  11.  * FFmpeg is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14.  * Lesser General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU Lesser General Public
  17.  * License along with FFmpeg; if not, write to the Free Software
  18.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19.  */
  20.  
  21. /**
  22.  * @file
  23.  * receive commands through libzeromq and broker them to filters
  24.  */
  25.  
  26. #include <zmq.h>
  27. #include "libavutil/avstring.h"
  28. #include "libavutil/bprint.h"
  29. #include "libavutil/opt.h"
  30. #include "avfilter.h"
  31. #include "internal.h"
  32. #include "avfiltergraph.h"
  33. #include "audio.h"
  34. #include "video.h"
  35.  
  36. typedef struct {
  37.     const AVClass *class;
  38.     void *zmq;
  39.     void *responder;
  40.     char *bind_address;
  41.     int command_count;
  42. } ZMQContext;
  43.  
  44. #define OFFSET(x) offsetof(ZMQContext, x)
  45. #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
  46. static const AVOption options[] = {
  47.     { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
  48.     { "b",            "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
  49.     { NULL }
  50. };
  51.  
  52. static av_cold int init(AVFilterContext *ctx)
  53. {
  54.     ZMQContext *zmq = ctx->priv;
  55.  
  56.     zmq->zmq = zmq_ctx_new();
  57.     if (!zmq->zmq) {
  58.         av_log(ctx, AV_LOG_ERROR,
  59.                "Could not create ZMQ context: %s\n", zmq_strerror(errno));
  60.         return AVERROR_EXTERNAL;
  61.     }
  62.  
  63.     zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
  64.     if (!zmq->responder) {
  65.         av_log(ctx, AV_LOG_ERROR,
  66.                "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
  67.         return AVERROR_EXTERNAL;
  68.     }
  69.  
  70.     if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
  71.         av_log(ctx, AV_LOG_ERROR,
  72.                "Could not bind ZMQ socket to address '%s': %s\n",
  73.                zmq->bind_address, zmq_strerror(errno));
  74.         return AVERROR_EXTERNAL;
  75.     }
  76.  
  77.     zmq->command_count = -1;
  78.     return 0;
  79. }
  80.  
  81. static void av_cold uninit(AVFilterContext *ctx)
  82. {
  83.     ZMQContext *zmq = ctx->priv;
  84.  
  85.     zmq_close(zmq->responder);
  86.     zmq_ctx_destroy(zmq->zmq);
  87. }
  88.  
  89. typedef struct {
  90.     char *target, *command, *arg;
  91. } Command;
  92.  
  93. #define SPACES " \f\t\n\r"
  94.  
  95. static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
  96. {
  97.     const char **buf = &command_str;
  98.  
  99.     cmd->target = av_get_token(buf, SPACES);
  100.     if (!cmd->target || !cmd->target[0]) {
  101.         av_log(log_ctx, AV_LOG_ERROR,
  102.                "No target specified in command '%s'\n", command_str);
  103.         return AVERROR(EINVAL);
  104.     }
  105.  
  106.     cmd->command = av_get_token(buf, SPACES);
  107.     if (!cmd->command || !cmd->command[0]) {
  108.         av_log(log_ctx, AV_LOG_ERROR,
  109.                "No command specified in command '%s'\n", command_str);
  110.         return AVERROR(EINVAL);
  111.     }
  112.  
  113.     cmd->arg = av_get_token(buf, SPACES);
  114.     return 0;
  115. }
  116.  
  117. static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
  118. {
  119.     ZMQContext *zmq = ctx->priv;
  120.     zmq_msg_t msg;
  121.     int ret = 0;
  122.  
  123.     if (zmq_msg_init(&msg) == -1) {
  124.         av_log(ctx, AV_LOG_WARNING,
  125.                "Could not initialize receive message: %s\n", zmq_strerror(errno));
  126.         return AVERROR_EXTERNAL;
  127.     }
  128.  
  129.     if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
  130.         if (errno != EAGAIN)
  131.             av_log(ctx, AV_LOG_WARNING,
  132.                    "Could not receive message: %s\n", zmq_strerror(errno));
  133.         ret = AVERROR_EXTERNAL;
  134.         goto end;
  135.     }
  136.  
  137.     *buf_size = zmq_msg_size(&msg) + 1;
  138.     *buf = av_malloc(*buf_size);
  139.     if (!*buf) {
  140.         ret = AVERROR(ENOMEM);
  141.         goto end;
  142.     }
  143.     memcpy(*buf, zmq_msg_data(&msg), *buf_size);
  144.     (*buf)[*buf_size-1] = 0;
  145.  
  146. end:
  147.     zmq_msg_close(&msg);
  148.     return ret;
  149. }
  150.  
  151. static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
  152. {
  153.     AVFilterContext *ctx = inlink->dst;
  154.     ZMQContext *zmq = ctx->priv;
  155.  
  156.     while (1) {
  157.         char cmd_buf[1024];
  158.         char *recv_buf, *send_buf;
  159.         int recv_buf_size;
  160.         Command cmd = {0};
  161.         int ret;
  162.  
  163.         /* receive command */
  164.         if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
  165.             break;
  166.         zmq->command_count++;
  167.  
  168.         /* parse command */
  169.         if (parse_command(&cmd, recv_buf, ctx) < 0) {
  170.             av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
  171.             goto end;
  172.         }
  173.  
  174.         /* process command */
  175.         av_log(ctx, AV_LOG_VERBOSE,
  176.                "Processing command #%d target:%s command:%s arg:%s\n",
  177.                zmq->command_count, cmd.target, cmd.command, cmd.arg);
  178.         ret = avfilter_graph_send_command(inlink->graph,
  179.                                           cmd.target, cmd.command, cmd.arg,
  180.                                           cmd_buf, sizeof(cmd_buf),
  181.                                           AVFILTER_CMD_FLAG_ONE);
  182.         send_buf = av_asprintf("%d %s%s%s",
  183.                                -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
  184.         if (!send_buf) {
  185.             ret = AVERROR(ENOMEM);
  186.             goto end;
  187.         }
  188.         av_log(ctx, AV_LOG_VERBOSE,
  189.                "Sending command reply for command #%d:\n%s\n",
  190.                zmq->command_count, send_buf);
  191.         if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
  192.             av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
  193.                    zmq->command_count, zmq_strerror(ret));
  194.  
  195.     end:
  196.         av_freep(&send_buf);
  197.         av_freep(&recv_buf);
  198.         recv_buf_size = 0;
  199.         av_freep(&cmd.target);
  200.         av_freep(&cmd.command);
  201.         av_freep(&cmd.arg);
  202.     }
  203.  
  204.     return ff_filter_frame(ctx->outputs[0], ref);
  205. }
  206.  
  207. #if CONFIG_ZMQ_FILTER
  208.  
  209. #define zmq_options options
  210. AVFILTER_DEFINE_CLASS(zmq);
  211.  
  212. static const AVFilterPad zmq_inputs[] = {
  213.     {
  214.         .name         = "default",
  215.         .type         = AVMEDIA_TYPE_VIDEO,
  216.         .filter_frame = filter_frame,
  217.     },
  218.     { NULL }
  219. };
  220.  
  221. static const AVFilterPad zmq_outputs[] = {
  222.     {
  223.         .name = "default",
  224.         .type = AVMEDIA_TYPE_VIDEO,
  225.     },
  226.     { NULL }
  227. };
  228.  
  229. AVFilter avfilter_vf_zmq = {
  230.     .name        = "zmq",
  231.     .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
  232.     .init        = init,
  233.     .uninit      = uninit,
  234.     .priv_size   = sizeof(ZMQContext),
  235.     .inputs      = zmq_inputs,
  236.     .outputs     = zmq_outputs,
  237.     .priv_class  = &zmq_class,
  238. };
  239.  
  240. #endif
  241.  
  242. #if CONFIG_AZMQ_FILTER
  243.  
  244. #define azmq_options options
  245. AVFILTER_DEFINE_CLASS(azmq);
  246.  
  247. static const AVFilterPad azmq_inputs[] = {
  248.     {
  249.         .name         = "default",
  250.         .type         = AVMEDIA_TYPE_AUDIO,
  251.         .filter_frame = filter_frame,
  252.     },
  253.     { NULL }
  254. };
  255.  
  256. static const AVFilterPad azmq_outputs[] = {
  257.     {
  258.         .name = "default",
  259.         .type = AVMEDIA_TYPE_AUDIO,
  260.     },
  261.     { NULL }
  262. };
  263.  
  264. AVFilter avfilter_af_azmq = {
  265.     .name        = "azmq",
  266.     .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
  267.     .init        = init,
  268.     .uninit      = uninit,
  269.     .priv_size   = sizeof(ZMQContext),
  270.     .inputs      = azmq_inputs,
  271.     .outputs     = azmq_outputs,
  272.     .priv_class  = &azmq_class,
  273. };
  274.  
  275. #endif
  276.