From 846761ef7afab43144f38bf5631fd859d6964820 Mon Sep 17 00:00:00 2001
From: Sebastian Wagner
Date: Mon, 13 Jan 2020 13:01:20 +0100
Subject: [PATCH] mgr/rook: Make use of rook-client-python when talking to Rook

Fixes:

* `CephFilesystem.spec.onlyManageDaemons` does not exist
* `CephObjectStore.spec.gateway.allNodes` does not exist
* Adding directory-OSDs to existing nodes was broken

Signed-off-by: Sebastian Wagner
---
 src/pybind/mgr/rook/requirements.txt |   2 +
 src/pybind/mgr/rook/rook_cluster.py  | 315 +++++++++++++--------------
 2 files changed, 151 insertions(+), 166 deletions(-)
 create mode 100644 src/pybind/mgr/rook/requirements.txt

diff --git a/src/pybind/mgr/rook/requirements.txt b/src/pybind/mgr/rook/requirements.txt
new file mode 100644
index 000000000000..378de08e6693
--- /dev/null
+++ b/src/pybind/mgr/rook/requirements.txt
@@ -0,0 +1,2 @@
+kubernetes
+jsonpatch
\ No newline at end of file
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index bb667f0703f0..60ebf1e5b695 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -11,6 +11,7 @@
 import logging
 import json
 from contextlib import contextmanager
+import jsonpatch
 from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
 
 # Optional kubernetes imports to enable MgrModule.can_run
@@ -32,6 +33,11 @@ except ImportError:
     class ApiException(Exception):  # type: ignore
         status = 0
 
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+
 import orchestrator
 
 
@@ -341,86 +347,81 @@ class RookCluster(object):
         # TODO warn if spec.extended has entries we don't kow how
         # to action.
 
-        rook_fs = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephFilesystem",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "onlyManageDaemons": True,
-                "metadataServer": {
-                    "activeCount": spec.count,
-                    "activeStandby": True
-
-                }
-            }
-        }
+        rook_fs = cfs.CephFilesystem(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace,
+            ),
+            spec=cfs.Spec(
+                metadataServer=cfs.MetadataServer(
+                    activeCount=spec.count,
+                    activeStandby=True
+                )
+            )
+        )
 
         with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephfilesystems/", body=rook_fs)
+            self.rook_api_post("cephfilesystems/", body=rook_fs.to_json())
 
     def add_nfsgw(self, spec):
-        # type: (orchestrator.NFSServiceSpec) -> None
         # TODO use spec.placement
+        # TODO warn if spec.extended has entries we don't know how
+        # to action.
 
-        rook_nfsgw = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephNFS",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "rados": {
-                    "pool": spec.pool
-                },
-                "server": {
-                    "active": spec.count,
-                }
-            }
-        }
+        rook_nfsgw = cnfs.CephNFS(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace,
+            ),
+            spec=cnfs.Spec(
+                rados=cnfs.Rados(
+                    pool=spec.pool
+                ),
+                server=cnfs.Server(
+                    active=spec.count
+                )
+            )
+        )
 
         if spec.namespace:
-            rook_nfsgw["spec"]["rados"]["namespace"] = spec.namespace  # type: ignore
+            rook_nfsgw.spec.rados.namespace = spec.namespace
 
         with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephnfses/", body=rook_nfsgw)
+            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())
 
     def add_objectstore(self, spec):
-        # type: (orchestrator.RGWSpec) -> None
-        rook_os = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephObjectStore",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "metadataPool": {
-                    "failureDomain": "host",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "dataPool": {
-                    "failureDomain": "osd",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "gateway": {
-                    "type": "s3",
-                    "port": spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
-                    "instances": spec.count,
-                    "allNodes": False
-                }
-            }
-        }
+        rook_os = cos.CephObjectStore(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace
+            ),
+            spec=cos.Spec(
+                metadataPool=cos.MetadataPool(
+                    failureDomain='host',
+                    replicated=cos.Replicated(
+                        size=1
+                    )
+                ),
+                dataPool=cos.DataPool(
+                    failureDomain='osd',
+                    replicated=cos.Replicated(
+                        size=1
+                    )
+                ),
+                gateway=cos.Gateway(
+                    type='s3',
+                    port=spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
+                    instances=spec.count
+                )
+            )
+        )
+
         with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephobjectstores/", body=rook_os)
+            self.rook_api_post("cephobjectstores/", body=rook_os.to_json())
 
     def rm_service(self, rooktype, service_id):
 
@@ -448,45 +449,25 @@ class RookCluster(object):
         return node_name in self.get_node_names()
 
     def update_mon_count(self, newcount):
-        patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update mon count in Cluster CRD: {0}".format(e))
-
-        return "Updated mon count to {0}".format(newcount)
+        def _update_mon_count(current, new):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            new.spec.mon.count = newcount
+            return new
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
 
     def update_mds_count(self, svc_id, newcount):
-        patch = [{"op": "replace", "path": "/spec/metadataServer/activeCount", "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephfilesystems/{0}".format(svc_id),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
-        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+        def _update_mds_count(current, new):
+            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
+            new.spec.metadataServer.activeCount = newcount
+            return new
+        return self._patch(cfs.CephFilesystem, 'cephfilesystems', svc_id, _update_mds_count)
 
     def update_nfs_count(self, svc_id, newcount):
-        patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephnfses/{0}".format(svc_id),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
-        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+        def _update_nfs_count(current, new):
+            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
+            new.spec.server.active = newcount
+            return new
+        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)
 
     def add_osds(self, drive_group, all_hosts):
         # type: (orchestrator.DriveGroupSpec, List[str]) -> str
@@ -499,80 +480,82 @@ class RookCluster(object):
 
         assert drive_group.objectstore in ("bluestore", "filestore")
 
-        # The CRD looks something like this:
-        #     nodes:
-        #       - name: "gravel1.rockery"
-        #         devices:
-        #          - name: "sdb"
-        #         config:
-        #           storeType: bluestore
-
-        current_cluster = self.rook_api_get(
-            "cephclusters/{0}".format(self.rook_env.cluster_name))
-
-        patch = []
-
-        # FIXME: this is all not really atomic, because jsonpatch doesn't
-        # let us do "test" operations that would check if items with
-        # matching names were in existing lists.
-
-        if 'nodes' not in current_cluster['spec']['storage']:
-            patch.append({
-                'op': 'add', 'path': '/spec/storage/nodes', 'value': []
-            })
-
-        current_nodes = current_cluster['spec']['storage'].get('nodes', [])
-
-        if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
-            pd = { "name": drive_group.hosts(all_hosts)[0],
-                   "config": { "storeType": drive_group.objectstore }}  # type: dict
-
-            if block_devices:
-                pd["devices"] = [{'name': d.path} for d in block_devices]
-            if directories:
-                pd["directories"] = [{'path': p} for p in directories]
-
-            patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd })  # type: ignore
-        else:
-            # Extend existing node
-            node_idx = None
-            current_node = None
-            for i, c in enumerate(current_nodes):
-                if c['name'] == drive_group.hosts(all_hosts)[0]:
-                    current_node = c
-                    node_idx = i
-                    break
-
-            assert node_idx is not None
-            assert current_node is not None
-
-            new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
-            for n in new_devices:
-                patch.append({
-                    "op": "add",
-                    "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
-                    "value": {'name': n.path}  # type: ignore
-                })
-            if directories:
-                new_dirs = list(set(directories) - set(current_node['directories']))
-                for p in new_dirs:
-                    patch.append({
-                        "op": "add",
-                        "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
-                        "value": {'path': p}  # type: ignore
-                    })
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+
+            # FIXME: this is all not really atomic, because jsonpatch doesn't
+            # let us do "test" operations that would check if items with
+            # matching names were in existing lists.
+
+            if not hasattr(new_cluster.spec.storage, 'nodes'):
+                new_cluster.spec.storage.nodes = ccl.NodesList()
+
+            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
+
+            if drive_group.hosts(all_hosts)[0] not in [n.name for n in current_nodes]:
+                pd = ccl.NodesItem(
+                    name=drive_group.hosts(all_hosts)[0],
+                    config=ccl.Config(
+                        storeType=drive_group.objectstore
+                    )
+                )
+
+                if block_devices:
+                    pd.devices = ccl.DevicesList(
+                        ccl.DevicesItem(name=d.path) for d in block_devices
+                    )
+                if directories:
+                    pd.directories = ccl.DirectoriesList(
+                        ccl.DirectoriesItem(path=p) for p in directories
+                    )
+                new_cluster.spec.storage.nodes.append(pd)
+            else:
+                for _node in new_cluster.spec.storage.nodes:
+                    current_node = _node  # type: ccl.NodesItem
+                    if current_node.name == drive_group.hosts(all_hosts)[0]:
+                        if block_devices:
+                            if not hasattr(current_node, 'devices'):
+                                current_node.devices = ccl.DevicesList()
+                            new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
+                            current_node.devices.extend(
+                                ccl.DevicesItem(name=n.path) for n in new_devices
+                            )
+
+                        if directories:
+                            if not hasattr(current_node, 'directories'):
+                                current_node.directories = ccl.DirectoriesList()
+                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
+                            current_node.directories.extend(
+                                ccl.DirectoriesItem(path=n) for n in new_dirs
+                            )
+            return new_cluster
+
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
+
+    def _patch(self, crd, crd_name, cr_name, func):
+        current_json = self.rook_api_get(
+            "{}/{}".format(crd_name, cr_name)
+        )
+
+        current = crd.from_json(current_json)
+        new = crd.from_json(current_json)  # no deepcopy.
+
+        new = func(current, new)
+
+        patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
 
         if len(patch) == 0:
             return "No change"
 
         try:
             self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
+                "{}/{}".format(crd_name, cr_name),
                 body=patch)
         except ApiException as e:
             log.exception("API exception: {0}".format(e))
             raise ApplyException(
-                "Failed to create OSD entries in Cluster CRD: {0}".format(
-                    e))
+                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
 
         return "Success"
-- 
2.47.3
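For context, every resource-creation call in this patch follows the same pattern: build the
custom resource as a typed rook-client-python object instead of a raw dict, then serialize it
with to_json() before handing it to the Kubernetes API. A minimal sketch of that pattern,
assuming rook-client-python is importable as rook_client; the apiVersion, name and namespace
values here are made up for illustration:

    from rook_client.ceph import cephfilesystem as cfs

    fs = cfs.CephFilesystem(
        apiVersion='ceph.rook.io/v1',   # assumed API group/version
        metadata=dict(name='myfs', namespace='rook-ceph'),
        spec=cfs.Spec(
            metadataServer=cfs.MetadataServer(
                activeCount=1,
                activeStandby=True,
            )
        ),
    )
    body = fs.to_json()  # plain JSON-serializable structure, ready for rook_api_post()

Because the generated classes only accept fields that exist in the CRD schema, invalid fields
such as the removed onlyManageDaemons and allNodes entries cannot be carried over from the old
hand-written dicts.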
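The new _patch() helper replaces the hand-written patch lists: it loads the current object
twice, lets a callback mutate one copy, and derives the JSON patch by diffing the original
against the mutated copy with jsonpatch.make_patch(). A minimal sketch of that diffing step
with plain dicts (hypothetical values, not taken from this patch):

    import jsonpatch

    current_json = {'spec': {'mon': {'count': 3}}}   # as returned by the k8s API
    desired_json = {'spec': {'mon': {'count': 5}}}   # same document after the callback ran

    patch = list(jsonpatch.make_patch(current_json, desired_json))
    # patch == [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}]

This yields the same kind of operation the removed update_mon_count() used to construct by
hand, but it also covers arbitrary edits such as the list appends done in add_osds().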