Browse Source

Remove recvBuffer from bridge server

pull/1639/head
Andrew Belt 7 years ago
parent
commit
1c5e909310
2 changed files with 99 additions and 153 deletions
  1. +2
    -6
      include/bridgeprotocol.hpp
  2. +97
    -147
      src/bridge.cpp

+ 2
- 6
include/bridgeprotocol.hpp View File

@@ -1,4 +1,5 @@
#pragma once #pragma once
#include <stdint.h>




namespace rack { namespace rack {
@@ -6,7 +7,7 @@ namespace rack {


static const int BRIDGE_NUM_PORTS = 16; static const int BRIDGE_NUM_PORTS = 16;
// A random number which prevents connection from other protocols and old Bridge versions // A random number which prevents connection from other protocols and old Bridge versions
const int BRIDGE_HELLO = 0xff00fefd;
const uint32_t BRIDGE_HELLO = 0xff00fefd;




/** All commands are called from the client and served by the server /** All commands are called from the client and served by the server
@@ -15,11 +16,6 @@ send
*/ */
enum BridgeCommand { enum BridgeCommand {
NO_COMMAND = 0, NO_COMMAND = 0,
/** Initial state of the state machine. The client should not send the command number itself, just its arguments.
send
- uint32_t hello
*/
START_COMMAND,
/** Requests the server to shut down the client */ /** Requests the server to shut down the client */
QUIT_COMMAND, QUIT_COMMAND,
/** Sets the port /** Sets the port


+ 97
- 147
src/bridge.cpp View File

@@ -33,226 +33,176 @@ static bool serverRunning;


struct BridgeClientConnection { struct BridgeClientConnection {
int client; int client;
RingBuffer<uint8_t, RECV_QUEUE_SIZE> recvQueue;
BridgeCommand currentCommand = START_COMMAND;
bool closeRequested = false;
bool running = false;

int port = -1; int port = -1;
int sampleRate = -1;
int sampleRate = 0;
int audioChannels = 0; int audioChannels = 0;
int audioBufferLength = -1;
bool audioActive = false; bool audioActive = false;


~BridgeClientConnection() { ~BridgeClientConnection() {
setPort(-1); setPort(-1);
} }


void send(const uint8_t *buffer, int length) {
/** Returns true if successful */
bool send(const void *buffer, int length) {
if (length <= 0) if (length <= 0)
return;
return false;

#ifdef ARCH_LIN #ifdef ARCH_LIN
int sendFlags = MSG_NOSIGNAL;
int flags = MSG_NOSIGNAL;
#else #else
int sendFlags = 0;
int flags = 0;
#endif #endif
ssize_t written = ::send(client, (const char*) buffer, length, sendFlags);
// We must write the entire buffer
if (written < length)
closeRequested = true;
}

template <typename T>
void send(T x) {
send((uint8_t*) &x, sizeof(x));
ssize_t actual = ::send(client, buffer, length, flags);
if (actual != length) {
running = false;
return false;
}
return true;
} }


/** Does not check if the queue has enough data.
You must do that yourself before calling this method.
*/
template <typename T> template <typename T>
T shift() {
T x;
recvQueue.shiftBuffer((uint8_t*) &x, sizeof(x));
return x;
bool send(T x) {
return send(&x, sizeof(x));
} }


void run() {
info("Bridge client connected");
/** Returns true if successful */
bool recv(void *buffer, int length) {
if (length <= 0)
return false;


while (!closeRequested) {
uint8_t buffer[RECV_BUFFER_SIZE];
#ifdef ARCH_LIN #ifdef ARCH_LIN
int recvFlags = MSG_NOSIGNAL;
int flags = MSG_NOSIGNAL;
#else #else
int recvFlags = 0;
int flags = 0;
#endif #endif
ssize_t received = ::recv(client, (char*) buffer, sizeof(buffer), recvFlags);
if (received <= 0)
break;
ssize_t actual = ::recv(client, buffer, length, flags);
if (actual != length) {
running = false;
return false;
}
return true;
}


// Make sure we can fill the buffer
if (recvQueue.capacity() < (size_t) received) {
// If we can't accept it, future messages will be incomplete
break;
}
template <typename T>
bool recv(T *x) {
return recv(x, sizeof(*x));
}


recvQueue.pushBuffer(buffer, received);
void run() {
info("Bridge client connected");

// Check hello key
uint32_t hello;
recv(&hello);
if (hello != BRIDGE_HELLO) {
info("Bridge client protocol mismatch");
return;
}


// Loop the state machine until it returns false
while (step()) {}
// Process commands until no longer running
running = true;
while (running) {
step();
} }


info("Bridge client closed"); info("Bridge client closed");
} }


/** Steps the state machine
Returns true if step() should be called again
*/
bool step() {
switch (currentCommand) {
case NO_COMMAND: {
if (recvQueue.size() >= 1) {
// Read command type
uint8_t c = shift<uint8_t>();
currentCommand = (BridgeCommand) c;
return true;
}
} break;
/** Accepts a command from the client */
void step() {
uint8_t command = NO_COMMAND;
recv(&command);


case START_COMMAND: {
// To prevent other TCP protocols from connecting, require a "password" on startup to continue the connection.
const int password = 0xff00fefd;
if (recvQueue.size() >= 4) {
int p = shift<uint32_t>();
if (p == password) {
currentCommand = NO_COMMAND;
return true;
}
else {
closeRequested = true;
}
}
switch (command) {
default:
case NO_COMMAND: {
warn("Bridge client: bad command detected, closing");
running = false;
} break; } break;


case QUIT_COMMAND: { case QUIT_COMMAND: {
closeRequested = true;
currentCommand = NO_COMMAND;
debug("Quitting!");
debug("Bridge client quitting");
running = true;
} break; } break;


case PORT_SET_COMMAND: { case PORT_SET_COMMAND: {
if (recvQueue.size() >= 1) {
int port = shift<uint8_t>();
setPort(port);
debug("Set port %d", port);
currentCommand = NO_COMMAND;
return true;
}
uint32_t port = -1;
recv(&port);
setPort(port);
} break; } break;


case MIDI_MESSAGE_SEND_COMMAND: { case MIDI_MESSAGE_SEND_COMMAND: {
if (recvQueue.size() >= 3) {
uint8_t midiBuffer[3];
recvQueue.shiftBuffer(midiBuffer, 3);
// debug("MIDI: %02x %02x %02x", midiBuffer[0], midiBuffer[1], midiBuffer[2]);
currentCommand = NO_COMMAND;
return true;
}
uint8_t midiBuffer[3];
recv(&midiBuffer);
debug("MIDI: %02x %02x %02x", midiBuffer[0], midiBuffer[1], midiBuffer[2]);
} break; } break;


case AUDIO_SAMPLE_RATE_SET_COMMAND: { case AUDIO_SAMPLE_RATE_SET_COMMAND: {
if (recvQueue.size() >= 4) {
sampleRate = shift<uint32_t>();
debug("Set sample rate %d", sampleRate);
currentCommand = NO_COMMAND;
return true;
}
uint32_t sampleRate = 0;
recv(&sampleRate);
setSampleRate(sampleRate);
} break; } break;


case AUDIO_CHANNELS_SET_COMMAND: { case AUDIO_CHANNELS_SET_COMMAND: {
if (recvQueue.size() >= 1) {
audioChannels = shift<uint8_t>();
debug("Set audio channels %d", audioChannels);
currentCommand = NO_COMMAND;
return true;
}
uint8_t channels = 0;
recv(&channels);
// TODO
} break; } break;


case AUDIO_PROCESS_COMMAND: { case AUDIO_PROCESS_COMMAND: {
if (audioBufferLength < 0) {
// Get audio buffer size
if (recvQueue.size() >= 4) {
audioBufferLength = shift<uint32_t>();
if (audioBufferLength <= RECV_QUEUE_SIZE) {
return true;
}
else {
// Audio buffer is too large
closeRequested = true;
}
}
}
else {
if (recvQueue.size() >= (size_t) (sizeof(float) * audioBufferLength)) {
// Get input buffer
int frames = audioBufferLength / 2;
float input[audioBufferLength];
recvQueue.shiftBuffer((uint8_t*) input, sizeof(float) * audioBufferLength);
// Process stream
float output[audioBufferLength];
processStream(input, output, frames);
// Send output buffer
send((uint8_t*) output, audioBufferLength * sizeof(float));

audioBufferLength = -1;
currentCommand = NO_COMMAND;
return true;
}
uint32_t length = 0;
recv(&length);
if (length == 0) {
running = false;
return;
} }

float input[length];
recv(&input, length * sizeof(float));
float output[length];
int frames = length / 2;
processStream(input, output, frames);
send(&output, length * sizeof(float));
} break; } break;


case AUDIO_ACTIVATE: { case AUDIO_ACTIVATE: {
audioActive = true; audioActive = true;
refreshAudioActive(); refreshAudioActive();
currentCommand = NO_COMMAND;
return true;
} break; } break;


case AUDIO_DEACTIVATE: { case AUDIO_DEACTIVATE: {
audioActive = false; audioActive = false;
refreshAudioActive(); refreshAudioActive();
currentCommand = NO_COMMAND;
return true;
} break;

default: {
warn("Bridge client: bad command detected, closing");
closeRequested = true;
} break; } break;
} }

// Stop looping the state machine
return false;
} }


void setPort(int newPort) {
void setPort(int port) {
// Unbind from existing port // Unbind from existing port
if (port >= 0 && connections[port] == this) {
if (audioListeners[port])
audioListeners[port]->setChannels(0, 0);
connections[port] = NULL;
if (this->port >= 0 && connections[this->port] == this) {
if (audioListeners[this->port])
audioListeners[this->port]->setChannels(0, 0);
connections[this->port] = NULL;
} }


// Bind to new port // Bind to new port
if (newPort >= 0 && !connections[newPort]) {
connections[newPort] = this;
if (port >= 0 && !connections[port]) {
this->port = port;
connections[this->port] = this;
refreshAudioActive(); refreshAudioActive();
port = newPort;
} }
else { else {
port = -1;
this->port = -1;
} }
} }


void setSampleRate(int sampleRate) {
// TODO
this->sampleRate = sampleRate;
}

void processStream(const float *input, float *output, int frames) { void processStream(const float *input, float *output, int frames) {
if (!(0 <= port && port < BRIDGE_NUM_PORTS)) if (!(0 <= port && port < BRIDGE_NUM_PORTS))
return; return;


Loading…
Cancel
Save