Source code for fit

# -*- coding: utf-8 -*-
import numpy as np
import os
from PyQt5.QtWidgets import QFileDialog
from scipy import special as _special
from scipy.linalg import expm
from scipy.optimize import lsq_linear
[docs] def load_npy(parent=None, normalize_per_wl=True): """ Opens a dialog to load a treated data file (.npy). Parameters ---------- parent : QWidget, optional The parent widget for the dialog. Defaults to None. normalize_per_wl : bool, optional (Currently unused) Flag to normalize per wavelength. Returns ------- tuple A tuple containing: - data_c (numpy.ndarray): Data matrix. - TD (numpy.ndarray): Time delay vector. - WL (numpy.ndarray): Wavele ngth vector. - base_dir (str): Directory of the selected file. Raises ------ ValueError If the user cancels the file selection. """ file_path, _ = QFileDialog.getOpenFileName(parent, "Select treated data file", "", "NumPy files (*.npy)") if not file_path: raise ValueError("No file selected") data = np.load(file_path, allow_pickle=True).item() data_c = data['data_c'].astype(float) WL = data['WL'].flatten() TD = data['TD'].flatten() base_dir = os.path.dirname(file_path) return data_c, TD, WL, base_dir
[docs] def crop_spectrum(data_c, WL, WLmin, WLmax): """ Crops the spectral data to a specific wavelength range. Parameters ---------- data_c : numpy.ndarray Original data matrix (Times x Wavelengths). WL : numpy.ndarray Wavelength vector. WLmin : float Lower wavelength limit. WLmax : float Upper wavelength limit. Returns ------- tuple Cropped data matrix and cropped wavelength vector. """ mask = (WL >= WLmin) & (WL <= WLmax) return data_c[:, mask], WL[mask]
[docs] def crop_kinetics(data_c, TD, TDmin, TDmax): """ Crops the kinetics to a specific time range. Parameters ---------- data_c : numpy.ndarray Original data matrix (Times x Wavelengths). TD : numpy.ndarray Time delay vector. TDmin : float Lower time limit. TDmax : float Upper time limit. Returns ------- tuple Cropped data matrix and cropped time vector. """ mask = (TD >= TDmin) & (TD <= TDmax) return data_c[:, mask], TD[mask]
[docs] def binning(data_c, WL, bin_size): """ Bins adjacent wavelength channels to improve the signal-to-noise ratio. Parameters ---------- data_c : numpy.ndarray Original data matrix. WL : numpy.ndarray Wavelength vector. bin_size : int Number of channels to bin together. Returns ------- tuple Averaged data matrix and averaged wavelength vector. """ numWL = len(WL) // bin_size datacAVG = np.zeros((numWL, data_c.shape[1])) WLAVG = np.zeros(numWL) for i in range(numWL): datacAVG[i, :] = np.mean(data_c[i*bin_size:(i+1)*bin_size, :], axis=0) WLAVG[i] = np.mean(WL[i*bin_size:(i+1)*bin_size]) return datacAVG, WLAVG
import numpy as np import scipy.special as _special
[docs] def convolved_exp_vectorized(t, t0, taus, w): """ Calculates a sum of exponential decays convolved with a Gaussian IRF. VERSIÓN HÍBRIDA ESTABLE CON BROADCASTING CORREGIDO. """ if t.ndim == 1: t = t[:, np.newaxis] taus = np.asarray(taus) if taus.ndim == 1: taus = taus[np.newaxis, :] sigma = w / (2 * np.sqrt(2 * np.log(2))) tau_safe = np.maximum(taus, 1e-12) sigma_safe = np.maximum(sigma, 1e-12) t_diff = t - t0 # 'x' tendrá tamaño (N_tiempos, M_taus) debido al broadcasting x = (sigma_safe**2 - tau_safe * t_diff) / (np.sqrt(2) * sigma_safe * tau_safe) out = np.zeros_like(x) # --------------------------------------------------------- # EL ARREGLO: Expandimos las matrices para que coincidan con # la forma de la máscara booleana antes de indexar. # --------------------------------------------------------- t_diff_full = np.broadcast_to(t_diff, x.shape) tau_full = np.broadcast_to(tau_safe, x.shape) # RÉGIMEN 1: Tiempos tempranos (x >= 0) mask_pos = x >= 0 if np.any(mask_pos): exponent_erfcx = - (t_diff_full[mask_pos]**2) / (2 * sigma_safe**2) out[mask_pos] = 0.5 * np.exp(exponent_erfcx) * _special.erfcx(x[mask_pos]) # RÉGIMEN 2: Tiempos tardíos (x < 0) mask_neg = ~mask_pos if np.any(mask_neg): arg1 = (sigma_safe**2 - 2 * tau_full[mask_neg] * t_diff_full[mask_neg]) / (2 * tau_full[mask_neg]**2) out[mask_neg] = 0.5 * np.exp(arg1) * _special.erfc(x[mask_neg]) return out
# ============================================================================= # MODEL EVALUATION FUNCTIONS # =============================================================================
[docs] def eval_global_model(x, t, numExp, numWL, t0_choice_str): """ Evaluates the global parallel model (DAS - Decay Associated Spectra). Calculates the 2D data surface (Time x Wavelength) based on shared lifetimes and independent amplitudes per wavelength. Parameters ---------- x : list or numpy.ndarray Array of fitting parameters (w, t0, taus, amplitudes). t : numpy.ndarray Time delay vector. numExp : int Number of exponential components. numWL : int Number of wavelengths to fit. t0_choice_str : str 'Yes' for fitting with chirp correction (variable t0 per wavelength), any other value for standard global fitting (single shared t0). Returns ------- numpy.ndarray Simulated data matrix (Time x Wavelength). """ F = np.zeros((len(t), numWL)) if t0_choice_str == 'Yes': # CHIRP MODE w = x[0] taus = x[1:1+numExp] # Array de todos los Taus base_idx = 1 + numExp for j in range(numWL): idx = base_idx + j*(numExp+1) t0 = x[idx] Amps = x[idx+1 : idx+1+numExp] # Array de Amplitudes (shape: numExp,) # 1. Calculamos TODAS las bases exponenciales de golpe para este t0 # Devuelve matriz (Tiempo x numExp) directamente. bases = convolved_exp_vectorized(t, t0, taus, w) # 2. Multiplicamos matricialmente Bases @ Amplitudes # (N_t, N_exp) @ (N_exp,) -> (N_t,) (Vector plano) F[:, j] = bases @ Amps else: # GLOBAL FIT (Optimización Máxima) w = x[0] t0 = x[1] taus = x[2:2+numExp] A_base = 2 + numExp # Devuelve directamente la matriz (Tiempo x numExp) basis_functions = convolved_exp_vectorized(t, t0, taus, w) # Extraer Amplitudes all_Amps = x[A_base:].reshape(numWL, numExp).T # Multiplicación F = basis_functions @ all_Amps return F
[docs] def get_sequential_populations(t, t0, w, taus): """ Calculates the populations for a sequential model (A -> B -> C...). Uses a dynamic Bateman equations generator to support any number of exponential components. Parameters ---------- t : numpy.ndarray Time vector. t0 : float Time zero. w : float Width of the IRF. taus : list List of lifetimes for each sequential species. Returns ------- list of numpy.ndarray List where each element is the population over time for the corresponding species. """ k = 1.0 / np.asarray(taus) # Rates (k = 1/tau) pops = [] # Get all basic exponentials at once E_matrix = convolved_exp_vectorized(t, t0, taus, w) E = [E_matrix[:, i] for i in range(len(taus))] num_species = len(taus) for i in range(num_species): if i == 0: # Species 1 is always just the first exponential decay pops.append(E[0]) else: # For Species i (where i > 0), calculate the Bateman equation dynamically # 1. Product of all previous rates: k_0 * k_1 * ... * k_{i-1} rate_prod = np.prod(k[:i]) # 2. Sum over all exponentials up to i species_pop = np.zeros_like(E[0]) for j in range(i + 1): # Calculate the denominator product: prod(k_m - k_j) for m != j denom = 1.0 for m in range(i + 1): if m != j: diff = k[m] - k[j] # Safety for degenerate rates (if two taus are too similar) if abs(diff) < 1e-12: diff = 1e-12 if diff >= 0 else -1e-12 denom *= diff # Add the term to the sum species_pop += E[j] / denom # 3. Final population for species i pops.append(rate_prod * species_pop) return pops
[docs] def eval_sequential_model(x, t, numExp, numWL, t0_choice_str): """ Evaluates the sequential model (SAS - Species Associated Spectra). Parameters ---------- x : list Parameter array. t : numpy.ndarray Time vector. numExp : int Number of species in the cascade. numWL : int Number of wavelengths. t0_choice_str : str 'Yes' for chirp correction mode. Returns ------- numpy.ndarray Simulated data matrix. """ F = np.zeros((len(t), numWL)) if t0_choice_str == 'Yes': w = x[0] taus = x[1:1+numExp] base_idx = 1 + numExp for j in range(numWL): idx = base_idx + j*(numExp+1) t0 = x[idx] sas_coeffs = x[idx+1 : idx+1+numExp] pops_list = get_sequential_populations(t, t0, w, taus) # Manual dot product for the single WL kinetics = np.zeros_like(t) for n in range(numExp): kinetics += sas_coeffs[n] * pops_list[n] F[:, j] = kinetics else: w = x[0] t0 = x[1] taus = x[2:2+numExp] A_base = 2 + numExp # 1. Basis Functions (Time x Species) pops_list = get_sequential_populations(t, t0, w, taus) basis_functions = np.column_stack(pops_list) # 2. Amplitudes / SAS (Species x WL) all_SAS = x[A_base:].reshape(numWL, numExp).T # 3. Matrix Multiplication F = basis_functions @ all_SAS return F
[docs] def damped_oscillation(t, t0, alpha, omega, phi, w): """ Calculates a damped oscillation with a smooth step (approximating IRF convolution). Equation used: $S(t) = 0.5 \cdot (1 + \text{erf}((t-t0)/(\sqrt{2}w))) \cdot \exp(-\alpha(t-t0)) \cdot \sin(\omega(t-t0) + \phi)$ Parameters ---------- t : numpy.ndarray Time vector. t0 : float Time zero (start of the oscillation). alpha : float Damping rate. omega : float Angular frequency of the oscillation. phi : float Initial phase (in radians). w : float Width of the IRF (controls the smoothness of the onset). Returns ------- numpy.ndarray Vector with the damped oscillatory signal. """ t_shifted = t - t0 # 1. Safety Mask: Prevent exp() overflow for very negative times. safe_mask = t_shifted > -6 * w osc = np.zeros_like(t_shifted) # Only calculate where it is numerically safe ts_safe = t_shifted[safe_mask] # Smooth Step (Simulates convolution with Gaussian IRF) # Using erf here is standard for step smoothing step = 0.5 * (1 + _special.erf(ts_safe / (np.sqrt(2) * w))) # Damped Sine decay = np.exp(-alpha * ts_safe) sine = np.sin(omega * ts_safe + phi) osc[safe_mask] = step * decay * sine return osc
[docs] def eval_oscillation_model(x, t, numExp, numWL, t0_choice_str): """ Evaluates a model combining parallel exponential decays and a damped oscillation. Parameters ---------- x : list Model parameters (w, t0, taus, alpha, omega, phi, local amplitudes). t : numpy.ndarray Time vector. numExp : int Number of exponential decays. numWL : int Number of wavelengths. t0_choice_str : str Must be 'No' (Chirp is not yet implemented for this model). Returns ------- numpy.ndarray Simulated data matrix. Raises ------ NotImplementedError If attempted to use with `t0_choice_str == 'Yes'`. """ F = np.zeros((len(t), numWL)) if t0_choice_str == 'Yes': raise NotImplementedError("Chirp not implemented for Oscillation model yet.") # --- Global Parameters --- w = x[0] t0 = x[1] taus = x[2:2+numExp] # Oscillation parameters alpha = x[2+numExp] omega = x[2+numExp+1] phi = x[2+numExp+2] A_base = 2 + numExp + 3 # 1. Decay Bases (Vectorized) -> Matrix (Time x Exp) basis_exp = convolved_exp_vectorized(t, t0, taus, w) # 2. Oscillation Basis -> Matrix (Time x 1) basis_osc = damped_oscillation(t, t0, alpha, omega, phi, w).reshape(-1, 1) # 3. Stack all bases: [T x (numExp + 1)] all_bases = np.hstack([basis_exp, basis_osc]) # 4. Extract Local Amplitudes [A1...An, B] per wavelength num_local_params = numExp + 1 all_amps = x[A_base:].reshape(numWL, num_local_params).T # 5. Matrix Multiplication F = all_bases @ all_amps return F
# ============================================================================= # NUEVO MOTOR VARPRO: Generadores de Matrices de Concentración (C) # =============================================================================
[docs] def get_concentration_matrix_global(x_nl, t, numExp, use_art=False): w, t0 = x_nl[0], x_nl[1] taus = x_nl[2:2+numExp] C = convolved_exp_vectorized(t, t0, taus, w) if use_art: C = np.hstack([C, get_coherent_artifact(t, t0, w)]) return C
[docs] def get_concentration_matrix_sequential(x_nl, t, numExp, use_art=False): w, t0 = x_nl[0], x_nl[1] taus = x_nl[2:2+numExp] pops_list = get_sequential_populations(t, t0, w, taus) C = np.column_stack(pops_list) if use_art: C = np.hstack([C, get_coherent_artifact(t, t0, w)]) return C
[docs] def get_concentration_matrix_oscillation(x_nl, t, numExp, use_art=False): w, t0 = x_nl[0], x_nl[1] taus = x_nl[2:2+numExp] alpha, omega, phi = x_nl[2+numExp], x_nl[2+numExp+1], x_nl[2+numExp+2] basis_exp = convolved_exp_vectorized(t, t0, taus, w) basis_osc = damped_oscillation(t, t0, alpha, omega, phi, w).reshape(-1, 1) C = np.hstack([basis_exp, basis_osc]) if use_art: C = np.hstack([C, get_coherent_artifact(t, t0, w)]) return C
[docs] def eval_varpro_model(C, data_c_T, enforce_nonneg=False, numExp=None): """ Ejecuta la Proyección Variable. Si enforce_nonneg es True, fuerza a que las amplitudes de las especies (SAS) sean >= 0, pero permite que el artefacto coherente fluya libremente. """ if enforce_nonneg and numExp is not None: S_T = np.zeros((C.shape[1], data_c_T.shape[1])) # Límites: [0, infinito] para las especies exponenciales # [-infinito, infinito] para el artefacto coherente o la oscilación lb = np.full(C.shape[1], -np.inf) lb[:numExp] = 0.0 # Lazo súper optimizado usando el método Bounded Variables (BVLS) for i in range(data_c_T.shape[1]): res = lsq_linear(C, data_c_T[:, i], bounds=(lb, np.inf), method='bvls') S_T[:, i] = res.x F = C @ S_T return F, S_T else: # Mínimos cuadrados lineales estándar (Sin límites) S_T, _, _, _ = np.linalg.lstsq(C, data_c_T, rcond=None) F = C @ S_T return F, S_T
[docs] def get_coherent_artifact(t, t0, w): """Genera la base matemática para absorber Raman y XPM (IRF y sus derivadas).""" if t.ndim == 1: t = t[:, np.newaxis] sigma = max(w / (2 * np.sqrt(2 * np.log(2))), 1e-12) t_diff = t - t0 # IRF Gaussiano (Absorbe Raman y Absorción de 2 fotones) irf = np.exp(-0.5 * (t_diff / sigma)**2) # Primera derivada (Absorbe XPM y efectos dispersivos) irf_d1 = - (t_diff / sigma**2) * irf # Segunda derivada irf_d2 = ((t_diff**2 - sigma**2) / sigma**4) * irf # Normalizamos para estabilidad numérica del solver lineal irf = irf / (np.max(np.abs(irf)) + 1e-12) irf_d1 = irf_d1 / (np.max(np.abs(irf_d1)) + 1e-12) irf_d2 = irf_d2 / (np.max(np.abs(irf_d2)) + 1e-12) return np.hstack([irf, irf_d1, irf_d2])