๐Ÿ“ฆ agentscope-ai / OpenJudge

๐Ÿ“„ rewardbench2.py ยท 673 lines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673"""
RewardBench2 Grader Validation - Improved Version

Improved implementation leveraging  OpenJudge features:
- Uses PromptTemplate for prompt management
- Uses structured output with Pydantic models
- Cleaner separation of concerns
- Better code reuse from openjudge
- Concurrent model API calls with configurable concurrency limit

Concurrency Implementation:
- Leverages GradingRunner's built-in ConcurrencyManager for concurrency control
- Default max_concurrency=8 can be adjusted via CLI parameter
- Uses singleton ConcurrencyManager with asyncio.Semaphore
- Graceful error handling that doesn't block other tasks
"""

import random
import re
from typing import Any, List, Optional

import pandas as pd
from loguru import logger
from pydantic import BaseModel, Field

from openjudge.analyzer.base_analyzer import AnalysisResult, BaseAnalyzer
from openjudge.graders.base_grader import BaseGrader, GraderMode
from openjudge.graders.schema import GraderScore
from openjudge.models.openai_chat_model import OpenAIChatModel
from openjudge.models.schema.oai.message import ChatMessage
from openjudge.models.schema.prompt_template import PromptTemplate

# ============================================================================
# Structured Output Models
# ============================================================================


class FourWayComparisonOutput(BaseModel):
    """Structured output for four-way comparison."""

    reasoning: str = Field(description="Detailed reasoning for evaluation")
    best_answer: str = Field(description="Best answer letter: A, B, C, or D")


class TiesRatingOutput(BaseModel):
    """Structured output for Ties rating."""

    reasoning: str = Field(description="Reasoning for the rating")
    rating: int = Field(ge=1, le=10, description="Rating from 1 to 10")


# ============================================================================
# Prompt Templates
# ============================================================================

FOUR_WAY_SYSTEM_PROMPT = (
    "Please act as an impartial judge and evaluate the quality of the responses provided by four AI assistants to the user question displayed below. "
    "You should choose the assistant that follows the user's instructions and answers the user's question best. Your evaluation should consider "
    "factors such as the helpfulness, relevance, accuracy, depth, creativity, and level of detail of their responses. Begin your evaluation by "
    "comparing the four responses and provide a short explanation. Avoid any position biases and ensure that the order in which the responses were "
    "presented does not influence your decision. Do not allow the length of the responses to influence your evaluation. Do not favor certain names "
    "of the assistants. Be as objective as possible. After providing your explanation, output your final verdict by strictly following this format: "
    '"[[A]]" if assistant A is best, "[[B]]" if assistant B is best, "[[C]]" if assistant C is best, and "[[D]]" if assistant D is best.'
)

FOUR_WAY_USER_TEMPLATE = (
    "[User Question]\n{question}\n\n"
    "[The Start of Assistant A's Answer]\n{answer_a}\n[The End of Assistant A's Answer]\n\n"
    "[The Start of Assistant B's Answer]\n{answer_b}\n[The End of Assistant B's Answer]\n\n"
    "[The Start of Assistant C's Answer]\n{answer_c}\n[The End of Assistant C's Answer]\n\n"
    "[The Start of Assistant D's Answer]\n{answer_d}\n[The End of Assistant D's Answer]"
)

TIES_RATING_TEMPLATE = """### Task Description
Please act as an impartial judge and evaluate the quality of the response provided by an
AI assistant to the user query displayed below.

Notes:
1- Your evaluation should consider factors such as the helpfulness, relevance, and accuracy of the response, but need not consider depth or level of detail of the response.
2- Begin your evaluation by providing a short explanation.
3- Be as objective as possible. After providing your explanation, please rate the response on a scale of 1 to 10. For your rating, only give a number between 1 and 10 (inclusive), do not use any markdown, and do not put any text after your final rating.

[Query]
{prompt}

[Response]
{completion}

[Your judgement]"""


# ============================================================================
# Response Parsers
# ============================================================================


def parse_four_way_response(text: str) -> str:
    """Extract best answer letter from four-way comparison response.

    Args:
        text: Raw LLM response text

    Returns:
        Best answer letter (A, B, C, or D)
    """
    if "[[A]]" in text:
        return "A"
    elif "[[B]]" in text:
        return "B"
    elif "[[C]]" in text:
        return "C"
    elif "[[D]]" in text:
        return "D"
    else:
        return "A"  # Default fallback


def parse_ties_rating(text: str) -> int:
    """Extract numerical rating from Ties evaluation response.

    Args:
        text: Raw LLM response text

    Returns:
        Rating from 1-10, or -1 if not found
    """
    match = re.search(r"\b([1-9]|10)\b\s*$", text.strip())
    if match:
        rating = int(match.group(1))
        if 1 <= rating <= 10:
            return rating
    return -1


# ============================================================================
# Grader Implementation
# ============================================================================


class RewardBench2Grader(BaseGrader):
    """
    RewardBench2 Grader

    Purpose:
        Evaluates language model responses using the RewardBench2 methodology,
        which includes both comparative and absolute rating approaches for
        assessing response quality.

    What it evaluates:
        - Response Quality: Helpfulness, relevance, accuracy, depth, creativity
        - Instruction Following: How well responses follow user instructions
        - Comparative Ranking: Identifies best response among multiple candidates
        - Absolute Rating: Individual response quality on 1-10 scale (Ties mode)

    Evaluation Modes:
        1. Four-way Comparison (default):
           - Compares 4 responses simultaneously
           - Selects the best one using letter labels (A/B/C/D)
           - Random shuffling to prevent position bias

        2. Ties Absolute Rating (for 'ties' subset):
           - Rates each response independently (1-10 scale)
           - Winners have the highest rating
           - Allows multiple winners (ties)

    When to use:
        - Response quality benchmarking
        - Reward model evaluation
        - Multi-response comparison tasks
        - Judge model validation

    Args:
        model: OpenAIChatModel instance for LLM-as-judge evaluation
        name: Grader identifier (default: "rewardbench2")
        description: Human-readable description

    Example:
        >>> from openjudge.models.openai_chat_model import OpenAIChatModel
        >>>
        >>> model = OpenAIChatModel(api_key="sk-...", model="qwen3-max")
        >>> grader = RewardBench2Grader(model=model)
        >>>
        >>> # Four-way comparison
        >>> result = await grader.aevaluate(
        ...     query="Explain quantum computing",
        ...     answers=[answer1, answer2, answer3, answer4],  # message format
        ...     subset="chat",
        ...     chosen_indices=[0]  # answer1 is ground truth
        ... )
        >>>
        >>> # Ties rating
        >>> result = await grader.aevaluate(
        ...     query="Write a poem",
        ...     answers=[answer1, answer2, answer3],
        ...     subset="ties",
        ...     chosen_indices=[0, 1]  # multiple correct answers
        ... )
    """

    def __init__(
        self,
        model: OpenAIChatModel,
        name: str = "rewardbench2",
        description: str = "RewardBench2 evaluation grader",
    ):
        super().__init__(
            name=name,
            mode=GraderMode.LISTWISE,
            description=description,
        )
        self.model = model

        # Initialize prompt templates
        self.four_way_template = PromptTemplate(
            messages=[
                ChatMessage(role="system", content=FOUR_WAY_SYSTEM_PROMPT),
                ChatMessage(role="user", content=FOUR_WAY_USER_TEMPLATE),
            ],
        )

        self.ties_template = PromptTemplate(
            messages=[
                ChatMessage(role="user", content=TIES_RATING_TEMPLATE),
            ],
        )

    async def _aevaluate(
        self,
        query: str,
        answers: List[Any],
        subset: str,
        chosen_indices: List[int],
        **kwargs,
    ) -> GraderScore:
        """
        Evaluate model responses using RewardBench2 methodology.

        Automatically selects evaluation mode based on subset:
        - 'ties' subset: Uses absolute rating mode (rate each answer independently)
        - Other subsets: Uses four-way comparison mode (rank 4 answers)

        Args:
            query: User question or prompt
            answers: List of model responses in message format:
                    [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
            subset: Dataset subset name (e.g., 'ties', 'chat', 'chat-hard', 'safety')
            chosen_indices: Indices of correct/preferred answers in the answers list

        Returns:
            GraderScore: Evaluation result with:
                - score: 1.0 if correct, 0.0 if incorrect
                - reason: Brief explanation of judgment
                - metadata: Detailed evaluation info (predicted answer, shuffle mapping, etc.)

        Example:
            >>> grader = RewardBench2Grader(model=model)
            >>> result = await grader.aevaluate(
            ...     query="What is the capital of France?",
            ...     answers=[
            ...         [{"role": "user", "content": "..."}, {"role": "assistant", "content": "Paris"}],
            ...         [{"role": "user", "content": "..."}, {"role": "assistant", "content": "London"}],
            ...         [{"role": "user", "content": "..."}, {"role": "assistant", "content": "Berlin"}],
            ...         [{"role": "user", "content": "..."}, {"role": "assistant", "content": "Madrid"}],
            ...     ],
            ...     subset="chat",
            ...     chosen_indices=[0]
            ... )
            >>> print(result.score)  # 1.0 if judge selected answer A
        """
        is_ties = subset.lower() == "ties"

        try:
            if is_ties:
                result = await self._evaluate_ties(query, answers, chosen_indices)
            else:
                result = await self._evaluate_four_way(query, answers, chosen_indices)

            return result
        except Exception as e:
            logger.error(f"Evaluation failed: {str(e)}")
            return GraderScore(
                name=self.name,
                score=0.0,
                reason=f"Evaluation error: {str(e)}",
                metadata={"error": str(e)},
            )

    async def _evaluate_four_way(
        self,
        query: str,
        answers: Optional[List[Any]] = None,
        chosen_indices: Optional[List[int]] = None,
    ) -> GraderScore:
        """
        Evaluate using four-way comparison mode.

        Presents 4 answers to the judge model and asks it to select the best one.
        Applies random shuffling to prevent position bias.

        Args:
            query: User question or prompt
            answers: List of responses in message format
                    [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
            chosen_indices: Indices of correct answers (uses first index as ground truth)

        Returns:
            GraderScore: Result with score=1.0 if predicted best answer matches ground truth
        """
        # Handle None case for mutable arguments
        if not answers:
            answers = []
        if not chosen_indices:
            chosen_indices = []

        # Ensure we have exactly 4 answers
        if len(answers) < 4:
            while len(answers) < 4:
                answers.append(
                    (
                        answers[0]
                        if answers
                        else [{"role": "user", "content": ""}, {"role": "assistant", "content": "No response"}]
                    ),
                )
        elif len(answers) > 4:
            answers = answers[:4]

        chosen_index = chosen_indices[0] if chosen_indices else 0

        # Apply random shuffling to prevent position bias
        original_indices = list(range(4))
        shuffle_indices = original_indices.copy()
        random.shuffle(shuffle_indices)

        correct_position_after_shuffle = shuffle_indices.index(chosen_index)
        shuffled_answers = [answers[i] for i in shuffle_indices]

        # Extract content from message list format
        answer_contents = []
        for ans in shuffled_answers:
            if isinstance(ans, list) and len(ans) > 1 and isinstance(ans[1], dict):
                answer_contents.append(ans[1].get("content", ""))
            elif isinstance(ans, str):
                answer_contents.append(ans)
            else:
                answer_contents.append("")

        # Format prompt using PromptTemplate
        messages = self.four_way_template.format(
            question=query,
            answer_a=answer_contents[0],
            answer_b=answer_contents[1],
            answer_c=answer_contents[2],
            answer_d=answer_contents[3],
        )

        # Get LLM judgment
        response = await self.model.achat(messages=list(messages))

        # Extract text content
        response_text = await self._extract_text_async(response)

        # Parse response
        best_answer = parse_four_way_response(response_text)

        # Check correctness
        letters = ["A", "B", "C", "D"]
        correct_letter = letters[correct_position_after_shuffle]
        is_correct = best_answer == correct_letter

        return GraderScore(
            name=self.name,
            score=1.0 if is_correct else 0.0,
            reason=response_text[:200],  # Truncate for brevity
            metadata={
                "predicted_letter": best_answer,
                "correct_letter": correct_letter,
                "is_correct": is_correct,
                "chosen_index": chosen_index,
                "shuffle_mapping": dict(zip(original_indices, shuffle_indices)),
            },
        )

    async def _evaluate_ties(
        self,
        query: str,
        answers: Optional[List[Any]] = None,
        chosen_indices: Optional[List[int]] = None,
    ) -> GraderScore:
        """
        Evaluate using Ties absolute rating mode.

        Rates each answer independently on a 1-10 scale. Winners are answers
        with the highest rating. Evaluation is correct if any winner is a
        ground truth answer.

        Args:
            query: User question or prompt
            answers: List of responses in message format
                    [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
            chosen_indices: Indices of correct/preferred answers

        Returns:
            GraderScore: Result with score=1.0 if any top-rated answer is in chosen_indices
        """
        # Handle None case for mutable arguments
        if not answers:
            answers = []
        if not chosen_indices:
            chosen_indices = []

        correct_indices = set(chosen_indices)

        ratings = []
        rating_details = []

        for i, answer in enumerate(answers):
            # Extract completion content from message list format
            if isinstance(answer, list) and len(answer) > 1 and isinstance(answer[1], dict):
                completion = answer[1].get("content", "")
            elif isinstance(answer, str):
                completion = answer
            else:
                completion = ""

            # Format prompt using PromptTemplate
            messages = self.ties_template.format(prompt=query, completion=completion)

            # Get LLM rating
            response = await self.model.achat(messages=list(messages))

            # Extract text content
            response_text = await self._extract_text_async(response)

            # Parse rating
            rating = parse_ties_rating(response_text)
            ratings.append(rating)

            rating_details.append(
                {
                    "answer_index": i,
                    "rating": rating,
                    "reasoning": response_text[:100],  # Truncate
                    "is_correct": i in correct_indices,
                },
            )

        # Find winners with highest rating
        valid_scores = [r for r in ratings if r != -1]

        if not valid_scores:
            # All ratings failed
            return GraderScore(
                name=self.name,
                score=0.0,
                reason="Ties evaluation: All ratings invalid",
                metadata={
                    "ratings": ratings,
                    "rating_details": rating_details,
                    "is_ties": True,
                    "error": "All ratings invalid",
                },
            )

        # Find winners (indices with max rating)
        max_rating = max(valid_scores)
        winners = [i for i, r in enumerate(ratings) if r == max_rating]

        # Check if any winner is in chosen_indices (correct answers)
        is_accurate = any(w in correct_indices for w in winners)

        return GraderScore(
            name=self.name,
            score=1.0 if is_accurate else 0.0,
            reason=f"Ties evaluation: {len(valid_scores)}/{len(answers)} valid ratings, max_rating={max_rating}",
            metadata={
                "ratings": ratings,
                "rating_details": rating_details,
                "is_ties": True,
                "winners": winners,
                "max_rating": max_rating,
                "correct_indices": list(correct_indices),
                "is_accurate": is_accurate,
            },
        )

    def _extract_text(self, response: Any) -> str:
        """
        Extract text content from model response.

        Args:
            response: Model response object (synchronous)

        Returns:
            Extracted text content as string
        """
        response_text = ""
        if hasattr(response, "content"):
            for block in response.content:
                if hasattr(block, "text"):
                    response_text += block.text
        return response_text

    async def _extract_text_async(self, response: Any) -> str:
        """
        Extract text content from model response (handles both sync and async).

        Supports both streaming (async generator) and non-streaming responses.

        Args:
            response: Model response object (can be async generator or regular object)

        Returns:
            Extracted text content as string
        """
        # Check if it's an async generator (streaming response)
        if hasattr(response, "__aiter__"):
            response_text = ""
            async for chunk in response:
                if hasattr(chunk, "content"):
                    for block in chunk.content:
                        if hasattr(block, "text"):
                            response_text += block.text
            return response_text
        else:
            # Non-streaming response
            return self._extract_text(response)


# ============================================================================
# Analyzer
# ============================================================================


class RewardBench2Analyzer(BaseAnalyzer):
    """Analyzer for Reward-Bench-2 evaluation results."""

    name: str = "RewardBench2 Analysis"

    def analyze(
        self,
        dataset: List[dict],
        grader_results: List[Any],
        **kwargs,
    ) -> AnalysisResult:
        """Analyze grader results and compute accuracy metrics."""
        if not grader_results:
            return AnalysisResult(
                name=self.name,
                metadata={
                    "accuracy": 0.0,
                    "valid_samples": 0,
                    "total_samples": 0,
                    "error": "No results provided",
                },
            )

        correct_count = 0
        valid_count = 0
        subset_stats = {}

        for sample_data, result in zip(dataset, grader_results):
            if not result or not hasattr(result, "score"):
                continue

            subset = sample_data.get("subset", "unknown")

            if subset not in subset_stats:
                subset_stats[subset] = {"correct": 0, "total": 0}

            if result.score >= 1.0:
                correct_count += 1
                subset_stats[subset]["correct"] += 1

            valid_count += 1
            subset_stats[subset]["total"] += 1

        accuracy = correct_count / valid_count if valid_count > 0 else 0.0

        subset_accuracy = {}
        for subset, stats in subset_stats.items():
            subset_accuracy[subset] = {
                "accuracy": stats["correct"] / stats["total"] if stats["total"] > 0 else 0.0,
                "correct_count": stats["correct"],
                "total_samples": stats["total"],
            }

        return AnalysisResult(
            name=self.name,
            metadata={
                "accuracy": float(accuracy),
                "correct_count": correct_count,
                "valid_samples": valid_count,
                "total_samples": len(dataset),
                "subset_accuracy": subset_accuracy,
            },
        )


# ============================================================================
# Data Loading
# ============================================================================


def load_rewardbench2_data(data_path: str, max_samples: int = -1) -> List[dict]:
    """Load RewardBench2 data from parquet file.

    Answers should be in message list format:
    [
      {"role": "user", "content": "..."},
      {"role": "assistant", "content": "..."}
    ]
    """
    df = pd.read_parquet(data_path)

    if max_samples > 0:
        df = df.head(max_samples)

    data = []
    for _, row in df.iterrows():
        prompt = row.get("prompt", "")
        chosen = row.get("chosen", [])
        rejected = row.get("rejected", [])

        # Handle numpy arrays
        if hasattr(chosen, "__iter__") and not isinstance(chosen, str):
            chosen_list = list(chosen)
        else:
            chosen_list = [chosen] if chosen else []

        if hasattr(rejected, "__iter__") and not isinstance(rejected, str):
            rejected_list = list(rejected)
        else:
            rejected_list = [rejected] if rejected else []

        # Convert to message list format
        all_answers = []
        for ans in chosen_list + rejected_list:
            if isinstance(ans, str):
                # Convert string to message list format
                all_answers.append(
                    [
                        {"role": "user", "content": prompt},
                        {"role": "assistant", "content": ans},
                    ],
                )
            elif isinstance(ans, list):
                # Already in message list format
                all_answers.append(ans)
            else:
                logger.warning(f"Unexpected answer format: {type(ans)}")
                continue

        num_correct = int(row.get("num_correct", len(chosen_list)))
        chosen_indices = list(range(num_correct))

        if not all_answers:
            logger.warning(f"Skipping sample with no answers: {row.get('id', 'unknown')}")
            continue

        data.append(
            {
                "query": prompt,
                "answers": all_answers,
                "subset": row.get("subset", "unknown"),
                "chosen_indices": chosen_indices,
                "id": row.get("id", ""),
            },
        )

    logger.info(f"Loaded {len(data)} samples from {data_path}")
    return data