"""Collect Markov Chain"""
from copy import deepcopy as _cp
import numpy as _np
from ..tools import History as _History
from ..tools.indicator import merge_function_with_indicator as _indmerge
from ..tools._doc import _inherit_docstring
import logging
logger = logging.getLogger(__name__)
class MarkovChain(object):
r"""A Markov chain to generate samples from the target density.
:param target:
The target density. Must be a function accepting a 1d numpy
array and returning a float, namely :math:`\log(P(x))`,
the log of the target `P`.
:param proposal:
The proposal density `q`.
Should be of type :py:class:`pypmc.density.base.LocalDensity`.
.. hint::
If your proposal density is symmetric, define the member
variable ``proposal.symmetric = True``. This will omit calls
        to ``proposal.evaluate`` in the Metropolis-Hastings steps.
:param start:
The starting point of the Markov chain. (numpy array)
    :param indicator:
        The indicator function; it receives a numpy array and returns a bool.
        The target is only called if ``indicator(proposed_point)``
        returns ``True``; otherwise the proposed point is rejected
        without a call to the target.
        Use this function to specify the support of the target.
.. seealso::
:py:mod:`pypmc.tools.indicator`
:param prealloc:
Integer; the number of Markov chain samples for which memory in
``self.samples`` is allocated. If more memory is needed, it will
be allocated on demand.
.. hint::
Preallocating memory can speed up the calculation, in
particular if it is known in advance how long the chains
are run.
:param save_target_values:
Bool; if ``True``, store the evaluated ``target`` at every visited
point in ``self.target_values``
:param rng:
        The rng passed to the proposal when calling ``proposal.propose``.
.. important::
``rng`` must return a sample from the uniform distribution
in [0,1) when calling **rng.rand()**
.. seealso::
``rng`` must also fulfill the requirements of your proposal
:py:meth:`pypmc.density.base.LocalDensity.propose`
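
    .. rubric:: Example

    A minimal usage sketch, assuming a local Gaussian proposal from
    :py:class:`pypmc.density.gauss.LocalGauss` (adjust the target,
    proposal covariance, and dimension to your problem):

    .. code-block:: python

        import numpy as np
        from pypmc.density.gauss import LocalGauss
        from pypmc.sampler.markov_chain import MarkovChain

        # unnormalized log of a standard 2d Gaussian target
        log_target = lambda x: -0.5 * np.dot(x, x)

        prop = LocalGauss(0.5 * np.eye(2))   # proposal covariance
        mc = MarkovChain(log_target, prop, start=np.zeros(2))

        accepted = mc.run(10**3)             # number of accepted points
        samples = mc.samples[:]              # all visited points as one array

    To restrict the support of the target, pass an ``indicator``
    function, e.g. one built with :py:mod:`pypmc.tools.indicator`.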
"""
def __init__(self, target, proposal, start, indicator=None,
prealloc=0, save_target_values=False, rng=_np.random.mtrand):
# store input into instance
        self.current_point = _np.array(start, dtype=float) # call the array constructor to make sure we have a copy
self.samples = _History(len(self.current_point), prealloc)
self.proposal = _cp(proposal)
self.rng = rng
self.target = _indmerge(target, indicator, -_np.inf)
self.target_values = _History(1, prealloc) if save_target_values else None
self.current_target_eval = self.target(self.current_point)
if not _np.isfinite(self.current_target_eval):
raise ValueError('``target(start)`` must evaluate to a finite value and ``indicator(start)`` must be ``True``')
def clear(self):
'''Clear history of visited points (stored in ``self.samples``) and
other internal variables to free memory.
.. note::
The current state that defines the Markov chain is untouched.
'''
self.samples.clear()
if self.target_values is not None:
self.target_values.clear()
def run(self, N=1, continue_on_NaN=False):
'''Run the chain and store the history of visited points into
the member variable ``self.samples``. Returns the number of
accepted points during the run.
.. seealso::
:py:class:`pypmc.tools.History`
:param N:
An int which defines the number of steps to run the chain.
:param continue_on_NaN:
            A boolean flag defining the behavior when the user-supplied
            target density evaluates to NaN at a proposed point.
            Default: ``False`` (raise a ``ValueError``). If ``True``, reject
            the proposed point and continue.
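
        For example, to run the chain in one batch and monitor the
        acceptance rate (a sketch; ``mc`` is a chain constructed as in
        the class docstring):

        .. code-block:: python

            N = 10**3
            accepted = mc.run(N)
            print('acceptance rate:', accepted / float(N))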
'''
if N == 0:
return 0
# set the accept function
if self.proposal.symmetric:
get_log_rho = self._get_log_rho_metropolis
else:
get_log_rho = self._get_log_rho_metropolis_hastings
# allocate an empty numpy array to store the run
if self.target_values is not None:
this_target_values = self.target_values.append(N)
this_run = self.samples.append(N)
accept_count = 0
for i_N in range(N):
# propose new point
proposed_point = self.proposal.propose(self.current_point, self.rng)
proposed_eval = self.target(proposed_point)
# log_rho := log(probability to accept point), where log_rho > 0 is meant to imply rho = 1
log_rho = get_log_rho(proposed_point, proposed_eval)
# check for NaN
if _np.isnan(log_rho):
                if not continue_on_NaN:
                    raise ValueError('encountered NaN')
# otherwise reject
this_run[i_N] = self.current_point
else:
                # accept if rho >= 1, or with probability rho if rho < 1
                if log_rho >= 0 or log_rho >= _np.log(self.rng.rand()):
accept_count += 1
this_run[i_N] = proposed_point
self.current_point = proposed_point
self.current_target_eval = proposed_eval
# reject if not accepted
else:
this_run[i_N] = self.current_point
# save target value if desired
if self.target_values is not None:
this_target_values[i_N] = self.current_target_eval
# ---------------------- end for --------------------------------
return accept_count
    def _get_log_rho_metropolis(self, proposed_point, proposed_eval):
        """calculate the log of the Metropolis ratio"""
        return proposed_eval - self.current_target_eval

    def _get_log_rho_metropolis_hastings(self, proposed_point, proposed_eval):
        """calculate log(Metropolis ratio times Hastings factor)"""
        return self._get_log_rho_metropolis(proposed_point, proposed_eval) \
             - self.proposal.evaluate(proposed_point, self.current_point) \
             + self.proposal.evaluate(self.current_point, proposed_point)
class AdaptiveMarkovChain(MarkovChain):
    # set the docstring --> inherit from the base class, but replace:
    #  - MarkovChain(*args, **kwargs) --> AdaptiveMarkovChain(*args, **kwargs)
    #  - "A Markov chain" --> "A Markov chain with proposal covariance adaptation"
    #  - ProposalDensity --> Multivariate in the description of :param proposal:
__doc__ = MarkovChain.__doc__\
.replace('MarkovChain(', 'AdaptiveMarkovChain(')\
.replace('A Markov chain', '''A Markov chain with proposal covariance adaptation as in [HST01]_''' , 1)\
.replace('ProposalDensity', 'Multivariate')
def __init__(self, *args, **kwargs):
# set adaptation params
self.adapt_count = 1
self.covar_scale_multiplier = kwargs.pop('covar_scale_multiplier' , 1.5 )
self.covar_scale_factor = kwargs.pop('covar_scale_factor' , None )
self.covar_scale_factor_max = kwargs.pop('covar_scale_factor_max' , 100. )
self.covar_scale_factor_min = kwargs.pop('covar_scale_factor_min' , .0001)
self.force_acceptance_max = kwargs.pop('force_acceptance_max' , .35 )
self.force_acceptance_min = kwargs.pop('force_acceptance_min' , .15 )
self.damping = kwargs.pop('damping' , .5 )
super(AdaptiveMarkovChain, self).__init__(*args, **kwargs)
if self.covar_scale_factor is None:
self.covar_scale_factor = 2.38**2/len(self.current_point)
# initialize unscaled sigma
self.unscaled_sigma = self.proposal.sigma / self.covar_scale_factor
@_inherit_docstring(MarkovChain)
def run(self, N=1, continue_on_NaN=False):
if N == 0:
return 0
self._last_accept_count = super(AdaptiveMarkovChain, self).run(N, continue_on_NaN)
return self._last_accept_count
def set_adapt_params(self, *args, **kwargs):
r"""Sets variables for covariance adaptation.
When :meth:`.adapt` is called, the proposal's covariance matrix is
adapted in order to improve the chain's performance. The aim
is to improve the efficiency of the chain by making better
proposals and forcing the acceptance rate :math:`\alpha` of
the chain to lie in an interval ensuring good exploration:
:param force_acceptance_max:
Float, the upper limit (in (0,1])
Default: :math:`\alpha_{max}=.35`
:param force_acceptance_min:
Float, the lower limit (in [0,1))
Default: :math:`\alpha_{min}=.15`
This is achieved in two steps:
        1. **Estimate the target covariance**: compute the sample
           covariance from the last (the t-th) run as :math:`S^t`,
           then combine it with the previous estimate :math:`\Sigma^{t-1}`
           using a weight that damps out over time:
.. math::
\Sigma^t = (1-a^t) \Sigma^{t-1} + a^t S^t
where the weight is given by
.. math::
a^t = 1/t^{\lambda}.
:param damping:
Float, see formula above
Default: :math:`\lambda=.5`
            The ``damping`` :math:`\lambda` is necessary to ensure
            convergence and should be in [0,1]. A default value of 0.5 was
            found to work well in practice. For details, see [HST01]_.
2. **Rescale the covariance matrix**: Remember that the goal
is to force the acceptance rate into a specific interval.
Suppose that the chain already is in a region of significant
probability mass (should be the case before adapting it).
When the acceptance rate is close to zero, the chain cannot
move at all; i.e., the proposed points have a low probability
relative to the current point. In this case the proposal
covariance should decrease to increase "locality" of the
chain. In the opposite case, when the acceptance rate is
close to one, the chain most probably only explores a small
volume of the target. Then enlarging the covariance matrix
decreases "locality". In this implementation, the proposal
covariance matrix is :math:`c \Sigma^t`
        :param covar_scale_factor:
            Float; this number ``c`` multiplies :math:`\Sigma^t`
            after it has been recalculated. The higher the dimension
            :math:`d`, the smaller it should be. For a Gaussian
            proposal and target, the optimal factor is
            :math:`2.38^2/d`. Use this argument to increase
            performance from the start, before any adaptation.
Default: :math:`c=2.38^2/d`
``covar_scale_factor`` is updated using :math:`\beta`
:param covar_scale_multiplier:
Float;
if the acceptance rate is larger than ``force_acceptance_max``,
:math:`c \to \beta c`.
If the acceptance rate is smaller than ``force_acceptance_min``,
:math:`c \to c / \beta`.
            Default: :math:`\beta=1.5`
        Additionally, an upper and a lower limit on
        ``covar_scale_factor`` can be provided. Hitting these limits
        usually hints at bugs in the target or in the MC implementation
        that make the acceptance efficiency, and with it the scale
        factor, run away.
:param covar_scale_factor_max:
Float, ``covar_scale_factor`` is kept below this value.
Default: :math:`c_{max}=100`
:param covar_scale_factor_min:
Float, ``covar_scale_factor`` is kept above this value.
            Default: :math:`c_{min}=10^{-4}`
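
        For example, to widen the targeted acceptance window and weaken
        the damping (a sketch; ``mc`` is an :py:class:`.AdaptiveMarkovChain`):

        .. code-block:: python

            mc.set_adapt_params(force_acceptance_min=0.10,
                                force_acceptance_max=0.40,
                                damping=0.3)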
"""
        if args != ():
            raise TypeError('keyword args only; try set_adapt_params(keyword=value)')
self.covar_scale_multiplier = kwargs.pop('covar_scale_multiplier' , self.covar_scale_multiplier)
self.covar_scale_factor = kwargs.pop('covar_scale_factor' , self.covar_scale_factor )
self.covar_scale_factor_max = kwargs.pop('covar_scale_factor_max' , self.covar_scale_factor_max)
self.covar_scale_factor_min = kwargs.pop('covar_scale_factor_min' , self.covar_scale_factor_min)
self.force_acceptance_max = kwargs.pop('force_acceptance_max' , self.force_acceptance_max )
self.force_acceptance_min = kwargs.pop('force_acceptance_min' , self.force_acceptance_min )
self.damping = kwargs.pop('damping' , self.damping )
        if kwargs:
            raise TypeError('unexpected keyword(s): ' + str(kwargs.keys()))
def adapt(self):
r"""Update the proposal using the points
stored in ``self.samples[-1]`` and the parameters which can be set via
:py:meth:`.set_adapt_params`.
        The algorithm is described in detail in the docstring of
        :py:meth:`.set_adapt_params`. If the resulting matrix is not a
        valid covariance, its offdiagonal elements are set to zero and a
        warning is logged. If that also fails, the proposal's covariance
        matrix is divided by the ``covar_scale_multiplier`` :math:`\beta`.
.. note::
This function only uses the points obtained during the last run.
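
        A typical adaptation cycle looks like this (a sketch; ``mc`` is an
        :py:class:`.AdaptiveMarkovChain` and the run lengths are arbitrary):

        .. code-block:: python

            mc.run(10**2)       # burn in
            mc.clear()          # discard the burn-in samples
            for patch in range(20):
                mc.run(500)     # sample a patch of points ...
                mc.adapt()      # ... then adapt the proposal to them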
"""
last_run = self.samples[-1]
accept_rate = float(self._last_accept_count) / len(last_run)
        # careful with rowvar!
        # with ``rowvar=0`` each column of ``last_run`` is interpreted as
        # the samples of one variable; this is the case because each row
        # of ``last_run`` is one sampled point
        covar_estimator = _np.cov(last_run, rowvar=0)
# update sigma
time_dependent_damping_factor = 1./self.adapt_count**self.damping
self.unscaled_sigma = (1-time_dependent_damping_factor) * self.unscaled_sigma\
+ time_dependent_damping_factor * covar_estimator
self._update_scale_factor(accept_rate)
scaled_sigma = self.covar_scale_factor * self.unscaled_sigma
# increase count now before proposal update. It may fail and raise an exception.
self.adapt_count += 1
try:
self.proposal.update(scaled_sigma)
except _np.linalg.LinAlgError:
logger.warning("Markov chain self adaptation failed; trying diagonalization")
            # retry with the diagonal elements only (offdiagonals set to zero)
diagonal_matrix = _np.zeros_like(scaled_sigma)
_np.fill_diagonal(diagonal_matrix, _np.diag(scaled_sigma))
try:
self.proposal.update(diagonal_matrix)
logger.warning('Diagonalization succeeded')
except _np.linalg.LinAlgError:
logger.warning('Diagonalization failed')
# just scale the old covariance matrix if everything else fails
self.proposal.update(self.proposal.sigma / self.covar_scale_multiplier)
    def _update_scale_factor(self, accept_rate):
        '''Private function.
        Updates the covariance scaling factor ``covar_scale_factor``
        within its configured limits.
        '''
if accept_rate > self.force_acceptance_max and self.covar_scale_factor < self.covar_scale_factor_max:
self.covar_scale_factor *= self.covar_scale_multiplier
elif accept_rate < self.force_acceptance_min and self.covar_scale_factor > self.covar_scale_factor_min:
self.covar_scale_factor /= self.covar_scale_multiplier