You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

275 lines
7.5KB

  1. /*
  2. * Copyright (c) 2013 Stefano Sabatini
  3. *
  4. * This file is part of FFmpeg.
  5. *
  6. * FFmpeg is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation; either
  9. * version 2.1 of the License, or (at your option) any later version.
  10. *
  11. * FFmpeg is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with FFmpeg; if not, write to the Free Software
  18. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  19. */
  20. /**
  21. * @file
  22. * receive commands through libzeromq and broker them to filters
  23. */
  24. #include <zmq.h>
  25. #include "libavutil/avstring.h"
  26. #include "libavutil/bprint.h"
  27. #include "libavutil/opt.h"
  28. #include "avfilter.h"
  29. #include "internal.h"
  30. #include "audio.h"
  31. #include "video.h"
  32. typedef struct ZMQContext {
  33. const AVClass *class;
  34. void *zmq;
  35. void *responder;
  36. char *bind_address;
  37. int command_count;
  38. } ZMQContext;
  39. #define OFFSET(x) offsetof(ZMQContext, x)
  40. #define FLAGS AV_OPT_FLAG_FILTERING_PARAM | AV_OPT_FLAG_AUDIO_PARAM | AV_OPT_FLAG_VIDEO_PARAM
  41. static const AVOption options[] = {
  42. { "bind_address", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
  43. { "b", "set bind address", OFFSET(bind_address), AV_OPT_TYPE_STRING, {.str = "tcp://*:5555"}, 0, 0, FLAGS },
  44. { NULL }
  45. };
  46. static av_cold int init(AVFilterContext *ctx)
  47. {
  48. ZMQContext *zmq = ctx->priv;
  49. zmq->zmq = zmq_ctx_new();
  50. if (!zmq->zmq) {
  51. av_log(ctx, AV_LOG_ERROR,
  52. "Could not create ZMQ context: %s\n", zmq_strerror(errno));
  53. return AVERROR_EXTERNAL;
  54. }
  55. zmq->responder = zmq_socket(zmq->zmq, ZMQ_REP);
  56. if (!zmq->responder) {
  57. av_log(ctx, AV_LOG_ERROR,
  58. "Could not create ZMQ socket: %s\n", zmq_strerror(errno));
  59. return AVERROR_EXTERNAL;
  60. }
  61. if (zmq_bind(zmq->responder, zmq->bind_address) == -1) {
  62. av_log(ctx, AV_LOG_ERROR,
  63. "Could not bind ZMQ socket to address '%s': %s\n",
  64. zmq->bind_address, zmq_strerror(errno));
  65. return AVERROR_EXTERNAL;
  66. }
  67. zmq->command_count = -1;
  68. return 0;
  69. }
  70. static void av_cold uninit(AVFilterContext *ctx)
  71. {
  72. ZMQContext *zmq = ctx->priv;
  73. zmq_close(zmq->responder);
  74. zmq_ctx_destroy(zmq->zmq);
  75. }
  76. typedef struct Command {
  77. char *target, *command, *arg;
  78. } Command;
  79. #define SPACES " \f\t\n\r"
  80. static int parse_command(Command *cmd, const char *command_str, void *log_ctx)
  81. {
  82. const char **buf = &command_str;
  83. cmd->target = av_get_token(buf, SPACES);
  84. if (!cmd->target || !cmd->target[0]) {
  85. av_log(log_ctx, AV_LOG_ERROR,
  86. "No target specified in command '%s'\n", command_str);
  87. return AVERROR(EINVAL);
  88. }
  89. cmd->command = av_get_token(buf, SPACES);
  90. if (!cmd->command || !cmd->command[0]) {
  91. av_log(log_ctx, AV_LOG_ERROR,
  92. "No command specified in command '%s'\n", command_str);
  93. return AVERROR(EINVAL);
  94. }
  95. cmd->arg = av_get_token(buf, SPACES);
  96. return 0;
  97. }
  98. static int recv_msg(AVFilterContext *ctx, char **buf, int *buf_size)
  99. {
  100. ZMQContext *zmq = ctx->priv;
  101. zmq_msg_t msg;
  102. int ret = 0;
  103. if (zmq_msg_init(&msg) == -1) {
  104. av_log(ctx, AV_LOG_WARNING,
  105. "Could not initialize receive message: %s\n", zmq_strerror(errno));
  106. return AVERROR_EXTERNAL;
  107. }
  108. if (zmq_msg_recv(&msg, zmq->responder, ZMQ_DONTWAIT) == -1) {
  109. if (errno != EAGAIN)
  110. av_log(ctx, AV_LOG_WARNING,
  111. "Could not receive message: %s\n", zmq_strerror(errno));
  112. ret = AVERROR_EXTERNAL;
  113. goto end;
  114. }
  115. *buf_size = zmq_msg_size(&msg) + 1;
  116. *buf = av_malloc(*buf_size);
  117. if (!*buf) {
  118. ret = AVERROR(ENOMEM);
  119. goto end;
  120. }
  121. memcpy(*buf, zmq_msg_data(&msg), *buf_size);
  122. (*buf)[*buf_size-1] = 0;
  123. end:
  124. zmq_msg_close(&msg);
  125. return ret;
  126. }
  127. static int filter_frame(AVFilterLink *inlink, AVFrame *ref)
  128. {
  129. AVFilterContext *ctx = inlink->dst;
  130. ZMQContext *zmq = ctx->priv;
  131. while (1) {
  132. char cmd_buf[1024];
  133. char *recv_buf, *send_buf;
  134. int recv_buf_size;
  135. Command cmd = {0};
  136. int ret;
  137. /* receive command */
  138. if (recv_msg(ctx, &recv_buf, &recv_buf_size) < 0)
  139. break;
  140. zmq->command_count++;
  141. /* parse command */
  142. if (parse_command(&cmd, recv_buf, ctx) < 0) {
  143. av_log(ctx, AV_LOG_ERROR, "Could not parse command #%d\n", zmq->command_count);
  144. goto end;
  145. }
  146. /* process command */
  147. av_log(ctx, AV_LOG_VERBOSE,
  148. "Processing command #%d target:%s command:%s arg:%s\n",
  149. zmq->command_count, cmd.target, cmd.command, cmd.arg);
  150. ret = avfilter_graph_send_command(inlink->graph,
  151. cmd.target, cmd.command, cmd.arg,
  152. cmd_buf, sizeof(cmd_buf),
  153. AVFILTER_CMD_FLAG_ONE);
  154. send_buf = av_asprintf("%d %s%s%s",
  155. -ret, av_err2str(ret), cmd_buf[0] ? "\n" : "", cmd_buf);
  156. if (!send_buf) {
  157. ret = AVERROR(ENOMEM);
  158. goto end;
  159. }
  160. av_log(ctx, AV_LOG_VERBOSE,
  161. "Sending command reply for command #%d:\n%s\n",
  162. zmq->command_count, send_buf);
  163. if (zmq_send(zmq->responder, send_buf, strlen(send_buf), 0) == -1)
  164. av_log(ctx, AV_LOG_ERROR, "Failed to send reply for command #%d: %s\n",
  165. zmq->command_count, zmq_strerror(ret));
  166. end:
  167. av_freep(&send_buf);
  168. av_freep(&recv_buf);
  169. recv_buf_size = 0;
  170. av_freep(&cmd.target);
  171. av_freep(&cmd.command);
  172. av_freep(&cmd.arg);
  173. }
  174. return ff_filter_frame(ctx->outputs[0], ref);
  175. }
  176. #if CONFIG_ZMQ_FILTER
  177. #define zmq_options options
  178. AVFILTER_DEFINE_CLASS(zmq);
  179. static const AVFilterPad zmq_inputs[] = {
  180. {
  181. .name = "default",
  182. .type = AVMEDIA_TYPE_VIDEO,
  183. .filter_frame = filter_frame,
  184. },
  185. { NULL }
  186. };
  187. static const AVFilterPad zmq_outputs[] = {
  188. {
  189. .name = "default",
  190. .type = AVMEDIA_TYPE_VIDEO,
  191. },
  192. { NULL }
  193. };
  194. AVFilter ff_vf_zmq = {
  195. .name = "zmq",
  196. .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
  197. .init = init,
  198. .uninit = uninit,
  199. .priv_size = sizeof(ZMQContext),
  200. .inputs = zmq_inputs,
  201. .outputs = zmq_outputs,
  202. .priv_class = &zmq_class,
  203. };
  204. #endif
  205. #if CONFIG_AZMQ_FILTER
  206. #define azmq_options options
  207. AVFILTER_DEFINE_CLASS(azmq);
  208. static const AVFilterPad azmq_inputs[] = {
  209. {
  210. .name = "default",
  211. .type = AVMEDIA_TYPE_AUDIO,
  212. .filter_frame = filter_frame,
  213. },
  214. { NULL }
  215. };
  216. static const AVFilterPad azmq_outputs[] = {
  217. {
  218. .name = "default",
  219. .type = AVMEDIA_TYPE_AUDIO,
  220. },
  221. { NULL }
  222. };
  223. AVFilter ff_af_azmq = {
  224. .name = "azmq",
  225. .description = NULL_IF_CONFIG_SMALL("Receive commands through ZMQ and broker them to filters."),
  226. .init = init,
  227. .uninit = uninit,
  228. .priv_size = sizeof(ZMQContext),
  229. .inputs = azmq_inputs,
  230. .outputs = azmq_outputs,
  231. .priv_class = &azmq_class,
  232. };
  233. #endif