"""OpenSimula checkpoint layout: versioned manifest + JSON artifacts under ``opensimula/``.
Use :class:`Checkpointer` as a context manager and call ``bundle.save(cp)``,
``spec.save(cp)``, and optionally :meth:`Checkpointer.write_run_config`, then
:meth:`Checkpointer.push_to_hub` (sync) or :meth:`Checkpointer.apush_to_hub` (async,
non-blocking) once ``manifest.json`` exists—or call :func:`save_checkpoint` /
:func:`push_checkpoint_to_hub` / :func:`apush_checkpoint_to_hub` for shorthand.
On-disk layout (``format_version`` ``1.0``)::
<checkpoint_root>/
opensimula/
manifest.json # producer, format, format_version, digests, file names
taxonomy_bundle.json
sampling_strategy.json # optional
run_config.json # optional :class:`OpenSimulaRunConfig` JSON (typed metadata + knobs)
``huggingface-hub`` is used for optional push/pull of the ``opensimula/`` subtree.
:meth:`Checkpointer.push_to_hub` also writes ``README.md`` (custom or auto-generated dataset card).
"""
from __future__ import annotations
import asyncio
import json
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict, Field, model_validator
from afterimage.simula.types import (
SamplingStrategySpec,
TaxonomyBundle,
sha256_text,
validate_factor_taxonomy,
)
OPENSIMULA_SUBDIR = "opensimula"
MANIFEST_FILENAME = "manifest.json"
TAXONOMY_BUNDLE_FILENAME = "taxonomy_bundle.json"
SAMPLING_STRATEGY_FILENAME = "sampling_strategy.json"
RUN_CONFIG_FILENAME = "run_config.json"
SUPPORTED_MANIFEST_FORMAT_VERSIONS = frozenset({"1.0"})
def _package_version(dist_name: str = "afterimage") -> str | None:
try:
from importlib.metadata import version
return version(dist_name)
except Exception:
return None
def _sha256_bytes(data: bytes) -> str:
import hashlib
return hashlib.sha256(data).hexdigest()
def _sha256_file(path: Path) -> str:
return _sha256_bytes(path.read_bytes())
def opensimula_dir(checkpoint_root: Path) -> Path:
return Path(checkpoint_root) / OPENSIMULA_SUBDIR
[docs]
class OpenSimulaManifest(BaseModel):
"""Versioned checkpoint manifest (portable across tools that understand ``format``)."""
producer: Literal["afterimage"] = "afterimage"
format: Literal["opensimula"] = "opensimula"
format_version: str = Field(
"1.0",
description="Contract version for manifest fields and sibling file layout.",
)
created_at: str = Field(
...,
description="ISO 8601 timestamp in UTC when the checkpoint was written.",
)
afterimage_version: str | None = Field(
None,
description="Installed afterimage distribution version, if available.",
)
instruction_y_sha256: str = Field(
...,
description="SHA256 hex of UTF-8 bytes of instruction_y (cross-check with bundle).",
)
taxonomy_bundle_sha256: str = Field(
...,
description="SHA256 hex of on-disk taxonomy_bundle.json bytes.",
)
sampling_strategy_sha256: str | None = Field(
None,
description="SHA256 hex of sampling_strategy.json if present.",
)
taxonomy_bundle_file: str = TAXONOMY_BUNDLE_FILENAME
sampling_strategy_file: str | None = None
run_config_file: str | None = None
[docs]
class OpenSimulaRunConfig(BaseModel):
"""Typed metadata and hyperparameters stored in ``run_config.json`` beside a checkpoint."""
model_config = ConfigDict(extra="ignore")
name: str | None = Field(
default=None,
description="Short label for this run (e.g. experiment id or pipeline name).",
)
description: str | None = Field(
default=None,
description="Optional longer note for operators or dashboards.",
)
model: str | None = Field(
default=None,
description="Teacher model id used for OpenSimula LLM calls.",
)
temperature: float | None = None
target_depth_D: int | None = None
proposal_N: int | None = None
meta_prompt_K: int | None = None
complexify_c: float | None = None
max_factors: int | None = None
max_children_per_node: int | None = None
max_frontier_per_depth: int | None = None
num_choices: int | None = Field(
default=None,
description="MCQ-style runs: number of answer options.",
)
num_samples: int | None = None
max_concurrency: int | None = None
seed: int | None = None
data_jsonl: str | None = Field(
default=None,
description="Path to appended sample JSONL (often relative to checkpoint root).",
)
corpus_excerpt_count: int | None = None
@model_validator(mode="before")
@classmethod
def _legacy_example_into_name(cls, data: Any) -> Any:
"""Older checkpoints used ``example``; map to :attr:`name` when :attr:`name` is unset."""
if not isinstance(data, dict):
return data
out = dict(data)
if "example" in out:
legacy = out.pop("example")
if out.get("name") in (None, "") and legacy not in (None, ""):
out["name"] = str(legacy)
return out
AFTERIMAGE_REPO_URL = "https://github.com/altaidevorg/afterimage"
SIMULA_TMLR_PDF_URL = "https://openreview.net/pdf?id=NALsdGEPhB"
SIMULA_MECHANISM_BLOG_URL = (
"https://research.google/blog/designing-synthetic-datasets-for-the-real-world-"
"mechanism-design-and-reasoning-from-first-principles/"
)
def _read_optional_run_config(odir: Path) -> OpenSimulaRunConfig | None:
p = odir / RUN_CONFIG_FILENAME
if not p.is_file():
return None
try:
return OpenSimulaRunConfig.model_validate(
json.loads(p.read_text(encoding="utf-8"))
)
except (json.JSONDecodeError, ValueError, OSError):
return None
def _hub_dataset_tags(run: OpenSimulaRunConfig | None) -> list[str]:
"""Tags for README frontmatter (deduplicated, stable order)."""
tags = ["afterimage", "simula", "opensimula"]
if run is None:
return tags
if run.num_choices is not None:
tags.extend(["mcq", "multiple-choice"])
if run.num_samples is not None and run.num_samples > 0:
tags.append("batch-generation")
if run.num_choices is None:
tags.extend(["single-qa", "question-answering"])
out: list[str] = []
seen: set[str] = set()
for t in tags:
if t not in seen:
seen.add(t)
out.append(t)
return out
def _default_dataset_readme(
*,
manifest: OpenSimulaManifest,
run: OpenSimulaRunConfig | None,
repo_id: str,
) -> str:
"""Auto-generated ``README.md`` for Hub dataset (and other) repos."""
tag_lines = "\n".join(f" - {t}" for t in _hub_dataset_tags(run))
name_note = ""
if run and run.name:
name_note = f" Run label: **{run.name}**."
if run and run.description and (not run.name or run.description != run.name):
name_note += f" {run.description}"
return f"""---
tags:
{tag_lines}
---
# OpenSimula checkpoint — `{repo_id}`
This repository contains an **OpenSimula** checkpoint subtree (folder **`opensimula/`**)
written by [**AfterImage**]({AFTERIMAGE_REPO_URL}).{name_note}
## AfterImage
[**AfterImage**]({AFTERIMAGE_REPO_URL}) is an open-source Python library for synthetic dataset
generation at scale—conversational data, tool calling, structured outputs, preference pairs,
personas, and more. **OpenSimula** lives under `afterimage.simula` as an experimental,
Simula-inspired pipeline (taxonomy → strategies → meta-prompts → critics).
## OpenSimula (Simula-inspired)
**OpenSimula** follows mechanism-design ideas from Davidson et al.,
[*Reasoning-Driven Synthetic Data Generation and Evaluation* (TMLR)]({SIMULA_TMLR_PDF_URL}).
It is **not** a Google product or reference port. For the broader framing, see Google's
[research blog on mechanism design for synthetic data]({SIMULA_MECHANISM_BLOG_URL}).
## Layout
| Path | Role |
|------|------|
| `opensimula/manifest.json` | Producer **`{manifest.producer}`**, format **`{manifest.format}`**, contract **`{manifest.format_version}`**. |
| `opensimula/taxonomy_bundle.json` | Factors and factor taxonomies. |
| `opensimula/sampling_strategy.json` | Weighted joint sampling strategies (if present). |
| `opensimula/run_config.json` | Typed run metadata (`OpenSimulaRunConfig`, if present). |
Download with `afterimage.simula.pull_checkpoint_from_hub` then `load_checkpoint` from a local
directory to obtain `TaxonomyBundle`, `SamplingStrategySpec`, and `OpenSimulaRunConfig`.
"""
[docs]
@dataclass(frozen=True)
class SimulaCheckpoint:
"""Loaded checkpoint: manifest + parsed models + optional extras."""
manifest: OpenSimulaManifest
bundle: TaxonomyBundle
sampling_strategy: SamplingStrategySpec | None
run_config: OpenSimulaRunConfig | None
root: Path
def _validate_bundle_trees(bundle: TaxonomyBundle) -> None:
for t in bundle.taxonomies:
validate_factor_taxonomy(t)
[docs]
class Checkpointer:
"""Collect OpenSimula artifacts under ``<root>/opensimula/`` and write ``manifest.json`` on exit.
Typical usage::
with Checkpointer("./run") as cp:
bundle.save(cp)
spec.save(cp)
cp.write_run_config(OpenSimulaRunConfig(name="demo", model="gemini-2.5-flash"))
url = cp.push_to_hub("org/dataset-repo")
Call :meth:`write_taxonomy_bundle` (or ``bundle.save(cp)``) at least once before the
context exits. Optional files are removed on enter when ``clear_stale_optional`` is
true so omitted ``spec.save`` / ``write_run_config`` do not leave stale JSON.
"""
def __init__(
self,
checkpoint_root: Path | str,
*,
validate_taxonomies: bool = True,
clear_stale_optional: bool = True,
) -> None:
self.root = Path(checkpoint_root)
self.validate_taxonomies = validate_taxonomies
self.clear_stale_optional = clear_stale_optional
self._odir = opensimula_dir(self.root)
self._bundle_written = False
self._instruction_y_digest: str | None = None
self._bundle_digest: str | None = None
self._strat_digest: str | None = None
self._strat_file: str | None = None
self._run_file: str | None = None
self.manifest: OpenSimulaManifest | None = None
self._entered = False
@property
def opensimula_dir(self) -> Path:
return self._odir
def __enter__(self) -> Checkpointer:
self._entered = True
self._bundle_written = False
self._instruction_y_digest = None
self._bundle_digest = None
self._strat_digest = None
self._strat_file = None
self._run_file = None
self.manifest = None
self._odir.mkdir(parents=True, exist_ok=True)
if self.clear_stale_optional:
stale_strat = self._odir / SAMPLING_STRATEGY_FILENAME
stale_run = self._odir / RUN_CONFIG_FILENAME
if stale_strat.is_file():
stale_strat.unlink()
if stale_run.is_file():
stale_run.unlink()
return self
def __exit__(self, exc_type: object, exc: object, tb: object) -> Literal[False]:
self._entered = False
if exc_type is None and self._bundle_written:
self._write_manifest()
return False
def _require_context(self) -> None:
if not self._entered:
raise RuntimeError(
"Use Checkpointer as a context manager: with Checkpointer(path) as cp: ..."
)
[docs]
def write_taxonomy_bundle(self, bundle: TaxonomyBundle) -> None:
"""Write ``taxonomy_bundle.json`` and record digests for the manifest."""
self._require_context()
if self.validate_taxonomies:
_validate_bundle_trees(bundle)
bundle_path = self._odir / TAXONOMY_BUNDLE_FILENAME
bundle_path.write_text(
bundle.model_dump_json(indent=2) + "\n", encoding="utf-8"
)
inst_digest = sha256_text(bundle.instruction_y)
roundtrip = TaxonomyBundle.model_validate_json(
bundle_path.read_text(encoding="utf-8"),
)
if sha256_text(roundtrip.instruction_y) != inst_digest:
raise RuntimeError(
"taxonomy_bundle.json round-trip instruction_y digest mismatch"
)
self._bundle_written = True
self._instruction_y_digest = inst_digest
self._bundle_digest = _sha256_file(bundle_path)
[docs]
def write_sampling_strategy(self, spec: SamplingStrategySpec) -> None:
"""Write ``sampling_strategy.json`` (call after :meth:`write_taxonomy_bundle`)."""
self._require_context()
if not self._bundle_written:
raise RuntimeError(
"write_taxonomy_bundle (or bundle.save) before write_sampling_strategy"
)
strat_path = self._odir / SAMPLING_STRATEGY_FILENAME
strat_path.write_text(spec.model_dump_json(indent=2) + "\n", encoding="utf-8")
self._strat_digest = _sha256_file(strat_path)
self._strat_file = SAMPLING_STRATEGY_FILENAME
[docs]
def write_run_config(self, config: OpenSimulaRunConfig) -> None:
"""Write ``run_config.json`` (call after :meth:`write_taxonomy_bundle`)."""
self._require_context()
if not self._bundle_written:
raise RuntimeError(
"write_taxonomy_bundle (or bundle.save) before write_run_config"
)
run_path = self._odir / RUN_CONFIG_FILENAME
raw = config.model_dump(mode="json", exclude_none=True)
run_path.write_text(
json.dumps(raw, indent=2, ensure_ascii=False, sort_keys=True) + "\n",
encoding="utf-8",
)
self._run_file = RUN_CONFIG_FILENAME
[docs]
def finalize(self) -> OpenSimulaManifest:
"""Write ``manifest.json`` immediately (usually you rely on context exit instead)."""
self._require_context()
self._write_manifest()
assert self.manifest is not None
return self.manifest
def _write_manifest(self) -> None:
if (
not self._bundle_written
or self._instruction_y_digest is None
or self._bundle_digest is None
):
raise RuntimeError("Cannot finalize: taxonomy bundle was not written")
manifest = OpenSimulaManifest(
created_at=datetime.now(UTC)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z"),
afterimage_version=_package_version(),
instruction_y_sha256=self._instruction_y_digest,
taxonomy_bundle_sha256=self._bundle_digest,
sampling_strategy_sha256=self._strat_digest,
sampling_strategy_file=self._strat_file,
run_config_file=self._run_file,
)
(self._odir / MANIFEST_FILENAME).write_text(
manifest.model_dump_json(indent=2) + "\n",
encoding="utf-8",
)
self.manifest = manifest
[docs]
def push_to_hub(
self,
repo_id: str,
*,
repo_type: Literal["dataset", "model", "space"] = "dataset",
token: str | None = None,
commit_message: str | None = None,
private: bool = False,
path_in_repo: str = OPENSIMULA_SUBDIR,
dataset_card: str | None = None,
) -> str:
"""Upload ``<root>/opensimula/`` to the Hugging Face Hub (creates the repo if missing).
Requires ``manifest.json`` on disk—for example after the ``with`` block exits or
after :meth:`finalize`.
``dataset_card`` becomes the repository ``README.md`` at the Hub root. When omitted
or blank, a default card is generated (YAML ``tags`` frontmatter plus a short
introduction with links to AfterImage and the Simula paper / blog).
"""
from huggingface_hub import HfApi, create_repo
manifest_path = self._odir / MANIFEST_FILENAME
if not manifest_path.is_file():
raise FileNotFoundError(
f"No OpenSimula manifest at {manifest_path}. "
"Finish saving (exit `with Checkpointer(...)` or call finalize()) before push_to_hub.",
)
manifest = OpenSimulaManifest.model_validate_json(
manifest_path.read_text(encoding="utf-8"),
)
run = _read_optional_run_config(self._odir)
card = (dataset_card or "").strip()
if not card:
card = _default_dataset_readme(manifest=manifest, run=run, repo_id=repo_id)
api = HfApi(token=token)
create_repo(
repo_id, repo_type=repo_type, private=private, exist_ok=True, token=token
)
api.upload_folder(
folder_path=str(self._odir),
path_in_repo=path_in_repo.strip("/") or OPENSIMULA_SUBDIR,
repo_id=repo_id,
repo_type=repo_type,
commit_message=commit_message
or "Upload OpenSimula checkpoint (opensimula/)",
token=token,
)
readme_commit = (
f"{commit_message} — README.md"
if commit_message
else "Add OpenSimula dataset README"
)
api.upload_file(
path_or_fileobj=card.encode("utf-8"),
path_in_repo="README.md",
repo_id=repo_id,
repo_type=repo_type,
commit_message=readme_commit,
token=token,
)
host = "https://huggingface.co"
if repo_type == "dataset":
return f"{host}/datasets/{repo_id}"
if repo_type == "space":
return f"{host}/spaces/{repo_id}"
return f"{host}/{repo_id}"
[docs]
async def apush_to_hub(
self,
repo_id: str,
*,
repo_type: Literal["dataset", "model", "space"] = "dataset",
token: str | None = None,
commit_message: str | None = None,
private: bool = False,
path_in_repo: str = OPENSIMULA_SUBDIR,
dataset_card: str | None = None,
) -> str:
"""Same as :meth:`push_to_hub`, but runs blocking Hub I/O in a worker thread.
Prefer this from async code so uploads do not block the event loop.
"""
return await asyncio.to_thread(
self.push_to_hub,
repo_id,
repo_type=repo_type,
token=token,
commit_message=commit_message,
private=private,
path_in_repo=path_in_repo,
dataset_card=dataset_card,
)
[docs]
def save_checkpoint(
checkpoint_root: Path | str,
*,
bundle: TaxonomyBundle,
sampling_strategy: SamplingStrategySpec | None = None,
run_config: OpenSimulaRunConfig | None = None,
validate_taxonomies: bool = True,
) -> OpenSimulaManifest:
"""Write ``opensimula/`` under ``checkpoint_root`` and return the manifest.
Equivalent to using :class:`Checkpointer` with ``bundle.save`` / ``spec.save`` /
:meth:`Checkpointer.write_run_config`.
"""
cp = Checkpointer(checkpoint_root, validate_taxonomies=validate_taxonomies)
with cp:
bundle.save(cp)
if sampling_strategy is not None:
sampling_strategy.save(cp)
if run_config is not None:
cp.write_run_config(run_config)
assert cp.manifest is not None
return cp.manifest
[docs]
def load_checkpoint(
checkpoint_root: Path | str,
*,
verify_digests: bool = True,
validate_taxonomies: bool = True,
) -> SimulaCheckpoint:
"""Load ``opensimula/`` from ``checkpoint_root``."""
root = Path(checkpoint_root)
odir = opensimula_dir(root)
manifest_path = odir / MANIFEST_FILENAME
if not manifest_path.is_file():
raise FileNotFoundError(
f"Missing manifest: {manifest_path} (expected OpenSimula checkpoint layout)",
)
manifest = OpenSimulaManifest.model_validate_json(
manifest_path.read_text(encoding="utf-8"),
)
if manifest.format_version not in SUPPORTED_MANIFEST_FORMAT_VERSIONS:
raise ValueError(
f"Unsupported OpenSimula manifest format_version={manifest.format_version!r}; "
f"supported: {sorted(SUPPORTED_MANIFEST_FORMAT_VERSIONS)}",
)
if manifest.producer != "afterimage" or manifest.format != "opensimula":
raise ValueError(
f"Unexpected manifest producer/format: {manifest.producer!r} / {manifest.format!r}",
)
bundle_path = odir / manifest.taxonomy_bundle_file
if not bundle_path.is_file():
raise FileNotFoundError(f"Missing taxonomy bundle file: {bundle_path}")
if verify_digests:
got = _sha256_file(bundle_path)
if got != manifest.taxonomy_bundle_sha256:
raise ValueError(
f"taxonomy_bundle.json digest mismatch: expected {manifest.taxonomy_bundle_sha256}, got {got}",
)
bundle = TaxonomyBundle.model_validate_json(bundle_path.read_text(encoding="utf-8"))
if (
verify_digests
and sha256_text(bundle.instruction_y) != manifest.instruction_y_sha256
):
raise ValueError("instruction_y digest does not match manifest")
if validate_taxonomies:
_validate_bundle_trees(bundle)
strategy: SamplingStrategySpec | None = None
strat_name = manifest.sampling_strategy_file or SAMPLING_STRATEGY_FILENAME
strat_candidate = odir / strat_name
if strat_candidate.is_file():
if verify_digests and manifest.sampling_strategy_sha256:
sg = _sha256_file(strat_candidate)
if sg != manifest.sampling_strategy_sha256:
raise ValueError(
f"sampling strategy digest mismatch: expected {manifest.sampling_strategy_sha256}, got {sg}",
)
strategy = SamplingStrategySpec.model_validate_json(
strat_candidate.read_text(encoding="utf-8"),
)
run_cfg: OpenSimulaRunConfig | None = None
if manifest.run_config_file:
rp = odir / manifest.run_config_file
if rp.is_file():
run_cfg = OpenSimulaRunConfig.model_validate(
json.loads(rp.read_text(encoding="utf-8")),
)
return SimulaCheckpoint(
manifest=manifest,
bundle=bundle,
sampling_strategy=strategy,
run_config=run_cfg,
root=root,
)
[docs]
def push_checkpoint_to_hub(
checkpoint_root: Path | str,
repo_id: str,
*,
repo_type: Literal["dataset", "model", "space"] = "dataset",
token: str | None = None,
commit_message: str | None = None,
private: bool = False,
path_in_repo: str = OPENSIMULA_SUBDIR,
dataset_card: str | None = None,
) -> str:
"""Upload local ``opensimula/`` to the Hub under ``path_in_repo`` (default ``opensimula``).
Same as ``Checkpointer(checkpoint_root).push_to_hub(...)``. Returns the canonical repo URL.
"""
return Checkpointer(checkpoint_root).push_to_hub(
repo_id,
repo_type=repo_type,
token=token,
commit_message=commit_message,
private=private,
path_in_repo=path_in_repo,
dataset_card=dataset_card,
)
async def apush_checkpoint_to_hub(
checkpoint_root: Path | str,
repo_id: str,
*,
repo_type: Literal["dataset", "model", "space"] = "dataset",
token: str | None = None,
commit_message: str | None = None,
private: bool = False,
path_in_repo: str = OPENSIMULA_SUBDIR,
dataset_card: str | None = None,
) -> str:
"""Async wrapper for :func:`push_checkpoint_to_hub` (Hub I/O in ``asyncio.to_thread``)."""
return await asyncio.to_thread(
push_checkpoint_to_hub,
checkpoint_root,
repo_id,
repo_type=repo_type,
token=token,
commit_message=commit_message,
private=private,
path_in_repo=path_in_repo,
dataset_card=dataset_card,
)
[docs]
def pull_checkpoint_from_hub(
repo_id: str,
checkpoint_root: Path | str,
*,
repo_type: Literal["dataset", "model", "space"] = "dataset",
revision: str | None = None,
token: str | None = None,
path_in_repo: str = OPENSIMULA_SUBDIR,
) -> Path:
"""Download ``path_in_repo/**`` from the Hub into ``checkpoint_root`` (merging with ``snapshot_download``).
Returns ``opensimula_dir(checkpoint_root)``.
"""
from huggingface_hub import snapshot_download
root = Path(checkpoint_root)
root.mkdir(parents=True, exist_ok=True)
prefix = path_in_repo.strip("/")
allow = [f"{prefix}/**"] if prefix else None
snapshot_download(
repo_id=repo_id,
repo_type=repo_type,
revision=revision,
local_dir=str(root),
allow_patterns=allow,
token=token,
)
odir = opensimula_dir(root)
if not (odir / MANIFEST_FILENAME).is_file():
raise FileNotFoundError(
f"After Hub download, expected manifest at {odir / MANIFEST_FILENAME}. "
f"Check repo layout and path_in_repo (got {path_in_repo!r}).",
)
return odir