"""A module defining interferometric observation objects."""
from __future__ import annotations
import collections
from collections import defaultdict
from functools import cached_property
from os import path
from typing import Any, Callable
import attr
import numpy as np
from astropy import units as un
from astropy.cosmology import LambdaCDM, Planck15
from astropy.io.misc import yaml
from attr import validators as vld
from hickleable import hickleable
from . import _utils as ut
from . import conversions as conv
from . import observatory as obs
from . import units as tp
[docs]
@hickleable(evaluate_cached_properties=True)
@attr.s(kw_only=True)
class Observation:
"""
A class defining an interferometric Observation.
Note that to achieve the same behaviour as the original 21cmSense's ``--track``
option (i.e. track-scan instead of drift-scan) merely requires setting ``lst_bin_size``
to the length of the tracked scan.
Parameters
----------
observatory : :class:`~py21cmsense.observatory.Observatory`
An object defining attributes of the observatory itself (its location etc.)
hours_per_day : float or Quantity, optional
The number of good observing hours per day. This corresponds to the size of a
low-foreground region in right ascension for a drift scanning instrument on the Earth.
The total observing time is `n_days*hours_per_day` where a day is 24 hours on the
Earth and 655.2 hours on the moon. Default is 6 (163.8 on moon).
If simulating a tracked scan, `hours_per_day` should be a multiple of the length
of the track (i.e. for two three-hour tracks per day, `hours_per_day` should be 6).
track
Tracked scan observation duration. This is an alias for ``lst_bin_size``, and
is provided only for ease of use for those familiar with 21cmSense v1.
lst_bin_size : float or Quantity, optional
The time assigned to a single LST bin, by default the time it takes for a source
to travel through the beam's FWHM. If a float, assumed to be in minutes.
integration_time : float or Quantity, optional
The amount of time integrated into a single visibility, by default a minute.
If a float, assumed to be in seconds.
bandwidth : Quantity, optional
The bandwidth used for the observation; must be an astropy Quantity with frequency units.
Note this is not the total instrument bandwidth, but the redshift range that can be
considered coeval.
channel_bandwidth : Quantity, optional
The bandwidth of a single frequency channel; must be an astropy Quantity with frequency
units. If provided, overrides ``n_channels`` (which is computed as
``bandwidth / channel_bandwidth``). Not set by default. If both ``channel_bandwidth``
and ``n_channels`` are explicitly provided, ``n_channels`` takes precedence.
n_channels : int, optional
Number of channels across the coeval bandwidth (see ``bandwidth`` parameter).
Defaults to 82, to match the HERA 97 kHz channel width across 8 MHz. Sets maximum
k_parallel that can be probed, but little to no overall effect on sensitivity.
Overridden by ``channel_bandwidth`` if that parameter is provided.
n_days : int, optional
The number of days observed (for the same set of LSTs). The default is 180 (6 on moon),
which is the maximum a particular R.A. can be observed in one year if one only observes
at night. The total observing time is `n_days*hours_per_day` where a day is 24 hours on
the Earth and 655.2 hours on the moon.
baseline_filters
A function that takes a single value: a length-3 array of baseline co-ordinates,
and returns a bool indicating whether to include the baseline. Built-in filters
are provided in the :mod:`~baseline_filters` module.
redundancy_tol : int, optional
The number of decimal places to which baseline vectors must match (in
all dimensions) to be considered redundant.
coherent : bool, optional
Whether to add different baselines coherently if they are not instantaneously redundant.
spectral_index : float, optional
The spectral index of the foreground model. The foreground model is approximated as
a spatially-independent power-law, and used only for generating sky noise temperature.
The default is 2.6, based on Mozdzen et al. 2017: 2017MNRAS.464.4995M, figure 8,
with Galaxy down (see also `tsky_amplitude` and `tsky_ref_freq`).
tsky_amplitude : float or Quantity, optional
The temperature of foregrounds at `tsky_ref_freq`. See `spectral_index`.
Default assumed to be in mK.
tsky_ref_freq : float or Quantity
Frequency at which the foreground model is equal to `tsky_amplitude`.
See `spectral_index`. Default assumed to be in MHz.
use_approximate_cosmo : bool
Whether to use approximate cosmological conversion factors. Doing so will give
the same results as the original 21cmSense code, but non-approximate versions
that use astropy are preferred.
cosmo : LambdaCDM
An astropy cosmology object to use.
phase_center_dec
The declination of the phase center (typically used for a tracked scan
observation). By default, this is set to the latitude of the observatory
(i.e. the phase-center passes through zenith, like a typical drift-scan
observation).
"""
observatory: obs.Observatory = attr.ib(validator=vld.instance_of(obs.Observatory))
time_per_day: tp.Time = attr.ib(
validator=(tp.vld_physical_type("time")),
)
track: tp.Time | None = attr.ib(
None,
validator=attr.validators.optional([tp.vld_physical_type("time")]),
)
lst_bin_size: tp.Time = attr.ib(
validator=(tp.vld_physical_type("time")),
)
integration_time: tp.Time = attr.ib(
60 * un.second, validator=(tp.vld_physical_type("time"), ut.positive)
)
bandwidth: tp.Frequency = attr.ib(
8 * un.MHz, validator=(tp.vld_physical_type("frequency"), ut.positive)
)
channel_bandwidth: tp.Frequency | None = attr.ib(
default=None,
validator=attr.validators.optional([tp.vld_physical_type("frequency"), ut.positive]),
)
n_channels: int = attr.ib(converter=int, validator=ut.positive)
n_days: int = attr.ib(converter=int, validator=ut.positive)
baseline_filters: tuple[Callable[[tp.Length], bool]] = attr.ib(
default=(), converter=tp._tuplify
)
redundancy_tol: int = attr.ib(default=1, converter=int, validator=ut.nonnegative)
coherent: bool = attr.ib(default=True, converter=bool)
# The following defaults are based on Mozdzen et al. 2017: 2017MNRAS.464.4995M,
# figure 8, with galaxy down.
spectral_index: float = attr.ib(default=2.6, converter=float, validator=ut.between(1.5, 4))
tsky_amplitude: tp.Temperature = attr.ib(
default=260000 * un.mK,
validator=ut.nonnegative,
)
tsky_ref_freq: tp.Frequency = attr.ib(default=150 * un.MHz, validator=ut.positive)
use_approximate_cosmo: bool = attr.ib(default=False, converter=bool)
cosmo: LambdaCDM = attr.ib(default=Planck15, converter=Planck15.from_format)
phase_center_dec = attr.ib(validator=(tp.vld_physical_type("angle")))
[docs]
@classmethod
def from_yaml(cls, yaml_file):
"""Construct an :class:`Observation` from a YAML file."""
if isinstance(yaml_file, str):
with open(yaml_file) as fl:
data = yaml.load(fl)
elif isinstance(yaml_file, collections.abc.Mapping):
data = yaml_file
else:
raise ValueError("yaml_file must be a string filepath or a raw dict from such a file.")
if (
isinstance(data["observatory"], str)
and isinstance(yaml_file, str)
and not path.isabs(data["observatory"])
):
data["observatory"] = path.join(path.dirname(yaml_file), data["observatory"])
observatory = obs.Observatory.from_yaml(data.pop("observatory"))
return cls(observatory=observatory, **data)
def __gethstate__(self) -> dict[str, Any]:
"""Get the hickle state."""
d = attr.asdict(self, recurse=False)
d["cosmo"] = d["cosmo"].to_format("mapping")
del d["cosmo"]["cosmology"] # The class.
return d
def __sethstate__(self, d: dict[str, Any]) -> None:
"""Set the hickle state."""
d["cosmo"]["cosmology"] = type(Planck15)
d["cosmo"] = Planck15.from_format(d["cosmo"])
self.__dict__.update(d)
@time_per_day.validator
def _time_per_day_vld(self, att, val):
day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour
if not 0 * un.hour <= val <= day_length:
raise ValueError(f"time_per_day should be between 0 and {day_length}")
@track.validator
def _track_vld(self, att, val):
if val is not None:
day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour
if not 0 * un.hour <= val <= day_length:
raise ValueError(f"track should be between 0 and {day_length}")
@lst_bin_size.validator
def _lst_bin_size_vld(self, att, val):
day_length = 24 * un.hour if self.observatory.world == "earth" else 655.2 * un.hour
if not 0 * un.hour <= val <= day_length:
raise ValueError(f"lst_bin_size should be between 0 and {day_length}")
if val > self.time_per_day:
raise ValueError("lst_bin_size must be <= time_per_day")
@integration_time.validator
def _integration_time_vld(self, att, val):
if val > self.lst_bin_size:
raise ValueError("integration_time must be <= lst_bin_size")
@time_per_day.default
def _time_per_day_default(self):
if self.observatory.world == "earth":
return 6 * un.hour
else:
return 163.8 * un.hour
@lst_bin_size.default
def _lst_bin_size_default(self):
# time it takes the sky to drift through beam FWHM
if self.track is not None:
return self.track
else:
return self.observatory.observation_duration
@n_days.default
def _n_days_default(self):
if self.observatory.world == "earth":
return 180
else:
return 6
@n_channels.default
def _n_channels_default(self):
if self.channel_bandwidth is not None:
return round((self.bandwidth / self.channel_bandwidth).to("").value)
return 82
@phase_center_dec.default
def _phase_center_dec_default(self):
return self.observatory.latitude
@cached_property
def beam_crossing_time(self):
"""The time it takes for a source to cross the beam.
This defines the number of independent cosmic fields observed in a night, which
in turn defines the cosmic sample variance. Observations are not assumed to be
averaged coherently within this time (for thermal variance calculations).
"""
return self.observatory.observation_duration
@cached_property
def baseline_groups(
self,
) -> dict[tuple[float, float, float], list[tuple[int, int]]]:
"""A dictionary of redundant baseline groups.
Keys are tuples of floats (X,Y,LENGTH), and
values are lists of two-tuples of baseline antenna indices in that particular
baseline group.
"""
return self.observatory.get_redundant_baselines(
baseline_filters=self.baseline_filters, ndecimals=self.redundancy_tol
)
def __getstate__(self):
"""Get state so that defaultdict is not used."""
d = dict(self.__dict__.items())
if "baseline_groups" in d:
d["baseline_groups"] = dict(d["baseline_groups"])
return d
def __setstate__(self, state):
"""Set state so that defaultdict is restored."""
if "baseline_groups" in state:
state["baseline_groups"] = defaultdict(list, state["baseline_groups"])
self.__dict__.update(state)
@cached_property
def baseline_group_coords(self) -> un.Quantity[un.m]:
"""Co-ordinates of baseline groups in metres."""
return self.observatory.baseline_coords_from_groups(self.baseline_groups)
@cached_property
def baseline_group_counts(self) -> np.ndarray:
"""The number of baselines in each group."""
return self.observatory.baseline_weights_from_groups(self.baseline_groups)
@cached_property
def baseline_group_lengths(self) -> un.Quantity[un.m]:
"""The displacement magnitude of the baseline groups."""
return np.sqrt(np.sum(self.baseline_group_coords**2, axis=1))
@cached_property
def bl_min(self) -> un.Quantity[un.m]:
"""Shortest included baseline."""
return self.baseline_group_lengths.min()
@cached_property
def bl_max(self) -> un.Quantity[un.m]:
"""Shortest included baseline."""
return self.baseline_group_lengths.max()
@property
def frequency(self) -> un.Quantity[un.MHz]:
"""Frequency of the observation."""
return self.observatory.frequency
@cached_property
def uv_coverage(self) -> np.ndarray:
# sourcery skip: assign-if-exp, swap-if-expression
"""A 2D array specifying the effective number of baselines in a grid of UV.
Defined after earth rotation synthesis for a particular LST bin.
The u-values on each side of the grid are given by :func:`ugrid`.
"""
return self.observatory.grid_baselines(
coherent=self.coherent,
baselines=self.baseline_group_coords,
weights=self.baseline_group_counts,
integration_time=self.integration_time,
observation_duration=self.lst_bin_size,
ndecimals=self.redundancy_tol,
phase_center_dec=self.phase_center_dec,
)
@cached_property
def n_lst_bins(self) -> float:
"""
Number of LST bins in the complete observation.
An LST bin is considered a chunk of time that may be averaged
coherently, so this is given by `hours_per_day/lst_bin_size`,
where `lst_bin_size` is the time it takes for a source to travel
through the beam FWHM.
"""
return (self.time_per_day / self.lst_bin_size).to("").value
@cached_property
def Tsky(self) -> un.Quantity[un.K]:
"""Temperature of the sky at the default frequency."""
return self.tsky_amplitude.to("K") * (self.frequency / self.tsky_ref_freq) ** (
-self.spectral_index
)
@cached_property
def Trcv(self) -> un.Quantity[un.K]:
"""Receiver temperature."""
if isinstance(self.observatory.Trcv, un.Quantity):
return self.observatory.Trcv.to("K")
else:
return self.observatory.Trcv(self.frequency).to("K")
@cached_property
def Tsys(self) -> un.Quantity[un.K]:
"""System temperature (i.e. Tsky + Trcv)."""
return self.Tsky.to("K") + self.Trcv
@cached_property
def redshift(self) -> float:
"""Central redshift of the observation."""
return conv.f2z(self.frequency)
@cached_property
def eta(self) -> un.Quantity[1 / un.MHz]:
"""The fourier dual of the frequencies of the observation."""
return np.fft.fftfreq(self.n_channels, self.bandwidth.to("MHz") / self.n_channels)
@cached_property
def kparallel(self) -> un.Quantity[un.littleh / un.Mpc]:
"""1D array of kpar values, defined by the bandwidth and number of channels.
Order of the values is the same as `fftfreq` (i.e. zero-first)
"""
return (
conv.dk_deta(self.redshift, self.cosmo, approximate=self.use_approximate_cosmo)
* self.eta
)
@cached_property
def total_integration_time(self) -> un.Quantity[un.s]:
"""The total (effective) integration time over UV bins for a particular LST bin.
The u-values on each side of the grid are given by :func:`ugrid`.
"""
return self.uv_coverage * self.n_days * self.integration_time.to(un.s)
@cached_property
def Trms(self) -> un.Quantity[un.K]:
"""Effective radiometric noise temperature per UV bin.
(i.e. divided by bandwidth and integration time).
The u-values on each side of the grid are given by :func:`ugrid`.
"""
out = np.ones(self.total_integration_time.shape) * np.inf * self.Tsys.unit
mask = self.total_integration_time > 0
out[mask] = self.Tsys.to("K") / np.sqrt(
2 * self.bandwidth * self.total_integration_time[mask]
).to("")
return out
@cached_property
def ugrid(self) -> np.ndarray:
"""Centres of the linear grid which defines a side of the UV grid.
The UV grid is defined by :func:`uv_coverage`.
"""
return self.observatory.ugrid(self.bl_max)
@cached_property
def ugrid_edges(self) -> np.ndarray:
"""Edges of the linear grid which defines a side of the UV grid.
The UV grid is defined by :func:`uv_coverage`.
"""
return self.observatory.ugrid_edges(self.bl_max)
[docs]
def clone(self, **kwargs) -> Observation:
"""Create a clone of this instance, with arbitrary changes to parameters."""
return attr.evolve(self, **kwargs)