# Reconstructed from a whitespace-collapsed gitweb "commitdiff_plain" page.
# The commit metadata is preserved below; the code that follows is the text
# added by the diff, re-indented, with review fixes applied (so the blob hashes
# and hunk line counts quoted here describe the original commit, not this text).
#
# From: Edwin Rodriguez
# Date: Fri, 24 Oct 2025 18:06:20 +0000 (-0400)
# Subject: qa/cephfs-data-scan: Add progress update tests
# X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8aad846cb4d783e8cb928fc42b28ac7091f8db2b;p=ceph.git
#
# qa/cephfs-data-scan: Add progress update tests
#
# Basic progress tracking - Tests that scan_extents and scan_inodes report progress
# Event completion - Verifies events transition to completed state
# Multiple workers - Tests progress with parallel execution
# Graceful degradation - Ensures scans work without cli_api module
# Unique event IDs - Verifies each process creates unique progress events with PID
# ETA calculation - Checks that progress messages include time estimates
# Rate limiting - Verifies progress updates respect the 5-second interval
#
# Fixes: https://tracker.ceph.com/issues/63191
# Signed-off-by: Edwin Rodriguez
#
# diff --git a/qa/tasks/cephfs/test_data_scan.py b/qa/tasks/cephfs/test_data_scan.py
# index d3c13985a3976..b2690c2784c7d 100644
# --- a/qa/tasks/cephfs/test_data_scan.py
# +++ b/qa/tasks/cephfs/test_data_scan.py
# @@ -927,3 +927,266 @@ class TestDataScan(CephFSTestCase):


class TestDataScan(CephFSTestCase):
    # NOTE: excerpt only. The diff shows just the tail of this class; the
    # module imports (json, time, log, ...) and earlier members such as
    # _prepare_extra_data_pool / _rebuild_metadata are outside this view.

    def test_extra_data_pool_stashed_layout(self):
        pool_name = self._prepare_extra_data_pool(False)
        self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a, pool_name))

    # Progress value at which _get_progress_events() files an event under
    # 'completed'.
    # NOTE(review): this assumes the data scan tool reports percentages
    # (0-100) rather than fractions (0-1) -- confirm against the tool.
    _PROGRESS_COMPLETE = 100.0

    # Upper bound, in seconds, on waiting for a background data scan to exit.
    _SCAN_JOIN_TIMEOUT = 120

    def _enable_progress_tracking(self):
        """
        Enable the cli_api module to allow progress tracking
        """
        # Give mgr time to load modules
        time.sleep(2)
        self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'enable', 'cli_api')
        # Give it a moment to start
        time.sleep(2)

    def _clear_progress_events(self):
        """
        Clear all existing progress events so a test starts from a clean slate

        Issues ``mgr cli clear_all_progress_events``.
        NOTE(review): presumably this needs the cli_api module to be enabled
        first (see _enable_progress_tracking) -- confirm.
        """
        self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'cli', 'clear_all_progress_events')
        # Give the manager a moment to apply the change
        time.sleep(2)

    def _get_progress_events(self):
        """
        Get all progress events from the manager

        Returns a dict with two lists, ``'progress'`` (events still running)
        and ``'completed'`` (events whose progress equals _PROGRESS_COMPLETE).
        Each event is a copy of the dict found in the cluster status, with the
        key it was stored under exposed as ``'id'`` unless the event already
        carries one.

        This is best-effort: any failure is logged and yields two empty lists.
        """
        try:
            out = self.mgr_cluster.mon_manager.raw_cluster_cmd("--status", "--format=json")
            status = json.loads(out)
            log.debug(f"_get_progress_events {len(status)} status={status}")
            progress_events = status.get('progress_events', {})
            log.debug(f"_get_progress_events {len(progress_events)} progress_events={progress_events}")

            result = {
                'progress': [],
                'completed': []
            }

            for event_id, raw_event in progress_events.items():
                if not isinstance(raw_event, dict):
                    continue
                # Keep the mapping key: iterating only the values would drop
                # it, and callers assert on event['id'].
                # NOTE(review): presumably the key is the event id -- confirm.
                event = dict(raw_event)
                event.setdefault('id', event_id)
                if event.get('progress', 0.0) == self._PROGRESS_COMPLETE:
                    result['completed'].append(event)
                else:
                    result['progress'].append(event)

            return result
        except Exception as e:
            log.error(f"Failed to get progress events: {e}")
            return {'progress': [], 'completed': []}

    def _find_data_scan_event(self, operation_name, event_type='all'):
        """
        Find a progress event matching the given data scan operation

        Args:
            operation_name: Name of the operation to search for
            event_type: Type of events to search - 'progress', 'completed', or 'all' (default)

        Returns the first event whose message contains operation_name, or
        None if not found. An unrecognised event_type is logged and treated
        as 'all'.
        """
        all_progress_events = self._get_progress_events()
        log.debug(f"Looking for data scan event '{operation_name}' in '{event_type}' events from {all_progress_events}")

        in_progress = all_progress_events.get('progress', [])
        completed = all_progress_events.get('completed', [])

        if event_type == 'progress':
            events_to_search = in_progress
        elif event_type == 'completed':
            events_to_search = completed
        else:
            if event_type != 'all':
                log.warning(f"Invalid event_type '{event_type}', defaulting to 'all'")
            events_to_search = in_progress + completed

        for event in events_to_search:
            if not isinstance(event, dict):
                continue
            # A missing or null message simply never matches.
            message = event.get('message') or ''
            if operation_name in message:
                return event
        return None

    def _wait_for_progress_event(self, operation_name, timeout=60, event_type='all'):
        """
        Wait for a progress event with the given operation name to appear

        Returns the event observed by the poll itself. Looking it up a second
        time after the wait could return None if the event had been removed
        in the meantime.
        """
        found = []

        def _event_exists():
            event = self._find_data_scan_event(operation_name, event_type)
            if event is None:
                return False
            found.append(event)
            return True

        self.wait_until_true(_event_exists, timeout=timeout)
        return found[-1] if found else None

    def _start_background_data_scan(self, args, **kwargs):
        """
        Run self.fs.data_scan(args, **kwargs) in a background thread

        Returns ``(thread, failures)``. ``failures`` is a list that receives
        any exception raised by the scan, so the test can report it instead
        of the error being lost inside the thread.
        """
        import threading

        failures = []

        def _run():
            try:
                self.fs.data_scan(args, **kwargs)
            except Exception as e:
                log.error(f"Background data scan {args} failed: {e}")
                failures.append(e)

        scan_thread = threading.Thread(target=_run)
        scan_thread.start()
        return scan_thread, failures

    def _assert_background_data_scan_ok(self, scan_thread, failures, operation_name):
        """
        Assert that a background data scan finished and did not raise
        """
        self.assertFalse(scan_thread.is_alive(),
                         f"{operation_name} still running after {self._SCAN_JOIN_TIMEOUT}s")
        self.assertEqual(failures, [],
                         f"{operation_name} failed in background thread: {failures}")

    def test_progress_tracking_scan_extents(self):
        """
        Test that scan_extents reports progress to the manager
        """
        self._enable_progress_tracking()
        self._clear_progress_events()

        # Create some files to scan
        file_count = 50
        self.mount_a.create_n_files("testfile", file_count)

        self.mount_a.umount_wait()
        self.fs.fail()

        # Start scan_extents in background
        scan_thread, failures = self._start_background_data_scan(["scan_extents"])

        try:
            # Wait for the completed progress event to appear
            event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
            self.assertIsNotNone(event, "scan_extents progress event not found")

            # Verify event structure
            self.assertIn('message', event)
            self.assertIn('scan_extents', event['message'])
            self.assertIn('progress', event)

            # Progress should be between 0 and _PROGRESS_COMPLETE
            progress = event['progress']
            self.assertGreaterEqual(progress, 0.0)
            self.assertLessEqual(progress, self._PROGRESS_COMPLETE)

        finally:
            scan_thread.join(timeout=self._SCAN_JOIN_TIMEOUT)

        self._assert_background_data_scan_ok(scan_thread, failures, "scan_extents")

    def test_progress_tracking_scan_inodes(self):
        """
        Test that scan_inodes reports progress to the manager
        """
        self._enable_progress_tracking()
        self._clear_progress_events()

        # Create files and run scan_extents first
        file_count = 50
        self.mount_a.create_n_files("testfile", file_count)

        self.mount_a.umount_wait()
        self.fs.fail()

        self.fs.data_scan(["scan_extents"])

        # Start scan_inodes in background
        scan_thread, failures = self._start_background_data_scan(["scan_inodes"])

        try:
            # Wait for the completed progress event
            event = self._wait_for_progress_event("scan_inodes", timeout=30, event_type='completed')
            self.assertIsNotNone(event, "scan_inodes progress event not found")

            # Verify event has expected fields
            self.assertIn('message', event)
            self.assertIn('scan_inodes', event['message'])
            self.assertIn('progress', event)

        finally:
            scan_thread.join(timeout=self._SCAN_JOIN_TIMEOUT)

        self._assert_background_data_scan_ok(scan_thread, failures, "scan_inodes")

    def test_progress_with_multiple_workers(self):
        """
        Test progress tracking when using multiple workers
        """
        self._enable_progress_tracking()
        self._clear_progress_events()

        # Create enough files to make parallel processing worthwhile
        file_count = 100
        self.mount_a.create_n_files("testfile", file_count)

        self.mount_a.umount_wait()
        self.fs.fail()

        # Run with multiple workers
        worker_count = 4
        scan_thread, failures = self._start_background_data_scan(
            ["scan_extents"], worker_count=worker_count)

        try:
            # With multiple workers we should still find a scan_extents event.
            # NOTE(review): this does not verify that there is exactly one.
            event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
            self.assertIsNotNone(event, "Progress event not found with multiple workers")

            # Event should still be properly formatted
            self.assertIn('progress', event)
            self.assertGreaterEqual(event['progress'], 0.0)
            self.assertLessEqual(event['progress'], self._PROGRESS_COMPLETE)

        finally:
            scan_thread.join(timeout=self._SCAN_JOIN_TIMEOUT)

        self._assert_background_data_scan_ok(scan_thread, failures, "scan_extents")

    def test_progress_without_cli_api_module(self):
        """
        Test that data scan still works when cli_api module is not enabled
        Progress updates should fail gracefully
        """
        # Clearing goes through 'mgr cli', so make sure cli_api is enabled
        # for the cleanup before it is switched off for the test itself.
        # NOTE(review): presumably 'mgr cli' fails while cli_api is disabled --
        # confirm.
        self._enable_progress_tracking()
        self._clear_progress_events()

        # Explicitly disable cli_api
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'disable', 'cli_api')
        except Exception as e:
            # Best-effort: may already be disabled
            log.debug(f"Ignoring failure to disable cli_api: {e}")

        # Create files
        file_count = 20
        self.mount_a.create_n_files("testfile", file_count)

        self.mount_a.umount_wait()
        self.fs.fail()

        # Scan should complete successfully even without progress module
        self.fs.data_scan(["scan_extents"])
        self.fs.data_scan(["scan_inodes"])

        # Verify no progress events were created; check both running and
        # completed events.
        progress = self._get_progress_events()
        all_events = progress.get('progress', []) + progress.get('completed', [])

        data_scan_events = [e for e in all_events if 'scan_' in (e.get('message') or '')]
        # Must be empty since cli_api is disabled
        self.assertEqual(len(data_scan_events), 0,
                         "Progress events found despite cli_api being disabled")

    def test_progress_event_unique_per_process(self):
        """
        Test that each data scan process creates a unique progress event
        identified by operation name and PID
        """
        self._enable_progress_tracking()
        self._clear_progress_events()

        file_count = 30
        self.mount_a.create_n_files("testfile", file_count)

        self.mount_a.umount_wait()
        self.fs.fail()

        # Run scan_extents
        self.fs.data_scan(["scan_extents"])

        # Get events, both running and completed
        progress = self._get_progress_events()
        all_events = progress.get('progress', []) + progress.get('completed', [])

        scan_events = [e for e in all_events if 'scan_extents' in (e.get('message') or '')]

        if not scan_events:
            # NOTE(review): kept lenient as in the original, which silently
            # passed when no event was visible; consider asserting instead.
            log.warning("No scan_extents progress events found; nothing to verify")
            return

        event_ids = []
        for event in scan_events:
            # Each event should have a unique identifier with PID
            self.assertIn('id', event)
            # The event ID format is typically "operation_name_pid"
            self.assertRegex(event['id'], r'scan_extents_\d+')
            event_ids.append(event['id'])

        self.assertEqual(len(event_ids), len(set(event_ids)),
                         f"Duplicate progress event ids: {event_ids}")