Skip to content

Commit 9016205

Browse files
Cursor Bot and claude committed
Add Parquet-based trajectory storage with ~25x compression
Migrate trajectory files from JSONL to Parquet format for improved storage efficiency and query performance.

Key changes:
- Add `art migrate` CLI command for migrating existing JSONL files
- Backend now writes trajectories as Parquet with ZSTD compression
- Auto-migrate JSONL files to Parquet on model registration
- Update benchmarking utilities to read Parquet using DuckDB
- Add trajectory_migration.py with legacy JSONL support and migration tools
- Add duckdb and pyarrow dependencies

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3b4a942 commit 9016205

9 files changed

Lines changed: 1599 additions & 245 deletions

File tree

pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ backend = [
3232
"setuptools>=78.1.0",
3333
"wandb==0.22.1",
3434
"polars>=1.26.0",
35+
"duckdb>=1.0.0",
36+
"pyarrow>=15.0.0",
3537
"transformers==4.53.2",
3638
"trl==0.20.0",
3739
"nbclient>=0.10.1",
@@ -108,6 +110,8 @@ dev = [
108110
"pytest-xdist>=3.8.0",
109111
"pyright[nodejs]>=1.1.403",
110112
"pytest-asyncio>=1.1.0",
113+
"duckdb>=1.0.0",
114+
"pyarrow>=15.0.0",
111115
]
112116

113117
[tool.uv.sources]

src/art/cli.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import json
22
import socket
3-
from typing import Any, AsyncIterator
3+
from pathlib import Path
4+
from typing import Any, AsyncIterator, Optional
45

56
import pydantic
67
import typer
@@ -26,6 +27,95 @@
2627
app = typer.Typer()
2728

2829

30+
@app.command()
def migrate(
    path: Path = typer.Argument(
        ...,
        help="Path to model dir, project dir, or trajectories dir",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        "-n",
        help="Show what would be migrated without making changes",
    ),
    keep_jsonl: bool = typer.Option(
        False,
        "--keep-jsonl",
        help="Keep original JSONL files after conversion",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Print progress for each file",
    ),
) -> None:
    """
    Migrate trajectory files from JSONL to Parquet format.

    This command converts old .jsonl trajectory files to the new .parquet format,
    which provides ~25x compression and ~20x faster queries.

    Examples:
        art migrate /path/to/.art/project/models/my-model
        art migrate /path/to/.art/project --dry-run
        art migrate /path/to/trajectories --keep-jsonl --verbose
    """
    # Imported lazily so the migration module is only loaded when this
    # command is actually invoked.
    from .utils.trajectory_migration import (
        migrate_model_dir,
        migrate_trajectories_dir,
    )

    def _echo_progress(f):
        # Single progress hook shared by every migration call below; it is
        # silent unless --verbose was requested.
        if verbose:
            return typer.echo(f" {f}")
        return None

    if not path.exists():
        typer.echo(f"Error: Path does not exist: {path}", err=True)
        raise typer.Exit(1)

    # --keep-jsonl is the inverse of the helpers' delete_originals flag.
    delete_originals = not keep_jsonl

    # Classify the path. The checks run in this order on purpose: a model
    # directory (has a "trajectories" child) wins over a trajectories
    # directory, which in turn wins over a project directory (has "models").
    if (path / "trajectories").exists():
        typer.echo(f"Migrating model directory: {path}")
        result = migrate_model_dir(
            path,
            delete_originals=delete_originals,
            dry_run=dry_run,
            progress_callback=_echo_progress,
        )
    elif path.name == "trajectories" or any(path.glob("*/[0-9]*.jsonl")):
        typer.echo(f"Migrating trajectories directory: {path}")
        result = migrate_trajectories_dir(
            path,
            delete_originals=delete_originals,
            dry_run=dry_run,
            progress_callback=_echo_progress,
        )
    elif (path / "models").exists():
        typer.echo(f"Migrating project directory: {path}")
        from .utils.trajectory_migration import MigrationResult

        # Fold the per-model results into one aggregate for the summary.
        result = MigrationResult()
        for candidate in (path / "models").iterdir():
            if not candidate.is_dir():
                continue
            if verbose:
                typer.echo(f"Processing model: {candidate.name}")
            per_model = migrate_model_dir(
                candidate,
                delete_originals=delete_originals,
                dry_run=dry_run,
                progress_callback=_echo_progress,
            )
            result = result + per_model
    else:
        typer.echo(
            "Error: Could not determine path type. Expected a model, project, or trajectories directory.",
            err=True,
        )
        raise typer.Exit(1)

    # Summary: dry runs report what would happen, real runs what did happen.
    if dry_run:
        typer.echo(f"\n[DRY RUN] Would migrate {result.files_migrated} files")
        if result.bytes_before > 0:
            typer.echo(
                f" Estimated space savings: {result.space_saved / 1024 / 1024:.1f} MB"
            )
    else:
        typer.echo(f"\nMigrated {result.files_migrated} files")
        if result.files_skipped > 0:
            typer.echo(f"Skipped {result.files_skipped} files")
        if result.bytes_before > 0 and result.bytes_after > 0:
            typer.echo(
                f"Space saved: {result.space_saved / 1024 / 1024:.1f} MB "
                f"({result.compression_ratio:.1f}x compression)"
            )

    if not result.errors:
        return

    # Any error makes the command exit non-zero; only the first few are
    # printed so a badly broken directory does not flood the terminal.
    max_shown = 10
    typer.echo(f"\nErrors ({len(result.errors)}):", err=True)
    for problem in result.errors[:max_shown]:
        typer.echo(f" {problem}", err=True)
    remaining = len(result.errors) - max_shown
    if remaining > 0:
        typer.echo(f" ... and {remaining} more errors", err=True)
    raise typer.Exit(1)
117+
118+
29119
@app.command()
30120
def run(host: str = "0.0.0.0", port: int = 7999) -> None:
31121
"""Run the ART CLI."""

src/art/local/backend.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
pull_model_from_s3,
4444
push_model_to_s3,
4545
)
46-
from art.utils.trajectory_logging import serialize_trajectory_groups
46+
from art.utils.trajectory_logging import write_trajectory_groups_parquet
4747
from mp_actors import close_proxy, move_to_child_process
4848

4949
from .. import dev
@@ -125,6 +125,11 @@ async def register(
125125
with open(f"{output_dir}/model.json", "w") as f:
126126
json.dump(model.model_dump(), f)
127127

128+
# Auto-migrate any old JSONL trajectory files to Parquet
129+
from art.utils.trajectory_migration import auto_migrate_on_register
130+
131+
auto_migrate_on_register(output_dir)
132+
128133
# Initialize wandb and weave early if this is a trainable model
129134
if model.trainable and "WANDB_API_KEY" in os.environ:
130135
_ = self._get_wandb_run(model)
@@ -356,11 +361,10 @@ async def _log(
356361

357362
# Get the file name for the current iteration, or default to 0 for non-trainable models
358363
iteration = self.__get_step(model)
359-
file_name = f"{iteration:04d}.jsonl"
364+
file_name = f"{iteration:04d}.parquet"
360365

361-
# Write the logs to the file
362-
with open(f"{parent_dir}/{file_name}", "w") as f:
363-
f.write(serialize_trajectory_groups(trajectory_groups))
366+
# Write the logs to Parquet file (with ZSTD compression)
367+
write_trajectory_groups_parquet(trajectory_groups, f"{parent_dir}/{file_name}")
364368

365369
# Collect all metrics (including reward) across all trajectories
366370
all_metrics: dict[str, list[float]] = {"reward": [], "exception_rate": []}

0 commit comments

Comments
 (0)