Audio plugin host https://kx.studio/carla
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

165 lines
5.7KB

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import math
  4. import lilv
  5. import sys
  6. import wave
  7. import numpy
  8. class WavFile(object):
  9. """Helper class for accessing wav file data. Should work on the most common
  10. formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is
  11. converted to float32."""
  12. # (struct format code, is_signedtype) for each sample width:
  13. WAV_SPECS = {
  14. 1: ("B", False),
  15. 2: ("h", True),
  16. 4: ("l", True),
  17. }
  18. def __init__(self, wav_in_path):
  19. self.wav_in = wave.open(wav_in_path, 'r')
  20. self.framerate = self.wav_in.getframerate()
  21. self.nframes = self.wav_in.getnframes()
  22. self.nchannels = self.wav_in.getnchannels()
  23. self.sampwidth = self.wav_in.getsampwidth()
  24. wav_spec = self.WAV_SPECS[self.sampwidth]
  25. self.struct_fmt_code, self.signed = wav_spec
  26. self.range = 2 ** (8*self.sampwidth)
  27. def read(self):
  28. """Read data from an open wav file. Return a list of channels, where each
  29. channel is a list of floats."""
  30. raw_bytes = self.wav_in.readframes(self.nframes)
  31. struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code)
  32. data = wave.struct.unpack(struct_fmt, raw_bytes)
  33. if self.signed:
  34. data = [i / float(self.range/2) for i in data]
  35. else:
  36. data = [(i - float(range/2)) / float(range/2) for i in data]
  37. channels = []
  38. for i in xrange(self.nchannels):
  39. channels.append([data[j] for j in xrange(0, len(data), self.nchannels) ])
  40. return channels
  41. def close(self):
  42. self.wav_in.close()
  43. def main():
  44. # Read command line arguments
  45. if len(sys.argv) != 4:
  46. print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
  47. sys.exit(1)
  48. # Initialise Lilv
  49. world = lilv.World()
  50. world.load_all()
  51. plugin_uri = sys.argv[1]
  52. wav_in_path = sys.argv[2]
  53. wav_out_path = sys.argv[3]
  54. # Find plugin
  55. plugin_uri_node = world.new_uri(plugin_uri)
  56. plugin = world.get_all_plugins().get_by_uri(plugin_uri_node)
  57. if not plugin:
  58. print("Unknown plugin `%s'\n" % plugin_uri)
  59. sys.exit(1)
  60. lv2_InputPort = world.new_uri(lilv.LILV_URI_INPUT_PORT)
  61. lv2_OutputPort = world.new_uri(lilv.LILV_URI_OUTPUT_PORT)
  62. lv2_AudioPort = world.new_uri(lilv.LILV_URI_AUDIO_PORT)
  63. lv2_ControlPort = world.new_uri(lilv.LILV_URI_CONTROL_PORT)
  64. lv2_default = world.new_uri("http://lv2plug.in/ns/lv2core#default")
  65. n_audio_in = plugin.get_num_ports_of_class(lv2_InputPort, lv2_AudioPort)
  66. n_audio_out = plugin.get_num_ports_of_class(lv2_OutputPort, lv2_AudioPort)
  67. if n_audio_out == 0:
  68. print("Plugin has no audio outputs\n")
  69. sys.exit(1)
  70. # Open input file
  71. try:
  72. wav_in = WavFile(wav_in_path)
  73. except:
  74. print("Failed to open input `%s'\n" % wav_in_path)
  75. sys.exit(1)
  76. if wav_in.nchannels != n_audio_in:
  77. print("Input has %d channels, but plugin has %d audio inputs\n" % (
  78. wav_in.nchannels, n_audio_in))
  79. sys.exit(1)
  80. # Open output file
  81. wav_out = wave.open(wav_out_path, 'w')
  82. if not wav_out:
  83. print("Failed to open output `%s'\n" % wav_out_path)
  84. sys.exit(1)
  85. # Set output file to same format as input (except possibly nchannels)
  86. wav_out.setparams(wav_in.wav_in.getparams())
  87. wav_out.setnchannels(n_audio_out)
  88. print('%s => %s => %s @ %d Hz'
  89. % (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate))
  90. instance = lilv.Instance(plugin, wav_in.framerate)
  91. channels = wav_in.read()
  92. wav_in.close()
  93. # Connect all ports to buffers. NB if we fail to connect any buffer, lilv
  94. # will segfault.
  95. audio_input_buffers = []
  96. audio_output_buffers = []
  97. control_input_buffers = []
  98. control_output_buffers = []
  99. for index in range(plugin.get_num_ports()):
  100. port = plugin.get_port_by_index(index)
  101. if port.is_a(lv2_InputPort):
  102. if port.is_a(lv2_AudioPort):
  103. audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32))
  104. instance.connect_port(index, audio_input_buffers[-1])
  105. elif port.is_a(lv2_ControlPort):
  106. #if port.has_property(lv2_default): # Doesn't seem to work
  107. default = lilv.lilv_node_as_float(lilv.lilv_nodes_get_first(port.get_value(lv2_default)))
  108. control_input_buffers.append(numpy.array([default], numpy.float32))
  109. instance.connect_port(index, control_input_buffers[-1])
  110. else:
  111. raise ValueError("Unhandled port type")
  112. elif port.is_a(lv2_OutputPort):
  113. if port.is_a(lv2_AudioPort):
  114. audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32))
  115. instance.connect_port(index, audio_output_buffers[-1])
  116. elif port.is_a(lv2_ControlPort):
  117. control_output_buffers.append(numpy.array([0], numpy.float32))
  118. instance.connect_port(index, control_output_buffers[-1])
  119. else:
  120. raise ValueError("Unhandled port type")
  121. # Run the plugin:
  122. instance.run(wav_in.nframes)
  123. # Interleave output buffers:
  124. data = numpy.dstack(audio_output_buffers).flatten()
  125. # Return to original int range:
  126. if wav_in.signed:
  127. data = data * float(wav_in.range / 2)
  128. else:
  129. data = (data + 1) * float(wav_in.range/2)
  130. # Write output file in chunks to stop memory usage getting out of hand:
  131. CHUNK_SIZE = 8192
  132. for chunk in numpy.array_split(data, CHUNK_SIZE):
  133. wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk))
  134. wav_out.close()
  135. if __name__ == "__main__":
  136. main()