Source code for cascade.cpm_model.cpm_model

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# This file is part of CASCADe package
#
# Developed within the ExoplANETS-A H2020 program.
#
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2018, 2020, 2021  Jeroen Bouwman
"""
Module defining the causal model.

The cpm_model module defines the solver and other functionality for the
regression model used in the causal pixel model.
"""
import numpy as np
from types import SimpleNamespace
import itertools
from collections.abc import Iterable
import ast
import warnings
import time as time_module
import copy
import gc
import ray
from numba import jit
from scipy.linalg import svd
from scipy.linalg import solve_triangular
from scipy.linalg import cholesky
import astropy.units as u
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from ..exoplanet_tools import lightcurve
from ..exoplanet_tools import spotprofile
from ..data_model import SpectralData
from ..data_model import SpectralDataTimeSeries
from cascade import __version__

__all__ = ['ols',
           'check_causality', 'select_regressors', 'return_design_matrix',
           'log_likelihood', 'modified_AIC', 'create_regularization_matrix',
           'return_lambda_grid', 'regressionDataServer',
           'rayRegressionDataServer', 'regressionControler',
           'rayRegressionControler', 'ridge', 'rayRidge',
           'make_bootstrap_samples',
           'regressionParameterServer', 'rayRegressionParameterServer',
           'regressionWorker', 'rayRegressionWorker']


def ols(design_matrix, data, covariance=None):
    r"""
    Ordinary least squares.

    Parameters
    ----------
    design_matrix : 'numpy.ndarray'
        The design or regression matrix used in the regression modeling.
    data : 'numpy.ndarray'
        Vector of data points to be modeled.
    covariance : 'numpy.ndarray', optional
        Covariance matrix of the data, used to weight the regression.
        The default is None.

    Returns
    -------
    fit_parameters : 'numpy.ndarray'
        Linear regression parameters.
    err_fit_parameters : 'numpy.ndarray'
        Error estimate on the regression parameters.
    sigma_hat_sqr : 'float'
        Mean squared error.

    Notes
    -----
    This routine solves the linear equation

    .. math:: A x = y

    by finding the optimal solution :math:`\hat{x}` by minimizing

    .. math:: || y - A \hat{x} ||^2

    For details on the implementation see [1]_, [2]_, [3]_, [4]_

    References
    ----------
    .. [1] PhD thesis by Diana Maria Sima, "Regularization techniques in
       Model Fitting and Parameter Estimation", KU Leuven 2006
    .. [2] Hogg et al. 2010, "Data analysis recipes: Fitting a model to data"
    .. [3] Rust & O'Leary, "Residual periodograms for choosing regularization
       parameters for ill-posed problems"
    .. [4] Krakauer et al., "Using generalized cross-validation to select
       parameters in inversions for regional carbon fluxes"

    Examples
    --------
    >>> import numpy as np
    >>> from cascade.cpm_model import ols
    >>> A = np.array([[1, 0, -1], [0, 1, 0], [1, 0, 1], [1, 1, 0],
    ...               [-1, 1, 0]])
    >>> coef = np.array([4, 2, 7])
    >>> b = np.dot(A, coef)
    >>> b = b + np.random.normal(0.0, 0.01, size=b.size)
    >>> results = ols(A, b)
    >>> print(results)
    """
    if not isinstance(covariance, type(None)):
        Gcovariance = cholesky(covariance, lower=True)
        weighted_design_matrix = solve_triangular(Gcovariance, design_matrix,
                                                  lower=True,
                                                  check_finite=False)
        data_weighted = solve_triangular(Gcovariance, data, lower=True,
                                         check_finite=False)
        scaling_matrix = np.diag(np.full(len(data_weighted),
                                         1.0/np.mean(data_weighted)))
        data_weighted = np.dot(scaling_matrix, data_weighted)
        weighted_design_matrix = np.dot(scaling_matrix,
                                        weighted_design_matrix)
    else:
        weighted_design_matrix = design_matrix
        data_weighted = data
    dim_dm = weighted_design_matrix.shape
    if dim_dm[0] - dim_dm[1] < 1:
        raise AssertionError("Wrong dimensions of design matrix: "
                             "more regressors than data points; Aborting")
    # First make SVD of design matrix A
    U, sigma, VH = svd(weighted_design_matrix)
    # residual_not_reg = (u[:,rnk:].dot(u[:,rnk:].T)).dot(y)
    residual_not_reg = np.linalg.multi_dot([U[:, dim_dm[1]:],
                                            U[:, dim_dm[1]:].T,
                                            data_weighted])
    # calculate the filter factors
    F = np.identity(sigma.shape[0])
    Fsigma_inv = np.diag(1.0/sigma)
    # Solution of the linear system
    fit_parameters = np.linalg.multi_dot([VH.T, Fsigma_inv,
                                          U.T[:dim_dm[1], :], data_weighted])
    # calculate the general residual vector (b-model), which can be
    # calculated by using U1 (mxn) and U2 (mxm-n), with U=[U1,U2]
    residual_reg = residual_not_reg + \
        np.linalg.multi_dot([U[:, :dim_dm[1]],
                             np.identity(dim_dm[1]) - F,
                             U[:, :dim_dm[1]].T, data_weighted])
    effective_degrees_of_freedom = (dim_dm[0] - dim_dm[1])
    sigma_hat_sqr = np.dot(residual_reg.T, residual_reg) / \
        effective_degrees_of_freedom
    # calculate the errors on the fit parameters
    err_fit_parameters = \
        np.sqrt(sigma_hat_sqr *
                np.diag(np.linalg.multi_dot([VH.T, Fsigma_inv**2, VH])))
    return fit_parameters, err_fit_parameters, sigma_hat_sqr

rayOls = ray.remote(ols)
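
# Illustrative usage (an added sketch, not part of the original module): the
# toy system below mirrors the docstring example. With 5 data points and 3
# regressors the dimension check in ols() passes, and the three returned
# values are the fit parameters, their errors and the mean squared error.
#
#     >>> A = np.array([[1, 0, -1], [0, 1, 0], [1, 0, 1], [1, 1, 0],
#     ...               [-1, 1, 0]])
#     >>> b = np.dot(A, np.array([4, 2, 7]))
#     >>> fit_parameters, err_fit_parameters, sigma_hat_sqr = ols(A, b)
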
def ridge(input_regression_matrix, input_data, input_covariance, input_delta,
          input_alpha):
    r"""
    Ridge regression.

    Parameters
    ----------
    input_regression_matrix : 'numpy.ndarray'
        The design or regression matrix used in the regularized least
        squares fit.
    input_data : 'numpy.ndarray'
        Vector of data to be fit.
    input_covariance : 'numpy.ndarray'
        Covariance matrix used as weight in the least squares fit.
    input_delta : 'numpy.ndarray'
        Regularization matrix. For ridge regression this is the unity
        matrix.
    input_alpha : 'float' or 'numpy.ndarray'
        Regularization strength.

    Returns
    -------
    beta : 'numpy.ndarray'
        Fitted regression parameters.
    rss : 'float'
        Sum of squared residuals.
    mse : 'float'
        Mean squared error.
    degrees_of_freedom : 'float'
        The effective degrees of freedom of the fit.
    model_unscaled : 'numpy.ndarray'
        The fitted regression model.
    optimal_regularization : 'numpy.ndarray'
        The optimal regularization strength determined by generalized
        cross validation.
    aicc : 'float'
        Corrected Akaike information criterion.

    Notes
    -----
    This routine solves the linear equation

    .. math:: A x = y

    by finding the optimal solution :math:`\hat{x}` by minimizing

    .. math:: || y - A \hat{x} ||^2 + \lambda || \hat{x} ||^2

    For details on the implementation see [5]_, [6]_, [7]_, [8]_

    References
    ----------
    .. [5] PhD thesis by Diana Maria Sima, "Regularization techniques in
       Model Fitting and Parameter Estimation", KU Leuven 2006
    .. [6] Hogg et al. 2010, "Data analysis recipes: Fitting a model to data"
    .. [7] Rust & O'Leary, "Residual periodograms for choosing regularization
       parameters for ill-posed problems"
    .. [8] Krakauer et al., "Using generalized cross-validation to select
       parameters in inversions for regional carbon fluxes"

    Examples
    --------
    >>> import numpy as np
    >>> from cascade.cpm_model import ridge
    >>> A = np.array([[1, 0, -1], [0, 1, 0], [1, 0, 1], [1, 1, 0],
    ...               [-1, 1, 0]])
    >>> coef = np.array([4, 2, 7])
    >>> b = np.dot(A, coef)
    >>> b = b + np.random.normal(0.0, 0.01, size=b.size)
    >>> results = ridge(A, b, np.identity(b.size), np.identity(3), 1.e-4)
    >>> print(results)
    """
    n_data, n_parameter = input_regression_matrix.shape
    # Get data and regression matrix
    Gcovariance = cholesky(input_covariance, lower=True)
    regression_matrix = solve_triangular(Gcovariance,
                                         input_regression_matrix,
                                         lower=True, check_finite=False)
    data = solve_triangular(Gcovariance, input_data, lower=True,
                            check_finite=False)
    scaling_matrix = np.diag(np.full(len(data), 1.0/np.mean(data)))
    data = np.dot(scaling_matrix, data)
    regression_matrix = np.dot(scaling_matrix, regression_matrix)

    # Start of regression with SVD
    U, D, Vh = svd(regression_matrix, full_matrices=False,
                   check_finite=False, overwrite_a=True,
                   lapack_driver='gesdd')
    R = np.dot(U, np.diag(D))
    delta = np.dot(np.dot(Vh, input_delta), Vh.T)
    RY = np.dot(R.T, data)
    unity_matrix_ndata = np.identity(n_data)

    if isinstance(input_alpha, Iterable):
        gcv_list = []
        mse_list = []
        for alpha_try in input_alpha:
            F = np.diag(D**2) + alpha_try*delta
            G = cholesky(F, lower=True)
            x = solve_triangular(G, R.T, lower=True, check_finite=False)
            H = np.dot(x.T, x)
            residual = np.dot(unity_matrix_ndata-H, data)
            rss = np.dot(residual.T, residual)
            degrees_of_freedom = np.trace(H)
            if (n_data-degrees_of_freedom) >= 1:
                mse = rss/(n_data-degrees_of_freedom)
                gcv = n_data*(np.trace(unity_matrix_ndata-H))**-2 * rss
            else:
                mse = 1.e16
                gcv = 1.e16
            gcv_list.append(gcv)
            mse_list.append(mse)
        opt_idx = np.argmin(gcv_list)
        optimal_regularization = input_alpha[opt_idx]
    else:
        optimal_regularization = input_alpha

    # Solve linear system with optimal regularization
    F = np.diag((D)**2) + optimal_regularization*delta
    G = cholesky(F, lower=True)
    x = solve_triangular(G, R.T, lower=True, check_finite=False)
    H = np.dot(x.T, x)
    x = solve_triangular(G, RY, lower=True, check_finite=False)
    x = solve_triangular(G.T, x, lower=False, check_finite=False)
    beta = np.dot(Vh.T, x)
    residual = np.dot(unity_matrix_ndata-H, data)
    rss = np.dot(residual.T, residual)
    degrees_of_freedom = np.trace(H)
    mse = rss/(n_data-degrees_of_freedom)
    aicc = n_data*np.log(rss) + 2*degrees_of_freedom + \
        (2*degrees_of_freedom * (degrees_of_freedom+1)) / \
        (n_data-degrees_of_freedom-1)
    # model_optimal = np.dot(H, data)
    model_unscaled = np.dot(input_regression_matrix, beta)
    return beta, rss, mse, degrees_of_freedom, model_unscaled, \
        optimal_regularization, aicc

rayRidge = ray.remote(ridge)
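
# Illustrative usage (an added sketch, not part of the original module):
# passing a grid of alpha values makes ridge() pick the optimal strength by
# generalized cross validation. The sizes and grid endpoints below are
# hypothetical; the offset keeps the internal 1/mean(data) scaling well
# defined.
#
#     >>> n_data, n_par = 50, 5
#     >>> A = np.random.normal(size=(n_data, n_par))
#     >>> y = A @ np.random.normal(size=n_par) + 1.0
#     >>> cov = np.identity(n_data)
#     >>> delta = create_regularization_matrix('value', n_par, 0)
#     >>> alpha_grid = return_lambda_grid(1.e-4, 1.e2, 20)
#     >>> beta, rss, mse, dof, model, opt_alpha, aicc = \
#     ...     ridge(A, y, cov, delta, alpha_grid)
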
def check_causality():
    """
    Check if all data has a causal connection.

    Returns
    -------
    causal_mask : ndarray of 'bool'
        Mask of data which has a good causal connection with other data.
    """
    # Placeholder implementation: currently all data is flagged as causally
    # connected.
    causal_mask = True
    return causal_mask

def select_regressors(selection_mask, exclusion_distance):
    """
    Return a list with the indices of the regressors for each wavelength
    data point.

    Parameters
    ----------
    selection_mask : 'ndarray' of 'bool'
        Mask selecting all data for which a regressor matrix has to be
        constructed.
    exclusion_distance : 'int'
        Minimum distance to a data point within which no data is selected
        to be used as regressor.

    Returns
    -------
    regressor_list : 'list'
        List of index pairs of the data index and the indices of the data
        used as regressors for the specified data point.
    """
    if selection_mask.ndim == 1:
        selection_mask = np.expand_dims(selection_mask, axis=1)
    used_data_index = \
        [tuple(coord) for coord in np.argwhere(~selection_mask).tolist()]
    all_data_index = list(np.where(~selection_mask))
    ndatapoints = len(used_data_index)
    regressor_list = []
    for coord in used_data_index:
        idx = np.abs(coord[0]-all_data_index[0]) >= exclusion_distance
        regressor_list.append([coord, (all_data_index[0][idx],
                                       all_data_index[1][idx]),
                               ndatapoints])
    return regressor_list

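
# Illustrative usage (an added sketch, not part of the original module): for
# a 1D region-of-interest mask, each unmasked wavelength row is paired with
# the indices of all other unmasked rows at least `exclusion_distance`
# pixels away. With the hypothetical inputs below, regressor_list[0] pairs
# row 0 with rows 2..9.
#
#     >>> roi = np.zeros(10, dtype=bool)    # all 10 rows usable
#     >>> regressor_list = select_regressors(roi, exclusion_distance=2)
#     >>> (il, ir), (idx_cal, trace), nwave = regressor_list[0]
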
def return_PCA(matrix, n_components):
    """
    Return the PCA components of the input matrix.

    Parameters
    ----------
    matrix : 'numpy.ndarray'
        Input matrix for which the principal components are calculated.
    n_components : 'int'
        Number of PCA components.

    Returns
    -------
    pca_matrix : 'numpy.ndarray'
        The principal components.
    pca_scores : 'numpy.ndarray'
        The PCA scores which back-transform the principal components into
        the original matrix.
    """
    pca = PCA(n_components=np.min([n_components, matrix.shape[0]]),
              whiten=False, svd_solver='auto')
    pca_matrix = pca.fit_transform(matrix.T).T
    pca_scores = pca.components_.T
    return pca_matrix, pca_scores


# @jit(nopython=True, cache=True, parallel=True)
def log_likelihood(data, covariance, model):
    """
    Calculate the log likelihood.

    Parameters
    ----------
    data : 'ndarray'
        Data array to be modeled.
    covariance : 'ndarray'
        The covariance of the data.
    model : 'ndarray'
        Regression model of the data.

    Returns
    -------
    lnL : 'float'
        Log likelihood.

    Notes
    -----
    For the determinant term in the log likelihood calculation use:
        2*np.sum(np.log(np.diag(np.linalg.cholesky(covariance))))
        np.dot(np.dot((data-model), np.diag(weights)), (data-model))
    """
    ndata = len(data)
    residual = data-model
    # Cholesky decomposition and inversion:
    G = cholesky(covariance, lower=True)
    RG = solve_triangular(G, residual, lower=True, check_finite=False)
    lnL = -0.5*(ndata*np.log(2.0*np.pi) +
                2*np.sum(np.log(np.diag(G))) +
                np.dot(RG.T, RG))
    return lnL

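
# Illustrative usage (an added sketch, not part of the original module):
# with unit covariance the log likelihood reduces to the familiar
# -0.5 * (N log(2 pi) + chi^2) form. The data values are hypothetical.
#
#     >>> data = np.array([1.0, 2.0, 3.0])
#     >>> model = np.array([1.1, 1.9, 3.0])
#     >>> lnL = log_likelihood(data, np.identity(3), model)
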
@jit(nopython=True, cache=True)
def modified_AIC(lnL, n_data, n_parameters):
    """
    Calculate the modified AIC.

    Parameters
    ----------
    lnL : 'float'
        Log likelihood.
    n_data : 'int'
        Number of data points.
    n_parameters : 'int'
        Number of free model parameters.

    Returns
    -------
    AICc : 'float'
        Modified Akaike information criterion.
    """
    AIC = -2*lnL + 2*n_parameters
    AICc = AIC + (2*n_parameters*(n_parameters+1))/(n_data-n_parameters-1)
    return AICc

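
# Illustrative usage (an added sketch, not part of the original module): the
# small-sample correction term grows as the number of parameters approaches
# the number of data points. With the hypothetical values below:
#
#     >>> aicc = modified_AIC(-120.0, 100, 10)
#     >>> # AIC = 260; AICc = 260 + (2*10*11)/(100-10-1) ~= 262.47
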
def create_regularization_matrix(method, n_regressors, n_not_regularized):
    """
    Create the regularization matrix.

    Two options are implemented: The first one, 'value', returns a penalty
    matrix for the classical ridge regression. The second option,
    'derivative', is consistent with the fused ridge penalty (as introduced
    by Goeman, 2008).

    Parameters
    ----------
    method : 'string'
        Method used to calculate the regularization matrix. Allowed values
        are 'value' or 'derivative'.
    n_regressors : 'int'
        Number of regressors.
    n_not_regularized : 'int'
        Number of regressors which should not have a regularization term.

    Raises
    ------
    ValueError
        In case the method input parameter has a wrong value, a ValueError
        is raised.

    Returns
    -------
    delta : 'ndarray'
        Regularization matrix.
    """
    allowed_methods = ['value', 'derivative']
    if method not in allowed_methods:
        raise ValueError("regularization method not recognized. "
                         "Allowed values are: {}".format(allowed_methods))
    if method == 'value':
        # regularization on value
        delta = np.diag(np.zeros((n_regressors)))
        delta[n_not_regularized:, n_not_regularized:] += \
            np.diag(np.ones(n_regressors-n_not_regularized))
    elif method == 'derivative':
        # regularization on derivative
        delta_temp = \
            np.diag(-1*np.ones(n_regressors-n_not_regularized-1), 1) + \
            np.diag(-1*np.ones(n_regressors-n_not_regularized-1), -1) + \
            np.diag(2*np.ones(n_regressors-n_not_regularized))
        delta_temp[0, 0] = 1.0
        delta_temp[-1, -1] = 1.0
        # delta_temp = delta_temp/np.linspace(1, 3, delta_temp.shape[0])**2
        delta = np.diag(np.zeros((n_regressors)))
        delta[n_not_regularized:, n_not_regularized:] += delta_temp
    return delta

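
# Illustrative usage (an added sketch, not part of the original module):
# with method='derivative' and one unregularized column the penalty couples
# neighbouring coefficients (a fused-ridge band matrix):
#
#     >>> delta = create_regularization_matrix('derivative', 4, 1)
#     >>> # delta = [[0,  0,  0,  0],
#     >>> #          [0,  1, -1,  0],
#     >>> #          [0, -1,  2, -1],
#     >>> #          [0,  0, -1,  1]]
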
def return_lambda_grid(lambda_min, lambda_max, n_lambda):
    """
    Create a grid for the regularization parameter lambda.

    Parameters
    ----------
    lambda_min : 'float'
        Minimum regularization strength of the grid.
    lambda_max : 'float'
        Maximum regularization strength of the grid.
    n_lambda : 'int'
        Number of grid points.

    Returns
    -------
    lambda_grid : 'ndarray'
        Logarithmically spaced grid of regularization strengths.
    """
    if n_lambda <= 1:
        lambda_grid = np.array([lambda_min])
        return lambda_grid
    delta_lam = np.abs(np.log10(lambda_max)-np.log10(lambda_min))/(n_lambda-1)
    lambda_grid = 10**(np.log10(lambda_min) +
                       np.linspace(0, n_lambda-1, n_lambda)*delta_lam)
    return lambda_grid

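
# Illustrative usage (an added sketch, not part of the original module): the
# grid is logarithmically spaced between lambda_min and lambda_max,
# inclusive of both endpoints:
#
#     >>> return_lambda_grid(1.e-2, 1.e2, 5)
#     >>> # -> array([1.e-02, 1.e-01, 1.e+00, 1.e+01, 1.e+02])
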
def make_bootstrap_samples(ndata, nsamples):
    """
    Make the bootstrap sample indices.

    Parameters
    ----------
    ndata : 'int'
        Number of data points.
    nsamples : 'int'
        Number of bootstrap samples.

    Returns
    -------
    bootsptrap_indici : 'ndarray' of 'int'
        (nsamples+1 x ndata) array containing the permuted indices of the
        data array. The first row is the unsampled list of indices.
    non_common_indici : 'list'
        For each bootstrap sampling, the list of indices not sampled.
    """
    all_indici = np.arange(ndata)
    bootsptrap_indici = np.zeros((nsamples+1, ndata), dtype=int)
    non_common_indici = []
    bootsptrap_indici[0, :] = all_indici
    non_common_indici.append(np.setxor1d(all_indici, all_indici))
    np.random.seed(1984)
    for i in range(nsamples):
        bootsptrap_indici[i+1, :] = np.sort(np.random.choice(ndata, ndata))
        non_common_indici.append(np.setxor1d(all_indici,
                                             bootsptrap_indici[i+1, :]))
    return bootsptrap_indici, non_common_indici

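
# Illustrative usage (an added sketch, not part of the original module):
# row 0 is always the identity ordering; subsequent rows are sorted draws
# with replacement, and non_common_indici lists what each draw missed.
#
#     >>> indici, missing = make_bootstrap_samples(5, 2)
#     >>> indici.shape    # (3, 5); indici[0] == [0, 1, 2, 3, 4]
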
def return_design_matrix(data, selection_list):
    """
    Return the design matrix based on the data set itself.

    Parameters
    ----------
    data : 'ndarray'
        Input timeseries data.
    selection_list : 'tuple'
        Tuple containing the indices of the data used as regressors for a
        given wavelength (index).

    Returns
    -------
    design_matrix : 'ndarray'
        Design matrix.
    """
    (il, ir), (idx_cal, trace), nwave = selection_list
    if data.ndim == 2:
        data = data[:, np.newaxis, :].copy()
    design_matrix = data[idx_cal, trace, :]
    return design_matrix

class regressionDataServer:
    """
    Class which provides all needed input data for the regression modeling.

    This class loads the data and the cleaned data to define, for each
    wavelength, the timeseries data at that wavelength which will be
    analysed and the regressors which will be used for the analysis.
    """

    def __init__(self, dataset, regressor_dataset):
        self.fit_dataset = copy.deepcopy(dataset)
        self.regressor_dataset = copy.deepcopy(regressor_dataset)
        self.RS = StandardScaler()
    def sync_with_parameter_server(self, parameter_server_handle):
        """
        Sync the data server with the parameter server.

        Parameters
        ----------
        parameter_server_handle : 'regressionParameterServer'
            Instance of the regressionParameterServer class.

        Returns
        -------
        None.
        """
        self.cascade_configuration = \
            parameter_server_handle.get_configuration()
        self.regression_parameters = \
            parameter_server_handle.get_regression_parameters()
    def get_data_info(self):
        """
        Get the relevant information of the observations.

        Returns
        -------
        ndim : 'int'
            Dimension of the dataset.
        shape : 'tuple'
            Shape of the dataset.
        ROI : 'ndarray'
            Region of interest.
        data_unit : 'astropy unit'
            Physical unit of the data.
        wavelength_unit : 'astropy unit'
            Physical unit of the wavelength.
        time_unit : 'astropy unit'
            Unit of the time.
        time_bjd_zero : 'float'
            Time in BJD of the first integration.
        data_product : 'string'
            Data product.
        """
        ndim = self.fit_dataset.data.ndim
        shape = self.fit_dataset.data.shape
        ROI = self.regressor_dataset.mask.any(axis=1)
        data_unit = self.regressor_dataset.data_unit
        wavelength_unit = self.regressor_dataset.wavelength_unit
        time_unit = self.regressor_dataset.time_unit
        time_bjd_zero = self.fit_dataset.time_bjd.data.flat[0]
        data_product = self.fit_dataset.dataProduct
        return ndim, shape, ROI, data_unit, wavelength_unit, time_unit, \
            time_bjd_zero, data_product
    def initialze_lightcurve_model(self):
        """
        Initialize the lightcurve model.

        Returns
        -------
        None.
        """
        self.lightcurve_model = lightcurve(self.cascade_configuration)
        self.spot_model = spotprofile(self.cascade_configuration)
        try:
            time_offset = \
                ast.literal_eval(self.cascade_configuration.model_time_offset)
        except AttributeError:
            time_offset = 0.0
        fit_lightcurve_model, fit_ld_correcton, fit_dilution_correction = \
            self.lightcurve_model.interpolated_lc_model(
                self.fit_dataset, time_offset=time_offset
                )
        mid_transit_time = \
            self.lightcurve_model.return_mid_transit(
                self.fit_dataset, time_offset=time_offset
                )
        fit_spot_model = self.spot_model.return_spot_profile(self.fit_dataset)
        self.fit_lightcurve_model = fit_lightcurve_model
        self.fit_ld_correcton = fit_ld_correcton
        self.fit_dilution_correction = fit_dilution_correction
        self.mid_transit_time = mid_transit_time
        self.fit_ld_coefficients = self.lightcurve_model.limbdarkning_model.ld
        self.fit_spot_model = fit_spot_model
    def get_lightcurve_model(self):
        """
        Get the lightcurve model.

        Returns
        -------
        'tuple'
            Tuple containing the lightcurve model, the limb darkening
            correction, the dilution correction, the lightcurve model
            parameters and the mid transit time.
        """
        return (self.fit_lightcurve_model, self.fit_ld_correcton,
                self.fit_ld_coefficients, self.fit_dilution_correction,
                self.lightcurve_model.par, self.mid_transit_time)
    def unpack_datasets(self):
        """
        Unpack all datasets into masked arrays.

        Returns
        -------
        None.
        """
        self.unpack_regressor_dataset()
        self.unpack_fit_dataset()
    def unpack_regressor_dataset(self):
        """
        Unpack the dataset containing the data to be used as regressors.

        Returns
        -------
        None.
        """
        self.regressor_data = \
            self.regressor_dataset.return_masked_array('data')
        np.ma.set_fill_value(self.regressor_data, 0.0)
        # note we use the fit_dataset here as additional info is always
        # attached to the main dataset, not the cleaned one.
        for regressor in self.regression_parameters.additional_regressor_list:
            if regressor.split('_')[0] == 'time':
                temp0 = self.fit_dataset.return_masked_array('time')
                temp1 = (temp0-np.min(temp0))/(np.max(temp0)-np.min(temp0))
                order = int(regressor.split('_')[1])
                setattr(self, 'regressor_'+regressor, (-temp1)**order)
            elif regressor.split('_')[0] == 'position':
                temp0 = self.fit_dataset.return_masked_array('position')
                temp1 = (temp0-np.min(temp0))/(np.max(temp0)-np.min(temp0))
                order = int(regressor.split('_')[1])
                setattr(self, 'regressor_'+regressor, (temp1)**order)
            elif regressor.split('_')[0] == 'fwhm':
                temp0 = self.fit_dataset.return_masked_array('fwhm')
                temp1 = (temp0-np.min(temp0))/(np.max(temp0)-np.min(temp0))
                order = int(regressor.split('_')[1])
                setattr(self, 'regressor_'+regressor, (temp1)**order)
            else:
                setattr(self, 'regressor_'+regressor,
                        self.fit_dataset.return_masked_array(regressor))
    def unpack_fit_dataset(self):
        """
        Unpack the dataset containing the data to be fitted.

        Returns
        -------
        None.
        """
        self.fit_data = self.fit_dataset.return_masked_array('data')
        np.ma.set_fill_value(self.fit_data, 0.0)
        self.fit_data_wavelength = \
            self.fit_dataset.return_masked_array('wavelength')
        self.fit_data_uncertainty = \
            self.fit_dataset.return_masked_array('uncertainty')
        np.ma.set_fill_value(self.fit_data_uncertainty, 1.e8)
        self.fit_data_time = self.fit_dataset.return_masked_array('time')
    @staticmethod
    def select_regressors(data, selection, bootstrap_indici=None):
        """
        Return the design matrix for a given selection.

        This function selects the data to be used as regressors. To be used
        in combination with the select_data function.

        Parameters
        ----------
        data : 'ndarray'
            Spectroscopic data.
        selection : 'tuple'
            Tuple containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.

        Returns
        -------
        design_matrix : 'ndarray'
            The design matrix used in the regression analysis.
        """
        (_, _), (index_disp_regressors, index_cross_disp_regressors), _ = \
            selection
        if bootstrap_indici is None:
            bootstrap_indici = np.arange(data.shape[-1])
        if data.ndim == 2:
            regressor_matrix = data[:, np.newaxis, :]
        else:
            # 3D data already has a cross-dispersion axis; use it as-is.
            regressor_matrix = data
        return \
            regressor_matrix[index_disp_regressors,
                             index_cross_disp_regressors,
                             :][..., bootstrap_indici]
    @staticmethod
    def select_data(data, selection, bootstrap_indici=None):
        """
        Return the data for a given selection.

        This function selects the data to be used in the regression
        analysis. To be used in combination with the select_regressors
        function.

        Parameters
        ----------
        data : 'ndarray'
            Spectroscopic data.
        selection : 'tuple'
            Tuple containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.

        Returns
        -------
        selected_data : 'ndarray'
            The selected data to be modeled.
        """
        (index_dispersion, index_cross_dispersion), (_, _), _ = \
            selection
        if bootstrap_indici is None:
            bootstrap_indici = np.arange(data.shape[-1])
        if data.ndim == 2:
            selected_data = data[:, np.newaxis, :]
        else:
            # 3D data already has a cross-dispersion axis; use it as-is.
            selected_data = data
        return selected_data[index_dispersion,
                             index_cross_dispersion, :][..., bootstrap_indici]
    def setup_regression_data(self, selection, bootstrap_indici=None):
        """
        Set up the data which will be fitted.

        Parameters
        ----------
        selection : 'tuple'
            Tuple containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.

        Returns
        -------
        None.
        """
        if bootstrap_indici is None:
            bootstrap_indici = np.arange(self.fit_data.shape[-1])
        selected_fit_data = \
            self.select_data(self.fit_data, selection,
                             bootstrap_indici=bootstrap_indici)
        selected_fit_wavelength = \
            self.select_data(self.fit_data_wavelength, selection,
                             bootstrap_indici=bootstrap_indici)
        selected_fit_wavelength = np.ma.median(selected_fit_wavelength)
        selected_fit_time = \
            self.select_data(self.fit_data_time, selection,
                             bootstrap_indici=bootstrap_indici).data
        selected_covariance = \
            np.ma.diag(self.select_data(self.fit_data_uncertainty, selection,
                                        bootstrap_indici=bootstrap_indici)**2)
        selected_covariance.set_fill_value(1.e16)
        self.regression_data_selection = \
            (selected_fit_data.filled(), selected_fit_wavelength,
             selected_fit_time, selected_covariance.filled(),
             selected_fit_data.mask)
    def setup_regression_matrix(self, selection, bootstrap_indici=None):
        """
        Define the regression matrix.

        Parameters
        ----------
        selection : 'tuple'
            Tuple containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.

        Returns
        -------
        None.
        """
        if bootstrap_indici is None:
            bootstrap_indici = np.arange(self.regressor_data.shape[-1])
        regression_matrix = \
            self.select_regressors(self.regressor_data, selection,
                                   bootstrap_indici=bootstrap_indici)
        additional_regressors = []
        for regressor in self.regression_parameters.additional_regressor_list:
            additional_regressors.append(
                self.select_data(getattr(self, 'regressor_'+regressor),
                                 selection,
                                 bootstrap_indici=bootstrap_indici)
                )
        # add spot model
        if self.fit_spot_model is not np.nan:
            additional_regressors.append(
                self.select_data(self.fit_spot_model, selection,
                                 bootstrap_indici=bootstrap_indici)
                )
        n_additional = len(additional_regressors) + 2
        regression_matrix = \
            np.vstack(additional_regressors+[regression_matrix])
        regression_matrix = self.RS.fit_transform(regression_matrix.T).T
        # no scaling for spot model
        if self.fit_spot_model is not np.nan:
            regression_matrix[n_additional-2-1, :] = \
                regression_matrix[n_additional-2-1, :] * \
                self.RS.scale_[n_additional-2-1] + \
                self.RS.mean_[n_additional-2-1]
            self.RS.scale_[n_additional-2-1] = 1.0
            self.RS.mean_[n_additional-2-1] = 0.0
        lc = self.select_data(self.fit_lightcurve_model, selection,
                              bootstrap_indici=bootstrap_indici)
        intercept = np.ones_like(lc)
        regression_matrix = np.vstack([intercept, lc, regression_matrix]).T
        self.regression_matrix_selection = \
            (regression_matrix, n_additional, self.RS.mean_, self.RS.scale_)
    def get_regression_data(self, selection, bootstrap_indici=None,
                            return_data_only=False):
        """
        Get all relevant data.

        Parameters
        ----------
        selection : 'tuple'
            Tuple containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.
        return_data_only : 'bool', optional
            If set, the design matrix is not determined and returned as
            None.

        Returns
        -------
        'ndarray'
            Data to be modeled.
        'ndarray'
            Design matrix for the regression analysis of the data.
        """
        self.setup_regression_data(selection,
                                   bootstrap_indici=bootstrap_indici)
        if not return_data_only:
            self.setup_regression_matrix(selection,
                                         bootstrap_indici=bootstrap_indici)
        else:
            self.regression_matrix_selection = None
        return self.regression_data_selection, \
            self.regression_matrix_selection
    def get_all_regression_data(self, selection_list, bootstrap_indici=None,
                                return_data_only=False):
        """
        Get all relevant data for a selection list for a single bootstrap
        step.

        Parameters
        ----------
        selection_list : 'list'
            List of tuples containing the indices of the data to be used as
            regressors for each wavelength (index).
        bootstrap_indici : 'ndarray' of 'int', optional
            The time indices indicating which data to use for a bootstrap
            sampling. The default is None.
        return_data_only : 'bool', optional
            If set, the design matrix is not determined and returned as
            None.

        Returns
        -------
        'ndarray'
            Data to be modeled.
        'ndarray'
            Design matrix for the regression analysis of the data.
        """
        regression_selection_list = []
        for selection in selection_list:
            regression_selection = \
                self.get_regression_data(selection,
                                         bootstrap_indici=bootstrap_indici,
                                         return_data_only=return_data_only)
            regression_selection_list.append(regression_selection)
        return regression_selection_list
    def get_regression_data_chunk(self, iterator_chunk):
        """
        Get all relevant data for a chunk of the regression iteration.

        Parameters
        ----------
        iterator_chunk : 'list'
            List containing the tuples with the indices of the data to be
            used as regressors for each wavelength (index) and the
            bootstrap time indices indicating which data to use for a
            bootstrap sampling.

        Returns
        -------
        regression_selection_list : 'list'
            List containing the data to be modeled and the corresponding
            design matrix for the regression analysis of the data.
        """
        regression_selection_list = []
        for (_, bootstrap_indici), (_, selection) in iterator_chunk:
            regression_selection = \
                self.get_regression_data(selection,
                                         bootstrap_indici=bootstrap_indici)
            regression_selection_list.append(regression_selection)
        return regression_selection_list
    def initialize_data_server(self, parameter_server_handle):
        """
        Initialize the data server.

        Parameters
        ----------
        parameter_server_handle : 'regressionParameterServer'
            Instance of the regressionParameterServer class.

        Returns
        -------
        None.
        """
        self.sync_with_parameter_server(parameter_server_handle)
        self.unpack_datasets()
        self.initialze_lightcurve_model()

@ray.remote
class rayRegressionDataServer(regressionDataServer):
    """Ray wrapper of the regressionDataServer class."""

    def __init__(self, dataset, regressor_dataset):
        super().__init__(dataset, regressor_dataset)

    get_regression_data = \
        ray.method(num_returns=2)(regressionDataServer.get_regression_data)
    get_all_regression_data = \
        ray.method(num_returns=1)(
            regressionDataServer.get_all_regression_data)
    get_regression_data_chunk = \
        ray.method(num_returns=1)(
            regressionDataServer.get_regression_data_chunk)
    get_data_info = \
        ray.method(num_returns=8)(regressionDataServer.get_data_info)
    get_lightcurve_model = \
        ray.method(num_returns=1)(regressionDataServer.get_lightcurve_model)

    def sync_with_parameter_server(self, parameter_server_handle):
        """
        Sync the regression server with the parameter server.

        Parameters
        ----------
        parameter_server_handle : 'rayRegressionParameterServer'
            Handle to the ray actor wrapping the parameter server.

        Returns
        -------
        None.
        """
        self.cascade_configuration = \
            ray.get(parameter_server_handle.get_configuration.remote())
        self.regression_parameters = \
            ray.get(parameter_server_handle.get_regression_parameters.remote())

class regressionParameterServer:
    """
    Class which provides the parameter server for the regression modeling.

    This class contains all parameters needed for the regression analysis
    and the fitted results.
    """

    def __init__(self, cascade_configuration):
        self.cascade_configuration = cascade_configuration
        self.cpm_parameters = SimpleNamespace()
        self.initialize_regression_configuration()
        self.data_parameters = SimpleNamespace()
        self.regularization = SimpleNamespace()
        self.fitted_parameters = SimpleNamespace()
        self.processed_parameters = SimpleNamespace()
    def initialize_regression_configuration(self):
        """
        Initialize all regression control parameters.

        Returns
        -------
        None.
        """
        self.cpm_parameters.use_multi_processes = \
            ast.literal_eval(
                self.cascade_configuration.cascade_use_multi_processes)
        self.cpm_parameters.max_number_of_cpus = \
            ast.literal_eval(
                self.cascade_configuration.cascade_max_number_of_cpus)
        try:
            self.cpm_parameters.cascade_number_of_data_servers = \
                ast.literal_eval(
                    self.cascade_configuration.cascade_number_of_data_servers)
        except AttributeError:
            self.cpm_parameters.cascade_number_of_data_servers = 1
        self.cpm_parameters.nwidth = \
            ast.literal_eval(self.cascade_configuration.cpm_deltapix)
        self.cpm_parameters.nboot = \
            ast.literal_eval(self.cascade_configuration.cpm_nbootstrap)
        self.cpm_parameters.alpha_min = \
            ast.literal_eval(self.cascade_configuration.cpm_lam0)
        self.cpm_parameters.alpha_max = \
            ast.literal_eval(self.cascade_configuration.cpm_lam1)
        self.cpm_parameters.n_alpha = \
            ast.literal_eval(self.cascade_configuration.cpm_nlam)
        self.cpm_parameters.add_time = \
            ast.literal_eval(self.cascade_configuration.cpm_add_time)
        self.cpm_parameters.add_position = \
            ast.literal_eval(self.cascade_configuration.cpm_add_position)
        try:
            self.cpm_parameters.add_fwhm = \
                ast.literal_eval(self.cascade_configuration.cpm_add_fwhm)
        except AttributeError:
            self.cpm_parameters.add_fwhm = False
        self.cpm_parameters.regularize_depth_correction = \
            ast.literal_eval(
                self.cascade_configuration.cpm_regularize_depth_correction)
        self.cpm_parameters.sigma_mse_cut = \
            ast.literal_eval(self.cascade_configuration.cpm_sigma_mse_cut)
        try:
            self.cpm_parameters.reg_type_depth_correction = \
                self.cascade_configuration.cpm_reg_type_depth_correction
        except AttributeError:
            self.cpm_parameters.reg_type_depth_correction = 'derivative'
        try:
            self.cpm_parameters.alpha_min_depth_correction = \
                ast.literal_eval(
                    self.cascade_configuration.cpm_lam0_depth_correction)
            self.cpm_parameters.alpha_max_depth_correction = \
                ast.literal_eval(
                    self.cascade_configuration.cpm_lam1_depth_correction)
            self.cpm_parameters.n_alpha_depth_correction = \
                ast.literal_eval(
                    self.cascade_configuration.cpm_nlam_depth_correction)
        except AttributeError:
            self.cpm_parameters.alpha_min_depth_correction = 0.001
            self.cpm_parameters.alpha_max_depth_correction = 1.e7
            self.cpm_parameters.n_alpha_depth_correction = 100
        try:
            self.cpm_parameters.number_of_sub_chunks_per_load = \
                ast.literal_eval(
                    self.cascade_configuration.cpm_number_of_sub_chunks_per_load)
        except AttributeError:
            self.cpm_parameters.number_of_sub_chunks_per_load = 300
        additional_regressor_list = []
        try:
            self.cpm_parameters.add_position_model_order = ast.literal_eval(
                self.cascade_configuration.cpm_add_position_model_order)
        except AttributeError:
            self.cpm_parameters.add_position_model_order = 1
        if self.cpm_parameters.add_position:
            for power in range(
                    1, self.cpm_parameters.add_position_model_order+1):
                additional_regressor_list.append('position_{}'.format(power))
            # additional_regressor_list.append('position')
        try:
            self.cpm_parameters.add_time_model_order = ast.literal_eval(
                self.cascade_configuration.cpm_add_time_model_order)
        except AttributeError:
            self.cpm_parameters.add_time_model_order = 1
        if self.cpm_parameters.add_time:
            for power in range(1, self.cpm_parameters.add_time_model_order+1):
                additional_regressor_list.append('time_{}'.format(power))
        try:
            self.cpm_parameters.add_fwhm_model_order = ast.literal_eval(
                self.cascade_configuration.cpm_add_fwhm_model_order)
        except AttributeError:
            self.cpm_parameters.add_fwhm_model_order = 1
        if self.cpm_parameters.add_fwhm:
            for power in range(1, self.cpm_parameters.add_fwhm_model_order+1):
                additional_regressor_list.append('fwhm_{}'.format(power))
        self.cpm_parameters.additional_regressor_list = \
            additional_regressor_list
        try:
            self.cpm_parameters.add_spot_profile = \
                ast.literal_eval(
                    self.cascade_configuration.cpm_add_spot_profile)
        except AttributeError:
            self.cpm_parameters.add_spot_profile = False
        if self.cpm_parameters.add_spot_profile:
            self.cpm_parameters.n_additional_regressors = \
                3 + len(additional_regressor_list)
        else:
            self.cpm_parameters.n_additional_regressors = \
                2 + len(additional_regressor_list)
    def get_regression_parameters(self):
        """
        Get all parameters controlling the regression analysis.

        Returns
        -------
        'SimpleNamespace'
            Namespace holding all parameters controlling the regression
            analysis.
        """
        return self.cpm_parameters
    def get_configuration(self):
        """
        Get the CASCADe configuration.

        Returns
        -------
        'cascade.initialize.cascade_configuration'
            Singleton containing the cascade configuration.
        """
        return self.cascade_configuration
    def sync_with_data_server(self, data_server_handle):
        """
        Sync the parameter server with the data server.

        Returns
        -------
        None.
        """
        ndim, shape, ROI, data_unit, wavelength_unit, time_unit, \
            time_bjd_zero, data_product = data_server_handle.get_data_info()
        self.data_parameters.ndim = ndim
        self.data_parameters.shape = shape
        self.data_parameters.ROI = ROI
        self.data_parameters.max_spectral_points = \
            np.sum(~self.data_parameters.ROI)
        self.data_parameters.ncorrect = \
            np.where(~self.data_parameters.ROI)[0][0]
        self.data_parameters.data_unit = data_unit
        self.data_parameters.wavelength_unit = wavelength_unit
        self.data_parameters.time_unit = time_unit
        self.data_parameters.time_bjd_zero = time_bjd_zero
        self.data_parameters.data_product = data_product
    def get_data_parameters(self):
        """
        Get all parameters characterizing the data.

        Returns
        -------
        'SimpleNamespace'
            Namespace holding all relevant parameters describing the
            dataset.
        """
        return self.data_parameters
    def initialize_regularization(self):
        """
        Initialize the regularization parameter test grid and results array.

        Returns
        -------
        None.
        """
        self.regularization.alpha_grid = \
            return_lambda_grid(self.cpm_parameters.alpha_min,
                               self.cpm_parameters.alpha_max,
                               self.cpm_parameters.n_alpha)
        self.regularization.optimal_alpha = \
            list(np.repeat(self.regularization.alpha_grid[np.newaxis, :],
                           self.data_parameters.max_spectral_points, axis=0))
    def get_regularization(self):
        """
        Get the regularization parameters.

        Returns
        -------
        'SimpleNamespace'
            Namespace holding all relevant parameters for the
            regularization.
        """
        return self.regularization
    def update_optimal_regulatization(self, new_regularization):
        """
        Update the fitted optimal regularization strength.

        Parameters
        ----------
        new_regularization : 'SimpleNamespace'
            New namespace holding the updated optimal regularization.

        Returns
        -------
        None.
        """
        for i_alpha, new_alpha in enumerate(new_regularization.optimal_alpha):
            if isinstance(new_alpha, float):
                self.regularization.optimal_alpha[i_alpha] = new_alpha
    def initialize_parameters(self):
        """
        Initialize the arrays holding the fit results.

        Returns
        -------
        None.
        """
        self.fitted_parameters.regression_results = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.max_spectral_points +
                      self.cpm_parameters.n_additional_regressors))
        self.fitted_parameters.fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.fitted_parameters.wavelength_fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.fitted_parameters.fitted_time = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.shape[-1]))
        self.fitted_parameters.fitted_model = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.shape[-1]))
        self.fitted_parameters.fitted_mse = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.fitted_parameters.fitted_aic = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.fitted_parameters.degrees_of_freedom = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        # should be self.processed_results.
        self.processed_parameters.corrected_fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.processed_parameters.fitted_baseline = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.shape[-1]))
        self.processed_parameters.fit_residuals = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.shape[-1]))
        self.processed_parameters.normed_fit_residuals = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points,
                      self.data_parameters.shape[-1]))
        self.processed_parameters.normed_fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.processed_parameters.error_normed_fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.processed_parameters.wavelength_normed_fitted_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
        self.processed_parameters.stellar_spectrum = \
            np.zeros((self.cpm_parameters.nboot+1,
                      self.data_parameters.max_spectral_points))
    def update_fitted_parameters(self, new_parameters, data_chunk):
        """Store the newly fitted parameters for the given data chunk."""
        bootstrap_sample_index = data_chunk[0]
        wavelength_index = data_chunk[1]
        index_disp_regressors, n_additional = data_chunk[2]
        self.fitted_parameters.fitted_spectrum[
            bootstrap_sample_index, wavelength_index] = \
            new_parameters.fitted_spectrum
        self.fitted_parameters.fitted_model[
            bootstrap_sample_index, wavelength_index, :] = \
            new_parameters.fitted_model
        self.fitted_parameters.fitted_time[
            bootstrap_sample_index, wavelength_index, :] = \
            new_parameters.fitted_time
        self.fitted_parameters.wavelength_fitted_spectrum[
            bootstrap_sample_index, wavelength_index] = \
            new_parameters.wavelength_fitted_spectrum
        self.fitted_parameters.fitted_mse[
            bootstrap_sample_index, wavelength_index] = \
            new_parameters.fitted_mse
        self.fitted_parameters.fitted_aic[
            bootstrap_sample_index, wavelength_index] = \
            new_parameters.fitted_aic
        self.fitted_parameters.degrees_of_freedom[
            bootstrap_sample_index, wavelength_index] = \
            new_parameters.degrees_of_freedom
        for j, (i1, i2, i3) in enumerate(zip(bootstrap_sample_index,
                                             wavelength_index,
                                             index_disp_regressors)):
            self.fitted_parameters.regression_results[i1, i2, i3] = \
                new_parameters.regression_results[0][j]
            self.fitted_parameters.regression_results[
                i1, i2, 0:n_additional] = \
                new_parameters.regression_results[1][j]
    def update_processed_parameters(self, new_parameters,
                                    bootstrap_chunk=None):
        """
        Update the processed parameters.

        Parameters
        ----------
        new_parameters : 'SimpleNamespace'
            Namespace holding the updated processed parameters.
        bootstrap_chunk : 'list', optional
            Indices of a subset of the bootstrap samples. The default is
            None.

        Returns
        -------
        None.
        """
        # processed_parameters = copy.deepcopy(self.processed_parameters)
        if bootstrap_chunk is None:
            bootstrap_chunk = list(np.arange(
                self.processed_parameters.corrected_fitted_spectrum.shape[0]))
            bootstrap_sample_counter = list(np.arange(
                self.processed_parameters.corrected_fitted_spectrum.shape[0]))
        else:
            bootstrap_chunk, bootstrap_sample_counter = bootstrap_chunk
        self.processed_parameters.corrected_fitted_spectrum[
            bootstrap_chunk, ...] = \
            new_parameters.corrected_fitted_spectrum[
                bootstrap_sample_counter, ...]
        self.processed_parameters.fitted_baseline[bootstrap_chunk, ...] = \
            new_parameters.fitted_baseline
        self.processed_parameters.fit_residuals[bootstrap_chunk, ...] = \
            new_parameters.fit_residuals
        self.processed_parameters.normed_fit_residuals[
            bootstrap_chunk, ...] = \
            new_parameters.normed_fit_residuals
        self.processed_parameters.normed_fitted_spectrum[
            bootstrap_chunk, ...] = \
            new_parameters.normed_fitted_spectrum
        self.processed_parameters.error_normed_fitted_spectrum[
            bootstrap_chunk, ...] = \
            new_parameters.error_normed_fitted_spectrum
        self.processed_parameters.wavelength_normed_fitted_spectrum[
            bootstrap_chunk, ...] = \
            new_parameters.wavelength_normed_fitted_spectrum
        self.processed_parameters.stellar_spectrum[bootstrap_chunk, ...] = \
            new_parameters.stellar_spectrum
        # self.processed_parameters = processed_parameters
    def get_fitted_parameters(self, bootstrap_chunk=None):
        """
        Return the fitted parameters.

        Parameters
        ----------
        bootstrap_chunk : 'list'
            List of indices of a subset of the bootstrap samples.

        Returns
        -------
        'SimpleNamespace'
            Namespace containing all fitted parameters.
        """
        if bootstrap_chunk is None:
            return self.fitted_parameters
        else:
            fitted_parameters_chunk = SimpleNamespace()
            for k in self.fitted_parameters.__dict__:
                setattr(
                    fitted_parameters_chunk, k,
                    self.fitted_parameters.__dict__[k][bootstrap_chunk, ...]
                    )
            return fitted_parameters_chunk
    def get_processed_parameters(self, bootstrap_chunk=None):
        """
        Return the processed parameters.

        Parameters
        ----------
        bootstrap_chunk : 'list'
            List of indices of a subset of the bootstrap samples.

        Returns
        -------
        'SimpleNamespace'
            Namespace containing all processed parameters.
        """
        if bootstrap_chunk is None:
            return self.processed_parameters
        else:
            processed_parameters_chunk = SimpleNamespace()
            for k in self.processed_parameters.__dict__:
                setattr(
                    processed_parameters_chunk, k,
                    self.processed_parameters.__dict__[k][bootstrap_chunk,
                                                          ...]
                    )
            return processed_parameters_chunk
    def add_new_parameters(self, new_parameters):
        """
        Add additional fitted parameters.

        Parameters
        ----------
        new_parameters : 'dictionary'
            Dictionary defining additional fit parameters of the regression
            model.

        Returns
        -------
        None.
        """
        temp = copy.deepcopy(new_parameters)
        for key, value in temp.items():
            setattr(self.fitted_parameters, key, value)
    def reset_parameters(self):
        """
        Reset all regression and regularization parameters.

        Returns
        -------
        None.
        """
        self.initialize_regularization()
        self.initialize_parameters()
    def initialize_parameter_server(self, data_server_handle):
        """
        Initialize the parameter server.

        Parameters
        ----------
        data_server_handle : 'regressionDataServer'
            Instance of the regressionDataServer class.

        Returns
        -------
        None.
        """
        self.sync_with_data_server(data_server_handle)
        self.reset_parameters()
    def reset_parameter_server(self, cascade_configuration,
                               data_server_handle):
        """
        Reset the parameter server.

        Parameters
        ----------
        cascade_configuration : 'cascade.initialize.cascade_configuration'
            Singleton containing all cascade configuration parameters.
        data_server_handle : 'regressionDataServer'
            Instance of the regressionDataServer class.

        Returns
        -------
        None.
        """
        self.cascade_configuration = cascade_configuration
        self.initialize_regression_configuration()
        self.initialize_parameter_server(data_server_handle)

@ray.remote
class rayRegressionParameterServer(regressionParameterServer):
    """Ray wrapper of the regressionParameterServer class."""

    def __init__(self, cascade_configuration):
        super().__init__(cascade_configuration)

    def sync_with_data_server(self, data_server_handle):
        """
        Synchronize with the data server.

        This method of the parameter server uses the handle to the data
        server to synchronize the parameters defining the dataset.

        Returns
        -------
        None.
        """
        ndim, shape, ROI, data_unit, wavelength_unit, time_unit, \
            time_bjd_zero, data_product = \
            ray.get(data_server_handle.get_data_info.remote())
        self.data_parameters.ndim = ndim
        self.data_parameters.shape = shape
        self.data_parameters.ROI = ROI
        self.data_parameters.max_spectral_points = \
            np.sum(~self.data_parameters.ROI)
        self.data_parameters.ncorrect = \
            np.where(~self.data_parameters.ROI)[0][0]
        self.data_parameters.data_unit = data_unit
        self.data_parameters.wavelength_unit = wavelength_unit
        self.data_parameters.time_unit = time_unit
        self.data_parameters.time_bjd_zero = time_bjd_zero
        self.data_parameters.data_product = data_product

class regressionControler:
    """
    The main server for the causal regression modeling.

    This class defines the controller for the regression modeling. It
    starts the data and parameter servers and distributes the tasks to the
    workers. After completion it processes all results and stores the
    extracted planetary spectra in spectral data format.
    """

    def __init__(self, cascade_configuration, dataset, regressor_dataset,
                 number_of_workers=1, number_of_data_servers=1):
        self.cascade_configuration = cascade_configuration
        self.number_of_workers = number_of_workers
        self.number_of_data_servers = number_of_data_servers
        self.instantiate_parameter_server()
        self.instantiate_data_server(dataset, regressor_dataset)
        self.initialize_servers()
        self.iterators = SimpleNamespace()
    def instantiate_parameter_server(self):
        """
        Instantiate the parameter server.

        Returns
        -------
        None.
        """
        self.parameter_server_handle = \
            regressionParameterServer(self.cascade_configuration)
    def instantiate_data_server(self, dataset, regressor_dataset):
        """
        Instantiate the data server.

        Parameters
        ----------
        dataset : 'SpectralDataTimeSeries'
            The spectral timeseries dataset to be modeled.
        regressor_dataset : 'SpectralDataTimeSeries'
            The cleaned version of the spectral timeseries dataset used for
            constructing the regression matrices.

        Returns
        -------
        None.
        """
        # self.data_server_handle = \
        #     [regressionDataServer(dataset, regressor_dataset)
        #      for _ in range(self.number_of_workers)]
        self.data_server_handle = \
            [regressionDataServer(dataset, regressor_dataset)
             for _ in range(self.number_of_data_servers)]
    def initialize_servers(self):
        """
        Initialize both the data as well as the parameter server.

        Note that the order of initialization is important: First the data
        server and then the parameter server.

        Returns
        -------
        None.
        """
        for server in self.data_server_handle:
            server.initialize_data_server(self.parameter_server_handle)
        self.parameter_server_handle.initialize_parameter_server(
            self.data_server_handle[0])
    def get_fit_parameters_from_server(self, bootstrap_chunk=None):
        """
        Get the regression fit parameters from the parameter server.

        Parameters
        ----------
        bootstrap_chunk : 'list'
            Indices of a subset of all bootstrap samples.

        Returns
        -------
        fitted_parameters : 'SimpleNamespace'
            This namespace contains all relevant fit parameters used in the
            extraction and calibration of the planetary signal.
        """
        return self.parameter_server_handle.get_fitted_parameters(
            bootstrap_chunk=bootstrap_chunk
            )
    def get_processed_parameters_from_server(self, bootstrap_chunk=None):
        """
        Get the processed regression fit parameters from the parameter
        server.

        Returns
        -------
        fitted_parameters : 'SimpleNamespace'
            This namespace contains all relevant fit parameters used in the
            extraction and calibration of the planetary signal.
        """
        return self.parameter_server_handle.get_processed_parameters(
            bootstrap_chunk=bootstrap_chunk
            )
    def get_regularization_parameters_from_server(self):
        """
        Get the regularization parameters from the parameter server.

        Returns
        -------
        'SimpleNamespace'
            Namespace containing all regularization variables and
            parameters.
        """
        return self.parameter_server_handle.get_regularization()
    def get_control_parameters(self):
        """
        Get the control parameters from the parameter server.

        This function returns all relevant parameters needed to determine
        the behaviour and settings of the regression modeling.

        Returns
        -------
        control_parameters : 'SimpleNamespace'
            This namespace contains all control parameters of the
            regression model.
        """
        control_parameters = SimpleNamespace()
        control_parameters.data_parameters = \
            self.parameter_server_handle.get_data_parameters()
        control_parameters.cpm_parameters = \
            self.parameter_server_handle.get_regression_parameters()
        return control_parameters
    def get_lightcurve_model(self):
        """
        Get the lightcurve model.

        Returns
        -------
        'SimpleNamespace'
            Namespace containing all variables and parameters defining the
            lightcurve model.
        """
        lightcurve_model = SimpleNamespace()
        lightcurve_model.lightcurve_model, lightcurve_model.ld_correction, \
            lightcurve_model.ld_coefficients, \
            lightcurve_model.dilution_correction, \
            lightcurve_model.lightcurve_parameters, \
            lightcurve_model.mid_transit_time = \
            self.data_server_handle[0].get_lightcurve_model()
        return lightcurve_model
        # return self.data_server_handle[0].get_lightcurve_model()
    def initialize_regression_iterators(self, nchunks=1):
        """
        Initialize the iterators required in the regression analysis.

        Returns
        -------
        None.
        """
        cpm_parameters = \
            self.parameter_server_handle.get_regression_parameters()
        data_parameters = self.parameter_server_handle.get_data_parameters()
        self.iterators.regressor_indici = \
            select_regressors(data_parameters.ROI, cpm_parameters.nwidth)
        self.iterators.bootsptrap_indici, _ = \
            make_bootstrap_samples(data_parameters.shape[-1],
                                   cpm_parameters.nboot)
        self.iterators.combined_full_model_indici = itertools.product(
            enumerate(self.iterators.bootsptrap_indici[:1]),
            enumerate(self.iterators.regressor_indici))
        self.iterators.n_iterators_full_model = \
            data_parameters.max_spectral_points
        self.iterators.combined_bootstrap_model_indici = itertools.product(
            enumerate(self.iterators.bootsptrap_indici),
            enumerate(self.iterators.regressor_indici))
        self.iterators.n_iterators_bootstrap_model = \
            data_parameters.max_spectral_points*(cpm_parameters.nboot+1)
        self.chunk_iterators(nchunks=nchunks)
    def get_regression_iterators(self):
        """
        Get all iterators used in the regression analysis.

        Returns
        -------
        'SimpleNamespace'
            Namespace containing all iterators (data indices, bootstrap
            indices) for the regression analysis.
        """
        return self.iterators
    @staticmethod
    def grouper_it(it, nchunks, number_of_iterators):
        """
        Split an iterator into chunks.

        Parameters
        ----------
        it : 'itertools.product'
            Iterator to be split into chunks.
        nchunks : 'int'
            Number of chunks.
        number_of_iterators : 'int'
            Number of iterators.

        Yields
        ------
        chunk_it : 'list'
            Chunk of the input iterator.
        """
        chunk_size = number_of_iterators // nchunks
        it = iter(it)
        nchunks_times_it = itertools.tee(it, nchunks)
        for i, sub_it in enumerate(nchunks_times_it):
            start = 0+i*chunk_size
            if i+1 == nchunks:
                stop = number_of_iterators
            else:
                stop = chunk_size+i*chunk_size
            chunk_it = itertools.islice(sub_it, start, stop)
            yield list(chunk_it), stop-start
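
    # Illustrative usage of the chunking helper (an added sketch, not part
    # of the original module): splitting 10 items over 3 chunks yields
    # chunk sizes 3, 3 and 4; the last chunk absorbs the remainder.
    #
    #     >>> chunks = list(regressionControler.grouper_it(range(10), 3, 10))
    #     >>> [n for _, n in chunks]    # -> [3, 3, 4]
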
    def chunk_iterators(self, nchunks=1):
        """
        Split the iterators into chunks.

        Parameters
        ----------
        nchunks : 'int', optional
            Number of chunks in which to split the iterators. The default
            is 1.

        Returns
        -------
        None.
        """
        chunked_full_model_iterator = list(
            self.grouper_it(self.iterators.combined_full_model_indici,
                            nchunks, self.iterators.n_iterators_full_model))
        chunked_bootstrap_model_iterator = list(
            self.grouper_it(self.iterators.combined_bootstrap_model_indici,
                            nchunks,
                            self.iterators.n_iterators_bootstrap_model))
        self.iterators.chunked_full_model_iterator = \
            chunked_full_model_iterator
        self.iterators.chunked_bootstrap_model_iterator = \
            chunked_bootstrap_model_iterator
    def reset_fit_parameters(self):
        """
        Reset the fitted parameters on the parameter server.

        Returns
        -------
        None.
        """
        self.parameter_server_handle.reset_parameters()
    def add_fit_parameters_to_parameter_server(self, new_parameters):
        """
        Add the fitted regression parameters to the parameter server.

        Parameters
        ----------
        new_parameters : 'SimpleNamespace'
            Updated fit parameters.

        Returns
        -------
        None.
        """
        self.parameter_server_handle.add_new_parameters(new_parameters)
    def run_regression_model(self):
        """
        Run the regression model.

        This method runs the regression method for the instrument
        systematics and the transit depth determination.

        Returns
        -------
        None.
        """
        # Number of chunks is the number of workers
        nchunks = self.number_of_workers
        # define the iterator chunks
        self.initialize_regression_iterators(nchunks=nchunks)
        # This launches workers on the full (non bootstrapped) data set
        # and determines the optimal regularization
        initial_regularization = \
            self.get_regularization_parameters_from_server()
        workers = [
            regressionWorker(initial_regularization, iterator_chunk)
            for iterator_chunk in self.iterators.chunked_full_model_iterator
            ]
        ndata_server = len(self.data_server_handle)
        futures = [w.async_update_loop(
            self.parameter_server_handle,
            self.data_server_handle[iserver % ndata_server])
            for iserver, w in enumerate(workers)]
        # This launches workers on the bootstrapped data set + original
        # data and determines the fit parameters and the error thereon
        updated_regularization = \
            copy.deepcopy(self.get_regularization_parameters_from_server())
        # re-initialize workers with optimal regularization
        futures = [w.update_initial_parameters(updated_regularization,
                                               iterator_chunk)
                   for w, iterator_chunk in zip(
                       workers,
                       self.iterators.chunked_bootstrap_model_iterator
                       )
                   ]
        # reset parameters on server for final run.
        self.parameter_server_handle.reset_parameters()
        futures = [w.async_update_loop(
            self.parameter_server_handle,
            self.data_server_handle[iserver % ndata_server])
            for iserver, w in enumerate(workers)]
    @staticmethod
    def calculate_correction_matrix(lightcurve_model):
        """Calculate the correction matrix for the limb darkening
        correction."""
        nwave = lightcurve_model.lightcurve_model.shape[0]
        corr_matrix = np.zeros((nwave, nwave)) + np.identity(nwave)
        for i in zip(*np.triu_indices(nwave, k=1)):
            coeff, _, _ = \
                ols(lightcurve_model.lightcurve_model[i[0], :, None],
                    lightcurve_model.lightcurve_model[i[1], :])
            corr_matrix[i] = coeff
            corr_matrix[i[::-1]] = 1/coeff
        return corr_matrix
[docs]    def process_regression_fit(self):
        """
        Process the fitted parameters from the regression analysis.

        Returns
        -------
        None.

        """
        control_parameters = self.get_control_parameters()
        lightcurve_model = self.get_lightcurve_model()
        # Correction of the transit depth when using the lightcurve models
        # from other wavelengths.
        corr_matrix = self.calculate_correction_matrix(lightcurve_model)
        # Chunked index numbers of the bootstrap samples.
        bootstrap_sample_index_chunk = list(self.grouper_it(
            np.arange(control_parameters.cpm_parameters.nboot+1),
            self.number_of_workers,
            control_parameters.cpm_parameters.nboot+1
            ))
        # Chunked time-sample indices of the bootstrap.
        bootstrap_indici_chunk = list(self.grouper_it(
            self.iterators.bootsptrap_indici,
            self.number_of_workers,
            control_parameters.cpm_parameters.nboot+1
            ))
        # Create and initialize the workers.
        workers = [
            processWorker(
                control_parameters,
                self.get_fit_parameters_from_server(bootstrap_chunk=bsic[0]),
                lightcurve_model,
                corr_matrix,
                bic[0],
                bsic[0]
                )
            for bic, bsic in zip(bootstrap_indici_chunk,
                                 bootstrap_sample_index_chunk)
            ]
        # Step 1: depth correction.
        features = [
            w.depth_correction(indici)
            for w, indici in zip(workers, bootstrap_sample_index_chunk)
            ]
        # Step 2: refitting and baseline determination.
        ndata_server = len(self.data_server_handle)
        features = [
            w.process_lightcurve_fit(
                indici,
                self.iterators.regressor_indici,
                self.data_server_handle[iserver % ndata_server],
                self.parameter_server_handle)
            for iserver, (w, indici) in enumerate(
                zip(workers, bootstrap_sample_index_chunk)
                )
            ]
        del features, workers, ndata_server
        del bootstrap_sample_index_chunk, control_parameters, lightcurve_model
        gc.collect()
[docs]    def post_process_regression_fit(self):
        """
        Post-processing of the regression analysis.

        Returns
        -------
        None.

        """
        fit_parameters = copy.deepcopy(self.get_fit_parameters_from_server())
        processed_parameters = \
            copy.deepcopy(self.get_processed_parameters_from_server())
        control_parameters = copy.deepcopy(self.get_control_parameters())
        lightcurve_model = copy.deepcopy(self.get_lightcurve_model())
        sigma_cut = control_parameters.cpm_parameters.sigma_mse_cut
        bad_wavelength_mask = \
            (fit_parameters.fitted_mse[0, :] >
             np.median(fit_parameters.fitted_mse[0, :])*sigma_cut)
        bad_wavelength_mask = \
            np.repeat(bad_wavelength_mask[np.newaxis, :],
                      control_parameters.cpm_parameters.nboot+1, axis=0)
        fitted_spectrum = \
            np.ma.array(processed_parameters.corrected_fitted_spectrum.copy(),
                        mask=bad_wavelength_mask.copy())
        stellar_spectrum = \
            np.ma.array(processed_parameters.stellar_spectrum.copy(),
                        mask=bad_wavelength_mask.copy())
        normed_spectrum = \
            np.ma.array(processed_parameters.normed_fitted_spectrum.copy(),
                        mask=bad_wavelength_mask.copy())
        error_normed_spectrum = \
            np.ma.array(
                processed_parameters.error_normed_fitted_spectrum.copy(),
                mask=bad_wavelength_mask.copy())
        wavelength_normed_spectrum = \
            np.ma.array(
                processed_parameters.wavelength_normed_fitted_spectrum.copy(),
                mask=bad_wavelength_mask.copy())
        if lightcurve_model.lightcurve_parameters['transittype'] == \
                'secondary':
            from cascade.exoplanet_tools import transit_to_eclipse
            normed_spectrum, error_normed_spectrum = \
                transit_to_eclipse(normed_spectrum,
                                   uncertainty=error_normed_spectrum)
        # Transform to percent by multiplying by 100. Note that this has
        # to be done after transit_to_eclipse!
        normed_spectrum.data[...] = normed_spectrum.data*100
        error_normed_spectrum.data[...] = error_normed_spectrum.data*100

        from astropy.stats import mad_std
        # bootstrapped spectrum (not normalized)
        median_not_normalized_depth_bootstrap = \
            np.ma.median(fitted_spectrum[1:, :], axis=1)
        spectrum_bootstrap = \
            np.ma.median(fitted_spectrum[1:, :], axis=0)
        error_spectrum_bootstrap = \
            mad_std((fitted_spectrum[1:, :].T -
                     median_not_normalized_depth_bootstrap).T, axis=0,
                    ignore_nan=True)
        # 5th, 50th and 95th percentiles (90% confidence interval) of the
        # non-normalized transit depth
        n = len(median_not_normalized_depth_bootstrap)
        sort = sorted(median_not_normalized_depth_bootstrap)
        nn_TD_min, nn_TD, nn_TD_max = \
            (sort[int(n * 0.05)], sort[int(n * 0.5)], sort[int(n * 0.95)])
        # normalized spectrum
        median_depth = np.ma.median(normed_spectrum[0, :])
        # bootstrapped normalized spectrum
        median_depth_bootstrap = np.ma.median(normed_spectrum[1:, :], axis=1)
        normed_spectrum_bootstrap = \
            np.ma.median(normed_spectrum[1:, :], axis=0)
        error_normed_spectrum_bootstrap = \
            mad_std((normed_spectrum[1:, :].T -
                     median_depth_bootstrap).T, axis=0, ignore_nan=True)
        # 5th, 50th and 95th percentiles (90% confidence interval)
        n = len(median_depth_bootstrap)
        sort = sorted(median_depth_bootstrap)
        TD_min, TD, TD_max = \
            (sort[int(n * 0.05)], sort[int(n * 0.5)], sort[int(n * 0.95)])
        # bootstrapped stellar spectrum
        median_stellar_spectrum = np.ma.median(stellar_spectrum[1:, :],
                                               axis=1)
        stellar_spectrum_bootstrap = \
            np.ma.median(stellar_spectrum[1:, :], axis=0)
        error_stellar_spectrum_bootstrap = \
            mad_std((stellar_spectrum[1:, :].T -
                     median_stellar_spectrum).T, axis=0, ignore_nan=True)
        # 5th, 50th and 95th percentiles (90% confidence interval)
        n = len(median_stellar_spectrum)
        sort = sorted(median_stellar_spectrum)
        SF_min, SF, SF_max = \
            (sort[int(n * 0.05)], sort[int(n * 0.5)], sort[int(n * 0.95)])

        observing_time = control_parameters.data_parameters.time_bjd_zero
        data_product = \
            control_parameters.data_parameters.data_product
        current_date = time_module.localtime()
        creation_time = '{}_{}_{}:{}_{}_{}'.format(current_date.tm_year,
                                                   current_date.tm_mon,
                                                   current_date.tm_mday,
                                                   current_date.tm_hour,
                                                   current_date.tm_min,
                                                   current_date.tm_sec)
        auxilary_data = \
            {'TDDEPTH': [nn_TD_min, nn_TD, nn_TD_max],
             'MODELRP': lightcurve_model.lightcurve_parameters['rp'],
             'MODELA': lightcurve_model.lightcurve_parameters['a'],
             'MODELINC': lightcurve_model.lightcurve_parameters['inc']*u.deg,
             'MODELECC': lightcurve_model.lightcurve_parameters['ecc'],
             'MODELW': lightcurve_model.lightcurve_parameters['w']*u.deg,
             'MODELEPH': lightcurve_model.lightcurve_parameters['t0'],
             'MODELPER': lightcurve_model.lightcurve_parameters['p'],
             'VERSION': __version__,
             'CREATIME': creation_time,
             'OBSTIME': observing_time,
             'MIDTTIME': lightcurve_model.mid_transit_time,
             'DATAPROD': data_product}
        # non-normalized dataset
        wavelength_unit = control_parameters.data_parameters.wavelength_unit
        data_unit = control_parameters.data_parameters.data_unit
        non_normalized_exoplanet_spectrum_bootstrap = \
            SpectralData(wavelength=wavelength_normed_spectrum[0, :],
                         wavelength_unit=wavelength_unit,
                         data=spectrum_bootstrap,
                         data_unit=data_unit,
                         uncertainty=error_spectrum_bootstrap,
                         )
        non_normalized_exoplanet_spectrum_bootstrap.add_auxilary(
            **auxilary_data
            )
        # non-normalized stellar dataset
        stellar_auxilary_data = copy.deepcopy(auxilary_data)
        stellar_auxilary_data.pop('TDDEPTH')
        stellar_auxilary_data['STLRFLUX'] = [SF_min, SF, SF_max]
        data_unit = control_parameters.data_parameters.data_unit
        non_normalized_stellar_spectrum_bootstrap = \
            SpectralData(wavelength=wavelength_normed_spectrum[0, :],
                         wavelength_unit=wavelength_unit,
                         data=stellar_spectrum_bootstrap,
                         data_unit=data_unit,
                         uncertainty=error_stellar_spectrum_bootstrap,
                         )
        non_normalized_stellar_spectrum_bootstrap.add_auxilary(
            **stellar_auxilary_data
            )
        # normalized dataset
        auxilary_data['TDDEPTH'] = [median_depth]
        data_unit = u.percent
        exoplanet_spectrum = \
            SpectralData(wavelength=wavelength_normed_spectrum[0, :],
                         wavelength_unit=wavelength_unit,
                         data=normed_spectrum[0, :],
                         data_unit=data_unit,
                         uncertainty=error_normed_spectrum[0, :],
                         )
        exoplanet_spectrum.add_auxilary(**auxilary_data)
        # normalized bootstrapped dataset
        auxilary_data['TDDEPTH'] = [TD_min, TD, TD_max]
        exoplanet_spectrum_bootstrap = \
            SpectralData(wavelength=wavelength_normed_spectrum[0, :],
                         wavelength_unit=wavelength_unit,
                         data=normed_spectrum_bootstrap,
                         data_unit=data_unit,
                         uncertainty=error_normed_spectrum_bootstrap,
                         )
        exoplanet_spectrum_bootstrap.add_auxilary(**auxilary_data)
        fitted_transit_model = \
            SpectralDataTimeSeries(
                wavelength=wavelength_normed_spectrum[0, :],
                wavelength_unit=wavelength_unit,
                data=(lightcurve_model.lightcurve_model.T *
                      normed_spectrum_bootstrap).T,
                data_unit=data_unit,
                uncertainty=(lightcurve_model.lightcurve_model.T *
                             error_normed_spectrum_bootstrap).T,
                time=fit_parameters.fitted_time[0, 0, :],
                time_unit=control_parameters.data_parameters.time_unit
                )
        fitted_transit_model.add_auxilary(**auxilary_data)
        # timeseries baseline
        nboot, nwave, ntime = fit_parameters.fitted_time.shape
        uniq_time = fit_parameters.fitted_time[0, 0, :]
        baseline_bootstrap = np.zeros((nwave, ntime))
        normed_residual_bootstrap = np.ma.zeros((nwave, ntime))
        error_baseline_bootstrap = np.zeros_like(baseline_bootstrap)
        error_normed_residual_bootstrap = np.ma.zeros((nwave, ntime))
        for it, time in enumerate(uniq_time):
            for il in range(nwave):
                idx = np.where(fit_parameters.fitted_time[1:, il, :] == time)
                selection = \
                    processed_parameters.fitted_baseline[idx[0]+1, il,
                                                         idx[1]]
                baseline_bootstrap[il, it] = np.ma.median(selection)
                error_baseline_bootstrap[il, it] = mad_std(selection,
                                                           ignore_nan=True)
                selection = \
                    processed_parameters.normed_fit_residuals[idx[0]+1, il,
                                                              idx[1]]
                normed_residual_bootstrap[il, it] = np.ma.median(selection)
                error_normed_residual_bootstrap[il, it] = \
                    mad_std(selection, ignore_nan=True)
        time_baseline_bootstrap = uniq_time
        wavelength_baseline_bootstrap = wavelength_normed_spectrum[0, :]
        baseline_mask = exoplanet_spectrum_bootstrap.mask
        baseline_mask = \
            np.repeat(baseline_mask[:, np.newaxis], len(uniq_time), axis=1)
        residual_mask = np.logical_or(normed_residual_bootstrap.mask,
                                      baseline_mask)
        data_unit = control_parameters.data_parameters.data_unit
        time_unit = control_parameters.data_parameters.time_unit
        fitted_systematics_bootstrap = SpectralDataTimeSeries(
            wavelength=wavelength_baseline_bootstrap,
            wavelength_unit=wavelength_unit,
            data=baseline_bootstrap,
            data_unit=data_unit,
            uncertainty=error_baseline_bootstrap,
            time=time_baseline_bootstrap,
            time_unit=time_unit,
            mask=baseline_mask)
        data_unit = u.dimensionless_unscaled
        time_unit = control_parameters.data_parameters.time_unit
        fitted_residuals_bootstrap = SpectralDataTimeSeries(
            wavelength=wavelength_baseline_bootstrap,
            wavelength_unit=wavelength_unit,
            data=normed_residual_bootstrap,
            data_unit=data_unit,
            uncertainty=error_normed_residual_bootstrap,
            time=time_baseline_bootstrap,
            time_unit=time_unit,
            mask=residual_mask)
        post_processed_results = \
            {'exoplanet_spectrum': exoplanet_spectrum,
             'exoplanet_spectrum_bootstrap': exoplanet_spectrum_bootstrap,
             'non_normalized_exoplanet_spectrum_bootstrap':
                 non_normalized_exoplanet_spectrum_bootstrap,
             'fitted_systematics_bootstrap': fitted_systematics_bootstrap,
             'fitted_residuals_bootstrap': fitted_residuals_bootstrap,
             'fitted_transit_model': fitted_transit_model,
             'non_normalized_stellar_spectrum_bootstrap':
                 non_normalized_stellar_spectrum_bootstrap}
        self.add_fit_parameters_to_parameter_server(post_processed_results)
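The statistics above condense the bootstrap realizations into a final spectrum (the median over realizations), a robust error estimate (`mad_std` of the depth-centred realizations), and the 5th/50th/95th percentiles of the per-realization median depths. A self-contained sketch of the same estimators on synthetic data; the array shapes, noise level and seed are arbitrary:

    import numpy as np
    from astropy.stats import mad_std

    rng = np.random.default_rng(seed=42)
    nboot, nwave = 500, 30
    # Synthetic bootstrap spectra: a common signal plus noise per realization.
    truth = 1.0 + 0.05*np.sin(np.linspace(0, np.pi, nwave))
    boot_spectra = truth + rng.normal(0.0, 0.01, size=(nboot, nwave))

    # Median depth of each bootstrap realization (over wavelength).
    median_depth_boot = np.median(boot_spectra, axis=1)
    # Bootstrap spectrum: median over realizations.
    spectrum = np.median(boot_spectra, axis=0)
    # Robust per-wavelength error: mad_std of the depth-centred realizations.
    error = mad_std((boot_spectra.T - median_depth_boot).T, axis=0,
                    ignore_nan=True)

    # 5th, 50th and 95th percentiles of the realization medians, mirroring
    # the sort-and-index construction above.
    srt = np.sort(median_depth_boot)
    n = len(srt)
    td_min, td, td_max = srt[int(n*0.05)], srt[int(n*0.5)], srt[int(n*0.95)]
    print(td_min, td, td_max)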
@ray.remote class rayRegressionControler(regressionControler): """Ray wrapper regressionControler class.""" def __init__(self, cascade_configuration, dataset, regressor_dataset, number_of_workers=1, number_of_data_servers=1): super().__init__(cascade_configuration, dataset, regressor_dataset, number_of_workers=number_of_workers, number_of_data_servers=number_of_data_servers) def instantiate_parameter_server(self): """ Create an handle to the parameter server. Returns ------- None. """ self.parameter_server_handle = \ rayRegressionParameterServer.remote(self.cascade_configuration) def instantiate_data_server(self, dataset, regressor_dataset): """ Create an handle to the data server. Parameters ---------- dataset : TYPE DESCRIPTION. regressor_dataset : TYPE DESCRIPTION. Returns ------- None. """ self.data_server_handle = \ [rayRegressionDataServer.remote(dataset, regressor_dataset) for _ in range(self.number_of_data_servers)] def initialize_servers(self): """ Initialize both the data and the parameter server. Note that the order of initialization is important: Firts the data server and then the parameter server. Returns ------- None. """ ftr = [server.initialize_data_server.remote(self.parameter_server_handle) for server in self.data_server_handle] ray.get(ftr) ftr = self.parameter_server_handle.\ initialize_parameter_server.remote(self.data_server_handle[0]) ray.get(ftr) def get_fit_parameters_from_server(self, bootstrap_chunk=None): """ Grab fitted regression parameters from the parameter server. Returns ------- TYPE DESCRIPTION. """ return copy.deepcopy(ray.get( self.parameter_server_handle.get_fitted_parameters.remote( bootstrap_chunk=bootstrap_chunk ) )) def get_processed_parameters_from_server(self, bootstrap_chunk=None): """ Grab fitted regression parameters from the parameter server. Parameters ---------- bootstrap_chunk ; Returns ------- Simplenamespace Simple namespace containing all processed parameters. """ return copy.deepcopy(ray.get( self.parameter_server_handle.get_processed_parameters.remote( bootstrap_chunk=bootstrap_chunk ) )) def get_regularization_parameters_from_server(self): """ Get the regularization parameters from the parameter server. Returns ------- TYPE DESCRIPTION. """ return ray.get( self.parameter_server_handle.get_regularization.remote() ) def get_control_parameters(self): """ Get the regression control parameters from the parameter server. Returns ------- control_parameters : TYPE DESCRIPTION. """ control_parameters = SimpleNamespace() control_parameters.data_parameters = \ ray.get(self.parameter_server_handle.get_data_parameters.remote()) control_parameters.cpm_parameters = \ ray.get( self.parameter_server_handle.get_regression_parameters.remote() ) return control_parameters @ray.method(num_returns=1) def get_lightcurve_model(self): """ Get the lightcurve model from the data server. Returns ------- TYPE DESCRIPTION. """ lightcurve_model = SimpleNamespace() lightcurve_model.lightcurve_model, lightcurve_model.ld_correction, \ lightcurve_model.ld_coefficients, \ lightcurve_model.dilution_correction, \ lightcurve_model.lightcurve_parameters, \ lightcurve_model.mid_transit_time = \ ray.get(self.data_server_handle[0].get_lightcurve_model.remote()) return lightcurve_model def initialize_regression_iterators(self, nchunks=1): """ Initialize all iterators used in the regression analysis. Returns ------- None. """ cpm_parameters = \ ray.get(self.parameter_server_handle. 
get_regression_parameters.remote()) data_parameters = \ ray.get(self.parameter_server_handle.get_data_parameters.remote()) self.iterators.regressor_indici = \ select_regressors(data_parameters.ROI, cpm_parameters.nwidth) self.iterators.bootsptrap_indici, _ = \ make_bootstrap_samples(data_parameters.shape[-1], cpm_parameters.nboot) self.iterators.combined_full_model_indici = itertools.product( enumerate(self.iterators.bootsptrap_indici[:1]), enumerate(self.iterators.regressor_indici)) self.iterators.n_iterators_full_model = \ data_parameters.max_spectral_points self.iterators.combined_bootstrap_model_indici = itertools.product( enumerate(self.iterators.bootsptrap_indici), enumerate(self.iterators.regressor_indici)) self.iterators.n_iterators_bootstrap_model = \ data_parameters.max_spectral_points*(cpm_parameters.nboot+1) self.chunk_iterators(nchunks=nchunks) def reset_fit_parameters(self): """ Reset the fitted parameters on the parameter server. Returns ------- None. """ ray.get(self.parameter_server_handle.reset_parameters.remote()) # @staticmethod # def get_data_chunck(data_server_handle, regression_selection, # bootstrap_selection): # """ # Get a chunk of the data to be used in the regression analysis. # Parameters # ---------- # data_server_handle : 'regressioDataServer' # Instance of the regressionDataServer class. # regression_selection : 'tuple' # tuple containing indici defing the data and regression matrix for # all wavelength indici. # bootstrap_selection : 'ndarray' # indici defining the bootstrap sampling. # Returns # ------- # regression_data_selection : 'ndarray' # Selected data to be modeled. # regression_matirx_selection : 'ndarray' # data used as design matrix in regression modeling of the # selected data. # """ # regression_data_selection, regression_matirx_selection = \ # ray.get(data_server_handle.get_regression_data.remote( # regression_selection, # bootstrap_indici=bootstrap_selection)) # return regression_data_selection, regression_matirx_selection # @staticmethod # def get_data_per_bootstrap_step(data_server_handle, regression_selections, # bootstrap_selection, return_data_only=True): # """ # Get all data chunks to be used in the regression analysis per bootstrap step. # Parameters # ---------- # data_server_handle : 'regressioDataServer' # Instance of the regressionDataServer class. # regression_selections : TYPE # DESCRIPTION. # bootstrap_selection : 'ndarray' # indici defining the bootstrap sampling. # Returns # ------- # selection_list: 'list' # List with all data and regression matrix selections # """ # selection_list = \ # ray.get(data_server_handle.get_all_regression_data.remote( # regression_selections, bootstrap_indici=bootstrap_selection, # return_data_only=return_data_only)) # return selection_list def add_fit_parameters_to_parameter_server(self, new_parameters): """ Add the fited refression parameters to the parameter server. Parameters ---------- new_parameters : TYPE DESCRIPTION. Returns ------- None. """ copy.deepcopy(ray.get( self.parameter_server_handle. add_new_parameters.remote(new_parameters) )) def run_regression_model(self): """ Run the regression model. This method runs the regression method for the instrument systematics and the transit depth determination. Returns ------- None. 
""" # number of data chanks is the number of workers nchunks = self.number_of_workers # define the iterator chunks self.initialize_regression_iterators(nchunks=nchunks) # This launches workers on the full (non bootstrapped) data set # and determines the optimal regularization #initial_fit_parameters = \ # self.get_fit_parameters_from_server() initial_regularization = \ self.get_regularization_parameters_from_server() workers = [ rayRegressionWorker.remote(#initial_fit_parameters, initial_regularization, iterator_chunk) for iterator_chunk in self.iterators.chunked_full_model_iterator ] ndata_servers=len(self.data_server_handle) futures = [w.async_update_loop.remote(self.parameter_server_handle, self.data_server_handle[iserver%ndata_servers]) for iserver, w in enumerate(workers)] ray.get(futures) # This launches workers on the bootstrapped data set + original data # and determines the fit parameters and error there on updated_regularization = \ self.get_regularization_parameters_from_server() # re-initialize workers with optimal regularization futures = [w.update_initial_parameters.remote(#initial_fit_parameters, updated_regularization, iterator_chunk) for w, iterator_chunk in zip( workers, self.iterators.chunked_bootstrap_model_iterator )] ray.get(futures) # reset parameters on server for final run. self.reset_fit_parameters() futures = [w.async_update_loop.remote(self.parameter_server_handle, self.data_server_handle[iserver%ndata_servers]) for iserver, w in enumerate(workers)] ray.get(futures) del futures, workers del ndata_servers gc.collect() @staticmethod def calculate_correction_matrix(lightcurve_model): # correction matricx for limb darkening correction nwave = lightcurve_model.lightcurve_model.shape[0] corr_matrix = np.zeros((nwave, nwave)) + np.identity(nwave) refs = [] index = [] for i in zip(*np.triu_indices(nwave, k=1)): refs.append(rayOls.remote(lightcurve_model.lightcurve_model[i[0], :, None], lightcurve_model.lightcurve_model[i[1], :])) index.append(i) returns = ray.get(refs) for (i), (coeff, _, _) in zip(index, returns): corr_matrix[i] = coeff[0] corr_matrix[i[::-1]] = 1/coeff[0] return corr_matrix def process_regression_fit(self): """ Process the fitted parameters from the regression anlysis. Returns ------- None. 
""" control_parameters = self.get_control_parameters() lightcurve_model = self.get_lightcurve_model() # correction for transit depth when using the lightcurve models from # other wavelengths corr_matrix = self.calculate_correction_matrix(lightcurve_model) # chunked index number of the bootstrap samples boostrap_sample_index_chunk = list(self.grouper_it( np.arange(control_parameters.cpm_parameters.nboot+1), self.number_of_workers, control_parameters.cpm_parameters.nboot+1 )) # chunked time sample indici of the bootstrap boostrap_indici_chunk = list(self.grouper_it( self.iterators.bootsptrap_indici, self.number_of_workers, control_parameters.cpm_parameters.nboot+1 )) # create and initialize workers workers = [ rayProcessWorker.remote( #self.get_processed_parameters_from_server(bootstrap_chunk=bsic[0]), control_parameters, self.get_fit_parameters_from_server(bootstrap_chunk=bsic[0]), lightcurve_model, corr_matrix, bic[0], bsic[0] ) for bic, bsic in zip(boostrap_indici_chunk, boostrap_sample_index_chunk) ] # step 1: depth correction features = [ w.depth_correction.remote(indici) for w, indici in zip(workers,boostrap_sample_index_chunk) ] ray.get(features) # step 2 refitting and baseline determination ndata_server=len(self.data_server_handle) features = [ w.process_lightcurve_fit.remote( indici, self.iterators.regressor_indici, self.data_server_handle[iserver%ndata_server], self.parameter_server_handle ) for iserver, (w, indici) in enumerate( zip(workers,boostrap_sample_index_chunk) ) ] ray.get(features) # # step 3: storing processed results # features = [ # w.update_parameters_on_server.remote(self.parameter_server_handle) # for w in workers # ] # ray.get(features) del features, workers, ndata_server, del boostrap_sample_index_chunk,control_parameters, lightcurve_model gc.collect() class processWorker: """ Post-process worker class. This class defines the workers used in the post-processing of the regression analysis to determine the final systematics and error. """ def __init__(self, #initial_processed_parameters, control_parameters, fit_parameters, lightcurve_model, correction_matrix, bootsptrap_indici, boostrap_sample_index_chunk): #self.processed_parameters= copy.deepcopy(initial_processed_parameters) self.control_parameters = control_parameters self.bootsptrap_indici = bootsptrap_indici self.boostrap_sample_index_chunk = boostrap_sample_index_chunk self.regression_results = fit_parameters.regression_results self.fitted_spectrum = fit_parameters.fitted_spectrum self.fitted_model = fit_parameters.fitted_model self.lightcurve_model = lightcurve_model self.correction_matrix = correction_matrix self.processed_parameters=SimpleNamespace() def update_parameters_on_server(self, parameter_server_handle, bootstrap_chunk=None): """ Update parameters on parameter server. Parameters ---------- parameter_server_handle : 'regressionParameterServer'' Instane of the parameter server class bootstrap_chunk : 'tuple' of 'lists' tuple containing the list of processed bootstrap samples and list with index counter of the processed bootstep step. Returns ------- None. """ if bootstrap_chunk is None: bootstrap_chunk = (self.boostrap_sample_index_chunk, list(np.arange(len(self.boostrap_sample_index_chunk)))) ftrs = parameter_server_handle.\ update_processed_parameters( self.processed_parameters, bootstrap_chunk=bootstrap_chunk ) def depth_correction(self, bootstrap_chunck): #return #for iboot in bootstrap_chunck[0]: # fit_results = self.regression_results[iboot,...] 
# spectrum = self.fitted_spectrum[iboot] corrected_fitted_spectrum = [] for iboot, (fit_results, spectrum) in enumerate( zip(self.regression_results, self.fitted_spectrum)): W1 = np.delete( fit_results, list(np.arange(self.control_parameters.cpm_parameters.\ n_additional_regressors ) ), 1) K = np.identity(W1.shape[0]) - W1 * self.correction_matrix # note spectrum is already corrected for LD using renormalized LC # correction for differenc in band shape is the corr_matrix if self.control_parameters.cpm_parameters.regularize_depth_correction: input_covariance = np.diag(np.ones_like(spectrum)) input_delta = create_regularization_matrix( self.control_parameters.cpm_parameters.\ reg_type_depth_correction, len(spectrum), 0) reg_min = self.control_parameters.cpm_parameters.\ alpha_min_depth_correction reg_max = self.control_parameters.cpm_parameters.\ alpha_max_depth_correction nreg = self.control_parameters.cpm_parameters.\ n_alpha_depth_correction input_alpha = return_lambda_grid(reg_min, reg_max, nreg) results = ridge(K, spectrum, input_covariance, input_delta, input_alpha) corrected_spectrum = results[0] if (results[-2] <= reg_min) | (results[-2] >= reg_max): warnings.warn("optimal regularization value of {} used in " "TD subtraction correction outside the " "range [{}, {}]".format(results[-2], reg_min, reg_max)) else: corrected_spectrum, _, _ = ols(K, spectrum) corrected_fitted_spectrum.append(corrected_spectrum) self.processed_parameters.corrected_fitted_spectrum = \ np.array(corrected_fitted_spectrum) gc.collect() @staticmethod def get_data_per_bootstrap_step(data_server_handle, regression_selections, bootstrap_selection, return_data_only=True): """ Get all data chunks to be used in the regression analysis per bootstrap step. Parameters ---------- data_server_handle : 'regressioDataServer' Instance of the regressionDataServer class. regression_selections : TYPE DESCRIPTION. bootstrap_selection : 'ndarray' indici defining the bootstrap sampling. return_data_only : 'bool', optional If set, the design matrix is not determined and returned as None. Returns ------- selection_list: 'list' List with all data and regression matrix selections """ selection_list = \ data_server_handle.get_all_regression_data( regression_selections, bootstrap_indici=bootstrap_selection, return_data_only=return_data_only) return selection_list def process_lightcurve_fit(self, bootstrap_chunck, regressor_indici, data_server_handle, parameter_server_handle): #return #for iboot in bootstrap_chunck[0]: # bootstrap_selection = self.bootsptrap_indici[iboot,...] # models = self.fitted_model[iboot, ...] 
# corrected_spectrum = \ # self.processed_parameters.corrected_fitted_spectrum[iboot,:] for bootstrap_counter, (iboot, bootstrap_selection, models, corrected_spectrum) in enumerate( zip(self.boostrap_sample_index_chunk, self.bootsptrap_indici, self.fitted_model, self.processed_parameters.corrected_fitted_spectrum ) ): self.processed_parameters.fitted_baseline = np.zeros( self.control_parameters.data_parameters.shape) self.processed_parameters.fit_residuals = np.ma.zeros( self.control_parameters.data_parameters.shape) self.processed_parameters.normed_fit_residuals = np.ma.zeros( self.control_parameters.data_parameters.shape) lc_model = \ self.lightcurve_model.lightcurve_model[..., bootstrap_selection] self.processed_parameters.normed_fitted_spectrum = np.zeros(( self.control_parameters.data_parameters.max_spectral_points)) self.processed_parameters.error_normed_fitted_spectrum = np.zeros( self.control_parameters.data_parameters.max_spectral_points) self.processed_parameters.wavelength_normed_fitted_spectrum = np.zeros(( self.control_parameters.data_parameters.max_spectral_points)) regression_data_selections = \ self.get_data_per_bootstrap_step(data_server_handle, regressor_indici, bootstrap_selection) for ipixel, (regression_selection, (regression_data_selection, _)) in\ enumerate(zip(regressor_indici, regression_data_selections)): (il, _), (_, _), nwave = regression_selection data_unscaled, wavelength, phase, covariance, mask= \ regression_data_selection lc = lc_model[il, :] base = models[ipixel] - (corrected_spectrum)[ipixel]*lc self.processed_parameters.fitted_baseline[il, :] = base self.processed_parameters.fit_residuals[il, :] = \ np.ma.array(data_unscaled - models[ipixel], mask=mask) data_normed = data_unscaled/base covariance_normed = covariance*np.diag(base**-2) normed_depth, error_normed_depth, sigma_hat = \ ols(lc[:, np.newaxis], data_normed-1.0, covariance=covariance_normed) self.processed_parameters.normed_fit_residuals[il, :] = \ np.ma.array(data_normed-1.0-normed_depth*lc, mask=mask) self.processed_parameters.normed_fitted_spectrum[ipixel] = \ normed_depth[0]*self.lightcurve_model.dilution_correction[il, 0] self.processed_parameters.error_normed_fitted_spectrum[ipixel] = \ error_normed_depth[0]*self.lightcurve_model.dilution_correction[il, 0] self.processed_parameters.wavelength_normed_fitted_spectrum[ipixel] = wavelength del regression_data_selections, regression_selection, regression_data_selection del data_unscaled, wavelength, phase, covariance, mask, il, nwave gc.collect() # self.processed_parameters.fitted_baseline[iboot,:] = baseline_model # self.processed_parameters.fit_residuals[iboot,:] = residual # self.processed_parameters.normed_fit_residuals[iboot,:] = \ # normed_residual # self.processed_parameters.normed_fitted_spectrum[iboot,:] = \ # normed_spectrum # self.processed_parameters.error_normed_fitted_spectrum[iboot,:] = \ # error_normed_spectrum # self.processed_parameters.wavelength_normed_fitted_spectrum[iboot,:] = \ # wavelength_normed_spectrum # self.processed_parameters.stellar_spectrum[iboot,:] = \ # corrected_spectrum/normed_spectrum #self.processed_parameters.fitted_baseline = baseline_model #self.processed_parameters.fit_residuals = residual #self.processed_parameters.normed_fit_residuals = \ # normed_residual #self.processed_parameters.normed_fitted_spectrum = \ # normed_spectrum #self.processed_parameters.error_normed_fitted_spectrum = \ # error_normed_spectrum #self.processed_parameters.wavelength_normed_fitted_spectrum = \ # wavelength_normed_spectrum 
self.processed_parameters.stellar_spectrum = \ corrected_spectrum/self.processed_parameters.normed_fitted_spectrum self.update_parameters_on_server(parameter_server_handle, bootstrap_chunk=([iboot,], [bootstrap_counter,])) #bootstrap_counter += 1 gc.collect() @ray.remote class rayProcessWorker(processWorker): """Ray wrapper regressionDataServer class.""" def __init__(self, #initial_processed_parameters, control_parameters, fit_parameters, lightcurve_model, correction_matrix, bootsptrap_indici, boostrap_sample_index_chunk): super().__init__(#initial_processed_parameters, control_parameters, fit_parameters, lightcurve_model, correction_matrix, bootsptrap_indici, boostrap_sample_index_chunk) def update_parameters_on_server(self, parameter_server_handle, bootstrap_chunk=None): """ Update parameters on parameter server. Parameters ---------- parameter_server_handle : 'regressionParameterServer'' Instane of the parameter server class bootstrap_chunk : 'tuple' of 'lists' tuple containing the list of processed bootstrap samples and list with index counter of the processed bootstep step. Returns ------- None. """ if bootstrap_chunk is None: bootstrap_chunk = (self.boostrap_sample_index_chunk, list(np.arange(len(self.boostrap_sample_index_chunk)))) ftrs = parameter_server_handle.\ update_processed_parameters.remote( self.processed_parameters, bootstrap_chunk=bootstrap_chunk ) ray.get(ftrs) @staticmethod def get_data_per_bootstrap_step(data_server_handle, regression_selections, bootstrap_selection, return_data_only=True): """ Get all data chunks to be used in the regression analysis per bootstrap step. Parameters ---------- data_server_handle : 'regressioDataServer' Instance of the regressionDataServer class. regression_selections : TYPE DESCRIPTION. bootstrap_selection : 'ndarray' indici defining the bootstrap sampling. return_data_only : 'bool', optional If set, the design matrix is not determined and returned as None. Returns ------- selection_list: 'list' List with all data and regression matrix selections """ selection_list = \ copy.deepcopy(ray.get(data_server_handle.get_all_regression_data.remote( regression_selections, bootstrap_indici=bootstrap_selection, return_data_only=return_data_only))) return selection_list
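In `process_lightcurve_fit` above, each time series is divided by its fitted baseline and the transit depth is then re-estimated with an ordinary least squares fit against the light-curve model alone (`ols(lc[:, np.newaxis], data_normed-1.0, ...)`). A minimal stand-alone sketch of that normalization-and-refit step on synthetic data, with `np.linalg.lstsq` standing in for this module's `ols`:

    import numpy as np

    rng = np.random.default_rng(0)
    ntime = 300
    phase = np.linspace(-0.05, 0.05, ntime)
    lc = -(np.abs(phase) < 0.02).astype(float)   # toy box-shaped transit model
    baseline = 1.0 + 0.02*phase                  # toy systematics baseline
    depth_true = 0.01
    data = baseline*(1.0 + depth_true*lc) + rng.normal(0.0, 5e-4, ntime)

    # Divide out the baseline and fit the residual depth against the model.
    data_normed = data/baseline
    depth_fit, *_ = np.linalg.lstsq(lc[:, np.newaxis], data_normed - 1.0,
                                    rcond=None)
    print(depth_fit[0])   # close to 0.01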
[docs]class regressionWorker:
    """
    Regression worker class.

    This class defines the workers used in the regression analysis to
    determine the systematics and transit model parameters.
    """

    def __init__(self,
                 initial_regularization,
                 iterator_chunk):
        self.regularization = copy.deepcopy(initial_regularization)
        self.iterator = iterator_chunk
        self.fit_parameters = SimpleNamespace()
[docs]    def update_initial_parameters(self,
                                      updated_regularization,
                                      updated_iterator_chunk):
        """
        Update all parameters.

        Parameters
        ----------
        updated_regularization : 'SimpleNamespace'
            All parameters controlling the regularization.
        updated_iterator_chunk : 'list'
            Iterator chunk over the data and bootstrap selections.

        Returns
        -------
        None.

        """
        self.regularization = copy.deepcopy(updated_regularization)
        self.iterator = copy.deepcopy(updated_iterator_chunk)
[docs]    def compute_model(self, regression_data, regularization_method,
                          alpha):
        """
        Compute the regression model.

        Parameters
        ----------
        regression_data : 'tuple'
            Data and regression-matrix selection as returned by the data
            server.
        regularization_method : 'str'
            Method used to construct the regularization matrix.
        alpha : 'float' or 'ndarray'
            Regularization strength, or a grid of values to search.

        Returns
        -------
        beta_optimal : 'ndarray'
            Fitted regression coefficients.
        rss : 'float'
            Residual sum of squares.
        mse : 'float'
            Mean squared error.
        degrees_of_freedom : 'float'
            Effective degrees of freedom of the fit.
        model_unscaled : 'ndarray'
            Fitted model of the unscaled data.
        alpha : 'float'
            Optimal regularization strength.
        aic : 'float'
            Akaike information criterion of the fit.
        phase : 'ndarray'
            Orbital phase of the modeled data.
        wavelength : 'float'
            Wavelength of the modeled data.
        """
        # Get the data and the regression matrix.
        regression_data_selection, regression_matrix_selection = \
            regression_data
        data_unscaled, wavelength, phase, covariance, _ = \
            regression_data_selection
        (regression_matrix_unscaled, n_additional, feature_mean,
         feature_scale) = regression_matrix_selection
        # Create the regularization matrix.
        n_data, n_parameter = regression_matrix_unscaled.shape
        delta = create_regularization_matrix(regularization_method,
                                             n_parameter, n_additional)
        # Do the ridge regression.
        (beta_optimal, rss, mse, degrees_of_freedom, model_unscaled,
         alpha, aic) = \
            ridge(regression_matrix_unscaled, data_unscaled, covariance,
                  delta, alpha)
        # Scale the coefficients back.
        beta_optimal[0] -= np.sum(beta_optimal[2:]*feature_mean /
                                  feature_scale)
        beta_optimal[2:] = beta_optimal[2:]/feature_scale
        return (beta_optimal, rss, mse, degrees_of_freedom, model_unscaled,
                alpha, aic, phase, wavelength)
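Because the data server standardizes the extra regressors ((x - mean)/scale, StandardScaler-style) while the intercept and transit regressor stay unscaled, `compute_model` maps the fitted coefficients back to the raw regressors at the end. A self-contained sketch of that back-transform, with `np.linalg.lstsq` standing in for `ridge`; column 0 is the intercept and column 1 the transit regressor, matching the `beta_optimal[0]`/`beta_optimal[2:]` indexing above:

    import numpy as np

    rng = np.random.default_rng(1)
    n = 200
    x = rng.normal(5.0, 2.0, size=(n, 3))        # raw extra regressors
    lc = rng.normal(0.0, 1.0, n)                 # unscaled transit regressor
    y = 2.0 + 0.5*lc + x @ np.array([0.3, -0.2, 0.1]) \
        + rng.normal(0, 0.01, n)

    feature_mean, feature_scale = x.mean(axis=0), x.std(axis=0)
    x_scaled = (x - feature_mean)/feature_scale

    # Design matrix: [intercept, lightcurve, standardized regressors].
    A = np.column_stack([np.ones(n), lc, x_scaled])
    beta, *_ = np.linalg.lstsq(A, y, rcond=None)

    # Scale the coefficients back to the raw regressors, as in compute_model.
    beta[0] -= np.sum(beta[2:]*feature_mean/feature_scale)
    beta[2:] = beta[2:]/feature_scale
    print(np.round(beta, 3))   # approximately [2.0, 0.5, 0.3, -0.2, 0.1]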
[docs]    @staticmethod
    def get_data_chunck(data_server_handle, regression_selection,
                        bootstrap_selection):
        """
        Get a chunk of the data.

        Parameters
        ----------
        data_server_handle : 'regressionDataServer'
            Instance of the regressionDataServer class.
        regression_selection : 'list'
            List of indices defining the data to be modeled and the
            corresponding data to be used as regressors.
        bootstrap_selection : 'list'
            List of indices defining the bootstrap selection.

        Returns
        -------
        regression_data_selection : 'ndarray'
            Selection of data to be modeled.
        regression_matrix_selection : 'ndarray'
            Selection of data used as regression matrix.
        """
        regression_data_selection, regression_matrix_selection = \
            data_server_handle.get_regression_data(
                regression_selection, bootstrap_indici=bootstrap_selection)
        return regression_data_selection, regression_matrix_selection
[docs]    @staticmethod
    def get_regression_data_chunk(data_server_handle, iterator_chunk):
        """
        Get a chunk of regression data from the data server.

        Parameters
        ----------
        data_server_handle : 'regressionDataServer'
            Instance of the regressionDataServer class.
        iterator_chunk : 'list'
            Iterator chunk over the data and bootstrap selections.

        Returns
        -------
        selection_list : 'list'
            List with all data and regression-matrix selections in the
            chunk.
        """
        selection_list = \
            data_server_handle.get_regression_data_chunk(iterator_chunk)
        return selection_list
[docs]    @staticmethod
    def get_regression_parameters(parameter_server_handle):
        """
        Get the regression control parameters from the parameter server.

        Parameters
        ----------
        parameter_server_handle : regressionParameterServer
            Instance of the parameter server.

        Returns
        -------
        n_additional : 'int'
            Number of additional regressors.
        ncorrect : 'int'
            Number of data points at the short wavelength side cut by the
            region of interest compared to the full dataset. This parameter
            is used to make sure the parameters are stored correctly in an
            array with a size corresponding to the total data volume.
        n_sub_chunks : 'int'
            Number of sub-chunks per data load.
        """
        regression_par = parameter_server_handle.get_regression_parameters()
        n_additional = regression_par.n_additional_regressors
        n_sub_chunks = regression_par.number_of_sub_chunks_per_load
        data_par = parameter_server_handle.get_data_parameters()
        ncorrect = data_par.ncorrect
        return n_additional, ncorrect, n_sub_chunks
[docs]    def update_parameters_on_server(self, parameter_server_handle,
                                        data_chunk):
        """
        Update the parameters on the parameter server.

        Parameters
        ----------
        parameter_server_handle : 'regressionParameterServer'
            Instance of the parameter server class.
        data_chunk : 'tuple'
            Bootstrap sample indices, wavelength indices and regressor
            indices identifying the fitted data chunk.

        Returns
        -------
        None.

        """
        ftrs = parameter_server_handle.\
            update_fitted_parameters(self.fit_parameters, data_chunk)
        ftrs = parameter_server_handle.\
            update_optimal_regulatization(self.regularization)
[docs] @staticmethod def chunks(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i:i + n]
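A doctest-style illustration of `chunks`; the final chunk is simply shorter when the list does not divide evenly:

    >>> list(regressionWorker.chunks(list(range(7)), 3))
    [[0, 1, 2], [3, 4, 5], [6]]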
[docs]    def async_update_loop(self, parameter_server_handle,
                              data_server_handle):
        """
        Regression loop over the regression and bootstrap selections.

        Parameters
        ----------
        parameter_server_handle : 'regressionParameterServer'
            Instance of the parameter server.
        data_server_handle : 'regressionDataServer'
            Instance of the data server.

        Returns
        -------
        None.

        """
        n_additional, ncorrect, n_sub_chunks = \
            self.get_regression_parameters(parameter_server_handle)
        regularization_method = 'value'
        iterator_chunk, chunk_size = self.iterator
        sub_chunks = self.chunks(iterator_chunk, n_sub_chunks)
        for sub_chunk in sub_chunks:
            bootstrap_sample_index = []
            wavelength_index = []
            regressor_index = []
            self.fit_parameters.fitted_spectrum = []
            self.fit_parameters.fitted_model = []
            self.fit_parameters.fitted_time = []
            self.fit_parameters.wavelength_fitted_spectrum = []
            self.fit_parameters.fitted_mse = []
            self.fit_parameters.fitted_aic = []
            self.fit_parameters.degrees_of_freedom = []
            self.fit_parameters.regression_results = [[], []]
            regression_data_sub_chunk = \
                self.get_regression_data_chunk(data_server_handle, sub_chunk)
            for ((iboot, bootstrap_selection),
                 (idata_point, regression_selection)), regression_data in \
                    zip(sub_chunk, regression_data_sub_chunk):
                (_, _), (index_disp_regressors, _), nwave = \
                    regression_selection
                bootstrap_sample_index.append(iboot)
                wavelength_index.append(idata_point)
                regressor_index.append(
                    index_disp_regressors+n_additional-ncorrect)
                (beta_optimal, rss, mse, degrees_of_freedom,
                 model_unscaled, alpha, aic, phase, wavelength) = \
                    self.compute_model(
                        regression_data, regularization_method,
                        self.regularization.optimal_alpha[idata_point])
                self.regularization.optimal_alpha[idata_point] = alpha
                self.fit_parameters.fitted_spectrum.append(beta_optimal[1])
                self.fit_parameters.fitted_model.append(model_unscaled)
                self.fit_parameters.fitted_time.append(phase)
                self.fit_parameters.wavelength_fitted_spectrum.append(
                    wavelength)
                self.fit_parameters.fitted_mse.append(mse)
                self.fit_parameters.fitted_aic.append(aic)
                self.fit_parameters.degrees_of_freedom.append(
                    degrees_of_freedom)
                self.fit_parameters.regression_results[0].append(
                    beta_optimal[n_additional:])
                self.fit_parameters.regression_results[1].append(
                    beta_optimal[0:n_additional])
            self.update_parameters_on_server(
                parameter_server_handle,
                data_chunk=(bootstrap_sample_index, wavelength_index,
                            (regressor_index, n_additional)))
            del regression_data_sub_chunk
            gc.collect()
        del sub_chunks, iterator_chunk
        gc.collect()
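The `rayRegressionWorker` below follows the standard Ray actor pattern: decorate the class with `@ray.remote`, construct instances with `.remote(...)`, invoke methods with `.remote(...)` to obtain futures, and resolve them with `ray.get`. A minimal, self-contained example of that pattern; the `Counter` actor is purely illustrative:

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    class Counter:
        """Toy actor holding mutable state, like the workers below."""
        def __init__(self):
            self.value = 0

        def increment(self, amount=1):
            self.value += amount
            return self.value

    counters = [Counter.remote() for _ in range(4)]   # four actor instances
    futures = [c.increment.remote(i) for i, c in enumerate(counters)]
    print(ray.get(futures))                           # [0, 1, 2, 3]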
@ray.remote
class rayRegressionWorker(regressionWorker):
    """Ray wrapper of the regressionWorker class."""

    def __init__(self,
                 initial_regularization,
                 iterator_chunk):
        super().__init__(initial_regularization,
                         iterator_chunk)

    @staticmethod
    def get_data_chunck(data_server_handle, regression_selection,
                        bootstrap_selection):
        """
        Get a chunk of the data.

        Parameters
        ----------
        data_server_handle : 'rayRegressionDataServer'
            Handle of the data server actor.
        regression_selection : 'list'
            List of indices defining the data to be modeled and the
            corresponding data to be used as regressors.
        bootstrap_selection : 'list'
            List of indices defining the bootstrap selection.

        Returns
        -------
        regression_data_selection : 'ndarray'
            Selection of data to be modeled.
        regression_matrix_selection : 'ndarray'
            Selection of data used as regression matrix.
        """
        regression_data_selection, regression_matrix_selection = \
            copy.deepcopy(ray.get(
                data_server_handle.get_regression_data.remote(
                    regression_selection,
                    bootstrap_indici=bootstrap_selection)))
        return regression_data_selection, regression_matrix_selection

    @staticmethod
    def get_regression_data_chunk(data_server_handle, iterator_chunk):
        """
        Get a chunk of regression data from the data server.

        Parameters
        ----------
        data_server_handle : 'rayRegressionDataServer'
            Handle of the data server actor.
        iterator_chunk : 'list'
            Iterator chunk over the data and bootstrap selections.

        Returns
        -------
        selection_list : 'list'
            List with all data and regression-matrix selections in the
            chunk.
        """
        selection_list = \
            copy.deepcopy(ray.get(
                data_server_handle.get_regression_data_chunk.remote(
                    iterator_chunk)))
        return selection_list

    @staticmethod
    def get_regression_parameters(parameter_server_handle):
        """
        Get the regression control parameters from the parameter server.

        Parameters
        ----------
        parameter_server_handle : rayRegressionParameterServer
            Handle of the parameter server actor.

        Returns
        -------
        n_additional : 'int'
            Number of additional regressors.
        ncorrect : 'int'
            Number of data points at the short wavelength side cut by the
            region of interest compared to the full dataset. This parameter
            is used to make sure the parameters are stored correctly in an
            array with a size corresponding to the total data volume.
        n_sub_chunks : 'int'
            Number of sub-chunks per data load.
        """
        regression_par = \
            copy.deepcopy(ray.get(
                parameter_server_handle.get_regression_parameters.remote()))
        n_additional = regression_par.n_additional_regressors
        n_sub_chunks = regression_par.number_of_sub_chunks_per_load
        data_par = \
            copy.deepcopy(ray.get(
                parameter_server_handle.get_data_parameters.remote()))
        ncorrect = data_par.ncorrect
        return n_additional, ncorrect, n_sub_chunks

    def update_parameters_on_server(self, parameter_server_handle,
                                    data_chunk):
        """
        Update the parameters on the parameter server.

        Parameters
        ----------
        parameter_server_handle : 'rayRegressionParameterServer'
            Handle of the parameter server actor.
        data_chunk : 'tuple'
            Bootstrap sample indices, wavelength indices and regressor
            indices identifying the fitted data chunk.

        Returns
        -------
        None.

        """
        ftrs = parameter_server_handle.\
            update_fitted_parameters.remote(self.fit_parameters, data_chunk)
        ray.get(ftrs)
        ftrs = parameter_server_handle.\
            update_optimal_regulatization.remote(self.regularization)
        ray.get(ftrs)