Browse Source

Bridge refactoring

tags/v0.6.0
Andrew Belt 7 years ago
parent
commit
f18b182ccd
3 changed files with 72 additions and 56 deletions
  1. +1
    -1
      include/bridge.hpp
  2. +1
    -1
      src/audio.cpp
  3. +70
    -54
      src/bridge.cpp

+ 1
- 1
include/bridge.hpp View File

@@ -12,7 +12,7 @@ void bridgeInit();
void bridgeDestroy();
void bridgeAudioSubscribe(int channel, AudioIO *audio);
void bridgeAudioUnsubscribe(int channel, AudioIO *audio);
bool bridgeAudioIsSubscribed(int channel, AudioIO *audio);
bool bridgeAudioIsActive(int channel, AudioIO *audio);


} // namespace rack

+ 1
- 1
src/audio.cpp View File

@@ -298,7 +298,7 @@ bool AudioIO::isActive() {
return rtAudio->isStreamRunning();
}
if (driver == BRIDGE_DRIVER) {
bridgeAudioIsSubscribed(device, this);
bridgeAudioIsActive(device, this);
}
return false;
}


+ 70
- 54
src/bridge.cpp View File

@@ -43,6 +43,7 @@ static bool serverQuit;


struct BridgeClientConnection {
int client;
RingBuffer<uint8_t, RECV_QUEUE_SIZE> recvQueue;
BridgeCommand currentCommand = START_COMMAND;
bool closeRequested = false;
@@ -51,6 +52,24 @@ struct BridgeClientConnection {
int audioChannels = 0;
int audioBufferLength = -1;

void send(const uint8_t *buffer, int length) {
if (length <= 0)
return;
#ifdef ARCH_LIN
int sendFlags = MSG_NOSIGNAL;
#else
int sendFlags = 0;
#endif
ssize_t written = ::send(client, buffer, length, sendFlags);
if (written < 0)
closeRequested = true;
}

template <typename T>
void send(T x) {
send((uint8_t*) &x, sizeof(x));
}

/** Does not check if the queue has enough data.
You must do that yourself before calling this method.
*/
@@ -139,12 +158,18 @@ struct BridgeClientConnection {
}
else {
if (recvQueue.size() >= (size_t) (sizeof(float) * audioBufferLength)) {
// Get input buffer
int frames = audioBufferLength / 2;
float input[audioBufferLength];
float output[audioBufferLength];
memset(output, 0, sizeof(output));
recvQueue.shiftBuffer((uint8_t*) input, sizeof(float) * audioBufferLength);
int frames = audioBufferLength / 2;
// Process stream
float output[audioBufferLength];
processStream(input, output, frames);
// Send output buffer
send<uint8_t>(AUDIO_BUFFER_SEND_COMMAND);
send<uint32_t>(audioBufferLength);
send((uint8_t*) output, audioBufferLength * sizeof(float));

audioBufferLength = -1;
currentCommand = NO_COMMAND;
return true;
@@ -170,20 +195,6 @@ struct BridgeClientConnection {
return false;
}

void recv(uint8_t *buffer, int length) {
// Make sure we can fill the buffer
if (recvQueue.capacity() < (size_t) length) {
// If we can't accept it, future messages will be incomplete
closeRequested = true;
return;
}

recvQueue.pushBuffer(buffer, length);

// Loop the state machine until it returns false
while (step()) {}
}

void processStream(const float *input, float *output, int frames) {
if (!(0 <= channel && channel < BRIDGE_CHANNELS))
return;
@@ -191,27 +202,43 @@ struct BridgeClientConnection {
return;
audioListeners[channel]->processStream(input, output, frames);
}
};

void run() {
info("Bridge client connected");

while (!closeRequested) {
uint8_t buffer[RECV_BUFFER_SIZE];
#ifdef ARCH_LIN
int recvFlags = MSG_NOSIGNAL;
#else
int recvFlags = 0;
#endif
ssize_t received = ::recv(client, (char*) buffer, sizeof(buffer), recvFlags);
if (received <= 0)
break;

static void clientRun(int client) {
int err;
BridgeClientConnection connection;
// Make sure we can fill the buffer
if (recvQueue.capacity() < (size_t) received) {
// If we can't accept it, future messages will be incomplete
break;
}

recvQueue.pushBuffer(buffer, received);

// Loop the state machine until it returns false
while (step()) {}
}

// // Get client address
// struct sockaddr_in addr;
// socklen_t clientAddrLen = sizeof(addr);
// err = getpeername(client, (struct sockaddr*) &addr, &clientAddrLen);
// assert(!err);
info("Bridge client closed");
}
};

// // Get client IP address
// struct in_addr ipAddr = addr.sin_addr;
// char ipBuffer[INET_ADDRSTRLEN];
// inet_ntop(AF_INET, &ipAddr, ipBuffer, INET_ADDRSTRLEN);

// info("Bridge client %s connected", ipBuffer);
info("Bridge client connected");
static void clientRun(int client) {
defer({
close(client);
});
int err;

#ifdef ARCH_MAC
// Avoid SIGPIPE
@@ -219,29 +246,18 @@ static void clientRun(int client) {
setsockopt(client, SOL_SOCKET, SO_NOSIGPIPE, &flag, sizeof(int));
#endif

// Disable non-blocking
#ifdef ARCH_WIN
unsigned long blockingMode = 1;
unsigned long blockingMode = 0;
ioctlsocket(client, FIONBIO, &blockingMode);
#else
err = fcntl(client, F_SETFL, fcntl(client, F_GETFL, 0) & ~O_NONBLOCK);
(void) err;
#endif

while (!connection.closeRequested) {
uint8_t buffer[RECV_BUFFER_SIZE];
#ifdef ARCH_LIN
ssize_t received = recv(client, (char*) buffer, sizeof(buffer), MSG_NOSIGNAL);
#else
ssize_t received = recv(client, (char*) buffer, sizeof(buffer), 0);
#endif
if (received <= 0)
break;

connection.recv(buffer, received);
}

err = close(client);
(void) err;
info("Bridge client closed");
BridgeClientConnection connection;
connection.client = client;
connection.run();
}


@@ -270,7 +286,7 @@ static void serverRun() {
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
err = getaddrinfo(NULL, "5000", &hints, &result);
err = getaddrinfo("127.0.0.1", "5000", &hints, &result);
if (err) {
warn("Could not get Bridge server address");
return;
@@ -301,7 +317,6 @@ static void serverRun() {
close(server);
});


// Bind socket to address
#ifdef ARCH_WIN
err = bind(server, result->ai_addr, (int)result->ai_addrlen);
@@ -321,12 +336,13 @@ static void serverRun() {
}
info("Bridge server started");

// Make server non-blocking
// Enable non-blocking
#ifdef ARCH_WIN
unsigned long blockingMode = 1;
ioctlsocket(server, FIONBIO, &blockingMode);
#else
err = fcntl(server, F_SETFL, fcntl(server, F_GETFL, 0) | O_NONBLOCK);
int flags = fcntl(server, F_GETFL, 0);
err = fcntl(server, F_SETFL, flags | O_NONBLOCK);
#endif

// Accept clients
@@ -373,7 +389,7 @@ void bridgeAudioUnsubscribe(int channel, AudioIO *audio) {
audioListeners[channel] = NULL;
}

bool bridgeAudioIsSubscribed(int channel, AudioIO *audio) {
bool bridgeAudioIsActive(int channel, AudioIO *audio) {
if (!(0 <= channel && channel < BRIDGE_CHANNELS))
return false;
return (audioListeners[channel] == audio);


Loading…
Cancel
Save