# Source code for footix.data_io.prediction_export

"""Prediction export utilities for model predictions.

This module transforms model outputs into a normalized JSON-compatible structure
for prediction record consumers.
"""

from __future__ import annotations

import re
import unicodedata
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Mapping, Protocol, Sequence

import numpy as np
from scipy.stats import gaussian_kde

from footix.metrics import confidence_1x2_from_samples, confidence_curve
from footix.models.score_matrix import GoalMatrix
from footix.utils.typing import SampleProbaResult


def _format_distribution_bin(value: float) -> str:
    """Render a bin edge as a compact decimal label.

    The value is first rounded to two decimals. If the rounded value lies on
    a multiple of one tenth it is printed with one decimal (``0.5`` becomes
    ``"0.5"``), otherwise with two (``0.05`` becomes ``"0.05"``).

    Args:
        value: Numeric bin edge.

    Returns:
        The formatted label.
    """
    two_decimals = float(np.round(value, decimals=2))
    in_tenths = two_decimals * 10.0
    # isclose absorbs float noise such as 0.3 * 10 == 3.0000000000000004.
    precision = 1 if np.isclose(in_tenths, np.round(in_tenths)) else 2
    return f"{two_decimals:.{precision}f}"


# Text labels for the 21 evenly spaced bin edges 0.0, 0.05, ..., 1.0.
DEFAULT_DISTRIBUTION_BINS: tuple[str, ...] = tuple(
    _format_distribution_bin(value) for value in np.linspace(0.0, 1.0, 21)
)
# Numeric bin edges matching the labels above (same count, same spacing).
DEFAULT_DISTRIBUTION_EDGES = np.linspace(0.0, 1.0, len(DEFAULT_DISTRIBUTION_BINS))
# Number of bins delimited by the edges, i.e. one fewer than the edge count.
DEFAULT_DISTRIBUTION_COUNT = len(DEFAULT_DISTRIBUTION_BINS) - 1
# Time of day appended to kickoff values that only carry a YYYY-MM-DD date.
DEFAULT_FALLBACK_TIME = "20:00:00"
# Timezone attached to naive kickoff datetimes and used for "now" fallbacks.
# NOTE(review): this is a fixed UTC+01:00 offset, so it does not follow
# daylight-saving changes - confirm that is acceptable for consumers.
DEFAULT_TZ = timezone(timedelta(hours=1))


class PredictionExportModel(Protocol):
    """Protocol for models that can generate prediction-export inputs.

    Any object exposing ``predict`` and ``get_samples`` with these signatures
    can be passed to ``export_prediction_records_from_model``; no inheritance
    is required.
    """

    # Score-matrix prediction for one fixture. The record builder in this
    # module calls ``return_probas()`` and reads ``matrix_array`` on the result.
    def predict(self, home_team: str, away_team: str, **kwargs: Any) -> GoalMatrix: ...

    # Posterior probability samples for one fixture. The record builder reads
    # ``proba_home``, ``proba_draw`` and ``proba_away`` from the result.
    def get_samples(self, home_team: str, away_team: str, **kwargs: Any) -> SampleProbaResult: ...


def _round_float(value: float, decimals: int = 4) -> float:
    """Return ``value`` as a built-in float rounded to ``decimals`` places.

    Args:
        value: Number (or anything ``float()`` accepts) to round.
        decimals: Number of decimal places to keep.

    Returns:
        The rounded value as a plain Python ``float``.
    """
    as_float = float(value)
    rounded = np.round(as_float, decimals=decimals)
    return float(rounded)


def _format_confidence_score(value: float) -> float:
    """Clamp a confidence score to ``[0, 1]`` and round it to two decimals.

    Args:
        value: Raw confidence score.

    Returns:
        The clamped, rounded score as a plain Python ``float``.
    """
    clamped = float(np.clip(value, 0.0, 1.0))
    rounded = np.round(clamped, decimals=2)
    return float(rounded)


def _normalize_prob_vector(values: np.ndarray) -> np.ndarray:
    """Clip values to ``[0, 1]`` and rescale them so they sum to one.

    Args:
        values: Array-like of candidate probabilities.

    Returns:
        A float64 array. Empty input yields an empty array. When the clipped
        values sum to zero or to a non-finite number (for example because a
        NaN is present), a uniform 1-D vector of the same size is returned
        instead.
    """
    clipped = np.clip(np.asarray(values, dtype=np.float64), 0.0, 1.0)
    if clipped.size == 0:
        # Nothing to normalise; without this guard the uniform fallback
        # below would divide by zero.
        return clipped
    total = float(clipped.sum())
    if not np.isfinite(total) or total <= 0.0:
        return np.full(clipped.size, 1.0 / clipped.size, dtype=np.float64)
    return clipped / total


def _slugify(text: str, compact: bool = False) -> str:
    """Turn free text into a lowercase ASCII slug.

    Accents are removed via NFKD decomposition, characters that cannot be
    represented in ASCII are dropped, and apostrophes are treated as word
    separators.

    Args:
        text: Text to convert.
        compact: When true, every run of non-alphanumeric characters is
            removed; otherwise each run becomes a single hyphen and leading
            or trailing hyphens are stripped.

    Returns:
        The slug, or ``"unknown"`` when nothing usable remains.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    cleaned = ascii_only.lower().replace("'", " ")

    if compact:
        return re.sub(r"[^a-z0-9]+", "", cleaned) or "unknown"

    hyphenated = re.sub(r"[^a-z0-9]+", "-", cleaned).strip("-")
    collapsed = re.sub(r"-{2,}", "-", hyphenated)
    return collapsed or "unknown"


def _resolve_mapping_value(
    values: Mapping[str, Any] | None,
    candidates: tuple[str, ...],
) -> Any | None:
    """Return the first usable value found under any of the candidate keys.

    Keys are tried in the order given. A value is skipped when it is ``None``
    or a string that is empty after stripping whitespace.

    Args:
        values: Mapping to search, or ``None``.
        candidates: Keys to try, in priority order.

    Returns:
        The first usable value, or ``None`` when ``values`` is ``None`` or no
        candidate key yields one.
    """
    if values is None:
        return None

    def _is_blank(item: Any) -> bool:
        return item is None or (isinstance(item, str) and item.strip() == "")

    present = (values[key] for key in candidates if key in values)
    return next((item for item in present if not _is_blank(item)), None)


def _league_from_payload(
    fixture: Mapping[str, Any], payload_metadata: Mapping[str, Any] | None
) -> str:
    """Work out a display name for the league a fixture belongs to.

    Lookup order:

    1. ``league`` / ``competition`` / ``championship`` on the fixture.
    2. The same keys on the payload metadata.
    3. A known marker inside the metadata ``league_url``.

    Args:
        fixture: Raw fixture mapping.
        payload_metadata: Optional payload-level metadata.

    Returns:
        The league name, or ``"Unknown League"`` when nothing matches.
    """
    name_keys = ("league", "competition", "championship")
    explicit = _resolve_mapping_value(fixture, name_keys)
    if explicit is None:
        explicit = _resolve_mapping_value(payload_metadata, name_keys)
    if isinstance(explicit, str) and explicit.strip():
        return explicit.strip()

    url = _resolve_mapping_value(payload_metadata, ("league_url",))
    if isinstance(url, str):
        url_lower = url.lower()
        # Checked in order; the first marker found in the URL wins.
        url_markers = (
            ("ligue-1", "Ligue 1"),
            ("ligue-2", "Ligue 2"),
            ("premier-league", "Premier League"),
            ("bundesliga", "Bundesliga"),
        )
        for marker, label in url_markers:
            if marker in url_lower:
                return label

    return "Unknown League"


def _parse_kickoff_iso(
    fixture: Mapping[str, Any], payload_metadata: Mapping[str, Any] | None
) -> str:
    """Return the fixture kickoff as an ISO-8601 string with seconds precision.

    The raw value is taken from the first usable fixture key among
    ``kickoff``, ``match_time``, ``datetime``, ``date`` and ``kickoff_paris``,
    falling back to ``scraped_at_paris`` in the payload metadata.

    Parsing rules:

    * A bare ``YYYY-MM-DD`` date gets ``DEFAULT_FALLBACK_TIME`` appended.
    * ISO strings are parsed directly, with ``Z`` read as ``+00:00``.
    * Otherwise ``DD/MM/YYYY``, ``YYYY-MM-DD`` and ``DD-MM-YYYY`` are tried,
      with the time set to 20:00:00.
    * Datetimes without a timezone are assigned ``DEFAULT_TZ``.

    When no value is available or nothing parses, the current time in
    ``DEFAULT_TZ`` is returned instead.

    Args:
        fixture: Raw fixture mapping.
        payload_metadata: Optional payload-level metadata.

    Returns:
        Timezone-aware ISO-8601 timestamp string.
    """
    raw = _resolve_mapping_value(
        fixture,
        ("kickoff", "match_time", "datetime", "date", "kickoff_paris"),
    )
    if raw is None:
        raw = _resolve_mapping_value(payload_metadata, ("scraped_at_paris",))
    if raw is None:
        return datetime.now(DEFAULT_TZ).isoformat(timespec="seconds")

    text = str(raw).strip()
    if len(text) == 10 and re.match(r"^\d{4}-\d{2}-\d{2}$", text):
        text = f"{text}T{DEFAULT_FALLBACK_TIME}"

    parsed = None
    try:
        parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
    except ValueError:
        # Not ISO: fall back to plain date layouts, stopping at the first hit.
        for layout in ("%d/%m/%Y", "%Y-%m-%d", "%d-%m-%Y"):
            try:
                day = datetime.strptime(text, layout)
            except ValueError:
                continue
            parsed = datetime(day.year, day.month, day.day, 20, 0, 0)
            break

    if parsed is None:
        return datetime.now(DEFAULT_TZ).isoformat(timespec="seconds")

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=DEFAULT_TZ)
    return parsed.isoformat(timespec="seconds")


def _kde_distribution_from_samples(values: np.ndarray) -> list[float]:
    """Build probability masses from KDE-smoothed posterior samples.

    Non-finite samples are dropped and the rest clipped to ``[0, 1]``. A
    Gaussian KDE is evaluated on a 200-point grid over ``[0, 1]`` and
    integrated with the trapezoidal rule inside each bin delimited by
    ``DEFAULT_DISTRIBUTION_EDGES``. The bin masses are then normalised to sum
    to one.

    Special cases:

    * No finite sample: all mass goes to the first bin.
    * A single sample, or samples that are all (nearly) equal: all mass goes
      to the bin containing their mean, since a KDE cannot be fitted.
    * Any failure while fitting or integrating the KDE: a plain histogram of
      the clipped samples is used instead.

    Args:
        values: Posterior samples of a probability.

    Returns:
        ``DEFAULT_DISTRIBUTION_COUNT`` masses, each rounded by ``_round_float``.
    """

    def _all_mass_in_bin(index: int) -> list[float]:
        base = np.zeros(DEFAULT_DISTRIBUTION_COUNT, dtype=np.float64)
        base[index] = 1.0
        return [_round_float(x) for x in base]

    cleaned = np.asarray(values, dtype=np.float64)
    cleaned = cleaned[np.isfinite(cleaned)]
    if cleaned.size == 0:
        return _all_mass_in_bin(0)

    clipped = np.clip(cleaned, 0.0, 1.0)
    if clipped.size <= 1 or np.allclose(clipped, clipped[0]):
        idx = int(
            np.clip(
                np.floor(clipped.mean() * DEFAULT_DISTRIBUTION_COUNT),
                0,
                DEFAULT_DISTRIBUTION_COUNT - 1,
            )
        )
        return _all_mass_in_bin(idx)

    # ``np.trapezoid`` only exists in NumPy >= 2.0; older releases expose the
    # same routine as ``np.trapz``. Resolving it here keeps a missing
    # attribute from being swallowed by the broad fallback below.
    integrate = getattr(np, "trapezoid", None)
    if integrate is None:
        integrate = np.trapz

    try:
        kde = gaussian_kde(clipped)
        grid = np.linspace(0.0, 1.0, 200)
        density = np.clip(kde(grid), 0.0, None)

        edges = DEFAULT_DISTRIBUTION_EDGES
        last_bin = DEFAULT_DISTRIBUTION_COUNT - 1
        masses = np.zeros(DEFAULT_DISTRIBUTION_COUNT, dtype=np.float64)
        for i in range(DEFAULT_DISTRIBUTION_COUNT):
            lower, upper = edges[i], edges[i + 1]
            # Bins are half-open except the last, which includes 1.0.
            if i < last_bin:
                mask = (grid >= lower) & (grid < upper)
            else:
                mask = (grid >= lower) & (grid <= upper)

            x_slice = grid[mask]
            if x_slice.size >= 2:
                masses[i] = float(integrate(density[mask], x=x_slice))
            else:
                # Too few grid points to integrate: approximate the mass as
                # midpoint density times the actual bin width.
                center = 0.5 * (lower + upper)
                masses[i] = max(float(kde(center)[0]), 0.0) * float(upper - lower)

        norm_masses = _normalize_prob_vector(masses)
        return [_round_float(x) for x in norm_masses]
    except Exception:
        # Deliberate best-effort: e.g. gaussian_kde raises on a singular
        # covariance. Fall back to an empirical histogram.
        hist, _ = np.histogram(clipped, bins=DEFAULT_DISTRIBUTION_EDGES, density=False)
        counts = hist.astype(np.float64)
        total = float(counts.sum())
        if total > 0.0:
            # Convert counts to fractions first: _normalize_prob_vector clips
            # its input to [0, 1], which would flatten raw counts above one.
            counts = counts / total
        norm_hist = _normalize_prob_vector(counts)
        return [_round_float(x) for x in norm_hist]


def _score_matrix_5x5(matrix_array: np.ndarray) -> list[list[float]]:
    """Fit a score matrix into a fixed 5x5 grid of rounded values.

    The top-left corner of the input is copied into a zero-filled 5x5 array,
    so larger inputs are truncated and smaller ones are zero-padded. Input
    that is not two-dimensional is treated as all zeros. Values are clipped to
    ``[0, 1]`` and rounded with ``_round_float``.

    Args:
        matrix_array: Score probability matrix.

    Returns:
        Five rows of five floats.
    """
    grid = np.asarray(matrix_array, dtype=np.float64)
    if grid.ndim != 2:
        grid = np.zeros((5, 5), dtype=np.float64)

    n_rows, n_cols = (min(5, extent) for extent in grid.shape)
    padded = np.zeros((5, 5), dtype=np.float64)
    padded[:n_rows, :n_cols] = grid[:n_rows, :n_cols]
    bounded = np.clip(padded, 0.0, 1.0)
    return [[_round_float(cell) for cell in row] for row in bounded]


def _sanitize_distribution(values: list[float]) -> list[float]:
    """Force a distribution to the expected length and renormalise it.

    Extra trailing entries are dropped, missing ones are filled with zeros,
    and the result is normalised with ``_normalize_prob_vector`` before each
    entry is rounded with ``_round_float``.

    Args:
        values: Probability masses, one per bin.

    Returns:
        ``DEFAULT_DISTRIBUTION_COUNT`` rounded masses.
    """
    masses = np.asarray(values, dtype=np.float64)
    if masses.size > DEFAULT_DISTRIBUTION_COUNT:
        masses = masses[:DEFAULT_DISTRIBUTION_COUNT]

    missing = DEFAULT_DISTRIBUTION_COUNT - masses.size
    if missing > 0:
        masses = np.pad(masses, (0, missing), mode="constant", constant_values=0.0)

    return [_round_float(mass) for mass in _normalize_prob_vector(masses)]


def _sanitize_prediction(
    home_win: float, draw: float, away_win: float
) -> tuple[float, float, float]:
    """Normalise the three outcome probabilities and round each of them.

    Args:
        home_win: Probability of a home win.
        draw: Probability of a draw.
        away_win: Probability of an away win.

    Returns:
        ``(home_win, draw, away_win)`` after ``_normalize_prob_vector`` and
        ``_round_float``.
    """
    stacked = np.array([home_win, draw, away_win], dtype=np.float64)
    scaled = _normalize_prob_vector(stacked)
    home, tie, away = (_round_float(scaled[position]) for position in range(3))
    return home, tie, away


def _has_invalid_values(obj: Any) -> bool:
    """Report whether ``obj`` contains a value unfit for export.

    Invalid values are ``None``, non-finite floats and strings that are blank
    after stripping. Lists and dict values are searched recursively; any
    other type is accepted as valid.

    Args:
        obj: Value or nested list/dict structure to inspect.

    Returns:
        ``True`` when at least one invalid value is found.
    """
    if obj is None:
        return True
    if isinstance(obj, float):
        return not np.isfinite(obj)
    if isinstance(obj, str):
        return obj.strip() == ""
    if isinstance(obj, (list, dict)):
        children = obj.values() if isinstance(obj, dict) else obj
        for child in children:
            if _has_invalid_values(child):
                return True
        return False
    return False


def _validate_record_shape(record: Mapping[str, Any]) -> tuple[bool, str | None]:
    """Check that a prediction record has the expected structure.

    Checks run in this order and stop at the first failure:

    1. Every required field is present.
    2. ``prediction`` is a dict whose three outcome probabilities sum to 1
       within a tolerance of 0.01.
    3. ``distribution_bins`` has as many entries as
       ``DEFAULT_DISTRIBUTION_BINS``.
    4. Each outcome distribution has one entry fewer than the bins.
    5. ``score_matrix`` is 5x5.
    6. The record holds no ``None``, non-finite or blank values.

    Args:
        record: Candidate prediction record.

    Returns:
        ``(True, None)`` when valid, otherwise ``(False, reason)``.
    """
    required_fields = (
        "id",
        "league",
        "kickoff",
        "home_team",
        "away_team",
        "prediction",
        "home_distribution",
        "draw_distribution",
        "away_distribution",
        "distribution_bins",
        "score_matrix",
    )
    absent = next((name for name in required_fields if name not in record), None)
    if absent is not None:
        return False, f"missing field '{absent}'"

    prediction = record["prediction"]
    if not isinstance(prediction, dict):
        return False, "prediction must be an object"

    outcome_keys = ("home_win_prob", "draw_prob", "away_win_prob")
    prob_sum = sum(float(prediction.get(key, 0.0)) for key in outcome_keys)
    if abs(prob_sum - 1.0) > 0.01:
        return False, "prediction probabilities do not sum to 1"

    bins = record["distribution_bins"]
    if len(bins) != len(DEFAULT_DISTRIBUTION_BINS):
        return False, f"distribution_bins must have length {len(DEFAULT_DISTRIBUTION_BINS)}"

    expected_length = len(bins) - 1
    for name in ("home_distribution", "draw_distribution", "away_distribution"):
        if len(record[name]) != expected_length:
            return False, f"{name} must have length {expected_length}"

    matrix = record["score_matrix"]
    if len(matrix) != 5 or any(len(row) != 5 for row in matrix):
        return False, "score_matrix must be 5x5"

    if _has_invalid_values(record):
        return False, "record contains null, NaN, or empty values"
    return True, None


def _build_match_identifier(league: str, home_team: str, away_team: str, kickoff_iso: str) -> str:
    """Compose a match identifier from league, teams and kickoff date.

    The identifier has the form ``<league>-<home>-<away>-<date>``. The league
    uses the compact slug, the teams use the hyphenated slug, and the date is
    whatever precedes the first ``T`` in ``kickoff_iso``.

    Args:
        league: League display name.
        home_team: Home team name.
        away_team: Away team name.
        kickoff_iso: Kickoff timestamp in ISO-8601 form.

    Returns:
        The identifier string.
    """
    date_part, _, _ = kickoff_iso.partition("T")
    pieces = (
        _slugify(league, compact=True),
        _slugify(home_team, compact=False),
        _slugify(away_team, compact=False),
        date_part,
    )
    return "-".join(pieces)


def build_prediction_records_from_predictions(
    fixtures: Sequence[Mapping[str, Any]],
    goal_matrices: Mapping[str, GoalMatrix],
    samples: Mapping[str, SampleProbaResult],
    payload_metadata: Mapping[str, Any] | None = None,
    team_normalizer: Callable[[str], str] | None = None,
    confidence_gamma: float | None = 0.7,
) -> tuple[list[dict[str, Any]], list[dict[str, str]]]:
    """Build prediction records from existing prediction artifacts.

    Each fixture is matched to its artifacts through the key
    ``"<home_team> - <away_team>"``, built from the team names after optional
    normalization. Fixtures that lack team names, a goal matrix or posterior
    samples, or whose record fails validation, are reported in the error list
    rather than raising.

    Args:
        fixtures: Raw fixtures payload from odds JSON.
        goal_matrices: Mapping from match key to score matrix predictions.
        samples: Mapping from match key to posterior probability samples.
        payload_metadata: Optional metadata extracted from odds payload.
        team_normalizer: Optional callable for team-name normalization.
        confidence_gamma: Gamma passed to ``confidence_curve`` to reshape the
            confidence score. When ``None`` the raw confidence is used.

    Returns:
        Tuple of valid records, sorted chronologically by kickoff, and
        technical error reports.
    """
    records: list[dict[str, Any]] = []
    errors: list[dict[str, str]] = []

    for fixture in fixtures:
        raw_home = _resolve_mapping_value(fixture, ("home_team", "team_home", "home"))
        raw_away = _resolve_mapping_value(fixture, ("away_team", "team_away", "away"))
        if raw_home is None or raw_away is None:
            errors.append({"match": "unknown", "error": "missing home/away"})
            continue

        home_team = str(raw_home).replace("Cotes", "").strip()
        away_team = str(raw_away).strip()
        if team_normalizer is not None:
            home_team = team_normalizer(home_team)
            away_team = team_normalizer(away_team)

        match_key = f"{home_team} - {away_team}"
        goal_matrix = goal_matrices.get(match_key)
        sample_result = samples.get(match_key)
        if goal_matrix is None:
            errors.append({"match": match_key, "error": "missing goal matrix"})
            continue
        if sample_result is None:
            errors.append({"match": match_key, "error": "missing posterior samples"})
            continue

        league = _league_from_payload(fixture, payload_metadata)
        kickoff_iso = _parse_kickoff_iso(fixture, payload_metadata)

        proba_result = goal_matrix.return_probas()
        p_home, p_draw, p_away = _sanitize_prediction(
            proba_result.proba_home,
            proba_result.proba_draw,
            proba_result.proba_away,
        )

        # Confidence is clipped on a 0-100 scale and exported on a 0-1 scale.
        confidence_components = confidence_1x2_from_samples(sample_result)
        raw_confidence = float(np.clip(confidence_components.confidence, 0.0, 100.0)) / 100.0
        if confidence_gamma is not None:
            confidence_score = _format_confidence_score(
                confidence_curve(raw_confidence * 100.0, gamma=confidence_gamma) / 100.0
            )
        else:
            confidence_score = _format_confidence_score(raw_confidence)

        home_distribution = _sanitize_distribution(
            _kde_distribution_from_samples(np.asarray(sample_result.proba_home, dtype=np.float64))
        )
        draw_distribution = _sanitize_distribution(
            _kde_distribution_from_samples(np.asarray(sample_result.proba_draw, dtype=np.float64))
        )
        away_distribution = _sanitize_distribution(
            _kde_distribution_from_samples(np.asarray(sample_result.proba_away, dtype=np.float64))
        )

        # The gameweek is optional; a value that is not an integer is dropped.
        gameweek_value = _resolve_mapping_value(fixture, ("gameweek", "game_week"))
        parsed_gameweek: int | None = None
        if gameweek_value is not None:
            try:
                parsed_gameweek = int(gameweek_value)
            except (TypeError, ValueError):
                parsed_gameweek = None

        record: dict[str, Any] = {
            "id": _build_match_identifier(league, home_team, away_team, kickoff_iso),
            "league": league,
            "kickoff": kickoff_iso,
            "home_team": home_team,
            "away_team": away_team,
            "prediction": {
                "home_win_prob": p_home,
                "draw_prob": p_draw,
                "away_win_prob": p_away,
                "confidence_score": confidence_score,
            },
            "home_distribution": home_distribution,
            "draw_distribution": draw_distribution,
            "away_distribution": away_distribution,
            "distribution_bins": list(DEFAULT_DISTRIBUTION_BINS),
            "score_matrix": _score_matrix_5x5(goal_matrix.matrix_array),
        }
        if parsed_gameweek is not None:
            record["gameweek"] = parsed_gameweek

        is_valid, validation_error = _validate_record_shape(record)
        if not is_valid:
            errors.append({"match": match_key, "error": validation_error or "invalid record"})
            continue
        records.append(record)

    # Compare parsed datetimes, not the ISO strings: kickoffs may carry
    # different UTC offsets, and string order is then not chronological.
    # Every kickoff string comes from datetime.isoformat() with a timezone.
    records.sort(key=lambda match: datetime.fromisoformat(match["kickoff"]))
    return records, errors
def export_prediction_records_from_model(
    model: PredictionExportModel,
    fixtures: Sequence[Mapping[str, Any]],
    payload_metadata: Mapping[str, Any] | None = None,
    team_normalizer: Callable[[str], str] | None = None,
    predict_kwargs: Mapping[str, Any] | None = None,
    sample_kwargs: Mapping[str, Any] | None = None,
    confidence_gamma: float | None = 0.7,
) -> tuple[list[dict[str, Any]], list[dict[str, str]]]:
    """Compute predictions from a model and export prediction records.

    For every fixture with both team names, the model is asked for a goal
    matrix and posterior samples. Fixtures for which both calls succeed are
    passed to ``build_prediction_records_from_predictions``. Every other
    fixture is reported exactly once in the error list.

    Args:
        model: Predictive model supporting predict/get_samples.
        fixtures: Raw fixtures payload from odds JSON.
        payload_metadata: Optional metadata extracted from odds payload.
        team_normalizer: Optional callable for team-name normalization.
        predict_kwargs: Optional extra kwargs forwarded to predict.
        sample_kwargs: Optional extra kwargs forwarded to get_samples.
        confidence_gamma: Forwarded unchanged to
            ``build_prediction_records_from_predictions``.

    Returns:
        Tuple of valid records and technical error reports. Errors raised
        while querying the model come first, followed by those from record
        building.
    """
    predict_kwargs = dict(predict_kwargs or {})
    sample_kwargs = dict(sample_kwargs or {})

    goal_matrices: dict[str, GoalMatrix] = {}
    sample_map: dict[str, SampleProbaResult] = {}
    errors: list[dict[str, str]] = []
    # Only these fixtures reach the record builder. Handing it the failed
    # ones as well would make it report each of them a second time.
    usable_fixtures: list[Mapping[str, Any]] = []

    for fixture in fixtures:
        raw_home = _resolve_mapping_value(fixture, ("home_team", "team_home", "home"))
        raw_away = _resolve_mapping_value(fixture, ("away_team", "team_away", "away"))
        if raw_home is None or raw_away is None:
            errors.append({"match": "unknown", "error": "missing home/away"})
            continue

        home_team = str(raw_home).replace("Cotes", "").strip()
        away_team = str(raw_away).strip()
        if team_normalizer is not None:
            home_team = team_normalizer(home_team)
            away_team = team_normalizer(away_team)
        match_key = f"{home_team} - {away_team}"

        try:
            goal_matrix = model.predict(home_team, away_team, **predict_kwargs)
            sample_result = model.get_samples(home_team, away_team, **sample_kwargs)
        except Exception as exc:  # pragma: no cover
            # Best-effort: a model failure on one fixture must not abort the
            # whole export, so it is recorded and the fixture is skipped.
            errors.append({"match": match_key, "error": str(exc)})
            continue

        # Store both artifacts only after both calls succeeded, so a failing
        # get_samples never leaves a goal matrix without samples.
        goal_matrices[match_key] = goal_matrix
        sample_map[match_key] = sample_result
        usable_fixtures.append(fixture)

    records, build_errors = build_prediction_records_from_predictions(
        fixtures=usable_fixtures,
        goal_matrices=goal_matrices,
        samples=sample_map,
        payload_metadata=payload_metadata,
        team_normalizer=team_normalizer,
        confidence_gamma=confidence_gamma,
    )
    return records, errors + build_errors
# Public API of this module: the two record-export entry points.
__all__ = [ "build_prediction_records_from_predictions", "export_prediction_records_from_model", ]