import errno
import base64
import functools
+import sys
-from mgr_module import MgrModule, CLICommand, HandleCommandResult
+from mgr_module import MgrModule, CLICommand, HandleCommandResult, Option
import orchestrator
from ceph.deployment.service_spec import RGWSpec, PlacementSpec, SpecValidationError
-from typing import Any, Optional, Sequence, Iterator, List
+from typing import Any, Optional, Sequence, Iterator, List, Callable, TypeVar, cast, Dict, Tuple, Union
from ceph.rgw.types import RGWAMException, RGWAMEnvMgr, RealmToken
from ceph.rgw.rgwam_core import EnvArgs, RGWAM
-from orchestrator import OrchestratorClientMixin, OrchestratorError
+from orchestrator import OrchestratorClientMixin, OrchestratorError, DaemonDescription, OrchResult
+
+
+FuncT = TypeVar('FuncT', bound=Callable[..., Any])
+
+# this uses a version check as opposed to a try/except because this
+# form makes mypy happy and try/except doesn't.
+if sys.version_info >= (3, 8):
+ from typing import Protocol
+else:
+ # typing_extensions will not be available for the real mgr server
+ from typing_extensions import Protocol
+
+
+class MgrModuleProtocol(Protocol):
+ def tool_exec(self, args: List[str]) -> Tuple[int, str, str]:
+ ...
+
+ def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]:
+ ...
+
+ def list_daemons(self, service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> OrchResult[List['DaemonDescription']]:
+ ...
class RGWSpecParsingError(Exception):
class OrchestratorAPI(OrchestratorClientMixin):
- def __init__(self, mgr):
+ def __init__(self, mgr: MgrModule):
super(OrchestratorAPI, self).__init__()
- self.set_mgr(mgr) # type: ignore
+ self.set_mgr(mgr)
- def status(self):
+ def status(self) -> Dict[str, Union[str, bool]]:
try:
status, message, _module_details = super().available()
return dict(available=status, message=message)
class RGWAMOrchMgr(RGWAMEnvMgr):
- def __init__(self, mgr):
+ def __init__(self, mgr: MgrModuleProtocol):
self.mgr = mgr
- def tool_exec(self, prog, args):
+ def tool_exec(self, prog: str, args: List[str]) -> Tuple[List[str], int, str, str]:
cmd = [prog] + args
rc, stdout, stderr = self.mgr.tool_exec(args=cmd)
return cmd, rc, stdout, stderr
- def apply_rgw(self, spec):
+ def apply_rgw(self, spec: RGWSpec) -> None:
completion = self.mgr.apply_rgw(spec)
orchestrator.raise_if_exception(completion)
- def list_daemons(self, service_name, daemon_type=None, daemon_id=None, host=None, refresh=True):
+ def list_daemons(self, service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = True) -> List['DaemonDescription']:
completion = self.mgr.list_daemons(service_name,
daemon_type,
daemon_id=daemon_id,
return orchestrator.raise_if_exception(completion)
+def check_orchestrator(func: FuncT) -> FuncT:
+ @functools.wraps(func)
+ def wrapper(self: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
+ available = self.api.status()['available']
+ if available:
+ return func(self, *args, **kwargs)
+ else:
+ err_msg = "Cephadm is not available. Please enable cephadm by 'ceph mgr module enable cephadm'."
+ return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)
+ return cast(FuncT, wrapper)
+
+
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
- MODULE_OPTIONS = []
+ MODULE_OPTIONS: List[Option] = []
# These are "native" Ceph options that this module cares about.
- NATIVE_OPTIONS = []
+ NATIVE_OPTIONS: List[Option] = []
def __init__(self, *args: Any, **kwargs: Any):
self.inited = False
# Do the same for the native options.
for opt in self.NATIVE_OPTIONS:
setattr(self,
- opt,
+ opt, # type: ignore
self.get_ceph_option(opt))
- self.log.debug(' native option %s = %s', opt, getattr(self, opt))
-
- def check_orchestrator():
- def inner(func):
- @functools.wraps(func)
- def wrapper(self, *args, **kwargs):
- available = self.api.status()['available']
- if available:
- return func(self, *args, **kwargs)
- else:
- err_msg = "Orchestrator is not available. Please enable cephadm by 'ceph mgr module enable cephadm'."
- return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)
- return wrapper
- return inner
+ self.log.debug(' native option %s = %s', opt, getattr(self, opt)) # type: ignore
@CLICommand('rgw admin', perm='rw')
- def _cmd_rgw_admin(self, params: Sequence[str]):
+ def _cmd_rgw_admin(self, params: Sequence[str]) -> HandleCommandResult:
"""rgw admin"""
cmd, returncode, out, err = self.env.mgr.tool_exec('radosgw-admin', params or [])
return HandleCommandResult(retval=returncode, stdout=out, stderr=err)
@CLICommand('rgw realm bootstrap', perm='rw')
- @check_orchestrator()
+ @check_orchestrator
def _cmd_rgw_realm_bootstrap(self,
realm_name: Optional[str] = None,
zonegroup_name: Optional[str] = None,
placement: Optional[str] = None,
zone_endpoints: Optional[str] = None,
start_radosgw: Optional[bool] = True,
- inbuf: Optional[str] = None):
+ inbuf: Optional[str] = None) -> HandleCommandResult:
"""Bootstrap new rgw realm, zonegroup, and zone"""
if inbuf:
return HandleCommandResult(retval=0, stdout="Realm(s) created correctly. Please, use 'ceph rgw realm tokens' to get the token.", stderr='')
- def _parse_rgw_specs(self, inbuf: Optional[str] = None) -> List[RGWSpec]:
+ def _parse_rgw_specs(self, inbuf: str) -> List[RGWSpec]:
"""Parse RGW specs from a YAML file."""
# YAML '---' document separator with no content generates
# None entries in the output. Let's skip them silently.
def _cmd_rgw_realm_new_zone_creds(self,
realm_name: Optional[str] = None,
endpoints: Optional[str] = None,
- sys_uid: Optional[str] = None):
+ sys_uid: Optional[str] = None) -> HandleCommandResult:
"""Create credentials for new zone creation"""
try:
return HandleCommandResult(retval=retval, stdout=out, stderr=err)
@CLICommand('rgw realm zone-creds remove', perm='rw')
- def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None):
+ def _cmd_rgw_realm_rm_zone_creds(self, realm_token: Optional[str] = None) -> HandleCommandResult:
"""Create credentials for new zone creation"""
try:
return HandleCommandResult(retval=retval, stdout=out, stderr=err)
@CLICommand('rgw realm tokens', perm='r')
- def list_realm_tokens(self):
+ def list_realm_tokens(self) -> HandleCommandResult:
try:
realms_info = []
for realm_info in RGWAM(self.env).get_realms_info():
return HandleCommandResult(retval=0, stdout=json.dumps(realms_info, indent=4), stderr='')
@CLICommand('rgw zone modify', perm='rw')
- def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]):
+ def update_zone_info(self, realm_name: str, zonegroup_name: str, zone_name: str, realm_token: str, zone_endpoints: List[str]) -> HandleCommandResult:
try:
retval, out, err = RGWAM(self.env).zone_modify(realm_name,
zonegroup_name,
zone_name,
zone_endpoints,
realm_token)
- return (retval, 'Zone updated successfully', '')
+ return HandleCommandResult(retval, 'Zone updated successfully', '')
except RGWAMException as e:
self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message))
return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr)
@CLICommand('rgw zone create', perm='rw')
- @check_orchestrator()
+ @check_orchestrator
def _cmd_rgw_zone_create(self,
zone_name: Optional[str] = None,
realm_token: Optional[str] = None,
placement: Optional[str] = None,
start_radosgw: Optional[bool] = True,
zone_endpoints: Optional[str] = None,
- inbuf: Optional[str] = None):
+ inbuf: Optional[str] = None) -> HandleCommandResult:
"""Bootstrap new rgw zone that syncs with zone on another cluster in the same realm"""
if inbuf:
created_zones = []
for rgw_spec in rgw_specs:
RGWAM(self.env).zone_create(rgw_spec, start_radosgw)
- created_zones.append(rgw_spec.rgw_zone)
+ if rgw_spec.rgw_zone is not None:
+ created_zones.append(rgw_spec.rgw_zone)
except RGWAMException as e:
self.log.error('cmd run exception: (%d) %s' % (e.retcode, e.message))
return HandleCommandResult(retval=e.retcode, stdout=e.stdout, stderr=e.stderr)
realm_name: Optional[str] = None,
zonegroup_name: Optional[str] = None,
zone_name: Optional[str] = None,
- update: Optional[bool] = False):
+ update: Optional[bool] = False) -> HandleCommandResult:
"""Bootstrap new rgw zone that syncs with existing zone"""
try: