|
-
- from __future__ import print_function
- import threading
- import rtaudio as rt
-
- from math import cos
-
- import struct
-
-
class audio_generator:
    """Per-sample cosine generator whose pitch rises one semitone per second.

    Every call yields the next sample of a half-amplitude cosine, assuming
    48000 samples per second.  The tone starts at 440 Hz and its frequency is
    multiplied by 2**(1/12) after each further 48000 samples.

    Attributes:
        idx: Index of the most recently produced sample (-1 before the
            first call).
        freq: Frequency of the tone currently being produced.
    """

    def __init__(self):
        # Function-scope import: at file level only `cos` is imported
        # from math.
        from math import pi

        self.idx = -1
        self.freq = 440.
        self._two_pi = 2. * pi
        # Oscillator phase in radians for the next sample, kept in [0, 2*pi).
        self._phase = 0.

    def __call__(self):
        """Return the next sample, a float in the range [-0.5, 0.5]."""
        self.idx += 1
        # Step up a semitone at each whole second -- but not on the very first
        # sample (idx == 0), otherwise the initial 440 Hz tone is skipped.
        if self.idx and self.idx % 48000 == 0:
            self.freq *= 2**(1/12.)

        sample = 0.5 * cos(self._phase)

        # Advance by one sample at the current frequency.  Accumulating the
        # phase (instead of recomputing freq * idx) keeps the waveform
        # continuous when the frequency changes, and wrapping it keeps the
        # argument of cos() small so precision does not degrade over time.
        step = self._two_pi * self.freq / 48000.
        self._phase = (self._phase + step) % self._two_pi
        return sample
-
-
class callback:
    """Stream callback that fills the output buffer from a sample generator.

    On every invocation 256 samples are pulled from ``gen`` and written to
    ``playback`` as consecutive native-order 32-bit floats.  Once more than
    48000*10 samples have been written the callback prints a dot and
    returns 1.
    NOTE(review): 1 is presumably the RtAudio "stop the stream" return
    code -- confirm against the binding.

    Attributes:
        gen: Zero-argument callable returning the next sample as a float.
        i: Total number of samples written so far.
    """

    def __init__(self, gen):
        """Remember the sample source and reset the sample counter."""
        self.gen = gen
        self.i = 0

    def __call__(self, playback, capture):
        """Write the next 256 samples into ``playback``.

        Args:
            playback: Writable buffer with room for at least 256 floats
                (1024 bytes).
            capture: Input buffer; not used.

        Returns:
            1 once more than 48000*10 samples have been produced,
            otherwise None.
        """
        next_sample = self.gen
        # Pack the whole block with a single call instead of running a list
        # comprehension purely for its side effects (which built a throwaway
        # list of 256 Nones and made 256 separate pack_into calls).
        block = [next_sample() for _ in range(256)]
        struct.pack_into("256f", playback, 0, *block)

        self.i = self.i + 256
        if self.i > 48000*10:
            print('.')
            return 1
-
try:
    # if we have numpy, replace the above class
    import numpy as np
except ImportError:
    # ImportError rather than ModuleNotFoundError: the latter is a Python 3.6+
    # subclass of ImportError and does not exist on older interpreters, where
    # this fallback would otherwise die with a NameError.
    print('Numpy not available, using struct.')
else:
    class callback:
        """Numpy-based stream callback; drop-in for the struct-based one.

        Each invocation synthesises one block of 256 samples of a
        half-amplitude cosine with a single vectorised expression and copies
        it into ``playback`` in one go.  The tone starts at 440 Hz and climbs
        by a factor of 2**(1/12) roughly once per second.  Like the
        struct-based callback it replaces, it prints a dot and returns 1 once
        more than 48000*10 samples have been written.

        Attributes:
            freq: Frequency of the tone currently being produced.
            k: Samples written since the last frequency step.
            i: Total number of samples written so far.
        """

        def __init__(self, gen):
            """Set up the oscillator state.

            Args:
                gen: Accepted for signature compatibility with the
                    struct-based callback; not used.
            """
            print('Using Numpy.')
            self.freq = 440.
            # Time offset (seconds) of each sample within one block, assuming
            # 48000 samples per second.
            self._block_t = np.arange(256, dtype=np.float64) / 48000.0
            # Phase in radians at the start of the next block, in [0, 2*pi).
            self._phase = 0.0
            self.k = 0
            self.i = 0

        def __call__(self, playback, capture):
            """Fill ``playback`` with the next 256 float32 samples.

            Args:
                playback: Writable buffer of exactly 256 32-bit floats
                    (1024 bytes).
                capture: Input buffer; not used.

            Returns:
                1 once more than 48000*10 samples have been produced,
                otherwise None.

            Raises:
                ValueError: If ``playback`` does not hold exactly 256 floats
                    or is not writable.
            """
            self.k += 256
            if self.k > 48000:
                self.freq *= 2**(1/12.)
                self.k = 0

            # Calculate the sinusoid with one numpy vector operation, as
            # opposed to per-sample computations that must be collected and
            # packed one at a time.  The phase is carried from block to block
            # (and wrapped) so the waveform stays continuous across frequency
            # steps and does not lose precision as playback time grows.
            omega = 2*np.pi*self.freq
            samples = (0.5*np.cos(self._phase + omega*self._block_t)).astype(np.float32)
            self._phase = (self._phase + omega*256/48000.0) % (2*np.pi)

            # View the output buffer as float32 without copying, then do a
            # single copy of the finished block into it.  The slice assignment
            # raises if the buffer is not exactly one block long.
            out = np.frombuffer(playback, dtype=np.float32)
            out[:] = samples

            # Signal completion the same way the struct-based callback does;
            # without this the stream never stops on its own.
            self.i += 256
            if self.i > 48000*10:
                print('.')
                return 1
-
# Demo driver: list the available audio devices, then open a stream on
# device 0 and let the callback play until it stops the stream.
dac = rt.RtAudio()

n = dac.getDeviceCount()
print('Number of devices available: ', n)

for i in range(n):
    try:
        print(dac.getDeviceInfo(i))
    except rt.RtError as e:
        # A device that cannot be queried must not abort the listing.
        print(e)


print('Default output device: ', dac.getDefaultOutputDevice())
print('Default input device: ', dac.getDefaultInputDevice())

print('is stream open: ', dac.isStreamOpen())
print('is stream running: ', dac.isStreamRunning())

# Output and input sides both use one channel of device 0.
oParams = {'deviceId': 0, 'nChannels': 1, 'firstChannel': 0}
iParams = {'deviceId': 0, 'nChannels': 1, 'firstChannel': 0}

try:
    # Pass iParams for the input side; previously oParams was passed twice
    # and iParams was never used.
    # NOTE(review): 48000 and 256 are presumably the sample rate and the
    # frames per buffer -- they match the constants hard-coded in the
    # callbacks; confirm against the binding's openStream signature.
    dac.openStream(oParams, iParams, 48000, 256, callback(audio_generator()))
except rt.RtError as e:
    print(e)
else:
    import time

    dac.startStream()
    print('latency: ', dac.getStreamLatency())

    # Poll until the stream is no longer running.
    while dac.isStreamRunning():
        time.sleep(0.1)

    print(dac.getStreamTime())

    dac.stopStream()
    dac.abortStream()
    dac.closeStream()
|