mgr/rook: Make use of rook-client-python when talking to Rook 29427/head
author    Sebastian Wagner <sebastian.wagner@suse.com>
          Mon, 13 Jan 2020 12:01:20 +0000 (13:01 +0100)
committer Sebastian Wagner <sebastian.wagner@suse.com>
          Thu, 13 Feb 2020 10:34:50 +0000 (11:34 +0100)
Fixes:

* `CephFilesystem.spec.onlyManageDaemons` does not exist
* `CephObjectStore.spec.gateway.allNodes` does not exist
* Adding directory-osds to existing nodes was broken

Signed-off-by: Sebastian Wagner <sebastian.wagner@suse.com>
src/pybind/mgr/rook/requirements.txt [new file with mode: 0644]
src/pybind/mgr/rook/rook_cluster.py
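
The heart of the change is replacing hand-built CR dicts with typed objects from rook-client-python. A minimal sketch of the pattern, using the cephfilesystem classes that appear in the diff below (the standalone import path, apiVersion string, and resource names here are illustrative placeholders; inside the mgr module the package is vendored as .rook_client):

    from rook_client.ceph import cephfilesystem as cfs

    # Build the CR from typed classes instead of nesting dicts by hand; fields
    # that do not exist in the CRD (e.g. onlyManageDaemons) are simply not
    # available on these objects.
    rook_fs = cfs.CephFilesystem(
        apiVersion='ceph.rook.io/v1',   # placeholder; the module uses rook_env.api_name
        metadata=dict(name='myfs', namespace='rook-ceph'),
        spec=cfs.Spec(
            metadataServer=cfs.MetadataServer(
                activeCount=1,
                activeStandby=True,
            )
        )
    )
    body = rook_fs.to_json()  # serialized form handed to rook_api_post("cephfilesystems/", ...)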

diff --git a/src/pybind/mgr/rook/requirements.txt b/src/pybind/mgr/rook/requirements.txt
new file mode 100644 (file)
index 0000000..378de08
--- /dev/null
+++ b/src/pybind/mgr/rook/requirements.txt
@@ -0,0 +1,2 @@
+kubernetes
+jsonpatch
\ No newline at end of file
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index bb667f0703f0f69a7b008b7ad012964c463f0638..60ebf1e5b6951b000ddc9bab8d8985ec0ccbef5f 100644 (file)
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -11,6 +11,7 @@ import logging
 import json
 from contextlib import contextmanager
 
+import jsonpatch
 from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
 
 # Optional kubernetes imports to enable MgrModule.can_run
@@ -32,6 +33,11 @@ except ImportError:
     class ApiException(Exception):  # type: ignore
         status = 0
 
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+
 
 import orchestrator
 
@@ -341,86 +347,81 @@ class RookCluster(object):
         # TODO warn if spec.extended has entries we don't kow how
         #      to action.
 
-        rook_fs = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephFilesystem",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "onlyManageDaemons": True,
-                "metadataServer": {
-                    "activeCount": spec.count,
-                    "activeStandby": True
-
-                }
-            }
-        }
+        rook_fs = cfs.CephFilesystem(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace,
+            ),
+            spec=cfs.Spec(
+                metadataServer=cfs.MetadataServer(
+                    activeCount=spec.count,
+                    activeStandby=True
+                )
+            )
+        )
 
         with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephfilesystems/", body=rook_fs)
+            self.rook_api_post("cephfilesystems/", body=rook_fs.to_json())
 
     def add_nfsgw(self, spec):
-        # type: (orchestrator.NFSServiceSpec) -> None
         # TODO use spec.placement
+        # TODO warn if spec.extended has entries we don't know how
+        #      to action.
 
-        rook_nfsgw = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephNFS",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "rados": {
-                    "pool": spec.pool
-                },
-                "server": {
-                    "active": spec.count,
-                }
-            }
-        }
+        rook_nfsgw = cnfs.CephNFS(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace,
+            ),
+            spec=cnfs.Spec(
+                rados=cnfs.Rados(
+                    pool=spec.pool
+                ),
+                server=cnfs.Server(
+                    active=spec.count
+                )
+            )
+        )
 
         if spec.namespace:
-            rook_nfsgw["spec"]["rados"]["namespace"] = spec.namespace  # type: ignore
+            rook_nfsgw.spec.rados.namespace = spec.namespace
 
         with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephnfses/", body=rook_nfsgw)
+            self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())
 
     def add_objectstore(self, spec):
-        # type: (orchestrator.RGWSpec) -> None
-        rook_os = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephObjectStore",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "metadataPool": {
-                    "failureDomain": "host",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "dataPool": {
-                    "failureDomain": "osd",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "gateway": {
-                    "type": "s3",
-                    "port": spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
-                    "instances": spec.count,
-                    "allNodes": False
-                }
-            }
-        }
 
+        rook_os = cos.CephObjectStore(
+            apiVersion=self.rook_env.api_name,
+            metadata=dict(
+                name=spec.name,
+                namespace=self.rook_env.namespace
+            ),
+            spec=cos.Spec(
+                metadataPool=cos.MetadataPool(
+                    failureDomain='host',
+                    replicated=cos.Replicated(
+                        size=1
+                    )
+                ),
+                dataPool=cos.DataPool(
+                    failureDomain='osd',
+                    replicated=cos.Replicated(
+                        size=1
+                    )
+                ),
+                gateway=cos.Gateway(
+                    type='s3',
+                    port=spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
+                    instances=spec.count
+                )
+            )
+        )
+
         with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephobjectstores/", body=rook_os)
+            self.rook_api_post("cephobjectstores/", body=rook_os.to_json())
 
     def rm_service(self, rooktype, service_id):
 
@@ -448,45 +449,25 @@ class RookCluster(object):
         return node_name in self.get_node_names()
 
     def update_mon_count(self, newcount):
-        patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update mon count in Cluster CRD: {0}".format(e))
-
-        return "Updated mon count to {0}".format(newcount)
+        def _update_mon_count(current, new):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            new.spec.mon.count = newcount
+            return new
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
 
     def update_mds_count(self, svc_id, newcount):
-        patch = [{"op": "replace", "path": "/spec/metadataServer/activeCount",
-                  "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephfilesystems/{0}".format(svc_id),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
-        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+        def _update_mds_count(current, new):
+            # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
+            new.spec.metadataServer.activeCount = newcount
+            return new
+        return self._patch(cfs.CephFilesystem, 'cephfilesystems', svc_id, _update_mds_count)
 
     def update_nfs_count(self, svc_id, newcount):
-        patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]
-
-        try:
-            self.rook_api_patch(
-                "cephnfses/{0}".format(svc_id),
-                body=patch)
-        except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
-        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+        def _update_nfs_count(current, new):
+            # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
+            new.spec.server.active = newcount
+            return new
+        return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)
 
     def add_osds(self, drive_group, all_hosts):
         # type: (orchestrator.DriveGroupSpec, List[str]) -> str
@@ -499,80 +480,82 @@ class RookCluster(object):
 
         assert drive_group.objectstore in ("bluestore", "filestore")
 
-        # The CRD looks something like this:
-        #     nodes:
-        #       - name: "gravel1.rockery"
-        #         devices:
-        #          - name: "sdb"
-        #         config:
-        #           storeType: bluestore
-
-        current_cluster = self.rook_api_get(
-            "cephclusters/{0}".format(self.rook_env.cluster_name))
-
-        patch = []
-
-        # FIXME: this is all not really atomic, because jsonpatch doesn't
-        # let us do "test" operations that would check if items with
-        # matching names were in existing lists.
-
-        if 'nodes' not in current_cluster['spec']['storage']:
-            patch.append({
-                'op': 'add', 'path': '/spec/storage/nodes', 'value': []
-            })
-
-        current_nodes = current_cluster['spec']['storage'].get('nodes', [])
-
-        if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
-            pd = { "name": drive_group.hosts(all_hosts)[0],
-                   "config": { "storeType": drive_group.objectstore }}  # type: dict
-
-            if block_devices:
-                pd["devices"] = [{'name': d.path} for d in block_devices]
-            if directories:
-                pd["directories"] = [{'path': p} for p in directories]
-
-            patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd })  # type: ignore
-        else:
-            # Extend existing node
-            node_idx = None
-            current_node = None
-            for i, c in enumerate(current_nodes):
-                if c['name'] == drive_group.hosts(all_hosts)[0]:
-                    current_node = c
-                    node_idx = i
-                    break
-
-            assert node_idx is not None
-            assert current_node is not None
-
-            new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
-            for n in new_devices:
-                patch.append({
-                    "op": "add",
-                    "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
-                    "value": {'name': n.path}  # type: ignore
-                })
-            if directories:
-                new_dirs = list(set(directories) - set(current_node['directories']))
-                for p in new_dirs:
-                    patch.append({
-                        "op": "add",
-                        "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
-                        "value": {'path': p}  # type: ignore
-                    })
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+
+            # FIXME: this is all not really atomic, because jsonpatch doesn't
+            # let us do "test" operations that would check if items with
+            # matching names were in existing lists.
+
+            if not hasattr(new_cluster.spec.storage, 'nodes'):
+                new_cluster.spec.storage.nodes = ccl.NodesList()
+
+            current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
+
+            if drive_group.hosts(all_hosts)[0] not in [n.name for n in current_nodes]:
+                pd = ccl.NodesItem(
+                    name=drive_group.hosts(all_hosts)[0],
+                    config=ccl.Config(
+                        storeType=drive_group.objectstore
+                    )
+                )
+
+                if block_devices:
+                    pd.devices = ccl.DevicesList(
+                        ccl.DevicesItem(name=d.path) for d in block_devices
+                    )
+                if directories:
+                    pd.directories = ccl.DirectoriesList(
+                        ccl.DirectoriesItem(path=p) for p in directories
+                    )
+                new_cluster.spec.storage.nodes.append(pd)
+            else:
+                for _node in new_cluster.spec.storage.nodes:
+                    current_node = _node  # type: ccl.NodesItem
+                    if current_node.name == drive_group.hosts(all_hosts)[0]:
+                        if block_devices:
+                            if not hasattr(current_node, 'devices'):
+                                current_node.devices = ccl.DevicesList()
+                            # compare by path: block_devices holds device objects, not name strings
+                            existing_names = {d.name for d in current_node.devices}
+                            new_devices = [d for d in block_devices if d.path not in existing_names]
+                            current_node.devices.extend(
+                                ccl.DevicesItem(name=d.path) for d in new_devices
+                            )
+
+                        if directories:
+                            if not hasattr(current_node, 'directories'):
+                                current_node.directories = ccl.DirectoriesList()
+                            new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
+                            current_node.directories.extend(
+                                ccl.DirectoriesItem(path=n) for n in new_dirs
+                            )
+            return new_cluster
+
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
+
+    def _patch(self, crd, crd_name, cr_name, func):
+        current_json = self.rook_api_get(
+            "{}/{}".format(crd_name, cr_name)
+        )
+
+        current = crd.from_json(current_json)
+        new = crd.from_json(current_json)  # parse a second, independent copy instead of deepcopying
+
+        new = func(current, new)
+
+        patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
 
         if len(patch) == 0:
             return "No change"
 
         try:
             self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
+                "{}/{}".format(crd_name, cr_name),
                 body=patch)
         except ApiException as e:
             log.exception("API exception: {0}".format(e))
             raise ApplyException(
-                "Failed to create OSD entries in Cluster CRD: {0}".format(
-                    e))
+                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
 
         return "Success"