import yaml
import errno
import base64
+import functools
from mgr_module import MgrModule, CLICommand, HandleCommandResult
import orchestrator
from ceph.rgw.types import RGWAMException, RGWAMEnvMgr, RealmToken
from ceph.rgw.rgwam_core import EnvArgs, RGWAM
+from orchestrator import OrchestratorClientMixin, OrchestratorError
+class OrchestratorAPI(OrchestratorClientMixin):
+ def __init__(self, mgr):
+ super(OrchestratorAPI, self).__init__()
+ self.set_mgr(mgr) # type: ignore
+
+ def status(self):
+ try:
+ status, message, _module_details = super().available()
+ return dict(available=status, message=message)
+ except (RuntimeError, OrchestratorError, ImportError) as e:
+ return dict(available=False, message=f'Orchestrator is unavailable: {e}')
+
class RGWAMOrchMgr(RGWAMEnvMgr):
def __init__(self, mgr):
self.mgr = mgr
self.inited = False
self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
+ self.api = OrchestratorAPI(self)
# ensure config options members are initialized; see config_notify()
self.config_notify()
self.get_ceph_option(opt))
self.log.debug(' native option %s = %s', opt, getattr(self, opt))
+ def check_orchestrator():
+ def inner(func):
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ available = self.api.status()['available']
+ if available:
+ return func(self, *args, **kwargs)
+ else:
+ err_msg = f"Cephadm is not available. Please enable cephadm by 'ceph mgr module enable cephadm'."
+ return HandleCommandResult(retval=-errno.EINVAL, stdout='', stderr=err_msg)
+ return wrapper
+ return inner
+
@CLICommand('rgw admin', perm='rw')
def _cmd_rgw_admin(self, params: Sequence[str]):
"""rgw admin"""
return HandleCommandResult(retval=returncode, stdout=out, stderr=err)
@CLICommand('rgw realm bootstrap', perm='rw')
+ @check_orchestrator()
def _cmd_rgw_realm_bootstrap(self,
realm_name: Optional[str] = None,
zonegroup_name: Optional[str] = None,
return (e.retcode, e.message, e.stderr)
@CLICommand('rgw zone create', perm='rw')
+ @check_orchestrator()
def _cmd_rgw_zone_create(self,
zone_name: Optional[str] = None,
realm_token: Optional[str] = None,