#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Carla Backend code (Web stuff) # Copyright (C) 2018 Filipe Coelho # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of # the License, or any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # For a full copy of the GNU General Public License see the doc/GPL.txt file. # --------------------------------------------------------------------------------------------------------------------- # Imports (Global) import requests # --------------------------------------------------------------------------------------------------------------------- # Imports (Custom) from carla_backend_qt import * import os from time import sleep # --------------------------------------------------------------------------------------------------------------------- # Iterates over the content of a file-like object line-by-line. # Based on code by Lars Kellogg-Stedman, see https://github.com/requests/requests/issues/2433 def iterate_stream_nonblock(stream, chunk_size=1024): pending = None while True: try: chunk = os.read(stream.raw.fileno(), chunk_size) except BlockingIOError: break if not chunk: break if pending is not None: chunk = pending + chunk pending = None lines = chunk.splitlines() if lines and lines[-1]: pending = lines.pop() for line in lines: yield line if not pending: break if pending: yield pending # --------------------------------------------------------------------------------------------------------------------- def create_stream(baseurl): stream = requests.get("{}/stream".format(baseurl), stream=True, timeout=0.1) if stream.encoding is None: stream.encoding = 'utf-8' return stream # --------------------------------------------------------------------------------------------------------------------- # Carla Host object for connecting to the REST API backend class CarlaHostQtWeb(CarlaHostQtNull): def __init__(self): CarlaHostQtNull.__init__(self) self.baseurl = "http://localhost:2228" self.stream = create_stream(self.baseurl) self.isRemote = True def get_engine_driver_count(self): # FIXME return int(requests.get("{}/get_engine_driver_count".format(self.baseurl)).text) - 1 def get_engine_driver_name(self, index): return requests.get("{}/get_engine_driver_name".format(self.baseurl), params={ 'index': index, }).text def get_engine_driver_device_names(self, index): return requests.get("{}/get_engine_driver_device_names".format(self.baseurl), params={ 'index': index, }).text.split("\n") def get_engine_driver_device_info(self, index, name): return requests.get("{}/get_engine_driver_device_info".format(self.baseurl), params={ 'index': index, 'name': name, }).json() def engine_init(self, driverName, clientName): return bool(int(requests.get("{}/engine_init".format(self.baseurl), params={ 'driverName': driverName, 'clientName': clientName, }).text)) def engine_close(self): return bool(int(requests.get("{}/engine_close".format(self.baseurl)).text)) def engine_idle(self): closed = False stream = self.stream for line in iterate_stream_nonblock(stream): line = line.decode('utf-8', errors='ignore') if line.startswith("Carla: "): if self.fEngineCallback is None: continue # split values from line action, pluginId, value1, value2, value3, valueStr = line[7:].split(" ",5) # convert to proper types action = int(action) pluginId = int(pluginId) value1 = int(value1) value2 = int(value2) value3 = float(value3) # pass to callback self.fEngineCallback(None, action, pluginId, value1, value2, value3, valueStr) elif line == "Connection: close": if not closed: self.stream = create_stream(self.baseurl) closed = True if closed: stream.close() def is_engine_running(self): try: return bool(int(requests.get("{}/is_engine_running".format(self.baseurl)).text)) except requests.exceptions.ConnectionError: if self.fEngineCallback is None: self.fEngineCallback(None, ENGINE_CALLBACK_QUIT, 0, 0, 0, 0.0, "") def set_engine_about_to_close(self): return bool(int(requests.get("{}/set_engine_about_to_close".format(self.baseurl)).text)) def set_engine_option(self, option, value, valueStr): requests.get("{}/set_engine_option".format(self.baseurl), params={ 'option': option, 'value': value, 'valueStr': valueStr, }) def load_file(self, filename): return bool(int(requests.get("{}/load_file".format(self.baseurl), params={ 'filename': filename, }).text)) def load_project(self, filename): return bool(int(requests.get("{}/load_project".format(self.baseurl), params={ 'filename': filename, }).text)) def save_project(self, filename): return bool(int(requests.get("{}/save_project".format(self.baseurl), params={ 'filename': filename, }).text)) def patchbay_connect(self, groupIdA, portIdA, groupIdB, portIdB): return bool(int(requests.get("{}/patchbay_connect".format(self.baseurl), params={ 'groupIdA': groupIdA, 'portIdA': portIdA, 'groupIdB': groupIdB, 'portIdB': portIdB, }).text)) def patchbay_disconnect(self, connectionId): return bool(int(requests.get("{}/patchbay_disconnect".format(self.baseurl), params={ 'connectionId': connectionId, }).text)) def patchbay_refresh(self, external): return bool(int(requests.get("{}/patchbay_refresh".format(self.baseurl), params={ 'external': external, }).text)) def transport_play(self): requests.get("{}/transport_play".format(self.baseurl)) def transport_pause(self): requests.get("{}/transport_pause".format(self.baseurl)) def transport_bpm(self, bpm): requests.get("{}/transport_bpm".format(self.baseurl), params={ 'bpm': bpm, }) def transport_relocate(self, frame): requests.get("{}/transport_relocate".format(self.baseurl), params={ 'frame': frame, }) def get_current_transport_frame(self): return int(requests.get("{}/get_current_transport_frame".format(self.baseurl)).text) def get_transport_info(self): return requests.get("{}/get_transport_info".format(self.baseurl)).json() def get_current_plugin_count(self): return int(requests.get("{}/get_current_plugin_count".format(self.baseurl)).text) def get_max_plugin_number(self): return int(requests.get("{}/get_max_plugin_number".format(self.baseurl)).text) def add_plugin(self, btype, ptype, filename, name, label, uniqueId, extraPtr, options): return bool(int(requests.get("{}/add_plugin".format(self.baseurl), params={ 'btype': btype, 'ptype': ptype, 'filename': filename, 'name': name, 'label': label, 'uniqueId': uniqueId, 'options': options, }).text)) def remove_plugin(self, pluginId): return bool(int(requests.get("{}/remove_plugin".format(self.baseurl), params={ 'filename': pluginId, }).text)) def remove_all_plugins(self): return bool(int(requests.get("{}/remove_all_plugins".format(self.baseurl)).text)) def rename_plugin(self, pluginId, newName): return requests.get("{}/rename_plugin".format(self.baseurl), params={ 'pluginId': pluginId, 'newName': newName, }).text def clone_plugin(self, pluginId): return bool(int(requests.get("{}/clone_plugin".format(self.baseurl), params={ 'pluginId': pluginId, }).text)) def replace_plugin(self, pluginId): return bool(int(requests.get("{}/replace_plugin".format(self.baseurl), params={ 'pluginId': pluginId, }).text)) def switch_plugins(self, pluginIdA, pluginIdB): return bool(int(requests.get("{}/switch_plugins".format(self.baseurl), params={ 'pluginIdA': pluginIdA, 'pluginIdB': pluginIdB, }).text)) def load_plugin_state(self, pluginId, filename): return bool(int(requests.get("{}/load_plugin_state".format(self.baseurl), params={ 'pluginId': pluginId, 'filename': filename, }).text)) def save_plugin_state(self, pluginId, filename): return bool(int(requests.get("{}/save_plugin_state".format(self.baseurl), params={ 'pluginId': pluginId, 'filename': filename, }).text)) def export_plugin_lv2(self, pluginId, lv2path): return bool(int(requests.get("{}/export_plugin_lv2".format(self.baseurl), params={ 'pluginId': pluginId, 'lv2path': lv2path, }).text)) def get_plugin_info(self, pluginId): return requests.get("{}/get_plugin_info".format(self.baseurl), params={ 'pluginId': pluginId, }).json() def get_audio_port_count_info(self, pluginId): return requests.get("{}/get_audio_port_count_info".format(self.baseurl), params={ 'pluginId': pluginId, }).json() def get_midi_port_count_info(self, pluginId): return requests.get("{}/get_midi_port_count_info".format(self.baseurl), params={ 'pluginId': pluginId, }).json() def get_parameter_count_info(self, pluginId): return requests.get("{}/get_parameter_count_info".format(self.baseurl), params={ 'pluginId': pluginId, }).json() def get_parameter_info(self, pluginId, parameterId): return requests.get("{}/get_parameter_info".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).json() def get_parameter_scalepoint_info(self, pluginId, parameterId, scalePointId): return requests.get("{}/get_parameter_scalepoint_info".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, 'scalePointId': scalePointId, }).json() def get_parameter_data(self, pluginId, parameterId): return requests.get("{}/get_parameter_data".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).json() def get_parameter_ranges(self, pluginId, parameterId): return requests.get("{}/get_parameter_ranges".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).json() def get_midi_program_data(self, pluginId, midiProgramId): return requests.get("{}/get_midi_program_data".format(self.baseurl), params={ 'pluginId': pluginId, 'midiProgramId': midiProgramId, }).json() def get_custom_data(self, pluginId, customDataId): return requests.get("{}/get_custom_data".format(self.baseurl), params={ 'pluginId': pluginId, 'customDataId': customDataId, }).json() def get_custom_data_value(self, pluginId, type_, key): return requests.get("{}/get_custom_data_value".format(self.baseurl), params={ 'pluginId': pluginId, 'type_': type_, 'key': key, }).text def get_chunk_data(self, pluginId): return requests.get("{}/get_chunk_data".format(self.baseurl), params={ 'pluginId': pluginId, }).text def get_parameter_count(self, pluginId): return int(requests.get("{}/get_parameter_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_program_count(self, pluginId): return int(requests.get("{}/get_program_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_midi_program_count(self, pluginId): return int(requests.get("{}/get_midi_program_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_custom_data_count(self, pluginId): return int(requests.get("{}/get_custom_data_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_parameter_text(self, pluginId, parameterId): return requests.get("{}/get_parameter_text".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).text def get_program_name(self, pluginId, programId): return requests.get("{}/get_program_name".format(self.baseurl), params={ 'pluginId': pluginId, 'programId': programId, }).text def get_midi_program_name(self, pluginId, midiProgramId): return requests.get("{}/get_midi_program_name".format(self.baseurl), params={ 'pluginId': pluginId, 'midiProgramId': midiProgramId, }).text def get_real_plugin_name(self, pluginId): return requests.get("{}/get_real_plugin_name".format(self.baseurl), params={ 'pluginId': pluginId, }).text def get_current_program_index(self, pluginId): return int(requests.get("{}/get_custom_data_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_current_midi_program_index(self, pluginId): return int(requests.get("{}/get_custom_data_count".format(self.baseurl), params={ 'pluginId': pluginId, }).text) def get_default_parameter_value(self, pluginId, parameterId): return float(requests.get("{}/get_default_parameter_value".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).text) def get_current_parameter_value(self, pluginId, parameterId): return float(requests.get("{}/get_current_parameter_value".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).text) def get_internal_parameter_value(self, pluginId, parameterId): return float(requests.get("{}/get_internal_parameter_value".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, }).text) def get_input_peak_value(self, pluginId, isLeft): return float(requests.get("{}/get_input_peak_value".format(self.baseurl), params={ 'pluginId': pluginId, 'isLeft': isLeft, }).text) def get_output_peak_value(self, pluginId, isLeft): return float(requests.get("{}/get_output_peak_value".format(self.baseurl), params={ 'pluginId': pluginId, 'isLeft': isLeft, }).text) def set_option(self, pluginId, option, yesNo): requests.get("{}/set_option".format(self.baseurl), params={ 'pluginId': pluginId, 'option': option, 'yesNo': yesNo, }) def set_active(self, pluginId, onOff): requests.get("{}/set_active".format(self.baseurl), params={ 'pluginId': pluginId, 'onOff': onOff, }) def set_drywet(self, pluginId, value): requests.get("{}/set_drywet".format(self.baseurl), params={ 'pluginId': pluginId, 'value': value, }) def set_volume(self, pluginId, value): requests.get("{}/set_volume".format(self.baseurl), params={ 'pluginId': pluginId, 'value': value, }) def set_balance_left(self, pluginId, value): requests.get("{}/set_balance_left".format(self.baseurl), params={ 'pluginId': pluginId, 'value': value, }) def set_balance_right(self, pluginId, value): requests.get("{}/set_balance_right".format(self.baseurl), params={ 'pluginId': pluginId, 'value': value, }) def set_panning(self, pluginId, value): requests.get("{}/set_panning".format(self.baseurl), params={ 'pluginId': pluginId, 'value': value, }) def set_ctrl_channel(self, pluginId, channel): requests.get("{}/set_ctrl_channel".format(self.baseurl), params={ 'pluginId': pluginId, 'channel': channel, }) def set_parameter_value(self, pluginId, parameterId, value): requests.get("{}/set_parameter_value".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, 'value': value, }) def set_parameter_midi_channel(self, pluginId, parameterId, channel): requests.get("{}/set_parameter_midi_channel".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, 'channel': channel, }) def set_parameter_midi_cc(self, pluginId, parameterId, cc): requests.get("{}/set_parameter_midi_cc".format(self.baseurl), params={ 'pluginId': pluginId, 'parameterId': parameterId, 'cc': cc, }) def set_program(self, pluginId, programId): requests.get("{}/set_program".format(self.baseurl), params={ 'pluginId': pluginId, }) def set_midi_program(self, pluginId, midiProgramId): requests.get("{}/set_midi_program".format(self.baseurl), params={ 'pluginId': pluginId, 'midiProgramId': midiProgramId, }) def set_custom_data(self, pluginId, type_, key, value): requests.get("{}/set_custom_data".format(self.baseurl), params={ 'pluginId': pluginId, 'type': type_, 'key': key, 'value': value, }) def set_chunk_data(self, pluginId, chunkData): requests.get("{}/set_chunk_data".format(self.baseurl), params={ 'pluginId': pluginId, 'chunkData': chunkData, }) def prepare_for_save(self, pluginId): requests.get("{}/prepare_for_save".format(self.baseurl), params={ 'pluginId': pluginId, }) def reset_parameters(self, pluginId): requests.get("{}/reset_parameters".format(self.baseurl), params={ 'pluginId': pluginId, }) def randomize_parameters(self, pluginId): requests.get("{}/randomize_parameters".format(self.baseurl), params={ 'pluginId': pluginId, }) def send_midi_note(self, pluginId, channel, note, velocity): requests.get("{}/send_midi_note".format(self.baseurl), params={ 'pluginId': pluginId, 'channel': channel, 'note': note, 'velocity': velocity, }) def get_buffer_size(self): return int(requests.get("{}/get_buffer_size".format(self.baseurl)).text) def get_sample_rate(self): return float(requests.get("{}/get_sample_rate".format(self.baseurl)).text) def get_last_error(self): return requests.get("{}/get_last_error".format(self.baseurl)).text def get_host_osc_url_tcp(self): return requests.get("{}/get_host_osc_url_tcp".format(self.baseurl)).text def get_host_osc_url_udp(self): return requests.get("{}/get_host_osc_url_udp".format(self.baseurl)).text