Source code for pypmc.mix_adapt.hierarchical

"""Hierarchical clustering as described in [GR04]_

"""
import copy
import numpy as np
import scipy.linalg as linalg

import logging
logger = logging.getLogger(__name__)

class Hierarchical(object):
    """Hierarchical clustering as described in [GR04]_.

    Find a Gaussian mixture density :math:`g` with components :math:`g_j`
    that most closely matches the Gaussian mixture density specified by
    :math:`f` and its components :math:`f_i`, but with fewer components.
    The algorithm is an iterative EM procedure alternating between a
    *regroup* and a *refit* step, and requires an ``initial_guess`` of the
    output density that defines the maximum number of components to use.

    :param input_components:

        :py:class:`pypmc.density.mixture.MixtureDensity` with Gaussian
        (:py:class:`pypmc.density.gauss.Gauss`) components; the Gaussian
        mixture to be reduced.

    :param initial_guess:

        :py:class:`pypmc.density.mixture.MixtureDensity` with Gaussian
        (:py:class:`pypmc.density.gauss.Gauss`) components; initial guess
        for the EM algorithm.

    .. seealso::

        :py:func:`pypmc.density.mixture.create_gaussian_mixture`

    """
    def __init__(self, input_components, initial_guess):
        # read and verify component numbers
        self.nin = len(input_components.components)
        self.nout = len(initial_guess.components)

        assert self.nin > self.nout, "Got more output (%i) than input (%i) components" % (self.nout, self.nin)
        assert self.nout > 0, "Invalid number of output components %s" % self.nout

        self.f = input_components
        self.g = copy.deepcopy(initial_guess)

        # inverse map: several inputs can map to one output, so need a list
        self.inv_map = {}
        for j in range(self.nout):
            self.inv_map[j] = None

        # the i-th element is :math:`\min_j KL(f_i || g_j)`
        self.min_kl = np.zeros(self.nin) + np.inf

    def _cleanup(self, kill):
        """Look for dead components (weight=0) and remove them if enabled
        by ``kill``. Resize storage. Recompute determinant and covariance.

        """
        if kill:
            removed_indices = self.g.prune()

            self.nout -= len(removed_indices)
            if removed_indices:
                logger.info('Removing %s' % removed_indices)

            for j in removed_indices:
                # ``prune`` returns tuples (index, component, weight);
                # drop the map entry of each removed component
                self.inv_map.pop(j[0])

    def _distance(self):
        r"""Compute the distance function :math:`d(f, g, \pi)`, Eq. (3) in [GR04]_."""
        return np.average(self.min_kl, weights=self.f.weights)

    def _refit(self):
        r"""Update the output :math:`g` keeping the map :math:`\pi` fixed.

        Use Eq. (7) and below in [GR04]_.

        """
        # temporary variables for manipulation
        mu_diff = np.empty_like(self.f.components[0].mu)
        sigma = np.empty_like(self.f.components[0].sigma)
        mean = np.empty_like(mu_diff)
        cov = np.empty_like(sigma)

        for j, c in enumerate(self.g.components):
            # stop if inv_map is empty for the j-th component
            if not self.inv_map[j]:
                self.g.weights[j] = 0.
                continue

            # (re-)initialize new mean/cov to zero
            mean[:] = 0.0
            cov[:] = 0.0

            # compute total weight and mean
            self.g.weights[j] = self.f.weights[self.inv_map[j]].sum()
            for i in self.inv_map[j]:
                mean += self.f.weights[i] * self.f.components[i].mu

            # rescale by total weight
            mean /= self.g.weights[j]

            # update covariance
            for i in self.inv_map[j]:
                # mu_diff = mu'_j - mu_i
                mu_diff[:] = mean
                mu_diff -= self.f.components[i].mu

                # sigma = (mu'_j - mu_i) (mu'_j - mu_i)^T
                sigma[:] = np.outer(mu_diff, mu_diff)

                # sigma += sigma_i
                sigma += self.f.components[i].sigma

                # multiply with alpha_i
                sigma *= self.f.weights[i]

                # sigma_j += alpha_i * (sigma_i + (mu'_j - mu_i) (mu'_j - mu_i)^T)
                cov += sigma

            # 1 / beta_j
            cov /= self.g.weights[j]

            # update the mixture component
            c.update(mean, cov)

    def _regroup(self):
        r"""Update the map :math:`\pi` keeping the output :math:`g` fixed.

        Compute the KL divergence between all input and output components.

        """
        # clean up old maps
        for j in range(self.nout):
            self.inv_map[j] = []

        # find the smallest divergence between input component i
        # and output component j of the cluster mixture density
        for i in range(self.nin):
            self.min_kl[i] = np.inf
            j_min = None
            for j in range(self.nout):
                kl = kullback_leibler(self.f.components[i], self.g.components[j])
                if kl < self.min_kl[i]:
                    self.min_kl[i] = kl
                    j_min = j
            assert j_min is not None
            self.inv_map[j_min].append(i)
    def run(self, eps=1e-4, kill=True, max_steps=50, verbose=False):
        r"""Perform the clustering on the input components, updating the
        initial guess. The result is available in the member ``self.g``.

        Return the number of iterations at convergence, or None.

        :param eps:

            If the relative change of the distance between the current and
            the last step falls below ``eps``, declare convergence:

            .. math::
                0 < \frac{d^{t-1} - d^t}{d^{t-1}} < \varepsilon

        :param kill:

            If a component is assigned zero weight (no input components),
            it is removed.

        :param max_steps:

            Perform at most this many update steps.

        """
        if verbose:
            from pypmc.tools.util import depr_warn_verbose
            depr_warn_verbose(__name__)

        old_distance = np.finfo(np.float64).max
        new_distance = np.finfo(np.float64).max

        logger.info('Starting hierarchical clustering with %d components.' % len(self.g.components))
        converged = False
        for step in range(1, max_steps + 1):
            self._cleanup(kill)
            self._regroup()
            self._refit()

            new_distance = self._distance()
            assert new_distance >= 0, 'Found negative distance %g' % new_distance

            logger.info('Distance in step %d: %g' % (step, new_distance))
            if new_distance == old_distance:
                converged = True
                logger.info('Exact minimum found after %d steps' % step)
                break

            rel_change = (old_distance - new_distance) / old_distance
            assert not (rel_change < -1e-13), 'distance increased'

            if rel_change < eps:
                converged = True
                logger.info('Close enough to local minimum after %d steps' % step)
                break

            # save distance for comparison in the next step
            old_distance = new_distance

        self._cleanup(kill)
        logger.info('%d components remain.' % len(self.g.components))

        if converged:
            return step
        # else return None
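
# ----------------------------------------------------------------------
# Usage sketch (illustration only, not part of [GR04]_ or the original
# module): reduce a five-component 1-D Gaussian mixture to two
# components. It relies only on the public API referenced in the
# docstrings above. Kept in comments so that importing this module
# stays side-effect free; see also the runnable check at the end of
# the file.
#
#     import numpy as np
#     from pypmc.density.mixture import create_gaussian_mixture
#     from pypmc.mix_adapt.hierarchical import Hierarchical
#
#     # input mixture f: two well-separated clumps plus one stray component
#     means = [np.array([m]) for m in (-3., -2.9, 0., 2.9, 3.)]
#     covs = [np.eye(1) for _ in means]
#     f = create_gaussian_mixture(means, covs)
#
#     # initial guess g: one component per expected clump
#     guess = create_gaussian_mixture([np.array([-3.]), np.array([3.])],
#                                     [np.eye(1), np.eye(1)])
#
#     h = Hierarchical(f, guess)
#     steps = h.run()    # number of EM steps at convergence, or None
#     reduced = h.g      # the reduced MixtureDensity
# ----------------------------------------------------------------------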
def kullback_leibler(c1, c2):
    """Kullback-Leibler divergence of two Gaussians, :math:`KL(1||2)`."""
    d = c2.log_det_sigma - c1.log_det_sigma
    d += np.trace(c2.inv_sigma.dot(c1.sigma))
    mean_diff = c1.mu - c2.mu
    d += mean_diff.transpose().dot(c2.inv_sigma).dot(mean_diff)
    d -= len(c1.mu)
    return 0.5 * d
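
# A minimal, runnable sanity check (illustration only, not part of the
# original module): for 1-D Gaussians the general formula above must
# reproduce the textbook expression with variances s1, s2:
#     KL(1||2) = 0.5 * (log(s2/s1) + s1/s2 + (m1 - m2)**2 / s2 - 1)
if __name__ == '__main__':
    from pypmc.density.gauss import Gauss

    # N(0, 1) and N(1, 2) as one-dimensional Gauss components
    c1 = Gauss(np.array([0.]), np.array([[1.]]))
    c2 = Gauss(np.array([1.]), np.array([[2.]]))

    kl = kullback_leibler(c1, c2)
    expected = 0.5 * (np.log(2. / 1.) + 1. / 2. + (0. - 1.) ** 2 / 2. - 1.)
    assert np.isclose(kl, expected), (kl, expected)
    print('kullback_leibler check passed: KL = %g' % kl)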