import os
from ceph.deployment import inventory
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
try:
from typing import List, Dict, Optional, Callable, Any
spec = {}
spec['mon'] = orchestrator.ServiceDescription(
service_name='mon',
- spec=orchestrator.ServiceSpec(
+ spec=ServiceSpec(
'mon',
- placement=orchestrator.PlacementSpec(
+ placement=PlacementSpec(
count=cl['spec'].get('mon', {}).get('count', 1),
),
),
)
spec['mgr'] = orchestrator.ServiceDescription(
service_name='mgr',
- spec=orchestrator.ServiceSpec(
+ spec=ServiceSpec(
'mgr',
- placement=orchestrator.PlacementSpec.from_string('count:1'),
+ placement=PlacementSpec.from_string('count:1'),
),
size=1,
container_image_name=image_name,
if not cl['spec'].get('crashCollector', {}).get('disable', False):
spec['crash'] = orchestrator.ServiceDescription(
service_name='crash',
- spec=orchestrator.ServiceSpec(
+ spec=ServiceSpec(
'crash',
- placement=orchestrator.PlacementSpec.from_string('all:true'),
+ placement=PlacementSpec.from_string('all:true'),
),
size=num_nodes,
container_image_name=image_name,
total_mds = active * 2
spec[svc] = orchestrator.ServiceDescription(
service_name=svc,
- spec=orchestrator.ServiceSpec(
+ spec=ServiceSpec(
svc,
- placement=orchestrator.PlacementSpec(count=active),
+ placement=PlacementSpec(count=active),
),
size=total_mds,
container_image_name=image_name,
mgr=self
)
- def add_mds(self, spec):
- # type: (ServiceSpec) -> RookCompletion
- return self._service_add_decorate('MDS', spec,
- self.rook_cluster.add_filesystem)
-
def add_rgw(self, spec):
# type: (RGWSpec) -> RookCompletion
return self._service_add_decorate('RGW', spec,
def apply_mds(self, spec):
# type: (ServiceSpec) -> RookCompletion
- num = spec.placement.count
- return write_completion(
- lambda: self.rook_cluster.update_mds_count(spec.service_id, num),
- "Updating MDS server count in {0} to {1}".format(spec.service_id, num),
- mgr=self
- )
+ return self._service_add_decorate('MDS', spec,
+ self.rook_cluster.apply_filesystem)
def apply_nfs(self, spec):
# type: (NFSServiceSpec) -> RookCompletion
else:
raise
- def add_filesystem(self, spec):
+ def apply_filesystem(self, spec):
# type: (ServiceSpec) -> None
# TODO use spec.placement
# TODO warn if spec.extended has entries we don't kow how
# to action.
+ def _update_fs(current, new):
+ # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
+ new.spec.metadataServer.activeCount = spec.placement.count or 1
+ return new
- rook_fs = cfs.CephFilesystem(
- apiVersion=self.rook_env.api_name,
- metadata=dict(
- name=spec.service_id,
- namespace=self.rook_env.namespace,
- ),
- spec=cfs.Spec(
- metadataServer=cfs.MetadataServer(
- activeCount=spec.placement.count,
- activeStandby=True
+ def _create_fs():
+ # type: () -> cfs.CephFilesystem
+ return cfs.CephFilesystem(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.service_id,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=cfs.Spec(
+ metadataServer=cfs.MetadataServer(
+ activeCount=spec.placement.count or 1,
+ activeStandby=True
+ )
)
)
- )
-
- with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.service_id)):
- self.rook_api_post("cephfilesystems/", body=rook_fs.to_json())
+ return self._create_or_patch(
+ cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
+ _update_fs, _create_fs)
def add_nfsgw(self, spec):
# TODO use spec.placement
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
- def update_mds_count(self, svc_id, newcount):
- def _update_nfs_count(current, new):
- # type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
- new.spec.metadataServer.activeCount = newcount
- return new
- return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)
-
def update_nfs_count(self, svc_id, newcount):
def _update_nfs_count(current, new):
# type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
"Failed to update {}/{}: {}".format(crd_name, cr_name, e))
return "Success"
+
+ def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
+ try:
+ current_json = self.rook_api_get(
+ "{}/{}".format(crd_name, cr_name)
+ )
+ except ApiException as e:
+ if e.status == 404:
+ current_json = None
+ else:
+ raise
+
+ if current_json:
+ current = crd.from_json(current_json)
+ new = crd.from_json(current_json) # no deepcopy.
+
+ new = update_func(current, new)
+
+ patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+ log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
+
+ if len(patch) == 0:
+ return "No change"
+
+ try:
+ self.rook_api_patch(
+ "{}/{}".format(crd_name, cr_name),
+ body=patch)
+ except ApiException as e:
+ log.exception("API exception: {0}".format(e))
+ raise ApplyException(
+ "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
+ return "Updated"
+ else:
+ new = create_func()
+ with self.ignore_409("{} {} already exists".format(crd_name,
+ cr_name)):
+ self.rook_api_post("{}/".format(crd_name),
+ body=new.to_json())
+ return "Created"