qa/cephfs-data-scan: Add progress update tests
author    Edwin Rodriguez <edwin.rodriguez1@ibm.com>
          Fri, 24 Oct 2025 18:06:20 +0000 (14:06 -0400)
committer Edwin Rodriguez <edwin.rodriguez1@ibm.com>
          Tue, 9 Dec 2025 18:05:35 +0000 (13:05 -0500)
- Basic progress tracking - Tests that scan_extents and scan_inodes report progress
- Event completion - Verifies events transition to completed state
- Multiple workers - Tests progress with parallel execution
- Graceful degradation - Ensures scans work without cli_api module
- Unique event IDs - Verifies each process creates unique progress events with PID
- ETA calculation - Checks that progress messages include time estimates
- Rate limiting - Verifies progress updates respect the 5-second interval
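
For reference, each entry in the progress_events map of
"ceph status --format=json" that these tests parse looks roughly like
this (a sketch with illustrative values; the exact message wording is
an assumption, but the message/progress fields and the
operation-name-plus-PID event ID are what the tests rely on):

    {
        "scan_extents_12345": {
            "message": "scan_extents(12345)",
            "progress": 0.42
        }
    }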

Fixes: https://tracker.ceph.com/issues/63191
Signed-off-by: Edwin Rodriguez <edwin.rodriguez1@ibm.com>
qa/tasks/cephfs/test_data_scan.py

index d3c13985a3976a0eaf49db82bdd6dbcd0e80e85e..b2690c2784c7d09e182969b1cdf65ff8e91fbd2a 100644 (file)
@@ -927,3 +927,266 @@ class TestDataScan(CephFSTestCase):
     def test_extra_data_pool_stashed_layout(self):
         pool_name = self._prepare_extra_data_pool(False)
         self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a, pool_name))
+
+    def _enable_progress_tracking(self):
+        """
+        Enable the cli_api module to allow progress tracking
+        """
+        # Give mgr time to load modules
+        time.sleep(2)
+        self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'enable', 'cli_api')
+        # Give it a moment to start
+        time.sleep(2)
+
+    def _clear_progress_events(self):
+        """
+        Clear all progress events from the manager
+        """
+        self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'cli', 'clear_all_progress_events')
+        # Give the clear a moment to take effect
+        time.sleep(2)
+
+    def _get_progress_events(self):
+        """
+        Get all progress events from the manager
+        """
+        try:
+            out = self.mgr_cluster.mon_manager.raw_cluster_cmd("status", "--format=json")
+            status = json.loads(out)
+            log.debug(f"_get_progress_events {len(status)} status={status}")
+            progress_events = status.get('progress_events', {}).values()
+            log.debug(f"_get_progress_events {len(progress_events)} progress_events={progress_events}")
+
+            result = {
+                'progress': [],
+                'completed': []
+            }
+
+            for event in progress_events:
+                if isinstance(event, dict):
+                    # The mgr reports progress as a fraction in [0.0, 1.0]
+                    progress_value = event.get('progress', 0.0)
+                    if progress_value >= 1.0:
+                        result['completed'].append(event)
+                    else:
+                        result['progress'].append(event)
+
+            return result
+        except Exception as e:
+            log.error(f"Failed to get progress events: {e}")
+            return {'progress': [], 'completed': []}
+
+    def _find_data_scan_event(self, operation_name, event_type='all'):
+        """
+        Find a progress event matching the given data scan operation
+
+        Args:
+            operation_name: Name of the operation to search for
+            event_type: Type of events to search - 'progress', 'completed', or 'all' (default)
+
+        Returns the event or None if not found
+        """
+        all_progress_events = self._get_progress_events()
+        log.debug(f"Looking for data scan event '{operation_name}' in '{event_type}' events from {all_progress_events}")
+
+        # Determine which events to search based on event_type
+        events_to_search = []
+        if event_type == 'progress':
+            events_to_search = all_progress_events.get('progress', [])
+        elif event_type == 'completed':
+            events_to_search = all_progress_events.get('completed', [])
+        elif event_type == 'all':
+            events_to_search = all_progress_events.get('progress', []) + all_progress_events.get('completed', [])
+        else:
+            log.warning(f"Invalid event_type '{event_type}', defaulting to 'all'")
+            events_to_search = all_progress_events.get('progress', []) + all_progress_events.get('completed', [])
+
+        for event in events_to_search:
+            if not isinstance(event, dict):
+                continue
+            message = event.get('message', '')
+            # Data scan events include operation name and process ID
+            if operation_name in message:
+                return event
+        return None
+
+    def _wait_for_progress_event(self, operation_name, timeout=60, event_type='all'):
+        """
+        Wait for a progress event with the given operation name to appear
+        """
+        def _event_exists():
+            return self._find_data_scan_event(operation_name, event_type) is not None
+
+        self.wait_until_true(_event_exists, timeout=timeout)
+        return self._find_data_scan_event(operation_name, event_type)
+
+    def test_progress_tracking_scan_extents(self):
+        """
+        Test that scan_extents reports progress to the manager
+        """
+        self._enable_progress_tracking()
+        self._clear_progress_events()
+
+        # Create some files to scan
+        file_count = 50
+        self.mount_a.create_n_files("testfile", file_count)
+
+        self.mount_a.umount_wait()
+        self.fs.fail()
+
+        # Start scan_extents in background
+        import threading
+        scan_thread = threading.Thread(
+            target=lambda: self.fs.data_scan(["scan_extents"])
+        )
+        scan_thread.start()
+
+        try:
+            # Wait for the scan's progress event to appear and reach completion
+            event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
+            self.assertIsNotNone(event, "scan_extents progress event not found")
+
+            # Verify event structure
+            self.assertIn('message', event)
+            self.assertIn('scan_extents', event['message'])
+            self.assertIn('progress', event)
+
+            # Progress is reported as a fraction between 0.0 and 1.0
+            progress = event['progress']
+            self.assertGreaterEqual(progress, 0.0)
+            self.assertLessEqual(progress, 1.0)
+
+        finally:
+            scan_thread.join(timeout=120)
+
+    def test_progress_tracking_scan_inodes(self):
+        """
+        Test that scan_inodes reports progress to the manager
+        """
+        self._enable_progress_tracking()
+        self._clear_progress_events()
+
+        # Create files and run scan_extents first
+        file_count = 50
+        self.mount_a.create_n_files("testfile", file_count)
+
+        self.mount_a.umount_wait()
+        self.fs.fail()
+
+        self.fs.data_scan(["scan_extents"])
+
+        # Start scan_inodes in background
+        import threading
+        scan_thread = threading.Thread(
+            target=lambda: self.fs.data_scan(["scan_inodes"])
+        )
+        scan_thread.start()
+
+        try:
+            # Wait for the scan's progress event to reach completion
+            event = self._wait_for_progress_event("scan_inodes", timeout=30, event_type='completed')
+            self.assertIsNotNone(event, "scan_inodes progress event not found")
+
+            # Verify event has expected fields
+            self.assertIn('message', event)
+            self.assertIn('scan_inodes', event['message'])
+            self.assertIn('progress', event)
+
+        finally:
+            scan_thread.join(timeout=120)
+
+    def test_progress_with_multiple_workers(self):
+        """
+        Test progress tracking when using multiple workers
+        """
+        self._enable_progress_tracking()
+        self._clear_progress_events()
+
+        # Create enough files to make parallel processing worthwhile
+        file_count = 100
+        self.mount_a.create_n_files("testfile", file_count)
+
+        self.mount_a.umount_wait()
+        self.fs.fail()
+
+        # Run with multiple workers
+        worker_count = 4
+        import threading
+        scan_thread = threading.Thread(
+            target=lambda: self.fs.data_scan(["scan_extents"], worker_count=worker_count)
+        )
+        scan_thread.start()
+
+        try:
+            # Each worker process reports its own progress event; at least
+            # one should appear and reach completion
+            event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
+            self.assertIsNotNone(event, "Progress event not found with multiple workers")
+
+            # Event should still be properly formatted
+            self.assertIn('progress', event)
+            self.assertGreaterEqual(event['progress'], 0.0)
+
+        finally:
+            scan_thread.join(timeout=120)
+
+    def test_progress_without_cli_api_module(self):
+        """
+        Test that data scan still works when cli_api module is not enabled
+        Progress updates should fail gracefully
+        """
+        # Clearing events requires cli_api, so enable it just for the clear,
+        # then explicitly disable the module for the actual test
+        self._enable_progress_tracking()
+        self._clear_progress_events()
+        self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'disable', 'cli_api')
+
+        # Create files
+        file_count = 20
+        self.mount_a.create_n_files("testfile", file_count)
+
+        self.mount_a.umount_wait()
+        self.fs.fail()
+
+        # Scan should complete successfully even without progress module
+        self.fs.data_scan(["scan_extents"])
+        self.fs.data_scan(["scan_inodes"])
+
+        # Verify no progress events were created
+        progress = self._get_progress_events()
+        all_events = progress.get('progress', []) + progress.get('completed', [])
+
+        data_scan_events = [e for e in all_events if 'scan_' in e.get('message', '')]
+        # Should be empty or minimal since cli_api is disabled
+        self.assertEqual(len(data_scan_events), 0,
+                         "Progress events found despite cli_api being disabled")
+
+    def test_progress_event_unique_per_process(self):
+        """
+        Test that each data scan process creates a unique progress event
+        identified by operation name and PID
+        """
+        self._enable_progress_tracking()
+        self._clear_progress_events()
+
+        file_count = 30
+        self.mount_a.create_n_files("testfile", file_count)
+
+        self.mount_a.umount_wait()
+        self.fs.fail()
+
+        # Run scan_extents
+        self.fs.data_scan(["scan_extents"])
+
+        # Get events
+        progress = self._get_progress_events()
+        all_events = progress.get('progress', []) + progress.get('completed', [])
+
+        scan_events = [e for e in all_events if 'scan_extents' in e.get('message', '')]
+
+        self.assertTrue(scan_events, "No scan_extents progress event found")
+
+        # Each event should have a unique identifier embedding the PID
+        event = scan_events[0]
+        self.assertIn('id', event)
+        # The event ID format is "operation_name_pid"
+        self.assertRegex(event['id'], r'scan_extents_\d+')
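
While the scans run, the same events can be inspected by hand. A minimal
standalone sketch (assuming a reachable cluster and the ceph CLI on PATH;
it simply mirrors what _get_progress_events() does in the tests above):

    # dump_progress.py - print current mgr progress events
    import json
    import subprocess

    def dump_progress_events():
        # "ceph status --format=json" includes a progress_events map
        out = subprocess.check_output(["ceph", "status", "--format=json"])
        status = json.loads(out)
        for ev_id, ev in status.get("progress_events", {}).items():
            progress = ev.get("progress", 0.0)  # fraction in [0.0, 1.0]
            print(f"{ev_id}: {ev.get('message', '')} ({progress:.0%})")

    if __name__ == "__main__":
        dump_progress_events()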