Subversion Repositories Kolibri OS

Rev

Blame | Last modification | View Log | RSS feed

  1. /*
  2.  * UDP prototype streaming system
  3.  * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
  4.  *
  5.  * This file is part of FFmpeg.
  6.  *
  7.  * FFmpeg is free software; you can redistribute it and/or
  8.  * modify it under the terms of the GNU Lesser General Public
  9.  * License as published by the Free Software Foundation; either
  10.  * version 2.1 of the License, or (at your option) any later version.
  11.  *
  12.  * FFmpeg is distributed in the hope that it will be useful,
  13.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15.  * Lesser General Public License for more details.
  16.  *
  17.  * You should have received a copy of the GNU Lesser General Public
  18.  * License along with FFmpeg; if not, write to the Free Software
  19.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20.  */
  21.  
  22. /**
  23.  * @file
  24.  * UDP protocol
  25.  */
  26.  
  27. #define _BSD_SOURCE     /* Needed for using struct ip_mreq with recent glibc */
  28.  
  29. #include "avformat.h"
  30. #include "avio_internal.h"
  31. #include "libavutil/parseutils.h"
  32. #include "libavutil/fifo.h"
  33. #include "libavutil/intreadwrite.h"
  34. #include "libavutil/avstring.h"
  35. #include "libavutil/opt.h"
  36. #include "libavutil/log.h"
  37. #include "libavutil/time.h"
  38. #include "internal.h"
  39. #include "network.h"
  40. #include "os_support.h"
  41. #include "url.h"
  42.  
  43. #if HAVE_UDPLITE_H
  44. #include "udplite.h"
  45. #else
  46. /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
  47.  * So, we provide a fallback here.
  48.  */
  49. #define UDPLITE_SEND_CSCOV                               10
  50. #define UDPLITE_RECV_CSCOV                               11
  51. #endif
  52.  
  53. #ifndef IPPROTO_UDPLITE
  54. #define IPPROTO_UDPLITE                                  136
  55. #endif
  56.  
  57. #if HAVE_PTHREAD_CANCEL
  58. #include <pthread.h>
  59. #endif
  60.  
  61. #ifndef HAVE_PTHREAD_CANCEL
  62. #define HAVE_PTHREAD_CANCEL 0
  63. #endif
  64.  
  65. #ifndef IPV6_ADD_MEMBERSHIP
  66. #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
  67. #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
  68. #endif
  69.  
  70. #define UDP_TX_BUF_SIZE 32768
  71. #define UDP_MAX_PKT_SIZE 65536
  72. #define UDP_HEADER_SIZE 8
  73.  
  74. typedef struct UDPContext {
  75.     const AVClass *class;
  76.     int udp_fd;
  77.     int ttl;
  78.     int udplite_coverage;
  79.     int buffer_size;
  80.     int pkt_size;
  81.     int is_multicast;
  82.     int is_broadcast;
  83.     int local_port;
  84.     int reuse_socket;
  85.     int overrun_nonfatal;
  86.     struct sockaddr_storage dest_addr;
  87.     int dest_addr_len;
  88.     int is_connected;
  89.  
  90.     /* Circular Buffer variables for use in UDP receive code */
  91.     int circular_buffer_size;
  92.     AVFifoBuffer *fifo;
  93.     int circular_buffer_error;
  94. #if HAVE_PTHREAD_CANCEL
  95.     pthread_t circular_buffer_thread;
  96.     pthread_mutex_t mutex;
  97.     pthread_cond_t cond;
  98.     int thread_started;
  99. #endif
  100.     uint8_t tmp[UDP_MAX_PKT_SIZE+4];
  101.     int remaining_in_dg;
  102.     char *localaddr;
  103.     int timeout;
  104.     struct sockaddr_storage local_addr_storage;
  105.     char *sources;
  106.     char *block;
  107. } UDPContext;
  108.  
  109. #define OFFSET(x) offsetof(UDPContext, x)
  110. #define D AV_OPT_FLAG_DECODING_PARAM
  111. #define E AV_OPT_FLAG_ENCODING_PARAM
  112. static const AVOption options[] = {
  113.     { "buffer_size",    "System data size (in bytes)",                     OFFSET(buffer_size),    AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
  114.     { "localport",      "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, D|E },
  115.     { "local_port",     "Local port",                                      OFFSET(local_port),     AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, INT_MAX, .flags = D|E },
  116.     { "localaddr",      "Local address",                                   OFFSET(localaddr),      AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
  117.     { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage), AV_OPT_TYPE_INT, {.i64 = 0}, 0, INT_MAX, D|E },
  118.     { "pkt_size",       "Maximum UDP packet size",                         OFFSET(pkt_size),       AV_OPT_TYPE_INT,    { .i64 = 1472 },  -1, INT_MAX, .flags = D|E },
  119.     { "reuse",          "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 1,       D|E },
  120.     { "reuse_socket",   "explicitly allow reusing UDP sockets",            OFFSET(reuse_socket),   AV_OPT_TYPE_INT,    { .i64 = -1 },    -1, 1,       .flags = D|E },
  121.     { "broadcast", "explicitly allow or disallow broadcast destination",   OFFSET(is_broadcast),   AV_OPT_TYPE_INT,    { .i64 = 0  },     0, 1,       E },
  122.     { "ttl",            "Time to live (multicast only)",                   OFFSET(ttl),            AV_OPT_TYPE_INT,    { .i64 = 16 },     0, INT_MAX, E },
  123.     { "connect",        "set if connect() should be called on socket",     OFFSET(is_connected),   AV_OPT_TYPE_INT,    { .i64 =  0 },     0, 1,       .flags = D|E },
  124.     { "fifo_size",      "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size), AV_OPT_TYPE_INT, {.i64 = 7*4096}, 0, INT_MAX, D },
  125.     { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 1,    D },
  126.     { "timeout",        "set raise error timeout (only in read mode)",     OFFSET(timeout),        AV_OPT_TYPE_INT,    { .i64 = 0 },      0, INT_MAX, D },
  127.     { "sources",        "Source list",                                     OFFSET(sources),        AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
  128.     { "block",          "Block list",                                      OFFSET(block),          AV_OPT_TYPE_STRING, { .str = NULL },               .flags = D|E },
  129.     { NULL }
  130. };
  131.  
  132. static const AVClass udp_class = {
  133.     .class_name = "udp",
  134.     .item_name  = av_default_item_name,
  135.     .option     = options,
  136.     .version    = LIBAVUTIL_VERSION_INT,
  137. };
  138.  
  139. static const AVClass udplite_context_class = {
  140.     .class_name     = "udplite",
  141.     .item_name      = av_default_item_name,
  142.     .option         = options,
  143.     .version        = LIBAVUTIL_VERSION_INT,
  144. };
  145.  
  /**
   * Log the current network error as "<prefix>: <error text>".
   *
   * The error code is obtained from ff_neterrno() at the time of the call, so
   * this has to be invoked right after the failing socket operation.
   *
   * @param ctx    logging context passed through to av_log(); callers in this
   *               file also pass NULL
   * @param level  av_log() level to log at
   * @param prefix text placed in front of the error description
   */
  static void log_net_error(void *ctx, int level, const char* prefix)
  {
      char description[100];

      av_strerror(ff_neterrno(), description, sizeof(description));
      av_log(ctx, level, "%s: %s\n", prefix, description);
  }
  152.  
  153. static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
  154.                                  struct sockaddr *addr)
  155. {
  156. #ifdef IP_MULTICAST_TTL
  157.     if (addr->sa_family == AF_INET) {
  158.         if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
  159.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL)");
  160.             return -1;
  161.         }
  162.     }
  163. #endif
  164. #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
  165.     if (addr->sa_family == AF_INET6) {
  166.         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
  167.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS)");
  168.             return -1;
  169.         }
  170.     }
  171. #endif
  172.     return 0;
  173. }
  174.  
  175. static int udp_join_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
  176. {
  177. #ifdef IP_ADD_MEMBERSHIP
  178.     if (addr->sa_family == AF_INET) {
  179.         struct ip_mreq mreq;
  180.  
  181.         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
  182.         if (local_addr)
  183.             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
  184.         else
  185.             mreq.imr_interface.s_addr= INADDR_ANY;
  186.         if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
  187.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP)");
  188.             return -1;
  189.         }
  190.     }
  191. #endif
  192. #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
  193.     if (addr->sa_family == AF_INET6) {
  194.         struct ipv6_mreq mreq6;
  195.  
  196.         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
  197.         mreq6.ipv6mr_interface= 0;
  198.         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
  199.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP)");
  200.             return -1;
  201.         }
  202.     }
  203. #endif
  204.     return 0;
  205. }
  206.  
  207. static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr,struct sockaddr *local_addr)
  208. {
  209. #ifdef IP_DROP_MEMBERSHIP
  210.     if (addr->sa_family == AF_INET) {
  211.         struct ip_mreq mreq;
  212.  
  213.         mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
  214.         if (local_addr)
  215.             mreq.imr_interface= ((struct sockaddr_in *)local_addr)->sin_addr;
  216.         else
  217.             mreq.imr_interface.s_addr= INADDR_ANY;
  218.         if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
  219.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP)");
  220.             return -1;
  221.         }
  222.     }
  223. #endif
  224. #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
  225.     if (addr->sa_family == AF_INET6) {
  226.         struct ipv6_mreq mreq6;
  227.  
  228.         memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
  229.         mreq6.ipv6mr_interface= 0;
  230.         if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
  231.             log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP)");
  232.             return -1;
  233.         }
  234.     }
  235. #endif
  236.     return 0;
  237. }
  238.  
  /**
   * Resolve a host name / port pair with getaddrinfo().
   *
   * @param hostname host to resolve; NULL, an empty string or a string starting
   *                 with '?' is passed to getaddrinfo() as a NULL node
   * @param port     port number; values <= 0 are passed as service "0"
   * @param type     socket type placed in the hints (e.g. SOCK_DGRAM)
   * @param family   address family placed in the hints (e.g. AF_UNSPEC)
   * @param flags    getaddrinfo() hint flags (e.g. AI_PASSIVE)
   * @return the result list, to be released with freeaddrinfo(), or NULL after
   *         logging the resolver error
   */
  static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                           int type, int family, int flags)
  {
      struct addrinfo hints = { 0 };
      struct addrinfo *result = NULL;
      const char *node = NULL;
      const char *service = "0";
      char port_text[16];
      int gai_status;

      if (port > 0) {
          snprintf(port_text, sizeof(port_text), "%d", port);
          service = port_text;
      }
      if (hostname && hostname[0] != '\0' && hostname[0] != '?')
          node = hostname;

      hints.ai_socktype = type;
      hints.ai_family   = family;
      hints.ai_flags    = flags;

      gai_status = getaddrinfo(node, service, &hints, &result);
      if (gai_status) {
          av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(gai_status));
          return NULL;
      }

      return result;
  }
  264.  
  /**
   * Install a source filter for a multicast group: either subscribe to the
   * group for specific sources only (include != 0) or block specific sources
   * (include == 0).
   *
   * Depending on what the platform provides, this uses the protocol
   * independent MCAST_* options, the IPv4-only IP_* options, or is
   * unsupported.
   *
   * @param sockfd     socket to configure
   * @param addr       multicast group address
   * @param addr_len   size in bytes of the address behind addr
   * @param sources    host names / addresses of the sources
   * @param nb_sources number of entries in sources
   * @param include    non-zero: join for these sources; zero: block them
   * @return 0 on success, a negative AVERROR code otherwise (ENOENT if a
   *         source cannot be resolved, EINVAL for a family mismatch in the
   *         IPv4-only path, ENOSYS when unsupported, or the socket error)
   */
  static int udp_set_multicast_sources(int sockfd, struct sockaddr *addr,
                                       int addr_len, char **sources,
                                       int nb_sources, int include)
  {
  #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE) && !defined(_WIN32)
      /* These ones are available in the microsoft SDK, but don't seem to work
       * as on linux, so just prefer the v4-only approach there for now. */
      int i;
      for (i = 0; i < nb_sources; i++) {
          struct group_source_req mreqs;
          int level = addr->sa_family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6;
          struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                         SOCK_DGRAM, AF_UNSPEC,
                                                         0);
          if (!sourceaddr)
              return AVERROR(ENOENT);

          mreqs.gsr_interface = 0;
          memcpy(&mreqs.gsr_group, addr, addr_len);
          memcpy(&mreqs.gsr_source, sourceaddr->ai_addr, sourceaddr->ai_addrlen);
          freeaddrinfo(sourceaddr);

          if (setsockopt(sockfd, level,
                         include ? MCAST_JOIN_SOURCE_GROUP : MCAST_BLOCK_SOURCE,
                         (const void *)&mreqs, sizeof(mreqs)) < 0) {
              /* Read the error code before logging: the logging calls may
               * change the error indicator, and a value of 0 would be
               * returned to the caller as success. */
              int err = ff_neterrno();
              if (include)
                  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
              else
                  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(MCAST_BLOCK_SOURCE)");
              return err;
          }
      }
  #elif HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
      int i;
      if (addr->sa_family != AF_INET) {
          av_log(NULL, AV_LOG_ERROR,
                 "Setting multicast sources only supported for IPv4\n");
          return AVERROR(EINVAL);
      }
      for (i = 0; i < nb_sources; i++) {
          struct ip_mreq_source mreqs;
          struct addrinfo *sourceaddr = udp_resolve_host(sources[i], 0,
                                                         SOCK_DGRAM, AF_UNSPEC,
                                                         0);
          if (!sourceaddr)
              return AVERROR(ENOENT);
          if (sourceaddr->ai_addr->sa_family != AF_INET) {
              freeaddrinfo(sourceaddr);
              av_log(NULL, AV_LOG_ERROR, "%s is of incorrect protocol family\n",
                     sources[i]);
              return AVERROR(EINVAL);
          }

          mreqs.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
          mreqs.imr_interface.s_addr = INADDR_ANY;
          mreqs.imr_sourceaddr.s_addr = ((struct sockaddr_in *)sourceaddr->ai_addr)->sin_addr.s_addr;
          freeaddrinfo(sourceaddr);

          if (setsockopt(sockfd, IPPROTO_IP,
                         include ? IP_ADD_SOURCE_MEMBERSHIP : IP_BLOCK_SOURCE,
                         (const void *)&mreqs, sizeof(mreqs)) < 0) {
              /* Same as above: capture the error code before logging. */
              int err = ff_neterrno();
              if (include)
                  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
              else
                  log_net_error(NULL, AV_LOG_ERROR, "setsockopt(IP_BLOCK_SOURCE)");
              return err;
          }
      }
  #else
      return AVERROR(ENOSYS);
  #endif
      return 0;
  }
  338. static int udp_set_url(struct sockaddr_storage *addr,
  339.                        const char *hostname, int port)
  340. {
  341.     struct addrinfo *res0;
  342.     int addr_len;
  343.  
  344.     res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
  345.     if (!res0) return AVERROR(EIO);
  346.     memcpy(addr, res0->ai_addr, res0->ai_addrlen);
  347.     addr_len = res0->ai_addrlen;
  348.     freeaddrinfo(res0);
  349.  
  350.     return addr_len;
  351. }
  352.  
  353. static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
  354.                              socklen_t *addr_len, const char *localaddr)
  355. {
  356.     int udp_fd = -1;
  357.     struct addrinfo *res0, *res;
  358.     int family = AF_UNSPEC;
  359.  
  360.     if (((struct sockaddr *) &s->dest_addr)->sa_family)
  361.         family = ((struct sockaddr *) &s->dest_addr)->sa_family;
  362.     res0 = udp_resolve_host((localaddr && localaddr[0]) ? localaddr : NULL, s->local_port,
  363.                             SOCK_DGRAM, family, AI_PASSIVE);
  364.     if (!res0)
  365.         goto fail;
  366.     for (res = res0; res; res=res->ai_next) {
  367.         if (s->udplite_coverage)
  368.             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, IPPROTO_UDPLITE);
  369.         else
  370.             udp_fd = ff_socket(res->ai_family, SOCK_DGRAM, 0);
  371.         if (udp_fd != -1) break;
  372.         log_net_error(NULL, AV_LOG_ERROR, "socket");
  373.     }
  374.  
  375.     if (udp_fd < 0)
  376.         goto fail;
  377.  
  378.     memcpy(addr, res->ai_addr, res->ai_addrlen);
  379.     *addr_len = res->ai_addrlen;
  380.  
  381.     freeaddrinfo(res0);
  382.  
  383.     return udp_fd;
  384.  
  385.  fail:
  386.     if (udp_fd >= 0)
  387.         closesocket(udp_fd);
  388.     if(res0)
  389.         freeaddrinfo(res0);
  390.     return -1;
  391. }
  392.  
  393. static int udp_port(struct sockaddr_storage *addr, int addr_len)
  394. {
  395.     char sbuf[sizeof(int)*3+1];
  396.     int error;
  397.  
  398.     if ((error = getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0,  sbuf, sizeof(sbuf), NI_NUMERICSERV)) != 0) {
  399.         av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", gai_strerror(error));
  400.         return -1;
  401.     }
  402.  
  403.     return strtol(sbuf, NULL, 10);
  404. }
  405.  
  406.  
  407. /**
  408.  * If no filename is given to av_open_input_file because you want to
  409.  * get the local port first, then you must call this function to set
  410.  * the remote server address.
  411.  *
  412.  * url syntax: udp://host:port[?option=val...]
  413.  * option: 'ttl=n'       : set the ttl value (for multicast only)
  414.  *         'localport=n' : set the local port
  415.  *         'pkt_size=n'  : set max packet size
  416.  *         'reuse=1'     : enable reusing the socket
  417.  *         'overrun_nonfatal=1': survive in case of circular buffer overrun
  418.  *
  419.  * @param h media file context
  420.  * @param uri of the remote server
  421.  * @return zero if no error.
  422.  */
  423. int ff_udp_set_remote_url(URLContext *h, const char *uri)
  424. {
  425.     UDPContext *s = h->priv_data;
  426.     char hostname[256], buf[10];
  427.     int port;
  428.     const char *p;
  429.  
  430.     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
  431.  
  432.     /* set the destination address */
  433.     s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
  434.     if (s->dest_addr_len < 0) {
  435.         return AVERROR(EIO);
  436.     }
  437.     s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
  438.     p = strchr(uri, '?');
  439.     if (p) {
  440.         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
  441.             int was_connected = s->is_connected;
  442.             s->is_connected = strtol(buf, NULL, 10);
  443.             if (s->is_connected && !was_connected) {
  444.                 if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
  445.                             s->dest_addr_len)) {
  446.                     s->is_connected = 0;
  447.                     log_net_error(h, AV_LOG_ERROR, "connect");
  448.                     return AVERROR(EIO);
  449.                 }
  450.             }
  451.         }
  452.     }
  453.  
  454.     return 0;
  455. }
  456.  
  457. /**
  458.  * Return the local port used by the UDP connection
  459.  * @param h media file context
  460.  * @return the local port number
  461.  */
  462. int ff_udp_get_local_port(URLContext *h)
  463. {
  464.     UDPContext *s = h->priv_data;
  465.     return s->local_port;
  466. }
  467.  
  468. /**
  469.  * Return the udp file handle for select() usage to wait for several RTP
  470.  * streams at the same time.
  471.  * @param h media file context
  472.  */
  473. static int udp_get_file_handle(URLContext *h)
  474. {
  475.     UDPContext *s = h->priv_data;
  476.     return s->udp_fd;
  477. }
  478.  
  #if HAVE_PTHREAD_CANCEL
  /**
   * Thread entry point of the receive thread: reads datagrams from the socket
   * and appends them to the FIFO.
   *
   * The socket is switched to blocking mode first. Each datagram is received
   * into s->tmp behind a 4-byte little-endian length field and then written to
   * the FIFO as one length-prefixed record. s->mutex is held at all times
   * except while blocked in recv(); thread cancellation is enabled only around
   * that recv() call. s->cond is signalled after every FIFO write and once
   * more when the thread terminates.
   *
   * On a fatal condition the error code is stored in s->circular_buffer_error
   * and the thread ends. A full FIFO is fatal unless overrun_nonfatal is set,
   * in which case the datagram is dropped.
   *
   * @param _URLContext the URLContext owning the UDP context
   * @return always NULL
   */
  static void *circular_buffer_task( void *_URLContext)
  {
      URLContext *h = _URLContext;
      UDPContext *s = h->priv_data;
      int old_cancelstate;

      pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
      pthread_mutex_lock(&s->mutex);
      if (ff_socket_nonblock(s->udp_fd, 0) < 0) {
          av_log(h, AV_LOG_ERROR, "Failed to set blocking mode\n");
          s->circular_buffer_error = AVERROR(EIO);
          goto end;
      }
      while(1) {
          int len;
          int recv_err;

          pthread_mutex_unlock(&s->mutex);
          /* Blocking operations are always cancellation points;
             see "General Information" / "Thread Cancelation Overview"
             in Single Unix. */
          pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancelstate);
          len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
          /* Read the error code right away, before any other library call
           * gets a chance to change it. */
          recv_err = len < 0 ? ff_neterrno() : 0;
          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancelstate);
          pthread_mutex_lock(&s->mutex);
          if (len < 0) {
              if (recv_err != AVERROR(EAGAIN) && recv_err != AVERROR(EINTR)) {
                  s->circular_buffer_error = recv_err;
                  goto end;
              }
              continue;
          }
          /* store the datagram length in front of the payload */
          AV_WL32(s->tmp, len);

          if(av_fifo_space(s->fifo) < len + 4) {
              /* No Space left */
              if (s->overrun_nonfatal) {
                  av_log(h, AV_LOG_WARNING, "Circular buffer overrun. "
                          "Surviving due to overrun_nonfatal option\n");
                  continue;
              } else {
                  av_log(h, AV_LOG_ERROR, "Circular buffer overrun. "
                          "To avoid, increase fifo_size URL option. "
                          "To survive in such case, use overrun_nonfatal option\n");
                  s->circular_buffer_error = AVERROR(EIO);
                  goto end;
              }
          }
          av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
          pthread_cond_signal(&s->cond);
      }

  end:
      /* the mutex is held on every path that reaches this label */
      pthread_cond_signal(&s->cond);
      pthread_mutex_unlock(&s->mutex);
      return NULL;
  }
  #endif
  537.  
  /**
   * Split a comma separated list and append a copy of each item to sources[].
   *
   * buf is modified in place: every comma that is consumed is overwritten
   * with a terminating NUL. Items are appended starting at index
   * *num_sources, so the function can be called several times on the same
   * array. Parsing stops silently once max_sources entries are stored; if the
   * array is already full on entry nothing is stored and buf is left
   * untouched.
   *
   * @param buf         list to split (modified)
   * @param sources     array receiving av_strdup()ed items; entries stored so
   *                    far remain in place when an error is returned
   * @param num_sources in: entries already in sources[]; out: updated count
   * @param max_sources capacity of sources[]
   * @return 0 on success, AVERROR(ENOMEM) if an item could not be duplicated
   */
  static int parse_source_list(char *buf, char **sources, int *num_sources,
                               int max_sources)
  {
      char *source_start = buf;

      /* Check the capacity before every store: the array may already be full
       * when a second list is appended to it. */
      while (*num_sources < max_sources) {
          char *next = strchr(source_start, ',');

          if (next)
              *next = '\0';
          sources[*num_sources] = av_strdup(source_start);
          if (!sources[*num_sources])
              return AVERROR(ENOMEM);
          (*num_sources)++;
          if (!next)
              break;
          /* advance only when a separator was found, never from NULL */
          source_start = next + 1;
      }
      return 0;
  }
  558.  
  559. /* put it in UDP context */
  560. /* return non zero if error */
  561. static int udp_open(URLContext *h, const char *uri, int flags)
  562. {
  563.     char hostname[1024], localaddr[1024] = "";
  564.     int port, udp_fd = -1, tmp, bind_ret = -1, dscp = -1;
  565.     UDPContext *s = h->priv_data;
  566.     int is_output;
  567.     const char *p;
  568.     char buf[256];
  569.     struct sockaddr_storage my_addr;
  570.     socklen_t len;
  571.     int i, num_include_sources = 0, num_exclude_sources = 0;
  572.     char *include_sources[32], *exclude_sources[32];
  573.  
  574.     h->is_streamed = 1;
  575.  
  576.     is_output = !(flags & AVIO_FLAG_READ);
  577.     if (s->buffer_size < 0)
  578.         s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
  579.  
  580.     if (s->sources) {
  581.         if (parse_source_list(s->sources, include_sources,
  582.                               &num_include_sources,
  583.                               FF_ARRAY_ELEMS(include_sources)))
  584.             goto fail;
  585.     }
  586.  
  587.     if (s->block) {
  588.         if (parse_source_list(s->block, exclude_sources, &num_exclude_sources,
  589.                               FF_ARRAY_ELEMS(exclude_sources)))
  590.             goto fail;
  591.     }
  592.  
  593.     if (s->pkt_size > 0)
  594.         h->max_packet_size = s->pkt_size;
  595.  
  596.     p = strchr(uri, '?');
  597.     if (p) {
  598.         if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
  599.             char *endptr = NULL;
  600.             s->reuse_socket = strtol(buf, &endptr, 10);
  601.             /* assume if no digits were found it is a request to enable it */
  602.             if (buf == endptr)
  603.                 s->reuse_socket = 1;
  604.         }
  605.         if (av_find_info_tag(buf, sizeof(buf), "overrun_nonfatal", p)) {
  606.             char *endptr = NULL;
  607.             s->overrun_nonfatal = strtol(buf, &endptr, 10);
  608.             /* assume if no digits were found it is a request to enable it */
  609.             if (buf == endptr)
  610.                 s->overrun_nonfatal = 1;
  611.             if (!HAVE_PTHREAD_CANCEL)
  612.                 av_log(h, AV_LOG_WARNING,
  613.                        "'overrun_nonfatal' option was set but it is not supported "
  614.                        "on this build (pthread support is required)\n");
  615.         }
  616.         if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
  617.             s->ttl = strtol(buf, NULL, 10);
  618.         }
  619.         if (av_find_info_tag(buf, sizeof(buf), "udplite_coverage", p)) {
  620.             s->udplite_coverage = strtol(buf, NULL, 10);
  621.         }
  622.         if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
  623.             s->local_port = strtol(buf, NULL, 10);
  624.         }
  625.         if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
  626.             s->pkt_size = strtol(buf, NULL, 10);
  627.         }
  628.         if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
  629.             s->buffer_size = strtol(buf, NULL, 10);
  630.         }
  631.         if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
  632.             s->is_connected = strtol(buf, NULL, 10);
  633.         }
  634.         if (av_find_info_tag(buf, sizeof(buf), "dscp", p)) {
  635.             dscp = strtol(buf, NULL, 10);
  636.         }
  637.         if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
  638.             s->circular_buffer_size = strtol(buf, NULL, 10);
  639.             if (!HAVE_PTHREAD_CANCEL)
  640.                 av_log(h, AV_LOG_WARNING,
  641.                        "'circular_buffer_size' option was set but it is not supported "
  642.                        "on this build (pthread support is required)\n");
  643.         }
  644.         if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
  645.             av_strlcpy(localaddr, buf, sizeof(localaddr));
  646.         }
  647.         if (av_find_info_tag(buf, sizeof(buf), "sources", p)) {
  648.             if (parse_source_list(buf, include_sources, &num_include_sources,
  649.                                   FF_ARRAY_ELEMS(include_sources)))
  650.                 goto fail;
  651.         }
  652.         if (av_find_info_tag(buf, sizeof(buf), "block", p)) {
  653.             if (parse_source_list(buf, exclude_sources, &num_exclude_sources,
  654.                                   FF_ARRAY_ELEMS(exclude_sources)))
  655.                 goto fail;
  656.         }
  657.         if (!is_output && av_find_info_tag(buf, sizeof(buf), "timeout", p))
  658.             s->timeout = strtol(buf, NULL, 10);
  659.         if (is_output && av_find_info_tag(buf, sizeof(buf), "broadcast", p))
  660.             s->is_broadcast = strtol(buf, NULL, 10);
  661.     }
  662.     /* handling needed to support options picking from both AVOption and URL */
  663.     s->circular_buffer_size *= 188;
  664.     if (flags & AVIO_FLAG_WRITE) {
  665.         h->max_packet_size = s->pkt_size;
  666.     } else {
  667.         h->max_packet_size = UDP_MAX_PKT_SIZE;
  668.     }
  669.     h->rw_timeout = s->timeout;
  670.  
  671.     /* fill the dest addr */
  672.     av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
  673.  
  674.     /* XXX: fix av_url_split */
  675.     if (hostname[0] == '\0' || hostname[0] == '?') {
  676.         /* only accepts null hostname if input */
  677.         if (!(flags & AVIO_FLAG_READ))
  678.             goto fail;
  679.     } else {
  680.         if (ff_udp_set_remote_url(h, uri) < 0)
  681.             goto fail;
  682.     }
  683.  
  684.     if ((s->is_multicast || s->local_port <= 0) && (h->flags & AVIO_FLAG_READ))
  685.         s->local_port = port;
  686.  
  687.     if (localaddr[0])
  688.         udp_fd = udp_socket_create(s, &my_addr, &len, localaddr);
  689.     else
  690.         udp_fd = udp_socket_create(s, &my_addr, &len, s->localaddr);
  691.     if (udp_fd < 0)
  692.         goto fail;
  693.  
  694.     s->local_addr_storage=my_addr; //store for future multicast join
  695.  
  696.     /* Follow the requested reuse option, unless it's multicast in which
  697.      * case enable reuse unless explicitly disabled.
  698.      */
  699.     if (s->reuse_socket > 0 || (s->is_multicast && s->reuse_socket < 0)) {
  700.         s->reuse_socket = 1;
  701.         if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
  702.             goto fail;
  703.     }
  704.  
  705.     if (s->is_broadcast) {
  706. #ifdef SO_BROADCAST
  707.         if (setsockopt (udp_fd, SOL_SOCKET, SO_BROADCAST, &(s->is_broadcast), sizeof(s->is_broadcast)) != 0)
  708. #endif
  709.            goto fail;
  710.     }
  711.  
  712.     /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
  713.      * The receiver coverage has to be less than or equal to the sender coverage.
  714.      * Otherwise, the receiver will drop all packets.
  715.      */
  716.     if (s->udplite_coverage) {
  717.         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_SEND_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
  718.             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_SEND_CSCOV not available");
  719.  
  720.         if (setsockopt (udp_fd, IPPROTO_UDPLITE, UDPLITE_RECV_CSCOV, &(s->udplite_coverage), sizeof(s->udplite_coverage)) != 0)
  721.             av_log(h, AV_LOG_WARNING, "socket option UDPLITE_RECV_CSCOV not available");
  722.     }
  723.  
  724.     if (dscp >= 0) {
  725.         dscp <<= 2;
  726.         if (setsockopt (udp_fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp)) != 0)
  727.             goto fail;
  728.     }
  729.  
  730.     /* If multicast, try binding the multicast address first, to avoid
  731.      * receiving UDP packets from other sources aimed at the same UDP
  732.      * port. This fails on windows. This makes sending to the same address
  733.      * using sendto() fail, so only do it if we're opened in read-only mode. */
  734.     if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
  735.         bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
  736.     }
  737.     /* bind to the local address if not multicast or if the multicast
  738.      * bind failed */
  739.     /* the bind is needed to give a port to the socket now */
  740.     if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
  741.         log_net_error(h, AV_LOG_ERROR, "bind failed");
  742.         goto fail;
  743.     }
  744.  
  745.     len = sizeof(my_addr);
  746.     getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
  747.     s->local_port = udp_port(&my_addr, len);
  748.  
  749.     if (s->is_multicast) {
  750.         if (h->flags & AVIO_FLAG_WRITE) {
  751.             /* output */
  752.             if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
  753.                 goto fail;
  754.         }
  755.         if (h->flags & AVIO_FLAG_READ) {
  756.             /* input */
  757.             if (num_include_sources && num_exclude_sources) {
  758.                 av_log(h, AV_LOG_ERROR, "Simultaneously including and excluding multicast sources is not supported\n");
  759.                 goto fail;
  760.             }
  761.             if (num_include_sources) {
  762.                 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, include_sources, num_include_sources, 1) < 0)
  763.                     goto fail;
  764.             } else {
  765.                 if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage) < 0)
  766.                     goto fail;
  767.             }
  768.             if (num_exclude_sources) {
  769.                 if (udp_set_multicast_sources(udp_fd, (struct sockaddr *)&s->dest_addr, s->dest_addr_len, exclude_sources, num_exclude_sources, 0) < 0)
  770.                     goto fail;
  771.             }
  772.         }
  773.     }
  774.  
  775.     if (is_output) {
  776.         /* limit the tx buf size to limit latency */
  777.         tmp = s->buffer_size;
  778.         if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
  779.             log_net_error(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF)");
  780.             goto fail;
  781.         }
  782.     } else {
  783.         /* set udp recv buffer size to the requested value (default 64K) */
  784.         tmp = s->buffer_size;
  785.         if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
  786.             log_net_error(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF)");
  787.         }
  788.         len = sizeof(tmp);
  789.         if (getsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, &len) < 0) {
  790.             log_net_error(h, AV_LOG_WARNING, "getsockopt(SO_RCVBUF)");
  791.         } else {
  792.             av_log(h, AV_LOG_DEBUG, "end receive buffer size reported is %d\n", tmp);
  793.             if(tmp < s->buffer_size)
  794.                 av_log(h, AV_LOG_WARNING, "attempted to set receive buffer to size %d but it only ended up set as %d", s->buffer_size, tmp);
  795.         }
  796.  
  797.         /* make the socket non-blocking */
  798.         ff_socket_nonblock(udp_fd, 1);
  799.     }
  800.     if (s->is_connected) {
  801.         if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
  802.             log_net_error(h, AV_LOG_ERROR, "connect");
  803.             goto fail;
  804.         }
  805.     }
  806.  
  807.     for (i = 0; i < num_include_sources; i++)
  808.         av_freep(&include_sources[i]);
  809.     for (i = 0; i < num_exclude_sources; i++)
  810.         av_freep(&exclude_sources[i]);
  811.  
  812.     s->udp_fd = udp_fd;
  813.  
  814. #if HAVE_PTHREAD_CANCEL
  815.     if (!is_output && s->circular_buffer_size) {
  816.         int ret;
  817.  
  818.         /* start the task going */
  819.         s->fifo = av_fifo_alloc(s->circular_buffer_size);
  820.         ret = pthread_mutex_init(&s->mutex, NULL);
  821.         if (ret != 0) {
  822.             av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
  823.             goto fail;
  824.         }
  825.         ret = pthread_cond_init(&s->cond, NULL);
  826.         if (ret != 0) {
  827.             av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  828.             goto cond_fail;
  829.         }
  830.         ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
  831.         if (ret != 0) {
  832.             av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
  833.             goto thread_fail;
  834.         }
  835.         s->thread_started = 1;
  836.     }
  837. #endif
  838.  
  839.     return 0;
  840. #if HAVE_PTHREAD_CANCEL
  841.  thread_fail:
  842.     pthread_cond_destroy(&s->cond);
  843.  cond_fail:
  844.     pthread_mutex_destroy(&s->mutex);
  845. #endif
  846.  fail:
  847.     if (udp_fd >= 0)
  848.         closesocket(udp_fd);
  849.     av_fifo_freep(&s->fifo);
  850.     for (i = 0; i < num_include_sources; i++)
  851.         av_freep(&include_sources[i]);
  852.     for (i = 0; i < num_exclude_sources; i++)
  853.         av_freep(&exclude_sources[i]);
  854.     return AVERROR(EIO);
  855. }
  856.  
  857. static int udplite_open(URLContext *h, const char *uri, int flags)
  858. {
  859.     UDPContext *s = h->priv_data;
  860.  
  861.     // set default checksum coverage
  862.     s->udplite_coverage = UDP_HEADER_SIZE;
  863.  
  864.     return udp_open(h, uri, flags);
  865. }
  866.  
  867. static int udp_read(URLContext *h, uint8_t *buf, int size)
  868. {
  869.     UDPContext *s = h->priv_data;
  870.     int ret;
  871. #if HAVE_PTHREAD_CANCEL
  872.     int avail, nonblock = h->flags & AVIO_FLAG_NONBLOCK;
  873.  
  874.     if (s->fifo) {
  875.         pthread_mutex_lock(&s->mutex);
  876.         do {
  877.             avail = av_fifo_size(s->fifo);
  878.             if (avail) { // >=size) {
  879.                 uint8_t tmp[4];
  880.  
  881.                 av_fifo_generic_read(s->fifo, tmp, 4, NULL);
  882.                 avail= AV_RL32(tmp);
  883.                 if(avail > size){
  884.                     av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
  885.                     avail= size;
  886.                 }
  887.  
  888.                 av_fifo_generic_read(s->fifo, buf, avail, NULL);
  889.                 av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
  890.                 pthread_mutex_unlock(&s->mutex);
  891.                 return avail;
  892.             } else if(s->circular_buffer_error){
  893.                 int err = s->circular_buffer_error;
  894.                 pthread_mutex_unlock(&s->mutex);
  895.                 return err;
  896.             } else if(nonblock) {
  897.                 pthread_mutex_unlock(&s->mutex);
  898.                 return AVERROR(EAGAIN);
  899.             }
  900.             else {
  901.                 /* FIXME: using the monotonic clock would be better,
  902.                    but it does not exist on all supported platforms. */
  903.                 int64_t t = av_gettime() + 100000;
  904.                 struct timespec tv = { .tv_sec  =  t / 1000000,
  905.                                        .tv_nsec = (t % 1000000) * 1000 };
  906.                 if (pthread_cond_timedwait(&s->cond, &s->mutex, &tv) < 0) {
  907.                     pthread_mutex_unlock(&s->mutex);
  908.                     return AVERROR(errno == ETIMEDOUT ? EAGAIN : errno);
  909.                 }
  910.                 nonblock = 1;
  911.             }
  912.         } while( 1);
  913.     }
  914. #endif
  915.  
  916.     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
  917.         ret = ff_network_wait_fd(s->udp_fd, 0);
  918.         if (ret < 0)
  919.             return ret;
  920.     }
  921.     ret = recv(s->udp_fd, buf, size, 0);
  922.  
  923.     return ret < 0 ? ff_neterrno() : ret;
  924. }
  925.  
  926. static int udp_write(URLContext *h, const uint8_t *buf, int size)
  927. {
  928.     UDPContext *s = h->priv_data;
  929.     int ret;
  930.  
  931.     if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
  932.         ret = ff_network_wait_fd(s->udp_fd, 1);
  933.         if (ret < 0)
  934.             return ret;
  935.     }
  936.  
  937.     if (!s->is_connected) {
  938.         ret = sendto (s->udp_fd, buf, size, 0,
  939.                       (struct sockaddr *) &s->dest_addr,
  940.                       s->dest_addr_len);
  941.     } else
  942.         ret = send(s->udp_fd, buf, size, 0);
  943.  
  944.     return ret < 0 ? ff_neterrno() : ret;
  945. }
  946.  
  947. static int udp_close(URLContext *h)
  948. {
  949.     UDPContext *s = h->priv_data;
  950.  
  951.     if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
  952.         udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr,(struct sockaddr *)&s->local_addr_storage);
  953.     closesocket(s->udp_fd);
  954. #if HAVE_PTHREAD_CANCEL
  955.     if (s->thread_started) {
  956.         int ret;
  957.         pthread_cancel(s->circular_buffer_thread);
  958.         ret = pthread_join(s->circular_buffer_thread, NULL);
  959.         if (ret != 0)
  960.             av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
  961.         pthread_mutex_destroy(&s->mutex);
  962.         pthread_cond_destroy(&s->cond);
  963.     }
  964. #endif
  965.     av_fifo_freep(&s->fifo);
  966.     return 0;
  967. }
  968.  
  969. URLProtocol ff_udp_protocol = {
  970.     .name                = "udp",
  971.     .url_open            = udp_open,
  972.     .url_read            = udp_read,
  973.     .url_write           = udp_write,
  974.     .url_close           = udp_close,
  975.     .url_get_file_handle = udp_get_file_handle,
  976.     .priv_data_size      = sizeof(UDPContext),
  977.     .priv_data_class     = &udp_class,
  978.     .flags               = URL_PROTOCOL_FLAG_NETWORK,
  979. };
  980.  
  981. URLProtocol ff_udplite_protocol = {
  982.     .name                = "udplite",
  983.     .url_open            = udplite_open,
  984.     .url_read            = udp_read,
  985.     .url_write           = udp_write,
  986.     .url_close           = udp_close,
  987.     .url_get_file_handle = udp_get_file_handle,
  988.     .priv_data_size      = sizeof(UDPContext),
  989.     .priv_data_class     = &udplite_context_class,
  990.     .flags               = URL_PROTOCOL_FLAG_NETWORK,
  991. };
  992.