"""
Cosmological likelihoods (:mod:`~harmonia.reader.likelihoods`)
===========================================================================
Construct likelihoods from Spherical Fourier coefficients and Cartesian
power spectrum multipoles of random fields for cosmological parameter
inference.
Probability distributions
---------------------------------------------------------------------------
.. autosummary::
complex_normal_pdf
multivariate_normal_pdf
modified_student_pdf
Likelihood inference
---------------------------------------------------------------------------
.. autosummary::
spherical_covariance
cartesian_moments
LogLikelihood
|
.. |correlator_matrix| replace::
:meth:`~harmonia.reader.models.SphericalCorrelator.correlator_matrix`
.. |convolved_power_multipoles| replace::
:meth:`~harmonia.reader.models.CartesianMultipoles.convolved_power_multipoles`
"""
import logging
import warnings
# pylint: disable=no-name-in-module
import numpy as np
from scipy.special import loggamma
from harmonia.utils import (
PositiveDefinitenessWarning,
is_positive_definite,
mat_logdet,
)
# Probability distributions
# -----------------------------------------------------------------------------
def _are_valid_moments(first_moment, second_moment):
# Check dimensions of the expectation and variance are consistent.
criterion1 = (np.squeeze(first_moment).ndim == 1)
criterion2 = (np.squeeze(second_moment).ndim == 2)
criterion3 = (np.shape(first_moment) * 2 == np.shape(second_moment))
return all([criterion1, criterion2, criterion3])
def chi_square(data_vector, covariance_matrix):
    """Calculate chi-square from zero-centred data vector and its
    covariance matrix.

    Parameters
    ----------
    data_vector : complex, array_like
        Zero-centred data vector.
    covariance_matrix : complex, array_like
        Covariance matrix.

    Returns
    -------
    chi_sq : float :class:`numpy.ndarray`
        Chi-square value.

    """
    vector = np.squeeze(data_vector)
    matrix = np.squeeze(covariance_matrix)

    # Dimension consistency: 1-d vector, square 2-d matrix of matching size.
    dimensions_consistent = (
        vector.ndim == 1
        and matrix.ndim == 2
        and np.shape(vector) * 2 == np.shape(matrix)
    )
    if not dimensions_consistent:
        raise ValueError("Check input dimensions.")

    # Hermitian quadratic form x^H C^{-1} x evaluated via a linear solve
    # rather than an explicit matrix inversion.
    # pylint: disable=unexpected-keyword-arg
    return np.dot(np.conj(vector), np.linalg.solve(matrix, vector))
def complex_normal_pdf(data_vector, covariance_matrix, ret_log=True,
                       downscale=None):
    """Compute the complex normal probability density function or its
    natural logarithm given the zero-centred data vector and its
    covariance matrix.

    Parameters
    ----------
    data_vector : complex, array_like
        Zero-centred data vector.
    covariance_matrix : complex, array_like
        Covariance matrix.
    ret_log : bool, optional
        If `True` (default), return logarithmic probability density.
    downscale : float or None, optional
        If not `None` (default), the data vector and covariance matrix are
        simultaneous downscaled to avoid numerical issue.

    Returns
    -------
    float, array_like
        (Logarithmic) probability density.

    """
    vector = np.squeeze(data_vector)
    matrix = np.squeeze(covariance_matrix)
    if not _are_valid_moments(vector, matrix):
        raise ValueError("Check input dimensions.")

    dim = np.size(vector)

    # Normalisation of the complex normal distribution, pi^dim.
    log_norm = dim * np.log(np.pi)

    # Optionally rescale both moments to control dynamic range; the
    # normalisation is adjusted so the density value is unchanged.
    if downscale is not None:
        vector = vector / downscale
        matrix = matrix / downscale ** 2
        log_norm -= 2 * dim * np.log(downscale)

    if not is_positive_definite(matrix):
        warnings.warn(
            "`covariance_matrix` is not positive definite.",
            PositiveDefinitenessWarning
        )

    log_density = - (
        log_norm
        + mat_logdet(matrix)
        + np.real_if_close(chi_square(vector, matrix), tol=10**10)
    )

    return log_density if ret_log else np.exp(log_density)
def multivariate_normal_pdf(data_vector, expectation_vector, covariance_matrix,
                            ret_log=True):
    """Compute the multivariate normal probability density function or its
    natural logarithm given the data vector, its mean vector and covariance
    matrix.

    Parameters
    ----------
    data_vector : float, array_like
        Data vector.
    expectation_vector : float, array_like
        Mean vector.
    covariance_matrix : float, array_like
        Covariance matrix.
    ret_log : bool, optional
        If `True` (default), return logarithmic probability density.

    Returns
    -------
    density : float, array_like
        Logarithmic probability density value.

    """
    vector = np.squeeze(data_vector)
    mean = np.squeeze(expectation_vector)
    matrix = np.squeeze(covariance_matrix)

    if not (_are_valid_moments(vector, matrix)
            and _are_valid_moments(mean, matrix)):
        raise ValueError("Check input dimensions.")

    dim = np.size(vector)

    # -2 ln L = dim * ln(2 pi) + ln|C| + chi^2.
    chi_sq = chi_square(vector - mean, matrix)
    log_density = - (dim * np.log(2*np.pi) + mat_logdet(matrix) + chi_sq) / 2

    return log_density if ret_log else np.exp(log_density)
def modified_student_pdf(data_vector, expectation_vector, covariance_matrix,
                         degree, ret_log=True):
    """Compute the multivariate modified Student probability density
    function or its natural logarithm given the data vector, its mean
    vector and covariance matrix.

    Parameters
    ----------
    data_vector : float, array_like
        Data vector.
    expectation_vector : float, array_like
        Mean vector.
    covariance_matrix : float, array_like
        Covariance matrix.
    degree : int
        The degree number. This could be the number of empirical
        covariance matrices used to obtain the estimated
        `covariance_matrix`.
    ret_log : bool, optional
        If `True` (default), return logarithmic probability density.

    Returns
    -------
    float, array_like
        (Logarithmic) probability density value.

    """
    vector = np.squeeze(data_vector)
    mean = np.squeeze(expectation_vector)
    matrix = np.squeeze(covariance_matrix)

    if not (_are_valid_moments(vector, matrix)
            and _are_valid_moments(mean, matrix)):
        raise ValueError("Check input dimensions.")

    dim = np.size(vector)

    # Normalising constant of the modified Student distribution.
    log_norm = dim / 2. * np.log((degree - 1) * np.pi) \
        + loggamma((degree - dim) / 2.) \
        - loggamma(degree / 2.)

    # The chi-square enters through a logarithm rather than an exponent,
    # softening the tails relative to the multivariate normal.
    log_pseudo_chisq = degree / 2. * np.log(
        1 + chi_square(vector - mean, matrix) / (degree - 1)
    )

    log_density = \
        - (log_norm + mat_logdet(matrix) / 2 + log_pseudo_chisq)

    return log_density if ret_log else np.exp(log_density)
# Likelihoods
# -----------------------------------------------------------------------------
class LikelihoodWarning(UserWarning):
    """Warning issued during likelihood evaluation.

    """
def spherical_covariance(pivot, spherical_model, **kwargs):
    r"""Compute the parametrised covariance matrix of spherical Fourier
    coefficients.

    Parameters
    ----------
    pivot : {'natural', 'spectral'}
        Pivot order for vectorisation.
    spherical_model : :class:`~harmonia.reader.models.SphericalCorrelator`
        Spherical correlator base model.
    **kwargs
        Parameters (other than `pivot`) to be passed to |correlator_matrix|
        of `spherical_correlator`.

    Returns
    -------
    covariance_matrix : complex :class:`numpy.ndarray`
        Covariance matrix.

    See Also
    --------
    :class:`~harmonia.reader.models.SphericalCorrelator`

    """
    # Delegate directly to the base model's correlator matrix.
    return spherical_model.correlator_matrix(pivot, **kwargs)
def cartesian_moments(pivot, orders, cartesian_model,
                      covariance_estimator=None, mode_counts=None, **kwargs):
    """Compute the parametrised mean and covariance of Cartesian
    power spectrum multipoles.

    Parameters
    ----------
    pivot : {'order', 'wavenumber'}
        Pivot order for vectorisation.
    orders : list of int
        Orders of the power spectrum multipoles.
    cartesian_model : :class:`~.models.CartesianMultipoles`
        Cartesian power multipoles base model.
    covariance_estimator : :class:`~.CovarianceEstimator` *or None, optional*
        Cartesian power multipole covariance estimator. Its
        :attr:`wavenumbers` must match wavenumbers associated
        with `cartesian_model`. If `None`, no correlation between power
        spectrum multipoles is assumed but `mode_counts` must be provided
        for calculating the power spectrum variance.
    mode_counts : int, array_like or None, optional
        Number of independent modes for each power spectrum measurement
        (default is `None`) used to calculate the power spectrum variance.
        Ignored if `covariance_estimator` is provided.
    **kwargs
        Parameters (other than `orders`) to be passed to
        |convolved_power_multipoles| of `cartesian_model`.

    Returns
    -------
    expectation : float :class:`numpy.ndarray`
        Power spectrum expectation at the specified wavenumbers.
    covariance : float :class:`numpy.ndarray`
        Power spectrum variance at the specified wavenumbers.

    Raises
    ------
    ValueError
        If `covariance_estimator` is provided but its wavenumbers do not
        match those of `cartesian_model`.

    """
    expectation = cartesian_model.convolved_power_multipoles(
        orders, **kwargs
    ).vectorise(pivot)

    if covariance_estimator is None:
        # Uncorrelated multipoles: Gaussian variance P^2 / N_modes.
        covariance = expectation ** 2 / np.asarray(mode_counts)
    else:
        # Check model and estimator wavenumbers agree.  Raise explicitly
        # rather than `assert` so the check survives `python -O`.
        if not np.allclose(
            cartesian_model.attrs['wavenumbers'],
            covariance_estimator.wavenumbers,
            atol=0.001
        ):
            raise ValueError(
                "The wavenumbers at which the Cartesian power multipole "
                "model is evaluated must match the wavenumbers at which "
                "the fiducial covariance matrix is estimated."
            )

        fiducial_expectation = \
            covariance_estimator.get_fiducial_vector(pivot)
        fiducial_covariance = \
            covariance_estimator.get_fiducial_covariance(pivot)

        # Rescale the fiducial covariance by the ratio of the current
        # model expectation to the fiducial expectation, D C_fid D with
        # D = diag(P / P_fid).
        scaling = np.diag(expectation / fiducial_expectation)
        covariance = np.linalg.multi_dot(
            [scaling, fiducial_covariance, scaling]
        )

    return expectation, covariance
class LogLikelihood:
    """Construct the logarithmic likelihood function from
    cosmological data.

    Parameters
    ----------
    spherical_data : :class:`~.arrays.SphericalArray` *or None, optional*
        Spherical Fourier coefficient data (default is `None`).
    cartesian_data : :class:`~.arrays.CartesianArray` *or None, optional*
        Cartesian power spectrum multipole data (default is `None`).
    covariance_estimator : :class:`~.CovarianceEstimator` *or None, optional*
        Cartesian multipole covariance estimator (default is `None`).
    mode_counts : int, array_like or None, optional
        Number of independent modes for each Cartesian data point (default
        is `None`) as an alternative to `covariance_estimator`. Ignored
        if `covariance_estimator` is provided.
    base_spherical_model : :class:`~.SphericalCorrelator` *or None, optional*
        Baseline spherical correlator model (default is `None`).
    base_cartesian_model : :class:`~.CartesianMultipoles` *or None, optional*
        Baseline Cartesian multipole model (default is `None`).
    spherical_pivot : {'natural', 'spectral'}, optional
        Pivot order for spherical map data vectorisation (default is
        'natural').
    cartesian_pivot : {'order', 'wavenumber'}, optional
        Pivot order for Cartesian map data vectorisation (default is
        'order').
    nbar : float or None, optional
        Mean particle number density (in cubic :math:`h`/Mpc). If
        `None` (default), shot noise is neglected.
    contrast : float or None, optional
        If not `None` (default), this adds additional shot noise level
        ``1 / (contrast * nbar)`` due to a FKP-style random catalogue.
    tracer_p : float, optional
        Tracer-dependent parameter for bias modulation by `f_nl`
        (default is 1.).
    comm : :class:`mpi4py.MPI.Comm` *or None, optional*
        MPI communicator (default is `None`).

    Attributes
    ----------
    attrs : dict
        Directory holding input parameters not corresponding to any of
        the following attributes.
    spherical_data : :class:`~.algorithms.arrays.SphericalArray` or None
        Spherical Fourier coefficient data.
    cartesian_data : :class:`~.algorithms.arrays.CartesianArray` or None
        Cartesian power spectrum multipole data.
    covariance_estimator : :class:`~.CovarianceEstimator` or None
        Cartesian multipole covariance estimator.
    base_spherical_model : :class:`~.SphericalCorrelator` or None
        Baseline spherical correlator model.
    base_cartesian_model : :class:`~.CartesianMultipoles` or None
        Baseline Cartesian multipole model.

    """

    def __init__(self, spherical_data=None, cartesian_data=None,
                 covariance_estimator=None, mode_counts=None,
                 base_spherical_model=None, base_cartesian_model=None,
                 spherical_pivot='natural', cartesian_pivot='order',
                 nbar=None, contrast=None, tracer_p=1., comm=None):

        self.logger = logging.getLogger(self.__class__.__name__)
        self.comm = comm

        # Scalar/configuration inputs are collected in `attrs`; data and
        # model objects become attributes in their own right.
        self.attrs = {
            'spherical_pivot': spherical_pivot,
            'cartesian_pivot': cartesian_pivot,
            'mode_counts': mode_counts,
            'nbar': nbar,
            'contrast': contrast,
            'tracer_p': tracer_p,
        }
        self.spherical_data = spherical_data
        self.cartesian_data = cartesian_data
        self.covariance_estimator = covariance_estimator
        self.base_spherical_model = base_spherical_model
        self.base_cartesian_model = base_cartesian_model

    def spherical_map_likelihood(self, b_1, f_nl, exclude_degrees=(),
                                 compression_matrix=None, **kwargs):
        """Evaluate the spherical map logarithmic likelihood.

        Parameters
        ----------
        b_1 : float
            Scale-independent linear bias.
        f_nl : float or None
            Local primordial non-Gaussianity.
        exclude_degrees : tuple of int, optional
            If not empty (default), modes whose spherical degree
            match one of its elements are removed from the likelihood.
        compression_matrix : :class:`numpy.ndarray` *or None*, optional
            If not `None` (default), both the data vector and the model
            covariance matrix are processed for data compression. This
            must be compatible with `exclude_degrees`, i.e. it accounts
            for elements removed from the data vector and covariance
            matrix by `exclude_degrees`.
        **kwargs
            Additional parameters to be passed to |correlator_matrix| of
            :attr:`base_spherical_model`.

        Returns
        -------
        log_likelihood : float
            Logarithmic likelihood.

        See Also
        --------
        :class:`~harmonia.surveyor.synthesis.generate_compression_matrix`
        :class:`~harmonia.reader.likelihoods.spherical_covariance`
        |correlator_matrix|

        """
        _OVERFLOW_DOWNSCALE = 10**4

        data_vector = \
            self.spherical_data.vectorise(self.attrs['spherical_pivot'])

        covariance_matrix = spherical_covariance(
            self.attrs['spherical_pivot'], self.base_spherical_model,
            b_1=b_1, f_nl=f_nl,
            nbar=self.attrs['nbar'],
            contrast=self.attrs['contrast'],
            tracer_p=self.attrs['tracer_p'],
            **kwargs
        )

        # pylint: disable=no-member
        if exclude_degrees:
            # A mode is deselected when its degree matches ANY excluded
            # degree, hence `logical_or` (a `logical_and` over disjoint
            # per-degree masks would be all-False for more than one
            # distinct degree, excluding nothing).
            deselector = np.logical_or.reduce([
                self.spherical_data.array['index'][:, 0] == deg
                for deg in exclude_degrees
            ])
            data_vector = data_vector[~deselector]
            covariance_matrix = \
                covariance_matrix[~deselector, :][:, ~deselector]

        if compression_matrix is not None:
            # Project both moments into the compressed subspace.
            data_vector = np.linalg.multi_dot([
                compression_matrix, data_vector
            ])
            covariance_matrix = np.linalg.multi_dot([
                compression_matrix,
                covariance_matrix,
                np.conj(compression_matrix.T)
            ])

        log_likelihood = complex_normal_pdf(
            data_vector, covariance_matrix, downscale=_OVERFLOW_DOWNSCALE,
        )

        return log_likelihood

    def cartesian_map_likelihood(self, b_1, f_nl, orders=None,
                                 num_samples=None, **kwargs):
        """Evaluate the Cartesian map logarithmic likelihood.

        Parameters
        ----------
        b_1 : float
            Scale-independent linear bias of the tracer particles.
        f_nl : float or None
            Local primordial non-Gaussianity.
        orders : list of int or None, optional
            Orders of the power spectrum multipoles. If `None` (default),
            only the monopole is used.
        num_samples : int or None, optional
            If `None` (default), the normal distribution is used without
            correction for covariance estimation uncertainty; otherwise
            it is passed as `degree` to :func:`modified_student_pdf` for
            covariance estimation uncertainty correction [1]_.
        **kwargs
            Additional parameters to be passed to
            |convolved_power_multipoles| of :attr:`base_cartesian_model`.

        Returns
        -------
        log_likelihood : float
            Logarithmic likelihood.

        See Also
        --------
        :class:`~harmonia.reader.likelihoods.cartesian_moments`
        |convolved_power_multipoles|

        .. [1] Sellentin E. & Heavens A. F., 2016. MNRAS 456(1), L132–L136.
           [arXiv: `1511.05969 <https://arxiv.org/abs/1511.05969>`_]

        """
        orders = orders or [0]

        data_vector = \
            self.cartesian_data.vectorise(self.attrs['cartesian_pivot'])

        expectation_vector, covariance_matrix = cartesian_moments(
            self.attrs['cartesian_pivot'], orders, self.base_cartesian_model,
            covariance_estimator=self.covariance_estimator,
            mode_counts=self.attrs['mode_counts'],
            b_1=b_1, f_nl=f_nl,
            nbar=self.attrs['nbar'],
            contrast=self.attrs['contrast'],
            tracer_p=self.attrs['tracer_p'],
            **kwargs
        )

        # Use the Sellentin--Heavens modified Student distribution only
        # when the covariance was estimated from a finite sample.
        if self.covariance_estimator is not None and num_samples is not None:
            log_likelihood = modified_student_pdf(
                data_vector, expectation_vector, covariance_matrix,
                degree=num_samples
            )
        else:
            log_likelihood = multivariate_normal_pdf(
                data_vector, expectation_vector, covariance_matrix
            )

        return log_likelihood