from orchestrator.module import to_format, Format
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
- CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
+ DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service
from . import utils
from . import ssh
+from .cli import CephadmCLICommand
from .migrations import Migrations
from .services.cephadmservice import MgrService, RgwService
from .services.container import CustomContainerService
return inner
-class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
- metaclass=CLICommandMeta):
-
+class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):
+ CLICommand = CephadmCLICommand
_STORE_HOST_PREFIX = "host"
instance = None
raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
self.log.info(f'Set ssh {what}')
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
prefix='cephadm set-ssh-config')
def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""
self._validate_and_set_ssh_val('ssh_config', inbuf, old)
return 0, "", ""
- @orchestrator._cli_write_command('cephadm clear-ssh-config')
+ @CephadmCLICommand.Write('cephadm clear-ssh-config')
def _clear_ssh_config(self) -> Tuple[int, str, str]:
"""
Clear the ssh_config file
self.ssh._reconfig_ssh()
return 0, "", ""
- @orchestrator._cli_read_command('cephadm get-ssh-config')
+ @CephadmCLICommand.Read('cephadm get-ssh-config')
def _get_ssh_config(self) -> HandleCommandResult:
"""
Returns the ssh config as used by cephadm
return HandleCommandResult(stdout=ssh_config)
return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
- @orchestrator._cli_write_command('cephadm generate-key')
+ @CephadmCLICommand.Write('cephadm generate-key')
def _generate_key(self) -> Tuple[int, str, str]:
"""
Generate a cluster SSH key (if not present)
self.ssh._reconfig_ssh()
return 0, '', ''
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
'cephadm set-priv-key')
def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""Set cluster SSH private key (use -i <private_key>)"""
self.log.info('Set ssh private key')
return 0, "", ""
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
'cephadm set-pub-key')
def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""Set cluster SSH public key (use -i <public_key>)"""
self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
return 0, "", ""
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
'cephadm set-signed-cert')
def _set_signed_cert(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""Set a signed cert if CA signed keys are being used (use -i <cert_filename>)"""
self._validate_and_set_ssh_val('ssh_identity_cert', inbuf, old)
return 0, "", ""
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
'cephadm clear-key')
def _clear_key(self) -> Tuple[int, str, str]:
"""Clear cluster SSH key"""
self.log.info('Cleared cluster SSH key')
return 0, '', ''
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm get-pub-key')
def _get_pub_key(self) -> Tuple[int, str, str]:
"""Show SSH public key for connecting to cluster hosts"""
else:
return -errno.ENOENT, '', 'No cluster SSH key defined'
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm get-signed-cert')
def _get_signed_cert(self) -> Tuple[int, str, str]:
"""Show SSH signed cert for connecting to cluster hosts using CA signed keys"""
else:
return -errno.ENOENT, '', 'No signed cert defined'
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm get-user')
def _get_user(self) -> Tuple[int, str, str]:
"""
else:
return 0, self.ssh_user, ''
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm set-user')
def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
"""
self.log.info(msg)
return 0, msg, ''
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm registry-login')
def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""
self.cache.distribute_new_registry_login_info()
return 0, "registry login scheduled", ''
- @orchestrator._cli_read_command('cephadm check-host')
+ @CephadmCLICommand.Read('cephadm check-host')
def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Check whether we can access and manage a remote host"""
try:
self.event.set()
return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm prepare-host')
def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
"""Prepare a remote host for use with cephadm"""
self.event.set()
return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
prefix='cephadm set-extra-ceph-conf')
def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
"""
self._kick_serve_loop()
return HandleCommandResult()
- @orchestrator._cli_read_command(
+ @CephadmCLICommand.Read(
'cephadm get-extra-ceph-conf')
def _get_extra_ceph_conf(self) -> HandleCommandResult:
"""
"""
return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
- @orchestrator._cli_read_command('cephadm config-check ls')
+ @CephadmCLICommand.Read('cephadm config-check ls')
def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
"""List the available configuration checks and their current state"""
return HandleCommandResult(stdout=table.get_string())
- @orchestrator._cli_read_command('cephadm config-check status')
+ @CephadmCLICommand.Read('cephadm config-check status')
def _config_check_status(self) -> HandleCommandResult:
"""Show whether the configuration checker feature is enabled/disabled"""
status = self.config_checks_enabled
return HandleCommandResult(stdout="Enabled" if status else "Disabled")
- @orchestrator._cli_write_command('cephadm config-check enable')
+ @CephadmCLICommand.Write('cephadm config-check enable')
def _config_check_enable(self, check_name: str) -> HandleCommandResult:
"""Enable a specific configuration check"""
if not self._config_check_valid(check_name):
return HandleCommandResult(stdout="ok")
- @orchestrator._cli_write_command('cephadm config-check disable')
+ @CephadmCLICommand.Write('cephadm config-check disable')
def _config_check_disable(self, check_name: str) -> HandleCommandResult:
"""Disable a specific configuration check"""
if not self._config_check_valid(check_name):
return False
return conf.last_modified > dt
- @orchestrator._cli_write_command(
+ @CephadmCLICommand.Write(
'cephadm osd activate'
)
def _osd_activate(self, host: List[str]) -> HandleCommandResult:
return HandleCommandResult(stdout='\n'.join(run(host)))
- @orchestrator._cli_read_command('cephadm systemd-unit ls')
+ @CephadmCLICommand.Read('cephadm systemd-unit ls')
def _systemd_unit_ls(
self,
hostname: Optional[str] = None,
daemons = self.systemd_unit_ls(hostname, daemon_type, daemon_id)
return HandleCommandResult(stdout=json.dumps(daemons, indent=4))
- @orchestrator._cli_read_command('cephadm systemd-unit ls')
+ @CephadmCLICommand.Read('cephadm systemd-unit ls')
def systemd_unit_ls(
self,
hostname: Optional[str] = None,
systemd_unit_dict[host][d_type][d.name()] = systemd_unit
return HandleCommandResult(stdout=json.dumps(systemd_unit_dict, indent=4))
- @orchestrator._cli_read_command('orch client-keyring ls')
+ @CephadmCLICommand.Read('orch client-keyring ls')
def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
"""
List client keyrings under cephadm management
output = table.get_string()
return HandleCommandResult(stdout=output)
- @orchestrator._cli_write_command('orch client-keyring set')
+ @CephadmCLICommand.Write('orch client-keyring set')
def _client_keyring_set(
self,
entity: str,
self._kick_serve_loop()
return HandleCommandResult()
- @orchestrator._cli_write_command('orch client-keyring rm')
+ @CephadmCLICommand.Write('orch client-keyring rm')
def _client_keyring_rm(
self,
entity: str,
from mgr_module import MgrModule, HandleCommandResult, Option
from object_format import Format
+from .cli import OrchestratorCLICommand
from ._interface import (
- CLICommandMeta,
DaemonDescription,
DaemonDescriptionStatus,
DeviceLightLoc,
ServiceDescription,
UpgradeStatusSpec,
TunedProfileSpec,
- _cli_read_command,
- _cli_write_command,
json_to_generic_spec,
raise_if_exception,
completion_to_result,
return table.get_string()
-class OrchestratorCli(OrchestratorClientMixin, MgrModule,
- metaclass=CLICommandMeta):
+class OrchestratorCli(OrchestratorClientMixin, MgrModule):
+ CLICommand = OrchestratorCLICommand
MODULE_OPTIONS = [
Option(
'orchestrator',
locs = [d['location'] for d in self.get('devices')['devices'] if d['devid'] == dev_id]
return [DeviceLightLoc(**loc) for loc in sum(locs, [])]
- @_cli_read_command(prefix='device ls-lights')
+ @OrchestratorCLICommand.Read(prefix='device ls-lights')
def _device_ls(self) -> HandleCommandResult:
"""List currently active device indicator lights"""
return HandleCommandResult(
ident = 'ident'
fault = 'fault'
- @_cli_write_command(prefix='device light')
+ @OrchestratorCLICommand.Write(prefix='device light')
def _device_light(self,
enable: DeviceLightEnable,
devid: str,
def _get_fail_fs_value(self) -> bool:
return bool(self.get_module_option("fail_fs"))
- @_cli_write_command('orch host add')
+ @OrchestratorCLICommand.Write('orch host add')
def _add_host(self,
hostname: str,
addr: Optional[str] = None,
return self._apply_misc([s], False, Format.plain)
- @_cli_write_command('orch hardware status')
+ @OrchestratorCLICommand.Write('orch hardware status')
def _hardware_status(self, hostname: Optional[str] = None, _end_positional_: int = 0, category: str = 'summary', format: Format = Format.plain) -> HandleCommandResult:
"""
Display hardware status summary
off = 'off'
get = 'get'
- @_cli_write_command('orch hardware light')
+ @OrchestratorCLICommand.Write('orch hardware light')
def _hardware_light(self,
light_type: HardwareLightType, action: HardwareLightAction,
hostname: str, device: Optional[str] = None) -> HandleCommandResult:
pass
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch hardware powercycle')
+ @OrchestratorCLICommand.Write('orch hardware powercycle')
def _hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
"""Reboot a host"""
completion = self.hardware_powercycle(hostname, yes_i_really_mean_it=yes_i_really_mean_it)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch hardware shutdown')
+ @OrchestratorCLICommand.Write('orch hardware shutdown')
def _hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
"""Shutdown a host"""
completion = self.hardware_shutdown(hostname, force, yes_i_really_mean_it=yes_i_really_mean_it)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host rm')
+ @OrchestratorCLICommand.Write('orch host rm')
def _remove_host(self, hostname: str, force: bool = False, offline: bool = False, rm_crush_entry: bool = False) -> HandleCommandResult:
"""Remove a host"""
completion = self.remove_host(hostname, force, offline, rm_crush_entry)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host drain')
+ @OrchestratorCLICommand.Write('orch host drain')
def _drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> HandleCommandResult:
"""drain all daemons from a host"""
completion = self.drain_host(hostname, force, keep_conf_keyring, zap_osd_devices)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host drain stop')
+ @OrchestratorCLICommand.Write('orch host drain stop')
def _stop_drain_host(self, hostname: str) -> HandleCommandResult:
"""drain all daemons from a host"""
completion = self.stop_drain_host(hostname)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host set-addr')
+ @OrchestratorCLICommand.Write('orch host set-addr')
def _update_set_addr(self, hostname: str, addr: str) -> HandleCommandResult:
"""Update a host address"""
completion = self.update_host_addr(hostname, addr)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_read_command('orch host ls')
+ @OrchestratorCLICommand.Read('orch host ls')
def _get_hosts(self,
format: Format = Format.plain,
host_pattern: str = '',
output += f' with status {host_status}'
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch host label add')
+ @OrchestratorCLICommand.Write('orch host label add')
def _host_label_add(self, hostname: str, label: str) -> HandleCommandResult:
"""Add a host label"""
completion = self.add_host_label(hostname, label)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host label rm')
+ @OrchestratorCLICommand.Write('orch host label rm')
def _host_label_rm(self, hostname: str, label: str, force: bool = False) -> HandleCommandResult:
"""Remove a host label"""
completion = self.remove_host_label(hostname, label, force)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch host ok-to-stop')
+ @OrchestratorCLICommand.Write('orch host ok-to-stop')
def _host_ok_to_stop(self, hostname: str) -> HandleCommandResult:
"""Check if the specified host can be safely stopped without reducing availability"""""
completion = self.host_ok_to_stop(hostname)
return completion_to_result(completion)
- @_cli_write_command('orch host maintenance enter')
+ @OrchestratorCLICommand.Write('orch host maintenance enter')
def _host_maintenance_enter(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
"""
Prepare a host for maintenance by shutting down and disabling all Ceph daemons (cephadm only)
completion = self.enter_host_maintenance(hostname, force=force, yes_i_really_mean_it=yes_i_really_mean_it)
return completion_to_result(completion)
- @_cli_write_command('orch host maintenance exit')
+ @OrchestratorCLICommand.Write('orch host maintenance exit')
def _host_maintenance_exit(self, hostname: str, force: bool = False, offline: bool = False) -> HandleCommandResult:
"""
Return a host from maintenance, restarting all Ceph daemons (cephadm only)
completion = self.exit_host_maintenance(hostname, force, offline)
return completion_to_result(completion)
- @_cli_write_command('orch host rescan')
+ @OrchestratorCLICommand.Write('orch host rescan')
def _host_rescan(self, hostname: str, with_summary: bool = False) -> HandleCommandResult:
"""Perform a disk rescan on a host"""
completion = self.rescan_host(hostname)
return HandleCommandResult(stdout=completion.result_str())
return HandleCommandResult(stdout=completion.result_str().split('.')[0])
- @_cli_read_command('orch device replace')
+ @OrchestratorCLICommand.Read('orch device replace')
def _replace_device(self,
hostname: str,
device: str,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_read_command('orch device ls')
+ @OrchestratorCLICommand.Read('orch device ls')
def _list_devices(self,
hostname: Optional[List[str]] = None,
format: Format = Format.plain,
return HandleCommandResult(stdout='\n'.join(out))
- @_cli_write_command('orch device zap')
+ @OrchestratorCLICommand.Write('orch device zap')
def _zap_device(self, hostname: str, path: str, force: bool = False) -> HandleCommandResult:
"""
Zap (erase!) a device so it can be re-used
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_read_command('orch ls')
+ @OrchestratorCLICommand.Read('orch ls')
def _list_services(self,
service_type: Optional[str] = None,
service_name: Optional[str] = None,
return HandleCommandResult(stdout=table.get_string())
- @_cli_read_command('orch ps')
+ @OrchestratorCLICommand.Read('orch ps')
def _list_daemons(self,
hostname: Optional[str] = None,
_end_positional_: int = 0,
result_str += f'{indent}{k}: {v}\n'
return result_str
- @_cli_read_command('orch certmgr reload')
+ @OrchestratorCLICommand.Read('orch certmgr reload')
def _cert_store_reload(self, format: Format = Format.plain) -> HandleCommandResult:
completion = self.cert_store_reload()
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)
- @_cli_read_command('orch certmgr cert ls')
+ @OrchestratorCLICommand.Read('orch certmgr cert ls')
def _cert_store_cert_ls(self,
filter_by: str = '',
show_details: bool = False,
result_str = self._process_cert_store_json(cert_ls, 0)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch certmgr bindings ls')
+ @OrchestratorCLICommand.Read('orch certmgr bindings ls')
def _cert_store_bindings_ls(self, format: Format = Format.plain) -> HandleCommandResult:
completion = self.cert_store_bindings_ls()
bindings_ls = raise_if_exception(completion)
result_str = yaml.dump(bindings_ls, default_flow_style=False, sort_keys=False)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch certmgr cert check')
+ @OrchestratorCLICommand.Read('orch certmgr cert check')
def _cert_store_cert_check(self, format: Format = Format.plain) -> HandleCommandResult:
completion = self.cert_store_cert_check()
cert_check_report = raise_if_exception(completion)
result_str = "\n".join(f"- {e}" for e in cert_check_report)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch certmgr key ls')
+ @OrchestratorCLICommand.Read('orch certmgr key ls')
def _cert_store_key_ls(self,
include_cephadm_generated_keys: bool = False,
format: Format = Format.plain) -> HandleCommandResult:
result_str = self._process_cert_store_json(key_ls, 0)
return HandleCommandResult(stdout=result_str)
- @_cli_read_command('orch certmgr cert get')
+ @OrchestratorCLICommand.Read('orch certmgr cert get')
def _cert_store_get_cert(
self,
cert_name: str,
cert = raise_if_exception(completion)
return HandleCommandResult(stdout=cert)
- @_cli_read_command('orch certmgr key get')
+ @OrchestratorCLICommand.Read('orch certmgr key get')
def _cert_store_get_key(
self,
key_name: str,
key = raise_if_exception(completion)
return HandleCommandResult(stdout=key)
- @_cli_write_command('orch certmgr cert-key set')
+ @OrchestratorCLICommand.Write('orch certmgr cert-key set')
def _cert_store_cert_key_set(
self,
consumer: str,
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch certmgr cert set')
+ @OrchestratorCLICommand.Write('orch certmgr cert set')
def _cert_store_set_cert(
self,
cert_name: str,
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch certmgr key set')
+ @OrchestratorCLICommand.Write('orch certmgr key set')
def _cert_store_set_key(
self,
key_name: str,
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch certmgr cert rm')
+ @OrchestratorCLICommand.Write('orch certmgr cert rm')
def _cert_store_rm_cert(
self,
cert_name: str,
output = raise_if_exception(completion)
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch certmgr key rm')
+ @OrchestratorCLICommand.Write('orch certmgr key rm')
def _cert_store_rm_key(
self,
key_name: str,
return _username, _password
- @_cli_write_command('orch certmgr generate-certificates')
+ @OrchestratorCLICommand.Write('orch certmgr generate-certificates')
def _cert_mgr_generate_certificates(self, module_name: str) -> HandleCommandResult:
try:
completion = self.generate_certificates(module_name)
except ArgumentError as e:
return HandleCommandResult(-errno.EINVAL, "", (str(e)))
- @_cli_write_command('orch prometheus set-credentials')
+ @OrchestratorCLICommand.Write('orch prometheus set-credentials')
def _set_prometheus_access_info(self, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> HandleCommandResult:
try:
username, password = self._get_credentials(username, password, inbuf)
except ArgumentError as e:
return HandleCommandResult(-errno.EINVAL, "", (str(e)))
- @_cli_write_command('orch prometheus set-target')
+ @OrchestratorCLICommand.Write('orch prometheus set-target')
def _set_prometheus_target(self, url: str) -> HandleCommandResult:
completion = self.set_prometheus_target(url)
result = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(result))
- @_cli_write_command('orch prometheus remove-target')
+ @OrchestratorCLICommand.Write('orch prometheus remove-target')
def _remove_prometheus_target(self, url: str) -> HandleCommandResult:
completion = self.remove_prometheus_target(url)
result = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(result))
- @_cli_write_command('orch alertmanager set-credentials')
+ @OrchestratorCLICommand.Write('orch alertmanager set-credentials')
def _set_alertmanager_access_info(self, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> HandleCommandResult:
try:
username, password = self._get_credentials(username, password, inbuf)
except ArgumentError as e:
return HandleCommandResult(-errno.EINVAL, "", (str(e)))
- @_cli_write_command('orch prometheus get-credentials')
+ @OrchestratorCLICommand.Write('orch prometheus get-credentials')
def _get_prometheus_access_info(self) -> HandleCommandResult:
completion = self.get_prometheus_access_info()
access_info = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(access_info))
- @_cli_write_command('orch get-security-config')
+ @OrchestratorCLICommand.Write('orch get-security-config')
def _get_security_config(self) -> HandleCommandResult:
completion = self.get_security_config()
result = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(result))
- @_cli_write_command('orch alertmanager get-credentials')
+ @OrchestratorCLICommand.Write('orch alertmanager get-credentials')
def _get_alertmanager_access_info(self) -> HandleCommandResult:
completion = self.get_alertmanager_access_info()
access_info = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(access_info))
- @_cli_write_command('orch prometheus set-custom-alerts')
+ @OrchestratorCLICommand.Write('orch prometheus set-custom-alerts')
def _set_custom_prometheus_alerts(self, inbuf: Optional[str] = None) -> HandleCommandResult:
if not inbuf:
raise OrchestratorError('This command requires passing a file with "-i <filepath>"')
out = raise_if_exception(completion)
return HandleCommandResult(stdout=json.dumps(out))
- @_cli_write_command('orch apply osd')
+ @OrchestratorCLICommand.Write('orch apply osd')
def _apply_osd(self,
all_available_devices: bool = False,
format: Format = Format.plain,
return HandleCommandResult(-errno.EINVAL, stderr='--all-available-devices is required')
- @_cli_write_command('orch daemon add osd')
+ @OrchestratorCLICommand.Write('orch daemon add osd')
def _daemon_add_osd(self,
svc_arg: Optional[str] = None,
method: Optional[OSDMethod] = None,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch osd rm')
+ @OrchestratorCLICommand.Write('orch osd rm')
def _osd_rm_start(self,
osd_id: List[str],
replace: bool = False,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch osd rm stop')
+ @OrchestratorCLICommand.Write('orch osd rm stop')
def _osd_rm_stop(self, osd_id: List[str]) -> HandleCommandResult:
"""Cancel ongoing OSD removal operation"""
completion = self.stop_remove_osds(osd_id)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch osd rm status')
+ @OrchestratorCLICommand.Write('orch osd rm status')
def _osd_rm_status(self, format: Format = Format.plain) -> HandleCommandResult:
"""Status of OSD removal operation"""
completion = self.remove_osds_status()
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch osd set-spec-affinity')
+ @OrchestratorCLICommand.Write('orch osd set-spec-affinity')
def _osd_set_spec(self, service_name: str, osd_id: List[str]) -> HandleCommandResult:
"""Set service spec affinity for osd"""
completion = self.set_osd_spec(service_name, osd_id)
return HandleCommandResult(stdout=res)
- @_cli_write_command('orch daemon add')
+ @OrchestratorCLICommand.Write('orch daemon add')
def daemon_add_misc(self,
daemon_type: Optional[ServiceType] = None,
placement: Optional[str] = None,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch daemon add mds')
+ @OrchestratorCLICommand.Write('orch daemon add mds')
def _mds_add(self,
fs_name: str,
placement: Optional[str] = None,
)
return self._daemon_add_misc(spec)
- @_cli_write_command('orch daemon add rgw')
+ @OrchestratorCLICommand.Write('orch daemon add rgw')
def _rgw_add(self,
svc_id: str,
placement: Optional[str] = None,
)
return self._daemon_add_misc(spec)
- @_cli_write_command('orch daemon add nfs')
+ @OrchestratorCLICommand.Write('orch daemon add nfs')
def _nfs_add(self,
svc_id: str,
placement: Optional[str] = None,
)
return self._daemon_add_misc(spec)
- @_cli_write_command('orch daemon add iscsi')
+ @OrchestratorCLICommand.Write('orch daemon add iscsi')
def _iscsi_add(self,
pool: str,
api_user: str,
)
return self._daemon_add_misc(spec)
- @_cli_write_command('orch daemon add nvmeof')
+ @OrchestratorCLICommand.Write('orch daemon add nvmeof')
def _nvmeof_add(self,
pool: str,
group: str,
)
return self._daemon_add_misc(spec)
- @_cli_write_command('orch')
+ @OrchestratorCLICommand.Write('orch')
def _service_action(self, action: ServiceAction, service_name: str) -> HandleCommandResult:
"""Start, stop, restart, redeploy, or reconfig an entire service (i.e. all daemons)"""
completion = self.service_action(action.value, service_name)
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch daemon')
+ @OrchestratorCLICommand.Write('orch daemon')
def _daemon_action(self, action: DaemonAction, name: str, force: bool = False) -> HandleCommandResult:
"""Start, stop, restart, redeploy, reconfig, or rotate-key for a specific daemon"""
if '.' not in name:
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch daemon redeploy')
+ @OrchestratorCLICommand.Write('orch daemon redeploy')
def _daemon_action_redeploy(self,
name: str,
image: Optional[str] = None) -> HandleCommandResult:
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch daemon rm')
+ @OrchestratorCLICommand.Write('orch daemon rm')
def _daemon_rm(self,
names: List[str],
force: Optional[bool] = False) -> HandleCommandResult:
completion = self.remove_daemons(names)
return completion_to_result(completion)
- @_cli_write_command('orch rm')
+ @OrchestratorCLICommand.Write('orch rm')
def _service_rm(self,
service_name: str,
force: bool = False) -> HandleCommandResult:
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch apply')
+ @OrchestratorCLICommand.Write('orch apply')
def apply_misc(self,
service_type: Optional[ServiceType] = None,
placement: Optional[str] = None,
out = to_format(data, format, many=True, cls=None)
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch apply mds')
+ @OrchestratorCLICommand.Write('orch apply mds')
def _apply_mds(self,
fs_name: str,
placement: Optional[str] = None,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply rgw')
+ @OrchestratorCLICommand.Write('orch apply rgw')
def _apply_rgw(self,
svc_id: str,
placement: Optional[str] = None,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply nfs')
+ @OrchestratorCLICommand.Write('orch apply nfs')
def _apply_nfs(self,
svc_id: str,
placement: Optional[str] = None,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply iscsi')
+ @OrchestratorCLICommand.Write('orch apply iscsi')
def _apply_iscsi(self,
pool: str,
api_user: str,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply mgmt-gateway')
+ @OrchestratorCLICommand.Write('orch apply mgmt-gateway')
def _apply_mgmt_gateway(self,
port: Optional[int] = None,
ssl: Optional[bool] = True,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply oauth2-proxy')
+ @OrchestratorCLICommand.Write('orch apply oauth2-proxy')
def _apply_oauth2_proxy(self,
https_address: Optional[str] = None,
placement: Optional[str] = None,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply nvmeof')
+ @OrchestratorCLICommand.Write('orch apply nvmeof')
def _apply_nvmeof(self,
pool: str,
group: str,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply snmp-gateway')
+ @OrchestratorCLICommand.Write('orch apply snmp-gateway')
def _apply_snmp_gateway(self,
snmp_version: SNMPGatewaySpec.SNMPVersion,
destination: str,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch apply jaeger')
+ @OrchestratorCLICommand.Write('orch apply jaeger')
def _apply_jaeger(self,
es_nodes: Optional[str] = None,
without_query: bool = False,
specs: List[ServiceSpec] = spec.get_tracing_specs()
return self._apply_misc(specs, dry_run, format, no_overwrite)
- @_cli_write_command('orch apply smb')
+ @OrchestratorCLICommand.Write('orch apply smb')
def _apply_smb(
self,
cluster_id: str,
return self._apply_misc([spec], dry_run, format, no_overwrite)
- @_cli_write_command('orch set-unmanaged')
+ @OrchestratorCLICommand.Write('orch set-unmanaged')
def _set_unmanaged(self, service_name: str) -> HandleCommandResult:
"""Set 'unmanaged: true' for the given service name"""
completion = self.set_unmanaged(service_name, True)
out = completion.result_str()
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch set-managed')
+ @OrchestratorCLICommand.Write('orch set-managed')
def _set_managed(self, service_name: str) -> HandleCommandResult:
"""Set 'unmanaged: false' for the given service name"""
completion = self.set_unmanaged(service_name, False)
out = completion.result_str()
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch set backend')
+ @OrchestratorCLICommand.Write('orch set backend')
def _set_backend(self, module_name: Optional[str] = None) -> HandleCommandResult:
"""
Select orchestrator module backend
return HandleCommandResult(-errno.EINVAL, stderr="Module '{0}' not found".format(module_name))
- @_cli_write_command('orch pause')
+ @OrchestratorCLICommand.Write('orch pause')
def _pause(self) -> HandleCommandResult:
"""Pause orchestrator background work"""
self.pause()
return HandleCommandResult()
- @_cli_write_command('orch resume')
+ @OrchestratorCLICommand.Write('orch resume')
def _resume(self) -> HandleCommandResult:
"""Resume orchestrator background work (if paused)"""
self.resume()
return HandleCommandResult()
- @_cli_write_command('orch unpause')
+ @OrchestratorCLICommand.Write('orch unpause')
def _unpause(self) -> HandleCommandResult:
"""Alias to orch resume"""
self.resume()
return HandleCommandResult()
- @_cli_write_command('orch cancel')
+ @OrchestratorCLICommand.Write('orch cancel')
def _cancel(self) -> HandleCommandResult:
"""
Cancel ongoing background operations
self.cancel_completions()
return HandleCommandResult()
- @_cli_read_command('orch status')
+ @OrchestratorCLICommand.Read('orch status')
def _status(self,
detail: bool = False,
format: Format = Format.plain) -> HandleCommandResult:
output += f"\nHost Parallelism: {result['workers']}"
return HandleCommandResult(stdout=output)
- @_cli_write_command('orch tuned-profile apply')
+ @OrchestratorCLICommand.Write('orch tuned-profile apply')
def _apply_tuned_profiles(self,
profile_name: Optional[str] = None,
placement: Optional[str] = None,
res = raise_if_exception(completion)
return HandleCommandResult(stdout=res)
- @_cli_write_command('orch tuned-profile rm')
+ @OrchestratorCLICommand.Write('orch tuned-profile rm')
def _rm_tuned_profiles(self, profile_name: str) -> HandleCommandResult:
completion = self.rm_tuned_profile(profile_name)
res = raise_if_exception(completion)
return HandleCommandResult(stdout=res)
- @_cli_read_command('orch tuned-profile ls')
+ @OrchestratorCLICommand.Read('orch tuned-profile ls')
def _tuned_profile_ls(self, format: Format = Format.plain) -> HandleCommandResult:
completion = self.tuned_profile_ls()
profiles: List[TunedProfileSpec] = raise_if_exception(completion)
out += '---\n'
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch tuned-profile add-setting')
+ @OrchestratorCLICommand.Write('orch tuned-profile add-setting')
def _tuned_profile_add_setting(self, profile_name: str, setting: str, value: str) -> HandleCommandResult:
completion = self.tuned_profile_add_setting(profile_name, setting, value)
res = raise_if_exception(completion)
return HandleCommandResult(stdout=res)
- @_cli_write_command('orch tuned-profile rm-setting')
+ @OrchestratorCLICommand.Write('orch tuned-profile rm-setting')
def _tuned_profile_rm_setting(self, profile_name: str, setting: str) -> HandleCommandResult:
completion = self.tuned_profile_rm_setting(profile_name, setting)
res = raise_if_exception(completion)
return HandleCommandResult(stdout=res)
- @_cli_write_command("orch tuned-profile add-settings")
+ @OrchestratorCLICommand.Write("orch tuned-profile add-settings")
def _tuned_profile_add_settings(self, profile_name: str, settings: str) -> HandleCommandResult:
try:
setting_pairs = settings.split(",")
)
return HandleCommandResult(stderr=error_message)
- @_cli_write_command("orch tuned-profile rm-settings")
+ @OrchestratorCLICommand.Write("orch tuned-profile rm-settings")
def _tuned_profile_rm_settings(self, profile_name: str, settings: str) -> HandleCommandResult:
try:
setting = [s.strip() for s in settings.split(",") if s.strip()]
f" Maybe you meant `--ceph-version {ver}`?"
raise OrchestratorValidationError(s)
- @_cli_write_command('orch upgrade check')
+ @OrchestratorCLICommand.Write('orch upgrade check')
def _upgrade_check(self,
image: Optional[str] = None,
ceph_version: Optional[str] = None) -> HandleCommandResult:
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_read_command('orch upgrade ls')
+ @OrchestratorCLICommand.Read('orch upgrade ls')
def _upgrade_ls(self,
image: Optional[str] = None,
tags: bool = False,
out = json.dumps(r, indent=4)
return HandleCommandResult(stdout=out)
- @_cli_write_command('orch upgrade status')
+ @OrchestratorCLICommand.Write('orch upgrade status')
def _upgrade_status(self, format: Optional[str] = None) -> HandleCommandResult:
"""Check the status of any potential ongoing upgrade operation"""
completion = self.upgrade_status()
return HandleCommandResult(stdout=out)
return HandleCommandResult(stdout="There are no upgrades in progress currently.")
- @_cli_write_command('orch upgrade start')
+ @OrchestratorCLICommand.Write('orch upgrade start')
def _upgrade_start(self,
image: Optional[str] = None,
_end_positional_: int = 0,
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch upgrade pause')
+ @OrchestratorCLICommand.Write('orch upgrade pause')
def _upgrade_pause(self) -> HandleCommandResult:
"""Pause an in-progress upgrade"""
completion = self.upgrade_pause()
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch upgrade resume')
+ @OrchestratorCLICommand.Write('orch upgrade resume')
def _upgrade_resume(self) -> HandleCommandResult:
"""Resume paused upgrade"""
completion = self.upgrade_resume()
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch upgrade stop')
+ @OrchestratorCLICommand.Write('orch upgrade stop')
def _upgrade_stop(self) -> HandleCommandResult:
"""Stop an in-progress upgrade"""
completion = self.upgrade_stop()
raise_if_exception(completion)
return HandleCommandResult(stdout=completion.result_str())
- @_cli_write_command('orch update service')
+ @OrchestratorCLICommand.Write('orch update service')
def _update_service(self, service_type: NonCephImageServiceTypes, image: str) -> HandleCommandResult:
"""Update image for non-ceph image daemon"""
completion = self.update_service(service_type.value, service_type.name, image)