Source code for py21cmsense.observation

"""A module defining interferometric observation objects."""

from __future__ import annotations

import collections
from collections import defaultdict
from functools import cached_property
from os import path
from typing import Any, Callable

import attr
import numpy as np
from astropy import units as un
from astropy.cosmology import LambdaCDM, Planck15
from astropy.io.misc import yaml
from attr import validators as vld
from hickleable import hickleable

from . import _utils as ut
from . import conversions as conv
from . import observatory as obs
from . import units as tp


[docs] @hickleable(evaluate_cached_properties=True) @attr.s(kw_only=True) class Observation: """ A class defining an interferometric Observation. Note that to achieve the same behaviour as the original 21cmSense's ``--track`` option (i.e. track-scan instead of drift-scan) merely requires setting ``lst_bin_size`` to the length of the tracked scan. Parameters ---------- observatory : :class:`~py21cmsense.observatory.Observatory` An object defining attributes of the observatory itself (its location etc.) hours_per_day : float or Quantity, optional The number of good observing hours per day. This corresponds to the size of a low-foreground region in right ascension for a drift scanning instrument on the Earth. The total observing time is `n_days*hours_per_day` where a day is 24 hours on the Earth and 655.2 hours on the moon. Default is 6 (163.8 on moon). If simulating a tracked scan, `hours_per_day` should be a multiple of the length of the track (i.e. for two three-hour tracks per day, `hours_per_day` should be 6). track Tracked scan observation duration. This is an alias for ``lst_bin_size``, and is provided only for ease of use for those familiar with 21cmSense v1. lst_bin_size : float or Quantity, optional The time assigned to a single LST bin, by default the time it takes for a source to travel through the beam's FWHM. If a float, assumed to be in minutes. integration_time : float or Quantity, optional The amount of time integrated into a single visibility, by default a minute. If a float, assumed to be in seconds. n_channels : int, optional Number of channels used in the observation. Defaults to 82, which is equivalent to 1024 channels over 100 MHz of bandwidth. Sets maximum k_parallel that can be probed, but little to no overall effect on sensitivity. bandwidth : float or Quantity, optional The bandwidth used for the observation, assumed to be in MHz. Note this is not the total instrument bandwidth, but the redshift range that can be considered co-eval. n_days : int, optional The number of days observed (for the same set of LSTs). The default is 180 (6 on moon), which is the maximum a particular R.A. can be observed in one year if one only observes at night. The total observing time is `n_days*hours_per_day` where a day is 24 hours on the Earth and 655.2 hours on the moon. baseline_filters A function that takes a single value: a length-3 array of baseline co-ordinates, and returns a bool indicating whether to include the baseline. Built-in filters are provided in the :mod:`~baseline_filters` module. redundancy_tol : int, optional The number of decimal places to which baseline vectors must match (in all dimensions) to be considered redundant. coherent : bool, optional Whether to add different baselines coherently if they are not instantaneously redundant. spectral_index : float, optional The spectral index of the foreground model. The foreground model is approximated as a spatially-independent power-law, and used only for generating sky noise temperature. The default is 2.6, based on Mozdzen et al. 2017: 2017MNRAS.464.4995M, figure 8, with Galaxy down (see also `tsky_amplitude` and `tsky_ref_freq`). tsky_amplitude : float or Quantity, optional The temperature of foregrounds at `tsky_ref_freq`. See `spectral_index`. Default assumed to be in mK. tsky_ref_freq : float or Quantity Frequency at which the foreground model is equal to `tsky_amplitude`. See `spectral_index`. Default assumed to be in MHz. use_approximate_cosmo : bool Whether to use approximate cosmological conversion factors. Doing so will give the same results as the original 21cmSense code, but non-approximate versions that use astropy are preferred. cosmo : LambdaCDM An astropy cosmology object to use. phase_center_dec The declination of the phase center (typically used for a tracked scan observation). By default, this is set to the latitude of the observatory (i.e. the phase-center passes through zenith, like a typical drift-scan observation). """ observatory: obs.Observatory = attr.ib(validator=vld.instance_of(obs.Observatory)) time_per_day: tp.Time = attr.ib( validator=(tp.vld_physical_type("time")), ) track: tp.Time | None = attr.ib( None, validator=attr.validators.optional([tp.vld_physical_type("time")]), ) lst_bin_size: tp.Time = attr.ib( validator=(tp.vld_physical_type("time")), ) integration_time: tp.Time = attr.ib( 60 * un.second, validator=(tp.vld_physical_type("time"), ut.positive) ) n_channels: int = attr.ib(82, converter=int, validator=ut.positive) bandwidth: tp.Frequency = attr.ib( 8 * un.MHz, validator=(tp.vld_physical_type("frequency"), ut.positive) ) n_days: int = attr.ib(converter=int, validator=ut.positive) baseline_filters: tuple[Callable[[tp.Length], bool]] = attr.ib( default=(), converter=tp._tuplify ) redundancy_tol: int = attr.ib(default=1, converter=int, validator=ut.nonnegative) coherent: bool = attr.ib(default=True, converter=bool) # The following defaults are based on Mozdzen et al. 2017: 2017MNRAS.464.4995M, # figure 8, with galaxy down. spectral_index: float = attr.ib(default=2.6, converter=float, validator=ut.between(1.5, 4)) tsky_amplitude: tp.Temperature = attr.ib( default=260000 * un.mK, validator=ut.nonnegative, ) tsky_ref_freq: tp.Frequency = attr.ib(default=150 * un.MHz, validator=ut.positive) use_approximate_cosmo: bool = attr.ib(default=False, converter=bool) cosmo: LambdaCDM = attr.ib(default=Planck15, converter=Planck15.from_format) phase_center_dec = attr.ib(validator=(tp.vld_physical_type("angle")))
[docs] @classmethod def from_yaml(cls, yaml_file): """Construct an :class:`Observation` from a YAML file.""" if isinstance(yaml_file, str): with open(yaml_file) as fl: data = yaml.load(fl) elif isinstance(yaml_file, collections.abc.Mapping): data = yaml_file else: raise ValueError("yaml_file must be a string filepath or a raw dict from such a file.") if ( isinstance(data["observatory"], str) and isinstance(yaml_file, str) and not path.isabs(data["observatory"]) ): data["observatory"] = path.join(path.dirname(yaml_file), data["observatory"]) observatory = obs.Observatory.from_yaml(data.pop("observatory")) return cls(observatory=observatory, **data)
def __gethstate__(self) -> dict[str, Any]: """Get the hickle state.""" d = attr.asdict(self, recurse=False) d["cosmo"] = d["cosmo"].to_format("mapping") del d["cosmo"]["cosmology"] # The class. return d def __sethstate__(self, d: dict[str, Any]) -> None: """Set the hickle state.""" d["cosmo"]["cosmology"] = type(Planck15) d["cosmo"] = Planck15.from_format(d["cosmo"]) self.__dict__.update(d) @time_per_day.validator def _time_per_day_vld(self, att, val): day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour if not 0 * un.hour <= val <= day_length: raise ValueError(f"time_per_day should be between 0 and {day_length}") @track.validator def _track_vld(self, att, val): if val is not None: day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour if not 0 * un.hour <= val <= day_length: raise ValueError(f"track should be between 0 and {day_length}") @lst_bin_size.validator def _lst_bin_size_vld(self, att, val): day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour if not 0 * un.hour <= val <= day_length: raise ValueError(f"lst_bin_size should be between 0 and {day_length}") if val > self.time_per_day: raise ValueError("lst_bin_size must be <= time_per_day") @integration_time.validator def _integration_time_vld(self, att, val): if val > self.lst_bin_size: raise ValueError("integration_time must be <= lst_bin_size") @time_per_day.default def _time_per_day_default(self): if self.observatory.world == "earth": return 6 * un.hour else: return 163.8 * un.hour @lst_bin_size.default def _lst_bin_size_default(self): # time it takes the sky to drift through beam FWHM if self.track is not None: return self.track else: return self.observatory.observation_duration @n_days.default def _n_days_default(self): if self.observatory.world == "earth": return 180 else: return 6 @phase_center_dec.default def _phase_center_dec_default(self): return self.observatory.latitude @cached_property def beam_crossing_time(self): """The time it takes for a source to cross the beam. This defines the number of independent cosmic fields observed in a night, which in turn defines the cosmic sample variance. Observations are not assumed to be averaged coherently within this time (for thermal variance calculations). """ return self.observatory.observation_duration @cached_property def baseline_groups( self, ) -> dict[tuple[float, float, float], list[tuple[int, int]]]: """A dictionary of redundant baseline groups. Keys are tuples of floats (X,Y,LENGTH), and values are lists of two-tuples of baseline antenna indices in that particular baseline group. """ return self.observatory.get_redundant_baselines( baseline_filters=self.baseline_filters, ndecimals=self.redundancy_tol ) def __getstate__(self): """Get state so that defaultdict is not used.""" d = dict(self.__dict__.items()) if "baseline_groups" in d: d["baseline_groups"] = dict(d["baseline_groups"]) return d def __setstate__(self, state): """Set state so that defaultdict is restored.""" if "baseline_groups" in state: state["baseline_groups"] = defaultdict(list, state["baseline_groups"]) self.__dict__.update(state) @cached_property def baseline_group_coords(self) -> un.Quantity[un.m]: """Co-ordinates of baseline groups in metres.""" return self.observatory.baseline_coords_from_groups(self.baseline_groups) @cached_property def baseline_group_counts(self) -> np.ndarray: """The number of baselines in each group.""" return self.observatory.baseline_weights_from_groups(self.baseline_groups) @cached_property def baseline_group_lengths(self) -> un.Quantity[un.m]: """The displacement magnitude of the baseline groups.""" return np.sqrt(np.sum(self.baseline_group_coords**2, axis=1)) @cached_property def bl_min(self) -> un.Quantity[un.m]: """Shortest included baseline.""" return self.baseline_group_lengths.min() @cached_property def bl_max(self) -> un.Quantity[un.m]: """Shortest included baseline.""" return self.baseline_group_lengths.max() @property def frequency(self) -> un.Quantity[un.MHz]: """Frequency of the observation.""" return self.observatory.frequency @cached_property def uv_coverage(self) -> np.ndarray: # sourcery skip: assign-if-exp, swap-if-expression """A 2D array specifying the effective number of baselines in a grid of UV. Defined after earth rotation synthesis for a particular LST bin. The u-values on each side of the grid are given by :func:`ugrid`. """ return self.observatory.grid_baselines( coherent=self.coherent, baselines=self.baseline_group_coords, weights=self.baseline_group_counts, integration_time=self.integration_time, observation_duration=self.lst_bin_size, ndecimals=self.redundancy_tol, phase_center_dec=self.phase_center_dec, ) @cached_property def n_lst_bins(self) -> float: """ Number of LST bins in the complete observation. An LST bin is considered a chunk of time that may be averaged coherently, so this is given by `hours_per_day/lst_bin_size`, where `lst_bin_size` is the time it takes for a source to travel through the beam FWHM. """ return (self.time_per_day / self.lst_bin_size).to("").value @cached_property def Tsky(self) -> un.Quantity[un.K]: """Temperature of the sky at the default frequency.""" return self.tsky_amplitude.to("K") * (self.frequency / self.tsky_ref_freq) ** ( -self.spectral_index ) @cached_property def Trcv(self) -> un.Quantity[un.K]: """Receiver temperature.""" if isinstance(self.observatory.Trcv, un.Quantity): return self.observatory.Trcv.to("K") else: return self.observatory.Trcv(self.frequency).to("K") @cached_property def Tsys(self) -> un.Quantity[un.K]: """System temperature (i.e. Tsky + Trcv).""" return self.Tsky.to("K") + self.Trcv @cached_property def redshift(self) -> float: """Central redshift of the observation.""" return conv.f2z(self.frequency) @cached_property def eta(self) -> un.Quantity[1 / un.MHz]: """The fourier dual of the frequencies of the observation.""" return np.fft.fftfreq(self.n_channels, self.bandwidth.to("MHz") / self.n_channels) @cached_property def kparallel(self) -> un.Quantity[un.littleh / un.Mpc]: """1D array of kpar values, defined by the bandwidth and number of channels. Order of the values is the same as `fftfreq` (i.e. zero-first) """ return ( conv.dk_deta(self.redshift, self.cosmo, approximate=self.use_approximate_cosmo) * self.eta ) @cached_property def total_integration_time(self) -> un.Quantity[un.s]: """The total (effective) integration time over UV bins for a particular LST bin. The u-values on each side of the grid are given by :func:`ugrid`. """ return self.uv_coverage * self.n_days * self.integration_time.to(un.s) @cached_property def Trms(self) -> un.Quantity[un.K]: """Effective radiometric noise temperature per UV bin. (i.e. divided by bandwidth and integration time). The u-values on each side of the grid are given by :func:`ugrid`. """ out = np.ones(self.total_integration_time.shape) * np.inf * self.Tsys.unit mask = self.total_integration_time > 0 out[mask] = self.Tsys.to("K") / np.sqrt( 2 * self.bandwidth * self.total_integration_time[mask] ).to("") return out @cached_property def ugrid(self) -> np.ndarray: """Centres of the linear grid which defines a side of the UV grid. The UV grid is defined by :func:`uv_coverage`. """ return self.observatory.ugrid(self.bl_max) @cached_property def ugrid_edges(self) -> np.ndarray: """Edges of the linear grid which defines a side of the UV grid. The UV grid is defined by :func:`uv_coverage`. """ return self.observatory.ugrid_edges(self.bl_max)
[docs] def clone(self, **kwargs) -> Observation: """Create a clone of this instance, with arbitrary changes to parameters.""" return attr.evolve(self, **kwargs)