You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

702 lines
21KB

  1. /*
  2. * UDP prototype streaming system
  3. * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
  4. *
  5. * This file is part of FFmpeg.
  6. *
  7. * FFmpeg is free software; you can redistribute it and/or
  8. * modify it under the terms of the GNU Lesser General Public
  9. * License as published by the Free Software Foundation; either
  10. * version 2.1 of the License, or (at your option) any later version.
  11. *
  12. * FFmpeg is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  15. * Lesser General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU Lesser General Public
  18. * License along with FFmpeg; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. */
  21. /**
  22. * @file
  23. * UDP protocol
  24. */
  25. #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
  26. #include "avformat.h"
  27. #include "avio_internal.h"
  28. #include "libavutil/parseutils.h"
  29. #include "libavutil/fifo.h"
  30. #include "libavutil/intreadwrite.h"
  31. #include "libavutil/avstring.h"
  32. #include <unistd.h>
  33. #include "internal.h"
  34. #include "network.h"
  35. #ifdef WIN32
  36. # define if_nametoindex( str ) atoi( str )
  37. #else
  38. # include <unistd.h>
  39. # include <net/if.h>
  40. #endif
  41. #include "os_support.h"
  42. #include "url.h"
  43. #if HAVE_PTHREADS
  44. #include <pthread.h>
  45. #endif
  46. #include <sys/time.h>
  47. #ifndef IPV6_ADD_MEMBERSHIP
  48. #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
  49. #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
  50. #endif
  51. #define UDP_TX_BUF_SIZE 32768
  52. #define UDP_MAX_PKT_SIZE 65536
  53. typedef struct {
  54. int udp_fd;
  55. int ttl;
  56. int buffer_size;
  57. int is_multicast;
  58. int local_port;
  59. int reuse_socket;
  60. struct sockaddr_storage dest_addr;
  61. int dest_addr_len;
  62. int is_connected;
  63. /* Circular Buffer variables for use in UDP receive code */
  64. int circular_buffer_size;
  65. AVFifoBuffer *fifo;
  66. int circular_buffer_error;
  67. #if HAVE_PTHREADS
  68. pthread_t circular_buffer_thread;
  69. pthread_mutex_t mutex;
  70. pthread_cond_t cond;
  71. int thread_started;
  72. volatile int exit_thread;
  73. #endif
  74. uint8_t tmp[UDP_MAX_PKT_SIZE+4];
  75. int remaining_in_dg;
  76. } UDPContext;
  77. static int udp_set_multicast_ttl(int sockfd, int mcastTTL,
  78. struct sockaddr *addr)
  79. {
  80. #ifdef IP_MULTICAST_TTL
  81. if (addr->sa_family == AF_INET) {
  82. if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL, &mcastTTL, sizeof(mcastTTL)) < 0) {
  83. av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_MULTICAST_TTL): %s\n", strerror(errno));
  84. return -1;
  85. }
  86. }
  87. #endif
  88. #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
  89. if (addr->sa_family == AF_INET6) {
  90. if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &mcastTTL, sizeof(mcastTTL)) < 0) {
  91. av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_MULTICAST_HOPS): %s\n", strerror(errno));
  92. return -1;
  93. }
  94. }
  95. #endif
  96. return 0;
  97. }
  98. static int udp_join_multicast_group(int sockfd, struct sockaddr *addr, int miface_nr)
  99. {
  100. #ifdef IP_ADD_MEMBERSHIP
  101. if (addr->sa_family == AF_INET) {
  102. struct ip_mreq mreq;
  103. struct group_req gr;
  104. mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
  105. mreq.imr_interface.s_addr= INADDR_ANY;
  106. if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
  107. av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_ADD_MEMBERSHIP): %s\n", strerror(errno));
  108. return -1;
  109. }
  110. #ifdef MCAST_JOIN_GROUP
  111. memset(&gr, 0, sizeof(struct group_req));
  112. gr.gr_interface = miface_nr;
  113. memcpy(&gr.gr_group, addr, sizeof(struct sockaddr_in));
  114. if (setsockopt(sockfd, SOL_IP, MCAST_JOIN_GROUP, &gr, sizeof(struct group_req)) < 0) {
  115. av_log(NULL, AV_LOG_ERROR, "setsockopt(MCAST_JOIN_GROUP): %s\n", strerror(errno));
  116. return -1;
  117. }
  118. #endif
  119. }
  120. #endif
  121. #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
  122. if (addr->sa_family == AF_INET6) {
  123. struct ipv6_mreq mreq6;
  124. memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
  125. mreq6.ipv6mr_interface= 0;
  126. if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
  127. av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_ADD_MEMBERSHIP): %s\n", strerror(errno));
  128. return -1;
  129. }
  130. }
  131. #endif
  132. return 0;
  133. }
  134. static int udp_leave_multicast_group(int sockfd, struct sockaddr *addr)
  135. {
  136. #ifdef IP_DROP_MEMBERSHIP
  137. if (addr->sa_family == AF_INET) {
  138. struct ip_mreq mreq;
  139. mreq.imr_multiaddr.s_addr = ((struct sockaddr_in *)addr)->sin_addr.s_addr;
  140. mreq.imr_interface.s_addr= INADDR_ANY;
  141. if (setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, (const void *)&mreq, sizeof(mreq)) < 0) {
  142. av_log(NULL, AV_LOG_ERROR, "setsockopt(IP_DROP_MEMBERSHIP): %s\n", strerror(errno));
  143. return -1;
  144. }
  145. }
  146. #endif
  147. #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
  148. if (addr->sa_family == AF_INET6) {
  149. struct ipv6_mreq mreq6;
  150. memcpy(&mreq6.ipv6mr_multiaddr, &(((struct sockaddr_in6 *)addr)->sin6_addr), sizeof(struct in6_addr));
  151. mreq6.ipv6mr_interface= 0;
  152. if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, &mreq6, sizeof(mreq6)) < 0) {
  153. av_log(NULL, AV_LOG_ERROR, "setsockopt(IPV6_DROP_MEMBERSHIP): %s\n", strerror(errno));
  154. return -1;
  155. }
  156. }
  157. #endif
  158. return 0;
  159. }
  /**
   * Resolve a host/port pair with getaddrinfo().
   *
   * @param hostname host to resolve; NULL, "" or a string starting with '?'
   *                 means "no node" (NULL is passed to getaddrinfo())
   * @param port     port number; values <= 0 are resolved as service "0"
   * @param type     value for hints.ai_socktype
   * @param family   value for hints.ai_family
   * @param flags    value for hints.ai_flags
   * @return the addrinfo list (release it with freeaddrinfo()), or NULL on
   *         failure after logging the resolver error
   */
  static struct addrinfo* udp_resolve_host(const char *hostname, int port,
                                           int type, int family, int flags)
  {
      struct addrinfo hints;
      struct addrinfo *result = NULL;
      char port_str[16];
      const char *node = NULL;
      const char *service = "0";
      int gai_err;

      if (port > 0) {
          snprintf(port_str, sizeof(port_str), "%d", port);
          service = port_str;
      }
      if (hostname && hostname[0] != '\0' && hostname[0] != '?')
          node = hostname;

      memset(&hints, 0, sizeof(hints));
      hints.ai_socktype = type;
      hints.ai_family   = family;
      hints.ai_flags    = flags;

      gai_err = getaddrinfo(node, service, &hints, &result);
      if (gai_err) {
          av_log(NULL, AV_LOG_ERROR, "udp_resolve_host: %s\n", gai_strerror(gai_err));
          return NULL;
      }
      return result;
  }
  184. static int udp_set_url(struct sockaddr_storage *addr,
  185. const char *hostname, int port)
  186. {
  187. struct addrinfo *res0;
  188. int addr_len;
  189. res0 = udp_resolve_host(hostname, port, SOCK_DGRAM, AF_UNSPEC, 0);
  190. if (res0 == 0) return AVERROR(EIO);
  191. memcpy(addr, res0->ai_addr, res0->ai_addrlen);
  192. addr_len = res0->ai_addrlen;
  193. freeaddrinfo(res0);
  194. return addr_len;
  195. }
  196. static int udp_socket_create(UDPContext *s, struct sockaddr_storage *addr,
  197. int *addr_len, const char *localaddr)
  198. {
  199. int udp_fd = -1;
  200. struct addrinfo *res0 = NULL, *res = NULL;
  201. int family = AF_UNSPEC;
  202. if (((struct sockaddr *) &s->dest_addr)->sa_family)
  203. family = ((struct sockaddr *) &s->dest_addr)->sa_family;
  204. res0 = udp_resolve_host(localaddr[0] ? localaddr : NULL, s->local_port,
  205. SOCK_DGRAM, family, AI_PASSIVE);
  206. if (res0 == 0)
  207. goto fail;
  208. for (res = res0; res; res=res->ai_next) {
  209. udp_fd = socket(res->ai_family, SOCK_DGRAM, 0);
  210. if (udp_fd > 0) break;
  211. av_log(NULL, AV_LOG_ERROR, "socket: %s\n", strerror(errno));
  212. }
  213. if (udp_fd < 0)
  214. goto fail;
  215. memcpy(addr, res->ai_addr, res->ai_addrlen);
  216. *addr_len = res->ai_addrlen;
  217. freeaddrinfo(res0);
  218. return udp_fd;
  219. fail:
  220. if (udp_fd >= 0)
  221. closesocket(udp_fd);
  222. if(res0)
  223. freeaddrinfo(res0);
  224. return -1;
  225. }
  226. static int udp_port(struct sockaddr_storage *addr, int addr_len)
  227. {
  228. char sbuf[sizeof(int)*3+1];
  229. if (getnameinfo((struct sockaddr *)addr, addr_len, NULL, 0, sbuf, sizeof(sbuf), NI_NUMERICSERV) != 0) {
  230. av_log(NULL, AV_LOG_ERROR, "getnameinfo: %s\n", strerror(errno));
  231. return -1;
  232. }
  233. return strtol(sbuf, NULL, 10);
  234. }
  235. /**
  236. * If no filename is given to av_open_input_file because you want to
  237. * get the local port first, then you must call this function to set
  238. * the remote server address.
  239. *
  240. * url syntax: udp://host:port[?option=val...]
  241. * option: 'ttl=n' : set the ttl value (for multicast only)
  242. * 'localport=n' : set the local port
  243. * 'pkt_size=n' : set max packet size
  244. * 'miface=if' : set multicast input interface
  245. * 'reuse=1' : enable reusing the socket
  246. *
  247. * @param h media file context
  248. * @param uri of the remote server
  249. * @return zero if no error.
  250. */
  251. int ff_udp_set_remote_url(URLContext *h, const char *uri)
  252. {
  253. UDPContext *s = h->priv_data;
  254. char hostname[256], buf[10];
  255. int port;
  256. const char *p;
  257. av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
  258. /* set the destination address */
  259. s->dest_addr_len = udp_set_url(&s->dest_addr, hostname, port);
  260. if (s->dest_addr_len < 0) {
  261. return AVERROR(EIO);
  262. }
  263. s->is_multicast = ff_is_multicast_address((struct sockaddr*) &s->dest_addr);
  264. p = strchr(uri, '?');
  265. if (p) {
  266. if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
  267. int was_connected = s->is_connected;
  268. s->is_connected = strtol(buf, NULL, 10);
  269. if (s->is_connected && !was_connected) {
  270. if (connect(s->udp_fd, (struct sockaddr *) &s->dest_addr,
  271. s->dest_addr_len)) {
  272. s->is_connected = 0;
  273. av_log(h, AV_LOG_ERROR, "connect: %s\n", strerror(errno));
  274. return AVERROR(EIO);
  275. }
  276. }
  277. }
  278. }
  279. return 0;
  280. }
  281. /**
  282. * Return the local port used by the UDP connection
  283. * @param h media file context
  284. * @return the local port number
  285. */
  286. int ff_udp_get_local_port(URLContext *h)
  287. {
  288. UDPContext *s = h->priv_data;
  289. return s->local_port;
  290. }
  291. /**
  292. * Return the udp file handle for select() usage to wait for several RTP
  293. * streams at the same time.
  294. * @param h media file context
  295. */
  296. static int udp_get_file_handle(URLContext *h)
  297. {
  298. UDPContext *s = h->priv_data;
  299. return s->udp_fd;
  300. }
  #if HAVE_PTHREADS
  /**
   * Receive thread: waits for datagrams on the socket and appends each one
   * to s->fifo as a 4-byte little-endian length followed by the payload.
   *
   * The loop ends when s->exit_thread is set, when the interrupt callback
   * fires, on a socket error, or when the FIFO lacks room for a
   * maximum-size datagram; in the error cases s->circular_buffer_error is
   * set. The condition variable is signalled after every queued datagram
   * and once more on exit so that a waiting reader re-checks the state.
   *
   * @param arg the URLContext owning the UDP context
   * @return always NULL
   */
  static void *circular_buffer_task( void *arg)
  {
      URLContext *h = arg;
      UDPContext *s = h->priv_data;
      fd_set rfds;
      struct timeval tv;

      while(!s->exit_thread) {
          int left;
          int ret;
          int len;

          if (ff_check_interrupt(&h->interrupt_callback)) {
              s->circular_buffer_error = AVERROR(EINTR);
              goto end;
          }

          /* wake up at least once per second to notice exit_thread */
          FD_ZERO(&rfds);
          FD_SET(s->udp_fd, &rfds);
          tv.tv_sec = 1;
          tv.tv_usec = 0;
          ret = select(s->udp_fd + 1, &rfds, NULL, NULL, &tv);
          if (ret < 0) {
              if (ff_neterrno() == AVERROR(EINTR))
                  continue;
              s->circular_buffer_error = AVERROR(EIO);
              goto end;
          }

          if (!(ret > 0 && FD_ISSET(s->udp_fd, &rfds)))
              continue;

          /* The reader drains the FIFO concurrently, so query the free space
           * under the mutex. This thread is the only writer, hence the space
           * can only grow between this check and the write below. */
          pthread_mutex_lock(&s->mutex);
          left = av_fifo_space(s->fifo);
          pthread_mutex_unlock(&s->mutex);

          /* No room for a maximum-size datagram plus its length prefix */
          if(left < UDP_MAX_PKT_SIZE + 4) {
              av_log(h, AV_LOG_ERROR, "circular_buffer: OVERRUN\n");
              s->circular_buffer_error = AVERROR(EIO);
              goto end;
          }

          /* payload goes after the 4 bytes reserved for the length */
          len = recv(s->udp_fd, s->tmp+4, sizeof(s->tmp)-4, 0);
          if (len < 0) {
              if (ff_neterrno() != AVERROR(EAGAIN) && ff_neterrno() != AVERROR(EINTR)) {
                  s->circular_buffer_error = AVERROR(EIO);
                  goto end;
              }
              continue;
          }
          AV_WL32(s->tmp, len);

          pthread_mutex_lock(&s->mutex);
          av_fifo_generic_write(s->fifo, s->tmp, len+4, NULL);
          pthread_cond_signal(&s->cond);
          pthread_mutex_unlock(&s->mutex);
      }

  end:
      /* wake a reader that may be waiting so it can see the error / exit */
      pthread_mutex_lock(&s->mutex);
      pthread_cond_signal(&s->cond);
      pthread_mutex_unlock(&s->mutex);
      return NULL;
  }
  #endif
  360. /* put it in UDP context */
  361. /* return non zero if error */
  362. static int udp_open(URLContext *h, const char *uri, int flags)
  363. {
  364. char hostname[1024], localaddr[1024] = "";
  365. int port, udp_fd = -1, tmp, bind_ret = -1;
  366. UDPContext *s = h->priv_data;
  367. int is_output;
  368. const char *p;
  369. char buf[256];
  370. struct sockaddr_storage my_addr;
  371. int len;
  372. int reuse_specified = 0;
  373. int miface_nr = 0;
  374. h->is_streamed = 1;
  375. h->max_packet_size = 1472;
  376. is_output = !(flags & AVIO_FLAG_READ);
  377. s->ttl = 16;
  378. s->buffer_size = is_output ? UDP_TX_BUF_SIZE : UDP_MAX_PKT_SIZE;
  379. s->circular_buffer_size = 7*188*4096;
  380. p = strchr(uri, '?');
  381. if (p) {
  382. if (av_find_info_tag(buf, sizeof(buf), "reuse", p)) {
  383. char *endptr = NULL;
  384. s->reuse_socket = strtol(buf, &endptr, 10);
  385. /* assume if no digits were found it is a request to enable it */
  386. if (buf == endptr)
  387. s->reuse_socket = 1;
  388. reuse_specified = 1;
  389. }
  390. if (av_find_info_tag(buf, sizeof(buf), "ttl", p)) {
  391. s->ttl = strtol(buf, NULL, 10);
  392. }
  393. if (av_find_info_tag(buf, sizeof(buf), "localport", p)) {
  394. s->local_port = strtol(buf, NULL, 10);
  395. }
  396. if (av_find_info_tag(buf, sizeof(buf), "pkt_size", p)) {
  397. h->max_packet_size = strtol(buf, NULL, 10);
  398. }
  399. if (av_find_info_tag(buf, sizeof(buf), "buffer_size", p)) {
  400. s->buffer_size = strtol(buf, NULL, 10);
  401. }
  402. if (av_find_info_tag(buf, sizeof(buf), "connect", p)) {
  403. s->is_connected = strtol(buf, NULL, 10);
  404. }
  405. if (av_find_info_tag(buf, sizeof(buf), "fifo_size", p)) {
  406. s->circular_buffer_size = strtol(buf, NULL, 10)*188;
  407. }
  408. if (av_find_info_tag(buf, sizeof(buf), "localaddr", p)) {
  409. av_strlcpy(localaddr, buf, sizeof(localaddr));
  410. }
  411. if (av_find_info_tag(buf, sizeof(buf), "miface", p)) {
  412. miface_nr = if_nametoindex (buf);
  413. }
  414. }
  415. /* fill the dest addr */
  416. av_url_split(NULL, 0, NULL, 0, hostname, sizeof(hostname), &port, NULL, 0, uri);
  417. /* XXX: fix av_url_split */
  418. if (hostname[0] == '\0' || hostname[0] == '?') {
  419. /* only accepts null hostname if input */
  420. if (!(flags & AVIO_FLAG_READ))
  421. goto fail;
  422. } else {
  423. if (ff_udp_set_remote_url(h, uri) < 0)
  424. goto fail;
  425. }
  426. if ((s->is_multicast || !s->local_port) && (h->flags & AVIO_FLAG_READ))
  427. s->local_port = port;
  428. udp_fd = udp_socket_create(s, &my_addr, &len, localaddr);
  429. if (udp_fd < 0)
  430. goto fail;
  431. /* Follow the requested reuse option, unless it's multicast in which
  432. * case enable reuse unless explicitly disabled.
  433. */
  434. if (s->reuse_socket || (s->is_multicast && !reuse_specified)) {
  435. s->reuse_socket = 1;
  436. if (setsockopt (udp_fd, SOL_SOCKET, SO_REUSEADDR, &(s->reuse_socket), sizeof(s->reuse_socket)) != 0)
  437. goto fail;
  438. }
  439. /* If multicast, try binding the multicast address first, to avoid
  440. * receiving UDP packets from other sources aimed at the same UDP
  441. * port. This fails on windows. This makes sending to the same address
  442. * using sendto() fail, so only do it if we're opened in read-only mode. */
  443. if (s->is_multicast && !(h->flags & AVIO_FLAG_WRITE)) {
  444. bind_ret = bind(udp_fd,(struct sockaddr *)&s->dest_addr, len);
  445. }
  446. /* bind to the local address if not multicast or if the multicast
  447. * bind failed */
  448. /* the bind is needed to give a port to the socket now */
  449. if (bind_ret < 0 && bind(udp_fd,(struct sockaddr *)&my_addr, len) < 0) {
  450. av_log(h, AV_LOG_ERROR, "bind failed: %s\n", strerror(errno));
  451. goto fail;
  452. }
  453. len = sizeof(my_addr);
  454. getsockname(udp_fd, (struct sockaddr *)&my_addr, &len);
  455. s->local_port = udp_port(&my_addr, len);
  456. if (s->is_multicast) {
  457. if (h->flags & AVIO_FLAG_WRITE) {
  458. /* output */
  459. if (udp_set_multicast_ttl(udp_fd, s->ttl, (struct sockaddr *)&s->dest_addr) < 0)
  460. goto fail;
  461. }
  462. if (h->flags & AVIO_FLAG_READ) {
  463. /* input */
  464. if (udp_join_multicast_group(udp_fd, (struct sockaddr *)&s->dest_addr, miface_nr) < 0)
  465. goto fail;
  466. }
  467. }
  468. if (is_output) {
  469. /* limit the tx buf size to limit latency */
  470. tmp = s->buffer_size;
  471. if (setsockopt(udp_fd, SOL_SOCKET, SO_SNDBUF, &tmp, sizeof(tmp)) < 0) {
  472. av_log(h, AV_LOG_ERROR, "setsockopt(SO_SNDBUF): %s\n", strerror(errno));
  473. goto fail;
  474. }
  475. } else {
  476. /* set udp recv buffer size to the largest possible udp packet size to
  477. * avoid losing data on OSes that set this too low by default. */
  478. tmp = s->buffer_size;
  479. if (setsockopt(udp_fd, SOL_SOCKET, SO_RCVBUF, &tmp, sizeof(tmp)) < 0) {
  480. av_log(h, AV_LOG_WARNING, "setsockopt(SO_RECVBUF): %s\n", strerror(errno));
  481. }
  482. /* make the socket non-blocking */
  483. ff_socket_nonblock(udp_fd, 1);
  484. }
  485. if (s->is_connected) {
  486. if (connect(udp_fd, (struct sockaddr *) &s->dest_addr, s->dest_addr_len)) {
  487. av_log(h, AV_LOG_ERROR, "connect: %s\n", strerror(errno));
  488. goto fail;
  489. }
  490. }
  491. s->udp_fd = udp_fd;
  492. #if HAVE_PTHREADS
  493. if (!is_output && s->circular_buffer_size) {
  494. int ret;
  495. /* start the task going */
  496. s->fifo = av_fifo_alloc(s->circular_buffer_size);
  497. ret = pthread_mutex_init(&s->mutex, NULL);
  498. if (ret != 0) {
  499. av_log(h, AV_LOG_ERROR, "pthread_mutex_init failed : %s\n", strerror(ret));
  500. goto fail;
  501. }
  502. ret = pthread_cond_init(&s->cond, NULL);
  503. if (ret != 0) {
  504. av_log(h, AV_LOG_ERROR, "pthread_cond_init failed : %s\n", strerror(ret));
  505. goto cond_fail;
  506. }
  507. ret = pthread_create(&s->circular_buffer_thread, NULL, circular_buffer_task, h);
  508. if (ret != 0) {
  509. av_log(h, AV_LOG_ERROR, "pthread_create failed : %s\n", strerror(ret));
  510. goto thread_fail;
  511. }
  512. s->thread_started = 1;
  513. }
  514. #endif
  515. return 0;
  516. #if HAVE_PTHREADS
  517. thread_fail:
  518. pthread_cond_destroy(&s->cond);
  519. cond_fail:
  520. pthread_mutex_destroy(&s->mutex);
  521. #endif
  522. fail:
  523. if (udp_fd >= 0)
  524. closesocket(udp_fd);
  525. av_fifo_free(s->fifo);
  526. return AVERROR(EIO);
  527. }
  528. static int udp_read(URLContext *h, uint8_t *buf, int size)
  529. {
  530. UDPContext *s = h->priv_data;
  531. int ret;
  532. int avail;
  533. #if HAVE_PTHREADS
  534. if (s->fifo) {
  535. pthread_mutex_lock(&s->mutex);
  536. do {
  537. avail = av_fifo_size(s->fifo);
  538. if (avail) { // >=size) {
  539. uint8_t tmp[4];
  540. pthread_mutex_unlock(&s->mutex);
  541. av_fifo_generic_read(s->fifo, tmp, 4, NULL);
  542. avail= AV_RL32(tmp);
  543. if(avail > size){
  544. av_log(h, AV_LOG_WARNING, "Part of datagram lost due to insufficient buffer size\n");
  545. avail= size;
  546. }
  547. av_fifo_generic_read(s->fifo, buf, avail, NULL);
  548. av_fifo_drain(s->fifo, AV_RL32(tmp) - avail);
  549. return avail;
  550. } else if(s->circular_buffer_error){
  551. pthread_mutex_unlock(&s->mutex);
  552. return s->circular_buffer_error;
  553. } else if(h->flags & AVIO_FLAG_NONBLOCK) {
  554. pthread_mutex_unlock(&s->mutex);
  555. return AVERROR(EAGAIN);
  556. }
  557. else {
  558. pthread_cond_wait(&s->cond, &s->mutex);
  559. }
  560. } while( 1);
  561. }
  562. #endif
  563. if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
  564. ret = ff_network_wait_fd(s->udp_fd, 0);
  565. if (ret < 0)
  566. return ret;
  567. }
  568. ret = recv(s->udp_fd, buf, size, 0);
  569. return ret < 0 ? ff_neterrno() : ret;
  570. }
  571. static int udp_write(URLContext *h, const uint8_t *buf, int size)
  572. {
  573. UDPContext *s = h->priv_data;
  574. int ret;
  575. if (!(h->flags & AVIO_FLAG_NONBLOCK)) {
  576. ret = ff_network_wait_fd(s->udp_fd, 1);
  577. if (ret < 0)
  578. return ret;
  579. }
  580. if (!s->is_connected) {
  581. ret = sendto (s->udp_fd, buf, size, 0,
  582. (struct sockaddr *) &s->dest_addr,
  583. s->dest_addr_len);
  584. } else
  585. ret = send(s->udp_fd, buf, size, 0);
  586. return ret < 0 ? ff_neterrno() : ret;
  587. }
  588. static int udp_close(URLContext *h)
  589. {
  590. UDPContext *s = h->priv_data;
  591. int ret;
  592. if (s->is_multicast && (h->flags & AVIO_FLAG_READ))
  593. udp_leave_multicast_group(s->udp_fd, (struct sockaddr *)&s->dest_addr);
  594. closesocket(s->udp_fd);
  595. av_fifo_free(s->fifo);
  596. #if HAVE_PTHREADS
  597. if (s->thread_started) {
  598. s->exit_thread = 1;
  599. ret = pthread_join(s->circular_buffer_thread, NULL);
  600. if (ret != 0)
  601. av_log(h, AV_LOG_ERROR, "pthread_join(): %s\n", strerror(ret));
  602. }
  603. pthread_mutex_destroy(&s->mutex);
  604. pthread_cond_destroy(&s->cond);
  605. #endif
  606. return 0;
  607. }
  608. URLProtocol ff_udp_protocol = {
  609. .name = "udp",
  610. .url_open = udp_open,
  611. .url_read = udp_read,
  612. .url_write = udp_write,
  613. .url_close = udp_close,
  614. .url_get_file_handle = udp_get_file_handle,
  615. .priv_data_size = sizeof(UDPContext),
  616. .flags = URL_PROTOCOL_FLAG_NETWORK,
  617. };