+from logging import error
+import logging
import threading
import functools
import os
else:
raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
+ def zap_device(self, host: str, path: str) -> OrchResult[str]:
+ try:
+ self.rook_cluster.create_zap_job(host, path)
+ except Exception as e:
+ logging.error(e)
+ return OrchResult(None, Exception("Unable to zap device: " + str(e.with_traceback(None))))
+ return OrchResult(f'{path} on {host} zapped')
+
@handle_orch_error
def apply_mon(self, spec):
# type: (ServiceSpec) -> str
ret.append(spec)
return ret
+ def create_zap_job(self, host: str, path: str) -> None:
+ body = client.V1Job(
+ api_version="batch/v1",
+ metadata=client.V1ObjectMeta(
+ name="rook-ceph-device-zap",
+ namespace="rook-ceph"
+ ),
+ spec=client.V1JobSpec(
+ template=client.V1PodTemplateSpec(
+ spec=client.V1PodSpec(
+ containers=[
+ client.V1Container(
+ name="device-zap",
+ image="rook/ceph:master",
+ command=["bash"],
+ args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
+ env=[
+ client.V1EnvVar(
+ name="ROOK_CEPH_USERNAME",
+ value_from=client.V1EnvVarSource(
+ secret_key_ref=client.V1SecretKeySelector(
+ key="ceph-username",
+ name="rook-ceph-mon"
+ )
+ )
+ ),
+ client.V1EnvVar(
+ name="ROOK_CEPH_SECRET",
+ value_from=client.V1EnvVarSource(
+ secret_key_ref=client.V1SecretKeySelector(
+ key="ceph-secret",
+ name="rook-ceph-mon"
+ )
+ )
+ )
+ ],
+ security_context=client.V1SecurityContext(
+ privileged=True
+ ),
+ volume_mounts=[
+ client.V1VolumeMount(
+ mount_path="/etc/ceph",
+ name="ceph-conf-emptydir"
+ ),
+ client.V1VolumeMount(
+ mount_path="/etc/rook",
+ name="rook-config"
+ ),
+ client.V1VolumeMount(
+ mount_path="/dev",
+ name="devices"
+ )
+ ]
+ )
+ ],
+ volumes=[
+ client.V1Volume(
+ name="ceph-conf-emptydir",
+ empty_dir=client.V1EmptyDirVolumeSource()
+ ),
+ client.V1Volume(
+ name="rook-config",
+ empty_dir=client.V1EmptyDirVolumeSource()
+ ),
+ client.V1Volume(
+ name="devices",
+ host_path=client.V1HostPathVolumeSource(
+ path="/dev"
+ )
+ ),
+ ],
+ node_selector={
+ "kubernetes.io/hostname": host
+ },
+ restart_policy="Never"
+ )
+ )
+ )
+ )
+ self.batchV1_api.create_namespaced_job('rook-ceph', body)
+
def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)