# Copyright (C) 2016 Christopher M. Biwer
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# self.option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""This modules defines functions for reading and writing samples that the
inference samplers generate.
"""
import os
import sys
import logging
import numpy
import h5py
from pycbc import DYN_RANGE_FAC
from pycbc.io import FieldArray
from pycbc.types import FrequencySeries
from pycbc.waveform import parameters as wfparams
from .. import sampler as gwin_sampler
class _PosteriorOnlyParser(object):
"""Provides interface for reading/writing samples from/to an InferenceFile
that contains flattened posterior samples.
"""
@staticmethod
def _read_fields(fp, fields_group, fields, array_class,
thin_start=None, thin_interval=None, thin_end=None,
iteration=None):
"""Reads fields from the given file.
"""
if iteration is not None:
get_index = iteration
else:
get_index = fp.get_slice(thin_start=thin_start, thin_end=thin_end,
thin_interval=thin_interval)
# load
arrays = {}
group = fields_group + '/{}'
arrays = {field: fp[group.format(field)][get_index]
for field in fields}
return array_class.from_kwargs(**arrays)
@classmethod
def read_samples(cls, fp, parameters, samples_group=None,
thin_start=0, thin_end=None, thin_interval=1,
iteration=None, array_class=None):
"""Reads posterior samples from a posterior-only file.
"""
# get the group to load from
if samples_group is None:
samples_group = fp.samples_group
# get the type of array class to use
if array_class is None:
array_class = FieldArray
# get the names of fields needed for the given parameters
possible_fields = fp[samples_group].keys()
loadfields = array_class.parse_parameters(parameters, possible_fields)
return cls._read_fields(fp, samples_group, loadfields, array_class,
thin_start=thin_start,
thin_interval=thin_interval, thin_end=thin_end,
iteration=iteration)
@staticmethod
def write_samples_group(fp, samples_group, fields, samples):
"""Writes the given samples to the given samples group.
"""
for field in samples.fieldnames:
grp = '{}/{}'.format(samples_group, field)
fp[grp] = samples[field]
@classmethod
def n_independent_samples(cls, fp):
"""Returns the number of independent samples stored in the file.
"""
return cls.read_samples(fp, fp.variable_params[0]).size
[docs]class InferenceFile(h5py.File):
""" A subclass of the h5py.File object that has extra functions for
handling reading and writing the samples from the samplers.
Parameters
-----------
path : str
The path to the HDF file.
mode : {None, str}
The mode to open the file, eg. "w" for write and "r" for read.
"""
name = "hdf"
samples_group = 'samples'
stats_group = 'model_stats'
sampler_group = 'sampler_states'
def __init__(self, path, mode=None, **kwargs):
super(InferenceFile, self).__init__(path, mode, **kwargs)
@property
def posterior_only(self):
"""Whether the file only contains flattened posterior samples.
"""
try:
return self.attrs['posterior_only']
except KeyError:
return False
@property
def sampler_name(self):
"""Returns the name of the sampler that was used."""
return self.attrs["sampler"]
@property
def sampler_class(self):
"""Returns the sampler class that was used."""
try:
sampler = self.sampler_name
except KeyError:
return None
return gwin_sampler.samplers[sampler]
@property
def samples_parser(self):
"""Returns the class to use to read/write samples from/to the file."""
if self.posterior_only:
return _PosteriorOnlyParser
else:
return self.sampler_class
@property
def model_name(self):
"""Returns the name of the model that was used."""
return self.attrs["model"]
@property
def variable_params(self):
"""Returns list of variable_params.
Returns
-------
variable_params : {list, str}
List of str that contain variable_params keys.
"""
return self.attrs["variable_params"]
@property
def static_params(self):
"""Returns a dictionary of the static_params. The keys are the argument
names, values are the value they were set to.
"""
return {arg: self.attrs[arg] for arg in self.attrs["static_params"]}
@property
def sampling_params(self):
"""Returns the parameters that were used to sample.
Returns
-------
sampling_params : {list, str}
List of the sampling params.
"""
return self.attrs["sampling_params"]
@property
def lognl(self):
"""Returns the log noise likelihood."""
return self.attrs["lognl"]
@property
def niterations(self):
"""Returns number of iterations performed.
Returns
-------
niterations : int
Number of iterations performed.
"""
return self.attrs["niterations"]
@property
def n_independent_samples(self):
"""Returns the number of independent samples stored in the file.
"""
return self.samples_parser.n_independent_samples(self)
@property
def burn_in_iterations(self):
"""Returns number of iterations in the burn in.
"""
return self.attrs["burn_in_iterations"]
@property
def is_burned_in(self):
"""Returns whether or not the sampler is burned in.
"""
return self.attrs["is_burned_in"]
@property
def nwalkers(self):
"""Returns number of walkers used.
Returns
-------
nwalkesr : int
Number of walkers used.
"""
return self.attrs["nwalkers"]
@property
def ntemps(self):
"""Returns number of temperatures used."""
return self.attrs["ntemps"]
@property
def acl(self):
""" Returns the saved autocorelation length (ACL).
Returns
-------
acl : {int, float}
The ACL.
"""
return self.attrs["acl"]
@property
def cmd(self):
"""Returns the (last) saved command line.
If the file was created from a run that resumed from a checkpoint, only
the last command line used is returned.
Returns
-------
cmd : string
The command line that created this InferenceFile.
"""
cmd = self.attrs["cmd"]
if isinstance(cmd, numpy.ndarray):
cmd = cmd[-1]
return cmd
@property
def resume_points(self):
"""The iterations at which a run was resumed from checkpoint.
Returns
-------
resume_points : array or None
An array of integers giving the points at which the run resumed.
Raises
------
KeyError
If the run never resumed from a checkpoint.
"""
return self.attrs['resume_points']
@property
def log_evidence(self):
"""Returns the log of the evidence and its error, if they exist in the
file. Raises a KeyError otherwise.
"""
return self.attrs["log_evidence"], self.attrs["dlog_evidence"]
[docs] def read_samples(self, parameters, samples_group=None, **kwargs):
"""Reads samples from the file.
Parameters
-----------
parameters : (list of) strings
The parameter(s) to retrieve. A parameter can be the name of any
field in `samples_group`, a virtual field or method of
`FieldArray` (as long as the file contains the necessary fields
to derive the virtual field or method), and/or a function of
these.
samples_group : str
Group in HDF InferenceFile that parameters belong to.
**kwargs :
The rest of the keyword args are passed to the sampler's
`read_samples` method.
Returns
-------
FieldArray
Samples for the given parameters, as an instance of a
FieldArray.
"""
# get the appropriate sampler class
samples_group = samples_group if samples_group else self.samples_group
return self.samples_parser.read_samples(self, parameters,
samples_group=samples_group,
**kwargs)
[docs] def read_model_stats(self, **kwargs):
"""Reads model stats from self.
Parameters
-----------
**kwargs :
The keyword args are passed to the sampler's
``read_model_stats`` method.
Returns
-------
stats : {FieldArray, None}
Likelihood stats in the file, as a FieldArray. The fields of the
array are the names of the stats that are in the ``model_stats``
group.
"""
parameters = self[self.stats_group].keys()
return self.read_samples(parameters, samples_group=self.stats_group,
**kwargs)
[docs] def read_acceptance_fraction(self, **kwargs):
"""Returns the acceptance fraction that was written to the file.
Parameters
----------
**kwargs :
All keyword arguments are passed to the sampler's
`read_acceptance_fraction` function.
Returns
-------
numpy.array
The acceptance fraction.
"""
return self.sampler_class.read_acceptance_fraction(self, **kwargs)
[docs] def read_acls(self):
"""Returns all of the individual chains' acls. See the `read_acls`
function of this file's sampler for more details.
"""
return self.sampler_class.read_acls(self)
[docs] def read_label(self, parameter, error_on_none=False):
"""Returns the label for the parameter.
Parameters
-----------
parameter : str
Name of parameter to get a label for. Will first try to retrieve
a label from this file's "label" attributes. If the parameter
is not found there, will look for a label from
pycbc.waveform.parameters.
error_on_none : {False, bool}
If True, will raise a ValueError if a label cannot be found, or if
the label is None. Otherwise, the parameter will just be returned
if no label can be found.
Returns
-------
label : str
A formatted string for the name of the paramter.
"""
# get label
try:
label = self[parameter].attrs["label"]
except KeyError:
# try looking in pycbc.waveform.parameters
try:
label = getattr(wfparams, parameter).label
except AttributeError:
label = None
if label is None:
if error_on_none:
raise ValueError("Cannot find a label for paramter %s" % (
parameter))
else:
return parameter
return label
[docs] def read_random_state(self, group=None):
""" Reads the state of the random number generator from the file.
Parameters
----------
group : str
Name of group to read random state from.
Returns
-------
tuple
A tuple with 5 elements that can be passed to numpy.set_state.
"""
group = self.sampler_group if group is None else group
dataset_name = "/".join([group, "random_state"])
arr = self[dataset_name][:]
s = self[dataset_name].attrs["s"]
pos = self[dataset_name].attrs["pos"]
has_gauss = self[dataset_name].attrs["has_gauss"]
cached_gauss = self[dataset_name].attrs["cached_gauss"]
return s, arr, pos, has_gauss, cached_gauss
[docs] def write_strain(self, strain_dict, group=None):
"""Writes strain for each IFO to file.
Parameters
-----------
strain : {dict, FrequencySeries}
A dict of FrequencySeries where the key is the IFO.
group : {None, str}
The group to write the strain to. If None, will write to the top
level.
"""
subgroup = "{ifo}/strain"
if group is None:
group = subgroup
else:
group = '/'.join([group, subgroup])
for ifo, strain in strain_dict.items():
self[group.format(ifo=ifo)] = strain
self[group.format(ifo=ifo)].attrs['delta_t'] = strain.delta_t
self[group.format(ifo=ifo)].attrs['start_time'] = \
float(strain.start_time)
[docs] def write_stilde(self, stilde_dict, group=None):
"""Writes stilde for each IFO to file.
Parameters
-----------
stilde : {dict, FrequencySeries}
A dict of FrequencySeries where the key is the IFO.
group : {None, str}
The group to write the strain to. If None, will write to the top
level.
"""
subgroup = "{ifo}/stilde"
if group is None:
group = subgroup
else:
group = '/'.join([group, subgroup])
for ifo, stilde in stilde_dict.items():
self[group.format(ifo=ifo)] = stilde
self[group.format(ifo=ifo)].attrs['delta_f'] = stilde.delta_f
self[group.format(ifo=ifo)].attrs['epoch'] = float(stilde.epoch)
[docs] def write_psd(self, psds, low_frequency_cutoff, group=None):
"""Writes PSD for each IFO to file.
Parameters
-----------
psds : {dict, FrequencySeries}
A dict of FrequencySeries where the key is the IFO.
low_frequency_cutoff : {dict, float}
A dict of the low-frequency cutoff where the key is the IFO. The
minimum value will be stored as an attr in the File.
group : {None, str}
The group to write the strain to. If None, will write to the top
level.
"""
subgroup = "{ifo}/psds/0"
if group is None:
group = subgroup
else:
group = '/'.join([group, subgroup])
self.attrs["low_frequency_cutoff"] = min(low_frequency_cutoff.values())
for ifo in psds:
self[group.format(ifo=ifo)] = psds[ifo]
self[group.format(ifo=ifo)].attrs['delta_f'] = psds[ifo].delta_f
[docs] def write_data(self, strain_dict=None, stilde_dict=None,
psd_dict=None, low_frequency_cutoff_dict=None,
group=None):
"""Writes the strain/stilde/psd.
Parameters
----------
strain_dict : {None, dict}
A dictionary of strains. If None, no strain will be written.
stilde_dict : {None, dict}
A dictionary of stilde. If None, no stilde will be written.
psd_dict : {None, dict}
A dictionary of psds. If None, no psds will be written.
low_freuency_cutoff_dict : {None, dict}
A dictionary of low frequency cutoffs used for each detector in
`psd_dict`; must be provided if `psd_dict` is not None.
group : {None, str}
The group to write the strain to. If None, will write to the top
level.
"""
# save PSD
if psd_dict is not None:
if low_frequency_cutoff_dict is None:
raise ValueError("must provide low_frequency_cutoff_dict if "
"saving psds to output")
# apply dynamic range factor for saving PSDs since
# plotting code expects it
psd_dyn_dict = {}
for key, val in psd_dict.iteritems():
psd_dyn_dict[key] = FrequencySeries(val*DYN_RANGE_FAC**2,
delta_f=val.delta_f)
self.write_psd(psds=psd_dyn_dict,
low_frequency_cutoff=low_frequency_cutoff_dict,
group=group)
# save stilde
if stilde_dict is not None:
self.write_stilde(stilde_dict, group=group)
# save strain if desired
if strain_dict is not None:
self.write_strain(strain_dict, group=group)
[docs] def write_injections(self, injection_file, ifo):
""" Writes injection parameters for an IFO to file.
Parameters
----------
injection_file : str
Path to HDF injection file.
ifo : str
IFO name.
"""
subgroup = "{ifo}/injections"
self.create_group(subgroup.format(ifo=ifo))
try:
with h5py.File(injection_file, "r") as fp:
for param in fp.keys():
self[subgroup.format(ifo=ifo)][param] = fp[param][:]
for key in fp.attrs.keys():
self[subgroup.format(ifo=ifo)].attrs[key] = fp.attrs[key]
except IOError:
logging.warn("Could not read %s as an HDF file", injection_file)
[docs] def write_command_line(self):
"""Writes command line to attributes.
The command line is written to the file's ``attrs['cmd']``. If this
attribute already exists in the file (this can happen when resuming
from a checkpoint), ``attrs['cmd']`` will be a list storing the current
command line and all previous command lines.
"""
cmd = [" ".join(sys.argv)]
try:
previous = self.attrs["cmd"]
if isinstance(previous, str):
# convert to list
previous = [previous]
elif isinstance(previous, numpy.ndarray):
previous = previous.tolist()
except KeyError:
previous = []
self.attrs["cmd"] = cmd + previous
[docs] def write_resume_point(self):
"""Keeps a list of the number of iterations that were in a file when a
run was resumed from a checkpoint."""
try:
resume_pts = self.attrs["resume_points"].tolist()
except KeyError:
resume_pts = []
try:
niterations = self.niterations
except KeyError:
niterations = 0
resume_pts.append(niterations)
self.attrs["resume_points"] = resume_pts
[docs] def write_random_state(self, group=None, state=None):
""" Writes the state of the random number generator from the file.
Parameters
----------
group : str
Name of group to read random state to.
state : tuple, optional
Specify the random state to write. If None, will use
``numpy.random.get_state()``.
"""
group = self.sampler_group if group is None else group
dataset_name = "/".join([group, "random_state"])
if state is None:
state = numpy.random.get_state()
s, arr, pos, has_gauss, cached_gauss = state
if group in self:
self[dataset_name][:] = arr
else:
self.create_dataset(dataset_name, arr.shape, fletcher32=True,
dtype=arr.dtype)
self[dataset_name][:] = arr
self[dataset_name].attrs["s"] = s
self[dataset_name].attrs["pos"] = pos
self[dataset_name].attrs["has_gauss"] = has_gauss
self[dataset_name].attrs["cached_gauss"] = cached_gauss
[docs] def get_slice(self, thin_start=None, thin_interval=None, thin_end=None):
"""Formats a slice using the given arguments that can be used to
retrieve a thinned array from an InferenceFile.
Parameters
----------
thin_start : {None, int}
The starting index to use. If None, will try to retrieve the
`burn_in_iterations` from the given file. If no
`burn_in_iterations` exists, will default to the start of the
array.
thin_interval : {None, int}
The interval to use. If None, will try to retrieve the acl from the
given file. If no acl attribute exists, will default to 1.
thin_end : {None, int}
The end index to use. If None, will retrieve to the end of the
array.
Returns
-------
slice :
The slice needed.
"""
# default is to skip burn in samples
if thin_start is None:
try:
thin_start = self.burn_in_iterations
# if the sampler hasn't burned in, the burn_in_iterations will
# be the same as the number of iterations, which would result
# in 0 samples. In that case, just use the last one
if thin_start == self.niterations:
thin_start = thin_start - 1
except KeyError:
pass
# default is to use stored ACL and accept every i-th sample
if thin_interval is None:
try:
thin_interval = int(numpy.ceil(self.acl))
except KeyError:
pass
return slice(thin_start, thin_end, thin_interval)
[docs] def copy(self, other, parameters=None, parameter_names=None,
posterior_only=False, **kwargs):
"""Copies data in this file to another file.
The samples and stats to copy may be down selected using the given
kwargs. All other data (the "metadata") are copied exactly.
Parameters
----------
other : str or InferenceFile
The file to write to. May be either a string giving a filename,
or an open hdf file. If the former, the file will be opened with
the write attribute (note that if a file already exists with that
name, it will be deleted).
parameters : list of str, optional
List of parameters to copy. If None, will copy all parameters.
parameter_names : dict, optional
Rename one or more parameters to the given name. The dictionary
should map parameter -> parameter name. If None, will just use the
original parameter names.
posterior_only : bool, optional
Write the samples and model stats as flattened arrays, and
set other's posterior_only attribute. For example, if this file
has a parameter's samples written to
`{samples_group}/{param}/walker{x}`, then other will have all of
the selected samples from all walkers written to
`{samples_group}/{param}/`.
**kwargs :
All other keyword arguments are passed to `read_samples`.
Returns
-------
InferenceFile
The open file handler to other.
"""
if not isinstance(other, h5py.File):
# check that we're not trying to overwrite this file
if other == self.name:
raise IOError("destination is the same as this file")
other = InferenceFile(other, 'w')
# copy metadata over
self.copy_metadata(other)
# update other's posterior attribute
if posterior_only:
other.attrs['posterior_only'] = posterior_only
# select the samples to copy
logging.info("Reading samples to copy")
if parameters is None:
parameters = self.variable_params
# if list of desired parameters is different, rename model params
if set(parameters) != set(self.variable_params):
other.attrs['variable_params'] = parameters
# if only the posterior is desired, we'll flatten the results
if not posterior_only and not self.posterior_only:
kwargs['flatten'] = False
samples = self.read_samples(parameters, **kwargs)
logging.info("Copying {} samples".format(samples.size))
# if different parameter names are desired, get them from the samples
if parameter_names:
arrs = {pname: samples[p] for p, pname in parameter_names.items()}
arrs.update({p: samples[p] for p in parameters if
p not in parameter_names})
samples = FieldArray.from_kwargs(**arrs)
other.attrs['variable_params'] = samples.fieldnames
logging.info("Writing samples")
other.samples_parser.write_samples_group(other, self.samples_group,
samples.fieldnames, samples)
# do the same for the model stats
logging.info("Reading stats to copy")
stats = self.read_model_stats(**kwargs)
logging.info("Writing stats")
other.samples_parser.write_samples_group(other, self.stats_group,
stats.fieldnames, stats)
# if any down selection was done, re-set the burn in iterations and
# the acl, and the niterations.
# The last dimension of the samples returned by the sampler should
# be the number of iterations.
if samples.shape[-1] != self.niterations:
other.attrs['acl'] = 1
other.attrs['burn_in_iterations'] = 0
other.attrs['niterations'] = samples.shape[-1]
return other
[docs]def check_integrity(filename):
"""Checks the integrity of an InferenceFile.
Checks done are:
* can the file open?
* do all of the datasets in the samples group have the same shape?
* can the first and last sample in all of the datasets in the samples
group be read?
If any of these checks fail, an IOError is raised.
Parameters
----------
filename: str
Name of an InferenceFile to check.
Raises
------
ValueError
If the given file does not exist.
KeyError
If the samples group does not exist.
IOError
If any of the checks fail.
"""
# check that the file exists
if not os.path.exists(filename):
raise ValueError("file {} does not exist".format(filename))
# if the file is corrupted such that it cannot be opened, the next line
# will raise an IOError
with InferenceFile(filename, 'r') as fp:
# check that all datasets in samples have the same shape
parameters = fp[fp.samples_group].keys()
group = fp.samples_group + '/{}'
# use the first parameter as a reference shape
ref_shape = fp[group.format(parameters[0])].shape
if not all(fp[group.format(param)].shape == ref_shape
for param in parameters):
raise IOError("not all datasets in the samples group have the "
"same shape")
# check that we can read the first/last sample
firstidx = tuple([0]*len(ref_shape))
lastidx = tuple([-1]*len(ref_shape))
for param in parameters:
fp[group.format(param)][firstidx]
fp[group.format(param)][lastidx]