Source code for py21cmsense.observation

"""A module defining interferometric observation objects."""

from __future__ import annotations

import attr
import collections
import numpy as np
from astropy import units as un
from astropy.cosmology import LambdaCDM, Planck15
from astropy.io.misc import yaml
from attr import validators as vld
from collections import defaultdict
from functools import cached_property, partial
from hickleable import hickleable
from os import path
from typing import Any, Callable

from . import _utils as ut
from . import conversions as conv
from . import observatory as obs
from . import units as tp


[docs] @hickleable(evaluate_cached_properties=True) @attr.s(kw_only=True) class Observation: """ A class defining an interferometric Observation. Note that to achieve the same behaviour as the original 21cmSense's ``--track`` option (i.e. track-scan instead of drift-scan) merely requires setting ``obs_duration`` to the length of the tracked scan. Parameters ---------- observatory : :class:`~py21cmsense.observatory.Observatory` An object defining attributes of the observatory itself (its location etc.) hours_per_day : float or Quantity, optional The number of good observing hours per day. This corresponds to the size of a low-foreground region in right ascension for a drift scanning instrument. The total observing time is `n_days*hours_per_day`. Default is 6. If simulating a tracked scan, `hours_per_day` should be a multiple of the length of the track (i.e. for two three-hour tracks per day, `hours_per_day` should be 6). track Tracked scan observation duration. This is an alias for ``obs_duration``, and is provided only for ease of use for those familiar with 21cmSense v1. obs_duration : float or Quantity, optional The time assigned to a single LST bin, by default the time it takes for a source to travel through the beam's FWHM. If a float, assumed to be in minutes. integration_time : float or Quantity, optional The amount of time integrated into a single visibility, by default a minute. If a float, assumed to be in seconds. n_channels : int, optional Number of channels used in the observation. Defaults to 82, which is equivalent to 1024 channels over 100 MHz of bandwidth. Sets maximum k_parallel that can be probed, but little to no overall effect on sensitivity. bandwidth : float or Quantity, optional The bandwidth used for the observation, assumed to be in MHz. Note this is not the total instrument bandwidth, but the redshift range that can be considered co-eval. n_days : int, optional The number of days observed (for the same set of LSTs). The default is 180, which is the maximum a particular R.A. can be observed in one year if one only observes at night. The total observing time is `n_days*hours_per_day`. baseline_filters A function that takes a single value: a length-3 array of baseline co-ordinates, and returns a bool indicating whether to include the baseline. Built-in filters are provided in the :mod:`~baseline_filters` module. redundancy_tol : int, optional The number of decimal places to which baseline vectors must match (in all dimensions) to be considered redundant. coherent : bool, optional Whether to add different baselines coherently if they are not instantaneously redundant. spectral_index : float, optional The spectral index of the foreground model. The foreground model is approximated as a spatially-independent power-law, and used only for generating sky noise temperature. The default is 2.6, based on Mozdzen et al. 2017: 2017MNRAS.464.4995M, figure 8, with Galaxy down (see also `tsky_amplitude` and `tsky_ref_freq`). tsky_amplitude : float or Quantity, optional The temperature of foregrounds at `tsky_ref_freq`. See `spectral_index`. Default assumed to be in mK. tsky_ref_freq : float or Quantity Frequency at which the foreground model is equal to `tsky_amplitude`. See `spectral_index`. Default assumed to be in MHz. use_approximate_cosmo : bool Whether to use approximate cosmological conversion factors. Doing so will give the same results as the original 21cmSense code, but non-approximate versions that use astropy are preferred. cosmo : LambdaCDM An astropy cosmology object to use. """ observatory: obs.Observatory = attr.ib(validator=vld.instance_of(obs.Observatory)) time_per_day: tp.Time = attr.ib( 6 * un.hour, validator=(tp.vld_physical_type("time"), ut.between(0 * un.hour, 24 * un.hour)), ) track: tp.Time | None = attr.ib( None, validator=attr.validators.optional( [tp.vld_physical_type("time"), ut.between(0, 24 * un.hour)] ), ) lst_bin_size: tp.Time = attr.ib( validator=(tp.vld_physical_type("time"), ut.between(0, 24 * un.hour)), ) integration_time: tp.Time = attr.ib( 60 * un.second, validator=(tp.vld_physical_type("time"), ut.positive) ) n_channels: int = attr.ib(82, converter=int, validator=ut.positive) bandwidth: tp.Frequency = attr.ib( 8 * un.MHz, validator=(tp.vld_physical_type("frequency"), ut.positive) ) n_days: int = attr.ib(default=180, converter=int, validator=ut.positive) baseline_filters: tuple[Callable[[tp.Length], bool]] = attr.ib( default=(), converter=tp._tuplify ) redundancy_tol: int = attr.ib(default=1, converter=int, validator=ut.nonnegative) coherent: bool = attr.ib(default=True, converter=bool) # The following defaults are based on Mozdzen et al. 2017: 2017MNRAS.464.4995M, # figure 8, with galaxy down. spectral_index: float = attr.ib( default=2.6, converter=float, validator=ut.between(1.5, 4) ) tsky_amplitude: tp.Temperature = attr.ib( default=260000 * un.mK, validator=ut.nonnegative, ) tsky_ref_freq: tp.Frequency = attr.ib(default=150 * un.MHz, validator=ut.positive) use_approximate_cosmo: bool = attr.ib(default=False, converter=bool) cosmo: LambdaCDM = attr.ib(default=Planck15, converter=Planck15.from_format)
[docs] @classmethod def from_yaml(cls, yaml_file): """Construct an :class:`Observation` from a YAML file.""" if isinstance(yaml_file, str): with open(yaml_file) as fl: data = yaml.load(fl) elif isinstance(yaml_file, collections.abc.Mapping): data = yaml_file else: raise ValueError( "yaml_file must be a string filepath or a raw dict from such a file." ) if ( isinstance(data["observatory"], str) and isinstance(yaml_file, str) and not path.isabs(data["observatory"]) ): data["observatory"] = path.join( path.dirname(yaml_file), data["observatory"] ) observatory = obs.Observatory.from_yaml(data.pop("observatory")) return cls(observatory=observatory, **data)
def __gethstate__(self) -> dict[str, Any]: """Get the hickle state.""" d = attr.asdict(self, recurse=False) d["cosmo"] = d["cosmo"].to_format("mapping") del d["cosmo"]["cosmology"] # The class. return d def __sethstate__(self, d: dict[str, Any]) -> None: """Set the hickle state.""" d["cosmo"]["cosmology"] = type(Planck15) d["cosmo"] = Planck15.from_format(d["cosmo"]) self.__dict__.update(d) @lst_bin_size.validator def _lst_bin_size_vld(self, att, val): if val > self.time_per_day: raise ValueError("lst_bin_size must be <= time_per_day") @integration_time.validator def _integration_time_vld(self, att, val): if val > self.lst_bin_size: raise ValueError("integration_time must be <= lst_bin_size") @lst_bin_size.default def _obstime_default(self): # time it takes the sky to drift through beam FWHM if self.track is not None: return self.track else: return self.observatory.observation_duration @cached_property def beam_crossing_time(self): """The time it takes for a source to cross the beam. This defines the number of independent cosmic fields observed in a night, which in turn defines the cosmic sample variance. Observations are not assumed to be averaged coherently within this time (for thermal variance calculations). """ return self.observatory.observation_duration @cached_property def baseline_groups( self, ) -> dict[tuple(float, float, float), list[tuple(int, int)]]: """A dictionary of redundant baseline groups. Keys are tuples of floats (X,Y,LENGTH), and values are lists of two-tuples of baseline antenna indices in that particular baseline group. """ return self.observatory.get_redundant_baselines( baseline_filters=self.baseline_filters, ndecimals=self.redundancy_tol ) def __getstate__(self): """Get state so that defaultdict is not used.""" d = dict(self.__dict__.items()) if "baseline_groups" in d: d["baseline_groups"] = dict(d["baseline_groups"]) return d def __setstate__(self, state): """Set state so that defaultdict is restored.""" if "baseline_groups" in state: state["baseline_groups"] = defaultdict(list, state["baseline_groups"]) self.__dict__.update(state) @cached_property def baseline_group_coords(self) -> un.Quantity[un.m]: """Co-ordinates of baseline groups in metres.""" return self.observatory.baseline_coords_from_groups(self.baseline_groups) @cached_property def baseline_group_counts(self) -> np.ndarray: """The number of baselines in each group.""" return self.observatory.baseline_weights_from_groups(self.baseline_groups) @cached_property def baseline_group_lengths(self) -> un.Quantity[un.m]: """The displacement magnitude of the baseline groups.""" return np.sqrt(np.sum(self.baseline_group_coords**2, axis=1)) @cached_property def bl_min(self) -> un.Quantity[un.m]: """Shortest included baseline.""" return self.baseline_group_lengths.min() @cached_property def bl_max(self) -> un.Quantity[un.m]: """Shortest included baseline.""" return self.baseline_group_lengths.max() @property def frequency(self) -> un.Quantity[un.MHz]: """Frequency of the observation.""" return self.observatory.frequency @cached_property def uv_coverage(self) -> np.ndarray: # sourcery skip: assign-if-exp, swap-if-expression """A 2D array specifying the effective number of baselines in a grid of UV. Defined after earth rotation synthesis for a particular LST bin. The u-values on each side of the grid are given by :func:`ugrid`. """ return self.observatory.grid_baselines( coherent=self.coherent, baselines=self.baseline_group_coords, weights=self.baseline_group_counts, integration_time=self.integration_time, observation_duration=self.lst_bin_size, ndecimals=self.redundancy_tol, ) @cached_property def n_lst_bins(self) -> float: """ Number of LST bins in the complete observation. An LST bin is considered a chunk of time that may be averaged coherently, so this is given by `hours_per_day/obs_duration`, where `obs_duration` is the time it takes for a source to travel through the beam FWHM. """ return (self.time_per_day / self.lst_bin_size).to("").value @cached_property def Tsky(self) -> un.Quantity[un.K]: """Temperature of the sky at the default frequency.""" return self.tsky_amplitude.to("K") * (self.frequency / self.tsky_ref_freq) ** ( -self.spectral_index ) @cached_property def Trcv(self) -> un.Quantity[un.K]: """Receiver temperature.""" if isinstance(self.observatory.Trcv, un.Quantity): return self.observatory.Trcv.to("K") else: return self.observatory.Trcv(self.frequency).to("K") @cached_property def Tsys(self) -> un.Quantity[un.K]: """System temperature (i.e. Tsky + Trcv).""" return self.Tsky.to("K") + self.Trcv @cached_property def redshift(self) -> float: """Central redshift of the observation.""" return conv.f2z(self.frequency) @cached_property def eta(self) -> un.Quantity[1 / un.MHz]: """The fourier dual of the frequencies of the observation.""" return np.fft.fftfreq( self.n_channels, self.bandwidth.to("MHz") / self.n_channels ) @cached_property def kparallel(self) -> un.Quantity[un.littleh / un.Mpc]: """1D array of kpar values, defined by the bandwidth and number of channels. Order of the values is the same as `fftfreq` (i.e. zero-first) """ return ( conv.dk_deta( self.redshift, self.cosmo, approximate=self.use_approximate_cosmo ) * self.eta ) @cached_property def total_integration_time(self) -> un.Quantity[un.s]: """The total (effective) integration time over UV bins for a particular LST bin. The u-values on each side of the grid are given by :func:`ugrid`. """ return self.uv_coverage * self.n_days * self.integration_time.to(un.s) @cached_property def Trms(self) -> un.Quantity[un.K]: """Effective radiometric noise temperature per UV bin. (i.e. divided by bandwidth and integration time). The u-values on each side of the grid are given by :func:`ugrid`. """ out = np.ones(self.total_integration_time.shape) * np.inf * self.Tsys.unit mask = self.total_integration_time > 0 out[mask] = self.Tsys.to("K") / np.sqrt( 2 * self.bandwidth * self.total_integration_time[mask] ).to("") return out @cached_property def ugrid(self) -> np.ndarray: """Centres of the linear grid which defines a side of the UV grid. The UV grid is defined by :func:`uv_coverage`. """ return self.observatory.ugrid(self.bl_max) @cached_property def ugrid_edges(self) -> np.ndarray: """Edges of the linear grid which defines a side of the UV grid. The UV grid is defined by :func:`uv_coverage`. """ return self.observatory.ugrid_edges(self.bl_max)
[docs] def clone(self, **kwargs) -> Observation: """Create a clone of this instance, with arbitrary changes to parameters.""" return attr.evolve(self, **kwargs)