group rewards

environment Mar 19, 2026 5 min read

compute_group_reward is called once per prompt-group. it receives the full list of completions in a given group, letting you compute rewards that depend on cross-rollout context. it runs alongside compute_reward, not instead of it; the trainer merges both reward dicts.

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    ...
    return [{"component": score} for score in scores]

keep every component non-negative, the same as compute_reward. see best practices.
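one way to enforce the non-negative rule is to clamp every component just before returning. a minimal sketch; `clamp_components` is a hypothetical helper for illustration, not part of the library:

```python
def clamp_components(rewards: list[dict[str, float]]) -> list[dict[str, float]]:
    """Return a copy of the reward dicts with every component floored at 0.0."""
    return [
        {name: max(0.0, float(value)) for name, value in reward.items()}
        for reward in rewards
    ]


# example: a raw group result where one component went negative
raw = [{"win_rate": 0.5}, {"win_rate": -0.2}]
clamped = clamp_components(raw)
```

clamping discards the size of the negative value, so it is a safety net rather than a substitute for designing components that are non-negative by construction.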

pairwise win-rate

judges are usually better at “A beats B” than at producing calibrated absolute scores. fan out one judge call per pair, then aggregate into a win-rate per rollout.

import asyncio

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    texts = {rid: self._extract_text(c) for rid, c in zip(rollout_ids, completions)}
    wins = {rid: 0 for rid in rollout_ids}

    # fan out all pairwise comparisons concurrently
    pairs = [(a, b) for i, a in enumerate(rollout_ids) for b in rollout_ids[i+1:]]
    results = await asyncio.gather(*[
        self._judge_pair(texts[a], texts[b], ground_truths[0]) for a, b in pairs
    ])

    for (a, b), winner in zip(pairs, results):
        if winner == "a":
            wins[a] += 1
        elif winner == "b":
            wins[b] += 1

    n = len(rollout_ids)
    # a group of one has no pairs; max(..., 1) avoids dividing by zero
    max_wins = max(n - 1, 1)
    return [{"win_rate": wins[rid] / max_wins} for rid in rollout_ids]

the async signature exists for exactly this fan-out pattern: all n*(n-1)/2 judge calls (28 for a group of 8) are issued concurrently instead of one after another.
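the aggregation can be tried without a real judge. a minimal standalone sketch, assuming the judge returns "a", "b", or anything else for a tie; `pairwise_win_rates` and the `longer_wins` stand-in judge are hypothetical names used only for illustration:

```python
import asyncio
from typing import Awaitable, Callable

# a judge takes two texts and returns "a", "b", or any other string for a tie
Judge = Callable[[str, str], Awaitable[str]]


async def pairwise_win_rates(texts: list[str], judge: Judge) -> list[float]:
    n = len(texts)
    pairs = [(i, j) for i in range(n) for j in range(i + 1, n)]
    # all n*(n-1)/2 comparisons are awaited concurrently
    results = await asyncio.gather(*(judge(texts[i], texts[j]) for i, j in pairs))

    wins = [0] * n
    for (i, j), winner in zip(pairs, results):
        if winner == "a":
            wins[i] += 1
        elif winner == "b":
            wins[j] += 1
    max_wins = max(n - 1, 1)  # guard against a group of one
    return [w / max_wins for w in wins]


async def longer_wins(a: str, b: str) -> str:
    # stand-in judge: prefers the longer text, reports a tie on equal length
    await asyncio.sleep(0)
    if len(a) == len(b):
        return "tie"
    return "a" if len(a) > len(b) else "b"


rates = asyncio.run(
    pairwise_win_rates(["short", "a bit longer", "the longest answer here"], longer_wins)
)
```

ties award no win to either side, so win-rates in a group with many ties sum to less than the number of pairs would suggest.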

rank transform

convert raw scores to in-group rank before they become advantages. this is robust to reward-model scale drift and outliers: a 9.8 against a field of 9.7s and a 9.8 against a field of 2.0s produce very different raw gaps, but ranking maps both to the same top rank of 1.0.

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    raw = [self._score(c, gt) for c, gt in zip(completions, ground_truths)]

    # rank within group: 0.0 = worst, 1.0 = best
    # ties are broken by list position, because sorted() is stable
    order = sorted(range(len(raw)), key=lambda i: raw[i])
    ranks = [0.0] * len(raw)
    for position, idx in enumerate(order):
        ranks[idx] = position / (len(raw) - 1) if len(raw) > 1 else 1.0

    return [{"rank": r} for r in ranks]
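because the plain sort breaks ties by position, two rollouts with identical raw scores receive different ranks. one possible variation gives tied scores the average of the positions they span. a minimal sketch; `tie_aware_ranks` is a hypothetical helper, not part of the library:

```python
def tie_aware_ranks(raw: list[float]) -> list[float]:
    """Rank scores into [0.0, 1.0]; equal scores share the mean of their positions."""
    n = len(raw)
    if n <= 1:
        return [1.0] * n

    order = sorted(range(n), key=lambda i: raw[i])
    ranks = [0.0] * n
    start = 0
    while start < n:
        # extend `end` across the run of scores equal to raw[order[start]]
        end = start
        while end + 1 < n and raw[order[end + 1]] == raw[order[start]]:
            end += 1
        shared = (start + end) / 2 / (n - 1)
        for k in range(start, end + 1):
            ranks[order[k]] = shared
        start = end + 1
    return ranks


ranks = tie_aware_ranks([9.7, 9.8, 9.7])
```

with this variation a group where every rollout scores the same gets 0.5 across the board, which carries no preference between rollouts.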

self-consistency (label-free)

when ground_truths is empty or unreliable, majority consensus across the group acts as a pseudo-label. rollouts that agree with the majority get a positive signal; outliers get none.

from collections import Counter

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    answers = [self._extract_answer(c) for c in completions]

    # find the most common answer
    counts = Counter(answers)
    majority, majority_count = counts.most_common(1)[0]

    # reward agreement with majority, scaled by how strong the consensus is.
    # outliers score 0, not a negative value (keep components non-negative).
    consensus_strength = majority_count / len(answers)
    return [
        {"consistency": consensus_strength if a == majority else 0.0}
        for a in answers
    ]

useful for tasks where you have prompts but no labels, or where ground truth is expensive to produce. works best when the group size is large enough (≥8) that the majority answer is meaningful.
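answer extraction can fail, and a failed extraction should not be allowed to become the majority. a minimal standalone sketch of the same scoring, assuming the extractor returns None when it finds no answer (that convention and the name `consistency_rewards` are assumptions for illustration):

```python
from collections import Counter
from typing import Optional


def consistency_rewards(answers: list[Optional[str]]) -> list[float]:
    # ignore failed extractions when voting, but still count them in the group size
    valid = [a for a in answers if a is not None]
    if not valid:
        return [0.0] * len(answers)

    majority, majority_count = Counter(valid).most_common(1)[0]
    strength = majority_count / len(answers)
    return [strength if a == majority else 0.0 for a in answers]


# a group of 8: five rollouts agree on "42", one says "41", two failed extraction
rewards = consistency_rewards(["42", "42", None, "41", "42", None, "42", "42"])
```

dividing by the full group size rather than the number of valid answers keeps the reward low when most extractions fail, so a weak consensus among a few survivors is not overrated.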

test coverage / complementarity

reward passing the cases others missed. this pushes the group toward pass@k coverage (one rollout specializes in the edge cases another handles easily) rather than having all rollouts pile onto the same easy tests.

from collections import Counter

async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
) -> list[dict[str, float]]:
    # passed[i] is a set of test-case ids that rollout i passed
    passed = [self._run_tests(c) for c in completions]
    all_passed = [tc for p in passed for tc in p]
    pass_counts = Counter(all_passed)

    scores = []
    for p in passed:
        # tests passed by fewer rollouts are worth more
        complementarity = sum(1.0 / pass_counts[tc] for tc in p)
        # normalize by group size so scale is stable
        scores.append(complementarity / len(rollout_ids))

    return [{"complementarity": s} for s in scores]

before normalization, a test passed by only one rollout contributes 1/1 = 1.0, while one passed by all n rollouts contributes only 1/n (0.125 for a group of 8). this incentivizes finding unique solutions rather than rediscovering the majority path.
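a small worked example makes the weighting concrete. this sketch repeats the scoring above as a standalone function; the name `complementarity_scores` and the test ids are made up for illustration:

```python
from collections import Counter


def complementarity_scores(passed: list[set[str]]) -> list[float]:
    # how many rollouts passed each test case
    pass_counts = Counter(tc for p in passed for tc in p)
    n = len(passed)
    # each passed test is worth 1 / (number of rollouts that passed it),
    # then the total is divided by group size
    return [sum(1.0 / pass_counts[tc] for tc in p) / n for p in passed]


# t1 is passed by all three rollouts; t2 and t3 by one rollout each
scores = complementarity_scores([{"t1", "t2"}, {"t1"}, {"t1", "t3"}])
```

here the first and third rollouts score (1/3 + 1) / 3 = 4/9 and the second scores (1/3) / 3 = 1/9. the scores always sum to the number of distinct tests passed divided by group size, so the group total grows only when a new test is covered.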

diversity scaling

without diversity pressure, the model can collapse to a single strategy per prompt. scale_by_diversity groups the rollouts into clusters of similar responses, then divides each rollout’s reward by the size of its cluster. a strategy that five rollouts converge on has its reward split five ways; a unique strategy keeps full reward. this rewards exploring different approaches instead of piling onto one.

from benchmax.rewards.diversity import DiversityConfig, scale_by_diversity

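async def compute_group_reward(
    self, rollout_ids, completions, ground_truths, **kwargs
):
    # start from the per-rollout rewards
    raw = [
        await self.compute_reward(rid, m, gt)
        for rid, m, gt in zip(rollout_ids, completions, ground_truths)
    ]
    # each completion is assumed to be a list of messages; keep empty ones
    # as "" so that texts stays aligned one-to-one with raw
    texts = [m[-1]["content"] if m else "" for m in completions]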
    scaled, cluster_info = await scale_by_diversity(
        rewards=raw,
        texts=texts,
        config=DiversityConfig(method="ngram"),
    )
    return scaled

what counts as “similar” depends on the method:

method | compares | how | tradeoff
--- | --- | --- | ---
ngram | surface form | character n-gram overlap (Jaccard similarity), so responses that share wording cluster together | fast, offline, no API calls. good when distinct strategies also read differently
llm | meaning | a judge model groups responses by underlying tactic, so two differently-worded responses with the same approach still cluster | catches paraphrases, but costs judge calls and needs model + base_url
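the ngram row can be sketched in a few lines. this is an illustration of the idea only, not how scale_by_diversity is implemented: the trigram size, the greedy first-match clustering, and the names `char_ngrams`, `jaccard`, and `scale_by_cluster_size` are all assumptions.

```python
def char_ngrams(text: str, n: int = 3) -> set[str]:
    # set of overlapping character n-grams; short texts yield a single gram
    return {text[i:i + n] for i in range(max(len(text) - n + 1, 1))}


def jaccard(a: set[str], b: set[str]) -> float:
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def scale_by_cluster_size(
    rewards: list[float], texts: list[str], threshold: float = 0.5
) -> list[float]:
    grams = [char_ngrams(t) for t in texts]
    clusters: list[list[int]] = []  # each cluster is a list of rollout indices
    for i, g in enumerate(grams):
        for cluster in clusters:
            # greedy: join the first cluster whose first member is similar enough
            if jaccard(g, grams[cluster[0]]) >= threshold:
                cluster.append(i)
                break
        else:
            clusters.append([i])

    scaled = list(rewards)
    for cluster in clusters:
        for i in cluster:
            # a strategy shared by k rollouts has its reward split k ways
            scaled[i] = rewards[i] / len(cluster)
    return scaled


scaled = scale_by_cluster_size(
    [1.0, 1.0, 1.0],
    ["sort the list then scan", "sort the list then scan!", "use a hash map instead"],
)
```

the two near-identical responses land in one cluster and each keep half their reward, while the differently-worded third response keeps its full reward. a paraphrase of the first strategy with little shared wording would not cluster here, which is the gap the llm method is meant to cover.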

DiversityConfig controls the method and its parameters:

# fast offline clustering (default)
config = DiversityConfig(method="ngram", similarity_threshold=0.5)

# semantic clustering via LLM
config = DiversityConfig(
    method="llm",
    model="gpt-5.4-mini",
    base_url="https://api.openai.com/v1",
    api_key="...",
)

on clustering failure, the default fallback treats every rollout as unique (no penalty). set fallback_on_error="uniform" to penalize all rollouts equally instead.