From d4b7ce7eca69f39704acf984c6bac6fbb5c9e8cd Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 11 Oct 2022 21:05:21 -0700 Subject: [PATCH 1/3] Add type-hints to adaptive/learner/integrator_learner.py --- adaptive/learner/integrator_learner.py | 86 ++++++++++++++------------ 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/adaptive/learner/integrator_learner.py b/adaptive/learner/integrator_learner.py index da9bd7ffc..f0712778d 100644 --- a/adaptive/learner/integrator_learner.py +++ b/adaptive/learner/integrator_learner.py @@ -5,6 +5,7 @@ from collections import defaultdict from math import sqrt from operator import attrgetter +from typing import TYPE_CHECKING, Callable import cloudpickle import numpy as np @@ -25,7 +26,7 @@ with_pandas = False -def _downdate(c, nans, depth): +def _downdate(c: np.ndarray, nans: list[int], depth: int) -> np.ndarray: # This is algorithm 5 from the thesis of Pedro Gonnet. b = coeff.b_def[depth].copy() m = coeff.ns[depth] - 1 @@ -45,7 +46,7 @@ def _downdate(c, nans, depth): return c -def _zero_nans(fx): +def _zero_nans(fx: np.ndarray) -> list[int]: """Caution: this function modifies fx.""" nans = [] for i in range(len(fx)): @@ -55,7 +56,7 @@ def _zero_nans(fx): return nans -def _calc_coeffs(fx, depth): +def _calc_coeffs(fx: np.ndarray, depth: int) -> np.ndarray: """Caution: this function modifies fx.""" nans = _zero_nans(fx) c_new = coeff.V_inv[depth] @ fx @@ -135,19 +136,24 @@ class _Interval: "removed", ] - def __init__(self, a, b, depth, rdepth): - self.children = [] - self.data = {} + def __init__(self, a: int | float, b: int | float, depth: int, rdepth: int) -> None: + self.children: list[_Interval] = [] + self.data: dict[float, float] = {} self.a = a self.b = b self.depth = depth self.rdepth = rdepth - self.done_leaves = set() - self.depth_complete = None + self.done_leaves: set[_Interval] = set() + self.depth_complete: int | None = None self.removed = False + if TYPE_CHECKING: + self.ndiv: int + self.parent: _Interval | None + self.err: float + self.c: np.ndarray @classmethod - def make_first(cls, a, b, depth=2): + def make_first(cls, a: int, b: int, depth: int = 2) -> _Interval: ival = _Interval(a, b, depth, rdepth=1) ival.ndiv = 0 ival.parent = None @@ -155,7 +161,7 @@ def make_first(cls, a, b, depth=2): return ival @property - def T(self): + def T(self) -> np.ndarray: """Get the correct shift matrix. Should only be called on children of a split interval. @@ -166,24 +172,24 @@ def T(self): assert left != right return coeff.T_left if left else coeff.T_right - def refinement_complete(self, depth): + def refinement_complete(self, depth: int) -> bool: """The interval has all the y-values to calculate the intergral.""" if len(self.data) < coeff.ns[depth]: return False return all(p in self.data for p in self.points(depth)) - def points(self, depth=None): + def points(self, depth: int | None = None) -> np.ndarray: if depth is None: depth = self.depth a = self.a b = self.b return (a + b) / 2 + (b - a) * coeff.xi[depth] / 2 - def refine(self): + def refine(self) -> _Interval: self.depth += 1 return self - def split(self): + def split(self) -> list[_Interval]: points = self.points() m = points[len(points) // 2] ivals = [ @@ -198,10 +204,10 @@ def split(self): return ivals - def calc_igral(self): + def calc_igral(self) -> None: self.igral = (self.b - self.a) * self.c[0] / sqrt(2) - def update_heuristic_err(self, value): + def update_heuristic_err(self, value: float) -> None: """Sets the error of an interval using a heuristic (half the error of the parent) when the actual error cannot be calculated due to its parents not being finished yet. This error is propagated down to its @@ -214,7 +220,7 @@ def update_heuristic_err(self, value): continue child.update_heuristic_err(value / 2) - def calc_err(self, c_old): + def calc_err(self, c_old: np.ndarray) -> float: c_new = self.c c_diff = np.zeros(max(len(c_old), len(c_new))) c_diff[: len(c_old)] = c_old @@ -226,9 +232,9 @@ def calc_err(self, c_old): child.update_heuristic_err(self.err / 2) return c_diff - def calc_ndiv(self): + def calc_ndiv(self) -> None: div = self.parent.c00 and self.c00 / self.parent.c00 > 2 - self.ndiv += div + self.ndiv += int(div) if self.ndiv > coeff.ndiv_max and 2 * self.ndiv > self.rdepth: raise DivergentIntegralError @@ -237,7 +243,7 @@ def calc_ndiv(self): for child in self.children: child.update_ndiv_recursively() - def update_ndiv_recursively(self): + def update_ndiv_recursively(self) -> None: self.ndiv += 1 if self.ndiv > coeff.ndiv_max and 2 * self.ndiv > self.rdepth: raise DivergentIntegralError @@ -245,7 +251,7 @@ def update_ndiv_recursively(self): for child in self.children: child.update_ndiv_recursively() - def complete_process(self, depth): + def complete_process(self, depth: int) -> tuple[bool, bool] | tuple[bool, np.bool_]: """Calculate the integral contribution and error from this interval, and update the done leaves of all ancestor intervals.""" assert self.depth_complete is None or self.depth_complete == depth - 1 @@ -322,7 +328,7 @@ def complete_process(self, depth): return force_split, remove - def __repr__(self): + def __repr__(self) -> str: lst = [ f"(a, b)=({self.a:.5f}, {self.b:.5f})", f"depth={self.depth}", @@ -334,7 +340,7 @@ def __repr__(self): class IntegratorLearner(BaseLearner): - def __init__(self, function, bounds, tol): + def __init__(self, function: Callable, bounds: tuple[int, int], tol: float) -> None: """ Parameters ---------- @@ -368,16 +374,18 @@ def __init__(self, function, bounds, tol): plot : hv.Scatter Plots all the points that are evaluated. """ - self.function = function + self.function = function # type: ignore self.bounds = bounds self.tol = tol self.max_ivals = 1000 - self.priority_split = [] + self.priority_split: list[_Interval] = [] self.data = {} self.pending_points = set() - self._stack = [] - self.x_mapping = defaultdict(lambda: SortedSet([], key=attrgetter("rdepth"))) - self.ivals = set() + self._stack: list[float] = [] + self.x_mapping: dict[float, SortedSet] = defaultdict( + lambda: SortedSet([], key=attrgetter("rdepth")) + ) + self.ivals: set[_Interval] = set() ival = _Interval.make_first(*self.bounds) self.add_ival(ival) self.first_ival = ival @@ -387,10 +395,10 @@ def new(self) -> IntegratorLearner: return IntegratorLearner(self.function, self.bounds, self.tol) @property - def approximating_intervals(self): + def approximating_intervals(self) -> set[_Interval]: return self.first_ival.done_leaves - def tell(self, point, value): + def tell(self, point: float, value: float) -> None: if point not in self.x_mapping: raise ValueError(f"Point {point} doesn't belong to any interval") self.data[point] = value @@ -426,7 +434,7 @@ def tell(self, point, value): def tell_pending(self): pass - def propagate_removed(self, ival): + def propagate_removed(self, ival: _Interval) -> None: def _propagate_removed_down(ival): ival.removed = True self.ivals.discard(ival) @@ -436,7 +444,7 @@ def _propagate_removed_down(ival): _propagate_removed_down(ival) - def add_ival(self, ival): + def add_ival(self, ival: _Interval) -> None: for x in ival.points(): # Update the mappings self.x_mapping[x].add(ival) @@ -447,7 +455,7 @@ def add_ival(self, ival): self._stack.append(x) self.ivals.add(ival) - def ask(self, n, tell_pending=True): + def ask(self, n: int, tell_pending: bool = True) -> tuple[list[float], list[float]]: """Choose points for learners.""" if not tell_pending: with restore(self): @@ -455,7 +463,7 @@ def ask(self, n, tell_pending=True): else: return self._ask_and_tell_pending(n) - def _ask_and_tell_pending(self, n): + def _ask_and_tell_pending(self, n: int) -> tuple[list[float], list[float]]: points, loss_improvements = self.pop_from_stack(n) n_left = n - len(points) while n_left > 0: @@ -471,7 +479,7 @@ def _ask_and_tell_pending(self, n): return points, loss_improvements - def pop_from_stack(self, n): + def pop_from_stack(self, n: int) -> tuple[list[float], list[float]]: points = self._stack[:n] self._stack = self._stack[n:] loss_improvements = [ @@ -482,7 +490,7 @@ def pop_from_stack(self, n): def remove_unfinished(self): pass - def _fill_stack(self): + def _fill_stack(self) -> list[float]: # XXX: to-do if all the ivals have err=inf, take the interval # with the lowest rdepth and no children. force_split = bool(self.priority_split) @@ -518,16 +526,16 @@ def _fill_stack(self): return self._stack @property - def npoints(self): + def npoints(self) -> int: """Number of evaluated points.""" return len(self.data) @property - def igral(self): + def igral(self) -> float: return sum(i.igral for i in self.approximating_intervals) @property - def err(self): + def err(self) -> float: if self.approximating_intervals: err = sum(i.err for i in self.approximating_intervals) if err > sys.float_info.max: From 8c71e0b1f72e8ae2f3395dafd1d79b5f733fc736 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 11 Oct 2022 21:05:30 -0700 Subject: [PATCH 2/3] Add type-hints to adaptive/tests/algorithm_4.py --- adaptive/tests/algorithm_4.py | 67 +++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/adaptive/tests/algorithm_4.py b/adaptive/tests/algorithm_4.py index fb401e866..4566c0fa1 100644 --- a/adaptive/tests/algorithm_4.py +++ b/adaptive/tests/algorithm_4.py @@ -2,7 +2,8 @@ # Copyright 2017 Christoph Groth from collections import defaultdict -from fractions import Fraction as Frac +from fractions import Fraction +from typing import Callable, List, Tuple, Union import numpy as np from numpy.testing import assert_allclose @@ -11,7 +12,7 @@ eps = np.spacing(1) -def legendre(n): +def legendre(n: int) -> List[List[Fraction]]: """Return the first n Legendre polynomials. The polynomials have *standard* normalization, i.e. @@ -19,12 +20,12 @@ def legendre(n): The return value is a list of list of fraction.Fraction instances. """ - result = [[Frac(1)], [Frac(0), Frac(1)]] + result = [[Fraction(1)], [Fraction(0), Fraction(1)]] if n <= 2: return result[:n] for i in range(2, n): # Use Bonnet's recursion formula. - new = (i + 1) * [Frac(0)] + new = (i + 1) * [Fraction(0)] new[1:] = (r * (2 * i - 1) for r in result[-1]) new[:-2] = (n - r * (i - 1) for n, r in zip(new[:-2], result[-2])) new[:] = (n / i for n in new) @@ -32,7 +33,7 @@ def legendre(n): return result -def newton(n): +def newton(n: int) -> np.ndarray: """Compute the monomial coefficients of the Newton polynomial over the nodes of the n-point Clenshaw-Curtis quadrature rule. """ @@ -89,7 +90,7 @@ def newton(n): return cf -def scalar_product(a, b): +def scalar_product(a: List[Fraction], b: List[Fraction]) -> Fraction: """Compute the polynomial scalar product int_-1^1 dx a(x) b(x). The args must be sequences of polynomial coefficients. This @@ -110,7 +111,7 @@ def scalar_product(a, b): return 2 * sum(c[i] / (i + 1) for i in range(0, lc, 2)) -def calc_bdef(ns): +def calc_bdef(ns: Tuple[int, int, int, int]) -> List[np.ndarray]: """Calculate the decompositions of Newton polynomials (over the nodes of the n-point Clenshaw-Curtis quadrature rule) in terms of Legandre polynomials. @@ -123,7 +124,7 @@ def calc_bdef(ns): result = [] for n in ns: poly = [] - a = list(map(Frac, newton(n))) + a = list(map(Fraction, newton(n))) for b in legs[: n + 1]: igral = scalar_product(a, b) @@ -145,7 +146,7 @@ def calc_bdef(ns): b_def = calc_bdef(n) -def calc_V(xi, n): +def calc_V(xi: np.ndarray, n: int) -> np.ndarray: V = [np.ones(xi.shape), xi.copy()] for i in range(2, n): V.append((2 * i - 1) / i * xi * V[-1] - (i - 1) / i * V[-2]) @@ -183,7 +184,7 @@ def calc_V(xi, n): gamma = np.concatenate([[0, 0], np.sqrt(k[2:] ** 2 / (4 * k[2:] ** 2 - 1))]) -def _downdate(c, nans, depth): +def _downdate(c: np.ndarray, nans: List[int], depth: int) -> None: # This is algorithm 5 from the thesis of Pedro Gonnet. b = b_def[depth].copy() m = n[depth] - 1 @@ -200,7 +201,7 @@ def _downdate(c, nans, depth): m -= 1 -def _zero_nans(fx): +def _zero_nans(fx: np.ndarray) -> List[int]: nans = [] for i in range(len(fx)): if not np.isfinite(fx[i]): @@ -209,7 +210,7 @@ def _zero_nans(fx): return nans -def _calc_coeffs(fx, depth): +def _calc_coeffs(fx: np.ndarray, depth: int) -> np.ndarray: """Caution: this function modifies fx.""" nans = _zero_nans(fx) c_new = V_inv[depth] @ fx @@ -220,7 +221,7 @@ def _calc_coeffs(fx, depth): class DivergentIntegralError(ValueError): - def __init__(self, msg, igral, err, nr_points): + def __init__(self, msg: str, igral: float, err: None, nr_points: int) -> None: self.igral = igral self.err = err self.nr_points = nr_points @@ -230,19 +231,23 @@ def __init__(self, msg, igral, err, nr_points): class _Interval: __slots__ = ["a", "b", "c", "fx", "igral", "err", "depth", "rdepth", "ndiv", "c00"] - def __init__(self, a, b, depth, rdepth): + def __init__( + self, a: Union[int, float], b: Union[int, float], depth: int, rdepth: int + ) -> None: self.a = a self.b = b self.depth = depth self.rdepth = rdepth - def points(self): + def points(self) -> np.ndarray: a = self.a b = self.b return (a + b) / 2 + (b - a) * xi[self.depth] / 2 @classmethod - def make_first(cls, f, a, b, depth=2): + def make_first( + cls, f: Callable, a: int, b: int, depth: int = 2 + ) -> Tuple["_Interval", int]: ival = _Interval(a, b, depth, 1) fx = f(ival.points()) ival.c = _calc_coeffs(fx, depth) @@ -251,7 +256,7 @@ def make_first(cls, f, a, b, depth=2): ival.ndiv = 0 return ival, n[depth] - def calc_igral_and_err(self, c_old): + def calc_igral_and_err(self, c_old: np.ndarray) -> float: self.c = c_new = _calc_coeffs(self.fx, self.depth) c_diff = np.zeros(max(len(c_old), len(c_new))) c_diff[: len(c_old)] = c_old @@ -262,7 +267,9 @@ def calc_igral_and_err(self, c_old): self.err = w * c_diff return c_diff - def split(self, f): + def split( + self, f: Callable + ) -> Union[Tuple[Tuple[float, float, float], int], Tuple[List["_Interval"], int]]: m = (self.a + self.b) / 2 f_center = self.fx[(len(self.fx) - 1) // 2] @@ -287,7 +294,7 @@ def split(self, f): return ivals, nr_points - def refine(self, f): + def refine(self, f: Callable) -> Tuple[np.ndarray, bool, int]: """Increase degree of interval.""" self.depth = depth = self.depth + 1 points = self.points() @@ -299,7 +306,9 @@ def refine(self, f): return points, split, n[depth] - n[depth - 1] -def algorithm_4(f, a, b, tol, N_loops=int(1e9)): +def algorithm_4( + f: Callable, a: int, b: int, tol: float, N_loops: int = int(1e9) +) -> Tuple[float, float, int, List["_Interval"]]: """ALGORITHM_4 evaluates an integral using adaptive quadrature. The algorithm uses Clenshaw-Curtis quadrature rules of increasing degree in each interval and bisects the interval if either the @@ -403,10 +412,10 @@ def algorithm_4(f, a, b, tol, N_loops=int(1e9)): return igral, err, nr_points, ivals -################ Tests ################ +# ############### Tests ################ -def f0(x): +def f0(x: Union[float, np.ndarray]) -> Union[float, np.ndarray]: return x * np.sin(1 / x) * np.sqrt(abs(1 - x)) @@ -414,18 +423,20 @@ def f7(x): return x**-0.5 -def f24(x): +def f24(x: Union[float, np.ndarray]) -> Union[float, np.ndarray]: return np.floor(np.exp(x)) -def f21(x): +def f21(x: Union[float, np.ndarray]) -> Union[float, np.ndarray]: y = 0 for i in range(1, 4): y += 1 / np.cosh(20**i * (x - 2 * i / 10)) return y -def f63(x, alpha, beta): +def f63( + x: Union[float, np.ndarray], alpha: float, beta: float +) -> Union[float, np.ndarray]: return abs(x - beta) ** alpha @@ -433,7 +444,7 @@ def F63(x, alpha, beta): return (x - beta) * abs(x - beta) ** alpha / (alpha + 1) -def fdiv(x): +def fdiv(x: Union[float, np.ndarray]) -> Union[float, np.ndarray]: return abs(x - 0.987654321) ** -1.1 @@ -461,7 +472,9 @@ def test_scalar_product(n=33): selection = [0, 5, 7, n - 1] for i in selection: for j in selection: - assert scalar_product(legs[i], legs[j]) == ((i == j) and Frac(2, 2 * i + 1)) + assert scalar_product(legs[i], legs[j]) == ( + (i == j) and Fraction(2, 2 * i + 1) + ) def simple_newton(n): From ae70ec8e6ee83ef75a729e2c5bbabf94bc818ea5 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Tue, 11 Oct 2022 21:05:20 -0700 Subject: [PATCH 3/3] Add type-hints to adaptive/learner/integrator_coeffs.py --- adaptive/learner/integrator_coeffs.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/adaptive/learner/integrator_coeffs.py b/adaptive/learner/integrator_coeffs.py index 9ccc54be1..711f30b76 100644 --- a/adaptive/learner/integrator_coeffs.py +++ b/adaptive/learner/integrator_coeffs.py @@ -1,4 +1,5 @@ # Based on an adaptive quadrature algorithm by Pedro Gonnet +from __future__ import annotations from collections import defaultdict from fractions import Fraction @@ -8,7 +9,7 @@ import scipy.linalg -def legendre(n): +def legendre(n: int) -> list[list[Fraction]]: """Return the first n Legendre polynomials. The polynomials have *standard* normalization, i.e. @@ -29,7 +30,7 @@ def legendre(n): return result -def newton(n): +def newton(n: int) -> np.ndarray: """Compute the monomial coefficients of the Newton polynomial over the nodes of the n-point Clenshaw-Curtis quadrature rule. """ @@ -86,7 +87,7 @@ def newton(n): return cf -def scalar_product(a, b): +def scalar_product(a: list[Fraction], b: list[Fraction]) -> Fraction: """Compute the polynomial scalar product int_-1^1 dx a(x) b(x). The args must be sequences of polynomial coefficients. This @@ -107,7 +108,7 @@ def scalar_product(a, b): return 2 * sum(c[i] / (i + 1) for i in range(0, lc, 2)) -def calc_bdef(ns): +def calc_bdef(ns: tuple[int, int, int, int]) -> list[np.ndarray]: """Calculate the decompositions of Newton polynomials (over the nodes of the n-point Clenshaw-Curtis quadrature rule) in terms of Legandre polynomials. @@ -133,7 +134,7 @@ def calc_bdef(ns): return result -def calc_V(x, n): +def calc_V(x: np.ndarray, n: int) -> np.ndarray: V = [np.ones(x.shape), x.copy()] for i in range(2, n): V.append((2 * i - 1) / i * x * V[-1] - (i - 1) / i * V[-2])