Stateless Services
------------------
-.. automethod:: Orchestrator.add_stateless_service
-.. automethod:: Orchestrator.update_stateless_service
-.. automethod:: Orchestrator.remove_stateless_service
+.. autoclass:: StatelessServiceSpec
+
+.. automethod:: Orchestrator.add_mds
+.. automethod:: Orchestrator.remove_mds
+.. automethod:: Orchestrator.update_mds
+.. automethod:: Orchestrator.add_rgw
+.. automethod:: Orchestrator.remove_rgw
+.. automethod:: Orchestrator.update_rgw
+
+.. autoclass:: NFSServiceSpec
+
+.. automethod:: Orchestrator.add_nfs
+.. automethod:: Orchestrator.remove_nfs
+.. automethod:: Orchestrator.update_nfs
Upgrades
--------
-------
.. automethod:: Orchestrator.available
+.. automethod:: Orchestrator.get_feature_set
Client Modules
--------------
return ARSChangeOperation(self.ar_client, self.log, operations)
- def add_stateless_service(self, service_type, spec):
- """ Add a stateless service in the cluster
+ def add_rgw(self, spec):
+ # type: (orchestrator.RGWSpec) -> PlaybookOperation
+ """ Add a RGW service in the cluster
- : service_type: Kind of service (nfs, rgw, mds)
- : spec : an Orchestrator.StatelessServiceSpec object
+ : spec : an Orchestrator.RGWSpec object
: returns : Completion object
"""
- # Check service_type is supported
- if service_type not in ["rgw"]:
- raise orchestrator.OrchestratorError(
- "{} service not supported".format(service_type))
# Add the hosts to the inventory in the right group
- hosts = spec.service_spec.hosts
+ hosts = spec.hosts
if not hosts:
- raise orchestrator.OrchestratorError("No hosts provided."\
- "At least one destination host is needed to install the RGW "\
+ raise orchestrator.OrchestratorError("No hosts provided. "
+ "At least one destination host is needed to install the RGW "
"service")
- InventoryGroup("{}s".format(service_type), self.ar_client).update(hosts)
+ InventoryGroup("rgws", self.ar_client).update(hosts)
# Limit playbook execution to certain hosts
limited = ",".join(hosts)
# Add the settings for this service
- extravars = vars(spec.service_spec)
+ extravars = {k:v for (k,v) in spec.__dict__.items() if k.startswith('rgw_')}
+ extravars['rgw_zone'] = spec.name
# Group hosts by resource (used in rm ops)
- if service_type == "rgw":
- resource_group = "rgw_zone_{}".format(spec.service_spec.rgw_zone)
+ resource_group = "rgw_zone_{}".format(spec.name)
InventoryGroup(resource_group, self.ar_client).update(hosts)
return playbook_operation
- def remove_stateless_service(self, service_type, id_resource):
- """ Remove a stateles services providing <sv_id> resources
+ def remove_rgw(self, zone):
+ """ Remove a RGW service providing <zone>
- :svc_type : Kind of service (nfs, rgw, mds)
- :id_resource : Id of the resource provided
- <zone name> if service is RGW
+ :zone : <zone name> of the RGW
...
: returns : Completion object
"""
- # Check service_type is supported
- if service_type not in ["rgw"]:
- raise orchestrator.OrchestratorError(
- "{} service not supported".format(service_type))
# Ansible Inventory group for the kind of service
- group = "{}s".format(service_type)
+ group = "rgws"
# get the list of hosts where to remove the service
# (hosts in resource group)
- if service_type == "rgw":
- group_prefix = "rgw_zone_{}"
-
- resource_group = group_prefix.format(id_resource)
+ resource_group = "rgw_zone_{}".format(zone)
hosts_list = list(InventoryGroup(resource_group, self.ar_client))
limited = ",".join(hosts_list)
playbook_operation.clean_hosts_on_success = clean_inventory
# Execute the playbook
- self.log.info("Removing service %s for resource %s", service_type,
- id_resource)
+ self.log.info("Removing service rgw for resource %s", zone)
self._launch_operation(playbook_operation)
return playbook_operation
"""
return not self.is_persistent
+def _hide_in_features(f):
+ f._hide_in_features = True
+ return f
class Orchestrator(object):
"""
while you scan hosts every time.
"""
+ @_hide_in_features
def is_orchestrator_module(self):
"""
Enable other modules to interrogate this module to discover
"""
return True
+ @_hide_in_features
def available(self):
# type: () -> Tuple[bool, str]
"""
(e.g. based on a periodic background ping of the orchestrator)
if that's necessary to make this method fast.
- ..note:: `True` doesn't mean that the desired functionality
+ .. note::
+ `True` doesn't mean that the desired functionality
is actually available in the orchestrator. I.e. this
won't work as expected::
"""
raise NotImplementedError()
+ @_hide_in_features
def wait(self, completions):
"""
Given a list of Completion instances, progress any which are
"""
raise NotImplementedError()
+ @_hide_in_features
+ def get_feature_set(self):
+ """Describes which methods this orchestrator implements
+
+ .. note::
+ `True` doesn't mean that the desired functionality
+ is actually possible in the orchestrator. I.e. this
+ won't work as expected::
+
+ >>> api = OrchestratorClientMixin()
+ ... if api.get_feature_set()['get_hosts']['available']: # wrong.
+ ... api.get_hosts()
+
+ It's better to ask for forgiveness instead::
+
+ >>> try:
+ ... OrchestratorClientMixin().get_hosts()
+ ... except (OrchestratorError, NotImplementedError):
+ ... ...
+
+
+ :returns: Dict of API method names to ``{'available': True or False}``
+ """
+ module = self.__class__
+ features = {a: {'available': getattr(Orchestrator, a, None) != getattr(module, a)}
+ for a in Orchestrator.__dict__
+ if not a.startswith('_') and not getattr(getattr(Orchestrator, a), '_hide_in_features', False)
+ }
+ return features
+
def add_host(self, host):
# type: (str) -> WriteCompletion
"""
"""
raise NotImplementedError()
- def add_stateless_service(self, service_type, spec):
- # type: (str, StatelessServiceSpec) -> WriteCompletion
- """
- Installing and adding a completely new service to the cluster.
+ def add_mds(self, spec):
+ # type: (StatelessServiceSpec) -> WriteCompletion
+ """Create a new MDS cluster"""
+ raise NotImplementedError()
- This is not about starting services.
- """
+ def remove_mds(self, name):
+ # type: (str) -> WriteCompletion
+ """Remove an MDS cluster"""
raise NotImplementedError()
- def update_stateless_service(self, service_type, spec):
- # type: (str, StatelessServiceSpec) -> WriteCompletion
+ def update_mds(self, spec):
+ # type: (StatelessServiceSpec) -> WriteCompletion
"""
- This is about changing / redeploying existing services. Like for
- example changing the number of service instances.
-
- :rtype: WriteCompletion
+ Update / redeploy existing MDS cluster
+ Like for example changing the number of service instances.
"""
raise NotImplementedError()
- def remove_stateless_service(self, service_type, id_resource):
- # type: (str, str) -> WriteCompletion
+ def add_nfs(self, spec):
+ # type: (NFSServiceSpec) -> WriteCompletion
+ """Create a new MDS cluster"""
+ raise NotImplementedError()
+
+ def remove_nfs(self, name):
+ # type: (str) -> WriteCompletion
+ """Remove a NFS cluster"""
+ raise NotImplementedError()
+
+ def update_nfs(self, spec):
+ # type: (NFSServiceSpec) -> WriteCompletion
"""
- Uninstalls an existing service from the cluster.
+ Update / redeploy existing NFS cluster
+ Like for example changing the number of service instances.
+ """
+ raise NotImplementedError()
+
+ def add_rgw(self, spec):
+ # type: (RGWSpec) -> WriteCompletion
+ """Create a new MDS zone"""
+ raise NotImplementedError()
+
+ def remove_rgw(self, zone):
+ # type: (str) -> WriteCompletion
+ """Remove a RGW zone"""
+ raise NotImplementedError()
- This is not about stopping services.
+ def update_rgw(self, spec):
+ # type: (StatelessServiceSpec) -> WriteCompletion
+ """
+ Update / redeploy existing RGW zone
+ Like for example changing the number of service instances.
"""
raise NotImplementedError()
+ @_hide_in_features
def upgrade_start(self, upgrade_spec):
# type: (UpgradeSpec) -> WriteCompletion
raise NotImplementedError()
+ @_hide_in_features
def upgrade_status(self):
# type: () -> ReadCompletion[UpgradeStatusSpec]
"""
"""
raise NotImplementedError()
+ @_hide_in_features
def upgrade_available(self):
# type: () -> ReadCompletion[List[str]]
"""
"""
Details of stateless service creation.
- This is *not* supposed to contain all the configuration
- of the services: it's just supposed to be enough information to
- execute the binaries.
+ Request to orchestrator for a group of stateless services
+ such as MDS, RGW or iscsi gateway
"""
+ # This structure is supposed to be enough information to
+ # start the services.
- def __init__(self):
- self.placement = PlacementSpec()
+ def __init__(self, name, placement=None, count=None):
+ self.placement = PlacementSpec() if placement is None else placement
+
+ #: Give this set of statelss services a name: typically it would
+ #: be the name of a CephFS filesystem, RGW zone, etc. Must be unique
+ #: within one ceph cluster.
+ self.name = name
+
+ #: Count of service instances
+ self.count = 1 if count is None else count
+
+ def validate_add(self):
+ if not self.name:
+ raise OrchestratorValidationError('Cannot add Service: Name required')
+
+
+class NFSServiceSpec(StatelessServiceSpec):
+ def __init__(self, name, pool=None, namespace=None, count=1, placement=None):
+ super(NFSServiceSpec, self).__init__(name, placement, count)
+
+ #: RADOS pool where NFS client recovery data is stored.
+ self.pool = pool
- # Give this set of statelss services a name: typically it would
- # be the name of a CephFS filesystem, RGW zone, etc. Must be unique
- # within one ceph cluster.
- self.name = ""
+ #: RADOS namespace where NFS client recovery data is stored in the pool.
+ self.namespace = namespace
- # Count of service instances
- self.count = 1
+ def validate_add(self):
+ super(NFSServiceSpec, self).validate_add()
- # Arbitrary JSON-serializable object.
- # Maybe you're using e.g. kubenetes and you want to pass through
- # some replicaset special sauce for autoscaling?
- self.extended = {}
+ if not self.pool:
+ raise OrchestratorValidationError('Cannot add NFS: No Pool specified')
- # Object with specific settings for the service
- self.service_spec = None
-class RGWSpec(object):
+class RGWSpec(StatelessServiceSpec):
"""
- Settings to configure a multisite Ceph RGW
+ Settings to configure a (multisite) Ceph RGW
"""
+ # TODO: move all default values to a dedicated method. I don't want to overwrite
+ # Rook's default values.
def __init__(self, hosts=None, rgw_multisite=True, rgw_zone="Default_Zone",
rgw_zonemaster=True, rgw_zonesecondary=False,
rgw_multisite_proto="http", rgw_frontend_port="8080",
rgw_realm="RGW_Realm", system_access_key=None,
system_secret_key=None):
+ super(RGWSpec, self).__init__(name=rgw_zone)
self.hosts = hosts
self.rgw_multisite = rgw_multisite
- self.rgw_zone = rgw_zone
self.rgw_zonemaster = rgw_zonemaster
self.rgw_zonesecondary = rgw_zonesecondary
self.rgw_multisite_proto = rgw_multisite_proto
self.rgw_frontend_port = rgw_frontend_port
- if hosts:
+ if hosts and self.rgw_multisite:
self.rgw_multisite_endpoint_addr = hosts[0]
self.rgw_multisite_endpoints_list = ",".join(
:nchars : Length of the returned string
"""
+ # TODO Python 3: use Secrets module instead.
return ''.join(random.choice(string.ascii_uppercase +
string.ascii_lowercase +
string.digits) for _ in range(nchars))
@classmethod
- def from_json(self, json_rgw_spec):
+ def from_json(cls, json_rgw_spec):
+ # type: (dict) -> RGWSpec
"""
- Initialize 'RGWSpec' object geting data from a json estructure
- :param json_rgw_spec: A valid json string with a the RGW settings
+ Initialize 'RGWSpec' object data from a json structure
+ :param json_rgw_spec: A valid dict with a the RGW settings
"""
+ # TODO: also add PlacementSpec(**json_rgw_spec['placement'])
args = {k:v for k, v in json_rgw_spec.items()}
return RGWSpec(**args)
orchestrator.raise_if_exception(completion)
return HandleCommandResult(stdout=str(completion.result))
- def _add_stateless_svc(self, svc_type, spec):
- completion = self.add_stateless_service(svc_type, spec)
- self._orchestrator_wait([completion])
- orchestrator.raise_if_exception(completion)
- return HandleCommandResult(stdout=str(completion.result))
-
@_write_cli('orchestrator mds add',
"name=svc_arg,type=CephString",
'Create an MDS service')
def _mds_add(self, svc_arg):
- spec = orchestrator.StatelessServiceSpec()
- spec.name = svc_arg
- return self._add_stateless_svc("mds", spec)
+ spec = orchestrator.StatelessServiceSpec(svc_arg)
+ completion = self.add_mds(spec)
+ self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result)
@_write_cli('orchestrator rgw add',
'name=svc_arg,type=CephString,req=false',
'Create an RGW service. A complete <rgw_spec> can be provided'\
' using <-i> to customize completelly the RGW service')
def _rgw_add(self, svc_arg=None, inbuf=None):
- """
- """
usage = """
Usage:
ceph orchestrator rgw add -i <json_file>
msg = 'Failed to read JSON input: {}'.format(str(e)) + usage
return HandleCommandResult(-errno.EINVAL, stderr=msg)
elif svc_arg:
- rgw_spec = orchestrator.RGWSpec()
- rgw_spec.zone_name = svc_arg
-
- spec = orchestrator.StatelessServiceSpec()
- spec.service_spec = rgw_spec
- spec.name = rgw_spec.rgw_zone
+ rgw_spec = orchestrator.RGWSpec()
+ rgw_spec.zone_name = svc_arg
+ else:
+ return HandleCommandResult(-errno.EINVAL, stderr=usage)
- return self._add_stateless_svc("rgw", spec)
+ completion = self.add_rgw(rgw_spec)
+ self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return HandleCommandResult(stdout=completion.result)
@_write_cli('orchestrator nfs add',
"name=svc_arg,type=CephString "
"name=namespace,type=CephString,req=false",
'Create an NFS service')
def _nfs_add(self, svc_arg, pool, namespace=None):
- spec = orchestrator.StatelessServiceSpec()
- spec.name = svc_arg
- spec.extended = { "pool":pool }
- if namespace is not None:
- spec.extended["namespace"] = namespace
- return self._add_stateless_svc("nfs", spec)
-
- def _rm_stateless_svc(self, svc_type, svc_id):
- completion = self.remove_stateless_service(svc_type, svc_id)
+ spec = orchestrator.NFSServiceSpec(svc_arg, pool=pool, namespace=namespace)
+ spec.validate_add()
+ completion = self.add_nfs(spec)
self._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
- return HandleCommandResult(stdout=str(completion.result))
+ return HandleCommandResult(stdout=completion.result)
@_write_cli('orchestrator mds rm',
"name=svc_id,type=CephString",
'Remove an MDS service')
def _mds_rm(self, svc_id):
- return self._rm_stateless_svc("mds", svc_id)
+ completion = self.remove_mds(svc_id)
+ self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return HandleCommandResult()
@_write_cli('orchestrator rgw rm',
"name=svc_id,type=CephString",
'Remove an RGW service')
def _rgw_rm(self, svc_id):
- return self._rm_stateless_svc("rgw", svc_id)
+ completion = self.remove_rgw(svc_id)
+ self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return HandleCommandResult()
@_write_cli('orchestrator nfs rm',
"name=svc_id,type=CephString",
'Remove an NFS service')
def _nfs_rm(self, svc_id):
- return self._rm_stateless_svc("nfs", svc_id)
+ completion = self.remove_nfs(svc_id)
+ self._orchestrator_wait([completion])
+ orchestrator.raise_if_exception(completion)
+ return HandleCommandResult()
@_write_cli('orchestrator nfs update',
"name=svc_id,type=CephString "
"name=num,type=CephInt",
'Scale an NFS service')
def _nfs_update(self, svc_id, num):
- spec = orchestrator.StatelessServiceSpec()
- spec.name = svc_id
- spec.count = num
- completion = self.update_stateless_service("nfs", spec)
+ spec = orchestrator.NFSServiceSpec(svc_id, count=num)
+ completion = self.update_nfs(spec)
self._orchestrator_wait([completion])
return HandleCommandResult()
import pytest
-from orchestrator import InventoryDevice, ReadCompletion, raise_if_exception
+from orchestrator import InventoryDevice, ReadCompletion, raise_if_exception, RGWSpec
def test_inventory_device():
i_d = InventoryDevice()
c.exception = ZeroDivisionError()
with pytest.raises(ZeroDivisionError):
raise_if_exception(c)
+
+
+def test_rgwspec():
+ """
+ {
+ "rgw_zone": "zonename",
+ "rgw_frontend_port": 8080,
+ "rgw_zonegroup": "group",
+ "rgw_zone_user": "user",
+ "rgw_realm": "realm",
+ "count": 3
+ }
+ """
+ example = json.loads(test_rgwspec.__doc__.strip())
+ spec = RGWSpec.from_json(example)
+ assert spec.validate_add() is None
return RookWriteCompletion(lambda: func(spec), None,
"Creating {0} services for {1}".format(typename, spec.name))
- def add_stateless_service(self, service_type, spec):
- # assert isinstance(spec, orchestrator.StatelessServiceSpec)
- if service_type == "mds":
- return self._service_add_decorate("Filesystem", spec,
- self.rook_cluster.add_filesystem)
- elif service_type == "rgw" :
- return self._service_add_decorate("RGW", spec,
- self.rook_cluster.add_objectstore)
- elif service_type == "nfs" :
- return self._service_add_decorate("NFS", spec,
- self.rook_cluster.add_nfsgw)
- else:
- raise NotImplementedError(service_type)
+ def add_mds(self, spec):
+ return self._service_add_decorate("Filesystem", spec,
+ self.rook_cluster.add_filesystem)
+
+ def add_rgw(self, spec):
+ return self._service_add_decorate("RGW", spec,
+ self.rook_cluster.add_objectstore)
+
+ def add_nfs(self, spec):
+ return self._service_add_decorate("NFS", spec,
+ self.rook_cluster.add_nfsgw)
+
+ def remove_mds(self, name):
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.rm_service('cephfilesystems', name), None,
+ "Removing {0} services for {1}".format('mds', name))
+
+ def remove_rgw(self, zone):
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.rm_service('cephobjectstores', zone), None,
+ "Removing {0} services for {1}".format('rgw', zone))
- def remove_stateless_service(self, service_type, service_id):
+ def remove_nfs(self, name):
return RookWriteCompletion(
- lambda: self.rook_cluster.rm_service(service_type, service_id), None,
- "Removing {0} services for {1}".format(service_type, service_id))
+ lambda: self.rook_cluster.rm_service('cephnfses', name), None,
+ "Removing {0} services for {1}".format('nfs', name))
def update_mons(self, num, hosts):
if hosts:
lambda: self.rook_cluster.update_mon_count(num), None,
"Updating mon count to {0}".format(num))
- def update_stateless_service(self, svc_type, spec):
- # only nfs is currently supported
- if svc_type != "nfs":
- raise NotImplementedError(svc_type)
-
+ def update_nfs(self, spec):
num = spec.count
return RookWriteCompletion(
lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
self.rook_api_post("cephfilesystems/", body=rook_fs)
def add_nfsgw(self, spec):
+ # type: (orchestrator.NFSServiceSpec) -> None
# TODO use spec.placement
- # TODO warn if spec.extended has entries we don't kow how
- # to action.
rook_nfsgw = {
"apiVersion": self.rook_env.api_name,
},
"spec": {
"rados": {
- "pool": spec.extended["pool"]
+ "pool": spec.pool
},
"server": {
"active": spec.count,
}
}
- if "namespace" in spec.extended:
- rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"]
+ if spec.namespace:
+ rook_nfsgw["spec"]["rados"]["namespace"] = spec.namespace
with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
self.rook_api_post("cephnfses/", body=rook_nfsgw)
with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
self.rook_api_post("cephobjectstores/", body=rook_os)
- def rm_service(self, service_type, service_id):
- assert service_type in ("mds", "rgw", "nfs")
-
- if service_type == "mds":
- rooktype = "cephfilesystems"
- elif service_type == "rgw":
- rooktype = "cephobjectstores"
- elif service_type == "nfs":
- rooktype = "cephnfses"
+ def rm_service(self, rooktype, service_id):
objpath = "{0}/{1}".format(rooktype, service_id)
self.rook_api_delete(objpath)
except ApiException as e:
if e.status == 404:
- log.info("{0} service '{1}' does not exist".format(service_type, service_id))
+ log.info("{0} service '{1}' does not exist".format(rooktype, service_id))
# Idempotent, succeed.
else:
raise
return result
- @deferred_write("Adding stateless service")
- def add_stateless_service(self, service_type, spec):
- pass
@deferred_write("create_osds")
def create_osds(self, drive_group, all_hosts):
def service_action(self, action, service_type, service_name=None, service_id=None):
pass
- @deferred_write("remove_stateless_service")
- def remove_stateless_service(self, service_type, id_):
+ @deferred_write("Adding NFS service")
+ def add_nfs(self, spec):
+ assert isinstance(spec.pool, str)
+
+ @deferred_write("remove_nfs")
+ def remove_nfs(self, name):
+ pass
+
+ @deferred_write("update_nfs")
+ def update_nfs(self, spec):
+ pass
+
+ @deferred_write("add_mds")
+ def add_mds(self, spec):
pass
- @deferred_write("update_stateless_service")
- def update_stateless_service(self, service_type, spec):
+ @deferred_write("remove_mds")
+ def remove_mds(self, name):
pass
+ @deferred_write("add_rgw")
+ def add_rgw(self, spec):
+ pass
+
+ @deferred_write("remove_rgw")
+ def remove_rgw(self, zone):
+ pass
+
+
@deferred_read
def get_hosts(self):
return [orchestrator.InventoryNode('localhost', [])]
return self.mgr.mon_command(command)
def create_mds(self, fs_name):
- spec = orchestrator.StatelessServiceSpec()
- spec.name = fs_name
+ spec = orchestrator.StatelessServiceSpec(fs_name)
try:
- completion = self.mgr.add_stateless_service("mds", spec)
+ completion = self.mgr.add_mds(spec)
self.mgr._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):
self.connection_pool.del_fs_handle(volname)
# Tear down MDS daemons
try:
- completion = self.mgr.remove_stateless_service("mds", volname)
+ completion = self.mgr.remove_mds(volname)
self.mgr._orchestrator_wait([completion])
orchestrator.raise_if_exception(completion)
except (ImportError, orchestrator.OrchestratorError):