From fe42ff23abf5207c6cf723a028f73d4145bad108 Mon Sep 17 00:00:00 2001 From: joq Date: Thu, 29 May 2003 18:00:45 +0000 Subject: [PATCH] jackrec uses ringbuffer (0.71.5) git-svn-id: svn+ssh://jackaudio.org/trunk/jack@401 0c269be4-1314-0410-8aa9-9f06e86f4224 --- configure.in | 2 +- example-clients/Makefile.am | 2 +- example-clients/capture_client.c | 260 +++++++++++--------------- example-clients/ringbuffer.c | 307 +++++++++++++++++++++++++++++++ example-clients/ringbuffer.h | 42 +++++ jack/jack.h | 3 +- 6 files changed, 460 insertions(+), 156 deletions(-) create mode 100644 example-clients/ringbuffer.c create mode 100644 example-clients/ringbuffer.h diff --git a/configure.in b/configure.in index 8aa1176..f7f8938 100644 --- a/configure.in +++ b/configure.in @@ -14,7 +14,7 @@ dnl changes are made dnl --- JACK_MAJOR_VERSION=0 JACK_MINOR_VERSION=71 -JACK_MICRO_VERSION=4 +JACK_MICRO_VERSION=5 dnl --- dnl HOWTO: updating the jack protocal version diff --git a/example-clients/Makefile.am b/example-clients/Makefile.am index 2984ca8..f52189a 100644 --- a/example-clients/Makefile.am +++ b/example-clients/Makefile.am @@ -62,7 +62,7 @@ jack_showtime_LDADD = ../libjack/libjack.la if HAVE_SNDFILE -jackrec_SOURCES = capture_client.c +jackrec_SOURCES = capture_client.c ringbuffer.c ringbuffer.h jackrec_LDFLAGS = @SNDFILE_LIBS@ -lrt -ldl -lpthread jackrec_LDADD = ../libjack/libjack.la endif diff --git a/example-clients/capture_client.c b/example-clients/capture_client.c index cc19315..16f7126 100644 --- a/example-clients/capture_client.c +++ b/example-clients/capture_client.c @@ -1,5 +1,6 @@ /* Copyright (C) 2001 Paul Davis + Copyright (C) 2003 Jack O'Quin This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,6 +17,7 @@ Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * 2002/08/23 - modify for libsndfile 1.0.0 + * 2003/05/26 - use ringbuffers - joq $Id$ */ @@ -28,206 +30,131 @@ #include #include #include - #include -#include +#include "ringbuffer.h" typedef struct _thread_info { pthread_t thread_id; SNDFILE *sf; jack_nframes_t duration; + jack_nframes_t rb_size; jack_client_t *client; unsigned int channels; int bitdepth; - int can_capture; char *path; - int status; - int can_process; + volatile int can_capture; + volatile int can_process; + volatile int status; } thread_info_t; +/* JACK data */ unsigned int nports; jack_port_t **ports; - -pthread_mutex_t buffer_lock = PTHREAD_MUTEX_INITIALIZER; +jack_default_audio_sample_t **in; +jack_nframes_t nframes; +const size_t sample_size = sizeof(jack_default_audio_sample_t); + +/* Synchronization between process thread and disk thread. */ +#define DEFAULT_RB_SIZE 16384 /* ringbuffer size in frames */ +ringbuffer_t *rb; +pthread_mutex_t disk_thread_lock = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t data_ready = PTHREAD_COND_INITIALIZER; +long overruns = 0; -typedef struct _sample_buffer { - jack_nframes_t nframes; - jack_default_audio_sample_t **data; -} sample_buffer_t; - -sample_buffer_t * -sample_buffer_new (jack_nframes_t nframes, unsigned int nchans) -{ - sample_buffer_t *buf; - unsigned int i; - - buf = (sample_buffer_t *) malloc (sizeof (sample_buffer_t)); - buf->nframes = nframes; - buf->data = (jack_default_audio_sample_t **) malloc (sizeof (jack_default_audio_sample_t *) * nchans); - - for (i = 0; i < nchans; i++) { - buf->data[i] = (jack_default_audio_sample_t *) malloc (sizeof (jack_default_audio_sample_t) * nframes); - } - - return buf; -} - -JSList *pending_writes = NULL; -JSList *free_buffers = NULL; - -sample_buffer_t * -get_free_buffer (jack_nframes_t nframes, unsigned int nchans) -{ - sample_buffer_t *buf; - - if (free_buffers == NULL) { - buf = sample_buffer_new (nframes, nchans); - } else { - buf = (sample_buffer_t *) free_buffers->data; - free_buffers = jack_slist_next (free_buffers); - } - - return buf; -} - -sample_buffer_t * -get_write_buffer () -{ - sample_buffer_t *buf; - - if (pending_writes == NULL) { - return NULL; - } - - buf = (sample_buffer_t *) pending_writes->data; - pending_writes = jack_slist_next (pending_writes); - - return buf; -} - -void -put_write_buffer (sample_buffer_t *buf) -{ - pending_writes = jack_slist_append (pending_writes, buf); -} - -void -put_free_buffer (sample_buffer_t *buf) -{ - free_buffers = jack_slist_prepend (free_buffers, buf); -} void * disk_thread (void *arg) { - sample_buffer_t *buf; thread_info_t *info = (thread_info_t *) arg; - unsigned int i; - unsigned int chn; - jack_nframes_t total_captured = 0; - int done = 0; - float *fbuf; + static jack_nframes_t total_captured = 0; + jack_nframes_t samples_per_frame = info->channels; + size_t bytes_per_frame = samples_per_frame * sample_size; + void *framebuf = malloc (bytes_per_frame); pthread_setcanceltype (PTHREAD_CANCEL_ASYNCHRONOUS, NULL); - pthread_mutex_lock (&buffer_lock); - - /* preload the buffer cache */ - - for (i = 0; i < 8; i++) { - buf = sample_buffer_new (jack_get_buffer_size (info->client), info->channels); - put_free_buffer (buf); - } + pthread_mutex_lock (&disk_thread_lock); info->status = 0; - while (!done) { - pthread_cond_wait (&data_ready, &buffer_lock); + while (1) { - while ((buf = get_write_buffer ()) != 0) { - pthread_mutex_unlock (&buffer_lock); + /* Write the data one frame at a time. This is + * inefficient, but makes things simpler. */ + while (info->can_capture && + (ringbuffer_read_space (rb) >= bytes_per_frame)) { - /* libsndfile requires interleaved data */ - - if (info->can_capture) { + ringbuffer_read (rb, framebuf, bytes_per_frame); - fbuf = (float *) malloc (sizeof (float) * buf->nframes * info->channels); - - for (chn = 0; chn < info->channels; chn++) { - for (i = 0; i < buf->nframes; i++) { - fbuf[chn+(i*info->channels)] = buf->data[chn][i]; - } - } - - if (sf_writef_float (info->sf, fbuf, buf->nframes) != (sf_count_t)buf->nframes) { - char errstr[256]; - sf_error_str (0, errstr, sizeof (errstr) - 1); - fprintf (stderr, "cannot write data to sndfile (%s)\n", errstr); - info->status = -1; - done = 1; - break; - } + if (sf_writef_float (info->sf, framebuf, 1) != 1) { + char errstr[256]; + sf_error_str (0, errstr, sizeof (errstr) - 1); + fprintf (stderr, + "cannot write sndfile (%s)\n", + errstr); + info->status = EIO; /* write failed */ + goto done; + } - free (fbuf); - total_captured += buf->nframes; - - if (total_captured >= info->duration) { - printf ("disk thread finished\n"); - done = 1; - break; - } + if (++total_captured >= info->duration) { + printf ("disk thread finished\n"); + goto done; } - - pthread_mutex_lock (&buffer_lock); - put_free_buffer (buf); } + + /* wait until process() signals more data */ + pthread_cond_wait (&data_ready, &disk_thread_lock); } - pthread_mutex_unlock (&buffer_lock); + done: + pthread_mutex_unlock (&disk_thread_lock); + free (framebuf); return 0; } int process (jack_nframes_t nframes, void *arg) - { + int chn; + size_t i; thread_info_t *info = (thread_info_t *) arg; - jack_default_audio_sample_t *in; - sample_buffer_t *buf; - unsigned int i; - if (!info->can_process) { + /* Do nothing until we're ready to begin. */ + if ((!info->can_process) || (!info->can_capture)) return 0; - } - - /* we don't like taking locks, but until we have a lock - free ringbuffer written in C, this is what has to be done - */ - pthread_mutex_lock (&buffer_lock); - - buf = get_free_buffer (nframes, nports); - - for (i = 0; i < nports; i++) { - in = (jack_default_audio_sample_t *) jack_port_get_buffer (ports[i], nframes); - memcpy (buf->data[i], in, sizeof (jack_default_audio_sample_t) * nframes); + for (chn = 0; chn < nports; chn++) + in[chn] = jack_port_get_buffer (ports[chn], nframes); + + /* Sndfile requires interleaved data. It is simpler here to + * just queue interleaved samples to a single ringbuffer. */ + for (i = 0; i < nframes; i++) { + for (chn = 0; chn < nports; chn++) { + if (ringbuffer_write (rb, (void *) (in[chn]+i), + sample_size) + < sample_size) + overruns++; + } } - put_write_buffer (buf); - - /* tell the disk thread that there is work to do */ - - pthread_cond_signal (&data_ready); - pthread_mutex_unlock (&buffer_lock); + /* Tell the disk thread there is work to do. If it is already + * running, the lock will not be available. We can't wait + * here in the process() thread, but we don't need to signal + * in that case, because the disk thread will read all the + * data queued before waiting again. */ + if (pthread_mutex_trylock (&disk_thread_lock) == 0) { + pthread_cond_signal (&data_ready); + pthread_mutex_unlock (&disk_thread_lock); + } - return 0; + return 0; } void jack_shutdown (void *arg) { fprintf (stderr, "JACK shutdown\n"); - exit (0); + // exit (0); + abort(); } void @@ -273,6 +200,13 @@ run_disk_thread (thread_info_t *info) info->can_capture = 1; pthread_join (info->thread_id, NULL); sf_close (info->sf); + if (overruns > 0) { + fprintf (stderr, + "jackrec failed with %ld overruns.\n", overruns); + fprintf (stderr, " try a bigger buffer than -B %ld.\n", + info->rb_size); + info->status = EPIPE; + } if (info->status) { unlink (info->path); } @@ -282,10 +216,22 @@ void setup_ports (int sources, char *source_names[], thread_info_t *info) { unsigned int i; + size_t in_size; + /* Allocate data structures that depend on the number of ports. */ nports = sources; - ports = (jack_port_t **) malloc (sizeof (jack_port_t *) * nports); + in_size = nports * sizeof (jack_default_audio_sample_t *); + in = (jack_default_audio_sample_t **) malloc (in_size); + rb = ringbuffer_create (nports * sample_size * info->rb_size); + + /* When JACK is running realtime, jack_activate() will have + * called mlockall() to lock our pages into memory. But, we + * still need to touch any newly allocated pages before + * process() starts using them. Otherwise, a page fault could + * create a delay that would force JACK to shut us down. */ + memset(in, 0, in_size); + memset(rb->buf, 0, rb->size); for (i = 0; i < nports; i++) { char name[64]; @@ -307,7 +253,7 @@ setup_ports (int sources, char *source_names[], thread_info_t *info) } } - info->can_process = 1; + info->can_process = 1; /* process() can start, now */ } int @@ -320,16 +266,18 @@ main (int argc, char *argv[]) int longopt_index = 0; extern int optind, opterr; int show_usage = 0; - char *optstring = "d:f:b:h"; + char *optstring = "d:f:b:B:h"; struct option long_options[] = { - { "help", 1, 0, 'h' }, + { "help", 0, 0, 'h' }, { "duration", 1, 0, 'd' }, { "file", 1, 0, 'f' }, { "bitdepth", 1, 0, 'b' }, + { "bufsize", 1, 0, 'B' }, { 0, 0, 0, 0 } }; memset (&thread_info, 0, sizeof (thread_info)); + thread_info.rb_size = DEFAULT_RB_SIZE; opterr = 0; while ((c = getopt_long (argc, argv, optstring, long_options, &longopt_index)) != -1) { @@ -350,6 +298,9 @@ main (int argc, char *argv[]) case 'b': thread_info.bitdepth = atoi (optarg); break; + case 'B': + thread_info.rb_size = atoi (optarg); + break; default: fprintf (stderr, "error\n"); show_usage++; @@ -358,7 +309,7 @@ main (int argc, char *argv[]) } if (show_usage || thread_info.path == NULL || optind == argc) { - fprintf (stderr, "usage: jackrec -f filename [ -d second ] [ -b bitdepth ] port1 [ port2 ... ]\n"); + fprintf (stderr, "usage: jackrec -f filename [ -d second ] [ -b bitdepth ] [ -B bufsize ] port1 [ port2 ... ]\n"); exit (1); } @@ -374,7 +325,7 @@ main (int argc, char *argv[]) setup_disk_thread (&thread_info); jack_set_process_callback (client, process, &thread_info); - jack_on_shutdown (client, jack_shutdown, NULL); + jack_on_shutdown (client, jack_shutdown, &thread_info); if (jack_activate (client)) { fprintf (stderr, "cannot activate client"); @@ -385,5 +336,8 @@ main (int argc, char *argv[]) run_disk_thread (&thread_info); jack_client_close (client); + + ringbuffer_free (rb); + exit (0); } diff --git a/example-clients/ringbuffer.c b/example-clients/ringbuffer.c new file mode 100644 index 0000000..1561839 --- /dev/null +++ b/example-clients/ringbuffer.c @@ -0,0 +1,307 @@ +/* + Copyright (C) 2000 Paul Davis + Copyright (C) 2003 Rohan Drape + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + + ISO/POSIX C version of Paul Davis's lock free ringbuffer C++ code. + This is safe for the case of one read thread and one write thread. +*/ + +#include +#include +#include +#include "ringbuffer.h" + +/* Create a new ringbuffer to hold at least `sz' bytes of data. The + actual buffer size is rounded up to the next power of two. */ + +ringbuffer_t * +ringbuffer_create (int sz) +{ + int power_of_two; + ringbuffer_t *rb; + + rb = malloc (sizeof (ringbuffer_t)); + + for (power_of_two = 1; 1 << power_of_two < sz; power_of_two++); + + rb->size = 1 << power_of_two; + rb->size_mask = rb->size; + rb->size_mask -= 1; + rb->write_ptr = 0; + rb->read_ptr = 0; + rb->buf = malloc (rb->size); + rb->mlocked = 0; + + return rb; +} + +/* Free all data associated with the ringbuffer `rb'. */ + +void +ringbuffer_free (ringbuffer_t * rb) +{ + if (rb->mlocked) { + munlock (rb->buf, rb->size); + } + free (rb->buf); +} + +/* Lock the data block of `rb' using the system call 'mlock'. */ + +int +ringbuffer_mlock (ringbuffer_t * rb) +{ + if (mlock (rb->buf, rb->size)) { + return -1; + } + rb->mlocked = 1; + return 0; +} + +/* Reset the read and write pointers to zero. This is not thread + safe. */ + +void +ringbuffer_reset (ringbuffer_t * rb) +{ + rb->read_ptr = 0; + rb->write_ptr = 0; +} + +/* Return the number of bytes available for reading. This is the + number of bytes in front of the read pointer and behind the write + pointer. */ + +size_t +ringbuffer_read_space (ringbuffer_t * rb) +{ + size_t w, r; + + w = rb->write_ptr; + r = rb->read_ptr; + + if (w > r) { + return w - r; + } else { + return (w - r + rb->size) & rb->size_mask; + } +} + +/* Return the number of bytes available for writing. This is the + number of bytes in front of the write pointer and behind the read + pointer. */ + +size_t +ringbuffer_write_space (ringbuffer_t * rb) +{ + size_t w, r; + + w = rb->write_ptr; + r = rb->read_ptr; + + if (w > r) { + return ((r - w + rb->size) & rb->size_mask) - 1; + } else if (w < r) { + return (r - w) - 1; + } else { + return rb->size - 1; + } +} + +/* The copying data reader. Copy at most `cnt' bytes from `rb' to + `dest'. Returns the actual number of bytes copied. */ + +size_t +ringbuffer_read (ringbuffer_t * rb, char *dest, size_t cnt) +{ + size_t free_cnt; + size_t cnt2; + size_t to_read; + size_t n1, n2; + + if ((free_cnt = ringbuffer_read_space (rb)) == 0) { + return 0; + } + + to_read = cnt > free_cnt ? free_cnt : cnt; + + cnt2 = rb->read_ptr + to_read; + + if (cnt2 > rb->size) { + n1 = rb->size - rb->read_ptr; + n2 = cnt2 & rb->size_mask; + } else { + n1 = to_read; + n2 = 0; + } + + memcpy (dest, &(rb->buf[rb->read_ptr]), n1); + rb->read_ptr += n1; + rb->read_ptr &= rb->size_mask; + + if (n2) { + memcpy (dest + n1, &(rb->buf[rb->read_ptr]), n2); + rb->read_ptr += n2; + rb->read_ptr &= rb->size_mask; + } + + return to_read; +} + +/* The copying data writer. Copy at most `cnt' bytes to `rb' from + `src'. Returns the actual number of bytes copied. */ + +size_t +ringbuffer_write (ringbuffer_t * rb, char *src, size_t cnt) +{ + size_t free_cnt; + size_t cnt2; + size_t to_write; + size_t n1, n2; + + if ((free_cnt = ringbuffer_write_space (rb)) == 0) { + return 0; + } + + to_write = cnt > free_cnt ? free_cnt : cnt; + + cnt2 = rb->write_ptr + to_write; + + if (cnt2 > rb->size) { + n1 = rb->size - rb->write_ptr; + n2 = cnt2 & rb->size_mask; + } else { + n1 = to_write; + n2 = 0; + } + + memcpy (&(rb->buf[rb->write_ptr]), src, n1); + rb->write_ptr += n1; + rb->write_ptr &= rb->size_mask; + + if (n2) { + memcpy (&(rb->buf[rb->write_ptr]), src + n1, n2); + rb->write_ptr += n2; + rb->write_ptr &= rb->size_mask; + } + + return to_write; +} + +/* Advance the read pointer `cnt' places. */ + +void +ringbuffer_read_advance (ringbuffer_t * rb, size_t cnt) +{ + rb->read_ptr += cnt; + rb->read_ptr &= rb->size_mask; +} + +/* Advance the write pointer `cnt' places. */ + +void +ringbuffer_write_advance (ringbuffer_t * rb, size_t cnt) +{ + rb->write_ptr += cnt; + rb->write_ptr &= rb->size_mask; +} + +/* The non-copying data reader. `vec' is an array of two places. Set + the values at `vec' to hold the current readable data at `rb'. If + the readable data is in one segment the second segment has zero + length. */ + +void +ringbuffer_get_read_vector (ringbuffer_t * rb, + ringbuffer_data_t * vec) +{ + size_t free_cnt; + size_t cnt2; + size_t w, r; + + w = rb->write_ptr; + r = rb->read_ptr; + + if (w > r) { + free_cnt = w - r; + } else { + free_cnt = (w - r + rb->size) & rb->size_mask; + } + + cnt2 = r + free_cnt; + + if (cnt2 > rb->size) { + + /* Two part vector: the rest of the buffer after the current write + ptr, plus some from the start of the buffer. */ + + vec[0].buf = &(rb->buf[r]); + vec[0].len = rb->size - r; + vec[1].buf = rb->buf; + vec[1].len = cnt2 & rb->size_mask; + + } else { + + /* Single part vector: just the rest of the buffer */ + + vec[0].buf = &(rb->buf[r]); + vec[0].len = free_cnt; + vec[1].len = 0; + } +} + +/* The non-copying data writer. `vec' is an array of two places. Set + the values at `vec' to hold the current writeable data at `rb'. If + the writeable data is in one segment the second segment has zero + length. */ + +void +ringbuffer_get_write_vector (ringbuffer_t * rb, + ringbuffer_data_t * vec) +{ + size_t free_cnt; + size_t cnt2; + size_t w, r; + + w = rb->write_ptr; + r = rb->read_ptr; + + if (w > r) { + free_cnt = ((r - w + rb->size) & rb->size_mask) - 1; + } else if (w < r) { + free_cnt = (r - w) - 1; + } else { + free_cnt = rb->size - 1; + } + + cnt2 = w + free_cnt; + + if (cnt2 > rb->size) { + + /* Two part vector: the rest of the buffer after the current write + ptr, plus some from the start of the buffer. */ + + vec[0].buf = &(rb->buf[w]); + vec[0].len = rb->size - w; + vec[1].buf = rb->buf; + vec[1].len = cnt2 & rb->size_mask; + } else { + vec[0].buf = &(rb->buf[w]); + vec[0].len = free_cnt; + vec[1].len = 0; + } +} diff --git a/example-clients/ringbuffer.h b/example-clients/ringbuffer.h new file mode 100644 index 0000000..acdef99 --- /dev/null +++ b/example-clients/ringbuffer.h @@ -0,0 +1,42 @@ +#ifndef _RINGBUFFER_H +#define _RINGBUFFER_H + +#include + +typedef struct +{ + char *buf; + size_t len; +} +ringbuffer_data_t ; + +typedef struct +{ + char *buf; + volatile size_t write_ptr; + volatile size_t read_ptr; + size_t size; + size_t size_mask; + int mlocked; +} +ringbuffer_t ; + +ringbuffer_t *ringbuffer_create(int sz); +void ringbuffer_free(ringbuffer_t *rb); + +int ringbuffer_mlock(ringbuffer_t *rb); +void ringbuffer_reset(ringbuffer_t *rb); + +void ringbuffer_write_advance(ringbuffer_t *rb, size_t cnt); +void ringbuffer_read_advance(ringbuffer_t *rb, size_t cnt); + +size_t ringbuffer_write_space(ringbuffer_t *rb); +size_t ringbuffer_read_space(ringbuffer_t *rb); + +size_t ringbuffer_read(ringbuffer_t *rb, char *dest, size_t cnt); +size_t ringbuffer_write(ringbuffer_t *rb, char *src, size_t cnt); + +void ringbuffer_get_read_vector(ringbuffer_t *rb, ringbuffer_data_t *vec); +void ringbuffer_get_write_vector(ringbuffer_t *rb, ringbuffer_data_t *vec); + +#endif diff --git a/jack/jack.h b/jack/jack.h index cf5a0a3..53f070d 100644 --- a/jack/jack.h +++ b/jack/jack.h @@ -105,7 +105,8 @@ void jack_on_shutdown (jack_client_t *client, void (*function)(void *arg), void * for more information. * * - * @return 0 on success, otherwise a non-zero error code + * @return 0 on success, otherwise a non-zero error code, causing JACK + * to remove that client from the process() graph. */ int jack_set_process_callback (jack_client_t *, JackProcessCallback process_callback, void *arg);