#!/usr/bin/env python
"""
Functions which perform standard and dynamic nested sampling runs and generate
samples for use in evidence calculations and parameter estimation.
Nested sampling runs are stored in a format compatible with the nestcheck
package.
"""
import warnings
import copy
import numpy as np
import scipy.special
import nestcheck.ns_run_utils
import nestcheck.parallel_utils as pu
import nestcheck.io_utils as iou
import perfectns.maths_functions as mf
def generate_ns_run(settings, random_seed=None):
"""
Performs perfect nested sampling calculation and returns a nested sampling
run in the form of a dictionary.
    This function is a wrapper around generate_standard_run (which performs
    standard nested sampling) and generate_dynamic_run (which performs
    dynamic nested sampling); which of the two is called depends on the
    input settings.
Parameters
----------
settings: PerfectNSSettings object
    random_seed: None, bool or int, optional
        Set the numpy random seed. The default, None, draws a seed from the
        computer's internal state, which ensures reliable results when
        multiprocessing. Set to an integer for reproducibility, or to False
        to leave the seed unchanged.
Returns
-------
dict
Nested sampling run dictionary containing information about the run's
posterior samples and a record of the settings used. These are as
separate arrays giving values for points in order of increasing
likelihood.
Keys:
'settings': dict recording settings used.
'logl': 1d array of log likelihoods.
'r': 1d array of radial coordinates.
'logx': 1d array of logx coordinates.
            'theta': 2d array, each row giving the coordinates of one
                sample. The number of coordinates saved is controlled by
                settings.dims_to_sample.
'nlive_array': 1d array of the local number of live points at each
sample.
'thread_min_max': 2d array containing likelihoods at which each
nested sampling thread begins and ends.
            'thread_labels': 1d array listing the thread each sample
                belongs to.
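    Examples
    --------
    A minimal usage sketch (this section assumes PerfectNSSettings can be
    default-constructed from perfectns.settings):
    >>> import perfectns.settings
    >>> settings = perfectns.settings.PerfectNSSettings()
    >>> run = generate_ns_run(settings, random_seed=0)
    >>> run['logl'].shape == run['logx'].shape == run['nlive_array'].shape
    True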
"""
if random_seed is not False:
np.random.seed(random_seed)
if settings.dynamic_goal is None:
run = generate_standard_run(settings)
else:
run = generate_dynamic_run(settings)
run['random_seed'] = random_seed
return run
def get_run_data(settings, n_repeat, **kwargs):
"""
    Tests whether runs with the specified settings are already cached. If
    not, the runs are generated and saved.
Parameters
----------
settings: PerfectNSSettings object
n_repeat: int
Number of nested sampling runs to generate.
parallel: bool, optional
Should runs be generated in parallel?
max_workers: int or None, optional
Number of processes.
If max_workers is None then concurrent.futures.ProcessPoolExecutor
defaults to using the number of processors of the machine.
N.B. If max_workers=None and running on supercomputer clusters with
multiple nodes, this may default to the number of processors on a
single node and therefore there will be no speedup from multiple
nodes (must specify manually in this case).
load: bool, optional
Should previously saved runs be loaded? If False, new runs are
generated.
save: bool, optional
Should any new runs generated be saved?
cache_dir: str, optional
Directory for caching
    overwrite_existing: bool, optional
        If a file already exists but we generate new run data, should we
        overwrite the existing file when saving?
    check_loaded_settings: bool, optional
        If we load a cached file, should we check whether the loaded file's
        settings match the current settings (and generate fresh runs if they
        do not)?
random_seeds: list, optional
random_seed arguments for each call of generate_ns_run.
Returns
-------
run_list
list of n_repeat nested sampling runs.
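    Examples
    --------
    Illustrative sketch (assumes a default-constructed PerfectNSSettings;
    load and save are disabled so the cache is not touched):
    >>> import perfectns.settings
    >>> settings = perfectns.settings.PerfectNSSettings()
    >>> run_list = get_run_data(settings, 2, load=False, save=False,
    ...                         parallel=False)
    >>> len(run_list)
    2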
"""
parallel = kwargs.pop('parallel', True)
max_workers = kwargs.pop('max_workers', None)
load = kwargs.pop('load', True)
save = kwargs.pop('save', True)
cache_dir = kwargs.pop('cache_dir', 'cache')
overwrite_existing = kwargs.pop('overwrite_existing', True)
check_loaded_settings = kwargs.pop('check_loaded_settings', True)
random_seeds = kwargs.pop('random_seeds', [None] * n_repeat)
assert len(random_seeds) == n_repeat
if kwargs:
raise TypeError('Unexpected **kwargs: {0}'.format(kwargs))
save_name = cache_dir + '/' + settings.save_name()
save_name += '_' + str(n_repeat) + 'reps'
if load:
try:
data = iou.pickle_load(save_name)
if check_loaded_settings:
                # Assume all runs in the loaded list have the same settings,
                # in which case we only need to check the first one.
loaded = copy.deepcopy(data[0]['settings'])
current = copy.deepcopy(settings.get_settings_dict())
# If runs are standard nested sampling there is no need to
# check settings which only affect dynamic ns match
if loaded['dynamic_goal'] is None and (current['dynamic_goal']
is None):
for key in ['dynamic_goal', 'n_samples_max', 'ninit',
'nbatch', 'dynamic_fraction',
'tuned_dynamic_p']:
del loaded[key]
del current[key]
if loaded != current:
# remove shared keys and only print differences
rm = [k for k in set(loaded.keys()) & set(current.keys())
if loaded[k] == current[k]]
loaded_diff = {k: v for k, v in loaded.items() if k
not in rm}
current_diff = {k: v for k, v in current.items() if k
not in rm}
                    msg = (('Loaded settings != current settings. Differences '
                            'are: {0} != {1}. Generating new runs instead.')
                           .format(loaded_diff, current_diff))
warnings.warn(msg, UserWarning)
del data
load = False
except (OSError, EOFError) as exception:
            print(('Loading {0} failed due to {1}'
                   .format(save_name, type(exception).__name__)
                   + ' - generating new runs instead.'))
load = False
overwrite_existing = True
if not load:
        # Must check the cache is up to date before parallel_apply, or each
        # process will have to update the cache separately
if type(settings.prior).__name__ == 'GaussianCached':
settings.prior.check_cache(settings.n_dim)
data = pu.parallel_apply(generate_ns_run, random_seeds,
func_pre_args=(settings,),
max_workers=max_workers,
parallel=parallel)
if save:
iou.pickle_save(data, save_name,
overwrite_existing=overwrite_existing)
return data
def generate_standard_run(settings, is_dynamic_initial_run=False):
"""
Performs standard nested sampling using the likelihood and prior specified
in settings.
The run terminates when the estimated posterior mass contained in the live
points is less than settings.termination_fraction. The evidence in the
remaining live points is estimated as
Z_{live} = average likelihood of live points * prior volume remaining
Parameters
----------
settings: PerfectNSSettings object
is_dynamic_initial_run: bool, optional
Set to True if this is the initial exploratory run in dynamic nested
sampling.
Returns
-------
run: dict
Nested sampling run dictionary containing information about the run's
posterior samples and a record of the settings used. See docstring for
generate_ns_run for more details.
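    Examples
    --------
    Illustrative sketch (assumes a default-constructed PerfectNSSettings).
    Samples are returned in order of increasing likelihood, and the run
    starts with nlive_const live points:
    >>> import perfectns.settings
    >>> settings = perfectns.settings.PerfectNSSettings()
    >>> run = generate_standard_run(settings)
    >>> bool(np.all(np.diff(run['logl']) > 0))
    True
    >>> int(run['nlive_array'][0]) == settings.nlive_const
    True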
"""
if is_dynamic_initial_run:
nlive_const = settings.ninit
else:
nlive_const = settings.nlive_const
# Sample live points as a 2-dimensional array with columns:
# [loglikelihood, radial coordinate, logx coordinate, thread label]
live_points = np.zeros((nlive_const, 4))
    # Thread labels are 0 to nlive_const - 1
live_points[:, 3] = np.arange(nlive_const)
live_points[:, 2] = np.log(np.random.random(live_points.shape[0]))
live_points[:, 1] = settings.r_given_logx(live_points[:, 2])
live_points[:, 0] = settings.logl_given_r(live_points[:, 1])
# termination condition variables
logx_i = 0.0
logz_dead = -np.inf
logz_live = (scipy.special.logsumexp(live_points[:, 0]) + logx_i -
np.log(nlive_const))
# Calculate factor for trapezium rule of geometric series
shrinkage = np.exp(-1.0 / nlive_const)
logtrapz = np.log(0.5 * ((shrinkage ** -1) - shrinkage))
# start the array of dead points
dead_points_list = []
while logz_live - np.log(settings.termination_fraction) > logz_dead:
# add to dead points
        ind = np.argmin(live_points[:, 0])
dead_points_list.append(copy.deepcopy(live_points[ind, :]))
# update dead evidence estimates
logx_i += -1.0 / nlive_const
logz_dead = scipy.special.logsumexp((logz_dead, live_points[ind, 0] +
logtrapz + logx_i))
# add new point
live_points[ind, 2] += np.log(np.random.random())
live_points[ind, 1] = settings.r_given_logx(live_points[ind, 2])
live_points[ind, 0] = settings.logl_given_r(live_points[ind, 1])
logz_live = (scipy.special.logsumexp(live_points[:, 0]) + logx_i -
np.log(nlive_const))
points = np.array(dead_points_list)
# add remaining live points (sorted by increasing likelihood)
points = np.vstack((points, live_points[np.argsort(live_points[:, 0])]))
# Create a dictionary representing the nested sampling run
run = {'settings': settings.get_settings_dict(),
'logl': points[:, 0],
'r': points[:, 1],
'logx': points[:, 2],
'thread_labels': points[:, 3].astype(int)}
# add array of parameter values sampled from the hyperspheres corresponding
# to the radial coordinate of each point.
run['theta'] = mf.sample_nsphere_shells(run['r'], settings.n_dim,
settings.dims_to_sample)
# Add an array of the local number of live points - this equals nlive_const
# until the run terminates, at which point it reduces by 1 as each thread
# ends.
run['nlive_array'] = np.zeros(run['logl'].shape[0]) + nlive_const
for i in range(1, nlive_const):
run['nlive_array'][-i] = i
# Get array of data on threads' beginnings and ends. Each starts by
# sampling the whole prior and ends on one of the final live points.
run['thread_min_max'] = np.zeros((nlive_const, 2))
run['thread_min_max'][:, 0] = -np.inf
run['thread_min_max'][:, 1] = live_points[:, 0]
return run
# Make dynamic ns run:
# --------------------
def generate_dynamic_run(settings):
"""
Generate a dynamic nested sampling run.
For details of the dynamic nested sampling algorithm, see 'Dynamic nested
sampling: an improved algorithm for nested sampling parameter estimation
and evidence calculation' (Higson et al., 2019).
    The run terminates when the number of samples reaches the limit
    settings.n_samples_max. If this is not set, the function estimates the
    number of samples that a standard nested sampling run with
    settings.nlive_const live points would use, based on the number of
    samples in the initial exploratory run.
Parameters
----------
settings: PerfectNSSettings object
settings.dynamic_goal controls whether the algorithm aims to increase
parameter estimation accuracy (dynamic_goal=1), evidence accuracy
(dynamic_goal=0) or places some weight on both.
Returns
-------
dict
Nested sampling run dictionary containing information about the run's
posterior samples and a record of the settings used. See docstring for
generate_ns_run for more details.
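    Examples
    --------
    Illustrative sketch (assumes a default-constructed PerfectNSSettings;
    setting dynamic_goal=1 targets parameter estimation accuracy):
    >>> import perfectns.settings
    >>> settings = perfectns.settings.PerfectNSSettings()
    >>> settings.dynamic_goal = 1
    >>> run = generate_dynamic_run(settings)
    >>> bool(run['nlive_array'].min() >= 1)
    True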
"""
assert 1 >= settings.dynamic_goal >= 0, 'dynamic_goal = ' + \
str(settings.dynamic_goal) + ' should be between 0 and 1'
# Step 1: initial exploratory standard ns run with ninit live points
# ------------------------------------------------------------------
standard_run = generate_standard_run(settings, is_dynamic_initial_run=True)
# create samples array with columns:
# [logl, r, logx, thread label, change in nlive, params]
samples = samples_array_given_run(standard_run)
thread_min_max = standard_run['thread_min_max']
n_samples = samples.shape[0]
n_samples_max = copy.deepcopy(settings.n_samples_max)
if n_samples_max is None:
        # Estimate the number of samples (likelihood calls) available from
        # the budget of a standard run with nlive_const live points
n_samples_max = n_samples * (settings.nlive_const / settings.ninit)
# Step 2: add samples wherever they are most useful
# -------------------------------------------------
while n_samples < n_samples_max:
importance = point_importance(samples, thread_min_max, settings)
logl_min_max, logx_min_max = min_max_importance(importance,
samples,
settings)
for _ in range(settings.nbatch):
# make new thread
thread_label = thread_min_max.shape[0]
thread = generate_single_thread(settings,
logx_min_max[1],
thread_label,
logx_start=logx_min_max[0],
keep_final_point=True)
# update run
if logl_min_max[0] != -np.inf:
start_ind = np.where(samples[:, 0] == logl_min_max[0])[0]
# check there is exactly one point with the likelihood at which
# the new thread starts, and note that nlive increases by 1
assert start_ind.shape == (1,)
samples[start_ind, 4] += 1
samples = np.vstack((samples, thread))
lmm = np.asarray([logl_min_max[0], thread[-1, 0]])
thread_min_max = np.vstack((thread_min_max, lmm))
# sort array and update n_samples in preparation for the next iteration
samples = samples[np.argsort(samples[:, 0])]
n_samples = samples.shape[0]
# To compute nlive from the changes in nlive at each step, first find nlive
# for the first point (= the number of threads which sample from the entire
# prior)
run = dict_given_samples_array(samples, thread_min_max)
run['settings'] = settings.get_settings_dict()
return run
# Dynamic NS helper functions
# ---------------------------
def generate_thread_logx(logx_end, logx_start=0, keep_final_point=True):
"""
Generate logx co-ordinates of a new nested sampling thread (single live
point run).
Parameters
----------
logx_end: float
Logx value at which run terminates.
logx_start: float, optional.
Logx value at which run starts. 0 corresponds to sampling from the
whole prior.
keep_final_point: bool, optional
If False, the final point with logx less than logx_end is removed.
Returns
-------
logx_list: list of floats
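    Examples
    --------
    Each step multiplies the prior volume by an independent Uniform(0, 1)
    draw, so successive logx values decrease by Exponential(1) increments
    and, with keep_final_point=True, the final value lies below logx_end:
    >>> np.random.seed(0)
    >>> logx_list = generate_thread_logx(-5.0)
    >>> bool(all(np.diff(logx_list) < 0))
    True
    >>> logx_list[-1] < -5.0
    True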
"""
logx_list = [logx_start + np.log(np.random.random())]
while logx_list[-1] > logx_end:
logx_list.append(logx_list[-1] + np.log(np.random.random()))
if not keep_final_point:
del logx_list[-1] # remove point which violates termination condition
return logx_list
def generate_single_thread(settings, logx_end, thread_label, logx_start=0,
keep_final_point=True):
"""
Generates a samples array for a thread (single live point run) between
logx_start and logx_end.
Settings argument specifies how the calculation is done.
Parameters
----------
settings: PerfectNSSettings object
logx_end: float
thread_label: int
Index labelling the thread.
logx_start: float, optional
    keep_final_point: bool, optional
        See generate_thread_logx docstring.
    Returns
    -------
    thread: None or 2d numpy array
        Thread samples array with columns
        [logl, r, logx, thread label, change in nlive, (thetas)].
        None is returned if the thread contains no samples (only possible
        when keep_final_point is False).
"""
assert logx_start > logx_end, 'generate_single_thread: logx_start=' + \
str(logx_start) + ' <= logx_end=' + str(logx_end)
logx_list = generate_thread_logx(logx_end, logx_start=logx_start,
keep_final_point=keep_final_point)
if not logx_list: # PEP8 method for testing if sequence is empty
return None
else:
lrxtn = np.zeros((len(logx_list), 5))
lrxtn[:, 3] = thread_label
lrxtn[:, 2] = np.asarray(logx_list)
lrxtn[:, 1] = settings.r_given_logx(lrxtn[:, 2])
lrxtn[:, 0] = settings.logl_given_r(lrxtn[:, 1])
        # Check there are no nans from mapping logx to r
assert np.all(~np.isnan(lrxtn[:, 1])), (
'nans in r_given_logx(logx)=' + str(lrxtn[:, 1]) +
'\nrows with nans are ' +
str(lrxtn[np.where(np.isnan(lrxtn[:, 1]))[0], :]) +
'\nperhaps the prior is numerically unstable?')
# set change in nlive to -1 where thread ends (zero elsewhere)
lrxtn[-1, 4] = -1
theta = mf.sample_nsphere_shells(lrxtn[:, 1],
settings.n_dim,
settings.dims_to_sample)
return np.hstack([lrxtn, theta])
def point_importance(samples, thread_min_max, settings, simulate=False):
"""
Calculate the relative importance of each point for use in the dynamic
nested sampling algorithm.
For more details see 'Dynamic nested sampling: an improved algorithm for
nested sampling parameter estimation and evidence calculation' (Higson et
al., 2019).
Parameters
----------
samples: 2d numpy array
        See dict_given_samples_array docstring for details of columns.
thread_min_max: 2d numpy array
First column is starting logl of each thread and second column is
ending logl.
settings: PerfectNSSettings object
simulate: bool, optional
Passed to nestcheck.ns_run_utils.get_logw.
Returns
-------
importance: 1d numpy array
Relative point importances of the rows of the input samples array.
Normalised so the biggest value in the array is equal to 1.
"""
run_dict = dict_given_samples_array(samples, thread_min_max)
logw = nestcheck.ns_run_utils.get_logw(run_dict, simulate=simulate)
w_relative = np.exp(logw - logw.max())
if settings.dynamic_goal == 0:
return z_importance(w_relative, run_dict['nlive_array'])
elif settings.dynamic_goal == 1:
return p_importance(run_dict['theta'], w_relative,
tuned_dynamic_p=settings.tuned_dynamic_p)
else:
imp_z = z_importance(w_relative, run_dict['nlive_array'])
imp_p = p_importance(run_dict['theta'], w_relative,
tuned_dynamic_p=settings.tuned_dynamic_p)
importance = (imp_z / np.sum(imp_z)) * (1.0 - settings.dynamic_goal)
importance += (imp_p / np.sum(imp_p)) * settings.dynamic_goal
return importance / importance.max()
def z_importance(w_relative, nlive):
"""
Calculate the relative importance of each point for evidence calculation.
For more details see 'Dynamic nested sampling: an improved algorithm for
nested sampling parameter estimation and evidence calculation'
(Higson et al., 2019).
Parameters
----------
w_relative: 1d numpy array
Relative point weights.
    nlive: 1d numpy array
Number of live points.
Returns
-------
importance: 1d numpy array
Relative point importances.
Normalised so the biggest value in the array is equal to 1.
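    Examples
    --------
    A small worked example: the importance is largest for early points
    (where much of the evidence integral remains) and is zero at the final
    point:
    >>> w_rel = np.array([0.1, 0.5, 1.0, 0.5, 0.1])
    >>> nlive = np.full(5, 10.0)
    >>> imp = z_importance(w_rel, nlive)
    >>> float(imp[0]), float(imp[-1])
    (1.0, 0.0)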
"""
importance = np.cumsum(w_relative)
importance = importance.max() - importance
importance *= 1.0 / nlive
return importance / importance.max()
def p_importance(theta, w_relative, tuned_dynamic_p=False,
tuning_type='theta1'):
"""
Calculate the relative importance of each point for parameter estimation.
For more details see 'Dynamic nested sampling: an improved algorithm for
nested sampling parameter estimation and evidence calculation' (Higson et
al., 2019).
Parameters
----------
theta: 2d numpy array
Each row gives parameter values of samples.
w_relative: 1d numpy array
Relative point weights.
tuned_dynamic_p: bool, optional
Whether or not to tune for a specific parameter estimation problem.
See the dynamic nested sampling paper for more details.
tuning_type: str, optional
Which parameter estimation problem to tune for. Only used if
tuned_dynamic_p is True. So far only set up to tune for the mean of the
first parameter.
Returns
-------
importance: 1d numpy array
Relative point importances.
Normalised so the biggest value in the array is equal to 1.
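    Examples
    --------
    Without tuning, importance simply tracks the relative posterior weights
    (theta is unused in this case):
    >>> theta = np.zeros((4, 2))
    >>> w_rel = np.array([0.2, 1.0, 0.5, 0.1])
    >>> imp = p_importance(theta, w_rel)
    >>> bool(np.array_equal(imp, w_rel / w_rel.max()))
    True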
"""
if tuned_dynamic_p is False:
return w_relative / w_relative.max()
else:
assert tuning_type == 'theta1', 'so far only set up for theta1'
if tuning_type == 'theta1':
ftheta = theta[:, 0]
# calculate importance in proportion to difference between f values and
# the calculation mean.
fabs = np.absolute(ftheta - (np.sum(ftheta * w_relative) /
np.sum(w_relative)))
importance = fabs * w_relative
return importance / importance.max()
def min_max_importance(importance, samples, settings):
"""
Find the logl and logx values at which to start and end additional dynamic
nested sampling threads.
Parameters
----------
importance: 1d numpy array
Relative importances of samples.
samples: 2d numpy array
        See dict_given_samples_array docstring for details of columns.
settings: PerfectNSSettings object
Returns
-------
list of two floats
Contains logl_min and logl_max defining the start and end of the region
from which new points should be sampled.
list of two floats
Logx values corresponding to logl_min and logl_max.
"""
    assert 0. < settings.dynamic_fraction < 1., \
        'min_max_importance: settings.dynamic_fraction = ' + \
        str(settings.dynamic_fraction) + ' must be in (0, 1)'
# where to start the additional threads:
high_importance_inds = np.where(importance > settings.dynamic_fraction)[0]
if high_importance_inds[0] == 0: # start from sampling the whole prior
logl_min = -np.inf
logx_min = 0
else:
logl_min = samples[:, 0][high_importance_inds[0] - 1]
# Use lookup to avoid recalculating the logx values (otherwise there
# may be float comparison errors).
ind = np.where(samples[:, 0] == logl_min)[0]
assert ind.shape == (1,), \
'Should be one unique match for logl=logl_min=' + str(logl_min) + \
'. Instead we have matches at indexes ' + str(ind) + \
' of the samples array (shape ' + str(samples.shape) + ')'
logx_min = samples[ind[0], 2]
# where to end the additional threads:
if high_importance_inds[-1] == samples[:, 0].shape[0] - 1:
logl_max = samples[-1, 0]
logx_max = samples[-1, 2]
else:
logl_max = samples[:, 0][(high_importance_inds[-1] + 1)]
# Use lookup to avoid recalculating the logx values (otherwise there
# may be float comparison errors).
ind = np.where(samples[:, 0] == logl_max)[0]
assert ind.shape == (1,), \
'Should be one unique match for logl=logl_max=' + str(logl_max) + \
'.\n Instead we have matches at indexes ' + str(ind) + \
' of the samples array (shape ' + str(samples.shape) + ')'
logx_max = samples[ind[0], 2]
return [logl_min, logl_max], [logx_min, logx_max]
def samples_array_given_run(ns_run):
"""
Converts information on samples in a nested sampling run dictionary into a
numpy array representation. This allows fast addition of more samples and
recalculation of nlive.
Parameters
----------
ns_run: dict
Nested sampling run dictionary.
        Contains keys: 'logl', 'r', 'logx', 'thread_labels', 'nlive_array',
        'theta'.
Returns
-------
samples: numpy array
Numpy array containing columns
[logl, r, logx, thread label, change in nlive at sample, (thetas)]
with each row representing a single sample.
"""
samples = np.zeros((ns_run['logl'].shape[0], 5 + ns_run['theta'].shape[1]))
samples[:, 0] = ns_run['logl']
samples[:, 1] = ns_run['r']
samples[:, 2] = ns_run['logx']
samples[:, 3] = ns_run['thread_labels'].astype(int)
# Calculate 'change in nlive' after each step
samples[:-1, 4] = np.diff(ns_run['nlive_array'])
samples[-1, 4] = -1 # nlive drops to zero after final point
samples[:, 5:] = ns_run['theta']
return samples
def dict_given_samples_array(samples, thread_min_max):
"""
Converts an array of information about samples back into a dictionary.
Parameters
----------
samples: numpy array
Numpy array containing columns
[logl, r, logx, thread label, change in nlive at sample, (thetas)]
with each row representing a single sample.
    thread_min_max: numpy array
        2d array with a row for each thread containing the likelihoods at
        which it begins and ends; needed to calculate nlive_array.
Returns
-------
ns_run: dict
Nested sampling run dictionary corresponding to the samples array.
        Contains keys: 'logl', 'r', 'logx', 'thread_labels', 'nlive_array',
        'thread_min_max', 'theta'.
        N.B. this does not contain a record of the run's settings.
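    Examples
    --------
    Round-trip sketch (assumes a default-constructed PerfectNSSettings):
    converting a run to a samples array and back recovers nlive_array from
    the recorded changes in nlive:
    >>> import perfectns.settings
    >>> settings = perfectns.settings.PerfectNSSettings()
    >>> run = generate_standard_run(settings)
    >>> samples = samples_array_given_run(run)
    >>> run2 = dict_given_samples_array(samples, run['thread_min_max'])
    >>> bool(np.array_equal(run['nlive_array'], run2['nlive_array']))
    True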
"""
nlive_0 = (thread_min_max[:, 0] == -np.inf).sum()
nlive_array = np.zeros(samples.shape[0]) + nlive_0
nlive_array[1:] += np.cumsum(samples[:-1, 4])
assert nlive_array.min() > 0, (
'nlive contains 0s or negative values!' +
'\nnlive_array = ' + str(nlive_array) +
'\nfinal row of samples arr = ' + str(samples[-4:, :]) +
'\nthread_min_max = ' + str(thread_min_max))
assert nlive_array[-1] == 1, (
'final point in nlive_array != 1!' +
'\nnlive_array = ' + str(nlive_array) +
'\nfinal row of samples arr = ' + str(samples[-4:, :]) +
'\nthread_min_max = ' + str(thread_min_max))
ns_run = {'logl': samples[:, 0],
'r': samples[:, 1],
'logx': samples[:, 2],
'thread_labels': samples[:, 3].astype(int),
'nlive_array': nlive_array,
'thread_min_max': thread_min_max,
'theta': samples[:, 5:]}
return ns_run