#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of the CASCADe package which has been
# developed within the ExoplANETS-A H2020 program.
#
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2018, 2019, 2020 Jeroen Bouwman
"""
Spitzer Observatory and Instruments specific module of the CASCADe package
"""
import os
import collections
import ast
from types import SimpleNamespace
import numpy as np
from astropy.io import fits
import astropy.units as u
from astropy.io import ascii
from astropy.convolution import Gaussian2DKernel
from astropy.convolution import Gaussian1DKernel
from astropy.stats import sigma_clipped_stats
from scipy import interpolate
from scipy.interpolate import griddata
from skimage.morphology import dilation
from skimage.morphology import square
from tqdm import tqdm
from ...initialize import cascade_configuration
from ...initialize import cascade_default_data_path
from ...initialize import cascade_default_path
from ...data_model import SpectralDataTimeSeries
from ...utilities import find, get_data_from_fits
from ..InstrumentsBaseClasses import ObservatoryBase, InstrumentBase
__all__ = ['Spitzer', 'SpitzerIRS']
[docs]
class Spitzer(ObservatoryBase):
"""
Class defining the Spitzer observatory.
This observatory class defines the instuments and data handling for the
spectropgraphs of the Spitzer Space telescope
"""
def __init__(self):
# check if cascade is initialized
if cascade_configuration.isInitialized:
# check if model is implemented and pick model
if (cascade_configuration.instrument in
self.observatory_instruments):
if cascade_configuration.instrument == 'IRS':
factory = SpitzerIRS()
self.par = factory.par
cascade_configuration.telescope_collecting_area = \
self.collecting_area
self.data = factory.data
self.spectral_trace = factory.spectral_trace
if self.par['obs_has_backgr']:
self.data_background = factory.data_background
self.instrument = factory.name
self.instrument_calibration = \
factory.instrument_calibration
cascade_configuration.instrument_dispersion_scale = \
factory.dispersion_scale
else:
raise ValueError("Spitzer instrument not recognized, \
check your init file for the following \
valid instruments: {}. Aborting loading \
instrument".format(self.valid_instruments))
else:
raise ValueError("CASCADe not initialized, \
aborting loading Observatory")
@property
def name(self):
"""Name of the observatory."""
return "SPITZER"
@property
def location(self):
"""Location of the observatory."""
return "SPACE"
@property
def collecting_area(self):
"""
Size of the collecting area of the telescope.
Returns
-------
0.5 m**2
"""
return '0.5 m2'
@property
def NAIF_ID(self):
"""NAIF_ID of the observatory."""
return -79
@property
def observatory_instruments(self):
"""All implemented instruments of the observatory."""
return{"IRS"}
[docs]
class SpitzerIRS(InstrumentBase):
"""
Class defining the IRS instrument.
This instrument class defines the properties of the IRS instrument of
the Spitzer Space Telescope.
For the instrument and observations the following valid options are
available:
- detectors : {'SL', 'LL'}
- spectral orders : {'1', '2'}
- data products : {'droop', 'COE'}
- observing mode : {'STARING', 'NODDED'}
- data type : {'SPECTRUM', 'SPECTRAL_IMAGE', 'SPECTRAL_DETECTOR_CUBE'}
"""
__valid_arrays = {'SL', 'LL'}
__valid_orders = {'1', '2'}
__valid_filters = {'SL1', 'SL2', 'LL1', 'LL2'}
__valid_data = {'SPECTRUM', 'SPECTRAL_IMAGE', 'SPECTRAL_DETECTOR_CUBE'}
__valid_observing_strategy = {'STARING', 'NODDED'}
__valid_data_products = {'droop', 'COE', 'FEPS', 'CAE'}
def __init__(self):
self.par = self.get_instrument_setup()
if self.par['obs_has_backgr']:
self.data, self.data_background = self.load_data()
else:
self.data = self.load_data()
self.spectral_trace = self.get_spectral_trace()
self._define_region_of_interest()
try:
self.instrument_calibration = self.IRS_cal
except AttributeError:
self.instrument_calibration = None
@property
def name(self):
"""Reteurn instrument name."""
return "IRS"
@property
def dispersion_scale(self):
__all_scales = {'SL1': '604.5 Angstrom', 'SL2': '371.7 Angstrom',
'LL2': '743.4 Angstrom', 'LL1': '1209.0 Angstrom'}
return __all_scales[self.par["inst_filter"]]
[docs]
def load_data(self):
"""Load data."""
if self.par["obs_data"] == 'SPECTRUM':
data = self.get_spectra()
if self.par['obs_has_backgr']:
data_back = self.get_spectra(is_background=True)
elif self.par["obs_data"] == 'SPECTRAL_IMAGE':
data = self.get_spectral_images()
if self.par['obs_has_backgr']:
data_back = self.get_spectral_images(is_background=True)
elif self.par["obs_data"] == 'SPECTRAL_DETECTOR_CUBE':
data = self.get_detector_cubes()
if self.par['obs_has_backgr']:
data_back = self.get_detector_cubes(is_background=True)
if self.par['obs_has_backgr']:
return data, data_back
else:
return data
[docs]
def get_instrument_setup(self):
"""Retrieve all parameters defining the instrument and data setup."""
inst_obs_name = cascade_configuration.instrument_observatory
inst_inst_name = cascade_configuration.instrument
inst_filter = cascade_configuration.instrument_filter
obj_period = \
u.Quantity(cascade_configuration.object_period).to(u.day)
obj_period = obj_period.value
obj_ephemeris = \
u.Quantity(cascade_configuration.object_ephemeris).to(u.day)
obj_ephemeris = obj_ephemeris.value
obs_mode = cascade_configuration.observations_mode
obs_data = cascade_configuration.observations_data
obs_path = cascade_configuration.observations_path
if not os.path.isabs(obs_path):
obs_path = os.path.join(cascade_default_data_path, obs_path)
obs_cal_path = cascade_configuration.observations_cal_path
if not os.path.isabs(obs_cal_path):
obs_cal_path = os.path.join(cascade_default_path,
obs_cal_path)
obs_cal_version = cascade_configuration.observations_cal_version
obs_data_product = cascade_configuration.observations_data_product
obs_id = cascade_configuration.observations_id
obs_target_name = cascade_configuration.observations_target_name
obs_has_backgr = ast.literal_eval(cascade_configuration.
observations_has_background)
if obs_has_backgr:
obs_backgr_id = cascade_configuration.observations_background_id
obs_backgr_target_name = \
cascade_configuration.observations_background_name
# cpm
try:
cpm_ncut_first_int = \
cascade_configuration.cpm_ncut_first_integrations
cpm_ncut_first_int = ast.literal_eval(cpm_ncut_first_int)
except AttributeError:
cpm_ncut_first_int = 0
if not (obs_data in self.__valid_data):
raise ValueError("Data type not recognized, "
"check your init file for the following "
"valid types: {}. "
"Aborting loading data".format(self.__valid_data))
if not (obs_mode in self.__valid_observing_strategy):
raise ValueError("Observational stategy not recognized, "
"check your init file for the following "
"valid types: {}. Aborting loading "
"data".format(self.__valid_observing_strategy))
if not (inst_filter in self.__valid_filters):
raise ValueError("Spectral filter not recognized, "
"check your init file for the following "
"valid types: {}. Aborting loading "
"data".format(self.__valid_filters))
inst_mode = inst_filter[0:2]
inst_order = inst_filter[2]
if not (inst_mode in self.__valid_arrays):
raise ValueError("Instrument mode not recognized, "
"check your init file for the following "
"valid types: {}. Aborting "
"loading data".format(self.__valid_arrays))
if not (inst_order in self.__valid_orders):
raise ValueError("Spectral order not recognized, "
"check your init file for the following "
"valid types: {}. Aborting loading "
"data".format(self.__valid_orders))
if not (obs_data_product in self.__valid_data_products):
raise ValueError("Data product not recognized, "
"check your init file for the following "
"valid types: {}. Aborting loading "
"data".format(self.__valid_data_products))
par = collections.OrderedDict(inst_obs_name=inst_obs_name,
inst_inst_name=inst_inst_name,
inst_filter=inst_filter,
inst_mode=inst_mode,
inst_order=inst_order,
obj_period=obj_period,
obj_ephemeris=obj_ephemeris,
obs_mode=obs_mode,
obs_data=obs_data,
obs_path=obs_path,
obs_cal_path=obs_cal_path,
obs_data_product=obs_data_product,
obs_cal_version=obs_cal_version,
obs_id=obs_id,
obs_target_name=obs_target_name,
obs_has_backgr=obs_has_backgr,
cpm_ncut_first_int=cpm_ncut_first_int)
if obs_has_backgr:
par.update({'obs_backgr_id': obs_backgr_id})
par.update({'obs_backgr_target_name': obs_backgr_target_name})
return par
[docs]
def get_spectra(self, is_background=False):
"""
Get the 1D spectra.
This function combines all functionallity to read fits files
containing the (uncalibrated) spectral timeseries, including
orbital phase and wavelength information
Parameters
----------
is_background : `bool`
if `True` the data represents an observaton of the IR background
to be subtracted of the data of the transit spectroscopy target.
Returns
-------
SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries`
Instance of `SpectralDataTimeSeries` containing all spectroscopic
data including uncertainties, time, wavelength and bad pixel mask.
Raises
------
AssertionError, KeyError
Raises an error if no data is found or if certain expected
fits keywords are not present in the data files.
"""
# get data files
if is_background:
target_name = self.par['obs_backgr_target_name']
else:
target_name = self.par['obs_target_name']
path_to_files = os.path.join(self.par['obs_path'],
self.par['inst_obs_name'],
self.par['inst_inst_name'],
target_name,
'SPECTRA/')
data_files = find('*' + self.par['obs_id'] + '*' +
self.par['obs_data_product']+'.fits', path_to_files)
# number of integrations
nintegrations = len(data_files)
if nintegrations < 2:
raise AssertionError("No Timeseries data found in dir " +
path_to_files)
data_list = ['LAMBDA', 'FLUX', 'FERROR', 'MASK']
auxilary_list = ["POSITION", "MEDPOS", "PUNIT", "MPUNIT", "TIME_BJD",
"DISP_POS", "ANGLE", "SCALE", "DPUNIT", "AUNIT",
"SUNIT"]
data_dict, auxilary_dict = \
get_data_from_fits(data_files, data_list, auxilary_list)
if (not auxilary_dict['TIME_BJD']['flag']):
raise KeyError("No TIME_BJD keyword found in fits files")
wavelength_data = np.array(data_dict['LAMBDA']['data']).T
wave_unit = data_dict['LAMBDA']['data'][0].unit
spectral_data = np.array(data_dict['FLUX']['data']).T
flux_unit = data_dict['FLUX']['data'][0].unit
uncertainty_spectral_data = np.array(data_dict['FERROR']['data']).T
if data_dict['MASK']['flag']:
mask = np.array(data_dict['MASK']['data']).T
else:
mask = np.zeros_like(spectral_data, dtype=bool)
if auxilary_dict['TIME_BJD']['flag']:
time = np.array(auxilary_dict['TIME_BJD']['data']) * u.day
phase = (time.value - self.par['obj_ephemeris']) / \
self.par['obj_period']
phase = phase - int(np.max(phase))
if np.max(phase) < 0.0:
phase = phase + 1.0
position = np.array(auxilary_dict['POSITION']['data'])
posUnit = auxilary_dict['PUNIT']['data_unit']
angle = np.array(auxilary_dict['ANGLE']['data'])
angleUnit = auxilary_dict['AUNIT']['data_unit']
scaling = np.array(auxilary_dict['SCALE']['data'])
scaleUnit = auxilary_dict['SUNIT']['data_unit']
dispersion_position = np.array(auxilary_dict['DISP_POS']['data'])
dispPosUnit = auxilary_dict['DPUNIT']['data_unit']
idx = np.argsort(time)[self.par["cpm_ncut_first_int"]:]
time = time[idx]
spectral_data = spectral_data[:, idx]
uncertainty_spectral_data = uncertainty_spectral_data[:, idx]
wavelength_data = wavelength_data[:, idx]
mask = mask[:, idx]
data_files = [data_files[i] for i in idx]
phase = phase[idx]
position = position[idx]
angle = angle[idx]
scaling = scaling[idx]
dispersion_position = dispersion_position[idx]
SpectralTimeSeries = \
SpectralDataTimeSeries(wavelength=wavelength_data,
wavelength_unit=wave_unit,
data=spectral_data,
data_unit=flux_unit,
uncertainty=uncertainty_spectral_data,
time=phase,
time_unit=u.dimensionless_unscaled,
mask=mask,
time_bjd=time,
position=position,
position_unit=posUnit,
isRampFitted=True,
isNodded=False,
target_name=target_name,
dataProduct=self.par['obs_data_product'],
dataFiles=data_files)
# Standardize signal to mean value.
mean_signal, _, _ = \
sigma_clipped_stats(SpectralTimeSeries.return_masked_array("data"),
sigma=3, maxiters=10)
data_unit = u.Unit(mean_signal*SpectralTimeSeries.data_unit)
SpectralTimeSeries.data_unit = data_unit
SpectralTimeSeries.wavelength_unit = u.micron
SpectralTimeSeries.add_measurement(
disp_position=dispersion_position,
disp_position_unit=dispPosUnit,
angle=angle,
angle_unit=angleUnit,
scale=scaling,
scale_unit=scaleUnit)
self._define_convolution_kernel()
return SpectralTimeSeries
[docs]
def get_spectral_images(self, is_background=False):
"""
Get the 2D spectral images.
This function combines all functionallity to read fits files
containing the (uncalibrated) spectral timeseries, including
orbital phase and wavelength information
Parameters
----------
is_background : `bool`
if `True` the data represents an observaton of the IR background
to be subtracted of the data of the transit spectroscopy target.
Returns
-------
SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries`
Instance of `SpectralDataTimeSeries` containing all spectroscopic
data including uncertainties, time, wavelength and bad pixel mask.
Raises
------
AssertionError, KeyError
Raises an error if no data is found or if certain expected
fits keywords are not present in the data files.
Notes
-----
Notes on FOV:
in the fits header the following relevant info is used:
- FOVID 26 IRS_Short-Lo_1st_Order_1st_Position
- FOVID 27 IRS_Short-Lo_1st_Order_2nd_Position
- FOVID 28 IRS_Short-Lo_1st_Order_Center_Position
- FOVID 29 IRS_Short-Lo_Module_Center
- FOVID 32 IRS_Short-Lo_2nd_Order_1st_Position
- FOVID 33 IRS_Short-Lo_2nd_Order_2nd_Position
- FOVID 34 IRS_Short-Lo_2nd_Order_Center_Position
- FOVID 40 IRS_Long-Lo_1st_Order_Center_Position
- FOVID 46 IRS_Long-Lo_2nd_Order_Center_Position
Notes on timing:
- FRAMTIME the total effective exposure time (ramp length)
in seconds
"""
# order mask
mask = self._get_order_mask()
# get data files
if is_background:
obsid = self.par['obs_backgr_id']
target_name = self.par['obs_backgr_target_name']
else:
obsid = self.par['obs_id']
target_name = self.par['obs_target_name']
path_to_files = os.path.join(self.par['obs_path'],
self.par['inst_obs_name'],
self.par['inst_inst_name'],
target_name,
'SPECTRAL_IMAGES/')
data_files = find('*' + obsid + '*_' +
self.par['obs_data_product']+'.fits', path_to_files)
# number of integrations
nintegrations = len(data_files)
if nintegrations < 2:
raise AssertionError("No Timeseries data found in dir " +
path_to_files)
# read in the first image and get information on the observations
# get the size of the spectral images
image_file = data_files[0]
spectral_image = fits.getdata(image_file, ext=0)
npix, mpix = spectral_image.shape
# get frametime from fits header (duration of ramp in sec)
framtime = fits.getval(image_file, "FRAMTIME", ext=0)
# get the FOVID from the header and check for nodded observations
try:
fovid = fits.getval(image_file, "FOVID", ext=0)
except KeyError:
print("FOVID not set in fits files")
print("Using 'observations_mode' parameter instead")
if self.par['obs_mode'] == "STARING":
fovid = 28
else:
fovid = 26
if (fovid != 28) and (fovid != 34) and (fovid != 40) and (fovid != 46):
isNodded = True
nodOffset = -5.0 * u.pix
else:
isNodded = False
nodOffset = 0.0 * u.pix
# get the unit of the spectral images
try:
flux_unit_string = fits.getval(image_file, "BUNIT", ext=0)
flux_unit_string = flux_unit_string.replace("e-", "electron")
flux_unit_string = flux_unit_string.replace("sec", "s")
flux_unit = u.Unit(flux_unit_string)
except KeyError:
print("No flux unit set in fits files")
flux_unit = u.dimensionless_unscaled
# define mask and fill with data from order mask
mask = np.tile(mask.T, (nintegrations, 1, 1)).T
# get the data
image_cube = np.zeros((npix, mpix, nintegrations))
image_unc_cube = np.zeros((npix, mpix, nintegrations))
image_dq_cube = np.zeros((npix, mpix, nintegrations))
time = np.zeros((nintegrations))
for im, image_file in enumerate(tqdm(data_files, dynamic_ncols=True)):
# WARNING fits data is single precision!!
spectral_image = fits.getdata(image_file, ext=0)
image_cube[:, :, im] = spectral_image
time[im] = fits.getval(image_file, "BMJD_OBS", ext=0)
unc_image = fits.getdata(image_file.replace("droop.fits",
"drunc.fits"), ext=0)
image_unc_cube[:, :, im] = unc_image
dq_image = fits.getdata(image_file.replace("droop.fits",
"bmask.fits"), ext=0)
image_dq_cube[:, :, im] = dq_image
data = image_cube * flux_unit
uncertainty = image_unc_cube * flux_unit
npix, mpix, nintegrations = data.shape
mask_dq = image_dq_cube > int('10000000', 2)
mask = np.logical_or(mask, mask_dq)
# The time in the spitzer fits header is -2400000.5 and
# it is the time at the start of the ramp
# As we are using fitted ramps,
# shift time by half ramp of length framtime
time = (time + 2400000.5) + (0.50*framtime) / (24.0*3600.0) # in days
# orbital phase
phase = (time - self.par['obj_ephemeris']) / self.par['obj_period']
phase = phase - np.int(np.max(phase))
if np.max(phase) < 0.0:
phase = phase + 1.0
self._define_convolution_kernel()
# wavelength calibration
wave_cal = self._get_wavelength_calibration(npix, nodOffset)
# reverse spectral data to make sure the shortest wavelengths are
# at the first image row
SpectralTimeSeries = \
SpectralDataTimeSeries(wavelength=wave_cal[::-1, ...],
data=data[::-1, ...],
time=phase,
uncertainty=uncertainty[::-1, ...],
mask=mask[::-1, ...],
time_bjd=time,
isRampFitted=True,
isNodded=isNodded,
target_name=target_name,
dataProduct=self.par['obs_data_product'],
dataFiles=data_files)
return SpectralTimeSeries
[docs]
def _define_convolution_kernel(self):
"""
Define the instrument specific convolution kernel.
This function defines an instrument specific convolution kernel
which will be used in the correction procedure of bad pixels.
"""
if self.par["obs_data"] == 'SPECTRUM':
kernel = Gaussian1DKernel(2.2, x_size=13)
else:
kernel = Gaussian2DKernel(x_stddev=0.2, y_stddev=2.2, theta=0.076,
x_size=5, y_size=13)
try:
self.IRS_cal
except AttributeError:
self.IRS_cal = SimpleNamespace()
finally:
self.IRS_cal.convolution_kernel = kernel
return
[docs]
def _define_region_of_interest(self):
"""
Define region on detector.
This functon defines the region of interest which containes
the intended target star. It defines a mask such that all data flagged
or of no interest for the data calibraiton and spectral extraction.
"""
dim = self.data.data.shape
if len(dim) <= 2:
roi = np.zeros((dim[0]), dtype=bool)
roi[0] = True
roi[-1] = True
else:
roi = self._get_order_mask()
selem = square(1)
roi = dilation(roi, selem)
roi[:, 0:1] = True
roi[:, -1:] = True
roi = roi[::-1, ...]
try:
self.IRS_cal
except AttributeError:
self.IRS_cal = SimpleNamespace()
finally:
self.IRS_cal.roi = roi
return
[docs]
def _get_order_mask(self):
"""
Get the order mask.
This functions gets the mask which defines the pixels used
with a given spectral order
"""
# order mask
order_mask_file_name = \
os.path.join(self.par['obs_cal_path'],
self.par['inst_obs_name'],
self.par['inst_inst_name'],
self.par['obs_cal_version'],
'IRSX_'+self.par['inst_mode']+'_' +
self.par['obs_cal_version']+'_cal.omask.fits')
order_masks = fits.getdata(order_mask_file_name, ext=0)
if self.par['inst_order'] == '1':
mask = np.ones(shape=order_masks.shape, dtype=bool)
mask[order_masks == 1] = False
# as there are often problems at the edge of the detector array,
# cut first and last row
row_check = np.all(mask, axis=1)
idx_row = np.arange(mask.shape[0])
# remove last row
mask[idx_row[np.logical_not(row_check)][-1], :] = True
# remove first row
mask[idx_row[np.logical_not(row_check)][0], :] = True
# remove bad part of LL1
if self.par['inst_mode'] == 'LL':
mask[idx_row[np.logical_not(row_check)][:50], :] = True
mask[idx_row[np.logical_not(row_check)][-5:], :] = True
elif (self.par['inst_order'] == '2') or \
(self.par['inst_order'] == '3'):
# SL2 or LL2
mask1 = np.ones(shape=order_masks.shape, dtype=bool)
mask1[order_masks == 2] = False
# as there are often problems at the edge of the detector array,
# cut first and last row
row_check = np.all(mask1, axis=1)
idx_row = np.arange(mask1.shape[0])
# remove last row
mask1[idx_row[np.logical_not(row_check)][-1], :] = True
# remove first row
mask1[idx_row[np.logical_not(row_check)][0], :] = True
if self.par['inst_mode'] == 'LL':
mask1[idx_row[np.logical_not(row_check)][79:83], :] = True
mask1[idx_row[np.logical_not(row_check)][:5], :] = True
# SL3 or LL3
mask2 = np.ones(shape=order_masks.shape, dtype=bool)
mask2[order_masks == 3] = False
# as there are often problems at the edge of the detector array,
# cut first and last row
row_check = np.all(mask2, axis=1)
idx_row = np.arange(mask2.shape[0])
# remove last row
mask2[idx_row[np.logical_not(row_check)][-1], :] = True
# remove first row
mask2[idx_row[np.logical_not(row_check)][0], :] = True
if self.par['inst_mode'] == 'LL':
mask2[idx_row[np.logical_not(row_check)][:4], :] = True
mask2[idx_row[np.logical_not(row_check)][-2:], :] = True
mask = np.logical_and(mask1, mask2)
return mask
[docs]
def _get_wavelength_calibration(self, numberOfWavelengthPixels, nodOffset):
"""
Get wavelength calibration file.
Parameters
----------
numberOfWavelengthPixels : 'int'
nodOffset : 'float'
"""
wavelength_unit = u.micron
wave_pixel_grid = np.arange(numberOfWavelengthPixels) * u.pix
if self.par["obs_data"] == 'SPECTRUM':
position_pixel_grid = np.zeros_like(wave_pixel_grid)
spectral_trace = \
collections.OrderedDict(wavelength_pixel=wave_pixel_grid,
positional_pixel=position_pixel_grid,
wavelength=self.data.wavelength.
data[:, 0])
return spectral_trace
wave_cal_name = \
os.path.join(self.par['obs_cal_path'],
self.par['inst_obs_name'],
self.par['inst_inst_name'],
self.par['obs_cal_version'],
'IRSX_'+self.par['inst_mode']+'_' +
self.par['obs_cal_version']+'_cal.wavsamp.tbl')
wavesamp = ascii.read(wave_cal_name)
order = wavesamp['order']
spatial_pos = wavesamp['x_center']
wavelength_pos = wavesamp['y_center']
wavelength = wavesamp['wavelength']
x0 = wavesamp['x0']
x1 = wavesamp['x1']
x2 = wavesamp['x2']
x3 = wavesamp['x3']
y0 = wavesamp['y0']
y1 = wavesamp['y1']
y2 = wavesamp['y2']
y3 = wavesamp['y3']
if self.par['inst_order'] == '1':
idx = np.where(order == 1)
elif (self.par['inst_order'] == '2') or \
(self.par['inst_order'] == '3'):
idx = np.where((order == 2) | (order == 3))
spatial_pos = spatial_pos[idx].data - 0.5
wavelength_pos = wavelength_pos[idx].data - 0.5
wavelength = wavelength[idx].data
x0 = x0[idx].data - 0.5
x1 = x1[idx].data - 0.5
x2 = x2[idx].data - 0.5
x3 = x3[idx].data - 0.5
y0 = y0[idx].data - 0.5
y1 = y1[idx].data - 0.5
y2 = y2[idx].data - 0.5
y3 = y3[idx].data - 0.5
spatial_pos_left = (x1+x2)/2
spatial_pos_right = (x0+x3)/2
wavelength_pos_left = (y1+y2)/2
wavelength_pos_right = (y0+y3)/2
f = interpolate.interp1d(wavelength_pos, spatial_pos,
fill_value='extrapolate')
spatial_pos_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos, wavelength,
fill_value='extrapolate')
wavelength_interpolated = f(wave_pixel_grid.value) * wavelength_unit
f = interpolate.interp1d(wavelength_pos_left, spatial_pos_left,
fill_value='extrapolate')
spatial_pos_left_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos_left, wavelength,
fill_value='extrapolate')
wavelength_left_interpolated = \
f(wave_pixel_grid.value) * wavelength_unit
f = interpolate.interp1d(wavelength_pos_right, spatial_pos_right,
fill_value='extrapolate')
spatial_pos_right_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos_right, wavelength,
fill_value='extrapolate')
wavelength_right_interpolated = \
f(wave_pixel_grid.value) * wavelength_unit
grid_x = np.hstack([spatial_pos_left_interpolated.value,
spatial_pos_interpolated.value,
spatial_pos_right_interpolated.value])
grid_y = np.hstack([wave_pixel_grid.value,
wave_pixel_grid.value, wave_pixel_grid.value])
points = np.array([grid_y, grid_x]).T
values = np.hstack([wavelength_left_interpolated.value,
wavelength_interpolated.value,
wavelength_right_interpolated.value])
corrected_spatial_pos = spatial_pos_interpolated+nodOffset
wave_cal = griddata(points, values,
(wave_pixel_grid.value,
corrected_spatial_pos.value),
method='cubic') * wavelength_unit
return wave_cal
[docs]
def get_detector_cubes(self, is_background=False):
"""
Get 3D detector cubes.
This function combines all functionallity to read fits files
containing the (uncalibrated) detector cubes (detector data
on ramp level) timeseries, including
orbital phase and wavelength information
Parameters
----------
is_background : `bool`
if `True` the data represents an observaton of the IR background
to be subtracted of the data of the transit spectroscopy target.
Returns
-------
SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries`
Instance of `SpectralDataTimeSeries` containing all spectroscopic
data including uncertainties, time, wavelength and bad pixel mask.
Raises
------
AssertionError, KeyError
Raises an error if no data is found or if certain expected
fits keywords are not present in the data files.
Notes
-----
Notes on timing in header:
There are several integration-time-related keywords.
Of greatest interest to the observer is the
“effective integration time”, which is the time on-chip between
the first and last non-destructive reads for each pixel. It is called:
RAMPTIME = Total integration time for the current DCE.
The value of RAMPTIME gives the usable portion of the integration ramp,
occurring between the beginning of the first read and the end of the
last read. It excludes detector array pre-conditioning time.
It may also be of interest to know the exposure time at other points
along the ramp. The SUR sequence consists of the time taken at the
beginning of a SUR sequence to condition the array
(header keyword DEADTIME), the time taken to complete one read and
one spin through the array (GRPTIME), and the non-destructive reads
separated by uniform wait times. The wait consists of “clocking”
through the array without reading or resetting. The time it takes to
clock through the array once is given by the SAMPTIME keyword.
So, for an N-read ramp:
RAMPTIME = 2x(N-1)xSAMPTIME
and
DCE duration = DEADTIME + GRPTIME + RAMPTIME
Note that peak-up data is not obtained in SUR mode. It is obtained in
Double Correlated Sampling (DCS) mode. In that case, RAMPTIME gives the
time interval between the 2nd sample and the preceeding reset.
"""
# order mask
mask = self._get_order_mask()
# get data files
if is_background:
obsid = self.par['obs_backgr_id']
target_name = self.par['obs_backgr_target_name']
else:
obsid = self.par['obs_id']
target_name = self.par['obs_target_name']
if self.par['inst_mode'] == 'SL':
path_to_files = self.par['obs_path'] + \
target_name + '/IRSX/' + \
self.par['obs_cal_version']+'/bcd/ch0/'
data_files = find('SPITZER_S0*'+obsid +
'*lnz.fits', path_to_files)
else:
path_to_files = self.par['obs_path'] + \
target_name + '/IRSX/' + \
self.par['obs_cal_version']+'/bcd/ch2/'
data_files = find('SPITZER_S2*'+obsid +
'*lnz.fits', path_to_files)
# number of integrations
nintegrations = len(data_files)
if nintegrations < 2:
raise AssertionError("No Timeseries data found in dir " +
path_to_files)
# get the size of the spectral images
# In the spitzer IRS detector cubes, the first axis is
# the number of frames. To get it in the proper data format
# we need to move the frames axis to the back.
image_file = data_files[0]
# WARNING fits data is single precision!!
spectral_image = np.moveaxis(fits.getdata(image_file, ext=0), 0, -1)
# wavelength, spatial, frames axis
npix, mpix, nframes = spectral_image.shape
# get frametime from fits header (duration of ramp in sec)
framtime = fits.getval(image_file, "FRAMTIME", ext=0)
# get deadtime etc. from fits header
# deadtime = fits.getval(image_file, "DEADTIME", ext=0)
samptime = fits.getval(image_file, "SAMPTIME", ext=0)
# get the FOVID from the header and check for nodded observations
try:
fovid = fits.getval(image_file, "FOVID", ext=0)
except KeyError:
print("FOVID not set in fits files")
print("Using 'observations_mode' parameter instead")
if self.par['obs_mode'] == "STARING":
fovid = 28
else:
fovid = 26
if (fovid != 28) and (fovid != 34) and (fovid != 40) and (fovid != 46):
isNodded = True
nodOffset = -5.0 * u.pix
else:
isNodded = False
nodOffset = 0.0 * u.pix
# get the unit of the spectral images
try:
flux_unit_string = fits.getval(image_file, "BUNIT", ext=0)
flux_unit_string = flux_unit_string.replace("e-", "electron")
flux_unit_string = flux_unit_string.replace("sec", "s")
flux_unit = u.Unit(flux_unit_string)
except KeyError:
print("No flux unit set in fits files")
flux_unit = u.dimensionless_unscaled
# define mask and fill with data from order mask
mask = np.tile(mask.T, (nintegrations, nframes, 1, 1)).T
# get the data
# make sure time is last axis
image_cube = np.zeros((npix, mpix, nframes, nintegrations))
time = np.zeros((nintegrations))
for im, image_file in enumerate(tqdm(data_files, dynamic_ncols=True)):
# WARNING fits data is single precision!!
spectral_image = \
np.moveaxis(fits.getdata(image_file, ext=0), 0, -1)
image_cube[:, :, :, im] = spectral_image
time[im] = fits.getval(image_file, "BMJD_OBS", ext=0)
image_cube = np.diff(image_cube, axis=2)
mask = mask[:, :, :-1, :]
flux_unit = flux_unit/(2.0*samptime*u.s)
data = image_cube * flux_unit
npix, mpix, nframes, nintegrations = data.shape
# The time in the spitzer fits header is -2400000.5 and
# it is the time at the start of the ramp
# As we are using fitted ramps,
# shift time by half ramp of length framtime
time = (time + 2400000.5) + (0.50*framtime) / (24.0*3600.0) # in days
# orbital phase
phase = (time - self.par['obj_ephemeris']) / self.par['obj_period']
phase = phase - np.int(np.max(phase))
if np.max(phase) < 0.0:
phase = phase + 1.0
# adjust time stamp for each sample up the ramp
phase = np.tile(phase, (nframes, 1))
time_shift = (np.arange(nframes)*2.0*samptime) - (0.50*framtime)
time_shift = np.tile(time_shift, (nintegrations, 1)).T
time_shift = time_shift / (24.0*3600.0) / self.par['obj_period']
phase = phase + time_shift
self._define_convolution_kernel()
# wavelength calibration
wave_cal = self._get_wavelength_calibration(npix, nodOffset)
SpectralTimeSeries = SpectralDataTimeSeries(wavelength=wave_cal,
data=data, time=phase,
mask=mask,
isRampFitted=False,
isNodded=isNodded)
return SpectralTimeSeries
[docs]
def get_spectral_trace(self):
"""Get spectral trace."""
dim = self.data.data.shape
wavelength_unit = self.data.wavelength_unit
wave_pixel_grid = np.arange(dim[0]) * u.pix
if self.par["obs_data"] == 'SPECTRUM':
position_pixel_grid = np.zeros_like(wave_pixel_grid)
spectral_trace = \
collections.OrderedDict(wavelength_pixel=wave_pixel_grid,
positional_pixel=position_pixel_grid,
wavelength=self.data.wavelength.
data[:, 0])
return spectral_trace
wave_cal_name = \
os.path.join(self.par['obs_cal_path'],
self.par['inst_obs_name'],
self.par['inst_inst_name'],
self.par['obs_cal_version'],
'IRSX_'+self.par['inst_mode']+'_' +
self.par['obs_cal_version']+'_cal.wavsamp.tbl')
wavesamp = ascii.read(wave_cal_name)
order = wavesamp['order']
isNodded = self.data.isNodded
if isNodded:
nodOffset = -5.0 * u.pix
else:
nodOffset = 0.0 * u.pix
spatial_pos = wavesamp['x_center']
wavelength_pos = wavesamp['y_center']
wavelength = wavesamp['wavelength']
x0 = wavesamp['x0']
x1 = wavesamp['x1']
x2 = wavesamp['x2']
x3 = wavesamp['x3']
y0 = wavesamp['y0']
y1 = wavesamp['y1']
y2 = wavesamp['y2']
y3 = wavesamp['y3']
if self.par['inst_order'] == '1':
idx = np.where(order == 1)
elif (self.par['inst_order'] == '2') or \
(self.par['inst_order'] == '3'):
idx = np.where((order == 2) | (order == 3))
spatial_pos = spatial_pos[idx].data - 0.5
wavelength_pos = wavelength_pos[idx].data - 0.5
wavelength = wavelength[idx].data
x0 = x0[idx].data - 0.5
x1 = x1[idx].data - 0.5
x2 = x2[idx].data - 0.5
x3 = x3[idx].data - 0.5
y0 = y0[idx].data - 0.5
y1 = y1[idx].data - 0.5
y2 = y2[idx].data - 0.5
y3 = y3[idx].data - 0.5
spatial_pos_left = (x1+x2)/2
spatial_pos_right = (x0+x3)/2
wavelength_pos_left = (y1+y2)/2
wavelength_pos_right = (y0+y3)/2
f = interpolate.interp1d(wavelength_pos, spatial_pos,
fill_value='extrapolate')
spatial_pos_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos, wavelength,
fill_value='extrapolate')
wavelength_interpolated = f(wave_pixel_grid.value) * wavelength_unit
f = interpolate.interp1d(wavelength_pos_left, spatial_pos_left,
fill_value='extrapolate')
spatial_pos_left_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos_left, wavelength,
fill_value='extrapolate')
wavelength_left_interpolated = \
f(wave_pixel_grid.value) * wavelength_unit
f = interpolate.interp1d(wavelength_pos_right, spatial_pos_right,
fill_value='extrapolate')
spatial_pos_right_interpolated = f(wave_pixel_grid.value) * u.pix
f = interpolate.interp1d(wavelength_pos_right, wavelength,
fill_value='extrapolate')
wavelength_right_interpolated = \
f(wave_pixel_grid.value) * wavelength_unit
grid_x = np.hstack([spatial_pos_left_interpolated.value,
spatial_pos_interpolated.value,
spatial_pos_right_interpolated.value])
grid_y = np.hstack([wave_pixel_grid.value,
wave_pixel_grid.value, wave_pixel_grid.value])
points = np.array([grid_y, grid_x]).T
values = np.hstack([wavelength_left_interpolated.value,
wavelength_interpolated.value,
wavelength_right_interpolated.value])
corrected_spatial_pos = spatial_pos_interpolated+nodOffset
corrected_wavelengths = griddata(points, values,
(wave_pixel_grid.value,
corrected_spatial_pos.value),
method='cubic') * wavelength_unit
spectral_trace = collections.OrderedDict(
wavelength_pixel=wave_pixel_grid,
positional_pixel=corrected_spatial_pos[::-1],
wavelength=corrected_wavelengths[::-1])
return spectral_trace