Source code for cosipy.image_deconvolution.exposure_table

import logging
logger = logging.getLogger(__name__)

import pandas as pd
from tqdm.autonotebook import tqdm
import numpy as np
import healpy as hp
from astropy.io import fits
import astropy.units as u

from cosipy.spacecraftfile import SpacecraftAttitudeMap

[docs]class SpacecraftAttitudeExposureTable(pd.DataFrame): """ A class to analyze exposure time per each spacecraft attitude Table columns are: - scatt_binning_index: int - healpix_index: list of tuple. Each tuple is (healpix_index_zpointing, healpix_index_xpointing). - zpointing: np.array of [l, b] in degrees. Array of z-pointings assigned to each scatt bin. - xpointing: np.array of [l, b] in degrees. Array of x-pointings assigned to each scatt bin. - zpointing_averaged: [l, b] in degrees. Averaged z-pointing in each scatt bin. - xpointing_averaged: [l, b] in degrees. Averaged x-pointing in each scatt bin. - delta_time: np.array of float in second. Exposure times for pointings assigned to each scatt bin. - exposure: float in second. total exposure for each scatt bin. - num_pointings: number of pointings assigned to each scatt bin. - bkg_group: index of the backgroud group. will be used in the data analysis. Attributes ---------- df : :py:class:`pd.DataFrame` pandas dataframe with the above columns nside : int Healpix NSIDE parameter. scheme : str, default 'ring' Healpix scheme. Either 'ring', 'nested'. """ def __init__(self, df, nside, scheme = 'ring'): super().__init__(pd.DataFrame(df)) self.nside = nside if scheme == 'ring' or scheme == 'nested': self.scheme = scheme else: logger.warning('The scheme should be "ring" or "nested" in SpacecraftAttitudeExposureTable. It will be set to "ring".') self.scheme = 'ring' def __eq__(self, other): for name in ['scatt_binning_index', 'healpix_index', 'exposure', 'num_pointings', 'bkg_group']: if not np.all(self[name] == other[name]): return False for name in ['delta_time', 'zpointing', 'xpointing', 'zpointing_averaged', 'xpointing_averaged']: for self_, other_ in zip(self[name], other[name]): if not np.all(self_ == other_): return False return (self.nside == other.nside) and (self.scheme == other.scheme)
[docs] @classmethod def from_orientation(cls, orientation, nside, scheme = 'ring', start = None, stop = None, min_exposure = None, min_num_pointings = None): """ Produce exposure table from orientation. Parameters ---------- orientation : :py:class:`cosipy.spacecraftfile.SpacecraftFile` Orientation nside : int Healpix NSIDE parameter. scheme : str, default 'ring' Healpix scheme. Either 'ring', 'nested'. start : :py:class:`astropy.time.Time` or None, default None Start time to analyze the orientation stop : :py:class:`astropy.time.Time` or None, default None Stop time to analyze the orientation min_exposure : float or None, default None Minimum exposure time required for each scatt bin min_num_pointings : int or None, default None Minimum number of pointings required for each scatt bin Returns ------- :py:class:`cosipy.spacecraftfile.SpacecraftAttitudeExposureTable` """ df = cls.analyze_orientation(orientation, nside, scheme, start, stop, min_exposure, min_num_pointings) new = cls(df, nside, scheme) return new
# GTI should be a mandary parameter
[docs] @classmethod def analyze_orientation(cls, orientation, nside, scheme = 'ring', start = None, stop = None, min_exposure = None, min_num_pointings = None): """ Produce pd.DataFrame from orientation. Parameters ---------- orientation : :py:class:`cosipy.spacecraftfile.SpacecraftFile` Orientation nside : int Healpix NSIDE parameter. scheme : str, default 'ring' Healpix scheme. Either 'ring', 'nested'. start : :py:class:`astropy.time.Time` or None, default None Start time to analyze the orientation stop : :py:class:`astropy.time.Time` or None, default None Stop time to analyze the orientation min_exposure : float or None, default None Minimum exposure time required for each scatt bin min_num_pointings : int or None, default None Minimum number of pointings required for each scatt bin Returns ------- :py:class:`pd.DataFrame` """ logger.info("angular resolution: ", hp.nside2resol(nside) * 180 / np.pi, "deg.") indices_healpix = [] # (idx_z, idx_x) delta_times = [] xpointings = [] # [l_x, b_x] zpointings = [] # [l_z, b_z] if start is not None and stop is not None: orientation = orientation.source_interval(start, stop) elif start is not None: logger.error("please specify the stop time") return elif stop is not None: logger.error("please specify the start time") return ori_time = orientation.get_time() logger.info("duration: ", (ori_time[-1] - ori_time[0]).to("day")) attitude = orientation.get_attitude() pointing_list = attitude.transform_to("galactic").as_axes() n_pointing = len(pointing_list[0]) x_1, x_2 = pointing_list[0][:-1], pointing_list[0][1:] l_x, b_x = 0.5 * (x_1.l.degree + x_2.l.degree), 0.5 * (x_1.b.degree + x_2.b.degree) z_1, z_2 = pointing_list[2][:-1], pointing_list[2][1:] l_z, b_z = 0.5 * (z_1.l.degree + z_2.l.degree), 0.5 * (z_1.b.degree + z_2.b.degree) if scheme == 'ring': nest = False elif scheme == 'nested': nest = True else: logger.warning('Warning: the scheme should be "ring" or "nested". It was set to "ring".') nest = False idx_x = hp.ang2pix(nside, l_x, b_x, nest=nest, lonlat=True) idx_z = hp.ang2pix(nside, l_z, b_z, nest=nest, lonlat=True) delta_time = (ori_time[1:] - ori_time[:-1]).to('s').value for i in tqdm(range(n_pointing - 1)): if (idx_z[i], idx_x[i]) in indices_healpix: idx = indices_healpix.index((idx_z[i], idx_x[i])) delta_times[idx].append(delta_time[i]) xpointings[idx].append([l_x[i], b_x[i]]) zpointings[idx].append([l_z[i], b_z[i]]) else: indices_healpix.append((idx_z[i], idx_x[i])) delta_times.append([delta_time[i]]) xpointings.append([[l_x[i], b_x[i]]]) zpointings.append([[l_z[i], b_z[i]]]) indices_scatt_binning = [i for i in range(len(indices_healpix))] # to numpy zpointings = [ np.array(_) for _ in zpointings] xpointings = [ np.array(_) for _ in xpointings] zpointings_averaged = [ cls._get_averaged_pointing(z, dt) for (z, dt) in zip(zpointings, delta_times) ] xpointings_averaged = [ cls._get_averaged_pointing(x, dt) for (x, dt) in zip(xpointings, delta_times) ] exposures = [ np.sum(np.array(_)) for _ in delta_times] num_pointings = [ len(_) for _ in delta_times] bkg_groups = [ 0 for i in delta_times] df = pd.DataFrame(data = {'scatt_binning_index': indices_scatt_binning, 'healpix_index': indices_healpix, 'zpointing': zpointings, 'xpointing': xpointings, 'zpointing_averaged': zpointings_averaged, 'xpointing_averaged': xpointings_averaged, 'delta_time': delta_times, 'exposure': exposures, 'num_pointings': num_pointings, 'bkg_group': bkg_groups}) if min_exposure is not None: df = df[df['exposure'] >= min_exposure] if min_num_pointings is not None: df = df[df['num_pointings'] >= min_num_pointings] if min_exposure is not None or min_num_pointings is not None: df['scatt_binning_index'] = [i for i in range(len(df))] return df
[docs] @classmethod def from_fits(cls, filename): """ Read exposure table from a fits file. Parameters ---------- filename : str Path to file Returns ------- :py:class:`cosipy.image_deconvolution.SpacecraftAttitudeExposureTable` """ infile = fits.open(filename) hdu = infile[1] if hdu.name != "EXPOSURETABLE": logger.error("cannot find EXPOSURETABLE") return indices_scatt_binning = hdu.data['scatt_binning_index'] indices_healpix = [ (z, x) for (z, x) in zip(hdu.data['healpix_index_z_pointing'], hdu.data['healpix_index_x_pointing']) ] zpointings = [ [ [l, b] for (l, b) in zip(z_l, z_b) ] for (z_l, z_b) in zip(hdu.data['zpointing_l'], hdu.data['zpointing_b']) ] zpointings = [ np.array(_) for _ in zpointings] xpointings = [ [ [l, b] for (l, b) in zip(x_l, x_b) ] for (x_l, x_b) in zip(hdu.data['xpointing_l'], hdu.data['xpointing_b']) ] xpointings = [ np.array(_) for _ in xpointings] zpointings_averaged = [ np.array([z_ave_l, z_ave_b]) for (z_ave_l, z_ave_b) in zip(hdu.data['zpointing_averaged_l'], hdu.data['zpointing_averaged_b']) ] xpointings_averaged = [ np.array([x_ave_l, x_ave_b]) for (x_ave_l, x_ave_b) in zip(hdu.data['xpointing_averaged_l'], hdu.data['xpointing_averaged_b']) ] delta_times = np.array(hdu.data['delta_time']) exposures = hdu.data['exposure'] num_pointings = hdu.data['num_pointings'] bkg_groups = hdu.data['bkg_group'] df = pd.DataFrame(data = {'scatt_binning_index': indices_scatt_binning, 'healpix_index': indices_healpix, 'zpointing': zpointings, 'xpointing': xpointings, 'zpointing_averaged': zpointings_averaged, 'xpointing_averaged': xpointings_averaged, 'delta_time': delta_times, 'exposure': exposures, 'num_pointings': num_pointings, 'bkg_group': bkg_groups}) nside = hdu.header['NSIDE'] scheme = hdu.header['SCHEME'] new = cls(df, nside, scheme) return new
[docs] def save_as_fits(self, filename, overwrite = False): """ Save exposure table as a fits file. Parameters ---------- filename : str Path to file overwrite : bool, default False """ # primary HDU primary_hdu = fits.PrimaryHDU() #exposure table names = ['scatt_binning_index', 'exposure', 'num_pointings', 'bkg_group'] formats = ['K', 'D', 'K', 'K'] units = ['', 's', '', ''] columns = [ fits.Column(name=names[i], array=self[names[i]].to_numpy(), format = formats[i], unit = units[i]) for i in range(len(names))] column_healpix_index_z_pointing = fits.Column(name='healpix_index_z_pointing', array=np.array([idx[0] for idx in self['healpix_index']]), format = 'K') column_healpix_index_x_pointing = fits.Column(name='healpix_index_x_pointing', array=np.array([idx[1] for idx in self['healpix_index']]), format = 'K') columns.append(column_healpix_index_z_pointing) columns.append(column_healpix_index_x_pointing) column_delta_time = fits.Column(name='delta_time', format='PD()', unit = 's', array=np.array(self['delta_time'].array, dtype=np.object_)) columns.append(column_delta_time) column_zpointing_l = fits.Column(name='zpointing_l', format='PD()', unit = 'degree', array=np.array([[pointing[0] for pointing in pointings] for pointings in self['zpointing']], dtype=np.object_)) columns.append(column_zpointing_l) column_zpointing_b = fits.Column(name='zpointing_b', format='PD()', unit = 'degree', array=np.array([[pointing[1] for pointing in pointings] for pointings in self['zpointing']], dtype=np.object_)) columns.append(column_zpointing_b) column_xpointing_l = fits.Column(name='xpointing_l', format='PD()', unit = 'degree', array=np.array([[pointing[0] for pointing in pointings] for pointings in self['xpointing']], dtype=np.object_)) columns.append(column_xpointing_l) column_xpointing_b = fits.Column(name='xpointing_b', format='PD()', unit = 'degree', array=np.array([[pointing[1] for pointing in pointings] for pointings in self['xpointing']], dtype=np.object_)) columns.append(column_xpointing_b) column_zpointing_averaged_l = fits.Column(name='zpointing_averaged_l', format='D', unit = 'degree', array=np.array([_[0] for _ in self['zpointing_averaged']])) columns.append(column_zpointing_averaged_l) column_zpointing_averaged_b = fits.Column(name='zpointing_averaged_b', format='D', unit = 'degree', array=np.array([_[1] for _ in self['zpointing_averaged']])) columns.append(column_zpointing_averaged_b) column_xpointing_averaged_l = fits.Column(name='xpointing_averaged_l', format='D', unit = 'degree', array=np.array([_[0] for _ in self['xpointing_averaged']])) columns.append(column_xpointing_averaged_l) column_xpointing_averaged_b = fits.Column(name='xpointing_averaged_b', format='D', unit = 'degree', array=np.array([_[1] for _ in self['xpointing_averaged']])) columns.append(column_xpointing_averaged_b) table_hdu = fits.BinTableHDU.from_columns(columns) table_hdu.name = 'exposuretable' table_hdu.header['nside'] = self.nside table_hdu.header['scheme'] = self.scheme #save file hdul = fits.HDUList([primary_hdu, table_hdu]) hdul.writeto(filename, overwrite = overwrite)
@classmethod def _get_averaged_pointing(cls, pointing, delta_time): """ Calculate an averaged pointing from given lists of pointings and exposure time on each pointing Parameters ---------- pointing : list of np.array List of pointings in degrees, e.g., [ np.array([l, b]), np.array([l, b]), ...] delta_time : list of float List of exposure time in seconds for each pointing, e.g, [ 1.0, 1.0, ...] Returns ------- :py:class:`np.array` Averaged pointing in degrees, as np.array([l, b]) """ averaged_vector = np.sum(hp.ang2vec(pointing.T[0], pointing.T[1], lonlat = True).T * delta_time, axis = (1)) averaged_vector /= np.linalg.norm(averaged_vector) averaged_l = hp.vec2ang(averaged_vector, lonlat = True)[0][0] averaged_b = hp.vec2ang(averaged_vector, lonlat = True)[1][0] averaged_pointing = np.array([averaged_l, averaged_b]) return averaged_pointing
[docs] def calc_pointing_trajectory_map(self): """ Calculate a 2-dimensional map showing exposure time for each spacecraft attitude. Returns ------- :py:class:`cosipy.spacecraft.SpacecraftAttitudeMap` Notes ----- The default axes in SpacecraftAttitudeMap is x- and y-pointings, but here the spacecraft attitude is described with z- and x-pointings. """ map_pointing_zx = SpacecraftAttitudeMap(nside = self.nside, scheme = self.scheme, coordsys = 'galactic', labels = ['z', 'x']) for hp_index, exposure in zip(self['healpix_index'], self['exposure']): map_pointing_zx[hp_index[0], hp_index[1]] = exposure * u.s return map_pointing_zx