return HandleCommandResult(stdout='\n'.join(run(host)))
+ @orchestrator._cli_read_command('cephadm systemd-unit ls')
+ def _systemd_unit_ls(
+ self,
+ hostname: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None
+ ) -> HandleCommandResult:
+ daemons = self.systemd_unit_ls(hostname, daemon_type, daemon_id)
+ return HandleCommandResult(stdout=json.dumps(daemons, indent=4))
+
+ @orchestrator._cli_read_command('cephadm systemd-unit ls')
+ def systemd_unit_ls(
+ self,
+ hostname: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None
+ ) -> HandleCommandResult:
+ # First, some filtering
+ if hostname and daemon_type:
+ daemons = self.cache.get_daemons_by_type(daemon_type, hostname)
+ elif hostname:
+ daemons = self.cache.get_daemons_by_host(hostname)
+ elif daemon_type:
+ daemons = self.cache.get_daemons_by_type(daemon_type)
+ else:
+ daemons = self.cache.get_daemons()
+ if daemon_id:
+ daemons = [d for d in daemons if d.daemon_id == daemon_id]
+ # intended structure for the dict is
+ # {
+ # <host>: {
+ # <daemon_type>: {
+ # <daemon_name>: systemd unit
+ # }
+ # }
+ # }
+ systemd_unit_dict: Dict[str, Dict[str, Dict[str, str]]] = {}
+ for d in daemons:
+ # for mypy
+ host = d.hostname
+ d_type = d.daemon_type
+ systemd_unit = d.systemd_unit
+ assert host is not None
+ assert d_type is not None
+ assert systemd_unit is not None
+ if host not in systemd_unit_dict:
+ systemd_unit_dict[host] = {}
+ if d_type not in systemd_unit_dict[host]:
+ systemd_unit_dict[host][d_type] = {}
+ systemd_unit_dict[host][d_type][d.name()] = systemd_unit
+ return HandleCommandResult(stdout=json.dumps(systemd_unit_dict, indent=4))
+
@orchestrator._cli_read_command('orch client-keyring ls')
def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
"""