|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
-
- import math
- import lilv
- import sys
- import wave
- import numpy
-
- class WavFile(object):
- """Helper class for accessing wav file data. Should work on the most common
- formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is
- converted to float32."""
-
- # (struct format code, is_signedtype) for each sample width:
- WAV_SPECS = {
- 1: ("B", False),
- 2: ("h", True),
- 4: ("l", True),
- }
-
- def __init__(self, wav_in_path):
- self.wav_in = wave.open(wav_in_path, 'r')
- self.framerate = self.wav_in.getframerate()
- self.nframes = self.wav_in.getnframes()
- self.nchannels = self.wav_in.getnchannels()
- self.sampwidth = self.wav_in.getsampwidth()
- wav_spec = self.WAV_SPECS[self.sampwidth]
- self.struct_fmt_code, self.signed = wav_spec
- self.range = 2 ** (8*self.sampwidth)
-
- def read(self):
- """Read data from an open wav file. Return a list of channels, where each
- channel is a list of floats."""
- raw_bytes = self.wav_in.readframes(self.nframes)
- struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code)
- data = wave.struct.unpack(struct_fmt, raw_bytes)
- if self.signed:
- data = [i / float(self.range/2) for i in data]
- else:
- data = [(i - float(range/2)) / float(range/2) for i in data]
-
- channels = []
- for i in range(self.nchannels):
- channels.append([data[j] for j in range(0, len(data), self.nchannels) ])
-
- return channels
-
- def close(self):
- self.wav_in.close()
-
- def main():
- # Read command line arguments
- if len(sys.argv) != 4:
- print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
- sys.exit(1)
-
- # Initialise Lilv
- world = lilv.World()
- ns = world.ns
- world.load_all()
-
- plugin_uri = sys.argv[1]
- wav_in_path = sys.argv[2]
- wav_out_path = sys.argv[3]
-
- # Find plugin
- plugin_uri_node = world.new_uri(plugin_uri)
- plugins = world.get_all_plugins()
- if plugin_uri_node not in plugins:
- print("Unknown plugin `%s'" % plugin_uri)
- sys.exit(1)
-
- plugin = plugins[plugin_uri_node]
- n_audio_in = plugin.get_num_ports_of_class(ns.lv2.InputPort, ns.lv2.AudioPort)
- n_audio_out = plugin.get_num_ports_of_class(ns.lv2.OutputPort, ns.lv2.AudioPort)
- if n_audio_out == 0:
- print("Plugin has no audio outputs\n")
- sys.exit(1)
-
- # Open input file
- try:
- wav_in = WavFile(wav_in_path)
- except:
- print("Failed to open input `%s'\n" % wav_in_path)
- sys.exit(1)
-
- if wav_in.nchannels != n_audio_in:
- print("Input has %d channels, but plugin has %d audio inputs\n" % (
- wav_in.nchannels, n_audio_in))
- sys.exit(1)
-
- # Open output file
- wav_out = wave.open(wav_out_path, 'w')
- if not wav_out:
- print("Failed to open output `%s'\n" % wav_out_path)
- sys.exit(1)
-
- # Set output file to same format as input (except possibly nchannels)
- wav_out.setparams(wav_in.wav_in.getparams())
- wav_out.setnchannels(n_audio_out)
-
- print('%s => %s => %s @ %d Hz'
- % (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate))
-
- instance = lilv.Instance(plugin, wav_in.framerate)
-
- channels = wav_in.read()
- wav_in.close()
-
- # Connect all ports to buffers. NB if we fail to connect any buffer, lilv
- # will segfault.
- audio_input_buffers = []
- audio_output_buffers = []
- control_input_buffers = []
- control_output_buffers = []
- for index in range(plugin.get_num_ports()):
- port = plugin.get_port_by_index(index)
- if port.is_a(ns.lv2.InputPort):
- if port.is_a(ns.lv2.AudioPort):
- audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32))
- instance.connect_port(index, audio_input_buffers[-1])
- elif port.is_a(ns.lv2.ControlPort):
- default = float(port.get(ns.lv2.default))
- control_input_buffers.append(numpy.array([default], numpy.float32))
- instance.connect_port(index, control_input_buffers[-1])
- else:
- raise ValueError("Unhandled port type")
- elif port.is_a(ns.lv2.OutputPort):
- if port.is_a(ns.lv2.AudioPort):
- audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32))
- instance.connect_port(index, audio_output_buffers[-1])
- elif port.is_a(ns.lv2.ControlPort):
- control_output_buffers.append(numpy.array([0], numpy.float32))
- instance.connect_port(index, control_output_buffers[-1])
- else:
- raise ValueError("Unhandled port type")
-
- # Run the plugin:
- instance.run(wav_in.nframes)
-
- # Interleave output buffers:
- data = numpy.dstack(audio_output_buffers).flatten()
-
- # Return to original int range:
- if wav_in.signed:
- data = data * float(wav_in.range / 2)
- else:
- data = (data + 1) * float(wav_in.range/2)
-
- # Write output file in chunks to stop memory usage getting out of hand:
- CHUNK_SIZE = 8192
- for chunk in numpy.array_split(data, CHUNK_SIZE):
- wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk.astype(int)))
- wav_out.close()
-
-
- if __name__ == "__main__":
- main()
|