Browse Source

More REST tests, quick hacky code for websockets

tags/v2.1-alpha1-winvst
falkTX 6 years ago
parent
commit
f7fcf2ccdf
5 changed files with 303 additions and 128 deletions
  1. +1
    -1
      source/backend/CarlaHost.h
  2. +65
    -74
      source/frontend/carla_backend_qtweb.py
  3. +4
    -1
      source/rest/Makefile
  4. +8
    -13
      source/rest/carla-host.cpp
  5. +225
    -39
      source/rest/rest-server.cpp

+ 1
- 1
source/backend/CarlaHost.h View File

@@ -731,7 +731,7 @@ CARLA_EXPORT float carla_get_internal_parameter_value(uint pluginId, int32_t par
* Get a plugin's peak values.
* @param pluginId Plugin
*/
float* carla_get_peak_values(uint pluginId);
CARLA_EXPORT float* carla_get_peak_values(uint pluginId);

/*!
* Get a plugin's input peak value.


+ 65
- 74
source/frontend/carla_backend_qtweb.py View File

@@ -20,6 +20,7 @@
# Imports (Global)

import requests
from websocket import WebSocket, WebSocketConnectionClosedException

# ---------------------------------------------------------------------------------------------------------------------
# Imports (Custom)
@@ -29,49 +30,6 @@ from carla_backend_qt import *
import os
from time import sleep

# ---------------------------------------------------------------------------------------------------------------------
# Iterates over the content of a file-like object line-by-line.
# Based on code by Lars Kellogg-Stedman, see https://github.com/requests/requests/issues/2433

def iterate_stream_nonblock(stream, chunk_size=1024):
pending = None

while True:
try:
chunk = os.read(stream.raw.fileno(), chunk_size)
except BlockingIOError:
break
if not chunk:
break

if pending is not None:
chunk = pending + chunk
pending = None

lines = chunk.splitlines()

if lines and lines[-1]:
pending = lines.pop()

for line in lines:
yield line

if not pending:
break

if pending:
yield pending

# ---------------------------------------------------------------------------------------------------------------------

def create_stream(baseurl):
stream = requests.get("{}/stream".format(baseurl), stream=True, timeout=0.1)

if stream.encoding is None:
stream.encoding = 'utf-8'

return stream

# ---------------------------------------------------------------------------------------------------------------------
# Carla Host object for connecting to the REST API backend

@@ -79,14 +37,23 @@ class CarlaHostQtWeb(CarlaHostQtNull):
def __init__(self):
CarlaHostQtNull.__init__(self)

self.baseurl = "http://localhost:2228"
self.stream = create_stream(self.baseurl)
self.host = "localhost"
self.port = 2228

self.baseurl = "http://{}:{}".format(self.host, self.port)

self.socket = WebSocket()
self.socket.connect("ws://{}:{}/ws".format(self.host, self.port), timeout=1)

self.isRemote = True
self.isRunning = True
self.peaks = []

for i in range(99):
self.peaks.append((0.0, 0.0, 0.0, 0.0))

def get_engine_driver_count(self):
# FIXME
return int(requests.get("{}/get_engine_driver_count".format(self.baseurl)).text) - 1
return int(requests.get("{}/get_engine_driver_count".format(self.baseurl)).text)

def get_engine_driver_name(self, index):
return requests.get("{}/get_engine_driver_name".format(self.baseurl), params={
@@ -114,13 +81,22 @@ class CarlaHostQtWeb(CarlaHostQtNull):
return bool(int(requests.get("{}/engine_close".format(self.baseurl)).text))

def engine_idle(self):
closed = False
stream = self.stream
if not self.isRunning:
return

while True:
try:
line = self.socket.recv().strip()
except WebSocketConnectionClosedException:
self.isRunning = False
if self.fEngineCallback is None:
self.fEngineCallback(None, ENGINE_CALLBACK_QUIT, 0, 0, 0, 0.0, "")
return

for line in iterate_stream_nonblock(stream):
line = line.decode('utf-8', errors='ignore')
if line == "Keep-Alive":
return

if line.startswith("Carla: "):
elif line.startswith("Carla: "):
if self.fEngineCallback is None:
continue

@@ -137,15 +113,24 @@ class CarlaHostQtWeb(CarlaHostQtNull):
# pass to callback
self.fEngineCallback(None, action, pluginId, value1, value2, value3, valueStr)

elif line == "Connection: close":
if not closed:
self.stream = create_stream(self.baseurl)
closed = True
elif line.startswith("Peaks: "):
# split values from line
pluginId, value1, value2, value3, value4 = line[7:].split(" ",5)

if closed:
stream.close()
# convert to proper types
pluginId = int(pluginId)
value1 = float(value1)
value2 = float(value2)
value3 = float(value3)
value4 = float(value4)

# store peaks
self.peaks[pluginId] = (value1, value2, value3, value4)

def is_engine_running(self):
if not self.isRunning:
return False

try:
return bool(int(requests.get("{}/is_engine_running".format(self.baseurl)).text))
except requests.exceptions.ConnectionError:
@@ -192,7 +177,7 @@ class CarlaHostQtWeb(CarlaHostQtNull):

def patchbay_refresh(self, external):
return bool(int(requests.get("{}/patchbay_refresh".format(self.baseurl), params={
'external': external,
'external': int(external),
}).text))

def transport_play(self):
@@ -215,7 +200,13 @@ class CarlaHostQtWeb(CarlaHostQtNull):
return int(requests.get("{}/get_current_transport_frame".format(self.baseurl)).text)

def get_transport_info(self):
return requests.get("{}/get_transport_info".format(self.baseurl)).json()
if self.isRunning:
try:
return requests.get("{}/get_transport_info".format(self.baseurl)).json()
except requests.exceptions.ConnectionError:
if self.fEngineCallback is None:
self.fEngineCallback(None, ENGINE_CALLBACK_QUIT, 0, 0, 0, 0.0, "")
return PyCarlaTransportInfo()

def get_current_plugin_count(self):
return int(requests.get("{}/get_current_plugin_count".format(self.baseurl)).text)
@@ -411,10 +402,16 @@ class CarlaHostQtWeb(CarlaHostQtNull):
}).text)

def get_current_parameter_value(self, pluginId, parameterId):
return float(requests.get("{}/get_current_parameter_value".format(self.baseurl), params={
'pluginId': pluginId,
'parameterId': parameterId,
}).text)
if self.isRunning:
try:
return float(requests.get("{}/get_current_parameter_value".format(self.baseurl), params={
'pluginId': pluginId,
'parameterId': parameterId,
}).text)
except requests.exceptions.ConnectionError:
if self.fEngineCallback is None:
self.fEngineCallback(None, ENGINE_CALLBACK_QUIT, 0, 0, 0, 0.0, "")
return 0.0

def get_internal_parameter_value(self, pluginId, parameterId):
return float(requests.get("{}/get_internal_parameter_value".format(self.baseurl), params={
@@ -423,28 +420,22 @@ class CarlaHostQtWeb(CarlaHostQtNull):
}).text)

def get_input_peak_value(self, pluginId, isLeft):
return float(requests.get("{}/get_input_peak_value".format(self.baseurl), params={
'pluginId': pluginId,
'isLeft': isLeft,
}).text)
return self.peaks[pluginId][0 if isLeft else 1]

def get_output_peak_value(self, pluginId, isLeft):
return float(requests.get("{}/get_output_peak_value".format(self.baseurl), params={
'pluginId': pluginId,
'isLeft': isLeft,
}).text)
return self.peaks[pluginId][2 if isLeft else 3]

def set_option(self, pluginId, option, yesNo):
requests.get("{}/set_option".format(self.baseurl), params={
'pluginId': pluginId,
'option': option,
'yesNo': yesNo,
'yesNo': int(yesNo),
})

def set_active(self, pluginId, onOff):
requests.get("{}/set_active".format(self.baseurl), params={
'pluginId': pluginId,
'onOff': onOff,
'onOff': int(onOff),
})

def set_drywet(self, pluginId, value):


+ 4
- 1
source/rest/Makefile View File

@@ -23,7 +23,10 @@ endif

BUILD_CXX_FLAGS += -I$(CWD) -I$(CWD)/backend -I$(CWD)/includes -I$(CWD)/modules -I$(CWD)/utils

LINK_FLAGS += -L$(BINDIR) -lcarla_standalone2 -lcarla_utils -lrestbed -lpthread -Wl,-rpath=$(shell realpath $(CWD)/../bin)
LINK_FLAGS += -Wl,-rpath=$(shell realpath $(CWD)/../bin)
LINK_FLAGS += -L$(BINDIR) -lcarla_standalone2 -lcarla_utils
LINK_FLAGS += -lrestbed -lssl -lcrypto
LINK_FLAGS += -lpthread

# ----------------------------------------------------------------------------------------------------------------------



+ 8
- 13
source/rest/carla-host.cpp View File

@@ -24,23 +24,15 @@

static bool gEngineRunning = false;

void engine_idle_handler()
{
if (gEngineRunning)
carla_engine_idle();
}

// -------------------------------------------------------------------------------------------------------------------

static void EngineCallback(void* ptr, EngineCallbackOpcode action, uint pluginId, int value1, int value2, float value3, const char* valueStr)
{
#if 0
carla_stdout("EngineCallback(%p, %u:%s, %u, %i, %i, %f, %s)",
ptr, (uint)action, EngineCallbackOpcode2Str(action), pluginId, value1, value2, value3, valueStr);
#endif
carla_debug("EngineCallback(%p, %u:%s, %u, %i, %i, %f, %s)",
ptr, (uint)action, EngineCallbackOpcode2Str(action), pluginId, value1, value2, value3, valueStr);

char msgBuf[1024];
std::snprintf(msgBuf, 1023, "Carla: %u %u %i %i %f %s\n", action, pluginId, value1, value2, value3, valueStr);
std::snprintf(msgBuf, 1023, "Carla: %u %u %i %i %f %s", action, pluginId, value1, value2, value3, valueStr);
msgBuf[1023] = '\0';

switch (action)
@@ -56,7 +48,10 @@ static void EngineCallback(void* ptr, EngineCallbackOpcode action, uint pluginId
break;
}

send_server_side_message(msgBuf);
return send_server_side_message(msgBuf);

// maybe unused
(void)ptr;
}

static const char* FileCallback(void* ptr, FileCallbackOpcode action, bool isDir, const char* title, const char* filter)
@@ -274,7 +269,7 @@ void handle_carla_transport_bpm(const std::shared_ptr<Session> session)
const std::shared_ptr<const Request> request = session->get_request();

const double bpm = std::atof(request->get_query_parameter("bpm").c_str());
CARLA_SAFE_ASSERT_RETURN(bpm > 0.0,) // FIXME
CARLA_SAFE_ASSERT_RETURN(bpm > 0.0, session->close(OK)) // FIXME

carla_transport_bpm(bpm);
session->close(OK);


+ 225
- 39
source/rest/rest-server.cpp View File

@@ -31,11 +31,26 @@

// -------------------------------------------------------------------------------------------------------------------

std::vector<std::shared_ptr<Session>> gSessions;
#include <map>
#include <restbed>
#include <system_error>
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <openssl/evp.h>
#include <openssl/bio.h>
#include <openssl/buffer.h>

using namespace std;
using namespace restbed;
using namespace std::chrono;

// std::vector<std::shared_ptr<Session>> gSessions;

CarlaStringList gSessionMessages;
CarlaMutex gSessionMessagesMutex;

std::map< string, shared_ptr< WebSocket > > sockets = { };

// -------------------------------------------------------------------------------------------------------------------

void send_server_side_message(const char* const message)
@@ -47,20 +62,6 @@ void send_server_side_message(const char* const message)

// -------------------------------------------------------------------------------------------------------------------

static void register_server_side_handler(const std::shared_ptr<Session> session)
{
const auto headers = std::multimap<std::string, std::string> {
{ "Connection", "keep-alive" },
{ "Cache-Control", "no-cache" },
{ "Content-Type", "text/event-stream" },
{ "Access-Control-Allow-Origin", "*" } //Only required for demo purposes.
};

session->yield(OK, headers, [](const std::shared_ptr<Session> rsession) {
gSessions.push_back(rsession);
});
}

static void event_stream_handler(void)
{
static bool firstInit = true;
@@ -71,12 +72,10 @@ static void event_stream_handler(void)
carla_stdout("Carla REST-API Server started");
}

gSessions.erase(
std::remove_if(gSessions.begin(), gSessions.end(),
[](const std::shared_ptr<Session> &a) {
return a->is_closed();
}),
gSessions.end());
const bool running = carla_is_engine_running();

if (running)
carla_engine_idle();

CarlaStringList messages;

@@ -89,27 +88,214 @@ static void event_stream_handler(void)

for (auto message : messages)
{
// std::puts(message);
for (auto entry : sockets)
{
auto socket = entry.second;

if (socket->is_open())
socket->send(message);
}
}

if (running)
{
if (const uint count = carla_get_current_plugin_count())
{
char msgBuf[1024];
float* peaks;

for (uint i=0; i<count; ++i)
{
peaks = carla_get_peak_values(i);
CARLA_SAFE_ASSERT_BREAK(peaks != nullptr);

std::snprintf(msgBuf, 1023, "Peaks: %u %f %f %f %f", i, peaks[0], peaks[1], peaks[2], peaks[3]);
msgBuf[1023] = '\0';

for (auto entry : sockets)
{
auto socket = entry.second;

if (socket->is_open())
socket->send(msgBuf);
}
}
}
}

for (auto entry : sockets)
{
auto socket = entry.second;

if (socket->is_open())
socket->send("Keep-Alive");
}
}


// -------------------------------------------------------------------------------------------------------------------

string base64_encode( const unsigned char* input, int length )
{
BIO* bmem, *b64;
BUF_MEM* bptr;

b64 = BIO_new( BIO_f_base64( ) );
bmem = BIO_new( BIO_s_mem( ) );
b64 = BIO_push( b64, bmem );
BIO_write( b64, input, length );
( void ) BIO_flush( b64 );
BIO_get_mem_ptr( b64, &bptr );

char* buff = ( char* )malloc( bptr->length );
memcpy( buff, bptr->data, bptr->length - 1 );
buff[ bptr->length - 1 ] = 0;

BIO_free_all( b64 );

return buff;
}

multimap< string, string > build_websocket_handshake_response_headers( const shared_ptr< const Request >& request )
{
auto key = request->get_header( "Sec-WebSocket-Key" );
key.append( "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" );

Byte hash[ SHA_DIGEST_LENGTH ];
SHA1( reinterpret_cast< const unsigned char* >( key.data( ) ), key.length( ), hash );

multimap< string, string > headers;
headers.insert( make_pair( "Upgrade", "websocket" ) );
headers.insert( make_pair( "Connection", "Upgrade" ) );
headers.insert( make_pair( "Sec-WebSocket-Accept", base64_encode( hash, SHA_DIGEST_LENGTH ) ) );

for (auto session : gSessions)
session->yield(OK, message);
return headers;
}

void close_handler( const shared_ptr< WebSocket > socket )
{
carla_stdout("CLOSE %i", __LINE__);

if ( socket->is_open( ) )
{
auto response = make_shared< WebSocketMessage >( WebSocketMessage::CONNECTION_CLOSE_FRAME, Bytes( { 10, 00 } ) );
socket->send( response );
}
carla_stdout("CLOSE %i", __LINE__);

const auto key = socket->get_key( );
sockets.erase( key );

fprintf( stderr, "Closed connection to %s.\n", key.data( ) );
}

void error_handler( const shared_ptr< WebSocket > socket, const error_code error )
{
const auto key = socket->get_key( );
fprintf( stderr, "WebSocket Errored '%s' for %s.\n", error.message( ).data( ), key.data( ) );
}

void message_handler( const shared_ptr< WebSocket > source, const shared_ptr< WebSocketMessage > message )
{
const auto opcode = message->get_opcode( );

if (const uint count = carla_get_current_plugin_count())
if ( opcode == WebSocketMessage::PING_FRAME )
{
auto response = make_shared< WebSocketMessage >( WebSocketMessage::PONG_FRAME, message->get_data( ) );
source->send( response );
}
else if ( opcode == WebSocketMessage::PONG_FRAME )
{
char msgBuf[1024];
float* peaks;
//Ignore PONG_FRAME.
//
//Every time the ping_handler is scheduled to run, it fires off a PING_FRAME to each
//WebSocket. The client, if behaving correctly, will respond with a PONG_FRAME.
//
//On each occasion the underlying TCP socket sees any packet data transfer, whether
//a PING, PONG, TEXT, or BINARY... frame. It will automatically reset the timeout counter
//leaving the connection active; see also Settings::set_connection_timeout.
return;
}
else if ( opcode == WebSocketMessage::CONNECTION_CLOSE_FRAME )
{
source->close( );
}
else if ( opcode == WebSocketMessage::BINARY_FRAME )
{
//We don't support binary data.
auto response = make_shared< WebSocketMessage >( WebSocketMessage::CONNECTION_CLOSE_FRAME, Bytes( { 10, 03 } ) );
source->send( response );
}
else if ( opcode == WebSocketMessage::TEXT_FRAME )
{
auto response = make_shared< WebSocketMessage >( *message );
response->set_mask( 0 );

for (uint i=0; i<count; ++i)
for ( auto socket : sockets )
{
peaks = carla_get_peak_values(i);
CARLA_SAFE_ASSERT_BREAK(peaks != nullptr);
auto destination = socket.second;
destination->send( response );
}

const auto key = source->get_key( );
const auto data = String::format( "Received message '%.*s' from %s\n", message->get_data( ).size( ), message->get_data( ).data( ), key.data( ) );
fprintf( stderr, "%s", data.data( ) );
}
}

std::snprintf(msgBuf, 1023, "Peaks: %u %f %f %f %f\n", i, peaks[0], peaks[1], peaks[2], peaks[3]);
msgBuf[1023] = '\0';
void get_method_handler(const shared_ptr<Session> session)
{
carla_stdout("HERE %i", __LINE__);
const auto request = session->get_request();
const auto connection_header = request->get_header("connection", String::lowercase);
carla_stdout("HERE %i", __LINE__);

for (auto session : gSessions)
session->yield(OK, msgBuf);
if ( connection_header.find( "upgrade" ) not_eq string::npos )
{
if ( request->get_header( "upgrade", String::lowercase ) == "websocket" )
{
const auto headers = build_websocket_handshake_response_headers( request );

session->upgrade( SWITCHING_PROTOCOLS, headers, [ ]( const shared_ptr< WebSocket > socket )
{
if ( socket->is_open( ) )
{
socket->set_close_handler( close_handler );
socket->set_error_handler( error_handler );
socket->set_message_handler( message_handler );

socket->send("Welcome to Corvusoft Chat!");

auto key = socket->get_key( );
sockets[key] = socket;
}
else
{
fprintf( stderr, "WebSocket Negotiation Failed: Client closed connection.\n" );
}
} );

return;
}
}

session->close( BAD_REQUEST );
}

void ping_handler( void )
{
for ( auto entry : sockets )
{
auto key = entry.first;
auto socket = entry.second;

if ( socket->is_open( ) )
{
socket->send( WebSocketMessage::PING_FRAME );
}
else
{
socket->close( );
}
}
}
@@ -132,11 +318,11 @@ int main(int, const char**)
{
Service service;

// server-side messages
// websocket
{
std::shared_ptr<Resource> resource = std::make_shared<Resource>();
resource->set_path("/stream");
resource->set_method_handler("GET", register_server_side_handler);
resource->set_path("/ws");
resource->set_method_handler("GET", get_method_handler);
service.publish(resource);
}

@@ -251,8 +437,8 @@ int main(int, const char**)
make_resource(service, "/get_cached_plugin_info", handle_carla_get_cached_plugin_info);

// schedule events
service.schedule(engine_idle_handler); // FIXME, crashes on fast times, but we need ~30Hz for OSC..
service.schedule(event_stream_handler, std::chrono::milliseconds(500));
service.schedule(event_stream_handler, std::chrono::milliseconds(33));
service.schedule(ping_handler, milliseconds(5000));

std::shared_ptr<Settings> settings = std::make_shared<Settings>();
settings->set_port(2228);


Loading…
Cancel
Save