-
Notifications
You must be signed in to change notification settings - Fork 824
Expand file tree
/
Copy pathruler.py
More file actions
339 lines (285 loc) · 13.3 KB
/
ruler.py
File metadata and controls
339 lines (285 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""
RULER (Relative Universal LLM-Elicited Rewards) - A general-purpose reward function for RL agents.
RULER uses an LLM-as-judge to rank multiple agent trajectories relative to each other,
requiring no labeled data or hand-crafted reward functions. It leverages the insight
that relative scoring is easier than absolute scoring, and GRPO only needs relative
scores within each group.
For detailed documentation and examples, see: https://art.openpipe.ai/fundamentals/ruler
"""
import json
from textwrap import dedent
from typing import List
from litellm import acompletion
from litellm.types.utils import ModelResponse
from openai.types.chat.chat_completion_message_param import ChatCompletionMessageParam
from pydantic import BaseModel, Field
from rich import print
import art
class TrajectoryScore(BaseModel):
    """Individual score for a single trajectory."""

    # NOTE(review): pydantic exposes the class docstring and each Field
    # description in the model's JSON schema, and this model is nested inside
    # the `response_format` handed to the judge. Treat these strings as part of
    # the prompt: rewording them presumably changes what the judge sees.

    # Identifier echoed back by the judge for the trajectory it scored.
    trajectory_id: str = Field(
        description="The id of the trajectory being scored.",
    )
    # Free-text justification written by the judge.
    explanation: str = Field(
        description="A short description of the trajectory's performance.",
    )
    # Numeric score; the 0..1 range is only requested via the description,
    # it is not enforced by validation here.
    score: float = Field(
        description="A score between 0 and 1.",
    )
class Response(BaseModel):
    """Response format expected from the LLM judge."""

    # One entry per trajectory shown to the judge. The docstring and the Field
    # description are kept verbatim because pydantic includes them in the JSON
    # schema generated for this model.
    scores: list[TrajectoryScore] = Field(
        description="The scores for each trajectory.",
    )
# The literal is indented for readability; dedent() strips that margin again, so
# the resulting text starts with a newline followed by four "- " bullet lines.
DEFAULT_RUBRIC = dedent(
    """
    - A trajectory that achieves its goal should always get a significantly higher score than a trajectory that does not achieve its goal.
    - A trajectory that achieves its goal more efficiently (eg. by avoiding unproductive detours) should get a higher score than a trajectory that achieves its goal less efficiently.
    - If one trajectory is only slightly better than another, the difference in scores should be small. If it is significantly better, the difference in scores should be large.
    - You may give some partial credit for a trajectory that makes progress towards its goal but does not complete it.
    """
)
"""Generic grading rubric applied by RULER when the caller does not supply one.

It is intentionally task-agnostic: the judge is expected to pick up the actual
task from the system prompts contained in the trajectories it is shown."""
def _common_prefix_length(
    message_lists: list[list[ChatCompletionMessageParam]],
) -> int:
    """Count the leading messages that are identical across all trajectories."""
    prefix_len = 0
    # zip(*...) stops at the shortest trajectory, so the prefix can never run
    # past the end of any message list.
    for column in zip(*message_lists):
        reference = column[0]
        if any(msg != reference for msg in column[1:]):
            break
        prefix_len += 1
    return prefix_len


def _scores_in_input_order(
    scores: list[TrajectoryScore], expected_count: int
) -> list[TrajectoryScore]:
    """Re-order judge scores by trajectory id when the ids are exactly "1".."N"."""
    expected_ids = [str(i) for i in range(1, expected_count + 1)]
    by_id = {score.trajectory_id: score for score in scores}
    if len(by_id) == expected_count and all(tid in by_id for tid in expected_ids):
        return [by_id[tid] for tid in expected_ids]
    # Ids are duplicated or do not match what was sent: fall back to the order
    # the judge used rather than guessing.
    return scores


async def ruler(
    message_lists: list[list[ChatCompletionMessageParam]],
    judge_model: str = "openai/o3",
    extra_litellm_params: dict | None = None,
    rubric: str | None = DEFAULT_RUBRIC,
    tools: list | None = None,
    *,
    debug: bool = False,
) -> list[TrajectoryScore]:
    """Core RULER implementation that scores a list of message trajectories.

    This is the low-level API that works with raw message lists. For integration
    with ART's training loop, use `ruler_score_group` instead.

    RULER works by:
    1. Extracting common prefixes from trajectories to save tokens
    2. Passing all trajectories to an LLM judge for relative scoring
    3. Returning scores that can be used directly as rewards in GRPO

    The key insight is that relative scores within a group are all that matters
    for GRPO, which normalizes them anyway.

    Args:
        message_lists: A list where each item is a list of ChatCompletionMessageParam
            dicts representing a single trajectory.
        judge_model: The model to use for judging. Common options:
            - "openai/gpt-4o-mini" - Fast and cost-effective
            - "openai/o3" - Most capable but expensive (default)
            - "anthropic/claude-3-opus-20240229" - Alternative judge
        extra_litellm_params: Additional parameters to pass to LiteLLM completion.
            Can include temperature, max_tokens, etc.
        rubric: The grading rubric, or None to use DEFAULT_RUBRIC.
            The default rubric works well for most tasks.
        tools: Optional list of tool definitions available to the agent. When provided,
            the judge will see which tools were available when evaluating tool usage.
        debug: If True, pretty-print the judge's reasoning to help understand scores.

    Returns:
        A list of TrajectoryScore objects with scores and explanations, one per
        entry of `message_lists`. When the judge labels its scores with the ids
        it was given ("1".."N"), the list is returned in input order even if the
        judge emitted them in a different order. An empty `message_lists` yields
        an empty list without calling the judge.

    Raises:
        ValueError: If the judge response has no choices, or the number of
            scores does not match the number of trajectories that were sent.

    Example:
        >>> message_lists = [
        ...     [{"role": "system", "content": "You are helpful."},
        ...      {"role": "user", "content": "What is 2+2?"},
        ...      {"role": "assistant", "content": "4"}],
        ...     [{"role": "system", "content": "You are helpful."},
        ...      {"role": "user", "content": "What is 2+2?"},
        ...      {"role": "assistant", "content": "I don't know"}]
        ... ]
        >>> scores = await ruler(message_lists, debug=True)
        >>> print(scores[0].score)  # Higher score for correct answer
        0.9
    """
    if rubric is None:
        rubric = DEFAULT_RUBRIC

    # Short-circuit for the trivial case
    if not message_lists:
        return []

    # Length of the longest message prefix shared by every trajectory. Sending
    # it once reduces token usage when all trajectories share the same system
    # prompt or initial messages.
    common_prefix_len = _common_prefix_length(message_lists)

    # If every trajectory is no longer than the shared prefix, they are all the
    # same conversation (this is also true for a single trajectory).
    all_identical = all(len(msgs) == common_prefix_len for msgs in message_lists)
    if all_identical and len(message_lists) > 1:
        print(
            f"[RULER] Warning: All {len(message_lists)} trajectories are identical. "
            "Using absolute scoring (loses relative grounding benefit)."
        )

    user_sections: list[str] = []

    # Serialize a non-empty common prefix once. Skipped when all trajectories
    # are identical, because the full trajectory is sent instead.
    if common_prefix_len > 0 and not all_identical:
        common_prefix_messages = message_lists[0][:common_prefix_len]
        user_sections.append(
            "<context>\n" + json.dumps(common_prefix_messages) + "\n</context>\n\n"
        )

    # Include available tools so the judge knows which tool calls are valid
    if tools:
        user_sections.append(
            "<available_tools>\n" + json.dumps(tools) + "\n</available_tools>\n\n"
        )

    if all_identical:
        # Only one full trajectory is sent; its score is duplicated below.
        serialized_trajectories = [
            '<trajectory id="1">\n' + json.dumps(message_lists[0]) + "\n</trajectory>"
        ]
    else:
        # Each trajectory is sent without the shared prefix, labelled 1..N.
        serialized_trajectories = [
            f'<trajectory id="{idx}">\n'
            + json.dumps(full_messages[common_prefix_len:])
            + "\n</trajectory>"
            for idx, full_messages in enumerate(message_lists, start=1)
        ]

    user_text = (
        "".join(user_sections)
        + "Trajectories:\n\n"
        + "\n\n".join(serialized_trajectories)
    )

    # Assembled from explicit pieces instead of dedent()-ing an interpolated
    # f-string, so the indentation of a caller-supplied rubric cannot change
    # the layout of the fixed instruction text.
    judge_prompt = (
        "\n"
        "All of the trajectories below have been given the same goal. Your job is to consider each of them and give them a score between 0 and 1. Take into consideration your best judgement of the agent's goal.\n"
        "Grading standards:\n"
        f"{rubric}\n"
    )

    messages = [
        {"role": "system", "content": judge_prompt},
        {"role": "user", "content": user_text},
    ]

    response = await acompletion(
        model=judge_model,
        messages=messages,
        response_format=Response,
        caching=False,
        **(extra_litellm_params or {}),
    )
    # Narrows the type for static checkers; a streaming wrapper is not expected here.
    assert isinstance(response, ModelResponse)

    if not response.choices:
        raise ValueError(f"No choices in response: {response}")

    # An absent message body is treated as an empty JSON object, which then
    # fails validation below because `scores` is required.
    content = response.choices[0].message.content or "{}"

    if debug:
        # `print` is rich's print in this module; escape text that comes from
        # the model so bracketed content is shown literally instead of being
        # parsed as console markup.
        from rich.markup import escape

        try:
            print("\n[RULER] Pretty-printed LLM choice JSON:")
            print(json.loads(content))
        except json.JSONDecodeError as e:
            print(f"[RULER] Could not parse choice content as JSON: {e}")
            print(escape(f"[RULER] Raw choice content: {content}"))

    parsed = Response.model_validate_json(content)

    if all_identical:
        # Only one trajectory was sent to the judge; copy its score to every
        # input trajectory, renumbering the ids 1..N.
        if len(parsed.scores) != 1:
            raise ValueError(
                f"Expected 1 score for identical trajectories, but got {len(parsed.scores)}"
            )
        single_score = parsed.scores[0]
        return [
            single_score.model_copy(update={"trajectory_id": str(i)})
            for i in range(1, len(message_lists) + 1)
        ]

    if len(parsed.scores) != len(message_lists):
        raise ValueError(
            f"Expected {len(message_lists)} scores, but got {len(parsed.scores)}"
        )
    # Callers pair scores with trajectories by position, so undo any
    # re-ordering the judge may have applied.
    return _scores_in_input_order(parsed.scores, len(message_lists))
async def ruler_score_group(
    group: art.TrajectoryGroup,
    judge_model: str = "openai/o3",
    extra_litellm_params: dict | None = None,
    rubric: str | None = DEFAULT_RUBRIC,
    *,
    swallow_exceptions: bool = False,
    debug: bool = False,
) -> art.TrajectoryGroup | None:
    """Score a trajectory group using RULER for use in training loops.

    This is the recommended API for using RULER with ART. It integrates seamlessly
    with `gather_trajectory_groups` via the `after_each` callback.

    Key features:
    - Works with TrajectoryGroup objects
    - Preserves original rewards in metrics["independent_reward"]
    - Adds RULER scores to metrics["ruler_score"]
    - Supports graceful error handling with swallow_exceptions
    - Returns a new TrajectoryGroup with updated rewards

    Args:
        group: A TrajectoryGroup containing trajectories to score.
        judge_model: The model to use for judging. See `ruler` for options.
        extra_litellm_params: Additional parameters to pass to LiteLLM completion.
        rubric: Custom rubric, or None to use DEFAULT_RUBRIC. The default works well
            for most tasks.
        swallow_exceptions: If True, returns None on errors raised while scoring
            instead of raising. This is recommended for production to handle API
            failures gracefully.
        debug: If True, prints the judge's reasoning.

    Returns:
        A new TrajectoryGroup with updated rewards, or None if swallow_exceptions=True
        and an error occurred.

    Raises:
        ValueError: If any trajectory carries additional histories. This check
            runs before scoring and is raised regardless of `swallow_exceptions`.

    Example:
        >>> # In your training loop
        >>> groups = await art.gather_trajectory_groups(
        ...     (art.TrajectoryGroup(rollout(model, scenario) for _ in range(4))
        ...      for scenario in scenarios),
        ...     after_each=lambda g: ruler_score_group(g, "openai/o3",
        ...                                            swallow_exceptions=True)
        ... )

    For complete documentation and examples, see: https://art.openpipe.ai/fundamentals/ruler
    """
    if rubric is None:
        rubric = DEFAULT_RUBRIC

    # Validate that we don't have additional histories (not yet supported)
    if any(len(traj.additional_histories) > 0 for traj in group.trajectories):
        raise ValueError("Additional histories are not supported by RULER yet.")

    # Build fresh trajectory objects so the reward/metrics/log updates below do
    # not touch the caller's trajectories. Containers are copied with .copy()
    # (presumably a shallow copy, so the message objects themselves are shared)
    # rather than deep-copied, to avoid issues with unpicklable objects.
    new_trajectories = [
        t.__class__(
            messages_and_choices=t.messages_and_choices.copy(),
            tools=t.tools.copy() if t.tools else None,
            additional_histories=[
                h.model_copy(deep=True) for h in t.additional_histories
            ],
            reward=t.reward,
            metrics=t.metrics.copy(),
            metadata=t.metadata.copy(),
            logs=t.logs.copy(),
        )
        for t in group.trajectories
    ]

    # Extract message lists and preserve original rewards for comparison
    message_lists: list[list[ChatCompletionMessageParam]] = []
    for traj in new_trajectories:
        message_lists.append(traj.messages())
        traj.metrics["independent_reward"] = traj.reward

    # Extract tools from first trajectory (they should all be the same)
    tools = new_trajectories[0].tools if new_trajectories else None

    try:
        # Call the core ruler function to get scores
        scores = await ruler(
            message_lists,
            judge_model=judge_model,
            extra_litellm_params=extra_litellm_params,
            rubric=rubric,
            tools=tools,
            debug=debug,
        )
    except Exception as e:
        if not swallow_exceptions:
            raise
        # In production, it's often better to skip failed groups than crash.
        # `print` is rich's print in this module: without escaping, the
        # lowercase "[art_ruler]" prefix is parsed as a markup tag and dropped,
        # and bracketed text inside the exception message could raise a markup
        # error from within this handler.
        from rich.markup import escape

        print(escape(f"[art_ruler] Swallowed exception: {e}"))
        return None

    # Update each trajectory with its RULER score (paired by position)
    for traj, score in zip(new_trajectories, scores):
        traj.metrics["ruler_score"] = score.score
        traj.reward = score.score  # Replace reward with RULER score
        traj.log(f"RULER explanation: {score.explanation}")

    return art.TrajectoryGroup(new_trajectories)