"""
Synthesis (:mod:`~harmonia.surveyor.synthesis`)
===========================================================================
Systhesise FKP-style paired random catalogues for given survey
specifications.
This can be used for determining survey window functions and estimating
correlation induced by geometric filtering.
.. autosummary::
SyntheticCatalogue
CovarianceEstimator
generate_compression_matrix
|
"""
import warnings
import numpy as np
import psutil
from scipy.interpolate import InterpolatedUnivariateSpline as Spline
from mcfit import P2xi
from nbodykit.lab import FKPCatalog
from harmonia.algorithms.arrays import CartesianArray
from harmonia.mapper.catalogue_maker import (
RandomCatalogue,
SphericalFKPCatalogue,
)
from harmonia.mapper.map_transform import CartesianMap, SphericalMap
from harmonia.reader.likelihoods import spherical_covariance
[docs]class SyntheticCatalogue:
r"""Synthetic catalogue for probing survey geometry.
Parameters
----------
mean_density : float
Mean number density (in cubic :math:`h`/Mpc).
boxsize : float, array_like
Catalogue boxsize (in Mpc/:math:`h`) as a scalar or a triple of
scalars.
contrast : float or None, optional
Contrast of the number density of the secondary random catalogue
to that of the primary. If `None` (default), the secondary
catalogue is not produced. This cannot be `None` if
`sphericalise` is provided.
expansion : float, optional
Expansion factor of the catalogue box for grid assignment
(default is 1.). Ignored if `sphericalise` is set.
sphericalise : float or None, optional
If not `None` (default), this is passed to the `radius` parameter
of :class:`~.mapper.catalogue_maker.SphericalFKPCatalogue` for
instantiating the synthetic catalogue.
mask, selection, weight : callable or None, optional
Survey mask, selection or weight function (default is `None`).
Must be given as a function of Cartesian coordinate only assuming
the origin is at the centre of the catalogues.
apply_selection_as_veto : bool, optional
If `True` (default is `False`), `selection` returning boolean
values is applied to the 'Selection' column of the catalogues
instead of the 'NZ' column (see `nbodykit` documentation for
this peculiarity.)
Attributes
----------
spherical_coefficients : :class:`~.arrays.SphericalArray` or None
Density contrast coefficients. Only available after
:meth:`compute_spherical_coefficients` is called.
power_multipoles : :class:`~.algorithms.arrays.CartesianArray` or None
Power spectrum multipoles. Only available after
:meth:`compute_power` is called.
window_multipoles : :class:`~.algorithms.arrays.CartesianArray` or None
Power spectrum multipoles of the window function at sampled
separations. Only available after :meth:`compute_window` is
called.
mask_multipoles : :class:`~.algorithms.arrays.CartesianArray` or None
Correlation function multipoles of the window function at sampled
separations. Only available after :meth:`compute_mask` is
called.
attrs : dict
Attributes inherited from input parameters.
"""
def __init__(self, number_density, boxsize, contrast=None, expansion=1.,
sphericalise=None, mask=None, selection=None, weight=None,
apply_selection_as_veto=False):
boxsize = [boxsize,] * 3 if np.isscalar(boxsize) \
else boxsize
self.attrs = {
'number_density': number_density,
'boxsize': boxsize,
'contrast': contrast,
'expansion': expansion,
'sphericalise': sphericalise,
}
self._synthetic_catalogue = self._synthesise(
number_density, boxsize, contrast, expansion,
mask, selection, weight, apply_selection_as_veto,
radius=sphericalise
)
self.spherical_coefficients = None
self.power_multipoles = None
self.window_multipoles = None
self.mask_multipoles = None
self._intrpl_k = None
self._intrpl_power = None
self._intrpl_s = None
self._intrpl_correlation = None
def __str__(self):
str_info = ", ".join(
[f"{name}={val}" for name, val in self.attrs.items()]
)
return f"{self.__class__.__name__}({str_info})"
[docs] def compute_spherical_coefficients(self, disc):
"""Compute the spherical Fourier density contrast coefficients of
the synthetic catalogue.
Parameters
----------
disc : :class:`~.algorithms.discretisation.DiscreteSpectrum`
Discrete spectrum.
Returns
-------
:class:`~harmonia.algorithms.arrays.SphericalArray`
Density contrast coefficients.
See Also
--------
:class:`~harmonia.mapper.map_transform.SphericalMap`
"""
spherical_map = SphericalMap(self._synthetic_catalogue, disc)
return spherical_map.density_contrast
[docs] def compute_power(self, orders, kmin=1.e-4, kmax=None, dk=None,
num_mesh=256, resampler='tsc', interlaced=True):
"""Compute the power spectrum multipoles of the synthetic
catalogue.
Parameters
----------
orders : sequence of int
Orders of the power spectrum multipoles.
kmin : float, optional
Minimum wavenumber of the map (in :math:`h`/Mpc)
(default is 1.e-4).
kmax : float or None, optional
Maximum wavenumber of the map (in :math:`h`/Mpc)
(default is `None`). If `None`, `kmax` is the Nyquist
wavenumber determined from `num_mesh`.
dk : float or None, optional
Wavenumber bin width (in :math:`h`/Mpc) (default is `None`).
num_mesh : int, optional
Mesh number per dimension for interpolating the discrete
catalogues on a grid.
resampler : {'cic', 'tsc', 'pcs'}, optional
Grid assignment scheme (default is ``'tsc'``) for catalogue
interpolation.
interlaced : bool, optional
If `True` (default), use interlacing for aliasing mitigation.
Returns
-------
:class:`~harmonia.algorithms.arrays.CartesianArray`
Power spectrum multipoles binned in wavenumbers.
See Also
--------
:class:`~harmonia.mapper.map_transform.CartesianMap`
"""
if self.power_multipoles is not None:
warnings.warn(
"Power spectrum multipoles have already been computed; "
"they are now being recomputed."
)
# Check if there is sufficient memory; if not,
# reduce the mesh number.
while psutil.virtual_memory().available < 10 * 768**3:
num_mesh /= 2
warnings.warn(
"Mesh number reduced due to limited memory", RuntimeWarning
)
# Set the Nyquist wavenumber as ther upper limit.
if kmax is None:
kmax = np.pi * num_mesh \
/ min(self._synthetic_catalogue.attrs['BoxSize'])
cartesian_map = CartesianMap(
self._synthetic_catalogue,
orders, kmin=kmin, kmax=kmax, dk=dk,
num_mesh=num_mesh, resampler=resampler, interlaced=interlaced
)
with warnings.catch_warnings():
# Filter out warnings when the secondary catalogue
# in the synthetic FKP-style catalogue pair is None,
# as it would cause division by zero.
warnings.filterwarnings(
'ignore',
category=RuntimeWarning,
message=".*invalid value encountered.*"
)
warnings.filterwarnings(
'ignore',
category=RuntimeWarning,
message="divide by zero encountered in double_scalars"
)
self.power_multipoles = cartesian_map.power_multipoles
return self.power_multipoles
[docs] def compute_window(self, orders, kmin=1.e-6, **kwargs):
"""Compute the Fourier-space window function multipoles from the
synthetic catalogue.
Parameters
----------
orders : sequence of int
Orders of the power spectrum multipoles.
kmin : float, optional
Minimum wavenumber (in :math:`h`/Mpc) (default is 1.e-6).
**kwargs
Any other parameters to be passed to :meth:`~.compute_power`.
Returns
-------
:class:`~harmonia.algorithms.arrays.CartesianArray`
Window function multipoles binned in logarithmic wavenumbers.
See Also
--------
:meth:`~.compute_power`
"""
LOG10_K_MAX = 1.
NUM_EXTENSION = 1000
NUM_INTERPOLATION = pow(2, 10)
if np.isclose(self.attrs['expansion'], 1.):
warnings.warn(
"No padding for catalogue box when computing "
"the window function. You may want to resynthesise "
"the catalogue with `expansion` set above 1."
)
# Need to compute power spectrum multipoles first.
power_multipoles = self.compute_power(orders, kmin=kmin, **kwargs)
raw_k = power_multipoles.attrs['wavenumbers']
raw_power = {
ell: power_multipoles.array['power'][
power_multipoles.array['order'] == ell
]
for ell in orders
}
# Linear padding and then logarithmic padding in wavenumbers.
padding = np.mean(np.abs(np.diff(raw_k)))
lin_extension = np.max(raw_k) + padding * np.arange(1, NUM_EXTENSION)
log_extension = np.logspace(
np.log10(np.max(raw_k) + padding * NUM_EXTENSION),
LOG10_K_MAX,
num=NUM_EXTENSION
)
ext_k = np.r_[raw_k, lin_extension, log_extension]
ext_power = {
ell: np.r_[raw_power[ell], np.zeros(2 * NUM_EXTENSION - 1)]
for ell in orders
}
# Resample at log-spaced wavenumbers.
self._intrpl_k = np.logspace(
*np.log10(ext_k[[0, -1]]), num=NUM_INTERPOLATION
)
self._intrpl_power = {
ell: Spline(ext_k, ext_power[ell], k=1)(self._intrpl_k)
for ell in orders
}
# Store as an attribute.
orders = np.sort(orders).tolist()
self.window_multipoles = CartesianArray(orders, self._intrpl_k)
self.window_multipoles[:] = np.concatenate(
[self._intrpl_power[ell] for ell in orders]
)
# Normalise all values to the monopole at vanishing wavenumbers.
self.window_multipoles[:] /= self.window_multipoles[0]
return self.window_multipoles
[docs] def compute_mask(self, orders, *args, **kwargs):
"""Compute the configuration-space mask function multipoles from
the synthetic catalogue by Hankel transform of the window function
multipoles.
Parameters
----------
orders : sequence of int
Orders of the mask function multipoles.
*args, **kwargs
Any other position and keyword parameters to be passed to
:meth:`~.compute_window`. Only required when
:attr:`window_multipoles` is not available.
Returns
-------
:class:`numpy.ndarray`
Mask function multipoles binned in logarithmic separations.
This is a NumPy structured array similar to
:attr:`array` of :class:`~.algorithms.arrays.CartesianArray`.
See Also
--------
:class:`~.algorithms.arrays.CartesianArray`
"""
NUM_INTERPOL = pow(2, 14)
# Need to compute window function multiples first.
if self.window_multipoles is None \
or max(orders) not in self._intrpl_power:
self.window_multipoles = self.compute_window(
orders, *args, **kwargs
)
# Hankel transform window function multipoles.
with warnings.catch_warnings(): # suppress warnings from `mcfit`
warnings.filterwarnings(
'ignore',
message=(
"The default value of extrap has been changed to False, "
"set it to True if you cannot reproduce previous results"
)
)
raw_correlation = {
ell: P2xi(self._intrpl_k, l=ell, lowring=True)(
self._intrpl_power[ell], extrap=False
)
for ell in orders
}
raw_s = [raw_correlation[ell][0] for ell in orders]
raw_correlation.update({ell: raw_correlation[ell] for ell in orders})
# Resample at log-spaced separations.
self._intrpl_s = np.logspace(
np.log10(np.min(raw_s)), np.log10(np.max(raw_s)), num=NUM_INTERPOL
)
self._intrpl_correlation = {
ell: Spline(*raw_correlation[ell], k=1)(self._intrpl_s)
for ell in orders
}
# Create a structure array dedicated for storing mask multipoles.
self.mask_multipoles = np.empty(
np.size(orders) * np.size(self._intrpl_s),
dtype=np.dtype({
'names': ['order', 'separation', 'correlation'],
'formats': ['i4', 'f8', 'f8'],
})
)
self.mask_multipoles['order'] = np.repeat(orders, len(self._intrpl_s))
self.mask_multipoles['separation'] = np.tile(
self._intrpl_s, len(orders)
)
# Normalise all values to the monopole at vanishing separations.
self.mask_multipoles['correlation'] = np.concatenate(
[self._intrpl_correlation[ell] for ell in orders]
) / raw_correlation[0][-1][0]
return self.mask_multipoles
@staticmethod
def _synthesise(number_density, boxsize, contrast, expansion,
mask, selection, weight, apply_selection_as_veto,
radius=None):
primary_catalogue = RandomCatalogue(number_density, boxsize)
if contrast is not None:
secondary_catalogue = RandomCatalogue(
contrast * number_density, boxsize
)
else:
secondary_catalogue = None
# No sphericalisation; reimplement FKP build similar to
# :class:`~harmonia.mapper.catalogue_maker.SphericalFKPCatalogue`.
if radius is None:
for catalogue in [primary_catalogue, secondary_catalogue]:
if catalogue is None:
break
catalogue['Location'] = \
catalogue['Position'] - np.divide(boxsize, 2)
catalogue['NZ'] = number_density * catalogue['Selection']
if callable(mask):
catalogue['Selection'] *= mask(catalogue['Location'])
if callable(selection):
if apply_selection_as_veto:
catalogue['Selection'] *= \
selection(catalogue['Location'])
else:
catalogue['NZ'] *= selection(catalogue['Location'])
if callable(weight):
catalogue['Weight'] *= weight(catalogue['Location'])
return FKPCatalog(
primary_catalogue, secondary_catalogue,
BoxSize=np.multiply(expansion, boxsize)
)
# Reuse :class:`~.mapper.catalogue_maker.SphericalFKPCatalogue`.
spherical_catalogue = SphericalFKPCatalogue(
radius,
data_catalogue=primary_catalogue,
random_catalogue=secondary_catalogue,
mask=mask, selection=selection, weight=weight,
apply_selection_as_veto=apply_selection_as_veto
)
return FKPCatalog(
spherical_catalogue.data_catalogue,
spherical_catalogue.random_catalogue,
BoxSize=np.multiply(expansion, boxsize)
)
[docs]class CovarianceEstimator:
"""Covariance matrix estimator for power spectrum multipoles at
fiducial values.
Parameters
----------
realisations : sequence of :class:`~.algorithms.arrays.CartesianArray`
Independent realisations of power spectrum multipoles from which
the covariance matrix is estimated. For each realisation, orders
and wavenumbers of the multipoles must be sorted.
reference : :class:`~.arrays.CartesianArray` *or None, optional*
Underlying power spectrum multipoles binned in wavenumber that
are realised for covariance estimation. Orders and wavenumbers of
the multipoles must be sorted. If `None` (default), this is
determined by the average of `realisations`.
Attributes
----------
realisations : sequence of :class:`~.algorithms.arrays.CartesianArray`
Independent realisations of power spectrum multipoles from which
the covariance matrix is estimated.
reference : :class:`~.arrays.CartesianArray` or None
Underlying power spectrum multipoles binned in wavenumber that
are realised for covariance estimation.
wavenumbers : float :class:`ndarray`
Wavenumbers at which the covariance matrix is estimated.
This is set by `reference` if available; otherwise it is set by
the first of `realisations`.
See Also
--------
:class:`~harmonia.algorithms.arrays.CartesianArray`
"""
def __init__(self, realisations, reference=None):
self.realisations = realisations
self.reference = reference
if self.reference is not None:
self.wavenumbers = np.unique(self.reference.array['wavenumber'])
else:
self.wavenumbers = np.unique(
self.realisations[0].array['wavenumber']
)
def __getstate__(self):
state = self.__dict__
if self.reference is not None:
state.update({'reference': self.reference.__getstate__()})
return state
def __setstate__(self, state):
for attr, value in state.items():
if attr == 'reference' and value is not None:
reference = object.__new__(CartesianArray)
reference.__setstate__(value)
setattr(self, 'reference', reference)
else:
setattr(self, attr, value)
[docs] def save(self, output_file):
"""Save the estimator with its attributes as a .npz file.
Parameters
----------
output_file : *str or* :class:`pathlib.Path`
Output file path.
"""
np.savez(output_file, **self.__getstate__())
[docs] @classmethod
def load(cls, input_file):
"""Load the estimator with its attributes from a .npz file.
Parameters
----------
input_file : *str or* :class:`pathlib.Path`
Input file path.
"""
state_data = np.load(input_file, allow_pickle=True)
state = {}
for attr in state_data.files:
try:
state.update({attr: state_data[attr].item()})
except (AttributeError, ValueError):
state.update({attr: state_data[attr]})
self = object.__new__(cls)
self.__setstate__(state)
return self
[docs] def get_fiducial_vector(self, pivot):
"""Return the fiducial data vector for which the covariance
matrix is estimated.
Parameters
----------
pivot : {'order', 'wavenumber'}
Pivot order used for data vectorisation.
Returns
-------
float :class:`numpy.ndarray`
Fiducial vector.
"""
data_matrix = [
realisation.vectorise(pivot) for realisation in self.realisations
]
data_mean = np.mean(data_matrix, axis=0)
if self.reference is None:
return data_mean
fiducial_vector = self.reference.vectorise(pivot)
if not np.allclose(data_mean, fiducial_vector, rtol=0.1):
warnings.warn(
"Realisation average deviates from `truth` "
"by more than 10% for some data entries."
)
return fiducial_vector
[docs] def get_fiducial_covariance(self, pivot):
"""Return the fiducial covariance matrix estimated from data
realisations.
Parameters
----------
pivot : {'order', 'wavenumber'}
Pivot order used for data vectorisation.
Returns
-------
float :class:`numpy.ndarray`
Estimate fiducialcovariance.
"""
data_matrix = [
realisation.vectorise(pivot) for realisation in self.realisations
]
return np.cov(data_matrix, rowvar=False, ddof=1)
[docs]def generate_compression_matrix(fiducial_model_kwargs,
extremal_model_kwargs=None,
sensitivity_threshold=0.01, discard=None):
r"""Generate a compression matrix for spherical modes.
Notes
-----
Compression is achieved by discarding non-positive eigenvalue modes
that are at least :math:`10^{-8}` times smaller than the largest and
in addition any of the following means:
* `discard` is passed to discard a number of low-eigenvalue modes;
* `extremal_model_kwargs` is passed and eigenvalues of the
resulting model covariance are compared with those from
`fiducial_covariance`. Modes corresponding to low, insensitive
(i.e. relative difference less than `sensitivity_threshold`)
are discarded.
* A combination of the above if the appropriate parameters
are passed.
Parameters
----------
fiducial_model_kwargs : dict
Fiducial model parameters to be passed to
:func:`~.reader.likelihoods.spherical_covariance`.
extremal_model_kwargs : dict or None, optional
Extremal model parameters to be passed to
:func:`~.reader.likelihoods.spherical_covariance`.
sensitivity_threshold: float, optional
Sensitivity threshold for modes deemed discardable
(default is 0.01).
discard : int or None, optional
Number of low-eigenvalue modes to discard from all modes
(default is `None`).
Returns
-------
compression_matrix : :class:`numpy.ndarray`
Compression matrix.
"""
fiducial_covariance = spherical_covariance(**fiducial_model_kwargs)
evals_fiducial, evecs = np.linalg.eigh(fiducial_covariance)
selectors = []
# Compression by positive magnitude.
selectors.append(evals_fiducial > 1.e-8 * np.max(evals_fiducial))
# Compression by discard.
if discard is not None:
selectors.append(np.arange(len(evals_fiducial)) >= discard)
# Compression by comparison for sensitivity.
extremal_covariance = spherical_covariance(**extremal_model_kwargs)
evals_extremal = np.linalg.eigvalsh(extremal_covariance)
selectors.append(
~np.isclose(evals_extremal, evals_fiducial, rtol=sensitivity_threshold)
)
# pylint: disable=no-member
# Compress and reverse order.
evecs = evecs[:, np.logical_and.reduce(selectors)][:, ::-1]
compression_matrix = np.conj(evecs).T
return compression_matrix