Source code for cosipy.ts_map.fast_ts_fit

from histpy import Histogram, Axis, Axes
import h5py as h5
import sys
from cosipy import SpacecraftFile
from cosipy.response import PointSourceResponse
import healpy as hp
from mhealpy import HealpixMap
import numpy as np
import os
import multiprocessing
from itertools import product
from .fast_norm_fit import FastNormFit as fnf
from pathlib import Path
from cosipy.response import FullDetectorResponse
import time
import scipy.stats
import os
import psutil
import gc
import matplotlib.pyplot as plt

import logging
logger = logging.getLogger(__name__)

[docs]class FastTSMap(): def __init__(self, data, bkg_model, response_path, orientation = None, cds_frame = "local", scheme = "RING"): """ Initialize the instance if a TS map fit. Parameters ---------- data : histpy.Histogram Observed data, which includes counts from both signal and background. bkg_model : histpy.Histogram Background model, which includes the background counts to model the background in the observed data. response_path : str or pathlib.Path The path to the response file. orientation : cosipy.SpacecraftFile, optional The orientation of the spacecraft when data are collected (the default is `None`, which implies the orientation file is not needed). cds_frame : str, optional "local" or "galactic", it's the Compton data space (CDS) frame of the data, bkg_model and the response. In other words, they should have the same cds frame (the default is "local", which implied that a local frame that attached to the spacecraft). scheme : str, optional The scheme of the CDS of data (the default is "RING", which implies a "RING" scheme of the data). """ self._data = data.project(["Em", "PsiChi", "Phi"]) self._bkg_model = bkg_model.project(["Em", "PsiChi", "Phi"]) self._orientation = orientation self._response_path = Path(response_path) self._cds_frame = cds_frame self._scheme = scheme
[docs] @staticmethod def slice_energy_channel(hist, channel_start, channel_stop): """ Slice one or more bins along first axis of the `histogram`. Parameters ---------- hist : histpy.Histogram The histogram object to be sliced. channel_start : int The start of the slice (inclusive). channel_stop : int The stop of the slice (exclusive). Returns ------- sliced_hist : histpy.Histogram The sliced histogram. """ sliced_hist = hist.slice[channel_start:channel_stop,:] return sliced_hist
[docs] @staticmethod def get_hypothesis_coords(nside, scheme = "RING", coordsys = "galactic"): """ Get a list of hypothesis coordinates. Parameters ---------- nside : int The nside of the map. scheme : str, optional The scheme of the map where the hypothesis coordinates are generated (the default is "RING", which implies the "RING" scheme is used to get the hypothesis coordinates). coordsys : str, optional The coordinate system used in the map where the hypothesis coordinates are generated (the default is "galactic", which implies the galactic coordinates system is used). Returns ------- hypothesis_coords : list The list of the hypothesis coordinates at the center of each pixel. """ data_array = np.zeros(hp.nside2npix(nside)) ts_temp = HealpixMap(data = data_array, scheme = scheme, coordsys = coordsys) hypothesis_coords = [] for i in np.arange(data_array.shape[0]): hypothesis_coords += [ts_temp.pix2skycoord(i)] return hypothesis_coords
[docs] @staticmethod def get_cds_array(hist, energy_channel): """ Get the flattened cds array from input Histogram. Parameters ----------- hist : histpy.Histogram The input Histogram. energy_channel : list The format is `[lower_channel, upper_chanel]`. The lower_channel is inclusive while the upper_channel is exclusive. Returns ------- cds_array : numpy.ndarray The flattended Compton data space (CDS) array. """ if not isinstance(hist, Histogram): raise TypeError("Please input hist must be a histpy.Histogram object.") hist_axes_labels = hist.axes.labels cds_labels = ["PsiChi", "Phi"] if not all([label in hist_axes_labels for label in cds_labels]): raise ValueError("The data doesn't contain the full Compton Data Space!") hist = hist.project(["Em", "PsiChi", "Phi"]) # make sure the first axis is the measured energy hist_cds_sliced = FastTSMap.slice_energy_channel(hist, energy_channel[0], energy_channel[1]) hist_cds = hist_cds_sliced.project(["PsiChi", "Phi"]) cds_array = np.array(hist_cds.to_dense()[:]).flatten() # here [:] is equivalent to [:, :] del hist del hist_cds_sliced del hist_cds gc.collect() return cds_array
[docs] @staticmethod def get_psr_in_galactic(hypothesis_coord, response_path, spectrum): """ Get the point source response (psr) in galactic. Please be aware that you must use a galactic response! To do: to make the weight parameter not hardcoded Parameters ---------- hypothesis_coord : astropy.coordinates.SkyCoord The hypothesis coordinate. response_path : str or path.lib.Path The path to the response. spectrum : astromodels.functions The spectrum of the source to be placed at the hypothesis coordinate. Returns ------- psr : histpy.Histogram The point source response of the spectrum at the hypothesis coordinate. """ # Open the response # Notes from Israel: Inside it contains a single histogram with all the regular axes for a Compton Data Space (CDS) analysis, in galactic coordinates. Since there is no class yet to handle it, this is how to read in the HDF5 manually. with h5.File(response_path) as f: axes_group = f['hist/axes'] axes = [] for axis in axes_group.values(): # Get class. Backwards compatible with version # with only Axis axis_cls = Axis if '__class__' in axis.attrs: class_module, class_name = axis.attrs['__class__'] axis_cls = getattr(sys.modules[class_module], class_name) axes += [axis_cls._open(axis)] axes = Axes(axes) # get the pixel number of the hypothesis coordinate map_temp = HealpixMap(base = axes[0]) hypothesis_coord_pix_number = map_temp.ang2pix(hypothesis_coord) # get the expectation for the hypothesis coordinate (a point source) with h5.File(response_path) as f: pix = hypothesis_coord_pix_number psr = PointSourceResponse(axes[1:], f['hist/contents'][pix+1], unit = f['hist'].attrs['unit']) return psr
[docs] @staticmethod def get_ei_cds_array(hypothesis_coord, energy_channel, response_path, spectrum, cds_frame, orientation = None): """ Get the expected counts in CDS in local or galactic frame. Parameters ---------- hypothesis_coord : astropy.coordinates.SkyCoord The hypothesis coordinate. energy_channel : list The format is `[lower_channel, upper_chanel]`. The lower_channel is inclusive while the upper_channel is exclusive. response_path : str or pathlib.Path The path to the response file. spectrum : astromodels.functions The spectrum of the source. cds_frame : str, optional "local" or "galactic", it's the Compton data space (CDS) frame of the data, bkg_model and the response. In other words, they should have the same cds frame. orientation : cosipy.spacecraftfile.SpacecraftFile, optional The orientation of the spacecraft when data are collected (the default is `None`, which implies the orientation file is not needed). Returns ------- cds_array : numpy.ndarray The flattended Compton data space (CDS) array. """ # check inputs, will complete later # the local and galactic frame works very differently, so we need to compuate the point source response (psr) accordingly #time_cds_start = time.time() if cds_frame == "local": if orientation == None: raise TypeError("The when the data are binned in local frame, orientation must be provided to compute the expected counts.") #time_coord_convert_start = time.time() # convert the hypothesis coord to the local frame (Spacecraft frame) hypothesis_in_sc_frame = orientation.get_target_in_sc_frame(target_name = "Hypothesis", target_coord = hypothesis_coord, quiet = True) #time_coord_convert_end = time.time() #time_coord_convert_used = time_coord_convert_end - time_coord_convert_start #logger.info(f"The time used for coordinate conversion is {time_coord_convert_used}s.") #time_dwell_start = time.time() # get the dwell time map: the map of the time spent on each pixel in the local frame dwell_time_map = orientation.get_dwell_map(response = response_path) #time_dwell_end = time.time() #time_dwell_used = time_dwell_end - time_dwell_start #logger.info(f"The time used for dwell time map is {time_dwell_used}s.") #time_psr_start = time.time() # convolve the response with the dwell_time_map to get the point source response with FullDetectorResponse.open(response_path) as response: psr = response.get_point_source_response(dwell_time_map) #time_psr_end = time.time() #time_psr_used = time_psr_end - time_psr_start #logger.info(f"The time used for psr is {time_psr_used}s.") elif cds_frame == "galactic": psr = FastTSMap.get_psr_in_galactic(hypothesis_coord = hypothesis_coord, response_path = response_path, spectrum = spectrum) else: raise ValueError("The point source response must be calculated in the local and galactic frame. Others are not supported (yet)!") # convolve the point source reponse with the spectrum to get the expected counts expectation = psr.get_expectation(spectrum) del psr gc.collect() # slice energy channals and project it to CDS ei_cds_array = FastTSMap.get_cds_array(expectation, energy_channel) del expectation gc.collect() #time_cds_end = time.time() #time_cds_used = time_cds_end - time_cds_start #logger.info(f"The time used for cds is {time_cds_used}s.") return ei_cds_array
[docs] @staticmethod def fast_ts_fit(hypothesis_coord, energy_channel, data_cds_array, bkg_model_cds_array, orientation, response_path, spectrum, cds_frame, ts_nside, ts_scheme): """ Perform a TS fit on a single location at `hypothesis_coord`. Parameters ---------- hypothesis_coord : astropy.coordinates.SkyCoord The hypothesis coordinate. energy_channel : list The format is `[lower_channel, upper_chanel]`. The lower_channel is inclusive while the upper_channel is exclusive. data_cds_array : numpy.ndarray The flattened Compton data space (CDS) array of the data. bkg_model_cds_array : numpy.ndarray The flattened Compton data space (CDS) array of the background model. orientation : cosipy.spacecraftfile.SpacecraftFile The orientation of the spacecraft when data are collected. response_path : str or pathlib.Path The path to the response file. spectrum : astromodels.functions The spectrum of the source. cds_frame : str "local" or "galactic", it's the Compton data space (CDS) frame of the data, bkg_model and the response. In other words, they should have the same cds frame . ts_nside : int The nside of the ts map. ts_scheme : str The scheme of the Ts map. Returns ------- list The list of the resulting TS fit: [pix number, ts value, norm, norm_err, failed, iterations, time_ei_cds_array, time_fit, time_fast_ts_fit] """ start_fast_ts_fit = time.time() # get the indices of the pixels to fit pix = hp.ang2pix(nside = ts_nside, theta = hypothesis_coord.l.deg, phi = hypothesis_coord.b.deg, lonlat = True) # get the expected counts in the flattened cds array start_ei_cds_array = time.time() ei_cds_array = FastTSMap.get_ei_cds_array(hypothesis_coord = hypothesis_coord, cds_frame = cds_frame, energy_channel = energy_channel, orientation = orientation, response_path = response_path, spectrum = spectrum) end_ei_cds_array = time.time() time_ei_cds_array = end_ei_cds_array - start_ei_cds_array # start the fit start_fit = time.time() fit = fnf(max_iter=1000) result = fit.solve(data_cds_array, bkg_model_cds_array, ei_cds_array) end_fit = time.time() time_fit = end_fit - start_fit end_fast_ts_fit = time.time() time_fast_ts_fit = end_fast_ts_fit - start_fast_ts_fit return [pix, result[0], result[1], result[2], result[3], result[4], time_ei_cds_array, time_fit, time_fast_ts_fit]
[docs] def parallel_ts_fit(self, hypothesis_coords, energy_channel, spectrum, ts_scheme = "RING", start_method = "fork", cpu_cores = None, ts_nside = None): """ Perform parallel computation on all the hypothesis coordinates. Parameters ---------- hypothesis_coords : list A list of the hypothesis coordinates to fit energy_channel : list the energy channel you want to use: [lower_channel, upper_channel]. lower_channel is inclusive while upper_channel is exclusive. spectrum : astromodels.functions The spectrum of the source. ts_scheme : str, optional The scheme of the TS map (the default is "RING", which implies a "RING" scheme of the TS map). start_method : str, optional The starting method of the parallel computation (the default is "fork", which implies using the fork method to start parallel computation). cpu_cores : int, optional The number of cpu cores you wish to use for the parallel computation (the default is `None`, which implies using all the available number of cores -1 to perform the parallel computation). ts_nside : int, optional The nside of the ts map. This must be given if the number of hypothesis_coords isn't equal to the number of pixels of the total ts map, which means that you fit only a portion of the total ts map. (the default is `None`, which means that you fit the full ts map). Returns ------- results : numpy.ndarray The result of the ts fit over all the hypothesis coordinates. """ # decide the ts_nside from the list of hypothesis coordinates if not given if ts_nside == None: ts_nside = hp.npix2nside(len(hypothesis_coords)) # get the flattened data_cds_array data_cds_array = FastTSMap.get_cds_array(self._data, energy_channel).flatten() bkg_model_cds_array = FastTSMap.get_cds_array(self._bkg_model, energy_channel).flatten() if (data_cds_array[bkg_model_cds_array ==0]!=0).sum() != 0: #raise ValueError("You have data!=0 but bkg=0, check your inputs!") # let's try to set the data bin to zero when the corresponding bkg bin isn't zero. # Need further investigate, why bkg = 0 but data!=0 happens? ==> it's more like an issue related to simulated data instead of code # This first happened in GRB fitting, but got fixed somehow <== I now understand it's caused by using different PsiChi binning in the same fit # But it also happened to Crab while the PsiChi binning are both galactic for Crab and the Albedo, why???? ?_? data_cds_array[bkg_model_cds_array == 0] =0 # set up the number of cores to use for the parallel computation total_cores = multiprocessing.cpu_count() if cpu_cores == None or cpu_cores >= total_cores: # if you don't specify the number of cpu cores to use or the specified number of cpu cores is the same as the total number of cores you have # it will use the [total_cores - 1] number of cores to run the parallel computation. cores = total_cores - 1 logger.info(f"You have total {total_cores} CPU cores, using {cores} CPU cores for parallel computation.") else: cores = cpu_cores logger.info(f"You have total {total_cores} CPU cores, using {cores} CPU cores for parallel computation.") start = time.time() multiprocessing.set_start_method(start_method, force = True) pool = multiprocessing.Pool(processes = cores) results = pool.starmap(FastTSMap.fast_ts_fit, product(hypothesis_coords, [energy_channel], [data_cds_array], [bkg_model_cds_array], [self._orientation], [self._response_path], [spectrum], [self._cds_frame], [ts_nside], [ts_scheme])) pool.close() pool.join() end = time.time() elapsed_seconds = end - start elapsed_minutes = elapsed_seconds/60 logger.info(f"The time used for the parallel TS map computation is {elapsed_minutes} minutes") results = np.array(results) # turn to a numpy array results = results[results[:, 0].argsort()] # arrange the order by the pixel numbering self.result_array = results # the full result array self.ts_array = results[:,1] # the ts array return results
@staticmethod def _plot_ts(ts_array, skycoord = None, containment = None, save_plot = False, save_dir = "", save_name = "ts_map.png", dpi = 300): """ Plot the containment region of the TS map. Parameters ---------- ts_array : numpy.ndarray The array of ts values from parallel ts fit. skyoord : astropy.coordinates.SkyCoord, optional The true location of the source (the default is `None`, which implies that there are no coordiantes to be printed on the TS map). containment : float, optional The containment level of the source (the default is `None`, which will plot raw TS values). save_plot : bool, optional Set `True` to save the plot (the default is `False`, which means it won't save the plot. save_dir : str or pathlib.Path, optional The directory to save the plot. save_name : str, optional The file name of the plot to be save. dpi : int, optional The dpi for plotting and saving. """ if skycoord != None: lon = skycoord.l.deg lat = skycoord.b.deg # get the ts value m_ts = ts_array # get plotting canvas fig, ax = plt.subplots(dpi=dpi) # plot the ts map with containment region if containment != None: critical = FastTSMap.get_chi_critical_value(containment = containment) percentage = containment*100 max_ts = np.max(m_ts[:]) min_ts = np.min(m_ts[:]) hp.mollview(m_ts[:], max = max_ts, min = max_ts-critical, title = f"Containment {percentage}%", coord = "G", hold = True) elif containment == None: hp.mollview(m_ts[:], coord = "G", hold = True) if skycoord != None: hp.projscatter(lon, lat, marker = "x", linewidths = 0.5, lonlat=True, coord = "G", label = f"True location at l={lon}, b={lat}", color = "fuchsia") hp.projscatter(0, 0, marker = "o", linewidths = 0.5, lonlat=True, coord = "G", color = "red") hp.projtext(350, 0, "(l=0, b=0)", lonlat=True, coord = "G", color = "red") if save_plot == True: fig.savefig(Path(save_dir)/save_name, dpi = dpi) return
[docs] def plot_ts(self, ts_array = None, skycoord = None, containment = None, save_plot = False, save_dir = "", save_name = "ts_map.png", dpi = 300): """ Plot the containment region of the TS map. Parameters ---------- skyoord : astropy.coordinates.SkyCoord, optional The true location of the source (the default is `None`, which implies that there are no coordiantes to be printed on the TS map). containment : float, optional The containment level of the source (the default is `None`, which will plot raw TS values). save_plot : bool, optional Set `True` to save the plot (the default is `False`, which means it won't save the plot. save_dir : str or pathlib.Path, optional The directory to save the plot. save_name : str, optional The file name of the plot to be save. dpi : int, optional The dpi for plotting and saving. """ if ts_array is not None: FastTSMap._plot_ts(ts_array = ts_array, skycoord = skycoord, containment = containment, save_plot = save_plot, save_dir = save_dir, save_name = save_name, dpi = dpi) else: FastTSMap._plot_ts(ts_array = self.ts_array, skycoord = skycoord, containment = containment, save_plot = save_plot, save_dir = save_dir, save_name = save_name, dpi = dpi) return
[docs] @staticmethod def get_chi_critical_value(containment = 0.90): """ Get the critical value of the chi^2 distribution based ob the confidence level. Parameters ---------- containment : float, optional The confidence level of the chi^2 distribution (the default is `0.9`, which implies that the 90% containment region). Returns ------- float The critical value corresponds to the confidence level. """ return scipy.stats.chi2.ppf(containment, df=2)
[docs] @staticmethod def show_memory_info(hint): pid = os.getpid() p = psutil.Process(pid) info = p.memory_full_info() memory = info.uss / 1024. / 1024 logger.info('{} memory used: {} MB'.format(hint, memory))