from __future__ import annotations
import os
import warnings
from functools import cache
from typing import Any
import arviz as az
import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import scipy.stats as stats
from sklearn import preprocessing
from footix.models.score_matrix import GoalMatrix
from footix.utils.decorators import verify_required_column
from footix.utils.typing import SampleProbaResult
def _resolve_column_name(df: pd.DataFrame, candidates: tuple[str, ...]) -> str | None:
"""Resolve the first available column name among candidates.
Args:
df: Input dataframe.
candidates: Candidate column names in priority order.
Returns:
The first matching column name or None.
"""
for candidate in candidates:
if candidate in df.columns:
return candidate
return None
def _extract_optional_stats_data(df: pd.DataFrame) -> dict[str, Any]:
"""Extract optional match-statistics channels with per-channel validity masks.
Args:
df: Training dataframe.
Returns:
Dict with channel flags and masked arrays for shots, shots on target, and corners.
"""
result: dict[str, Any] = {
"has_shots": False,
"has_sot": False,
"has_corners": False,
"shots_idx": None,
"shots_home_log": None,
"shots_away_log": None,
"sot_idx": None,
"sot_home_log": None,
"sot_away_log": None,
"corners_idx": None,
"corners_home_log": None,
"corners_away_log": None,
}
channel_specs = [
(
"shots",
("hs", "HS"),
("as", "AS"),
"shots_idx",
"shots_home_log",
"shots_away_log",
"has_shots",
),
(
"sot",
("hst", "HST"),
("ast", "AST"),
"sot_idx",
"sot_home_log",
"sot_away_log",
"has_sot",
),
(
"corners",
("hc", "HC"),
("ac", "AC"),
"corners_idx",
"corners_home_log",
"corners_away_log",
"has_corners",
),
]
for (
_,
home_candidates,
away_candidates,
idx_key,
home_key,
away_key,
flag_key,
) in channel_specs:
home_col = _resolve_column_name(df, home_candidates)
away_col = _resolve_column_name(df, away_candidates)
if home_col is None or away_col is None:
continue
home_vals = pd.to_numeric(df[home_col], errors="coerce").to_numpy(dtype=np.float64)
away_vals = pd.to_numeric(df[away_col], errors="coerce").to_numpy(dtype=np.float64)
valid_mask = np.isfinite(home_vals) & np.isfinite(away_vals)
if not np.any(valid_mask):
continue
valid_idx = np.where(valid_mask)[0].astype(np.int64)
home_clipped = np.clip(home_vals[valid_mask], a_min=0.5, a_max=None)
away_clipped = np.clip(away_vals[valid_mask], a_min=0.5, a_max=None)
result[idx_key] = valid_idx
result[home_key] = np.log(home_clipped + 1e-5)
result[away_key] = np.log(away_clipped + 1e-5)
result[flag_key] = True
return result
[docs]
class BayesianModel:
"""Bayesian hierarchical model for football scores using a Negative Binomial likelihood.
Attributes
----------
n_teams : int
Number of distinct teams in the league.
n_goals : int
Maximum number of goals considered when computing score probabilities.
trace : arviz.InferenceData | None
Posterior samples after calling `fit`. None until the model is fitted.
"""
def __init__(
self,
n_goals: int,
n_teams: int | None = None,
calibrate: bool = False,
use_stats: bool = False,
):
self.n_teams = n_teams
self.n_goals = n_goals
self.calibrate = calibrate
self.use_stats = use_stats
self.trace: az.InferenceData | None = None
self.label = preprocessing.LabelEncoder()
[docs]
@verify_required_column(column_names={"home_team", "away_team", "fthg", "ftag"})
def fit(self, X_train: pd.DataFrame, sample_kwargs: dict[str, Any] | None = None):
x_train_cop = X_train.copy(deep=False)
teams = pd.concat([X_train["home_team"], X_train["away_team"]]).unique()
if self.n_teams is None:
self.n_teams = len(teams)
elif self.n_teams != len(teams):
raise ValueError(
f"Teams in training data do not match the initialized teams. "
f"Expected: {self.n_teams}, got: {teams}."
)
self.label.fit(teams) # type: ignore
x_train_cop["home_team_id"] = self.label.transform(X_train["home_team"])
x_train_cop["away_team_id"] = self.label.transform(X_train["away_team"])
goals_home_obs = x_train_cop["fthg"].to_numpy()
goals_away_obs = x_train_cop["ftag"].to_numpy()
home_team = x_train_cop["home_team_id"].to_numpy()
away_team = x_train_cop["away_team_id"].to_numpy()
optional_stats = _extract_optional_stats_data(x_train_cop) if self.use_stats else None
hierarchical_kwargs: dict[str, Any] = {"optional_stats": optional_stats}
if sample_kwargs is not None:
hierarchical_kwargs["sample_kwargs"] = sample_kwargs
self.trace = self.hierarchical_bayes(
goals_home_obs,
goals_away_obs,
home_team,
away_team,
**hierarchical_kwargs,
)
@cache
def _posterior_means(self) -> dict[str, np.ndarray]:
p = self.trace.posterior # type:ignore
self._cached_means = {
"home": p["home"].mean(("chain", "draw")).values,
"intercept": p["intercept"].mean(("chain", "draw")).values.item(),
"atts": p["atts"].mean(("chain", "draw")).values,
"defs": p["defs"].mean(("chain", "draw")).values,
}
if self.calibrate:
self._cached_means["tau"] = p["tau"].mean(("chain", "draw")).values.item()
self._cached_means["bias"] = p["bias"].mean(("chain", "draw")).values
return self._cached_means
def _apply_calibration(
self, probs: np.ndarray, tau: np.ndarray | float, bias: np.ndarray
) -> np.ndarray:
"""Apply calibration transformation to match outcome probabilities.
Parameters
----------
probs : np.ndarray
Raw probabilities for [Home, Draw, Away], shape (3,) or (n, 3)
tau : float
Temperature parameter for scaling
bias : np.ndarray
Class-wise bias for [Home, Draw, Away], shape (3,)
Returns
-------
np.ndarray
Calibrated probabilities with same shape as input
"""
eps = 1e-12
probs = np.clip(probs, eps, 1.0)
# Convert to logits
logits = np.log(probs + eps)
# Apply calibration: tau * logits + bias
calibrated_logits = tau * logits + bias
# Softmax to get calibrated probabilities
exp_logits = np.exp(calibrated_logits - np.max(calibrated_logits, axis=-1, keepdims=True))
calibrated_probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
return calibrated_probs
[docs]
def predict(self, home_team: str, away_team: str) -> GoalMatrix:
home_id, away_id = self.label.transform([home_team, away_team])
home_mu, away_mu = self.goal_expectation(home_team_id=home_id, away_team_id=away_id)
ks = np.arange(self.n_goals)
home_pmf = stats.poisson.pmf(ks, home_mu)
away_pmf = stats.poisson.pmf(ks, away_mu)
goal_matrix = GoalMatrix(home_pmf, away_pmf)
# Apply calibration if enabled
if self.calibrate:
# Get raw match probabilities
raw_probas = goal_matrix.return_probas()
raw_probs = np.array(
[raw_probas.proba_home, raw_probas.proba_draw, raw_probas.proba_away]
)
# Get calibration parameters and apply transformation
means = self._posterior_means()
calibrated_probs = self._apply_calibration(raw_probs, means["tau"], means["bias"])
# Create a correlation matrix that adjusts the outcome probabilities
# to match the calibrated values while preserving the goal distribution shape
home_win_mask = np.tril(np.ones((self.n_goals, self.n_goals)), -1)
draw_mask = np.diag(np.ones(self.n_goals))
away_win_mask = np.triu(np.ones((self.n_goals, self.n_goals)), 1)
# Calculate scaling factors for each outcome
scale_home = calibrated_probs[0] / (raw_probs[0] + 1e-12)
scale_draw = calibrated_probs[1] / (raw_probs[1] + 1e-12)
scale_away = calibrated_probs[2] / (raw_probs[2] + 1e-12)
# Build correlation matrix
correlation_matrix = (
scale_home * home_win_mask + scale_draw * draw_mask + scale_away * away_win_mask
)
# Create new GoalMatrix with correlation applied
goal_matrix = GoalMatrix(home_pmf, away_pmf, correlation_matrix=correlation_matrix)
return goal_matrix
[docs]
def goal_expectation(self, home_team_id: int, away_team_id: int):
posterior = self.trace.posterior # type:ignore
# posterior means
home = posterior["home"].mean(dim=["chain", "draw"]).values
intercept = posterior["intercept"].mean(dim=["chain", "draw"]).values
atts = posterior["atts"].mean(dim=["chain", "draw"]).values
defs = posterior["defs"].mean(dim=["chain", "draw"]).values
# linear predictors → expected counts
home_mu = np.exp(intercept + home[home_team_id] + atts[home_team_id] + defs[away_team_id])
away_mu = np.exp(intercept + atts[away_team_id] + defs[home_team_id])
# return both expectations and the dispersion α
return home_mu, away_mu
[docs]
@cache
def get_samples(self, home_team: str, away_team: str, **kwargs: Any) -> SampleProbaResult:
"""Generates posterior predictive samples for the specified home and away teams based on
the model.
home_team (str): The name of the home team.
away_team (str): The name of the away team.
tuple[np.ndarray, np.ndarray]:
A tuple containing two one-dimensional numpy arrays:
- The first array represents the sampled lambda values for the home team.
- The second array represents the sampled lambda values for the away team.
Notes:
This function transforms the team names into their corresponding indices, retrieves
the posterior samples for model parameters from the trace, computes the expected
goal rates (lambda values) for both teams, and flattens the arrays to provide a
simplified output.
"""
if kwargs:
warnings.warn(
f"Ignoring unexpected keyword arguments: {list(kwargs.keys())}", stacklevel=2
)
home_team_id, away_team_id = self.label.transform([home_team, away_team])
_posterior = self.trace.posterior # type:ignore
home = _posterior["home"].stack(sample=("chain", "draw")).values
atts = _posterior["atts"].stack(sample=("chain", "draw")).values
defs = _posterior["defs"].stack(sample=("chain", "draw")).values
intercept = _posterior["intercept"].stack(sample=("chain", "draw")).values
n_samples = intercept.shape[0]
# Extract calibration parameters if enabled
if self.calibrate:
tau_samples = _posterior["tau"].stack(sample=("chain", "draw")).values
bias_samples = _posterior["bias"].stack(sample=("chain", "draw")).values
prob_H_list = []
prob_D_list = []
prob_A_list = []
for i in range(n_samples):
mu_home = np.exp(
intercept[i]
+ home[home_team_id, i]
+ atts[home_team_id, i]
+ defs[away_team_id, i]
)
mu_away = np.exp(intercept[i] + atts[away_team_id, i] + defs[home_team_id, i])
home_goals = np.random.poisson(mu_home, 150)
away_goals = np.random.poisson(mu_away, 150)
prob_H = np.mean(home_goals > away_goals)
prob_D = np.mean(home_goals == away_goals)
prob_A = np.mean(home_goals < away_goals)
# Apply calibration if enabled
if self.calibrate:
raw_probs = np.array([prob_H, prob_D, prob_A])
calibrated_probs = self._apply_calibration(
raw_probs, tau_samples[i], bias_samples[:, i]
)
prob_H, prob_D, prob_A = (
calibrated_probs[0],
calibrated_probs[1],
calibrated_probs[2],
)
prob_H_list.append(prob_H)
prob_D_list.append(prob_D)
prob_A_list.append(prob_A)
return SampleProbaResult(
proba_home=np.asarray(prob_H_list),
proba_draw=np.asarray(prob_D_list),
proba_away=np.asarray(prob_A_list),
)
[docs]
def hierarchical_bayes(
self,
goals_home_obs: np.ndarray,
goals_away_obs: np.ndarray,
home_team: np.ndarray,
away_team: np.ndarray,
optional_stats: dict[str, Any] | None = None,
sample_kwargs: dict[str, Any] | None = None,
) -> az.InferenceData:
optional_stats = optional_stats or {}
match_obs = np.where(
goals_home_obs > goals_away_obs, 0, np.where(goals_home_obs == goals_away_obs, 1, 2)
)
with pm.Model():
# Use pm.Data for the observed data and covariates
goals_home_data = pm.Data("goals_home", goals_home_obs)
goals_away_data = pm.Data("goals_away", goals_away_obs)
home_team_data = pm.Data("home_team", home_team)
away_team_data = pm.Data("away_team", away_team)
# Home advantage and intercept
home = pm.Normal("home", mu=0, sigma=1, shape=self.n_teams)
intercept = pm.Normal("intercept", mu=3, sigma=1)
# Attack ratings with non-centered parameterization
tau_att = pm.HalfNormal("tau_att", sigma=2)
raw_atts = pm.Normal("raw_atts", mu=0, sigma=1, shape=self.n_teams)
atts_uncentered = raw_atts * tau_att
atts = pm.Deterministic("atts", atts_uncentered - pm.math.mean(atts_uncentered))
# Defence ratings with non-centered parameterization
tau_def = pm.HalfNormal("tau_def", sigma=2)
raw_defs = pm.Normal("raw_defs", mu=0, sigma=1, shape=self.n_teams)
defs_uncentered = raw_defs * tau_def
defs = pm.Deterministic("defs", defs_uncentered - pm.math.mean(defs_uncentered))
# Calculate theta for home and away
home_theta = pm.math.exp(
intercept + home[home_team_data] + atts[home_team_data] + defs[away_team_data]
)
away_theta = pm.math.exp(intercept + atts[away_team_data] + defs[home_team_data])
pm.Poisson(
"home_goals",
mu=home_theta,
observed=goals_home_data,
)
pm.Poisson(
"away_goals",
mu=away_theta,
observed=goals_away_data,
)
# Optional auxiliary channels from match statistics
if optional_stats.get("has_shots", False):
beta_shots = pm.Normal("beta_shots", mu=2.5, sigma=0.5)
sigma_shots = pm.HalfNormal("sigma_shots", sigma=0.4)
shots_idx = optional_stats["shots_idx"]
shots_home_team = home_team[shots_idx]
shots_away_team = away_team[shots_idx]
mu_shots_home = beta_shots + atts[shots_home_team] + defs[shots_away_team]
mu_shots_away = beta_shots + atts[shots_away_team] + defs[shots_home_team]
pm.Normal(
"log_shots_home_obs",
mu=mu_shots_home,
sigma=sigma_shots,
observed=optional_stats["shots_home_log"],
)
pm.Normal(
"log_shots_away_obs",
mu=mu_shots_away,
sigma=sigma_shots,
observed=optional_stats["shots_away_log"],
)
if optional_stats.get("has_sot", False):
beta_sot = pm.Normal("beta_sot", mu=1.5, sigma=0.5)
sigma_sot = pm.HalfNormal("sigma_sot", sigma=0.4)
sigma_theta = pm.HalfNormal("sigma_theta", sigma=0.2)
theta_raw = pm.Normal("theta_raw", mu=0.0, sigma=sigma_theta, shape=self.n_teams)
theta = pm.Deterministic("theta", theta_raw - pm.math.mean(theta_raw))
sot_idx = optional_stats["sot_idx"]
sot_home_team = home_team[sot_idx]
sot_away_team = away_team[sot_idx]
mu_sot_home = (
beta_sot + atts[sot_home_team] + defs[sot_away_team] + theta[sot_home_team]
)
mu_sot_away = (
beta_sot + atts[sot_away_team] + defs[sot_home_team] + theta[sot_away_team]
)
pm.Normal(
"log_sot_home_obs",
mu=mu_sot_home,
sigma=sigma_sot,
observed=optional_stats["sot_home_log"],
)
pm.Normal(
"log_sot_away_obs",
mu=mu_sot_away,
sigma=sigma_sot,
observed=optional_stats["sot_away_log"],
)
if optional_stats.get("has_corners", False):
beta_corners = pm.Normal("beta_corners", mu=1.5, sigma=0.5)
sigma_corners = pm.HalfNormal("sigma_corners", sigma=0.5)
corners_idx = optional_stats["corners_idx"]
corners_home_team = home_team[corners_idx]
corners_away_team = away_team[corners_idx]
mu_corners_home = beta_corners + atts[corners_home_team] + defs[corners_away_team]
mu_corners_away = beta_corners + atts[corners_away_team] + defs[corners_home_team]
pm.Normal(
"log_corners_home_obs",
mu=mu_corners_home,
sigma=sigma_corners,
observed=optional_stats["corners_home_log"],
)
pm.Normal(
"log_corners_away_obs",
mu=mu_corners_away,
sigma=sigma_corners,
observed=optional_stats["corners_away_log"],
)
if self.calibrate:
match_res_data = pm.Data("match_res", match_obs)
eps = 1e-12 # tiny floor
p_H = pm.Deterministic(
"p_H", var=p_skellam_gt0_continuity(mu1=home_theta, mu2=away_theta)
)
p_D = pm.Deterministic("p_D", var=p_skellam_eq0(mu1=home_theta, mu2=away_theta))
p_A = pm.Deterministic("p_A", var=1 - p_H - p_D)
match_probs_raw = pt.stack([p_H, p_D, p_A], axis=1)
match_probs_raw = pt.clip(match_probs_raw, eps, 1.0) # strictly > 0
match_probs_raw = match_probs_raw / match_probs_raw.sum(axis=1, keepdims=True)
# Calibration layer: temperature scaling + class-wise bias
tau = pm.Normal("tau", mu=1.0, sigma=0.3) # temperature parameter
bias = pm.Normal("bias", mu=0.0, sigma=0.3, shape=3) # class-wise bias (H, D, A)
# Apply calibration: tau * log(probs) + bias, then softmax
logit_raw = pm.math.log(match_probs_raw + eps) # safe log
calibrated_logits = tau * logit_raw + bias
match_probs = pm.Deterministic(
"match_probs", pt.special.softmax(calibrated_logits, axis=1)
)
pm.Categorical("match_outcomes", p=match_probs, observed=match_res_data)
inference_kwargs: dict[str, Any] = {
"draws": 2000,
"tune": 1000,
"cores": min(4, os.cpu_count() or 1),
"nuts_sampler": "blackjax",
"init": "adapt_diag_grad",
"target_accept": 0.95,
"return_inferencedata": True,
}
if sample_kwargs is not None:
inference_kwargs.update(sample_kwargs)
trace = pm.sample(**inference_kwargs)
return trace
[docs]
def p_skellam_gt0_continuity(mu1, mu2, eps=1e-9):
"""Approx P(K>0) for K ~ Skellam(mu1, mu2) using normal approx + continuity correction.
Returns a PyTensor node usable inside a PyMC model.
"""
var = pm.math.maximum(mu1 + mu2, eps) # stabilité num.
z = (mu1 - mu2 - 0.5) / pm.math.sqrt(var) # correction de continuité
# Φ(z) : CDF normale standard
return (1.0 + pm.math.erf(z / pm.math.sqrt(2.0))) / 2.0
[docs]
def p_skellam_eq0(mu1, mu2, eps=1e-9):
"""Probabilité exacte P(K = 0) pour K ~ Skellam(mu1, mu2) = exp(-(mu1+mu2)) * I0(
2*sqrt(mu1*mu2) )
- mu1, mu2 peuvent être scalaires ou tenseurs aléatoires (stochastiques du modèle)
- eps évite les problèmes de dérivées pour mu1*mu2 = 0
"""
x = 2.0 * pm.math.sqrt(pm.math.maximum(mu1 * mu2, eps))
return pm.math.exp(-(mu1 + mu2)) * pt.i0(x) # pt.i0 : Bessel I0