return result
- def _get_pool_params(self) -> Tuple[str, str]:
+ def _get_pool_params(self) -> Tuple[int, str]:
num_replicas = self.get_ceph_option('osd_pool_default_size')
assert type(num_replicas) is int
@handle_orch_error
def apply_mds(self, spec):
# type: (ServiceSpec) -> str
- return self.rook_cluster.apply_filesystem(spec)
+ num_replicas, leaf_type = self._get_pool_params()
+ return self.rook_cluster.apply_filesystem(spec, num_replicas, leaf_type)
@handle_orch_error
def apply_rgw(self, spec):
else:
raise
- def apply_filesystem(self, spec: ServiceSpec) -> str:
+ def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
+ leaf_type: str) -> str:
# TODO use spec.placement
# TODO warn if spec.extended has entries we don't kow how
# to action.
+ all_hosts = self.get_hosts()
def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
new.spec.metadataServer.activeCount = spec.placement.count or 1
+ new.spec.metadataServer.placement = cfs.Placement(
+ nodeAffinity=cfs.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=cfs.NodeSelectorTermsList(
+ [placement_spec_to_node_selector(spec.placement, all_hosts)]
+ )
+ )
+ )
+ )
return new
-
def _create_fs() -> cfs.CephFilesystem:
- return cfs.CephFilesystem(
+ fs = cfs.CephFilesystem(
apiVersion=self.rook_env.api_name,
metadata=dict(
name=spec.service_id,
namespace=self.rook_env.namespace,
),
spec=cfs.Spec(
- None,
- None,
+ dataPools=cfs.DataPoolsList(
+ {
+ cfs.DataPoolsItem(
+ failureDomain=leaf_type,
+ replicated=cfs.Replicated(
+ size=num_replicas
+ )
+ )
+ }
+ ),
+ metadataPool=cfs.MetadataPool(
+ failureDomain=leaf_type,
+ replicated=cfs.Replicated(
+ size=num_replicas
+ )
+ ),
metadataServer=cfs.MetadataServer(
activeCount=spec.placement.count or 1,
- activeStandby=True
+ activeStandby=True,
+ placement=
+ cfs.Placement(
+ nodeAffinity=cfs.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=cfs.NodeSelectorTermsList(
+ [placement_spec_to_node_selector(spec.placement, all_hosts)]
+ )
+ )
+ )
+ )
)
)
)
+ return fs
assert spec.service_id is not None
return self._create_or_patch(
cfs.CephFilesystem, 'cephfilesystems', spec.service_id,