"""
Definition (:mod:`~harmonia.surveyor.definition`)
===========================================================================
Produce survey definition and specifications for processing and analysing
cosmological data.
.. autosummary::
generate_mask_by_sky_fraction
generate_mask_from_map
generate_selection_by_distribution
generate_selection_from_samples
generate_selection_samples
|
"""
# pylint: disable=no-name-in-module
import healpy as hp
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline as Interpolation
from scipy.special import loggamma
from scipy.stats import gamma, norm
from harmonia.surveyor.coordinates import (
sky_to_spherical,
to_box_coords,
_is_coord_3d,
)
[docs]def generate_mask_by_sky_fraction(coord_system, sky_fraction=1.,
box_shift=None):
"""Generate mask function given the sky fraction corresponding to a
spherical cap (rather than a wedge).
Parameters
----------
coord_system : {'cartesian', 'spherical', 'sky'}
Coordinate system of the generated mask function. If 'sky', the
returned function can accept either 2-d or 3-d coordinates.
sky_fraction : float, optional
Sky coverage, ``0 < sky_fraction <= 1``.
box_shift : float, array_like or None, optional
Passed as `box_centre` to :func:`~.coordinates.to_box_coords` so
that the resulting mask function accepts Cartesian coordinates
with origin at a box corner. Ignored unless `coord_system` is
'cartesian'.
Returns
-------
mask_function : callable
A mask function accepting `coord_system` as its coordinate system
and returning boolean values.
"""
# Any conversion to the 'cartesian' coordinate system as internal
# `mask_function` is defined for native spherical coordinates.
if coord_system == 'cartesian':
apply_to_box_coords_from_native = 'spherical'
else:
apply_to_box_coords_from_native = 'null'
box_shift = None
# `coords` can be either 'spherical' or 'sky' but the native
# implementation is 'spherical'; the decorator takes care of
# 'cartesian' inputs.
@to_box_coords(apply_to_box_coords_from_native, box_centre=box_shift)
def mask_function(coords):
# Transform to spherical surface coords.
surf_coords = sky_to_spherical(coords) if coord_system == 'sky' \
else np.atleast_2d(coords)
return surf_coords[:, -2] <= np.arccos(1 - 2 * sky_fraction)
return mask_function
[docs]def generate_mask_from_map(coord_system, mask_map=None, nside=None,
mask_map_file=None, box_shift=None,
ret_nside=False):
"""Generate mask function from a veto mask map.
Parameters
----------
coord_system : {'cartesian', 'spherical', 'sky'}
Coordinate system of the generated mask function. If 'sky', the
returned function can accept either 2-d or 3-d coordinates.
mask_map : :class:`numpy.ndarray` or None, optional
A veto array corresponding to pixels in a `healpy` mask map with
parameter `nside` (default is `None`). Ignored if
a `healpy`-generated .fits file for the mask map is provided.
nside : int or None, optional
'NSIDE' parameter of the `healpy` mask map (default is `None`).
Ignored if a `healpy`-generated .fits file for the
mask map is provided.
mask_map_file : *str or* :class:`pathlib.Path`
`healpy`-generated .fits file for the mask map. Must contain
the 'NSIDE' parameter in its header.
box_shift : float, array_like or None, optional
Passed as `box_centre` to :func:`~.coordinates.to_box_coords` so
that the resulting mask function accepts Cartesian coordinates
with origin at a box corner. Ignored unless `coord_system` is
'cartesian'.
ret_nside : bool, optional
If `True` (default is `False`), return the `nside` parameter
from `mask_map` read in.
Returns
-------
mask_function : callable
A mask function accepting `coord_system` as its coordinate system
and returning boolean values.
"""
if mask_map_file is not None:
mask_map, mask_map_header = hp.read_map(
mask_map_file, dtype=float, h=True, verbose=False
)
nside = dict(mask_map_header)['NSIDE']
# Any conversion to the 'cartesian' coordinate system as internal
# `mask_function` is defined for native spherical coordinates.
if coord_system == 'cartesian':
apply_to_box_coords_from_native = 'spherical'
else:
apply_to_box_coords_from_native = 'null'
box_shift = None
# `coords` can be either 'spherical' or 'sky' but the native
# implementation is 'spherical'; the decorator takes care of
# 'cartesian' inputs.
@to_box_coords(apply_to_box_coords_from_native, box_centre=box_shift)
def mask_function(coords):
# Transform to spherical surface coords.
surf_coords = sky_to_spherical(coords) if coord_system == 'sky' \
else np.atleast_2d(coords)
pixel = hp.ang2pix(nside, *surf_coords[:, [-2, -1]].T)
return mask_map[pixel].astype(bool)
if ret_nside:
return mask_function, nside
return mask_function
[docs]def generate_selection_by_distribution(coord_scale, distribution, peak,
scale=None, location=None, shape=None):
"""Generate selection function based on a probability distribution
suitably rescale horizontally and vectically.
Notes
-----
If the generated selection function detects 3-d input coordinate
arrays, it would assume the coordinates are the Cartesian; otherwise
it assumes the coordinates are the radial coordinate.
Parameters
----------
coord_scale : float
Coordinate scale of the selection function, e.g. the maximum
comoving radius of a catalogue.
peak : float
Peak value of the selection function used for renormalising
the probability density function by its maximum value.
distribution : {'gaussian', 'gamma'}
Distribution to use, either 'gaussian' or 'gamma'.
scale, location, shape : float or None, optional
Scale, location and shape parameter of the distribution. For
a normal distribution, `scale` is the standard deviation and
`location` is the mean in units of `coord_scale`; for a gamma
distribution, see :class:`scipy.stats.gamma`.
Returns
-------
callable
Normalised selection function of the same coordinate as the sample
coordinates.
"""
def rescale_coords(coords):
coords = np.linalg.norm(coords, axis=-1) if _is_coord_3d(coords) \
else np.squeeze(coords)
return coords / coord_scale
def gaussian_selection(coords):
r = rescale_coords(coords)
pdf_peak = 1 / (np.sqrt(2 * np.pi) * scale)
return peak / pdf_peak * norm.pdf(r, loc=location, scale=scale)
def gamma_selection(coords):
r = rescale_coords(coords) / (1 + 5 / np.sqrt(shape))
pdf_peak = np.exp(
(shape - 1) * np.log(shape - 1)
- (shape - 1) - np.log(scale) - loggamma(shape)
)
return peak / pdf_peak * gamma.pdf(r, a=shape, scale=scale)
if distribution == 'gaussian':
return gaussian_selection
if distribution == 'gamma':
return gamma_selection
raise ValueError(f"Unsupproted distribution: {distribution}.")
[docs]def generate_selection_by_cut(low_end, high_end):
"""Generate selection function at a constant value by a cut.
Notes
-----
If the generated selection function detects 3-d input coordinate
arrays, it would assume the coordinates are the Cartesian; otherwise
it assumes the coordinates are the radial coordinate.
Parameters
----------
low_end, high_end : float
Low and high end of the cut in some selection coordinate.
Returns
-------
callable
Constant cut selection function.
"""
def selection_function(coords):
coord = np.linalg.norm(coords, axis=-1) if _is_coord_3d(coords) \
else np.squeeze(coords)
return np.logical_and(
np.greater_equal(coord, low_end), np.less_equal(coord, high_end)
)
return selection_function
[docs]def generate_selection_samples(selection_coordinate, coord_scale,
redshift_samples, cosmo, sky_fraction=1.,
bins=None):
"""Generate selection function samples from redshift samples.
The selection function is normalised by the mean number density
inferred from the samples and the sky fraction.
Parameters
----------
selection_coordinate : {'z', 'r'}
Coordinate of the generated selection function, either redshift
'z' or comoving distance 'r'.
coord_scale : float
Coordinate scale used to rescale the selection coordinates, e.g.
the maximum comoving radius of the catalogue to which the
selection function is applied.
redshift_samples : float, array_like
Redshift samples.
cosmo : :class:`nbodykit.cosmology.cosmology.Cosmology`
Cosmological model providing the redshift-to-distance conversion
and thus normalisation of the selection function by volume.
sky_fraction : float, optional
The sky coverage of redshift samples as a fraction used to
normalise the selection function (default is 1.),
``0 < sky_fraction <= 1``.
bins : int, str or None, optional
Number of bins or binning scheme used to generate the selection
function samples (see :func:`numpy.histogram_bin_edges`).
If `None` (default), Scott's rule for binning is used.
Returns
-------
samples, coords : :class:`numpy.ndarray`
Normalised (dimensionless) selection function samples and
corresponding coordinates.
"""
if not 0 < sky_fraction <= 1:
raise ValueError("Sky fraction must be between 0 and 1.")
# Perform binning with specified or optimal Scott's binning.
redshift_samples = redshift_samples[redshift_samples >= 0.]
bins = bins or 'scott'
bin_edges = np.histogram_bin_edges(redshift_samples, bins=bins)
bin_counts, z_edges = np.histogram(redshift_samples, bins=bin_edges)
z_centres = (z_edges[:-1] + z_edges[1:]) / 2.
# Find mean number density in each redshift shell bin and normalise
# by overall density.
r_edges = cosmo.comoving_distance(z_edges)
bin_volumes = sky_fraction * (4./3. * np.pi) \
* (r_edges[1:] ** 3 - r_edges[:-1] ** 3)
overall_nbar = np.sum(bin_counts) / np.sum(bin_volumes)
bin_selections = (bin_counts / bin_volumes) / overall_nbar
bin_centres = z_centres if selection_coordinate == 'z' \
else cosmo.comoving_distance(z_centres)
# Generate a selection function of the appropriate coordinate.
interpolated_selection = Interpolation(bin_centres, bin_selections, ext=1)
# Attempt resampling and reject volatile ends with negative
# interpolated values.
interpolated_selection_samples = interpolated_selection(
np.linspace(bin_centres.min(), bin_centres.max(), num=len(bin_centres))
)
max_point = np.argmax(bin_counts)
low_partition = interpolated_selection_samples[:max_point]
high_partition = interpolated_selection_samples[max_point:]
low_end = np.argmax(low_partition[::-1] < 0)
high_end = np.argmax(high_partition < 0)
burnt_remains = slice(max_point - low_end, max_point + high_end)
coords = bin_centres[burnt_remains] * coord_scale / np.max(bin_centres)
samples = bin_selections[burnt_remains]
return samples, coords
[docs]def generate_selection_from_samples(sample_selections, sample_coords):
"""Generate selection function interpolated from normalised selection
function samples at sample coodinates.
Notes
-----
If the generated selection function detects 3-d input coordinate
arrays, it would assume the coordinates are the Cartesian; otherwise
it assumes the coordinates are the radial coordinate.
Parameters
----------
sample_selections, sample_coords : float :class:`numpy.ndarray`
Selection function samples and coordinates.
Returns
-------
callable
Normalised selection function of the same coordinate as the sample
coordinates.
"""
def selection_function(coords):
r = np.linalg.norm(coords, axis=-1) if _is_coord_3d(coords) \
else np.squeeze(coords)
return Interpolation(sample_coords, sample_selections, ext=1)(r)
return selection_function