compute_group_reward is called once per prompt group. it receives every completion in the group at once, so you can compute rewards that depend on how each rollout compares to the others in its group. it runs alongside compute_reward, not instead of it; the trainer merges both reward dicts.
```python
async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    ...  # compute scores: one float per rollout, in rollout_ids order
    return [{"component": score} for score in scores]
```
keep every component non-negative, as with compute_reward (see best practices).
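a minimal sketch of that contract, runnable outside any trainer. the ToyGroupReward class, its relative_length component, and merge_reward_dicts are hypothetical names for illustration, and the merge assumes a plain per-rollout dict union with distinct component names; the trainer's real merge logic is not shown here. what the sketch does pin down is the shape: one dict per rollout, in rollout_ids order, with non-negative values.

```python
import asyncio


class ToyGroupReward:
    """hypothetical environment illustrating the compute_group_reward contract"""

    async def compute_group_reward(
        self, rollout_ids, completions, ground_truths, **kwargs
    ) -> list[dict[str, float]]:
        # toy cross-rollout component: length relative to the longest completion
        lengths = [len(c) for c in completions]
        longest = max(lengths) or 1  # avoid dividing by zero if every completion is empty
        # one dict per rollout, in the same order as rollout_ids; values stay in [0, 1]
        return [{"relative_length": n / longest} for n in lengths]


def merge_reward_dicts(per_rollout, per_group):
    # hypothetical merge: union each rollout's two dicts,
    # assuming the component names do not overlap
    return [{**single, **group} for single, group in zip(per_rollout, per_group)]


group = asyncio.run(
    ToyGroupReward().compute_group_reward(
        ["r0", "r1"], ["short", "a longer answer"], ["x", "x"]
    )
)
merged = merge_reward_dicts([{"correct": 1.0}, {"correct": 0.0}], group)
```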
pairwise win-rate
judges are usually better at “A beats B” than at producing calibrated absolute scores. fan out one judge call per pair, then aggregate into a win-rate per rollout.
```python
import asyncio

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    texts = {rid: self._extract_text(c) for rid, c in zip(rollout_ids, completions)}
    wins = {rid: 0 for rid in rollout_ids}
    # fan out all pairwise comparisons concurrently
    pairs = [(a, b) for i, a in enumerate(rollout_ids) for b in rollout_ids[i + 1:]]
    # every rollout in a group answers the same prompt, so one ground truth serves all pairs
    results = await asyncio.gather(*[
        self._judge_pair(texts[a], texts[b], ground_truths[0]) for a, b in pairs
    ])
    for (a, b), winner in zip(pairs, results):
        if winner == "a":
            wins[a] += 1
        elif winner == "b":
            wins[b] += 1
        # any other verdict (e.g. a tie) credits neither rollout
    # each rollout takes part in n - 1 comparisons; max() guards against a group of one
    max_wins = max(len(rollout_ids) - 1, 1)
    return [{"win_rate": wins[rid] / max_wins} for rid in rollout_ids]
```
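the async signature exists for exactly this fan-out pattern: all n*(n-1)/2 judge calls run concurrently. that count grows quadratically with group size: a group of 8 makes 28 judge calls, a group of 16 makes 120.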
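the same fan-out can be exercised without a judge model. below, stub_judge is a hypothetical stand-in that prefers the longer text and reports "tie" on equal lengths; a real _judge_pair would call an LLM. unlike the method above, this sketch splits a tie as half a win to each side, which is one reasonable choice rather than something the environment requires.

```python
import asyncio
from itertools import combinations


async def stub_judge(text_a: str, text_b: str) -> str:
    # hypothetical judge: prefers the longer answer, "tie" on equal length
    await asyncio.sleep(0)  # stands in for a network call
    if len(text_a) == len(text_b):
        return "tie"
    return "a" if len(text_a) > len(text_b) else "b"


async def pairwise_win_rates(texts: list[str]) -> list[float]:
    n = len(texts)
    if n < 2:
        return [0.0] * n  # no opponents, so no wins
    pairs = list(combinations(range(n), 2))  # n*(n-1)/2 pairs
    # all judge calls are scheduled at once; gather returns results in pair order
    results = await asyncio.gather(*(stub_judge(texts[i], texts[j]) for i, j in pairs))
    wins = [0.0] * n
    for (i, j), winner in zip(pairs, results):
        if winner == "a":
            wins[i] += 1.0
        elif winner == "b":
            wins[j] += 1.0
        else:
            # split a tie so both rollouts get half a win
            wins[i] += 0.5
            wins[j] += 0.5
    # each rollout plays n - 1 comparisons
    return [w / (n - 1) for w in wins]


rates = asyncio.run(pairwise_win_rates(["aaa", "a", "aa", "aa"]))
```

with four rollouts this makes 6 judge calls; the two equal-length answers tie with each other and share the middle of the ranking.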
rank transform
convert raw scores to in-group ranks before they become advantages. this makes the signal robust to reward-model scale drift and outliers: a 9.8 when everyone else scored 9.7 and a 9.8 when everyone else scored 2.0 are very different raw margins, yet both rank first, so only the ordering within the group carries signal.
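```python
async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    raw = [self._score(c, gt) for c, gt in zip(completions, ground_truths)]
    n = len(raw)
    if n == 1:
        return [{"rank": 1.0}]
    # rank within group: 0.0 = worst, 1.0 = best
    order = sorted(range(n), key=lambda i: raw[i])
    ranks = [0.0] * n
    start = 0
    while start < n:
        # tied scores share the average of the positions they span,
        # so equally good rollouts get equal credit
        end = start
        while end + 1 < n and raw[order[end + 1]] == raw[order[start]]:
            end += 1
        for k in range(start, end + 1):
            ranks[order[k]] = (start + end) / 2 / (n - 1)
        start = end + 1
    return [{"rank": r} for r in ranks]
```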
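to check the claim with numbers, here is a standalone version of the transform using midranks (a tied score gets the average of the positions it spans). rank_transform is a hypothetical helper name. it counts, for each score, how many scores are strictly lower plus half of the other scores it ties with, which gives the same result as the sort-based version above.

```python
def rank_transform(scores: list[float]) -> list[float]:
    # midrank in [0, 1]: strictly-lower count plus half of the other tied entries
    n = len(scores)
    if n == 1:
        return [1.0]
    ranks = []
    for s in scores:
        below = sum(1 for t in scores if t < s)
        ties = sum(1 for t in scores if t == s) - 1  # exclude the score itself
        ranks.append((below + ties / 2) / (n - 1))
    return ranks


close = rank_transform([9.7, 9.7, 9.8])
far = rank_transform([2.0, 2.0, 9.8])
```

both groups map to the same ranks, so the 9.8 gets the same advantage whether it won by 0.1 or by 7.8, and the two tied rollouts in each group receive equal credit.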
self-consistency (label-free)
when ground_truths is empty or unreliable, majority consensus across the group acts as a pseudo-label. rollouts that agree with the majority get a positive signal; outliers get none.
```python
from collections import Counter

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    answers = [self._extract_answer(c) for c in completions]
    # find the most common answer (on a tie, the one encountered first wins)
    counts = Counter(answers)
    majority, majority_count = counts.most_common(1)[0]
    # reward agreement with majority, scaled by how strong the consensus is.
    # outliers score 0, not a negative value (keep components non-negative).
    consensus_strength = majority_count / len(answers)
    return [
        {"consistency": consensus_strength if a == majority else 0.0}
        for a in answers
    ]
```
useful for tasks where you have prompts but no labels, or where ground truth is expensive to produce. works best when the group size is large enough (≥8) that the majority answer is meaningful.
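a standalone sketch of the same majority vote on a group of 8. consistency_rewards is a hypothetical helper, and it assumes answer extraction returns None on failure. it drops those failures before voting, so an unparseable output can never become the pseudo-label, while still counting them in the group size so consensus strength reflects the whole group.

```python
from collections import Counter
from typing import Optional


def consistency_rewards(answers: list[Optional[str]]) -> list[float]:
    # ignore rollouts where no answer could be extracted (assumed to be None)
    valid = [a for a in answers if a is not None]
    if not valid:
        return [0.0] * len(answers)
    # most_common breaks ties by first occurrence
    majority, majority_count = Counter(valid).most_common(1)[0]
    # consensus strength is measured over the whole group, failures included
    strength = majority_count / len(answers)
    return [strength if a == majority else 0.0 for a in answers]


rewards = consistency_rewards(["42", "42", "41", "42", None, "42", "7", "42"])
```

five of eight rollouts agree on "42", so each of them receives 5/8 = 0.625 and the rest receive 0.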
test coverage / complementarity
reward passing the cases others missed. this pushes the group toward pass@k coverage, where the rollouts together pass as many tests as possible (one rollout handles the edge cases another misses), rather than having every rollout pile onto the same easy tests.
```python
from collections import Counter

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    # passed[i] is a set of test-case ids that rollout i passed
    passed = [self._run_tests(c) for c in completions]
    all_passed = [tc for p in passed for tc in p]
    pass_counts = Counter(all_passed)
    scores = []
    for p in passed:
        # tests passed by fewer rollouts are worth more
        complementarity = sum(1.0 / pass_counts[tc] for tc in p)
        # normalize by group size so scale is stable
        scores.append(complementarity / len(rollout_ids))
    return [{"complementarity": s} for s in scores]
```
before normalization, a test passed by only one rollout contributes 1/1 = 1.0, while one passed by all n rollouts contributes only 1/n (0.125 in a group of 8). the result rewards finding solutions the rest of the group missed rather than rediscovering the majority path.
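to make the arithmetic concrete, here is a standalone version run on a group of four. complementarity_scores is a hypothetical name, and the test ids are made up. t2 and t3 are each passed by a single rollout and are worth 1.0; t1 is passed by three rollouts and is worth 1/3; every sum is then divided by the group size of 4.

```python
from collections import Counter


def complementarity_scores(passed: list[set[str]]) -> list[float]:
    n = len(passed)
    # how many rollouts passed each test case
    pass_counts = Counter(tc for p in passed for tc in p)
    # each passed test is worth 1 / (rollouts that passed it); divide by group size
    return [sum(1.0 / pass_counts[tc] for tc in p) / n for p in passed]


scores = complementarity_scores([{"t1", "t2"}, {"t1"}, {"t1", "t3"}, set()])
```

rollouts 0 and 2 each found a test nobody else passed and score (1/3 + 1) / 4 = 1/3, rollout 1 only passed the shared test and scores 1/12, and rollout 3 scores 0.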
diversity scaling
without diversity pressure, the model can collapse to a single strategy per prompt. scale_by_diversity groups the rollouts into clusters of similar responses, then divides each rollout’s reward by the size of its cluster. a strategy that five rollouts converge on has its reward split five ways; a unique strategy keeps full reward. this rewards exploring different approaches instead of piling onto one.
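```python
import asyncio

from benchmax.rewards.diversity import DiversityConfig, scale_by_diversity

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    # per-rollout rewards first; the calls are independent, so run them concurrently
    raw = await asyncio.gather(*[
        self.compute_reward(rid, c, gt)
        for rid, c, gt in zip(rollout_ids, completions, ground_truths)
    ])
    # final message of each completion; empty completions get a placeholder
    # so texts stays aligned index-for-index with raw
    texts = [c[-1]["content"] if c else "" for c in completions]
    scaled, cluster_info = await scale_by_diversity(
        rewards=raw,
        texts=texts,
        config=DiversityConfig(method="ngram"),
    )
    return scaled
```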
what counts as “similar” depends on the method:
| method | compares | how | tradeoff |
|---|---|---|---|
| ngram | surface form | character n-gram overlap (Jaccard similarity), so responses that share wording cluster together | fast, offline, no API calls. good when distinct strategies also read differently |
| llm | meaning | a judge model groups responses by underlying tactic, so two differently worded responses with the same approach still cluster | catches paraphrases, but costs judge calls and needs model and base_url |
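the ngram row can be sketched end to end in plain Python. this illustrates the technique, not benchmax's implementation: the 3-character n-grams, the greedy single pass that compares each response to the first member of every existing cluster, and reading the threshold as a minimum Jaccard similarity are all assumptions, as are the helper names. in practice, call scale_by_diversity.

```python
def char_ngrams(text: str, n: int = 3) -> set[str]:
    # set of overlapping character n-grams; texts shorter than n become one gram
    text = text.lower()
    if len(text) < n:
        return {text}
    return {text[i:i + n] for i in range(len(text) - n + 1)}


def jaccard(a: set[str], b: set[str]) -> float:
    # size of the intersection over size of the union; two empty sets count as identical
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def scale_by_cluster_size(
    rewards: list[float], texts: list[str], threshold: float = 0.5
) -> tuple[list[float], list[int]]:
    grams = [char_ngrams(t) for t in texts]
    reps: list[int] = []    # index of the first member of each cluster
    labels: list[int] = []  # cluster id assigned to each response
    for i, g in enumerate(grams):
        # join the first existing cluster whose representative is similar enough
        for cluster_id, rep in enumerate(reps):
            if jaccard(g, grams[rep]) >= threshold:
                labels.append(cluster_id)
                break
        else:
            # no similar cluster found: this response starts a new one
            reps.append(i)
            labels.append(len(reps) - 1)
    sizes = [labels.count(c) for c in range(len(reps))]
    # split each reward evenly across the members of its cluster
    scaled = [r / sizes[label] for r, label in zip(rewards, labels)]
    return scaled, labels


scaled, labels = scale_by_cluster_size(
    rewards=[1.0, 1.0, 1.0, 0.6],
    texts=[
        "use binary search on the sorted array",
        "use binary search on the sorted list",
        "brute force every pair",
        "use binary search on the sorted array",
    ],
)
```

the two binary-search responses and the verbatim repeat land in one cluster of three, so each keeps a third of its reward, while the brute-force response is alone and keeps all of it.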
DiversityConfig controls the method and its parameters:
```python
from benchmax.rewards.diversity import DiversityConfig

# fast offline clustering (default)
config = DiversityConfig(method="ngram", similarity_threshold=0.5)

# semantic clustering via LLM
config = DiversityConfig(
    method="llm",
    model="gpt-5.4-mini",
    base_url="https://api.openai.com/v1",
    api_key="...",
)
```
on clustering failure, the default fallback treats every rollout as unique (no penalty). set fallback_on_error="uniform" to penalize all rollouts equally instead.