Source code for harmonia.mapper.catalogue_maker

"""
Catalogue maker (:mod:`~harmonia.mapper.catalogue_maker`)
===========================================================================

Make discrete catalogues from observed or simulated realisations.

.. autosummary::

    SourceCatalogue
    RandomCatalogue
    SphericalFKPCatalogue
    spherical_indicator

|

"""
import logging
import warnings

import numpy as np
from nbodykit.lab import CSVCatalog, FKPCatalog, UniformCatalog

from harmonia.utils import normalise_vector


[docs]def spherical_indicator(cartesian_position, bounding_radius): """Indicate whether an object lies within the a spherical domain centred at the origin. Parameters ---------- cartesian_position : float, array_like Object position in Cartesian coordinates. bounding_radius : float Bounding radius of the spherical domain. Returns ------- bool :class:`numpy.ndarray` `True` if the objection position lies within the spherical domain. """ return np.linalg.norm(cartesian_position, axis=-1) <= bounding_radius
[docs]class RandomCatalogue(UniformCatalog): """Uniform random catalogue. Notes ----- Origin of the catalogue is at a corner of the catalogue box. Parameters ---------- mean_density : float Desired mean particle number density (in cubic :math:`h`/Mpc). boxsize : float, array_like Catalogue boxsize (in Mpc/:math:`h`) as a scalar or a triple of scalars. seed : int or None, optional Random seed of the catalogue (default is `None`). """ def __init__(self, mean_density, boxsize, seed=None, comm=None): super().__init__(mean_density, boxsize, seed=seed, comm=comm) self.attrs['BoxSize'] = [boxsize,] * 3 \ if np.isscalar(boxsize) else boxsize self.attrs['nbar'] = mean_density if self.comm is None or self.comm.rank == 0: self.logger.debug( "%s generated with origin at the box centre.", self ) def __str__(self): str_info = "nbar={}, boxsize={}, seed={}".format( self.attrs['nbar'], self.attrs['BoxSize'], self.attrs['seed'] ) return f"{self.__class__.__name__}({str_info})"
[docs]class SourceCatalogue(CSVCatalog): """Catalogue from external sources. Notes ----- This assumes the external catalogue has a Cartesian coordinate system with the origin at a corner of the Cartesian box. Parameters ---------- source_file : str Catalogue source file. headings : list of str Column headings of the catalogue file. This must contain three columns 'x', 'y' and 'z' for holding the Cartesian coordinates. mean_density: float or None, optional If not `None` (default), this overrides default catalogue mean number density (in cubic :math:`h`/Mpc) (e.g. to evade integral constraint). boxsize : float or None, optional If not `None` (default), this overrides default catalogue boxsize (in Mpc/:math:`h`) in case of unit upscaling. upscale : float, optional Scaling factor for converting any length unit to Mpc/:math:`h` (default is 1.), e.g. ``upscale=1.e-3`` for converting Kpc/:math:`h` to Mpc/:math:`h`. offset : bool, optional If `True` (default is `False`), add the velocity columns to position columns (for e.g. redshift-space distortions). offset_upscale : float, optional Scaling factor for converting the velocity offset length unit to Mpc/:math:`h` (default is 1.e-3), e.g. ``offset_upscale=1.e-3`` for converting Kpc/:math:`h` to Mpc/:math:`h`. The velocity offset should include the redshift conversion factor (the conformal Hubble parameter). **kwargs Parameters (other than `path` and `names`) to be passed to :class:`nbodykit.source.catalog.CSVCatalog`. Notes ----- The use of ``offset=True`` assumes the source catalogue contains velocity data columns labelled by, for instance, 'vx', for each of the position data columns, for instance, 'x'. """ def __init__(self, source_file, headings, mean_density=None, boxsize=None, upscale=1., offset=False, offset_upscale=1.e-3, **kwargs): super().__init__(str(source_file), headings, **kwargs) # WARNING: Unresolved MPI bug with `nbodykit`. if self.comm is not None and self.comm.size > 1: warnings.warn( "Beware of enabling multi-processing for data catalogues: " "unresolved MPI issue with 'nbodykit'." ) self.attrs['source'] = source_file self.attrs['offset'] = offset if boxsize is not None: self.attrs['BoxSize'] = [boxsize,] * 3 if np.isscalar(boxsize) \ else boxsize else: self.attrs['BoxSize'] = \ np.max(self['Position']) - np.min(self['Position']) if mean_density is not None: self.attrs['nbar'] = mean_density else: self.attrs['nbar'] = self.size / np.prod(self.attrs['BoxSize']) self['Position'] = \ self['x'][:, None] * [upscale, 0, 0] \ + self['y'][:, None] * [0, upscale, 0] \ + self['z'][:, None] * [0, 0, upscale] # Add redshift-space distortions. if offset: self['Position'] = ( self['vx'][:, None] * [offset_upscale, 0, 0] \ + self['vy'][:, None] * [0, offset_upscale, 0] \ + self['vz'][:, None] * [0, 0, offset_upscale] ) * normalise_vector(self['Position']) if self.comm is None or self.comm.rank == 0: self.logger.debug("%s generated.", self) def __str__(self): str_info = "nbar={}, boxsize={}, source={}".format( self.attrs['nbar'], self.attrs['BoxSize'], self.attrs['source'] ) return f"{self.__class__.__name__}({str_info})"
[docs]class SphericalFKPCatalogue: """FKP-style paired catalogues in a spherical domain. Parameters ---------- data_catalogue : :class:`~.SourceCatalogue` *or None, optional* Data catalogue. Cannot be `None` (default) unless `source_file` and `source_kwargs` are provided as an alternative to read the catalogue source file. source_file : str or None, optional Catalogue source file path. This can only be `None` (default) if `data_catalogue` is already provided. source_kwargs : dict or None, optional Parameters to pass to :class:`~.SourceCatalogue`. This can only be `None` (default) if `data_catalogue` is already provided. random_catalogue : :class:`nbodykit.base.catalog.CatalogSource` *or None, optional* Random catalogue (default is `None`). contrast : float or None, optional Mean density contrast compared to the data catalogue used to generate a random catalogue (default is `None`). Ignored if `random_catalogue` is provided. mask : callable or None, optional Any veto mask function to be applied to both the data and random catalogues. Must be a function of 3-d Cartesian coordinates only, assuming the origin is at the centre of the catalogues. selection : callable or None, optional Any selection function (normalised to unity) to be applied to both the data and random catalogues. Must be a function of 3-d Cartesian coordinates only, assuming the origin is at the centre of the catalogues. weight : callable or None, optional Any weight function to be applied to both the data and random catalogues. Must be a function of 3-d Cartesian coordinates only, assuming the origin is at the centre of the catalogues. random_seed : int or None, optional Random seed of the random catalogue (default is `None`). apply_selection_as_veto : bool, optional If `True` (default is `False`), `selection` returning boolean values is applied to the 'Selection' column of the catalogues instead of the 'NZ' column (see `nbodykit` documentation for this peculiarity.) Attributes ---------- catalogue_pair : |FKPCatalog| FKP-style paired catalogue. data_catalogue : :class:`~.SourceCatalogue` Data catalogue. random_catalogue : :class:`nbodykit.base.catalog.CatalogSource` Random catalogue. attrs : dict Attributes including initialisation parameters. Notes ----- If `selection` is provided, it must be normalised to the overall mean number density of the data catalogue. .. |FKPCatalog| replace:: :class:`nbodykit.algorithms.convpower.catalog.FKPCatalog` """ _msg = { 'boxsizes': \ "Different boxsizes for %s and %s catalogues: %s and %s.", 'inscribe': ( "Bounding sphere larger than the %s catalogue: " "diameter %.1f > boxsize %s." ), 'centre': ( "Centred %s catalogue with centred positions in 'Location' column." ), 'sphericalise': "Restricted %s catalogue to spherical domain.", 'mask': "Applied mask veto to %s catalogue.", 'selection': "Applied selection function to %s catalogue.", 'weight': "Applied weighting scheme to %s catalogue.", } def __init__(self, radius, data_catalogue=None, source_file=None, source_kwargs=None, random_catalogue=None, contrast=None, mask=None, selection=None, weight=None, random_seed=None, apply_selection_as_veto=False): self.logger = logging.getLogger(self.__class__.__name__) if data_catalogue is None: self.data_catalogue = SourceCatalogue(source_file, **source_kwargs) else: self.data_catalogue = data_catalogue try: source_file = self.data_catalogue.attrs['source'] except (AttributeError, KeyError): source_file = None if random_catalogue is not None: self.random_catalogue = random_catalogue contrast = \ random_catalogue.attrs['nbar'] / data_catalogue.attrs['nbar'] elif contrast is not None: self.random_catalogue = RandomCatalogue( contrast * self.data_catalogue.attrs['nbar'], self.data_catalogue.attrs['BoxSize'], seed=random_seed ) else: self.random_catalogue = None self.attrs = { 'bounding_radius': radius, 'source': source_file, 'contrast': contrast, } self.catalogue_pair = self._initialise( mask, selection, weight, apply_selection_as_veto ) def __str__(self): str_info = ( "radius={bounding_radius}, source={source}, contrast={contrast}" ).format(**self.attrs) return f"{self.__class__.__name__}({str_info})" def _initialise(self, mask, selection, weight, apply_selection_as_veto): radius = self.attrs['bounding_radius'] data_boxsize = self.data_catalogue.attrs['BoxSize'] if self.random_catalogue is None: rand_boxsize = None else: rand_boxsize = self.random_catalogue.attrs['BoxSize'] # Compare data and random catalogue boxsizes. if not np.allclose(data_boxsize, rand_boxsize): warnings.warn( self._msg['boxsizes'], 'data', 'random', np.around(data_boxsize, decimals=0), np.around(rand_boxsize, decimals=0) ) # Restrict catalogued in the spherical domain. for name, catalogue, boxsize in zip( ['data', 'random'], [self.data_catalogue, self.random_catalogue], [data_boxsize, rand_boxsize] ): if catalogue is None: break # Check the bounding sphere is entirely within the catalogue. if np.any(np.less(boxsize, 2 * radius)): self.logger.debug( self._msg['inscribe'], name, 2 * radius, boxsize ) # Centre the coordinate origin if not already centred. is_origin_at_corner = np.any( np.isclose( np.min(np.abs(catalogue['Position']), axis=0) / np.max(np.abs(catalogue['Position']), axis=0), 0., atol=1.e-2 ) ) if is_origin_at_corner: catalogue['Location'] = \ catalogue['Position'] - np.divide(boxsize, 2) self.logger.debug(self._msg['centre'], name) else: catalogue['Location'] = catalogue['Position'] # Sphericalise and set base selection values ('NZ') for the # `nbodykit` `ConvolvedFFTPower` algorithm. catalogue['Selection'] &= \ spherical_indicator(catalogue['Location'], radius) self.logger.debug(self._msg['sphericalise'], name) # Apply any mask, selection or weight. catalogue['NZ'] = \ self.data_catalogue.attrs['nbar'] * np.ones(catalogue.csize) if callable(mask): catalogue['Selection'] &= mask(catalogue['Location']) self.logger.debug(self._msg['mask'], name) if callable(selection): if apply_selection_as_veto: catalogue['Selection'] *= selection(catalogue['Location']) else: catalogue['NZ'] *= selection(catalogue['Location']) self.logger.debug(self._msg['selection'], name) if callable(weight): catalogue['Weight'] *= weight(catalogue['Location']) self.logger.debug(self._msg['weight'], name) # Construct FKP catalogue. catalogue_pair = FKPCatalog(self.data_catalogue, self.random_catalogue) return catalogue_pair