diff --git a/light-curve/light_curve/light_curve_py/features/rainbow/bolometric.py b/light-curve/light_curve/light_curve_py/features/rainbow/bolometric.py index 0f5be883..12fc0e38 100644 --- a/light-curve/light_curve/light_curve_py/features/rainbow/bolometric.py +++ b/light-curve/light_curve/light_curve_py/features/rainbow/bolometric.py @@ -1,10 +1,19 @@ +import math from abc import abstractmethod from dataclasses import dataclass from typing import Dict, List, Union import numpy as np +from scipy.special import lambertw -__all__ = ["bolometric_terms", "BaseBolometricTerm", "SigmoidBolometricTerm", "BazinBolometricTerm"] +__all__ = [ + "bolometric_terms", + "BaseBolometricTerm", + "SigmoidBolometricTerm", + "BazinBolometricTerm", + "LinexpBolometricTerm", + "DoublexpBolometricTerm", +] @dataclass() @@ -186,7 +195,152 @@ def peak_time(t0, amplitude, rise_time, fall_time): return t0 + np.log(fall_time / rise_time) * rise_time * fall_time / (rise_time + fall_time) +@dataclass() +class LinexpBolometricTerm(BaseBolometricTerm): + """Linexp function, symmetric form. Generated using a prototype version of Multi-view + Symbolic Regression (Russeil et al. 2024, https://arxiv.org/abs/2402.04298) on + a SLSN ZTF light curve (https://ztf.snad.space/dr17/view/821207100004043)""" + + @staticmethod + def parameter_names(): + return ["reference_time", "amplitude", "rise_time"] + + @staticmethod + def parameter_scalings(): + return ["time", "flux", "timescale"] + + @staticmethod + def value(t, t0, amplitude, rise_time): + dt = t0 - t + protected_rise = math.copysign(max(1e-5, abs(rise_time)), rise_time) + + # Coefficient to make peak amplitude equal to unity + scale = 1 / (protected_rise * np.exp(-1)) + + power = -dt / protected_rise + power = np.where(power > 100, 100, power) + result = amplitude * scale * dt * np.exp(power) + + return result + + @staticmethod + def initial_guesses(t, m, sigma, band): + + A = np.ptp(m) + + # Compute points after or before maximum + peak_time = t[np.argmax(m)] + after = t[-1] - peak_time + before = peak_time - t[0] + + # Peak position as weighted centroid of everything above zero + idx = m > np.median(m) + # Weighted centroid sigma + dt = np.sqrt(np.sum((t[idx] - peak_time) ** 2 * m[idx] / sigma[idx]) / np.sum(m[idx] / sigma[idx])) + # Empirical conversion of sigma to rise/rise times + rise_time = dt / 2 + rise_time = rise_time if before >= after else -rise_time + + initial = {} + # Reference of linexp correspond to the moment where flux == 0 + initial["reference_time"] = peak_time + rise_time + initial["amplitude"] = A + initial["rise_time"] = rise_time + + return initial + + @staticmethod + def limits(t, m, sigma, band): + t_amplitude = np.ptp(t) + m_amplitude = np.ptp(m) + + limits = {} + limits["reference_time"] = (np.min(t) - 10 * t_amplitude, np.max(t) + 10 * t_amplitude) + limits["amplitude"] = (0, 10 * m_amplitude) + limits["rise_time"] = (-10 * t_amplitude, 10 * t_amplitude) + + return limits + + @staticmethod + def peak_time(t0, amplitude, rise_time): + return t0 - rise_time + + +@dataclass() +class DoublexpBolometricTerm(BaseBolometricTerm): + """Doublexp function generated using Multi-view Symbolic Regression on ZTF SNIa light curves + Russeil et al. 2024, https://arxiv.org/abs/2402.04298""" + + @staticmethod + def parameter_names(): + return ["reference_time", "amplitude", "time1", "time2", "p"] + + @staticmethod + def parameter_scalings(): + return ["time", "flux", "timescale", "timescale", "None"] + + @staticmethod + def value(t, t0, amplitude, time1, time2, p): + dt = t - t0 + + result = np.zeros_like(dt) + + # To avoid numerical overflows + maxp = 20 + A = -(dt / time1) * (p - np.exp(-(dt / time2))) + A = np.where(A > maxp, maxp, A) + + result = amplitude * np.exp(A) + + return result + + @staticmethod + def initial_guesses(t, m, sigma, band): + A = np.ptp(m) + + # Naive peak position from the highest point + t0 = t[np.argmax(m)] + # Peak position as weighted centroid of everything above zero + idx = m > np.median(m) + # t0 = np.sum(t[idx] * m[idx] / sigma[idx]) / np.sum(m[idx] / sigma[idx]) + # Weighted centroid sigma + dt = np.sqrt(np.sum((t[idx] - t0) ** 2 * m[idx] / sigma[idx]) / np.sum(m[idx] / sigma[idx])) + + # Empirical conversion of sigma to rise/fall times + time1 = 10 * dt + time2 = 10 * dt + + initial = {} + initial["reference_time"] = t0 + initial["amplitude"] = A + initial["time1"] = time1 + initial["time2"] = time2 + initial["p"] = 1 + + return initial + + @staticmethod + def limits(t, m, sigma, band): + t_amplitude = np.ptp(t) + m_amplitude = np.ptp(m) + + limits = {} + limits["reference_time"] = (np.min(t) - 10 * t_amplitude, np.max(t) + 10 * t_amplitude) + limits["amplitude"] = (0.0, 10 * m_amplitude) + limits["time1"] = (1e-1, 2 * t_amplitude) + limits["time2"] = (1e-1, 2 * t_amplitude) + limits["p"] = (0, 100) + + return limits + + @staticmethod + def peak_time(t0, p): + return t0 + np.real(-lambertw(p * np.exp(1)) + 1) + + bolometric_terms = { "sigmoid": SigmoidBolometricTerm, "bazin": BazinBolometricTerm, + "linexp": LinexpBolometricTerm, + "doublexp": DoublexpBolometricTerm, } diff --git a/light-curve/light_curve/light_curve_py/features/rainbow/temperature.py b/light-curve/light_curve/light_curve_py/features/rainbow/temperature.py index 5da7bfbe..f569f6f7 100644 --- a/light-curve/light_curve/light_curve_py/features/rainbow/temperature.py +++ b/light-curve/light_curve/light_curve_py/features/rainbow/temperature.py @@ -87,24 +87,24 @@ class SigmoidTemperatureTerm(BaseTemperatureTerm): @staticmethod def parameter_names(): - return ["reference_time", "Tmin", "Tmax", "k_sig"] + return ["reference_time", "Tmin", "Tmax", "t_color"] @staticmethod def parameter_scalings(): return ["time", None, None, "timescale"] @staticmethod - def value(t, t0, temp_min, temp_max, k_sig): + def value(t, t0, temp_min, temp_max, t_color): dt = t - t0 # To avoid numerical overflows, let's only compute the exponent not too far from t0 - idx1 = dt <= -100 * k_sig - idx2 = (dt > -100 * k_sig) & (dt < 100 * k_sig) - idx3 = dt >= 100 * k_sig + idx1 = dt <= -100 * t_color + idx2 = (dt > -100 * t_color) & (dt < 100 * t_color) + idx3 = dt >= 100 * t_color result = np.zeros(len(dt)) result[idx1] = temp_max - result[idx2] = temp_min + (temp_max - temp_min) / (1.0 + np.exp(dt[idx2] / k_sig)) + result[idx2] = temp_min + (temp_max - temp_min) / (1.0 + np.exp(dt[idx2] / t_color)) result[idx3] = temp_min return result @@ -114,7 +114,7 @@ def initial_guesses(t, m, sigma, band): initial = {} initial["Tmin"] = 7000.0 initial["Tmax"] = 10000.0 - initial["k_sig"] = 1.0 + initial["t_color"] = 1.0 return initial @@ -125,7 +125,7 @@ def limits(t, m, sigma, band): limits = {} limits["Tmin"] = (1e3, 2e6) # K limits["Tmax"] = (1e3, 2e6) # K - limits["k_sig"] = (1e-4, 10 * t_amplitude) + limits["t_color"] = (1e-4, 10 * t_amplitude) return limits @@ -136,24 +136,24 @@ class DelayedSigmoidTemperatureTerm(BaseTemperatureTerm): @staticmethod def parameter_names(): - return ["reference_time", "Tmin", "Tmax", "k_sig", "t_delay"] + return ["reference_time", "Tmin", "Tmax", "t_color", "t_delay"] @staticmethod def parameter_scalings(): return ["time", None, None, "timescale", "timescale"] @staticmethod - def value(t, t0, Tmin, Tmax, k_sig, t_delay): + def value(t, t0, Tmin, Tmax, t_color, t_delay): dt = t - t0 - t_delay # To avoid numerical overflows, let's only compute the exponent not too far from t0 - idx1 = dt <= -100 * k_sig - idx2 = (dt > -100 * k_sig) & (dt < 100 * k_sig) - idx3 = dt >= 100 * k_sig + idx1 = dt <= -100 * t_color + idx2 = (dt > -100 * t_color) & (dt < 100 * t_color) + idx3 = dt >= 100 * t_color result = np.zeros(len(dt)) result[idx1] = Tmax - result[idx2] = Tmin + (Tmax - Tmin) / (1.0 + np.exp(dt[idx2] / k_sig)) + result[idx2] = Tmin + (Tmax - Tmin) / (1.0 + np.exp(dt[idx2] / t_color)) result[idx3] = Tmin return result @@ -163,7 +163,7 @@ def initial_guesses(t, m, sigma, band): initial = {} initial["Tmin"] = 7000.0 initial["Tmax"] = 10000.0 - initial["k_sig"] = 1.0 + initial["t_color"] = 1.0 initial["t_delay"] = 0.0 return initial @@ -175,7 +175,7 @@ def limits(t, m, sigma, band): limits = {} limits["Tmin"] = (1e3, 2e6) # K limits["Tmax"] = (1e3, 2e6) # K - limits["k_sig"] = (1e-4, 10 * t_amplitude) + limits["t_color"] = (1e-4, 10 * t_amplitude) limits["t_delay"] = (-t_amplitude, t_amplitude) return limits diff --git a/light-curve/tests/light_curve_py/features/test_rainbow.py b/light-curve/tests/light_curve_py/features/test_rainbow.py index f612637a..f5e09719 100644 --- a/light-curve/tests/light_curve_py/features/test_rainbow.py +++ b/light-curve/tests/light_curve_py/features/test_rainbow.py @@ -3,44 +3,6 @@ from light_curve.light_curve_py import RainbowFit -def test_noisy_no_baseline(): - rng = np.random.default_rng(0) - - band_wave_aa = {"g": 4770.0, "r": 6231.0, "i": 7625.0, "z": 9134.0} - - reference_time = 60000.0 - amplitude = 1.0 - rise_time = 5.0 - fall_time = 30.0 - Tmin = 5e3 - Tmax = 15e3 - k_sig = 4.0 - - expected = [reference_time, amplitude, rise_time, fall_time, Tmin, Tmax, k_sig, 1.0] - - feature = RainbowFit.from_angstrom(band_wave_aa, with_baseline=False, temperature="sigmoid", bolometric="bazin") - - t = np.sort(rng.uniform(reference_time - 3 * rise_time, reference_time + 3 * fall_time, 1000)) - band = rng.choice(list(band_wave_aa), size=len(t)) - - flux = feature.model(t, band, *expected) - # S/N = 10 for minimum flux, scale for Poisson noise - flux_err = np.sqrt(flux * np.min(flux)) / 10.0 - flux += rng.normal(0.0, flux_err) - - actual = feature(t, flux, sigma=flux_err, band=band) - - # import matplotlib.pyplot as plt - # plt.scatter(t, flux, s=5, label="data") - # plt.errorbar(t, flux, yerr=flux_err, ls="none", capsize=1) - # plt.plot(t, feature.model(t, band, *expected), "x", label="expected") - # plt.plot(t, feature.model(t, band, *actual), "*", label="actual") - # plt.legend() - # plt.show() - - np.testing.assert_allclose(actual[:-1], expected[:-1], rtol=0.1) - - def test_noisy_with_baseline(): rng = np.random.default_rng(0) @@ -80,74 +42,74 @@ def test_noisy_with_baseline(): np.testing.assert_allclose(actual[:-1], expected[:-1], rtol=0.1) -def test_noisy_constant_temperature(): +def test_noisy_all_functions_combination(): rng = np.random.default_rng(0) - band_wave_aa = {"g": 4770.0, "r": 6231.0, "i": 7625.0, "z": 9134.0} + t = np.sort(rng.uniform(59985.0, 60090.0, 1000)) + band = rng.choice(list(band_wave_aa), size=len(t)) - reference_time = 60000.0 - amplitude = 1.0 - rise_time = 5.0 - fall_time = 30.0 - Tmin = 5e3 - baselines = {b: 0.3 * amplitude + rng.exponential(scale=0.3 * amplitude) for b in band_wave_aa} - - expected = [reference_time, amplitude, rise_time, fall_time, Tmin, *baselines.values(), 1.0] + bazin_parameters = [ + 60000.0, # reference_time + 1.0, # amplitude + 5.0, # rise_time + 30.0, # fall_time + ] - feature = RainbowFit.from_angstrom(band_wave_aa, with_baseline=True, temperature="constant", bolometric="bazin") + sigmoid_parameters = [ + 60000.0, # reference_time + 1.0, # amplitude + 5.0, # rise_time + ] - t = np.sort(rng.uniform(reference_time - 3 * rise_time, reference_time + 3 * fall_time, 1000)) - band = rng.choice(list(band_wave_aa), size=len(t)) + linexp_parameters = [ + 60000.0, # reference_time + 1, # amplitude + -20, # rise_time + ] - flux = feature.model(t, band, *expected) - # S/N = 10 for minimum flux, scale for Poisson noise - flux_err = np.sqrt(flux * np.min(flux)) / 10.0 - flux += rng.normal(0.0, flux_err) + doublexp_parameters = [60000.0, 1, 3, 5, 0.1] # reference_time # amplitude # time1 # time2 # p - actual = feature(t, flux, sigma=flux_err, band=band) + bolometric_names = ["bazin", "sigmoid", "linexp", "doublexp"] + bolometric_params = [bazin_parameters, sigmoid_parameters, linexp_parameters, doublexp_parameters] - # import matplotlib.pyplot as plt - # plt.scatter(t, flux, s=5, label="data") - # plt.errorbar(t, flux, yerr=flux_err, ls="none", capsize=1) - # plt.plot(t, feature.model(t, band, *expected), "x", label="expected") - # plt.plot(t, feature.model(t, band, *actual), "*", label="actual") - # plt.legend() - # plt.show() + Tsigmoid_parameters = [5e3, 15e3, 4.0] # Tmin # Tmax # t_color - np.testing.assert_allclose(actual[:-1], expected[:-1], rtol=0.1) + constant_parameters = [1e4] # T + temperature_names = ["constant", "sigmoid"] + temperature_params = [constant_parameters, Tsigmoid_parameters] -def test_noisy_constant_temperature_rising_only(): - rng = np.random.default_rng(0) + for idx_b in range(len(bolometric_names)): + for idx_t in range(len(temperature_names)): - band_wave_aa = {"g": 4770.0, "r": 6231.0, "i": 7625.0, "z": 9134.0} + expected = [*bolometric_params[idx_b], *temperature_params[idx_t], 1.0] - reference_time = 60000.0 - amplitude = 1.0 - rise_time = 5.0 - Tmin = 5e3 - baselines = {b: 0.3 * amplitude + rng.exponential(scale=0.3 * amplitude) for b in band_wave_aa} + feature = RainbowFit.from_angstrom( + band_wave_aa, + with_baseline=False, + temperature=temperature_names[idx_t], + bolometric=bolometric_names[idx_b], + ) - expected = [reference_time, amplitude, rise_time, Tmin, *baselines.values(), 1.0] + flux = feature.model(t, band, *expected) - feature = RainbowFit.from_angstrom(band_wave_aa, with_baseline=True, temperature="constant", bolometric="sigmoid") + # The linexp function can reach unphysical negative flux values + protected_flux = np.where(flux > 1e-3, flux, 1e-3) - t = np.sort(rng.uniform(reference_time - 3 * rise_time, reference_time + 3 * rise_time, 1000)) - band = rng.choice(list(band_wave_aa), size=len(t)) + # S/N = 10 for minimum flux, scale for Poisson noise + flux_err = np.sqrt(protected_flux * np.min(protected_flux)) / 10.0 + flux += rng.normal(0.0, flux_err) - flux = feature.model(t, band, *expected) - # S/N = 5 for minimum flux, scale for Poisson noise - flux_err = np.sqrt(flux * np.min(flux) / 5.0) - flux += rng.normal(0.0, flux_err) + actual = feature(t, flux, sigma=flux_err, band=band) - actual = feature(t, flux, sigma=flux_err, band=band) + # import matplotlib.pyplot as plt + # plt.figure() + # plt.scatter(t, flux, s=5, label="data") + # plt.errorbar(t, flux, yerr=flux_err, ls="none", capsize=1) + # plt.plot(t, feature.model(t, band, *expected), "x", label="expected") + # plt.plot(t, feature.model(t, band, *actual), "*", label="actual") + # plt.ylim(-.05, flux.max()+0.1) + # plt.legend() + # plt.show() - # import matplotlib.pyplot as plt - # plt.scatter(t, flux, s=5, label="data") - # plt.errorbar(t, flux, yerr=flux_err, ls="none", capsize=1) - # plt.plot(t, feature.model(t, band, *expected), "x", label="expected") - # plt.plot(t, feature.model(t, band, *actual), "*", label="actual") - # plt.legend() - # plt.show() - - np.testing.assert_allclose(actual[:-1], expected[:-1], rtol=0.1) + np.testing.assert_allclose(actual[:-1], expected[:-1], rtol=0.1)