]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
scripts: add Jenkins unit test analysis tool 68851/head
authorstzuraski898 <steven.zuraski@ibm.com>
Mon, 11 May 2026 20:10:07 +0000 (20:10 +0000)
committerstzuraski898 <steven.zuraski@ibm.com>
Tue, 26 May 2026 21:29:00 +0000 (21:29 +0000)
Add analyze_unittest_jenkins.py to aggregate and analyze unit test
results across multiple Ceph PRs by mining Jenkins CI build logs.

The script fetches recent open PRs from GitHub, extracts Jenkins build
URLs from PR checks, downloads console logs in parallel, and parses
CTest output to generate comprehensive failure reports.

This enables data-driven decisions about test infrastructure
improvements and helps identify systemic issues in the test suite.

Fixes: https://tracker.ceph.com/issues/76513
Assisted-by: IBM Bob (Claude 3.7 Sonnet)
Signed-off-by: Steven Zuraski <steven.zuraski@ibm.com>
src/script/analyze_unittest_jenkins.py [new file with mode: 0644]

diff --git a/src/script/analyze_unittest_jenkins.py b/src/script/analyze_unittest_jenkins.py
new file mode 100644 (file)
index 0000000..3877fda
--- /dev/null
@@ -0,0 +1,1161 @@
+#!/usr/bin/env python3
+"""
+Analyze unit test results across Ceph PRs on Jenkins.
+
+This script:
+1. Fetches recent open PRs from GitHub
+2. Extracts Jenkins build URLs from PR checks
+3. Downloads console logs
+4. Parses for unit test results (all tests or filtered by pattern)
+5. Generates statistics and failure reports
+
+Usage:
+    python3 analyze_unittest_jenkins.py --help
+    python3 analyze_unittest_jenkins.py [--max-prs 100] [--output report.json]
+    python3 analyze_unittest_jenkins.py --test-filter unittest_mgr --max-prs 50
+    python3 analyze_unittest_jenkins.py --release reef --max-prs 30
+    python3 analyze_unittest_jenkins.py --release main --test-filter unittest_mgr
+    
+Requirements:
+    pip install requests beautifulsoup4
+"""
+
+import argparse
+import json
+import os
+import re
+import sys
+import threading
+import time
+from collections import defaultdict
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass
+from datetime import datetime
+from itertools import islice
+from pathlib import Path
+from typing import Any, Dict, Generator, List, Optional, TextIO, Tuple
+from urllib.parse import urljoin
+
+try:
+    import requests
+    from requests.adapters import HTTPAdapter
+    from bs4 import BeautifulSoup
+except ImportError:
+    print("Error: Required packages not installed.")
+    print("Please run: pip install requests beautifulsoup4")
+    sys.exit(1)
+
+
+@dataclass(frozen=True)
+class TestOutcome:
+    """Configuration for a test outcome type.
+    
+    Attributes:
+        counter_key: The results dictionary key to increment ('passed' or 'failed')
+        is_failure: Whether this outcome represents a test failure
+        suffix: Optional suffix to append to failed test names (e.g., ' (timeout)')
+    """
+    counter_key: str
+    is_failure: bool
+    suffix: Optional[str] = None
+
+
+class JenkinsAnalyzer:
+    """Analyzes unit test results from Jenkins builds."""
+    
+    # API Configuration Constants
+    DEFAULT_PER_PAGE = 100
+    REQUEST_TIMEOUT = 30
+    MAX_RETRY_ATTEMPTS = 3
+    MIN_404_THRESHOLD = 3  # Minimum 404s before early termination
+    
+    def __init__(
+        self,
+        github_token: Optional[str] = None,
+        max_workers: int = 10,
+        test_filter: Optional[str] = None,
+        verbose: bool = True,
+        log_file: Optional[str] = None
+    ) -> None:
+        """Initialize Jenkins analyzer.
+        
+        Args:
+            github_token: GitHub API token for higher rate limits
+            max_workers: Number of parallel workers for downloads
+            test_filter: Optional test name pattern to filter results
+            verbose: Enable verbose console output
+            log_file: Optional file path for detailed logging
+        """
+        self.github_token: Optional[str] = github_token
+        self.max_workers: int = max_workers
+        self.test_filter: Optional[str] = test_filter
+        self.verbose: bool = verbose
+        self.log_file: Optional[str] = log_file
+        self.log_lock: threading.Lock = threading.Lock()
+        self.session: requests.Session = requests.Session()
+        self.log_fp: Optional[TextIO] = None
+        
+        # Configure session with connection pooling for better performance
+        if github_token:
+            self.session.headers['Authorization'] = f'token {github_token}'
+        
+        # Enable connection pooling with larger pool size
+        adapter = HTTPAdapter(
+            pool_connections=max_workers * 2,
+            pool_maxsize=max_workers * 4,
+            max_retries=0  # We handle retries manually
+        )
+        self.session.mount('http://', adapter)
+        self.session.mount('https://', adapter)
+        
+        # Open log file with proper encoding
+        if log_file:
+            self.log_fp = Path(log_file).open('w', encoding='utf-8')
+        
+        # Build test parsing patterns
+        self.test_patterns: Dict[str, re.Pattern] = self._build_test_patterns(test_filter)
+    
+    def _build_test_patterns(self, test_filter: Optional[str]) -> Dict[str, re.Pattern]:
+        """Build regex patterns for parsing CTest console output.
+        
+        Note: There's NO space between dots and ***Failed/***Timeout/etc.
+        
+        Args:
+            test_filter: Optional test name pattern to filter results
+            
+        Returns:
+            Dictionary of compiled regex patterns for parsing test output
+        """
+        if test_filter:
+            # Filter for specific test pattern
+            test_pattern = rf'({re.escape(test_filter)}\S*)'
+        else:
+            # Capture ALL tests (no filter)
+            test_pattern = r'(\S+)'
+        
+        return {
+            'test_start': re.compile(rf'Start\s+\d+:\s+{test_pattern}'),
+            'test_passed': re.compile(rf'\d+/\d+\s+Test\s+#\d+:\s+{test_pattern}\s+\.+\s+Passed\s+([\d.]+)\s+sec'),
+            'test_failed': re.compile(rf'\d+/\d+\s+Test\s+#\d+:\s+{test_pattern}\s+\.+\*\*\*Failed\s+([\d.]+)\s+sec'),
+            'test_timeout': re.compile(rf'\d+/\d+\s+Test\s+#\d+:\s+{test_pattern}\s+\.+\*\*\*Timeout\s+([\d.]+)\s+sec'),
+            'test_exception': re.compile(rf'\d+/\d+\s+Test\s+#\d+:\s+{test_pattern}\s+\.+\*\*\*Exception:\s+'),
+            'test_not_run': re.compile(rf'\d+/\d+\s+Test\s+#\d+:\s+{test_pattern}\s+\.+\*\*\*Not Run\s+'),
+            'summary': re.compile(r'(\d+)%\s+tests\s+passed,\s+(\d+)\s+tests\s+failed\s+out\s+of\s+(\d+)'),
+        }
+    
+    def log(self, message: str, force_print: bool = False, thread_id: Optional[int] = None):
+        """Log message to file and/or console based on verbosity settings.
+        
+        Args:
+            message: The message to log
+            force_print: Force printing even if verbose is False
+            thread_id: Optional thread ID to include in the log message
+        """
+        if thread_id is not None:
+            formatted_message = f"  [TID:{thread_id}] {message}"
+        else:
+            formatted_message = message
+            
+        with self.log_lock:
+            if self.log_fp:
+                self.log_fp.write(formatted_message + '\n')
+                self.log_fp.flush()
+            if self.verbose or force_print:
+                print(formatted_message)
+    
+    def __enter__(self):
+        """Context manager entry."""
+        return self
+    
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit with automatic cleanup."""
+        self.close()
+        return False
+    
+    def close(self):
+        """Close log file if open."""
+        if self.log_fp and not self.log_fp.closed:
+            self.log_fp.close()
+            self.log_fp = None
+    
+    def _fetch_with_retry(
+        self, 
+        url: str, 
+        params: Dict[str, Any],
+        max_retries: Optional[int] = None
+    ) -> Optional[requests.Response]:
+        """Fetch URL with exponential backoff retry logic.
+        
+        Args:
+            url: URL to fetch
+            params: Query parameters
+            max_retries: Maximum number of retry attempts (defaults to class constant)
+            
+        Returns:
+            Response object or None if all retries failed
+        """
+        if max_retries is None:
+            max_retries = self.MAX_RETRY_ATTEMPTS
+            
+        for attempt in range(max_retries):
+            try:
+                response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
+                response.raise_for_status()
+                return response
+            except requests.RequestException as e:
+                if attempt == max_retries - 1:
+                    self.log(f"  Error fetching PRs after {max_retries} attempts: {e}")
+                    return None
+                
+                wait_time = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s
+                self.log(f"  Request failed (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s: {e}")
+                time.sleep(wait_time)
+        
+        return None
+    
+    def _fetch_pr_pages(self, per_page: Optional[int] = None) -> Generator[List[Dict], None, None]:
+        """Generator that yields pages of open PRs from GitHub API.
+        
+        Args:
+            per_page: Number of PRs per page (defaults to class constant)
+            
+        Yields:
+            List of PR dictionaries from each API page
+        """
+        if per_page is None:
+            per_page = self.DEFAULT_PER_PAGE
+            
+        page = 1
+        url = "https://api.github.com/repos/ceph/ceph/pulls"
+        
+        while True:
+            params = {
+                'state': 'open',
+                'sort': 'updated',
+                'direction': 'desc',
+                'per_page': per_page,
+                'page': page
+            }
+            
+            response = self._fetch_with_retry(url, params)
+            if response is None:
+                break
+                
+            page_prs = response.json()
+            if not page_prs:
+                break
+                
+            yield page_prs
+            page += 1
+    
+    def fetch_recent_prs(self, max_prs: int = 100, release: str = "main") -> List[Dict]:
+        """Fetch recent open PRs from GitHub, filtering by target release branch.
+        
+        Args:
+            max_prs: Maximum number of PRs to fetch
+            release: Target release branch to filter by
+            
+        Returns:
+            List of PR dictionaries matching the release filter
+        """
+        self.log("Fetching recent open PRs from GitHub...", force_print=True)
+
+        filtered_prs: List[Dict] = []
+        total_fetched = 0
+
+        for page_num, page_prs in enumerate(self._fetch_pr_pages(), start=1):
+            total_fetched += len(page_prs)
+            
+            # Filter PRs by release and collect up to remaining quota
+            matching_prs = (
+                pr for pr in page_prs
+                if pr.get('base', {}).get('ref') == release
+            )
+            remaining = max_prs - len(filtered_prs)
+            new_prs = list(islice(matching_prs, remaining))
+            filtered_prs.extend(new_prs)
+
+            self.log(
+                f"  Fetched page {page_num}: {len(page_prs)} open PRs "
+                f"({len(new_prs)} targeting '{release}', "
+                f"{len(filtered_prs)} collected so far)"
+            )
+
+            if len(filtered_prs) >= max_prs:
+                break
+
+        self.log(
+            f"Fetched {total_fetched} open PRs, returning "
+            f"{len(filtered_prs)} targeting '{release}'",
+            force_print=True
+        )
+        return filtered_prs
+    
+    def _should_skip_build(self, name: str, skip_builds: set) -> bool:
+        """Check if a build should be skipped based on its name.
+        
+        Args:
+            name: The build name to check
+            skip_builds: Set of build name patterns to skip
+            
+        Returns:
+            True if the build should be skipped, False otherwise
+        """
+        if not skip_builds:
+            return False
+        name_lower = name.lower()
+        return any(skip.lower() in name_lower for skip in skip_builds)
+    
+    def _get_skip_builds(self) -> set:
+        """Get the set of build names to skip based on test filter.
+        
+        These builds never contain unit tests, so we skip them to save time.
+        
+        Returns:
+            Set of build name patterns to skip
+        """
+        # Always skip these - they're not actual builds
+        always_skip = {'Signed-off-by', 'Unmodified Submodules'}
+        
+        # These builds never run unit tests, so always skip them
+        no_unittest_builds = {
+            'ceph API tests',
+            'ceph windows tests',
+            'ceph dashboard tests',
+            'ceph dashboard cephadm e2e tests'
+        }
+        
+        return always_skip | no_unittest_builds
+    
+    def _fetch_check_runs(self, pr_data: Dict) -> List[Dict]:
+        """Fetch check-runs for a PR using its head commit SHA.
+        
+        Args:
+            pr_data: The PR data dictionary containing head commit info
+            
+        Returns:
+            List of check-run dictionaries, or empty list on error
+        """
+        pr_number = pr_data.get('number')
+        head_sha = pr_data.get('head', {}).get('sha')
+        
+        if not head_sha:
+            self.log(f"  No head SHA found for PR #{pr_number}")
+            return []
+            
+        url = f"https://api.github.com/repos/ceph/ceph/commits/{head_sha}/check-runs"
+        try:
+            response = self.session.get(url, timeout=30)
+            if response.status_code == 200:
+                return response.json().get('check_runs', [])
+        except requests.RequestException as e:
+            self.log(f"  Error fetching check-runs for PR #{pr_number}: {e}")
+        return []
+    
+    def _fetch_statuses(self, pr_number: int) -> List[Dict]:
+        """Fetch statuses for a PR.
+        
+        Args:
+            pr_number: The PR number to fetch statuses for
+            
+        Returns:
+            List of status dictionaries, or empty list on error
+        """
+        try:
+            pr_url = f"https://api.github.com/repos/ceph/ceph/pulls/{pr_number}"
+            response = self.session.get(pr_url, timeout=30)
+            response.raise_for_status()
+            pr_data = response.json()
+            
+            statuses_url = pr_data.get('statuses_url')
+            if statuses_url:
+                response = self.session.get(statuses_url, timeout=30)
+                if response.status_code == 200:
+                    return response.json()
+        except requests.RequestException as e:
+            self.log(f"  Error fetching statuses for PR #{pr_number}: {e}")
+        return []
+    
+    def _extract_urls_from_check_runs(self, check_runs: List[Dict], skip_builds: set) -> Dict[str, str]:
+        """Extract Jenkins URLs from GitHub check-runs.
+        
+        Args:
+            check_runs: List of check-run dictionaries from GitHub API
+            skip_builds: Set of build name patterns to skip
+            
+        Returns:
+            Dictionary mapping build names to Jenkins URLs
+        """
+        return {
+            check['name']: check['details_url']
+            for check in check_runs
+            if 'jenkins.ceph.com' in check.get('details_url', '')
+            and not self._should_skip_build(check.get('name', ''), skip_builds)
+        }
+    
+    def _extract_urls_from_statuses(self, statuses: List[Dict], skip_builds: set, existing_urls: Dict[str, str]) -> Dict[str, str]:
+        """Extract Jenkins URLs from GitHub statuses.
+        
+        Args:
+            statuses: List of status dictionaries from GitHub API
+            skip_builds: Set of build name patterns to skip
+            existing_urls: Dictionary of already-extracted URLs to avoid duplicates
+            
+        Returns:
+            Dictionary mapping build contexts to Jenkins URLs
+        """
+        return {
+            status['context']: status['target_url']
+            for status in statuses
+            if 'jenkins.ceph.com' in status.get('target_url', '')
+            and status.get('context', '') not in existing_urls
+            and not self._should_skip_build(status.get('context', ''), skip_builds)
+        }
+    
+    def extract_jenkins_urls(self, pr_data: Dict) -> Dict[str, str]:
+        """Extract Jenkins build URLs from PR checks using both check-runs and statuses APIs.
+        
+        Args:
+            pr_data: The PR data dictionary from GitHub API
+            
+        Returns:
+            Dictionary mapping build names to Jenkins URLs
+        """
+        pr_number = pr_data.get('number', 0)
+        skip_builds = self._get_skip_builds()
+        
+        # Fetch and extract from check-runs API (newer PRs)
+        check_runs = self._fetch_check_runs(pr_data)
+        jenkins_urls = self._extract_urls_from_check_runs(check_runs, skip_builds)
+        
+        # Fetch and extract from statuses API (older PRs)
+        if pr_number:
+            statuses = self._fetch_statuses(pr_number)
+            jenkins_urls.update(
+                self._extract_urls_from_statuses(statuses, skip_builds, jenkins_urls)
+            )
+        
+        return jenkins_urls
+    
+    def download_console_log(self, jenkins_url: str) -> tuple[Optional[str], bool]:
+        """Download console log from Jenkins build.
+        
+        Returns:
+            tuple: (console_log_text, is_404)
+        """
+        console_url = jenkins_url.rstrip('/') + '/consoleText'
+        
+        try:
+            response = requests.get(console_url, timeout=60)
+            response.raise_for_status()
+            return response.text, False
+        except requests.HTTPError as e:
+            # Track 404 errors for early termination logic
+            if e.response.status_code == 404:
+                return None, True
+            else:
+                self.log(f"Error downloading console log: {e}", thread_id=threading.get_ident())
+                return None, False
+        except requests.RequestException as e:
+            self.log(f"Error downloading console log: {e}", thread_id=threading.get_ident())
+            return None, False
+    
+    def parse_test_results(self, console_log: str) -> Dict[str, Any]:
+        """Parse test results from CTest console log with optimized single-pass parsing.
+        
+        Uses dataclass-based outcome configuration for type safety and performance.
+        Eliminates repeated dictionary lookups by using immutable TestOutcome objects.
+        """
+        # Outcome configuration using dataclass for type safety and performance
+        OUTCOME_CONFIGS = {
+            'test_passed': TestOutcome('passed', False),
+            'test_failed': TestOutcome('failed', True),
+            'test_timeout': TestOutcome('failed', True, ' (timeout)'),
+            'test_exception': TestOutcome('failed', True, ' (exception)'),
+            'test_not_run': TestOutcome('failed', True, ' (not run)')
+        }
+        
+        results = {
+            'found': False,
+            'test_suites': set(),
+            'total_tests': 0,
+            'passed': 0,
+            'failed': 0,
+            'failed_tests': [],
+            'execution_time_ms': 0
+        }
+        
+        tracked_tests = set()
+        
+        for line in console_log.splitlines():
+            # Parse test start - collect all matching tests
+            match = self.test_patterns['test_start'].search(line)
+            if match:
+                tracked_tests.add(match.group(1))
+                results['found'] = True
+                continue
+            
+            # Single-pass outcome matching with early exit
+            for pattern_key, outcome in OUTCOME_CONFIGS.items():
+                match = self.test_patterns[pattern_key].search(line)
+                if match:
+                    test_name = match.group(1)
+                    if test_name in tracked_tests:
+                        # Update counter using dataclass field
+                        results[outcome.counter_key] += 1
+                        
+                        # Add to failed tests if this is a failure outcome
+                        if outcome.is_failure:
+                            suffix = outcome.suffix or ''
+                            results['failed_tests'].append(f"{test_name}{suffix}")
+                        
+                        # Track unique test suite
+                        results['test_suites'].add(test_name)
+                    break  # Outcomes are mutually exclusive
+            
+            # Parse summary line (preserved for potential future use)
+            if self.test_patterns['summary'].search(line) and results['found']:
+                pass  # Summary already captured via per-test counts
+        
+        # Final conversions
+        results['test_suites'] = list(results['test_suites'])
+        results['total_tests'] = results['passed'] + results['failed']
+        
+        return results
+    
+    def _process_build(self, context_url: Tuple[str, str]) -> Tuple[str, str, Optional[Dict], bool]:
+        """Process a single Jenkins build.
+        
+        Args:
+            context_url: Tuple of (build_context, jenkins_url)
+            
+        Returns:
+            Tuple of (context, url, test_results, is_404)
+        """
+        context, url = context_url
+        console_log, is_404 = self.download_console_log(url)
+        if not console_log:
+            return context, url, None, is_404
+        
+        test_results = self.parse_test_results(console_log)
+        return context, url, test_results, False
+    
+    def _should_cancel_remaining_builds(self, not_found_count, processed_count,
+                                       early_404_count, builds_processed):
+        """Check if remaining builds should be cancelled due to 404s.
+        
+        Returns:
+            bool: True if builds should be cancelled
+        """
+        # Optimized early exit: if first MIN_404_THRESHOLD builds are ALL 404s
+        if (early_404_count >= self.MIN_404_THRESHOLD and
+            builds_processed <= self.MIN_404_THRESHOLD):
+            return True
+        
+        # Standard early exit: if we've processed at least MIN_404_THRESHOLD
+        # builds and ALL are 404s
+        if (not_found_count >= self.MIN_404_THRESHOLD and
+            not_found_count == processed_count):
+            return True
+        
+        return False
+
+    def _process_build_result(self, future, result, builds_processed,
+                             total_futures, tid):
+        """Process a single build result and update counters.
+        
+        Returns:
+            tuple: (is_404, should_continue)
+        """
+        build_start = time.perf_counter()
+        context, url, test_results, is_404 = future.result()
+        build_time = time.perf_counter() - build_start
+        
+        if is_404:
+            self.log(
+                f"Build 404 ({builds_processed}/{total_futures}): {context} "
+                f"({build_time:.2f}s)",
+                thread_id=tid
+            )
+            return True, True
+        
+        if test_results is None:
+            self.log(
+                f"Build failed ({builds_processed}/{total_futures}): {context} "
+                f"({build_time:.2f}s)",
+                thread_id=tid
+            )
+            return False, True
+        
+        result['jenkins_builds'][context] = {
+            'url': url,
+            'test_results': test_results
+        }
+        
+        if test_results['found']:
+            self.log(
+                f"Build complete ({builds_processed}/{total_futures}): {context} - "
+                f"{test_results['passed']} passed, {test_results['failed']} failed "
+                f"({build_time:.2f}s)",
+                thread_id=tid
+            )
+        else:
+            filter_msg = f" (filter: {self.test_filter})" if self.test_filter else ""
+            self.log(
+                f"Build complete ({builds_processed}/{total_futures}): {context} - "
+                f"No matching tests found{filter_msg} ({build_time:.2f}s)",
+                thread_id=tid
+            )
+        
+        return False, False
+
+    def _handle_404_result(self, future, futures, initial_submission_futures,
+                           not_found_count, builds_processed, result, tid):
+        """Update 404 tracking and cancel remaining builds when appropriate.
+        
+        Returns:
+            int: Updated 404 count
+        """
+        not_found_count += 1
+        early_404_count = 1 if future in initial_submission_futures else 0
+        processed_non_failures = len(result['jenkins_builds']) + not_found_count
+        
+        if self._should_cancel_remaining_builds(
+            not_found_count, processed_non_failures,
+            early_404_count, builds_processed
+        ):
+            cancelled_count = sum(1 for f in futures if f.cancel())
+            self.log(
+                f"Cancelled {cancelled_count} remaining builds "
+                f"(all 404s detected)",
+                thread_id=tid
+            )
+            return not_found_count, True
+        
+        return not_found_count, False
+
+    def _process_pr_builds(self, jenkins_urls, result, tid):
+        """Process Jenkins builds for a PR in parallel.
+        
+        Returns:
+            int: Number of 404 build results
+        """
+        not_found_count = 0
+
+        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+            futures = [executor.submit(self._process_build, item)
+                      for item in jenkins_urls.items()]
+            initial_submission_futures = set(
+                futures[:self.MIN_404_THRESHOLD]
+                if len(futures) >= self.MIN_404_THRESHOLD
+                else futures
+            )
+            builds_processed = 0
+
+            for future in as_completed(futures):
+                builds_processed += 1
+                is_404, should_continue = self._process_build_result(
+                    future, result, builds_processed, len(futures), tid
+                )
+
+                if is_404:
+                    not_found_count, should_stop = self._handle_404_result(
+                        future, futures, initial_submission_futures,
+                        not_found_count, builds_processed, result, tid
+                    )
+                    if should_stop:
+                        break
+
+                if should_continue:
+                    continue
+
+        return not_found_count
+
+    def analyze_pr(self, pr: Dict) -> Dict:
+        """Analyze test results for a single PR with parallel processing.
+        
+        Args:
+            pr: PR dictionary from GitHub API
+            
+        Returns:
+            Dictionary containing PR metadata and Jenkins build results
+        """
+        start_time = time.perf_counter()
+        pr_number = pr['number']
+        tid = threading.get_ident()
+        
+        result = {
+            'pr_number': pr_number,
+            'title': pr['title'],
+            'merged_at': pr.get('merged_at', 'unknown'),
+            'jenkins_builds': {}
+        }
+        
+        # Extract Jenkins URLs
+        url_extract_start = time.perf_counter()
+        jenkins_urls = self.extract_jenkins_urls(pr)
+        url_extract_time = time.perf_counter() - url_extract_start
+        
+        if not jenkins_urls:
+            total_time = time.perf_counter() - start_time
+            self.log(f"No Jenkins builds found (total: {total_time:.2f}s)",
+                    thread_id=tid)
+            return result
+        
+        self.log(
+            f"Found {len(jenkins_urls)} Jenkins builds "
+            f"(URL extraction: {url_extract_time:.2f}s)",
+            thread_id=tid
+        )
+        
+        processing_start = time.perf_counter()
+        not_found_count = self._process_pr_builds(jenkins_urls, result, tid)
+        processing_time = time.perf_counter() - processing_start
+        total_time = time.perf_counter() - start_time
+        
+        self.log(
+            f"PR #{pr_number} analysis complete: "
+            f"{len(result['jenkins_builds'])} successful builds, "
+            f"{not_found_count} 404s (processing: {processing_time:.2f}s, "
+            f"total: {total_time:.2f}s)",
+            thread_id=tid
+        )
+        
+        return result
+    
+    def _collect_test_statistics(self, results):
+        """Collect test statistics from analysis results.
+        
+        Returns:
+            tuple: (total_tests_run, total_passed, total_failed,
+                   test_failures, test_suite_stats)
+        """
+        total_tests_run = 0
+        total_passed = 0
+        total_failed = 0
+        test_failures = defaultdict(list)
+        test_suite_stats = defaultdict(lambda: {'passed': 0, 'failed': 0})
+        
+        for result in results:
+            pr_number = result['pr_number']
+            pr_title = result['title']
+            
+            for context, build in result['jenkins_builds'].items():
+                if not build['test_results']['found']:
+                    continue
+                
+                total_tests_run += build['test_results']['total_tests']
+                total_passed += build['test_results']['passed']
+                total_failed += build['test_results']['failed']
+                
+                # Track per-suite statistics
+                for suite in build['test_results']['test_suites']:
+                    if (suite in build['test_results']['failed_tests'] or
+                        any(suite in ft for ft in build['test_results']['failed_tests'])):
+                        test_suite_stats[suite]['failed'] += 1
+                    else:
+                        test_suite_stats[suite]['passed'] += 1
+                
+                # Track which PR/build failed each test
+                for test in build['test_results']['failed_tests']:
+                    test_failures[test].append({
+                        'pr_number': pr_number,
+                        'pr_title': pr_title,
+                        'build_context': context
+                    })
+        
+        return (total_tests_run, total_passed, total_failed,
+                test_failures, test_suite_stats)
+
+    def _print_summary_statistics(self, results, total_tests_run,
+                                  total_passed, total_failed):
+        """Print summary statistics to console."""
+        total_prs = len(results)
+        prs_with_jenkins = sum(1 for r in results if r['jenkins_builds'])
+        prs_with_tests = sum(
+            1 for r in results
+            if any(b['test_results']['found']
+                  for b in r['jenkins_builds'].values())
+        )
+        
+        filter_msg = (f" (filter: {self.test_filter})"
+                     if self.test_filter else " (all tests)")
+        
+        print(f"\n{'='*80}")
+        print("ANALYSIS SUMMARY")
+        print(f"{'='*80}\n")
+        print(f"Total PRs analyzed: {total_prs}")
+        print(f"PRs with Jenkins builds: {prs_with_jenkins}")
+        print(f"PRs with test results{filter_msg}: {prs_with_tests}")
+        print(f"\nTest Statistics:")
+        print(f"  Total test executions: {total_tests_run}")
+        print(f"  Total passed: {total_passed}")
+        print(f"  Total failed: {total_failed}")
+        
+        if total_tests_run > 0:
+            pass_rate = (total_passed / total_tests_run) * 100
+            print(f"  Pass rate: {pass_rate:.2f}%")
+
+    def _print_suite_statistics(self, test_suite_stats):
+        """Print per-suite success rates."""
+        if not test_suite_stats:
+            return
+        
+        print(f"\nTest Suite Success Rates:")
+        sorted_suites = sorted(test_suite_stats.items(), key=lambda x: x[0])
+        for suite, stats in sorted_suites:
+            total = stats['passed'] + stats['failed']
+            if total > 0:
+                success_rate = (stats['passed'] / total) * 100
+                print(f"  {suite}: {success_rate:.1f}% "
+                      f"({stats['passed']}/{total})")
+
+    def _print_failure_report(self, test_failures):
+        """Print detailed failure report."""
+        sorted_failures = sorted(test_failures.items(),
+                                key=lambda x: len(x[1]), reverse=True)
+        
+        if not sorted_failures:
+            return sorted_failures
+        
+        print(f"\n{'='*80}")
+        print("FAILURE REPORT (sorted by failure count)")
+        print(f"{'='*80}\n")
+        
+        for test_name, failures in sorted_failures:
+            print(f"{test_name}: {len(failures)} failure(s)")
+            for failure in failures:
+                pr_num = failure['pr_number']
+                pr_title = failure['pr_title'][:60]
+                build_ctx = failure['build_context']
+                print(f"\tPR #{pr_num} ({build_ctx}): {pr_title}")
+            print()
+        
+        return sorted_failures
+
+    def _save_json_report(self, output_file, results, total_tests_run,
+                         total_passed, total_failed, test_suite_stats,
+                         sorted_failures):
+        """Save detailed JSON report to file."""
+        output_path = Path(output_file)
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        
+        total_prs = len(results)
+        prs_with_jenkins = sum(1 for r in results if r['jenkins_builds'])
+        prs_with_tests = sum(
+            1 for r in results
+            if any(b['test_results']['found']
+                  for b in r['jenkins_builds'].values())
+        )
+        
+        report = {
+            'generated_at': datetime.now().isoformat(),
+            'test_filter': self.test_filter,
+            'summary': {
+                'total_prs': total_prs,
+                'prs_with_jenkins': prs_with_jenkins,
+                'prs_with_tests': prs_with_tests,
+                'total_tests_run': total_tests_run,
+                'total_passed': total_passed,
+                'total_failed': total_failed,
+                'pass_rate': ((total_passed / total_tests_run * 100)
+                            if total_tests_run > 0 else 0)
+            },
+            'test_suite_stats': {
+                suite: {
+                    'passed': stats['passed'],
+                    'failed': stats['failed'],
+                    'success_rate': (
+                        (stats['passed'] /
+                         (stats['passed'] + stats['failed']) * 100)
+                        if (stats['passed'] + stats['failed']) > 0
+                        else 100.0
+                    )
+                }
+                for suite, stats in test_suite_stats.items()
+            },
+            'test_failures': {test: failures
+                            for test, failures in sorted_failures},
+            'pr_results': results
+        }
+        
+        with open(output_path, 'w') as f:
+            json.dump(report, f, indent=2)
+        
+        print(f"\nDetailed results saved to: {output_file}")
+
+    def generate_report(self, results: List[Dict], output_file: str):
+        """Generate analysis report with failure breakdown by test."""
+        # Collect statistics
+        (total_tests_run, total_passed, total_failed,
+         test_failures, test_suite_stats) = self._collect_test_statistics(results)
+        
+        # Print console reports
+        self._print_summary_statistics(results, total_tests_run,
+                                       total_passed, total_failed)
+        self._print_suite_statistics(test_suite_stats)
+        sorted_failures = self._print_failure_report(test_failures)
+        
+        # Save JSON report
+        self._save_json_report(output_file, results, total_tests_run,
+                              total_passed, total_failed, test_suite_stats,
+                              sorted_failures)
+
+
+def parse_arguments():
+    """Parse and return command-line arguments."""
+    parser = argparse.ArgumentParser(
+        description='Analyze unit test results across Ceph PRs on Jenkins',
+        epilog='''
+Examples:
+  # Analyze all tests across 100 recent PRs
+  %(prog)s --max-prs 100
+
+  # Focus on mgr unit tests in reef branch
+  %(prog)s --release reef --test-filter unittest_mgr --max-prs 50
+
+  # High-performance analysis with custom output
+  %(prog)s --workers 20 --download-workers 15 --output my_report.json
+        ''',
+        formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument(
+        '--max-prs',
+        type=int,
+        default=100,
+        help='Maximum number of PRs to analyze (default: 100)'
+    )
+    parser.add_argument(
+        '--output',
+        default='unittest_analysis.json',
+        help='Output file for detailed results (default: unittest_analysis.json)'
+    )
+    parser.add_argument(
+        '--github-token',
+        help='GitHub personal access token (optional, for higher rate limits)'
+    )
+    parser.add_argument(
+        '--release',
+        default='main',
+        help='Target release branch to analyze PRs for (default: main). Examples: main, reef, quincy, pacific'
+    )
+    parser.add_argument(
+        '--test-filter',
+        default=None,
+        help='Filter tests by name pattern (e.g., "unittest_mgr"). If not specified, analyzes ALL tests.'
+    )
+    parser.add_argument(
+        '--workers',
+        type=int,
+        default=None,
+        help='Number of parallel workers for analyzing PRs (default: auto-calculated based on CPU cores)'
+    )
+    parser.add_argument(
+        '--download-workers',
+        type=int,
+        default=None,
+        help='Number of parallel workers per PR for downloading logs (default: auto-calculated)'
+    )
+    parser.add_argument(
+        '--max-connections',
+        type=int,
+        default=3000,
+        help='Maximum total concurrent connections (default: 3000, prevents overwhelming servers)'
+    )
+    parser.add_argument(
+        '--quiet',
+        action='store_true',
+        help='Reduce terminal output (only show important messages)'
+    )
+    parser.add_argument(
+        '--log-file',
+        help='Write detailed logs to file (useful with --quiet)'
+    )
+    return parser.parse_args()
+
+
+def calculate_worker_counts(args):
+    """Calculate optimal worker and download worker counts.
+    
+    Returns:
+        tuple: (workers, download_workers, total_connections)
+    """
+    cpu_count = os.cpu_count() or 4
+    
+    if args.workers is None or args.download_workers is None:
+        # Mathematical optimization:
+        # Goal: Maximize throughput while respecting max_connections
+        # Constraint: workers × download_workers ≤ max_connections
+        # Strategy: Balance PR-level and download-level parallelism
+        
+        if args.workers is None and args.download_workers is None:
+            # Optimal distribution for I/O-bound workload
+            # Use sqrt distribution: workers ≈ sqrt(max_connections)
+            optimal_workers = int(args.max_connections ** 0.5)
+            optimal_downloads = args.max_connections // optimal_workers
+            
+            # Clamp to reasonable ranges
+            workers = min(optimal_workers, cpu_count // 2, 100)
+            download_workers = max(3, min(optimal_downloads, 10))
+        elif args.workers is None:
+            # User specified download_workers, calculate workers
+            download_workers = args.download_workers
+            workers = min(args.max_connections // download_workers, cpu_count // 2, 100)
+        else:
+            # User specified workers, calculate download_workers
+            workers = args.workers
+            download_workers = max(3, min(args.max_connections // workers, 10))
+    else:
+        workers = args.workers
+        download_workers = args.download_workers
+    
+    total_connections = workers * download_workers
+    return workers, download_workers, total_connections
+
+
+def print_configuration(args, workers, download_workers, total_connections):
+    """Print worker configuration and analysis settings."""
+    cpu_count = os.cpu_count() or 4
+    
+    print(f"\n{'='*80}")
+    print("WORKER CONFIGURATION")
+    print(f"{'='*80}")
+    print(f"CPU cores available: {cpu_count}")
+    print(f"PR-level workers: {workers}")
+    print(f"Download workers per PR: {download_workers}")
+    print(f"Total concurrent connections: {total_connections}")
+    print(f"Max connections limit: {args.max_connections}")
+    print(f"{'='*80}\n")
+    
+    print(f"Target release: {args.release}")
+    if args.test_filter:
+        print(f"Test filter: {args.test_filter}")
+    else:
+        print("Test filter: None (analyzing ALL tests)")
+    print()
+
+
+def create_pr_chunks(prs, workers):
+    """Divide PRs into chunks for parallel processing.
+    
+    Args:
+        prs: List of PR dictionaries
+        workers: Number of worker threads
+        
+    Returns:
+        list: List of PR chunks, each containing (index, pr) tuples
+    """
+    pr_chunks = [[] for _ in range(workers)]
+    for i, pr in enumerate(prs):
+        pr_chunks[i % workers].append((i + 1, pr))
+    
+    # Remove empty chunks
+    return [chunk for chunk in pr_chunks if chunk]
+
+
+def analyze_pr_chunk(chunk, analyzer, prs, completed_count, progress_lock):
+    """Analyze a pre-assigned chunk of PRs in a single thread.
+    
+    Args:
+        chunk: List of (index, pr) tuples to analyze
+        analyzer: JenkinsAnalyzer instance
+        prs: Full list of PRs (for progress reporting)
+        completed_count: Shared list with single counter element
+        progress_lock: Threading lock for progress updates
+        
+    Returns:
+        list: Analysis results for the chunk
+    """
+    thread_id = threading.get_ident()
+    chunk_results = []
+    
+    for i, pr in chunk:
+        result = analyzer.analyze_pr(pr)
+        chunk_results.append(result)
+        
+        # Thread-safe progress reporting
+        with progress_lock:
+            completed_count[0] += 1
+            print(f"[{completed_count[0]}/{len(prs)}] [TID:{thread_id}] "
+                  f"Completed PR #{pr['number']}: {pr['title'][:50]}...")
+    
+    return chunk_results
+
+
+def run_parallel_analysis(prs, pr_chunks, analyzer, download_workers):
+    """Execute parallel analysis of PR chunks.
+    
+    Args:
+        prs: Full list of PRs
+        pr_chunks: List of PR chunks for parallel processing
+        analyzer: JenkinsAnalyzer instance
+        download_workers: Number of download workers per PR
+        
+    Returns:
+        list: Combined analysis results from all chunks
+    """
+    print(f"Analyzing {len(prs)} PRs with {len(pr_chunks)} workers...")
+    print(f"PRs per worker: {[len(chunk) for chunk in pr_chunks]}")
+    print(f"Each worker uses {download_workers} download threads\n")
+    
+    results = []
+    completed_count = [0]
+    progress_lock = threading.Lock()
+    
+    with ThreadPoolExecutor(max_workers=len(pr_chunks)) as executor:
+        # Submit chunk analysis tasks (one per worker)
+        futures = [
+            executor.submit(analyze_pr_chunk, chunk, analyzer, prs,
+                          completed_count, progress_lock)
+            for chunk in pr_chunks
+        ]
+        
+        # Collect results as they complete
+        for future in as_completed(futures):
+            try:
+                chunk_results = future.result()
+                results.extend(chunk_results)
+            except Exception as e:
+                print(f"\nError analyzing PR chunk: {e}")
+    
+    return results
+
+
+def main():
+    """Main entry point for the Jenkins unit test analyzer."""
+    args = parse_arguments()
+    
+    # Calculate worker configuration
+    workers, download_workers, total_connections = calculate_worker_counts(args)
+    
+    # Print configuration
+    print_configuration(args, workers, download_workers, total_connections)
+    
+    # Create analyzer
+    analyzer = JenkinsAnalyzer(
+        github_token=args.github_token,
+        max_workers=download_workers,
+        test_filter=args.test_filter
+    )
+    
+    # Fetch PRs
+    prs = analyzer.fetch_recent_prs(max_prs=args.max_prs, release=args.release)
+    
+    if not prs:
+        print("No PRs found to analyze")
+        return
+    
+    # Create PR chunks for parallel processing
+    pr_chunks = create_pr_chunks(prs, workers)
+    
+    # Run parallel analysis
+    results = run_parallel_analysis(prs, pr_chunks, analyzer, download_workers)
+    
+    # Generate report
+    analyzer.generate_report(results, args.output)
+
+
+if __name__ == '__main__':
+    main()
+
+# Made with Bob