From 1c2b8534c06f70f36ea6ae2115a48e4df6c06335 Mon Sep 17 00:00:00 2001 From: Paul Davis Date: Thu, 3 Oct 2013 13:38:53 -0400 Subject: [PATCH] fix up internal MIDI client to do duplex --- drivers/a2j/a2j.h | 18 +- drivers/a2j/input_client.c | 665 +++++++++++++++++++++++++------------ drivers/a2j/port.c | 26 +- drivers/a2j/port.h | 2 +- drivers/a2j/port_thread.c | 66 ++-- 5 files changed, 513 insertions(+), 264 deletions(-) diff --git a/drivers/a2j/a2j.h b/drivers/a2j/a2j.h index 93b74cd..87453ae 100644 --- a/drivers/a2j/a2j.h +++ b/drivers/a2j/a2j.h @@ -22,6 +22,7 @@ #ifndef STRUCTS_H__FD2CC895_411F_4ADE_9200_50FE395EDB72__INCLUDED #define STRUCTS_H__FD2CC895_411F_4ADE_9200_50FE395EDB72__INCLUDED +#include #include #include @@ -33,6 +34,10 @@ #define PORT_HASH_BITS 4 #define PORT_HASH_SIZE (1 << PORT_HASH_BITS) +/* Beside enum use, these are indeces for (struct a2j).stream array */ +#define A2J_PORT_CAPTURE 0 // ALSA playback port -> JACK capture port +#define A2J_PORT_PLAYBACK 1 // JACK playback port -> ALSA capture port + typedef struct a2j_port * a2j_port_hash_t[PORT_HASH_SIZE]; struct a2j; @@ -68,22 +73,23 @@ struct a2j jack_client_t * jack_client; snd_seq_t *seq; - pthread_t alsa_io_thread; + pthread_t alsa_input_thread; + pthread_t alsa_output_thread; int client_id; int port_id; int queue; - int input; - int finishing; - int ignore_hardware_ports; + bool freewheeling; + bool running; + bool finishing; jack_ringbuffer_t* port_add; // snd_seq_addr_t jack_ringbuffer_t* port_del; // struct a2j_port* jack_ringbuffer_t* outbound_events; // struct a2j_delivery_event jack_nframes_t cycle_start; - sem_t io_semaphore; + sem_t output_semaphore; - struct a2j_stream stream; + struct a2j_stream stream[2]; }; #define NSEC_PER_SEC ((int64_t)1000*1000*1000) diff --git a/drivers/a2j/input_client.c b/drivers/a2j/input_client.c index 8798444..ad06b1a 100644 --- a/drivers/a2j/input_client.c +++ b/drivers/a2j/input_client.c @@ -35,11 +35,7 @@ #include "port.h" #include "port_thread.h" -static bool g_freewheeling = false; -bool g_keep_walking = true; -bool g_keep_alsa_walking = false; bool g_stop_request = false; -bool g_started = false; void a2j_info (const char* fmt, ...) @@ -79,24 +75,19 @@ a2j_warning (const char* fmt, ...) } static bool -a2j_stream_init(struct a2j * self) -{ - struct a2j_stream *str = &self->stream; - - str->new_ports = jack_ringbuffer_create (MAX_PORTS * sizeof(struct a2j_port *)); - if (str->new_ports == NULL) { - return false; - } - - snd_midi_event_new (MAX_EVENT_SIZE, &str->codec); - INIT_LIST_HEAD (&str->list); - - return true; -} - -static void -a2j_stream_attach (struct a2j_stream * stream_ptr) +a2j_stream_init(struct a2j * self, int which) { + struct a2j_stream *str = &self->stream[which]; + + str->new_ports = jack_ringbuffer_create (MAX_PORTS * sizeof(struct a2j_port *)); + if (str->new_ports == NULL) { + return false; + } + + snd_midi_event_new (MAX_EVENT_SIZE, &str->codec); + INIT_LIST_HEAD (&str->list); + + return true; } static void @@ -116,9 +107,9 @@ a2j_stream_detach (struct a2j_stream * stream_ptr) static void -a2j_stream_close (struct a2j * self) +a2j_stream_close (struct a2j * self, int which) { - struct a2j_stream *str = &self->stream; + struct a2j_stream *str = &self->stream[which]; if (str->codec) snd_midi_event_free (str->codec); @@ -126,7 +117,25 @@ a2j_stream_close (struct a2j * self) jack_ringbuffer_free (str->new_ports); } - +static void +stop_threads (struct a2j* self) +{ + if (self->running) { + void* thread_status; + + self->running = false; /* tell alsa io thread to stop, whenever they wake up */ + /* do something that we need to do anyway and will wake the io thread, then join */ + snd_seq_disconnect_from (self->seq, self->port_id, SND_SEQ_CLIENT_SYSTEM, SND_SEQ_PORT_SYSTEM_ANNOUNCE); + a2j_debug ("wait for ALSA input thread\n"); + pthread_join (self->alsa_input_thread, &thread_status); + a2j_debug ("input thread done\n"); + + /* wake output thread and join */ + sem_post(&self->output_semaphore); + pthread_join(self->alsa_output_thread, &thread_status); + a2j_debug ("output thread done\n"); + } +} /* * =================== Input/output port handling ========================= @@ -159,15 +168,18 @@ a2j_port_event (struct a2j * self, snd_seq_event_t * ev) } } else if (ev->type == SND_SEQ_EVENT_PORT_EXIT) { a2j_debug("port_event: del %d:%d", addr.client, addr.port); - a2j_port_setdead(self->stream.port_hash, addr); + a2j_port_setdead(self->stream[A2J_PORT_CAPTURE].port_hash, addr); + a2j_port_setdead(self->stream[A2J_PORT_PLAYBACK].port_hash, addr); } } +/* --- INBOUND FROM ALSA TO JACK ---- */ + static void a2j_input_event (struct a2j * self, snd_seq_event_t * alsa_event) { jack_midi_data_t data[MAX_EVENT_SIZE]; - struct a2j_stream *str = &self->stream; + struct a2j_stream *str = &self->stream[A2J_PORT_CAPTURE]; long size; struct a2j_port *port; jack_nframes_t now; @@ -208,28 +220,30 @@ a2j_input_event (struct a2j * self, snd_seq_event_t * alsa_event) limit = (to_write > vec[0].len ? vec[0].len : to_write); - if( limit ) { - memcpy( vec[0].buf, ev_charp, limit ); - to_write -= limit; - ev_charp += limit; - vec[0].buf += limit; - vec[0].len -= limit; + if (limit) { + memcpy( vec[0].buf, ev_charp, limit ); + to_write -= limit; + ev_charp += limit; + vec[0].buf += limit; + vec[0].len -= limit; } - if( to_write ) { - memcpy( vec[1].buf, ev_charp, to_write ); - vec[1].buf += to_write; - vec[1].len -= to_write; + if (to_write) { + memcpy( vec[1].buf, ev_charp, to_write ); + vec[1].buf += to_write; + vec[1].len -= to_write; } to_write = size; ev_charp = (char *)data; limit = (to_write > vec[0].len ? vec[0].len : to_write); - if( limit ) - memcpy( vec[0].buf, ev_charp, limit ); + if (limit) { + memcpy (vec[0].buf, ev_charp, limit); + } to_write -= limit; ev_charp += limit; - if( to_write ) - memcpy( vec[1].buf, ev_charp, to_write ); + if (to_write) { + memcpy (vec[1].buf, ev_charp, to_write); + } jack_ringbuffer_write_advance( port->inbound_events, sizeof(ev) + size ); } else { @@ -238,10 +252,74 @@ a2j_input_event (struct a2j * self, snd_seq_event_t * alsa_event) } +static int +a2j_process_incoming (struct a2j* self, struct a2j_port* port, jack_nframes_t nframes) +{ + jack_nframes_t one_period; + struct a2j_alsa_midi_event ev; + char *ev_buf; + + /* grab data queued by the ALSA input thread and write it into the JACK + port buffer. it will delivered during the JACK period that this + function is called from. + */ + + /* first clear the JACK port buffer in preparation for new data + */ + + // a2j_debug ("PORT: %s process input", jack_port_name (port->jack_port)); + + jack_midi_clear_buffer (port->jack_buf); + + one_period = jack_get_buffer_size (self->jack_client); + + while (jack_ringbuffer_peek (port->inbound_events, (char*)&ev, sizeof(ev) ) == sizeof(ev) ) { + + jack_midi_data_t* buf; + jack_nframes_t offset; + + if (ev.time >= self->cycle_start) { + break; + } + + //jack_ringbuffer_read_advance (port->inbound_events, sizeof (ev)); + ev_buf = (char *) alloca( sizeof(ev) + ev.size ); + + if (jack_ringbuffer_peek (port->inbound_events, ev_buf, sizeof(ev) + ev.size ) != sizeof(ev) + ev.size) + break; + + offset = self->cycle_start - ev.time; + if (offset > one_period) { + /* from a previous cycle, somehow. cram it in at the front */ + offset = 0; + } else { + /* offset from start of the current cycle */ + offset = one_period - offset; + } + + a2j_debug ("event at %d offset %d", ev.time, offset); + + /* make sure there is space for it */ + + buf = jack_midi_event_reserve (port->jack_buf, offset, ev.size); + + if (buf) { + /* grab the event */ + memcpy( buf, ev_buf + sizeof(ev), ev.size ); + } else { + /* throw it away (no space) */ + a2j_error ("threw away MIDI event - not reserved at time %d", ev.time); + } + jack_ringbuffer_read_advance (port->inbound_events, sizeof(ev) + ev.size); + + a2j_debug("input on %s: sucked %d bytes from inbound at %d", jack_port_name (port->jack_port), ev.size, ev.time); + } + + return 0; +} -/* ALSA */ - -void* alsa_input_thread(void * arg) +void* +alsa_input_thread (void* arg) { struct a2j * self = arg; int npfd; @@ -258,7 +336,8 @@ void* alsa_input_thread(void * arg) snd_seq_poll_descriptors(self->seq, pfd, npfd, POLLIN); initial = true; - while (g_keep_alsa_walking) { + + while (self->running) { if ((ret = poll(pfd, npfd, 1000)) > 0) { while (snd_seq_event_input (self->seq, &event) > 0) { @@ -296,147 +375,310 @@ void* alsa_input_thread(void * arg) return (void*) 0; } +/* --- OUTBOUND FROM JACK TO ALSA ---- */ -/* JACK */ +int +a2j_process_outgoing ( + struct a2j * self, + struct a2j_port * port) +{ + /* collect data from JACK port buffer and queue it for delivery by ALSA output thread */ + + int nevents; + jack_ringbuffer_data_t vec[2]; + int i; + int written = 0; + size_t limit; + struct a2j_delivery_event* dev; + size_t gap = 0; + + jack_ringbuffer_get_write_vector (self->outbound_events, vec); + + dev = (struct a2j_delivery_event*) vec[0].buf; + limit = vec[0].len / sizeof (struct a2j_delivery_event); + nevents = jack_midi_get_event_count (port->jack_buf); + + for (i = 0; (i < nevents) && (written < limit); ++i) { + + jack_midi_event_get (&dev->jack_event, port->jack_buf, i); + if (dev->jack_event.size <= MAX_JACKMIDI_EV_SIZE) + { + dev->time = dev->jack_event.time; + dev->port = port; + memcpy( dev->midistring, dev->jack_event.buffer, dev->jack_event.size ); + written++; + ++dev; + } + } + + /* anything left? use the second part of the vector, as much as possible */ + + if (i < nevents) + { + if (vec[0].len) + { + gap = vec[0].len - written * sizeof(struct a2j_delivery_event); + } + + dev = (struct a2j_delivery_event*) vec[1].buf; + + limit += (vec[1].len / sizeof (struct a2j_delivery_event)); + + while ((i < nevents) && (written < limit)) + { + jack_midi_event_get(&dev->jack_event, port->jack_buf, i); + if (dev->jack_event.size <= MAX_JACKMIDI_EV_SIZE) + { + dev->time = dev->jack_event.time; + dev->port = port; + memcpy(dev->midistring, dev->jack_event.buffer, dev->jack_event.size); + written++; + ++dev; + } + ++i; + } + } + + // a2j_debug( "done pushing events: %d ... gap: %d ", (int)written, (int)gap ); + /* clear JACK port buffer; advance ring buffer ptr */ + + jack_ringbuffer_write_advance (self->outbound_events, written * sizeof (struct a2j_delivery_event) + gap); + + return nevents; +} static int -a2j_process (jack_nframes_t nframes, void * arg) +time_sorter (struct a2j_delivery_event * a, struct a2j_delivery_event * b) { - struct a2j* self = (struct a2j *) arg; - struct a2j_stream * stream_ptr; - int i; - struct a2j_port ** port_ptr; - struct a2j_port * port; - - if (g_freewheeling) { - return 0; - } + if (a->time < b->time) { + return -1; + } else if (a->time > b->time) { + return 1; + } + return 0; +} - self->cycle_start = jack_last_frame_time (self->jack_client); - - stream_ptr = &self->stream; - a2j_add_ports (stream_ptr); - - // process ports - - for (i = 0 ; i < PORT_HASH_SIZE ; i++) { - - port_ptr = &stream_ptr->port_hash[i]; - - while (*port_ptr != NULL) { - - struct a2j_alsa_midi_event ev; - jack_nframes_t now; - jack_nframes_t one_period; - char *ev_buf; - - port = *port_ptr; - - if (port->is_dead) { - if (jack_ringbuffer_write_space (self->port_del) >= sizeof(port_ptr)) { - - a2j_debug("jack: removed port %s", port->name); - *port_ptr = port->next; - jack_ringbuffer_write (self->port_del, (char*)&port, sizeof(port)); - } else { - a2j_error ("port deletion lost - no space in event buffer!"); - } +static void* +alsa_output_thread(void * arg) +{ + struct a2j * self = (struct a2j*) arg; + struct a2j_stream *str = &self->stream[A2J_PORT_PLAYBACK]; + int i; + struct list_head evlist; + struct list_head * node_ptr; + jack_ringbuffer_data_t vec[2]; + snd_seq_event_t alsa_event; + struct a2j_delivery_event* ev; + float sr; + jack_nframes_t now; + int err; + int limit; + + while (self->running) { + /* first, make a list of all events in the outbound_events FIFO */ + + INIT_LIST_HEAD(&evlist); - port_ptr = &port->next; - continue; - } + jack_ringbuffer_get_read_vector (self->outbound_events, vec); - port->jack_buf = jack_port_get_buffer(port->jack_port, nframes); - - /* grab data queued by the ALSA input thread and write it into the JACK - port buffer. it will delivered during the JACK period that this - function is called from. - */ - - /* first clear the JACK port buffer in preparation for new data - */ - - // a2j_debug ("PORT: %s process input", jack_port_name (port->jack_port)); - - jack_midi_clear_buffer (port->jack_buf); - - now = jack_frame_time (self->jack_client); - one_period = jack_get_buffer_size (self->jack_client); - - while (jack_ringbuffer_peek (port->inbound_events, (char*)&ev, sizeof(ev) ) == sizeof(ev) ) { - - jack_midi_data_t* buf; - jack_nframes_t offset; - - if (ev.time >= self->cycle_start) { - break; - } - - //jack_ringbuffer_read_advance (port->inbound_events, sizeof (ev)); - ev_buf = (char *) alloca( sizeof(ev) + ev.size ); - - if (jack_ringbuffer_peek (port->inbound_events, ev_buf, sizeof(ev) + ev.size ) != sizeof(ev) + ev.size) - break; - - offset = self->cycle_start - ev.time; - if (offset > one_period) { - /* from a previous cycle, somehow. cram it in at the front */ - offset = 0; - } else { - /* offset from start of the current cycle */ - offset = one_period - offset; - } - - a2j_debug ("event at %d offset %d", ev.time, offset); - - /* make sure there is space for it */ - - buf = jack_midi_event_reserve (port->jack_buf, offset, ev.size); - - if (buf) { - /* grab the event */ - memcpy( buf, ev_buf + sizeof(ev), ev.size ); - } else { - /* throw it away (no space) */ - a2j_error ("threw away MIDI event - not reserved at time %d", ev.time); - } - jack_ringbuffer_read_advance (port->inbound_events, sizeof(ev) + ev.size); - - a2j_debug("input on %s: sucked %d bytes from inbound at %d", jack_port_name (port->jack_port), ev.size, ev.time); - } - - port_ptr = &port->next; - } - } + a2j_debug ("output thread: got %d+%d events", + (vec[0].len / sizeof (struct a2j_delivery_event)), + (vec[1].len / sizeof (struct a2j_delivery_event))); + + ev = (struct a2j_delivery_event*) vec[0].buf; + limit = vec[0].len / sizeof (struct a2j_delivery_event); + for (i = 0; i < limit; ++i) { + list_add_tail(&ev->siblings, &evlist); + ev++; + } + + ev = (struct a2j_delivery_event*) vec[1].buf; + limit = vec[1].len / sizeof (struct a2j_delivery_event); + for (i = 0; i < limit; ++i) { + list_add_tail(&ev->siblings, &evlist); + ev++; + } + + if (vec[0].len < sizeof(struct a2j_delivery_event) && (vec[1].len == 0)) { + /* no events: wait for some */ + a2j_debug ("output thread: wait for events"); + sem_wait (&self->output_semaphore); + a2j_debug ("output thread: AWAKE ... loop back for events"); + continue; + } + + /* now sort this list by time */ + + list_sort(&evlist, struct a2j_delivery_event, siblings, time_sorter); + + /* now deliver */ + + sr = jack_get_sample_rate (self->jack_client); + + list_for_each(node_ptr, &evlist) + { + ev = list_entry(node_ptr, struct a2j_delivery_event, siblings); + + snd_seq_ev_clear(&alsa_event); + snd_midi_event_reset_encode(str->codec); + if (!snd_midi_event_encode(str->codec, (const unsigned char *)ev->midistring, ev->jack_event.size, &alsa_event)) + { + continue; // invalid event + } + + snd_seq_ev_set_source(&alsa_event, self->port_id); + snd_seq_ev_set_dest(&alsa_event, ev->port->remote.client, ev->port->remote.port); + snd_seq_ev_set_direct (&alsa_event); + + now = jack_frame_time (self->jack_client); + + ev->time += self->cycle_start; + + a2j_debug ("@ %d, next event @ %d", now, ev->time); + + /* do we need to wait a while before delivering? */ + + if (ev->time > now) { + struct timespec nanoseconds; + jack_nframes_t sleep_frames = ev->time - now; + float seconds = sleep_frames / sr; + + /* if the gap is long enough, sleep */ + + if (seconds > 0.001) { + nanoseconds.tv_sec = (time_t) seconds; + nanoseconds.tv_nsec = (long) NSEC_PER_SEC * (seconds - nanoseconds.tv_sec); + + a2j_debug ("output thread sleeps for %.2f msec", ((double) nanoseconds.tv_nsec / NSEC_PER_SEC) * 1000.0); + + if (nanosleep (&nanoseconds, NULL) < 0) { + fprintf (stderr, "BAD SLEEP\n"); + /* do something ? */ + } + } + } + + /* its time to deliver */ + err = snd_seq_event_output(self->seq, &alsa_event); + snd_seq_drain_output (self->seq); + now = jack_frame_time (self->jack_client); + a2j_debug("alsa_out: written %d bytes to %s at %d, DELTA = %d", ev->jack_event.size, ev->port->name, now, + (int32_t) (now - ev->time)); + } + + /* free up space in the FIFO */ + + jack_ringbuffer_read_advance (self->outbound_events, vec[0].len + vec[1].len); - return 0; + /* and head back for more */ + } + + return (void*) 0; } +/** CORE JACK PROCESSING */ + + +/* ALSA */ + +static void +a2j_jack_process_internal (struct a2j * self, int dir, jack_nframes_t nframes) +{ + struct a2j_stream * stream_ptr; + int i; + struct a2j_port ** port_ptr_ptr; + struct a2j_port * port_ptr; + int nevents = 0; + + stream_ptr = &self->stream[dir]; + a2j_add_ports(stream_ptr); + + // process ports + for (i = 0 ; i < PORT_HASH_SIZE ; i++) + { + port_ptr_ptr = &stream_ptr->port_hash[i]; + while (*port_ptr_ptr != NULL) + { + port_ptr = *port_ptr_ptr; + + if (!port_ptr->is_dead) { + port_ptr->jack_buf = jack_port_get_buffer(port_ptr->jack_port, nframes); + + if (dir == A2J_PORT_CAPTURE) { + a2j_process_incoming (self, port_ptr, nframes); + } else { + nevents += a2j_process_outgoing (self, port_ptr); + } + + } else if (jack_ringbuffer_write_space (self->port_del) >= sizeof(port_ptr)) { + + a2j_debug("jack: removed port %s", port_ptr->name); + *port_ptr_ptr = port_ptr->next; + jack_ringbuffer_write(self->port_del, (char*)&port_ptr, sizeof(port_ptr)); + continue; + + } + + port_ptr_ptr = &port_ptr->next; + } + } + + if (dir == A2J_PORT_PLAYBACK && nevents > 0) { + int sv; + + /* if we queued up anything for output, tell the output thread in + case its waiting for us. + */ + + sem_getvalue (&self->output_semaphore, &sv); + sem_post (&self->output_semaphore); + } +} + +static int +a2j_process(jack_nframes_t nframes, void * arg) +{ + struct a2j* self = (struct a2j *) arg; + + if (self->freewheeling) { + return 0; + } + + self->cycle_start = jack_last_frame_time (self->jack_client); + + a2j_jack_process_internal (self, A2J_PORT_CAPTURE, nframes); + a2j_jack_process_internal (self, A2J_PORT_PLAYBACK, nframes); + + return 0; +} + +/* --- */ + static void -a2j_freewheel( - int starting, - void * arg) +a2j_freewheel(int starting, void * arg) { - g_freewheeling = starting; + struct a2j* self = (struct a2j*) arg; + self->freewheeling = starting; } static void -a2j_shutdown( - void * arg) +a2j_shutdown (void * arg) { - a2j_warning("JACK server shutdown notification received."); - g_stop_request = true; + struct a2j* self = (struct a2j*) self; + a2j_warning ("JACK server shutdown notification received."); + stop_threads (self); } int connect_to_alsa (struct a2j* self) { int error; - void * thread_status; + void* thread_status; + + self->port_add = jack_ringbuffer_create (2 * MAX_PORTS * sizeof(snd_seq_addr_t)); - self->port_add = jack_ringbuffer_create(2 * MAX_PORTS * sizeof(snd_seq_addr_t)); if (self->port_add == NULL) { goto free_self; } @@ -446,16 +688,25 @@ connect_to_alsa (struct a2j* self) goto free_ringbuffer_add; } - if (!a2j_stream_init(self)) { + self->outbound_events = jack_ringbuffer_create (MAX_EVENT_SIZE * 16 * sizeof(struct a2j_delivery_event)); + if (self->outbound_events == NULL) { + goto free_ringbuffer_del; + } + + if (!a2j_stream_init (self, A2J_PORT_CAPTURE)) { goto free_ringbuffer_outbound; } + if (!a2j_stream_init (self, A2J_PORT_PLAYBACK)) { + goto close_capture_stream; + } + if ((error = snd_seq_open(&self->seq, "hw", SND_SEQ_OPEN_DUPLEX, 0)) < 0) { - a2j_error("failed to open alsa seq"); - goto close_stream; + a2j_error("failed to open alsa seq"); + goto close_playback_stream; } - if ((error = snd_seq_set_client_name(self->seq, "midi_in")) < 0) { + if ((error = snd_seq_set_client_name(self->seq, "jackmidi")) < 0) { a2j_error("snd_seq_set_client_name() failed"); goto close_seq_client; } @@ -485,8 +736,6 @@ connect_to_alsa (struct a2j* self) snd_seq_start_queue (self->seq, self->queue, 0); - a2j_stream_attach (&self->stream); - if ((error = snd_seq_nonblock(self->seq, 1)) < 0) { a2j_error("snd_seq_nonblock() failed"); goto close_seq_client; @@ -494,17 +743,17 @@ connect_to_alsa (struct a2j* self) snd_seq_drop_input (self->seq); - a2j_add_ports(&self->stream); + a2j_add_ports(&self->stream[A2J_PORT_CAPTURE]); + a2j_add_ports(&self->stream[A2J_PORT_PLAYBACK]); - if (sem_init(&self->io_semaphore, 0, 0) < 0) { + if (sem_init(&self->output_semaphore, 0, 0) < 0) { a2j_error("can't create IO semaphore"); goto close_jack_client; } - g_keep_alsa_walking = true; + self->running = true; - if (pthread_create(&self->alsa_io_thread, NULL, alsa_input_thread, self) < 0) - { + if (pthread_create(&self->alsa_input_thread, NULL, alsa_input_thread, self) < 0) { a2j_error("cannot start ALSA input thread"); goto sem_destroy; } @@ -512,28 +761,40 @@ connect_to_alsa (struct a2j* self) /* wake the poll loop in the alsa input thread so initial ports are fetched */ if ((error = snd_seq_connect_from (self->seq, self->port_id, SND_SEQ_CLIENT_SYSTEM, SND_SEQ_PORT_SYSTEM_ANNOUNCE)) < 0) { a2j_error("snd_seq_connect_from() failed"); - goto join_io_thread; + goto join_input_thread; + } + + if (pthread_create(&self->alsa_output_thread, NULL, alsa_output_thread, self) < 0) { + a2j_error("cannot start ALSA input thread"); + goto sem_destroy; } return 0; - g_keep_alsa_walking = false; /* tell alsa threads to stop */ + /* error handling */ + + self->running = false; /* tell alsa threads to stop */ + self->finishing = false; + snd_seq_disconnect_from(self->seq, self->port_id, SND_SEQ_CLIENT_SYSTEM, SND_SEQ_PORT_SYSTEM_ANNOUNCE); - join_io_thread: - pthread_join(self->alsa_io_thread, &thread_status); + join_input_thread: + pthread_join (self->alsa_input_thread, &thread_status); sem_destroy: - sem_destroy(&self->io_semaphore); + sem_destroy (&self->output_semaphore); close_jack_client: if ((error = jack_client_close(self->jack_client)) < 0) { a2j_error("Cannot close jack client"); } close_seq_client: snd_seq_close(self->seq); - close_stream: - a2j_stream_close(self); + close_playback_stream: + a2j_stream_close(self, A2J_PORT_PLAYBACK); + close_capture_stream: + a2j_stream_close(self, A2J_PORT_CAPTURE); free_ringbuffer_outbound: jack_ringbuffer_free(self->outbound_events); - jack_ringbuffer_free(self->port_del); + free_ringbuffer_del: + jack_ringbuffer_free(self->port_del); free_ringbuffer_add: jack_ringbuffer_free(self->port_add); free_self: @@ -555,10 +816,6 @@ jack_initialize (jack_client_t *client, const char* load_init) self->jack_client = client; - self->input = 1; - self->ignore_hardware_ports = 0; - self->finishing = 0; - if (load_init) { char* args = strdup (load_init); char* token; @@ -569,19 +826,14 @@ jack_initialize (jack_client_t *client, const char* load_init) if ((token = strtok_r (ptr, ", ", &savep)) == NULL) { break; } +#if 0 + /* example of how to use tokens */ if (strncasecmp (token, "in", 2) == 0) { self->input = 1; } +#endif - if (strncasecmp (token, "out", 2) == 0) { - self->input = 0; - } - - if (strncasecmp (token, "hw", 2) == 0) { - self->ignore_hardware_ports = 0; - } - ptr = NULL; } @@ -594,8 +846,8 @@ jack_initialize (jack_client_t *client, const char* load_init) } jack_set_process_callback (client, a2j_process, self); - jack_set_freewheel_callback (client, a2j_freewheel, NULL); - jack_on_shutdown (client, a2j_shutdown, NULL); + jack_set_freewheel_callback (client, a2j_freewheel, self); + jack_on_shutdown (client, a2j_shutdown, self); jack_activate (client); @@ -606,28 +858,19 @@ void jack_finish (void *arg) { struct a2j* self = (struct a2j*) arg; - void* thread_status; - self->finishing = 1; - - a2j_debug("midi: delete"); - - g_keep_alsa_walking = false; /* tell alsa io thread to stop, whenever they wake up */ - /* do something that we need to do anyway and will wake the io thread, then join */ - snd_seq_disconnect_from (self->seq, self->port_id, SND_SEQ_CLIENT_SYSTEM, SND_SEQ_PORT_SYSTEM_ANNOUNCE); - a2j_debug ("wait for ALSA io thread\n"); - pthread_join (self->alsa_io_thread, &thread_status); - a2j_debug ("thread done\n"); - + self->finishing = true; + + stop_threads (self); + sem_destroy(&self->output_semaphore); jack_ringbuffer_reset (self->port_add); - - a2j_stream_detach (&self->stream); - + a2j_stream_detach (&self->stream[A2J_PORT_CAPTURE]); + a2j_stream_detach (&self->stream[A2J_PORT_PLAYBACK]); snd_seq_close(self->seq); self->seq = NULL; - - a2j_stream_close (self); - + a2j_stream_close (self, A2J_PORT_CAPTURE); + a2j_stream_close (self, A2J_PORT_PLAYBACK); + jack_ringbuffer_free(self->outbound_events); jack_ringbuffer_free(self->port_add); jack_ringbuffer_free(self->port_del); diff --git a/drivers/a2j/port.c b/drivers/a2j/port.c index 82b4c57..fd35864 100644 --- a/drivers/a2j/port.c +++ b/drivers/a2j/port.c @@ -99,7 +99,7 @@ a2j_port_free (struct a2j_port * port) } void -a2j_port_fill_name (struct a2j_port * port_ptr, int input, snd_seq_client_info_t * client_info_ptr, +a2j_port_fill_name (struct a2j_port * port_ptr, int dir, snd_seq_client_info_t * client_info_ptr, const snd_seq_port_info_t * port_info_ptr, bool make_unique) { char *c; @@ -107,16 +107,18 @@ a2j_port_fill_name (struct a2j_port * port_ptr, int input, snd_seq_client_info_t if (make_unique) { snprintf (port_ptr->name, sizeof(port_ptr->name), - "%s [%d]: %s", + "%s [%d] %s %s", snd_seq_client_info_get_name(client_info_ptr), snd_seq_client_info_get_client(client_info_ptr), - snd_seq_port_info_get_name(port_info_ptr)); + snd_seq_port_info_get_name(port_info_ptr), + (dir == A2J_PORT_CAPTURE ? "in" : "out")); } else { snprintf (port_ptr->name, sizeof(port_ptr->name), - "%s: %s", + "%s %s %s", snd_seq_client_info_get_name(client_info_ptr), - snd_seq_port_info_get_name(port_info_ptr)); + snd_seq_port_info_get_name(port_info_ptr), + (dir == A2J_PORT_CAPTURE ? "in" : "out")); } // replace all offending characters with ' ' @@ -128,7 +130,7 @@ a2j_port_fill_name (struct a2j_port * port_ptr, int input, snd_seq_client_info_t } struct a2j_port * -a2j_port_create (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info_t * info) +a2j_port_create (struct a2j * self, int dir, snd_seq_addr_t addr, const snd_seq_port_info_t * info) { struct a2j_port *port; int err; @@ -137,7 +139,7 @@ a2j_port_create (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info int jack_caps; struct a2j_stream * stream_ptr; - stream_ptr = &self->stream; + stream_ptr = &self->stream[dir]; if ((err = snd_seq_client_info_malloc (&client_info_ptr)) != 0) { a2j_error("Failed to allocate client info"); @@ -164,12 +166,12 @@ a2j_port_create (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info port->jack_port = JACK_INVALID_PORT; port->remote = addr; - a2j_port_fill_name (port, self->input, client_info_ptr, info, true); + a2j_port_fill_name (port, dir, client_info_ptr, info, false); /* Add port to list early, before registering to JACK, so map functionality is guaranteed to work during port registration */ list_add_tail (&port->siblings, &stream_ptr->list); - if (self->input) { + if (dir == A2J_PORT_CAPTURE) { jack_caps = JackPortIsOutput; } else { jack_caps = JackPortIsInput; @@ -186,10 +188,10 @@ a2j_port_create (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info goto fail_free_port; } - if (self->input) { - err = a2j_alsa_connect_from(self, port->remote.client, port->remote.port); + if (dir == A2J_PORT_CAPTURE) { + err = a2j_alsa_connect_from (self, port->remote.client, port->remote.port); } else { - err = snd_seq_connect_to(self->seq, self->port_id, port->remote.client, port->remote.port); + err = snd_seq_connect_to (self->seq, self->port_id, port->remote.client, port->remote.port); } if (err) { diff --git a/drivers/a2j/port.h b/drivers/a2j/port.h index 281de6c..07169a3 100644 --- a/drivers/a2j/port.h +++ b/drivers/a2j/port.h @@ -22,7 +22,7 @@ #ifndef PORT_H__757ADD0F_5E53_41F7_8B7F_8119C5E8A9F1__INCLUDED #define PORT_H__757ADD0F_5E53_41F7_8B7F_8119C5E8A9F1__INCLUDED -struct a2j_port* a2j_port_create (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info_t * info); +struct a2j_port* a2j_port_create (struct a2j * self, int dir, snd_seq_addr_t addr, const snd_seq_port_info_t * info); void a2j_port_setdead (a2j_port_hash_t hash, snd_seq_addr_t addr); void a2j_port_free (struct a2j_port * port); diff --git a/drivers/a2j/port_thread.c b/drivers/a2j/port_thread.c index 82ca922..98df0e2 100644 --- a/drivers/a2j/port_thread.c +++ b/drivers/a2j/port_thread.c @@ -77,7 +77,7 @@ a2j_find_port_by_jack_port_name( static void -a2j_update_port_type (struct a2j * self, snd_seq_addr_t addr, int caps, const snd_seq_port_info_t * info) +a2j_update_port_type (struct a2j * self, int dir, snd_seq_addr_t addr, int caps, const snd_seq_port_info_t * info) { struct a2j_stream * stream_ptr; int alsa_mask; @@ -85,10 +85,10 @@ a2j_update_port_type (struct a2j * self, snd_seq_addr_t addr, int caps, const sn a2j_debug("update_port_type(%d:%d)", addr.client, addr.port); - stream_ptr = &self->stream; + stream_ptr = &self->stream[dir]; port_ptr = a2j_find_port_by_addr(stream_ptr, addr); - if (self->input) { + if (dir == A2J_PORT_CAPTURE) { alsa_mask = SND_SEQ_PORT_CAP_SUBS_READ; } else { alsa_mask = SND_SEQ_PORT_CAP_SUBS_WRITE; @@ -101,7 +101,7 @@ a2j_update_port_type (struct a2j * self, snd_seq_addr_t addr, int caps, const sn if (port_ptr == NULL && (caps & alsa_mask) == alsa_mask) { if(jack_ringbuffer_write_space(stream_ptr->new_ports) >= sizeof(port_ptr)) { - port_ptr = a2j_port_create (self, addr, info); + port_ptr = a2j_port_create (self, dir, addr, info); if (port_ptr != NULL) { jack_ringbuffer_write(stream_ptr->new_ports, (char *)&port_ptr, sizeof(port_ptr)); } @@ -186,17 +186,13 @@ a2j_update_port (struct a2j * self, snd_seq_addr_t addr, const snd_seq_port_info return; } - if ((port_type & SND_SEQ_PORT_TYPE_HARDWARE) && self->ignore_hardware_ports) { - a2j_debug("Ignoring hardware port"); - return; - } - if (port_caps & SND_SEQ_PORT_CAP_NO_EXPORT) { a2j_debug("Ignoring no-export port"); return; } - a2j_update_port_type (self, addr, port_caps, info); + a2j_update_port_type (self, A2J_PORT_CAPTURE, addr, port_caps, info); + a2j_update_port_type (self, A2J_PORT_PLAYBACK, addr, port_caps, info); } void @@ -204,34 +200,36 @@ a2j_free_ports (jack_ringbuffer_t * ports) { struct a2j_port *port; int sz; - while ((sz = jack_ringbuffer_read(ports, (char*)&port, sizeof(port)))) { - assert (sz == sizeof(port)); - a2j_info("port deleted: %s", port->name); - list_del (&port->siblings); - a2j_port_free(port); + + while ((sz = jack_ringbuffer_read (ports, (char*)&port, sizeof(port)))) { + assert (sz == sizeof(port)); + a2j_info("port deleted: %s", port->name); + list_del (&port->siblings); + a2j_port_free(port); } } void a2j_update_ports (struct a2j * self) { - snd_seq_addr_t addr; - int size; - - while ((size = jack_ringbuffer_read(self->port_add, (char *)&addr, sizeof(addr))) != 0) { - snd_seq_port_info_t * info; - int err; - - snd_seq_port_info_alloca(&info); - - assert (size == sizeof(addr)); - assert (addr.client != self->client_id); - - if ((err = snd_seq_get_any_port_info(self->seq, addr.client, addr.port, info)) >= 0) { - a2j_update_port(self, addr, info); - } else { - //a2j_port_setdead(self->stream[A2J_PORT_CAPTURE].ports, addr); - //a2j_port_setdead(self->stream[A2J_PORT_PLAYBACK].ports, addr); - } - } + snd_seq_addr_t addr; + int size; + + while ((size = jack_ringbuffer_read(self->port_add, (char *)&addr, sizeof(addr))) != 0) { + + snd_seq_port_info_t * info; + int err; + + snd_seq_port_info_alloca(&info); + + assert (size == sizeof(addr)); + assert (addr.client != self->client_id); + + if ((err = snd_seq_get_any_port_info(self->seq, addr.client, addr.port, info)) >= 0) { + a2j_update_port(self, addr, info); + } else { + a2j_port_setdead(self->stream[A2J_PORT_CAPTURE].port_hash, addr); + a2j_port_setdead(self->stream[A2J_PORT_PLAYBACK].port_hash, addr); + } + } }