| """ |
| GiftEval evaluation script for Reverso. |
| """ |
| import os |
| import json |
| import math |
| import argparse |
| import csv |
| from types import SimpleNamespace |
| from typing import List, Optional, Tuple |
| from datetime import datetime |
|
|
| import numpy as np |
| import torch |
| import pandas as pd |
|
|
| from reverso.forecast import load_checkpoint |
|
|
| try: |
| from torch.cuda.amp import autocast as autocast_fp |
| except Exception: |
| autocast_fp = None |
|
|
def numpy_fill(arr: np.ndarray) -> np.ndarray:
    """Forward-fill NaN entries along axis 1 of a 2-D array.

    Each NaN is replaced by the closest non-NaN value to its left in the same
    row. NaNs that precede the first valid value of a row are left as NaN,
    because their lookup index stays at column 0. The input is not modified.

    Args:
        arr: 2-D float array (rows are independent series).

    Returns:
        A new array of the same shape with NaNs forward-filled per row.
    """
    is_valid = ~np.isnan(arr)
    column_positions = np.arange(arr.shape[1])
    # For every cell, the column index of the latest valid value seen so far
    # (0 while no valid value has been seen yet).
    last_valid_col = np.maximum.accumulate(
        np.where(is_valid, column_positions, 0), axis=1
    )
    row_selector = np.arange(arr.shape[0])[:, None]
    return arr[row_selector, last_valid_col]
|
|
|
|
class ReversoPredictor:
    """GiftEval predictor for reverso.Model.

    Builds a ``reverso.model.Model``, optionally loads a checkpoint into it,
    and exposes ``predict``, which converts the ``target`` arrays of a test
    input iterable into GluonTS ``SampleForecast`` objects. One trajectory is
    predicted per series and repeated ``num_samples`` times to fill the
    sample dimension.
    """

    def __init__(
        self,
        prediction_length: int,
        checkpoint_path: Optional[str] = None,
        device: str = "cuda",
        seq_len: int = 2048,
        input_token_len: int = 2048,
        output_token_len: int = 48,
        e_layers: int = 8,
        d_model: int = 128,
        d_intermediate: int = 512,
        output_bottleneck_dim: int = 48,
        expand_v: float = 1.0,
        state_weaving: int = 1,
        gating_kernel_size: int = 3,
        main_module: str = "conv,attn,conv,attn,conv,attn,conv,attn",
        num_samples: int = 100,
        batch_size: int = 256,
        use_amp: int = 1,
        downsample_factor: int = 1,
        force_flip_invariance: bool = False,
    ):
        """Create the model and load weights if a checkpoint file exists.

        Args:
            prediction_length: Number of future steps to return per series.
            checkpoint_path: Checkpoint file; when missing or not a file a
                warning is printed and the randomly initialized model is kept.
            device: Requested device; replaced by "cpu" when CUDA is
                unavailable.
            seq_len: Context length fed to the model at each decoding step.
            input_token_len, output_token_len, d_model, d_intermediate,
            output_bottleneck_dim, expand_v, state_weaving,
            gating_kernel_size, main_module: Forwarded to the model's
                argument namespace.
            e_layers: Accepted for interface compatibility.
            num_samples: How many times the prediction is repeated per series.
            batch_size: Number of series decoded together in ``predict``.
            use_amp: Truthy to allow bf16 autocast during decoding.
            downsample_factor: Stride applied to each context series; values
                <= 1 disable downsampling.
            force_flip_invariance: When true, ``predict`` averages f(x) with
                -f(-x).
        """
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        self.prediction_length = int(prediction_length)
        self.num_samples = int(num_samples)
        self.batch_size = int(batch_size)
        self.seq_len = int(seq_len)
        self.input_token_len = int(input_token_len)
        self.output_token_len = int(output_token_len)
        self.use_amp = int(use_amp)
        self.downsample_factor = int(downsample_factor)
        self.force_flip_invariance = bool(force_flip_invariance)
        self._amp_fallback_warned = False

        # NOTE(review): e_layers is accepted but not part of the namespace
        # below -- confirm the model derives its depth from main_module.
        args = SimpleNamespace(
            input_token_len=self.input_token_len,
            output_token_len=self.output_token_len,
            seq_len=self.seq_len,
            d_model=int(d_model),
            d_intermediate=int(d_intermediate),
            use_norm=True,
            learn_bias=1,
            output_bottleneck_dim=int(output_bottleneck_dim),
            expand_v=float(expand_v),
            state_weaving=int(state_weaving),
            gating_kernel_size=int(gating_kernel_size),
            main_module=str(main_module),
        )

        from reverso import model as model_impl
        try:
            self.model = model_impl.Model(args).to(self.device)
        except RuntimeError as e:
            # Only CUDA-related failures trigger the CPU fallback; anything
            # else is a real construction error and is re-raised.
            if "CUDA" in str(e):
                print(f"CUDA not usable ({e}); falling back to CPU.")
                self.device = torch.device("cpu")
                self.use_amp = 0
                self.model = model_impl.Model(args).to(self.device)
            else:
                raise
        self.model.eval()

        if checkpoint_path is not None and os.path.isfile(checkpoint_path):
            self._load_checkpoint(checkpoint_path)
        else:
            print("Warning: checkpoint_path not provided or file not found. Using randomly initialized weights.")

    def _load_checkpoint(self, ckpt_path: str):
        """Load weights from ``ckpt_path`` into ``self.model`` on ``self.device``."""
        load_checkpoint(self.model, ckpt_path, device=str(self.device))

    def _downsample_if_needed(self, series: torch.Tensor) -> Tuple[torch.Tensor, int]:
        """Return ``series`` strided by the downsample factor, plus that factor."""
        cur = series
        if self.downsample_factor > 1:
            cur = cur[::self.downsample_factor]
        return cur, self.downsample_factor

    def _left_pad_to_len(self, arr: np.ndarray, target_len: int) -> Tuple[np.ndarray, int]:
        """Trim or left-pad ``arr`` to exactly ``target_len`` entries.

        Longer inputs keep their last ``target_len`` values. Shorter inputs
        are padded on the left with their first value (0.0 when empty).

        Returns:
            ``(array, pad_len)`` where ``pad_len`` is the number of padded
            entries (0 when nothing was padded).
        """
        if arr.shape[0] >= target_len:
            return arr[-target_len:], 0
        pad_len = target_len - arr.shape[0]
        fill_value = arr[0] if arr.shape[0] > 0 else 0.0
        padding = np.full((pad_len,), fill_value, dtype=arr.dtype)
        return np.concatenate([padding, arr], axis=0), pad_len

    def _target_pred_len(self, downsample_factor: int) -> int:
        """Number of model steps needed for one series at the given factor.

        Equals ``prediction_length // factor``. When downsampling is active
        the result is clamped to at least 1 so that a factor larger than the
        horizon still yields one decoded point to interpolate from, instead
        of an empty forecast.
        """
        factor = max(1, int(downsample_factor))
        length = int(self.prediction_length) // factor
        if factor > 1:
            length = max(1, length)
        return length

    def _impute_missing(self, series: np.ndarray) -> np.ndarray:
        """Return a NaN-free copy of a 1-D series.

        With at least two valid points, gaps are linearly interpolated.
        Otherwise values are forward-filled, then back-filled, and whatever
        is still NaN (an all-NaN series) becomes 0.0.
        """
        row = series[None, :]
        filled = np.copy(row)
        nan_mask = np.isnan(series)
        if nan_mask.any():
            valid = ~nan_mask
            if valid.sum() >= 2:
                positions = np.flatnonzero(valid)
                filled[0] = np.interp(np.arange(len(series)), positions, series[valid])
            else:
                filled = numpy_fill(row)
        forward = numpy_fill(filled)
        backward = np.flip(numpy_fill(np.flip(filled, axis=1)), axis=1)
        merged = np.where(np.isnan(forward), backward, forward)
        return np.where(np.isnan(merged), 0.0, merged)[0]

    def _prepare_context_matrix(self, context: List[torch.Tensor]) -> Tuple[torch.Tensor, List[int]]:
        """Turn raw target series into one fixed-length model input batch.

        Each series is downsampled, cut or left-padded to ``seq_len`` and
        cleared of NaNs.

        Returns:
            ``(x, downsample_factors)`` where ``x`` is a float32 tensor of
            shape ``(len(context), seq_len, 1)`` on ``self.device`` and
            ``downsample_factors`` holds the factor used for each series.
        """
        rows = []
        downsample_factors = []
        for series in context:
            cur, factor = self._downsample_if_needed(series)
            downsample_factors.append(factor)
            cur_np = cur.detach().cpu().float().numpy()
            cur_np = self._left_pad_to_len(cur_np, self.seq_len)[0]
            rows.append(self._impute_missing(cur_np))

        x = torch.tensor(np.stack(rows), device=self.device, dtype=torch.float32).unsqueeze(-1)
        return x, downsample_factors

    def _decode_autoregressive(self, init_ctx: torch.Tensor, use_bf16: bool, downsample_factors: List[int]) -> torch.Tensor:
        """Roll the model forward until the longest required horizon is covered.

        Every step feeds the last ``seq_len`` context values, keeps the last
        ``output_token_len`` outputs and appends them to the context.

        Args:
            init_ctx: Context tensor of shape ``(B, T, C)``.
            use_bf16: Whether bf16 autocast may be used (also needs
                ``self.use_amp`` and an importable autocast).
            downsample_factors: Per-series factors; they determine how many
                steps are required.

        Returns:
            Tensor of shape ``(B, steps * output_token_len, C)``; callers trim
            it to the length they need.
        """
        B, _, C = init_ctx.shape
        roll_len = int(self.output_token_len)

        max_target_pred_len = max(self._target_pred_len(df) for df in downsample_factors)
        steps = math.ceil(max_target_pred_len / roll_len)

        preds: List[torch.Tensor] = []
        batch_ctx = init_ctx
        y_mark = torch.zeros(B, self.output_token_len, C, device=self.device, dtype=init_ctx.dtype)

        for _ in range(steps):
            x_in = batch_ctx[:, -self.seq_len:, :]
            x_mark = torch.zeros_like(x_in)

            if self.use_amp and use_bf16 and autocast_fp is not None:
                try:
                    with autocast_fp(dtype=torch.bfloat16):
                        outputs = self.model(x_in, x_mark, y_mark)
                except Exception as exc:
                    # Best effort: a failing bf16 pass is retried in full
                    # precision, but the reason is reported once instead of
                    # being swallowed silently.
                    if not getattr(self, "_amp_fallback_warned", False):
                        print(f"bf16 autocast forward failed ({exc}); retrying in full precision.")
                        self._amp_fallback_warned = True
                    outputs = self.model(x_in, x_mark, y_mark)
            else:
                outputs = self.model(x_in, x_mark, y_mark)

            chunk = outputs[:, -roll_len:, :]
            preds.append(chunk)
            batch_ctx = torch.cat([batch_ctx, chunk], dim=1)

        return torch.cat(preds, dim=1)

    def _forward_fill_predictions(self, pred_full: torch.Tensor) -> torch.Tensor:
        """Forward-fill NaNs in a ``(B, T, 1)`` prediction tensor.

        The tensor is cast to float32 before the NumPy round trip because
        predictions produced under bf16 autocast cannot be converted with
        ``.numpy()`` directly. NaNs before the first valid value of a row are
        not changed by the forward fill.
        """
        as_2d = pred_full.float().squeeze(-1).detach().cpu().numpy()
        filled = numpy_fill(as_2d)
        return torch.tensor(filled, device=pred_full.device, dtype=pred_full.dtype).unsqueeze(-1)

    def _restore_resolution(self, pred_np: np.ndarray, downsample_factors: List[int]) -> np.ndarray:
        """Trim each decoded row and stretch downsampled ones to the full horizon.

        Rows decoded with a factor > 1 are linearly interpolated from their
        coarse length to ``prediction_length`` points.
        """
        horizon = int(self.prediction_length)
        restored = []
        for row, factor in zip(pred_np, downsample_factors):
            seq = row[: self._target_pred_len(factor)]
            if factor > 1:
                seq = np.interp(np.linspace(0, 1, horizon), np.linspace(0, 1, len(seq)), seq)
            restored.append(seq)
        return np.array(restored)

    @torch.no_grad()
    def predict(self, test_data_input, use_bf16_if_available: bool = True):
        """Forecast every entry of ``test_data_input``.

        Args:
            test_data_input: Iterable of entries providing ``"target"`` and
                ``"start"``.
            use_bf16_if_available: Allow bf16 autocast when running on a CUDA
                device that reports bf16 support.

        Returns:
            List of ``SampleForecast``, one per input entry, in input order.
        """
        from gluonts.itertools import batcher
        from gluonts.model.forecast import SampleForecast

        use_bf16 = bool(
            use_bf16_if_available
            and self.device.type == "cuda"
            and torch.cuda.is_available()
            and torch.cuda.is_bf16_supported()
        )

        forecasts = []
        for batch in batcher(test_data_input, batch_size=self.batch_size):
            targets = [torch.tensor(entry["target"], dtype=torch.float32) for entry in batch]
            batch_ctx, downsample_factors = self._prepare_context_matrix(targets)

            pred_pos = self._decode_autoregressive(batch_ctx, use_bf16, downsample_factors)
            if self.force_flip_invariance:
                pred_neg = self._decode_autoregressive(-batch_ctx, use_bf16, downsample_factors)
                pred_full = 0.5 * (pred_pos - pred_neg)
            else:
                pred_full = pred_pos

            if torch.isnan(pred_full).any():
                pred_full = self._forward_fill_predictions(pred_full)

            pred_full_np = pred_full.float().squeeze(-1).detach().cpu().numpy()
            pred_full_np = self._restore_resolution(pred_full_np, downsample_factors)

            for series_pred, ts in zip(pred_full_np, batch):
                start_date = ts["start"] + len(ts["target"])
                samples = np.repeat(series_pred[None, :], self.num_samples, axis=0)
                forecasts.append(SampleForecast(samples=samples, start_date=start_date))

        return forecasts
|
|
|
|
| |
| |
| |
| from gluonts.ev.metrics import ( |
| MAE, MAPE, MASE, MSE, MSIS, ND, NRMSE, RMSE, SMAPE, |
| MeanWeightedSumQuantileLoss, |
| ) |
|
|
# Metric objects passed to the evaluation call for every dataset/term run.
METRICS = [
    # Squared error against the mean forecast and against the median.
    MSE(forecast_type="mean"),
    MSE(forecast_type=0.5),
    MAE(),
    MASE(),
    MAPE(),
    SMAPE(),
    MSIS(),
    RMSE(),
    NRMSE(),
    ND(),
    # Quantile loss over the nine deciles 0.1 ... 0.9.
    MeanWeightedSumQuantileLoss(
        quantile_levels=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
    ),
]
|
|
# Maps a lower-cased dataset base name to the shorter key used for lookups
# and result labels; names that are absent are used unchanged.
PRETTY_NAMES = dict(
    saugeenday="saugeen",
    temperature_rain_with_missing="temperature_rain",
    kdd_cup_2018_with_missing="kdd_cup_2018",
    car_parts_with_missing="car_parts",
)


# Whitespace-separated dataset names ("name" or "name/frequency").
SHORT_DATASETS = (
    "m4_yearly m4_quarterly m4_monthly m4_weekly m4_daily m4_hourly "
    "electricity/15T electricity/H electricity/D electricity/W "
    "solar/10T solar/H solar/D solar/W "
    "hospital covid_deaths "
    "us_births/D us_births/M us_births/W "
    "saugeenday/D saugeenday/M saugeenday/W "
    "temperature_rain_with_missing "
    "kdd_cup_2018_with_missing/H kdd_cup_2018_with_missing/D "
    "car_parts_with_missing restaurant "
    "hierarchical_sales/D hierarchical_sales/W "
    "LOOP_SEATTLE/5T LOOP_SEATTLE/H LOOP_SEATTLE/D "
    "SZ_TAXI/15T SZ_TAXI/H "
    "M_DENSE/H M_DENSE/D "
    "ett1/15T ett1/H ett1/D ett1/W ett2/W ett2/D "
    "jena_weather/10T jena_weather/H jena_weather/D "
    "bitbrains_fast_storage/5T bitbrains_fast_storage/H "
    "bitbrains_rnd/5T bitbrains_rnd/H "
    "bizitobs_application bizitobs_service "
    "bizitobs_l2c/5T bizitobs_l2c/H"
)


# Same format as SHORT_DATASETS; the names listed for the medium/long terms.
MED_LONG_DATASETS = (
    "electricity/15T electricity/H "
    "solar/10T solar/H "
    "kdd_cup_2018_with_missing/H "
    "LOOP_SEATTLE/5T LOOP_SEATTLE/H "
    "SZ_TAXI/15T "
    "M_DENSE/H "
    "ett1/15T ett1/H ett2/15T ett2/H "
    "jena_weather/10T jena_weather/H "
    "bitbrains_fast_storage/5T bitbrains_rnd/5T "
    "bizitobs_application bizitobs_service "
    "bizitobs_l2c/5T bizitobs_l2c/H"
)
|
|
|
|
def main():
    """Run Reverso over the GiftEval dataset/term grid and log results to CSV.

    Reads optional model overrides and downsample factors from JSON files,
    evaluates every selected dataset/term combination with a freshly built
    ``ReversoPredictor``, and appends one row per combination to a
    timestamped CSV inside the output directory.
    """
    cli = argparse.ArgumentParser(description="Run Reverso GiftEval across datasets")
    cli.add_argument("--checkpoint", default='checkpoints/reverso_small/checkpoint.pth',
                     help="Path to model checkpoint")
    cli.add_argument("--json_path", default='checkpoints/reverso_small/args.json',
                     help="Path to JSON file with model config overrides")
    cli.add_argument("--output-dir", dest="output_dir", default='results/reverso_small',
                     help="Output directory for results")
    cli.add_argument("--dataset", default=None, help="Filter to specific dataset (substring match)")
    cli.add_argument("--term", default=None, choices=["short", "medium", "long"],
                     help="Filter to specific term")
    cli.add_argument("--force-flip-invariance", dest="force_flip_invariance", action="store_true",
                     help="Average f(x) with -f(-x) for flip invariance")
    cli.add_argument("--downsample-json", dest="downsample_json",
                     default="config/downsample_factors.json",
                     help="Path to JSON with downsample factors per dataset/term")
    args = cli.parse_args()

    # Model hyper-parameters: values from the JSON file win over the defaults.
    overrides = {}
    config_file = args.json_path
    if config_file and os.path.isfile(config_file):
        with open(config_file, "r") as handle:
            overrides = json.load(handle)

    model_kwargs = {
        "seq_len": int(overrides.get("seq_len", 2048)),
        "input_token_len": int(overrides.get("input_token_len", 2048)),
        "output_token_len": int(overrides.get("output_token_len", 48)),
        "e_layers": int(overrides.get("e_layers", 8)),
        "d_model": int(overrides.get("d_model", 128)),
        "d_intermediate": int(overrides.get("d_intermediate", 512)),
        "output_bottleneck_dim": int(overrides.get("output_bottleneck_dim", 48)),
        "expand_v": float(overrides.get("expand_v", 1.0)),
        "state_weaving": int(overrides.get("state_weaving", 1)),
        "gating_kernel_size": int(overrides.get("gating_kernel_size", 3)),
        "main_module": str(overrides.get("main_module", "conv,attn,conv,attn,conv,attn,conv,attn")),
    }

    # Fixed runtime settings shared by every run.
    device_name = "cuda"
    sample_count = 100
    eval_batch_size = 256
    amp_flag = 1

    downsample_map = {}
    if os.path.isfile(args.downsample_json):
        with open(args.downsample_json, "r") as handle:
            downsample_map = json.load(handle)

    med_long_set = set(MED_LONG_DATASETS.split())
    all_datasets = sorted(set(SHORT_DATASETS.split()) | med_long_set)
    all_terms = ["short", "medium", "long"]

    with open("config/dataset_properties.json", "r") as handle:
        dataset_properties = json.load(handle)

    # Default data location: "<parent of this script's directory>/data",
    # unless GIFT_EVAL is already set in the environment.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.environ.setdefault("GIFT_EVAL", os.path.join(os.path.dirname(script_dir), "data"))

    if args.dataset:
        all_datasets = [name for name in all_datasets if args.dataset in name]
        if not all_datasets:
            print(f"No datasets found matching '{args.dataset}'")
            return

    if args.term:
        all_terms = [args.term]

    output_dir = args.output_dir or os.path.join(os.path.dirname(os.path.abspath(__file__)), "results")
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    csv_path = os.path.join(output_dir, f"all_results_{timestamp}.csv")

    # Result keys in CSV column order; each header cell is "eval_metrics/<key>".
    metric_keys = [
        "MSE[mean]", "MSE[0.5]",
        "MAE[0.5]", "MASE[0.5]",
        "MAPE[0.5]", "sMAPE[0.5]",
        "MSIS", "RMSE[mean]",
        "NRMSE[mean]", "ND[0.5]",
        "mean_weighted_sum_quantile_loss",
    ]
    header = ["dataset", "model"]
    header.extend(f"eval_metrics/{key}" for key in metric_keys)
    header.extend(["domain", "num_variates"])

    with open(csv_path, "w", newline="") as handle:
        csv.writer(handle).writerow(header)

    from gluonts.model import evaluate_model
    from gluonts.time_feature import get_seasonality
    from gift_eval.data import Dataset

    total = len(all_datasets)
    print(f"Evaluating {total} datasets, terms: {all_terms}")
    print(f"Flip invariance: {args.force_flip_invariance}")

    for position, ds_name in enumerate(all_datasets, start=1):
        name_parts = ds_name.split("/")
        base_name = name_parts[0].lower()
        ds_key = PRETTY_NAMES.get(base_name, base_name)
        if "/" in ds_name:
            ds_freq = name_parts[1]
        else:
            ds_freq = dataset_properties[ds_key]["frequency"]

        print(f"[{position}/{total}] {ds_name}")

        for term in all_terms:
            # Medium and long horizons are only run for the listed subset.
            if ds_name not in med_long_set and term in ("medium", "long"):
                continue

            ds_config = f"{ds_key}/{ds_freq}/{term}"

            # Load once to inspect the target dimension, then reload with
            # multivariate targets split into univariate series if needed.
            probe = Dataset(name=ds_name, term=term, to_univariate=False)
            dataset = Dataset(name=ds_name, term=term, to_univariate=probe.target_dim != 1)
            season_length = get_seasonality(dataset.freq)

            downsample_factor = downsample_map.get(ds_config.lower(), 1)

            info = f" {term}: {len(dataset.test_data)} instances"
            if downsample_factor > 1:
                info += f", downsample={downsample_factor}"
            print(info)

            predictor = ReversoPredictor(
                prediction_length=dataset.prediction_length,
                checkpoint_path=args.checkpoint,
                device=device_name,
                num_samples=sample_count,
                batch_size=eval_batch_size,
                use_amp=amp_flag,
                downsample_factor=downsample_factor,
                force_flip_invariance=args.force_flip_invariance,
                **model_kwargs,
            )

            res = evaluate_model(
                predictor,
                test_data=dataset.test_data,
                metrics=METRICS,
                batch_size=eval_batch_size,
                axis=None,
                mask_invalid_label=True,
                allow_nan_forecast=False,
                seasonality=season_length,
            )

            with open(csv_path, "a", newline="") as handle:
                row = [ds_config, "reverso"]
                row.extend(res[key][0] for key in metric_keys)
                row.append(dataset_properties[ds_key]["domain"])
                row.append(dataset_properties[ds_key]["num_variates"])
                csv.writer(handle).writerow(row)

    print(f"\nResults saved to: {csv_path}")


if __name__ == "__main__":
    main()
|
|