From ffeb1255fb3c078b002d405d9934d9a46d09e1fe Mon Sep 17 00:00:00 2001 From: Andrew Belt Date: Thu, 15 Mar 2018 02:37:55 -0400 Subject: [PATCH] Bridge refactoring --- include/bridge.hpp | 2 +- src/audio.cpp | 2 +- src/bridge.cpp | 124 +++++++++++++++++++++++++-------------------- 3 files changed, 72 insertions(+), 56 deletions(-) diff --git a/include/bridge.hpp b/include/bridge.hpp index d1c9e45c..eedca7ab 100644 --- a/include/bridge.hpp +++ b/include/bridge.hpp @@ -12,7 +12,7 @@ void bridgeInit(); void bridgeDestroy(); void bridgeAudioSubscribe(int channel, AudioIO *audio); void bridgeAudioUnsubscribe(int channel, AudioIO *audio); -bool bridgeAudioIsSubscribed(int channel, AudioIO *audio); +bool bridgeAudioIsActive(int channel, AudioIO *audio); } // namespace rack diff --git a/src/audio.cpp b/src/audio.cpp index 6cdb1112..c68b0e8f 100644 --- a/src/audio.cpp +++ b/src/audio.cpp @@ -298,7 +298,7 @@ bool AudioIO::isActive() { return rtAudio->isStreamRunning(); } if (driver == BRIDGE_DRIVER) { - bridgeAudioIsSubscribed(device, this); + bridgeAudioIsActive(device, this); } return false; } diff --git a/src/bridge.cpp b/src/bridge.cpp index 2c220cf1..4846d70b 100644 --- a/src/bridge.cpp +++ b/src/bridge.cpp @@ -43,6 +43,7 @@ static bool serverQuit; struct BridgeClientConnection { + int client; RingBuffer recvQueue; BridgeCommand currentCommand = START_COMMAND; bool closeRequested = false; @@ -51,6 +52,24 @@ struct BridgeClientConnection { int audioChannels = 0; int audioBufferLength = -1; + void send(const uint8_t *buffer, int length) { + if (length <= 0) + return; +#ifdef ARCH_LIN + int sendFlags = MSG_NOSIGNAL; +#else + int sendFlags = 0; +#endif + ssize_t written = ::send(client, buffer, length, sendFlags); + if (written < 0) + closeRequested = true; + } + + template + void send(T x) { + send((uint8_t*) &x, sizeof(x)); + } + /** Does not check if the queue has enough data. You must do that yourself before calling this method. */ @@ -139,12 +158,18 @@ struct BridgeClientConnection { } else { if (recvQueue.size() >= (size_t) (sizeof(float) * audioBufferLength)) { + // Get input buffer + int frames = audioBufferLength / 2; float input[audioBufferLength]; - float output[audioBufferLength]; - memset(output, 0, sizeof(output)); recvQueue.shiftBuffer((uint8_t*) input, sizeof(float) * audioBufferLength); - int frames = audioBufferLength / 2; + // Process stream + float output[audioBufferLength]; processStream(input, output, frames); + // Send output buffer + send(AUDIO_BUFFER_SEND_COMMAND); + send(audioBufferLength); + send((uint8_t*) output, audioBufferLength * sizeof(float)); + audioBufferLength = -1; currentCommand = NO_COMMAND; return true; @@ -170,20 +195,6 @@ struct BridgeClientConnection { return false; } - void recv(uint8_t *buffer, int length) { - // Make sure we can fill the buffer - if (recvQueue.capacity() < (size_t) length) { - // If we can't accept it, future messages will be incomplete - closeRequested = true; - return; - } - - recvQueue.pushBuffer(buffer, length); - - // Loop the state machine until it returns false - while (step()) {} - } - void processStream(const float *input, float *output, int frames) { if (!(0 <= channel && channel < BRIDGE_CHANNELS)) return; @@ -191,27 +202,43 @@ struct BridgeClientConnection { return; audioListeners[channel]->processStream(input, output, frames); } -}; + void run() { + info("Bridge client connected"); + while (!closeRequested) { + uint8_t buffer[RECV_BUFFER_SIZE]; +#ifdef ARCH_LIN + int recvFlags = MSG_NOSIGNAL; +#else + int recvFlags = 0; +#endif + ssize_t received = ::recv(client, (char*) buffer, sizeof(buffer), recvFlags); + if (received <= 0) + break; -static void clientRun(int client) { - int err; - BridgeClientConnection connection; + // Make sure we can fill the buffer + if (recvQueue.capacity() < (size_t) received) { + // If we can't accept it, future messages will be incomplete + break; + } + + recvQueue.pushBuffer(buffer, received); + + // Loop the state machine until it returns false + while (step()) {} + } - // // Get client address - // struct sockaddr_in addr; - // socklen_t clientAddrLen = sizeof(addr); - // err = getpeername(client, (struct sockaddr*) &addr, &clientAddrLen); - // assert(!err); + info("Bridge client closed"); + } +}; - // // Get client IP address - // struct in_addr ipAddr = addr.sin_addr; - // char ipBuffer[INET_ADDRSTRLEN]; - // inet_ntop(AF_INET, &ipAddr, ipBuffer, INET_ADDRSTRLEN); - // info("Bridge client %s connected", ipBuffer); - info("Bridge client connected"); +static void clientRun(int client) { + defer({ + close(client); + }); + int err; #ifdef ARCH_MAC // Avoid SIGPIPE @@ -219,29 +246,18 @@ static void clientRun(int client) { setsockopt(client, SOL_SOCKET, SO_NOSIGPIPE, &flag, sizeof(int)); #endif + // Disable non-blocking #ifdef ARCH_WIN - unsigned long blockingMode = 1; + unsigned long blockingMode = 0; ioctlsocket(client, FIONBIO, &blockingMode); #else err = fcntl(client, F_SETFL, fcntl(client, F_GETFL, 0) & ~O_NONBLOCK); + (void) err; #endif - while (!connection.closeRequested) { - uint8_t buffer[RECV_BUFFER_SIZE]; -#ifdef ARCH_LIN - ssize_t received = recv(client, (char*) buffer, sizeof(buffer), MSG_NOSIGNAL); -#else - ssize_t received = recv(client, (char*) buffer, sizeof(buffer), 0); -#endif - if (received <= 0) - break; - - connection.recv(buffer, received); - } - - err = close(client); - (void) err; - info("Bridge client closed"); + BridgeClientConnection connection; + connection.client = client; + connection.run(); } @@ -270,7 +286,7 @@ static void serverRun() { hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE; - err = getaddrinfo(NULL, "5000", &hints, &result); + err = getaddrinfo("127.0.0.1", "5000", &hints, &result); if (err) { warn("Could not get Bridge server address"); return; @@ -301,7 +317,6 @@ static void serverRun() { close(server); }); - // Bind socket to address #ifdef ARCH_WIN err = bind(server, result->ai_addr, (int)result->ai_addrlen); @@ -321,12 +336,13 @@ static void serverRun() { } info("Bridge server started"); - // Make server non-blocking + // Enable non-blocking #ifdef ARCH_WIN unsigned long blockingMode = 1; ioctlsocket(server, FIONBIO, &blockingMode); #else - err = fcntl(server, F_SETFL, fcntl(server, F_GETFL, 0) | O_NONBLOCK); + int flags = fcntl(server, F_GETFL, 0); + err = fcntl(server, F_SETFL, flags | O_NONBLOCK); #endif // Accept clients @@ -373,7 +389,7 @@ void bridgeAudioUnsubscribe(int channel, AudioIO *audio) { audioListeners[channel] = NULL; } -bool bridgeAudioIsSubscribed(int channel, AudioIO *audio) { +bool bridgeAudioIsActive(int channel, AudioIO *audio) { if (!(0 <= channel && channel < BRIDGE_CHANNELS)) return false; return (audioListeners[channel] == audio);