import json
from contextlib import contextmanager
+import jsonpatch
from six.moves.urllib.parse import urljoin # pylint: disable=import-error
# Optional kubernetes imports to enable MgrModule.can_run
class ApiException(Exception): # type: ignore
status = 0
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+
import orchestrator
# TODO warn if spec.extended has entries we don't kow how
# to action.
- rook_fs = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephFilesystem",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "onlyManageDaemons": True,
- "metadataServer": {
- "activeCount": spec.count,
- "activeStandby": True
-
- }
- }
- }
+ rook_fs = cfs.CephFilesystem(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.name,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=cfs.Spec(
+ metadataServer=cfs.MetadataServer(
+ activeCount=spec.count,
+ activeStandby=True
+ )
+ )
+ )
with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephfilesystems/", body=rook_fs)
+ self.rook_api_post("cephfilesystems/", body=rook_fs.to_json())
def add_nfsgw(self, spec):
- # type: (orchestrator.NFSServiceSpec) -> None
# TODO use spec.placement
+ # TODO warn if spec.extended has entries we don't kow how
+ # to action.
- rook_nfsgw = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephNFS",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "rados": {
- "pool": spec.pool
- },
- "server": {
- "active": spec.count,
- }
- }
- }
+ rook_nfsgw = cnfs.CephNFS(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.name,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=cnfs.Spec(
+ rados=cnfs.Rados(
+ pool=spec.pool
+ ),
+ server=cnfs.Server(
+ active=spec.count
+ )
+ )
+ )
if spec.namespace:
- rook_nfsgw["spec"]["rados"]["namespace"] = spec.namespace # type: ignore
+ rook_nfsgw.spec.rados.namespace = spec.namespace
with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephnfses/", body=rook_nfsgw)
+ self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())
def add_objectstore(self, spec):
- # type: (orchestrator.RGWSpec) -> None
- rook_os = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephObjectStore",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "metadataPool": {
- "failureDomain": "host",
- "replicated": {
- "size": 1
- }
- },
- "dataPool": {
- "failureDomain": "osd",
- "replicated": {
- "size": 1
- }
- },
- "gateway": {
- "type": "s3",
- "port": spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
- "instances": spec.count,
- "allNodes": False
- }
- }
- }
+ rook_os = cos.CephObjectStore(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.name,
+ namespace=self.rook_env.namespace
+ ),
+ spec=cos.Spec(
+ metadataPool=cos.MetadataPool(
+ failureDomain='host',
+ replicated=cos.Replicated(
+ size=1
+ )
+ ),
+ dataPool=cos.DataPool(
+ failureDomain='osd',
+ replicated=cos.Replicated(
+ size=1
+ )
+ ),
+ gateway=cos.Gateway(
+ type='s3',
+ port=spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
+ instances=spec.count
+ )
+ )
+ )
+
with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephobjectstores/", body=rook_os)
+ self.rook_api_post("cephobjectstores/", body=rook_os.to_json())
def rm_service(self, rooktype, service_id):
return node_name in self.get_node_names()
def update_mon_count(self, newcount):
- patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
-
- try:
- self.rook_api_patch(
- "cephclusters/{0}".format(self.rook_env.cluster_name),
- body=patch)
- except ApiException as e:
- log.exception("API exception: {0}".format(e))
- raise ApplyException(
- "Failed to update mon count in Cluster CRD: {0}".format(e))
-
- return "Updated mon count to {0}".format(newcount)
+ def _update_mon_count(current, new):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ new.spec.mon.count = newcount
+ return new
+ return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
def update_mds_count(self, svc_id, newcount):
- patch = [{"op": "replace", "path": "/spec/metadataServer/activeCount",
- "value": newcount}]
-
- try:
- self.rook_api_patch(
- "cephfilesystems/{0}".format(svc_id),
- body=patch)
- except ApiException as e:
- log.exception("API exception: {0}".format(e))
- raise ApplyException(
- "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
- return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+ def _update_nfs_count(current, new):
+ # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
+ new.spec.metadataServer.activeCount = newcount
+ return new
+ return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)
def update_nfs_count(self, svc_id, newcount):
- patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]
-
- try:
- self.rook_api_patch(
- "cephnfses/{0}".format(svc_id),
- body=patch)
- except ApiException as e:
- log.exception("API exception: {0}".format(e))
- raise ApplyException(
- "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
- return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
+ def _update_nfs_count(current, new):
+ # type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
+ new.spec.server.active = newcount
+ return new
+ return self._patch(cnfs.CephNFS, 'cephnfses',svc_id, _update_nfs_count)
def add_osds(self, drive_group, all_hosts):
# type: (orchestrator.DriveGroupSpec, List[str]) -> str
assert drive_group.objectstore in ("bluestore", "filestore")
- # The CRD looks something like this:
- # nodes:
- # - name: "gravel1.rockery"
- # devices:
- # - name: "sdb"
- # config:
- # storeType: bluestore
-
- current_cluster = self.rook_api_get(
- "cephclusters/{0}".format(self.rook_env.cluster_name))
-
- patch = []
-
- # FIXME: this is all not really atomic, because jsonpatch doesn't
- # let us do "test" operations that would check if items with
- # matching names were in existing lists.
-
- if 'nodes' not in current_cluster['spec']['storage']:
- patch.append({
- 'op': 'add', 'path': '/spec/storage/nodes', 'value': []
- })
-
- current_nodes = current_cluster['spec']['storage'].get('nodes', [])
-
- if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
- pd = { "name": drive_group.hosts(all_hosts)[0],
- "config": { "storeType": drive_group.objectstore }} # type: dict
-
- if block_devices:
- pd["devices"] = [{'name': d.path} for d in block_devices]
- if directories:
- pd["directories"] = [{'path': p} for p in directories]
-
- patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd }) # type: ignore
- else:
- # Extend existing node
- node_idx = None
- current_node = None
- for i, c in enumerate(current_nodes):
- if c['name'] == drive_group.hosts(all_hosts)[0]:
- current_node = c
- node_idx = i
- break
-
- assert node_idx is not None
- assert current_node is not None
-
- new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
- for n in new_devices:
- patch.append({
- "op": "add",
- "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
- "value": {'name': n.path} # type: ignore
- })
- if directories:
- new_dirs = list(set(directories) - set(current_node['directories']))
- for p in new_dirs:
- patch.append({
- "op": "add",
- "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
- "value": {'path': p} # type: ignore
- })
+ def _add_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+
+ # FIXME: this is all not really atomic, because jsonpatch doesn't
+ # let us do "test" operations that would check if items with
+ # matching names were in existing lists.
+
+ if not hasattr(new_cluster.spec.storage, 'nodes'):
+ new_cluster.spec.storage.nodes = ccl.NodesList()
+
+ current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())
+
+ if drive_group.hosts(all_hosts)[0] not in [n.name for n in current_nodes]:
+ pd = ccl.NodesItem(
+ name=drive_group.hosts(all_hosts)[0],
+ config=ccl.Config(
+ storeType=drive_group.objectstore
+ )
+ )
+
+ if block_devices:
+ pd.devices = ccl.DevicesList(
+ ccl.DevicesItem(name=d.path) for d in block_devices
+ )
+ if directories:
+ pd.directories = ccl.DirectoriesList(
+ ccl.DirectoriesItem(path=p) for p in directories
+ )
+ new_cluster.spec.storage.nodes.append(pd)
+ else:
+ for _node in new_cluster.spec.storage.nodes:
+ current_node = _node # type: ccl.NodesItem
+ if current_node.name == drive_group.hosts(all_hosts)[0]:
+ if block_devices:
+ if not hasattr(current_node, 'devices'):
+ current_node.devices = ccl.DevicesList()
+ new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
+ current_node.devices.extend(
+ ccl.DevicesItem(name=n.path) for n in new_devices
+ )
+
+ if directories:
+ if not hasattr(current_node, 'directories'):
+ current_node.directories = ccl.DirectoriesList()
+ new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
+ current_node.directories.extend(
+ ccl.DirectoriesItem(path=n) for n in new_dirs
+ )
+ return new_cluster
+
+ return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
+
+ def _patch(self, crd, crd_name, cr_name, func):
+ current_json = self.rook_api_get(
+ "{}/{}".format(crd_name, cr_name)
+ )
+
+ current = crd.from_json(current_json)
+ new = crd.from_json(current_json) # no deepcopy.
+
+ new = func(current, new)
+
+ patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+ log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
if len(patch) == 0:
return "No change"
try:
self.rook_api_patch(
- "cephclusters/{0}".format(self.rook_env.cluster_name),
+ "{}/{}".format(crd_name, cr_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
- "Failed to create OSD entries in Cluster CRD: {0}".format(
- e))
+ "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
return "Success"