- ceph orch device ls
- ceph orch apply rgw foo
- ceph orch apply mds foo
+ - ceph orch apply rbd-mirror
if ret.exitstatus == 0:
    r = json.loads(ret.stdout.getvalue().decode('utf-8'))
    for service in r:
-       if service['service_type'] in ['rgw', 'mds', 'nfs']:
+       if service['service_type'] in ['rgw', 'mds', 'nfs', 'rbd-mirror']:
            _shell(ctx, config, ['ceph', 'orch', 'rm', service['service_name']])
            to_remove.append(service['service_name'])
    with safe_while(sleep=10, tries=90, action="waiting for service removal") as proceed:
                last_refresh=now,
                running=sum(osd.status.phase == 'Running' for osd in all_osds)
            )
+
+       if service_type == 'rbd-mirror' or service_type is None:
+           # rbd-mirrors: expose each CephRBDMirror CR as an rbd-mirror service
+           all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
+           for mirror in all_mirrors:
+               logging.debug(mirror)
+               mirror_name = mirror['metadata']['name']
+               svc = 'rbd-mirror.' + mirror_name
+               if svc in spec:
+                   continue
+               spec[svc] = orchestrator.ServiceDescription(
+                   spec=ServiceSpec(
+                       service_id=mirror_name,
+                       service_type="rbd-mirror",
+                       placement=PlacementSpec(count=1),
+                   ),
+                   size=1,
+                   last_refresh=now,
+               )
+
        for dd in self._list_daemons():
            if dd.service_name() not in spec:
                continue
    @handle_orch_error
    def remove_service(self, service_name: str) -> str:
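+       # a bare 'rbd-mirror' (no '.<id>' suffix) maps to the default CR created by apply_rbd_mirror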
+       if service_name == 'rbd-mirror':
+           return self.rook_cluster.rm_service('cephrbdmirrors', 'default-rbd-mirror')
        service_type, service_name = service_name.split('.', 1)
        if service_type == 'mds':
            return self.rook_cluster.rm_service('cephfilesystems', service_name)
        elif service_type == 'rgw':
            return self.rook_cluster.rm_service('cephobjectstores', service_name)
        elif service_type == 'nfs':
            return self.rook_cluster.rm_service('cephnfses', service_name)
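+       # 'rbd-mirror.<id>' removes the CephRBDMirror CR of that name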
+       elif service_type == 'rbd-mirror':
+           return self.rook_cluster.rm_service('cephrbdmirrors', service_name)
        else:
            raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
        return self.rook_cluster.update_mon_count(spec.placement.count)
+   def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]:
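+       # create or update the CephRBDMirror CR; failures are surfaced through OrchResult
+       # rather than the @handle_orch_error decorator used by the other apply_* methods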
+       try:
+           self.rook_cluster.rbd_mirror(spec)
+           return OrchResult("Success")
+       except Exception as e:
+           return OrchResult(None, e)
+
    @handle_orch_error
    def apply_mds(self, spec):
        # type: (ServiceSpec) -> str
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
+from .rook_client.ceph import cephrbdmirror as crbdm
from .rook_client._helper import CrdClass
import orchestrator
        )
        self.batchV1_api.create_namespaced_job('rook-ceph', body)
+   def rbd_mirror(self, spec: ServiceSpec) -> None:
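+       # translate an orchestrator ServiceSpec into a CephRBDMirror custom resource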
+       service_id = spec.service_id or "default-rbd-mirror"
+       all_hosts = self.get_hosts()
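+       # build a fresh CR, mapping the placement spec onto Kubernetes node affinity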
+       def _create_rbd_mirror() -> crbdm.CephRBDMirror:
+           return crbdm.CephRBDMirror(
+               apiVersion=self.rook_env.api_name,
+               metadata=dict(
+                   name=service_id,
+                   namespace=self.rook_env.namespace,
+               ),
+               spec=crbdm.Spec(
+                   count=spec.placement.count or 1,
+                   placement=crbdm.Placement(
+                       nodeAffinity=crbdm.NodeAffinity(
+                           requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+                               nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+                                   [
+                                       placement_spec_to_node_selector(spec.placement, all_hosts)
+                                   ]
+                               )
+                           )
+                       )
+                   )
+               )
+           )
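+       # patch an existing CR in place: refresh the daemon count and node affinity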
+       def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
+           new.spec.count = spec.placement.count or 1
+           new.spec.placement = crbdm.Placement(
+               nodeAffinity=crbdm.NodeAffinity(
+                   requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+                       nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+                           [
+                               placement_spec_to_node_selector(spec.placement, all_hosts)
+                           ]
+                       )
+                   )
+               )
+           )
+           return new
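+       # create the CR if it is absent, otherwise patch it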
+       self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
        current_json = self.rook_api_get(
            "{}/{}".format(crd_name, cr_name)