Modular model API #398

Draft: wants to merge 9 commits into base: main

743 changes: 743 additions & 0 deletions notebooks/modular_models.ipynb

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pymc_experimental/model/modular/__init__.py
@@ -0,0 +1,9 @@
from pymc_experimental.model.modular.components import Intercept, Regression, Spline
from pymc_experimental.model.modular.likelihood import NormalLikelihood

__all__ = [
"Intercept",
"Regression",
"Spline",
"NormalLikelihood",
]
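
components.py and the accompanying notebook are not rendered in this view, so the snippet below is a rough usage sketch inferred from the exports above and from the tests further down. It passes a single Intercept component as the mean; the DataFrame, column names, and prior are illustrative assumptions, not part of the PR.

import numpy as np
import pandas as pd

from pymc_experimental.model.modular import Intercept, NormalLikelihood

# Illustrative data: "income" is the target, the remaining columns are features
rng = np.random.default_rng(0)
df = pd.DataFrame(
    {
        "city": rng.choice(["A", "B", "C"], 1000),
        "age": rng.normal(size=1000),
        "income": rng.normal(size=1000),
    }
)

# A GLM component supplies the mean; the likelihood builds it inside its own PyMC model
mu = Intercept(name=None, pooling="partial", pooling_columns="city", prior="Normal")
model = NormalLikelihood(mu=mu, sigma=None, target_col="income", data=df)
idata = model.sample()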
383 changes: 383 additions & 0 deletions pymc_experimental/model/modular/components.py

Large diffs are not rendered by default.

196 changes: 196 additions & 0 deletions pymc_experimental/model/modular/likelihood.py
@@ -0,0 +1,196 @@
from abc import ABC, abstractmethod
from collections.abc import Sequence
from io import StringIO
from typing import Literal, get_args

import arviz as az
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import rich

from pymc.backends.arviz import apply_function_over_dataset
from pymc.model.fgraph import clone_model
from pymc.pytensorf import reseed_rngs
from pytensor.tensor.random.type import RandomType

from pymc_experimental.model.marginal.marginal_model import MarginalModel
from pymc_experimental.model.modular.utilities import ColumnType, encode_categoricals
from pymc_experimental.printing import model_table

LIKELIHOOD_TYPES = Literal["lognormal", "logt", "mixture", "unmarginalized-mixture"]
valid_likelihoods = get_args(LIKELIHOOD_TYPES)


class Likelihood(ABC):
"""Class to represent a likelihood function for a GLM component. Subclasses should implement the _get_model_class
method to return the type of model used by the likelihood function, and should implement a `register_xx` method for
each parameter unique to that likelihood function."""

def __init__(self, target_col: ColumnType, data: pd.DataFrame):
"""
Initialization logic common to all likelihoods.
All subclasses should call super().__init__(target_col, data) to register data and create a model object. The
subclass __init__ method should then create a PyMC model inside the self.model context.
Parameters
----------
target_col: str or sequence of str
Name of the column in `data` containing the observed values.
data: DataFrame
Observed data, including the target column and all feature columns. The index name, if set, is used as the
name of the observation dimension (otherwise "obs_idx" is used).
"""

if not isinstance(target_col, str):
[target_col] = target_col
self.target_col = target_col

X_df = data.drop(columns=[target_col])

self.obs_dim = data.index.name if data.index.name is not None else "obs_idx"
self.coords = {
self.obs_dim: data.index.values,
}

X_df, self.coords = encode_categoricals(X_df, self.coords)

numeric_cols = [
col for col, dtype in X_df.dtypes.to_dict().items() if dtype.name.startswith("float")
]
self.coords["feature"] = numeric_cols

with self._get_model_class(self.coords) as self.model:
pm.Data(f"{target_col}_observed", data[target_col], dims=self.obs_dim)
pm.Data(
"X_data",
X_df,
dims=(self.obs_dim, "feature"),
shape=(None, len(self.coords["feature"])),
)

self._predict_fn = None  # Cache the compiled prediction function for faster repeated predictions

def sample(self, **sample_kwargs):
with self.model:
return pm.sample(**sample_kwargs)

def sample_prior_predictive(self, **sample_kwargs):
with self.model:
return pm.sample_prior_predictive(**sample_kwargs)

def predict(
self,
idata: az.InferenceData,
predict_df: pd.DataFrame,
random_seed=None,
compile_kwargs=None,
):
# Ensure that only features seen during fitting are used at prediction time, in the same order
X_data = predict_df[list(self.model.coords["feature"])].copy()
for col, dtype in X_data.dtypes.to_dict().items():
if dtype.name.startswith("float"):
pass
elif dtype.name == "object":
X_data[col] = X_data[col].map(self.column_labels[col]).astype("float64")
elif dtype.name.startswith("int"):
X_data[col] = X_data[col].astype("float64")
else:
raise NotImplementedError(
f"Haven't decided how to handle the following type: {dtype.name}"
)

coords = {self.obs_dim: X_data.index.values}

predict_fn = self._predict_fn

if predict_fn is None:
model_copy = clone_model(self.model)
# TODO: Freeze everything that is not supposed to change, when PyMC allows it
# dims = [dim for dim in self.model.coords.keys() if dim != self.obs_dim]
# model_copy = freeze_dims_and_data(model_copy, dims=dims, data=[])
observed_RVs = model_copy.observed_RVs
if compile_kwargs is None:
compile_kwargs = {}
predict_fn = model_copy.compile_fn(
observed_RVs,
inputs=model_copy.free_RVs,
mode=compile_kwargs.pop("mode", None),
on_unused_input="ignore",
**compile_kwargs,
)
predict_fn.trust_input = True
self._predict_fn = predict_fn

[X_var] = [shared for shared in predict_fn.f.get_shared() if shared.name == "X_data"]
if random_seed is not None:
rngs = [
shared
for shared in predict_fn.f.get_shared()
if isinstance(shared.type, RandomType)
]
reseed_rngs(rngs, random_seed)
X_var.set_value(X_data.values, borrow=True)

return apply_function_over_dataset(
fn=predict_fn,
dataset=idata.posterior[[rv.name for rv in self.model.free_RVs]],
output_var_names=[rv.name for rv in self.model.observed_RVs],
dims={rv.name: [self.obs_dim] for rv in self.model.observed_RVs},
coords=coords,
progressbar=False,
)

@abstractmethod
def _get_model_class(self, coords: dict[str, Sequence]) -> pm.Model | MarginalModel:
"""Return the type on model used by the likelihood function"""
raise NotImplementedError

def register_mu(self, mu=None):
with self.model:
if mu is not None:
return pm.Deterministic("mu", mu.build(self.model), dims=[self.obs_dim])
return pm.Normal("mu", 0, 100)

def register_sigma(self, sigma=None):
with self.model:
if sigma is not None:
return pm.Deterministic(
"sigma", pt.exp(sigma.build(self.model)), dims=[self.obs_dim]
)
return pm.Exponential("sigma", lam=1)

def __repr__(self):
table = model_table(self.model)
buffer = StringIO()
rich.print(table, file=buffer)

return buffer.getvalue()

def to_graphviz(self):
return self.model.to_graphviz()

# def _repr_html_(self):
# return model_table(self.model)


class NormalLikelihood(Likelihood):
"""
A model with normally distributed errors
"""

def __init__(self, mu, sigma, target_col: ColumnType, data: pd.DataFrame):
super().__init__(target_col=target_col, data=data)

with self.model:
mu = self.register_mu(mu)
sigma = self.register_sigma(sigma)

pm.Normal(
target_col,
mu=mu,
sigma=sigma,
observed=self.model[f"{target_col}_observed"],
dims=[self.obs_dim],
)

def _get_model_class(self, coords: dict[str, Sequence]) -> pm.Model | MarginalModel:
return pm.Model(coords=coords)
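
For completeness, a minimal sketch of the fit/predict round trip the class above enables. `df` and `holdout_df` are assumed DataFrames: `df` contains an "income" target column plus features, and `holdout_df` contains the same feature columns used for fitting (the target is not needed at prediction time). Both are illustrative, not part of the PR.

# Fit, then predict on new data; predict() swaps the shared X_data for the hold-out
# features (reusing a cached compiled function) and returns draws of the observed RV
# over the hold-out index.
model = NormalLikelihood(mu=None, sigma=None, target_col="income", data=df)
idata = model.sample()  # wraps pm.sample inside the likelihood's model context
preds = model.predict(idata, predict_df=holdout_df, random_seed=1)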
413 changes: 413 additions & 0 deletions pymc_experimental/model/modular/utilities.py

Large diffs are not rendered by default.

106 changes: 106 additions & 0 deletions tests/model/modular/test_components.py
@@ -0,0 +1,106 @@
import numpy as np
import pandas as pd
import pymc as pm
import pytest

from pymc_experimental.model.modular.utilities import at_least_list, encode_categoricals

from pymc_experimental.model.modular.components import Intercept, PoolingType, Regression, Spline


@pytest.fixture(scope="session")
def rng():
return np.random.default_rng()


@pytest.fixture(scope="session")
def model(rng):
city = ["A", "B", "C"]
race = ["white", "black", "hispanic"]

df = pd.DataFrame(
{
"city": np.random.choice(city, 1000),
"age": rng.normal(size=1000),
"race": rng.choice(race, size=1000),
"income": rng.normal(size=1000),
}
)

coords = {"feature": df.columns, "obs_idx": df.index}

df, coords = encode_categoricals(df, coords)

with pm.Model(coords=coords) as m:
X = pm.Data("X_data", df, dims=["obs_idx", "features"])

return m


@pytest.mark.parametrize("pooling", ["partial", "none", "complete"], ids=str)
@pytest.mark.parametrize("prior", ["Normal", "Laplace", "StudentT"], ids=str)
def test_intercept(pooling: PoolingType, prior, model):
intercept = Intercept(name=None, pooling=pooling, pooling_columns="city", prior=prior)

x = intercept.build(model.copy()).eval()

if pooling != "complete":
assert x.shape[0] == len(model.coords["obs_idx"])
assert np.unique(x).shape[0] == len(model.coords["city"])
else:
assert np.unique(x).shape[0] == 1


@pytest.mark.parametrize("pooling", ["partial", "none", "complete"], ids=str)
@pytest.mark.parametrize("prior", ["Normal", "Laplace", "StudentT"], ids=str)
@pytest.mark.parametrize(
"feature_columns", ["income", ["age", "income"]], ids=["single", "multiple"]
)
def test_regression(pooling: PoolingType, prior, feature_columns, model):
regression = Regression(
name=None,
feature_columns=feature_columns,
prior=prior,
pooling=pooling,
pooling_columns="city",
)

temp_model = model.copy()
xb = regression.build(temp_model)
assert f"Regression({feature_columns})_features" in temp_model.coords.keys()

if pooling != "complete":
assert f"Regression({feature_columns})_city_effect" in temp_model.named_vars
assert f"Regression({feature_columns})_city_effect_sigma" in temp_model.named_vars

if pooling == "partial":
assert (
f"Regression({feature_columns})_city_effect_offset" in temp_model.named_vars_to_dims
)
else:
assert f"Regression({feature_columns})" in temp_model.named_vars

xb_val = xb.eval()

X, beta = xb.owner.inputs[0].owner.inputs
beta_val = beta.eval()
n_features = len(at_least_list(feature_columns))

if pooling != "complete":
assert xb_val.shape[0] == len(model.coords["obs_idx"])
assert np.unique(beta_val).shape[0] == len(model.coords["city"]) * n_features
else:
assert np.unique(beta_val).shape[0] == n_features


@pytest.mark.parametrize("pooling", ["partial", "none", "complete"], ids=str)
@pytest.mark.parametrize("prior", ["Normal", "Laplace", "StudentT"], ids=str)
def test_spline(pooling: PoolingType, prior, model):
spline = Spline(
name=None, feature_column="income", prior=prior, pooling=pooling, pooling_columns="city"
)

temp_model = model.copy()
xb = spline.build(temp_model)

assert "Spline(income, df=10, degree=3)_knots" in temp_model.coords.keys()
31 changes: 31 additions & 0 deletions tests/model/modular/test_likelihood.py
@@ -0,0 +1,31 @@
import numpy as np
import pandas as pd
import pytest

from pymc_experimental.model.modular.likelihood import NormalLikelihood


@pytest.fixture(scope="session")
def rng():
return np.random.default_rng()


@pytest.fixture(scope="session")
def data(rng):
city = ["A", "B", "C"]
race = ["white", "black", "hispanic"]

df = pd.DataFrame(
{
"city": np.random.choice(city, 1000),
"age": rng.normal(size=1000),
"race": rng.choice(race, size=1000),
"income": rng.normal(size=1000),
}
)
return df


def test_normal_likelihood(data):
model = NormalLikelihood(mu=None, sigma=None, target_col="income", data=data)
idata = model.sample_prior_predictive()
186 changes: 186 additions & 0 deletions tests/model/modular/test_utilities.py
@@ -0,0 +1,186 @@
import numpy as np
import pandas as pd
import pymc as pm
import pytensor
import pytest

from numpy.testing import assert_allclose

from pymc_experimental.model.modular.utilities import (
encode_categoricals,
make_level_maps,
make_next_level_hierarchy_variable,
make_partial_pooled_hierarchy,
make_unpooled_hierarchy,
select_data_columns,
)


@pytest.fixture(scope="session")
def rng():
return np.random.default_rng()


@pytest.fixture(scope="session")
def df(rng):
N = 1000

level_0_labels = ["Beverage", "Snack"]
level_1_labels = {
"Beverage": ["Soft Drinks", "Milk", "Smoothies", "Sports Drinks", "Alcoholic Beverages"],
"Snack": ["Jerky", "Pretzels", "Nuts", "Tea"],
}
level_2_labels = {
"Soft Drinks": ["Lemonade", "Cola", "Root Beer", "Ginger Ale"],
"Milk": ["Oat Milk", "Cow Milk", "Soy Milk"],
"Smoothies": ["Green Smoothies", "Berry Smoothies"],
"Sports Drinks": ["Gatorade", "Powerade"],
"Alcoholic Beverages": ["Beer", "Wine", "Spirits"],
"Jerky": ["Vegan Jerky", "Beef Jerky"],
"Pretzels": ["Salted Pretzels", "Unsalted Pretzels"],
"Nuts": ["Peanuts", "Almonds", "Cashews", "Pistachios", "Walnuts"],
"Tea": ["Black Tea", "Green Tea", "Herbal Tea"],
}

level_0_data = rng.choice(level_0_labels, N)
level_1_data = [rng.choice(level_1_labels[level_0]) for level_0 in level_0_data]
level_2_data = [rng.choice(level_2_labels[level_1]) for level_1 in level_1_data]

df = pd.DataFrame(
{
"level_0": level_0_data,
"level_1": level_1_data,
"level_2": level_2_data,
"A": rng.normal(size=N),
"B": rng.normal(size=N),
"C": rng.normal(size=N),
"sales": rng.normal(size=N),
}
)

return df


@pytest.fixture(scope="session")
def encoded_df_and_coords(df):
return encode_categoricals(df, {"feature": df.columns.tolist(), "obs_idx": df.index.tolist()})


@pytest.fixture(scope="session")
def model(encoded_df_and_coords, rng):
df, coords = encoded_df_and_coords

with pm.Model(coords=coords) as m:
X = pm.Data("X", df[coords["feature"]], dims=["obs_idx", "features"])

return m


@pytest.mark.parametrize("cols", ["A", ["A", "B"]], ids=["single", "multiple"])
def test_select_data_columns(model, cols):
col = select_data_columns(cols, model, data_name="X")

idxs = [model.coords["feature"].index(col) for col in cols]
assert_allclose(col.eval(), model["X"].get_value()[:, idxs].squeeze())


def test_select_missing_column_raises(model):
with pytest.raises(ValueError):
select_data_columns("D", model, data_name="X")


def test_make_level_maps(model, encoded_df_and_coords, df):
df_encoded, coords = encoded_df_and_coords
data = pytensor.shared(df_encoded.values)

level_maps = make_level_maps(data, coords, ordered_levels=["level_0", "level_1", "level_2"])

level_maps = [x.eval() for x in level_maps[1:]]
m0, m1, m2 = level_maps

# Rebuild the labels from the level maps
new_labels = np.array(coords["level_0"])[m0]
new_labels = np.array([x + "_" + y for x, y in zip(new_labels, coords["level_1"])])[m1]
new_labels = np.array([x + "_" + y for x, y in zip(new_labels, coords["level_2"])])[m2]

new_labels = pd.Series(new_labels).apply(
lambda x: pd.Series(x.split("_"), index=["level_0", "level_1", "level_2"])
)

pd.testing.assert_frame_equal(new_labels, df[["level_0", "level_1", "level_2"]])


def test_make_simple_hierarchical_variable():
with pm.Model(coords={"level_0": ["A", "B", "C"]}) as m:
mapping = np.random.choice(3, size=(10,))
effect_mu = pm.Normal("effect_mu")
mu = make_next_level_hierarchy_variable(
"effect", mu=effect_mu, offset_dims="level_0", sigma_dims=None, mapping=None
)
mu = mu[mapping]

expected_names = ["effect_mu", "effect_sigma", "effect_offset"]
assert all([x.name in expected_names for x in m.free_RVs])


def test_make_two_level_hierarchical_variable():
with pm.Model(coords={"level_0": ["A", "B", "C"], "level_1": range(9)}) as m:
mapping = np.random.choice(3, size=(10,))

level_0_mu = pm.Normal("level_0_effect_mu")
mu = make_next_level_hierarchy_variable(
"level_0_effect", mu=level_0_mu, offset_dims="level_0", sigma_dims=None, mapping=None
)

mu = make_next_level_hierarchy_variable(
"level_1_effect", mu=mu, offset_dims="level_1", sigma_dims="level_0", mapping=mapping
)

expected_names = [
"level_0_effect_mu",
"level_0_effect_sigma",
"level_0_effect_offset",
"level_0_effect",
"level_1_effect_sigma",
"level_1_effect_offset",
]
assert all([x.name in expected_names for x in m.free_RVs])

m0, s0, o0, m1, s1, o1 = (m[name] for name in expected_names)
assert m1.shape.eval() == (3,)
assert s1.shape.eval() == (3,)
assert o1.shape.eval() == (9,)


def test_make_next_level_no_pooling():
data = pd.DataFrame({"level_0": np.random.choice(["A", "B", "C"], size=(10,))})
data, coords = encode_categoricals(data, {"feature": data.columns, "obs_idx": data.index})

with pm.Model(coords=coords) as m:
X = pm.Data("X_data", data, dims=["obs_idx", "feature"])
mu = make_unpooled_hierarchy(
"effect",
X=X,
levels=["level_0"],
model=m,
sigma_dims=None,
)

assert "effect_sigma" not in m.named_vars
assert "effect_offset" not in m.named_vars
assert mu.shape.eval() == (10,)


@pytest.mark.parametrize(
"pooling_columns", [["level_0"], ["level_0", "level_1"], ["level_0", "level_1", "level_2"]]
)
def test_hierarchical_prior_to_requested_depth(model, encoded_df_and_coords, pooling_columns):
temp_model = model.copy()
with temp_model:
intercept = make_partial_pooled_hierarchy(
name="intercept", X=model["X"], pooling_columns=pooling_columns, model=temp_model
)

intercept = intercept.eval()
assert intercept.shape[0] == len(model.coords["obs_idx"])
assert len(np.unique(intercept)) == len(model.coords[pooling_columns[-1]])