stderr = err.getvalue()
return HandleCommandResult(retval, stdout, stderr)
+ @SelftestCLICommand('mgr self-test logging')
+ def test_logging(self) -> Tuple[int, str, str]:
+ '''
+ Test logging isolation between mgr modules
+ '''
+ import logging
+
+ errors = []
+
+ mod_logger = self.getLogger()
+ if mod_logger.name != self.module_name:
+ errors.append(
+ f"getLogger() returned '{mod_logger.name}', "
+ f"expected '{self.module_name}'"
+ )
+
+ if mod_logger is logging.getLogger():
+ errors.append("getLogger() returned root logger")
+
+ if not mod_logger.handlers:
+ errors.append("module logger has no handlers")
+
+ if mod_logger.propagate:
+ errors.append("module logger propagate is True, expected False")
+
+ child = logging.getLogger(f"{self.module_name}.sub.test")
+ if not child.name.startswith(self.module_name + "."):
+ errors.append(
+ f"child logger '{child.name}' is not under "
+ f"'{self.module_name}'"
+ )
+
+ if not child.propagate:
+ errors.append("child logger propagate is False")
+
+ root = logging.getLogger()
+ if not root.handlers:
+ errors.append("root logger has no fallback handler")
+
+ try:
+ mod_logger.debug("selftest: debug message")
+ mod_logger.info("selftest: info message")
+ mod_logger.warning("selftest: warning message")
+ child.info("selftest: child logger message")
+ except Exception as e:
+ errors.append(f"logging raised exception: {e}")
+
+ # --- Cross-module isolation ---
+ other_logger = logging.getLogger("balancer")
+ our_handlers = set(id(h) for h in mod_logger.handlers)
+ other_handlers = set(id(h) for h in other_logger.handlers)
+ shared = our_handlers & other_handlers
+ if shared:
+ errors.append(
+ f"selftest and balancer share {len(shared)} handler(s)"
+ )
+
+ # --- Message capture and routing ---
+ captured_mod = []
+ captured_root = []
+
+ class ModCaptureHandler(logging.Handler):
+ def emit(self, record):
+ captured_mod.append(record)
+
+ class RootCaptureHandler(logging.Handler):
+ def emit(self, record):
+ captured_root.append(record)
+
+ mod_capture = ModCaptureHandler()
+ mod_capture.setLevel(logging.DEBUG)
+ mod_logger.addHandler(mod_capture)
+
+ root_capture = RootCaptureHandler()
+ root_capture.setLevel(logging.DEBUG)
+ root.handlers.insert(0, root_capture)
+
+ try:
+ # Module logger should be captured
+ mod_logger.info("selftest: direct module message")
+
+ # Child logger should propagate to module handler
+ child_logger = logging.getLogger(f"{self.module_name}.test.capture")
+ child_logger.info("selftest: child module message")
+
+ # Verify both messages reached module handler
+ if len(captured_mod) < 2:
+ errors.append(
+ f"expected at least 2 module messages, got {len(captured_mod)}"
+ )
+
+ # Verify correct attribution
+ mod_names = [r.name for r in captured_mod]
+ if self.module_name not in mod_names:
+ errors.append(f"no message from '{self.module_name}'")
+ expected_child = f"{self.module_name}.test.capture"
+ if expected_child not in mod_names:
+ errors.append(f"no message from '{expected_child}'")
+
+ # Verify no foreign messages leaked into module handler
+ for r in captured_mod:
+ if not r.name.startswith(self.module_name):
+ errors.append(
+ f"unexpected message from '{r.name}' in module handler"
+ )
+
+ # Module messages should NOT reach root (propagate=False)
+ if any(r.name.startswith(self.module_name) for r in captured_root):
+ errors.append("module message leaked to root handler")
+
+ # Orphan logger should reach root
+ import uuid
+ marker = f"selftest-orphan-{uuid.uuid4().hex[:8]}"
+ orphan = logging.getLogger("some.random.library")
+ orphan.warning(marker)
+
+ if not any(marker in r.getMessage() for r in captured_root):
+ errors.append("orphan message did not reach root handler")
+
+ finally:
+ mod_logger.removeHandler(mod_capture)
+ root.removeHandler(root_capture)
+
+ if errors:
+ return -1, '', 'Logging self-test failed:\n' + '\n'.join(errors)
+ return 0, '', 'Logging self-test passed'
+
def serve(self) -> None:
while True:
if self._workload == Workload.COMMAND_SPAM: