from .PointSourceResponse import PointSourceResponse
from .DetectorResponse import DetectorResponse
from .ExtendedSourceResponse import ExtendedSourceResponse
from astromodels.core.model_parser import ModelParser
import matplotlib.pyplot as plt
from astropy.time import Time
from astropy.coordinates import SkyCoord
from astropy.units import Quantity
import astropy.units as u
from sparse import COO
from pathlib import Path
import numpy as np
import mhealpy as hp
from mhealpy import HealpixBase, HealpixMap
import glob
from scipy.special import erf
from yayc import Configurator
from scoords import SpacecraftFrame, Attitude
from histpy import Histogram, Axes, Axis, HealpixAxis
import h5py as h5
import os
import textwrap
import argparse
import logging
logger = logging.getLogger(__name__)
from copy import copy, deepcopy
import gzip
#from tqdm import tqdm
from tqdm.autonotebook import tqdm
import subprocess
import sys
import pathlib
import gc
[docs]class FullDetectorResponse(HealpixBase):
"""
Handles the multi-dimensional matrix that describes the
full all-sky response of the instrument.
You can access the :py:class:`DetectorResponse` at a given pixel using the ``[]``
operator. Alternatively you can obtain the interpolated reponse using
:py:func:`get_interp_response`.
"""
def __init__(self, *args, **kwargs):
# Overload parent init. Called in class methods.
pass
[docs] @classmethod
def open(cls, filename,Spectrumfile=None,norm="Linear" ,single_pixel = False,alpha=0,emin=90,emax=10000, pa_convention=None):
"""
Open a detector response file.
Parameters
----------
filename : str, :py:class:`~pathlib.Path`
Path to the response file (.h5 or .rsp)
Spectrumfile : str,
path to the input spectrum file used
for the simulation (optional).
norm : str,
type of normalisation : file (then specify also SpectrumFile)
,powerlaw, Mono or Linear
alpha : int,
if the normalisation is "powerlaw", value of the spectral index.
single_pixel : bool,
True if there is only one pixel and not full-sky.
emin,emax : float
emin/emax used in the simulation source file.
pa_convention : str, optional
Polarization convention of response ('RelativeX', 'RelativeY', or 'RelativeZ')
"""
filename = Path(filename)
if filename.suffix == ".h5":
return cls._open_h5(filename, pa_convention)
elif "".join(filename.suffixes[-2:]) == ".rsp.gz":
return cls._open_rsp(filename,Spectrumfile,norm ,single_pixel,alpha,emin,emax, pa_convention)
else:
raise ValueError(
"Unsupported file format. Only .h5 and .rsp.gz extensions are supported.")
@classmethod
def _open_h5(cls, filename, pa_convention=None):
"""
Open a detector response h5 file.
Parameters
----------
filename : str, :py:class:`~pathlib.Path`
Path to HDF5 file
pa_convention : str, optional
Polarization convention of response ('RelativeX', 'RelativeY', or 'RelativeZ')
"""
new = cls(filename)
new._file = h5.File(filename, mode='r')
new._drm = new._file['DRM']
new._unit = u.Unit(new._drm.attrs['UNIT'])
try:
new._sparse = new._drm.attrs['SPARSE']
except KeyError:
new._sparse = True
# Axes
axes = []
for axis_label in new._drm["AXES"]:
axis = new._drm['AXES'][axis_label]
axis_type = axis.attrs['TYPE']
if axis_type == 'healpix':
axes += [HealpixAxis(edges=np.array(axis),
nside=axis.attrs['NSIDE'],
label=axis_label,
scheme=axis.attrs['SCHEME'],
coordsys=SpacecraftFrame())]
else:
axes += [Axis(np.array(axis) * u.Unit(axis.attrs['UNIT']),
scale=axis_type,
label=axis_label)]
new._axes = Axes(axes)
# Init HealpixMap (local coordinates, main axis)
HealpixBase.__init__(new,
base=new.axes['NuLambda'],
coordsys=SpacecraftFrame())
new.pa_convention = pa_convention
if 'Pol' in new._axes.labels and not (pa_convention == 'RelativeX' or pa_convention == 'RelativeY' or pa_convention == 'RelativeZ'):
raise RuntimeError("Polarization angle convention of response ('RelativeX', 'RelativeY', or 'RelativeZ') must be provided")
return new
@classmethod
def _open_rsp(cls, filename, Spectrumfile=None,norm="Linear" ,single_pixel = False,alpha=0,emin=90,emax=10000, pa_convention=None):
"""
Open a detector response rsp file.
Parameters
----------
filename : str, :py:class:`~pathlib.Path`
Path to rsp file
Spectrumfile : str,
path to the input spectrum file used
for the simulation (optional).
norm : str,
type of normalisation : file (then specify also SpectrumFile)
,powerlaw, Mono or Linear
alpha : int,
if the normalisation is "powerlaw", value of the spectral index.
single_pixel : bool,
True if there is only one pixel and not full-sky.
emin,emax : float
emin/emax used in the simulation source file.
pa_convention : str, optional
Polarization convention of response ('RelativeX', 'RelativeY', or 'RelativeZ')
"""
axes_names = []
axes_edges = []
axes_types = []
sparse = None
# get the header infos of the rsp file (nsim,area,bin_edges,etc...)
with gzip.open(filename, "rt") as file:
for n, line in enumerate(file):
line = line.split()
if len(line) == 0:
continue
key = line[0]
if key == 'TS':
nevents_sim = int(line[1])
elif key == 'SA':
area_sim = float(line[1])
elif key == "SP" :
try :
norm = str(line[1])
except :
logger.info(f"norm not found in the file ! We assume {norm}")
if norm =="Linear" :
emin = int(line[2])
emax = int(line[3])
if norm == "Gaussian" :
Gauss_mean = float(line[2])
Gauss_sig = float(line[3])
Gauss_cutoff = float(line[4])
elif key == "MS":
if line[1] == "true" :
sparse = True
if line[1] == "false" :
sparse = False
elif key == 'AN':
axes_names += [" ".join(line[1:])]
elif key == 'AD':
if axes_types[-1] == "FISBEL":
raise RuntimeError("FISBEL binning not currently supported")
elif axes_types[-1] == "HEALPix":
if line[2] != "RING":
raise RuntimeError(f"Scheme {line[2]} not supported")
if line[1] == '-1':
# Single bin axis --i.e. all-sky
axes_edges.append(-1)
else:
nside = int(2**int(line[1]))
axes_edges.append(int(12*nside**2))
else:
axes_edges.append(np.array(line[1:], dtype='float'))
elif key == 'AT':
axes_types += [line[2]]
elif key == 'RD':
break
elif key == "StartStream":
nbins = int(line[1])
break
# Check axes names and relabel
if np.array_equal(axes_names, ['"Initial energy [keV]"', '"#nu [deg]" "#lambda [deg]"', '"Polarization Angle [deg]"', '"Measured energy [keV]"', '"#phi [deg]"', '"#psi [deg]" "#chi [deg]"', '"#sigma [deg]" "#tau [deg]"', '"Distance [cm]"']):
has_polarization = True
labels = ("Ei", "NuLambda", "Pol", "Em", "Phi", "PsiChi", "SigmaTau", "Dist")
elif np.array_equal(axes_names, ['"Initial energy [keV]"', '"#nu [deg]" "#lambda [deg]"', '"Measured energy [keV]"', '"#phi [deg]"', '"#psi [deg]" "#chi [deg]"', '"#sigma [deg]" "#tau [deg]"', '"Distance [cm]"']):
has_polarization = False
labels = ("Ei", "NuLambda", "Em", "Phi", "PsiChi", "SigmaTau", "Dist")
else:
raise InputError("Unknown response format")
#check if the type of spectrum is known
assert norm=="powerlaw" or norm=="Mono" or norm=="Linear" or norm=="Gaussian",f"unknown normalisation ! {norm}"
#check the number of simulated events is not 0
assert nevents_sim != 0,"number of simulated events is 0 !"
logger.info("normalisation is {0}".format(norm))
if sparse == None :
logger.info("Sparse paramater not found in the file : We assume this is a non sparse matrice !")
sparse = False
else :
logger.info("Sparse matrice ? {0}".format(sparse))
edges = ()
for axis_edges, axis_type in zip(axes_edges, axes_types):
if axis_type == 'HEALPix':
if axis_edges == -1:
# Single bin axis --i.e. all-sky
edges += ([0,1],)
else:
edges += (np.arange(axis_edges+1),)
elif axis_type == "FISBEL":
raise RuntimeError("FISBEL binning not currently supported")
else:
edges += (axis_edges,)
if sparse :
axes = Axes(edges, labels=labels)
else :
axes = Axes(edges[:-2], labels=labels[:-2])
if sparse :
# Need to get number of lines for progress bar.
# First try fast method for unix-based systems:
try:
proc=subprocess.Popen('gunzip -c %s | wc -l' %filename, \
shell=True, stdout=subprocess.PIPE)
nlines = int(proc.communicate()[0])
# If fast method fails, use long method, which should work in all cases.
except:
logger.info("Initial attempt failed.")
logger.info("Using long method...")
nlines = sum(1 for _ in gzip.open(filename,"rt"))
# Preallocate arrays
coords = np.empty([axes.ndim, nlines], dtype=np.uint32)
data = np.empty(nlines, dtype=np.uint32)
# Calculate the memory usage in Gigabytes
memory_size = ((nlines * data.itemsize)+(axes.ndim*nlines*coords.itemsize))/(1024*1024*1024)
logger.info(f"Estimated RAM you need to read the file : {memory_size} GB")
else :
nlines = nbins
# Preallocate arrays
data = np.empty(nlines, dtype=np.uint32)
# Calculate the memory usage in Gigabytes
memory_size = (nlines * data.itemsize)/(1024*1024*1024)
logger.info(f"Estimated RAM you need to read the file : {memory_size} GB")
# Loop
sbin = 0
# read the rsp file and get the bin number and counts
with gzip.open(filename, "rt") as file:
#sparse case
if sparse :
progress_bar = tqdm(file, total=nlines, desc="Progress", unit="line")
for line in progress_bar:
line = line.split()
if len(line) == 0:
continue
key = line[0]
if key == 'RD':
b = np.array(line[1:-1], dtype=np.uint32)
c = int(line[-1])
coords[:, sbin] = b
data[sbin] = c
sbin += 1
if sbin%10e6 == 0 :
progress_bar.update(10e6)
progress_bar.close()
nbins_sparse = sbin
#non sparse case
else :
binLine = False
for line in file:
line = line.split()
if len(line) == 0:
continue
if line[0] == "StartStream" :
binLine = True
continue
if binLine :
#check we have same number of bin than values read
if len(line)!=nbins :
logger.info("nb of bin content read ({0}) != nb of bins {1}".format(len(line),nbins))
sys.exit()
for i in tqdm(range(nbins), desc="Processing", unit="bin"):
data[i] = line[i]
# we reshape the bincontent to the response matrice dimension
# note that for non sparse matrice SigmaTau and Dist are not used
data = np.reshape(data,tuple(axes.nbins),order="F")
break
logger.info("response file read ! Now we create the histogram and weight in order to "+
"get the effective area")
# create histpy histogram
if sparse :
dr = Histogram(axes, contents=COO(coords=coords[:, :nbins_sparse], data= data[:nbins_sparse], shape = tuple(axes.nbins)))
else :
dr = Histogram(axes, contents=data)
# Weight to get effective area
ewidth = dr.axes['Ei'].widths
ecenters = dr.axes['Ei'].centers
#print(ewidth)
#print(ecenters)
#if we have one single bin, treat the gaussian norm like the mono one
#also check that the gaussian spectrum is fully contained in that bin
if len(ewidth) == 1 and norm == "Gaussian":
edges = dr.axes['Ei'].edges
gauss_int = 0.5 * (1 + erf( (edges[0]-Gauss_mean)/(4*np.sqrt(2)) ) ) + 0.5 * (1 + erf( (edges[1]-Gauss_mean)/(4*np.sqrt(2)) ) )
assert gauss_int == 1, "The gaussian spectrum is not fully contained in this single bin !"
logger.info("Only one bin so we will use the Mono normalisation")
norm ="Mono"
if Spectrumfile is not None and norm=="file":
logger.info("normalisation : spectrum file")
# From spectrum file
spec = pd.read_csv(Spectrumfile, sep=" ")
spec = spec.iloc[:-1]
hspec = Histogram([h_spec.axes[1]])
hspec[:] = np.interp(hspec.axis.centers,
spec.iloc[:, 0].to_numpy(),
spec.iloc[:, 1].to_numpy(),
left=0,
right=0) * ewidth
hspec /= np.sum(hspec)
nperchannel_norm = hspec[:]
elif norm=="powerlaw":
logger.info("normalisation : powerlaw with index {0} with energy range [{1}-{2}]keV".format(alpha,emin,emax))
# From powerlaw
e_lo = dr.axes['Ei'].lower_bounds
e_hi = dr.axes['Ei'].upper_bounds
e_lo = np.minimum(emax, e_lo)
e_hi = np.minimum(emax, e_hi)
e_lo = np.maximum(emin, e_lo)
e_hi = np.maximum(emin, e_hi)
if alpha == 1:
nperchannel_norm = np.log(e_hi/e_low) / np.log(emax/emin)
else:
a = 1 - alpha
nperchannel_norm = (e_hi**a - e_lo**a) / (emax**a - emin**a)
elif norm =="Linear" :
logger.info("normalisation : linear with energy range [{0}-{1}]".format(emin,emax))
nperchannel_norm = ewidth / (emax-emin)
elif norm=="Mono" :
logger.info("normalisation : mono")
nperchannel_norm = np.array([1.])
elif norm == "Gaussian" :
raise NotImplementedError("Gausssian norm for multiple bins not yet implemented")
nperchannel = nperchannel_norm * nevents_sim
# Full-sky?
if not single_pixel:
# Assumming all FISBEL pixels have the same area
nperchannel /= dr.axes['NuLambda'].nbins
# Area
counts2area = area_sim / nperchannel
dr_area = dr * dr.expand_dims(counts2area, 'Ei')
#delete the array of data in order to release some memory
del data
# end of weight now we create the .h5 structure
# remove the .h5 file if it already exist
try:
os.remove(filename.replace(".rsp.gz", "_nside{0}.area.h5".format(nside)))
except:
pass
# create a .h5 file with the good structure
filename = Path(str(filename).replace(".rsp.gz","_nside{0}.area.h5".format(nside)))
cls._write_h5(dr_area, filename)
new = cls(filename)
new._file = h5.File(filename, mode='r')
new._drm = new._file['DRM']
new._unit = u.Unit(new._drm.attrs['UNIT'])
new._sparse = new._drm.attrs['SPARSE']
# Axes
axes = []
for axis_label in new._drm["AXES"]:
axis = new._drm['AXES'][axis_label]
axis_type = axis.attrs['TYPE']
if axis_type == 'healpix':
axes += [HealpixAxis(edges=np.array(axis),
nside=axis.attrs['NSIDE'],
label=axis_label,
scheme=axis.attrs['SCHEME'],
coordsys=SpacecraftFrame())]
else:
axes += [Axis(np.array(axis) * u.Unit(axis.attrs['UNIT']),
scale=axis_type,
label=axis_label)]
new._axes = Axes(axes)
# Init HealpixMap (local coordinates, main axis)
HealpixBase.__init__(new,
base=new.axes['NuLambda'],
coordsys=SpacecraftFrame())
new.pa_convention = pa_convention
if 'Pol' in new._axes.labels and not (pa_convention == 'RelativeX' or pa_convention == 'RelativeY' or pa_convention == 'RelativeZ'):
raise RuntimeError("Polarization angle convention of response ('RelativeX', 'RelativeY', or 'RelativeZ') must be provided")
return new
@staticmethod
def _write_h5(dr_area, filename):
"""
Write a Histogram containing the response into a HDF5 file response format
Parameters
----------
dr_area : Histogram,
Histogram containing the response matrix in unit of differential area
filename : str, :py:class:`~pathlib.Path`
Path to .h5 file
"""
npix = dr_area.axes['NuLambda'].nbins
nside = HealpixBase(npix = npix).nside
has_polarization = "Pol" in dr_area.axes.labels
sparse = dr_area.is_sparse
f = h5.File(filename, mode='w')
drm = f.create_group('DRM')
# Header
drm.attrs['UNIT'] = 'cm2'
axis_description = {'Ei': "Initial simulated energy",
'NuLambda': "Location of the simulated source in the spacecraft coordinates",
'Pol': "Polarization angle",
'Em': "Measured energy",
'Phi': "Compton angle",
'PsiChi': "Location in the Compton Data Space",
'SigmaTau': "Electron recoil angle",
'Dist': "Distance from first interaction"
}
#keep the same dimension order of the data
axes_to_write = ['NuLambda', 'Ei']
if has_polarization:
axes_to_write += ['Pol']
axes_to_write += ['Em', 'Phi', 'PsiChi']
if sparse:
drm.attrs['SPARSE'] = True
# singletos. Save space in dense
axes_to_write += ['SigmaTau', 'Dist']
else:
drm.attrs['SPARSE'] = False
axes = drm.create_group('AXES', track_order=True)
for axis in dr_area.axes[axes_to_write]:
axis_dataset = axes.create_dataset(axis.label,
data=axis.edges)
if axis.label in ['NuLambda', 'PsiChi', 'SigmaTau']:
# HEALPix
axis_dataset.attrs['TYPE'] = 'healpix'
axis_dataset.attrs['NSIDE'] = nside
axis_dataset.attrs['SCHEME'] = 'ring'
else:
# 1D
axis_dataset.attrs['TYPE'] = axis.axis_scale
if axis.label in ['Ei', 'Em']:
axis_dataset.attrs['UNIT'] = 'keV'
axis_dataset.attrs['TYPE'] = 'log'
elif axis.label in ['Phi', 'Pol']:
axis_dataset.attrs['UNIT'] = 'deg'
axis_dataset.attrs['TYPE'] = 'linear'
elif axis.label in ['Dist']:
axis_dataset.attrs['UNIT'] = 'cm'
axis_dataset.attrs['TYPE'] = 'linear'
else:
raise ValueError("Shouldn't happend")
axis_dataset.attrs['DESCRIPTION'] = axis_description[axis.label]
# sparse matrice
if sparse:
progress_bar = tqdm(total=npix, desc="Progress", unit="nbpixel")
# Contents. Sparse arrays
coords = drm.create_dataset('BIN_NUMBERS',
(npix,),
dtype=h5.vlen_dtype(int),
compression="gzip")
data = drm.create_dataset('CONTENTS',
(npix,),
dtype=h5.vlen_dtype(float),
compression="gzip")
for b in range(npix):
# print(f"{b}/{npix}")
pix_slice = dr_area[{'NuLambda': b}]
coords[b] = pix_slice.coords.flatten()
data[b] = pix_slice.data
progress_bar.update(1)
progress_bar.close()
# non sparse
else:
if has_polarization == True:
rsp_axes = [1,0,2,3,4,5]
else:
rsp_axes = [1,0,2,3,4]
data = drm.create_dataset('CONTENTS',
data=np.transpose(dr_area.contents, axes = rsp_axes),
compression="gzip")
#close the .h5 file in write mode in order to reopen it in read mode after
f.close()
@property
def is_sparse(self):
return self._sparse
@property
def ndim(self):
"""
Dimensionality of detector response matrix.
Returns
-------
int
"""
return self.axes.ndim
@property
def axes(self):
"""
List of axes.
Returns
-------
:py:class:`histpy.Axes`
"""
return self._axes
@property
def unit(self):
"""
Physical unit of the contents of the detector reponse.
Returns
-------
:py:class:`astropy.units.Unit`
"""
return self._unit
def __getitem__(self, pix):
if not isinstance(pix, (int, np.integer)) or pix < 0 or not pix < self.npix:
raise IndexError("Pixel number out of range, or not an integer")
#check if we have sparse matrice or not
if self._sparse:
coords = np.reshape(
self._file['DRM']['BIN_NUMBERS'][pix], (self.ndim-1, -1))
data = np.array(self._file['DRM']['CONTENTS'][pix])
return DetectorResponse(self.axes[1:],
contents=COO(coords=coords,
data=data,
shape=tuple(self.axes.nbins[1:])),
unit=self.unit)
else :
data = self._file['DRM']['CONTENTS'][pix]
return DetectorResponse(self.axes[1:],
contents=data, unit=self.unit)
[docs] def close(self):
"""
Close the HDF5 file containing the response
"""
self._file.close()
def __enter__(self):
"""
Start a context manager
"""
return self
def __exit__(self, type, value, traceback):
"""
Exit a context manager
"""
self.close()
@property
def filename(self):
"""
Path to on-disk file containing DetectorResponse
Returns
-------
:py:class:`~pathlib.Path`
"""
return Path(self._file.filename)
[docs] def get_interp_response(self, coord):
"""
Get the bilinearly interpolated response at a given coordinate location.
Parameters
----------
coord : :py:class:`astropy.coordinates.SkyCoord`
Coordinate in the :py:class:`.SpacecraftFrame`
Returns
-------
:py:class:`DetectorResponse`
"""
pixels, weights = self.get_interp_weights(coord)
dr = DetectorResponse(self.axes[1:],
sparse=self._sparse,
unit=self.unit)
for p, w in zip(pixels, weights):
dr += self[p]*w
return dr
[docs] def get_point_source_response(self,
exposure_map = None,
coord = None,
scatt_map = None,
Earth_occ = True):
"""
Convolve the all-sky detector response with exposure for a source at a given
sky location.
Provide either a exposure map (aka dweel time map) or a combination of a
sky coordinate and a spacecraft attitude map.
Parameters
----------
exposure_map : :py:class:`mhealpy.HealpixMap`
Effective time spent by the source at each pixel location in spacecraft coordinates
coord : :py:class:`astropy.coordinates.SkyCoord`
Source coordinate
scatt_map : :py:class:`SpacecraftAttitudeMap`
Spacecraft attitude map
Earth_occ : bool, optional
Option to include Earth occultation in the respeonce.
Default is True, in which case you can only pass one
coord, which must be the same as was used for the scatt map.
Returns
-------
:py:class:`PointSourceResponse`
"""
# TODO: deprecate exposure_map in favor of coords + scatt map for both local
# and interntial coords
if Earth_occ == True:
if coord != None:
if coord.size > 1:
raise ValueError("For Earth occultation you must use the same coordinate as was used for the scatt map!")
if exposure_map is not None:
if not self.conformable(exposure_map):
raise ValueError(
"Exposure map has a different grid than the detector response")
psr = PointSourceResponse(self.axes[1:],
sparse=self._sparse,
unit=u.cm*u.cm*u.s)
for p in range(self.npix):
if exposure_map[p] != 0:
psr += self[p]*exposure_map[p]
return psr
else:
# Rotate to inertial coordinates
if coord is None or scatt_map is None:
raise ValueError("Provide either exposure map or coord + scatt_map")
if isinstance(coord.frame, SpacecraftFrame):
raise ValueError("Local coordinate + scatt_map not currently supported")
if self.is_sparse:
raise ValueError("Coord + scatt_map currently only supported for dense responses")
axis = "PsiChi"
coords_axis = Axis(np.arange(coord.size+1), label = 'coords')
psr = Histogram([coords_axis] + list(deepcopy(self.axes[1:])),
unit = self.unit * scatt_map.unit)
psr.axes[axis].coordsys = coord.frame
for i,(pixels, exposure) in \
enumerate(zip(scatt_map.contents.coords.transpose(),
scatt_map.contents.data)):
#gc.collect() # HDF5 cache issues
att = Attitude.from_axes(x = scatt_map.axes['x'].pix2skycoord(pixels[0]),
y = scatt_map.axes['y'].pix2skycoord(pixels[1]))
coord.attitude = att
#TODO: Change this to interpolation
loc_nulambda_pixels = np.array(self.axes['NuLambda'].find_bin(coord),
ndmin = 1)
dr_pix = Histogram.concatenate(coords_axis, [self[i] for i in loc_nulambda_pixels])
dr_pix.axes['PsiChi'].coordsys = SpacecraftFrame(attitude = att)
self._sum_rot_hist(dr_pix, psr, exposure, coord, self.pa_convention)
# Convert to PSR
psr = tuple([PointSourceResponse(psr.axes[1:],
contents = data,
sparse = psr.is_sparse,
unit = psr.unit)
for data in psr[:]])
if coord.size == 1:
return psr[0]
else:
return psr
def _setup_extended_source_response_params(self, coordsys, nside_image, nside_scatt_map):
"""
Validate coordinate system and setup NSIDE parameters for extended source response generation.
Parameters
----------
coordsys : str
Coordinate system to be used (currently only 'galactic' is supported)
nside_image : int or None
NSIDE parameter for the image reconstruction.
If None, uses the full detector response's NSIDE.
nside_scatt_map : int or None
NSIDE parameter for scatt map generation.
If None, uses the full detector response's NSIDE.
Returns
-------
tuple
(coordsys, nside_image, nside_scatt_map) : validated parameters
"""
if coordsys != 'galactic':
raise ValueError(f'The coordsys {coordsys} not currently supported')
if nside_image is None:
nside_image = self.nside
if nside_scatt_map is None:
nside_scatt_map = self.nside
return coordsys, nside_image, nside_scatt_map
[docs] def get_point_source_response_per_image_pixel(self, ipix_image, orientation, coordsys = 'galactic', nside_image = None, nside_scatt_map = None, Earth_occ = True):
"""
Generate point source response for a specific HEALPix pixel by convolving
the all-sky detector response with exposure.
Parameters
----------
ipix_image : int
HEALPix pixel index
orientation : cosipy.spacecraftfile.SpacecraftFile
Spacecraft attitude information
coordsys : str, default 'galactic'
Coordinate system (currently only 'galactic' is supported)
nside_image : int, optional
NSIDE parameter for image reconstruction.
If None, uses the detector response's NSIDE.
nside_scatt_map : int, optional
NSIDE parameter for scatt map generation.
If None, uses the detector response's NSIDE.
Earth_occ : bool, default True
Whether to include Earth occultation in the response
Returns
-------
:py:class:`PointSourceResponse`
Point source response for the specified pixel
"""
coordsys, nside_image, nside_scatt_map = self._setup_extended_source_response_params(coordsys, nside_image, nside_scatt_map)
image_axes = HealpixAxis(nside = nside_image, coordsys = coordsys, scheme='ring', label = 'NuLambda') # The label should be 'lb' in the future
coord = image_axes.pix2skycoord(ipix_image)
scatt_map = orientation.get_scatt_map(target_coord = coord,
nside = nside_scatt_map,
scheme='ring',
coordsys=coordsys,
earth_occ=Earth_occ)
psr = self.get_point_source_response(coord = coord, scatt_map = scatt_map, Earth_occ = Earth_occ)
return psr
[docs] def get_extended_source_response(self, orientation, coordsys = 'galactic', nside_image = None, nside_scatt_map = None, Earth_occ = True):
"""
Generate extended source response by convolving the all-sky detector
response with exposure over the entire sky.
Parameters
----------
orientation : cosipy.spacecraftfile.SpacecraftFile
Spacecraft attitude information
coordsys : str, default 'galactic'
Coordinate system (currently only 'galactic' is supported)
nside_image : int, optional
NSIDE parameter for image reconstruction.
If None, uses the detector response's NSIDE.
nside_scatt_map : int, optional
NSIDE parameter for scatt map generation.
If None, uses the detector response's NSIDE.
Earth_occ : bool, default True
Whether to include Earth occultation in the response
Returns
-------
:py:class:`ExtendedSourceResponse`
Extended source response covering the entire sky
"""
coordsys, nside_image, nside_scatt_map = self._setup_extended_source_response_params(coordsys, nside_image, nside_scatt_map)
axes = [HealpixAxis(nside = nside_image, coordsys = coordsys, scheme='ring', label = 'NuLambda')] # The label should be 'lb' in the future
axes += list(self.axes[1:])
axes[-1].coordsys = coordsys
extended_source_response = ExtendedSourceResponse(axes, unit = u.Unit("cm2 s"))
for ipix in tqdm(range(hp.nside2npix(nside_image))):
psr = self.get_point_source_response_per_image_pixel(ipix, orientation, coordsys = coordsys,
nside_image = nside_image, nside_scatt_map = nside_scatt_map, Earth_occ = Earth_occ)
extended_source_response[ipix] = psr.contents
return extended_source_response
[docs] def merge_psr_to_extended_source_response(self, basename, coordsys = 'galactic', nside_image = None):
"""
Create extended source response by merging multiple point source responses.
Reads point source response files matching the pattern `basename` + index + file_extension.
For example, with basename='histograms/hist_', filenames are expected to be like 'histograms/hist_00001.hdf5'.
Parameters
----------
basename : str
Base filename pattern for point source response files
coordsys : str, default 'galactic'
Coordinate system (currently only 'galactic' is supported)
nside_image : int, optional
NSIDE parameter for image reconstruction.
If None, uses the detector response's NSIDE.
Returns
-------
:py:class:`ExtendedSourceResponse`
Combined extended source response
"""
coordsys, nside_image, _ = self._setup_extended_source_response_params(coordsys, nside_image, None)
psr_files = glob.glob(basename + "*")
if not psr_files:
raise FileNotFoundError(f"No files found matching pattern {basename}*")
axes = [HealpixAxis(nside = nside_image, coordsys = coordsys, scheme='ring', label = 'NuLambda')] # The label should be 'lb' in the future
axes += list(self.axes[1:])
axes[-1].coordsys = coordsys
extended_source_response = ExtendedSourceResponse(axes, unit = u.Unit("cm2 s"))
filled_pixels = []
for filename in psr_files:
ipix = int(filename[len(basename):].split(".")[0])
psr = Histogram.open(filename)
extended_source_response[ipix] = psr.contents
filled_pixels.append(ipix)
expected_pixels = set(range(extended_source_response.axes[0].npix))
if set(filled_pixels) != expected_pixels:
raise ValueError(f"Missing pixels in the response files. Expected {extended_source_response.axes[0].npix} pixels, got {len(filled_pixels)} pixels")
return extended_source_response
@staticmethod
def _sum_rot_hist(h, h_new, exposure, coord, pa_convention, axis = "PsiChi"):
"""
Rotate a histogram with HealpixAxis h into the grid of h_new, and sum
it up with the weight of exposure.
Meant to rotate the PsiChi of a CDS from local to galactic
"""
axis_id = h.axes.label_to_index(axis)
old_axes = h.axes
new_axes = h_new.axes
old_axis = h.axes[axis_id]
new_axis = h_new.axes[axis_id]
# Convolve
# TODO: Change this to interpolation (pixels + weights)
old_pixels = old_axis.find_bin(new_axis.pix2skycoord(np.arange(new_axis.nbins)))
if 'Pol' in h.axes.labels and h_new.axes[axis].coordsys.name != 'spacecraftframe':
if coord.size > 1:
raise ValueError("For polarization, only a single source coordinate is supported")
from cosipy.polarization.polarization_angle import PolarizationAngle
from cosipy.polarization.conventions import IAUPolarizationConvention
pol_axis_id = h.axes.label_to_index('Pol')
old_pol_axis = h.axes[pol_axis_id]
new_pol_axis = h_new.axes[pol_axis_id]
old_pol_indices = []
for i in range(h_new.axes['Pol'].nbins):
pa = PolarizationAngle(h_new.axes['Pol'].centers.to_value(u.deg)[i] * u.deg, coord.transform_to('icrs'), convention=IAUPolarizationConvention())
pa_old = pa.transform_to(pa_convention, attitude=coord.attitude)
if pa_old.angle.deg == 180.:
pa_old = PolarizationAngle(0. * u.deg, coord, convention=IAUPolarizationConvention())
old_pol_indices.append(old_pol_axis.find_bin(pa_old.angle))
old_pol_indices = np.array(old_pol_indices)
# NOTE: there are some pixels that are duplicated, since the center 2 pixels
# of the original grid can land within the boundaries of a single pixel
# of the target grid. The following commented code fixes this, but it's slow, and
# the effect is very small, so maybe not worth it
# nulambda_npix = h.axes['NuLamnda'].nbins
# new_norm = np.zeros(shape = nulambda_npix)
# for p in old_pixels:
# h_slice = h[{axis:p}]
# norm_rot += np.sum(h_slice, axis = tuple(np.arange(1, h_slice.ndim)))
# old_norm = np.sum(h, axis = tuple(np.arange(1, h.ndim)))
# norm_corr = h.expand_dims(norm / norm_rot, "NuLambda")
for old_pix,new_pix in zip(old_pixels,range(new_axis.npix)):
#h_new[{axis:new_pix}] += exposure * h[{axis: old_pix}] # * norm_corr
# The following code does the same than the code above, but is faster
if not 'Pol' in h.axes.labels:
old_index = (slice(None),)*axis_id + (old_pix,)
new_index = (slice(None),)*axis_id + (new_pix,)
h_new[new_index] += exposure * u.s * h[old_index] # * norm_corr
else:
for old_pol_bin,new_pol_bin in zip(old_pol_indices,range(new_pol_axis.nbins)):
if pol_axis_id < axis_id:
old_index = (slice(None),)*pol_axis_id + (old_pol_bin,) + (slice(None),)*(axis_id-pol_axis_id-1) + (old_pix,)
new_index = (slice(None),)*pol_axis_id + (new_pol_bin,) + (slice(None),)*(axis_id-pol_axis_id-1) + (new_pix,)
else:
old_index = (slice(None),)*axis_id + (old_pix,) + (slice(None),)*(pol_axis_id-axis_id-1) + (old_pol_bin,)
new_index = (slice(None),)*axis_id + (new_pix,) + (slice(None),)*(pol_axis_id-axis_id-1) + (new_pol_bin,)
h_new[new_index] += exposure * u.s * h[old_index] # * norm_corr
def __str__(self):
return f"{self.__class__.__name__}(filename = '{self.filename.resolve()}')"
def __repr__(self):
output = (f"FILENAME: '{self.filename.resolve()}'\n"
f"AXES:\n")
for naxis, axis in enumerate(self.axes):
if naxis == 0:
description = "Location of the simulated source in the spacecraft coordinates"
else:
description = self._drm['AXES'][axis.label].attrs['DESCRIPTION']
output += (f" {axis.label}:\n"
f" DESCRIPTION: '{description}'\n")
if isinstance(axis, HealpixAxis):
output += (f" TYPE: 'healpix'\n"
f" NPIX: {axis.npix}\n"
f" NSIDE: {axis.nside}\n"
f" SCHEME: '{axis.scheme}'\n")
else:
output += (f" TYPE: '{axis.axis_scale}'\n"
f" UNIT: '{axis.unit}'\n"
f" NBINS: {axis.nbins}\n"
f" EDGES: [{', '.join([str(e) for e in axis.edges])}]\n")
return output
def _repr_pretty_(self, p, cycle):
if cycle:
p.text(str(self))
else:
p.text(repr(self))
def cosi_response(argv=None):
"""
Print the content of a detector response to stdout.
"""
# Parse arguments from commandline
apar = argparse.ArgumentParser(
usage=textwrap.dedent(
"""
%(prog)s [--help] <command> [<args>] <filename> [<options>]
"""),
description=textwrap.dedent(
"""
Quick view of the information contained in a response file
%(prog)s --help
%(prog)s dump header [FILENAME]
%(prog)s dump aeff [FILENAME] --lon [LON] --lat [LAT]
%(prog)s dump expectation [FILENAME] --config [CONFIG]
%(prog)s plot aeff [FILENAME] --lon [LON] --lat [LAT]
%(prog)s plot dispersion [FILENAME] --lon [LON] --lat [LAT]
%(prog)s plot expectation [FILENAME] --lon [LON] --lat [LAT]
Arguments:
- header: Response header and axes information
- aeff: Effective area
- dispersion: Energy dispection matrix
- expectation: Expected number of counts
"""),
formatter_class=argparse.RawTextHelpFormatter)
apar.add_argument('command',
help=argparse.SUPPRESS)
apar.add_argument('args', nargs='*',
help=argparse.SUPPRESS)
apar.add_argument('filename',
help="Path to instrument response")
apar.add_argument('--lon',
help="Longitude in sopacecraft coordinates. e.g. '11deg'")
apar.add_argument('--lat',
help="Latitude in sopacecraft coordinates. e.g. '10deg'")
apar.add_argument('--output', '-o',
help="Save output to file. Default: stdout")
apar.add_argument('--config', '-c',
help="Path to config file describing exposure and source charateristics.")
apar.add_argument('--config-override', dest='override',
help="Override option in config file")
args = apar.parse_args(argv)
# Config
if args.config is None:
config = Configurator()
else:
config = Configurator.open(args.config)
if args.override is not None:
config.override(args.override)
# Get info
with FullDetectorResponse.open(args.filename) as response:
# Commands and functions
def get_drm():
lat = Quantity(args.lat)
lon = Quantity(args.lon)
loc = SkyCoord(lon=lon, lat=lat, frame=SpacecraftFrame())
return response.get_interp_response(loc)
def get_expectation():
# Exposure map
exposure_map = HealpixMap(base=response,
unit=u.s,
coordsys=SpacecraftFrame())
ti = Time(config['exposure:time_i'])
tf = Time(config['exposure:time_f'])
dt = (tf-ti).to(u.s)
exposure_map[:4] = dt/4
logger.warning(f"Spacecraft file not yet implemented, faking source on "
f"axis from {ti} to {tf} ({dt:.2f})")
# Point source response
psr = response.get_point_source_response(exposure_map)
# Spectrum
model = ModelParser(model_dict=config['sources']).get_model()
spectrum = model.point_sources['source'].components['main'].shape
logger.info(f"Using spectrum:\n {spectrum}")
# Expectation
expectation = psr.get_expectation(spectrum).project('Em')
return expectation
def command_dump():
if len(args.args) != 1:
apar.error("Command 'dump' takes a single argument")
option = args.args[0]
if option == 'header':
result = repr(response)
elif option == 'aeff':
drm = get_drm()
aeff = drm.get_spectral_response().get_effective_area()
result = "#Energy[keV] Aeff[cm2]\n"
for e, a in zip(aeff.axis.centers, aeff):
# IMC: fix this latter when histpy has units
result += f"{e.to_value(u.keV):>12.2e} {a.to_value(u.cm*u.cm):>12.2e}\n"
elif option == 'expectation':
expectation = get_expectation()
result = "#Energy_min[keV] Energy_max[keV] Expected_counts\n"
for emin, emax, ex in zip(expectation.axis.lower_bounds,
expectation.axis.upper_bounds,
expectation):
# IMC: fix this latter when histpy has units
result += (f"{emin.to_value(u.keV):>16.2e} "
f"{emax.to_value(u.keV):>16.2e} "
f"{ex:>15.2e}\n")
else:
apar.error(f"Argument '{option}' not valid for 'dump' command")
if args.output is None:
logger.info(result)
else:
logger.info(f"Saving result to {Path(args.output).resolve()}")
f = open(args.output, 'a')
f.write(result)
f.close()
def command_plot():
if len(args.args) != 1:
apar.error("Command 'plot' takes a single argument")
option = args.args[0]
if option == 'aeff':
drm = get_drm()
drm.get_spectral_response().get_effective_area().plot(errorbars=False)
elif option == 'dispersion':
drm = get_drm()
drm.get_spectral_response().get_dispersion_matrix().plot()
elif option == 'expectation':
expectation = get_expectation().plot(errorbars=False)
else:
apar.error(f"Argument '{option}' not valid for 'plot' command")
if args.output is None:
plt.show()
else:
logger.info(f"Saving plot to {Path(args.output).resolve()}")
plt.savefig(args.output)
# Run
if args.command == 'plot':
command_plot()
elif args.command == 'dump':
command_dump()
else:
apar.error(f"Command '{args.command}' unknown")