"""
Classes from which sensitivities can be obtained.
This module modularizes the previous version's `calc_sense.py`, and enables
multiple sensitivity kinds to be defined. By default, a :class:`PowerSpectrum`
sensitivity class is provided, which offers the same results as previous versions.
In the future, we may provide things like ``ImagingSensitivity`` or
``WaveletSensitivity`` for example.
"""
from __future__ import annotations
import attr
import h5py
import hickle
import importlib
import logging
import numpy as np
import tqdm
from astropy import units as un
from astropy.cosmology import LambdaCDM
from astropy.cosmology.units import littleh, with_H0
from astropy.io.misc import yaml
from attr import validators as vld
from cached_property import cached_property
from collections.abc import Mapping
from hickleable import hickleable
from methodtools import lru_cache
from os import path
from pathlib import Path
from typing import Callable
from . import _utils as ut
from . import config
from . import conversions as conv
from . import observation as obs
from . import units as tp
from .theory import _ALL_THEORY_POWER_SPECTRA, EOS2021, TheoryModel
logger = logging.getLogger(__name__)
[docs]
@hickleable(evaluate_cached_properties=True)
@attr.s(kw_only=True)
class Sensitivity:
"""
Base class for sensitivity calculations.
Parameters
----------
observation : :class:`~py21cmsense.Observation` instance
An object defining the observatory and observation used to derive sensitivity.
no_ns_baselines : bool, optional
Remove pure north/south baselines (u=0) from the sensitivity calculation.
These baselines can potentially have higher systematics, so excluding them
represents a conservative choice.
"""
observation: obs.Observation = attr.ib(validator=vld.instance_of(obs.Observation))
no_ns_baselines: bool = attr.ib(default=False, converter=bool)
@staticmethod
def _load_yaml(yaml_file):
if isinstance(yaml_file, str):
with open(yaml_file) as fl:
data = yaml.load(fl)
elif isinstance(yaml_file, Mapping):
data = yaml_file
else:
raise ValueError(
"yaml_file must be a string filepath or a raw dict from such a file."
)
return data
[docs]
@classmethod
def from_yaml(cls, yaml_file) -> Sensitivity:
"""Construct a :class:`Sensitivity` object from a YAML configuration."""
data = cls._load_yaml(yaml_file)
klass = data.pop("class", cls)
if isinstance(yaml_file, str) and not path.isabs(data["observation"]):
obsfile = path.join(path.dirname(yaml_file), data.pop("observation"))
else:
obsfile = data.pop("observation")
if obsfile.endswith(".yml"):
observation = obs.Observation.from_yaml(obsfile)
elif h5py.is_hdf5(obsfile):
observation = hickle.load(obsfile)
else:
raise ValueError(
"observation must be a filename with extension .yml or .h5"
)
return klass(observation=observation, **data)
[docs]
def clone(self, **kwargs):
"""Clone the object with new parameters."""
return attr.evolve(self, **kwargs)
[docs]
def at_frequency(self, frequency: un.Quantity[un.MHz]) -> Sensitivity:
"""Return a new object at a new frequency."""
return self.clone(
observation=self.observation.clone(
observatory=self.observation.observatory.clone(
beam=self.observation.observatory.beam.clone(frequency=frequency)
)
)
)
@property
def cosmo(self) -> LambdaCDM:
"""The cosmology to use in the sensitivity calculations."""
return self.observation.cosmo
@property
def frequency(self) -> un.Quantity[un.MHz]:
"""The frequency of the observation."""
return self.observation.frequency
[docs]
@attr.s(kw_only=True)
class PowerSpectrum(Sensitivity):
"""
A Power Spectrum sensitivity calculator.
Note that the sensitivity calculation contains two major parts: thermal variance
and sample variance (aka cosmic variance). The latter requires a model of the power
spetrum itself, which you should provide via ``k_21`` and ``delta_21``.
Remember that the power spectrum is redshift dependent, and so should be supplied
differently at each frequency being calculated.
Parameters
----------
horizon_buffer : float or Quantity
A buffer to add to the horizon line in order to excise foreground-contaminated modes.
foreground_model : str, {moderate, optimistic}
Which approach to take for foreground excision. Moderate uses a defined horizon buffer,
while optimistic excludes all k modes inside the primary field of view.
k_21 : array or Quantity, optional
An array of wavenumbers used to define a cosmological power spectrum, in order to get
sample variance. If not a Quantity, will assume k has units of 1/Mpc, though it will
convert these units to h/Mpc throughout the class. Default is to use built-in
data file from 21cmFAST.
delta_21 : array or Quantity, optional
An array of Delta^2 power spectrum values used for sample variance.
If not a Quantity, will assume units of mK^2.
systematics_mask : callable
A function that takes a single kperp and an array of kpar, and returns a boolean
array specifying which of the k's are useable after accounting for systematics.
that is, it returns False for k's affected by systematics.
"""
horizon_buffer: tp.Wavenumber = attr.ib(default=0.1 * littleh / un.Mpc)
foreground_model: str = attr.ib(
default="moderate", validator=vld.in_(["moderate", "optimistic"])
)
theory_model: TheoryModel = attr.ib()
systematics_mask: Callable | None = attr.ib(None)
@horizon_buffer.validator
def _horizon_buffer_validator(self, att, val):
tp.vld_unit(littleh / un.Mpc, with_H0(self.cosmo.H0))(self, att, val)
ut.nonnegative(self, att, val)
[docs]
@classmethod
def from_yaml(cls, yaml_file) -> Sensitivity:
"""
Construct a PowerSpectrum sensitivity from yaml.
YAML spec has p21 as a file which obtains k_21 and delta_21.
It is assumed that k in the file is in units of 1/Mpc (true for 21cmFAST).
"""
data = cls._load_yaml(yaml_file)
if "plugins" in data:
if not isinstance(data["plugins"], list):
raise ValueError(
"plugins in YAML file must be a list of modules."
) # pragma: no cover
for mdl in data.pop("plugins"):
try:
importlib.import_module(mdl)
except Exception as e:
raise ImportError(f"Could not import {mdl}") from e
if "theory_model" in data:
data["theory_model"] = _ALL_THEORY_POWER_SPECTRA[data["theory_model"]]()
if isinstance(yaml_file, str):
obsfile = path.join(path.dirname(yaml_file), data.pop("observation"))
else:
obsfile = data.pop("observation")
data["observation"] = obsfile
return super().from_yaml(data)
@theory_model.default
def _theory_model_default(self):
return EOS2021()
@theory_model.validator
def _theory_model_validator(self, att, val):
if not isinstance(val, TheoryModel):
raise ValueError("The theory_model must be an instance of TheoryModel")
@cached_property
def k1d(self) -> tp.Wavenumber:
"""1D array of wavenumbers for which sensitivities will be generated."""
delta = (
conv.dk_deta(
self.observation.redshift,
self.cosmo,
approximate=self.observation.use_approximate_cosmo,
)
/ self.observation.bandwidth
)
dv = delta.value
return np.arange(dv, dv * self.observation.n_channels, dv) * delta.unit
@cached_property
def X2Y(self) -> un.Quantity[un.Mpc**3 / littleh**3 / un.steradian / un.GHz]:
"""Cosmological scaling factor X^2*Y (eg. Parsons 2012)."""
return conv.X2Y(
self.observation.redshift,
approximate=self.observation.use_approximate_cosmo,
)
@cached_property
def uv_coverage(self) -> np.ndarray:
"""The UV-coverage of the array, with unused/redundant baselines set to zero."""
grid = self.observation.uv_coverage.copy()
size = grid.shape[0]
# Cut unnecessary data out of uv coverage: auto-correlations & half of uv
# plane (which is not statistically independent for real sky)
grid[size // 2, size // 2] = 0.0
grid[:, : size // 2] = 0.0
grid[size // 2 :, size // 2] = 0.0
if self.no_ns_baselines:
grid[:, size // 2] = 0.0
return grid
[docs]
def power_normalisation(self, k: tp.Wavenumber) -> float:
"""Normalisation constant for power spectrum."""
assert hasattr(k, "unit")
assert k.unit.is_equivalent(littleh / un.Mpc)
return (
self.X2Y
* self.observation.observatory.beam.b_eff
* self.observation.bandwidth
* k**3
/ (2 * np.pi**2)
).to_value("")
[docs]
def thermal_noise(
self, k_par: tp.Wavenumber, k_perp: tp.Wavenumber, trms: tp.Temperature
) -> tp.Delta:
"""Thermal noise contribution at particular k mode."""
k = np.sqrt(k_par**2 + k_perp**2)
scalar = self.power_normalisation(k)
return scalar * trms.to("mK") ** 2
[docs]
def sample_noise(self, k_par: tp.Wavenumber, k_perp: tp.Wavenumber) -> tp.Delta:
"""Sample variance contribution at a particular k mode."""
k = np.sqrt(k_par**2 + k_perp**2).to_value(
littleh / un.Mpc if self.theory_model.use_littleh else un.Mpc**-1,
with_H0(self.cosmo.H0),
)
return self.theory_model.delta_squared(self.observation.redshift, k)
@cached_property
def _nsamples_2d(
self,
) -> dict[str, dict[tp.Wavenumber, un.Quantity[1 / un.mK**4]]]:
"""Mid-way product specifying thermal and sample variance over the 2D grid."""
# set up blank arrays/dictionaries
sense = {"sample": {}, "thermal": {}, "both": {}}
# loop over uv_coverage to calculate k_pr
nonzero = np.where(self.uv_coverage > 0)
for iu, iv in tqdm.tqdm(
zip(nonzero[1], nonzero[0]),
desc="calculating 2D sensitivity",
unit="uv-bins",
disable=not config.PROGRESS,
total=len(nonzero[1]),
):
u, v = self.observation.ugrid[iu], self.observation.ugrid[iv]
trms = self.observation.Trms[iv, iu]
if np.isinf(trms):
continue
umag = np.sqrt(u**2 + v**2)
k_perp = umag * conv.dk_du(
self.observation.redshift,
self.cosmo,
approximate=self.observation.use_approximate_cosmo,
)
hor = self.horizon_limit(umag)
if k_perp not in sense["thermal"]:
sense["thermal"][k_perp] = (
np.zeros(len(self.observation.kparallel)) / un.mK**4
)
sense["sample"][k_perp] = (
np.zeros(len(self.observation.kparallel)) / un.mK**4
)
sense["both"][k_perp] = (
np.zeros(len(self.observation.kparallel)) / un.mK**4
)
# Exclude parallel modes dominated by foregrounds
kpars = self.observation.kparallel[self.observation.kparallel >= hor]
if not len(kpars):
continue
start = np.where(self.observation.kparallel >= hor)[0][0]
n_inds = (self.observation.kparallel.size - 1) // 2 + 1
inds = np.arange(start=start, stop=n_inds)
thermal = self.thermal_noise(kpars, k_perp, trms)
sample = self.sample_noise(kpars, k_perp)
# The following assumes that the power spectra are averaged with inverse
# variance weighting.
t = 1.0 / thermal**2
s = 1.0 / sample**2
ts = 1.0 / (thermal + sample) ** 2
sense["thermal"][k_perp][inds] += t
sense["thermal"][k_perp][-inds] += t
sense["sample"][k_perp][inds] += s
sense["sample"][k_perp][-inds] += s
sense["both"][k_perp][inds] += ts
sense["both"][k_perp][-inds] += ts
return sense
[docs]
@lru_cache()
def calculate_sensitivity_2d(
self, thermal: bool = True, sample: bool = True
) -> dict[tp.Wavenumber, tp.Delta]:
"""
Calculate power spectrum sensitivity for a grid of cylindrical k modes.
Parameters
----------
thermal : bool, optional
Whether to calculate thermal contribution to the sensitivity
sample : bool, optional
Whether to calculate sample variance contribution to sensitivity.
Returns
-------
dict :
Keys are cylindrical kperp values and values are arrays aligned with
`observation.kparallel`, defining uncertainty in mK^2.
"""
if thermal and sample:
logger.info("Getting Combined Variance")
sense = self._nsamples_2d["both"]
elif thermal:
logger.info("Getting Thermal Variance")
sense = self._nsamples_2d["thermal"]
elif sample:
logger.info("Getting Sample Variance")
sense = self._nsamples_2d["sample"]
else:
raise ValueError("Either thermal or sample must be True")
# errors were added in inverse quadrature, now need to invert and take
# square root to have error bars; also divide errors by number of indep. fields
final_sense = {}
for k_perp in sense.keys():
mask = sense[k_perp] > 0
if self.systematics_mask is not None:
mask &= self.systematics_mask(k_perp, self.observation.kparallel)
if not np.any(mask):
continue
final_sense[k_perp] = np.inf * np.ones(len(mask)) * un.mK**2
if thermal:
total_std = thermal_std = 1 / np.sqrt(
self._nsamples_2d["thermal"][k_perp][mask]
* self.observation.n_lst_bins
)
if sample:
total_std = sample_std = 1 / np.sqrt(
self._nsamples_2d["sample"][k_perp][mask]
* (
self.observation.time_per_day
/ self.observation.beam_crossing_time
).to("")
)
if thermal and sample:
total_std = thermal_std + sample_std
final_sense[k_perp][mask] = total_std
return final_sense
[docs]
def calculate_sensitivity_2d_grid(
self,
kperp_edges: tp.Wavenumber,
kpar_edges: tp.Wavenumber,
thermal: bool = True,
sample: bool = True,
) -> tp.Delta:
"""Calculate the 2D cylindrical sensitivity on a grid of kperp/kpar.
Parameters
----------
kperp_edges
The edges of the bins in kperp.
kpar_edges
The edges of the bins in kpar.
"""
sense2d_inv = np.zeros((len(kperp_edges) - 1, len(kpar_edges) - 1)) << (
1 / un.mK**4
)
sense = self.calculate_sensitivity_2d(thermal=thermal, sample=sample)
assert np.all(np.diff(kperp_edges) > 0)
assert np.all(np.diff(kpar_edges) > 0)
for k_perp in tqdm.tqdm(
sense.keys(),
desc="averaging to 2D grid",
unit="kperp-bins",
disable=not config.PROGRESS,
):
if k_perp < kperp_edges[0] or k_perp >= kperp_edges[-1]:
continue
# Get the kperp bin it's in.
kperp_indx = np.where(k_perp >= kperp_edges)[0][-1]
kpar_indx = np.digitize(self.observation.kparallel, kpar_edges) - 1
good_ks = kpar_indx >= 0
good_ks &= kpar_indx < len(kpar_edges) - 1
sense2d_inv[kperp_indx][kpar_indx[good_ks]] += (
1.0 / sense[k_perp][good_ks] ** 2
)
# invert errors and take square root again for final answer
sense2d = np.ones(sense2d_inv.shape) * un.mK**2 * np.inf
mask = sense2d_inv > 0
sense2d[mask] = 1 / np.sqrt(sense2d_inv[mask])
return sense2d
[docs]
def horizon_limit(self, umag: float) -> tp.Wavenumber:
"""
Calculate a horizon limit, with included buffer, if appropriate.
Parameters
----------
umag : float
Baseline length (in wavelengths) at which to compute the horizon limit.
Returns
-------
float :
Horizon limit, in h/Mpc.
"""
horizon = (
conv.dk_deta(
self.observation.redshift,
self.cosmo,
approximate=self.observation.use_approximate_cosmo,
)
* umag
/ self.observation.frequency
)
# calculate horizon limit for baseline of length umag
if self.foreground_model in ["moderate", "pessimistic"]:
return horizon + self.horizon_buffer
elif self.foreground_model in ["optimistic"]:
return horizon * np.sin(self.observation.observatory.beam.first_null / 2)
def _average_sense_to_1d(
self, sense: dict[tp.Wavenumber, tp.Delta], k1d: tp.Wavenumber | None = None
) -> tp.Delta:
"""Bin 2D sensitivity down to 1D."""
sense1d_inv = np.zeros(len(self.k1d)) / un.mK**4
if k1d is None:
k1d = self.k1d
for k_perp in tqdm.tqdm(
sense.keys(),
desc="averaging to 1D",
unit="kperp-bins",
disable=not config.PROGRESS,
):
k = np.sqrt(self.observation.kparallel**2 + k_perp**2)
good_ks = k >= k1d.min()
good_ks &= k < k1d.max()
for cnt, kbin in enumerate(ut.find_nearest(k1d, k[good_ks])):
sense1d_inv[kbin] += 1.0 / sense[k_perp][good_ks][cnt] ** 2
# invert errors and take square root again for final answer
sense1d = np.ones(sense1d_inv.shape) * un.mK**2 * np.inf
mask = sense1d_inv > 0
sense1d[mask] = 1 / np.sqrt(sense1d_inv[mask])
return sense1d
[docs]
@lru_cache()
def calculate_sensitivity_1d(
self, thermal: bool = True, sample: bool = True
) -> tp.Delta:
"""Calculate a 1D sensitivity curve.
Parameters
----------
thermal
Whether to calculate thermal contribution to the sensitivity
sample
Whether to calculate sample variance contribution to sensitivity.
Returns
-------
array :
1D array with units mK^2... the variance of spherical k modes.
"""
sense = self.calculate_sensitivity_2d(thermal=thermal, sample=sample)
return self._average_sense_to_1d(sense)
[docs]
def calculate_sensitivity_1d_binned(self, k: tp.Wavenumber, **kwargs):
"""Calculate the 1D sensitivity at arbitrary k-bins."""
sense2d = self.calculate_sensitivity_2d(**kwargs)
return self._average_sense_to_1d(sense2d, k1d=k)
@property
def delta_squared(self) -> tp.Delta:
"""The fiducial 21cm power spectrum evaluated at :attr:`k1d`."""
k = self.k1d.to_value(
littleh / un.Mpc if self.theory_model.use_littleh else un.Mpc**-1,
with_H0(self.cosmo.H0),
)
return self.theory_model.delta_squared(self.observation.redshift, k)
[docs]
@lru_cache()
def calculate_significance(
self, thermal: bool = True, sample: bool = True
) -> float:
"""
Calculate significance of a detection of the default cosmological power spectrum.
Parameters
----------
thermal : bool, optional
Whether to calculate thermal contribution to the sensitivity
sample : bool, optional
Whether to calculate sample variance contribution to sensitivity.
Returns
-------
float :
Significance of detection (in units of sigma).
"""
sense1d = self.calculate_sensitivity_1d(thermal=thermal, sample=sample)
snr = self.delta_squared / sense1d
return np.sqrt(float(np.dot(snr, snr.T)))
[docs]
def plot_sense_2d(self, sense2d: dict[tp.Wavenumber, tp.Delta]):
# sourcery skip: raise-from-previous-error
"""Create a colormap plot of the sensitivity un UV bins."""
try:
import matplotlib.pyplot as plt
except ImportError: # pragma: no cover
raise ImportError("matplotlib is required to make plots...")
keys = sorted(sense2d.keys())
x = np.array([v.value for v in keys])
x = (
np.repeat(x, len(self.observation.kparallel))
.reshape((len(x), len(self.observation.kparallel)))
.T
)
y = np.fft.fftshift(
np.repeat(self.observation.kparallel.value, x.shape[1]).reshape(
(len(self.observation.kparallel), x.shape[1])
)
)
z = np.array([np.fft.fftshift(sense2d[key]) for key in keys]).T
plt.pcolormesh(x, y, np.log10(z))
cbar = plt.colorbar()
cbar.set_label(r"$\log_{10} \delta \Delta^2\ \ [{\rm mK}^2]$", fontsize=14)
plt.xlabel(r"$k_\perp$ [h/Mpc]", fontsize=14)
plt.ylabel(r"$k_{||}$ [h/Mpc]", fontsize=14)
[docs]
def write(
self,
filename: str | Path,
thermal: bool = True,
sample: bool = True,
prefix: str = None,
direc: str | Path = ".",
) -> Path:
"""Save sensitivity results to HDF5 file.
Returns
-------
filename
The path to the file that is written.
"""
out = self._get_all_sensitivity_combos(thermal, sample)
prefix = f"{prefix}_" if prefix else ""
if filename is None:
filename = Path(
f"{prefix}{self.foreground_model}_{self.observation.frequency:.3f}.h5".replace(
" ", ""
)
)
else:
filename = Path(filename)
if direc is not None:
filename = Path(direc) / filename
logger.info(f"Writing sensitivies to '{filename}'")
with h5py.File(filename, "w") as fl:
# TODO: We should be careful to try and write everything into this file
# i.e. all the parameters etc.
for k, v in out.items():
fl[k] = v
fl[k.replace("noise", "snr")] = self.delta_squared / v
fl["k"] = self.k1d.to("1/Mpc", with_H0(self.cosmo.H0)).value
fl["delta_squared"] = self.delta_squared
fl.attrs["total_snr"] = self.calculate_significance()
fl.attrs["foreground_model"] = self.foreground_model
fl.attrs["horizon_buffer"] = self.horizon_buffer
fl.attrs["k_unit"] = "1/Mpc"
return filename
[docs]
def plot_sense_1d(self, sample: bool = True, thermal: bool = True):
"""Create a plot of the sensitivity in 1D k-bins."""
try:
import matplotlib.pyplot as plt
except ImportError: # pragma: no cover
raise ImportError("matplotlib is required to make plots...")
out = self._get_all_sensitivity_combos(thermal, sample)
for key, value in out.items():
plt.plot(self.k1d, value, label=key)
plt.xscale("log")
plt.yscale("log")
plt.xlabel("k [h/Mpc]")
plt.ylabel(r"$\Delta^2_N \ [{\rm mK}^2]$")
plt.legend()
plt.title(f"z={conv.f2z(self.observation.frequency):.2f}")
return plt.gcf()
def _get_all_sensitivity_combos(
self, thermal: bool, sample: bool
) -> dict[str, tp.Delta]:
result = {}
if thermal:
result["thermal_noise"] = self.calculate_sensitivity_1d(sample=False)
if sample:
result["sample_noise"] = self.calculate_sensitivity_1d(
thermal=False, sample=True
)
if thermal and sample:
result["sample+thermal_noise"] = self.calculate_sensitivity_1d(
thermal=True, sample=True
)
return result