diff --git a/README.md b/README.md index 61c45b3..613b10d 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ Minimally, pywhy-stats requires: * Python (>=3.8) * numpy * scipy + * scikit-learn ## User Installation @@ -42,4 +43,4 @@ To install the package from github, clone the repository and then `cd` into the # Contributing -We welcome contributions from the community. Please refer to our [contributing document](./CONTRIBUTING.md) and [developer document](./DEVELOPING.md) for information on developer workflows. \ No newline at end of file +We welcome contributions from the community. Please refer to our [contributing document](./CONTRIBUTING.md) and [developer document](./DEVELOPING.md) for information on developer workflows. diff --git a/doc/api.rst b/doc/api.rst index 327a50c..e898ec4 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -57,10 +57,25 @@ contains the p-value and the test statistic and optionally additional informatio Testing for conditional independence among variables is a core part of many data analysis procedures. -.. currentmodule:: pywhy_stats +.. currentmodule:: pywhy_stats.independence .. autosummary:: :toctree: generated/ - + fisherz kci + +(Conditional) K-Sample Testing +============================== + +Testing for invariances among conditional distributions is a core part +of many data analysis procedures. Currently, we only support conditional +2-sample testing among two distributions. + +.. currentmodule:: pywhy_stats.discrepancy +.. autosummary:: + :toctree: generated/ + + bregman + kcd + diff --git a/doc/conditional_independence.rst b/doc/conditional_independence.rst index 735a132..6f80225 100644 --- a/doc/conditional_independence.rst +++ b/doc/conditional_independence.rst @@ -80,13 +80,14 @@ various proposals in the literature for estimating CMI, which we summarize here: estimating :math:`P(y|x)` and :math:`P(y|x,z)`, which can be used as plug-in estimates to the equation for CMI. -:mod:`pywhy_stats.fisherz` Partial (Pearson) Correlation --------------------------------------------------------- +:mod:`pywhy_stats.independence.fisherz` Partial (Pearson) Correlation +--------------------------------------------------------------------- Partial correlation based on the Pearson correlation is equivalent to CMI in the setting of normally distributed data. Computing partial correlation is fast and efficient and thus attractive to use. However, this **relies on the assumption that the variables are Gaussiany**, which may be unrealistic in certain datasets. +.. currentmodule:: pywhy_stats.independence .. autosummary:: :toctree: generated/ @@ -100,8 +101,8 @@ each discrete variable. An exponential amount of data is needed for increasing l for a discrete variable. -Kernel-Approaches ------------------ +:mod:`pywhy_stats.independence.kci` Kernel-Approaches +----------------------------------------------------- Kernel independence tests are statistical methods used to determine if two random variables are independent or conditionally independent. One such test is the Hilbert-Schmidt Independence Criterion (HSIC), which examines the independence between two random variables, X and Y. HSIC employs kernel methods and, more specifically, it computes @@ -121,6 +122,12 @@ Kernel-based tests are attractive for many applications, since they are semi-par that have been shown to be robust in the machine-learning field. For more information, see :footcite:`Zhang2011`. +.. currentmodule:: pywhy_stats.independence +.. autosummary:: + :toctree: generated/ + + kci + Classifier-based Approaches --------------------------- Another suite of approaches that rely on permutation testing is the classifier-based approach. @@ -140,9 +147,9 @@ helps maintain dependence between (X, Z) and (Y, Z) (if it exists), but generate conditionally independent dataset. -======================= -Conditional Discrepancy -======================= +========================================= +Conditional Distribution 2-Sample Testing +========================================= .. currentmodule:: pywhy_stats @@ -166,22 +173,35 @@ indices of the distribution, one can convert the CD test: :math:`P_{i=j}(y|x) =? P_{i=k}(y|x)` into the CI test :math:`P(y|x,i) = P(y|x)`, which can be tested with the Chi-square CI tests. -Kernel-Approaches ------------------ +:mod:`pywhy_stats.discrepancy.kcd` Kernel-Approaches +----------------------------------------------------- Kernel-based tests are attractive since they are semi-parametric and use kernel-based ideas that have been shown to be robust in the machine-learning field. The Kernel CD test is a test that computes a test statistic from kernels of the data and uses a weighted permutation testing based on the estimated propensity scores to generate samples from the null distribution :footcite:`Park2021conditional`, which are then used to estimate a pvalue. +.. currentmodule:: pywhy_stats.discrepancy +.. autosummary:: + :toctree: generated/ + + kcd -Bregman-Divergences -------------------- + +:mod:`pywhy_stats.discrepancy.bregman` Bregman-Divergences +---------------------------------------------------------- The Bregman CD test is a divergence-based test that computes a test statistic from estimated Von-Neumann divergences of the data and uses a weighted permutation testing based on the estimated propensity scores to generate samples from the null distribution :footcite:`Yu2020Bregman`, which are then used to estimate a pvalue. + +.. currentmodule:: pywhy_stats.discrepancy +.. autosummary:: + :toctree: generated/ + + bregman + ========== References ========== diff --git a/doc/conf.py b/doc/conf.py index c28203d..c983300 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -39,7 +39,7 @@ # If your documentation needs a minimal Sphinx version, state it here. # -needs_sphinx = "4.0" +needs_sphinx = "5.0" # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom @@ -146,9 +146,7 @@ "PValueResult": "pywhy_stats.pvalue_result.PValueResult", # numpy "NDArray": "numpy.ndarray", - # "ArrayLike": "numpy.typing.ArrayLike", "ArrayLike": ":term:`array_like`", - "fisherz": "pywhy_stats.fisherz", } autodoc_typehints_format = "short" diff --git a/doc/whats_new/v0.1.rst b/doc/whats_new/v0.1.rst index f424283..d010d0a 100644 --- a/doc/whats_new/v0.1.rst +++ b/doc/whats_new/v0.1.rst @@ -26,8 +26,9 @@ Version 0.1 Changelog --------- -- |Feature| Implement partial correlation test :func:`pywhy_stats.fisherz`, by `Adam Li`_ (:pr:`7`) -- |Feature| Add (un)conditional kernel independence test by `Patrick Blöbaum`_, co-authored by `Adam Li`_ (:pr:`14`) +- |Feature| Implement partial correlation test `pywhy_stats.fisherz`, by `Adam Li`_ (:pr:`7`) +- |Feature| Add (un)conditional kernel independence test, `pywhy_stats.kci`, by `Patrick Blöbaum`_, co-authored by `Adam Li`_ (:pr:`14`) +- |Feature| Add conditional kernel and Bregman discrepancy tests, `pywhy_stats.kcd` and `pywhy_stats.bregman` by `Adam Li`_ (:pr:`15`) Code and Documentation Contributors @@ -37,4 +38,4 @@ Thanks to everyone who has contributed to the maintenance and improvement of the project since version inception, including: * `Adam Li`_ - +* `Patrick Blöbaum`_ diff --git a/pywhy_stats/__init__.py b/pywhy_stats/__init__.py index a20920c..2cce2a9 100644 --- a/pywhy_stats/__init__.py +++ b/pywhy_stats/__init__.py @@ -1,3 +1,5 @@ -from . import fisherz, kci +from . import discrepancy, independence from ._version import __version__ # noqa: F401 -from .independence import Methods, independence_test +from .api import Methods, independence_test +from .discrepancy import bregman, kcd # noqa: F401 +from .independence import fisherz, kci # noqa: F401 diff --git a/pywhy_stats/independence.py b/pywhy_stats/api.py similarity index 88% rename from pywhy_stats/independence.py rename to pywhy_stats/api.py index dd99897..59767ee 100644 --- a/pywhy_stats/independence.py +++ b/pywhy_stats/api.py @@ -6,7 +6,7 @@ import scipy.stats from numpy.typing import ArrayLike -from pywhy_stats import fisherz, kci +from pywhy_stats.independence import fisherz, kci from .pvalue_result import PValueResult @@ -18,10 +18,10 @@ class Methods(Enum): """Choose an automatic method based on the data.""" FISHERZ = fisherz - """:py:mod:`~pywhy_stats.fisherz`: Fisher's Z test for independence""" + """:py:mod:`~pywhy_stats.independence.fisherz`: Fisher's Z test for independence""" KCI = kci - """:py:mod:`~pywhy_stats.kci`: Conditional kernel independence test""" + """:py:mod:`~pywhy_stats.independence.kci`: Conditional kernel independence test""" def independence_test( @@ -59,8 +59,8 @@ def independence_test( See Also -------- - fisherz : Fisher's Z test for independence - kci : Kernel Conditional Independence test + independence.fisherz : Fisher's Z test for independence + independence.kci : Kernel Conditional Independence test """ method_module: ModuleType if method == Methods.AUTO: diff --git a/pywhy_stats/discrepancy/__init__.py b/pywhy_stats/discrepancy/__init__.py new file mode 100644 index 0000000..fa08b4b --- /dev/null +++ b/pywhy_stats/discrepancy/__init__.py @@ -0,0 +1 @@ +from . import bregman, kcd diff --git a/pywhy_stats/discrepancy/base.py b/pywhy_stats/discrepancy/base.py new file mode 100644 index 0000000..7634765 --- /dev/null +++ b/pywhy_stats/discrepancy/base.py @@ -0,0 +1,131 @@ +from typing import Callable, Optional + +import numpy as np +from joblib import Parallel, delayed +from numpy.typing import ArrayLike +from sklearn.base import BaseEstimator +from sklearn.linear_model import LogisticRegression + +from pywhy_stats.kernel_utils import _default_regularization + + +def _preprocess_propensity_data( + group_ind: ArrayLike, + propensity_model: Optional[BaseEstimator], + propensity_weights: Optional[ArrayLike], +): + if group_ind.ndim != 1: + raise RuntimeError("group_ind must be a 1d array.") + if len(np.unique(group_ind)) != 2: + raise RuntimeError( + f"There should only be two groups. Found {len(np.unique(group_ind))} groups." + ) + if propensity_model is not None and propensity_weights is not None: + raise ValueError( + "Both propensity model and propensity estimates are specified. Only one is allowed." + ) + if propensity_weights is not None: + if propensity_weights.shape[0] != len(group_ind): + raise ValueError( + f"There are {propensity_weights.shape[0]} pre-defined estimates, while " + f"there are {len(group_ind)} samples." + ) + if propensity_weights.shape[1] != len(np.unique(group_ind.squeeze())): + raise ValueError( + f"There are {propensity_weights.shape[1]} group pre-defined estimates, while " + f"there are {len(np.unique(group_ind))} unique groups." + ) + + +def _compute_propensity_scores( + group_ind: ArrayLike, + propensity_model: Optional[BaseEstimator] = None, + propensity_weights: Optional[ArrayLike] = None, + n_jobs: Optional[int] = None, + random_state: Optional[int] = None, + **kwargs, +): + if propensity_model is None: + K: ArrayLike = kwargs.get("K") + + # compute a default penalty term if using a kernel matrix + # C is the inverse of the regularization parameter + if K.shape[0] == K.shape[1]: + # default regularization is 1 / (2 * K) + propensity_penalty_ = _default_regularization(K) + C = 1 / (2 * propensity_penalty_) + else: + # defaults to no regularization + propensity_penalty_ = 0.0 + C = 1.0 + + # default model is logistic regression + propensity_model_ = LogisticRegression( + penalty="l2", + n_jobs=n_jobs, + warm_start=True, + solver="lbfgs", + random_state=random_state, + C=C, + ) + else: + propensity_model_ = propensity_model + + # either use pre-defined propensity weights, or estimate them + if propensity_weights is None: + K = kwargs.get("K") + # fit and then obtain the probabilities of treatment + # for each sample (i.e. the propensity scores) + propensity_weights = propensity_model_.fit(K, group_ind.ravel()).predict_proba(K)[:, 1] + else: + propensity_weights = propensity_weights[:, 1] + return propensity_weights + + +def compute_null( + func: Callable, + e_hat: ArrayLike, + X: ArrayLike, + Y: ArrayLike, + null_reps: int = 1000, + n_jobs=None, + seed=None, + **kwargs, +) -> ArrayLike: + """Estimate null distribution using propensity weights. + + Parameters + ---------- + func : Callable + The function to compute the test statistic. + e_hat : Array-like of shape (n_samples,) + The predicted propensity score for ``group_ind == 1``. + X : Array-Like of shape (n_samples, n_features_x) + The X (covariates) array. + Y : Array-Like of shape (n_samples, n_features_y) + The Y (outcomes) array. + null_reps : int, optional + Number of times to sample null, by default 1000. + n_jobs : int, optional + Number of jobs to run in parallel, by default None. + seed : int, optional + Random generator, or random seed, by default None. + + Returns + ------- + null_dist : Array-like of shape (n_samples,) + The null distribution of test statistics. + """ + rng = np.random.default_rng(seed) + n_samps = X.shape[0] + + # compute the test statistic on the conditionally permuted + # dataset, where each group label is resampled for each sample + # according to its propensity score + null_dist = Parallel(n_jobs=n_jobs)( + [ + delayed(func)(X, Y, group_ind=rng.binomial(1, e_hat, size=n_samps), **kwargs) + for _ in range(null_reps) + ] + ) + return np.asarray(null_dist) diff --git a/pywhy_stats/discrepancy/bregman.py b/pywhy_stats/discrepancy/bregman.py new file mode 100644 index 0000000..123d492 --- /dev/null +++ b/pywhy_stats/discrepancy/bregman.py @@ -0,0 +1,205 @@ +"""Bregman (conditional) discrepancy test. + +Also known as a conditional k-sample test, where the null hypothesis is that the +conditional distributions are equal across different population groups. The +Bregman tests for conditional divergence using correntropy. + +Returns +------- +PValueResult + The result of the test, which includes the test statistic and pvalue. +""" + +from typing import Callable, Optional, Tuple + +import numpy as np +from numpy.typing import ArrayLike +from sklearn.base import BaseEstimator +from sklearn.preprocessing import LabelBinarizer + +from pywhy_stats.kernel_utils import _preprocess_kernel_data, corrent_matrix, von_neumann_divergence + +from ..pvalue_result import PValueResult +from .base import _compute_propensity_scores, _preprocess_propensity_data, compute_null + + +def condind( + X: ArrayLike, + Y: ArrayLike, + group_ind: ArrayLike, + kernel: Optional[Callable[[ArrayLike], ArrayLike]] = None, + null_sample_size: int = 1000, + propensity_model: Optional[Callable] = None, + propensity_weights: Optional[ArrayLike] = None, + centered: bool = False, + n_jobs: Optional[int] = None, + random_seed: Optional[int] = None, +) -> PValueResult: + """ + Test whether Y conditioned on X is invariant across the groups. + + For testing conditional independence on continuous data, we compute Bregman divergences + :footcite:`Yu2020Bregman`. This specifically + tests the (conditional) invariance null hypothesis :math:: + + P_{Z=1}(Y|X) = P_{Z=0}(Y|X) + + Parameters + ---------- + X : ArrayLike of shape (n_samples, n_features_x) + Data for variable X, which can be multidimensional. + Y : ArrayLike of shape (n_samples, n_features_y) + Data for variable Y, which can be multidimensional. + group_ind : ArrayLike of shape (n_samples,) + Data for group indicator Z, which can be multidimensional. This assigns each sample + to a group indicated by 0 or 1. + kernel_X : Callable[[ArrayLike], ArrayLike] + The kernel function for X. By default, the RBF kernel is used for continuous and the delta + kernel for categorical data. Note that we currently only consider string values as categorical data. + kernel_Y : Callable[[ArrayLike], ArrayLike] + The kernel function for Y. By default, the RBF kernel is used for continuous and the delta + kernel for categorical data. Note that we currently only consider string values as categorical data. + null_sample_size : int + The number of samples to generate for the bootstrap distribution to approximate the pvalue, + by default 1000. + propensity_model : Optional[sklearn.base.BaseEstimator], optional + The propensity model to use to estimate the propensity score, by default None. + propensity_weights : Optional[ArrayLike], optional + The propensity weights to use, by default None, which means that the propensity scores + will be estimated from the propensity_model. + centered : bool + Whether the kernel matrix should be centered, by default True. + n_jobs : Optional[int], optional + The number of jobs to run in parallel, by default None. + random_seed : Optional[int], optional + Random seed, by default None. + + Notes + ----- + Any callable can be given to create the kernel matrix. For instance, to use a particular kernel + from sklearn:: + + kernel_X = func:`sklearn.metrics.pairwise.pairwise_kernels.polynomial` + + References + ---------- + .. footbibliography:: + """ + test_statistic, pvalue = _bregman_test( + X=X, + group_ind=group_ind, + Y=Y, + kernel=kernel, + propensity_weights=propensity_weights, + propensity_model=propensity_model, + null_sample_size=null_sample_size, + centered=centered, + n_jobs=n_jobs, + random_seed=random_seed, + ) + return PValueResult(pvalue=pvalue, statistic=test_statistic) + + +def _bregman_test( + X: ArrayLike, + group_ind: ArrayLike, + Y: ArrayLike, + kernel: Optional[Callable[[np.ndarray], np.ndarray]], + propensity_weights: Optional[ArrayLike], + propensity_model: Optional[BaseEstimator], + null_sample_size: int, + centered: bool, + n_jobs: Optional[int], + random_seed: Optional[int], +) -> Tuple[float, float]: + X, Y, _ = _preprocess_kernel_data(X, Y, normalize_data=False) + _preprocess_propensity_data( + group_ind=group_ind, + propensity_weights=propensity_weights, + propensity_model=propensity_model, + ) + + enc = LabelBinarizer(neg_label=0, pos_label=1) + group_ind = enc.fit_transform(group_ind).squeeze() + # We are interested in testing: P_1(y|x) = P_2(y|x) + # compute the conditional divergence, which is symmetric by construction + # 1/2 * (D(p_1(y|x) || p_2(y|x)) + D(p_2(y|x) || p_1(y|x))) + conditional_div = _compute_test_statistic( + X=X, Y=Y, group_ind=group_ind, metric=kernel, centered=centered, n_jobs=n_jobs + ) + + # compute propensity scores + e_hat = _compute_propensity_scores( + group_ind, + propensity_model=propensity_model, + propensity_weights=propensity_weights, + n_jobs=n_jobs, + random_state=random_seed, + K=X, + ) + + # now compute null distribution + null_dist = compute_null( + _compute_test_statistic, + e_hat, + X=X, + Y=Y, + null_reps=null_sample_size, + seed=random_seed, + n_jobs=n_jobs, + metric=kernel, + centered=centered, + ) + + # compute pvalue + pvalue = (1.0 + np.sum(null_dist >= conditional_div)) / (1 + null_sample_size) + return conditional_div, pvalue + + +def _compute_test_statistic( + X: ArrayLike, + Y: ArrayLike, + group_ind: ArrayLike, + metric: Optional[Callable[[ArrayLike], ArrayLike]], + centered: bool = True, + n_jobs: Optional[int] = None, +) -> float: + if set(np.unique(group_ind)) != {0, 1}: + raise ValueError("group_ind must be binary") + first_group = group_ind == 0 + second_group = group_ind == 1 + + X1 = X[first_group, :] + X2 = X[second_group, :] + Y1 = Y[first_group, :] + Y2 = Y[second_group, :] + + # first compute the centered correntropy matrices, C_xy^1 + Cx1y1 = corrent_matrix(np.hstack((X1, Y1)), metric=metric, centered=centered, n_jobs=n_jobs) + Cx2y2 = corrent_matrix(np.hstack((X2, Y2)), metric=metric, centered=centered, n_jobs=n_jobs) + + # compute the centered correntropy matrices for just C_x^1 and C_x^2 + Cx1 = corrent_matrix( + X1, + metric=metric, + centered=centered, + n_jobs=n_jobs, + ) + Cx2 = corrent_matrix( + X2, + metric=metric, + centered=centered, + n_jobs=n_jobs, + ) + + # compute the conditional divergence with the Von Neumann div + # D(p_1(y|x) || p_2(y|x)) + joint_div1 = von_neumann_divergence(Cx1y1, Cx2y2) + joint_div2 = von_neumann_divergence(Cx2y2, Cx1y1) + x_div1 = von_neumann_divergence(Cx1, Cx2) + x_div2 = von_neumann_divergence(Cx2, Cx1) + + # compute the conditional divergence, which is symmetric by construction + # 1/2 * (D(p_1(y|x) || p_2(y|x)) + D(p_2(y|x) || p_1(y|x))) + conditional_div = 1.0 / 2 * (joint_div1 - x_div1 + joint_div2 - x_div2) + return conditional_div diff --git a/pywhy_stats/discrepancy/kcd.py b/pywhy_stats/discrepancy/kcd.py new file mode 100644 index 0000000..427bc62 --- /dev/null +++ b/pywhy_stats/discrepancy/kcd.py @@ -0,0 +1,262 @@ +"""Kernel (conditional) discrepancy test. + +Also known as a conditional k-sample test, where the null hypothesis is that the +conditional distributions are equal across different population groups. + +Returns +------- +PValueResult + The result of the test, which includes the test statistic and pvalue. +""" + +from typing import Callable, Optional, Tuple + +import numpy as np +from numpy.typing import ArrayLike +from sklearn.base import BaseEstimator +from sklearn.preprocessing import LabelBinarizer + +from pywhy_stats.kernel_utils import ( + _default_regularization, + _preprocess_kernel_data, + compute_kernel, +) + +from ..pvalue_result import PValueResult +from .base import _compute_propensity_scores, _preprocess_propensity_data, compute_null + + +# XXX: determine if we can do this with Y being optional. +def condind( + X: ArrayLike, + Y: ArrayLike, + group_ind: ArrayLike, + kernel_X: Optional[Callable[[ArrayLike], ArrayLike]] = None, + kernel_Y: Optional[Callable[[ArrayLike], ArrayLike]] = None, + null_sample_size: int = 1000, + normalize_data: bool = True, + propensity_model=None, + propensity_weights=None, + centered: bool = True, + n_jobs: Optional[int] = None, + random_seed: Optional[int] = None, +) -> PValueResult: + """ + Test whether Y conditioned on X is invariant across the groups. + + For testing conditional independence on continuous data, we leverage kernels + :footcite:`Zhang2011` that are computationally efficient. This specifically + tests the (conditional) invariance null hypothesis :math:: + + P_{Z=1}(Y|X) = P_{Z=0}(Y|X) + + Parameters + ---------- + X : ArrayLike of shape (n_samples, n_features_x) + Data for variable X, which can be multidimensional. + Y : ArrayLike of shape (n_samples, n_features_y) + Data for variable Y, which can be multidimensional. + group_ind : ArrayLike of shape (n_samples,) + Data for group indicator Z, which can be multidimensional. This assigns each sample + to a group indicated by 0 or 1. + kernel_X : Callable[[ArrayLike], ArrayLike] + The kernel function for X. By default, the RBF kernel is used for continuous and the delta + kernel for categorical data. Note that we currently only consider string values as categorical data. + Kernels can be specified in the same way as for :func:`~sklearn.metrics.pairwise.pairwise_kernels` + with the addition that 'delta' kernel is supported for categorical data. + kernel_Y : Callable[[ArrayLike], ArrayLike] + The kernel function for Y. By default, the RBF kernel is used for continuous and the delta + kernel for categorical data. Note that we currently only consider string values as categorical data. + Kernels can be specified in the same way as for :func:`~sklearn.metrics.pairwise.pairwise_kernels` + with the addition that 'delta' kernel is supported for categorical data. + null_sample_size : int + The number of samples to generate for the bootstrap distribution to approximate the pvalue, + by default 1000. + normalize_data : bool + Whether the data should be standardized to unit variance, by default True. + propensity_model : Optional[sklearn.base.BaseEstimator], optional + The propensity model to use to estimate the propensity score, by default None. + propensity_weights : Optional[ArrayLike], optional + The propensity weights to use, by default None, which means that the propensity scores + will be estimated from the propensity_model. + centered : bool + Whether the kernel matrix should be centered, by default True. + n_jobs : Optional[int], optional + The number of jobs to run in parallel, by default None. + random_seed : Optional[int], optional + Random seed, by default None. + + Notes + ----- + Any callable can be given to create the kernel matrix. For instance, to use a particular kernel + from sklearn:: + + kernel_X = func:`sklearn.metrics.pairwise.pairwise_kernels.polynomial` + + In addition, we implement an efficient delta kernel. The delta kernel can be specified using the + 'kernel' string argument. + + References + ---------- + .. footbibliography:: + """ + test_statistic, pvalue = _kernel_test( + X=X, + Y=Y, + group_ind=group_ind, + kernel_X=kernel_X, + kernel_Y=kernel_Y, + propensity_weights=propensity_weights, + propensity_model=propensity_model, + null_sample_size=null_sample_size, + normalize_data=normalize_data, + centered=centered, + n_jobs=n_jobs, + random_seed=random_seed, + ) + return PValueResult(pvalue=pvalue, statistic=test_statistic) + + +def _kernel_test( + X: ArrayLike, + group_ind: ArrayLike, + Y: ArrayLike, + kernel_X: Optional[Callable[[np.ndarray], np.ndarray]], + kernel_Y: Optional[Callable[[np.ndarray], np.ndarray]], + propensity_weights: Optional[ArrayLike], + propensity_model: Optional[BaseEstimator], + null_sample_size: int, + normalize_data: bool, + centered: bool, + n_jobs: Optional[int], + random_seed: Optional[int], +) -> Tuple[float, float]: + X, Y, _ = _preprocess_kernel_data(X, Y, normalize_data=normalize_data) + _preprocess_propensity_data( + group_ind=group_ind, + propensity_weights=propensity_weights, + propensity_model=propensity_model, + ) + + enc = LabelBinarizer(neg_label=0, pos_label=1) + group_ind = enc.fit_transform(group_ind) + + # compute kernels in each data space + L = compute_kernel( + Y, + metric=kernel_Y, + centered=centered, + n_jobs=n_jobs, + ) + K = compute_kernel( + X, + metric=kernel_X, + centered=centered, + n_jobs=n_jobs, + ) + + # compute the test statistic + stat = _compute_test_statistic(K, L, group_ind) + + # compute propensity scores + e_hat = _compute_propensity_scores( + group_ind, + propensity_model=propensity_model, + propensity_weights=propensity_weights, + n_jobs=n_jobs, + random_state=random_seed, + K=K, + ) + + # now compute null distribution + null_dist = compute_null( + _compute_test_statistic, + e_hat, + X=K, + Y=L, + null_reps=null_sample_size, + seed=random_seed, + n_jobs=n_jobs, + ) + + # compute the pvalue + pvalue = (1 + np.sum(null_dist >= stat)) / (1 + null_sample_size) + return stat, pvalue + + +def _compute_test_statistic(K: ArrayLike, L: ArrayLike, group_ind: ArrayLike): + n_samples = len(K) + + # compute W matrices from K and z + W0, W1 = _compute_inverse_kernel(K, group_ind) + + # compute L kernels + group_ind = np.squeeze(group_ind) + first_mask = np.array(1 - group_ind, dtype=bool) + second_mask = np.array(group_ind, dtype=bool) + L0 = L[np.ix_(first_mask, first_mask)] + L1 = L[np.ix_(second_mask, second_mask)] + L01 = L[np.ix_(first_mask, second_mask)] + + # compute the final test statistic + K0 = K[:, first_mask] + K1 = K[:, second_mask] + KW0 = K0 @ W0 + KW1 = K1 @ W1 + + # compute the three terms in Lemma 4.4 + first_term = np.trace(KW0.T @ KW0 @ L0) + second_term = np.trace(KW1.T @ KW0 @ L01) + third_term = np.trace(KW1.T @ KW1 @ L1) + + # compute final statistic + stat = (first_term - 2 * second_term + third_term) / n_samples + return stat + + +def _compute_inverse_kernel(K, z) -> Tuple[ArrayLike, ArrayLike]: + """Compute W matrices as done in KCD test. + + Parameters + ---------- + K : ArrayLike of shape (n_samples, n_samples) + The kernel matrix. + z : ArrayLike of shape (n_samples) + The indicator variable of 1's and 0's for which samples belong + to which group. + + Returns + ------- + W0 : ArrayLike of shape (n_samples_i, n_samples_i) + The inverse of the kernel matrix from the first group. + W1 : NDArraArrayLike of shape (n_samples_j, n_samples_j) + The inverse of the kernel matrix from the second group. + + Notes + ----- + Compute the W matrix for the estimated conditional average in + the KCD test :footcite:`Park2021conditional`. + + References + ---------- + .. footbibliography:: + """ + # compute kernel matrices + z = np.squeeze(z) + first_mask = np.array(1 - z, dtype=bool) + second_mask = np.array(z, dtype=bool) + + K0 = K[np.ix_(first_mask, first_mask)] + K1 = K[np.ix_(second_mask, second_mask)] + + # compute regularization factors + regs_0 = _default_regularization(K0) + regs_1 = _default_regularization(K1) + + # compute the number of samples in each + n0 = int(np.sum(1 - z)) + n1 = int(np.sum(z)) + + W0 = np.linalg.inv(K0 + regs_0 * np.identity(n0)) + W1 = np.linalg.inv(K1 + regs_1 * np.identity(n1)) + return W0, W1 diff --git a/pywhy_stats/independence/__init__.py b/pywhy_stats/independence/__init__.py new file mode 100644 index 0000000..fcab339 --- /dev/null +++ b/pywhy_stats/independence/__init__.py @@ -0,0 +1 @@ +from . import fisherz, kci diff --git a/pywhy_stats/fisherz.py b/pywhy_stats/independence/fisherz.py similarity index 97% rename from pywhy_stats/fisherz.py rename to pywhy_stats/independence/fisherz.py index eb286e7..2126dd9 100644 --- a/pywhy_stats/fisherz.py +++ b/pywhy_stats/independence/fisherz.py @@ -4,7 +4,7 @@ It works on Gaussian random variables. When the data is not Gaussian, this test is not valid. In this case, we recommend -using the Kernel independence test at . +using the Kernel independence test `pywhy_stats.kci`. Examples -------- @@ -21,7 +21,7 @@ from numpy.typing import ArrayLike from scipy.stats import norm -from .pvalue_result import PValueResult +from pywhy_stats.pvalue_result import PValueResult def ind(X: ArrayLike, Y: ArrayLike, correlation_matrix: Optional[ArrayLike] = None) -> PValueResult: diff --git a/pywhy_stats/kci.py b/pywhy_stats/independence/kci.py similarity index 82% rename from pywhy_stats/kci.py rename to pywhy_stats/independence/kci.py index 866c07d..57d7024 100644 --- a/pywhy_stats/kci.py +++ b/pywhy_stats/independence/kci.py @@ -1,15 +1,23 @@ -from functools import partial +"""Independence test using Kernel test. + +Examples +-------- +>>> import pywhy_stats as ps +>>> res = ps.kci.ind([1, 2, 3], [4, 5, 6]) +>>> print(res.pvalue) +>>> 1.0 +""" from typing import Callable, Optional, Tuple import numpy as np -from numpy._typing import ArrayLike +from numpy.typing import ArrayLike from scipy import stats -from sklearn.metrics.pairwise import rbf_kernel -from pywhy_stats.kernels import delta_kernel, estimate_squared_sigma_rbf -from pywhy_stats.pvalue_result import PValueResult +from pywhy_stats.kernel_utils import _preprocess_kernel_data, compute_kernel from pywhy_stats.utils import preserve_random_state +from ..pvalue_result import PValueResult + @preserve_random_state def ind( @@ -20,7 +28,9 @@ def ind( approx: bool = True, null_sample_size: int = 1000, threshold: float = 1e-5, + centered: bool = True, normalize_data: bool = True, + n_jobs: Optional[int] = None, random_seed: Optional[int] = None, ) -> PValueResult: """ @@ -49,8 +59,12 @@ def ind( threshold : float The threshold set on the value of eigenvalues, by default 1e-5. Used to regularize the method. + centered : bool + Whether to center the kernel matrix or not, by default True. normalize_data : bool Whether the data should be standardized to unit variance, by default True. + n_jobs : Optional[int], optional + The number of jobs to run computations in parallel, by default None. random_seed : Optional[int], optional Random seed, by default None. @@ -66,7 +80,18 @@ def ind( .. footbibliography:: """ test_statistic, pvalue = _kernel_test( - X, Y, None, kernel_X, kernel_Y, None, approx, null_sample_size, threshold, normalize_data + X, + Y, + None, + kernel_X, + kernel_Y, + None, + approx, + null_sample_size, + threshold, + centered=centered, + normalize_data=normalize_data, + n_jobs=n_jobs, ) return PValueResult(pvalue=pvalue, statistic=test_statistic) @@ -82,7 +107,9 @@ def condind( approx: bool = True, null_sample_size: int = 1000, threshold: float = 1e-5, + centered: bool = True, normalize_data: bool = True, + n_jobs: Optional[int] = None, random_seed: Optional[int] = None, ) -> PValueResult: """ @@ -106,12 +133,18 @@ def condind( kernel_X : Callable[[ArrayLike], ArrayLike] The kernel function for X. By default, the RBF kernel is used for continuous and the delta kernel for categorical data. Note that we currently only consider string values as categorical data. + Kernels can be specified in the same way as for :func:`~sklearn.metrics.pairwise.pairwise_kernels` + with the addition that 'delta' kernel is supported for categorical data. kernel_Y : Callable[[ArrayLike], ArrayLike] The kernel function for Y. By default, the RBF kernel is used for continuous and the delta kernel for categorical data. Note that we currently only consider string values as categorical data. + Kernels can be specified in the same way as for :func:`~sklearn.metrics.pairwise.pairwise_kernels` + with the addition that 'delta' kernel is supported for categorical data. kernel_Z : Callable[[ArrayLike], ArrayLike] The kernel function for Z. By default, the RBF kernel is used for continuous and the delta kernel for categorical data. Note that we currently only consider string values as categorical data. + Kernels can be specified in the same way as for :func:`~sklearn.metrics.pairwise.pairwise_kernels` + with the addition that 'delta' kernel is supported for categorical data. approx : bool Whether to use the Gamma distribution approximation for the pvalue, by default True. null_sample_size : int @@ -120,8 +153,12 @@ def condind( threshold : float The threshold set on the value of eigenvalues, by default 1e-5. Used to regularize the method. + centered : bool + Whether to center the kernel matrices, by default True. normalize_data : bool Whether the data should be standardized to unit variance, by default True. + n_jobs : Optional[int], optional + The number of jobs to run computations in parallel, by default None. random_seed : Optional[int], optional Random seed, by default None. @@ -132,12 +169,26 @@ def condind( kernel_X = func:`sklearn.metrics.pairwise.pairwise_kernels.polynomial` + In addition, we implement an efficient delta kernel. The delta kernel can be specified using the + 'kernel' string argument. + References ---------- .. footbibliography:: """ test_statistic, pvalue = _kernel_test( - X, Y, Z, kernel_X, kernel_Y, kernel_Z, approx, null_sample_size, threshold, normalize_data + X, + Y, + Z, + kernel_X, + kernel_Y, + kernel_Z, + approx, + null_sample_size, + threshold, + centered=centered, + normalize_data=normalize_data, + n_jobs=n_jobs, ) return PValueResult(pvalue=pvalue, statistic=test_statistic) @@ -153,40 +204,19 @@ def _kernel_test( null_sample_size: int, threshold: float, normalize_data: bool, + centered: bool, + n_jobs: Optional[int], ) -> Tuple[float, float]: - if X.ndim == 1: - X = X.reshape(-1, 1) - if Y.ndim == 1: - Y = Y.reshape(-1, 1) - if Z is not None and Z.ndim == 1: - Z = Z.reshape(-1, 1) - - if normalize_data: - # first normalize the data to have zero mean and unit variance - # along the columns of the data - X = _normalize_data(X) - Y = _normalize_data(Y) - if Z is not None: - Z = _normalize_data(Z) - - if kernel_X is None: - kernel_X = _get_default_kernel(X) - if kernel_Y is None: - kernel_Y = _get_default_kernel(Y) - - Kx = kernel_X(X) - Ky = kernel_Y(Y) + X, Y, Z = _preprocess_kernel_data(X, Y, Z, normalize_data) + + # compute kernels in each data space if Z is not None: - if kernel_Z is None: - kernel_Z = _get_default_kernel(Z) - Kz = kernel_Z(Z) - # Equivalent to concatenating them beforehand. - # However, here we can then have individual kernels. - Kx = Kx * Kz - Kz = _fast_centering(Kz) + Kz = compute_kernel(Z, metric=kernel_Z, centered=centered, n_jobs=n_jobs) - Kx = _fast_centering(Kx) - Ky = _fast_centering(Ky) + # concatenate the (X, Z) data to compute the K_xz kernel + X = np.concatenate((X, Z), axis=1) + Kx = compute_kernel(X, metric=kernel_X, centered=centered, n_jobs=n_jobs) + Ky = compute_kernel(Y, metric=kernel_Y, centered=centered, n_jobs=n_jobs) if Z is None: return _ind(Kx, Ky, approx, null_sample_size, threshold) @@ -365,38 +395,3 @@ def _compute_null_ci(uu_prod, n_samples, threshold): # of chi-squared random variables weighted by the eigenvalue products null_dist = eig_uu.T.dot(f_rand) return null_dist - - -def _fast_centering(k: np.ndarray) -> np.ndarray: - """ - Compute centered kernel matrix in time O(n^2). - - The centered kernel matrix is defined as K_c = H @ K @ H, with - H = identity - 1/ n * ones(n,n). Computing H @ K @ H via matrix multiplication scales with n^3. - The implementation circumvents this and runs in time n^2. - - Originally authored by Jonas Kuebler - """ - n = len(k) - return ( - k - - 1 / n * np.outer(np.ones(n), np.sum(k, axis=0)) - - 1 / n * np.outer(np.sum(k, axis=1), np.ones(n)) - + 1 / n**2 * np.sum(k) * np.ones((n, n)) - ) - - -def _get_default_kernel(X: np.ndarray) -> Callable[[np.ndarray], np.ndarray]: - if X.dtype.type != np.str_: - return partial(rbf_kernel, gamma=0.5 * estimate_squared_sigma_rbf(X)) - else: - return delta_kernel - - -def _normalize_data(X: np.ndarray) -> np.ndarray: - for column in range(X.shape[1]): - if isinstance(X[0, column], int) or isinstance(X[0, column], float): - X[:, column] = stats.zscore(X[:, column]) - X[:, column] = np.nan_to_num(X[:, column]) # in case some dim of X is constant - - return X diff --git a/pywhy_stats/kernel_utils.py b/pywhy_stats/kernel_utils.py new file mode 100644 index 0000000..f05cbb8 --- /dev/null +++ b/pywhy_stats/kernel_utils.py @@ -0,0 +1,281 @@ +import inspect +from typing import Callable, Dict, Optional, Tuple, Union + +import numpy as np +from numpy.typing import ArrayLike +from scipy import stats +from scipy.linalg import logm +from scipy.optimize import minimize_scalar +from sklearn.metrics import pairwise_kernels +from sklearn.metrics.pairwise import PAIRWISE_KERNEL_FUNCTIONS +from sklearn.preprocessing import LabelEncoder + +from pywhy_stats.kernels import delta_kernel, estimate_squared_sigma_rbf + +# Note that this is added to the list of possible kernels for :func:`~sklearn.metrics.pairwise.pairwise_kernels`. +# because it is more efficient to compute the kernel over the entire matrices at once +# since numpy has vectorized operations. +PAIRWISE_KERNEL_FUNCTIONS["delta"] = delta_kernel + + +def _default_regularization(K: ArrayLike) -> float: + """Compute a default regularization for Kernel Logistic Regression. + + Parameters + ---------- + K : ArrayLike of shape (n_samples, n_samples) + The kernel matrix. + + Returns + ------- + x : float + The default l2 regularization term. + """ + n_samples = K.shape[0] + svals = np.linalg.svd(K, compute_uv=False, hermitian=True) + res = minimize_scalar( + lambda reg: np.sum(svals**2 / (svals + reg) ** 2) / n_samples + reg, + bounds=(0.0001, 1000), + method="bounded", + ) + return res.x + + +def _fast_centering(k: ArrayLike) -> ArrayLike: + """ + Compute centered kernel matrix in time O(n^2). + + The centered kernel matrix is defined as K_c = H @ K @ H, with + H = identity - 1/ n * ones(n,n). Computing H @ K @ H via matrix multiplication scales with n^3. + The implementation circumvents this and runs in time n^2. + + Originally authored by Jonas Kuebler + """ + n = len(k) + return ( + k + - 1 / n * np.outer(np.ones(n), np.sum(k, axis=0)) + - 1 / n * np.outer(np.sum(k, axis=1), np.ones(n)) + + 1 / n**2 * np.sum(k) * np.ones((n, n)) + ) + + +def _get_default_kernel(X: ArrayLike) -> Tuple[str, Dict]: + """Attempt to infer the kernel function from the data type of X. + + Parameters + ---------- + X : ArrayLike + Data array. If the data type is not string, the RBF kernel is used. + If data type is string, the delta kernel is used. + + Returns + ------- + Callable[[ArrayLike], ArrayLike] + The kernel function. + """ + if X.dtype.type not in (np.str_, np.object_): + return "rbf", {"gamma": 0.5 * estimate_squared_sigma_rbf(X)} + else: + return "delta", dict() + + +def _normalize_data(X: ArrayLike) -> ArrayLike: + """Normalize data to zero mean and unit variance.""" + for column in range(X.shape[1]): + if isinstance(X[0, column], int) or isinstance(X[0, column], float): + X[:, column] = stats.zscore(X[:, column]) + X[:, column] = np.nan_to_num(X[:, column]) # in case some dim of X is constant + return X + + +def compute_kernel( + X: ArrayLike, + Y: Optional[ArrayLike] = None, + metric: Optional[Union[Callable, str]] = None, + centered: bool = True, + n_jobs: Optional[int] = None, +) -> Tuple[ArrayLike, float]: + """Compute a kernel matrix and corresponding width. + + Also optionally estimates the kernel width parameter. + + Parameters + ---------- + X : ArrayLike of shape (n_samples_X, n_features_X) + The X array. + Y : ArrayLike of shape (n_samples_Y, n_features_Y), optional + The Y array, by default None. + metric : str, optional + The metric to compute the kernel function, by default 'rbf'. + Can be any string as defined in + :func:`sklearn.metrics.pairwise.pairwise_kernels`. Note 'rbf' + and 'gaussian' are the same metric. + centered : bool, optional + Whether to center the kernel matrix or not, by default True. + When centered, the kernel matrix induces a zero mean. The main purpose of + centering is to remove the bias or mean shift from the data represented + by the kernel matrix. + n_jobs : int, optional + The number of jobs to run computations in parallel, by default None. + + Returns + ------- + kernel : ArrayLike of shape (n_samples_X, n_samples_X) or (n_samples_X, n_samples_Y) + The kernel matrix. + """ + # if the width of the kernel is not set, then use the median trick to set the + # kernel width based on the data X + if metric is None: + metric, kernel_params = _get_default_kernel(X) + else: + kernel_params = dict() + + # compute the potentially pairwise kernel matrix + # If the number of arguments is just one, then we bypass the pairwise kernel + # optimized computation via sklearn and opt to use the metric function directly + if callable(metric) and len(inspect.getfullargspec(metric).args) == 1: + kernel = metric(X) + else: + kernel = pairwise_kernels( + X, Y=Y, metric=metric, n_jobs=n_jobs, filter_params=False, **kernel_params + ) + + if centered: + kernel = _fast_centering(kernel) + return kernel + + +def corrent_matrix( + data: ArrayLike, + metric: Optional[Union[str, Callable[[ArrayLike], ArrayLike]]] = None, + centered: bool = True, + n_jobs: Optional[int] = None, +) -> ArrayLike: + r"""Compute the centered correntropy of a matrix. + + Parameters + ---------- + data : ArrayLike of shape (n_samples, n_features) + The data. + metric : str + The kernel metric. + centered : bool, optional + Whether to center the kernel matrix or not, by default True. + n_jobs : int, optional + The number of jobs to run computations in parallel, by default None. + + Returns + ------- + data : ArrayLike of shape (n_features, n_features) + A symmetric centered correntropy matrix of the data. + + Notes + ----- + The estimator for the correntropy array is given by the formula + :math:`1 / N \\sum_{i=1}^N k(x_i, y_i) - 1 / N**2 \\sum_{i=1}^N \\sum_{j=1}^N k(x_i, y_j)`. + The first term is the estimate, and the second term is the bias, and together they form + an unbiased estimate. + """ + n_samples, n_features = data.shape + corren_arr = np.zeros(shape=(n_features, n_features)) + + # compute kernel between each feature, which is now (n_features, n_features) array + for idx in range(n_features): + for jdx in range(idx + 1): + K = compute_kernel( + X=data[:, [idx]], + Y=data[:, [jdx]], + metric=metric, + centered=centered, + n_jobs=n_jobs, + ) + + # compute the bias due to finite-samples + bias = np.sum(K) / n_samples**2 + + # compute the sample centered correntropy + corren = (1.0 / n_samples) * np.trace(K) - bias + + corren_arr[idx, jdx] = corren_arr[jdx, idx] = corren + return corren_arr + + +def von_neumann_divergence(A: ArrayLike, B: ArrayLike) -> float: + """Compute Von Neumann divergence between two PSD matrices. + + Parameters + ---------- + A : ArrayLike of shape (n_features, n_features) + The first PSD matrix. + B : ArrayLike of shape (n_features, n_features) + The second PSD matrix. + + Returns + ------- + div : float + The divergence value. + + Notes + ----- + The Von Neumann divergence, or what is known as the Bregman divergence in + :footcite:`Yu2020Bregman` is computed as follows with + :math:`D(A || B) = Tr(A (log(A) - log(B)) - A + B)`. + """ + div = np.trace(A.dot(logm(A) - logm(B)) - A + B) + return div + + +def _preprocess_kernel_data( + X: ArrayLike, + Y: ArrayLike, + Z: Optional[ArrayLike] = None, + normalize_data: bool = True, +): + """Preprocess the data for kernel methods. + + Parameters + ---------- + X : ArrayLike of shape (n_samples_X, n_features_X) + The X array. + Y : ArrayLike of shape (n_samples_Y, n_features_Y) + The Y array. + Z : ArrayLike of shape (n_samples_Z, n_features_Z), optional + The Z array, by default None. + normalize_data : bool + Whether to standard-normalize the data or not. + """ + X = np.array(X) + Y = np.array(Y) + if Z is not None: + Z = np.array(Z) + + if X.ndim == 1: + X = X.reshape(-1, 1) + if Y.ndim == 1: + Y = Y.reshape(-1, 1) + if Z is not None and Z.ndim == 1: + Z = Z.reshape(-1, 1) + + # handle strings as categorical data automatically + if X.dtype.type in (np.str_, np.object_, np.unicode_): + enc = LabelEncoder() + for idx in range(X.shape[1]): + X[:, idx] = enc.fit_transform(X[:, idx]) + if Y.dtype.type in (np.str_, np.object_, np.unicode_): + enc = LabelEncoder() + for idx in range(Y.shape[1]): + Y[:, idx] = enc.fit_transform(Y[:, idx]) + if Z is not None and Z.dtype.type in (np.str_, np.object_, np.unicode_): + enc = LabelEncoder() + for idx in range(Z.shape[1]): + Z[:, idx] = enc.fit_transform(Z[:, idx]) + + if normalize_data: + # first normalize the data to have zero mean and unit variance + # along the columns of the data + X = _normalize_data(X) + Y = _normalize_data(Y) + if Z is not None: + Z = _normalize_data(Z) + return X, Y, Z diff --git a/pywhy_stats/kernels.py b/pywhy_stats/kernels.py index 1bc4763..660f97b 100644 --- a/pywhy_stats/kernels.py +++ b/pywhy_stats/kernels.py @@ -2,31 +2,36 @@ from numpy._typing import ArrayLike from scipy.stats import iqr from sklearn.metrics import pairwise_distances +from sklearn.metrics.pairwise import check_pairwise_arrays -def delta_kernel(X: np.ndarray) -> np.ndarray: +def delta_kernel(X: ArrayLike, Y=None) -> ArrayLike: """Delta kernel for categorical values. This is, the similarity is 1 if the values are equal and 0 otherwise. Parameters ---------- - X : ArrayLike of shape (n_samples, n_columns) + X : ArrayLike of shape (n_samples, n_dimensions_x) Input data. + Y : ArrayLike of shape (n_samples, n_dimensions_y), optional + By default None. Returns ------- result : ArrayLike of shape (n_samples, n_samples) The resulting kernel matrix after applying the delta kernel. """ - if X.ndim == 1: - X = X.reshape(-1, 1) - - return ( - np.array(list(map(lambda value: value == X, X))) - .reshape(X.shape[0], X.shape[0]) - .astype(np.float32) - ) + X, Y = check_pairwise_arrays(X, Y) + + if Y is None: + return _delta_kernel(X, X) + else: + return _delta_kernel(X, Y) + + +def _delta_kernel(X, Y): + return np.equal(X[:, np.newaxis], Y).all(axis=-1).astype(int) def estimate_squared_sigma_rbf( diff --git a/tests/discrepancy/__init__.py b/tests/discrepancy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/discrepancy/test_base.py b/tests/discrepancy/test_base.py new file mode 100644 index 0000000..b6af01e --- /dev/null +++ b/tests/discrepancy/test_base.py @@ -0,0 +1,29 @@ +import numpy as np +import pytest +from numpy.testing import assert_array_almost_equal + +from pywhy_stats.discrepancy.base import compute_null + + +# Define a dummy test statistic function for testing +def _test_statistic(X, Y, group_ind, **kwargs): + # Return the sum of X and Y as the test statistic + return np.sum(X[group_ind]) + np.sum(Y[group_ind]) + + +def test_compute_null(): + # Set up test data + e_hat = np.array([0.7, 0.5, 0.3]) + X = np.array([[1, 2], [3, 4], [5, 6]]) + Y = np.array([[7, 8], [9, 10], [11, 12]]) + + rng = np.random.default_rng() + + # Call the compute_null function + null_dist = compute_null(_test_statistic, e_hat, X, Y, null_reps=10, n_jobs=-1, seed=rng) + + # Check the shape of the null_dist array + assert null_dist.shape == (10,) + + # The null distribution should correctly sample some variation + assert len(np.unique(null_dist)) > 1 diff --git a/tests/discrepancy/test_cd.py b/tests/discrepancy/test_cd.py new file mode 100644 index 0000000..2d5b2ba --- /dev/null +++ b/tests/discrepancy/test_cd.py @@ -0,0 +1,233 @@ +import numpy as np +import pandas as pd +import pytest +from flaky import flaky +from sklearn.ensemble import RandomForestClassifier + +from pywhy_stats import bregman, kcd + +seed = 12345 +rng = np.random.default_rng(seed) +# number of samples to use in generating test dataset; the lower the faster +n_samples = 150 + + +def _single_env_scm(n_samples=200, offset=0.0, seed=None): + # We construct a SCM where X1 -> Y <- X and Y -> Z + # so X1 is independent from X, but conditionally dependent + # given Y or Z + rng = np.random.default_rng(seed) + + X = rng.standard_normal((n_samples, 1)) + offset + X1 = rng.standard_normal((n_samples, 1)) + offset + Y = X + X1 + 0.05 * rng.standard_normal((n_samples, 1)) + Z = Y + 0.05 * rng.standard_normal((n_samples, 1)) + + # create input for the CD test + df = pd.DataFrame(np.hstack((X, X1, Y, Z)), columns=["x", "x1", "y", "z"]) + + # assign groups randomly + df["group"] = rng.choice([0, 1], size=len(df)) + return df + + +def _multi_env_scm(n_samples=100, offset=1.5, seed=None): + df = _single_env_scm(n_samples=n_samples, seed=seed) + df["group"] = 0 + + new_df = _single_env_scm(n_samples=n_samples, offset=offset, seed=seed) + new_df["group"] = 1 + df = pd.concat((df, new_df), axis=0) + return df + + +@pytest.mark.parametrize( + "cd_func", + [ + kcd, + bregman, + ], +) +def test_cd_tests_error(cd_func): + x = "x" + y = "y" + sample_df = _single_env_scm(n_samples=10, seed=seed) + Y = sample_df[y].to_numpy() + X = sample_df[x].to_numpy() + group_ind = sample_df[["x", "group"]].to_numpy() + + with pytest.raises(RuntimeError, match="group_ind must be a 1d array"): + cd_func.condind( + X=X, + Y=Y, + group_ind=group_ind, + random_seed=seed, + ) + + # all the group indicators have different values now from 0/1 + sample_df["group"] = sample_df["group"] + 3 + group_ind = sample_df[["group"]].to_numpy() + group_ind = rng.integers(0, 10, size=len(group_ind)) + + with pytest.raises(RuntimeError, match="There should only be two groups"): + cd_func.condind( + X=X, + Y=Y, + group_ind=group_ind, + random_seed=seed, + ) + + group_ind = sample_df[["group"]].to_numpy().squeeze() + + # test pre-fit propensity scores, or custom propensity model + with pytest.raises( + ValueError, match="Both propensity model and propensity estimates are specified" + ): + cd_func.condind( + X=X, + Y=Y, + group_ind=group_ind, + propensity_model=RandomForestClassifier(), + propensity_weights=[0.5, 0.5], + random_seed=seed, + ) + + Y = sample_df[y].to_numpy() + X = sample_df[x].to_numpy() + with pytest.raises(ValueError, match="There are 3 group pre-defined estimates"): + cd_func.condind( + X=X, + Y=Y, + group_ind=group_ind, + propensity_weights=np.ones((10, 3)) * 0.5, + random_seed=seed, + ) + + with pytest.raises(ValueError, match="There are 100 pre-defined estimates"): + cd_func.condind( + X=X, + Y=Y, + group_ind=group_ind, + propensity_weights=np.ones((100, 2)) * 0.5, + random_seed=seed, + ) + + +@pytest.mark.parametrize( + ["cd_func", "cd_kwargs"], + [ + [bregman, dict()], + [kcd, dict()], + [bregman, {"propensity_model": RandomForestClassifier(n_estimators=50, random_state=seed)}], + [bregman, {"propensity_weights": np.ones((n_samples, 2)) * 0.5}], + [kcd, {"propensity_model": RandomForestClassifier(n_estimators=50, random_state=seed)}], + [kcd, {"propensity_weights": np.ones((n_samples, 2)) * 0.5}], + ], +) +def test_given_single_environment_when_conditional_ksample_fail_to_reject_null(cd_func, cd_kwargs): + """Test conditional discrepancy tests on a single environment. + + Here the distributions of each variable are the same across groups of data. + """ + df = _single_env_scm(n_samples=n_samples, offset=2.0, seed=rng) + + group_col = "group" + alpha = 0.1 + null_sample_size = 25 + + res = cd_func.condind( + X=df["x"], + Y=df["x1"], + group_ind=df[group_col], + null_sample_size=null_sample_size, + **cd_kwargs, + random_seed=seed, + ) + assert res.pvalue > alpha, f"Fails with {res.pvalue} not greater than {alpha}" + res = cd_func.condind( + X=df["x"], + Y=df["z"], + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + **cd_kwargs, + random_seed=seed, + ) + assert res.pvalue > alpha, f"Fails with {res.pvalue} not greater than {alpha}" + res = cd_func.condind( + X=df["x"], + Y=df["y"], + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + **cd_kwargs, + random_seed=seed, + ) + assert res.pvalue > alpha, f"Fails with {res.pvalue} not greater than {alpha}" + + +@pytest.mark.parametrize( + ["cd_func", "cd_kwargs"], + [ + [bregman, dict()], + [kcd, dict()], + [bregman, {"propensity_model": RandomForestClassifier(n_estimators=50, random_state=seed)}], + [kcd, {"propensity_model": RandomForestClassifier(n_estimators=50, random_state=seed)}], + ], +) +def test_cd_simulation_multi_environment(cd_func, cd_kwargs): + """Test conditional discrepancy tests with multiple environments. + + In this setting, we form a selection diagram + + X1 -> Y <- X and Y -> Z, with S-nodes pointing to X1 and X. That means + the distributions of X1 and X are different across groups. Therefore + we should reject the null hypothesis, except for P(Z | Y). + """ + df = _multi_env_scm(n_samples=n_samples // 2, offset=2.0, seed=rng) + + group_col = "group" + alpha = 0.1 + null_sample_size = 20 + + res = cd_func.condind( + X=df[["x"]], + Y=df[["z"]].copy(), + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + random_seed=seed, + **cd_kwargs, + ) + assert res.pvalue < alpha, f"Fails with {res.pvalue} not less than {alpha}" + res = cd_func.condind( + X=df["x"], + Y=df["y"], + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + random_seed=seed, + **cd_kwargs, + ) + assert res.pvalue < alpha, f"Fails with {res.pvalue} not less than {alpha}" + res = cd_func.condind( + X=df["x1"], + Y=df["z"], + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + random_seed=seed, + **cd_kwargs, + ) + assert res.pvalue < alpha, f"Fails with {res.pvalue} not less than {alpha}" + + res = cd_func.condind( + X=df[["x", "x1"]], + Y=df[["z"]].copy(), + group_ind=df[group_col], + null_sample_size=null_sample_size, + n_jobs=-1, + random_seed=seed, + **cd_kwargs, + ) + assert res.pvalue > alpha, f"Fails with {res.pvalue} not less than {alpha}" diff --git a/tests/test_kci.py b/tests/test_kci.py index a1df2bb..16bf9f2 100644 --- a/tests/test_kci.py +++ b/tests/test_kci.py @@ -1,6 +1,6 @@ import numpy as np from flaky import flaky -from sklearn.metrics.pairwise import linear_kernel, polynomial_kernel, rbf_kernel +from sklearn.metrics.pairwise import rbf_kernel from pywhy_stats import kci from pywhy_stats.kernels import delta_kernel @@ -8,12 +8,15 @@ #################################################### # Unconditional independence tests - Continuous #################################################### +seed = 12345 +rng = np.random.default_rng(seed) @flaky(max_runs=3) def test_given_continuous_independent_data_when_perform_kernel_based_test_then_not_reject(): - x = np.random.randn(1000, 1) - y = np.exp(np.random.rand(1000, 1)) + n_samples = 200 + x = rng.standard_normal((200, 1)) + y = np.exp(rng.uniform(size=(200, 1))) assert kci.ind(x, y, approx=True).pvalue > 0.05 assert kci.ind(x, y, approx=False).pvalue > 0.05 @@ -27,9 +30,9 @@ def test_given_continuous_dependent_data_when_perform_kernel_based_test_then_rej X <- Z -> Y """ - z = np.random.randn(1000, 1) - x = np.exp(z + np.random.rand(1000, 1)) - y = np.exp(z + np.random.rand(1000, 1)) + z = rng.standard_normal((1000, 1)) + x = np.exp(z + rng.uniform(size=(1000, 1))) + y = np.exp(z + rng.uniform(size=(1000, 1))) assert kci.ind(x, y, approx=True).pvalue < 0.05 assert kci.ind(x, y, approx=False).pvalue < 0.05 @@ -42,9 +45,10 @@ def test_given_continuous_dependent_data_when_perform_kernel_based_test_then_rej @flaky(max_runs=3) def test_given_continuous_conditionally_independent_data_when_perform_kernel_based_test_then_not_reject(): - z = np.random.randn(1000, 1) - x = np.exp(z + np.random.rand(1000, 1)) - y = np.exp(z + np.random.rand(1000, 1)) + n_samples = 200 + z = rng.standard_normal((n_samples, 1)) + x = np.exp(z + rng.uniform(size=(n_samples, 1))) + y = np.exp(z + rng.uniform(size=(n_samples, 1))) assert kci.condind(x, y, z, approx=True).pvalue > 0.05 assert kci.condind(x, y, z, approx=False).pvalue > 0.05 @@ -52,10 +56,11 @@ def test_given_continuous_conditionally_independent_data_when_perform_kernel_bas @flaky(max_runs=3) def test_given_continuous_conditionally_dependent_data_when_perform_kernel_based_test_then_reject(): - z = np.random.randn(1000, 1) - w = np.random.randn(1000, 1) - x = np.exp(z + np.random.rand(1000, 1)) - y = np.exp(z + np.random.rand(1000, 1)) + n_samples = 200 + z = rng.standard_normal((n_samples, 1)) + w = rng.standard_normal((n_samples, 1)) + x = np.exp(z + rng.standard_normal(size=(n_samples, 1))) + y = np.exp(z + rng.standard_normal(size=(n_samples, 1))) assert kci.condind(x, y, w, approx=True).pvalue < 0.05 assert kci.condind(x, y, w, approx=False).pvalue < 0.05 @@ -68,8 +73,9 @@ def test_given_continuous_conditionally_dependent_data_when_perform_kernel_based @flaky(max_runs=3) def test_given_categorical_independent_data_when_perform_kernel_based_test_then_not_reject(): - x = np.random.normal(0, 1, 1000) - y = (np.random.choice(2, 1000) == 1).astype(str) + n_samples = 200 + x = rng.normal(0, 1, n_samples) + y = (rng.choice(2, n_samples) == 1).astype(str) assert kci.ind(x, y, approx=True).pvalue > 0.05 assert kci.ind(x, y, approx=False).pvalue > 0.05 @@ -77,7 +83,8 @@ def test_given_categorical_independent_data_when_perform_kernel_based_test_then_ @flaky(max_runs=3) def test_given_categorical_dependent_data_when_perform_kernel_based_test_then_reject(): - x = np.random.normal(0, 1, 1000) + n_samples = 200 + x = rng.normal(0, 1, n_samples) y = [] for v in x: @@ -93,7 +100,8 @@ def test_given_categorical_dependent_data_when_perform_kernel_based_test_then_re @flaky(max_runs=3) def test_given_dependent_mixed_data_types_when_perform_kernel_based_test_then_reject(): - x = np.random.normal(0, 1, 1000) + n_samples = 200 + x = rng.normal(0, 1, n_samples) y = [] for v in x: @@ -103,7 +111,7 @@ def test_given_dependent_mixed_data_types_when_perform_kernel_based_test_then_re y.append(1) y = np.array(y).astype(str) - w = np.random.normal(0, 1, 1000) + w = rng.normal(0, 1, n_samples) y = np.vstack([y, w]).T def my_custom_kernel(X): @@ -122,14 +130,15 @@ def my_custom_kernel(X): @flaky(max_runs=3) def test_given_categorical_conditionally_independent_data_when_perform_kernel_based_test_then_not_reject(): - x = np.random.normal(0, 1, 1000) + n_samples = 200 + x = rng.normal(0, 1, n_samples) z = [] for v in x: if v > 0: z.append(0) else: z.append(1) - y = z + np.random.randn(len(z)) + y = z + rng.standard_normal(len(z)) z = np.array(z).astype(str) z[z == "0"] = "Class 1" z[z == "1"] = "Class 2" @@ -140,14 +149,15 @@ def test_given_categorical_conditionally_independent_data_when_perform_kernel_ba @flaky(max_runs=3) def test_given_categorical_conditionally_dependent_data_when_perform_kernel_based_test_then_reject(): - x = np.random.normal(0, 1, 1000) + n_samples = 200 + x = rng.normal(0, 1, 200) z = [] for v in x: if v > 0: z.append(0) else: z.append(1) - y = z + np.random.randn(len(z)) + y = z + rng.standard_normal(len(z)) z = np.array(z).astype(str) z[z == "0"] = "Class 1" z[z == "1"] = "Class 2" @@ -158,18 +168,19 @@ def test_given_categorical_conditionally_dependent_data_when_perform_kernel_base @flaky(max_runs=3) def test_given_conditionally_dependent_mixed_data_types_with_custom_kernel_when_perform_kernel_based_test_then_reject(): - x = np.random.normal(0, 1, 1000) + n_samples = 500 + x = rng.standard_normal(n_samples) z = [] for v in x: if v > 0: z.append(0) else: z.append(1) - y = z + np.random.randn(len(z)) + y = z + rng.normal(0, 5, len(z)) z = np.array(z).astype(str) z[z == "0"] = "Class 1" z[z == "1"] = "Class 2" - w = np.random.normal(0, 1, 1000) + w = rng.standard_normal(n_samples) z = np.vstack([z, w]).T def my_custom_kernel(X): @@ -188,38 +199,35 @@ def my_custom_kernel(X): @flaky(max_runs=3) def test_given_gaussian_data_and_linear_kernel_when_perform_kernel_based_test_then_returns_expected_result(): - X = np.random.randn(300, 1) - X1 = np.random.randn(300, 1) - Y = X + X1 + 0.5 * np.random.randn(300, 1) - Z = Y + 0.5 * np.random.randn(300, 1) + X = rng.standard_normal((300, 1)) + X1 = rng.standard_normal((300, 1)) + Y = X + X1 + 0.5 * rng.standard_normal((300, 1)) + Z = Y + 0.5 * rng.standard_normal((300, 1)) - assert kci.ind(X, X1, kernel_X=linear_kernel, kernel_Y=linear_kernel).pvalue > 0.05 - assert kci.ind(X, Z, kernel_X=linear_kernel, kernel_Y=linear_kernel).pvalue < 0.05 + assert kci.ind(X, X1, kernel_X="linear", kernel_Y="linear").pvalue > 0.05 + assert kci.ind(X, Z, kernel_X="linear", kernel_Y="linear").pvalue < 0.05 assert ( - kci.condind( - X, Z, Y, kernel_X=linear_kernel, kernel_Y=linear_kernel, kernel_Z=linear_kernel - ).pvalue - > 0.05 + kci.condind(X, Z, Y, kernel_X="linear", kernel_Y="linear", kernel_Z="linear").pvalue > 0.05 ) @flaky(max_runs=3) def test_given_gaussian_data_and_polynomial_kernel_when_perform_kernel_based_test_then_returns_expected_result(): - X = np.random.randn(300, 1) - X1 = np.random.randn(300, 1) - Y = X + X1 + 0.5 * np.random.randn(300, 1) - Z = Y + 0.5 * np.random.randn(300, 1) + X = rng.standard_normal((300, 1)) + X1 = rng.standard_normal((300, 1)) + Y = X + X1 + 0.5 * rng.standard_normal((300, 1)) + Z = Y + 0.5 * rng.standard_normal((300, 1)) - assert kci.ind(X, X1, kernel_X=polynomial_kernel, kernel_Y=polynomial_kernel).pvalue > 0.05 - assert kci.ind(X, Z, kernel_X=polynomial_kernel, kernel_Y=polynomial_kernel).pvalue < 0.05 + assert kci.ind(X, X1, kernel_X="polynomial", kernel_Y="polynomial").pvalue > 0.05 + assert kci.ind(X, Z, kernel_X="polynomial", kernel_Y="polynomial").pvalue < 0.05 assert ( kci.condind( X, Z, Y, - kernel_X=polynomial_kernel, - kernel_Y=polynomial_kernel, - kernel_Z=polynomial_kernel, + kernel_X="polynomial", + kernel_Y="polynomial", + kernel_Z="polynomial", ).pvalue > 0.05 ) diff --git a/tests/test_kernels.py b/tests/test_kernels.py index 94ea37e..999b6db 100644 --- a/tests/test_kernels.py +++ b/tests/test_kernels.py @@ -1,7 +1,9 @@ import numpy as np import pytest +from numpy.testing import assert_array_equal +from sklearn.metrics.pairwise import pairwise_kernels -from pywhy_stats.kernels import estimate_squared_sigma_rbf +from pywhy_stats.kernels import delta_kernel, estimate_squared_sigma_rbf def test_given_simple_data_when_estimate_squared_sigma_rbf_then_returns_correct_results(): @@ -13,3 +15,17 @@ def test_given_simple_data_when_estimate_squared_sigma_rbf_then_returns_correct_ # 1 / median_distance_between_points**2 assert estimate_squared_sigma_rbf(X, method="median") == pytest.approx(0.1111111111111111) + + +def test_delta_kernel_works_with_sklearn(): + X = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + Y = np.array([[1, 2, 3], [7, 8, 9]]) + + kernel = delta_kernel(X, Y) + assert_array_equal(kernel, np.array([[1, 0], [0, 0], [0, 1]])) + + X = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + Y = np.array([[1, 2, 3], [7, 8, 9]]) + + kernel = pairwise_kernels(X, Y, metric="delta") + assert_array_equal(kernel, np.array([[1, 0], [0, 0], [0, 1]]))