"""Description
-----------
FASST (Flexible Audio Source Separation Toolbox) class
subclass it to obtain your own flavoured source separation model!
Usage
-----
TBD
Reference
---------
.. [Ozerov2012] A. Ozerov, E. Vincent and F. Bimbot
\"A General Flexible Framework for the Handling of Prior Information
in Audio Source Separation,\"
IEEE Transactions on Audio, Speech and Signal Processing 20(4),
pp. 1118-1133 (2012)
Available: `Archive on HAL <http://hal.inria.fr/hal-00626962/>`_
Adapted from the Matlab toolbox available at
http://bass-db.gforge.inria.fr/fasst/
Copyright (TBD)
---------------
Jean-Louis Durrieu, EPFL-STI-IEL-LTS5
::
jean DASH louis AT durrieu DOT ch
2012-2013
http://www.durrieu.ch
Reference
---------
"""
import numpy as np
from numpy.testing import assert_array_almost_equal # FOR DEBUG/DEV
import warnings, os
import audioObject as ao
import demixTF as demix
import SeparateLeadStereo.SeparateLeadStereoTF as SLS
from sourcefilter.filter import generateHannBasis
from spatial.steering_vectors import gen_steer_vec_far_src_uniform_linear_array
import tftransforms.tft as tft # loads the possible transforms
import tools.signalTools as st
from tools.signalTools import inv_herm_mat_2d
from tools.nmf import NMF_decomp_init, NMF_decomposition
tftransforms = {
'stftold': tft.TFTransform, # just making dummy, in FASST, not used
'stft': tft.STFT,
'mqt': tft.MinQTransfo,
'minqt': tft.MinQTransfo,
'nsgmqt': tft.NSGMinQT,
'cqt': tft.CQTransfo}
eps = 1e-10
log_prior_small_cst = 1e-70
soundCelerity = 340. # m/s
########## Main classes for audio models ##########
[docs]class FASST(object):
"""**FASST**: Flexible Audio Source Separation Toolbox
This is the superclass that implements the core functions for
the framework for audio source separation as introduced in [Ozerov2012]_
A. Ozerov, E. Vincent and F. Bimbot
\"A General Flexible Framework for the Handling of Prior
Information in Audio Source Separation,\"
IEEE Transactions on Audio, Speech and Signal Processing 20(4),
pp. 1118-1133 (2012)
Available: http://hal.inria.fr/hal-00626962/
In order to use it, one should sub-class this class, and in particular
define several elements that are assumed by the core functions for
estimation and separation in this class, see below for a list.
:param audio: the audio filename
:param transf:
a string describing the desired Time-Frequency (TF) representation
:param wlen: length of the analysis windows, mostly for STFT representation
:param integer hopsize: the size of samples between two analysis frames
:param integer iter_num: number of GEM iterations for parameter esitmation
:param str sim_ann_opt: type of annealing strategy (`'ann'`. `'no_ann'`)
:param list ann_PSD_lim:
list of 2 elements, `ann_PSD_lim[0]` is the amount of added noise
to the PSD at beginning, and `ann_PSD_lim[1]` is this amount at the
end of the estimation.
:param integer verbose:
level of verbose: 0 for almost nothing, greater than 1 for debug
messages
:param double nmfUpdateCoeff:
the exponent for the Non-Negative Matrix Factorization-type updates
within the GEM iteration
:param tffmin: minimum frequency for the TF representation
:param tffmax: maximal frequency
:param tfWinFunc: window function (please provide a python function here)
:param integer tfbpo:
number of bins per octave, for Constant-Q-based representations
:param double lambdaCorr:
penalization term to control the correlation between the sources.
Some important attributes of this class are:
:var dict spat_comps:
a dictionary containing the spatial component parameters and
variables. In particular, for a given component numbered `spat_ind`,
:var dict spec_comps:
the spectral component parameters dictionary.
:var pyfasst.audioObject.AudioObject audioObject:
the audio object that is to be processed.
See :py:class:`pyfasst.audioObject.AudioObject` for details.
:var sig_repr_params:
Parameters for the computation of the signal TF representation. The
keys in this dictionary are:
`'transf'` - the type of TF representation
`'wlen'` - the window length in samples
`'fsize'` - the size of the Fourier transform (in samples)
`'hopsize'` - the hop-size in samples between two consecutive frames
`'tffmin'`, `'tffmax'`, `'tfbpo'`, `'tfWinFunc'`, `'hopfactor'` -
variables related to specific TF representations.
:var pyfasst.tftransforms.tft.TFTransform tft:
The object that implements the TF transform.
See :py:class:`pyfasst.tftransforms.tft.TFTransform`
:var numpy.ndarray Cx:
The computed transform, after :py:meth:`FASST.omp_transf_Cx` has been
called. For memory management, as of 20130820, :py:attr:`FASST.Cx`,
for a given frame and given frequency, is supposed to be Hermitian:
only the upper diagonal is therefore kept.
"""
# for now only stft:
implemented_transf = ['stft','stftold', 'mqt', 'minqt', 'cqt']
implemented_annealing = ['ann', 'no_ann', ]
def __init__(self,
audio,
transf='stft',
wlen=2048,
hopsize=512,
iter_num=50,
sim_ann_opt='ann',
ann_PSD_lim=[None, None],
verbose=0,
nmfUpdateCoeff=1.,
tffmin=25,
tffmax=18000,
tfWinFunc=None,
tfbpo=48,
lambdaCorr=0.):
"""**FASST**: Flexible Audio Source Separation Toolbox
"""
self.verbose = verbose
self.nmfUpdateCoeff = nmfUpdateCoeff
if isinstance(audio, ao.AudioObject):
self.audioObject = audio
elif isinstance(audio, str) or isinstance(audio, unicode):
self.audioObject = ao.AudioObject(filename=audio)
else:
raise AttributeError("The provided audio parameter is"+
"not a supported format.")
# parameters to compute the signal representation:
self.sig_repr_params = {}
self.sig_repr_params['transf'] = transf.lower() # transformation type
self.sig_repr_params['wlen'] = ao.nextpow2(wlen) # window length
self.sig_repr_params['fsize'] = ao.nextpow2(wlen) # Fourier length
self.sig_repr_params['hopsize'] = hopsize
self.sig_repr_params['tffmin'] = tffmin
self.sig_repr_params['tffmax'] = tffmax
self.sig_repr_params['tfbpo'] = tfbpo
self.sig_repr_params['tfWinFunc'] = tfWinFunc
self.sig_repr_params['hopfactor'] = (
1. * hopsize / self.sig_repr_params['wlen'])
if self.sig_repr_params['transf'] not in self.implemented_transf \
or self.sig_repr_params['transf'] not in tftransforms:
raise NotImplementedError(self.sig_repr_params['transf']
+ " not yet implemented.")
elif self.sig_repr_params['transf'] != 'stftold':
self.tft = tftransforms[self.sig_repr_params['transf']](
fmin=tffmin,
fmax=tffmax,
bins=tfbpo,
fs=self.audioObject.samplerate,
perfRast=1,
linFTLen=self.sig_repr_params['fsize'],
atomHopFactor=self.sig_repr_params['hopfactor'],
)
elif self.sig_repr_params['transf'] == 'stftold':
self.tft = tftransforms['stft'](
fmin=tffmin,
fmax=tffmax,
bins=tfbpo,
fs=self.audioObject.samplerate,
perfRast=1,
linFTLen=self.sig_repr_params['fsize'],
atomHopFactor=self.sig_repr_params['hopfactor'],
)
# demix parameters:
self.demixParams = {
'tffmin': tffmin, 'tffmax': tffmax,
'tfbpo': tfbpo,
'tfrepresentation': transf.lower(), # 'stft', #transf.lower()
'wlen': self.sig_repr_params['wlen'],
'hopsize': self.sig_repr_params['wlen']/2,
'neighbors': 20,
'winFunc': tfWinFunc,
}
# noise parameters
self.noise = {}
self.noise['PSD'] = np.zeros(self.sig_repr_params['fsize']/2+1)
self.noise['sim_ann_opt'] = sim_ann_opt
self.noise['ann_PSD_lim'] = ann_PSD_lim
self.spat_comps = {}
self.spec_comps = {}
self.iter_num = iter_num
self.lambdaCorr = lambdaCorr
[docs] def comp_transf_Cx(self):
"""Computes the signal representation, according
to the provided signal representation flag, in
:py:attr:`FASST.sig_repr_params['transf']`
"""
if not hasattr(self.audioObject, '_data'):
self.audioObject._read()
if self.sig_repr_params['transf'] not in self.implemented_transf:
raise ValueError(self.sig_repr_params['transf'] +
" not implemented - yet?")
if self.verbose:
print ("Computing the chosen signal representation:",
self.sig_repr_params['transf'] )
nc = self.audioObject.channels
Xchan = []
for n in range(nc):
if self.sig_repr_params['transf'] == 'stftold':
X, freqs, times = ao.stft(
self.audioObject.data[:,n],
window=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate
)
else:
self.tft.computeTransform(self.audioObject.data[:,n],)
X = self.tft.transfo
Xchan.append(X)
if self.verbose>1:
print X.shape
self.nbFreqsSigRepr, self.nbFramesSigRepr = X.shape
##assert self.nbFreqsSigRepr == self.tft.freqbins
del X
del self.audioObject.data
if nc == 1:
self.Cx = np.abs(Xchan[0])**2
else:
self.Cx = np.zeros([nc * (nc + 1) / 2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=complex)
for n1 in range(nc):
for n2 in range(n1, nc):
# note : we keep only upper diagonal of Cx
# lower diagonal is conjugate of upper one.
n = n2 - n1 + np.sum(np.arange(nc, nc-n1, -1))
self.Cx[n] = Xchan[n1] * np.conj(Xchan[n2])
if self.noise['ann_PSD_lim'][0] is None or \
self.noise['ann_PSD_lim'][1] is None:
mix_psd = 0
# average power, for each frequency band, across the frames
if nc == 1:
mix_psd += np.mean(self.Cx, axis=1)
else:
for n1 in range(nc):
n = np.sum(np.arange(nc, nc-n1, -1)) # n2 = n1
mix_psd += np.mean(self.Cx[n], axis=1)
if self.verbose>1:
print "mix_psd", mix_psd
mix_psd /= nc
if self.verbose>1:
print "mix_psd/nc", mix_psd
if self.noise['ann_PSD_lim'][0] is None:
self.noise['ann_PSD_lim'][0] = np.real(mix_psd) / 100.
if self.noise['ann_PSD_lim'][1] is None:
self.noise['ann_PSD_lim'][1] = np.real(mix_psd) / 10000.
if self.noise['sim_ann_opt'] in ('ann'):
self.noise['PSD'] = self.noise['ann_PSD_lim'][0]
# useless for the rest of computations:
del Xchan
[docs] def estim_param_a_post_model(self,):
"""Estimates the `a posteriori` model for the provided
audio signal. In particular, this runs self.iter_num times
the Generalized Expectation-Maximisation algorithm
:py:meth:`FASST.GEM_iteration`, to
update the various parameters of the model, so as to
maximize the likelihood of the data given these parameters.
From these parameters, the posterior expectation of the
\"hidden\" or latent variables (here the spatial and spectral
components) can be computed, leading to the estimation of the
separated underlying sources.
Consider using :py:meth:`FASST.separate_spat_comps` or
:py:meth:`FASST.separate_spatial_filter_comp` to obtain the separated time
series, once the parameters have been estimated.
:returns:
`logliks`: The log-likelihoods as computed after each GEM iteration.
"""
logliks = np.ones(self.iter_num)
# TODO: move this back in __init__, and remove from subclasses...
if self.noise['sim_ann_opt'] in ['ann', ]:
self.noise['PSD'] = self.noise['ann_PSD_lim'][0]
elif self.noise['sim_ann_opt'] is 'no_ann':
self.noise['PSD'] = self.noise['ann_PSD_lim'][1]
else:
warnings.warn("To add noise to the signal, provide the "+
"sim_ann_opt from any of 'ann', "+
"'no_ann' or 'ann_ns_inj' ")
for i in range(self.iter_num):
if self.verbose:
print "Iteration", i+1, "on", self.iter_num
# adding the noise psd if required:
if self.noise['sim_ann_opt'] in ['ann', 'ann_ns_inj']:
self.noise['PSD'] = (
(np.sqrt(self.noise['ann_PSD_lim'][0]) *
(self.iter_num - i) +
np.sqrt(self.noise['ann_PSD_lim'][1]) * i) /
self.iter_num) ** 2
# running the GEM iteration:
logliks[i] = self.GEM_iteration()
if self.verbose:
print " log-likelihood:", logliks[i]
if i>0:
print " improvement:", logliks[i]-logliks[i-1]
return logliks
[docs] def GEM_iteration(self,):
"""GEM iteration: one iteration of the Generalized Expectation-
Maximization algorithm to update the various parameters whose
:py:attr:`FASST.spec_comp[spec_ind]['frdm_prior']` is set to ``'free'``.
:returns:
`loglik` (double): the log-likelihood of the data,
given the updated parameters
"""
if self.audioObject.channels==2:
spat_comp_powers, mix_matrix, rank_part_ind = (
self.retrieve_subsrc_params())
# compute the sufficient statistics
hat_Rxx, hat_Rxs, hat_Rss, hat_Ws, loglik = (
self.compute_suff_stat(spat_comp_powers, mix_matrix))
# update the mixing matrix
self.update_mix_matrix(hat_Rxs, hat_Rss, mix_matrix, rank_part_ind)
# from sub-sources to sources
# (as given by the different spatial comps)
# had better have shape = [nbSpatComps,F,N]
hat_W = np.zeros([len(rank_part_ind),
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
if self.verbose > 1:
print "rank_part_in", rank_part_ind
for w in range(len(rank_part_ind)):
hat_W[w] = np.mean(hat_Ws[rank_part_ind[w]], axis=0)
del spat_comp_powers, mix_matrix, rank_part_ind
del hat_Rxx, hat_Rxs, hat_Rss, hat_Ws
else:
raise AttributeError("Nb channels "+str(self.audioObject.channels)+
" not implemented yet")
# update the spectral parameters
self.update_spectral_components(hat_W)
# normalize parameters
self.renormalize_parameters()
return loglik
[docs] def comp_spat_comp_power(self, spat_comp_ind,
spec_comp_ind=[], factor_ind=[]):
"""Matlab FASST Toolbox help::
% V = comp_spat_comp_power(mix_str, spat_comp_ind,
% spec_comp_ind, factor_ind);
%
% compute spatial component power
%
%
% input
% -----
%
% mix_str : mixture structure
% spat_comp_ind : spatial component index
% spec_comp_ind : (opt) factor index (def = [], use all components)
% factor_ind : (opt) factor index (def = [], use all factors)
%
%
% output
% ------
%
% V : (F x N) spatial component power
"""
V = np.zeros([self.nbFreqsSigRepr, self.nbFramesSigRepr])
if len(spec_comp_ind):
spec_comp_ind_arr = spec_comp_ind
else:
spec_comp_ind_arr = self.spec_comps.keys()
for k in spec_comp_ind_arr:
if spat_comp_ind == self.spec_comps[k]['spat_comp_ind']:
V_comp = np.ones([self.nbFreqsSigRepr, self.nbFramesSigRepr])
if len(factor_ind):
factors_ind_arr = factor_ind
else:
factors_ind_arr = self.spec_comps[k]['factor'].keys()
for f in factors_ind_arr:
factor = self.spec_comps[k]['factor'][f]
W = np.dot(factor['FB'], factor['FW'])
if len(factor['TB']):
H = np.dot(factor['TW'],factor['TB'])
else:
H = factor['TW']
V_comp *= np.dot(W, H)
del W
del H
V += V_comp
del V_comp
del factor
return V
[docs] def comp_spat_cmps_powers(self, spat_comp_ind,
spec_comp_ind=[], factor_ind=[]):
"""Compute the sum of the spectral powers corresponding to the
spatial components as provided in the list `spat_comp_ind`
NB: because this does not take into account the mixing process,
the resulting power does not, in general, correspond to the
the observed signal's parameterized spectral power.
"""
V = 0
for i in spat_comp_ind:
V += self.comp_spat_comp_power(spat_comp_ind=i)
return V
[docs] def retrieve_subsrc_params(self,):
"""Computes the various quantities necessary for the estimation of the
main parameters:
**Outputs**
`spat_comp_powers`
(`total_spat_rank` x `nbFreqsSigRepr` x `nbFramesSigRepr`) ndarray
the spatial component power spectra. Note that total_spat_rank
is the sum of all the spatial ranks for all the sources.
`mix_matrix`
(total_spat_rank x nchannels x nbFreqsSigRepr) ndarray
the mixing matrices for each source
`rank_part_ind`
dictionary: each key is one source, and the values are the indices
in `spat_comp_powers` and `mix_matrix` that correspond to that source.
If the spatial rank of source `j` is 2, then its spectra will appear
twice in `spat_comp_powers`, with mixing parameters (potentially
different one from the other) appearing in two sub-matrices of
`mix_matrix`.
"""
K = len(self.spat_comps)
rank_total = 0
rank_part_ind = {}
for j in range(K):
# this is the ranks
if self.spat_comps[j]['mix_type'] == 'inst':
rank = self.spat_comps[j]['params'].shape[1]
else:
rank = self.spat_comps[j]['params'].shape[0]
if self.verbose>1:
print " Rank of spatial source %d" %j +\
" is %d" %rank
rank_part_ind[j] = (
rank_total +
np.arange(rank))
rank_total += rank
spat_comp_powers = np.zeros([rank_total,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
mix_matrix = np.zeros([rank_total,
self.audioObject.channels,
self.nbFreqsSigRepr], dtype=complex)
for j, spat_comp in self.spat_comps.items():
spat_comp_j = self.comp_spat_comp_power(spat_comp_ind=j)
for r in rank_part_ind[j]:
spat_comp_powers[r] = spat_comp_j
if spat_comp['mix_type'] == 'inst':
for f in range(self.nbFreqsSigRepr):
#print rank_part_ind[j]
#print spat_comp['params'].shape
#print mix_matrix[rank_part_ind[j],:,f].shape
mix_matrix[rank_part_ind[j],:,f] = spat_comp['params'].T
else:
mix_matrix[rank_part_ind[j]] = spat_comp['params']
return spat_comp_powers, mix_matrix, rank_part_ind
[docs] def compute_suff_stat(self, spat_comp_powers, mix_matrix):
"""
Outputs:
hat_Rxx
hat_Rxs
hat_Rss
hat_Ws
loglik
"""
if self.audioObject.channels != 2:
raise ValueError("Nb channels not supported:"+
str(self.audioObject.channels))
if self.verbose: print " Computing sufficient statistics"
nbspatcomp = spat_comp_powers.shape[0]
# CAUTION! non-initialized arrays !
sigma_x_diag = np.empty([2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
#sigma_x_off = np.zeros([self.nbFreqsSigRepr,
# self.nbFramesSigRepr], dtype=complex)
# setting the first element with spat_comp 0:
r = 0
sigma_x_diag[0] = (
np.vstack(np.abs(mix_matrix[r][0])**2) *
spat_comp_powers[r]
)
sigma_x_diag[1] = (
np.vstack(np.abs(mix_matrix[r][1])**2) *
spat_comp_powers[r]
)
sigma_x_off = (
np.vstack(mix_matrix[r][0] *
np.conj(mix_matrix[r][1])) *
spat_comp_powers[r]
)
for n in range(2):
sigma_x_diag[n] += np.vstack(self.noise['PSD'])
# noise PSD should be of size nbFreqs
for r in range(1, nbspatcomp):
sigma_x_diag[0] += (
np.vstack(np.abs(mix_matrix[r][0])**2) *
spat_comp_powers[r]
)
sigma_x_diag[1] += (
np.vstack(np.abs(mix_matrix[r][1])**2) *
spat_comp_powers[r]
)
sigma_x_off += (
np.vstack(mix_matrix[r][0] *
np.conj(mix_matrix[r][1])) *
spat_comp_powers[r]
)
inv_sigma_x_diag, inv_sigma_x_off, det_sigma_x = (
inv_herm_mat_2d(sigma_x_diag, sigma_x_off,
verbose=self.verbose))
del sigma_x_diag, sigma_x_off
# compute log likelihood
loglik = - np.mean(np.log(det_sigma_x * np.pi) +
inv_sigma_x_diag[0] * self.Cx[0] +
inv_sigma_x_diag[1] * self.Cx[2] +
2. * np.real(inv_sigma_x_off * np.conj(self.Cx[1]))
)
# compute expectations of Rss and Ws sufficient statistics
Gs = np.empty((2, nbspatcomp,
self.nbFreqsSigRepr,
self.nbFramesSigRepr), dtype=np.complex) # {}
# one for each channel (stereo, here)
#Gs[0] = {}
#Gs[1] = {}
for r in range(nbspatcomp):
Gs[0, r] = (
(np.vstack(np.conj(mix_matrix[r][0])) * inv_sigma_x_diag[0] +
np.vstack(np.conj(mix_matrix[r][1])) *
np.conj(inv_sigma_x_off)) *
spat_comp_powers[r]
)
Gs[1, r] = (
(np.vstack(np.conj(mix_matrix[r][0])) * inv_sigma_x_off +
np.vstack(np.conj(mix_matrix[r][1])) * inv_sigma_x_diag[1]) *
spat_comp_powers[r]
)
# the following quantities are assigned later, so
# an empty allocation should do.
hat_Rss = np.empty([self.nbFreqsSigRepr,
nbspatcomp,
nbspatcomp],
dtype=complex)
hat_Ws = np.empty([nbspatcomp,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
hatRssLoc1 = np.empty_like(self.Cx[0])
hatRssLoc2 = np.empty_like(self.Cx[0])
hatRssLoc3 = np.empty_like(self.Cx[0])
for r1 in range(nbspatcomp):
for r2 in range(nbspatcomp):
# TODO: could probably factor a bit more the following formula:
hatRssLoc1[:] = np.copy(self.Cx[0])
hatRssLoc1 *= np.conj(Gs[0,r2])
hatRssLoc1 += (np.conj(Gs[1,r2]) * self.Cx[1])
hatRssLoc1 *= Gs[0,r1]
hatRssLoc2[:] = np.copy(self.Cx[2])
hatRssLoc2 *= np.conj(Gs[1,r2])
hatRssLoc2 += (np.conj(Gs[0,r2] * self.Cx[1]))
hatRssLoc2 *= Gs[1,r1]
hatRssLoc3[:] = np.copy(Gs[0,r1])
hatRssLoc3 *= np.vstack(mix_matrix[r2,0])
hatRssLoc3 += (Gs[1,r1] * np.vstack(mix_matrix[r2,1]))
hatRssLoc3 *= spat_comp_powers[r2]
hatRssLoc1 += hatRssLoc2
hatRssLoc1 -= hatRssLoc3
#hatRssLoc = (Gs[0][r1] * np.conj(Gs[0][r2]) * self.Cx[0] +
# Gs[1][r1] * np.conj(Gs[1][r2]) * self.Cx[2] +
# Gs[0][r1] * np.conj(Gs[1][r2]) * self.Cx[1] +
# Gs[1][r1]*np.conj(Gs[0][r2])*np.conj(self.Cx[1])-
# (Gs[0][r1] * np.vstack(mix_matrix[r2][0]) +
# Gs[1][r1] * np.vstack(mix_matrix[r2][1]))
# * spat_comp_powers[r2]
# )
if r1 == r2:
hatRssLoc1 += spat_comp_powers[r1]
hat_Ws[r1] = np.abs(np.real(hatRssLoc1))
hat_Rss[:,r1,r2] = np.mean(hatRssLoc1, axis=1)
# To assure hermitian symmetry:
for f in range(self.nbFreqsSigRepr):
if self.verbose>10: # DEBUG
assert_array_almost_equal(
hat_Rss[f],
(hat_Rss[f] + np.conj(hat_Rss[f]).T) / 2.)
hat_Rss[f] = (hat_Rss[f] + np.conj(hat_Rss[f]).T) / 2.
# Expectations of Rxs sufficient statistics
hat_Rxs = np.empty([self.nbFreqsSigRepr,
2,
nbspatcomp],
dtype=complex)
for r in range(nbspatcomp):
hat_Rxs[:,0,r] = (
np.mean(np.conj(Gs[0][r]) * self.Cx[0] +
np.conj(Gs[1][r]) * self.Cx[1], axis=1)
)
hat_Rxs[:,1,r] = (
np.mean(np.conj(Gs[0][r]) * np.conj(self.Cx[1]) +
np.conj(Gs[1][r]) * self.Cx[2], axis=1)
)
del Gs
# at last Rxx sufficient statistics:
hat_Rxx = np.mean(self.Cx, axis=-1)
# recommendation, use logarithm:
# hat_Rxx[]
return hat_Rxx, hat_Rxs, hat_Rss, hat_Ws, loglik
[docs] def update_mix_matrix(self,hat_Rxs, hat_Rss, mix_matrix, rank_part_ind):
"""
"""
# deriving which components have which updating rule:
upd_inst_ind = []
upd_inst_other_ind = []
upd_conv_ind = []
upd_conv_other_ind = []
for j, spat_comp_j in self.spat_comps.items():
if spat_comp_j['frdm_prior'] == 'free' and \
spat_comp_j['mix_type'] == 'inst':
upd_inst_ind.extend(rank_part_ind[j])
else:
upd_inst_other_ind.extend(rank_part_ind[j])
if spat_comp_j['frdm_prior'] == 'free' and \
spat_comp_j['mix_type'] == 'conv':
upd_conv_ind.extend(rank_part_ind[j])
else:
upd_conv_other_ind.extend(rank_part_ind[j])
# update linear instantaneous coefficients:
K_inst = len(upd_inst_ind)
if len(upd_inst_ind):
if self.verbose:
print " Updating mixing matrix, instantaneous sources"
#hat_Rxs_bis = np.zeros([self.nbFreqsSigRepr,
# 2,
# K_inst])
hat_Rxs_bis = hat_Rxs[:,:,upd_inst_ind]
if len(upd_inst_other_ind):
for f in range(self.nbFreqsSigRepr):
hat_Rxs_bis[f] -= (
np.dot(mix_matrix[upd_inst_other_ind,:,f].T,
hat_Rss[f][np.vstack(upd_inst_other_ind),
upd_inst_ind]))
# hat_Rss[f][upd_inst_other_ind][:,upd_inst_ind])
hat_Rxs_bis = np.real(np.mean(hat_Rxs_bis, axis=0))
rm_hat_Rss = np.real(np.mean(hat_Rss[:,np.vstack(upd_inst_ind),
upd_inst_ind], axis=0))
# in ozerov's code:
##mix_matrix_inst = np.dot(hat_Rxs_bis, np.linalg.inv(rm_hat_Rss))
mix_matrix_inst = np.linalg.solve(rm_hat_Rss.T, hat_Rxs_bis.T)
# sym_pos=True).T
if self.verbose>1:
print "mix_matrix", mix_matrix
print "mix_matrix_inst", mix_matrix_inst
print "mix_matrix_inst.shape",mix_matrix_inst.shape
print mix_matrix.shape, \
mix_matrix[upd_inst_ind].shape
for f in range(self.nbFreqsSigRepr):
mix_matrix[upd_inst_ind,:,f] = mix_matrix_inst
del mix_matrix_inst
# update convolutive coefficients:
if len(upd_conv_ind):
if self.verbose:
print " Updating mixing matrix, convolutive sources"
hat_Rxs_bis = hat_Rxs[:,:,upd_conv_ind]
if len(upd_conv_other_ind):
for f in range(self.nbFreqsSigRepr):
hat_Rxs_bis[f] -= (
np.dot(mix_matrix[upd_conv_other_ind,:,f].T,
hat_Rss[f][np.vstack(upd_conv_other_ind),
upd_conv_ind]))
for f in range(self.nbFreqsSigRepr):
try:
mix_matrix[upd_conv_ind,:,f] = (
np.linalg.solve(hat_Rss[f].T, hat_Rxs_bis[f].T))
except np.linalg.linalg.LinAlgError:
print "hat_Rss[f]:", hat_Rss[f]
print "hat_Rxs_bis[f]:", hat_Rxs_bis[f]
raise np.linalg.LinAlgError('Singular Matrix')
except:
raise # re-raise the exception if that was not linalgerror...
## smoothing
##for n in upd_conv_ind:
## for nc in range(self.audioObject.channels):
## smoothAbsMix = (
## st.medianFilter(np.abs(mix_matrix[n,nc,:]),
## length=self.nbFreqsSigRepr/200)
## )
## mix_matrix[n,nc,:] = (
## smoothAbsMix *
## np.exp(1j * np.angle(mix_matrix[n,nc,:]))
## )
# update the matrix in the component parameters:
for k, spat_comp_k in self.spat_comps.items():
if spat_comp_k['frdm_prior'] == 'free':
if spat_comp_k['mix_type'] == 'inst':
spat_comp_k['params'] = (
np.mean(mix_matrix[rank_part_ind[k]], axis=2)).T
else:
spat_comp_k['params'] = (
mix_matrix[rank_part_ind[k]])
# mix_matrix should have changed outside this method... TBC
# should we normalize here?
##self.renormalize_parameters()
[docs] def separate_spatial_filter_comp(self,
dir_results=None,
suffix=None):
"""separate_spatial_filter_comp
Separates the sources using only the estimated spatial
filter (i.e. the mixing parameters in self.spat_comps[j]['params'])
In particular, we consider here the corresponding MVDR filter,
as exposed in [Maazaoui2011]_.
per channel, the filter steering vector, source p:
.. math::
b(f,p) = \\frac{R_{aa,f}^{-1} a(f,p)}{a^{H}(f,p) R_{aa,f}^{-1} a(f,p)}
with
.. math::
R_{aa,f} = \\sum_q a(f,q) a^{H}(f,q)
It corresponds also to the given model in FASST, assuming that all the
spectral powers are equal across all sources. Here, by computing the Wiener
Gain WG to get the images, we actually have
.. math::
b(f,p) a(f,p)^H
and the denominator therefore is the trace of the \"numerator\".
.. [Maazaoui2011] Maazaoui, M.; Grenier, Y. and Abed-Meraim, K.
Blind Source Separation for Robot Audition using
Fixed Beamforming with HRTFs,
in proc. of INTERSPEECH, 2011.
"""
# grouping the indices by spatial component
spec_comp_ind = {}
for spat_ind in range(len(self.spat_comps)):
spec_comp_ind[spat_ind] = []
for spec_ind, spec_comp in self.spec_comps.items():
# add the spec comp index to the corresponding spatial comp:
spec_comp_ind[spec_comp['spat_comp_ind']].append(spec_ind)
# copying from separate_spec_comps - could modify that one later...
if dir_results is None:
dir_results = (
'/'.join(
self.audioObject.filename.split('/')[:-1])
)
if self.verbose:
print "Writing to same directory as input file: " + dir_results
nc = self.audioObject.channels
if nc != 2:
raise NotImplementedError()
nbSources = len(spec_comp_ind)
sigma_comps_diag = np.zeros([nbSources, 2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
sigma_comps_off = np.zeros([nbSources,
self.nbFreqsSigRepr,
self.nbFramesSigRepr], dtype=np.complex)
# computing individual spatial variance
R_diag0 = np.zeros([nbSources, self.nbFreqsSigRepr])
R_diag1 = np.zeros([nbSources, self.nbFreqsSigRepr])
R_off = np.zeros([nbSources, self.nbFreqsSigRepr], dtype=np.complex)
for n in range(nbSources):
if self.spat_comps[n]['mix_type'] == 'inst':
raise NotImplementedError('Mixing params not convolutive...')
mix_coefficients = self.spat_comps[n]['params'].T
# mix_coefficients.shape should be (rank, nchannels)
elif self.spat_comps[n]['mix_type'] == 'conv':
mix_coefficients = self.spat_comps[n]['params']
# mix_coefficients.shape should be (rank, nchannels, freq)
# R_diag = np.zeros(2, self.nbFreqsSigRepr)
R_diag0[n] = np.atleast_1d(
(np.abs(mix_coefficients[:, 0])**2).sum(axis=0))
R_diag1[n] = np.atleast_1d(
(np.abs(mix_coefficients[:, 1])**2).sum(axis=0))
# element at (1,2):
R_off[n] = np.atleast_1d((
mix_coefficients[:, 0] *
np.conj(mix_coefficients[:, 1])).sum(axis=0))
Raa_00 = np.mean(R_diag0, axis=0)
Raa_11 = np.mean(R_diag1, axis=0)
Raa_01 = np.mean(R_off, axis=0)
inv_Raa_diag, inv_Raa_off, det_mat = inv_herm_mat_2d(
[Raa_00, Raa_11],
Raa_01, verbose=self.verbose)
if not hasattr(self, 'files'):
self.files = {}
self.files['spatial'] = []
fileroot = self.audioObject.filename.split('/')[-1][:-4]
for n in range(nbSources):
WG = self.compute_Wiener_gain_2d(
[R_diag0[n], R_diag1[n]],
R_off[n],
inv_Raa_diag,
inv_Raa_off,
timeInvariant=True)
normalization = np.real(WG[0,0] + WG[1,1])
WG /= [[normalization]]
if self.sig_repr_params['transf'] is 'stftold':
# compute the stft/istft
ndata = ao.filter_stft(
self.audioObject.data, WG, analysisWindow=None,
synthWindow=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate)
else:
#raise NotImplementedError("TODO")
X = []
for chan1 in range(nc):
self.tft.computeTransform(
self.audioObject.data[:,chan1])
X.append(self.tft.transfo)
ndata = []
if WG.ndim == 3:
for chan1 in range(nc):
self.tft.transfo = np.zeros([self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=np.complex)
for chan2 in range(nc):
self.tft.transfo += (
np.vstack(WG[chan1, chan2])
* X[chan2])
ndata.append(self.tft.invertTransform())
del self.tft.transfo
elif WG.ndim == 4:
for chan1 in range(nc):
self.tft.transfo = np.zeros([self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=np.complex)
for chan2 in range(nc):
self.tft.transfo += (
WG[chan1, chan2]
* X[chan2])
ndata.append(self.tft.invertTransform())
del self.tft.transfo
ndata = np.array(ndata).T
_suffix = '_spatial'
if suffix is not None and n in suffix:
_suffix += '_' + suffix[n]
outAudioName = (
dir_results + '/' + fileroot + '_' + str(n) +
'-' + str(nbSources) + _suffix + '.wav')
self.files['spatial'].append(outAudioName)
outAudioObj = ao.AudioObject(filename=outAudioName,
mode='w')
outAudioObj._data = np.int16(
ndata[:self.audioObject.nframes,:] *
self.audioObject._maxdata)#(2**15))
outAudioObj._maxdata = 1
outAudioObj._encoding = 'pcm16'
outAudioObj.samplerate = self.audioObject.samplerate
outAudioObj._write()
[docs] def separate_spat_comps(self,
dir_results=None,
suffix=None):
"""separate_spat_comps
This separates the sources for each spatial component.
"""
spec_comp_ind = {}
for spat_ind in range(len(self.spat_comps)):
spec_comp_ind[spat_ind] = []
for spec_ind, spec_comp in self.spec_comps.items():
spec_comp_ind[spec_comp['spat_comp_ind']].append(spec_ind)
self.separate_comps(dir_results=dir_results,
spec_comp_ind=spec_comp_ind,
suffix=suffix)
[docs] def separate_comps(self,
dir_results=None,
spec_comp_ind=None,
suffix=None):
"""separate_comps
Separate the sources as defined by the spectral
components provided in spec_comp_ind.
This function differs from separate_spat_comps in the way
that it does not assume the sources are defined by their spatial
positions.
Note: Trying to bring into one method
ozerov's separate_spec_comps and separate_spat_comps
"""
if dir_results is None:
dir_results = (
'/'.join(
self.audioObject.filename.split('/')[:-1])
)
if self.verbose:
print "Writing to same directory as input file: " + dir_results
nc = self.audioObject.channels
if nc != 2:
raise NotImplementedError()
if spec_comp_ind is None:
spec_comp_ind = {}
for spec_ind in range(len(self.spec_comps)):
spec_comp_ind[spec_ind] = [spec_ind,]
nbSources = len(spec_comp_ind)
sigma_comps_diag = np.zeros([nbSources, 2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
sigma_comps_off = np.zeros([nbSources,
self.nbFreqsSigRepr,
self.nbFramesSigRepr], dtype=np.complex)
# computing individual source variance
for n in range(nbSources):
if self.verbose>1: print " source",n+1,"out of",nbSources
spat_comp_ind = np.unique(
[self.spec_comps[spec_ind]['spat_comp_ind']
for spec_ind in spec_comp_ind[n]]
)
if self.verbose>1: print " spat_comp_ind", spat_comp_ind
for spat_ind in spat_comp_ind:
if self.verbose>1:
print " spatial comp",spat_ind+1, \
"out of", (spat_comp_ind)
sigma_c_diag, sigma_c_off = (
self.compute_sigma_comp_2d(spat_ind, spec_comp_ind[n])
)
sigma_comps_diag[n] += sigma_c_diag
sigma_comps_off[n] += sigma_c_off
del sigma_c_diag, sigma_c_off
# deriving inverse of mix covariance:
inv_sigma_x_diag, inv_sigma_x_off = self.compute_inv_sigma_mix_2d(
sigma_comps_diag,
sigma_comps_off)
if not hasattr(self, "files"):
self.files = {}
self.files['spat_comp'] = []
if True: # self # IF TRANSFO is STFT !!!... 20130507 corrected now?
fileroot = self.audioObject.filename.split('/')[-1][:-4]
for n in range(nbSources):
# get the Wiener filters:
WG = self.compute_Wiener_gain_2d(
sigma_comps_diag[n],
sigma_comps_off[n],
inv_sigma_x_diag,
inv_sigma_x_off)
# compute the stft/istft
if self.sig_repr_params['transf'] == 'stftold':
ndata = ao.filter_stft(
self.audioObject.data, WG, analysisWindow=None,
synthWindow=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate)
else:
X = []
for chan1 in range(nc):
self.tft.computeTransform(
self.audioObject.data[:,chan1])
X.append(self.tft.transfo)
ndata = []
if WG.ndim == 3:
for chan1 in range(nc):
self.tft.transfo = np.zeros([self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=np.complex)
for chan2 in range(nc):
self.tft.transfo += (
np.vstack(WG[chan1, chan2])
* X[chan2])
ndata.append(self.tft.invertTransform())
del self.tft.transfo
elif WG.ndim == 4:
for chan1 in range(nc):
self.tft.transfo = np.zeros([self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=np.complex)
for chan2 in range(nc):
self.tft.transfo += (
WG[chan1, chan2]
* X[chan2])
ndata.append(self.tft.invertTransform())
del self.tft.transfo
ndata = np.array(ndata).T
_suffix = ''
if suffix is not None and n in suffix:
_suffix = '_' + suffix[n]
outAudioName = \
dir_results + '/' + fileroot + '_' + str(n) + \
'-' + str(nbSources) + _suffix + '.wav'
self.files['spat_comp'].append(outAudioName)
outAudioObj = ao.AudioObject(filename=outAudioName,
mode='w')
outAudioObj._data = np.int16(
ndata[:self.audioObject.nframes,:] *
self.audioObject._maxdata)#(2**15))
outAudioObj._maxdata = 1
outAudioObj._encoding = 'pcm16'
outAudioObj.samplerate = self.audioObject.samplerate
outAudioObj._write()
## TODO: else for the other transforms
## should work all the same, but with cqt, not very good
## means to cut signals and paste them back together...
[docs] def mvdr_2d(self,
theta,
distanceInterMic=.3,
):
"""mvdr_2d(self,
theta, # in radians
distanceInterMic=.3, # in meters
)
MVDR minimum variance distortion-less response spatial
filter, for a given angle theta and given distance between the mics.
self.Cx is supposed to provide the necessary covariance matrix, for
the \"Capon\" filter.
"""
Cx = np.copy(self.Cx)
Cx[0][:,:] = np.vstack(Cx[0].mean(axis=1))
Cx[1][:,:] = np.vstack(Cx[1].mean(axis=1))
Cx[2][:,:] = np.vstack(Cx[2].mean(axis=1))
if self.verbose>1:
print Cx
inv_Cx_diag, inv_Cx_off, det_Cx = inv_herm_mat_2d(
[Cx[0], Cx[2]],
Cx[1],
verbose=self.verbose)
freqs = (
np.arange(self.nbFreqsSigRepr) * 1. /
self.sig_repr_params['fsize'] * self.audioObject.samplerate
)
filt = gen_steer_vec_far_src_uniform_linear_array(
freqs,
nchannels=self.audioObject.channels,
theta=theta,
distanceInterMic=distanceInterMic)
W = np.zeros([self.audioObject.channels, # nc x nc x F x N
self.audioObject.channels,
self.nbFreqsSigRepr,
self.nbFramesSigRepr], dtype=np.complex)
den = (
np.vstack(np.abs(filt[0])**2) * inv_Cx_diag[0] +
np.vstack(np.abs(filt[1])**2) * inv_Cx_diag[1] +
2 * np.real(
np.vstack(np.conj(filt[0]) * filt[1]) * inv_Cx_off)
)
W[1,1] = (
np.vstack(filt[1] * np.conj(filt[0])) * inv_Cx_off
)
W[0,0] = (
np.conj(W[1,1]) +
np.vstack(np.abs(filt[0])**2) * inv_Cx_diag[0]
)
W[1,1] += (
np.vstack(np.abs(filt[1])**2) * inv_Cx_diag[1]
)
W[0,1] = (
np.vstack(np.abs(filt[0])**2) * inv_Cx_off +
np.vstack(filt[0] * np.conj(filt[1])) * np.conj(inv_Cx_diag[1])
)
W[1,0] = (
np.vstack(np.abs(filt[1])**2) * np.conj(inv_Cx_off) +
np.vstack(filt[1] * np.conj(filt[0])) * np.conj(inv_Cx_diag[0])
)
#if self.verbose>1:
# print W
# should check that self.sig_repr_params['transf'] == 'stft'
return ao.filter_stft(
self.audioObject.data,
W,
analysisWindow=np.hanning(self.sig_repr_params['wlen']),
synthWindow=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate)
[docs] def gcc_phat_tdoa_2d(self):
"""Using the cross-spectrum in self.Cx[1] to estimate the time
difference of arrival detection function (the Generalized Cross-
Correllation GCC), with the phase transform (GCC-PHAT) weighing
function for the cross-spectrum.
"""
return np.fft.irfft(self.Cx[1]/np.abs(self.Cx[1]),
n=self.sig_repr_params['fsize'],
axis=0)
[docs] def compute_sigma_comp_2d(self, spat_ind, spec_comp_ind):
"""only for stereo case self.audioObject.channels==2
"""
spat_comp_power = self.comp_spat_comp_power(
spat_comp_ind=spat_ind,
spec_comp_ind=spec_comp_ind)
# getting the mixing coefficients for corresponding
# spatial source, depending on mix_type
if self.spat_comps[spat_ind]['mix_type'] == 'inst':
mix_coefficients = self.spat_comps[spat_ind]['params'].T
# mix_coefficients.shape should be (rank, nchannels)
elif self.spat_comps[spat_ind]['mix_type'] == 'conv':
mix_coefficients = self.spat_comps[spat_ind]['params']
# mix_coefficients.shape should be (rank, nchannels, freq)
# R_diag = np.zeros(2, self.nbFreqsSigRepr)
R_diag0 = np.atleast_1d(
(np.abs(mix_coefficients[:, 0])**2).sum(axis=0))
R_diag1 = np.atleast_1d(
(np.abs(mix_coefficients[:, 1])**2).sum(axis=0))
# element at (1,2):
R_off = np.atleast_1d((
mix_coefficients[:, 0] *
np.conj(mix_coefficients[:, 1])).sum(axis=0))
sigma_comp_diag = np.zeros([2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr])
if self.verbose>1:
print R_diag0, "R_diag0.shape", R_diag0.shape
print R_diag1, "R_diag1.shape", R_diag1.shape
print R_off, "R_off.shape", R_off.shape
sigma_comp_diag[0] = (
np.vstack(R_diag0) *
spat_comp_power)
sigma_comp_diag[1] = (
np.vstack(R_diag1) *
spat_comp_power)
sigma_comp_off = (
np.vstack(R_off) * spat_comp_power)
return sigma_comp_diag, sigma_comp_off
[docs] def compute_inv_sigma_mix_2d(self,
sigma_comps_diag,
sigma_comps_off):
"""only for nb channels = 2
sigma_comps_diag ncomp x nchan x nfreq x nframes
"""
sigma_x_diag = sigma_comps_diag.sum(axis=0)
sigma_x_off = sigma_comps_off.sum(axis=0)
for n in range(2):
sigma_x_diag[n] += np.vstack(self.noise['PSD'])
# noise PSD should be of size nbFreqs
inv_sigma_x_diag, inv_sigma_x_off, _ = (
inv_herm_mat_2d(sigma_x_diag, sigma_x_off,
verbose=self.verbose))
del sigma_x_diag, sigma_x_off
return inv_sigma_x_diag, inv_sigma_x_off
[docs] def compute_Wiener_gain_2d(self,
sigma_comp_diag,
sigma_comp_off,
inv_sigma_mix_diag,
inv_sigma_mix_off,
timeInvariant=False):
"""
Matlab FASST Toolbox help::
% WG = comp_WG_spat_comps(mix_str);
%
% compute Wiener gains for spatial components
%
%
% input
% -----
%
% mix_str : input mix structure
%
%
% output
% ------
%
% WG : Wiener gains [M x M x F x N x K_spat]
%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Flexible Audio Source Separation Toolbox (FASST), Version 1.0
%
% Copyright 2011 Alexey Ozerov, Emmanuel Vincent and Frederic Bimbot
% (alexey.ozerov -at- inria.fr, emmanuel.vincent -at- inria.fr,
% frederic.bimbot -at- irisa.fr)
%
% This software is distributed under the terms of the GNU Public
% License version 3 (http://www.gnu.org/licenses/gpl.txt)
%
% If you use this code please cite this research report
%
% A. Ozerov, E. Vincent and F. Bimbot
% \"A General Flexible Framework for the Handling of Prior
% Information in Audio Source Separation,\"
% IEEE Transactions on Audio, Speech and Signal Processing 20(4),
% pp. 1118-1133 (2012).
% Available: http://hal.inria.fr/hal-00626962/
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
"""
# Here, WG is given by the product:
# np.dot(sigma_comp
if timeInvariant:
WG = np.zeros([2, 2,
self.nbFreqsSigRepr,],
dtype=complex)# stands for Wiener Gains
else:
WG = np.zeros([2, 2,
self.nbFreqsSigRepr,
self.nbFramesSigRepr],
dtype=complex)# stands for Wiener Gains
WG[0,0] = sigma_comp_off * np.conj(inv_sigma_mix_off)
WG[1,1] = np.conj(WG[0,0])
WG[0,0] += sigma_comp_diag[0] * inv_sigma_mix_diag[0]
WG[1,1] += sigma_comp_diag[1] * inv_sigma_mix_diag[1]
WG[0,1] = (
sigma_comp_diag[0] * inv_sigma_mix_off +
sigma_comp_off * inv_sigma_mix_diag[1]
)
WG[1,0] = (
np.conj(sigma_comp_off) * inv_sigma_mix_diag[0] +
sigma_comp_diag[1] * np.conj(inv_sigma_mix_off)
)
return WG
[docs] def update_spectral_components(self, hat_W):
"""Update the spectral components,
with hat_W as the expected value of power
"""
if self.verbose:
print " Update the spectral components"
omega = self.nmfUpdateCoeff
nbspeccomp = len(self.spec_comps)
for spec_comp_ind, spec_comp in self.spec_comps.items():
nbfactors = len(spec_comp['factor'])
spat_comp_ind = spec_comp['spat_comp_ind']
# DEBUG
if self.lambdaCorr > 0: # min inter-src correlation approach
# this is the sum of all the spatial component powers
spat_comp_powers = np.maximum(self.comp_spat_cmps_powers(
self.spat_comps.keys()), eps)
### we need the squared of that matrix too:
##spat_comp_powers_sqd = spat_comp_powers ** 2
# the initial spatial comp. power of the current comp:
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
#spec_comp_ind=[spec_comp_ind],
),
eps)
)
# ... and removing from the other powers - for correlation
# control:
spat_comp_pow_minus = spat_comp_powers - spat_comp_power
if np.all(spat_comp_pow_minus >=0): # DEBUG
warnings.warn(
"Not all spat_comp_pow_minus, "+
"%d negative values!" %np.sum(spat_comp_pow_minus >=0))
spat_comp_pow_minus = np.maximum(spat_comp_pow_minus, eps)
for fact_ind, factor in spec_comp['factor'].items():
# update FB - freq basis
other_fact_ind_arr = range(nbfactors)
other_fact_ind_arr.remove(fact_ind)
other_fact_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind=spat_comp_ind,
spec_comp_ind=[spec_comp_ind],
factor_ind=other_fact_ind_arr),
eps)
)
if factor['FB_frdm_prior'] == 'free':
if self.verbose>1:
print " Updating frequency basis %d-%d" %(
spec_comp_ind, fact_ind)
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
#spec_comp_ind=[spec_comp_ind]
),
eps)
)
#comp_num = hat_W[spat_comp_ind] / spat_comp_power**(2)
#comp_den = 1 / spat_comp_power
if len(factor['TB']):
H = np.dot(factor['TW'], factor['TB'])
else:
H = factor['TW']
FW_H = np.dot(factor['FW'], H).T
# denominator + correlation penalization
if self.lambdaCorr > 0:
corrPen = (
self.lambdaCorr
* spat_comp_pow_minus #np.maximum(spat_comp_powers,
# eps)
/ np.maximum(spat_comp_powers**2, eps)
)
else:
corrPen = 0.
comp_den = (
np.dot(other_fact_power *
(1. / spat_comp_power +
corrPen),
FW_H))
# numerator
if self.lambdaCorr > 0:
corrPen *= 2 *(
spat_comp_power
/ spat_comp_powers
)
comp_num = (
np.dot((hat_W[spat_comp_ind]
/ (spat_comp_power**2)
# np.maximum(spat_comp_power**(2), eps)
+ corrPen)
* other_fact_power,
FW_H))
factor['FB'] *= (
comp_num / np.maximum(comp_den, eps)) ** omega
del comp_num, comp_den, spat_comp_power, H, FW_H
# update FW - freq weight
if factor['FW_frdm_prior'] == 'free':
if self.verbose>1:
print " Updating frequency weights %d-%d" %(
spec_comp_ind, fact_ind)
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
spec_comp_ind=[spec_comp_ind]),
eps)
)
if len(factor['TB']):
H = np.dot(factor['TW'], factor['TB'])
else:
H = factor['TW']
# denominator + correlation penalization
if self.lambdaCorr > 0:
corrPen = (
self.lambdaCorr
* np.maximum(spat_comp_pow_minus,#-spat_comp_power,
eps)
/ np.maximum(spat_comp_powers**2, eps)
)
else:
corrPen = 0.
comp_den = (
np.dot(factor['FB'].T,
np.dot(other_fact_power *
(1. / spat_comp_power +
corrPen),
#other_fact_power /
#spat_comp_power,
H.T))
)
# numerator
if self.lambdaCorr > 0:#
corrPen *= 2 *(
spat_comp_power
/ spat_comp_powers
)
comp_num = (
np.dot(factor['FB'].T,
np.dot((hat_W[spat_comp_ind]
/ (spat_comp_power**2) #np.maximum(spat_comp_power**2,eps)
+ corrPen)
* other_fact_power,
H.T))
)
factor['FW'] *= (
comp_num / np.maximum(comp_den, eps)) ** omega
del comp_num, comp_den, spat_comp_power, H
# update TW - time weights
if factor['TW_frdm_prior'] == 'free':
if factor['TW_constr'] == 'NMF':
if self.verbose>1:
print " Updating time weights %d-%d" %(
spec_comp_ind, fact_ind)
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
spec_comp_ind=[spec_comp_ind]),
eps)
)
W = np.dot(factor['FB'], factor['FW'])
# correlation penalization
if self.lambdaCorr > 0:
corrPen = (
self.lambdaCorr
* np.maximum(spat_comp_pow_minus,
# - spat_comp_power,
eps)
/ np.maximum(spat_comp_powers**2, eps)
)
##if self.verbose>2: # DEBUG
## # pedantic :
## print "correlation stuff",
## print corrPen.mean(), (1./spat_comp_power).mean()
else:
corrPen = 0.
if len(factor['TB']):
# denominator
comp_den = (
np.dot(W.T,
np.dot(other_fact_power *
(1. / spat_comp_power +
corrPen),#other_fact_power /
#spat_comp_power,
factor['TB'].T)
)
)
# numerator
if self.lambdaCorr > 0:# corrPen > 0:
corrPen *= 2 *(
spat_comp_power
/ spat_comp_powers
)
comp_num = (
np.dot(W.T,
np.dot((hat_W[spat_comp_ind] /
(spat_comp_power**2) #np.maximum(spat_comp_power**2,
# eps)
+ corrPen)
* other_fact_power,
factor['TB'].T)
)
)
else:
# denominator
comp_den = (
np.dot(W.T,
other_fact_power *
(1. / spat_comp_power +
corrPen), #other_fact_power /
#spat_comp_power
)
)
# numerator
if self.lambdaCorr > 0:# corrPen > 0:
corrPen *= 2 *(
spat_comp_power
/ spat_comp_powers
)
##if self.verbose>5: # DEBUG to discover origin of NaN
## print "corrPen", corrPen
## print "other_fact_power", other_fact_power
## print "hat_W", hat_W[spat_comp_ind]
## print "squared", np.maximum(spat_comp_power**2,eps)
comp_num = (
np.dot(W.T,
other_fact_power * (hat_W[spat_comp_ind] /
(spat_comp_power**2)
+ corrPen)
)
)
##if self.verbose > 8: #DEBUG
## print "comp_num", comp_num
## print "comp_den", comp_den
factor['TW'] *= (
comp_num / np.maximum(comp_den, eps)) ** omega
del comp_num, comp_den, spat_comp_power, W
elif factor['TW_constr'] in ('GMM', 'GSMM', 'HMM', 'SHMM'):
warnings.warn(
"The GMM/GSMM/HMM still needs to be adapted "+
"to take into account the different factors. ")
nbfaccomps = factor['TW'].shape[0]
if self.verbose>1:
print " Updating time weights, "+\
"discrete state-based constraints"
if len(factor['TB']):
errorMsg = "In this implementation, "+\
"as in Ozerov's, non-trivial "+\
"time blobs TB is incompatible with "+\
"discrete state-based constraints for"+\
" the time weights TW"
raise AttributeError(errorMsg)
if not('TW_all' in factor):
factor['TW_all'] = (
np.outer(np.ones(nbfaccomps),
np.max(factor['TW'], axis=0))
)
if 'TW_DP_params' not in factor:
if factor['TW_constr'] in ('GMM', 'GSMM'):
# prior probabilities
factor['TW_DP_params'] = (
np.ones(nbfaccomps) /
np.double(nbfaccomps))
else:
# transition probabilities
factor['TW_DP_params'] = (
np.ones([nbfaccomps, nbfaccomps]) /
np.double(nbfaccomps))
if factor['TW_constr'] in ('GMM', 'HMM') and \
(np.max(factor['TW_all'])>1 or \
np.min(factor['TW_all'])<1):
factor['FB'] *= np.mean(factor['TW_all'])
factor['TW_all'][:] = 1.
if self.verbose:
print " Computing the Itakura Saito distance"+\
" matrix"
ISdivMatrix = np.zeros([nbfaccomps,
self.nbFramesSigRepr])
for compnb in range(nbfaccomps):
factor['TW'][:] = 0
factor['TW'][compnb] = factor['TW_all'][compnb]
if factor['TW_constr'] not in ('GMM', 'HMM'):
# re-estimating the weights for discrete
# state model with the constraint on the
# single state presence active.
# NB: for GMM and HMM, these weights are
# assumed to be 1
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
spec_comp_ind=[spec_comp_ind],),
eps)
)
# NMF like updating for estimating the weight
Wbasis = np.dot(factor['FB'],
factor['FW'][:,compnb])
comp_num = (
np.dot(Wbasis,
hat_W[spat_comp_ind] /
np.maximum(spat_comp_power**2, eps))
)
comp_den = (
np.dot(Wbasis,
1 / spat_comp_power)
)
factor['TW'][compnb] *= (
comp_num /
np.maximum(comp_den, eps)
) ** omega
factor['TW_all'][compnb]=factor['TW'][compnb]
del comp_num, comp_den, spat_comp_power
# ratio to compute IS divergence between expected
# variance hat_W and the spatial component
# with the discrete state restriction
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(spat_comp_ind),
eps)
)
W_V_ratio = (
hat_W[spat_comp_ind] /
spat_comp_power)
ISdivMatrix[compnb] = (
np.sum(W_V_ratio
- np.log(np.maximum(W_V_ratio, eps))
- 1,axis=0)
)
del W_V_ratio, spat_comp_power
# decode the state sequence that minimizes the
# track in the IS div matrix, with best
# trade-off with the provided TW_DP_params
# (temporal constraints)
if self.verbose:
print " Decoding the state sequence"
if factor['TW_constr'] in ('GMM', 'GSMM'):
active_state_seq = (
np.argmin(
ISdivMatrix -
np.vstack(
np.log(factor['TW_DP_params'] + eps)),
axis=0)
)
del ISdivMatrix
elif factor['TW_constr'] in ('HMM', 'SHMM'):
if self.verbose:
print " Viterbi algorithm to "+\
"determine the active state sequence"
accumulateVec = (
ISdivMatrix[:,0] -
np.log(1. / nbfaccomps)
)
antecedentMat = np.zeros([nbfaccomps,
self.nbFramesSigRepr],
dtype=np.int32)
for n in range(1, self.nbFramesSigRepr):
tmpMat = (
np.vstack(accumulateVec) -
np.log(factor['TW_DP_params'] + eps))
antecedentMat[:,n] = (
np.argmin(tmpMat, axis=0)
)
accumulateVec += (
tmpMat[antecedentMat[:,n],
range(nbfaccomps)] +
ISdivMatrix[:,n]
)
# to avoid overflow?
accumulateVec -= accumulateVec.min()
del tmpMat
active_state_seq = np.zeros(self.nbFramesSigRepr,
dtype=np.int32)
active_state_seq[-1] = np.argmin(accumulateVec)
for framenb in range(self.nbFramesSigRepr-1,0,-1):
active_state_seq[framenb-1] = (
antecedentMat[active_state_seq[framenb],
framenb-1]
)
else:
raise NotImplementedError(
"No implementation for time constraint other "+
"than GMM, GSMM, HMM and SHMM")
if self.verbose:
print " Update Time Weights"
factor['TW'][:] = 0.
for framenb in range(self.nbFramesSigRepr):
factor['TW'][active_state_seq[framenb],framenb] = (
factor['TW_all'][active_state_seq[framenb],
framenb]
)
if factor['TW_DP_frdm_prior'] == 'free':
print " Updating the transition probabilities"
if factor['TW_constr'] in ('GMM', 'GSMM'):
for compnb in range(nbfaccomps):
factor['TW_DP_params'][compnb] = (
np.sum(active_state_seq==compnb) * 1. /
self.nbFramesSigRepr
)
elif factor['TW_constr'] in ('HMM', 'SHMM'):
for prevstate in range(nbfaccomps):
upd_den = np.sum(
active_state_seq[:-1]==prevstate)
if upd_den:
for nextstate in range(nbfaccomps):
upd_num = 1. * np.sum(
(active_state_seq[:-1]==
prevstate) *
(active_state_seq[1:]==
nextstate))
factor['TW_DP_params'][prevstate,
nextstate]=(
upd_num / upd_den
) # TODO: check this part
else:
raise NotImplementedError(
"Required time constraints not "+
"implemented.")
# update TB = time basis
if len(factor['TB']) and factor['TB_frdm_prior'] == 'free':
if self.verbose>1: print " Updating Time basis"
spat_comp_power = (
np.maximum(
self.comp_spat_comp_power(
spat_comp_ind,
spec_comp_ind=[spec_comp_ind],),
eps)
)
W = (
np.dot(np.dot(factor['FB'], factor['FW']),
factor['TW'])
)
# denominator + correlation penalization
if self.lambdaCorr > 0:
corrPen = (
self.lambdaCorr
* np.maximum(spat_comp_pow_minus,# - spat_comp_power,
eps)
/ np.maximum(spat_comp_powers**2, eps)
)
##if self.verbose>2:#DEBUG
## # pedantic :
## print corrPen.mean(), (1./spat_comp_power).mean()
else:
corrPen = 0.
comp_den = (
np.dot(W.T,
other_fact_power *
(1. / spat_comp_power +
corrPen))
)
# numerator
if self.lambdaCorr > 0:# corrPen > 0:
corrPen *= 2 *(
spat_comp_power
/ spat_comp_powers
)
comp_num = (
np.dot(W.T,
(hat_W[spat_comp_ind]
/ np.maximum(spat_comp_power**2, eps)
+ corrPen)
* other_fact_power)
)
factor['TB'] *= (
comp_num / np.maximum(comp_den, eps)) ** omega
del comp_num, comp_den, spat_comp_power, W
[docs] def renormalize_parameters(self):
"""renormalize_parameters
Re-normalize the components
"""
if self.verbose>0:
print " re-normalizing components"
pass
if self.verbose>1:
print " normalizing spatial components..."
# renormalize spatial components
Kspat = len(self.spat_comps)
spat_global_energy = np.zeros(Kspat)
for spat_ind, spat_comp in self.spat_comps.items():
spat_global_energy[spat_ind] = (
np.mean (np.abs(spat_comp['params'])**2))
spat_comp['params'] /= np.sqrt(spat_global_energy[spat_ind])
if self.verbose>5:
print "spat_global_energy", spat_global_energy
# renormalize spectral components
Kspec = len(self.spec_comps)
for spec_ind, spec_comp in self.spec_comps.items():
global_energy = spat_global_energy[spec_comp['spat_comp_ind']]
nbfactors = len(spec_comp['factor'])
for fact_ind, factor in spec_comp['factor'].items():
factor['FB'] *= global_energy
w = factor['FB'].max(axis=0)#.mean(axis=0)
w[w==0] = 1.
factor['FB'] /= w
factor['FW'] *= np.vstack(w)
if factor['TW_constr'] not in ('GMM', 'HMM'):
w = factor['FW'].mean(axis=0)
w[w==0] = 1.
factor['FW'] /= w
factor['TW'] *= np.vstack(w)
# Only testing this: in order to avoid
# big crash, if for one factor, everything in TW
# turns out to get 0, then "restart" it with random
if np.sum(factor['TW']) < eps:
factor['TW'] = np.random.randn(*factor['TW'].shape)**2
factor['TW'] *= 1e3 * eps # so it s not too small
if self.verbose:
print " renorm: reinitialized TW for spec",
print spec_ind, "factor", fact_ind
if len(factor['TB']):
w = factor['TB'].mean(axis=1)
w[w==0] = 1.
factor['TB'] /= np.vstack(w)
factor['TW'] *= w
global_energy = factor['TW'].mean()
if fact_ind < (nbfactors - 1):
factor['TW'] /= global_energy
else:
raise NotImplementedError(
"Temporal discrete state mngmt not done yet. ")
[docs] def setComponentParameter(self, newValue, spec_ind, fact_ind=0,
partLabel='FB', prior='free',
keepDimensions=True):
"""A helper function to set a
self.spec_comp[spec_ind]['factor'][fact_ind][partLabel] to
the given value.
TODO 20130522 finish this function to make it general purpose...
"""
###### DEBUG #####
print "NOT IMPLEMENTED YET, PLEASE SET THE COMPONENTS DIRECTLY"
pass
###### DEBUG #####
if keepDimenstions:
if (newValue.shape !=
self.spec_comp[spec_ind]['factor'][fact_ind][partLabel].shape):
raise ValueError("the provided value does not have the correct"+
" size:"+str(newValue.shape)+
" instead of "+
str(
self.spec_comp[spec_ind]['factor'][fact_ind][partLabel].shape))
else:
# nightmare of error checking for sizes...
if partLabel == 'FB':
newShape = newValue.shape
oldShape = self.spec_comp[
spec_ind]['factor'][fact_ind]['FB'].shape
if newShape[0] != self.nbFreqsSigRepr:
raise ValueError("FB: cannot change dimension of "+
"signal representation.")
if newShape[1] != oldShape[1]:
if self.verbose:
print " Changing the Freq Weights for FB:"
self.spec_comp[spec_ind]['factor'][fact_ind]['FB']
self.spec_comp[spec_ind]['factor'][fact_ind]['FB'] = newValue
elif partLabel == 'FW':
pass
elif partLabel == 'TW':
pass
elif partLabel == 'TB':
pass
else:
raise ValueError("No such thing as "+
partLabel+
" in components!")
self.spec_comp[spec_ind]['factor'][fact_ind][
partLabel+'_frdm_prior'] = prior
[docs] def initialize_all_spec_comps_with_NMF(self,
sameInitAll=False,
**kwargs):
"""Computes an NMF on the one-channel mix (averaging diagonal
of self.Cx, which are the power spectra of the corresponding
channel)
.. math::
C_x \\approx W H
then, for all spec_comp in self.spec_comps, we set::
spec_comp['FB'] = W
spec_comp['TW'] = H
"""
if sameInitAll:
# initialize the components with the same parameters
return self.initialize_all_spec_comps_with_NMF_same(**kwargs)
else:
# initialize the components with individual params,
# in particular, initializing the NMF with the available
# components (but only with factor 0)
return self.initialize_all_spec_comps_with_NMF_indiv(**kwargs)
[docs] def initialize_all_spec_comps_with_NMF_indiv(self, niter=10,
updateFreqBasis=True,
updateTimeWeight=True,
**kwargs):
"""initialize the spectral components with an NMF decomposition,
with individual decomposition of the monophonic signal TF
representation.
TODO make keepFBind and keepTWind, in order to provide
finer control on which indices are updated. Also requires
a modified NMF decomposition function.
"""
# list of the sizes of the 0th factors, of all components
nbSpecComps = [spec_comp['factor'][0]['FB'].shape[1]
for spec_comp in self.spec_comps.values()]
totalNMFComps = np.sum(nbSpecComps)
# initializing the NMF FreqBasis (FB) and TimeWeight (TW)
# with the corresponding quantities in self.spec_comps:
FBinit = np.zeros([self.nbFreqsSigRepr, totalNMFComps])
TWinit = np.zeros([totalNMFComps, self.nbFramesSigRepr])
for spec_ind, spec_comp in self.spec_comps.items():
ind_start = np.sum(nbSpecComps[:spec_ind])
ind_stop = ind_start + nbSpecComps[spec_ind]
FBinit[:,ind_start:ind_stop] = (
spec_comp['factor'][0]['FB'])
TWinit[ind_start:ind_stop] = (
spec_comp['factor'][0]['TW'])
# computing the monaural signal representation
# summing the contributions over all the channels:
nc = self.audioObject.channels
Cx = np.copy(np.real(self.Cx[0]))
for chan in range(1, nc):
# stored an "efficient" way, so index "complicated":
index = np.sum(np.arange(nc, nc-chan, -1))
Cx += np.real(self.Cx[index])
Cx /= np.double(nc)
W, H = NMF_decomp_init(SX=Cx, nbComps=totalNMFComps,
niter=niter, verbose=self.verbose,
Winit=FBinit, Hinit=TWinit,
updateW=updateFreqBasis,
updateH=updateTimeWeight)
# copy the result in the corresponding spec_comps:
for spec_ind, spec_comp in self.spec_comps.items():
ind_start = np.sum(nbSpecComps[:spec_ind])
ind_stop = ind_start + nbSpecComps[spec_ind]
if updateFreqBasis:
spec_comp['factor'][0]['FB'] = (
np.maximum(
W[:,ind_start:ind_stop],
eps)
)
if updateTimeWeight:
spec_comp['factor'][0]['TW'] = (
np.maximum(H[ind_start:ind_stop],eps))
self.renormalize_parameters()
[docs] def initialize_all_spec_comps_with_NMF_same(self, niter=10,
**kwargs):
"""
Initialize all the components with the same amplitude and spectral
matrices `W` and `H`.
"""
if not np.all([len(spec_comp['factor'])==1
for spec_comp in self.spec_comps.values()]):
raise NotImplementedError(
"NMF init not implemented for multi factor models.")
nbSpecComps = [spec_comp['factor'][0]['FB'].shape[1]
for spec_comp in self.spec_comps.values()]
nbComps = np.max(nbSpecComps)
nc = self.audioObject.channels
# computing the signal representation
Cx = np.copy(np.real(self.Cx[0]))
for chan in range(1, nc):
# stored an "efficient" way, so index "complicated":
index = np.sum(np.arange(nc, nc-chan, -1))
Cx += np.real(self.Cx[index])
Cx /= np.double(nc)
# computing NMF of Cx:
W, H = NMF_decomposition(SX=Cx, verbose=self.verbose,
nbComps=nbComps, niter=niter)
# reordering so that most energy in first components
Hsum = H.sum(axis=1)
indexSort = np.argsort(Hsum)[::-1]
W = W[:,indexSort]
H = H[indexSort]
for spec_comp in self.spec_comps.values():
ncomp = spec_comp['factor'][0]['FB'].shape[1]
spec_comp['factor'][0]['FB'][:] = W[:, :ncomp]
spec_comp['factor'][0]['TW'][:] = H[:ncomp]
self.renormalize_parameters()
[docs] def initializeConvParams(self, initMethod='demix'):
"""setting the spatial parameters
"""
nc = self.audioObject.channels
for spat_ind, spat_comp in self.spat_comps.items():
if spat_comp['mix_type'] != 'inst':
warnings.warn("Spatial component %d "%spat_ind+
"already not instantaneous, overwriting...")
# spat_comp['time_dep'] = 'indep'
spat_comp['mix_type'] = 'conv'
# spat_comp['frdm_prior'] = 'free'
if initMethod == 'demix':
maxclusters = max(40, 10 * len(self.spat_comps))
neighbours = 15
# default for demix to work best: #FIXME!!!
wlen = self.demixParams['wlen']# 20482048
hopsize = self.demixParams['hopsize']#1024
demixInst = demix.DEMIX(
audio=self.audioObject.filename,
nsources=len(self.spat_comps), # spatial comps for demix
wlen=wlen,
hopsize=hopsize,
neighbors=neighbours,
verbose=self.verbose,
maxclusters=maxclusters)
demixInst.comp_pcafeatures()
demixInst.comp_parameters()
demixInst.init_subpts_set()
demixInst.comp_clusters()
demixInst.refine_clusters()
# mixing parameters from DEMIX estimation:
# results in an nsrc x nfreqs x nc array
A = demixInst.steeringVectorsFromCentroids()
del demixInst
elif 'rand' in initMethod:
A = (
np.random.randn(len(self.spat_comps),
self.nbFreqsSigRepr,
nc,)
+ 1j * np.random.randn(len(self.spat_comps),
self.nbFreqsSigRepr,
nc,)
)
else:
raise ValueError("Init method not implemented.")
# filling the spatial components:
for nspat, (spat_ind, spat_comp) in enumerate(self.spat_comps.items()):
spat_comp_param_inst = spat_comp['params']
spat_comp['params'] = np.zeros([self.rank[nspat],
nc,
self.nbFreqsSigRepr],
dtype=np.complex)
for r in range(self.rank[nspat]):
spat_comp['params'][r] = (
A[spat_ind].T
)
[docs]class MultiChanNMFInst_FASST(FASST):
"""MultiChanNMFInst_FASST
sub-classes FASST
This class implements the Multi-channel Non-Negative Matrix Factorisation
(NMF)
"""
def __init__(self, audio,
nbComps=3, nbNMFComps=4,
spatial_rank=2,
**kwargs):
"""
**DESCRIPTION**
**ARGUMENTS**
nbComps (int)
The number of (spatial) components in FASST framework.
nbNMFComps (int)
The number of NMF components in each spatial component.
"""
super(MultiChanNMFInst_FASST, self).__init__(audio=audio, **kwargs)
self.comp_transf_Cx()
self.nbComps = nbComps
self.nbNMFComps = nbNMFComps
self.rank = np.atleast_1d(spatial_rank)
if self.rank.size < self.nbComps:
self.rank = [self.rank[0],] * self.nbComps
self._initialize_structures()
def _initialize_structures(self): #, nbComps, nbNMFComps, spatial_rank):
nc = self.audioObject.channels
self.spat_comps = {}
self.spec_comps = {}
for j in range(self.nbComps):
# initialize the spatial component
self.spat_comps[j] = {}
self.spat_comps[j]['time_dep'] = 'indep'
self.spat_comps[j]['mix_type'] = 'inst'
self.spat_comps[j]['frdm_prior'] = 'free'
self.spat_comps[j]['params'] = np.random.randn(nc, self.rank[j])
if nc == 2: # spreading the sources evenly for init on stereo
self.spat_comps[j]['params'] = (
np.array([np.sin((j+1) * np.pi / (2.*(self.nbComps + 1))) +
np.random.randn(self.rank[j])*np.sqrt(0.01),
np.cos((j+1) * np.pi / (2.*(self.nbComps + 1))) +
np.random.randn(self.rank[j])*np.sqrt(0.01)]))
# initialize single factor spectral component
self.spec_comps[j] = {}
self.spec_comps[j]['spat_comp_ind'] = j
self.spec_comps[j]['factor'] = {}
self.spec_comps[j]['factor'][0] = {}
self.spec_comps[j]['factor'][0]['FB'] = (
0.75 * np.abs(np.random.randn(self.nbFreqsSigRepr,
self.nbNMFComps)) +
0.25)
self.spec_comps[j]['factor'][0]['FW'] = (
np.eye(self.nbNMFComps))
self.spec_comps[j]['factor'][0]['TW'] = (
0.75 * np.abs(np.random.randn(self.nbNMFComps,
self.nbFramesSigRepr)) +
0.25)
self.spec_comps[j]['factor'][0]['TB'] = []
self.spec_comps[j]['factor'][0]['FB_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][0]['FW_frdm_prior'] = 'fixed'
self.spec_comps[j]['factor'][0]['TW_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][0]['TB_frdm_prior'] = []
self.spec_comps[j]['factor'][0]['TW_constr'] = 'NMF'
self.renormalize_parameters()
[docs] def setSpecCompFB(self, compNb, FB, FB_frdm_prior='fixed'):
"""SetSpecCompFB
sets the spectral component's frequency basis.
"""
speccomp = self.spec_comps[compNb]['factor'][0]
if self.nbFreqsSigRepr != FB.shape[0]:
raise AttributeError("Size of provided FB is not consistent"+
" with inner attributes")
speccomp['FB'] = np.copy(FB)
ncomp = FB.shape[1]
speccomp['FW'] = np.eye(ncomp)
speccomp['TW'] = (
0.75 * np.abs(np.random.randn(ncomp,
self.nbFramesSigRepr)) +
0.25)
speccomp['FB_frdm_prior'] = FB_frdm_prior
[docs]class MultiChanNMFConv(MultiChanNMFInst_FASST):
"""Takes the multichannel NMF instantaneous class, and makes it
convolutive!
"""
def __init__(self, audio,
nbComps=3, nbNMFComps=4,
spatial_rank=2,
**kwargs):
super(MultiChanNMFConv, self).__init__(audio=audio,
nbComps=nbComps,
nbNMFComps=nbNMFComps,
spatial_rank=spatial_rank,
**kwargs)
# self.makeItConvolutive()
# DIY: upgrade to convolutive after a few instantaneous, maybe?
[docs] def makeItConvolutive(self):
nc = self.audioObject.channels
for nspat, (spat_ind, spat_comp) in enumerate(self.spat_comps.items()):
if spat_comp['mix_type'] != 'inst':
warnings.warn("Spatial component %d "%spat_ind+
"already not instantaneous, skipping...")
else:
# spat_comp['time_dep'] = 'indep'
spat_comp['mix_type'] = 'conv'
# spat_comp['frdm_prior'] = 'free'
spat_comp_param_inst = spat_comp['params']
spat_comp['params'] = np.zeros([self.rank[nspat],
nc,
self.nbFreqsSigRepr],
dtype=np.complex)
for f in range(self.nbFreqsSigRepr):
spat_comp['params'][:,:,f] = spat_comp_param_inst.T
[docs]class MultiChanHMM(MultiChanNMFConv):
def __init__(self, audio,
nbComps=3, nbNMFComps=4,
spatial_rank=2,
**kwargs):
super(MultiChanHMM, self).__init__(audio=audio,
nbComps=nbComps,
nbNMFComps=nbNMFComps,
spatial_rank=spatial_rank,
**kwargs)
[docs] def makeItHMM(self):
"""
Turns the required parameters into HMM time constraints
"""
for spec_ind, spec_comp in self.spec_comps.items():
for fac_ind, factor in spec_comp['factor'].items():
factor['TW_constr'] = 'HMM'
factor['TW_DP_frdm_prior'] = 'free'
[docs] def makeItSHMM(self):
"""
Turns the required parameters into SHMM time constraints
"""
for spec_ind, spec_comp in self.spec_comps.items():
for fac_ind, factor in spec_comp['factor'].items():
nbfaccomps = factor['TW'].shape[0]
factor['TW_constr'] = 'SHMM'
factor['TW_DP_params'] = (
9 * np.eye( nbfaccomps)
)
factor['TW_DP_params'] += 1.
factor['TW_DP_params'] /= (
np.vstack(factor['TW_DP_params'].sum(axis=1)))
factor['TW_DP_frdm_prior'] = 'fixed'
# factor['TW_DP_frdm_prior'] = 'free'
[docs]class multiChanSourceF0Filter(FASST):
"""multi channel source/filter model
nbcomps components, nbcomps-1 SF models, 1 residual component
"""
def __init__(self, audio,
nbComps=3,
nbNMFResComps=1,
nbFilterComps=20,
nbFilterWeigs=[4,],
minF0=39, maxF0=2000, minF0search=80, maxF0search=800,
stepnoteF0=16, chirpPerF0=1,
spatial_rank=1,
sparsity=None,
**kwargs):
"""
**DESCRIPTION**
__init__(self, audio,
nbComps=3, ## nb of components
nbNMFResComps=3, ## nb of residual components
nbFilterComps=20, ## nb of filter components
nbFilterWeigs=4, ## nb of filter components
minF0=80, maxF0=800, ## range for comb spectra
stepnoteF0=4, chirpPerF0=1,
spatial_rank=1,
sparsity=None,
**kwargs)
**ARGUMENTS**
nbComps (int)
The number of (spatial) components in FASST framework.
nbNMFComps (int)
The number of NMF components in each spatial component.
sparsity (list of size 1 or nbComps)
"""
super(multiChanSourceF0Filter, self).__init__(audio=audio, **kwargs)
self.comp_transf_Cx()
self.sourceParams = {'minF0': minF0,
'maxF0': maxF0,
'stepnoteF0': stepnoteF0,
'chirpPerF0': chirpPerF0,
'minF0search': minF0search,
'maxF0search': maxF0search,}
# __c quoi ca...__ 'chirpPerF02072': chirpPerF0}
self.nbComps = nbComps
self.nbNMFResComps = nbNMFResComps
self.nbFilterComps = nbFilterComps
if len(nbFilterWeigs) < self.nbComps - 1:
self.nbFilterWeigs = [nbFilterWeigs[0],] * self.nbComps
else:
self.nbFilterWeigs = nbFilterWeigs
# initialize the spatial_ranks, reformating here.
# 20130611 TODO check that it does not break too much everywhere!
self.spatial_rank = np.atleast_1d(spatial_rank)
if self.spatial_rank.size < self.nbComps:
self.spatial_rank = [self.spatial_rank[0],] * self.nbComps
# the source dictionary is shared among all the components,
# so storing it one for all:
self.F0Table, WF0, trfoBis = (
SLS.slf.generate_WF0_TR_chirped(
transform=self.tft,
minF0=minF0, maxF0=maxF0,
stepNotes=stepnoteF0,
Ot=0.5, perF0=chirpPerF0,
depthChirpInSemiTone=0.5, loadWF0=True,
verbose=self.verbose,)
)
# removing patterns in low energy bins - setting to eps:
for nwf0comp in range(WF0.shape[1]):
indLowEnergy = np.where(WF0[:,nwf0comp]<WF0[:,nwf0comp].max()*1e-4)
WF0[indLowEnergy, nwf0comp] = eps
self.sourceFreqComps = (
np.ascontiguousarray(
np.hstack([WF0[:self.nbFreqsSigRepr],
np.vstack(np.ones(self.nbFreqsSigRepr))]))
)
del WF0
self.nbSourceComps = self.sourceFreqComps.shape[1]
self.sourceFreqWeights = np.eye(self.nbSourceComps)
# ... and the same for the filter part
self.filterFreqComps = (
generateHannBasis(
numberFrequencyBins=self.nbFreqsSigRepr,
sizeOfFourier=self.sig_repr_params['fsize'],
Fs=self.audioObject.samplerate,
frequencyScale='linear',
numberOfBasis=self.nbFilterComps)
)
self.sparsity = sparsity
self._initialize_structures()
def _initialize_structures(self, seed=None):
"""initialize the structures for the model.
"""
np.random.seed(seed) # essential for DEBUG
self.rank = self.spatial_rank
nc = self.audioObject.channels
sparsity = self.sparsity
self.spat_comps = {}
self.spec_comps = {}
for j in range(self.nbComps - 1):
# initialize the spatial component
self.spat_comps[j] = {}
self.spat_comps[j]['time_dep'] = 'indep'
self.spat_comps[j]['mix_type'] = 'inst'
self.spat_comps[j]['frdm_prior'] = 'free'
self.spat_comps[j]['params'] = np.random.randn(nc, self.rank[j])
if nc == 2: # spreading the sources evenly for init on stereo
self.spat_comps[j]['params'] = (
np.array([np.sin((j+1) * np.pi / (2.*(self.nbComps))) +
np.random.randn(self.rank[j])*np.sqrt(0.01),
np.cos((j+1) * np.pi / (2.*(self.nbComps))) +
np.random.randn(self.rank[j])*np.sqrt(0.01)]))
# initialize source factor spectral component
self.spec_comps[j] = {}
self.spec_comps[j]['spat_comp_ind'] = j
self.spec_comps[j]['factor'] = {}
self.spec_comps[j]['factor'][0] = {}
self.spec_comps[j]['factor'][0]['FB'] = self.sourceFreqComps
self.spec_comps[j]['factor'][0]['FW'] = self.sourceFreqWeights
self.spec_comps[j]['factor'][0]['TW'] = (
0.75 * np.abs(np.random.randn(self.nbSourceComps,
self.nbFramesSigRepr)) +
0.25)
self.spec_comps[j]['factor'][0]['TB'] = []
self.spec_comps[j]['factor'][0]['FB_frdm_prior'] = 'fixed'
self.spec_comps[j]['factor'][0]['FW_frdm_prior'] = 'fixed'
self.spec_comps[j]['factor'][0]['TW_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][0]['TB_frdm_prior'] = []
self.spec_comps[j]['factor'][0]['TW_constr'] = 'NMF'
# initialize filter factor spectral components
self.spec_comps[j]['factor'][1] = {}
self.spec_comps[j]['factor'][1]['FB'] = self.filterFreqComps
self.spec_comps[j]['factor'][1]['FW'] = (
0.75 * np.abs(np.random.randn(self.nbFilterComps,
self.nbFilterWeigs[j])) +
0.25)
self.spec_comps[j]['factor'][1]['TW'] = (
0.75 * np.abs(np.random.randn(self.nbFilterWeigs[j],
self.nbFramesSigRepr)) +
0.25)
self.spec_comps[j]['factor'][1]['TB'] = []
self.spec_comps[j]['factor'][1]['FB_frdm_prior'] = 'fixed'
self.spec_comps[j]['factor'][1]['FW_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][1]['TW_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][1]['TB_frdm_prior'] = []
self.spec_comps[j]['factor'][1]['TW_constr'] = 'NMF'
# residual component:
self.resSpatialRank = self.rank[-1]#2
j = self.nbComps - 1
# initialize the spatial component
self.spat_comps[j] = {}
self.spat_comps[j]['time_dep'] = 'indep'
self.spat_comps[j]['mix_type'] = 'inst'
self.spat_comps[j]['frdm_prior'] = 'free'
self.spat_comps[j]['params'] = np.random.randn(nc, self.resSpatialRank)
# 20120920 trying no initialization for residual:
##if nc == 2: # spreading the sources evenly for init on stereo
## self.spat_comps[j]['params'] = (
## np.array([np.sin((j+1) * np.pi / (2.*(self.nbComps + 1))) +
## np.random.randn(self.resSpatialRank)*np.sqrt(0.01),
## np.cos((j+1) * np.pi / (2.*(self.nbComps + 1))) +
## np.random.randn(self.resSpatialRank)*np.sqrt(0.01)]))
# initialize single factor spectral component
self.spec_comps[j] = {}
self.spec_comps[j]['spat_comp_ind'] = j
self.spec_comps[j]['factor'] = {}
self.spec_comps[j]['factor'][0] = {}
self.spec_comps[j]['factor'][0]['FB'] = (
0.75 * np.abs(np.random.randn(self.nbFreqsSigRepr,
self.nbNMFResComps)) +
0.25)
self.spec_comps[j]['factor'][0]['FW'] = (
np.eye(self.nbNMFResComps))
self.spec_comps[j]['factor'][0]['TW'] = (
0.75 * np.abs(np.random.randn(self.nbNMFResComps,
self.nbFramesSigRepr)) +
0.25)
self.spec_comps[j]['factor'][0]['TB'] = []
self.spec_comps[j]['factor'][0]['FB_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][0]['FW_frdm_prior'] = 'fixed'
self.spec_comps[j]['factor'][0]['TW_frdm_prior'] = 'free'
self.spec_comps[j]['factor'][0]['TB_frdm_prior'] = []
self.spec_comps[j]['factor'][0]['TW_constr'] = 'NMF'
if sparsity is None or len(sparsity) not in (1, self.nbComps):
for j in range(self.nbComps):
self.spec_comps[j]['sparsity'] = False
elif len(sparsity) == self.nbComps:
# sparsity induces a "sparse" activation of
# self.spec_comps[j]['factor'][0]['TW'], that is,
# the time weights for the source part.
# This is implemented as in:
# Durrieu, J.-L. & Thiran, J.-P.
# Sparse Non-Negative Decomposition Of Speech Power Spectra For
# Formant Tracking
# in proc. of the IEEE International Conference on Acoustics,
# Speech and Signal Processing, Pragues, Czech Republic, 2011.
#
# This means that at each GEM iteration, the TW coefficients
# are further shrinked down to be concentrating around a
# single component (a single F0 in SF model)
for j in range(self.nbComps):
self.spec_comps[j]['sparsity'] = sparsity[j]
else:
for j in range(self.nbComps):
self.spec_comps[j]['sparsity'] = sparsity[0]
self.renormalize_parameters()
[docs] def initSpecCompsWithLabelAndFiles(self, instrus=[], instru2modelfile={},
freqBasisAdaptive='fixed'):
"""Initialize the spectral components with the instrument labels as
well as with the components stored in the provided dictionary in
`instru2modelfile`
`instrus` is a list with labels:
`'SourceFilter'`:
keep the intialized source filter model
`'Free_<nb_comp>'`:
initialize the model with an adaptable
spectral component using `nb_comp` elements in the NMF
frequency basis
`<key_in_instru2modelfile>`:
initialize with the :py:class:GSMM
available and stored in the archive npz with filename
`instru2modelfile[key_in_instru2modelfile]`
NB: needs the gmm-gsmm module to be installed and in the pythonpath
"""
instrumentNames = {}
for n, i in enumerate(instrus):
instrumentNames[n] = i
if i == 'SourceFilter':
self.spec_comps[n]['label'] = i
print " Source", n, "left as general Source-Filter model."
elif 'Free' in i: # assumes Free_nbNMFComps
nbNMFComps = int(i.split('_')[-1])
print " Source", n, "set as free NMF source."
# initialize single factor spectral component
self.spec_comps[n] = {}
self.spec_comps[n]['label'] = i
self.spec_comps[n]['spat_comp_ind'] = n
self.spec_comps[n]['factor'] = {}
self.spec_comps[n]['factor'][0] = {}
self.spec_comps[n]['factor'][0]['FB'] = (
0.75 * np.abs(np.random.randn(self.nbFreqsSigRepr,
nbNMFComps)) +
0.25)
self.spec_comps[n]['factor'][0]['FW'] = (
np.eye(nbNMFComps))
self.spec_comps[n]['factor'][0]['TW'] = (
0.75 * np.abs(np.random.randn(nbNMFComps,
self.nbFramesSigRepr)) +
0.25)
self.spec_comps[n]['factor'][0]['TB'] = []
self.spec_comps[n]['factor'][0]['FB_frdm_prior'] = 'free'
self.spec_comps[n]['factor'][0]['FW_frdm_prior'] = 'fixed'
self.spec_comps[n]['factor'][0]['TW_frdm_prior'] = 'free'
self.spec_comps[n]['factor'][0]['TB_frdm_prior'] = []
self.spec_comps[n]['factor'][0]['TW_constr'] = 'NMF'
# sparsity stuff
sparsity = self.sparsity
if sparsity is None or len(sparsity) not in (1, self.nbComps):
self.spec_comps[n]['sparsity'] = False
elif len(sparsity) == self.nbComps:
self.spec_comps[n]['sparsity'] = sparsity[n]
else:
self.spec_comps[n]['sparsity'] = sparsity[0]
else: #if i != 'SourceFilter':
print " Source", n, "is", i
modelfile = instru2modelfile[i]
struc = np.load(modelfile)
gsmm = struc['gsmm'].tolist()
# Keeping only spectra that are not flat:
decisionSpectra = np.any(np.diff(gsmm.sigw, axis=1)!=0, axis=1)
# keeping only the spectra with enough weight:
# hard decision, remove all spectra with w == min(w)
# decisionOnWeight = np.where(gsmm.w!=gsmm.w.min())[0]
# harder decision: remove all with w under a threshold:
decisionOnWeight = (gsmm.w > gsmm.w.max()*1e-3)
keepIndex = np.where(decisionSpectra+decisionOnWeight)[0]
FB = np.ascontiguousarray(gsmm.sigw[keepIndex].T)
#self.setSpecCompFB(compNb=n, FB=FB, FB_frdm_prior='fixed')
self.setSpecCompFB(compNb=n, FB=FB,
FB_frdm_prior=freqBasisAdaptive)
self.spec_comps[n]['label'] = i
struc.close()
return instrumentNames
[docs] def setSpecCompFB(self, compNb, FB, FB_frdm_prior='fixed',):
"""SetSpecCompFB
sets the spectral component's frequency basis.
"""
speccomp = self.spec_comps[compNb]['factor'][0]
if self.nbFreqsSigRepr != FB.shape[0]:
raise AttributeError("Size of provided FB is not consistent"+
" with inner attributes")
speccomp['FB'] = np.copy(FB)
ncomp = FB.shape[1]
speccomp['FW'] = np.eye(ncomp)
speccomp['TW'] = (
0.75 * np.abs(np.random.randn(ncomp,
self.nbFramesSigRepr)) +
0.25)
speccomp['FB_frdm_prior'] = FB_frdm_prior
[docs] def initializeFreeMats(self, niter=10):
"""initialize free matrices, with NMF decomposition
"""
# we initialize the matrices with NMF decomposition using the
# source matrix as basis W, the residual is left uninitialized
nc = self.audioObject.channels
# computing the signal representation
Cx = np.copy(np.real(self.Cx[0]))
for chan in range(1, nc):
# stored an "efficient" way, so index "complicated":
index = np.sum(np.arange(nc, nc-chan, -1))
Cx += np.real(self.Cx[index])
Cx /= np.double(nc)
# computing NMF of Cx:
W, H = NMF_decomp_init(SX=Cx,
Winit=np.dot(
self.sourceFreqComps,
self.spec_comps[0]['factor'][0]['FW']),
verbose=self.verbose,
nbComps=self.nbSourceComps,
niter=niter,
updateW=False, updateH=True,
)
for ncomp in range(self.nbComps-1):
spec_comp = self.spec_comps[ncomp]
spec_comp['factor'][0]['TW'][:] = np.copy(H) / (self.nbComps-1)
self.renormalize_parameters()
[docs] def makeItConvolutive(self):
"""Takes the spatial parameters and sets them to a convolutive
mixture, in case the parameter has not yet been changed to
'conv' mode.
"""
nc = self.audioObject.channels
for nspat, (spat_ind, spat_comp) in enumerate(self.spat_comps.items()):
if spat_comp['mix_type'] != 'inst':
warnings.warn("Spatial component %d "%spat_ind+
"already not instantaneous, skipping...")
else:
# spat_comp['time_dep'] = 'indep'
spat_comp['mix_type'] = 'conv'
# spat_comp['frdm_prior'] = 'free'
spat_comp_param_inst = spat_comp['params']
spat_comp['params'] = np.zeros([self.rank[nspat],
nc,
self.nbFreqsSigRepr],
dtype=np.complex)
for f in range(self.nbFreqsSigRepr):
spat_comp['params'][:,:,f] = (
np.atleast_2d(spat_comp_param_inst.T))
[docs] def estim_param_a_post_model(self,):
"""estim_param_a_post_model
Estimation of model parameters, using the sparsity constraints.
"""
logSigma0 = np.log(np.max([spec['factor'][0]['TW'].shape[0]
for spec in self.spec_comps.values()])**2)
logSigmaInf = np.log(9.0)
logliks = np.ones(self.iter_num)
if self.noise['sim_ann_opt'] in ['ann', ]:
self.noise['PSD'] = self.noise['ann_PSD_lim'][0]
elif self.noise['sim_ann_opt'] is 'no_ann':
self.noise['PSD'] = self.noise['ann_PSD_lim'][1]
else:
warnings.warn("To add noise to the signal, provide the "+
"sim_ann_opt from any of 'ann', "+
"'no_ann' or 'ann_ns_inj' ")
for i in range(self.iter_num):
if self.verbose:
print "Iteration", i+1, "on", self.iter_num
# adding the noise psd if required:
if self.noise['sim_ann_opt'] in ['ann', 'ann_ns_inj']:
self.noise['PSD'] = (
(np.sqrt(self.noise['ann_PSD_lim'][0]) *
(self.iter_num - i) +
np.sqrt(self.noise['ann_PSD_lim'][1]) * i) /
self.iter_num) ** 2
# running the GEM iteration:
logliks[i] = self.GEM_iteration()
if self.verbose:
print " log-likelihood:", logliks[i]
if i>0:
print " improvement:", logliks[i]-logliks[i-1]
# sparsity
sigma = np.exp(logSigma0 +
(logSigmaInf -
logSigma0) /
max(self.iter_num - 1.0, 1.) * i)
self.reweigh_sparsity_constraint(sigma)
return logliks
[docs] def reweigh_sparsity_constraint(self, sigma):
"""reweigh_sparsity_constraint
"""
if self.verbose>1:
print "reweigh_sparsity_constraint:"
print " sigma", sigma
for j in range(self.nbComps):
spec_comp = self.spec_comps[j]
if spec_comp['sparsity'] and \
spec_comp['factor'][0]['TW'].shape[0]>2:
TW = spec_comp['factor'][0]['TW']
K = TW.shape[0]
# barycenter from energy of factor 0 TW component
muTW = (
np.dot(np.arange(K - 1) *
(np.arange(K - 1, 0, -1))**2,
TW[:-1,:]) /
np.dot((np.arange(K - 1, 0, -1))**2,
np.maximum(TW[:-1,:], eps))
)
# smoothing the sequence:
muTW = st.medianFilter(muTW, length=spec_comp['sparsity'])
if self.verbose>1:
print " muTW NaNs in comp %d:" %j,
print np.any(np.isnan(muTW))
twmask = (
np.exp(- 0.5 *
((np.vstack(np.arange(K)) - muTW)**2) /
sigma)
)
twmask[-1] = twmask.max(axis=0)
twmask[:,twmask[-1]>0] /= twmask[-1][twmask[-1]>0]
TW *= twmask
[docs]class multichanLead(multiChanSourceF0Filter):
"""Multiple Channel Source Separation, with Lead/Accompaniment initial
separation
This instantiation of :class:`multiChanSourceF0Filter` provides convenient
methods (:func:`multichanLead.runDecomp` for instance) to separate the
lead instrument from the accompaniment, as in [Durrieu2011]_, and
then use the obtained parameters/signals in order to initialize the more
general source separation algorithm.
Tentative plan for estimation:
1 estimate the Lead/Accompaniment using SIMM
2 estimate the spatial parameters for each of the separated signals
3 plug the SIMM params and the spatial params into pyFASST, and
4 re-estimate
5 write the estimated signals and enjoy success!
NB: as for now, the sole Lead/Accompaniment separation achieves better
separation than the combination of all the possibilities, probably
because of a more flexible framework for the former than for the latter.
Some results have been published at the
`SiSEC <http://sisec.wiki.irisa.fr>`_ 2013 evaluation campaign.
"""
def __init__(self, *args, **kwargs):
"""multichanLead
subclasses multiChanSourceF0Filter
Provides additional methods to estimate the lead/accompaniment parameters
meant to be used as initial parameters for one of the sources.
"""
super(multichanLead, self).__init__(*args, **kwargs)
# removing some data from the object, recomputing when needed:
del self.Cx
del self.spat_comps
##del self.spec_comps
[docs] def runDecomp(self, instrus=[],
instru2modelfile={},
dir_results='tmp/', maxFrames=4000,
niter_nmf=20, niter_simm=30):
"""Running the scheme that should make me famous.
"""
# checking the folder for results
if not os.path.isdir(dir_results):
os.mkdir(dir_results)
# running some checks that the input is alright:
for i in instrus:
if not(i=='SourceFilter' or
i in instru2modelfile or
i.startswith("Free_")):
raise ValueError('Instrument %s not known.' %i)
# just running everything in __init__:
# estimating the separated
self.estimSUIMM(maxFrames=maxFrames,
dir_results=dir_results,
simmIterNum=niter_simm)
##############
# entering vacuum of nightmare of research trial and errors...
# thus expect many undesirable commented lines...
# putting everything in the right containers:
self.comp_transf_Cx()
self._initialize_structures()
self.makeItConvolutive()
# running DEMIX:
## 20130604 no need anymore, only for ALead:
ALead, AAccp = self.demixOnSepSIMM(unvoiced=True)
# spatial components:
# accompaniment parameters:
## THE FOLLOWING SEEMS TO LEAD TO ISSUES and results not so good...
## 20130604 do this after initialize with NMF...
## for j in range(1, self.nbComps-1):
## for r in range(self.rank):
## ## the following assumes the instruments are sorted in the
## ## right order, but we still need to think about that !
## # self.spat_comps[j]['params'][r][:,:] = AAccp[j-1].T
## # so for now, we just go for the sum of all the mixing params
## self.spat_comps[j]['params'][r][:,:] = AAccp.sum(axis=0).T
## Trying randomized init:
self.initializeConvParams(initMethod='rand')
# no modif for noise component...
# lead instrument spatial mat:
for r in range(self.rank[0]):
self.spat_comps[0]['params'][r][:,:] = ALead[0].T
# spectral components:
## Using the instrument models to initialize the matrices:
# For convenience, we do this in a separate method:
instrumentNames = self.initSpecCompsWithLabelAndFiles(
instrus=instrus,
instru2modelfile=instru2modelfile,
freqBasisAdaptive='fixed')
## instrumentNames = {}
## for n, i in enumerate(instrus):
## instrumentNames[n] = i
## if i == 'SourceFilter':
## print " Source", n, "left as general Source-Filter model."
## elif 'Free' in i: # assumes Free_nbNMFComps
## nbNMFComps = int(i.split('_')[-1])
## print " Source", n, "set as free NMF source."
## # initialize single factor spectral component
## self.spec_comps[n] = {}
## self.spec_comps[n]['spat_comp_ind'] = n
## self.spec_comps[n]['factor'] = {}
## self.spec_comps[n]['factor'][0] = {}
## self.spec_comps[n]['factor'][0]['FB'] = (
## 0.75 * np.abs(np.random.randn(self.nbFreqsSigRepr,
## nbNMFComps)) +
## 0.25)
## self.spec_comps[n]['factor'][0]['FW'] = (
## np.eye(nbNMFComps))
## self.spec_comps[n]['factor'][0]['TW'] = (
## 0.75 * np.abs(np.random.randn(nbNMFComps,
## self.nbFramesSigRepr)) +
## 0.25)
## self.spec_comps[n]['factor'][0]['TB'] = []
## self.spec_comps[n]['factor'][0]['FB_frdm_prior'] = 'free'
## self.spec_comps[n]['factor'][0]['FW_frdm_prior'] = 'fixed'
## self.spec_comps[n]['factor'][0]['TW_frdm_prior'] = 'free'
## self.spec_comps[n]['factor'][0]['TB_frdm_prior'] = []
## self.spec_comps[n]['factor'][0]['TW_constr'] = 'NMF'
## # sparsity stuff
## sparsity = self.sparsity
## if sparsity is None or len(sparsity) not in (1, self.nbComps):
## self.spec_comps[n]['sparsity'] = False
## elif len(sparsity) == self.nbComps:
## self.spec_comps[n]['sparsity'] = sparsity[n]
## else:
## self.spec_comps[n]['sparsity'] = sparsity[0]
## else: #if i != 'SourceFilter':
## print " Source", n, "is", i
## modelfile = instru2modelfile[i]
## struc = np.load(modelfile)
## gsmm = struc['gsmm'].tolist()
## # Keeping only spectra that are not flat:
## decisionSpectra = np.any(np.diff(gsmm.sigw, axis=1)!=0, axis=1)
## # keeping only the spectra with enough weight:
## # hard decision, remove all spectra with w == min(w)
## # decisionOnWeight = np.where(gsmm.w!=gsmm.w.min())[0]
## # harder decision: remove all with w under a threshold:
## decisionOnWeight = (gsmm.w > gsmm.w.max()*1e-3)
##
## keepIndex = np.where(decisionSpectra+decisionOnWeight)[0]
##
## FB = np.ascontiguousarray(gsmm.sigw[keepIndex].T)
## #self.setSpecCompFB(compNb=n, FB=FB, FB_frdm_prior='fixed')
## self.setSpecCompFB(compNb=n, FB=FB, FB_frdm_prior='free')
## struc.close()
suffix = dict(instrumentNames)
# suffix[len(suffix)] = ''
if self.verbose>1:
print 'suffix', suffix
self.renormalize_parameters()
# initialize parameters with NMF:
# putting the HF0 from the SIMM model back in:
# lead instrument
##self.spec_comps[0]['factor'][0]['TW'][:-1] = (
## self.simmModel.SIMMParams['HF00'])
startincqt = np.sort(np.where(self.tft.time_stamps>=0)[0])[0]
stopincqt = (
startincqt + self.simmModel.SIMMParams['HF00'].shape[1])
self.spec_comps[0][
'factor'][0]['TW'][:-1, startincqt:stopincqt] = (
self.simmModel.SIMMParams['HF00'])
self.initialize_all_spec_comps_with_NMF(updateFreqBasis=False,
niter=niter_nmf)
# putting the HF0 from the SIMM model back in:
# lead instrument
self.spec_comps[0][
'factor'][0]['TW'][:-1, startincqt:stopincqt] = (
self.simmModel.SIMMParams['HF00'])
##self.spec_comps[0]['factor'][0]['TW'][:-1] = (
## self.simmModel.SIMMParams['HF00'])
# the following are too variable to be kept for now:
#self.spec_comps[0]['factor'][1]['TW'][:-1] = (
# self.simmModel.SIMMParams['HPHI'])
#self.spec_comps[0]['factor'][1]['FW'][:-1] = (
# self.simmModel.SIMMParams['HGAMMA'])
#self.spec_comps[0]['factor'][1]['FB'][:-1] = (
# self.simmModel.SIMMParams['WGAMMA'])
# accompaniment: nothing for now.
# accompaniment: avoid or reduce effect of stuff in source 0, maybe:
for j in range(1,self.nbComps-1):
if instrumentNames[j] == 'SourceFilter':
self.spec_comps[j]['factor'][0]['TW'][
:-1, startincqt:stopincqt] = 1.*(
self.simmModel.SIMMParams['HF00']==0)
# noise: nothing for now.
## 20130605T0104
## Should we iterate a sequence of (estim_param_a_post_model, demix)
## here?
# separate the files with these parameters:
self.renormalize_parameters()
self.separate_spat_comps(dir_results=dir_results,
suffix=suffix)
if self.verbose>1:
print suffix
# replace this with method:
# run DEMIX on the separated files:
##estFiles = self.files['spat_comp']
##nbSources = len(self.spat_comps)
##if self.verbose>1:
## print nbSources, "sources:", estFiles
##for nest, estfilename in enumerate(estFiles):
## if self.verbose>1:
## print estfilename
## A = self.demixOnGivenFile(estfilename, nsources=1)
## for r in range(self.rank[nest]):
## self.spat_comps[nest]['params'][r][:,:] = (
## A[0].T + 1e-3 * np.random.randn(*A[0].T.shape))
##
##self.renormalize_parameters()
estFiles = self.initConvDemixOnSepSrc(suffix)
self.separate_spatial_filter_comp(dir_results=dir_results,
suffix=suffix)
# Re-estimating all the parameters:
logliks = self.estim_param_a_post_model()
# Separate and Write them...
if self.verbose:
print "Writing files to", dir_results
print self.files
self.separate_spat_comps(dir_results=dir_results,
suffix=suffix)
return logliks
[docs] def estimSIMM(self, maxFrames=4000, dir_results='tmp/', simmIterNum=30):
"""This method runs the SIMM estimation on the provided audio file.
The lead source is assumed to be self.spec_comps[0]
"""
##numCompAccomp = (
## np.sum([spec_comp['factor'][0]['FB'].shape[1]
## for ncomp, spec_comp in self.spec_comps.items()])-
## self.spec_comps[0]['factor'][0]['FB'].shape[1]
## )
numCompAccomp = 40 # TODO: check if this improves solo/acc separation
if simmIterNum is None:
simmIterNum = self.iter_num
self.simmModel = SLS.SeparateLeadProcess(
inputAudioFilename=self.audioObject.filename,
stepNotes=self.sourceParams['stepnoteF0'],
chirpPerF0=self.sourceParams['chirpPerF0'],
nbIter=simmIterNum,
windowSize=(
self.sig_repr_params['wlen']/
np.double(self.audioObject.samplerate)), # in seconds
hopsize=self.sig_repr_params['hopsize'],
NFT=self.sig_repr_params['fsize'],
numCompAccomp=numCompAccomp,
K_numFilters=self.nbFilterWeigs[0],
P_numAtomFilters=self.nbFilterComps,
#imageCanvas=canvas,
minF0search=self.sourceParams['minF0search'],
maxF0search=self.sourceParams['maxF0search'],
minF0=self.sourceParams['minF0'],
maxF0=self.sourceParams['maxF0'],
verbose=self.verbose,
tfrepresentation=self.sig_repr_params['transf'],
cqtfmax=self.sig_repr_params['tffmax'],#4000,
cqtfmin=self.sig_repr_params['tffmin'],#50,
cqtbins=self.sig_repr_params['tfbpo'],#48,
cqtWinFunc=self.sig_repr_params['tfWinFunc'],
#slf.minqt.sqrt_blackmanharris,
cqtAtomHopFactor=self.sig_repr_params['hopfactor'],#0.25,
outputDirSuffix='tmp/', # dir_results,
# this is not working, have to find a way
initHF00='random',
freeMemory=False)
self.simmModel.autoMelSepAndWrite(maxFrames=maxFrames)
[docs] def estimSUIMM(self, maxFrames=4000, **kwargs):
"""separates the audio signal into lead+accompaniment,
including more noisy components for the lead than `self.estimSIMM`
"""
if not hasattr(self, "simmModel"):
self.estimSIMM(maxFrames=maxFrames, **kwargs)
self.simmModel.estimStereoSUIMMParamsWriteSeps(maxFrames=maxFrames)
[docs] def demixOnSepSIMM(self, unvoiced=True):
"""run DEMIX on the separated signals resulting from SIMM model
"""
if not hasattr(self, 'simmModel'):
self.estimSIMM()
if unvoiced:
self.estimSUIMM()
if unvoiced:
suffix = '_VUIMM'
else:
suffix = ''
# DEMIX on lead instrument
leadfilename = (
self.simmModel.files['voc_output_file'][:-4] +
suffix + '.wav')
ALead = self.demixOnGivenFile(
leadfilename,
nsources=1)
# DEMIX on accompaniment
accpfilename = (
self.simmModel.files['mus_output_file'][:-4] +
suffix + '.wav')
AAccp = self.demixOnGivenFile(
accpfilename,
nsources=self.nbComps-2)
return ALead, AAccp
[docs] def demixOnGivenFile(self, filename, nsources=1):
'''running the DEMIX algorithm from :demix.DEMIX:
'''
maxclusters = 40
neighbours = 15
# default for demix to work best: #FIXME!!!
#wlen = 2048
#hopsize = 1024
demixInst = demix.DEMIX(
audio=filename,
nsources=nsources, # spatial comps for demix
#wlen=wlen,
#hopsize=hopsize,
#neighbors=neighbours,
verbose=self.verbose,
maxclusters=maxclusters,
**self.demixParams)
#demixInst.comp_pcafeatures()
#demixInst.comp_parameters()
#demixInst.init_subpts_set()
demixInst.comp_clusters()
demixInst.refine_clusters()
# mixing parameters from DEMIX estimation:
# results in an nsrc x nfreqs x nc array
A = demixInst.steeringVectorsFromCentroids()
del demixInst
if A.size == 0:
warnMsg = "There are no clusters in demix, returning dummy matrix."
warnings.warn(warnMsg)
if self.verbose:
print warnMsg
return np.cos(0.25 * np.pi) * np.ones([nsources,
A.shape[1], A.shape[2]])
return A
[docs] def initConvDemixOnSepSrc(self, suffix):
"""initialize the convolutive parameters with DEMIX, running on each of
the separated sources
"""
if not hasattr(self, "files"):
warnings.warn("The sources were not separated, compute them first"+
" with separate_spat_comps.")
return None
estFiles = self.files['spat_comp']
nbSources = len(self.spat_comps)
if self.verbose>1:
print nbSources, "sources:", estFiles
for nest, estfilename in enumerate(estFiles):
if self.verbose>1:
print estfilename
A = self.demixOnGivenFile(estfilename, nsources=1)
for r in range(self.rank[nest]):
self.spat_comps[nest]['params'][r][:,:] = (
A[0].T + 1e-3 * np.random.randn(*A[0].T.shape))
self.renormalize_parameters()
return estFiles