import numpy as np
import matplotlib.pyplot as plt
import astropy.units as u
from astropy.io import fits
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, cartesian_to_spherical, Galactic
from mhealpy import HealpixMap
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from matplotlib import cm, colors
from scipy import interpolate
from scoords import Attitude, SpacecraftFrame
from cosipy.response import FullDetectorResponse
from .scatt_map import SpacecraftAttitudeMap
import logging
logger = logging.getLogger(__name__)
[docs]class SpacecraftFile():
def __init__(self, time, x_pointings = None, y_pointings = None, \
z_pointings = None, earth_zenith = None, altitude = None,\
attitude = None, livetime = None, instrument = "COSI", \
frame = "galactic"):
"""
Handles the spacecraft orientation. Calculates the dwell time
map and point source response over a certain orientation period.
Exports the point source response as RMF and ARF files that can be read by XSPEC.
Parameters
----------
Time : astropy.time.Time
The time stamps for each pointings. Note this is NOT the time duration.
x_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the x axis of the local
coordinate system attached to the spacecraft (the default
is `None`, which implies no input for the x pointings).
y_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the y axis of the local
coordinate system attached to the spacecraft (the default
is `None`, which implies no input for the y pointings).
z_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the z axis of the local
coordinate system attached to the spacecraft (the default
is `None`, which implies no input for the z pointings).
earth_zenith : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the Earth zenith (the
default is `None`, which implies no input for the earth pointings).
altitude : array, optional
Altitude of the spacecraft in km.
livetime : array, optional
Time in seconds the instrument is live for the corresponding
energy bin (using left endpoints so that the last entry in
the ori file is 0).
attitude : numpy.ndarray, optional
The attitude of the spacecraft (the default is `None`,
which implies no input for the attitude of the spacecraft).
instrument : str, optional
The instrument name (the default is "COSI").
frame : str, optional
The frame on which the analysis will be based (the default is "galactic").
"""
# check if the inputs are valid
# Time
if isinstance(time, Time):
self._time = time
else:
raise TypeError("The time should be a astropy.time.Time object")
# Altitude
if not isinstance(altitude, (type(None))):
self._altitude = np.array(altitude)
# livetime
if not isinstance(livetime, (type(None))):
self.livetime = np.array(livetime)
# x pointings
if isinstance(x_pointings, (SkyCoord, type(None))):
self.x_pointings = x_pointings
else:
raise TypeError("The x_pointing should be a NoneType or SkyCoord object!")
# y pointings
if isinstance(y_pointings, (SkyCoord, type(None))):
self.y_pointings = y_pointings
else:
raise TypeError("The y_pointing should be a NoneType or SkyCoord object!")
# z pointings
if isinstance(z_pointings, (SkyCoord, type(None))):
self.z_pointings = z_pointings
else:
raise TypeError("The z_pointing should be a NoneType or SkyCoord object!")
# earth pointings
if isinstance(earth_zenith, (SkyCoord, type(None))):
self.earth_zenith = earth_zenith
else:
raise TypeError("The earth_zenith should be a NoneType or SkyCoord object!")
# check if the x, y and z pointings are all None (no inputs). If all None, tt will try to read from attitude parameter
if self.x_pointings is None and self.y_pointings is None and self.z_pointings is None:
if attitude != None:
if type(attitude) is Attitude:
self.attitude = attitude
else:
raise TypeError("The attitude must be `scoords.attitude.Attitude` object")
else:
raise ValueError("Please input the pointings of as least two axes or attitude!")
else:
self.attitude = None # if you have the inputs of x, y and z pointings, the attitude will be overwritten by a None value regardless of the input for the attitude variable.
self._load_time = self._time.to_value(format = "unix") # this is not necessary, but just to make sure evething works fine...
self._x_direction = np.array([x_pointings.l.deg, x_pointings.b.deg]).T # this is not necessary, but just to make sure evething works fine...
self._z_direction = np.array([z_pointings.l.deg, z_pointings.b.deg]).T # this is not necessary, but just to make sure evething works fine...
self._earth_direction = np.array([earth_zenith.l.deg, earth_zenith.b.deg]).T # this is not necessary, but just to make sure evething works fine...
self.frame = frame
[docs] @classmethod
def parse_from_file(cls, file):
"""
Parses timestamps, axis positions from file and returns to __init__.
Parameters
----------
file : str
The file path of the pointings.
Returns
-------
cosipy.spacecraftfile.SpacecraftFile
The SpacecraftFile object.
"""
orientation_file = np.loadtxt(file, usecols=(1, 2, 3, 4, 5, 6, 7, 8, 9),delimiter=' ', skiprows=1, comments=("#", "EN"))
time_stamps = orientation_file[:, 0]
axis_1 = orientation_file[:, [2, 1]]
axis_2 = orientation_file[:, [4, 3]]
axis_3 = orientation_file[:, [7, 6]]
altitude = np.array(orientation_file[:, 5])
livetime = np.array(orientation_file[:, 8])
livetime = livetime[:-1] # left end points, so remove last bin.
time = Time(time_stamps, format = "unix")
xpointings = SkyCoord(l = axis_1[:,0]*u.deg, b = axis_1[:,1]*u.deg, frame = "galactic")
zpointings = SkyCoord(l = axis_2[:,0]*u.deg, b = axis_2[:,1]*u.deg, frame = "galactic")
earthpointings = SkyCoord(l = axis_3[:,0]*u.deg, b = axis_3[:,1]*u.deg, frame = "galactic")
return cls(time, x_pointings = xpointings, z_pointings = zpointings, earth_zenith = earthpointings, altitude = altitude, livetime=livetime)
[docs] def get_time(self, time_array = None):
"""
Return the array pf pointing times as a astropy.Time object.
Parameters
----------
time_array : numpy.ndarray, optional
The time array (the default is `None`, which implies the time array will be taken from the instance).
Returns
-------
astropy.time.Time
The time stamps of the orientation.
"""
if time_array == None:
self._time = Time(self._load_time, format = "unix")
else:
self._time = Time(time_array, format = "unix")
return self._time
[docs] def get_altitude(self):
"""
Return the array of Earth altitude.
Returns
-------
numpy array
the Earth altitude.
"""
return self._altitude
[docs] def get_time_delta(self, time_array = None):
"""
Return an array of the time period between neighbouring time points.
Parameters
----------
time_array : numpy.ndarray, optional
The time delta array (the default is `None`, which implies the time array will be taken from the instance).
Returns
-------
time_delta : astropy.time.Time
The time difference between the neighbouring time stamps.
"""
if time_array == None:
self._time_delta = np.diff(self._load_time)
else:
self._time_delta = np.diff(time_array)
time_delta = TimeDelta(self._time_delta * u.second)
return time_delta
[docs] def interpolate_direction(self, trigger, idx, direction):
"""
Linearly interpolates position at a given time between two timestamps.
Parameters
----------
trigger : astropy.time.Time
The time of the event.
idx : int
The closest index in the pointing to the trigger time.
direction : numpy.ndarray
The pointing axis (x,z).
Returns
-------
numpy.ndarray
The interpolated positions.
"""
new_direction_lat = np.interp(trigger.value, self._load_time[idx : idx + 2], direction[idx : idx + 2, 1])
if (direction[idx, 0] > direction[idx + 1, 0]):
new_direction_long = np.interp(trigger.value, self._load_time[idx : idx + 2], [direction[idx, 0], 360 + direction[idx + 1, 0]])
new_direction_long = new_direction_long - 360
else:
new_direction_long = np.interp(trigger.value, self._load_time[idx : idx + 2], direction[idx : idx + 2, 0])
return np.array([new_direction_long, new_direction_lat])
[docs] def source_interval(self, start, stop):
"""
Returns the SpacecraftFile file class object for the source interval.
Parameters
----------
start : astropy.time.Time
The start time of the orientation period.
stop : astropy.time.Time
The end time of the orientation period.
Returns
-------
cosipy.spacecraft.SpacecraftFile
"""
if(start.format != 'unix' or stop.format != 'unix'):
start = Time(start.unix, format='unix')
stop = Time(stop.unix, format='unix')
if(start > stop):
raise ValueError("start time cannot be after stop time.")
stop_idx = self._load_time.searchsorted(stop.value)
if (start.value % 1 == 0):
start_idx = self._load_time.searchsorted(start.value)
new_times = self._load_time[start_idx : stop_idx + 1]
new_x_direction = self._x_direction[start_idx : stop_idx + 1]
new_z_direction = self._z_direction[start_idx : stop_idx + 1]
new_earth_direction = self._earth_direction[start_idx : stop_idx + 1]
new_earth_altitude = self._altitude[start_idx : stop_idx + 1]
new_livetime = self.livetime[start_idx : stop_idx]
else:
start_idx = self._load_time.searchsorted(start.value) - 1
x_direction_start = self.interpolate_direction(start, start_idx, self._x_direction)
z_direction_start = self.interpolate_direction(start, start_idx, self._z_direction)
earth_direction_start = self.interpolate_direction(start, start_idx, self._earth_direction)
new_times = self._load_time[start_idx + 1 : stop_idx + 1]
new_times = np.insert(new_times, 0, start.value)
new_x_direction = self._x_direction[start_idx + 1 : stop_idx + 1]
new_x_direction = np.insert(new_x_direction, 0, x_direction_start, axis = 0)
new_z_direction = self._z_direction[start_idx + 1 : stop_idx + 1]
new_z_direction = np.insert(new_z_direction, 0, z_direction_start, axis = 0)
new_earth_direction = self._earth_direction[start_idx + 1 : stop_idx + 1]
new_earth_direction = np.insert(new_earth_direction, 0, earth_direction_start, axis = 0)
# Use linear interpolation to get starting altitude at desired time.
f = interpolate.interp1d(self._time.value, self._altitude, kind="linear")
starting_alt = f(start.value)
new_earth_altitude = self._altitude[start_idx + 1 : stop_idx + 1]
new_earth_altitude = np.insert(new_earth_altitude, 0, starting_alt)
# SAA livetime:
if self.livetime[start_idx] == 0:
udpated_livetime = 0
else:
updated_livetime = new_times[1] - new_times[0]
new_livetime = self.livetime[start_idx + 1 : stop_idx]
new_livetime = np.insert(new_livetime, 0, updated_livetime)
if (stop.value % 1 != 0):
stop_idx = self._load_time.searchsorted(stop.value) - 1
x_direction_stop = self.interpolate_direction(stop, stop_idx, self._x_direction)
z_direction_stop = self.interpolate_direction(stop, stop_idx, self._z_direction)
earth_direction_stop = self.interpolate_direction(stop, stop_idx, self._earth_direction)
new_times = np.delete(new_times, -1)
new_times = np.append(new_times, stop.value)
new_x_direction = new_x_direction[:-1]
new_x_direction = np.append(new_x_direction, [x_direction_stop], axis = 0)
new_z_direction = new_z_direction[:-1]
new_z_direction = np.append(new_z_direction, [z_direction_stop], axis = 0)
new_earth_direction = new_earth_direction[:-1]
new_earth_direction = np.append(new_earth_direction, [earth_direction_stop], axis = 0)
# Use linear interpolation to get starting altitude at desired time.
f = interpolate.interp1d(self._time.value, self._altitude, kind="linear")
stop_alt = f(stop.value)
new_earth_altitude = new_earth_altitude[:-1]
new_earth_altitude = np.append(new_earth_altitude, [stop_alt])
# SAA livetime:
if new_livetime[-1] == 0:
udpated_livetime = 0
else:
updated_livetime = new_times[-1] - new_times[-2]
new_livetime = new_livetime[:-1]
new_livetime = np.append(new_livetime, updated_livetime)
time = Time(new_times, format = "unix")
xpointings = SkyCoord(l = new_x_direction[:,0]*u.deg, b = new_x_direction[:,1]*u.deg, frame = "galactic")
zpointings = SkyCoord(l = new_z_direction[:,0]*u.deg, b = new_z_direction[:,1]*u.deg, frame = "galactic")
earthpointings = SkyCoord(l = new_earth_direction[:,0]*u.deg, b = new_earth_direction[:,1]*u.deg, frame = "galactic")
altitude = new_earth_altitude
return self.__class__(time, x_pointings = xpointings, z_pointings = zpointings, earth_zenith = earthpointings, altitude = altitude, livetime = new_livetime)
[docs] def get_attitude(self, x_pointings = None, y_pointings = None, z_pointings = None):
"""
Converts the x, y and z pointings to the attitude of the telescope.
Parameters
----------
x_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the x axis of the local coordinate system attached to the spacecraft (the default is `None`, which implies that the x pointings will be taken from the instance).
y_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the y axis of the local coordinate system attached to the spacecraft (the default is `None`, which implies that the y pointings will be taken from the instance).
z_pointings : astropy.coordinates.SkyCoord, optional
The pointings (galactic system) of the z axis of the local coordinate system attached to the spacecraft (the default is `None`, which implies that the z pointings will be taken from the instance).
Returns
-------
scoords.attitude.Attitude
The attitude of the spacecraft.
"""
if self.attitude is None:
# the attitude is None, we will calculate from the x, y and z pointings
if x_pointings is not None:
self.x_pointings = x_pointings
if y_pointings is not None:
self.y_pointings = y_pointings
if z_pointings is not None:
self.z_pointings = z_pointings
list_ = [self.x_pointings, self.y_pointings, self.z_pointings]
coord_list_of_path = [x for x in list_ if x!=None] # check how many pointings the user input
# Check if the user input pointings from at least two axes
if len(coord_list_of_path) <= 1:
raise ValueError("You must input pointings of at least two axes")
# Check if the inputs are SkyCoord objects
for i in coord_list_of_path:
if type(i) != SkyCoord:
raise ValueError("The coordiates must be a SkyCoord object")
self.attitude = Attitude.from_axes(x=self.x_pointings,
y=self.y_pointings,
z=self.z_pointings,
frame = self.frame)
return self.attitude
[docs] def get_target_in_sc_frame(self, target_name, target_coord, attitude = None, quiet = False, save = False):
"""
Convert the x, y and z pointings of the spacescraft axes to the path of the source in the spacecraft frame.
Specify the pointings of at least two axes.
Parameters
----------
target_name : str
The name of the target object.
target_coord : astropy.coordinates.SkyCoord
The coordinates of the target object.
attitude: scoords.Attitude, optional
The attitude of the spacecraft (the default is `None`, which implies the attitude will be taken from the instance).
quiet : bool, default=False
Setting `True` to stop printing the messages.
save : bool, default=False
Setting `True` to save the target coordinates in the spacecraft frame.
Returns
-------
astropy.coordinates.SkyCoord
The target coordinates in the spacecraft frame.
"""
if attitude != None:
self.attitude = attitude
else:
self.attitude = self.get_attitude()
self.target_name = target_name
if quiet == False:
logger.info("Now converting to the Spacecraft frame...")
self.src_path_cartesian = SkyCoord(np.dot(self.attitude.rot.inv().as_matrix(), target_coord.cartesian.xyz.value),
representation_type = 'cartesian',
frame = SpacecraftFrame())
# The conversion above is in Cartesian frame, so we have to convert them to the spherical one.
self.src_path_spherical = cartesian_to_spherical(self.src_path_cartesian.x,
self.src_path_cartesian.y,
self.src_path_cartesian.z)
if quiet == False:
logger.info(f"Conversion completed!")
# generate the numpy array of l and b to save to a npy file
l = np.array(self.src_path_spherical[2].deg) # note that 0 is Quanty, 1 is latitude and 2 is longitude and they are in rad not deg
b = np.array(self.src_path_spherical[1].deg)
self.src_path_lb = np.stack((l,b), axis=-1)
if save == True:
np.save(self.target_name+"_source_path_in_SC_frame", self.src_path_lb)
# convert to SkyCoord objects to get the output object of this method
self.src_path_skycoord = SkyCoord(self.src_path_lb[:,0], self.src_path_lb[:,1], unit = "deg", frame = SpacecraftFrame())
return self.src_path_skycoord
[docs] def get_dwell_map(self, response, src_path = None, save = False, pa_convention=None):
"""
Generates the dwell time map for the source.
Parameters
----------
response : str or pathlib.Path
The path to the response file.
src_path : astropy.coordinates.SkyCoord, optional
The movement of source in the detector frame (the default is `None`, which implies that the `src_path` will be read from the instance).
save : bool, default=False
Set True to save the dwell time map.
pa_convention : str, optional
Polarization convention of response ('RelativeX', 'RelativeY', or 'RelativeZ')
Returns
-------
mhealpy.containers.healpix_map.HealpixMap
The dwell time map.
"""
# Define the response
self.response_file = response
# Define the dts
self.dts = self.get_time_delta()
# define the target source path in the SC frame
if src_path is None:
path = self.src_path_skycoord
else:
path = src_path
# check if the target source path is astropy.Skycoord object
if type(path) != SkyCoord:
raise TypeError("The coordinates of the source movement in the Spacecraft frame must be a SkyCoord object")
if path.shape[0]-1 != self.dts.shape[0]:
raise ValueError("The dimensions of the dts or source coordinates are not correct. Please check your inputs.")
with FullDetectorResponse.open(self.response_file, pa_convention=pa_convention) as response:
self.dwell_map = HealpixMap(base = response,
coordsys = SpacecraftFrame())
# Get the unique pixels to weight, and sum all the correspondint weights first, so
# each pixels needs to be called only once.
# Based on https://stackoverflow.com/questions/23268605/grouping-indices-of-unique-elements-in-numpy
# remove the last value. Effectively a 0th order interpolations
pixels, weights = self.dwell_map.get_interp_weights(theta = self.src_path_skycoord[:-1])
weighted_duration = weights * self.dts.to_value(u.second)[None]
pixels = pixels.flatten()
weighted_duration = weighted_duration.flatten()
pixels_argsort = np.argsort(pixels)
pixels = pixels[pixels_argsort]
weighted_duration = weighted_duration[pixels_argsort]
first_unique = np.concatenate(([True], pixels[1:] != pixels[:-1]))
pixel_unique = pixels[first_unique]
splits = np.nonzero(first_unique)[0][1:]
pixel_durations = [np.sum(weighted_duration[start:stop]) for start,stop in zip(np.append(0,splits), np.append(splits, pixels.size))]
for pix, dur in zip(pixel_unique, pixel_durations):
self.dwell_map[pix] += dur
self.dwell_map.to(u.second, update = False, copy = False)
if save == True:
self.dwell_map.write_map(self.target_name + "_DwellMap.fits", overwrite = True)
return self.dwell_map
[docs] def get_scatt_map(self,
target_coord,
nside,
scheme = 'ring',
coordsys = 'galactic',
r_earth = 6378.0,
earth_occ = True
):
"""
Bin the spacecraft attitude history into a 4D histogram that
contains the accumulated time the axes of the spacecraft where
looking at a given direction.
Parameters
----------
target_coord : astropy.coordinates.SkyCoord
The coordinates of the target object.
nside : int
The nside of the scatt map.
scheme : str, optional
The scheme of the scatt map (the default is "ring")
coordsys : str, optional
The coordinate system used in the scatt map (the default is "galactic).
r_earth : float, optional
Earth radius in km (default is 6378 km).
earth_occ : bool, optional
Option to include Earth occultation in scatt map calculation.
Default is True.
Returns
-------
h_ori : cosipy.spacecraftfile.scatt_map.SpacecraftAttitudeMap
The spacecraft attitude map.
"""
# Get orientations
timestamps = self.get_time()
attitudes = self.get_attitude()
# Altitude at each point in the orbit:
altitude = self._altitude
# Earth zenith at each point in the orbit:
earth_zenith = self.earth_zenith
# Fill (only 2 axes needed to fully define the orientation)
h_ori = SpacecraftAttitudeMap(nside = nside,
scheme = scheme,
coordsys = coordsys)
x,y,z = attitudes[:-1].as_axes()
# Get max angle based on altitude:
max_angle = np.pi - np.arcsin(r_earth/(r_earth + altitude))
max_angle *= (180/np.pi) # angles in degree
# Calculate angle between source direction and Earth zenith
# for each time stamp:
src_angle = target_coord.separation(earth_zenith)
# Get pointings that are occulted by Earth:
earth_occ_index = src_angle.value >= max_angle
# Define weights and set to 0 if blocked by Earth:
weight = self.livetime*u.s
if earth_occ == True:
weight[earth_occ_index[:-1]] = 0
# Fill histogram:
h_ori.fill(x, y, weight = weight)
return h_ori
[docs] def get_psr_rsp(self, response = None, dwell_map = None, dts = None, pa_convention=None):
"""
Generates the point source response based on the response file and dwell time map.
dts is used to find the exposure time for this observation.
Parameters
----------
:response : str or pathlib.Path, optional
The response for the observation (the defaul is `None`, which implies that the `response` will be read from the instance).
dwell_map : str, optional
The time dwell map for the source, you can load saved dwell time map using this parameter if you've saved it before (the defaul is `None`, which implies that the `dwell_map` will be read from the instance).
dts : numpy.ndarray or str, optional
The elapsed time for each pointing. It must has the same size as the pointings. If you have saved this array, you can load it using this parameter (the defaul is `None`, which implies that the `dts` will be read from the instance).
Returns
-------
Ei_edges : numpy.ndarray
The edges of the incident energy.
Ei_lo : numpy.ndarray
The lower edges of the incident energy.
Ei_hi : numpy.ndarray
The upper edges of the incident energy.
Em_edges : numpy.ndarray
The edges of the measured energy.
Em_lo : numpy.ndarray
The lower edges of the measured energy.
Em_hi : numpy.ndarray
The upper edges of the measured energy.
areas : numpy.ndarray
The effective area of each energy bin.
matrix : numpy.ndarray
The energy dispersion matrix.
pa_convention : str, optional
Polarization convention of response ('RelativeX', 'RelativeY', or 'RelativeZ')
"""
if response == None:
pass # will use the response defined in the previous steps
else:
self.response_file = response
if dwell_map is None: # must use is None, or it throws error!
pass # will use the dwelltime map calculated in the previous steps
else:
self.dwell_map = HealpixMap.read_map(dwell_map)
if dts == None:
self.dts = self.get_time_delta()
else:
self.dts = TimeDelta(dts*u.second)
with FullDetectorResponse.open(self.response_file, pa_convention=pa_convention) as response:
# get point source response
self.psr = response.get_point_source_response(self.dwell_map)
self.Ei_edges = np.array(response.axes['Ei'].edges)
self.Ei_lo = np.float32(self.Ei_edges[:-1]) # use float32 to match the requirement of the data type
self.Ei_hi = np.float32(self.Ei_edges[1:])
self.Em_edges = np.array(response.axes['Em'].edges)
self.Em_lo = np.float32(self.Em_edges[:-1])
self.Em_hi = np.float32(self.Em_edges[1:])
# get the effective area and matrix
logger.info("Getting the effective area ...")
self.areas = np.float32(np.array(self.psr.project('Ei').to_dense().contents))/self.dts.to_value(u.second).sum()
spectral_response = np.float32(np.array(self.psr.project(['Ei','Em']).to_dense().contents))
self.matrix = np.float32(np.zeros((self.Ei_lo.size,self.Em_lo.size))) # initate the matrix
logger.info("Getting the energy redistribution matrix ...")
for i in np.arange(self.Ei_lo.size):
new_raw = spectral_response[i,:]/spectral_response[i,:].sum()
self.matrix[i,:] = new_raw
self.matrix = self.matrix.T
return self.Ei_edges, self.Ei_lo, self.Ei_hi, self.Em_edges, self.Em_lo, self.Em_hi, self.areas, self.matrix
[docs] def get_arf(self, out_name = None):
"""
Converts the point source response to an arf file that can be read by XSPEC.
Parameters
----------
out_name: str, optional
The name of the arf file to save. (the default is `None`, which implies that the saving name will be the target name of the instance).
"""
if out_name == None:
self.out_name = self.target_name
else:
self.out_name = out_name
# blow write the arf file
copyright_string=" FITS (Flexible Image Transport System) format is defined in 'Astronomy and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
## Create PrimaryHDU
primaryhdu = fits.PrimaryHDU() # create an empty primary HDU
primaryhdu.header["BITPIX"] = -32 # since it's an empty HDU, I can just change the data type by resetting the BIPTIX value
primaryhdu.header["COMMENT"] = copyright_string # add comments
primaryhdu.header # print headers and their values
col1_energ_lo = fits.Column(name="ENERG_LO", format="E",unit = "keV", array=self.Em_lo)
col2_energ_hi = fits.Column(name="ENERG_HI", format="E",unit = "keV", array=self.Em_hi)
col3_specresp = fits.Column(name="SPECRESP", format="E",unit = "cm**2", array=self.areas)
cols = fits.ColDefs([col1_energ_lo, col2_energ_hi, col3_specresp]) # create a ColDefs (column-definitions) object for all columns
specresp_bintablehdu = fits.BinTableHDU.from_columns(cols) # create a binary table HDU object
specresp_bintablehdu.header.comments["TTYPE1"] = "label for field 1"
specresp_bintablehdu.header.comments["TFORM1"] = "data format of field: 4-byte REAL"
specresp_bintablehdu.header.comments["TUNIT1"] = "physical unit of field"
specresp_bintablehdu.header.comments["TTYPE2"] = "label for field 2"
specresp_bintablehdu.header.comments["TFORM2"] = "data format of field: 4-byte REAL"
specresp_bintablehdu.header.comments["TUNIT2"] = "physical unit of field"
specresp_bintablehdu.header.comments["TTYPE3"] = "label for field 3"
specresp_bintablehdu.header.comments["TFORM3"] = "data format of field: 4-byte REAL"
specresp_bintablehdu.header.comments["TUNIT3"] = "physical unit of field"
specresp_bintablehdu.header["EXTNAME"] = ("SPECRESP","name of this binary table extension")
specresp_bintablehdu.header["TELESCOP"] = ("COSI","mission/satellite name")
specresp_bintablehdu.header["INSTRUME"] = ("COSI","instrument/detector name")
specresp_bintablehdu.header["FILTER"] = ("NONE","filter in use")
specresp_bintablehdu.header["HDUCLAS1"] = ("RESPONSE","dataset relates to spectral response")
specresp_bintablehdu.header["HDUCLAS2"] = ("SPECRESP","extension contains an ARF")
specresp_bintablehdu.header["HDUVERS"] = ("1.1.0","version of format")
new_arfhdus = fits.HDUList([primaryhdu, specresp_bintablehdu])
new_arfhdus.writeto(f'{self.out_name}.arf', overwrite=True)
return
[docs] def get_rmf(self, out_name = None):
"""
Converts the point source response to an rmf file that can be read by XSPEC.
Parameters
----------
out_name: str, optional
The name of the arf file to save. (the default is None, which implies that the saving name will be the target name of the instance).
"""
if out_name == None:
self.out_name = self.target_name
else:
self.out_name = out_name
# blow write the arf file
copyright_string=" FITS (Flexible Image Transport System) format is defined in 'Astronomy and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
## Create PrimaryHDU
primaryhdu = fits.PrimaryHDU() # create an empty primary HDU
primaryhdu.header["BITPIX"] = -32 # since it's an empty HDU, I can just change the data type by resetting the BIPTIX value
primaryhdu.header["COMMENT"] = copyright_string # add comments
primaryhdu.header # print headers and their values
## Create binary table HDU for MATRIX
### prepare colums
energ_lo = []
energ_hi = []
n_grp = []
f_chan = []
n_chan = []
matrix = []
for i in np.arange(len(self.Ei_lo)):
energ_lo_temp = np.float32(self.Em_lo[i])
energ_hi_temp = np.float32(self.Ei_hi[i])
if self.matrix[:,i].sum() != 0:
nz_matrix_idx = np.nonzero(self.matrix[:,i])[0] # non-zero index for the matrix
subsets = np.split(nz_matrix_idx, np.where(np.diff(nz_matrix_idx) != 1)[0]+1)
n_grp_temp = np.int16(len(subsets))
f_chan_temp = []
n_chan_temp = []
matrix_temp = []
for m in np.arange(n_grp_temp):
f_chan_temp += [subsets[m][0]]
n_chan_temp += [len(subsets[m])]
for m in nz_matrix_idx:
matrix_temp += [self.matrix[:,i][m]]
f_chan_temp = np.int16(np.array(f_chan_temp))
n_chan_temp = np.int16(np.array(n_chan_temp))
matrix_temp = np.float32(np.array(matrix_temp))
else:
n_grp_temp = np.int16(0)
f_chan_temp = np.int16(np.array([0]))
n_chan_temp = np.int16(np.array([0]))
matrix_temp = np.float32(np.array([0]))
energ_lo.append(energ_lo_temp)
energ_hi.append(energ_hi_temp)
n_grp.append(n_grp_temp)
f_chan.append(f_chan_temp)
n_chan.append(n_chan_temp)
matrix.append(matrix_temp)
col1_energ_lo = fits.Column(name="ENERG_LO", format="E",unit = "keV", array=energ_lo)
col2_energ_hi = fits.Column(name="ENERG_HI", format="E",unit = "keV", array=energ_hi)
col3_n_grp = fits.Column(name="N_GRP", format="I", array=n_grp)
col4_f_chan = fits.Column(name="F_CHAN", format="PI(54)", array=f_chan)
col5_n_chan = fits.Column(name="N_CHAN", format="PI(54)", array=n_chan)
col6_n_chan = fits.Column(name="MATRIX", format="PE(161)", array=matrix)
cols = fits.ColDefs([col1_energ_lo, col2_energ_hi, col3_n_grp, col4_f_chan, col5_n_chan, col6_n_chan]) # create a ColDefs (column-definitions) object for all columns
matrix_bintablehdu = fits.BinTableHDU.from_columns(cols) # create a binary table HDU object
matrix_bintablehdu.header.comments["TTYPE1"] = "label for field 1 "
matrix_bintablehdu.header.comments["TFORM1"] = "data format of field: 4-byte REAL"
matrix_bintablehdu.header.comments["TUNIT1"] = "physical unit of field"
matrix_bintablehdu.header.comments["TTYPE2"] = "label for field 2"
matrix_bintablehdu.header.comments["TFORM2"] = "data format of field: 4-byte REAL"
matrix_bintablehdu.header.comments["TUNIT2"] = "physical unit of field"
matrix_bintablehdu.header.comments["TTYPE3"] = "label for field 3 "
matrix_bintablehdu.header.comments["TFORM3"] = "data format of field: 2-byte INTEGER"
matrix_bintablehdu.header.comments["TTYPE4"] = "label for field 4"
matrix_bintablehdu.header.comments["TFORM4"] = "data format of field: variable length array"
matrix_bintablehdu.header.comments["TTYPE5"] = "label for field 5"
matrix_bintablehdu.header.comments["TFORM5"] = "data format of field: variable length array"
matrix_bintablehdu.header.comments["TTYPE6"] = "label for field 6"
matrix_bintablehdu.header.comments["TFORM6"] = "data format of field: variable length array"
matrix_bintablehdu.header["EXTNAME"] = ("MATRIX","name of this binary table extension")
matrix_bintablehdu.header["TELESCOP"] = ("COSI","mission/satellite name")
matrix_bintablehdu.header["INSTRUME"] = ("COSI","instrument/detector name")
matrix_bintablehdu.header["FILTER"] = ("NONE","filter in use")
matrix_bintablehdu.header["CHANTYPE"] = ("PI","total number of detector channels")
matrix_bintablehdu.header["DETCHANS"] = (len(self.Em_lo),"total number of detector channels")
matrix_bintablehdu.header["HDUCLASS"] = ("OGIP","format conforms to OGIP standard")
matrix_bintablehdu.header["HDUCLAS1"] = ("RESPONSE","dataset relates to spectral response")
matrix_bintablehdu.header["HDUCLAS2"] = ("RSP_MATRIX","dataset is a spectral response matrix")
matrix_bintablehdu.header["HDUVERS"] = ("1.3.0","version of format")
matrix_bintablehdu.header["TLMIN4"] = (0,"minimum value legally allowed in column 4")
## Create binary table HDU for EBOUNDS
channels = np.int16(np.arange(len(self.Em_lo)))
e_min = np.float32(self.Em_lo)
e_max = np.float32(self.Em_hi)
col1_channels = fits.Column(name="CHANNEL", format="I", array=channels)
col2_e_min = fits.Column(name="E_MIN", format="E",unit="keV", array=e_min)
col3_e_max = fits.Column(name="E_MAX", format="E",unit="keV", array=e_max)
cols = fits.ColDefs([col1_channels, col2_e_min, col3_e_max])
ebounds_bintablehdu = fits.BinTableHDU.from_columns(cols)
ebounds_bintablehdu.header.comments["TTYPE1"] = "label for field 1"
ebounds_bintablehdu.header.comments["TFORM1"] = "data format of field: 2-byte INTEGER"
ebounds_bintablehdu.header.comments["TTYPE2"] = "label for field 2"
ebounds_bintablehdu.header.comments["TFORM2"] = "data format of field: 4-byte REAL"
ebounds_bintablehdu.header.comments["TUNIT2"] = "physical unit of field"
ebounds_bintablehdu.header.comments["TTYPE3"] = "label for field 3"
ebounds_bintablehdu.header.comments["TFORM3"] = "data format of field: 4-byte REAL"
ebounds_bintablehdu.header.comments["TUNIT3"] = "physical unit of field"
ebounds_bintablehdu.header["EXTNAME"] = ("EBOUNDS","name of this binary table extension")
ebounds_bintablehdu.header["TELESCOP"] = ("COSI","mission/satellite")
ebounds_bintablehdu.header["INSTRUME"] = ("COSI","nstrument/detector name")
ebounds_bintablehdu.header["FILTER"] = ("NONE","filter in use")
ebounds_bintablehdu.header["CHANTYPE"] = ("PI","channel type (PHA or PI)")
ebounds_bintablehdu.header["DETCHANS"] = (len(self.Em_lo),"total number of detector channels")
ebounds_bintablehdu.header["HDUCLASS"] = ("OGIP","format conforms to OGIP standard")
ebounds_bintablehdu.header["HDUCLAS1"] = ("RESPONSE","dataset relates to spectral response")
ebounds_bintablehdu.header["HDUCLAS2"] = ("EBOUNDS","dataset is a spectral response matrix")
ebounds_bintablehdu.header["HDUVERS"] = ("1.2.0","version of format")
new_rmfhdus = fits.HDUList([primaryhdu, matrix_bintablehdu,ebounds_bintablehdu])
new_rmfhdus.writeto(f'{self.out_name}.rmf', overwrite=True)
return
[docs] def get_pha(self, src_counts, errors, rmf_file = None, arf_file = None, bkg_file = None, exposure_time = None, dts = None, telescope="COSI", instrument="COSI"):
"""
Generate the pha file that can be read by XSPEC. This file stores the counts info of the source.
Parameters
----------
src_counts : numpy.ndarray
The counts in each energy band. If you have src_counts with unit counts/kev/s, you must convert it to counts by multiplying it with exposure time and the energy band width.
errors : numpy.ndarray
The error for counts. It has the same unit requirement as src_counts.
rmf_file : str, optional
The rmf file name to be written into the pha file (the default is `None`, which implies that it uses the rmf file generate by function `get_rmf`)
arf_file : str, optional
The arf file name to be written into the pha file (the default is `None`, which implies that it uses the arf file generate by function `get_arf`)
bkg_file : str, optional
The background file name (the default is `None`, which implied the `src_counts` is source counts only).
exposure_time : float, optional
The exposure time for this source observation (the default is `None`, which implied that the exposure time will be calculated by `dts`).
dts : numpy.ndarray, optional
It's used to calculate the exposure time. It has the same effect as `exposure_time`. If both `exposure_time` and `dts` are given, `dts` will write over the exposure_time (the default is `None`, which implies that the `dts` will be read from the instance).
telescope : str, optional
The name of the telecope (the default is "COSI").
instrument : str, optional
The instrument name (the default is "COSI").
"""
self.src_counts = src_counts
self.errors = errors
if bkg_file != None:
self.bkg_file = bkg_file
else:
self.bkg_file = "None"
self.bkg_file = bkg_file
if rmf_file != None:
self.rmf_file = rmf_file
else:
self.rmf_file = f'{self.out_name}.rmf'
if arf_file != None:
self.arf_file = arf_file
else:
self.arf_file = f'{self.out_name}.arf'
if exposure_time != None:
self.exposure_time = exposure_time
if dts != None:
self.dts = self.__str_or_array(dts)
self.exposure_time = self.dts.sum()
self.telescope = telescope
self.instrument = instrument
self.channel_number = len(self.src_counts)
# define other hardcoded inputs
copyright_string=" FITS (Flexible Image Transport System) format is defined in 'Astronomy and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
channels = np.arange(self.channel_number)
# Create PrimaryHDU
primaryhdu = fits.PrimaryHDU() # create an empty primary HDU
primaryhdu.header["BITPIX"] = -32 # since it's an empty HDU, I can just change the data type by resetting the BIPTIX value
primaryhdu.header["COMMENT"] = copyright_string # add comments
primaryhdu.header["TELESCOP"] = telescope # add telescope keyword valie
primaryhdu.header["INSTRUME"] = instrument # add instrument keyword valie
primaryhdu.header # print headers and their values
# Create binary table HDU
a1 = np.array(channels,dtype="int32") # I guess I need to convert the dtype to match the format J
a2 = np.array(self.src_counts,dtype="int64") # int32 is not enough for counts
a3 = np.array(self.errors,dtype="int64") # int32 is not enough for errors
col1 = fits.Column(name="CHANNEL", format="J", array=a1)
col2 = fits.Column(name="COUNTS", format="K", array=a2,unit="count")
col3 = fits.Column(name="STAT_ERR", format="K", array=a3,unit="count")
cols = fits.ColDefs([col1, col2, col3]) # create a ColDefs (column-definitions) object for all columns
bintablehdu = fits.BinTableHDU.from_columns(cols) # create a binary table HDU object
#add other BinTableHDU hear keywords,their values, and comments
bintablehdu.header.comments["TTYPE1"] = "label for field 1"
bintablehdu.header.comments["TFORM1"] = "data format of field: 32-bit integer"
bintablehdu.header.comments["TTYPE2"] = "label for field 2"
bintablehdu.header.comments["TFORM2"] = "data format of field: 32-bit integer"
bintablehdu.header.comments["TUNIT2"] = "physical unit of field 2"
bintablehdu.header["EXTNAME"] = ("SPECTRUM","name of this binary table extension")
bintablehdu.header["TELESCOP"] = (self.telescope,"telescope/mission name")
bintablehdu.header["INSTRUME"] = (self.instrument,"instrument/detector name")
bintablehdu.header["FILTER"] = ("NONE","filter type if any")
bintablehdu.header["EXPOSURE"] = (self.exposure_time,"integration time in seconds")
bintablehdu.header["BACKFILE"] = (self.bkg_file,"background filename")
bintablehdu.header["BACKSCAL"] = (1,"background scaling factor")
bintablehdu.header["CORRFILE"] = ("NONE","associated correction filename")
bintablehdu.header["CORRSCAL"] = (1,"correction file scaling factor")
bintablehdu.header["CORRSCAL"] = (1,"correction file scaling factor")
bintablehdu.header["RESPFILE"] = (self.rmf_file,"associated rmf filename")
bintablehdu.header["ANCRFILE"] = (self.arf_file,"associated arf filename")
bintablehdu.header["AREASCAL"] = (1,"area scaling factor")
bintablehdu.header["STAT_ERR"] = (0,"statistical error specified if any")
bintablehdu.header["SYS_ERR"] = (0,"systematic error specified if any")
bintablehdu.header["GROUPING"] = (0,"grouping of the data has been defined if any")
bintablehdu.header["QUALITY"] = (0,"data quality information specified")
bintablehdu.header["HDUCLASS"] = ("OGIP","format conforms to OGIP standard")
bintablehdu.header["HDUCLAS1"] = ("SPECTRUM","PHA dataset")
bintablehdu.header["HDUVERS"] = ("1.2.1","version of format")
bintablehdu.header["POISSERR"] = (False,"Poissonian errors to be assumed, T as True")
bintablehdu.header["CHANTYPE"] = ("PI","channel type (PHA or PI)")
bintablehdu.header["DETCHANS"] = (self.channel_number,"total number of detector channels")
new_phahdus = fits.HDUList([primaryhdu, bintablehdu])
new_phahdus.writeto(f'{self.out_name}.pha', overwrite=True)
return
[docs] def plot_arf(self, file_name = None, save_name = None, dpi = 300):
"""
Read the arf fits file, plot and save it.
Parameters
----------
file_name: str, optional
The directory if the arf fits file (the default is `None`, which implies the file name will be read from the instance).
save_name: str, optional
The name of the saved image of effective area (the default is `None`, which implies the file name will be read from the instance).
dpi: int, optional
The dpi of the saved image (the default is 300).
"""
if file_name != None:
self.file_name = file_name
else:
self.file_name = f'{self.out_name}.arf'
if save_name != None:
self.save_name = save_name
else:
self.save_name = self.out_name
self.dpi = dpi
self.arf = fits.open(self.file_name) # read file
# SPECRESP HDU
self.specresp_hdu = self.arf["SPECRESP"]
self.areas = np.array(self.specresp_hdu.data["SPECRESP"])
self.Em_lo = np.array(self.specresp_hdu.data["ENERG_LO"])
self.Em_hi = np.array(self.specresp_hdu.data["ENERG_HI"])
E_center = (self.Em_lo+self.Em_hi)/2
E_edges = np.append(self.Em_lo,self.Em_hi[-1])
fig, ax = plt.subplots()
ax.hist(E_center,E_edges,weights=self.areas,histtype='step')
ax.set_title("Effective area")
ax.set_xlabel("Energy[$keV$]")
ax.set_ylabel(r"Effective area [$cm^2$]")
ax.set_xscale("log")
fig.savefig(f"Effective_area_for_{self.save_name}.png", bbox_inches = "tight", pad_inches=0.1, dpi=self.dpi)
#fig.show()
return
[docs] def plot_rmf(self, file_name = None, save_name = None, dpi = 300):
"""
Read the rmf fits file, plot and save it.
Parameters
----------
file_name: str, optional
The directory if the arf fits file (the default is `None`, which implies the file name will be read from the instance).
save_name: str, optional
The name of the saved image of effective area (the default is `None`, which implies the file name will be read from the instance).
dpi: int, optional
The dpi of the saved image (the default is 300).
"""
if file_name != None:
self.file_name = file_name
else:
self.file_name = f'{self.out_name}.rmf'
if save_name != None:
self.save_name = save_name
else:
self.save_name = self.out_name
self.dpi = dpi
# Read rmf file
self.rmf = fits.open(self.file_name) # read file
# Read the ENOUNDS information
ebounds_ext = self.rmf["EBOUNDS"]
channel_low = ebounds_ext.data["E_MIN"] # energy bin lower edges for channels (channels are just incident energy bins)
channel_high = ebounds_ext.data["E_MAX"] # energy bin higher edges for channels (channels are just incident energy bins)
# Read the MATRIX extension
matrix_ext = self.rmf['MATRIX']
#logger.info(repr(matrix_hdu.header[:60]))
energy_low = matrix_ext.data["ENERG_LO"] # energy bin lower edges for measured energies
energy_high = matrix_ext.data["ENERG_HI"] # energy bin higher edges for measured energies
data = matrix_ext.data
# Create a 2-d numpy array and store probability data into the redistribution matrix
rmf_matrix = np.zeros((len(energy_low),len(channel_low))) # create an empty matrix
for i in np.arange(data.shape[0]): # i is the measured energy index, examine the matrix_ext.data rows by rows
if data[i][5].sum() == 0: # if the sum of probabilities is zero, then skip since there is no data at all
pass
else:
#measured_energy_index = np.argwhere(energy_low == data[157][0])[0][0]
f_chan = data[i][3] # get the starting channel of each subsets
n_chann = data[i][4] # get the number of channels in each subsets
matrix = data[i][5] # get the probabilities of this row (incident energy)
indices = []
for k in f_chan:
channels = 0
channels = np.arange(k,k + n_chann[np.argwhere(f_chan == k)]).tolist() # generate the cha
indices += channels # fappend the channels togeter
indices = np.array(indices)
for m in indices:
rmf_matrix[i][m] = matrix[np.argwhere(indices == m)[0][0]] # write the probabilities into the empty matrix
# plot the redistribution matrix
xcenter = np.divide(energy_low+energy_high,2)
x_center_coords = np.repeat(xcenter, 10)
y_center_coords = np.tile(xcenter, 10)
energy_all_edges = np.append(energy_low,energy_high[-1])
#bin_edges = np.array([incident_energy_bins,incident_energy_bins]) # doesn't work
bin_edges = np.vstack((energy_all_edges, energy_all_edges))
#logger.info(bin_edges)
self.probability = []
for i in np.arange(10):
for j in np.arange(10):
self.probability.append(rmf_matrix[i][j])
#logger.info(type(probability))
plt.hist2d(x=x_center_coords,y=y_center_coords,weights=self.probability,bins=bin_edges, norm=LogNorm())
plt.xscale('log')
plt.yscale('log')
plt.xlabel("Incident energy [$keV$]")
plt.ylabel("Measured energy [$keV$]")
plt.title("Redistribution matrix")
#plt.xlim([70,10000])
#plt.ylim([70,10000])
plt.colorbar(norm=LogNorm())
plt.savefig(f"Redistribution_matrix_for_{self.save_name}.png", bbox_inches = "tight", pad_inches=0.1, dpi=300)
#plt.show()
return