#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of the CASCADe package which has been
# developed within the ExoplANETS-A H2020 program.
#
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2018, 2019, 2020, 2021 Jeroen Bouwman
"""Module defining the spectral extraction functionality used in cascade."""
import math
from functools import partial
import collections
import warnings
import copy
from itertools import zip_longest
import multiprocessing as mp
from asyncio import Event
from typing import Tuple
from psutil import virtual_memory, cpu_count
from tqdm import tqdm
import ray
from ray.actor import ActorHandle
import statsmodels.api as sm
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
from scipy import ndimage
from scipy.ndimage import binary_dilation
from astropy.convolution import convolve
from astropy.convolution import Gaussian2DKernel
from astropy.convolution import Kernel2D
from astropy.modeling.models import Gaussian2D
from astropy.modeling.parameters import Parameter
from astropy.convolution import interpolate_replace_nans
from astropy.convolution import Gaussian1DKernel
from astropy.stats import sigma_clip
from skimage.registration import phase_cross_correlation
from skimage.transform import warp
from skimage._shared.utils import safe_as_int
from skimage.transform import rotate
from skimage.transform import SimilarityTransform
from sklearn.preprocessing import RobustScaler
from ..data_model import SpectralDataTimeSeries
from ..data_model import MeasurementDesc
from ..data_model import AuxilaryInfoDesc
from ..exoplanet_tools import SpectralModel
from ..utilities import _define_band_limits
from ..utilities import _define_rebin_weights
from ..utilities import _rebin_spectra
__all__ = ['directional_filters', 'create_edge_mask',
'determine_optimal_filter', 'define_image_regions_to_be_filtered',
'filter_image_cube', 'iterative_bad_pixel_flagging',
'extract_spectrum', 'create_extraction_profile',
'determine_relative_source_position',
'warp_polar', 'highpass', '_log_polar_mapping',
'_determine_relative_source_shift', 'register_telescope_movement',
'_determine_relative_rotation_and_scale', '_derotate_image',
'_pad_to_size', '_pad_region_of_interest_to_square',
'correct_wavelength_for_source_movent',
'rebin_to_common_wavelength_grid',
'determine_center_of_light_posision',
'determine_absolute_cross_dispersion_position',
'combine_scan_samples', 'sigma_clip_data',
'sigma_clip_data_cosmic', 'create_cleaned_dataset',
'compressROI', 'compressSpectralTrace',
'compressDataset', 'correct_initial_wavelength_shift',
'renormalize_spatial_scans']
def _round_up_to_odd_integer(value):
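    """Round value up to the nearest odd integer."""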
i = math.ceil(value)
if i % 2 == 0:
return i + 1
return i
class Banana(Gaussian2D):
"""
    Modification of the astropy Gaussian2D model to get a banana-shaped
    distribution.
Notes
-----
    https://tiao.io/post/building-probability-distributions-with-tensorflow-probability-bijector-api/
"""
amplitude = Parameter(default=1)
x_mean = Parameter(default=0)
y_mean = Parameter(default=0)
x_stddev = Parameter(default=1)
y_stddev = Parameter(default=1)
theta = Parameter(default=0.0)
power = Parameter(default=1.0)
sign = Parameter(default=1)
def __init__(self, amplitude=amplitude.default, x_mean=x_mean.default,
y_mean=y_mean.default, x_stddev=None, y_stddev=None,
theta=None, cov_matrix=None, power=power.default,
sign=sign.default, **kwargs):
        if power is None:
            power = Banana.power.default
        if sign is None:
            sign = Banana.sign.default
sign = np.sign(sign)
super().__init__(
amplitude=amplitude, x_mean=x_mean, y_mean=y_mean,
x_stddev=x_stddev, y_stddev=y_stddev, theta=theta,
cov_matrix=cov_matrix, power=power, sign=sign, **kwargs)
@staticmethod
def evaluate(x_in, y_in, amplitude, x_mean, y_mean, x_stddev, y_stddev,
theta, power, sign):
"""Two dimensional Gaussian function."""
x = x_in
y = y_in - sign*(np.abs(x_in)**power + 0.0)
cost2 = np.cos(theta) ** 2
sint2 = np.sin(theta) ** 2
sin2t = np.sin(2. * theta)
xstd2 = x_stddev ** 2
ystd2 = y_stddev ** 2
xdiff = x - x_mean
ydiff = y - y_mean
a = 0.5 * ((cost2 / xstd2) + (sint2 / ystd2))
b = 0.5 * ((sin2t / xstd2) - (sin2t / ystd2))
c = 0.5 * ((sint2 / xstd2) + (cost2 / ystd2))
return amplitude * np.exp(-((a * xdiff ** 2) + (b * xdiff * ydiff) +
(c * ydiff ** 2)))
class Banana2DKernel(Kernel2D):
"""
Modification of astropy Gaussian2DKernel to get a banana shaped kernel.
This class defines a banana shaped convolution kernel mimicking the shape
of the dispersion pattern on the detector near the short and long
wavelength ends.
"""
_separable = True
_is_bool = False
def __init__(self, sigma, power=None, sign=None, **kwargs):
self._model = Banana(1. / (2 * np.pi * sigma[0, 0] * sigma[1, 1]),
0., 0., cov_matrix=sigma, power=power, sign=sign)
self._default_size = _round_up_to_odd_integer(
8 * np.max([np.sqrt(sigma[0, 0]), np.sqrt(sigma[1, 1])]))
super().__init__(**kwargs)
self._truncation = np.abs(1. - self._array.sum())
def _define_covariance_matrix(x_stddev=None, y_stddev=None, theta=None):
"""
Define covariance matrix.
    Define a 2D covariance matrix based on the standard deviations in x and
    y and a rotation angle.
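    Notes
    -----
    The returned matrix is R @ S @ S @ R.T, with R the rotation matrix for
    angle theta and S = diag(x_stddev, y_stddev).

    Examples
    --------
    A minimal sketch; for theta=0 the result is simply the diagonal matrix
    of the variances:

    >>> sigma = _define_covariance_matrix(x_stddev=2.0, y_stddev=1.0)
    >>> np.allclose(sigma, np.diag([4.0, 1.0]))
    True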
"""
if x_stddev is None:
x_stddev = 1.0
if y_stddev is None:
y_stddev = 1.0
if theta is None:
theta = 0.0
R = np.array([[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]])
S = np.array([[x_stddev, 0.0], [0.0, y_stddev]])
sigma = np.linalg.multi_dot([R, S, S, R.T])
return sigma
def directional_filters(return_valid_angle_range=False):
"""
Directional filters for smoothing and filtering.
    These filters can be used in a Nagao & Matsuyama like edge-preserving
    smoothing approach and are appropriate for dispersed spectra with a
    vertical dispersion direction. If the angle from vertical of the
    spectral trace of the dispersed light exceeds +- max(angle) radians,
    additional larger values need to be added to the angles list.
Parameters
----------
return_valid_angle_range : 'bool'
optional, if True it returns the maximum angle range of the directional
filters
Returns
-------
    Filters : numpy.ndarray of 'float'
        Array containing all oriented filter kernels used for edge
        preserving smoothing.
maximum_angle_range : 'tuple' of 'float'
If return_valid_angle_range is True, the maximum range of angles
from vertical is returned
Notes
-----
When adding kernels, make sure the maximum is in the central pixel
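    Examples
    --------
    A minimal usage sketch:

    >>> Filters = directional_filters()
    >>> Filters.shape
    (9, 9, 18)
    >>> angle_range = directional_filters(return_valid_angle_range=True)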
"""
    # note that the angles are in radians
angles = [np.radians(0.0), np.radians(-1.5), np.radians(1.5),
np.radians(-3.0), np.radians(3.0), np.radians(-4.5),
np.radians(4.5), np.radians(-6.0), np.radians(6.0),
np.radians(-9.0), np.radians(9.0), np.radians(-12.0),
np.radians(12.0), np.radians(0.0), np.radians(90)-np.radians(60),
np.radians(90)+np.radians(60), np.radians(90)-np.radians(60),
np.radians(90)+np.radians(60)]
if return_valid_angle_range:
return (np.min(angles), np.max(angles))
x_stddev = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1,
0.1, 0.1, 0.1, 0.1, 2.0, 0.1, 0.1, 0.1, 0.1]
y_stddev = [3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0, 3.0,
3.0, 3.0, 3.0, 3.0, 2.0, 3.0, 3.0, 3.0, 3.0]
sign = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, -1, -1]
power = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]
x_kernel_size = 9
y_kernel_size = 9
Filters = np.zeros((x_kernel_size, y_kernel_size, len(angles)))
for ik, (omega, xstd, ystd, p, s) in enumerate(zip(angles, x_stddev,
y_stddev, power, sign)):
sigma = _define_covariance_matrix(xstd, ystd, omega)
kernel = Banana2DKernel(sigma, x_size=x_kernel_size,
y_size=y_kernel_size, power=p, sign=s,
mode='oversample')
kernel.normalize()
Filters[..., ik] = kernel.array
return Filters
def create_edge_mask(kernel, roi_mask):
"""
Create an edge mask.
Helper function for the optimal extraction task. This function
creates an edge mask to mask all pixels for which the convolution
kernel extends beyond the region of interest.
Parameters
----------
kernel : `array_like`
Convolution kernel specific for a given instrument and observing
mode, used in tasks such as replacing bad pixels and
spectral extraction.
roi_mask : `ndarray`
        Mask defining the region of interest from which the spectra will
        be extracted.
Returns
-------
edge_mask : 'array_like'
The edge mask based on the input kernel and roi_mask
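    Examples
    --------
    A minimal sketch with a hypothetical uniform 3x3 kernel; the edge mask
    flags the pixels bordering the region of interest:

    >>> kernel = np.ones((3, 3))
    >>> roi_mask = np.zeros((16, 16), dtype=bool)
    >>> roi_mask[:4, :] = True
    >>> edge_mask = create_edge_mask(kernel, roi_mask)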
"""
# dilation_mask = np.ones(kernel.shape)
dilation_mask = kernel
edge_mask = (binary_dilation(roi_mask, structure=dilation_mask,
border_value=True) ^ roi_mask)
return edge_mask
def define_image_regions_to_be_filtered(ROI, filterShape):
"""
Define image regions to be filtered.
    This function defines all pixels and corresponding sub-regions in the
    data cube to be filtered.
Parameters
----------
ROI : 'ndarray' of 'float'
Region of interest on the detector images
    filterShape : 'tuple' of 'int'
        y (dispersion direction) and x (spatial direction) size of the
        directional filters.
Returns
-------
    enumerated_sub_regions : 'list'
        Enumerated definition of all regions in the spectral image cube and
        corresponding regions of the directional filter.
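    Examples
    --------
    A minimal sketch; pixels where ROI is False are selected for filtering:

    >>> ROI = np.ones((32, 32), dtype=bool)
    >>> ROI[10:20, 10:20] = False
    >>> regions = define_image_regions_to_be_filtered(ROI, (9, 9))
    >>> len(regions)
    100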
"""
# find all images indices of the pixels of interest which are not flagged
# in the region of interest
    y, x = np.where(~ROI)
indices_poi = [(yidx, xidx, ...) for (yidx, xidx) in zip(y, x)]
# support boundary values used to define the sub-array ranges
xmax = ROI.shape[1]
xl = (filterShape[1]-1)//2+1
xs = xl-1
ymax = ROI.shape[0]
yl = (filterShape[0]-1)//2+1
ys = yl-1
    # all regions around the pixels not flagged in the roi with a size equal
    # to the filter size
image_sub_regions = [(slice(np.max([0, yidx-ys]),
np.min([ymax, yidx+yl]), None),
slice(np.max([0, xidx-xs]),
np.min([xmax, xidx+xl]), None),
tidx) for (yidx, xidx, tidx) in indices_poi]
# region of the filter used in the region around the pixels defined in roi
filter_sub_regions = [(slice(np.max([0, yidx-ys])-yidx+ys,
np.min([ymax, yidx+yl])-yidx+yl-1, None),
slice(np.max([0, xidx-xs])-xidx+xs,
np.min([xmax, xidx+xl])-xidx+xl-1, None),
tidx) for (yidx, xidx, tidx) in indices_poi]
# defines all regions in image and corresponding region of filter
sub_regions = list(zip(image_sub_regions, filter_sub_regions))
    # enumerated definition of all regions in image and corresponding region
    # of filter
enumerated_sub_regions = \
[(i, sub, poi) for (i, (sub, poi)) in enumerate(zip(sub_regions,
indices_poi))]
return enumerated_sub_regions
def determine_optimal_filter(ImageCube, Filters, ROIcube, selector):
"""
    Determine the optimal filter.
Determine the optimal Filter for the image cube using a procedure similar
to Nagao & Matsuyama edge preserving filtering.
Parameters
----------
ImageCube : 'ndarray'
Cube of Spectral images.
Filters : 'ndarray'
Cube of directional filters
ROIcube : 'ndarray' of 'bool'
Region of Interests for the input ImageCube
selector : 'list'
list containing all relevant information for each pixel within the
ROI to select the sub region in the image cube and filter cube on which
the filtering will be applied.
Returns
-------
selectorNumber : 'int'
id number of the selector (pixel)
optimum_filter_index : 'ndarray' of 'int'
array containing the optimal filter number for each pixel within the
ROI in the image cube.
SubImageOptimalMean : 'ndarray'
Optimal mean for each sub image in the sub image cube defined by the
selector.
    SubImageOptimalVariance : 'ndarray'
        Variance for each sub image in the sub image cube defined by the
        selector using the optimal filter.
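    Examples
    --------
    A minimal sketch for a single pixel selector; `cube` (a masked spectral
    image cube), `Filters` and `roi_cube` are assumed to have been set up
    as in filter_image_cube:

    >>> regions = define_image_regions_to_be_filtered(
    ...     roi_cube[..., 0], Filters.shape[:2])  # doctest: +SKIP
    >>> result = determine_optimal_filter(
    ...     cube, Filters, roi_cube, regions[0])  # doctest: +SKIP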
"""
selectorNumber = selector[0]
SubFilters = Filters[selector[1][1]]
SubImageCube = ImageCube[selector[1][0]].data
SubImageMask = ImageCube[selector[1][0]].mask
mask = ROIcube[selector[1][0]]
mask = SubImageMask | mask
maskedCube = np.ma.array(SubImageCube, mask=mask)
SubImageVariance = np.ma.zeros((Filters.shape[-1], SubImageCube.shape[-1]))
SubImageMean = np.ma.zeros(SubImageVariance.shape)
SubImageSquaredMean = np.ma.zeros(SubImageVariance.shape)
for j, filterKernel in enumerate(SubFilters.T):
weights = np.tile(filterKernel,
(maskedCube.shape[-1], 1, 1)).T
SubImageMean[j, :] = np.ma.average(maskedCube, weights=weights,
axis=(0, 1))
SubImageSquaredMean[j, :] = np.ma.average(maskedCube**2,
weights=weights, axis=(0, 1))
SubImageVariance[j, :] = (SubImageSquaredMean[j, :] -
SubImageMean[j, :]**2)
optimum_filter_index = np.ma.argmin(SubImageVariance, axis=0)
idx = [optimum_filter_index, np.arange(len(optimum_filter_index))]
SubImageOptimalMean = SubImageMean[tuple(idx)]
    SubImageOptimalVariance = SubImageVariance[tuple(idx)]
    return selectorNumber, optimum_filter_index, SubImageOptimalMean, \
        SubImageOptimalVariance
def chunks(lst, n):
"""
Yield successive n-sized chunks from lst.
Parameters
----------
lst : 'list'
Input list
    n : 'int'
        Chunk size.
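    Examples
    --------
    A minimal usage sketch:

    >>> list(chunks([1, 2, 3, 4, 5], 2))
    [[1, 2], [3, 4], [5]]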
"""
for i in range(0, len(lst), n):
yield lst[i:i + n]
@ray.remote
def split_work(ImageCube, Filters, ROIcube, selector):
"""
Split work.
    Ray wrapper used to divide the data into chunks which can be
    filtered in parallel.
Parameters
----------
    ImageCube : 'ndarray'
        Cube of spectral images.
    Filters : 'ndarray'
        Cube of directional filters.
    ROIcube : 'ndarray' of 'bool'
        Region of interest for each spectral image.
    selector : 'list'
        List of sub-region selectors as created by the
        define_image_regions_to_be_filtered function.

    Returns
    -------
    list
        List of determine_optimal_filter results, one per selector.
"""
return [determine_optimal_filter(ImageCube, Filters, ROIcube, x)
for x in selector]
def filter_image_cube(data_in, Filters, ROIcube, enumeratedSubRegions,
useMultiProcesses=True, ReturnCleanedData=True):
"""
Filter image cube.
    This routine filters the input data cube using an optimal choice of
    directional filter and returns the filtered (smoothed) data. Optionally
    it also returns a cleaned dataset, in which the masked data are replaced
    by the filtered data.
Parameters
----------
data_in : 'np.ndarray' of 'float'
Input spectral data cube.
Filters : 'ndarray' of 'float'
Directional filter kernels.
ROIcube : 'ndarray' of 'bool'
Region of interest for each spectral image.
enumeratedSubRegions : 'list'
List containing all information to define the regions around the pixels
within the ROI used in the filtering. This list is created with the
define_image_regions_to_be_filtered function.
useMultiProcesses : 'bool' (optional|True)
Flag to choose to use the multiprocessing module or not.
ReturnCleanedData : 'bool' (optional|True)
Flag to choose to return a cleaned data set or not.
Returns
-------
optimalFilterIndex : 'MaskedArray' of 'int'
Index number of the optimal filter for each pixel in the input data.
filteredImage : 'MaskedArray' of 'float'
The filtered data set.
filteredImageVariance : 'MaskedArray' of 'float'
The variance of the filtered data set.
cleanedData : 'MaskedArray' of 'float'
The cleaned data set.
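    Examples
    --------
    A minimal sketch of the single-process code path; `data` (a masked
    spectral image cube) and `roi` (the matching region-of-interest cube)
    are assumed to have been defined already:

    >>> Filters = directional_filters()  # doctest: +SKIP
    >>> regions = define_image_regions_to_be_filtered(
    ...     roi[..., 0], Filters.shape[:2])  # doctest: +SKIP
    >>> results = filter_image_cube(
    ...     data, Filters, roi, regions,
    ...     useMultiProcesses=False)  # doctest: +SKIP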
"""
optimalFilterIndex = np.ma.zeros(data_in.shape, dtype='int')
optimalFilterIndex.mask = ROIcube
filteredImage = np.ma.zeros(data_in.shape)
filteredImage.mask = ROIcube
filteredImageVariance = np.ma.zeros(data_in.shape)
filteredImageVariance.mask = ROIcube
indices_poi = [poi for (_, _, poi) in enumeratedSubRegions]
if not useMultiProcesses:
        # create a new function with all fixed input variables bound.
func = partial(determine_optimal_filter, data_in, Filters, ROIcube)
filter_find_iterator = map(func, enumeratedSubRegions)
for j in tqdm(filter_find_iterator, total=len(enumeratedSubRegions),
dynamic_ncols=True):
optimalFilterIndex[indices_poi[j[0]]] = j[1]
filteredImage[indices_poi[j[0]]] = j[2]
filteredImageVariance[indices_poi[j[0]]] = j[3]
else:
ncpus = int(ray.cluster_resources()['CPU'])
chunksize = len(enumeratedSubRegions)//ncpus + 1
while chunksize > 256:
chunksize = chunksize//ncpus + 1
data_id = ray.put(data_in)
filters_id = ray.put(Filters)
roi_id = ray.put(ROIcube)
result_ids = \
[split_work.remote(data_id, filters_id, roi_id, x) for x in
list(chunks(enumeratedSubRegions, chunksize))]
pbar = tqdm(total=len(result_ids), dynamic_ncols=True,
desc='Filtering image cube')
while len(result_ids):
done_id, result_ids = ray.wait(result_ids)
k = ray.get(done_id[0])
for j in k:
optimalFilterIndex[indices_poi[j[0]]] = j[1]
filteredImage[indices_poi[j[0]]] = j[2]
filteredImageVariance[indices_poi[j[0]]] = j[3]
pbar.update(1)
pbar.close()
if ReturnCleanedData:
cleanedData = data_in.copy()
cleanedData[data_in.mask] = filteredImage[data_in.mask]
cleanedData.mask = cleanedData.mask | ROIcube
return optimalFilterIndex, filteredImage, filteredImageVariance, \
cleanedData
else:
return optimalFilterIndex, filteredImage, filteredImageVariance
def iterative_bad_pixel_flagging(dataset, ROIcube, Filters,
enumeratedSubRegions, sigmaLimit=4.0,
maxNumberOfIterations=12,
fractionalAcceptanceLimit=0.005,
useMultiProcesses=True,
maxNumberOfCPUs=2):
"""
Flag bad pixels.
This routine flags the bad pixels found in the input dataset and creates
a cleaned dataset.
Parameters
----------
dataset : 'SpectralDataTimeSeries'
ROIcube : 'ndarray' of 'bool'
Region of interest for each spectral image.
Filters : 'ndarray' of 'float'
Directional filter kernels.
enumeratedSubRegions : 'list'
List containing all information to define the regions around the pixels
within the ROI used in the filtering. This list is created with the
define_image_regions_to_be_filtered function.
sigmaLimit : 'float' (optional|4.0)
        Standard deviation limit used to identify bad pixels.
maxNumberOfIterations : 'int' (optional|12)
The maximum number of iterations used in bad pixel masking
fractionalAcceptanceLimit : 'float' (optional|0.005)
Fractional number of bad pixels still found in the dataset below
which the iteration can be terminated.
useMultiProcesses : 'bool' (optional|True)
Use multiprocessing or not.
    maxNumberOfCPUs : 'int' (optional|2)
        Maximum number of CPUs which can be used.
Returns
-------
dataset : 'SpectralDataTimeSeries'
        Input data set containing the observed spectral time series. After
        completion of this function, the dataset is returned with an
        updated mask with all deviating pixels flagged. To indicate this,
        the flag isSigmaCliped=True is set.
    filteredDataset : 'SpectralDataTimeSeries'
        The optimal filtered dataset. Included in this dataset is
        the optimal filter index, i.e. the index indicating the used
        optimal filter for each pixel. To indicate that this is a
        filtered dataset, the flag isFilteredData=True is set.
    cleanedDataset : 'SpectralDataTimeSeries'
        The cleaned dataset. To indicate that this is a cleaned dataset,
        the flag isCleanedData=True is set.
Notes
-----
In the current version no double progress bar is used as in some IDE's
double progress bars do not work properly.
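    Examples
    --------
    A minimal usage sketch; `dataset` (a SpectralDataTimeSeries) and `roi`
    (the matching region-of-interest cube) are assumed to have been defined
    already:

    >>> Filters = directional_filters()  # doctest: +SKIP
    >>> regions = define_image_regions_to_be_filtered(
    ...     roi[..., 0], Filters.shape[:2])  # doctest: +SKIP
    >>> dataset, filtered, cleaned = iterative_bad_pixel_flagging(
    ...     dataset, roi, Filters, regions,
    ...     useMultiProcesses=False)  # doctest: +SKIP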
"""
acceptanceLimit = int(len(enumeratedSubRegions)*fractionalAcceptanceLimit)
initialData = dataset.return_masked_array('data').copy()
if useMultiProcesses:
mem = virtual_memory()
ncpu = int(np.min([maxNumberOfCPUs,
np.max([1, cpu_count(logical=True)//2])
])
)
        # minimum Ray object store size: the larger of 3x the data size
        # and 1.1 x 75 MiB
        mem_store = np.max([int(initialData.nbytes*3.0), int(1.1*78643200)])
        # minimum worker heap size: the larger of 5x the data size
        # and 1.1 x 50 MiB
        mem_workers = np.max([int(initialData.nbytes*5.0), int(1.1*52428800)])
if mem.available < (mem_store+mem_workers):
warnings.warn("WARNING: Not enough memory for Ray to start. "
"Required free memory: {} bytes "
"Available: {} bytes".
format(mem_store+mem_workers, mem.available))
# ray.disconnect()
# ray.init(num_cpus=ncpu, object_store_memory=mem_store,
# memory=mem_workers)
# bug fix
ray.init(num_cpus=ncpu, object_store_memory=mem_store)
numberOfFlaggedPixels = np.sum(~ROIcube & dataset.mask)
tqdm.write('Initial bad pixel flagging. '
'# of pixels masked: {}'.format(numberOfFlaggedPixels))
optimalFilterIndex, filteredImage, filteredImageVariance, cleanedData = \
filter_image_cube(initialData, Filters, ROIcube, enumeratedSubRegions,
useMultiProcesses=useMultiProcesses,
ReturnCleanedData=True)
iiteration = 1
while ((numberOfFlaggedPixels > acceptanceLimit) & \
(iiteration <= maxNumberOfIterations)) | (iiteration == 1):
mask = (cleanedData-filteredImage)**2 > (sigmaLimit *
filteredImageVariance)
numberOfFlaggedPixels = np.sum(~ROIcube & mask)
tqdm.write('Iteration #{} for bad pixel flagging. '
'# of pixels masked: {}'.format(iiteration,
numberOfFlaggedPixels))
cleanedData.mask = cleanedData.mask | mask
dataset.mask = dataset.mask | (~ROIcube & mask)
(optimalFilterIndex, filteredImage, filteredImageVariance,
cleanedData) = filter_image_cube(cleanedData, Filters, ROIcube,
enumeratedSubRegions,
useMultiProcesses=useMultiProcesses,
ReturnCleanedData=True)
iiteration += 1
    if (iiteration > maxNumberOfIterations) & \
            (numberOfFlaggedPixels > acceptanceLimit):
        warnings.warn("Iteration not converged in "
                      "iterative_bad_pixel_flagging. {} mask values not "
                      "converged. An increase of the maximum number of "
                      "iteration steps might be advisable.".
                      format(numberOfFlaggedPixels-acceptanceLimit))
# ray.disconnect()
ray.shutdown()
cleanedUncertainty = np.ma.array(dataset.uncertainty.data.value.copy(),
mask=dataset.mask.copy())
cleanedUncertainty[cleanedUncertainty.mask] = \
np.sqrt(filteredImageVariance[cleanedUncertainty.mask])
cleanedUncertainty.mask = filteredImage.mask
ndim = dataset.data.ndim
selection = tuple((ndim-1)*[0]+[Ellipsis])
filteredDataset = \
SpectralDataTimeSeries(wavelength=dataset.wavelength,
wavelength_unit=dataset.wavelength_unit,
data=filteredImage,
data_unit=dataset.data_unit,
time=dataset.time.data.value[selection],
time_unit=dataset.time_unit,
time_bjd=dataset.time_bjd.data.value[selection],
time_bjd_unit=dataset.time_bjd_unit,
uncertainty=np.sqrt(filteredImageVariance),
target_name=dataset.target_name,
dataProduct=dataset.dataProduct,
dataFiles=dataset.dataFiles,
isFilteredData=True)
filteredDataset.optimalFilterIndex = optimalFilterIndex
cleanedDataset = \
SpectralDataTimeSeries(wavelength=dataset.wavelength,
wavelength_unit=dataset.wavelength_unit,
data=cleanedData,
data_unit=dataset.data_unit,
time=dataset.time.data.value[selection],
time_unit=dataset.time_unit,
time_bjd=dataset.time_bjd.data.value[selection],
time_bjd_unit=dataset.time_bjd_unit,
uncertainty=cleanedUncertainty,
target_name=dataset.target_name,
dataProduct=dataset.dataProduct,
dataFiles=dataset.dataFiles,
isCleanedData=True)
dataset.isSigmaCliped = True
return (dataset, filteredDataset, cleanedDataset)
def determine_relative_source_position(spectralImageCube, ROICube,
refIntegration,
upsampleFactor=111,
AngleOversampling=2):
"""
    Determine the shift of the spectra relative to a reference integration.
Parameters
----------
spectralImageCube : 'ndarray'
        Input spectral image data cube. First dimension is the dispersion
        direction, second dimension is the cross-dispersion direction and
        the last dimension is time.
ROICube : 'ndarray' of 'bool'
Cube containing the Region of interest for each integration.
If not given, it is assumed that the mask of the spectralImageCube
contains the region of interest.
    refIntegration : 'int'
        Index number of the integration to be taken as reference.
upsampleFactor : 'int', optional
integer factor by which to upsample image for FFT analysis to get
sub-pixel accuracy. Default value is 111
AngleOversampling : 'int', optional
Oversampling factor for angle determination, Default value 2
Returns
-------
relativeSourcePosition : 'collections.OrderedDict'
relative rotation angle, scaling and x and y position as a
function of time.
Raises
------
ValueError
In case refIntegration exceeds number of integrations
Notes
-----
    Note that for sub-pixel registration to work correctly it should
    be performed on cleaned data, i.e. bad pixels have been identified and
    corrected using the tools in this module.
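    Examples
    --------
    A minimal sketch; `cube` (a cleaned spectral image data cube) and `roi`
    (the matching region-of-interest cube) are assumed to have been defined
    already:

    >>> movement = determine_relative_source_position(
    ...     cube, roi, 0)  # doctest: +SKIP
    >>> dispersion_shift = movement['disp_shift']  # doctest: +SKIP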
"""
nintegrations = spectralImageCube.shape[-1]
refIntegration = int(refIntegration)
if (refIntegration > nintegrations-1) | (refIntegration < 0):
raise ValueError("Index number of reference integration exceeds \
limits. Aborting position determination")
upsampleFactor = np.max([1, int(upsampleFactor)])
AngleOversampling = np.max([1, int(AngleOversampling)])
ImageCube = spectralImageCube.copy()
refImage = ImageCube[..., refIntegration]
if ROICube is None:
ROICube = np.zeros((ImageCube.shape), dtype=bool)
ROIref = ROICube[..., refIntegration]
# First determine the rotation and scaling
relativeAngle = np.zeros((nintegrations))
relativeScale = np.ones((nintegrations))
for it in range(1, nintegrations):
relativeAngle[it], relativeScale[it] = \
_determine_relative_rotation_and_scale(
refImage, ROIref,
ImageCube[..., it],
ROICube[..., it],
upsampleFactor=upsampleFactor,
AngleOversampling=AngleOversampling)
# Second, determine the shift
yshift = np.zeros((nintegrations))
xshift = np.zeros((nintegrations))
for it, image in enumerate(ImageCube.T):
if not np.allclose(relativeAngle[it], 0.0):
derotateRefImage = _derotate_image(refImage, 0.0, ROI=ROIref,
order=3)
derotatedImage = _derotate_image(image.T, relativeAngle[it],
ROI=ROICube[..., it], order=3)
derotatedROIref = np.zeros_like(derotateRefImage).astype(bool)
derotatedROI = np.zeros_like(derotatedImage).astype(bool)
else:
derotateRefImage = refImage
derotatedImage = image.T
derotatedROIref = ROIref
derotatedROI = ROICube[..., it]
shift = _determine_relative_source_shift(derotateRefImage,
derotatedImage,
referenceROI=derotatedROIref,
ROI=derotatedROI,
upsampleFactor=upsampleFactor)
yshift[it] = shift[0]
xshift[it] = shift[1]
relativeSourcePosition = \
collections.OrderedDict(relativeAngle=relativeAngle,
relativeScale=relativeScale,
cross_disp_shift=xshift,
disp_shift=yshift)
return relativeSourcePosition
@ray.remote
def ray_determine_relative_source_position(spectralImageCube, ROICube,
refIntegration, pba,
upsampleFactor=111,
AngleOversampling=2):
"""
Ray wrapper for determine_relative_source_position.
Parameters
----------
spectralImageCube : 'ndarray'
        Input spectral image data cube. First dimension is the dispersion
        direction, second dimension is the cross-dispersion direction and
        the last dimension is time.
ROICube : 'ndarray' of 'bool'
Cube containing the Region of interest for each integration.
If not given, it is assumed that the mask of the spectralImageCube
contains the region of interest.
    refIntegration : 'int'
        Index number of the integration to be taken as reference.
    pba : 'ActorHandle'
        Handle of the ray ProgressBarActor used to report progress.
upsampleFactor : 'int', optional
integer factor by which to upsample image for FFT analysis to get
sub-pixel accuracy. Default value is 111
AngleOversampling : 'int', optional
Oversampling factor for angle determination, Default value 2
Returns
-------
movement : 'collections.OrderedDict'
relative rotation angle, scaling and x and y position as a
function of time.
"""
movement = determine_relative_source_position(
spectralImageCube, ROICube, refIntegration,
upsampleFactor=upsampleFactor,
AngleOversampling=AngleOversampling)
pba.update.remote(1)
return movement
def _determine_relative_source_shift(reference_image, image,
referenceROI=None, ROI=None,
upsampleFactor=111, space='real'):
"""
Determine the relative shift of the spectral images.
    This routine determines the relative shift between a reference spectral
    image and another spectral image.
Parameters
----------
reference_image : 'ndarray or np.ma.MaskedArray' of 'float'
Reference spectral image
image : 'ndarray or np.ma.MaskedArray' of 'float'
Spectral image
    referenceROI : 'ndarray' of 'bool', optional
ROI : 'ndarray' of 'bool', optional
upsampleFactor : 'int', optional
Default value is 111
space : 'str', optional
Default value is 'real'
Returns
-------
    relativeImageShiftY
        Relative shift compared to the reference image in the dispersion
        direction of the light (from top to bottom; the shortest wavelength
        should be at row 0). Note that this shift is defined such that
        shifting a spectral image by this amount will place the trace at
        the exact same position as that of the reference image.
    relativeImageShiftX
        Relative shift compared to the reference image in the
        cross-dispersion direction of the light. Note that this shift is
        defined such that shifting a spectral image by this amount will
        place the trace at the exact same position as that of the
        reference image.
"""
ref_im = _pad_region_of_interest_to_square(reference_image, referenceROI)
im = _pad_region_of_interest_to_square(image, ROI)
    # convolve with gaussian with sigma of 1 pixel to ensure that
    # undersampled spectra are properly registered.
kernel = Gaussian2DKernel(1.0)
ref_im = convolve(ref_im, kernel, boundary='extend')
im = convolve(im, kernel, boundary='extend')
# subpixel precision by oversampling image by upsampleFactor
# returns shift, error and phase difference
shift, _, _ = \
phase_cross_correlation(ref_im, im, upsample_factor=upsampleFactor,
space=space, normalization=None)
relativeImageShiftY = -shift[0]
relativeImageShiftX = -shift[1]
return relativeImageShiftY, relativeImageShiftX
def _determine_relative_rotation_and_scale(reference_image, referenceROI,
image, ROI,
upsampleFactor=111,
AngleOversampling=2):
"""
    Determine rotation and scaling changes.

    This routine determines the relative rotation and scale change between
    a reference spectral image and another spectral image.
Parameters
----------
reference_image : 'ndarray or np.ma.MaskedArray' of 'float'
Reference image
    referenceROI : 'ndarray' of 'bool'
image : 'ndarray or np.ma.MaskedArray' of 'float'
Image for which the rotation and translation relative to the reference
image will be determined
    ROI : 'ndarray' of 'bool'
Region of interest.
upsampleFactor : 'int', optional
Upsampling factor of FFT image used to determine sub-pixel shift.
By default set to 111.
AngleOversampling : 'int', optional
Upsampling factor of the FFT image in polar coordinates for the
determination of sub-degree rotation. Set by default to 2.
Returns
-------
relative_rotation
Relative rotation angle in degrees. The angle is defined such that the
image needs to be rotated by this angle to have the same orientation
as the reference spectral image
relative_scaling
Relative image scaling
"""
AngleOversampling = int(AngleOversampling)
nAngles = 360
NeededImageSize = 2*AngleOversampling*nAngles
ref_im = _pad_region_of_interest_to_square(reference_image, referenceROI)
ref_im = _pad_to_size(ref_im, NeededImageSize, NeededImageSize)
im = _pad_region_of_interest_to_square(image, ROI)
im = _pad_to_size(im, NeededImageSize, NeededImageSize)
    # convolve with gaussian with sigma of 1 pixel to ensure that
    # undersampled spectra are properly registered.
kernel = Gaussian2DKernel(1.0)
ref_im = convolve(ref_im, kernel, boundary='extend')
im = convolve(im, kernel, boundary='extend')
h = np.hanning(im.shape[0])
han2d = np.outer(h, h) # 2D Hanning window
fft_ref_im = np.abs(np.fft.fftshift(np.fft.fftn(ref_im*han2d)))**2
fft_im = np.abs(np.fft.fftshift(np.fft.fftn(im*han2d)))**2
h, w = fft_ref_im.shape
radius = 0.8*np.min([w/2, h/2])
hpf = highpass((h, w))
fft_ref_im_filtered = fft_ref_im * hpf
warped_fft_ref_im = warp_polar(fft_ref_im_filtered, scaling='log',
radius=radius, output_shape=None,
multichannel=None,
AngleOversampling=AngleOversampling)
fft_im_filtered = fft_im * hpf
warped_fft_im = warp_polar(fft_im_filtered, scaling='log', radius=radius,
output_shape=None, multichannel=None,
AngleOversampling=AngleOversampling)
tparams = phase_cross_correlation(warped_fft_ref_im, warped_fft_im,
upsample_factor=upsampleFactor,
space='real')
shifts = tparams[0]
# calculate rotation
# note, only look for angles between +- 90 degrees,
# remove any flip of 180 degrees due to search
shiftr, shiftc = shifts[:2]
shiftr = shiftr/AngleOversampling
if shiftr > 90.0:
shiftr = shiftr-180.0
if shiftr < -90.0:
shiftr = shiftr+180.0
relative_rotation = -shiftr
    # Calculate scale factor from translation
    # a column shift in log-polar space corresponds to a multiplicative
    # scaling: scale = exp(shiftc / klog), with klog = radius / log(radius)
    klog = radius / np.log(radius)
    relative_scaling = 1 / (np.exp(shiftc / klog))
return relative_rotation, relative_scaling
def _derotate_image(image, angle, ROI=None, order=3):
"""
Derotate image.
Parameters
----------
image : '2-D ndarray' of 'float'
Input image to be de-rotated by 'angle' degrees.
    angle : 'float'
        Rotation angle in degrees.
    ROI : '2-D ndarray' of 'bool'
        Region of interest (default None).
    order : 'int'
        Order of the interpolation used in the rotation function of the
        skimage package.
Returns
-------
derotatedImage : '2-D ndarray' of 'float'
The zero padded and derotated image.
"""
h, w = image.shape
    NeededImageSize = int(np.sqrt(h**2 + w**2))
im = _pad_region_of_interest_to_square(image, ROI)
im = _pad_to_size(im, NeededImageSize, NeededImageSize)
derotatedImage = rotate(im, angle, order=order)
return derotatedImage
def _pad_region_of_interest_to_square(image, ROI=None):
"""
Pad ROI to square.
    Zero pad the extracted Region Of Interest of a larger image such that
    the resulting image is square.
Parameters
----------
    image : '2-D ndarray' of 'float'
        Input image from which the region of interest is extracted and
        zero padded to a square image.
ROI : '2-D ndarray' of 'bool'
Region of interest (default None)
Returns
-------
padded_image : '2-D ndarray' of 'float'
"""
if ROI is not None:
label_im, _ = ndimage.label(ROI)
elif isinstance(image, np.ma.core.MaskedArray):
label_im, _ = ndimage.label(image.mask)
else:
        raise AttributeError("For image 0 padding either use MaskedArray "
                             "as input or provide ROI. Aborting 0 padding")
slice_y, slice_x = ndimage.find_objects(label_im != 1)[0]
padded_image = image[slice_y, slice_x]
if isinstance(image, np.ma.core.MaskedArray):
padded_image.set_fill_value(0.0)
padded_image = padded_image.filled()
h, w = padded_image.shape
if h == w:
return padded_image
im_size = np.max([h, w])
delta_h = im_size - h
delta_w = im_size - w
padding = ((delta_h//2, delta_h-(delta_h//2)),
(delta_w//2, delta_w-(delta_w//2)))
padded_image = np.pad(padded_image,
padding, 'constant', constant_values=(0))
return padded_image
def _pad_to_size(image, h, w):
"""
    Zero pad the input image to an image of height h and width w.
Parameters
----------
image : '2-D ndarray' of 'float'
Input image to be zero-padded to size (h, w).
    h : 'int'
        Height (number of rows) of output image.
w : 'int'
Width (number of columns) of output image.
Returns
-------
padded_image : '2-D ndarray' of 'float'
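    Examples
    --------
    A minimal sketch; note that at least one row and one column of padding
    are always added on each side:

    >>> padded = _pad_to_size(np.ones((4, 4)), 8, 8)
    >>> padded.shape
    (8, 8)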
"""
padded_image = image.copy()
if isinstance(padded_image, np.ma.core.MaskedArray):
padded_image.set_fill_value(0.0)
padded_image = padded_image.filled()
h_image, w_image = padded_image.shape
npad_h = np.max([1, (h-h_image)//2])
npad_w = np.max([1, (w-w_image)//2])
padding = ((npad_h, npad_h), (npad_w, npad_w))
padded_image = np.pad(padded_image,
padding, 'constant', constant_values=(0))
return padded_image
def _log_polar_mapping(output_coords, k_angle, k_radius, center):
"""
    Inverse mapping function to convert from cartesian to polar coordinates.
Parameters
----------
output_coords : ndarray
`(M, 2)` array of `(col, row)` coordinates in the output image
k_angle : float
Scaling factor that relates the intended number of rows in the output
image to angle: ``k_angle = nrows / (2 * np.pi)``
k_radius : float
Scaling factor that relates the radius of the circle bounding the
area to be transformed to the intended number of columns in the output
image: ``k_radius = width / np.log(radius)``
center : tuple (row, col)
Coordinates that represent the center of the circle that bounds the
area to be transformed in an input image.
Returns
-------
coords : ndarray
`(M, 2)` array of `(col, row)` coordinates in the input image that
correspond to the `output_coords` given as input.
"""
angle = output_coords[:, 1] / k_angle
rr = ((np.exp(output_coords[:, 0] / k_radius)) * np.sin(angle)) + center[0]
cc = ((np.exp(output_coords[:, 0] / k_radius)) * np.cos(angle)) + center[1]
coords = np.column_stack((cc, rr))
return coords
def _linear_polar_mapping(output_coords, k_angle, k_radius, center):
"""
    Inverse mapping function to convert from cartesian to polar coordinates.
Parameters
----------
output_coords : ndarray
`(M, 2)` array of `(col, row)` coordinates in the output image
k_angle : float
Scaling factor that relates the intended number of rows in the output
image to angle: ``k_angle = nrows / (2 * np.pi)``
k_radius : float
Scaling factor that relates the radius of the circle bounding the
area to be transformed to the intended number of columns in the output
image: ``k_radius = ncols / radius``
center : tuple (row, col)
Coordinates that represent the center of the circle that bounds the
area to be transformed in an input image.
Returns
-------
coords : ndarray
`(M, 2)` array of `(col, row)` coordinates in the input image that
correspond to the `output_coords` given as input.
"""
angle = output_coords[:, 1] / k_angle
rr = ((output_coords[:, 0] / k_radius) * np.sin(angle)) + center[0]
cc = ((output_coords[:, 0] / k_radius) * np.cos(angle)) + center[1]
coords = np.column_stack((cc, rr))
return coords
def warp_polar(image, center=None, *, radius=None, AngleOversampling=1,
output_shape=None, scaling='linear', multichannel=False,
**kwargs):
"""
    Remap image to polar or log-polar coordinate space.
Parameters
----------
image : ndarray
Input image. Only 2-D arrays are accepted by default. If
`multichannel=True`, 3-D arrays are accepted and the last axis is
interpreted as multiple channels.
center : tuple (row, col), optional
Point in image that represents the center of the transformation (i.e.,
the origin in cartesian space). Values can be of type `float`.
If no value is given, the center is assumed to be the center point
of the image.
radius : float, optional
Radius of the circle that bounds the area to be transformed.
AngleOversampling : int
Oversample factor for number of angles
output_shape : tuple (row, col), optional
scaling : {'linear', 'log'}, optional
Specify whether the image warp is polar or log-polar. Defaults to
'linear'.
multichannel : bool, optional
Whether the image is a 3-D array in which the third axis is to be
interpreted as multiple channels. If set to `False` (default), only 2-D
arrays are accepted.
**kwargs : keyword arguments
Passed to `transform.warp`.
Returns
-------
warped : ndarray
The polar or log-polar warped image.
Examples
--------
Perform a basic polar warp on a grayscale image:
>>> from skimage import data
>>> from skimage.transform import warp_polar
>>> image = data.checkerboard()
>>> warped = warp_polar(image)
Perform a log-polar warp on a grayscale image:
>>> warped = warp_polar(image, scaling='log')
Perform a log-polar warp on a grayscale image while specifying center,
radius, and output shape:
>>> warped = warp_polar(image, (100,100), radius=100,
... output_shape=image.shape, scaling='log')
Perform a log-polar warp on a color image:
>>> image = data.astronaut()
>>> warped = warp_polar(image, scaling='log', multichannel=True)
"""
if image.ndim != 2 and not multichannel:
raise ValueError("Input array must be 2 dimensions "
"when `multichannel=False`,"
" got {}".format(image.ndim))
if image.ndim != 3 and multichannel:
raise ValueError("Input array must be 3 dimensions "
"when `multichannel=True`,"
" got {}".format(image.ndim))
if center is None:
center = (np.array(image.shape)[:2] / 2) - 0.5
if radius is None:
w, h = np.array(image.shape)[:2] / 2
radius = np.sqrt(w ** 2 + h ** 2)
if output_shape is None:
height = 360*AngleOversampling
width = int(np.ceil(radius))
output_shape = (height, width)
else:
output_shape = safe_as_int(output_shape)
height = output_shape[0]
width = output_shape[1]
if scaling == 'linear':
k_radius = width / radius
map_func = _linear_polar_mapping
elif scaling == 'log':
k_radius = width / np.log(radius)
map_func = _log_polar_mapping
else:
raise ValueError("Scaling value must be in {'linear', 'log'}")
k_angle = height / (2 * np.pi)
warp_args = {'k_angle': k_angle, 'k_radius': k_radius, 'center': center}
warped = warp(image, map_func, map_args=warp_args,
output_shape=output_shape, **kwargs)
return warped
def highpass(shape):
"""
    Return a highpass filter to be multiplied with the Fourier transform.
Parameters
----------
    shape : 'tuple' of 'int'
        Shape of the 2D filter.
Returns
-------
    filter : 'ndarray'
        High-pass filter.
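    Examples
    --------
    A minimal sketch; the filter is close to zero at the image center and
    rises towards the edges:

    >>> hpf = highpass((64, 64))
    >>> hpf.shape
    (64, 64)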
"""
x = np.outer(
np.cos(np.linspace(-math.pi/2., math.pi/2., shape[0])),
np.cos(np.linspace(-math.pi/2., math.pi/2., shape[1])))
return (1.0 - x) * (2.0 - x)
def grouper(iterable, n, fillvalue=None):
"""
Collect data into fixed-length chunks or blocks.
Parameters
----------
    iterable : 'iterable'
        Input iterable to be grouped into fixed-length chunks.
    n : 'int'
        Chunk size.
    fillvalue : optional
        Value used to pad the last chunk. The default is None.

    Returns
    -------
    'zip_longest'
        Iterator over the fixed-length chunks.
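    Examples
    --------
    A minimal usage sketch:

    >>> [''.join(group) for group in grouper('ABCDEFG', 3, 'x')]
    ['ABC', 'DEF', 'Gxx']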
"""
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
args = [iter(iterable)] * n
return zip_longest(fillvalue=fillvalue, *args)
@ray.remote
class ProgressBarActor:
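    """Ray actor that tracks the progress of distributed tasks."""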
counter: int
delta: int
event: Event
def __init__(self) -> None:
self.counter = 0
self.delta = 0
self.event = Event()
def update(self, num_items_completed: int) -> None:
"""
Updates the ProgressBar with the incremental
number of items that were just completed.
"""
self.counter += num_items_completed
self.delta += num_items_completed
self.event.set()
async def wait_for_update(self) -> Tuple[int, int]:
"""
Blocking call.
Waits until somebody calls `update`, then returns a tuple of
the number of updates since the last call to
`wait_for_update`, and the total number of completed items.
"""
await self.event.wait()
self.event.clear()
saved_delta = self.delta
self.delta = 0
return saved_delta, self.counter
def get_counter(self) -> int:
"""
Returns the total number of complete items.
"""
return self.counter
class ProgressBar:
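    """Progress bar for remote ray tasks, driven by a ProgressBarActor."""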
progress_actor: ActorHandle
total: int
description: str
pbar: tqdm
def __init__(self, total: int, description: str = ""):
# Ray actors don't seem to play nice with mypy, generating
# a spurious warning for the following line,
# which we need to suppress. The code is fine.
self.progress_actor = ProgressBarActor.remote() # type: ignore
self.total = total
self.description = description
@property
def actor(self) -> ActorHandle:
"""
Returns a reference to the remote `ProgressBarActor`.
When you complete tasks, call `update` on the actor.
"""
return self.progress_actor
def print_until_done(self) -> None:
"""Blocking call.
Do this after starting a series of remote Ray tasks, to which you've
passed the actor handle. Each of them calls `update` on the actor.
When the progress meter reaches 100%, this method returns.
"""
pbar = tqdm(desc=self.description, total=self.total)
while True:
delta, counter = ray.get(self.actor.wait_for_update.remote())
pbar.update(delta)
if counter >= self.total:
pbar.close()
return
def ray_loop(dataCube, ROICube=None, upsampleFactor=111,
AngleOversampling=2, nreference=6, maxNumberOfCPUs=2,
useMultiProcesses=True):
"""
Ray wrapper around determine_relative_source_position function.
Performs parallel loop for different reference integrations to determine
the relative source movement on the detector.
Parameters
----------
    dataCube : 'ndarray'
        Input spectral image data cube. First dimension is the dispersion
        direction, second dimension is the cross-dispersion direction and
        the last dimension is time. The shortest wavelengths are at the
        first row.
    ROICube : 'ndarray' of 'bool', optional
        Region of interest.
    upsampleFactor : 'int', optional
        Upsample factor for translational movement.
    AngleOversampling : 'int', optional
        Upsample factor for the determination of rotational movement.
    nreference : 'int', optional
        Number of reference times used to determine the relative movement.
    maxNumberOfCPUs : 'int', optional
        Maximum number of CPUs used when using parallel calculations.
    useMultiProcesses : 'bool', optional
        If True, calculations will be done in parallel.
    Yields
    ------
    relativeSourcePosition : 'collections.OrderedDict'
        Ordered dict containing the relative rotation angle,
        scaling and x and y position as a function of time.
"""
ntime = dataCube.shape[-1]
if not useMultiProcesses:
        # create a new function with all fixed input variables bound.
func = partial(determine_relative_source_position, dataCube,
ROICube, upsampleFactor=upsampleFactor,
AngleOversampling=AngleOversampling)
ITR = list(np.linspace(0, ntime-1, nreference, dtype=int))
movement_iterator = map(func, ITR)
for j in tqdm(movement_iterator, total=len(ITR),
dynamic_ncols=True):
yield j
else:
ncpu = int(np.min([maxNumberOfCPUs, np.max([1, mp.cpu_count()-3])]))
ray.init(num_cpus=ncpu)
dataCube_id = ray.put(dataCube)
ROICube_id = ray.put(ROICube)
upsampleFactor_id = ray.put(upsampleFactor)
AngleOversampling_id = ray.put(AngleOversampling)
ITR = iter(np.linspace(0, ntime-1, nreference, dtype=int))
pb = ProgressBar(nreference,
'Determine Telescope movement for '
'{} reference times'.format(nreference))
actor = pb.actor
result_ids = \
[ray_determine_relative_source_position.remote(
dataCube_id,
ROICube_id,
x,
actor,
upsampleFactor=upsampleFactor_id,
AngleOversampling=AngleOversampling_id) for x in ITR]
pb.print_until_done()
MPITR = ray.get(result_ids)
for relativeSourcePosition in MPITR:
yield relativeSourcePosition
ray.shutdown()
def register_telescope_movement(cleanedDataset, ROICube=None, nreferences=6,
mainReference=4, upsampleFactor=111,
AngleOversampling=2, verbose=False,
verboseSaveFile=None, maxNumberOfCPUs=2,
useMultiProcesses=True):
"""
Register the telescope movement.
Parameters
----------
cleanedDataset : 'SpectralDataTimeSeries'
        Input dataset. Note that for image registration to work properly,
        bad pixels need to be removed (cleaned) first. This routine checks
        if a cleaned dataset is used by checking for the isCleanedData flag.
ROICube : 'ndarray' of 'bool', optional
Cube containing the Region of interest for each integration.
If not given, it is assumed that the mask of the cleanedDataset
contains the region of interest.
nreferences : 'int', optional
Default is 6.
mainReference : 'int', optional
Default is 4.
    upsampleFactor : 'int', optional
        Upsample factor of FFT images to determine relative movement at
        sub-pixel level. Default is 111.
    AngleOversampling : 'int', optional
        Upsampling factor of the angle in the polar-transformed FFT images
        used to determine the relative rotation and scale change.
        Default is 2.
verbose : 'bool', optional
If true diagnostic plots will be generated. Default is False
verboseSaveFile : 'str', optional
If not None, verbose output will be saved to the specified file.
    maxNumberOfCPUs : 'int', optional
        Maximum number of CPUs to be used.
Returns
-------
spectralMovement : 'OrderedDict'
Ordered dict containing the relative rotation, scaling,
and movement in the dispersion and cross dispersion direction.
Raises
------
ValueError, TypeError
        Errors are raised if certain data is not present or is of the
        wrong type.
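    Examples
    --------
    A minimal usage sketch; `cleaned` is assumed to be a cleaned
    SpectralDataTimeSeries (isCleanedData=True):

    >>> movement = register_telescope_movement(
    ...     cleaned, nreferences=6, mainReference=4,
    ...     useMultiProcesses=False)  # doctest: +SKIP
    >>> angle = movement['relativeAngle']  # doctest: +SKIP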
"""
try:
if cleanedDataset.isCleanedData is False:
raise ValueError
except (ValueError, AttributeError):
raise TypeError("Input dataset is not recognized as cleaned dataset")
maskeddata = cleanedDataset.return_masked_array('data').copy()
dataUse = maskeddata.data
if ROICube is None:
ROICube = maskeddata.mask
else:
ROICube = np.logical_or(maskeddata.mask, ROICube)
ntime = dataUse.shape[-1]
if (nreferences < 1) | (nreferences > ntime):
raise ValueError("Wrong nreferences value")
    if (mainReference < 0) | (mainReference > nreferences-1):
raise ValueError("Wrong mainReference value")
determinePositionIterator = \
ray_loop(dataUse, ROICube=ROICube,
upsampleFactor=upsampleFactor,
AngleOversampling=AngleOversampling,
nreference=nreferences,
maxNumberOfCPUs=maxNumberOfCPUs,
useMultiProcesses=useMultiProcesses)
iteratorResults = list(determinePositionIterator)
referenceIndex = np.linspace(0, ntime-1, nreferences, dtype=int)
testAngle = np.zeros((nreferences, ntime))
testScale = np.zeros((nreferences, ntime))
testCrossDispShift = np.zeros((nreferences, ntime))
testDispShift = np.zeros((nreferences, ntime))
for i in range(nreferences):
testAngle[i, :] = iteratorResults[i]['relativeAngle'] - \
iteratorResults[i]['relativeAngle'][referenceIndex[mainReference]]
testScale[i, :] = iteratorResults[i]['relativeScale'] / \
iteratorResults[i]['relativeScale'][referenceIndex[mainReference]]
testCrossDispShift[i, :] = iteratorResults[i]['cross_disp_shift'] - \
iteratorResults[i]['cross_disp_shift'][referenceIndex[
mainReference]]
testDispShift[i, :] = iteratorResults[i]['disp_shift'] - \
iteratorResults[i]['disp_shift'][referenceIndex[mainReference]]
relativeAngle = np.median(testAngle, axis=0)
relativeScale = np.median(testScale, axis=0)
crossDispersionShift = np.median(testCrossDispShift, axis=0)
dispersionShift = np.median(testDispShift, axis=0)
# shift to first time index
testAngle = testAngle - relativeAngle[0]
testScale = testScale / relativeScale[0]
testCrossDispShift = testCrossDispShift - crossDispersionShift[0]
testDispShift = testDispShift - dispersionShift[0]
relativeAngle = relativeAngle - relativeAngle[0]
relativeScale = relativeScale / relativeScale[0]
crossDispersionShift = crossDispersionShift - crossDispersionShift[0]
dispersionShift = dispersionShift - dispersionShift[0]
if verbose:
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, axes = plt.subplots(figsize=(14, 12), nrows=2, ncols=2)
ax0, ax1, ax2, ax3 = axes.flatten()
ax0.plot(testAngle.T)
ax0.plot(relativeAngle, lw=5)
ax0.set_title('Relative Angle')
ax0.set_xlabel('Integration #')
ax0.set_ylabel('Angle [degrees]')
ax1.plot(testScale.T)
ax1.plot(relativeScale, lw=5)
ax1.set_title('Relative Scale')
ax1.set_xlabel('Integration #')
ax1.set_ylabel('Scaling Factor')
ax2.plot(testCrossDispShift.T)
ax2.plot(crossDispersionShift, lw=5)
ax2.set_title('Relative Cross-dispersion shift')
        ax2.set_ylabel('Shift [pixels]')
ax2.set_xlabel('Integration #')
ax3.plot(testDispShift.T)
ax3.plot(dispersionShift, lw=5)
ax3.set_title('Relative Dispersion shift')
ax3.set_xlabel('Integration #')
        ax3.set_ylabel('Shift [pixels]')
fig.subplots_adjust(hspace=0.3)
fig.subplots_adjust(wspace=0.45)
plt.show()
if verboseSaveFile is not None:
fig.savefig(verboseSaveFile, bbox_inches='tight')
spectralMovement = \
collections.OrderedDict(relativeAngle=relativeAngle,
relativeScale=relativeScale,
crossDispersionShift=crossDispersionShift,
dispersionShift=dispersionShift,
referenceIndex=referenceIndex[mainReference])
return spectralMovement
def determine_center_of_light_posision(cleanData, ROI=None, verbose=False,
quantileCut=0.5, orderTrace=2):
"""
Determine the center of light position.
This routine determines the center of light position (cross-dispersion)
of the dispersed light. The center of light is defined in a similar
way as the center of mass. This routine also fits a polynomial to the
spectral trace.
Parameters
----------
cleanData : 'maskedArray' or 'ndarray'
Input data
ROI : 'ndarray' of 'bool', optional
Region of interest
verbose : 'bool'
Default is False
quantileCut : 'float', optional
Default is 0.5
orderTrace : 'int'
Default is 2
Returns
-------
total_light : 'ndarray'
Total summed signal on the detector as function of wavelength.
    idx : 'ndarray' of 'int'
        Dispersion-direction pixel indices of the rows used in the trace
        fit.
COL_pos : 'ndarray'
Center of light position of the dispersed light.
ytrace : 'ndarray'
Spectral trace position in fraction of pixels in the dispersion
direction
xtrace : 'ndarray'
Spectral trace position in fraction of pixels in the cross dispersion
direction
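    Examples
    --------
    A minimal sketch; `image` is assumed to be a single cleaned spectral
    image as a 2-D masked array:

    >>> results = determine_center_of_light_posision(
    ...     image)  # doctest: +SKIP
    >>> total_light, idx, col, ytrace, xtrace = results  # doctest: +SKIP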
"""
if isinstance(cleanData, np.ma.core.MaskedArray):
data_use = cleanData.data
if ROI is not None:
mask_use = cleanData.mask | ROI
else:
mask_use = cleanData.mask
else:
data_use = cleanData
if ROI is not None:
mask_use = ROI
else:
mask_use = np.zeros_like(cleanData, dtype='bool')
data = np.ma.array(data_use, mask=mask_use)
npix, mpix = data.shape
position_grid = np.mgrid[0:npix, 0:mpix]
total_light = np.ma.sum(data, axis=1)
COL = np.ma.sum(data*position_grid[1, ...], axis=1) / \
total_light
    threshold = np.quantile(total_light[~total_light.mask], quantileCut)
    idx_use = np.ma.where(total_light > threshold)[0]
ytrace = np.arange(npix)
idx = ytrace[idx_use]
    # build a polynomial design matrix and fit the center-of-light
    # positions with a robust linear model to reject outliers
    X = []
    for i in range(orderTrace+1):
        X.append(idx**i)
    X = np.array(X).T
    robust_fit = sm.RLM(COL[idx_use], X).fit()
    # reverse coefficients to highest-order-first for np.poly1d
    z = robust_fit.params[::-1]
    f = np.poly1d(z)
    xtrace = f(ytrace)
if verbose:
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, ax = plt.subplots(figsize=(10, 10))
ax.plot(idx_use, total_light[idx_use])
ax.set_title('Integrated Signal')
ax.set_xlabel('Pixel Position Dispersion Direction')
ax.set_ylabel('Integrated Signal')
plt.show()
return total_light[idx_use], idx, COL[idx_use], ytrace, xtrace
def correct_initial_wavelength_shift(referenceDataset, cascade_configuration,
*otherDatasets):
"""
    Determine if there is an initial wavelength shift and correct it.
Parameters
----------
referenceDataset : 'cascade.data_model.SpectralDataTimeSeries'
        Dataset whose wavelength is used as reference for the wavelength
        correction.
    cascade_configuration : 'cascade.initialize.initialize.configurator'
        Singleton containing the configuration parameters of cascade.
    *otherDatasets : 'cascade.data_model.SpectralDataTimeSeries'
        Optional.
        Other datasets assumed to have the same wavelengths as the
        reference dataset and which will be corrected simultaneously with
        the reference.
Returns
-------
referenceDataset : 'list' of 'cascade.data_model.SpectralDataTimeSeries'
Dataset with corrected wavelengths.
    otherDatasets_list : 'list' of 'cascade.data_model.SpectralDataTimeSeries'
        Optional output.
modeled_observations : 'list' of 'ndarray'
stellar_model : 'list' of 'ndarray'
corrected_observations : 'list' of 'ndarray'
"""
model_spectra = SpectralModel(cascade_configuration)
wave_shift, error_wave_shift = \
model_spectra.determine_wavelength_shift(referenceDataset)
referenceDataset.wavelength = referenceDataset.wavelength+wave_shift
referenceDataset.add_auxilary(wave_shift=wave_shift.to_string())
referenceDataset.add_auxilary(error_wave_shift=error_wave_shift.to_string())
otherDatasets_list = list(otherDatasets)
for i, dataset in enumerate(otherDatasets_list):
dataset.wavelength = dataset.wavelength+wave_shift
dataset.add_auxilary(wave_shift=wave_shift.to_string())
dataset.add_auxilary(error_wave_shift=error_wave_shift.to_string())
otherDatasets_list[i] = dataset
modeled_observations = \
[model_spectra.model_wavelength, model_spectra.model_observation,
model_spectra.scaling, model_spectra.relative_distanc_sqr,
model_spectra.sensitivity]
stellar_model = \
[model_spectra.model_wavelength, model_spectra.rebinned_stellar_model]
input_stellar_model = [model_spectra.sm[2], model_spectra.sm[3]]
corrected_observations = \
[model_spectra.corrected_wavelength, model_spectra.observation,
wave_shift, error_wave_shift]
stellar_model_parameters = model_spectra.par
if len(otherDatasets_list) > 0:
return [referenceDataset] + otherDatasets_list, modeled_observations,\
stellar_model, corrected_observations, input_stellar_model, \
stellar_model_parameters
return referenceDataset, modeled_observations, stellar_model, \
corrected_observations, input_stellar_model, \
stellar_model_parameters
def renormalize_spatial_scans(referenceDataset, *otherDatasets):
"""
    Renormalize spatial scans.

    Renormalize the data taken in the two spatial scan directions to a
    common median flux level.

    Parameters
    ----------
    referenceDataset : 'SpectralDataTimeSeries'
        Dataset used to determine the scaling between the two scan
        directions.
    *otherDatasets : 'SpectralDataTimeSeries'
        Optional. Other datasets to which the same scaling is applied.

    Returns
    -------
    'SpectralDataTimeSeries' or 'list' of 'SpectralDataTimeSeries'
        The renormalized dataset(s).
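    Examples
    --------
    A minimal sketch; `dataset` is assumed to be a SpectralDataTimeSeries
    with a scan_direction attribute:

    >>> dataset = renormalize_spatial_scans(dataset)  # doctest: +SKIP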
"""
otherDatasets_list = list(otherDatasets)
try:
scan_direction = np.array(referenceDataset.scan_direction)
except AttributeError:
if len(otherDatasets_list) > 0:
return [referenceDataset] + otherDatasets_list
return referenceDataset
unique_scan_directions = np.unique(scan_direction)
if len(unique_scan_directions) != 2:
if len(otherDatasets_list) > 0:
return [referenceDataset] + otherDatasets_list
return referenceDataset
    idx = scan_direction == 0.0
    med0 = np.median(referenceDataset.data[..., idx]).value
    med1 = np.median(referenceDataset.data[..., ~idx]).value
    med = np.median(referenceDataset.data).value
scaling0 = med / med0
scaling1 = med / med1
reference_data = copy.deepcopy(referenceDataset.data)
reference_data[..., idx] = reference_data[..., idx]*scaling0
reference_data[..., ~idx] = reference_data[..., ~idx]*scaling1
referenceDataset.data = reference_data
reference_uncertainty = copy.deepcopy(referenceDataset.uncertainty)
reference_uncertainty[..., idx] = reference_uncertainty[..., idx]*scaling0
    reference_uncertainty[..., ~idx] = \
        reference_uncertainty[..., ~idx]*scaling1
referenceDataset.uncertainty = reference_uncertainty
for i, dataset in enumerate(otherDatasets_list):
reference_data = copy.deepcopy(dataset.data)
reference_data[..., idx] = reference_data[..., idx]*scaling0
reference_data[..., ~idx] = reference_data[..., ~idx]*scaling1
dataset.data = reference_data
reference_uncertainty = copy.deepcopy(dataset.uncertainty)
        reference_uncertainty[..., idx] = \
            reference_uncertainty[..., idx]*scaling0
        reference_uncertainty[..., ~idx] = \
            reference_uncertainty[..., ~idx]*scaling1
dataset.uncertainty = reference_uncertainty
otherDatasets_list[i] = dataset
if len(otherDatasets_list) > 0:
return [referenceDataset] + otherDatasets_list
return referenceDataset
def determine_absolute_cross_dispersion_position(cleanedDataset, initialTrace,
ROI=None,
verbose=False,
verboseSaveFile=None,
quantileCut=0.5,
orderTrace=2):
"""
Determine the initial cross dispersion position.
    This routine updates the initial spectral trace for positional shifts in
    the cross-dispersion direction, using the first exposure of the time
    series observation.
Parameters
----------
    cleanedDataset : 'SpectralDataTimeSeries'
        Input cleaned spectral dataset.
    initialTrace : 'OrderedDict'
        Input spectral trace.
ROI : 'ndarray' of 'bool'
Region of interest.
    verbose : 'bool', optional
        If True, diagnostic plots will be generated. Default is False.
verboseSaveFile : 'str', optional
If not None, verbose output will be saved to the specified file.
    quantileCut : 'float', optional
        Quantile used to select the valid center-of-light positions for the
        trace fit. Default is 0.5.
    orderTrace : 'int', optional
        Polynomial order of the fit to the spectral trace. Default is 2.
Returns
-------
    newShiftedTrace : 'OrderedDict'
        Spectral trace shifted to the observed source position.
    newFittedTrace : 'OrderedDict'
        Trace determined by a fit to the center of light position.
    medianCrossDispersionPosition : 'float'
        Median cross-dispersion position of the fitted trace.
    initialCrossDispersionShift : 'float'
        Shift between the initial guess for the spectral trace position and
        the fitted trace position of the first spectral image.
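
    Examples
    --------
    A hedged usage sketch; a cleaned dataset and an initial trace
    dictionary as produced elsewhere in this pipeline are assumed:

    >>> shiftedTrace, fittedTrace, medianPosition, initialShift = \
    ...     determine_absolute_cross_dispersion_position(
    ...         cleanedDataset, initialTrace, ROI=roi,
    ...         quantileCut=0.5, orderTrace=2)  # doctest: +SKIP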
"""
cleanedData = cleanedDataset.return_masked_array('data')
newShiftedTrace = copy.copy(initialTrace)
newFittedTrace = copy.copy(initialTrace)
if ROI is not None:
roiUse = cleanedData[..., 0].mask | ROI
else:
roiUse = cleanedData[..., 0].mask
kernel = Gaussian2DKernel(1.0)
convolvedFirstImage = convolve(cleanedData[..., 0], kernel,
boundary='extend')
_, idx, col, yTrace, xTrace = \
determine_center_of_light_posision(convolvedFirstImage, ROI=roiUse,
quantileCut=quantileCut,
orderTrace=orderTrace)
medianCrossDispersionPosition = np.ma.median(xTrace[idx])
medianCrossDispersionPositionInitialTrace = \
np.ma.median(initialTrace['positional_pixel'].value[idx])
initialCrossDispersionShift = \
medianCrossDispersionPosition-medianCrossDispersionPositionInitialTrace
newShiftedTrace['positional_pixel'] = \
newShiftedTrace['positional_pixel'] + \
initialCrossDispersionShift*newShiftedTrace['positional_pixel'].unit
newFittedTrace['positional_pixel'] = \
xTrace*newFittedTrace['positional_pixel'].unit
if verbose:
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, ax = plt.subplots(figsize=(10, 10))
ax.plot(idx, col, label='COL')
ax.plot(yTrace, xTrace, label='Fitted Trace')
ax.plot(newShiftedTrace['positional_pixel'].value,
label='Shifted Instrument Trace')
ax.plot(initialTrace['positional_pixel'].value,
label='Initial Instrument Trace')
ax.legend(loc='best')
ax.set_title('Initial Trace Position')
ax.set_xlabel('Pixel Position Dispersion Direction')
ax.set_ylabel('Pixel Position Cross-Dispersion Direction')
plt.show()
if verboseSaveFile is not None:
fig.savefig(verboseSaveFile, bbox_inches='tight')
return newShiftedTrace, newFittedTrace, medianCrossDispersionPosition, \
initialCrossDispersionShift
def correct_wavelength_for_source_movent(datasetIn, spectral_movement,
useScale=False,
useCrossDispersion=False,
verbose=False, verboseSaveFile=None):
"""
Correct wavelengths for source movement.
This routine corrects the wavelength cube attached to the spectral
image data cube for source (telescope) movements
Parameters
----------
    datasetIn : 'SpectralDataTimeSeries'
        Input dataset for which the wavelength will be corrected for
        telescope movement.
spectral_movement : 'OrderedDict'
Ordered dict containing the relative rotation, scaling,
and movement in the dispersion and cross dispersion direction.
useScale : 'bool', optional
If set the scale parameter is used to correct the wavelength.
Default is False.
    useCrossDispersion : 'bool', optional
        If set, the cross-dispersion movement is used to correct the
        wavelength. Default is False.
    verbose : 'bool', optional
        If True, diagnostic plots will be generated. Default is False.
verboseSaveFile : 'str', optional
If not None, verbose output will be saved to the specified file.
Returns
-------
    dataset_out : 'SpectralDataTimeSeries'
        Movement-corrected dataset. The flag isMovementCorrected=True is
        set to indicate that this dataset has been corrected.

    Notes
    -----
    Scaling changes are not corrected at the moment. Note that the rotation
    and translation used to correct the wavelengths describe the relative
    source movement, defined such that shifting the observed spectral image
    by these angles and offsets makes its position identical to that of the
    reference image. The correction of the wavelength relative to the
    reference spectral image is hence applied in the opposite direction.
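
    Examples
    --------
    The correction is built from composed SimilarityTransform objects. A
    self-contained sketch of the shift convention on a small synthetic
    wavelength image (a pure-numpy stand-in for the wavelength cube):

    >>> import numpy as np
    >>> from skimage.transform import SimilarityTransform, warp
    >>> wave = np.tile(np.arange(5.0), (5, 1)).T
    >>> tform = SimilarityTransform(translation=(0.0, 1.0))
    >>> shifted = warp(wave, tform, order=1, cval=np.nan)
    >>> float(shifted[2, 2])  # the image content moved up by one row
    3.0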
"""
dataset_out = copy.deepcopy(datasetIn)
correctedWavelength = dataset_out.return_masked_array('wavelength').copy()
    # no need for a mask here as the wavelength should be defined
    # for all pixels
correctedWavelength = correctedWavelength.data
ntime = correctedWavelength.shape[-1]
for it in range(ntime):
        # image center, following the skimage rotation convention
        rows, cols = np.array(correctedWavelength.shape)[:2]
        center = np.array((cols, rows)) / 2. - 0.5
tform1 = SimilarityTransform(translation=center)
angle_rad = np.deg2rad(-spectral_movement['relativeAngle'][it])
scale = spectral_movement['relativeScale'][it]
tform2 = SimilarityTransform(rotation=angle_rad,
scale=(1.0/scale-1.0)*int(useScale)+1.0)
tform3 = SimilarityTransform(translation=-center)
tform_rotate = tform3 + tform2 + tform1
translation = (-spectral_movement['crossDispersionShift'][it] *
int(useCrossDispersion),
-spectral_movement['dispersionShift'][it])
tform_translate = SimilarityTransform(translation=translation)
tform_combined = tform_translate + tform_rotate
correctedWavelength[..., it] = warp(correctedWavelength[..., it],
tform_combined, order=3,
cval=np.nan)
    # mask those regions of the images which are on the edge and might
    # not be present at all times.
correctedWavelength = np.ma.masked_invalid(correctedWavelength)
ncols = correctedWavelength.shape[1]
for ic in range(ncols):
correctedWavelength[:, ic, :] = \
np.ma.mask_rows(correctedWavelength[:, ic, :])
# replace old wavelengths and update mask.
dataset_out._wavelength = correctedWavelength.data
dataset_out.mask = np.logical_or(dataset_out.mask,
correctedWavelength.mask)
dataset_out.isMovementCorrected = True
if verbose:
wnew = dataset_out.return_masked_array('wavelength')
index_valid = np.ma.all(wnew[..., 0].mask, axis=1)
index_valid = ~index_valid.data
wnew = np.ma.median(wnew[index_valid, ...][1:8, ...], axis=1)
wold = datasetIn.return_masked_array('wavelength')
wold = np.ma.median(wold[index_valid, ...][1:8, ...], axis=1)
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, ax0 = plt.subplots(figsize=(6, 5), nrows=1, ncols=1)
ax0.plot(wnew.T, zorder=5, lw=3)
ax0.plot(wold.T, color='gray', zorder=4)
ax0.set_ylabel('Wavelength [{}]'.format(datasetIn.wavelength_unit))
ax0.set_xlabel('Integration #')
ax0.set_title('Wavelength shifts')
plt.show()
if verboseSaveFile is not None:
fig.savefig(verboseSaveFile, bbox_inches='tight')
return dataset_out
def rebin_to_common_wavelength_grid(dataset, referenceIndex, nrebin=None,
verbose=False, verboseSaveFile=None,
return_weights=False):
"""
Rebin the spectra to single wavelength per row.
Parameters
----------
dataset : 'SpectralDataTimeSeries'
Input dataset
referenceIndex : 'int'
Exposure index number which will be used as reference defining the
uniform wavelength grid.
    nrebin : 'float', optional
        Rebinning factor for the new wavelength grid compared to the old
        one.
verbose : 'bool', optional
If True, diagnostic plots will be created
verboseSaveFile : 'str', optional
If not None, verbose output will be saved to the specified file.
return_weights : 'bool', optional
If set, returns weights used in rebinning.
Returns
-------
    rebinnedDataset : 'SpectralDataTimeSeries'
        Output dataset rebinned to the common wavelength grid.
    weights : 'ndarray'
        The rebinning weights; only returned if return_weights is True.
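
    Examples
    --------
    A hedged usage sketch; dataset is assumed to be a valid
    SpectralDataTimeSeries:

    >>> rebinnedDataset = rebin_to_common_wavelength_grid(
    ...     dataset, 0, nrebin=2)  # doctest: +SKIP
    >>> rebinnedDataset, weights = rebin_to_common_wavelength_grid(
    ...     dataset, 0, return_weights=True)  # doctest: +SKIP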
"""
if not isinstance(dataset, SpectralDataTimeSeries):
raise TypeError("the input data to rebin_to_common_wavelength_grid "
"function needs to be a SpectralDataTimeSeries. "
"Aborting rebin to a common wavelength grid.")
# all data with wavelength dependency + time
spectra = dataset.return_masked_array('data')
uncertainty = dataset.return_masked_array('uncertainty')
wavelength = dataset.return_masked_array('wavelength')
time = dataset.return_masked_array('time')
    try:
        scaling = dataset.return_masked_array('scaling')
    except AttributeError:
        scaling = None
    # A pixel row does not have the same wavelength at all times.
    # Need to find the maximum lowest or minimum highest wavelength for a
    # proper rebinning.
min_wavelength = np.ma.min(np.ma.max(wavelength, axis=-1))
max_wavelength = np.ma.max(np.ma.min(wavelength, axis=-1))
referenceWavelength = np.sort(np.array(wavelength[1:-1, referenceIndex]))
idx_min_select = np.where(referenceWavelength >= min_wavelength)[0][0]
idx_max_select = np.where(referenceWavelength <= max_wavelength)[0][-1]
referenceWavelength = referenceWavelength[idx_min_select:idx_max_select]
lr, ur = _define_band_limits(wavelength.data)
if nrebin is not None:
referenceWavelength = \
np.linspace(referenceWavelength[0+int(nrebin/2)],
referenceWavelength[-1-int(nrebin/2)],
int(len(referenceWavelength)/nrebin))
lr0, ur0 = _define_band_limits(referenceWavelength)
weights = _define_rebin_weights(lr0, ur0, lr, ur)
rebinnedSpectra, rebinnedUncertainty = \
_rebin_spectra(spectra, uncertainty, weights)
rebinnedWavelength = np.tile(referenceWavelength,
(rebinnedSpectra.shape[-1], 1)).T
if scaling is not None:
rebinnedScaling, _ = _rebin_spectra(scaling, scaling*0.0, weights)
else:
rebinnedScaling = None
ndim = dataset.data.ndim
selection = tuple((ndim-1)*[0]+[Ellipsis])
dictTimeSeries = {}
dictTimeSeries['data'] = rebinnedSpectra
dictTimeSeries['data_unit'] = dataset.data_unit
dictTimeSeries['uncertainty'] = rebinnedUncertainty
dictTimeSeries['wavelength'] = rebinnedWavelength
dictTimeSeries['wavelength_unit'] = dataset.wavelength_unit
dictTimeSeries['time'] = time[selection]
dictTimeSeries['time_unit'] = dataset.time_unit
dictTimeSeries['isRebinned'] = True
if rebinnedScaling is not None:
dictTimeSeries['scaling'] = rebinnedScaling
dictTimeSeries['scaling_unit'] = dataset.scaling_unit
# get everything else apart from data, wavelength, time and uncertainty
for key in vars(dataset).keys():
if key[0] != "_":
if isinstance(vars(dataset)[key], MeasurementDesc):
measurement = getattr(dataset, key)
                if key not in dictTimeSeries:
dictTimeSeries[key] = measurement[selection]
            else:
                # auxiliary info that can be copied without rebinning
                dictTimeSeries[key] = getattr(dataset, key)
rebinnedDataset = SpectralDataTimeSeries(**dictTimeSeries)
if verbose:
index_valid = np.ma.all(wavelength.mask, axis=1)
index_valid = ~index_valid.data
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, axes = plt.subplots(figsize=(10, 5), nrows=1, ncols=2)
ax0, ax1 = axes.flatten()
ax0.plot(rebinnedWavelength[1:6].T, zorder=5, lw=3)
ax0.plot(wavelength[index_valid, :][2:7].T, color='gray', zorder=4)
ax0.set_ylabel('Wavelength [{}]'.format(dataset.wavelength_unit))
ax0.set_xlabel('Integration #')
ax0.set_title('Rebinned Wavelengths')
ax1.plot(rebinnedSpectra[1:6].T, zorder=5, lw=3)
ax1.plot(spectra[index_valid, :][2:7].T, color='gray', zorder=4)
ax1.set_ylabel('Flux [{}]'.format(dataset.data_unit))
ax1.set_xlabel('Integration #')
ax1.set_title('Rebinned Signal')
fig.subplots_adjust(hspace=0.3)
fig.subplots_adjust(wspace=0.45)
plt.show()
if verboseSaveFile is not None:
fig.savefig(verboseSaveFile, bbox_inches='tight')
if return_weights:
return rebinnedDataset, weights
else:
return rebinnedDataset
def combine_scan_samples(datasetIn, scanDictionary, verbose=False):
"""
Combine all (scan) samples.
This routine creates a new SpectralDataTimeSeries of integration averaged
spectra from time series data per sample-up-the-ramp.
Parameters
----------
datasetIn : 'SpectralDataTimeSeries'
Input dataset
    scanDictionary : 'dict'
        Dictionary containing the relevant information about the scans.
verbose : 'bool', optional
If True, diagnostic plots will be created (default False).
Returns
-------
combinedDataset : 'SpectralDataTimeSeries'
Output dataset with average signals per integration
    Raises
    ------
    TypeError
        If the input dataset is not a SpectralDataTimeSeries.
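
    Examples
    --------
    The core of the combination step averages the scan samples of each
    integration. A plain-numpy sketch of what the internal
    reshape_integration helper does (2 wavelengths, 6 samples, 3 reads):

    >>> import numpy as np
    >>> samples = np.arange(12.0).reshape(2, 6)
    >>> nreads = 3
    >>> np.mean(samples.reshape(2, 6 // nreads, nreads), axis=-1)
    array([[ 1.,  4.],
           [ 7., 10.]])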
"""
if not isinstance(datasetIn, SpectralDataTimeSeries):
raise TypeError("the input data to combine_scan_sample function needs "
"to be a SpectralDataTimeSeries. Aborting combining "
"scan samples.")
dataIn = datasetIn.return_masked_array('data').copy()
dataInShape = dataIn.shape
errorIn = datasetIn.return_masked_array('uncertainty').copy()
dataUnit = datasetIn.data_unit
waveIn = datasetIn.return_masked_array('wavelength').copy()
wavelengthUnit = datasetIn.wavelength_unit
timeIn = datasetIn.return_masked_array('time').copy()
timeUnit = datasetIn.time_unit
dictTimeSeries = {}
def reshape_integration(data, shape, nreads):
reshapedData = \
np.ma.mean(np.ma.reshape(data, (shape[0], shape[1]//nreads,
nreads)),
axis=-1)
return reshapedData
def reshape_error(error, shape, nreads):
reshapedError = \
np.ma.sqrt(np.ma.sum(np.ma.reshape(error,
(shape[0], shape[1]//nreads,
nreads))**2,
axis=-1))/nreads
return reshapedError
def reshape_auxilary(data, shape, nreads):
reshapedData = \
np.mean(np.reshape(data, (len(data)//nreads, nreads)), axis=-1)
return list(reshapedData)
def combine_list_of_strings(data, scanDictionary, sort_index):
reshapedData = []
for scan_dir, scan_par in scanDictionary.items():
data_scan = np.ma.compress(scan_par['index'], data, axis=-1)
reshapedData.append(data_scan[::scan_par['nsamples']])
reshapedData = np.hstack(reshapedData)
reshapedData = np.take_along_axis(reshapedData,
sort_index.mean(axis=0, dtype=int),
axis=-1)
reshapedData = list(reshapedData)
base = [j for i in reshapedData
for j in i.split("_") if 'sample' in j]
if len(base) != 0:
reshapedData = \
[i.replace(base[j], "RESAMPLED{:04d}".format(j))
for j, i in enumerate(reshapedData)]
return reshapedData
def combine_scans_auxilary(data, scanDictionary, sort_index):
reshapedData = []
for scan_dir, scan_par in scanDictionary.items():
data_scan = np.ma.compress(scan_par['index'], data, axis=-1)
reshapedData.append(
reshape_auxilary(data_scan, data_scan.shape,
scan_par['nsamples']))
reshapedData = np.hstack(reshapedData)
reshapedData = np.take_along_axis(reshapedData,
sort_index.mean(axis=0, dtype=int),
axis=-1)
if isinstance(data, list):
reshapedData = list(reshapedData)
return reshapedData
def reshape_data(data, scanDictionary, sort_index):
reshapedData = []
for scan_dir, scan_par in scanDictionary.items():
data_scan = np.ma.compress(scan_par['index'], data, axis=-1)
reshapedData.append(
reshape_integration(data_scan, data_scan.shape,
scan_par['nsamples']))
reshapedData = np.hstack(reshapedData)
reshapedData = np.take_along_axis(reshapedData, sort_index, axis=-1)
return reshapedData
def reshape_primary_data(data, wave, error, time, scanDictionary):
reshapedData = []
reshapedTime = []
reshapedWave = []
reshapedError = []
for scan_dir, scan_par in scanDictionary.items():
data_scan = np.ma.compress(scan_par['index'], data, axis=-1)
wave_scan = np.ma.compress(scan_par['index'], wave, axis=-1)
time_scan = np.ma.compress(scan_par['index'], time, axis=-1)
error_scan = np.ma.compress(scan_par['index'], error, axis=-1)
reshapedData.append(
reshape_integration(data_scan, data_scan.shape,
scan_par['nsamples']))
reshapedTime.append(
reshape_integration(time_scan, data_scan.shape,
scan_par['nsamples']))
reshapedWave.append(
reshape_integration(wave_scan, data_scan.shape,
scan_par['nsamples']))
reshapedError.append(
reshape_error(error_scan, data_scan.shape,
scan_par['nsamples']))
reshapedData = np.hstack(reshapedData)
reshapedTime = np.hstack(reshapedTime)
reshapedWave = np.hstack(reshapedWave)
reshapedError = np.hstack(reshapedError)
idx_time_sort = np.argsort(reshapedTime, axis=-1)
reshapedData = np.take_along_axis(reshapedData, idx_time_sort, axis=-1)
reshapedTime = np.take_along_axis(reshapedTime, idx_time_sort, axis=-1)
reshapedWave = np.take_along_axis(reshapedWave, idx_time_sort, axis=-1)
reshapedError = np.take_along_axis(reshapedError, idx_time_sort, axis=-1)
return (reshapedData, reshapedWave, reshapedError, reshapedTime,
idx_time_sort)
(combinedData, combinedWavelength, combinedError, combinedTime,
idx_time_sort) = reshape_primary_data(dataIn, waveIn, errorIn, timeIn,
scanDictionary)
dictTimeSeries['data'] = combinedData
dictTimeSeries['data_unit'] = dataUnit
dictTimeSeries['uncertainty'] = combinedError
dictTimeSeries['wavelength'] = combinedWavelength
dictTimeSeries['wavelength_unit'] = wavelengthUnit
dictTimeSeries['time'] = combinedTime
dictTimeSeries['time_unit'] = timeUnit
    # get everything apart from data, wavelength, time and uncertainty
for key in vars(datasetIn).keys():
if key[0] != "_":
if isinstance(vars(datasetIn)[key], MeasurementDesc):
measurement = getattr(datasetIn, key)
# will be rebinned
dictTimeSeries[key] = \
reshape_data(measurement, scanDictionary, idx_time_sort)
elif isinstance(vars(datasetIn)[key], AuxilaryInfoDesc):
aux = getattr(datasetIn, key)
if isinstance(aux, list):
# list
if (len(aux) == dataInShape[-1]) & \
(isinstance(aux[0], str)):
# list of str needs to be rebinned
dictTimeSeries[key] = \
combine_list_of_strings(aux, scanDictionary,
idx_time_sort)
elif len(aux) == dataInShape[-1]:
dictTimeSeries[key] = \
combine_scans_auxilary(aux, scanDictionary,
idx_time_sort)
else:
dictTimeSeries[key] = aux
elif isinstance(aux, np.ndarray):
if len(aux) == dataInShape[-1]:
                    # ndarray with one entry per scan sample: rebin
dictTimeSeries[key] = \
combine_scans_auxilary(aux, scanDictionary,
idx_time_sort)
else:
                    # ndarray not matching the number of samples; copy as-is
dictTimeSeries[key] = aux
else:
# other aux, no rebin
dictTimeSeries[key] = aux
            else:
                # auxiliary info that can be copied without rebinning
                dictTimeSeries[key] = getattr(datasetIn, key)
combinedDataset = \
SpectralDataTimeSeries(**dictTimeSeries)
if verbose:
sns.set_context("talk", font_scale=1.5, rc={"lines.linewidth": 2.5})
sns.set_style("white", {"xtick.bottom": True, "ytick.left": True})
fig, ax0 = plt.subplots(figsize=(6, 5), nrows=1, ncols=1)
ax0.plot(np.ma.mean(dataIn, axis=-1), label='Signal per sample',
zorder=5, lw=3)
ax0.plot(np.ma.mean(combinedData, axis=-1),
label='Signal per integration', zorder=6, lw=3)
ax0.legend(loc='best')
ax0.set_ylabel('Flux [{}]'.format(dataUnit))
ax0.set_xlabel('Wavelength [{}]'.format(wavelengthUnit))
ax0.set_title('Ramp averaged mean signal spectrum')
plt.show()
return combinedDataset
def sigma_clip_data_cosmic(data, sigma):
"""
    Sigma clip of the time series data cube along the time axis.

    Parameters
    ----------
    data : `ndarray`
        Input data to be clipped; the last axis is assumed to be the time
        axis.
    sigma : `float`
        Sigma value of the sigma clip.
Returns
-------
    sigmaClipMask : `ndarray` of `bool`
        Updated mask for the input data with bad data points flagged (=1).
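
    Examples
    --------
    A small self-contained sketch: a single cosmic-ray-like outlier in the
    time series of one wavelength row is flagged:

    >>> import numpy as np
    >>> data = np.ones((4, 10))
    >>> data[2, 5] = 100.0
    >>> mask = sigma_clip_data_cosmic(data, 3.0)
    >>> bool(mask[2, 5]), int(mask.sum())
    (True, 1)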
"""
# time axis always the last axis in data,
# or the first in the transposed array
    filteredData = sigma_clip(data.T, sigma=sigma, axis=0)
    sigmaClipMask = filteredData.mask.T
return sigmaClipMask
def sigma_clip_data(datasetIn, sigma, nfilter):
"""
Perform sigma clip on science data to flag bad data.
Parameters
----------
datasetIn : 'SpectralDataTimeSeries'
Input dataset
sigma : `float`
        Sigma value of the sigma clip.
    nfilter : 'int'
        Filter length for the sigma clip.
Returns
-------
    datasetOut : 'SpectralDataTimeSeries'
        Input dataset containing the observed spectral time series. Upon
        completion of this function, the dataset is returned with an
        updated mask with all deviating pixels flagged. To indicate this,
        the flag isSigmaCliped=True is set.
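
    Examples
    --------
    A hedged usage sketch; datasetIn is assumed to be a valid
    SpectralDataTimeSeries:

    >>> datasetIn = sigma_clip_data(datasetIn, 4.0, 13)  # doctest: +SKIP
    >>> datasetIn.isSigmaCliped  # doctest: +SKIP
    True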
"""
if nfilter % 2 == 0: # even
nfilter += 1
# mask cosmic hits
data = datasetIn.return_masked_array("data")
sigmaClipedMask = sigma_clip_data_cosmic(data, sigma)
# update mask
sigmaClipedMask = np.ma.mask_or(datasetIn.mask, sigmaClipedMask,
shrink=False)
datasetIn.mask = sigmaClipedMask
dim = datasetIn.data.shape
ndim = datasetIn.data.ndim
newMask = datasetIn.mask.copy()
for il in range(0+(nfilter-1)//2, dim[0]-(nfilter-1)//2):
filter_index = \
[slice(il - (nfilter-1)//2, il+(nfilter-1)//2+1, None)] + \
[slice(None)]*(ndim-1)
filter_index = tuple(filter_index)
# reformat to masked array without quantity
data = datasetIn.return_masked_array("data")
# median along time axis
data = np.ma.median(data[filter_index].T, axis=0)
# filter in box in the wavelength direction
data = sigma_clip(data, sigma=sigma, axis=ndim-2)
        # spectra: tiling=(dim[1], 1)
# spectral images: tiling=(dim[2], 1, 1)
# spectral data cubes: tiling=(dim[3], 1, 1, 1)
tiling = dim[ndim-1:] + tuple(np.ones(ndim-1).astype(int))
mask = np.tile(data.mask, tiling)
# add to mask
newMask[filter_index] = np.ma.mask_or(newMask[filter_index], mask.T,
shrink=False)
newMask = np.ma.mask_or(datasetIn.mask, newMask, shrink=False)
# update mask and set flag
datasetIn.mask = newMask
datasetIn.isSigmaCliped = True
return datasetIn
def create_cleaned_dataset(datasetIn, ROIcube, kernel, stdvKernelTime):
"""
    Create a cleaned dataset to be used in the regression analysis.
Parameters
----------
datasetIn : 'SpectralDataTimeSeries'
Input dataset
ROIcube : 'ndarray' of 'bool'
Region of interest.
kernel : 'ndarray'
Instrument convolution kernel
    stdvKernelTime : 'float'
        Standard deviation in the time direction used in the convolution.
Returns
-------
cleanedDataset : `SpectralDataTimeSeries`
A cleaned version of the spectral timeseries data of the transiting
exoplanet system
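
    Examples
    --------
    The instrument kernel is extended into the time direction before the
    NaN interpolation. A self-contained sketch of that kernel construction
    for spectral-image data (ndim = 3); the uniform 2D kernel is a
    stand-in for the instrument convolution kernel:

    >>> import numpy as np
    >>> from astropy.convolution import Gaussian1DKernel
    >>> kernel2d = np.ones((5, 5)) / 25.0
    >>> kernel1d = Gaussian1DKernel(1.0, x_size=5)
    >>> kernel3d = np.repeat(kernel2d[..., np.newaxis], 5, axis=-1)
    >>> kernel3d = kernel3d * kernel1d.array
    >>> kernel3d = kernel3d / np.sum(kernel3d)
    >>> kernel3d.shape
    (5, 5, 5)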
"""
dataToBeCleaned = datasetIn.return_masked_array('data')
uncertaintyToBeCleaned = datasetIn.return_masked_array('uncertainty')
dataToBeCleaned.set_fill_value(np.nan)
uncertaintyToBeCleaned.set_fill_value(np.nan)
ndim = dataToBeCleaned.ndim
if ndim == 2:
RS = RobustScaler(with_scaling=True)
data_scaled = RS.fit_transform(dataToBeCleaned.filled().T)
dataToBeCleaned = \
np.ma.array(data_scaled.T, mask=dataToBeCleaned.mask)
RS2 = RobustScaler(with_scaling=True)
data_scaled2 = RS2.fit_transform(uncertaintyToBeCleaned.filled().T)
uncertaintyToBeCleaned = \
np.ma.array(data_scaled2.T, mask=uncertaintyToBeCleaned.mask)
dataToBeCleaned[ROIcube] = 0.0
dataToBeCleaned.mask[ROIcube] = False
dataToBeCleaned.set_fill_value(np.nan)
uncertaintyToBeCleaned[ROIcube] = 1.0
uncertaintyToBeCleaned.mask[ROIcube] = False
uncertaintyToBeCleaned.set_fill_value(np.nan)
kernel_size = kernel.shape[0]
kernel_1d = Gaussian1DKernel(stdvKernelTime, x_size=kernel_size)
kernel = np.repeat(np.expand_dims(kernel, axis=ndim-1),
(kernel_size), axis=ndim-1)
selection = tuple([slice(None)])+tuple([None])*(ndim-1)
kernel = kernel*kernel_1d.array[selection].T
kernel = kernel/np.sum(kernel)
cleanedData = \
interpolate_replace_nans(dataToBeCleaned.filled(),
kernel, boundary='extend')
cleanedUncertainty = \
interpolate_replace_nans(uncertaintyToBeCleaned.filled(),
kernel, boundary='extend')
if ndim == 2:
cleanedData = \
RS.inverse_transform(cleanedData.T).T
cleanedUncertainty = \
RS2.inverse_transform(cleanedUncertainty.T).T
# cleanedData.mask = cleanedData.mask | ROI
selection = tuple((ndim-1)*[0]+[Ellipsis])
cleanedDataset = SpectralDataTimeSeries(
wavelength=datasetIn.wavelength,
wavelength_unit=datasetIn.wavelength_unit,
data=cleanedData,
data_unit=datasetIn.data_unit,
mask=ROIcube,
time=datasetIn.time.data.value[selection],
time_unit=datasetIn.time_unit,
time_bjd=datasetIn.time_bjd.data.value[selection],
time_bjd_unit=datasetIn.time_bjd_unit,
uncertainty=cleanedUncertainty,
target_name=datasetIn.target_name,
dataProduct=datasetIn.dataProduct,
dataFiles=datasetIn.dataFiles,
isCleanedData=True)
try:
scaling = datasetIn.scaling
scaling_unit = datasetIn.scaling_unit
cleanedDataset.add_measurement(scaling=scaling, scaling_unit=scaling_unit)
except AttributeError:
pass
try:
position = datasetIn.position
position_unit = datasetIn.position_unit
cleanedDataset.add_measurement(position=position, position_unit=position_unit)
except AttributeError:
pass
return cleanedDataset
def compressROI(ROI, compressMask):
"""
Remove masked wavelengths from ROI.
Parameters
----------
ROI : 'ndarray'
Region of interest on detector.
compressMask : 'ndarray'
Compression mask indicating all valid data.
Returns
-------
compressedROI : 'ndarray'
Row (wavelength) compressed region of interest.
"""
compressedROI = ROI[compressMask]
return compressedROI
def compressSpectralTrace(spectralTrace, compressMask):
"""
Remove masked wavelengths from spectral trace.
Parameters
----------
spectralTrace : 'dict'
Spectral trace of the dispersed light on the detector.
compressMask : 'ndarray'
Compression mask indicating all valid data.
Returns
-------
    compressedSpectralTrace : 'dict'
        Row (wavelength) compressed spectral trace.
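
    Examples
    --------
    A small self-contained sketch; the trace keys other than
    'wavelength_pixel' are assumptions for illustration:

    >>> import numpy as np
    >>> trace = {'wavelength_pixel': np.arange(5.0),
    ...          'positional_pixel': np.ones(5)}
    >>> mask = np.array([False, True, True, True, False])
    >>> compressSpectralTrace(trace, mask)['wavelength_pixel']
    array([0., 1., 2.])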
"""
    compressedSpectralTrace = spectralTrace.copy()
    for key in compressedSpectralTrace.keys():
        compressedSpectralTrace[key] = \
            compressedSpectralTrace[key][compressMask]
    compressedSpectralTrace['wavelength_pixel'] = \
        compressedSpectralTrace['wavelength_pixel'] - \
        compressedSpectralTrace['wavelength_pixel'][0]
    return compressedSpectralTrace
def compressDataset(datasetIn, ROI):
"""
    Remove all flagged wavelengths from the dataset.
Parameters
----------
    datasetIn : 'SpectralDataTimeSeries'
        Spectral dataset.
ROI : 'ndarray'
Region of interest.
Returns
-------
    compressedDataset : 'SpectralDataTimeSeries'
        Row (wavelength) compressed dataset.
    compressMask : 'ndarray' of 'bool'
        Mask indicating which rows (wavelengths) were retained.
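
    Examples
    --------
    A hedged usage sketch; datasetIn and ROI as defined elsewhere in this
    pipeline:

    >>> compressedDataset, compressMask = compressDataset(
    ...     datasetIn, ROI)  # doctest: +SKIP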
"""
dataIn = datasetIn.return_masked_array('data').copy()
dataInShape = dataIn.shape
errorIn = datasetIn.return_masked_array('uncertainty').copy()
dataUnit = datasetIn.data_unit
waveIn = datasetIn.return_masked_array('wavelength').copy()
wavelengthUnit = datasetIn.wavelength_unit
timeIn = datasetIn.return_masked_array('time').copy()
timeUnit = datasetIn.time_unit
fullMask = \
np.ma.mask_or(dataIn.mask,
np.repeat(ROI[..., np.newaxis],
dataInShape[-1],
axis=-1),
shrink=False)
compressMask = ~fullMask.all(axis=tuple(np.arange(1, dataIn.ndim)))
dictTimeSeries = {}
for key in vars(datasetIn).keys():
if key[0] != "_":
if isinstance(vars(datasetIn)[key], MeasurementDesc):
dictTimeSeries[key] = \
getattr(datasetIn, key)[compressMask, ...]
else:
# print('can be added withour rebin')
dictTimeSeries[key] = getattr(datasetIn, key)
dictTimeSeries['data'] = dataIn[compressMask, ...]
dictTimeSeries['data_unit'] = dataUnit
dictTimeSeries['uncertainty'] = errorIn[compressMask, ...]
dictTimeSeries['wavelength'] = waveIn[compressMask, ...]
dictTimeSeries['wavelength_unit'] = wavelengthUnit
dictTimeSeries['time'] = timeIn[compressMask, ...]
dictTimeSeries['time_unit'] = timeUnit
compressedDataset = \
SpectralDataTimeSeries(**dictTimeSeries)
return compressedDataset, compressMask