Source code for fvgp.gp_mcmc

#!/usr/bin/env python

import numpy as np
import time
import warnings
from dask.distributed import get_worker


## --------------------------------------------------------------------- ##
#  A generic Metropolis sampler.  You have to supply the log likelihood   #
#  function, which need not really be a likelihood function at all.       #
#
#  Translated from Shaby's R code.
#
#  Uppercase K is the size of the blocks of iterations used for
#  adapting the proposal.
#  Lowercase k is the offset to get rid of wild swings in adaptation
#  process that otherwise happen the early
#  iterations.
# Dr. Likun Zhang: https://stat.missouri.edu/people/zhang


# x0 ........................................... initial values
# likelihood_fn ............................................ log likelihood
# prior_fn ....................................... prior function for theta
# prior_args .............................................. for prior functions
# n_updates .................................. number of Metropolis updates
# prop_Sigma ................................... proposal covariance matrix

# adapt_cov ......................... whether to update proposal covariance
# return_prop_Sigma_trace........ save proposal covariance from each update
# r_opt ........................................... optimal acceptance rate
# c_0, c_1 .. two coefficients for updating sigma_m and proposal covariance
# K .............................................. adapt every K iterations
#                                                                         #


[docs] class gpMCMC: """ This class allows the user to customize an MCMC via user-defined proposal distributions and a prior. Parameters ---------- log_likelihood_function : callable The log of the likelihood to be sampled. Function of the form def likelihood(x,args) that returns a scalar. bounds : np.ndarray, optional The bounds of the domain. Only used if the default uniform prior is used. The default is None, which means no bounds and thus a uniform prior over the whole space. prior_function : callable, optional Function to query for the prior probability of form: func(x, bounds, args), where x is the current vector and bounds is the bounds array. The default is a uniform prior within ``bounds``. In that case, bounds have to be provided. proposal_distributions : iterable, optional A list of object instances of ProposalDistribution. The default is a single normal distribution with the default ``init_prop_Sigma``. args : Any, optional Arguments that will be communicated to all Callables. Attributes ---------- trace : dict Solution after run_mcmc is executed. """ def __init__(self, log_likelihood_function, bounds=None, prior_function=None, proposal_distributions="normal", args=None ): self.log_likelihood_function = log_likelihood_function assert bounds is not None or prior_function is not None, \ "Provide either bounds (for the default uniform prior) or a prior_function." if prior_function is None: self.prior_function = lambda theta, b, _: 0. if np.all( (theta >= b[:, 0]) & (theta <= b[:, 1])) else -np.inf else: self.prior_function = prior_function if proposal_distributions == "normal": assert bounds is not None, \ "bounds must be provided to initialize the default normal proposal distribution." domain_size = bounds[:, 1] - bounds[:, 0] std_diag = domain_size * 0.2 / np.sqrt(12) proposal_distributions = [ProposalDistribution(np.arange(len(bounds)), init_prop_Sigma=np.diag(std_diag**2))] self.proposal_distributions = proposal_distributions self.args = args self.bounds = bounds self.trace = None self.mcmc_info = {}
[docs] def run_mcmc(self, *, x0, n_updates=10000, info=False, break_condition=None, run_in_every_iteration=None): """ This function runs the mcmc. Parameters ---------- x0 : np.ndarray Starting point of the mcmc. n_updates: int, optional The log of the likelihood to be sampled. info : bool Whether to print information about the mcmc iterations. break_condition : callable or string or None A break condition that specified when the mcmc is terminated. If None, mcmc will run until ``n_updates`` is reached. If a callable is provided, it will get the mcmc object instance as input: def break(obj). The only allowed string is ``default`` and in that case the mcmc will be terminated if the mean of the position has not changed significantly in the last 200 iterations. run_in_every_iteration : callable, optional A callable that is executed in every iteration. Form: func(obj). Default no-op. Returns ------- trace information : dict Mean, medians, and variances of the last 1% are presented. All other returns consider the whole trace. The traces ``x`` are all the accepted positions in the MCMC. """ start_time = time.time() n_updates = max(n_updates, 2) assert isinstance(x0, np.ndarray) and np.ndim(x0) == 1, "x0 must be a 1-d np.ndarray" #break condition if break_condition is None: break_condition = lambda a: False elif break_condition == "default": break_condition = self._default_break_condition elif callable(break_condition): pass else: raise Exception("No valid input for break condition provided!") if run_in_every_iteration is None: run_in_every_iteration = lambda a: False self.trace = {"f(x)": [], "x": [], "time stamp": []} # Set up and initialize trace objects self.trace["x"].append(x0) # Initialize Metropolis x = x0.copy() likelihood = self.log_likelihood_function(x, self.args) if info: print("Starting likelihood. f(x)= ", likelihood) prior = self.prior_function(x, self.bounds, self.args) ######################################################### # Begin main loop for i in np.arange(1, n_updates): for obj in self.proposal_distributions: x, prior, likelihood, jt = self._jump(x, obj, prior, likelihood) obj.jump_trace.append(jt) obj.adapt(i, self) # Update the trace objects self.trace["x"].append(x) self.trace["f(x)"].append(likelihood) self.trace["time stamp"].append(time.time() - start_time) run_in_every_iteration(self) if info and (i % 10) == 0: print("Finished ", i, " out of ", n_updates, " iterations. f(x)= ", likelihood) if break_condition(self): break # Collect trace objects to return arg_max = np.argmax(self.trace["f(x)"]) dist_index = int(len(self.trace["x"]) - (len(self.trace["x"]) / 100)) self.mcmc_info = {"f(x)": self.trace["f(x)"], "max f(x)": self.trace["f(x)"][arg_max], "MAP": self.trace["f(x)"][arg_max], "max x": np.asarray(self.trace["x"])[arg_max], "time stamps": self.trace["time stamp"], "x": np.asarray(self.trace["x"]), "mean(x)": np.mean(np.asarray(self.trace["x"])[dist_index:], axis=0), "median(x)": np.median(np.asarray(self.trace["x"])[dist_index:], axis=0), "var(x)": np.var(np.asarray(self.trace["x"])[dist_index:], axis=0)} # End main loop return self.mcmc_info
@staticmethod def _default_break_condition(obj): loglik = np.asarray(obj.trace["f(x)"]) i = len(loglik) W = 100 tol = 1e-3 if i < 1000: return False stabilized = abs(loglik[-W:].mean() - loglik[-2 * W:-W].mean()) < tol if stabilized: return True else: return False ############################################################### def _jump(self, x_old, obj, prior_eval, likelihood): x_star = x_old.copy() assert callable(obj.proposal_dist), "proposal_dist must be callable" # get proposed x (x_star) x_star[obj.indices] = obj.proposal_dist(x_old[obj.indices].copy(), x_old, obj) # evaluate prior(x_star) prior_evaluation_x_star = self.prior_function(x_star, self.bounds, self.args) jump_trace = 0. # if prior(x_start) is not -inf, get likelihood if prior_evaluation_x_star != -np.inf: likelihood_star = self.log_likelihood_function(x_star, self.args) if np.isnan(likelihood_star): raise Exception("Likelihood evaluation = NaN in gpMCMC") expo = prior_evaluation_x_star + likelihood_star - prior_eval - likelihood if expo < 50: metr_ratio = np.exp(expo) else: metr_ratio = 1.1 if np.isnan(metr_ratio): metr_ratio = 0. if metr_ratio > np.random.uniform(0, 1, 1) or obj.auto_accept: x = x_star prior_eval = prior_evaluation_x_star likelihood = likelihood_star jump_trace = 1. else: x = x_old else: x = x_old return x, prior_eval, likelihood, jump_trace def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
###############################################################
[docs] class ProposalDistribution: def __init__(self, indices, proposal_dist="normal", init_prop_Sigma=None, adapt_callable=None, r_opt=.234, c_0=10, c_1=.8, K=10, auto_accept=False, adapt_cov=True, prop_args=None, ID=None): """ Class to define a proposal distribution. Parameters ---------- indices : iterable of int The indices of the parameters that should be drawn from this proposal distribution. proposal_dist : callable, optional A callable to calculate the proposal distribution evaluation. It is defined as ``def name(x, para, obj)``, where ``obj`` is a :py:class:`ProposalDistribution` instance. The function should return a new proposal for ``x``. ``para`` are all other parameters. Default is a normal distribution with the default ``init_prop_Sigma``. init_prop_Sigma : np.ndarray, optional If the proposal distribution is normal this is the covariance of the initial proposal distribution. It will be updated if adapt_callable = ``normal`` or a callable. While it is optional to provide it, it is highly recommended to do so. A warning will be printed in that case. A good rule of thumb is to orient yourself on the size of your domain. The default is the identity matrix. adapt_callable : Callable or None or string, optional A callable to adapt the distribution. The default is None which means the proposal distribution will not be adapted. Use ``normal`` (default) for the default adaption procedure for normal distributions. The callable should be defined as ``def adapt(index, mcmc_obj)`` and not return anything but update the :py:attr:`ProposalDistribution.prop_args` attribute. Note, any adapt function will have to be well thought through. Most adapt functions will not lead to a stationary final distributions. Use with caution. auto_accept : bool, optional Indicates whether to auto-accept the jump. prop_args : Any, optional Arguments that will be available as obj attribute in ``proposal_dist`` and ``adapt_callable``. """ self.indices = indices self.r_opt = r_opt self.c_0 = c_0 self.c_1 = c_1 self.K = K self.auto_accept = auto_accept self.adapt_cov = adapt_cov self.ID = ID dim = len(indices) self.jump_trace = [] if proposal_dist == "normal": self.proposal_dist = self.normal_proposal_dist elif callable(proposal_dist): self.proposal_dist = proposal_dist else: raise Exception("No proposal distribution specified!") if proposal_dist == "normal" and init_prop_Sigma is None: init_prop_Sigma = np.identity(dim) warnings.warn("You are using the normal proposal distribution for normal distributions\n \ but did not provide `init_prop_sigma`. This can lead to slow convergence") if callable(adapt_callable): self.adapt = adapt_callable elif adapt_callable == "normal" or proposal_dist == "normal": self.adapt = self._adapt else: if isinstance(adapt_callable, str): raise Exception("Invalid string provided for adapt callable.") self.adapt = self._no_adapt if prop_args is None: self.prop_args = {"prop_Sigma": init_prop_Sigma, "sigma_m": 2.4 ** 2 / dim} else: self.prop_args = prop_args if adapt_callable == "normal": self.prop_args["prop_Sigma"] = init_prop_Sigma self.prop_args["sigma_m"] = 2.4 ** 2 / dim #########################################################
[docs] def normal_proposal_dist(self, x, hps, obj): """ Default multivariate-normal proposal: draw from N(x, Σ) where Σ is ``obj.prop_args["prop_Sigma"]``. Parameters ---------- x : np.ndarray Current position in hyperparameter space, shape (N,). hps : np.ndarray Current hyperparameters (unused here; present for interface compatibility). obj : ProposalDistribution The enclosing proposal-distribution object, which carries ``prop_args``. Returns ------- proposal : np.ndarray Proposed hyperparameter vector, shape (N,). """ cov = obj.prop_args["prop_Sigma"] proposal_hps = np.random.multivariate_normal( mean=x, cov=cov, size=1).reshape(len(x)) return proposal_hps
def _adapt(self, end, mcmc_obj): K = self.K if (end % K) == 0: k = 3 c_0 = self.c_0 c_1 = self.c_1 r_opt = self.r_opt prop_Sigma = self.prop_args["prop_Sigma"] sigma_m = self.prop_args["sigma_m"] jump_trace = self.jump_trace trace = np.asarray(mcmc_obj.trace["x"]).T start = (end - K + 1) gamma2 = 1. / ((end / K) + k) ** c_1 gamma1 = c_0 * gamma2 r_hat = np.mean(jump_trace[start: end]) sigma_m = np.exp(np.log(sigma_m) + gamma1 * (r_hat - r_opt)) if self.adapt_cov: prop_Sigma = prop_Sigma + gamma2 * (np.cov(trace[self.indices, start: end]) - prop_Sigma) self.prop_args["prop_Sigma"] = prop_Sigma self.prop_args["sigma_m"] = sigma_m def _no_adapt(self, end, mcmc_obj): return def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)