#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of CASCADe package
#
# Developed within the ExoplANETS-A H2020 program.
#
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2018, 2021 Jeroen Bouwman
"""Module defiing utility functions used in cascade."""
import os
import glob
import fnmatch
import copy
import warnings
import numpy as np
from astropy.io import fits
from astropy.table import QTable
import astropy.units as u
from tqdm import tqdm
import numba as nb
from cascade.data_model import SpectralDataTimeSeries
__all__ = ['write_timeseries_to_fits', 'find', 'get_data_from_fits',
'spectres', 'write_spectra_to_fits', '_define_band_limits',
'_define_rebin_weights', '_rebin_spectra',
'write_dataset_to_fits', 'read_dataset_from_fits']
[docs]def write_spectra_to_fits(spectral_dataset, path, filename, header_meta,
column_names=['Wavelength', 'Depth', 'Error Depth']):
"""
Write spectra dataset object to fits files.
Parameters
----------
data : 'ndarray' or 'SpectralDataTimeSeries'
The data cube which will be save to fits file. For each time step
a fits file will be generated.
path : 'str'
Path to the directory where the fits files will be saved.
filename: 'str' (optional)
file name of save fits file.
header_meta : 'dict'
All auxilary data to be written to fits header.
column_names : 'list' of 'string', optional
names of the fits table columns.
"""
os.makedirs(path, exist_ok=True)
mask = spectral_dataset.mask
hdr = fits.Header()
for key, value in header_meta.items():
hdr[key] = value
primary_hdu = fits.PrimaryHDU(header=hdr)
columns_list = \
[fits.Column(name=column_names[0], format='D',
unit=spectral_dataset.wavelength_unit.to_string(),
array=spectral_dataset.wavelength.data.value[~mask]),
fits.Column(name=column_names[1], format='D',
unit=spectral_dataset.data_unit.to_string(),
array=spectral_dataset.data.data.value[~mask]),
fits.Column(name=column_names[2], format='D',
unit=spectral_dataset.data_unit.to_string(),
array=spectral_dataset.uncertainty.data.value[~mask]),
]
try:
columns_list = columns_list + \
[fits.Column(name="Bin Size", format='D',
unit=spectral_dataset.wavelength_binsize_unit.to_string(),
array=spectral_dataset.wavelength_binsize.data.value[~mask])]
except AttributeError:
pass
hdu = fits.BinTableHDU.from_columns(columns_list)
hdul = fits.HDUList([primary_hdu, hdu])
hdul.writeto(os.path.join(path, filename), overwrite=True)
[docs]def write_dataset_to_fits(spectral_dataset, path, filename,
header_meta):
"""
Write a spectral dataset to a single fits files.
Parameters
----------
spectral_dataset : 'SpectralDataTimeSeries'
The data cube which will be save to fits file.
path : 'str'
Path to the directory where the fits files will be saved.
filename: 'str' (optional)
file name of save fits file.
header_meta : 'dict'
All auxilary data to be written to fits header.
"""
data = spectral_dataset.return_masked_array('data')
mask = data.mask
wave = spectral_dataset.return_masked_array('wavelength')
unc = spectral_dataset.return_masked_array('uncertainty')
time = spectral_dataset.return_masked_array('time')
try:
position = spectral_dataset.return_masked_array('position')
except:
position = None
try:
scaling = spectral_dataset.return_masked_array('scaling')
except:
scaling = None
hdr = fits.Header()
for key, value in header_meta.items():
hdr[key] = value
primary_hdu = fits.PrimaryHDU(header=hdr)
hdu_sys = fits.ImageHDU(data.data, name='DATA')
hdu_sys.header['UNITS'] = spectral_dataset.data_unit.to_string()
hdu_mask = fits.ImageHDU(np.array(mask, dtype=int), name='MASK')
hdu_wave = fits.ImageHDU(wave.data, name='WAVELENGTH')
hdu_wave.header['UNITS'] = spectral_dataset.wavelength_unit.to_string()
hdu_unc = fits.ImageHDU(unc.data, name='UNCERTAINTY')
hdu_unc.header['UNITS'] = spectral_dataset.data_unit.to_string()
hdu_time = fits.ImageHDU(time.data, name='TIME')
hdu_time.header['UNITS'] = spectral_dataset.time_unit.to_string()
product_list = [primary_hdu, hdu_sys, hdu_mask, hdu_wave, hdu_unc, hdu_time]
if position is not None:
hdu_position = fits.ImageHDU(position.data, name='POSITION')
hdu_position.header['UNITS'] = spectral_dataset.position_unit.to_string()
product_list += [hdu_position]
if scaling is not None:
hdu_scaling = fits.ImageHDU(scaling.data, name='SCALING')
hdu_scaling.header['UNITS'] = spectral_dataset.scaling_unit.to_string()
product_list += [hdu_scaling]
hdul = fits.HDUList(product_list)
os.makedirs(path, exist_ok=True)
hdul.writeto(os.path.join(path, filename), overwrite=True)
[docs]def read_dataset_from_fits(path, filename, auxilary_meta):
"""
Read in a spectral dataset from a single fits files.
Parameters
----------
path : 'str'
Path to the directory where the fits file is located.
filename: 'str' (optional)
file name of save fits file.
auxilary_meta : 'list'
All auxilary data to read from fits header.
Returns
-------
spectral_dataset : 'SpectralDataTimeSeries'
The data cube read from disk.
"""
with fits.open(os.path.join(path, filename)) as hdul:
header_meta = {}
for key in auxilary_meta:
value=hdul['PRIMARY'].header[key]
header_meta[key] = value
data = np.array(hdul['DATA'].data, dtype=np.float64)
data_unit = u.Unit(hdul['DATA'].header['UNITS'])
mask = np.array(hdul['MASK'].data, dtype=bool)
wavelength = np.array(hdul['WAVELENGTH'].data, dtype=np.float64)
wavelength_unit = u.Unit(hdul['WAVELENGTH'].header['UNITS'])
uncertainty = np.array(hdul['UNCERTAINTY'].data, dtype=np.float64)
time = np.array(hdul['TIME'].data, dtype=np.float64)
time_unit = u.Unit(hdul['TIME'].header['UNITS'])
try:
position = np.array(hdul['POSITION'].data, dtype=np.float64)
position_unit = u.Unit(hdul['POSITION'].header['UNITS'])
except:
position = None
try:
scaling = np.array(hdul['SCALING'].data, dtype=np.float64)
scaling_unit = u.Unit(hdul['SCALING'].header['UNITS'])
except:
scaling = None
spectral_dataset = SpectralDataTimeSeries(
wavelength=wavelength,
wavelength_unit=wavelength_unit,
data=data,
data_unit=data_unit,
uncertainty=uncertainty,
time=time,
time_unit=time_unit,
mask=mask)
spectral_dataset.add_auxilary(**header_meta)
if position is not None:
spectral_dataset.add_measurement(position=position,
position_unit=position_unit)
if scaling is not None:
spectral_dataset.add_measurement(scaling=scaling,
scaling_unit=scaling_unit)
return spectral_dataset
[docs]def write_timeseries_to_fits(data, path, additional_file_string=None,
delete_old_files=False):
"""
Write spectral timeseries data object to fits files.
Parameters
----------
data : 'ndarry' or 'SpectralDataTimeSeries'
The data cube which will be save to fits file. For each time step
a fits file will be generated.
path : 'str'
Path to the directory where the fits files will be saved.
additional_file_string : 'str' (optional)
Additional information to be added to the file name.
"""
os.makedirs(path, exist_ok=True)
if delete_old_files:
try:
dataProduct = data.dataProduct
except AttributeError:
warnings.warn("No dataProduct defined. Removing all fits files "
"in target directory before writing spectra.")
dataProduct = ""
files = glob.glob(os.path.join(path, '*'+dataProduct+'.fits'))
for f in files:
try:
os.remove(f)
except OSError as e:
print("Error: %s : %s" % (f, e.strerror))
ndim = data.data.ndim
ntime = data.data.shape[-1]
try:
dataFiles = data.dataFiles
dataFiles = [os.path.split(file)[1] for file in dataFiles]
except AttributeError:
if ndim == 2:
fileBase = "spectrum"
elif ndim == 3:
fileBase = "image"
else:
fileBase = "image_cube"
if not isinstance(additional_file_string, type(None)):
fileBase = fileBase + '_' + str(additional_file_string).strip(' ')
dataFiles = [fileBase+"_{0:0=4d}.fits".format(it)
for it in range(ntime)]
if ndim == 2:
for itime, fileName in enumerate(dataFiles):
hdr = fits.Header()
# makes sure the data is sorted according to wavelength
idx = np.argsort(data.wavelength.data.value[..., itime])
try:
hdr['TARGET'] = data.target_name
except AttributeError:
pass
hdr['COMMENT'] = "Created by CASCADe pipeline"
try:
hdr['PHASE'] = np.ma.mean(data.time[idx, itime]).value
except np.ma.MaskError:
pass
try:
hdr['TIME_BJD'] = np.ma.mean(data.time_bjd[idx, itime]).value
hdr['TBJDUNIT'] = data.time_bjd_unit.to_string()
except AttributeError:
pass
try:
hdr['SCANDIR'] = data.scan_direction[itime]
except AttributeError:
pass
try:
hdr['SAMPLENR'] = data.sample_number[itime]
except AttributeError:
pass
try:
hdr['POSITION'] = np.ma.mean(data.position[idx, itime]).value
hdr['PUNIT'] = data.position_unit.to_string()
except AttributeError:
pass
try:
hdr['MEDPOS'] = float(data.median_position)
hdr['MPUNIT'] = data.position_unit.to_string()
except AttributeError:
pass
try:
hdr['DISP_POS'] = \
np.ma.mean(data.dispersion_position[idx, itime]).value
hdr['DPUNIT'] = data.dispersion_position_unit.to_string()
except AttributeError:
pass
try:
hdr['SCALE'] = np.ma.mean(data.scale[idx, itime]).value
hdr['SUNIT'] = data.scale_unit.to_string()
except AttributeError:
pass
try:
hdr['ANGLE'] = np.ma.mean(data.angle[idx, itime]).value
hdr['AUNIT'] = data.angle_unit.to_string()
except AttributeError:
pass
primary_hdu = fits.PrimaryHDU(header=hdr)
hdu = fits.BinTableHDU.from_columns(
[fits.Column(name='LAMBDA', format='D',
unit=data.wavelength_unit.to_string(),
array=data.wavelength.data.value[idx, itime]),
fits.Column(name='FLUX', format='D',
unit=data.data_unit.to_string(),
array=data.data.data.value[idx, itime]),
fits.Column(name='FERROR', format='D',
unit=data.data_unit.to_string(),
array=data.uncertainty.data.value[idx,
itime]),
fits.Column(name='MASK', format='L',
array=data.mask[idx, itime])
])
hdul = fits.HDUList([primary_hdu, hdu])
hdul.writeto(os.path.join(path, fileName), overwrite=True)
elif ndim > 2:
raise ValueError("Saving spectral images to fits file not \
implemented yet")
[docs]def find(pattern, path):
"""
Return a list of all data files.
Parameters
----------
pattern : 'str'
Pattern used to search for files.
path : 'str'
Path to directory to be searched.
Returns
-------
result : 'list' of 'str'
Sorted list of filenames matching the 'pattern' search
"""
result = []
for root, dirs, files in os.walk(path):
for name in files:
if fnmatch.fnmatch(name, pattern):
result.append(os.path.join(root, name))
return sorted(result)
[docs]def get_data_from_fits(data_files, data_list, auxilary_list):
"""
Read observations from fits.
This function reads in a list of fits files containing the
spectral time series data and auxilary information like position and time.
Parameters
----------
data_files : 'list' of 'str'
List containing the names of all fits filaes to be read in.
data_list : 'list' of 'str'
List containing all the fits data keywords
auxilary_list : 'list' of 'str'
List containing all keywords for the auxilary data
Returns
-------
data_dict : 'dict'
Dictonary containing all spectral time series data
auxilary_dict : dict'
Dictonary containing all auxilary data
"""
ndata = len(data_files)
data_struct = {'data': [], 'flag': True}
data_dict = {k: copy.deepcopy(data_struct) for k in data_list}
auxilary_data_struct = {"data": np.zeros(ndata),
"data_unit": u.dimensionless_unscaled,
"flag": False}
auxilary_dict = {k: copy.deepcopy(auxilary_data_struct)
for k in auxilary_list}
for ifile, fits_file in enumerate(tqdm(data_files,
dynamic_ncols=True)):
with fits.open(fits_file) as hdu_list:
fits_header = hdu_list[0].header
for key, value in auxilary_dict.items():
try:
value['data'][ifile] = fits_header[key]
value['flag'] = True
except KeyError:
value['flag'] = False
except ValueError:
if 'UNIT' in key:
value['data_unit'] = u.Unit(fits_header[key])
data_table = QTable.read(hdu_list[1])
for key, value in data_dict.items():
try:
try:
value['data'].append(data_table[key].value *
u.Unit(data_table[key].
unit.to_string()))
except AttributeError:
value['data'].append(data_table[key])
value['flag'] = True
except KeyError:
value['flag'] = False
return data_dict, auxilary_dict
@nb.vectorize(["float64(int64,int64,int64,int64)",
"float32(float32, float32, float32, float32)",
"float64(float64, float64, float64, float64)"])
def overlap(min1, max1, min2, max2):
"""
Determine the overlap between two intervals.
Parameters
----------
min1 : 'float'
minimum interval 1
max1 : 'float'
maximum interval 1
min2 : 'float'
minimum interval 2
max2 : 'float'
maximum interval 2
Returns
-------
overlap : 'float'
overlapping range
"""
return max(0, min(max1, max2) - max(min1, min2))
@nb.jit(nopython=True, cache=True)
def define_limits(wave):
"""
Define the band for each spectroscopic wavelength.
Parameters
----------
wave : 'ndarray'
Wavelengths (1D array)
Returns
-------
lr : 'ndarray'
lower band limits
ur : 'ndarray'
upper band limits
"""
lr = np.empty(wave.shape, dtype=wave.dtype)
ur = np.empty(wave.shape, dtype=wave.dtype)
wd = np.diff(wave)*0.5
ur[0:-1] = wave[0:-1] + wd
ur[-1] = wave[-1] + wd[-1]
lr[1:] = wave[1:] - wd
lr[0] = wave[0] - wd[0]
return (lr, ur)
@nb.jit(nopython=True, cache=True)
def define_limits2(wave):
"""
Define the band for each spectroscopic wavelength for timeseries data.
Parameters
----------
wave : 'ndarray'
Wavelengths (2D array)
Returns
-------
lr : 'ndarray'
lower band limits
ur : 'ndarray'
upper band limits
"""
nwave, ntime = wave.shape
ur = np.empty((nwave, ntime), dtype=wave.dtype)
lr = np.empty((nwave, ntime), dtype=wave.dtype)
w = np.empty((nwave), dtype=wave.dtype)
for it in nb.prange(ntime):
w[:] = wave[:, it]
ls, us = define_limits(w)
ur[:, it] = us
lr[:, it] = ls
return (lr, ur)
[docs]def _define_band_limits(wave):
"""
Define the limits of the rebin intervals.
Parameters
----------
wave : 'ndarray' of 'float'
Wavelengths (1D or 2D array)
Returns
-------
limits : 'tuple'
lower and upper band limits
Raises
------
TypeError
When the input wavelength array has more than 2 dimensions
ValueError
When the wavelength is not defined for each data point
"""
if isinstance(wave, np.ma.core.MaskedArray):
waveUse = wave.data
else:
waveUse = wave
if (not np.all(np.isfinite(waveUse))) | np.any(waveUse <= 0.0):
raise ValueError("Wavelength not defined for each data point. " +
"Stopping rebin")
ndim = wave.ndim
if ndim == 1:
return define_limits(waveUse)
elif ndim == 2:
return define_limits2(waveUse)
else:
raise TypeError("input wavelength array can have at most 2 dimensions")
@nb.jit(nopython=True, cache=True)
def define_weights(lr0, ur0, lr, ur):
"""
Weight definition for spectra.
Parameters
----------
lr0 : 'ndarray'
lower band limits of reference.
ur0 : 'ndarray'
Upper band limits of reference.
lr : 'ndarray'
lower band limits.
ur : 'ndarray'
upper band limits.
Returns
-------
weights : 'ndarray'
Integration weights.
"""
nwave = lr.shape[0]
nwave0 = lr0.shape[0]
weights = np.zeros((nwave0, nwave), dtype=lr.dtype)
for it in nb.prange(nwave0):
wlr0 = lr0[it]
wur0 = ur0[it]
weights[it, :] = overlap(wlr0, wur0, lr, ur)
return weights
@nb.jit(nopython=True, cache=True)
def define_weights2(lr0, ur0, lr, ur):
"""
Weight definition for spectral timeseries.
Parameters
----------
lr0 : 'ndarray'
lower band limits of reference.
ur0 : 'ndarray'
Upper band limits of reference.
lr : 'ndarray'
lower band limits.
ur : 'ndarray'
upper band limits.
Returns
-------
weights : 'ndarray'
Integration weights.
"""
nwave, ntime = lr.shape
nwave0 = lr0.shape[0]
weights = np.zeros((nwave0, nwave, ntime), dtype=lr.dtype)
for it in nb.prange(nwave0):
wlr0 = lr0[it]
wur0 = ur0[it]
weights[it, :, :] = overlap(wlr0, wur0, lr.T, ur.T).T
return weights
[docs]def _define_rebin_weights(lr0, ur0, lr, ur):
"""
Define the weights used in spectral rebin.
Define the summation weights (i.e. the fractions of the original intervals
used in the new interval after rebinning)
Parameters
----------
lr0 : 'ndarray'
lower limits wavelength band of new wavelength grid
ur0 : 'ndarray'
upper limits wavelength band of new wavelength grid
lr : 'ndarray'
lower limits
ur : 'ndarray'
upper limits
Returns
-------
weights : 'ndarray' of 'float'
Summation weights used to rebin to new wavelength grid
Raises
------
TypeError
When the input wavelength array has more than 2 dimensions.
"""
ndim = lr.ndim
if ndim == 1:
return define_weights(lr0, ur0, lr, ur)
elif ndim == 2:
return define_weights2(lr0, ur0, lr, ur)
else:
raise TypeError("input wavelength array can have at most 2 dimensions")
[docs]def _rebin_spectra(spectra, errors, weights):
"""
Rebin spectra.
Parameters
----------
spectra : 'ndarray'
Input spectral data values
errors: 'ndarray'
Input error on spectral data values
weights: 'ndarray'
rebin weights
Returns
-------
newSpectra: 'ndarray'
Output spectra
newErrors: 'ndarray'
Output error
"""
norm = np.sum(weights, axis=1)
newSpectra = np.sum(weights*spectra, axis=1)/norm
newErrors = np.sqrt(np.sum(weights**2*errors**2, axis=1)/norm**2)
return newSpectra, newErrors
[docs]def spectres(new_spec_wavs, old_spec_wavs, spec_fluxes, spec_errs=None):
"""
SpectRes: A fast spectral resampling function.
Copyright (C) 2017 A. C. Carnall
Function for resampling spectra (and optionally associated uncertainties)
onto a new wavelength basis.
Parameters
----------
new_spec_wavs : numpy.ndarray
Array containing the new wavelength sampling desired for the spectrum
or spectra.
old_spec_wavs : numpy.ndarray
1D array containing the current wavelength sampling of the spectrum or
spectra.
spec_fluxes : numpy.ndarray
Array containing spectral fluxes at the wavelengths specified in
old_spec_wavs, last dimension must correspond to the shape of
old_spec_wavs.
Extra dimensions before this may be used to include multiple spectra.
spec_errs : numpy.ndarray (optional)
Array of the same shape as spec_fluxes containing uncertainties
associated with each spectral flux value.
Returns
-------
resampled_fluxes : numpy.ndarray
Array of resampled flux values, first dimension is the same length
as new_spec_wavs, other dimensions are the same as spec_fluxes
resampled_errs : numpy.ndarray
Array of uncertainties associated with fluxes in resampled_fluxes.
Only returned if spec_errs was specified.
"""
# Generate arrays of left hand side positions and widths for the
# old and new bins
spec_lhs = np.zeros(old_spec_wavs.shape[0])
spec_widths = np.zeros(old_spec_wavs.shape[0])
spec_lhs = np.zeros(old_spec_wavs.shape[0])
spec_lhs[0] = old_spec_wavs[0] - (old_spec_wavs[1] - old_spec_wavs[0])/2
spec_widths[-1] = (old_spec_wavs[-1] - old_spec_wavs[-2])
spec_lhs[1:] = (old_spec_wavs[1:] + old_spec_wavs[:-1])/2
spec_widths[:-1] = spec_lhs[1:] - spec_lhs[:-1]
filter_lhs = np.zeros(new_spec_wavs.shape[0]+1)
filter_widths = np.zeros(new_spec_wavs.shape[0])
filter_lhs[0] = new_spec_wavs[0] - (new_spec_wavs[1] - new_spec_wavs[0])/2
filter_widths[-1] = (new_spec_wavs[-1] - new_spec_wavs[-2])
filter_lhs[-1] = new_spec_wavs[-1]+(new_spec_wavs[-1]-new_spec_wavs[-2])/2
filter_lhs[1:-1] = (new_spec_wavs[1:] + new_spec_wavs[:-1])/2
filter_widths[:-1] = filter_lhs[1:-1] - filter_lhs[:-2]
# Check that the range of wavelengths to be resampled_fluxes onto
# falls within the initial sampling region
if filter_lhs[0] < spec_lhs[0] or filter_lhs[-1] > spec_lhs[-1]:
raise ValueError("spectres: The new wavelengths specified must \
fall within the range of the old wavelength values.")
# Generate output arrays to be populated
resampled_fluxes = np.zeros(spec_fluxes[..., 0].shape+new_spec_wavs.shape)
if spec_errs is not None:
if spec_errs.shape != spec_fluxes.shape:
raise ValueError("If specified, spec_errs must be the same shape \
as spec_fluxes.")
else:
resampled_fluxes_errs = np.copy(resampled_fluxes)
start = 0
stop = 0
# Calculate the new spectral flux and uncertainty values,
# loop over the new bins
for j in range(new_spec_wavs.shape[0]):
# Find the first old bin which is partially covered by the new bin
while spec_lhs[start+1] <= filter_lhs[j]:
start += 1
# Find the last old bin which is partially covered by the new bin
while spec_lhs[stop+1] < filter_lhs[j+1]:
stop += 1
# If the new bin falls entirely within one old bin the are the same
# the new flux and new error are the same as for that bin
if stop == start:
resampled_fluxes[..., j] = spec_fluxes[..., start]
if spec_errs is not None:
resampled_fluxes_errs[..., j] = spec_errs[..., start]
# Otherwise multiply the first and last old bin widths by P_ij,
# all the ones in between have P_ij = 1
else:
start_factor = (spec_lhs[start+1] - filter_lhs[j]) / \
(spec_lhs[start+1] - spec_lhs[start])
end_factor = (filter_lhs[j+1] - spec_lhs[stop]) / \
(spec_lhs[stop+1] - spec_lhs[stop])
spec_widths[start] *= start_factor
spec_widths[stop] *= end_factor
# Populate the resampled_fluxes spectrum and uncertainty arrays
resampled_fluxes[..., j] = np.sum(spec_widths[start:stop+1] *
spec_fluxes[..., start:stop+1],
axis=-1) / \
np.sum(spec_widths[start:stop+1])
if spec_errs is not None:
resampled_fluxes_errs[..., j] = \
np.sqrt(np.sum((spec_widths[start:stop+1] *
spec_errs[..., start:stop+1])**2, axis=-1)
)/np.sum(spec_widths[start:stop+1])
# Put back the old bin widths to their initial values for later use
spec_widths[start] /= start_factor
spec_widths[stop] /= end_factor
# If errors were supplied return the resampled_fluxes spectrum and
# error arrays
if spec_errs is not None:
return resampled_fluxes, resampled_fluxes_errs
# Otherwise just return the resampled_fluxes spectrum array
else:
return resampled_fluxes