Browse Source

Buffer send/recv in Bridge, better network error handling

pull/1639/head
Andrew Belt 7 years ago
parent
commit
2e21a8c496
3 changed files with 62 additions and 40 deletions
  1. +1
    -1
      include/bridgeprotocol.hpp
  2. +0
    -1
      src/Core/AudioInterface.cpp
  3. +61
    -38
      src/bridge.cpp

+ 1
- 1
include/bridgeprotocol.hpp View File

@@ -36,7 +36,7 @@ enum BridgeCommand {
send send
- uint8_t msg[3] - uint8_t msg[3]
*/ */
MIDI_MESSAGE_SEND_COMMAND,
MIDI_MESSAGE_COMMAND,
/** Sets the audio sample rate /** Sets the audio sample rate
send send
- uint32_t sampleRate - uint32_t sampleRate


+ 0
- 1
src/Core/AudioInterface.cpp View File

@@ -90,7 +90,6 @@ struct AudioInterfaceIO : AudioIO {
} }


void onChannelsChange() override { void onChannelsChange() override {
debug("Channels changed %d %d", numOutputs, numInputs);
} }
}; };




+ 61
- 38
src/bridge.cpp View File

@@ -49,10 +49,14 @@ struct BridgeClientConnection {
#else #else
int flags = 0; int flags = 0;
#endif #endif
ssize_t actual = ::send(client, (const char*) buffer, length, flags);
if (actual != length) {
ready = false;
return false;
ssize_t remaining = 0;
while (remaining < length) {
ssize_t actual = ::send(client, (const char*) buffer, length, flags);
if (actual <= 0) {
ready = false;
return false;
}
remaining += actual;
} }
return true; return true;
} }
@@ -72,10 +76,14 @@ struct BridgeClientConnection {
#else #else
int flags = 0; int flags = 0;
#endif #endif
ssize_t actual = ::recv(client, (char*) buffer, length, flags);
if (actual != length) {
ready = false;
return false;
ssize_t remaining = 0;
while (remaining < length) {
ssize_t actual = ::recv(client, (char*) buffer + remaining, length - remaining, flags);
if (actual <= 0) {
ready = false;
return false;
}
remaining += actual;
} }
return true; return true;
} }
@@ -86,14 +94,12 @@ struct BridgeClientConnection {
} }


void flush() { void flush() {
int err;
// Turn off Nagle // Turn off Nagle
int flag = 1; int flag = 1;
err = setsockopt(client, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(int));
setsockopt(client, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(int));
// Turn on Nagle // Turn on Nagle
flag = 0; flag = 0;
err = setsockopt(client, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(int));
(void) err;
setsockopt(client, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof(int));
} }


void run() { void run() {
@@ -118,9 +124,8 @@ struct BridgeClientConnection {


/** Accepts a command from the client */ /** Accepts a command from the client */
void step() { void step() {
uint8_t command = NO_COMMAND;
uint8_t command;
if (!recv<uint8_t>(&command)) { if (!recv<uint8_t>(&command)) {
ready = false;
return; return;
} }


@@ -132,7 +137,6 @@ struct BridgeClientConnection {
} break; } break;


case QUIT_COMMAND: { case QUIT_COMMAND: {
debug("Bridge client quitting");
ready = false; ready = false;
} break; } break;


@@ -142,10 +146,9 @@ struct BridgeClientConnection {
setPort(port); setPort(port);
} break; } break;


case MIDI_MESSAGE_SEND_COMMAND: {
case MIDI_MESSAGE_COMMAND: {
MidiMessage message; MidiMessage message;
if (!recv(&message, 3)) { if (!recv(&message, 3)) {
ready = false;
return; return;
} }
processMidi(message); processMidi(message);
@@ -167,13 +170,17 @@ struct BridgeClientConnection {


float input[BRIDGE_INPUTS * frames]; float input[BRIDGE_INPUTS * frames];
if (!recv(&input, BRIDGE_INPUTS * frames * sizeof(float))) { if (!recv(&input, BRIDGE_INPUTS * frames * sizeof(float))) {
ready = false;
debug("Failed to receive");
return; return;
} }

float output[BRIDGE_OUTPUTS * frames]; float output[BRIDGE_OUTPUTS * frames];
memset(&output, 0, sizeof(output)); memset(&output, 0, sizeof(output));
processStream(input, output, frames); processStream(input, output, frames);
send(&output, BRIDGE_OUTPUTS * frames * sizeof(float));
if (!send(&output, BRIDGE_OUTPUTS * frames * sizeof(float))) {
debug("Failed to send");
return;
}
// flush(); // flush();
} break; } break;
} }
@@ -232,23 +239,41 @@ struct BridgeClientConnection {


static void clientRun(int client) { static void clientRun(int client) {
defer({ defer({
close(client);
#ifdef ARCH_WIN
if (shutdown(client, SD_SEND)) {
warn("Bridge client shutdown() failed");
}
if (closesocket(client)) {
warn("Bridge client closesocket() failed");
}
#else
if (close(client)) {
warn("Bridge client close() failed");
}
#endif
}); });
int err;
(void) err;


#ifdef ARCH_MAC #ifdef ARCH_MAC
// Avoid SIGPIPE // Avoid SIGPIPE
int flag = 1; int flag = 1;
setsockopt(client, SOL_SOCKET, SO_NOSIGPIPE, &flag, sizeof(int));
if (setsockopt(client, SOL_SOCKET, SO_NOSIGPIPE, &flag, sizeof(int))) {
warn("Bridge client setsockopt() failed");
return;
}
#endif #endif


// Disable non-blocking // Disable non-blocking
#ifdef ARCH_WIN #ifdef ARCH_WIN
unsigned long blockingMode = 0; unsigned long blockingMode = 0;
ioctlsocket(client, FIONBIO, &blockingMode);
if (ioctlsocket(client, FIONBIO, &blockingMode)) {
warn("Bridge client ioctlsocket() failed");
return;
}
#else #else
err = fcntl(client, F_SETFL, fcntl(client, F_GETFL, 0) & ~O_NONBLOCK);
if (fcntl(client, F_SETFL, fcntl(client, F_GETFL, 0) & ~O_NONBLOCK)) {
warn("Bridge client fcntl() failed");
return;
}
#endif #endif


BridgeClientConnection connection; BridgeClientConnection connection;
@@ -258,19 +283,16 @@ static void clientRun(int client) {




static void serverConnect() { static void serverConnect() {
int err;

// Initialize sockets // Initialize sockets
#ifdef ARCH_WIN #ifdef ARCH_WIN
WSADATA wsaData; WSADATA wsaData;
err = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (WSAStartup(MAKEWORD(2, 2), &wsaData)) {
warn("Bridge server WSAStartup() failed");
return;
}
defer({ defer({
WSACleanup(); WSACleanup();
}); });
if (err) {
warn("Could not initialize Winsock");
return;
}
#endif #endif


// Get address // Get address
@@ -295,15 +317,13 @@ static void serverConnect() {
}); });


// Bind socket to address // Bind socket to address
err = bind(server, (struct sockaddr*) &addr, sizeof(addr));
if (err) {
if (bind(server, (struct sockaddr*) &addr, sizeof(addr))) {
warn("Bridge server bind() failed"); warn("Bridge server bind() failed");
return; return;
} }


// Listen for clients // Listen for clients
err = listen(server, 20);
if (err) {
if (listen(server, 20)) {
warn("Bridge server listen() failed"); warn("Bridge server listen() failed");
return; return;
} }
@@ -312,10 +332,13 @@ static void serverConnect() {
// Enable non-blocking // Enable non-blocking
#ifdef ARCH_WIN #ifdef ARCH_WIN
unsigned long blockingMode = 1; unsigned long blockingMode = 1;
ioctlsocket(server, FIONBIO, &blockingMode);
if (ioctlsocket(server, FIONBIO, &blockingMode)) {
warn("Bridge server ioctlsocket() failed");
return;
}
#else #else
int flags = fcntl(server, F_GETFL, 0); int flags = fcntl(server, F_GETFL, 0);
err = fcntl(server, F_SETFL, flags | O_NONBLOCK);
fcntl(server, F_SETFL, flags | O_NONBLOCK);
#endif #endif


// Accept clients // Accept clients


Loading…
Cancel
Save