# Source code for cascade.instruments.HST.HST

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of the CASCADe package which has been
# developed within the ExoplANETS-A H2020 program.
#
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2018, 2019, 2021  Jeroen Bouwman
"""HST Observatory and Instruments specific module of the CASCADe package."""

import os
import collections
import ast
from types import SimpleNamespace
import gc
import warnings
import numpy as np
import pandas as pd
from tqdm import tqdm
from astropy.io import fits
import astropy.units as u
from astropy.time import Time
from astropy import coordinates as coord
from astropy.wcs import WCS
from astropy.stats import sigma_clipped_stats
from astropy.convolution import Gaussian2DKernel
from astropy.convolution import Gaussian1DKernel
from photutils.detection import IRAFStarFinder
from scipy.optimize import nnls

from ...initialize import cascade_configuration
from ...initialize import cascade_default_data_path
from ...initialize import cascade_default_path
from ...data_model import SpectralDataTimeSeries
from ...utilities import find
from ..InstrumentsBaseClasses import ObservatoryBase, InstrumentBase

# Public API of this module: the HST observatory class and its WFC3 instrument.
__all__ = ['HST', 'HSTWFC3']


class HST(ObservatoryBase):
    """
    Class defining the HST observatory.

    This observatory class defines the instruments and data handling for the
    spectrographs of the Hubble Space Telescope.
    """

    def __init__(self):
        # check if cascade is initialized
        if cascade_configuration.isInitialized:
            # check if the instrument is implemented and pick the
            # corresponding instrument factory
            if (cascade_configuration.instrument in
                    self.observatory_instruments):
                if cascade_configuration.instrument == 'WFC3':
                    factory = HSTWFC3()
                    self.par = factory.par
                    cascade_configuration.telescope_collecting_area = \
                        self.collecting_area
                    self.data = factory.data
                    self.spectral_trace = factory.spectral_trace
                    if self.par['obs_has_backgr']:
                        self.data_background = factory.data_background
                    self.instrument = factory.name
                    self.instrument_calibration = \
                        factory.instrument_calibration
                    cascade_configuration.instrument_dispersion_scale = \
                        factory.dispersion_scale
            else:
                raise ValueError("HST instrument not recognized, "
                                 "check your init file for the following "
                                 "valid instruments: {}. Aborting loading "
                                 "instrument".format(
                                     self.observatory_instruments))
        else:
            # NOTE: message built by implicit string concatenation so that
            # no source indentation leaks into the runtime error text.
            raise ValueError("CASCADe not initialized, "
                             "aborting loading Observatory")

    @property
    def name(self):
        """
        Name of observatory.

        Returns 'HST'
        """
        return "HST"

    @property
    def location(self):
        """
        Location of observatory.

        Returns 'SPACE'
        """
        return "SPACE"

    @property
    def collecting_area(self):
        """
        Size of the collecting area of the telescope.

        Returns
        -------
        4.525 m**2
        """
        return '4.525 m2'

    @property
    def NAIF_ID(self):
        """
        NAIF ID of observatory.

        Returns -48 for HST observatory
        """
        return -48

    @property
    def observatory_instruments(self):
        """
        Instruments of the HST observatory usable with CASCADe.

        Returns
        -------
        {'WFC3'}
        """
        return {"WFC3"}
class HSTWFC3(InstrumentBase):
    """
    Defines the WFC3 instrument.

    This instrument class defines the properties of the WFC3 instrument
    of the Hubble Space Telescope.

    For the instrument and observations the following valid options are
    available:

     - detector subarrays : {'IRSUB128', 'IRSUB256', 'IRSUB512', 'GRISM128',
       'GRISM256', 'GRISM512', 'GRISM1024'}
     - spectroscopic filters : {'G141', 'G102'}
     - imaging filters : {'F139M', 'F132N', 'F167N', 'F126N', 'F130N',
       'F140W'}
     - data type : {'SPECTRUM', 'SPECTRAL_IMAGE', 'SPECTRAL_CUBE'}
     - observing strategy : {'STARING'}
     - data products : {'SPC', 'flt', 'COE'}
    """

    # Allowed instrument/observation configuration values used by
    # get_instrument_setup() for input validation.
    __valid_sub_array = {'IRSUB128', 'IRSUB256', 'IRSUB512', 'GRISM128',
                         'GRISM256', 'GRISM512', 'GRISM1024'}
    __valid_spectroscopic_filter = {'G141', 'G102'}
    __valid_imaging_filter = {'F139M', 'F132N', 'F167N', 'F126N',
                              'F130N', 'F140W'}
    __valid_beams = {'A'}
    __valid_data = {'SPECTRUM', 'SPECTRAL_IMAGE', 'SPECTRAL_CUBE'}
    __valid_observing_strategy = {'STARING', 'SCANNING'}
    __valid_data_products = {'SPC', 'flt', 'ima', 'COE', 'CAE'}

    def __init__(self):
        self.par = self.get_instrument_setup()
        if self.par['obs_has_backgr']:
            self.data, self.data_background = self.load_data()
        else:
            self.data = self.load_data()
        self.spectral_trace = self.get_spectral_trace()
        self._define_region_of_interest()
        # wfc3_cal is only present when the calibration step ran.
        try:
            self.instrument_calibration = self.wfc3_cal
        except AttributeError:
            self.instrument_calibration = None

    @property
    def name(self):
        """
        Define name of instrument.

        This function returns the name of the HST instrument: 'WFC3'
        """
        return "WFC3"

    @property
    def dispersion_scale(self):
        """Return the dispersion scale belonging to the configured filter."""
        __all_scales = {'UVIS': '13.0 Angstrom',
                        'G102': '24.5 Angstrom',
                        'G141': '46.5 Angstrom'}
        return __all_scales[self.par["inst_filter"]]
[docs] def load_data(self): """ Load observational data. This function loads the WFC3 data form disk based on the parameters defined during the initialization of the TSO object. """ if self.par["obs_data"] == 'SPECTRUM': data = self.get_spectra() if self.par['obs_has_backgr']: data_back = self.get_spectra(is_background=True) elif self.par["obs_data"] == 'SPECTRAL_IMAGE': data = self.get_spectral_images() if self.par['obs_has_backgr']: if not self.par['obs_uses_backgr_model']: data_back = self.get_spectral_images(is_background=True) else: self._get_background_cal_data() data_back = self._fit_background(data) elif self.par["obs_data"] == 'SPECTRAL_CUBE': data = self.get_spectral_cubes() if self.par['obs_has_backgr']: if not self.par['obs_uses_backgr_model']: data_back = self.get_spectral_cubes(is_background=True) else: self._get_background_cal_data() data_back = self._fit_background(data) if self.par['obs_has_backgr']: return data, data_back return data
[docs] def get_instrument_setup(self): """ Get all instrument parameters. This funtion retrieves all relevant parameters defining the instrument and observational data setup. Returns ------- par : `collections.OrderedDict` Dictionary containg all relevant parameters Raises ------ ValueError If obseervationla parameters are not or incorrect defined an error will be raised """ # instrument parameters inst_obs_name = cascade_configuration.instrument_observatory inst_inst_name = cascade_configuration.instrument inst_filter = cascade_configuration.instrument_filter inst_cal_filter = cascade_configuration.instrument_cal_filter inst_aperture = cascade_configuration.instrument_aperture inst_cal_aperture = cascade_configuration.instrument_cal_aperture inst_beam = cascade_configuration.instrument_beam # object parameters obj_period = \ u.Quantity(cascade_configuration.object_period).to(u.day) obj_period = obj_period.value obj_ephemeris = \ u.Quantity(cascade_configuration.object_ephemeris).to(u.day) obj_ephemeris = obj_ephemeris.value # observation parameters obs_type = cascade_configuration.observations_type obs_mode = cascade_configuration.observations_mode obs_data = cascade_configuration.observations_data obs_path = cascade_configuration.observations_path if not os.path.isabs(obs_path): obs_path = os.path.join(cascade_default_data_path, obs_path) obs_cal_path = cascade_configuration.observations_cal_path if not os.path.isabs(obs_cal_path): obs_cal_path = os.path.join(cascade_default_path, obs_cal_path) obs_id = cascade_configuration.observations_id obs_cal_version = cascade_configuration.observations_cal_version obs_data_product = cascade_configuration.observations_data_product obs_target_name = cascade_configuration.observations_target_name obs_has_backgr = ast.literal_eval(cascade_configuration. 
observations_has_background) # processing try: proc_source_selection = \ cascade_configuration.processing_source_selection_method except AttributeError: proc_source_selection = 'nearest' try: proc_drop_samples = cascade_configuration.processing_drop_frames proc_drop_samples = ast.literal_eval(proc_drop_samples) for key, values in proc_drop_samples.items(): proc_drop_samples[key] = [int(i) for i in values] except AttributeError: proc_drop_samples = {'up': [-1], 'down': [-1]} try: proc_bits_not_to_flag = \ ast.literal_eval(cascade_configuration. processing_bits_not_to_flag) except AttributeError: proc_bits_not_to_flag = [0, 12, 14] try: proc_extend_roi = cascade_configuration.processing_extend_roi proc_extend_roi = ast.literal_eval(proc_extend_roi) except AttributeError: proc_extend_roi = [1.0, 1.0, 1.0, 1.0] # cpm try: cpm_ncut_first_int = \ cascade_configuration.cpm_ncut_first_integrations cpm_ncut_first_int = ast.literal_eval(cpm_ncut_first_int) except AttributeError: cpm_ncut_first_int = 0 # background observations try: obs_uses_backgr_model = \ ast.literal_eval(cascade_configuration. observations_uses_background_model) except AttributeError: obs_uses_backgr_model = False if obs_has_backgr and not obs_uses_backgr_model: obs_backgr_id = cascade_configuration.observations_background_id obs_backgr_target_name = \ cascade_configuration.observations_background_name if not (obs_data in self.__valid_data): raise ValueError("Data type not recognized, \ check your init file for the following \ valid types: {}. \ Aborting loading data".format(self.__valid_data)) if not (obs_mode in self.__valid_observing_strategy): raise ValueError("Observational stategy not recognized, \ check your init file for the following \ valid types: {}. 
Aborting loading \ data".format(self.__valid_observing_strategy)) if not (inst_filter in self.__valid_spectroscopic_filter): raise ValueError("Instrument spectroscopic filter not recognized, " "check your init file for the following " "valid types: {}. Aborting loading " "data".format(self.__valid_spectroscopic_filter)) if not (inst_cal_filter in self.__valid_imaging_filter): raise ValueError("Filter of calibration image not recognized, " "check your init file for the following " "valid types: {}. Aborting loading " "data".format(self.__valid_imaging_filter)) if not (inst_aperture in self.__valid_sub_array): raise ValueError("Spectroscopic subarray not recognized, \ check your init file for the following \ valid types: {}. Aborting loading \ data".format(self.__valid_sub_array)) if not (inst_cal_aperture in self.__valid_sub_array): raise ValueError("Calibration image subarray not recognized, \ check your init file for the following \ valid types: {}. Aborting loading \ data".format(self.__valid_sub_array)) if not (inst_beam in self.__valid_beams): raise ValueError("Beam (spectral order) not recognized, \ check your init file for the following \ valid types: {}. Aborting loading \ data".format(self.__valid_beams)) if not (obs_data_product in self.__valid_data_products): raise ValueError("Data product not recognized, \ check your init file for the following \ valid types: {}. 
Aborting loading \ data".format(self.__valid_data_products)) par = collections.OrderedDict( inst_obs_name=inst_obs_name, inst_inst_name=inst_inst_name, inst_filter=inst_filter, inst_cal_filter=inst_cal_filter, inst_aperture=inst_aperture, inst_cal_aperture=inst_cal_aperture, inst_beam=inst_beam, obj_period=obj_period, obj_ephemeris=obj_ephemeris, obs_type=obs_type, obs_mode=obs_mode, obs_data=obs_data, obs_path=obs_path, obs_cal_path=obs_cal_path, obs_id=obs_id, obs_cal_version=obs_cal_version, obs_data_product=obs_data_product, obs_target_name=obs_target_name, obs_has_backgr=obs_has_backgr, obs_uses_backgr_model=obs_uses_backgr_model, proc_source_selection=proc_source_selection, proc_drop_samp=proc_drop_samples, proc_bits_not_to_flag=proc_bits_not_to_flag, proc_extend_roi=proc_extend_roi, cpm_ncut_first_int=cpm_ncut_first_int) if obs_has_backgr and not obs_uses_backgr_model: par.update({'obs_backgr_id': obs_backgr_id}) par.update({'obs_backgr_target_name': obs_backgr_target_name}) return par
[docs] def get_spectra(self, is_background=False): """ Load spectral(1D) timeseries data. This function combines all functionallity to read fits files containing the (uncalibrated) spectral timeseries, including orbital phase and wavelength information Parameters ---------- is_background : `bool` if `True` the data represents an observaton of the IR background to be subtracted of the data of the transit spectroscopy target. Returns ------- SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries` Instance of `SpectralDataTimeSeries` containing all spectroscopic data including uncertainties, time, wavelength and bad pixel mask. Raises ------ AssertionError, KeyError Raises an error if no data is found or if certain expected fits keywords are not present in the data files. """ # get data files if is_background: # obsid = self.par['obs_backgr_id'] target_name = self.par['obs_backgr_target_name'] else: # obsid = self.par['obs_id'] target_name = self.par['obs_target_name'] path_to_files = os.path.join(self.par['obs_path'], self.par['inst_obs_name'], self.par['inst_inst_name'], target_name, 'SPECTRA/') data_files = find(self.par['obs_id'] + '*' + self.par['obs_data_product']+'.fits', path_to_files) # number of integrations nintegrations = len(data_files) if nintegrations < 2: raise AssertionError("No Timeseries data found in dir " + path_to_files) spectral_data_file = data_files[0] spectral_data = fits.getdata(spectral_data_file, ext=1) nwavelength = spectral_data.shape[0] # get the data spectral_data = np.zeros((nwavelength, nintegrations)) uncertainty_spectral_data = np.zeros((nwavelength, nintegrations)) wavelength_data = np.zeros((nwavelength, nintegrations)) mask = np.ma.make_mask_none(spectral_data.shape) position = np.zeros((nintegrations)) dispersion_position = np.zeros((nintegrations)) angle = np.zeros((nintegrations)) scaling = np.zeros((nintegrations)) time = np.zeros((nintegrations)) scan_direction = np.zeros((nintegrations)) sample_number = 
np.zeros((nintegrations)) for im, spectral_data_file in enumerate(tqdm(data_files, desc="Loading Spectra", dynamic_ncols=True)): # WARNING fits data is single precision!! spectrum = fits.getdata(spectral_data_file, ext=1) spectral_data[:, im] = spectrum['FLUX'] uncertainty_spectral_data[:, im] = spectrum['FERROR'] wavelength_data[:, im] = spectrum['LAMBDA'] try: mask[:, im] = spectrum['MASK'] except KeyError: pass try: dispersion_position[im] = fits.getval(spectral_data_file, "DISP_POS", ext=0) angle[im] = fits.getval(spectral_data_file, "ANGLE", ext=0) scaling[im] = fits.getval(spectral_data_file, "SCALE", ext=0) hasOtherPos = True except KeyError: hasOtherPos = False try: position[im] = fits.getval(spectral_data_file, "POSITION", ext=0) except KeyError: pass try: time[im] = fits.getval(spectral_data_file, "TIME_BJD", ext=0) hasTimeBJD = True except KeyError: hasTimeBJD = False exptime = fits.getval(spectral_data_file, "EXPTIME", ext=0) expstart = fits.getval(spectral_data_file, "EXPSTART", ext=0) time[im] = expstart + 0.5*(exptime/(24.0*3600.0)) try: scan_direction[im] = fits.getval(spectral_data_file, "SCANDIR", ext=0) hasScanDir = True except KeyError: hasScanDir = False try: sample_number[im] = fits.getval(spectral_data_file, "SAMPLENR", ext=0) hasSampleNumber = True except KeyError: hasSampleNumber = False idx = np.argsort(time)[self.par["cpm_ncut_first_int"]:] time = time[idx] spectral_data = spectral_data[:, idx] uncertainty_spectral_data = uncertainty_spectral_data[:, idx] wavelength_data = wavelength_data[:, idx] mask = mask[:, idx] data_files = [data_files[i] for i in idx] position = position[idx] dispersion_position = dispersion_position[idx] angle = angle[idx] scaling = scaling[idx] scan_direction = scan_direction[idx] sample_number = sample_number[idx] try: medPos = fits.getval(data_files[0], "MEDPOS") medPosUnit = fits.getval(data_files[0], "MPUNIT") hasMedPos = True except KeyError: hasMedPos = False try: posUnit = fits.getval(data_files[0], 
"PUNIT") hasPosUnit = True except KeyError: hasPosUnit = False try: dispPosUnit = fits.getval(data_files[0], "DPUNIT") angleUnit = fits.getval(data_files[0], "AUNIT") scaleUnit = fits.getval(data_files[0], "SUNIT") hasOtherPosUnit = True except KeyError: hasOtherPosUnit = False if (not hasTimeBJD): # convert to BJD ra_target = fits.getval(data_files[0], "RA_TARG") dec_target = fits.getval(data_files[0], "DEC_TARG") target_coord = coord.SkyCoord(ra_target, dec_target, unit=(u.deg, u.deg), frame='icrs') time_obs = Time(time, format='mjd', scale='utc', location=('0d', '0d')) ltt_bary_jpl = time_obs.light_travel_time(target_coord, ephemeris='jpl') time_barycentre = time_obs.tdb + ltt_bary_jpl time = time_barycentre.jd # orbital phase phase = (time - self.par['obj_ephemeris']) / self.par['obj_period'] phase = phase - int(np.max(phase)) if np.max(phase) < 0.0: phase = phase + 1.0 phase = phase - np.rint(phase) if self.par['obs_type'] == 'ECLIPSE': phase[phase < 0] = phase[phase < 0] + 1.0 idx = np.argsort(phase) phase = phase[idx] time = time[idx] spectral_data = spectral_data[:, idx] uncertainty_spectral_data = uncertainty_spectral_data[:, idx] wavelength_data = wavelength_data[:, idx] mask = mask[:, idx] data_files = [data_files[i] for i in idx] position = position[idx] dispersion_position = dispersion_position[idx] angle = angle[idx] scaling = scaling[idx] scan_direction = scan_direction[idx] sample_number = sample_number[idx] # set the units of the observations if ((self.par['obs_data_product'] == 'COE') | (self.par['obs_data_product'] == 'CAE')): flux_unit = u.Unit("electron/s") wave_unit = u.Unit('micron') time_unit = u.day else: flux_unit = u.Unit("erg cm-2 s-1 Angstrom-1") wave_unit = u.Unit('Angstrom') time_unit = u.day time = time * time_unit SpectralTimeSeries = \ SpectralDataTimeSeries(wavelength=wavelength_data, wavelength_unit=wave_unit, data=spectral_data, data_unit=flux_unit, uncertainty=uncertainty_spectral_data, time=phase, 
time_unit=u.dimensionless_unscaled, mask=mask, time_bjd=time, position=position, isRampFitted=True, isNodded=False, target_name=target_name, dataProduct=self.par['obs_data_product'], dataFiles=data_files) # make sure that the date units are as "standard" as posible data_unit = (1.0*SpectralTimeSeries.data_unit).decompose().unit SpectralTimeSeries.data_unit = data_unit wave_unit = (1.0*SpectralTimeSeries.wavelength_unit).decompose().unit SpectralTimeSeries.wavelength_unit = wave_unit # To make the as standard as posible, by defaut change to # mean nomalized data units and use micron as wavelength unit mean_signal, _, _ = \ sigma_clipped_stats(SpectralTimeSeries.return_masked_array("data"), sigma=3, maxiters=10) data_unit = u.Unit(mean_signal*SpectralTimeSeries.data_unit) SpectralTimeSeries.data_unit = data_unit SpectralTimeSeries.wavelength_unit = u.micron SpectralTimeSeries.period = self.par['obj_period'] SpectralTimeSeries.ephemeris = self.par['obj_ephemeris'] if hasPosUnit: SpectralTimeSeries.position_unit = u.Unit(posUnit) if hasMedPos: SpectralTimeSeries.median_position = medPos SpectralTimeSeries.median_position_unit = u.Unit(medPosUnit) if hasOtherPosUnit & hasOtherPos: SpectralTimeSeries.add_measurement( disp_position=dispersion_position, disp_position_unit=u.Unit(dispPosUnit)) SpectralTimeSeries.add_measurement( angle=angle, angle_unit=u.Unit(angleUnit)) SpectralTimeSeries.add_measurement( scale=scaling, scale_unit=u.Unit(scaleUnit)) if hasScanDir: SpectralTimeSeries.scan_direction = list(scan_direction) if hasSampleNumber: SpectralTimeSeries.sample_number = list(sample_number) self._define_convolution_kernel() return SpectralTimeSeries
[docs] def get_spectral_images(self, is_background=False): """ Get spectral image timeseries data. This function combines all functionallity to read fits files containing the (uncalibrated) spectral image timeseries, including orbital phase and wavelength information Parameters ---------- is_background : `bool` if `True` the data represents an observaton of the IR background to be subtracted of the data of the transit spectroscopy target. Returns ------- SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries` Instance of `SpectralDataTimeSeries` containing all spectroscopic data including uncertainties, time, wavelength and bad pixel mask. Raises ------ AssertionError, KeyError Raises an error if no data is found or if certain expected fits keywords are not present in the data files. """ # get data files if is_background and not self.par['obs_uses_backgr_model']: target_name = self.par['obs_backgr_target_name'] else: target_name = self.par['obs_target_name'] path_to_files = os.path.join(self.par['obs_path'], self.par['inst_obs_name'], self.par['inst_inst_name'], target_name, 'SPECTRAL_IMAGES/') data_files = find('*' + self.par['obs_id'] + '*_' + self.par['obs_data_product']+'.fits', path_to_files) # check if time series data can be found if len(data_files) < 2: raise AssertionError("No Timeseries data found in the \ directory: {}".format(path_to_files)) # get the data calibration_image_cube = [] spectral_image_cube = [] spectral_image_unc_cube = [] spectral_image_dq_cube = [] spectral_image_exposure_time = [] spectral_offset2 = [] spectral_offset1 = [] time = [] cal_time = [] spectral_data_files = [] calibration_data_files = [] cal_offset2 = [] cal_offset1 = [] spectral_data_nrptexp = [] for im, image_file in enumerate(tqdm(data_files, desc="Loading Spectral Images", dynamic_ncols=True)): with fits.open(image_file) as hdul: instrument_fiter = hdul['PRIMARY'].header['FILTER'] instrument_aperture = hdul['PRIMARY'].header['APERTURE'] exptime = 
hdul['PRIMARY'].header['EXPTIME'] expstart = hdul['PRIMARY'].header['EXPSTART'] if instrument_fiter != self.par["inst_filter"]: if ((instrument_fiter == self.par["inst_cal_filter"]) and (instrument_aperture == self.par["inst_cal_aperture"])): calibration_image = hdul['SCI'].data calibration_image_cube.append(calibration_image.copy()) calibration_data_files.append(image_file) cal_time.append(expstart + 0.5*(exptime/(24.0*3600.0))) calOffset2 = hdul['PRIMARY'].header['POSTARG2'] calOffset1 = hdul['PRIMARY'].header['POSTARG1'] cal_offset2.append(calOffset2) cal_offset1.append(calOffset1) del calibration_image continue spectral_image = hdul['SCI'].data spectral_image_cube.append(spectral_image.copy()) spectral_image_unc = hdul['ERR'].data spectral_image_unc_cube.append(spectral_image_unc.copy()) spectral_image_dq = hdul['DQ'].data spectral_image_dq_cube.append(spectral_image_dq.copy()) spectral_data_files.append(image_file) time.append(expstart + 0.5*(exptime/(24.0*3600.0))) spectral_image_exposure_time.append(exptime) Offset2 = hdul['PRIMARY'].header['POSTARG2'] Offset1 = hdul['PRIMARY'].header['POSTARG1'] spectral_offset2.append(Offset2) spectral_offset1.append(Offset1) nrptexp = hdul['PRIMARY'].header['NRPTEXP'] spectral_data_nrptexp.append(nrptexp) if len(spectral_image_cube) == 0: raise ValueError("No science data found for the \ filter: {}".format(self.par["inst_filter"])) if len(calibration_image_cube) == 0: raise ValueError("No calibration image found for the \ filter: {}".format(self.par["inst_cal_filter"])) # WARNING fits data is single precision!! 
spectral_image_cube = np.array(spectral_image_cube, dtype=np.float64) spectral_image_unc_cube = \ np.array(spectral_image_unc_cube, dtype=np.float64) spectral_image_dq_cube = \ np.array(spectral_image_dq_cube, dtype=np.int64) calibration_image_cube = \ np.array(calibration_image_cube, dtype=np.float64) time = np.array(time, dtype=np.float64) cal_time = np.array(cal_time, dtype=np.float64) spectral_image_exposure_time = np.array(spectral_image_exposure_time, dtype=np.float64) spectral_offset1 = np.array(spectral_offset1, dtype=np.float64) spectral_offset2 = np.array(spectral_offset2, dtype=np.float64) cal_offset1 = np.array(cal_offset1, dtype=np.float64) cal_offset2 = np.array(cal_offset2, dtype=np.float64) spectral_data_nrptexp = np.array(spectral_data_nrptexp, dtype=np.int64) idx_time_sort = np.argsort(time) time = time[idx_time_sort] spectral_image_cube = spectral_image_cube[idx_time_sort, :, :] spectral_image_unc_cube = spectral_image_unc_cube[idx_time_sort, :, :] spectral_image_dq_cube = spectral_image_dq_cube[idx_time_sort, :, :] spectral_data_files = \ list(np.array(spectral_data_files)[idx_time_sort]) spectral_image_exposure_time = \ spectral_image_exposure_time[idx_time_sort] spectral_offset1 = spectral_offset1[idx_time_sort] spectral_offset2 = spectral_offset2[idx_time_sort] spectral_data_nrptexp = spectral_data_nrptexp[idx_time_sort] # check for spurious longer or shorter exosures. 
median_exposure_time = np.median(spectral_image_exposure_time) idx_remove = (((spectral_image_exposure_time-0.01) > median_exposure_time) | ((spectral_image_exposure_time+0.01) < median_exposure_time)) spectral_image_exposure_time = \ spectral_image_exposure_time[~idx_remove] time = time[~idx_remove] spectral_image_cube = spectral_image_cube[~idx_remove, :, :] spectral_image_unc_cube = spectral_image_unc_cube[~idx_remove, :, :] spectral_image_dq_cube = spectral_image_dq_cube[~idx_remove, :, :] spectral_data_files = \ list(np.array(spectral_data_files)[~idx_remove]) spectral_offset1 = spectral_offset1[~idx_remove] spectral_offset2 = spectral_offset2[~idx_remove] idx_time_sort = np.argsort(cal_time) cal_time = cal_time[idx_time_sort] calibration_image_cube = calibration_image_cube[idx_time_sort] calibration_data_files = \ list(np.array(calibration_data_files)[idx_time_sort]) cal_offset1 = cal_offset1[idx_time_sort] cal_offset2 = cal_offset2[idx_time_sort] nintegrations, mpix, npix = spectral_image_cube.shape nintegrations_cal, ypix_cal, xpix_cal = calibration_image_cube.shape mask = self. 
_create_mask_from_dq(spectral_image_dq_cube) # convert to BJD ra_target = fits.getval(spectral_data_files[0], "RA_TARG") dec_target = fits.getval(spectral_data_files[0], "DEC_TARG") target_coord = coord.SkyCoord(ra_target, dec_target, unit=(u.deg, u.deg), frame='icrs') time_obs = Time(time, format='mjd', scale='utc', location=('0d', '0d')) cal_time_obs = Time(cal_time, format='mjd', scale='utc', location=('0d', '0d')) ltt_bary_jpl = time_obs.light_travel_time(target_coord, ephemeris='jpl') time_barycentre = time_obs.tdb + ltt_bary_jpl time = time_barycentre.jd cal_ltt_bary_jpl = cal_time_obs.light_travel_time(target_coord, ephemeris='jpl') cal_time_barycentre = cal_time_obs.tdb + cal_ltt_bary_jpl cal_time = cal_time_barycentre.jd # orbital phase phase = (time - self.par['obj_ephemeris']) / self.par['obj_period'] phase = phase - int(np.max(phase)) if np.max(phase) < 0.0: phase = phase + 1.0 if np.min(phase) > 0.5: phase = phase - 1.0 cal_phase = (cal_time - self.par['obj_ephemeris']) / \ self.par['obj_period'] cal_phase = cal_phase - int(np.max(cal_phase)) if np.max(cal_phase) < 0.0: cal_phase = cal_phase + 1.0 if np.min(cal_phase) > 0.5: cal_phase = cal_phase - 1.0 self._define_convolution_kernel() offset_x = spectral_offset1 offset_y = spectral_offset2 cal_offset_x = cal_offset1 cal_offset_y = cal_offset2 self._determine_position_offset(offset_x, offset_y, cal_offset_x, cal_offset_y) self._determine_source_position_from_cal_image( calibration_image_cube, calibration_data_files) self._read_grism_configuration_files() self._read_reference_pixel_file() self._get_subarray_size(calibration_image_cube, spectral_image_cube) wave_cal = self._get_wavelength_calibration() flux_unit = u.electron/u.second spectral_image_cube = spectral_image_cube.T * flux_unit spectral_image_unc_cube = spectral_image_unc_cube.T * flux_unit mask = mask.T time_unit = u.day time = time * time_unit SpectralTimeSeries = \ SpectralDataTimeSeries(wavelength=wave_cal, data=spectral_image_cube, 
uncertainty=spectral_image_unc_cube, time=phase, mask=mask, time_bjd=time, isRampFitted=True, isNodded=False, target_name=target_name, dataProduct=self.par['obs_data_product'], dataFiles=spectral_data_files) if is_background and self.par['obs_uses_backgr_model']: self._get_background_cal_data() SpectralTimeSeries = self._fit_background(SpectralTimeSeries) return SpectralTimeSeries
def get_spectral_cubes(self, is_background=False):
    """
    Load spectral cubes.

    This function combines all functionallity to read fits files
    containing the (uncalibrated) spectral image cubes timeseries,
    including orbital phase and wavelength information

    Parameters
    ----------
    is_background : `bool`
        if `True` the data represents an observaton of the IR background
        to be subtracted of the data of the transit spectroscopy target.

    Returns
    -------
    SpectralTimeSeries : `cascade.data_model.SpectralDataTimeSeries`
        Instance of `SpectralDataTimeSeries` containing all spectroscopic
        data including uncertainties, time, wavelength and bad pixel mask.

    Raises
    ------
    AssertionError, KeyError
        Raises an error if no data is found or if certain expected
        fits keywords are not present in the data files.
    """
    # get data files; a background observation (without background model)
    # lives under its own target directory.
    if is_background and not self.par['obs_uses_backgr_model']:
        target_name = self.par['obs_backgr_target_name']
        # obsid = self.par['obs_backgr_id']
    else:
        # obsid = self.par['obs_id']
        target_name = self.par['obs_target_name']
    path_to_files = os.path.join(self.par['obs_path'],
                                 self.par['inst_obs_name'],
                                 self.par['inst_inst_name'],
                                 target_name,
                                 'SPECTRAL_IMAGES/')
    data_files = find('*' + self.par['obs_id'] + '*_' +
                      self.par['obs_data_product']+'.fits',
                      path_to_files)
    # check if time series data can be found
    if len(data_files) < 2:
        raise AssertionError("No Timeseries data found in the \
                             directory: {}".format(path_to_files))
    # Per-sample (up-the-ramp difference) bookkeeping lists; one entry
    # per retained readout sample, except the cal_* lists which have one
    # entry per acquisition image.
    calibration_image_cube = []
    spectral_image_cube = []
    spectral_image_unc_cube = []
    spectral_image_dq_cube = []
    spectral_sampling_time = []
    spectral_image_number_of_samples = []
    time = []
    cal_time = []
    calibration_data_files = []
    spectral_data_files_in = []
    spectral_data_files_out = []
    spectral_sample_number = []
    spectral_scan_directon = []
    spectral_scan_length = []
    spectral_total_scan_length = []
    spectral_scan_offset2 = []
    spectral_scan_offset1 = []
    cal_offset2 = []
    cal_offset1 = []
    spectral_data_nrptexp = []
    for im, image_file in enumerate(tqdm(data_files,
                                         desc="Loading Spectral Cubes",
                                         dynamic_ncols=True)):
        with fits.open(image_file) as hdul:
            instrument_fiter = hdul['PRIMARY'].header['FILTER']
            instrument_aperture = hdul['PRIMARY'].header['APERTURE']
            if instrument_fiter != self.par["inst_filter"]:
                # Not a grism exposure: if taken with the acquisition
                # (imaging) filter/aperture, load the corresponding _flt
                # file as calibration image; otherwise skip the file.
                if ((instrument_fiter == self.par["inst_cal_filter"]) and
                        (instrument_aperture ==
                         self.par["inst_cal_aperture"])):
                    image_file_flt = image_file.replace('_ima', '_flt')
                    with fits.open(image_file_flt) as hdul_cal:
                        calibration_image = hdul_cal['SCI'].data
                        exptime = hdul_cal['PRIMARY'].header['EXPTIME']
                        expstart = hdul_cal['PRIMARY'].header['EXPSTART']
                        calOffset2 = \
                            hdul_cal['PRIMARY'].header['POSTARG2']
                        calOffset1 = \
                            hdul_cal['PRIMARY'].header['POSTARG1']
                    calibration_image_cube.append(calibration_image.copy())
                    # mid-exposure time in MJD (EXPTIME is in seconds)
                    cal_time.append(expstart+0.5*exptime/(24.0*3600.0))
                    cal_offset2.append(calOffset2)
                    cal_offset1.append(calOffset1)
                    calibration_data_files.append(image_file_flt)
                continue
            isSparse = self._is_sparse_sequence(hdul)
            cubeCalType = self._get_cube_cal_type(hdul)
            nsamp = hdul['PRIMARY'].header['NSAMP']
            exptime = hdul['PRIMARY'].header['EXPTIME']
            nrptexp = hdul['PRIMARY'].header['NRPTEXP']
            scanLeng = hdul['PRIMARY'].header['SCAN_LEN']
            scanRate = hdul['PRIMARY'].header['SCAN_RAT']
            scanOffset2 = hdul['PRIMARY'].header['POSTARG2']
            scanOffset1 = hdul['PRIMARY'].header['POSTARG1']
            scanAng = hdul['PRIMARY'].header['SCAN_ANG']
            # The angle difference between "SCAN_ANG" and "PA_V3"
            # determines if it is an up or down scan.
            # For up scan, this angle is around +90 degrees (91.8),
            # for down scan, this angle is around -90 degrees (-88.1).
            if scanAng - 135.0 > 0:
                isUpScan = True
            else:
                isUpScan = False
            # For SPARSE sequences the first readout interval is shorter
            # and is discarded (one extra sample dropped).
            if isSparse:
                nsampMax = nsamp-2
            else:
                nsampMax = nsamp-1
            sample_counter = 0
            for isample in range(0, nsampMax):
                # skip user-configured samples; ramp data is stored
                # reversed in time, hence the (nsampMax-1-isample) index
                if isUpScan:
                    # ramp data stored reversed in time
                    if (nsampMax-1-isample) in \
                            self.par['proc_drop_samp']['up']:
                        continue
                else:
                    if (nsampMax-1-isample) in \
                            self.par['proc_drop_samp']['down']:
                        continue
                routtime = hdul[1+5*isample].header['ROUTTIME']
                deltaTime = hdul[1+5*isample].header['DELTATIM']
                if ((cubeCalType == 'COUNTS') |
                        (cubeCalType == 'ELECTRONS')):
                    # Accumulated counts: difference of consecutive
                    # reads gives the rate for this sample.
                    # Note that there is always a 10 pixel
                    # border in ima files
                    spectral_image = \
                        (hdul[1+5*isample].data[5:-5, 5:-5] -
                         hdul[1+5*(isample+1)].data[5:-5, 5:-5])/deltaTime
                    spectral_image_unc = \
                        np.sqrt(hdul[2+5*isample].data[5:-5, 5:-5]**2 +
                                hdul[2+5*(isample+1)].data[5:-5, 5:-5]**2)
                    spectral_image_unc = spectral_image_unc/deltaTime
                    spectral_image_dq = \
                        (hdul[3+5*isample].data[5:-5, 5:-5] |
                         hdul[3+5*(isample+1)].data[5:-5, 5:-5])
                elif ((cubeCalType == 'COUNTRATE') |
                        (cubeCalType == 'ELECTRONRATE')):
                    # Note the 10 pixel border in ima files
                    spectral_image = hdul[1+5*isample].data[5:-5, 5:-5]
                    spectral_image_unc = \
                        hdul[2+5*isample].data[5:-5, 5:-5]
                    spectral_image_dq = hdul[3+5*isample].data[5:-5, 5:-5]
                else:
                    raise ValueError("Unknown cubeCalType")
                spectral_image_cube.append(spectral_image.copy())
                spectral_image_unc_cube.append(spectral_image_unc.copy())
                spectral_image_dq_cube.append(spectral_image_dq.copy())
                spectral_data_nrptexp.append(nrptexp)
                spectral_sample_number.append(sample_counter)
                spectral_scan_directon.append(isUpScan)
                spectral_sampling_time.append(deltaTime)
                spectral_total_scan_length.append(scanLeng)
                spectral_scan_length.append(scanRate*exptime)
                spectral_scan_offset2.append(scanOffset2)
                spectral_scan_offset1.append(scanOffset1)
                # mid-sample time: ROUTTIME is MJD, DELTATIM seconds
                time.append(routtime - 0.5*deltaTime/86400.0)
                image_file_sample = \
                    image_file.replace('_ima',
                                       '_sample{0:04d}_ima'.format(isample)
                                       )
                spectral_data_files_out.append(image_file_sample)
                spectral_data_files_in.append(image_file)
                spectral_image_number_of_samples.append(nsampMax)
                sample_counter += 1
    if len(spectral_image_cube) == 0:
        raise ValueError("No science data found for the \
                         filter: {}".format(self.par["inst_filter"]))
    if len(calibration_image_cube) == 0:
        raise ValueError("No calibration image found for the \
                         filter: {}".format(self.par["inst_cal_filter"]))
    # WARNING fits data is single precision!!
    spectral_image_cube = np.array(spectral_image_cube, dtype=np.float64)
    spectral_image_unc_cube = \
        np.array(spectral_image_unc_cube, dtype=np.float64)
    spectral_image_dq_cube = \
        np.array(spectral_image_dq_cube, dtype=np.int64)
    calibration_image_cube = \
        np.array(calibration_image_cube, dtype=np.float64)
    spectral_sampling_time = np.array(spectral_sampling_time,
                                      dtype=np.float64)
    time = np.array(time, dtype=np.float64)
    cal_time = np.array(cal_time, dtype=np.float64)
    spectral_sample_number = \
        np.array(spectral_sample_number, dtype=np.int64)
    spectral_scan_directon = \
        np.array(spectral_scan_directon, dtype=np.int64)
    spectral_image_number_of_samples = \
        np.array(spectral_image_number_of_samples, dtype=np.int64)
    spectral_scan_length = np.array(spectral_scan_length,
                                    dtype=np.float64)
    spectral_total_scan_length = np.array(spectral_total_scan_length,
                                          dtype=np.float64)
    spectral_scan_offset2 = np.array(spectral_scan_offset2,
                                     dtype=np.float64)
    spectral_scan_offset1 = np.array(spectral_scan_offset1,
                                     dtype=np.float64)
    cal_offset1 = np.array(cal_offset1, dtype=np.int64)
    cal_offset2 = np.array(cal_offset2, dtype=np.int64)
    spectral_data_nrptexp = np.array(spectral_data_nrptexp,
                                     dtype=np.int64)
    # sort all per-sample arrays by observation time
    idx_time_sort = np.argsort(time)
    time = time[idx_time_sort]
    spectral_sampling_time = spectral_sampling_time[idx_time_sort]
    spectral_image_cube = spectral_image_cube[idx_time_sort, :, :]
    spectral_image_unc_cube = spectral_image_unc_cube[idx_time_sort, :, :]
    spectral_image_dq_cube = spectral_image_dq_cube[idx_time_sort, :, :]
    spectral_data_files_out = \
        list(np.array(spectral_data_files_out)[idx_time_sort])
    spectral_sample_number = spectral_sample_number[idx_time_sort]
    spectral_scan_directon = spectral_scan_directon[idx_time_sort]
    spectral_image_number_of_samples = \
        spectral_image_number_of_samples[idx_time_sort]
    spectral_scan_length = spectral_scan_length[idx_time_sort]
    spectral_total_scan_length = spectral_total_scan_length[idx_time_sort]
    spectral_scan_offset2 = spectral_scan_offset2[idx_time_sort]
    spectral_scan_offset1 = spectral_scan_offset1[idx_time_sort]
    spectral_data_nrptexp = spectral_data_nrptexp[idx_time_sort]
    # drop samples from exposures with a deviating number of readout
    # samples so that all integrations have identical sampling
    # NOTE(review): spectral_data_nrptexp and
    # spectral_image_number_of_samples are not filtered here; they do
    # not appear to be used further below — confirm before relying on
    # them downstream.
    med_number_of_samples = np.median(spectral_image_number_of_samples)
    idx_remove = \
        spectral_image_number_of_samples != med_number_of_samples
    time = time[~idx_remove]
    spectral_sampling_time = spectral_sampling_time[~idx_remove]
    spectral_image_cube = spectral_image_cube[~idx_remove, :, :]
    spectral_image_unc_cube = spectral_image_unc_cube[~idx_remove, :, :]
    spectral_image_dq_cube = spectral_image_dq_cube[~idx_remove, :, :]
    spectral_data_files_out = \
        list(np.array(spectral_data_files_out)[~idx_remove])
    spectral_sample_number = spectral_sample_number[~idx_remove]
    spectral_scan_directon = spectral_scan_directon[~idx_remove]
    spectral_scan_length = spectral_scan_length[~idx_remove]
    spectral_total_scan_length = spectral_total_scan_length[~idx_remove]
    spectral_scan_offset2 = spectral_scan_offset2[~idx_remove]
    spectral_scan_offset1 = spectral_scan_offset1[~idx_remove]
    # sort the calibration (acquisition) images by time as well
    idx_time_sort = np.argsort(cal_time)
    cal_time = cal_time[idx_time_sort]
    calibration_image_cube = calibration_image_cube[idx_time_sort]
    calibration_data_files = \
        list(np.array(calibration_data_files)[idx_time_sort])
    cal_offset1 = cal_offset1[idx_time_sort]
    cal_offset2 = cal_offset2[idx_time_sort]
    nintegrations, mpix, npix = spectral_image_cube.shape
    nintegrations_cal, ypix_cal, xpix_cal = calibration_image_cube.shape
    mask = self._create_mask_from_dq(spectral_image_dq_cube)
    # convert to BJD (barycentric correction using JPL ephemeris)
    ra_target = fits.getval(spectral_data_files_in[0], "RA_TARG")
    dec_target = fits.getval(spectral_data_files_in[0], "DEC_TARG")
    target_coord = coord.SkyCoord(ra_target, dec_target,
                                  unit=(u.deg, u.deg), frame='icrs')
    time_obs = Time(time, format='mjd', scale='utc',
                    location=('0d', '0d'))
    cal_time_obs = Time(cal_time, format='mjd', scale='utc',
                        location=('0d', '0d'))
    ltt_bary_jpl = time_obs.light_travel_time(target_coord,
                                              ephemeris='jpl')
    time_barycentre = time_obs.tdb + ltt_bary_jpl
    time = time_barycentre.jd
    cal_ltt_bary_jpl = cal_time_obs.light_travel_time(target_coord,
                                                      ephemeris='jpl')
    cal_time_barycentre = cal_time_obs.tdb + cal_ltt_bary_jpl
    cal_time = cal_time_barycentre.jd
    # orbital phase, shifted into the interval (-0.5, 1.0] around transit
    phase = (time - self.par['obj_ephemeris']) / self.par['obj_period']
    phase = phase - int(np.max(phase))
    if np.max(phase) < 0.0:
        phase = phase + 1.0
    if np.min(phase) > 0.5:
        phase = phase - 1.0
    cal_phase = (cal_time - self.par['obj_ephemeris']) / \
        self.par['obj_period']
    cal_phase = cal_phase - int(np.max(cal_phase))
    if np.max(cal_phase) < 0.0:
        cal_phase = cal_phase + 1.0
    if np.min(cal_phase) > 0.5:
        cal_phase = cal_phase - 1.0
    # derive all instrument calibration products needed for the
    # wavelength solution and later processing steps
    # self._determine_relative_source_position(spectral_image_cube, mask)
    self._define_convolution_kernel()
    self._determine_scan_offset(spectral_scan_offset1,
                                spectral_scan_offset2,
                                cal_offset1, cal_offset2,
                                spectral_scan_length,
                                spectral_total_scan_length,
                                spectral_scan_directon)
    self._determine_source_position_from_cal_image(
        calibration_image_cube, calibration_data_files)
    self._read_grism_configuration_files()
    self._read_reference_pixel_file()
    self._get_subarray_size(calibration_image_cube, spectral_image_cube)
    wave_cal = self._get_wavelength_calibration()
    if 'COUNTS' in cubeCalType:
        flux_unit = u.counts/u.second
    else:
        flux_unit = u.electron/u.second
    # transpose so that wavelength is the leading axis, time the last
    spectral_image_cube = spectral_image_cube.T * flux_unit
    spectral_image_unc_cube = spectral_image_unc_cube.T * flux_unit
    mask = mask.T
    time_unit = u.day
    time = time * time_unit
    SpectralTimeSeries = \
        SpectralDataTimeSeries(wavelength=wave_cal,
                               data=spectral_image_cube,
                               uncertainty=spectral_image_unc_cube,
                               time=phase,
                               mask=mask,
                               time_bjd=time,
                               isRampFitted=True,
                               isNodded=False,
                               target_name=target_name,
                               dataProduct=self.par['obs_data_product'],
                               dataFiles=spectral_data_files_out)
    SpectralTimeSeries.sample_number = list(spectral_sample_number)
    SpectralTimeSeries.scan_direction = list(spectral_scan_directon)
    # for model-based background subtraction the returned time series is
    # the fitted background rather than the observed data
    if is_background and self.par['obs_uses_backgr_model']:
        self._get_background_cal_data()
        SpectralTimeSeries = self._fit_background(SpectralTimeSeries)
    return SpectralTimeSeries
[docs] def _create_mask_from_dq(self, dq_cube): """ Create mask from DQ cube. Parameters ---------- dq_cube : TYPE DESCRIPTION. Returns ------- mask : TYPE DESCRIPTION. Note ---- Standard bit values not to flag are 0, 12 and 14. Bit valiue 10 (blobs) is not set by default but can be selected not to be flagged in case of problem. """ bits_not_to_flag = self.par['proc_bits_not_to_flag'] bits_to_flag = [] for ibit in range(1, 16): if ibit not in bits_not_to_flag: bits_to_flag.append(ibit) all_flag_values = np.unique(dq_cube) bit_select = np.zeros_like(all_flag_values, dtype='int') for ibit in bits_to_flag: bit_select = bit_select + (all_flag_values & (1 << (ibit - 1))) bit_select = bit_select.astype('bool') mask = np.zeros_like(dq_cube, dtype='bool') for iflag in all_flag_values[bit_select]: mask = mask | (dq_cube == iflag) return mask
[docs] def _determine_position_offset(self, scan_offset_x, scan_offset_y, cal_offset_x, cal_offset_y): """ Determine the scan offset. Parameters ---------- scan_offset_x : TYPE DESCRIPTION. scan_offset_y : TYPE DESCRIPTION. Returns ------- None. """ xc_offset = (np.mean(scan_offset_x)-np.mean(cal_offset_x))/0.135 # xc_offset = (-np.mean(cal_offset_x))/0.135 yc_offset = (np.mean(scan_offset_y)-np.mean(cal_offset_y))/0.121 try: self.wfc3_cal except AttributeError: self.wfc3_cal = SimpleNamespace() finally: self.wfc3_cal.xc_offset = xc_offset self.wfc3_cal.yc_offset = yc_offset return
[docs] def _determine_scan_offset(self, scan_offset_x, scan_offset_y, cal_offset_x, cal_offset_y, scan_length, total_scan_length, scan_directions): """ Determine the scan offset. Parameters ---------- scan_offset_x : TYPE DESCRIPTION. scan_offset_y : TYPE DESCRIPTION. scan_length : 'ndarray' Scan length covered during exposure. total_scan_length: 'ndarray' Total scan length. scan_directions : TYPE DESCRIPTION. Returns ------- None. """ xc_offset = (np.mean(scan_offset_x)-np.mean(cal_offset_x))/0.135 unique_directions = np.unique(scan_directions) yc_temp = np.zeros_like(unique_directions) for i, scan_direction in enumerate(unique_directions): idx = (scan_directions == scan_direction) scan_length_diff = total_scan_length[idx] - scan_length[idx] if scan_direction == 0: yc_temp[i] = np.mean(scan_offset_y[idx]+0.5*scan_length[idx] + scan_length_diff) else: yc_temp[i] = np.mean(scan_offset_y[idx]-0.5*scan_length[idx] - scan_length_diff) yc_offset = (np.mean(yc_temp)-np.mean(cal_offset_y))/0.121 try: self.wfc3_cal except AttributeError: self.wfc3_cal = SimpleNamespace() finally: self.wfc3_cal.xc_offset = xc_offset self.wfc3_cal.yc_offset = yc_offset self.wfc3_cal.scan_length = int(np.median(scan_length)/0.121) + 1 return
[docs] @staticmethod def _get_cube_cal_type(hdul): """ Determine the type of applied flux calibration of the spectral cubes. This function checks which type of flux calibraiton has been applied to the spetral data cubes and returns the calibration type according to the values of the relevant header keywords found in the fits data files. Parameters ---------- hdul : 'astropy.io.fits.hdu.hdulist.HDUList' Returns ------- calType : 'str' Raises ------ ValueError """ unitcorr = hdul['PRIMARY'].header['UNITCORR'] flatcorr = hdul['PRIMARY'].header['FLATCORR'] bunit = hdul['SCI'].header['BUNIT'] if (unitcorr == 'OMIT') & (flatcorr == 'OMIT'): calType = 'COUNTS' elif (unitcorr == 'OMIT') & (flatcorr == 'COMPLETE'): calType = 'ELECTRONS' elif (unitcorr == 'COMPLETE') & (flatcorr == 'OMIT'): calType = 'COUNTRATE' elif (unitcorr == 'COMPLETE') & (flatcorr == 'COMPLETE'): calType = 'ELECTRONRATE' else: raise ValueError("Unknown dataproduct {} in Spectral Cubes. \ Check the UNITCORR and FLATCORR header \ keywords".format(bunit)) return calType
[docs] @staticmethod def _is_sparse_sequence(hdul): """ Check fo sparse sequence. This funtion checks for SPARSE timing sequence. If so, the first readout time is shorter than the rest and should be discarded. Parameters ---------- hdul : 'astropy.io.fits.hdu.hdulist.HDUList' Returns ------- isSparse : 'bool' """ isSparse = 'SPARS' in hdul['PRIMARY'].header['SAMP_SEQ'] return isSparse
def _define_convolution_kernel(self):
    """
    Define convolution kernel.

    Defines the instrument specific convolution kernel which will be
    used in the correction procedure of bad pixels: a 1D Gaussian for
    extracted spectra, a 2D Gaussian elongated along the
    cross-dispersion direction for spectral images. The kernel is
    stored on ``self.wfc3_cal``.
    """
    if self.par["obs_data"] == 'SPECTRUM':
        bad_pixel_kernel = Gaussian1DKernel(4.0, x_size=19)
    else:
        bad_pixel_kernel = Gaussian2DKernel(x_stddev=0.2, y_stddev=4.0,
                                            theta=-0.0092,
                                            x_size=5, y_size=19)
    if not hasattr(self, 'wfc3_cal'):
        self.wfc3_cal = SimpleNamespace()
    self.wfc3_cal.convolution_kernel = bad_pixel_kernel
    return
def _define_region_of_interest(self):
    """
    Define ROI.

    Defines the region on the detector which contains the intended
    target star; pixels outside the region of interest are set to
    `True` in the stored ROI mask. The wavelength limits per grism
    (optionally stretched by the 'proc_extend_roi' factors) select
    the dispersion range, and for 2D data a band around the spectral
    trace selects the cross-dispersion range.

    Raises
    ------
    ValueError
        If a beam other than 'A' (first order) is configured.
    """
    dim = self.data.data.shape
    if self.par["inst_beam"] == 'A':
        # Wavelength limits per grism; for extracted 1D spectra
        # (len(dim) <= 2) slightly tighter limits are used than for
        # spectral images.
        if self.par['inst_filter'] == 'G141':
            if len(dim) <= 2:
                wavelength_min = \
                    self.par['proc_extend_roi'][0]*1.096*u.micron
                wavelength_max = \
                    self.par['proc_extend_roi'][1]*1.6963*u.micron
            else:
                wavelength_min = \
                    self.par['proc_extend_roi'][0]*1.058*u.micron
                wavelength_max = \
                    self.par['proc_extend_roi'][1]*1.72*u.micron
        elif self.par['inst_filter'] == 'G102':
            if len(dim) <= 2:
                wavelength_min = \
                    self.par['proc_extend_roi'][0]*0.816*u.micron
                wavelength_max = \
                    self.par['proc_extend_roi'][1]*1.1456*u.micron
            else:
                wavelength_min = \
                    self.par['proc_extend_roi'][0]*0.78*u.micron
                wavelength_max = \
                    self.par['proc_extend_roi'][1]*1.17*u.micron
        # Cross-dispersion width of the ROI: narrow for staring mode,
        # scan length plus margin for scanning mode (fallback of 150
        # pixels when the scan length has not been determined yet).
        if self.par["obs_mode"] == 'STARING':
            roi_width = 20
        elif self.par["obs_mode"] == 'SCANNING':
            try:
                roi_width = self.wfc3_cal.scan_length+25
            except AttributeError:
                roi_width = 150
        else:
            roi_width = 20
    else:
        raise ValueError("Only beam A implemented")
    trace = self.spectral_trace.copy()
    mask_min = trace['wavelength'] > wavelength_min
    mask_max = trace['wavelength'] < wavelength_max
    # pixels without a wavelength solution are excluded as well
    mask_not_defined = trace['wavelength'] == 0.
    idx_min = int(np.min(trace['wavelength_pixel'].value[mask_min]))
    idx_max = int(np.max(trace['wavelength_pixel'].value[mask_max]))
    if len(dim) <= 2:
        # 1D spectra: flag everything outside [idx_min, idx_max]
        roi = np.zeros((dim[0]), dtype=bool)
        roi[0:idx_min] = True
        roi[idx_max+1:] = True
        roi[mask_not_defined] = True
    else:
        # 2D images: additionally restrict to a band of roi_width
        # pixels (stretched by the last two 'proc_extend_roi' factors)
        # centered on the mean trace position.
        center_pix = int(np.mean(trace['positional_pixel'].
                                 value[idx_min:idx_max]))
        min_idx_pix = center_pix - \
            int((roi_width//2)*self.par['proc_extend_roi'][2])
        max_idx_pix = center_pix + \
            int((roi_width//2)*self.par['proc_extend_roi'][3])
        roi = np.ones((dim[:-1]), dtype=bool)
        roi[idx_min:idx_max, min_idx_pix:max_idx_pix] = False
    try:
        self.wfc3_cal
    except AttributeError:
        self.wfc3_cal = SimpleNamespace()
    finally:
        self.wfc3_cal.roi = roi
    return
def _get_background_cal_data(self):
    """
    Get all calibration data for background fit.

    Get the calibration data from which the background in the science
    images can be determined: the zodiacal-light and helium-excess
    background templates (plus, for G141, a scattered-light template),
    re-flatfielded from the imaging flat to the grism flat and cut to
    the used subarray.

    Raises
    ------
    FileNotFoundError, AttributeError
        An error is raised if the calibration images are not found
        or the background data is not properly defined.

    Notes
    -----
    For further details see:
    http://www.stsci.edu/hst/wfc3/documents/ISRs/WFC3-2015-17.pdf
    """
    # calibration reference file names per grism
    _applied_flatfields = {"G141": "uc72113oi_pfl.fits",
                           "G102": "uc721143i_pfl.fits"}
    _grism_flatfields = {"G141": "u4m1335mi_pfl.fits",
                         "G102": "u4m1335li_pfl.fits"}
    _zodi_cal_files = {"G141": "zodi_G141_clean.fits",
                       "G102": "zodi_G102_clean.fits"}
    _helium_cal_files = {"G141": "excess_lo_G141_clean.fits",
                         "G102": "excess_G102_clean.fits"}
    _scattered_cal_files = {"G141": "G141_scattered_light.fits"}
    calibration_file_name_flatfield = \
        os.path.join(self.par['obs_cal_path'],
                     self.par['inst_obs_name'],
                     self.par['inst_inst_name'],
                     _applied_flatfields[self.par['inst_filter']])
    calibration_file_name_grism_flatfield = \
        os.path.join(self.par['obs_cal_path'],
                     self.par['inst_obs_name'],
                     self.par['inst_inst_name'],
                     _grism_flatfields[self.par['inst_filter']])
    calibration_file_name_zodi = \
        os.path.join(self.par['obs_cal_path'],
                     self.par['inst_obs_name'],
                     self.par['inst_inst_name'],
                     _zodi_cal_files[self.par['inst_filter']])
    calibration_file_name_helium = \
        os.path.join(self.par['obs_cal_path'],
                     self.par['inst_obs_name'],
                     self.par['inst_inst_name'],
                     _helium_cal_files[self.par['inst_filter']])
    try:
        zodi = fits.getdata(calibration_file_name_zodi, ext=0)
    except FileNotFoundError:
        print("Calibration file {} for the "
              "contribution of the zodi to the "
              "background not found. "
              "Aborting".format(calibration_file_name_zodi))
        raise
    try:
        helium = fits.getdata(calibration_file_name_helium, ext=0)
    except FileNotFoundError:
        print("Calibration file {} for the contribution of the helium "
              "excess to the background not found. "
              "Aborting".format(calibration_file_name_helium))
        raise
    # a scattered-light template is only available for G141
    if self.par['inst_filter'] == 'G141':
        calibration_file_name_scattered = \
            os.path.join(self.par['obs_cal_path'],
                         self.par['inst_obs_name'],
                         self.par['inst_inst_name'],
                         _scattered_cal_files[self.par['inst_filter']])
        try:
            scattered = fits.getdata(calibration_file_name_scattered,
                                     ext=0)
        except FileNotFoundError:
            print("Calibration file {} for the contribution of the "
                  "excess scattered light to the background not found. "
                  "Aborting".format(calibration_file_name_scattered))
            raise
    else:
        scattered = 1.0
    # the flats carry a 5-pixel reference border trimmed here via [:-10]
    # on each axis; NOTE(review): the science cubes are trimmed with
    # [5:-5] — confirm the two trims select the same pixels.
    try:
        flatfield = fits.getdata(calibration_file_name_flatfield,
                                 ext=1)[:-10, :-10]
    except FileNotFoundError:
        print("Flatfield calibration file {} not found. "
              "Aborting".format(calibration_file_name_flatfield))
        raise
    try:
        grism_flatfield = \
            fits.getdata(calibration_file_name_grism_flatfield,
                         ext=1)[:-10, :-10]
    except FileNotFoundError:
        print("Flatfield calibration file {} not found. "
              "Aborting".format(calibration_file_name_grism_flatfield))
        raise
    # undo the imaging flat and apply the grism flat to the templates
    zodi = zodi*flatfield/grism_flatfield
    helium = helium*flatfield/grism_flatfield
    scattered = scattered*flatfield/grism_flatfield
    try:
        self.wfc3_cal.subarray_sizes
    except AttributeError:
        print("Necessary WFC3 subarray sizes not yet defined. "
              "Aborting loading background calibration files")
        raise
    # cut the full-frame (1014x1014) templates to the used subarray
    subarray = self.wfc3_cal.subarray_sizes['science_image_size']
    if subarray < 1014:
        i0 = (1014 - subarray) // 2
        zodi = zodi[i0: i0 + subarray, i0: i0 + subarray]
        helium = helium[i0: i0 + subarray, i0: i0 + subarray]
        scattered = scattered[i0: i0 + subarray, i0: i0 + subarray]
    # stored transposed, matching the data orientation used elsewhere
    self.wfc3_cal.background_cal_data = {"zodi": zodi.T,
                                         "helium": helium.T,
                                         "scattered": scattered.T}
    return
def _fit_background(self, science_data_in):
    """
    Fit background.

    Determines the background in the HST Grism data by fitting a model
    for the background to the spectral timeseries data. The model has
    one helium-excess amplitude per integration plus a single
    zodiacal-light amplitude, solved with non-negative least squares;
    sources are iteratively sigma-clipped from the fit.

    Parameters
    ----------
    science_data_in : `masked quantity`
        Input data for which the background will be determined

    Returns
    -------
    SpectralTimeSeries : `SpectralDataTimeSeries`
        The fitted IR background as a function of time

    Raises
    ------
    AttributeError
        If the background calibration data has not been loaded yet.

    Notes
    -----
    All details of the implemented model are described in:
    http://www.stsci.edu/hst/wfc3/documents/ISRs/WFC3-2015-17.pdf
    """
    try:
        background_cal_data = self.wfc3_cal.background_cal_data
        zodi = background_cal_data['zodi']
        helium = background_cal_data['helium']
    except AttributeError:
        raise AttributeError("Necessary calibration data not yet defined. \
                             Aborting fitting background level")
    mask_science_data_in = science_data_in.mask
    data_science_data_in = science_data_in.data.data.value
    uncertainty_science_data_in = science_data_in.uncertainty.data.value
    time_science_data_in = science_data_in.time
    wavelength_science_data_in = science_data_in.wavelength
    # step 1: inverse-variance weights, zero for masked pixels
    mask = mask_science_data_in
    weights = np.zeros_like(uncertainty_science_data_in)
    weights[~mask] = uncertainty_science_data_in[~mask]**-2
    # step 2a: initial background estimate: median over the image axes.
    # BUG FIX: the running background estimate used to be stored in
    # 'fited_background' while the loop updated 'fitted_backgroud', so
    # the sigma clipping in step 2b always compared against the initial
    # median instead of the latest model fit. A single consistently
    # named variable restores the intended iteration.
    fitted_background = np.median(data_science_data_in, axis=[0, 1],
                                  keepdims=True)
    nflagged = np.count_nonzero(mask)
    iter_count = 0
    while iter_count < 10:
        print('iteration background fit: {}'.format(iter_count))
        # step 2b: flag pixels deviating more than 3 sigma from the
        # current background model
        residual = np.abs(np.sqrt(weights) *
                          (data_science_data_in-fitted_background))
        mask_source = residual > 3.0
        # step 3: add the newly found sources to the mask
        mask = np.logical_or(mask_source, mask)
        nflagged_new = np.count_nonzero(mask)
        if nflagged_new != nflagged:
            nflagged = nflagged_new
        else:
            print("no additional sources found, stopping iteration")
            break
        # step 4: remove flagged pixels from the fit
        weights[mask] = 0.0
        # step 5: weighted least-squares normal equations for one
        # helium amplitude per integration plus one global zodi
        # amplitude, solved with NNLS
        _, _, nint = data_science_data_in.shape
        design_matrix = \
            np.diag(np.hstack([np.sum(weights[:, :, :].T*helium.T*helium.T,
                                      axis=(1, 2)),
                               np.sum(weights[:, :, :].T*zodi.T*zodi.T)]))
        design_matrix[:-1, -1] = np.sum(weights[:, :, :].T*helium.T*zodi.T,
                                        axis=(1, 2))
        design_matrix[-1, :-1] = design_matrix[:-1, -1]
        vector = np.hstack([np.sum(weights[:, :, :].T * helium.T *
                                   data_science_data_in.T, axis=(1, 2)),
                            np.sum(weights[:, :, :].T*zodi.T *
                                   data_science_data_in.T)])
        fit_parameters, chi = nnls(design_matrix, vector)
        fitted_background = np.tile(helium.T, (nint, 1, 1)).T * \
            fit_parameters[:-1] + (zodi*fit_parameters[-1])[:, :, None]
        # update iter count and break if to many iterations
        iter_count += 1
    # error estimate on the fitted amplitudes from the residual chi
    sigma_hat_sqr = chi / (fit_parameters.size - 1)
    err_fit_parameters = \
        np.sqrt(sigma_hat_sqr *
                np.diag(np.linalg.inv(np.dot(design_matrix.T,
                                             design_matrix))))
    background = {'parameter': fit_parameters,
                  'error': err_fit_parameters}
    self.wfc3_cal.background_model_parameters = background
    uncertainty_background_model = np.tile(helium.T, (nint, 1, 1)).T * \
        err_fit_parameters[:-1] + (zodi*err_fit_parameters[-1])[:, :, None]
    # re-attach the flux unit of the input data
    data_init = science_data_in.data_unit
    uncertainty_background_model = uncertainty_background_model*data_init
    fitted_background = fitted_background*data_init
    SpectralTimeSeries = \
        SpectralDataTimeSeries(wavelength=wavelength_science_data_in,
                               data=fitted_background,
                               uncertainty=uncertainty_background_model,
                               time=time_science_data_in,
                               mask=mask_science_data_in,
                               isRampFitted=True,
                               isNodded=False)
    return SpectralTimeSeries
def _determine_source_position_from_cal_image(self, calibration_image_cube,
                                              calibration_data_files):
    """
    Determine the source position from the target acquisition image.

    Determines the position on the detector of the target source in
    the calibration image taken prior to the spectroscopic
    observations, by comparing detected sources with the position
    expected from the WCS solution.

    Parameters
    ----------
    calibration_image_cube : `ndarray`
        Cube containing all acquisition images of the target.
    calibration_data_files : `list` of `str`
        List containing the file names associated with the
        calibraton data.

    Attributes
    ----------
    calibration_source_position : `list` of `tuple`
        The position of the source in the acquisition images associated
        with the HST spectral timeseries observations.
    """
    # Always detect up to the 10 brightest sources so that the
    # 'second_nearest' and 'second_brightest' selections below have
    # candidates to pick from.
    brightest = 10
    calibration_source_position = []
    expected_calibration_source_position = []
    for im, image_file in enumerate(calibration_data_files):
        with fits.open(image_file) as hdul:
            ra_target = hdul['PRIMARY'].header['RA_TARG']
            dec_target = hdul['PRIMARY'].header['DEC_TARG']
            w = WCS(hdul['SCI'].header)
            # expected pixel position of the target from the WCS
            expected_target_position = \
                w.all_world2pix(ra_target, dec_target, 0)
            expected_xcentroid = \
                expected_target_position[0].reshape(1)[0]
            expected_ycentroid = \
                expected_target_position[1].reshape(1)[0]
            mean, median, std = \
                sigma_clipped_stats(calibration_image_cube[im, :, :],
                                    sigma=3.0, maxiters=5)
            # Lower the detection threshold until at least one source
            # is found. BUG FIX: after a successful detection the
            # identical IRAFStarFinder search used to be run a second
            # time; the redundant call has been removed.
            sources = None
            threshold = 50.
            while sources is None:
                iraffind = IRAFStarFinder(fwhm=1.8,
                                          threshold=threshold*std,
                                          brightest=brightest)
                sources = iraffind(calibration_image_cube[im, :, :] -
                                   median)
                if sources is None:
                    warnings.warn("No aquisition target found above "
                                  "a threshold of {}".format(threshold))
                    threshold = 0.9*threshold
                    warnings.warn("Lowering threshold to a value "
                                  "of {}".format(threshold))
            distances = \
                np.sqrt((sources['xcentroid']-expected_xcentroid)**2 +
                        (sources['ycentroid']-expected_ycentroid)**2)
            fluxes = sources['flux']
            # select the target among the detected sources
            if self.par['proc_source_selection'] == 'nearest':
                idx_target = np.argsort(distances)[0]
            elif self.par['proc_source_selection'] == 'second_nearest':
                idx_target = np.argsort(distances)[1]
            elif self.par['proc_source_selection'] == 'second_brightest':
                idx_target = np.argsort(fluxes)[-2]
            else:
                idx_target = np.argsort(fluxes)[-1]
            source_position = (sources[idx_target]['xcentroid'],
                               sources[idx_target]['ycentroid'])
            expected_source_position = \
                (expected_xcentroid, expected_ycentroid)
            calibration_source_position.append(source_position)
            expected_calibration_source_position.\
                append(expected_source_position)
        gc.collect()
    try:
        self.wfc3_cal
    except AttributeError:
        self.wfc3_cal = SimpleNamespace()
    finally:
        self.wfc3_cal.calibration_source_position = \
            calibration_source_position
        self.wfc3_cal.expected_calibration_source_position = \
            expected_calibration_source_position
        self.wfc3_cal.calibration_images = \
            calibration_image_cube
    return
def _read_grism_configuration_files(self):
    """
    Get the relevant data from the WFC3 configuration files.

    Reads the aXe-style grism configuration file and extracts the
    first-order polynomial coefficients of the spectral trace and of
    the wavelength solution, plus the dispersion scale.

    Attributes
    ----------
    DYDX : `list`
        The parameters for the spectral trace
    DLDP : `list`
        The parameters for the wavelength calibration

    Raises
    ------
    ValueError
        An error is raised if the parameters associated with the
        specified instrument mode can not be found in the
        calibration file.
    """
    # e.g. <cal_path>/HST/WFC3/G141.F139M.V<version>.conf
    calibration_file_name = os.path.join(self.par['obs_cal_path'],
                                         self.par['inst_obs_name'],
                                         self.par['inst_inst_name'],
                                         self.par['inst_filter'] + '.' +
                                         self.par['inst_cal_filter']+'.V' +
                                         self.par['obs_cal_version'] +
                                         '.conf')
    with open(calibration_file_name, 'r') as content_file:
        content = content_file.readlines()
    # flags record which required entries have not been found yet
    flag_DYDX0 = True
    flag_DYDX1 = True
    flag_DLDP0 = True
    flag_DLDP1 = True
    flag_RESOL = True
    for line in content:
        # parameters spectral trace
        if 'DYDX_'+self.par['inst_beam']+'_0' in line:
            DYDX0 = np.array(line.strip().split()[1:], dtype='float64')
            flag_DYDX0 = False
            continue
        if 'DYDX_'+self.par['inst_beam']+'_1' in line:
            DYDX1 = np.array(line.strip().split()[1:], dtype='float64')
            flag_DYDX1 = False
            continue
        # parameters wavelength calibration
        if 'DLDP_'+self.par['inst_beam']+'_0' in line:
            DLDP0 = np.array(line.strip().split()[1:], dtype='float64')
            flag_DLDP0 = False
            continue
        if 'DLDP_'+self.par['inst_beam']+'_1' in line:
            DLDP1 = np.array(line.strip().split()[1:], dtype='float64')
            flag_DLDP1 = False
            continue
        # dispersion scale (Angstrom per drizzled pixel)
        if 'DRZRESOLA' in line:
            PXLRESOL = float(line.strip().split()[1])*u.Angstrom
            flag_RESOL = False
            continue
    if flag_DYDX0:
        raise ValueError("Spectral trace not found in calibration file, \
                         check {} file for the following entry: {} \
                         Aborting".format(calibration_file_name,
                                          'DYDX_' +
                                          self.par['inst_beam']+'_0'))
    if flag_DYDX1:
        raise ValueError("Spectral trace not found in calibration file, \
                         check {} file for the following entry: {} \
                         Aborting".format(calibration_file_name,
                                          'DYDX_' +
                                          self.par['inst_beam']+'_1'))
    if flag_DLDP0:
        raise ValueError("Wavelength definition not found in calibration \
                         file, check {} file for the following entry: {} \
                         Aborting".format(calibration_file_name,
                                          'DLDP_' +
                                          self.par['inst_beam']+'_0'))
    if flag_DLDP1:
        raise ValueError("Wavelength definition not found in calibration \
                         file, check {} file for the following entry: {} \
                         Aborting".format(calibration_file_name,
                                          'DLDP_' +
                                          self.par['inst_beam']+'_1'))
    if flag_RESOL:
        raise ValueError("Dispersion scale not found in calibration \
                         file, check {} file for the following entry: DRZRESOLA \
                         Aborting".format(calibration_file_name))
    DYDX = [DYDX0, DYDX1]
    DLDP = [DLDP0, DLDP1]
    try:
        self.wfc3_cal
    except AttributeError:
        self.wfc3_cal = SimpleNamespace()
    finally:
        self.wfc3_cal.DYDX = DYDX
        self.wfc3_cal.DLDP = DLDP
        self.wfc3_cal.PXLRESOL = PXLRESOL
    return
[docs] def _read_reference_pixel_file(self): """ Get the reference pixel. Read the calibration file containig the definition of the reference pixel appropriate for a given sub array and or filer Attributes ---------- reference_pixels : `collections.OrderedDict` Ordered dict containing the reference pixels to be used in the wavelength calibration. """ calibration_file_name = os.path.join(self.par['obs_cal_path'], self.par['inst_obs_name'], self.par['inst_inst_name'], 'wavelength_ref_pixel_' + self.par['inst_inst_name']. lower()+'.txt') ptable_cal = pd.read_table(calibration_file_name, delim_whitespace=True, low_memory=False, skiprows=1, names=['APERTURE', 'FILTER', 'XREF', 'YREF']) XREF_GRISM, YREF_GRISM = \ self._search_ref_pixel_cal_file(ptable_cal, self.par["inst_aperture"], self.par["inst_filter"]) XREF_IMAGE, YREF_IMAGE = \ self._search_ref_pixel_cal_file(ptable_cal, self.par["inst_cal_aperture"], self.par["inst_cal_filter"]) reference_pixels = collections.OrderedDict(XREF_GRISM=XREF_GRISM, YREF_GRISM=YREF_GRISM, XREF_IMAGE=XREF_IMAGE, YREF_IMAGE=YREF_IMAGE) try: self.wfc3_cal except AttributeError: self.wfc3_cal = SimpleNamespace() finally: self.wfc3_cal.reference_pixels = reference_pixels return
[docs] @staticmethod def _search_ref_pixel_cal_file(ptable, inst_aperture, inst_filter): """ Search for the reference pixel. Search the reference pixel calibration file for the reference pixel given the instrument aperture and filter. Parameters ---------- ptable : `dict` Calibratrion table with reference positions inst_aperture : `str` The instrument aperture inst_filter : `str` The instrument filter Returns ------- XREF : `float` X reference position for the acquisition image YREF : `float` Y reference position for the acquisition image Raises ------ ValueError An error is raises if the instrument aperture if filter is not fount in the calibration table Notes ----- See http://www.stsci.edu/hst/observatory/apertures/wfc3.html """ ptable_aperture = \ ptable[(ptable.APERTURE == inst_aperture)] if ptable_aperture.shape[0] == 1: XREF = ptable_aperture.XREF.values YREF = ptable_aperture.YREF.values else: ptable_aperture_filter = \ ptable_aperture[(ptable_aperture.FILTER == inst_filter)] if ptable_aperture_filter.shape[0] == 1: XREF = ptable_aperture_filter.XREF.values YREF = ptable_aperture_filter.YREF.values else: ptable_grism_filter = \ ptable_aperture[(ptable_aperture.FILTER.isnull())] if ptable_grism_filter.shape[0] == 1: XREF = ptable_grism_filter.XREF.values YREF = ptable_grism_filter.YREF.values else: raise ValueError("Filter or Aperture not found in, \ reference pixel calibration file, Aborting") return XREF, YREF
[docs] def _get_subarray_size(self, calibration_data, spectral_data): """ Get the size of the used WFC3 subarray. This function determines the size of the used subarray. Parameters ---------- calibration_data spectral_data Attributes ---------- subarray_sizes Raises ------ AttributeError """ nintegrations, nspatial, nwavelength = spectral_data.shape nintegrations_cal, npix_y_cal, npix_x_cal = calibration_data.shape subarray_sizes = \ collections.OrderedDict(cal_image_size=npix_x_cal, science_image_size=nwavelength) try: self.wfc3_cal except AttributeError: self.wfc3_cal = SimpleNamespace() finally: self.wfc3_cal.subarray_sizes = subarray_sizes return
[docs] def _get_wavelength_calibration(self): """ Return the WFC3 wavelength calibration. Using the source position determined from the aquisition image this function returns the wavelength solution for the spectra. Returns ------- wave_cal : `ndarray` Wavelength calibration of the observation. Raises ------ AttributeError An error is raised if the necessary calibration data is not yet defined. """ try: self.wfc3_cal except AttributeError: raise AttributeError("Necessary calibration data not yet defined. \ Aborting wavelength to pixel assignment") xc, yc = self.wfc3_cal.calibration_source_position[0] DYDX = self.wfc3_cal.DYDX DLDP = self.wfc3_cal.DLDP reference_pixels = self.wfc3_cal.reference_pixels subarray_sizes = self.wfc3_cal.subarray_sizes yc_offset = self.wfc3_cal.yc_offset xc_offset = self.wfc3_cal.xc_offset yc = yc + yc_offset xc = xc + xc_offset wave_cal = \ self._WFC3Dispersion( xc, yc, DYDX, DLDP, xref=reference_pixels["XREF_IMAGE"][0], yref=reference_pixels["YREF_IMAGE"][0], xref_grism=reference_pixels["XREF_GRISM"][0], yref_grism=reference_pixels["YREF_GRISM"][0], subarray=subarray_sizes['cal_image_size'], subarray_grism=subarray_sizes['science_image_size']) return wave_cal
def get_spectral_trace(self):
    """
    Get spectral trace.

    Returns
    -------
    spectral_trace : `collections.OrderedDict`
        The spectral trace of the dispersed light (both position and
        wavelength).

    Raises
    ------
    AttributeError
        An error is raised if the necessary calibration data
        (``self.wfc3_cal``) is not yet defined for dispersed data.
    """
    dim = self.data.data.shape
    wave_pixel_grid = np.arange(dim[0]) * u.pix
    if self.par["obs_data"] == 'SPECTRUM':
        # Already-extracted 1D spectra: the trace sits at positional
        # pixel 0 and the wavelengths come from the first time step.
        position_pixel_grid = np.zeros_like(wave_pixel_grid)
        return collections.OrderedDict(
            wavelength_pixel=wave_pixel_grid,
            positional_pixel=position_pixel_grid,
            wavelength=self.data.wavelength.data[:, 0])
    if not hasattr(self, 'wfc3_cal'):
        raise AttributeError("Necessary calibration data not yet defined. "
                             "Aborting trace determination")
    cal = self.wfc3_cal
    # Source centroid from the acquisition image, corrected with the
    # configured positional offsets.
    xc, yc = cal.calibration_source_position[0]
    xc = xc + cal.xc_offset
    yc = yc + cal.yc_offset
    reference_pixels = cal.reference_pixels
    subarray_sizes = cal.subarray_sizes
    # NOTE(review): the reference pixel values are passed here without
    # indexing element [0], while _get_wavelength_calibration passes
    # element [0] -- confirm whether scalars or length-1 arrays are
    # intended here.
    trace = self._WFC3Trace(
        xc, yc, cal.DYDX,
        xref=reference_pixels["XREF_IMAGE"],
        yref=reference_pixels["YREF_IMAGE"],
        xref_grism=reference_pixels["XREF_GRISM"],
        yref_grism=reference_pixels["YREF_GRISM"],
        subarray=subarray_sizes['cal_image_size'],
        subarray_grism=subarray_sizes['science_image_size'])
    trace = trace * u.pix
    wavelength = self._WFC3Dispersion(
        xc, yc, cal.DYDX, cal.DLDP,
        xref=reference_pixels["XREF_IMAGE"],
        yref=reference_pixels["YREF_IMAGE"],
        xref_grism=reference_pixels["XREF_GRISM"],
        yref_grism=reference_pixels["YREF_GRISM"],
        subarray=subarray_sizes['cal_image_size'],
        subarray_grism=subarray_sizes['science_image_size'])
    return collections.OrderedDict(wavelength_pixel=wave_pixel_grid,
                                   positional_pixel=trace,
                                   wavelength=wavelength)
[docs] @staticmethod def _WFC3Trace(xc, yc, DYDX, xref=522, yref=522, xref_grism=522, yref_grism=522, subarray=256, subarray_grism=256): """ Define the spectral trace for the wfc3 grism modes. Parameters ---------- xc yc DYDX xref=522 yref=522 xref_grism=522 yref_grism=522 subarray=256 subarray_grism=256 Returns ------- trace Notes ----- Details can be found in: http://www.stsci.edu/hst/wfc3/documents/ISRs/WFC3-2016-15.pdf and http://www.stsci.edu/hst/instrumentation/wfc3/documentation/ grism-resources """ # adjust position in case different subarrays are used. xc = xc - (xref - xref_grism) yc = yc - (yref - yref_grism) coord0 = (1014 - subarray) // 2 xc = xc + coord0 yc = yc + coord0 dx = np.arange(1014) - xc M = np.sqrt(1.0 + (DYDX[1][0] + DYDX[1][1] * xc + DYDX[1][2] * yc + DYDX[1][3] * xc**2 + DYDX[1][4] * xc * yc + DYDX[1][5] * yc**2)**2 ) dp = dx * M trace = (DYDX[0][0] + DYDX[0][1]*xc + DYDX[0][2]*yc + DYDX[0][3]*xc**2 + DYDX[0][4]*xc*yc + DYDX[0][5]*yc**2) + \ dp * (DYDX[1][0] + DYDX[1][1]*xc + DYDX[1][2]*yc + DYDX[1][3]*xc**2 + DYDX[1][4]*xc*yc + DYDX[1][5]*yc**2) / M if subarray < 1014: i0 = (1014 - subarray) // 2 trace = trace[i0: i0 + subarray] idx_min = (subarray-subarray_grism)//2 idx_max = (subarray-subarray_grism)//2 + subarray_grism trace = trace[idx_min:idx_max] return trace + yc - (1014 - subarray_grism) // 2
@staticmethod
def _WFC3Dispersion(xc, yc, DYDX, DLDP, xref=522, yref=522,
                    xref_grism=522, yref_grism=522, subarray=256,
                    subarray_grism=256):
    """
    Convert pixel coordinate to wavelength.

    Parameters
    ----------
    xc : 'float'
        X coordinate of direct image centroid
    yc : 'float'
        Y coordinate of direct image centroid
    DYDX : sequence
        Two sets of six field-dependent trace coefficients.
    DLDP : sequence
        Two sets of six field-dependent dispersion coefficients.
    xref : 'int'
        Reference X coordinate of target aquisition image. Default 522
    yref : 'int'
        Reference Y coordinate of target aquisition image. Default 522
    xref_grism : 'int'
        Reference X coordinate of used grism spectral image.
        Default 522
    yref_grism : 'int'
        Reference Y coordinate of used grism spectral image.
        Default 522
    subarray : 'int'
        Used subarray of target aquisition image. Default 256
    subarray_grism : 'int'
        Used subarray of spectral image. Default 256

    Returns
    -------
    wavelength : 'astropy.units.core.Quantity'
        return wavelength mapping of x coordinate in micron

    Notes
    -----
    For details of the method and coefficient adopted see [1]_ and
    [2]_. See also:
    http://www.stsci.edu/hst/wfc3/documents/ISRs/WFC3-2016-15.pdf
    In case the direct image and spectral image are not taken with the
    same aperture, the centroid measurement is adjusted according to
    the table in:
    http://www.stsci.edu/hst/observatory/apertures/wfc3.html

    References
    ----------
    .. [1] Kuntschner et al. (2009)
    .. [2] Wilkins et al. (2014)
    """
    # Shift the centroid if direct and grism images use different
    # apertures, then translate to full-frame (1014 pix) coordinates.
    x0 = xc - (xref - xref_grism) + (1014 - subarray) // 2
    y0 = yc - (yref - yref_grism) + (1014 - subarray) // 2

    def field_poly(c):
        # Field-dependent 2D polynomial evaluated at the centroid.
        return (c[0] + c[1] * x0 + c[2] * y0 + c[3] * x0**2 +
                c[4] * x0 * y0 + c[5] * y0**2)

    # Field-dependent dispersion zero point and slope.
    p0 = field_poly(DLDP[0])
    p1 = field_poly(DLDP[1])
    # Trace slope, needed for the arc length along the tilted trace.
    slope = field_poly(DYDX[1])
    dx = np.arange(1014) - x0
    arc = np.sqrt(1.0 + slope**2)
    dp = dx * arc
    wavelength = p0 + dp * p1
    # Cut the full-frame solution down to the acquisition subarray...
    if subarray < 1014:
        start = (1014 - subarray) // 2
        wavelength = wavelength[start: start + subarray]
    # ...and then to the (possibly different) grism subarray, attaching
    # the Angstrom unit of the dispersion solution.
    lo = (subarray - subarray_grism) // 2
    wavelength = wavelength[lo: lo + subarray_grism] * u.Angstrom
    return wavelength.to(u.micron)