Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster_experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from cluster_experiments.inference.dimension import Dimension
from cluster_experiments.inference.hypothesis_test import HypothesisTest
from cluster_experiments.inference.metric import Metric, RatioMetric, SimpleMetric
from cluster_experiments.inference.split import Split
from cluster_experiments.inference.variant import Variant
from cluster_experiments.perturbator import (
BetaRelativePerturbator,
Expand Down Expand Up @@ -88,6 +89,7 @@
"SimpleMetric",
"RatioMetric",
"Dimension",
"Split",
"Variant",
"HypothesisTest",
"RelativeMixedPerturbator",
Expand Down
54 changes: 31 additions & 23 deletions cluster_experiments/inference/analysis_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cluster_experiments.inference.dimension import Dimension
from cluster_experiments.inference.hypothesis_test import HypothesisTest
from cluster_experiments.inference.metric import Metric
from cluster_experiments.inference.split import DefaultSplit
from cluster_experiments.inference.variant import Variant

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -171,37 +172,44 @@ def analyze(
Method to run the experiment analysis.
"""

# Validate input data at the beginning
self._validate_data(exp_data, pre_exp_data)

analysis_results = AnalysisPlanResults()

for test in self.tests:
exp_data = test.add_covariates(exp_data, pre_exp_data)

splits_to_iterate = test.splits if test.splits else [DefaultSplit()]

for treatment_variant in self.treatment_variants:
for dimension in test.dimensions:
for dimension_value in dimension.iterate_dimension_values():

if verbose:
logger.info(
f"Metric: {test.metric.alias}, "
f"Treatment: {treatment_variant.name}, "
f"Dimension: {dimension.name}, "
f"Value: {dimension_value}"
)

test_results = test.get_test_results(
exp_data=exp_data,
control_variant=self.control_variant,
treatment_variant=treatment_variant,
variant_col=self.variant_col,
dimension=dimension,
dimension_value=dimension_value,
alpha=self.alpha,
)

analysis_results = analysis_results + test_results
for split in splits_to_iterate:
for split_value in split.iterate_dimension_values():
for dimension in test.dimensions:
for dimension_value in dimension.iterate_dimension_values():

if verbose:
logger.info(
f"Metric: {test.metric.alias}, "
f"Treatment: {treatment_variant.name}, "
f"Split: {split.name}, "
f"Value: {split_value}, "
f"Dimension: {dimension.name}, "
f"Value: {dimension_value}"
)

test_results = test.get_test_results(
exp_data=exp_data,
control_variant=self.control_variant,
treatment_variant=treatment_variant,
variant_col=self.variant_col,
dimension=dimension,
dimension_value=dimension_value,
alpha=self.alpha,
split=split,
split_value=split_value,
)

analysis_results = analysis_results + test_results

return analysis_results

Expand Down
26 changes: 24 additions & 2 deletions cluster_experiments/inference/analysis_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class AnalysisPlanResults:
std_error: List[float] = field(default_factory=lambda: [])
dimension_name: List[str] = field(default_factory=lambda: [])
dimension_value: List[str] = field(default_factory=lambda: [])
split_name: List[str] = field(default_factory=lambda: [])
split_value: List[str] = field(default_factory=lambda: [])
alpha: List[float] = field(default_factory=lambda: [])

def __add__(self, other):
Expand All @@ -85,11 +87,31 @@ def __add__(self, other):
std_error=self.std_error + other.std_error,
dimension_name=self.dimension_name + other.dimension_name,
dimension_value=self.dimension_value + other.dimension_value,
split_name=self.split_name + other.split_name,
split_value=self.split_value + other.split_value,
alpha=self.alpha + other.alpha,
)

def to_dataframe(self):
return pd.DataFrame(asdict(self))
def to_dataframe(self, drop_empty: bool = False):
data_dict = asdict(self)
max_len = max(len(v) for v in data_dict.values()) if data_dict else 0
for k, v in data_dict.items():
if len(v) < max_len:
data_dict[k] = v + [""] * (max_len - len(v))

df = pd.DataFrame(data_dict)

if drop_empty:
defaults = {
"dimension_name": "__total_dimension",
"dimension_value": "total",
"split_name": "__total_split",
"split_value": "total",
}
for col, val in defaults.items():
if col in df.columns and (df[col] == val).all():
df = df.drop(columns=[col])
return df

def __str__(self) -> str:
n = len(self.ate)
Expand Down
2 changes: 2 additions & 0 deletions cluster_experiments/inference/dimension.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Dimension:
"""
A class used to represent a Dimension with a name and values.

Dimensions describe stable attributes of units that do not change during the experiment.

Attributes
----------
name : str
Expand Down
117 changes: 110 additions & 7 deletions cluster_experiments/inference/hypothesis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cluster_experiments.inference.analysis_results import AnalysisPlanResults
from cluster_experiments.inference.dimension import DefaultDimension, Dimension
from cluster_experiments.inference.metric import Metric, RatioMetric
from cluster_experiments.inference.split import DefaultSplit, Split
from cluster_experiments.inference.variant import Variant
from cluster_experiments.power_config import analysis_mapping

Expand All @@ -25,7 +26,9 @@ class HypothesisTest:
analysis_config : Optional[dict]
An optional dictionary representing the configuration for the analysis
dimensions : Optional[List[Dimension]]
An optional list of Dimension instances
An optional list of Dimension instances. Dimensions describe stable unit attributes.
splits : Optional[List[Split]]
An optional list of Split instances. Splits describe attributes that can change during the experiment.
cupac_config : Optional[dict]
An optional dictionary representing the configuration for the cupac model
custom_analysis_type_mapper : Optional[Dict[str, ExperimentAnalysis]]
Expand All @@ -38,6 +41,7 @@ def __init__(
analysis_type: str,
analysis_config: Optional[dict] = None,
dimensions: Optional[List[Dimension]] = None,
splits: Optional[List[Split]] = None,
cupac_config: Optional[dict] = None,
custom_analysis_type_mapper: Optional[Dict[str, ExperimentAnalysis]] = None,
):
Expand All @@ -52,6 +56,8 @@ def __init__(
An optional dictionary representing the configuration for the analysis
dimensions : Optional[List[Dimension]]
An optional list of Dimension instances
splits : Optional[List[Split]]
An optional list of Split instances
cupac_config : Optional[dict]
An optional dictionary representing the configuration for the cupac model
custom_analysis_type_mapper : Optional[Dict[str, ExperimentAnalysis]]
Expand All @@ -62,13 +68,15 @@ def __init__(
analysis_type,
analysis_config,
dimensions,
splits,
cupac_config,
custom_analysis_type_mapper,
)
self.metric = metric
self.analysis_type = analysis_type
self.analysis_config = analysis_config or {}
self.dimensions = [DefaultDimension()] + (dimensions or [])
self.splits = [DefaultSplit()] + splits if splits else []
self.cupac_config = cupac_config or {}
self.custom_analysis_type_mapper = custom_analysis_type_mapper or {}

Expand Down Expand Up @@ -141,6 +149,7 @@ def _validate_inputs(
analysis_type: str,
analysis_config: Optional[dict],
dimensions: Optional[List[Dimension]],
splits: Optional[List[Split]] = None,
cupac_config: Optional[dict] = None,
custom_analysis_type_mapper: Optional[Dict[str, ExperimentAnalysis]] = None,
):
Expand All @@ -157,6 +166,8 @@ def _validate_inputs(
An optional dictionary representing the configuration for the analysis
dimensions : Optional[List[Dimension]]
An optional list of Dimension instances
splits : Optional[List[Split]]
An optional list of Split instances
cupac_config : Optional[dict]
An optional dictionary representing the configuration for the cupac model
custom_analysis_type_mapper : Optional[dict[str, ExperimentAnalysis]]
Expand Down Expand Up @@ -187,6 +198,15 @@ def _validate_inputs(
f"Dimensions must be a list of Dimension instances if provided, got {dimensions}"
)

# Check if splits is a list of Split instances when provided
if splits is not None and (
not isinstance(splits, list)
or not all(isinstance(split, Split) for split in splits)
):
raise TypeError(
f"Splits must be a list of Split instances if provided, got {splits}"
)

# Validate custom_analysis_type_mapper if provided
if custom_analysis_type_mapper:
# Ensure it's a dictionary
Expand Down Expand Up @@ -277,6 +297,34 @@ def _prepare_analysis_config(self, treatment_col: str, treatment: str) -> None:

self.new_analysis_config = new_analysis_config

@staticmethod
def _aggregate_by_cluster(
df: pd.DataFrame,
cluster_cols: List[str],
treatment_col: str,
metric: Metric,
covariates: Optional[List[str]] = None,
) -> pd.DataFrame:
"""
Aggregate metric values by cluster.
"""
agg_cols = {}
if isinstance(metric, RatioMetric):
agg_cols[metric.target_column] = "sum"
agg_cols[metric.scale_column] = "sum"
else:
agg_cols[metric.target_column] = "sum"

if covariates:
for covariate in covariates:
if covariate not in df.columns:
raise ValueError(
f"Covariate '{covariate}' is not present in the data for cluster aggregation"
)
agg_cols[covariate] = "mean"

return df.groupby(cluster_cols + [treatment_col], as_index=False).agg(agg_cols)

@staticmethod
def prepare_data(
data: pd.DataFrame,
Expand All @@ -285,18 +333,39 @@ def prepare_data(
control_variant: Variant,
dimension_name: str,
dimension_value: str,
split_name: Optional[str] = None,
split_value: Optional[str] = None,
cluster_cols: Optional[List[str]] = None,
metric: Optional[Metric] = None,
covariates: Optional[List[str]] = None,
) -> pd.DataFrame:
"""
Prepares the data for the experiment analysis pipeline
"""
prepared_df = data.copy()

prepared_df = prepared_df.assign(__total_dimension="total")

prepared_df = prepared_df.query(
f"{variant_col}.isin(['{treatment_variant.name}','{control_variant.name}'])"
).query(f"{dimension_name} == '{dimension_value}'")

if split_name is not None:
prepared_df = prepared_df.assign(__total_split="total")
if split_value is None:
raise ValueError("split_value must be provided when split_name is used")

prepared_df = prepared_df.query(f"{split_name} == '{split_value}'")

if not cluster_cols:
raise ValueError(
f"Split '{split_name}' requires 'cluster_cols' for aggregation."
)

prepared_df = HypothesisTest._aggregate_by_cluster(
df=prepared_df,
cluster_cols=cluster_cols,
treatment_col=variant_col,
metric=metric,
covariates=covariates,
)

return prepared_df

def add_covariates(
Expand All @@ -321,6 +390,8 @@ def get_test_results(
dimension: Dimension,
dimension_value: str,
alpha: float,
split: Optional[Split] = None,
split_value: Optional[str] = None,
) -> AnalysisPlanResults:
"""
Performs the hypothesis test on the provided data, for the given dimension value.
Expand All @@ -341,6 +412,11 @@ def get_test_results(
The value of the dimension
alpha : float
The significance level to be used in the inference analysis.
split : Optional[Split], optional
The split instance to use for segmented analysis and cluster aggregation,
by default None
split_value : Optional[str], optional
The specific value of the split to filter on, by default None

Returns
-------
Expand All @@ -359,6 +435,11 @@ def get_test_results(
control_variant=control_variant,
dimension_name=dimension.name,
dimension_value=dimension_value,
split_name=split.name if split else None,
split_value=split_value,
cluster_cols=self.analysis_config.get("cluster_cols"),
metric=self.metric,
covariates=self.analysis_config.get("covariates", []),
)

inference_results = self.get_inference_results(df=prepared_df, alpha=alpha)
Expand All @@ -370,6 +451,11 @@ def get_test_results(
prepared_df.query(f"{variant_col}=='{treatment_variant.name}'")
)

has_real_dimensions = any(
not isinstance(d, DefaultDimension) for d in self.dimensions
)
has_real_splits = any(not isinstance(s, DefaultSplit) for s in self.splits)

test_results = AnalysisPlanResults(
metric_alias=[self.metric.alias],
control_variant_name=[control_variant.name],
Expand All @@ -382,8 +468,20 @@ def get_test_results(
ate_ci_upper=[inference_results.conf_int.upper],
p_value=[inference_results.p_value],
std_error=[inference_results.std_error],
dimension_name=[dimension.name],
dimension_value=[dimension_value],
dimension_name=(
[dimension.name] if has_real_dimensions else ["__total_dimension"]
),
dimension_value=[dimension_value] if has_real_dimensions else ["total"],
split_name=(
[split.name if split else "total"]
if has_real_splits
else ["__total_split"]
),
split_value=(
[split_value if split_value else "total"]
if has_real_splits
else ["total"]
),
alpha=[alpha],
)

Expand All @@ -409,11 +507,16 @@ def from_config(cls, config: dict) -> "HypothesisTest":
Dimension.from_metrics_config(dimension_config)
for dimension_config in config.get("dimensions", [])
]
splits = [
Split.from_metrics_config(split_config)
for split_config in config.get("splits", [])
]
return cls(
metric=metric,
analysis_type=config["analysis_type"],
analysis_config=config.get("analysis_config"),
dimensions=dimensions,
splits=splits,
cupac_config=config.get("cupac_config"),
custom_analysis_type_mapper=config.get("custom_analysis_type_mapper"),
)
Loading