nanochat/scripts/base_eval.py

"""
Evaluate the CORE metric for a given model.
Run on a single GPU:
python -m scripts.base_eval
Run with torchrun on e.g. 8 GPUs:
torchrun --nproc_per_node=8 -m scripts.base_eval
The script will print the CORE metric to the console.
"""
import csv
import time
import json
import yaml
import shutil
import random
import zipfile
import tempfile
from contextlib import nullcontext
from pathlib import Path
import torch
from nanochat.common import compute_init, compute_cleanup, print0, get_base_dir, autodetect_device_type, download_file_with_lock
from nanochat.tokenizer import HuggingFaceTokenizer
from nanochat.checkpoint_manager import load_model
from nanochat.core_eval import evaluate_task
# -----------------------------------------------------------------------------
# nanochat specific function dealing with I/O etc.
# ~162MB of data needed to evaluate the CORE metric
EVAL_BUNDLE_URL = "https://karpathy-public.s3.us-west-2.amazonaws.com/eval_bundle.zip"
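# Note: judging from the paths used in evaluate_model() below, the zip unpacks to an
# eval_bundle/ directory containing core.yaml (the task list), eval_data/ with one
# jsonl file per task, and eval_meta_data.csv with the random-chance baselines.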
def place_eval_bundle(file_path):
    # here file_path is the path to the eval_bundle.zip file
    # we need to unzip it and place it in the base directory
    base_dir = get_base_dir()
    eval_bundle_dir = base_dir / "eval_bundle"
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            zip_ref.extractall(tmpdir)
        extracted_bundle_dir = Path(tmpdir) / "eval_bundle"
        shutil.move(str(extracted_bundle_dir), str(eval_bundle_dir))
    print0(f"Placed eval_bundle directory at {eval_bundle_dir}")
def evaluate_model(model, tokenizer, device, max_per_task=-1):
    """
    Evaluate a base model on the CORE benchmark.
    - max_per_task: crop the data to this many examples per task for testing (-1 = disable)
    """
    # Load config and task metadata
    base_dir = get_base_dir()
    eval_bundle_dir = base_dir / "eval_bundle"
    # Download the eval bundle to disk (and unzip if needed)
    if not eval_bundle_dir.exists():
        download_file_with_lock(EVAL_BUNDLE_URL, "eval_bundle.zip", postprocess_fn=place_eval_bundle)
    config_path = eval_bundle_dir / "core.yaml"
    data_base_path = eval_bundle_dir / "eval_data"
    eval_meta_data = eval_bundle_dir / "eval_meta_data.csv"
    with config_path.open('r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    tasks = config['icl_tasks']
    # Load random baseline values from eval metadata
    random_baselines = {}
    with eval_meta_data.open('r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            task_name = row['Eval Task']
            random_baseline = row['Random baseline']
            random_baselines[task_name] = float(random_baseline)

    # Evaluate each task
    results = {}
    centered_results = {}
    for task in tasks:
        start_time = time.time()
        label = task['label']
        task_meta = {
            'task_type': task['icl_task_type'],
            'dataset_uri': task['dataset_uri'],
            'num_fewshot': task['num_fewshot'][0],
            'continuation_delimiter': task.get('continuation_delimiter', ' ')
        }
        print0(f"Evaluating: {label} ({task_meta['num_fewshot']}-shot, type: {task_meta['task_type']})... ", end='')
        # Load data for this task
        data_path = data_base_path / task_meta['dataset_uri']
        with data_path.open('r', encoding='utf-8') as f:
            data = [json.loads(line.strip()) for line in f]
        # shuffle the data because in many cases it appears ordered but we want
        # the ability to only run a subset of the data for debugging purposes etc.
        shuffle_rng = random.Random(1337)
        shuffle_rng.shuffle(data)
        if max_per_task > 0:
            data = data[:max_per_task]
        # run the evaluation for this task
        accuracy = evaluate_task(model, tokenizer, data, device, task_meta)
        results[label] = accuracy
        random_baseline = random_baselines[label]
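        # The metadata CSV stores the random baseline in percent (hence the 0.01
        # factor); centering rescales accuracy so that chance performance maps to
        # 0.0 and a perfect score maps to 1.0 (below-chance accuracy goes negative).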
        centered_result = (accuracy - 0.01 * random_baseline) / (1.0 - 0.01 * random_baseline)
        centered_results[label] = centered_result
        end_time = time.time()
        print0(f"accuracy: {accuracy:.4f} | centered: {centered_result:.4f} | time: {end_time - start_time:.2f}s")
    core_metric = sum(centered_results.values()) / len(centered_results)
    out = {
        "results": results,
        "centered_results": centered_results,
        "core_metric": core_metric
    }
    return out
# -----------------------------------------------------------------------------
# HuggingFace loading utilities and light wrappers for a model
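# ModelWrapper adapts a HuggingFace CausalLM to a bare call signature,
# model(input_ids) -> logits, presumably so that evaluate_task() can treat
# HF models and nanochat models the same way.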
class ModelWrapper:
    """Lightweight wrapper for a HuggingFace model"""

    def __init__(self, model, max_seq_len=None):
        self.model = model
        self.max_seq_len = max_seq_len

    def __call__(self, input_ids):
        outputs = self.model(input_ids)
        logits = outputs.logits
        return logits
def load_hf_model(hf_path: str, device):
    print0(f"Loading model from: {hf_path}")
    # Load the model
    from transformers import AutoModelForCausalLM
    model = AutoModelForCausalLM.from_pretrained(hf_path)
    model.to(device)
    model.eval()
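    # GPT-2 checkpoints only support a 1024-token context window, so cap the
    # sequence length for them; other models get no explicit cap here.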
    max_seq_len = 1024 if "openai-community/gpt2" in hf_path else None
    model = ModelWrapper(model, max_seq_len=max_seq_len)
    # Load the tokenizer
    tokenizer = HuggingFaceTokenizer.from_pretrained(hf_path)
    return model, tokenizer
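
# Example (illustrative): evaluate a HuggingFace model instead of a local
# nanochat checkpoint, assuming it fits on the available device:
#   python -m scripts.base_eval --hf-path openai-community/gpt2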
# -----------------------------------------------------------------------------
def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--hf-path', type=str, default=None, help='HuggingFace model path to evaluate')
    parser.add_argument('--max-per-task', type=int, default=-1, help='Max examples per task to evaluate (-1 = disable)')
    args = parser.parse_args()

    # distributed / precision setup
    device_type = autodetect_device_type()
    ddp, ddp_rank, ddp_local_rank, ddp_world_size, device = compute_init(device_type)
    autocast_ctx = torch.amp.autocast(device_type=device_type, dtype=torch.bfloat16) if device_type == "cuda" else nullcontext()
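    # bfloat16 autocast is only used on CUDA; on other device types the
    # evaluation runs in the model's default precision.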

    # Load model and tokenizer from command line or from file system
    if args.hf_path is not None:
        # atm assume that if a path is given, it's a huggingface model path
        hf_path = args.hf_path
        print0(f"Loading huggingface model from: {hf_path}")
        model, tokenizer = load_hf_model(hf_path, device)
        model_name = hf_path # just for logging
        model_slug = hf_path.replace("/", "-") # for the output csv file
    else:
        # load a local model from the file system
        model, tokenizer, meta = load_model("base", device, phase="eval")
        model_name = f"base_model (step {meta['step']})" # just for logging
        model_slug = f"base_model_{meta['step']:06d}" # for the output csv file

    # Evaluate the model
    with autocast_ctx:
        out = evaluate_model(model, tokenizer, device, max_per_task=args.max_per_task)

    # Write out the results to a csv file
    core_metric = None
    centered_results = {}
    if ddp_rank == 0:
        base_dir = get_base_dir()
        output_csv_path = base_dir / "base_eval" / f"{model_slug}.csv"
        output_csv_path.parent.mkdir(parents=True, exist_ok=True)
        results = out["results"]
        centered_results = out["centered_results"]
        core_metric = out["core_metric"]
        with output_csv_path.open('w', encoding='utf-8', newline='') as f:
            f.write(f"{'Task':<35}, {'Accuracy':<10}, {'Centered':<10}\n")
            for label in results:
                f.write(f"{label:<35}, {results[label]:<10.6f}, {centered_results[label]:<10.6f}\n")
            f.write(f"{'CORE':<35}, {'':<10}, {core_metric:<10.6f}\n")

        # Print the content of the csv file to console too
        print0("="*80)
        print0(f"Model: {model_name}")
        print0("="*80)
        with output_csv_path.open('r', encoding='utf-8') as f:
            print0(f.read())

    # Log to report
    from nanochat.report import get_report
    get_report().log(section="Base model evaluation", data=[
        {
            "Model": model_name,
            "CORE metric": core_metric,
        },
        centered_results, # the full table
    ])

    compute_cleanup()
if __name__ == "__main__":
    main()