# This file is part of sbi, a toolkit for simulation-based inference. sbi is licensed
# under the Apache License Version 2.0, see <https://www.apache.org/licenses/>
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import numpy as np
import torch
from sklearn.base import BaseEstimator, clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import KFold
from sklearn.neural_network import MLPClassifier
from skorch import NeuralNetBinaryClassifier
from skorch.callbacks import EarlyStopping
from torch import Tensor
from tqdm import tqdm
from sbi.utils.diagnostics_utils import remove_nans_and_infs_in_x
from sbi.utils.metrics import MLPClassifierModule
[docs]
class LC2ST:
r"""L-C2ST: Local Classifier Two-Sample Test.
Implementation based on the official code from [1] and the exisiting C2ST
metric [2], using scikit-learn classifiers.
L-C2ST tests the local consistency of a posterior estimator :math:`q` with
respect to the true posterior :math:`p`, at a fixed observation :math:`x_o`,
i.e., whether the following null hypothesis holds:
:math:`H_0(x_o) := q(\theta \mid x_o) = p(\theta \mid x_o)`.
L-C2ST proceeds as follows:
1. It first trains a classifier to distinguish between samples from two joint
distributions :math:`[\theta_p, x_p]` and :math:`[\theta_q, x_q]`, and
evaluates the L-C2ST statistic at a given observation :math:`x_o`.
2. The L-C2ST statistic is the mean squared error between the predicted
probabilities of being in p (class 0) and a Dirac at 0.5, which corresponds to
the chance level of the classifier, unable to distinguish between p and q.
- If ``num_ensemble>1``, the average prediction over all classifiers is used.
- If ``num_folds>1`` the average statistic over all cv-folds is used.
To evaluate the test, steps 1 and 2 are performed over multiple trials under the
null hypothesis (H0). If the null distribution is not known, it is estimated
using the permutation method, i.e. by training the classifier on the permuted
data. The statistics obtained under (H0) is then compared to the one obtained
on observed data to compute the p-value, used to decide whether to reject (H0)
or not.
"""
def __init__(
self,
thetas: Tensor,
xs: Tensor,
posterior_samples: Tensor,
seed: int = 1,
num_folds: int = 1,
num_ensemble: int = 1,
classifier: Union[str, Type[BaseEstimator]] = MLPClassifier,
z_score: bool = False,
classifier_kwargs: Optional[Dict[str, Any]] = None,
num_trials_null: int = 100,
permutation: bool = True,
device: str = "cpu",
) -> None:
"""Initialize L-C2ST.
Args:
thetas: Samples from the prior, of shape (sample_size, dim).
xs: Corresponding simulated data, of shape (sample_size, dim_x).
posterior_samples: Samples from the estiamted posterior,
of shape (sample_size, dim)
seed: Seed for the sklearn classifier and the KFold cross validation,
defaults to 1.
num_folds: Number of folds for the cross-validation,
defaults to 1 (no cross-validation).
This is useful to reduce variance coming from the data.
num_ensemble: Number of classifiers for ensembling, defaults to 1.
This is useful to reduce variance coming from the classifier.
z_score: Whether to z-score to normalize the data, defaults to False.
classifier: Classification architecture to use, can be one of the following:
- "random_forest" or "mlp", defaults to "mlp" or
- A classifier class (e.g., RandomForestClassifier, MLPClassifier)
Note:
For the "mlp" case , if a GPU or MPS device is passed and
available, the implementation will use
'skorch.NeuralNetBinaryClassifier', and corresponding kwargs
can be passed via the 'classifier_kwargs' argument.
classifier_kwargs: Custom kwargs for the sklearn classifier or skorch
classifier (depending on device argument), defaults to None.
num_trials_null: Number of trials to estimate the null distribution,
defaults to 100.
permutation: Whether to use the permutation method for the null hypothesis,
defaults to True.
device: The device to use for training the classifier, e.g., "cpu" or
"cuda" or "mps". Defaults to "cpu". GPU support is only available for
the MLP classifier.
References:
[1] : https://arxiv.org/abs/2306.03580, https://github.com/JuliaLinhart/lc2st
[2] : https://github.com/sbi-dev/sbi/blob/main/sbi/utils/metrics.py
"""
# check inputs
thetas, xs = remove_nans_and_infs_in_x(thetas, xs)
assert thetas.shape[0] == xs.shape[0] == posterior_samples.shape[0], (
"Number of samples must match"
)
# set observed data for classification
self.theta_p = posterior_samples
self.x_p = xs
self.theta_q = thetas
self.x_q = xs
# z-score normalization parameters
self.z_score = z_score
self.theta_p_mean = torch.mean(self.theta_p, dim=0)
self.theta_p_std = torch.std(self.theta_p, dim=0)
self.x_p_mean = torch.mean(self.x_p, dim=0)
self.x_p_std = torch.std(self.x_p, dim=0)
# set parameters for classifier training
self.seed = seed
self.num_folds = num_folds
self.num_ensemble = num_ensemble
self.device = device
# initialize classifier
if isinstance(classifier, str):
if classifier.lower() == 'mlp':
use_gpu = (
self.device.lower() == 'cuda' and torch.cuda.is_available()
) or (
self.device.lower() == 'mps' and torch.backends.mps.is_available()
)
if use_gpu:
classifier = NeuralNetBinaryClassifier
else:
if self.device.lower() in ("cuda", "mps"):
warnings.warn(
f"The requested device '{self.device}' is not available. "
"For the MLP classifier, computation will proceed on CPU.",
UserWarning,
stacklevel=2,
)
classifier = MLPClassifier
elif classifier.lower() == 'random_forest':
if self.device.lower() in ("cuda", "mps"):
warnings.warn(
"RandomForestClassifier does not support GPU or MPS training. "
"Computation will proceed on CPU.",
UserWarning,
stacklevel=2,
)
classifier = RandomForestClassifier
else:
raise ValueError(
f'Invalid classifier: "{classifier}".'
'Expected "mlp", "random_forest", '
'or a valid scikit-learn classifier class.'
)
assert issubclass(classifier, BaseEstimator), (
"classier must either be a string or a subclass of BaseEstimator."
)
self.clf_class = classifier
# prepare user overrides
user_kwargs = classifier_kwargs if classifier_kwargs is not None else {}
# define Defaults based on classifier type
if self.clf_class == MLPClassifier:
ndim = thetas.shape[-1]
self.clf_kwargs = {
"activation": "relu",
"hidden_layer_sizes": (10 * ndim, 10 * ndim),
"max_iter": 1000,
"solver": "adam",
"early_stopping": True,
"n_iter_no_change": 50,
}
elif self.clf_class == NeuralNetBinaryClassifier:
ndim = thetas.shape[-1]
input_dim = thetas.shape[-1] + xs.shape[-1]
self.clf_kwargs = {
"module": MLPClassifierModule,
"module__input_dim": input_dim,
"module__output_dim": 1,
"module__hidden_layer_sizes": (10 * ndim, 10 * ndim),
"criterion": torch.nn.BCEWithLogitsLoss,
"optimizer": torch.optim.Adam,
"max_epochs": 1000,
"batch_size": 200,
"callbacks": [
(
'es',
EarlyStopping(
patience=50, monitor='valid_loss', load_best=True
),
)
],
"optimizer__weight_decay": 1e-4,
"device": self.device,
"verbose": 0,
"iterator_train__shuffle": True,
}
else:
self.clf_kwargs: Dict[str, Any] = {}
# update with User Overrides (if any)
if user_kwargs:
self.clf_kwargs.update(user_kwargs)
# initialize classifiers, will be set after training
self.trained_clfs = None
self.trained_clfs_null = None
# parameters for the null hypothesis testing
self.num_trials_null = num_trials_null
self.permutation = permutation
# can be specified if known and independent of x (see `LC2ST-NF`)
self.null_distribution = None
def _train(
self,
theta_p: Tensor,
theta_q: Tensor,
x_p: Tensor,
x_q: Tensor,
verbosity: int = 0,
) -> List[Any]:
"""Returns the classifiers trained on observed data.
Args:
theta_p: Samples from P, of shape (sample_size, dim).
theta_q: Samples from Q, of shape (sample_size, dim).
x_p: Observations corresponding to P, of shape (sample_size, dim_x).
x_q: Observations corresponding to Q, of shape (sample_size, dim_x).
verbosity: Verbosity level, defaults to 0.
Returns:
List of trained classifiers for each cv fold.
"""
# prepare data
if self.z_score:
theta_p = (theta_p - self.theta_p_mean) / self.theta_p_std
theta_q = (theta_q - self.theta_p_mean) / self.theta_p_std
x_p = (x_p - self.x_p_mean) / self.x_p_std
x_q = (x_q - self.x_p_mean) / self.x_p_std
# initialize classifier
clf = self.clf_class(**self.clf_kwargs or {})
if self.num_ensemble > 1:
clf = EnsembleClassifier(clf, self.num_ensemble, verbosity=verbosity)
# cross-validation
if self.num_folds > 1:
trained_clfs = []
kf = KFold(n_splits=self.num_folds, shuffle=True, random_state=self.seed)
cv_splits = kf.split(theta_p.numpy())
for train_idx, _ in tqdm(
cv_splits, desc="Cross-validation", disable=verbosity < 1
):
# get train split
theta_p_train, theta_q_train = theta_p[train_idx], theta_q[train_idx]
x_p_train, x_q_train = x_p[train_idx], x_q[train_idx]
# train classifier
clf_n = train_lc2st(
theta_p_train, theta_q_train, x_p_train, x_q_train, clf
)
trained_clfs.append(clf_n)
else:
# train single classifier
clf = train_lc2st(theta_p, theta_q, x_p, x_q, clf)
trained_clfs = [clf]
return trained_clfs
[docs]
def get_scores(
self,
theta_o: Tensor,
x_o: Tensor,
trained_clfs: List[Any],
return_probs: bool = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Computes the L-C2ST scores given the trained classifiers.
Mean squared error (MSE) between 0.5 and the predicted probabilities
of being in class 0 over the dataset (`theta_o`, `x_o`).
Args:
theta_o: Samples from the posterior conditioned on the observation `x_o`,
of shape (sample_size, dim).
x_o: The observation, of shape (,dim_x).
trained_clfs: List of trained classifiers, of length `num_folds`.
return_probs: Whether to return the predicted probabilities of being in P,
defaults to False.
Returns: one of
scores: L-C2ST scores at `x_o`, of shape (`num_folds`,).
(probs, scores): Predicted probabilities and L-C2ST scores at `x_o`,
each of shape (`num_folds`,).
"""
if x_o.shape == self.x_p_mean.shape:
x_o = x_o.unsqueeze(0)
# prepare data
if self.z_score:
theta_o = (theta_o - self.theta_p_mean) / self.theta_p_std
x_o = (x_o - self.x_p_mean) / self.x_p_std
probs, scores = [], []
# evaluate classifiers
for clf in trained_clfs:
proba, score = eval_lc2st(theta_o, x_o, clf, return_proba=True)
probs.append(proba)
scores.append(score)
probs, scores = np.array(probs), np.array(scores)
if return_probs:
return probs, scores
else:
return scores
[docs]
def train_on_observed_data(
self, seed: Optional[int] = None, verbosity: int = 1
) -> Union[None, List[Any]]:
"""Trains the classifier on the observed data.
Saves the trained classifier(s) as a list of length `num_folds`.
Args:
seed: random state of the classifier, defaults to None.
verbosity: Verbosity level, defaults to 1.
"""
# set random state
if seed is not None:
if "random_state" in self.clf_kwargs:
print("WARNING: changing the random state of the classifier.")
self.clf_kwargs["random_state"] = seed
# train the classifier
trained_clfs = self._train(
self.theta_p, self.theta_q, self.x_p, self.x_q, verbosity=verbosity
)
self.trained_clfs = trained_clfs
[docs]
def get_statistic_on_observed_data(
self,
theta_o: Tensor,
x_o: Tensor,
) -> float:
"""Computes the L-C2ST statistics for the observed data.
Mean over all cv-scores.
Args:
theta_o: Samples from the posterior conditioned on the observation `x_o`,
of shape (sample_size, dim).
x_o: The observation, of shape (, dim_x)
Returns:
L-C2ST statistic at `x_o`.
"""
assert self.trained_clfs is not None, (
"No trained classifiers found. Run `train_on_observed_data` first."
)
_, scores = self.get_scores(
theta_o=theta_o,
x_o=x_o,
trained_clfs=self.trained_clfs,
return_probs=True,
)
return float(scores.mean())
[docs]
def p_value(
self,
theta_o: Tensor,
x_o: Tensor,
) -> float:
r"""Computes the p-value for L-C2ST.
The p-value is the proportion of times the L-C2ST statistic under the null
hypothesis is greater than the L-C2ST statistic at the observation `x_o`.
It is computed by taking the empirical mean over statistics computed on
several trials under the null hypothesis: $1/H \sum_{h=1}^{H} I(T_h < T_o)$.
Args:
theta_o: Samples from the posterior conditioned on the observation `x_o`,
of dhape (sample_size, dim).
x_o: The observation, of shape (, dim_x).
Returns:
p-value for L-C2ST at `x_o`.
"""
stat_data = self.get_statistic_on_observed_data(theta_o=theta_o, x_o=x_o)
_, stats_null = self.get_statistics_under_null_hypothesis(
theta_o=theta_o, x_o=x_o, return_probs=True, verbosity=0
)
return float((stat_data < stats_null).mean())
[docs]
def reject_test(
self,
theta_o: Tensor,
x_o: Tensor,
alpha: float = 0.05,
) -> bool:
"""Computes the test result for L-C2ST at a given significance level.
Args:
theta_o: Samples from the posterior conditioned on the observation `x_o`,
of shape (sample_size, dim).
x_o: The observation, of shape (, dim_x).
alpha: Significance level, defaults to 0.05.
Returns:
The L-C2ST result: True if rejected, False otherwise.
"""
return bool(self.p_value(theta_o=theta_o, x_o=x_o) < alpha)
[docs]
def train_under_null_hypothesis(
self,
verbosity: int = 1,
) -> None:
"""Computes the L-C2ST scores under the null hypothesis (H0).
Saves the trained classifiers for each null trial.
Args:
verbosity: Verbosity level, defaults to 1.
"""
trained_clfs_null = {}
for t in tqdm(
range(self.num_trials_null),
desc=f"Training the classifiers under H0, permutation = {self.permutation}",
disable=verbosity < 1,
):
# prepare data
if self.permutation:
joint_p = torch.cat([self.theta_p, self.x_p], dim=1)
joint_q = torch.cat([self.theta_q, self.x_q], dim=1)
# permute data (same as permuting the labels)
joint_p_perm, joint_q_perm = permute_data(joint_p, joint_q, seed=t)
# extract the permuted P and Q and x
theta_p_t, x_p_t = (
joint_p_perm[:, : self.theta_p.shape[-1]],
joint_p_perm[:, self.theta_p.shape[1] :],
)
theta_q_t, x_q_t = (
joint_q_perm[:, : self.theta_q.shape[-1]],
joint_q_perm[:, self.theta_q.shape[1] :],
)
else:
assert self.null_distribution is not None, (
"You need to provide a null distribution"
)
theta_p_t = self.null_distribution.sample((self.theta_p.shape[0],))
theta_q_t = self.null_distribution.sample((self.theta_p.shape[0],))
x_p_t, x_q_t = self.x_p, self.x_q
if self.z_score:
theta_p_t = (theta_p_t - self.theta_p_mean) / self.theta_p_std
theta_q_t = (theta_q_t - self.theta_p_mean) / self.theta_p_std
x_p_t = (x_p_t - self.x_p_mean) / self.x_p_std
x_q_t = (x_q_t - self.x_p_mean) / self.x_p_std
# train
clf_t = self._train(theta_p_t, theta_q_t, x_p_t, x_q_t, verbosity=0)
trained_clfs_null[t] = clf_t
self.trained_clfs_null = trained_clfs_null
[docs]
def get_statistics_under_null_hypothesis(
self,
theta_o: Tensor,
x_o: Tensor,
return_probs: bool = False,
verbosity: int = 0,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Computes the L-C2ST scores under the null hypothesis.
Args:
theta_o: Samples from the posterior conditioned on the observation `x_o`,
of shape (sample_size, dim).
x_o: The observation, of shape (, dim_x).
return_probs: Whether to return the predicted probabilities of being in P,
defaults to False.
verbosity: Verbosity level, defaults to 1.
Returns: one of
scores: L-C2ST scores under (H0).
(probs, scores): Predicted probabilities and L-C2ST scores under (H0).
"""
if self.trained_clfs_null is None:
raise ValueError(
"You need to train the classifiers under (H0). \
Run `train_under_null_hypothesis`."
)
else:
assert len(self.trained_clfs_null) == self.num_trials_null, (
"You need one classifier per trial."
)
probs_null, stats_null = [], []
for t in tqdm(
range(self.num_trials_null),
desc=f"Computing T under (H0) - permutation = {self.permutation}",
disable=verbosity < 1,
):
# prepare data
if self.permutation:
theta_o_t = theta_o
else:
assert self.null_distribution is not None, (
"You need to provide a null distribution"
)
theta_o_t = self.null_distribution.sample((theta_o.shape[0],))
if self.z_score:
theta_o_t = (theta_o_t - self.theta_p_mean) / self.theta_p_std
x_o = (x_o - self.x_p_mean) / self.x_p_std
# evaluate
clf_t = self.trained_clfs_null[t]
probs, scores = self.get_scores(
theta_o=theta_o_t, x_o=x_o, trained_clfs=clf_t, return_probs=True
)
probs_null.append(probs)
stats_null.append(scores.mean())
probs_null, stats_null = np.array(probs_null), np.array(stats_null)
if return_probs:
return probs_null, stats_null
else:
return stats_null
class LC2ST_NF(LC2ST):
def __init__(
self,
thetas: Tensor,
xs: Tensor,
posterior_samples: Tensor,
flow_inverse_transform: Callable[[Tensor, Tensor], Tensor],
flow_base_dist: torch.distributions.Distribution,
num_eval: int = 10_000,
trained_clfs_null: Optional[Dict[str, List[Any]]] = None,
**kwargs: Any,
) -> None:
r"""L-C2ST for Normalizing Flows.
LC2ST_NF is a subclass of LC2ST that performs the test in the space of the
base distribution of a normalizing flow. It uses the inverse transform of the
normalizing flow $T_\phi^{-1}$ to map the samples from the prior and the
posterior to the base distribution space. Following Theorem 4, Eq. 17 from [1],
the new null hypothesis for a Gaussian base distribution is:
:math:`H_0(x_o) := p(T_\phi^{-1}(\theta ; x_o) \mid x_o) = \mathcal{N}(0,`
:math:`I_m)`.
This is because a sample from the normalizing flow is a sample from the base
distribution pushed through the flow:
:math:`z = T_\phi^{-1}(\theta) \sim \mathcal{N}(0, I_m) \iff`
:math:`\theta = T_\phi(z)`.
This defines the two classes P and Q for the L-C2ST test, one of which is
the Gaussion distribution that can be easily be sampled from and is independent
of the observation `x_o` and the estimator q.
Important features are:
- no `theta_o` is passed to the evaluation functions (e.g. `get_scores`),
as the base distribution is known, samples are drawn at initialization.
- no permutation method is used, as the null distribution is known,
samples are drawn during `train_under_null_hypothesis`.
- the classifiers can be pre-trained under the null and `trained_clfs_null`
passed as an argument at initialization. They do not depend on the
observed data (i.e. `posterior_samples` and `xs`).
Args:
thetas: Samples from the prior, of shape (sample_size, dim).
xs: Corresponding simulated data, of shape (sample_size, dim_x).
posterior_samples: Samples from the estiamted posterior,
of shape (sample_size, dim).
flow_inverse_transform: Inverse transform of the normalizing flow.
Takes thetas and xs as input and returns noise.
flow_base_dist: Base distribution of the normalizing flow.
num_eval: Number of samples to evaluate the L-C2ST.
trained_clfs_null: Pre-trained classifiers under the null.
kwargs: Additional arguments for the LC2ST class.
References:
[1] : https://arxiv.org/abs/2306.03580, https://github.com/JuliaLinhart/lc2st
"""
# Apply the inverse transform to the thetas and the posterior samples
self.flow_inverse_transform = flow_inverse_transform
inverse_thetas = flow_inverse_transform(thetas, xs).detach()
inverse_posterior_samples = flow_inverse_transform(
posterior_samples, xs
).detach()
# Initialize the LC2ST class with the inverse transformed samples
super().__init__(inverse_thetas, xs, inverse_posterior_samples, **kwargs)
# Set the parameters for the null hypothesis testing
self.null_distribution = flow_base_dist
self.permutation = False
self.trained_clfs_null = trained_clfs_null
# Draw samples from the base distribution for evaluation
self.theta_o = flow_base_dist.sample(torch.Size([num_eval]))
def get_scores(
self,
x_o: Tensor,
trained_clfs: List[Any],
return_probs: bool = False,
**kwargs: Any,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Computes the L-C2ST scores given the trained classifiers.
Args:
x_o: The observation, of shape (,dim_x).
trained_clfs: Trained classifiers.
return_probs: Whether to return the predicted probabilities of being in P,
defaults to False.
kwargs: Additional arguments used in the parent class.
Returns: one of
scores: L-C2ST scores at `x_o`.
(probs, scores): Predicted probabilities and L-C2ST scores at `x_o`.
"""
return super().get_scores(
theta_o=self.theta_o,
x_o=x_o,
trained_clfs=trained_clfs,
return_probs=return_probs,
)
def get_statistic_on_observed_data(
self,
x_o: Tensor,
**kwargs: Any,
) -> float:
"""Computes the L-C2ST statistics for the observed data:
Mean over all cv-scores.
Args:
x_o: The observation, of shape (, dim_x).
kwargs: Additional arguments used in the parent class.
Returns:
L-C2ST statistic at `x_o`.
"""
return super().get_statistic_on_observed_data(theta_o=self.theta_o, x_o=x_o)
def p_value(
self,
x_o: Tensor,
**kwargs: Any,
) -> float:
r"""Computes the p-value for L-C2ST.
The p-value is the proportion of times the L-C2ST statistic under the null
hypothesis is greater than the L-C2ST statistic at the observation `x_o`.
It is computed by taking the empirical mean over statistics computed on
several trials under the null hypothesis: $1/H \sum_{h=1}^{H} I(T_h < T_o)$.
Args:
x_o: The observation, of shape (, dim_x).
kwargs: Additional arguments used in the parent class.
Returns:
p-value for L-C2ST at `x_o`.
"""
return super().p_value(theta_o=self.theta_o, x_o=x_o)
def reject_test(
self,
x_o: Tensor,
alpha: float = 0.05,
**kwargs: Any,
) -> bool:
"""Computes the test result for L-C2ST at a given significance level.
Args:
x_o: The observation, of shape (, dim_x).
alpha: Significance level, defaults to 0.05.
kwargs: Additional arguments used in the parent class.
Returns:
L-C2ST result: True if rejected, False otherwise.
"""
return super().reject_test(theta_o=self.theta_o, x_o=x_o, alpha=alpha)
def train_under_null_hypothesis(
self,
verbosity: int = 1,
) -> None:
"""Computes the L-C2ST scores under the null hypothesis.
Saves the trained classifiers for the null distribution.
Args:
verbosity: Verbosity level, defaults to 1.
"""
if self.trained_clfs_null is not None:
raise ValueError(
"Classifiers have already been trained under the null \
and can be used to evaluate any new estimator."
)
return super().train_under_null_hypothesis(verbosity=verbosity)
def get_statistics_under_null_hypothesis(
self,
x_o: Tensor,
return_probs: bool = False,
verbosity: int = 0,
**kwargs: Any,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""Computes the L-C2ST scores under the null hypothesis.
Args:
x_o: The observation.
Shape (, dim_x)
return_probs: Whether to return the predicted probabilities of being in P.
Defaults to False.
verbosity: Verbosity level, defaults to 1.
kwargs: Additional arguments used in the parent class.
"""
return super().get_statistics_under_null_hypothesis(
theta_o=self.theta_o,
x_o=x_o,
return_probs=return_probs,
verbosity=verbosity,
)
def train_lc2st(
theta_p: Tensor, theta_q: Tensor, x_p: Tensor, x_q: Tensor, clf: BaseEstimator
) -> Any:
"""Trains the classifier on the joint data for the L-C2ST.
Args:
theta_p: Samples from P, of shape (sample_size, dim).
theta_q: Samples from Q, of shape (sample_size, dim).
x_p: Observations corresponding to P, of shape (sample_size, dim_x).
x_q: Observations corresponding to Q, of shape (sample_size, dim_x).
clf: Classifier to train.
Returns:
Trained classifier.
"""
# concatenate to get joint data
joint_p = np.concatenate([theta_p.cpu().numpy(), x_p.cpu().numpy()], axis=1)
joint_q = np.concatenate([theta_q.cpu().numpy(), x_q.cpu().numpy()], axis=1)
# prepare data
data = np.concatenate((joint_p, joint_q))
# labels (float32 for compatibility with BCEWithLogitsLoss in skorch)
target = np.concatenate((
np.zeros((joint_p.shape[0],), dtype=np.float32),
np.ones((joint_q.shape[0],), dtype=np.float32),
))
# train classifier
clf_ = clone(clf)
clf_.fit(data, target) # type: ignore
return clf_
def eval_lc2st(
theta_p: Tensor, x_o: Tensor, clf: BaseEstimator, return_proba: bool = False
) -> Union[float, Tuple[np.ndarray, float]]:
"""Evaluates the classifier returned by `train_lc2st` for one observation
`x_o` and over the samples `P`.
Args:
theta_p: Samples from p (class 0), of shape (sample_size, dim).
x_o: The observation, of shape (1, dim_x).
clf: Trained classifier.
return_proba: Whether to return the predicted probabilities of being in P,
defaults to False.
Returns:
L-C2ST score at `x_o`: MSE between 0.5 and the predicted classifier
probability for class 0 on `theta_p`.
"""
# concatenate to get joint data
joint_p = np.concatenate(
[theta_p.cpu().numpy(), x_o.repeat(len(theta_p), 1).cpu().numpy()], axis=1
)
# evaluate classifier
# probability of being in P (class 0)
proba = clf.predict_proba(joint_p)[:, 0] # type: ignore
# mean squared error between proba and dirac at 0.5
score = float(((proba - [0.5] * len(proba)) ** 2).mean())
if return_proba:
return proba, score
else:
return score
def permute_data(
theta_p: Tensor, theta_q: Tensor, seed: int = 1
) -> Tuple[Tensor, Tensor]:
"""Permutes the concatenated data [P,Q] to create null samples.
Args:
theta_p: samples from P, of shape (sample_size, dim).
theta_q: samples from Q, of shape (sample_size, dim).
seed: random seed, defaults to 1.
Returns:
Permuted data [theta_p,theta_qss]
"""
# set seed
torch.manual_seed(seed)
# check inputs
assert theta_p.shape[0] == theta_q.shape[0]
sample_size = theta_p.shape[0]
X = torch.cat([theta_p, theta_q], dim=0)
x_perm = X[torch.randperm(sample_size * 2)]
return x_perm[:sample_size], x_perm[sample_size:]
class EnsembleClassifier(BaseEstimator):
def __init__(self, clf, num_ensemble=1, verbosity=1):
self.clf = clf
self.num_ensemble = num_ensemble
self.trained_clfs = []
self.verbosity = verbosity
def fit(self, X, y):
for n in tqdm(
range(self.num_ensemble),
desc="Ensemble training",
disable=self.verbosity < 1,
):
clf = clone(self.clf)
if hasattr(clf, 'random_state'):
if clf.random_state is not None:
clf.random_state += n
else:
clf.random_state = n + 1
clf.fit(X, y) # type: ignore
self.trained_clfs.append(clf)
def predict_proba(self, X):
probas = [clf.predict_proba(X) for clf in self.trained_clfs]
return np.mean(probas, axis=0)