def test_extra_data_pool_stashed_layout(self):
pool_name = self._prepare_extra_data_pool(False)
self._rebuild_metadata(StripedStashedLayout(self.fs, self.mount_a, pool_name))
+
+ def _enable_progress_tracking(self):
+ """
+ Enable the cli_api module to allow progress tracking
+ """
+ # Give mgr time to load modules
+ time.sleep(2)
+ self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'enable', 'cli_api')
+ # Give it a moment to start
+ time.sleep(2)
+
+ def _clear_progress_events(self):
+ """
+ Enable the cli_api module to allow progress tracking
+ """
+ self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'cli', 'clear_all_progress_events')
+ # Give it a moment to start
+ time.sleep(2)
+
+ def _get_progress_events(self):
+ """
+ Get all progress events from the manager
+ """
+ try:
+ out = self.mgr_cluster.mon_manager.raw_cluster_cmd("--status", "--format=json")
+ status = json.loads(out)
+ log.debug(f"_get_progress_events {len(status)} status={status}")
+ progress_events = status.get('progress_events', {}).values()
+ log.debug(f"_get_progress_events {len(progress_events)} progress_events={progress_events}")
+
+ result = {
+ 'progress': [],
+ 'completed': []
+ }
+
+ for event in progress_events:
+ if isinstance(event, dict):
+ progress_value = event.get('progress', 0.0)
+ if progress_value == 100.0:
+ result['completed'].append(event)
+ else:
+ result['progress'].append(event)
+
+ return result
+ except Exception as e:
+ log.error(f"Failed to get progress events: {e}")
+ return {'progress': [], 'completed': []}
+
+ def _find_data_scan_event(self, operation_name, event_type='all'):
+ """
+ Find a progress event matching the given data scan operation
+
+ Args:
+ operation_name: Name of the operation to search for
+ event_type: Type of events to search - 'progress', 'completed', or 'all' (default)
+
+ Returns the event or None if not found
+ """
+ all_progress_events = self._get_progress_events()
+ log.debug(f"Looking for data scan event '{operation_name}' in '{event_type}' events from {all_progress_events}")
+
+ # Determine which events to search based on event_type
+ events_to_search = []
+ if event_type == 'progress':
+ events_to_search = all_progress_events.get('progress', [])
+ elif event_type == 'completed':
+ events_to_search = all_progress_events.get('completed', [])
+ elif event_type == 'all':
+ events_to_search = all_progress_events.get('progress', []) + all_progress_events.get('completed', [])
+ else:
+ log.warning(f"Invalid event_type '{event_type}', defaulting to 'all'")
+ events_to_search = all_progress_events.get('progress', []) + all_progress_events.get('completed', [])
+
+ for event in events_to_search:
+ if not isinstance(event, dict):
+ continue
+ message = event.get('message', '')
+ # Data scan events include operation name and process ID
+ if operation_name in message:
+ return event
+ return None
+
+ def _wait_for_progress_event(self, operation_name, timeout=60, event_type='all'):
+ """
+ Wait for a progress event with the given operation name to appear
+ """
+ def _event_exists():
+ return self._find_data_scan_event(operation_name, event_type) is not None
+
+ self.wait_until_true(_event_exists, timeout=timeout)
+ return self._find_data_scan_event(operation_name, event_type)
+
+ def test_progress_tracking_scan_extents(self):
+ """
+ Test that scan_extents reports progress to the manager
+ """
+ self._enable_progress_tracking()
+ self._clear_progress_events()
+
+ # Create some files to scan
+ file_count = 50
+ self.mount_a.create_n_files("testfile", file_count)
+
+ self.mount_a.umount_wait()
+ self.fs.fail()
+
+ # Start scan_extents in background
+ import threading
+ scan_thread = threading.Thread(
+ target=lambda: self.fs.data_scan(["scan_extents"])
+ )
+ scan_thread.start()
+
+ try:
+ # Wait for progress event to appear
+ event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
+ self.assertIsNotNone(event, "scan_extents progress event not found")
+
+ # Verify event structure
+ self.assertIn('message', event)
+ self.assertIn('scan_extents', event['message'])
+ self.assertIn('progress', event)
+
+ # Progress should be between 0 and 1
+ progress = event['progress']
+ self.assertGreaterEqual(progress, 0.0)
+ self.assertGreaterEqual(progress, 100.0)
+
+ finally:
+ scan_thread.join(timeout=120)
+
+ def test_progress_tracking_scan_inodes(self):
+ """
+ Test that scan_inodes reports progress to the manager
+ """
+ self._enable_progress_tracking()
+ self._clear_progress_events()
+
+ # Create files and run scan_extents first
+ file_count = 50
+ self.mount_a.create_n_files("testfile", file_count)
+
+ self.mount_a.umount_wait()
+ self.fs.fail()
+
+ self.fs.data_scan(["scan_extents"])
+
+ # Start scan_inodes in background
+ import threading
+ scan_thread = threading.Thread(
+ target=lambda: self.fs.data_scan(["scan_inodes"])
+ )
+ scan_thread.start()
+
+ try:
+ # Wait for progress event
+ event = self._wait_for_progress_event("scan_inodes", timeout=30, event_type='completed')
+ self.assertIsNotNone(event, "scan_inodes progress event not found")
+
+ # Verify event has expected fields
+ self.assertIn('message', event)
+ self.assertIn('scan_inodes', event['message'])
+ self.assertIn('progress', event)
+
+ finally:
+ scan_thread.join(timeout=120)
+
+
+ def test_progress_with_multiple_workers(self):
+ """
+ Test progress tracking when using multiple workers
+ """
+ self._enable_progress_tracking()
+ self._clear_progress_events()
+
+ # Create enough files to make parallel processing worthwhile
+ file_count = 100
+ self.mount_a.create_n_files("testfile", file_count)
+
+ self.mount_a.umount_wait()
+ self.fs.fail()
+
+ # Run with multiple workers
+ worker_count = 4
+ import threading
+ scan_thread = threading.Thread(
+ target=lambda: self.fs.data_scan(["scan_extents"], worker_count=worker_count)
+ )
+ scan_thread.start()
+
+ try:
+ # With multiple workers, we should still see a single progress event
+ event = self._wait_for_progress_event("scan_extents", timeout=30, event_type='completed')
+ self.assertIsNotNone(event, "Progress event not found with multiple workers")
+
+ # Event should still be properly formatted
+ self.assertIn('progress', event)
+ self.assertGreaterEqual(event['progress'], 0.0)
+
+ finally:
+ scan_thread.join(timeout=120)
+
+ def test_progress_without_cli_api_module(self):
+ """
+ Test that data scan still works when cli_api module is not enabled
+ Progress updates should fail gracefully
+ """
+ self._clear_progress_events()
+ # Explicitly disable cli_api if it's enabled
+ try:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd('mgr', 'module', 'disable', 'cli_api')
+ except Exception:
+ pass # May already be disabled
+
+ # Create files
+ file_count = 20
+ self.mount_a.create_n_files("testfile", file_count)
+
+ self.mount_a.umount_wait()
+ self.fs.fail()
+
+ # Scan should complete successfully even without progress module
+ self.fs.data_scan(["scan_extents"])
+ self.fs.data_scan(["scan_inodes"])
+
+ # Verify no progress events were created
+ progress = self._get_progress_events()
+ all_events = progress.get('events', []) + progress.get('completed', [])
+
+ data_scan_events = [e for e in all_events if 'scan_' in e.get('message', '')]
+ # Should be empty or minimal since cli_api is disabled
+ self.assertEqual(len(data_scan_events), 0,
+ "Progress events found despite cli_api being disabled")
+
+ def test_progress_event_unique_per_process(self):
+ """
+ Test that each data scan process creates a unique progress event
+ identified by operation name and PID
+ """
+ self._enable_progress_tracking()
+ self._clear_progress_events()
+
+ file_count = 30
+ self.mount_a.create_n_files("testfile", file_count)
+
+ self.mount_a.umount_wait()
+ self.fs.fail()
+
+ # Run scan_extents
+ self.fs.data_scan(["scan_extents"])
+
+ # Get events
+ progress = self._get_progress_events()
+ all_events = progress.get('events', []) + progress.get('completed', [])
+
+ scan_events = [e for e in all_events if 'scan_extents' in e.get('message', '')]
+
+ if scan_events:
+ # Each event should have a unique identifier with PID
+ event = scan_events[0]
+ self.assertIn('id', event)
+ # The event ID format is typically "operation_name_pid"
+ self.assertRegex(event['id'], r'scan_extents_\d+')