Source code for spectral_cube.stokes_spectral_cube

import numpy as np

from astropy.io.registry import UnifiedReadWriteMethod
from .io.core import StokesSpectralCubeRead, StokesSpectralCubeWrite
from .spectral_cube import SpectralCube, BaseSpectralCube
from . import wcs_utils
from .masks import BooleanArrayMask, is_broadcastable_and_smaller

__all__ = ['StokesSpectralCube']

VALID_STOKES = ['I', 'Q', 'U', 'V', 'RR', 'LL', 'RL', 'LR', 'XX', 'XY', 'YX', 'YY', 
                'RX', 'RY', 'LX', 'LY', 'XR', 'XL', 'YR', 'YL', 'PP', 'PQ', 'QP', 'QQ', 
                'RCircular', 'LCircular', 'Linear', 'Ptotal', 'Plinear', 'PFtotal', 
                'PFlinear', 'Pangle']
STOKES_SKY = ['I','Q','U','V']
STOKES_FEED_LINEAR = ['XX', 'XY', 'YX', 'YY']
STOKES_FEED_CIRCULAR = ['RR', 'RL', 'LR', 'LL']
STOKES_FEED_GENERIC = ['PP', 'PQ', 'QP', 'QQ']


[docs] class StokesSpectralCube(object): """ A class to store a spectral cube with multiple Stokes parameters. The individual Stokes cubes can share a common mask in addition to having component-specific masks. """ def __init__(self, stokes_data, mask=None, meta=None, fill_value=None): self._stokes_data = stokes_data self._meta = meta or {} self._fill_value = fill_value reference = tuple(stokes_data.keys())[0] for component in stokes_data: if not isinstance(stokes_data[component], BaseSpectralCube): raise TypeError("stokes_data should be a dictionary of " "SpectralCube objects") if not wcs_utils.check_equality(stokes_data[component].wcs, stokes_data[reference].wcs): raise ValueError("All spectral cubes in stokes_data " "should have the same WCS") if component not in VALID_STOKES: raise ValueError("Invalid Stokes component: {0} - should be one of I, Q, U, V, RR, LL, RL, LR, XX, XY, YX, YY, \ RX, RY, LX, LY, XR, XL, YR, YL, PP, PQ, QP, QQ, \ RCircular, LCircular, Linear, Ptotal, Plinear, PFtotal, PFlinear, Pangle".format(component)) if stokes_data[component].shape != stokes_data[reference].shape: raise ValueError("All spectral cubes should have the same shape") self._wcs = stokes_data[reference].wcs self._shape = stokes_data[reference].shape if isinstance(mask, BooleanArrayMask): if not is_broadcastable_and_smaller(mask.shape, self._shape): raise ValueError("Mask shape is not broadcastable to data shape:" " {0} vs {1}".format(mask.shape, self._shape)) if set(stokes_data).issubset(set(STOKES_SKY)): self._stokes_type = 'SKY_STOKES' elif set(stokes_data).issubset(set(STOKES_FEED_LINEAR)): self._stokes_type = 'FEED_LINEAR' elif set(stokes_data).issubset(set(STOKES_FEED_CIRCULAR)): self._stokes_type = 'FEED_CIRCULAR' elif set(stokes_data).issubset(set(STOKES_FEED_GENERIC)): self._stokes_type = 'FEED_GENERIC' elif set(stokes_data).issubset(set(VALID_STOKES)): self._stokes_type = 'VALID_STOKES' self._mask = mask def __getitem__(self, key): if key in self._stokes_data: return self._stokes_data[key] else: raise KeyError("Key {0} does not exist in this cube.".format(key)) def __setitem__(self, key, item): if key in self._stokes_data: self._stokes_data[key] = item else: errmsg = "Assigning new Stokes axes is not yet supported." raise NotImplementedError(errmsg) @property def shape(self): return self._shape @property def stokes_data(self): """ The underlying data """ return self._stokes_data @property def mask(self): """ The underlying mask """ return self._mask @property def wcs(self): return self._wcs def __dir__(self): return self.components + super(StokesSpectralCube, self).__dir__() @property def components(self): return list(self._stokes_data.keys()) @property def stokes_type(self): """ Defines the type of stokes that has been setup. `stokes_type` can be sky, linear, circular, generic, or other. * `Sky` refers to stokes in sky basis, I,Q,U,V * `Linear` refers to the stokes in linear feed basis, XX, XY, YX, YY * `Circular` refers to stokes in circular feed basis, RR, RL, LR, LL * `Generic` refers to the general four orthogonal components, PP, PQ, QP, QQ """ return self._stokes_type def __getattr__(self, attribute): """ Descriptor to return the Stokes cubes """ if attribute in self._stokes_data: if self.mask is not None: return self._stokes_data[attribute].with_mask(self.mask) else: return self._stokes_data[attribute] else: raise AttributeError("StokesSpectralCube has no attribute {0}".format(attribute))
[docs] def with_mask(self, mask, inherit_mask=True): """ Return a new StokesSpectralCube instance that contains a composite mask of the current StokesSpectralCube and the new ``mask``. Parameters ---------- mask : :class:`MaskBase` instance, or boolean numpy array The mask to apply. If a boolean array is supplied, it will be converted into a mask, assuming that `True` values indicate included elements. inherit_mask : bool (optional, default=True) If True, combines the provided mask with the mask currently attached to the cube Returns ------- new_cube : :class:`StokesSpectralCube` A cube with the new mask applied. Notes ----- This operation returns a view into the data, and not a copy. """ if isinstance(mask, np.ndarray): if not is_broadcastable_and_smaller(mask.shape, self.shape): raise ValueError("Mask shape is not broadcastable to data shape: " "%s vs %s" % (mask.shape, self.shape)) mask = BooleanArrayMask(mask, self.wcs) if self._mask is not None: return self._new_cube_with(mask=self.mask & mask if inherit_mask else mask) else: return self._new_cube_with(mask=mask)
def _new_cube_with(self, stokes_data=None, mask=None, meta=None, fill_value=None): data = self._stokes_data if stokes_data is None else stokes_data mask = self._mask if mask is None else mask if meta is None: meta = {} meta.update(self._meta) fill_value = self._fill_value if fill_value is None else fill_value cube = StokesSpectralCube(stokes_data=data, mask=mask, meta=meta, fill_value=fill_value) return cube
[docs] def transform_basis(self, stokes_basis=''): """ The goal of this function is to transform the stokes basis of the cube to the one specified if possible. In principle one needs at least two related stokes of a given basis for the transformation to be possible. At this moment we are limiting it to the cases where all four stokes parameters in a given basis be available within the cube. The transformations are as follows Linear to Sky I = (XX + YY) / 2 Q = (XX - YY) / 2 U = (XY + YX) / 2 V = 1j(XY - YX) / 2 Circular to Sky I = (RR + LL) / 2 Q = (RL + LR) / 2 U = 1j*(RL - LR) / 2 V = (RR - LL) / 2 """ if self.shape[0] < 4: errmsg = "Transformation of a subset of Stokes axes is not yet supported." errmsg_template = "Transformation from {} to {} is not yet supported." raise NotImplementedError(errmsg) elif stokes_basis == "Generic": data = self._stokes_data elif self.stokes_type == "Linear": if stokes_basis == "Circular": errmsg = errmsg_template.format("Linear", "Circular") raise NotImplementedError(errmsg) elif stokes_basis == "Sky": data = dict(I = self.__class__(self._stokes_data['XX'] + self._stokes_data['YY'], wcs = self.wcs, mask=self._mask['XX']&self._mask['YY']), Q = self.__class__(self._stokes_data['XX'] - self._stokes_data['YY'], wcs = self.wcs, mask=self._mask['XX']&self._mask['YY']), U = self.__class__(self._stokes_data['XY'] + self._stokes_data['YX'], wcs = self.wcs, mask=self._mask['XY']&self._mask['YX']), V = self.__class__(1j*(self._stokes_data['XY'] - self._stokes_data['YX']), wcs = self.wcs, mask=self._mask['XY']&self._mask['YX'])) elif stokes_basis == "Linear": data = self._stokes_data elif self.stokes_type == "Circular": if stokes_basis == "Linear": errmsg = errmsg_template.format("Circular", "Linear") raise NotImplementedError(errmsg) elif stokes_basis == "Sky": data = dict(I = self.__class__(self._stokes_data['RR'] + self._stokes_data['LL'], wcs = self.wcs, mask=self._mask['RR']&self._mask['LL']), Q = self.__class__(self._stokes_data['RL'] + self._stokes_data['RL'], wcs = self.wcs, mask=self._mask['RL']&self._mask['LR']), U = self.__class__(1j*(self._stokes_data['RL'] - self._stokes_data['LR']), wcs = self.wcs, mask=self._mask['RL']&self._mask['LR']), V = self.__class__(self._stokes_data['RR'] - self._stokes_data['LL'], wcs = self.wcs, mask=self._mask['RR']&self._mask['LL'])) elif stokes_basis == "Circular": data = self._stokes_data elif self.stokes_type == "Sky": if stokes_basis == "Linear": data = dict(XX = self.__class__(0.5*(self._stokes_data['I'] + self._stokes_data['Q']), wcs = self.wcs, mask=self._mask['I']&self._mask['Q']), XY = self.__class__(0.5*(self._stokes_data['U'] + 1j*self._stokes_data['V']), wcs = self.wcs, mask=self._mask['U']&self._mask['V']), YX = self.__class__(0.5*(self._stokes_data['U'] - 1j*self._stokes_data['V']), wcs = self.wcs, mask=self._mask['U']&self._mask['V']), YY = self.__class__(0.5*(self._stokes_data['I'] - self._stokes_data['Q']), wcs = self.wcs, mask=self._mask['I']&self._mask['Q'])) elif stokes_basis == "Circular": data = dict(RR = self.__class__(0.5*(self._stokes_data['I'] + self._stokes_data['V']), wcs = self.wcs, mask=self._mask['I']&self._mask['V']), RL = self.__class__(0.5*(self._stokes_data['Q'] + 1j*self._stokes_data['U']), wcs = self.wcs, mask=self._mask['Q']&self._mask['U']), LR = self.__class__(0.5*(self._stokes_data['Q'] - 1j*self._stokes_data['U']), wcs = self.wcs, mask=self._mask['Q']&self._mask['U']), LL = self.__class__(0.5*(self._stokes_data['I'] - self._stokes_data['V']), wcs = self.wcs, mask=self._mask['I']&self._mask['V'])) elif stokes_basis == "Sky": data = self._stokes_data return self._new_cube_with(stokes_data=data)
[docs] def with_spectral_unit(self, unit, **kwargs): stokes_data = {k: self._stokes_data[k].with_spectral_unit(unit, **kwargs) for k in self._stokes_data} return self._new_cube_with(stokes_data=stokes_data)
read = UnifiedReadWriteMethod(StokesSpectralCubeRead) write = UnifiedReadWriteMethod(StokesSpectralCubeWrite)