"""SeparateLeadStereo, with Time-Frequency choice
Provides a class (``SeparateLeadProcess``) within which several
processing steps can be run on an audio file, in order to extract the
lead instrument/main voice from a (stereophonic) audio mixture.
copyright (C) 2011 - 2013 Jean-Louis Durrieu
"""
# Importing necessary packages:
import numpy as np
try:
import scipy.io.wavfile as wav
except ImportError:
import scipy
spver = scipy.__version__
    raise ImportError('scipy version is %s; ' % (spver) + \
                      'reading WAV files requires scipy >= 0.7.0')
from .SIMM import SIMM as SIMM
import os
import warnings
# importing the cython version of tracking:
#from tracking import viterbiTrackingArray
from .tracking._tracking import viterbiTracking as viterbiTrackingArray
# the following import gets useful functions for this class:
from . import separateLeadFunctions as slf
import scipy.optimize
from ..tftransforms import tft # time-freq transforms
eps = 10 ** -9
knownTransfos = ['stft', 'hybridcqt', 'minqt',
                 'cqt', 'mqt'] # TODO: 'erb'?
class SeparateLeadProcess():
"""SeparateLeadProcess
class which implements the source separation algorithm, separating the
'lead' voice from the 'accompaniment'. It can deal automatically with the
task (the 'lead' voice becomes the most energetic one), or can be manually
told what the 'lead' is (through the melody line).
**Attributes**
dataType : dtype
this is the input data type (usually the same as the audio encoding)
displayEvolution : boolean
display the evolution of the arrays (notably HF0)
F, N : integer, integer
F the number of frequency bins in the time-frequency representation
        (this is half the number of Fourier bins, plus 1)
N the number of analysis input frames
files :
dictionary containing the filenames of the output files for the
separated signals, with the following keys (after initialization)
'inputAudioFilename' : input filename
'mus_output_file' : output filename for the estimated
'accompaniment', appending '_acc.wav' to the radical.
'outputDirSuffix' : the subfolder name to be appended to the path
of the directory of the input file, the output files will be
written in that subfolder
'outputDir' : the full path of the output files directory
'pathBaseName' : base name for the output files
(full path + radical for all output files)
'pitch_output_file' : output filename for the estimated melody line
appending '_pitches.txt' to the radical.
'voc_output_file' : output filename for the estimated 'lead
instrument', appending '_voc.wav' to the radical.
Additionally, the estimated 'accompaniment' and 'lead' with unvoiced
parts estimation are written to the corresponding filename without
these unvoiced parts, to which '_VUIMM.wav' is appended.
    imageCanvas : instance of MplCanvas or MplCanvas3Axes
canvas used to draw the image of HF0
scaleData : double
maximum value of the input data array.
With scipy.io.wavfile, the data array type is integer, and does not
fit well with the algorithm, so we need this scaleData parameter to
navigate back and forth between the double and integer representation.
scopeAllowedHF0 : double
scope of allowed F0s around the estimated/given melody line
stftParams : dictionary with the parameters for the time-frequency
representation (Short-Time Fourier Transform - STFT), with the keys:
'hopsize' : the step, in number of samples, between analysis
frames for the STFT
'NFT' : the number of Fourier bins on which the Fourier transforms
are computed.
'windowSizeInSamples' : analysis frame length, in samples
SIMMParams : dictionary with the parameters of the SIMM model
(Smoothed Instantaneous Mixture Model [DRDF2010]_), with following keys:
'alphaL', 'alphaR' : double
stereo model, panoramic parameters for the lead part
'betaL', 'betaR' : (R,) ndarray
stereo model, panoramic parameters for each of the component of
the accompaniment part.
'chirpPerF0' : integer
number of F0s between two 'stable' F0s, modelled
as chirps.
'F0Table' : (NF0,) ndarray
frequency in Hz for each of the F0s appearing in WF0
'HF0' : (NF0*chirpPerF0, N) ndarray, *estimated*
            amplitude array corresponding to the different F0s (this is
            what you want if you want to visualise the pitch saliences).
'HF00' : (NF0*chirpPerF0, N) ndarray, *estimated*
amplitude array HF0, after being zeroed everywhere outside
the given scope from the estimated melody
'HGAMMA' : (P, K) ndarray, *estimated*
amplitude array corresponding to the different smooth shapes,
decomposition of the filters on the smooth shapes in WGAMMA
'HM' : (R, N) ndarray, *estimated*
amplitude array corresponding to the decomposition of the
accompaniment on the spectral shapes in WM
'HPHI' : (K, N) ndarray, *estimated*
amplitude array corresponding to the decomposition of the
filter part on the filter spectral shapes in WPHI, defined
as np.dot(WGAMMA, HGAMMA)
'K' : integer
number of filters for the filter part decomposition
'maxF0' : double
the highest F0 candidate
'minF0' : double
the lowest F0 candidate
'NF0' : integer
number of F0s in total
'niter' : integer
number of iterations for the estimation algorithm
'P' : integer
number of smooth spectral shapes for the filter part (in WGAMMA)
'R' : integer
number of spectral shapes for the accompaniment part (in WM)
'stepNotes' : integer
number of F0s between two semitones
'WF0' : (F, NF0*chirpPerF0) ndarray, *fixed*
'dictionary' of harmonic spectral shapes for the F0 candidates
            generated thanks to the KLGLOTT88 model [DRDF2010]_
'WGAMMA' : (F, P) ndarray, *fixed*
'dictionary' of smooth spectral shapes for the filter part
'WM' : (F, R) ndarray, *estimated*
array of spectral shapes that are directly *estimated* on the
signal
verbose : boolean
if True, the program writes some information about what is happening
    wavCanvas : instance of MplCanvas or MplCanvas3Axes
the canvas that is going to be used to draw the input audio waveform
XL, XR : (F, N) ndarray
resp. left and right channel STFT arrays
**Methods**
    Constructor : reads the input audio file, sets the TF representation
        parameters, and generates the different dictionaries (for the
        source part, harmonic patterns WF0, and for the filter part,
        smooth patterns WGAMMA).
automaticMelodyAndSeparation :
launches sequence of methods to estimate the parameters, estimate the
melody, then re-estimate the parameters and at last separate the
lead from the rest, considering the lead is the most energetic source
of the mixture (with some continuity regularity)
estimSIMMParams :
estimates the parameters of the SIMM, i.e. HF0, HPHI, HGAMMA, HM and WM
estimStereoSIMMParams :
estimates the parameters of the stereo version of the SIMM,
i.e. same parameters as estimSIMMParams, with the alphas and betas
estimStereoSUIMMParams :
same as above, but first adds 'noise' components to the source part
initiateHF0WithIndexBestPath :
computes the initial HF0, before the estimation, given the melody line
(estimated or not)
runViterbi :
estimates the melody line from HF0, the energies of each F0 candidates
setOutputFileNames :
triggered when the text fields are changed, changing the output
filenames
writeSeparatedSignals :
computing and writing the adaptive Wiener filtered separated files
    writeSeparatedSignalsWithUnvoice :
computing and writing the adaptive Wiener filtered separated files,
unvoiced parts.
**References**
This is a class that encapsulates our work on source separation,
published as:
    .. [DDR2011] J.-L. Durrieu, B. David and G. Richard,
        A Musically Motivated Mid-Level Representation
        For Pitch Estimation And Musical Audio Source Separation,
        IEEE Journal of Selected Topics in Signal Processing,
        October 2011, Vol. 5 (6), pp. 1180-1191.
    and
    .. [DRDF2010] J.-L. Durrieu, G. Richard, B. David and C. Févotte,
        Source/Filter Model for Main Melody Extraction
        From Polyphonic Audio Signals,
        IEEE Transactions on Audio, Speech and Language Processing,
        special issue on Signal Models and Representations of Musical
        and Environmental Sounds, March 2010, Vol. 18 (3), pp. 564-575.
    As of 3/1/2012, available at http://www.durrieu.ch/research
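
    **Example**

    A minimal usage sketch (the input filename below is hypothetical):

    >>> sep = SeparateLeadProcess('song.wav') # a stereo WAV file
    >>> sep.autoMelSepAndWrite(maxFrames=1000)
    >>> sep.files['voc_output_file'] # path to the separated lead
    >>> sep.files['mus_output_file'] # path to the accompaniment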
"""
# files : dict containing filename to I/O
# stftParams : dict containing the parameters for the STFT
files = {}
stftParams = {}
SIMMParams = {}
def __init__(self, inputAudioFilename,
windowSize=0.0464, hopsize=None, NFT=None, nbIter=10,
numCompAccomp=40,
minF0=39, maxF0=2000, stepNotes=16,
chirpPerF0=1,
K_numFilters=4,
P_numAtomFilters=30,
imageCanvas=None, wavCanvas=None,
progressBar=None,
verbose=True,
outputDirSuffix='/',
minF0search=None,
maxF0search=None,
tfrepresentation='stft',
cqtfmax=4000,
cqtfmin=50,
cqtbins=48,
cqtWinFunc=slf.minqt.sqrt_blackmanharris,
cqtAtomHopFactor=0.25,
initHF00='random',
freeMemory=True):
"""During init, process is initiated, STFTs are computed,
and the parameters are stored.
**Parameters**
inputAudioFilename : string
filename of the input audio file
windowSize : double, optional
analysis frame ('windows') size, in s. By default, 0.0464s
nbIter : integer, optional
number of iterations for the estimation algorithm. By default, 10
numCompAccomp : integer, optional
number of components for the accompaniment, default = 40
minF0 : double/integer, optional
            lowest F0 candidate (in Hz), default=39Hz
maxF0 : double/integer, optional
highest F0 candidate (in Hz), default=2000Hz
stepNotes : integer, optional
number of F0 candidates in one semitone, default=16 F0s/semitone
K_numFilters : integer, optional
number of filter spectral shapes, default=4
P_numAtomFilters : integer, optional
number of atomic filter smooth spectral shapes, default=30
imageCanvas : MplCanvas/MplCanvas3Axes, optional
an instance of the MplCanvas/MplCanvas3Axes, giving access to the
axes where to draw the HF0 image. By default=None
wavCanvas : MplCanvas/MplCanvas3Axes, optional
an instance of the MplCanvas/MplCanvas3Axes, giving access to the
axes to draw the waveform of the input signal.
progressBar : boolean, optional ???
???
verbose : boolean, optional
            whether or not to print information about the evolution of
            the algorithm. By default=True.
outputDirSuffix : string, optional
the subfolder name (to be appended to the full path to the audio
signal), where the output files are going to be written. By default
='/'
"""
        # discarding upper case letters from the input string
tfrepresentation = tfrepresentation.lower()
if tfrepresentation not in knownTransfos:
raise AttributeError("The desired Time-Freq representation "+
tfrepresentation+" is not a recognized one.\n"+
"Please choose from "+str(knownTransfos))
self.tfrepresentation = tfrepresentation
        # representation-specific parameters
        # (tfrepresentation has already been validated above):
        self.stftParams['cqtfmin'] = cqtfmin
        self.stftParams['cqtfmax'] = cqtfmax
        self.stftParams['cqtbins'] = cqtbins
        self.stftParams['cqtWinFunc'] = cqtWinFunc
        self.stftParams['cqtAtomHopFactor'] = cqtAtomHopFactor
self.files['inputAudioFilename'] = str(inputAudioFilename)
self.imageCanvas = imageCanvas
self.wavCanvas = wavCanvas
self.displayEvolution = True
self.verbose=verbose
if self.imageCanvas is None:
self.displayEvolution = False
if inputAudioFilename[-4:] != ".wav":
raise ValueError("File not WAV file? Only WAV format support, "+\
"for now...")
self.files['outputDirSuffix'] = outputDirSuffix
self.files['outputDir'] = str('/').join(\
self.files['inputAudioFilename'].split('/')[:-1])+\
'/'+self.files['outputDirSuffix'] +'/'
if os.path.isdir(self.files['outputDir']):
print "Output directory already existing - "+\
"NB: overwriting files in:"
print self.files['outputDir']
else:
print "Creating output directory"
print self.files['outputDir']
os.mkdir(self.files['outputDir'])
self.files['pathBaseName'] = self.files['outputDir'] + \
self.files['inputAudioFilename'\
].split('/')[-1][:-4]
self.files['mus_output_file'] = str(self.files['pathBaseName']+\
'_acc.wav')
self.files['voc_output_file'] = str(self.files['pathBaseName']+\
'_lead.wav')
self.files['pitch_output_file'] = str(self.files['pathBaseName']+\
'_pitches.txt')
print "Writing the different following output files:"
print " separated lead in", \
self.files['voc_output_file']
print " separated accompaniment in", \
self.files['mus_output_file']
print " separated lead + unvoc in", \
self.files['voc_output_file'][:-4] + '_VUIMM.wav'
print " separated acc - unvoc in", \
self.files['mus_output_file'][:-4] + '_VUIMM.wav'
print " estimated pitches in", \
self.files['pitch_output_file']
# read the WAV file and store the STFT
self.fs, data = wav.read(self.files['inputAudioFilename'])
# for some bad format wav files, data is a str?
# cf. files from beat/tempo evaluation campaign of MIREX
## print self.fs, data
self.scaleData = 1.2 * np.abs(data).max() # to rescale the data.
self.dataType = data.dtype
data = np.double(data) / self.scaleData # makes data vary from -1 to 1
        if data.shape[0] == data.size: # data is mono (single-channel)
print "The audio file is not stereo. Making stereo out of mono."
print "(You could also try the older separateLead.py...)"
data = np.vstack([data, data]).T
self.numberChannels = 1
        if data.shape[1] != 2:
            print "The data is multichannel, but not stereo..."
            print "Unfortunately this program does not scale well; the"
            print "data is reduced to its first 2 channels."
            data = data[:,0:2]
self.numberChannels = data.shape[1]
# parameters for the STFT:
self.stftParams['windowSizeInSamples'] = \
slf.nextpow2(np.round(windowSize * self.fs))
if hopsize is None:
self.stftParams['hopsize'] = (
self.stftParams['windowSizeInSamples'] / 8.)
else:
self.stftParams['hopsize'] = np.double(hopsize)
if NFT is None:
self.stftParams['NFT'] = self.stftParams['windowSizeInSamples']
else:
self.stftParams['NFT'] = NFT
# offsets are the number of samples added to the beginning of data
# during the TF representation computation:
# TODO: make this less of a hack?
self.stftParams['offsets'] = {
'stft': self.stftParams['windowSizeInSamples'] / 2,
'minqt': 0,
'mqt': 0,
'hybridcqt': 0,
'cqt': 0,}
self.SIMMParams['niter'] = nbIter
self.SIMMParams['R'] = numCompAccomp
##self.XR, F, N = slf.stft(data[:,0], fs=self.fs,
## hopsize=self.stftParams['hopsize'] ,
## window=slf.sinebell(\
## self.stftParams['windowSizeInSamples']),
## nfft=self.stftParams['NFT'] )
##self.XL, F, N = slf.stft(data[:,1], fs=self.fs,
## hopsize=self.stftParams['hopsize'] ,
## window=slf.sinebell(\
## self.stftParams['windowSizeInSamples']),
## nfft=self.stftParams['NFT'] )
        # no need to store this.
## self.SXR = np.abs(self.XR) ** 2
## self.SXL = np.abs(self.XL) ** 2
# drawing the waveform to wavCanvas:
if not(self.wavCanvas is None):
if self.wavCanvas==self.imageCanvas:
self.wavCanvas.ax2.clear()
self.wavCanvas.ax2.plot(np.arange(data.shape[0]) / \
np.double(self.stftParams['hopsize']),\
data)
#self.wavCanvas.ax2.plot(np.arange(data.shape[0]) / \
# np.double(self.fs), \
# data)
self.wavCanvas.ax2.axis('tight')
self.wavCanvas.draw()
else:
self.wavCanvas.ax.clear()
self.wavCanvas.ax.plot(np.arange(data.shape[0]) / \
np.double(self.fs), \
data)
self.wavCanvas.ax.axis('tight')
self.wavCanvas.draw()
del data
# TODO: also process these as options:
self.SIMMParams['minF0'] = minF0
self.SIMMParams['maxF0'] = maxF0
self.F = self.stftParams['NFT']/2 + 1
# self.F, self.N = self.XR.shape
# this is the number of F0s within one semitone
self.SIMMParams['stepNotes'] = stepNotes
# number of spectral shapes for the filter part
self.SIMMParams['K'] = K_numFilters
# number of elements in dictionary of smooth filters
self.SIMMParams['P'] = P_numAtomFilters
        # number of chirped spectral shapes between each F0
        # this feature should be studied further before
        # we settle on a good way of using it.
self.SIMMParams['chirpPerF0'] = chirpPerF0
self.scopeAllowedHF0 = 4.0 / 1.0
# Create the harmonic combs, for each F0 between minF0 and maxF0:
self.SIMMParams['initHF00'] = initHF00
self.computeWF0()
# for debug:
if False: #DEBUG
self.imageCanvas.ax.imshow(np.log(np.abs(self.XR)),
aspect='auto',origin='lower')
self.imageCanvas.draw()
raise KeyboardInterrupt("Check these matrices !")
if False: #DEBUG
from IPython.Shell import IPShellEmbed
ipshell = IPShellEmbed()
ipshell()
plt.figure()
plt.imshow(np.log(self.SIMMParams['WF0']),
aspect='auto',
origin='lower',)
plt.figure()
plt.imshow(np.log(np.abs(self.XR)),aspect='auto',origin='lower')
        # Create the dictionary of smooth filters, for the filter part of
        # the lead instrument:
self.SIMMParams['WGAMMA'] = \
slf.generateHannBasis(numberFrequencyBins=self.F,
sizeOfFourier=self.stftParams['NFT'],
Fs=self.fs,
frequencyScale='linear',
numberOfBasis=self.SIMMParams['P'],
overlap=.75)
self.trackingParams = {}
self.trackingParams['minF0search'] = self.SIMMParams['minF0']
self.trackingParams['maxF0search'] = self.SIMMParams['maxF0']
if minF0search is not None:
self.trackingParams['minF0search'] = minF0search
if maxF0search is not None:
self.trackingParams['maxF0search'] = maxF0search
print "Some parameter settings:"
print " Size of analysis windows: ", \
self.stftParams['windowSizeInSamples']
print " Hopsize: ", self.stftParams['hopsize']
print " Size of Fourier transforms: ", self.stftParams['NFT']
print " Number of iterations to be done: ",self.SIMMParams['niter']
print " Number of elements in WM: ", self.SIMMParams['R']
self.freeMemory = freeMemory
    def setOutputFileNames(self, outputDirSuffix):
"""
        If a WAV file has already been loaded, this method redefines
        where the output files are written.
        It could be used, for instance, between the first estimation (or
        the Viterbi-smoothed estimation) of the melody and the
        re-estimation of the parameters.
"""
print "Redefining the Output Filenames !"
self.files['outputDirSuffix'] = outputDirSuffix
self.files['outputDir'] = str('/').join(\
self.files['inputAudioFilename'].split('/')[:-1])+\
'/'+self.files['outputDirSuffix'] +'/'
if os.path.isdir(self.files['outputDir']):
print "Output directory already existing - "+\
"NB: overwriting files in:"
print self.files['outputDir']
else:
print "Creating output directory"
print self.files['outputDir']
os.mkdir(self.files['outputDir'])
self.files['pathBaseName'] = self.files['outputDir'] + \
self.files[\
'inputAudioFilename'].split('/')[-1][:-4]
self.files['mus_output_file'] = str(self.files['pathBaseName']+\
'_acc.wav')
self.files['voc_output_file'] = str(self.files['pathBaseName']+\
'_lead.wav')
self.files['pitch_output_file'] = str(self.files['pathBaseName']+\
'_pitches.txt')
print "Writing the different following output files:"
print " separated lead in", \
self.files['voc_output_file']
print " separated accompaniment in", \
self.files['mus_output_file']
print " separated lead + unvoc in", \
self.files['voc_output_file'][:-4] + '_VUIMM.wav'
print " separated acc - unvoc in", \
self.files['mus_output_file'][:-4] + '_VUIMM.wav'
print " estimated pitches in", \
self.files['pitch_output_file']
    def computeWF0(self):
        """Computes the frequency basis for the source part of the SIMM.
        If tfrepresentation is a CQT-like transform, this also builds the
        corresponding cqt/hybridcqt transform object.
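
        After this call, SIMMParams['WF0'] is an (F, NF0 * chirpPerF0)
        array whose columns each sum to 1, and SIMMParams['F0Table']
        holds the frequency, in Hz, of each harmonic comb.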
"""
if self.tfrepresentation == 'stftold':
self.SIMMParams['F0Table'], WF0 = (
slf.generate_WF0_chirped(
minF0=self.SIMMParams['minF0'],
maxF0=self.SIMMParams['maxF0'],
Fs=self.fs,
Nfft=self.stftParams['NFT'],
stepNotes=self.SIMMParams['stepNotes'],
lengthWindow=
self.stftParams['windowSizeInSamples'],
Ot=0.5, # 20130130 used to be 0.25
perF0=self.SIMMParams['chirpPerF0'],
depthChirpInSemiTone=.15,
loadWF0=True,
analysisWindow='sinebell'))
self.SIMMParams['WF0'] = WF0[:self.F, :] # ensure same size as SX
# number of harmonic combs
self.SIMMParams['NF0'] = self.SIMMParams['F0Table'].size
# Normalization:
# by max or by sum?
self.SIMMParams['WF0'] = (
self.SIMMParams['WF0'] /
np.sum(self.SIMMParams['WF0'], axis=0))
elif self.tfrepresentation in ['hybridcqt', 'minqt'] and False:
if self.verbose:
print " Compute WF0, with MinQT transform"
print " - potentially (very) long -"
if self.verbose>1:
print self.stftParams
cqtwindowlength = np.ceil(
self.fs /
(self.stftParams['cqtfmin'] *
(2.**(1./self.stftParams['cqtbins']) - 1))
)
self.SIMMParams['F0Table'], WF0, self.mqt = (
slf.generate_WF0_MinQT_chirped(
minF0=self.SIMMParams['minF0'],
maxF0=self.SIMMParams['maxF0'],
cqtfmax=self.stftParams['cqtfmax'],
cqtfmin=self.stftParams['cqtfmin'],
cqtbins=self.stftParams['cqtbins'],
Fs=self.fs,
Nfft=self.stftParams['NFT'],
stepNotes=self.SIMMParams['stepNotes'],
lengthWindow=cqtwindowlength,
# self.stftParams['windowSizeInSamples'],
Ot=0.5,
perF0=self.SIMMParams['chirpPerF0'],
depthChirpInSemiTone=.5,
loadWF0=True,
cqtWinFunc=self.stftParams['cqtWinFunc'],
atomHopFactor=self.stftParams['cqtAtomHopFactor'],
analysisWindow='sinebell',
verbose=self.verbose)
)
self.SIMMParams['WF0'] = WF0 / np.sum(WF0, axis=0)
# number of harmonic combs
self.SIMMParams['NF0'] = self.SIMMParams['F0Table'].size
# self.F is different from the STFT one, resetting it:
self.F = WF0.shape[0]
self.stftParams['hopsize'] = self.mqt.cqtkernel.atomHOP
self.stftParams['NFT'] = self.mqt.cqtkernel.linFTLen
self.stftParams['windowSizeInSamples'] = (
self.mqt.cqtkernel.linFTLen
* (2 **(self.mqt.octaveNr-1))
) # 20130405T0355 DJL should guarantee better for chunk sizes
else:
self.mqt = tft.tftransforms[self.tfrepresentation](
fmin=self.stftParams['cqtfmin'],
fmax=self.stftParams['cqtfmax'],
bins=self.stftParams['cqtbins'],
fs=self.fs,
linFTLen=self.stftParams['NFT'],
atomHopFactor=self.stftParams['cqtAtomHopFactor'],
winFunc=self.stftParams['cqtWinFunc'],
perfRast=1,
verbose=self.verbose
)
self.SIMMParams['F0Table'], WF0, self.mqt = (
slf.generate_WF0_TR_chirped(
transform=self.mqt,
minF0=self.SIMMParams['minF0'],
maxF0=self.SIMMParams['maxF0'],
stepNotes=self.SIMMParams['stepNotes'],
Ot=0.5, perF0=self.SIMMParams['chirpPerF0'],
depthChirpInSemiTone=0.5, loadWF0=True,
verbose=self.verbose))
self.SIMMParams['WF0'] = WF0 / np.sum(WF0, axis=0)
# number of harmonic combs
self.SIMMParams['NF0'] = self.SIMMParams['F0Table'].size
# self.F is different from the STFT one, resetting it:
self.F = WF0.shape[0]
if hasattr(self.mqt, 'cqtkernel'):
# updating the following parameters if the transform
# is a CQT type transform.
self.stftParams['hopsize'] = self.mqt.cqtkernel.atomHOP
self.stftParams['NFT'] = self.mqt.cqtkernel.FFTLen
#self.stftParams['windowSizeInSamples'] = (
# self.mqt.cqtkernel.Nk_max
            #    * self.mqt.octaveNr) # to be checked...
self.stftParams['windowSizeInSamples'] = (
self.mqt.cqtkernel.FFTLen
* (2**(self.mqt.octaveNr-1))
) # 20130405T0355 DJL better maybe...
    def computeMonoX(self, start=0, stop=None):
        """Computes and returns SX, the power spectrum of the mono
        channel (or of the mean over the channels) of the signal.
"""
fs, data = wav.read(self.files['inputAudioFilename'])
data = np.double(data) / self.scaleData
if len(data.shape)>1 and data.shape[1]>1:
data = data.mean(axis=1)
if self.tfrepresentation == 'stft':
X, F, N = slf.stft(data, fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(\
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'] ,
start=start, stop=stop)
del data, F, N
self.F, _ = X.shape
# careful ! F and N are therefore for the whole signal!
# X = X[:,start:stop]
return np.maximum(np.abs(X)**2, 10 ** -8)
elif self.tfrepresentation in knownTransfos:
# start is in frames, same for stop, therefore have to convert
# according to parameters of hybridcqt:
start *= self.mqt.cqtkernel.atomHOP
if stop is not None:
# stop *= self.mqt.cqtkernel.atomHOP
stop = (stop - 1) * self.mqt.cqtkernel.atomHOP
stop += self.stftParams['windowSizeInSamples'] #20130318
else:
stop = data.shape[0]
data = data[start:stop]
self.mqt.computeTransform(data=data)
SX = np.maximum(
np.abs(self.mqt.transfo)**2,
10 ** -8)
del self.mqt.transfo
return SX
    def computeNFrames(self):
        """Computes the total number of analysis frames for the input
        audio file.
        """
if not hasattr(self, 'totFrames'):
if self.tfrepresentation in knownTransfos:
# NB for hybridcqt should be the same formula,
# but the values are a bit different in nature.
fs, data = wav.read(self.files['inputAudioFilename'])
self.lengthData = data.shape[0]
self.totFrames = (
np.int32(np.ceil((self.lengthData -
0) / # self.stftParams['windowSizeInSamples']) /
self.stftParams['hopsize']
+ 1) + 1)# same number as in slf.stft
)
self.N = self.totFrames
return self.totFrames
    def computeStereoX(self, start=0, stop=None, ):
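        """Computes the complex TF representations of both channels and
        stores them in self.XR and self.XL (for mono input, self.XL is
        self.XR). start and stop are expressed in frames.
        """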
fs, data = wav.read(self.files['inputAudioFilename'])
data = np.double(data) / self.scaleData
if self.tfrepresentation == 'stft':
starttime = start * self.stftParams['hopsize']
if stop is not None:
stoptime = stop * self.stftParams['hopsize']
else:
stoptime = data.shape[0]
self.originalDataLen = stoptime - starttime
if len(data.shape)>1:
self.XR, F, N = slf.stft(
data[:, 0],#[starttime:stoptime,0],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'],
start=start, stop=stop )
# not very useful in practice
else:
self.XR, F, N = slf.stft(
data,#[starttime:stoptime],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'],
start=start, stop=stop )
#self.XR = self.XR[:,start:stop]
if len(data.shape)>1 and data.shape[1]>1:
self.XL, F, N = slf.stft(
data[:,1], #[starttime:stoptime,1],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'],
start=start, stop=stop)
else:
self.XL = self.XR
del F, N
elif self.tfrepresentation in knownTransfos:
start *= self.mqt.cqtkernel.atomHOP
if stop is not None:
# stop *= self.mqt.cqtkernel.atomHOP
stop = (stop - 1) * self.mqt.cqtkernel.atomHOP
stop += self.stftParams['windowSizeInSamples'] #20130318
else:
stop = data.shape[0]
# also works for multi channel data:
data = data[start:stop]
if len(data.shape)>1:
self.mqt.computeTransform(data=data[:,0])
else:
self.mqt.computeTransform(data=data)
self.XR = np.copy(self.mqt.transfo)
del self.mqt.transfo
if len(data.shape)>1 and data.shape[1]>1:
self.mqt.computeTransform(data=data[:,1])
self.XL = np.copy(self.mqt.transfo)
del self.mqt.transfo
else:
# hybt.computeHybrid(data=data)
self.XL = self.XR
else:
raise AttributeError(self.tfrepresentation
+ " not fully implemented.")
#self.XL = self.XL[:,start:stop]
del data
self.F, _ = self.XR.shape
def computeStereoSX(self, start=0, stop=None, ):
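        """Computes and returns (SXR, SXL), the power spectra of the two
        channels, floored at 1e-8 (for mono input, SXL is SXR).
        start and stop are expressed in frames.
        """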
fs, data = wav.read(self.files['inputAudioFilename'])
data = np.double(data) / self.scaleData
if self.tfrepresentation == 'stft':
starttime = start * self.stftParams['hopsize']
if stop is not None:
stoptime = stop * self.stftParams['hopsize']
else:
stoptime = data.shape[0]
self.originalDataLen = stoptime - starttime
if len(data.shape)>1: # multichannel
XR, F, N = slf.stft(
data[:,0], #[starttime:stoptime,0],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'] ,
start=start, stop=stop)
else: # single channel
XR, F, N = slf.stft(
data,#[starttime:stoptime],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'] ,
start=start, stop=stop)
SXR = np.maximum(np.abs(XR)**2, 1e-8)
del XR
#XR = XR[:,start:stop]
if len(data.shape)>1 and data.shape[1]>1:
XL, F, N = slf.stft(
data[:,1],#[starttime:stoptime,1],
fs=self.fs,
hopsize=self.stftParams['hopsize'] ,
window=slf.sinebell(
self.stftParams['windowSizeInSamples']),
nfft=self.stftParams['NFT'] ,
start=start, stop=stop)
SXL = np.maximum(np.abs(XL)**2, 1e-8)
del XL, F, N
else:
SXL = SXR
elif self.tfrepresentation in knownTransfos:
start *= self.mqt.cqtkernel.atomHOP
if stop is not None:
stop = (stop - 1) * self.mqt.cqtkernel.atomHOP
stop += self.stftParams['windowSizeInSamples'] #20130318
else:
stop = data.shape[0]
# also works for multi channel data:
data = data[start:stop]
if len(data.shape)>1:
self.mqt.computeTransform(data=data[:,0])
else:
self.mqt.computeTransform(data=data)
SXR = np.maximum(np.abs(self.mqt.transfo)**2,10 ** -8)
del self.mqt.transfo
if len(data.shape)>1 and data.shape[1]>1:
self.mqt.computeTransform(data=data[:,1])
SXL = np.maximum(np.abs(self.mqt.transfo)**2,10 ** -8)
del self.mqt.transfo
else:
# hybt.computeHybrid(data=data)
SXL = SXR
else:
raise NotImplementedError("Transform %s not fully implemented"
%self.tfrepresentation)
#XL = XL[:,start:stop]
del data
self.F, _ = SXR.shape
return SXR, SXL
def estimSIMMParams(self, R=1):
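        """Estimates the SIMM parameters (HGAMMA, HPHI, HF0, HM and WM)
        on the mean power spectrum of the channels, with R accompaniment
        components, and stores them in self.SIMMParams.
        """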
## section to estimate the melody, on monophonic algo:
SX = self.computeMonoX()
# First round of parameter estimation:
print " Estimating IMM parameters, on mean of channels, with",R,\
"\n accompaniment components."
HGAMMA, HPHI, HF0, HM, WM, recoError1 = SIMM.SIMM(
# the data to be fitted to:
SX,
# the basis matrices for the spectral combs
WF0=self.SIMMParams['WF0'],
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=R,#self.SIMMParams['R'],
# putting only 2 elements in accompaniment for a start...
# if any, initial amplitude matrices for
HGAMMA0=None, HPHI0=None,
HF00=None,
WM0=None, HM0=None,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SX.max()), alphaHF0=0.9,
verbose=self.verbose,
displayEvolution=self.displayEvolution,
imageCanvas=self.imageCanvas,
F0Table=self.SIMMParams['F0Table'],
chirpPerF0=self.SIMMParams['chirpPerF0'])
self.SIMMParams['HGAMMA'] = HGAMMA
self.SIMMParams['HPHI'] = HPHI
self.SIMMParams['HF0'] = HF0
self.SIMMParams['HM'] = HM
self.SIMMParams['WM'] = WM
del SX
    def estimHF0(self, R=1, maxFrames=1000):
        """
        Estimates and stores HF0 for the whole excerpt, processing the
        signal in chunks of at most maxFrames frames, with only R
        accompaniment components.
"""
## section to estimate the melody, on monophonic algo:
#SX = self.computeMonoX() # too heavy, try to guess before hand instead
#totFrames = SX.shape[1]
totFrames, nChunks, maxFrames = self.checkChunkSize(maxFrames)
# First round of parameter estimation:
print " Estimating IMM parameters, on mean of channels, with",R,\
"\n accompaniment components."+\
" Nb of chunks: %d." %nChunks
# del SX
self.SIMMParams['HF0'] = np.zeros([self.SIMMParams['NF0'] * \
self.SIMMParams['chirpPerF0'],
totFrames])
for n in range(nChunks):
if self.verbose:
print "Chunk nb", n+1, "out of", nChunks
start = n*maxFrames
stop = np.minimum((n+1)*maxFrames, totFrames)
SX = self.computeMonoX(start=start, stop=stop)
if self.SIMMParams['initHF00'] == 'nnls':
# probably slower than running from random...
HF00 = np.ones((self.SIMMParams['NF0']
* self.SIMMParams['chirpPerF0'],
stop-start))
for framenb in range(stop-start):
if self.verbose>1:
print "frame", framenb
HF00[:,framenb], _ = scipy.optimize.nnls(
self.SIMMParams['WF0'],
SX[:,framenb])
HF00 += eps
else:
HF00 = None
HGAMMA, HPHI, HF0, HM, WM, recoError1 = SIMM.SIMM(
# the data to be fitted to:
SX,
# the basis matrices for the spectral combs
WF0=self.SIMMParams['WF0'],
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=R,#self.SIMMParams['R'],
# putting only 2 elements in accompaniment for a start...
# if any, initial amplitude matrices for
HGAMMA0=None, HPHI0=None,
HF00=HF00,
WM0=None, HM0=None,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SX.max()), alphaHF0=0.9,
verbose=self.verbose,
displayEvolution=self.displayEvolution,
imageCanvas=self.imageCanvas,
F0Table=self.SIMMParams['F0Table'],
chirpPerF0=self.SIMMParams['chirpPerF0'])
if self.tfrepresentation == 'stft':
self.SIMMParams['HF0'][:,start:stop] = np.copy(HF0)
elif self.tfrepresentation in knownTransfos:
# the first frame of interest in the CQT representation,
# for our purpose at least
startincqt = np.sort(np.where(self.mqt.time_stamps>0)[0])[0]
# and the last:
stopincqt = (startincqt
+ stop - start)
self.SIMMParams['HF0'][:,start:stop] = (
np.copy(HF0[:,startincqt:stopincqt]))
del SX
F0Table=self.SIMMParams['F0Table']
NF0 = self.SIMMParams['NF0'] * self.SIMMParams['chirpPerF0']
db = SIMM.db
imgYticks = np.int32(np.linspace(NF0/5, NF0-1, num=5)).tolist()
notesFreqs = {}
notesFreqs['A4'] = 442
notesFreqs['A2'] = notesFreqs['A4'] / 4
notesFreqs['A3'] = notesFreqs['A4'] / 2
notesFreqs['A5'] = notesFreqs['A4'] * 2
notesFreqs['A6'] = notesFreqs['A4'] * 4
if (F0Table is None):
imgYticklabels = imgYticks
else:
imgYticklabels = np.int32(F0Table[np.array(imgYticks)/
self.SIMMParams['chirpPerF0']
]).tolist()
for k, v in notesFreqs.items():
closestIndex = np.argmin(np.abs(F0Table-v))
if np.abs(12*np.log2(F0Table[closestIndex])-\
12*np.log2(v)) < .25:
imgYticks.append(closestIndex)
imgYticklabels.append(k)
if self.imageCanvas is not None:
self.imageCanvas.ax.clear()
self.imageCanvas.ax.imshow(db(self.SIMMParams['HF0']),
origin='lower',
cmap='jet',
aspect='auto',
interpolation='nearest')
self.imageCanvas.ax.get_images()[0].set_clim(\
np.amax(db(self.SIMMParams['HF0']))-100,\
np.amax(db(self.SIMMParams['HF0'])))
self.imageCanvas.ax.set_yticks(imgYticks)
self.imageCanvas.ax.set_yticklabels(imgYticklabels)
self.imageCanvas.draw()
# self.imageCanvas.updateGeometry()
    def computeChroma(self, maxFrames=3000):
"""Compute the chroma matrix.
"""
if hasattr(self, 'SIMMParams'):
if 'HF0' not in self.SIMMParams.keys():
self.estimHF0(maxFrames=maxFrames)
else:
raise AttributeError("The parameters for the SIMM are not"+\
" well initialized")
if not hasattr(self, 'N'):
warnings.warn("Issues with the attributes, running again"+\
" the estimation.")
self.estimHF0(maxFrames=maxFrames)
self.chroma = np.zeros([12*self.SIMMParams['stepNotes'],
self.computeNFrames()])#self.N])
for n in range(12*self.SIMMParams['stepNotes']):
self.chroma[n] = \
self.SIMMParams['HF0'][\
n::(12*self.SIMMParams['stepNotes'])].mean(axis=0)
self.chroma /= (self.chroma.sum(axis=0))
    def determineTuning(self):
        """Determines the tuning by checking the peaks corresponding
        to all possible key/scale patterns.
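
        Returns (scoresPerTuning, bestTuning, bestKey, bestPatternName),
        where bestTuning is an offset in 1/stepNotes of a semitone and
        bestKey an index among the 12 keys.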
"""
if not hasattr(self, 'chroma'):
self.computeChroma()
chromaSummary = self.chroma.sum(axis=1)
patterns = {}
patterns['minorHarmoPattern'] = np.array([0,2,3,5,7,8,10])
patterns['minorMelodPattern'] = np.array([0,2,3,5,7,9,11])
patterns['majorPattern'] = np.array([0,2,4,5,7,9,11])
patterns['andalusPattern'] = np.array([0,1,4,5,7,8,11])
nbPattern = len(patterns.keys())
nbTunings = self.SIMMParams['stepNotes']
nbKey = 12
scoresPerTuning = np.zeros([nbPattern, nbTunings*nbKey])
for ntun in range(nbTunings):
for nk in range(nbKey):
for npatt, pattern in enumerate(patterns.keys()):
scoresPerTuning[npatt, ntun+nk*nbTunings] = \
chromaSummary[np.mod((patterns[pattern]+nk)*\
nbTunings+\
ntun, chromaSummary.size)].sum()
bestTuning = np.argmax(scoresPerTuning)
bestPattern = bestTuning / (nbTunings * nbKey)
bestTuning = bestTuning - bestPattern * (nbTunings * nbKey)
bestKey = bestTuning / nbTunings
bestTuning = bestTuning - bestKey * nbTunings
return scoresPerTuning, bestTuning, \
bestKey, patterns.keys()[bestPattern]
    def automaticMelodyAndSeparation(self):
"""Fully automated estimation of melody and separation of signals.
"""
        raise NotImplementedError("This function does not work well with "
                                  "framed estimation; use "
                                  "autoMelSepAndWrite instead.")
self.runViterbi()
self.initiateHF0WithIndexBestPath()
self.estimStereoSIMMParams()
self.writeSeparatedSignals()
self.estimStereoSUIMMParams()
self.writeSeparatedSignalsWithUnvoice()
    def autoMelSepAndWrite(self, maxFrames=1000):
"""Fully automated estimation of melody and separation of signals.
"""
self.estimHF0(maxFrames=maxFrames)
self.runViterbi()
self.initiateHF0WithIndexBestPath()
self.estimStereoSIMMParamsWriteSeps(maxFrames=maxFrames)
def runViterbi(self):
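        """Estimates the melody line from HF0 (the energies of the F0
        candidates) by Viterbi decoding, saves the resulting pitch
        sequence to the pitch output file, and stores
        self.indexBestPath and self.freqMelody.
        """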
if not('HF0' in self.SIMMParams.keys()):
raise AttributeError("HF0 has probably not been estimated yet.")
##SX = self.computeMonoX() # useless here?
self.computeNFrames() # just to be sure self.N is total nb of frames
# Viterbi decoding to estimate the predominant fundamental
# frequency line
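        # The underlying HMM has NF0 pitch states plus one extra state
        # (index NF0) modelling silence/unvoiced frames; the transition
        # probabilities decay exponentially with the interval between
        # F0s, in semitones, and are flat beyond 10 semitones.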
scale = 1.0
NF0 = self.SIMMParams['NF0'] * self.SIMMParams['chirpPerF0']
        # only considering the desired range of F0 candidates:
nmaxF0 = NF0
nminF0 = 0
        # just so that it's easier to manipulate:
minF0 = self.SIMMParams['minF0']
maxF0 = self.SIMMParams['maxF0']
minF0search = self.trackingParams['minF0search']
maxF0search = self.trackingParams['maxF0search']
if minF0search > minF0 and minF0search < maxF0:
nminF0 = (
np.where(self.SIMMParams['F0Table']>=minF0search)[0][0]
* self.SIMMParams['chirpPerF0'])
if maxF0search > minF0 and maxF0search < maxF0 and \
maxF0search > minF0search:
nmaxF0 = (
(np.where(self.SIMMParams['F0Table']>=maxF0search)[0][0] + 1)
* self.SIMMParams['chirpPerF0'])
NF0 = nmaxF0 - nminF0
print nminF0, nmaxF0 #DEBUG
# filling the transitions probabilities
transitions = np.exp(-np.floor(np.arange(0, NF0)/\
self.SIMMParams['stepNotes']) * \
scale)
cutoffnote = 2 * 5 * self.SIMMParams['stepNotes']
cutoffnote = np.minimum(NF0, cutoffnote)
transitions[cutoffnote:] = transitions[cutoffnote - 1]
transitionMatrixF0 = np.zeros([NF0 + 1, NF0 + 1]) # toeplitz matrix
b = np.arange(NF0)
transitionMatrixF0[0:NF0, 0:NF0] = \
transitions[\
np.array(np.abs(np.outer(np.ones(NF0), b) \
- np.outer(b, np.ones(NF0))), dtype=int)]
pf_0 = transitions[cutoffnote - 1] * 10 ** (-90)
p0_0 = transitions[cutoffnote - 1] * 10 ** (-100)
p0_f = transitions[cutoffnote - 1] * 10 ** (-80)
transitionMatrixF0[0:NF0, NF0] = pf_0
transitionMatrixF0[NF0, 0:NF0] = p0_f
transitionMatrixF0[NF0, NF0] = p0_0
sumTransitionMatrixF0 = np.sum(transitionMatrixF0, axis=1)
transitionMatrixF0 = transitionMatrixF0 \
/ np.outer(sumTransitionMatrixF0, \
np.ones(NF0 + 1))
priorProbabilities = 1 / (NF0 + 1.0) * np.ones([NF0 + 1])
logHF0 = np.zeros([NF0 + 1, self.N])
normHF0 = np.amax(self.SIMMParams['HF0'][nminF0:nmaxF0], axis=0)
logHF0[0:NF0, :] = np.log(self.SIMMParams['HF0'][nminF0:nmaxF0])
logHF0[0:NF0, normHF0==0] = np.amin(logHF0[logHF0>-np.Inf])
logHF0[NF0, :] = np.maximum(np.amin(logHF0[logHF0>-np.Inf]),-100)
# free all what s not needed anymore:
del normHF0, transitions, b
print "Running Viterbi algorithm to track the melody, " + \
str(self.N) + " frames."
indexBestPath = viterbiTrackingArray(NF0, self.N,\
logHF0, np.log(priorProbabilities),
np.log(transitionMatrixF0), verbose=False)
indexBestPath += nminF0
print "Viterbi algorithm done..."
# drawing this as a line is actually a bit confusing, on the image
# TODO: think of a better representation (is contour good enough?)
##if self.displayEvolution and not(self.imageCanvas is None):
## self.imageCanvas.ax.plot(indexBestPath, '-b')
## self.imageCanvas.ax.axis('tight')
## self.imageCanvas.draw()
del logHF0
# detection of silences:
if 'HPHI' in self.SIMMParams and False: # in case not estimated
# TODO: this is broken, when nchunks > 1
# needs a fix, maybe keeping relative energy as
# attribute, instead of computing it here.
chirpPerF0 = self.SIMMParams['chirpPerF0']
stepNotes = self.SIMMParams['stepNotes']
HF00 = np.zeros([NF0 * chirpPerF0, self.N])
scopeAllowedHF0 = self.scopeAllowedHF0# 4.0 / 1.0 # 2.0 / 1.0
dim1index = np.array(\
np.maximum(\
np.minimum(\
np.outer(chirpPerF0 * indexBestPath,
np.ones(chirpPerF0 \
* (2 \
* np.floor(stepNotes / \
scopeAllowedHF0) \
+ 1))) \
+ np.outer(np.ones(self.N),
np.arange(-chirpPerF0 \
* np.floor(stepNotes / \
scopeAllowedHF0),
chirpPerF0 \
* (np.floor(stepNotes / \
scopeAllowedHF0) \
+ 1))),
chirpPerF0 * NF0 - 1),
0),
dtype=int).reshape(1, self.N * chirpPerF0 \
* (2 * np.floor(stepNotes/scopeAllowedHF0)\
+ 1))
dim2index = np.outer(np.arange(self.N),
np.ones(chirpPerF0 \
* (2 * np.floor(stepNotes \
/scopeAllowedHF0)+1),\
dtype=int)\
).reshape(1, self.N * chirpPerF0 \
* (2 * np.floor(stepNotes \
/ scopeAllowedHF0) \
+ 1))
HF00[dim1index, dim2index] = self.SIMMParams['HF0'][dim1index,
dim2index]
HF00[:, indexBestPath == (NF0 - 1)] = 0.0
HF00[:, indexBestPath == 0] = 0.0
thres_energy = 0.000584
SF0 = np.maximum(np.dot(self.SIMMParams['WF0'], HF00), eps)
SPHI = np.maximum(np.dot(self.SIMMParams['WGAMMA'], \
np.dot(self.SIMMParams['HGAMMA'],
self.SIMMParams['HPHI'])), eps)
SM = np.maximum(np.dot(self.SIMMParams['WM'], \
self.SIMMParams['HM']), eps)
hatSX = np.maximum(SPHI * SF0 + SM, eps)
SX = self.computeMonoX()
energyMel = np.sum((((SPHI * SF0)/hatSX)**2) * np.abs(SX),
axis=0)
energyMelSorted = np.sort(energyMel)
energyMelCumul = np.cumsum(energyMelSorted)
energyMelCumulNorm = energyMelCumul / max(energyMelCumul[-1], eps)
# normalized to the maximum of energy:
# expressed in 0.01 times the percentage
ind_999 = np.nonzero(energyMelCumulNorm>thres_energy)[0][0]
if ind_999 is None:
ind_999 = self.N
melNotPresent = (energyMel <= energyMelCumulNorm[ind_999])
indexBestPath[melNotPresent] = 0
else:
if self.verbose:
print " Not using energy threshold, since "+\
"parameters were deleted."
freqMelody = self.SIMMParams['F0Table'][np.array(
indexBestPath
/self.SIMMParams['chirpPerF0'],
dtype=int)]
freqMelody[indexBestPath==0] = - freqMelody[indexBestPath==0]
np.savetxt(self.files['pitch_output_file'],
np.array([np.arange(self.N) * \
self.stftParams['hopsize'] / np.double(self.fs),
freqMelody]).T)
self.indexBestPath = indexBestPath
self.freqMelody = freqMelody
def initiateHF0WithIndexBestPath(self):
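        """Builds the initial HF00 for the re-estimation round: it is
        set to the maximum of HF0 within scopeAllowedHF0 around the
        melody line given by self.indexBestPath, zeroed everywhere else,
        and stored in SIMMParams['HF00'].
        """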
# Second round of parameter estimation, with specific
# initial HF00:
NF0 = self.SIMMParams['NF0']
chirpPerF0 = self.SIMMParams['chirpPerF0']
stepNotes = self.SIMMParams['stepNotes']
HF00 = np.zeros([NF0 * chirpPerF0, self.N])
scopeAllowedHF0 = self.scopeAllowedHF0 # 2.0 / 1.0
# indexes for HF00:
# TODO: reprogram this with a 'where'?...
dim1index = np.array(\
np.maximum(\
np.minimum(\
np.outer(self.indexBestPath,# * chirpPerF0 #20130610 DJL???
np.ones(chirpPerF0 \
* (2 \
* np.floor(stepNotes / scopeAllowedHF0) \
+ 1))) \
+ np.outer(np.ones(self.N),
np.arange(-chirpPerF0 \
* np.floor(stepNotes / scopeAllowedHF0),
chirpPerF0 \
* (np.floor(stepNotes / scopeAllowedHF0) \
+ 1))),
chirpPerF0 * NF0 - 1),
0),
dtype=int)
dim1index = dim1index[self.indexBestPath!=0,:]
dim1index = dim1index.reshape(1,dim1index.size)
dim2index = np.outer(np.arange(self.N),
np.ones(chirpPerF0 \
* (2 * np.floor(stepNotes \
/ scopeAllowedHF0) + 1), \
dtype=int)\
)
dim2index = dim2index[self.indexBestPath!=0,:]
dim2index = dim2index.reshape(1,dim2index.size)
HF00[dim1index, dim2index] = self.SIMMParams['HF0'].max()
HF00[:, self.indexBestPath == (NF0 - 1)] = 0.0
HF00[:, self.indexBestPath == 0] = 0.0
self.SIMMParams['HF00'] = HF00
    def estimStereoSIMMParamsWriteSeps(self, maxFrames=1000):
        """Estimates the parameters chunk by chunk, and sequentially
        writes the separated signals. In the end, concatenates all these
        separated signals into the desired output files.
"""
#SX = self.computeMonoX()
totFrames, nChunks, maxFrames = self.checkChunkSize(maxFrames)
# del SX
# First round of parameter estimation:
print " Estimating IMM parameters, on stereo channels, with",\
self.SIMMParams['R'],\
"\n accompaniment components."+\
" Nb of chunks: %d." %nChunks
self.SIMMParams['HGAMMA'] = None
for n in range(nChunks):
if self.verbose:
print "Chunk nb", n+1, "out of", nChunks
start = n*maxFrames
stop = np.minimum((n+1)*maxFrames, totFrames)
# computing only the power spectra for each channel:
# - not storing the complex spectra -
SXR, SXL = self.computeStereoSX(start=start, stop=stop)
HF00 = np.zeros([self.SIMMParams['NF0']
* self.SIMMParams['chirpPerF0'],
SXR.shape[1]])
if self.tfrepresentation == 'stft':
startinHF00 = 0
stopinHF00 = stop - start
elif self.tfrepresentation in knownTransfos:
startinHF00 = np.sort(np.where(self.mqt.time_stamps>0)[0])[0]
stopinHF00 = startinHF00 + stop - start
else:
raise AttributeError(self.tfrepresentation
+ " not fully implemented.")
HF00[:,startinHF00:stopinHF00] = (
self.SIMMParams['HF00'][:,start:stop])
alphaR, alphaL, HGAMMA, HPHI, HF0, \
betaR, betaL, HM, WM, recoError2 = SIMM.Stereo_SIMM(
# the data to be fitted to:
SXR, SXL,
# the basis matrices for the spectral combs
WF0=self.SIMMParams['WF0'],
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=self.SIMMParams['R'],
# if any, initial amplitude matrices for
HGAMMA0=self.SIMMParams['HGAMMA'],
HPHI0=None,
HF00=HF00,
WM0=None, HM0=None,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.0,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SXR.max()), alphaHF0=0.9,
verbose=self.verbose, displayEvolution=False)
self.SIMMParams['HGAMMA'] = HGAMMA
self.SIMMParams['HPHI'] = HPHI
self.SIMMParams['HF0'] = HF0
self.SIMMParams['HM'] = HM
self.SIMMParams['WM'] = WM
self.SIMMParams['alphaR'] = alphaR
self.SIMMParams['alphaL'] = alphaL
self.SIMMParams['betaR'] = betaR
self.SIMMParams['betaL'] = betaL
# keeping the estimated HF0 in memory:
self.SIMMParams['HF00'][:,start:stop] = (
np.copy(HF0[:,startinHF00:stopinHF00]))
del SXR, SXL, HF00
# computing and storing the complex spectra
self.computeStereoX(start=start, stop=stop)
# writing the separated signals as output wavfile with suffix
# equal to the chunk number
self.writeSeparatedSignals(suffix='%05d.wav'%n)
# freeing memory
del self.XR, self.XL
if self.freeMemory:
del self.SIMMParams['HM'], self.SIMMParams['HF0']
del self.SIMMParams['HPHI']
del self.SIMMParams['alphaR'], self.SIMMParams['alphaL']
del self.SIMMParams['betaR'], self.SIMMParams['betaL']
# Now concatenating the wav files
self.overlapAddChunks(
nChunks=nChunks,
suffixIsSUIMM='.wav')
def overlapAddChunks(self, nChunks,
suffixIsSUIMM='.wav'):
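        """Overlap-adds the per-chunk separated WAV files (written by
        writeSeparatedSignals with a numeric suffix) into the final lead
        and accompaniment output files, removing the intermediate chunk
        files along the way.
        """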
# Now concatenating the wav files
wlen = self.stftParams['windowSizeInSamples']
offsetTF = self.stftParams['offsets'][self.tfrepresentation]
# overlap add on the chunks:
if self.tfrepresentation == 'stft':
hopsize = self.stftParams['hopsize']
overlapSamp = wlen - hopsize
# for stft, the overlap is taken into account at computation
# using rectangle synthesis function:
overlapFunc = np.ones(overlapSamp)
elif self.tfrepresentation in knownTransfos:
hopsize = self.mqt.cqtkernel.atomHOP
# for hybridcqt, have to compensate the overlap procedure:
overlapSamp = wlen - hopsize
# using sinebell ** 2 for overlapping function
# (rectangle as analysis function for hybridcqt):
overlapFunc = slf.sinebell(2 * overlapSamp)[overlapSamp:]**2
if self.verbose>3:
print "[DEBUG] check that window adds to 1:",
print overlapFunc + overlapFunc[::-1]
nuDataLen = (
self.totFrames * hopsize
+ 2 * wlen)
data = np.zeros([nuDataLen, 2], np.int16)
cumulframe = 0
##data = []
for n in range(nChunks):
suffix='%05d%s'%(n, suffixIsSUIMM)
fname = self.files['voc_output_file'][:-4] + suffix
# data.append(wav.read(fname)[1])
_, datatmp = wav.read(fname)
if n == 0 and nChunks!=1:
# weighing by the overlapping function
datatmp[-overlapSamp:,0] *= overlapFunc
datatmp[-overlapSamp:,1] *= overlapFunc
lendatatmp = (datatmp.shape[0] - offsetTF) # (datatmp.shape[0] - wlen/2)
data[:lendatatmp, :] = np.copy(
datatmp[offsetTF:, :])# datatmp[wlen/2:, :])
cumulframe = lendatatmp
elif nChunks != 1:
# weighing by the overlapping function
if n!=nChunks-1:
datatmp[-overlapSamp:,0] *= overlapFunc
datatmp[-overlapSamp:,1] *= overlapFunc
datatmp[:overlapSamp,0] *= overlapFunc[::-1]
datatmp[:overlapSamp,1] *= overlapFunc[::-1]
start = cumulframe - wlen + hopsize
lendatatmp = datatmp.shape[0]
stop = start + lendatatmp
data[start:stop, :] += datatmp
cumulframe = stop
else: # n=0 and nChunks = 1:
lendatatmp = datatmp.shape[0] - offsetTF
data[:lendatatmp] = datatmp[offsetTF:, :]
os.remove(fname)
# data = np.vstack(data)
wav.write(self.files['voc_output_file'][:-4] + suffixIsSUIMM,
self.fs,
data[:self.lengthData,:])
data = np.zeros([nuDataLen, 2], np.int16)
# overlap add on the chunks:
cumulframe = 0
##data = []
for n in range(nChunks):
suffix='%05d%s'%(n, suffixIsSUIMM)
fname = self.files['mus_output_file'][:-4] + suffix
#data.append(wav.read(fname)[1])
_, datatmp = wav.read(fname)
if n == 0 and nChunks!=1:
# weighing by the overlapping function
datatmp[-overlapSamp:,0] *= overlapFunc
datatmp[-overlapSamp:,1] *= overlapFunc
lendatatmp = (datatmp.shape[0] - offsetTF) # (datatmp.shape[0] - wlen/2)
data[:lendatatmp, :] = np.copy(
datatmp[offsetTF:, :])# datatmp[wlen/2:, :])
cumulframe = lendatatmp
elif nChunks != 1:
# weighing by the overlapping function
if n!=nChunks-1:
datatmp[-overlapSamp:,0] *= overlapFunc
datatmp[-overlapSamp:,1] *= overlapFunc
datatmp[:overlapSamp,0] *= overlapFunc[::-1]
datatmp[:overlapSamp,1] *= overlapFunc[::-1]
start = cumulframe - wlen + hopsize
lendatatmp = datatmp.shape[0]
stop = start + lendatatmp
data[start:stop, :] += datatmp
cumulframe = stop
else: # n=0 and nChunks = 1:
lendatatmp = datatmp.shape[0] - offsetTF
data[:lendatatmp] = datatmp[offsetTF:, :]
os.remove(fname)
#data = np.vstack(data)
wav.write(self.files['mus_output_file'][:-4] + suffixIsSUIMM,
self.fs,
data[:self.lengthData,:])
    def estimStereoSUIMMParamsWriteSeps(self, maxFrames=1000):
        """Same as estimStereoSIMMParamsWriteSeps, but adds an unvoiced
        element to the source part (WF0/HF0).
"""
totFrames, nChunks, maxFrames = self.checkChunkSize(maxFrames)
print " Estimating IMM parameters, on stereo channels, with",\
self.SIMMParams['R'],\
"\n accompaniment components."+\
" Nb of chunks: %d." %nChunks
WUF0 = np.hstack([self.SIMMParams['WF0'],
np.ones([self.SIMMParams['WF0'].shape[0], 1])])
self.SIMMParams['WUF0'] = WUF0
for n in range(nChunks):
if self.verbose:
print "Chunk nb", n+1, "out of", nChunks
start = n*maxFrames
stop = np.minimum((n+1)*maxFrames, totFrames)
SXR, SXL = self.computeStereoSX(start=start, stop=stop)
HUF0 = np.zeros([self.SIMMParams['NF0']
* self.SIMMParams['chirpPerF0']
+ 1,
SXR.shape[1]])
if self.tfrepresentation == 'stft':
startinHF00 = 0
stopinHF00 = stop - start
elif self.tfrepresentation in knownTransfos:
startinHF00 = np.sort(np.where(self.mqt.time_stamps>0)[0])[0]
stopinHF00 = startinHF00 + stop - start
else:
raise AttributeError(self.tfrepresentation
+ " not fully implemented.")
HUF0[:-1,startinHF00:stopinHF00] = (
self.SIMMParams['HF00'][:,start:stop])
HUF0[-1] = 1
alphaR, alphaL, HGAMMA, HPHI, HF0, \
betaR, betaL, HM, WM, recoError3 = SIMM.Stereo_SIMM(
# the data to be fitted to:
SXR, SXL,
# the basis matrices for the spectral combs
WUF0,
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=self.SIMMParams['R'],
# if any, initial amplitude matrices for
HGAMMA0=self.SIMMParams['HGAMMA'],
HPHI0=None,
HF00=HUF0,
WM0=None,#WM,
HM0=None,#HM,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.0,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SXR.max()), alphaHF0=0.9,
verbose=self.verbose, displayEvolution=False,
updateHGAMMA=False)
self.SIMMParams['HGAMMA'] = HGAMMA
self.SIMMParams['HPHI'] = HPHI
self.SIMMParams['HUF0'] = HF0
self.SIMMParams['HM'] = HM
self.SIMMParams['WM'] = WM
self.SIMMParams['alphaR'] = alphaR
self.SIMMParams['alphaL'] = alphaL
self.SIMMParams['betaR'] = betaR
self.SIMMParams['betaL'] = betaL
del SXR, SXL, HUF0
# computing and storing the complex spectra
self.computeStereoX(start=start, stop=stop)
# writing the separated signals as output wavfile with suffix
# equal to the chunk number
self.writeSeparatedSignals(suffix='%05d_VUIMM.wav'%n)
# freeing memory
del self.XR, self.XL
del self.SIMMParams['HM'], self.SIMMParams['HUF0']
del self.SIMMParams['HPHI']
del self.SIMMParams['alphaR'], self.SIMMParams['alphaL']
del self.SIMMParams['betaR'], self.SIMMParams['betaL']
# Now concatenating the wav files
self.overlapAddChunks(
nChunks=nChunks,
suffixIsSUIMM='_VUIMM.wav')
def estimStereoSIMMParams(self):
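        """Estimates the parameters of the stereo SIMM: the same
        parameters as estimSIMMParams, plus the panoramic parameters
        alphaR/alphaL and betaR/betaL, starting from SIMMParams['HF00'].
        """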
self.computeStereoX()
SXR = np.abs(self.XR) ** 2
SXL = np.abs(self.XL) ** 2
alphaR, alphaL, HGAMMA, HPHI, HF0, \
betaR, betaL, HM, WM, recoError2 = SIMM.Stereo_SIMM(
# the data to be fitted to:
SXR, SXL,
# the basis matrices for the spectral combs
WF0=self.SIMMParams['WF0'],
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=self.SIMMParams['R'],
# if any, initial amplitude matrices for
HGAMMA0=None, HPHI0=None,
HF00=self.SIMMParams['HF00'],
WM0=None, HM0=None,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.0,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SXR.max()), alphaHF0=0.9,
verbose=self.verbose, displayEvolution=False)
self.SIMMParams['HGAMMA'] = HGAMMA
self.SIMMParams['HPHI'] = HPHI
self.SIMMParams['HF0'] = HF0
self.SIMMParams['HM'] = HM
self.SIMMParams['WM'] = WM
self.SIMMParams['alphaR'] = alphaR
self.SIMMParams['alphaL'] = alphaL
self.SIMMParams['betaR'] = betaR
self.SIMMParams['betaL'] = betaL
del SXR, SXL
def estimStereoSUIMMParams(self):
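        """Same as estimStereoSIMMParams, but first appends a flat
        'unvoiced' column to WF0 (and a corresponding row to HF0), so
        that unvoiced parts of the lead can also be captured.
        """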
SXR = np.abs(self.XR) ** 2
SXL = np.abs(self.XL) ** 2
# adding the unvoiced part in the source basis:
WUF0 = np.hstack([self.SIMMParams['WF0'],
np.ones([self.SIMMParams['WF0'].shape[0], 1])])
HUF0 = np.vstack([self.SIMMParams['HF0'],
np.ones([1, self.SIMMParams['HF0'].shape[1]])])
alphaR, alphaL, HGAMMA, HPHI, HF0, \
betaR, betaL, HM, WM, recoError3 = SIMM.Stereo_SIMM(
# the data to be fitted to:
SXR, SXL,
# the basis matrices for the spectral combs
WUF0,
# and for the elementary filters:
WGAMMA=self.SIMMParams['WGAMMA'],
# number of desired filters, accompaniment spectra:
numberOfFilters=self.SIMMParams['K'],
numberOfAccompanimentSpectralShapes=self.SIMMParams['R'],
# if any, initial amplitude matrices for
HGAMMA0=self.SIMMParams['HGAMMA'],
HPHI0=self.SIMMParams['HPHI'],
HF00=HUF0,
WM0=None,#WM,
HM0=None,#HM,
# Some more optional arguments, to control the "convergence"
# of the algo
numberOfIterations=self.SIMMParams['niter'],
updateRulePower=1.0,
stepNotes=self.SIMMParams['stepNotes'],
lambdaHF0 = 0.0 / (1.0 * SXR.max()), alphaHF0=0.9,
verbose=self.verbose, displayEvolution=False,
updateHGAMMA=False)
self.SIMMParams['HGAMMA'] = HGAMMA
self.SIMMParams['HPHI'] = HPHI
self.SIMMParams['HUF0'] = HF0
self.SIMMParams['WUF0'] = WUF0
self.SIMMParams['HM'] = HM
self.SIMMParams['WM'] = WM
self.SIMMParams['alphaR'] = alphaR
self.SIMMParams['alphaL'] = alphaL
self.SIMMParams['betaR'] = betaR
self.SIMMParams['betaL'] = betaL
    def writeSeparatedSignals(self, suffix='.wav'):
"""Writes the separated signals to the files in self.files.
If suffix contains 'VUIMM', then this method will take
the WF0 and HF0 that contain the estimated unvoiced elements.
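
        The signals are obtained by adaptive Wiener filtering, e.g. for
        the lead, right channel (as in the code below):

            hatSXR = alphaR**2 * SPHI * SF0 \
                     + np.dot(np.dot(WM, betaR**2), HM)
            vestR = istft(alphaR**2 * SPHI * SF0 / hatSXR * XR)

        and similarly for the accompaniment and for the left channel.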
"""
if 'VUIMM' in suffix:
WF0 = self.SIMMParams['WUF0']
HF0 = self.SIMMParams['HUF0']
else:
WF0 = self.SIMMParams['WF0']
HF0 = self.SIMMParams['HF0']
WGAMMA = self.SIMMParams['WGAMMA']
HGAMMA = self.SIMMParams['HGAMMA']
HPHI = self.SIMMParams['HPHI']
HM = self.SIMMParams['HM']
WM = self.SIMMParams['WM']
alphaR = self.SIMMParams['alphaR']
alphaL = self.SIMMParams['alphaL']
betaR = self.SIMMParams['betaR']
betaL = self.SIMMParams['betaL']
windowSizeInSamples = self.stftParams['windowSizeInSamples']
SPHI = np.dot(np.dot(WGAMMA, HGAMMA), HPHI)
SF0 = np.dot(WF0, HF0)
hatSXR = (alphaR**2) * SF0 * SPHI + np.dot(np.dot(WM, betaR**2),HM)
hatSXL = (alphaL**2) * SF0 * SPHI + np.dot(np.dot(WM, betaL**2),HM)
hatSXR = np.maximum(hatSXR, eps)
hatSXL = np.maximum(hatSXL, eps)
hatVR = (alphaR**2) * SPHI * SF0 / hatSXR * self.XR
if self.tfrepresentation == 'stft':
vestR = slf.istft(
hatVR,
hopsize=self.stftParams['hopsize'],
nfft=self.stftParams['NFT'],
window=slf.sinebell(windowSizeInSamples),
originalDataLen=None,)#self.originalDataLen)# / 4.0
elif self.tfrepresentation in knownTransfos:
self.mqt.transfo = hatVR
vestR = self.mqt.invertTransform()
del self.mqt.transfo
hatVR = (alphaL**2) * SPHI * SF0 / hatSXL * self.XL
del SPHI, SF0
if self.tfrepresentation == 'stft':
vestL = slf.istft(
hatVR,
hopsize=self.stftParams['hopsize'],
nfft=self.stftParams['NFT'],
window=slf.sinebell(windowSizeInSamples),
originalDataLen=None,)#self.originalDataLen)# / 4.0
elif self.tfrepresentation in knownTransfos:
self.mqt.transfo = hatVR
vestL = self.mqt.invertTransform()
del self.mqt.transfo
del hatVR
vestR = np.array(np.round(vestR*self.scaleData), dtype=self.dataType)
vestL = np.array(np.round(vestL*self.scaleData), dtype=self.dataType)
wav.write(self.files['voc_output_file'][:-4] + suffix,
self.fs,
np.array([vestR,vestL]).T)
del vestR, vestL
hatMR = (np.dot(np.dot(WM,betaR ** 2), HM)) / hatSXR * self.XR
if self.tfrepresentation == 'stft':
mestR = slf.istft(
hatMR,
hopsize=self.stftParams['hopsize'],
nfft=self.stftParams['NFT'],
window=slf.sinebell(windowSizeInSamples),
originalDataLen=None,)#self.originalDataLen) # / 4.0
elif self.tfrepresentation in knownTransfos:
self.mqt.transfo = hatMR
mestR = self.mqt.invertTransform()
del self.mqt.transfo
hatMR = (np.dot(np.dot(WM,betaL ** 2), HM)) / hatSXL * self.XL
if self.tfrepresentation == 'stft':
mestL = slf.istft(
hatMR,
hopsize=self.stftParams['hopsize'],
nfft=self.stftParams['NFT'],
window=slf.sinebell(windowSizeInSamples),
originalDataLen=None,)#self.originalDataLen) # / 4.0
elif self.tfrepresentation in knownTransfos:
self.mqt.transfo = hatMR
mestL = self.mqt.invertTransform()
del self.mqt.transfo
del hatMR
mestR = np.array(np.round(mestR*self.scaleData), dtype=self.dataType)
mestL = np.array(np.round(mestL*self.scaleData), dtype=self.dataType)
wav.write(self.files['mus_output_file'][:-4] + suffix,
self.fs,
np.array([mestR,mestL]).T)
del mestR, mestL
    def writeSeparatedSignalsWithUnvoice(self):
"""A wrapper to give a decent name to the function: simply
calling self.writeSeparatedSignals with the
'_VUIMM.wav' suffix.
"""
self.writeSeparatedSignals(suffix='_VUIMM.wav')
    def checkChunkSize(self, maxFrames):
"""Computes the number of chunks of size maxFrames, and
changes maxFrames in case it does not provide long enough
chunks (especially the last chunk).
"""
totFrames = np.int32(self.computeNFrames())
nChunks = totFrames / maxFrames + 1
# checking size of last chunk, if "small", then making it
# more even sized chunks
if (totFrames-(nChunks-1)*maxFrames <
self.stftParams['windowSizeInSamples'] /
self.stftParams['hopsize'] ):
print "Modifying the maxframes, such that chunks not too small"
maxFrames = np.int(np.ceil(np.double(totFrames)/nChunks))
nChunks = totFrames/maxFrames
print "The chunks are then maximum", maxFrames
return totFrames, nChunks, maxFrames