]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rook: Blinking lights 33366/head
authorJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Mon, 17 Feb 2020 09:59:46 +0000 (10:59 +0100)
committerJuan Miguel Olmo Martínez <jolmomar@redhat.com>
Thu, 26 Mar 2020 07:45:08 +0000 (08:45 +0100)
Blinking lights implementation

Signed-off-by: Juan Miguel Olmo Martínez <jolmomar@redhat.com>
src/pybind/mgr/rook/module.py
src/pybind/mgr/rook/rook_cluster.py

index 1fe17a60ab4f0fb1f293f029ffc900e2475e8d76..959a75952a410de6fc6d741609caf253b8871550 100644 (file)
@@ -94,7 +94,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     for the corresponding change to appear in the
     Ceph cluster (slow)
 
-    Right now, wre calling the k8s API synchronously.
+    Right now, we are calling the k8s API synchronously.
     """
 
     MODULE_OPTIONS = [
@@ -135,7 +135,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         super(RookOrchestrator, self).__init__(*args, **kwargs)
 
         self._initialized = threading.Event()
-        self._k8s = None
+        self._k8s_CoreV1_api = None
+        self._k8s_BatchV1_api = None
         self._rook_cluster = None
         self._rook_env = RookEnv()
 
@@ -150,8 +151,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
     def k8s(self):
         # type: () -> client.CoreV1Api
         self._initialized.wait()
-        assert self._k8s is not None
-        return self._k8s
+        assert self._k8s_CoreV1_api is not None
+        return self._k8s_CoreV1_api
 
     @property
     def rook_cluster(self):
@@ -178,20 +179,22 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             from kubernetes.client import configuration
             configuration.verify_ssl = False
 
-        self._k8s = client.CoreV1Api()
+        self._k8s_CoreV1_api = client.CoreV1Api()
+        self._k8s_BatchV1_api = client.BatchV1Api()
 
         try:
             # XXX mystery hack -- I need to do an API call from
             # this context, or subsequent API usage from handle_command
             # fails with SSLError('bad handshake').  Suspect some kind of
             # thread context setup in SSL lib?
-            self._k8s.list_namespaced_pod(cluster_name)
+            self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
         except ApiException:
             # Ignore here to make self.available() fail with a proper error message
             pass
 
         self._rook_cluster = RookCluster(
-            self._k8s,
+            self._k8s_CoreV1_api,
+            self._k8s_BatchV1_api,
             self._rook_env)
 
         self._initialized.set()
@@ -550,3 +553,12 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         c = self.get_hosts().then(execute)
         return c
+
+    def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
+        return write_completion(
+            on_complete=lambda: self.rook_cluster.blink_light(
+                ident_fault, on, locs),
+            message="Switching <{}> identification light in {}".format(
+                on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
+            mgr=self
+        )
index f177c1668e8040b4470e870a819efa35aa4e9545..cd4b58990039c88b2d659a5f7d33d389bd724582 100644 (file)
@@ -11,6 +11,7 @@ import threading
 import logging
 import json
 from contextlib import contextmanager
+from time import sleep
 
 import jsonpatch
 from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
@@ -29,9 +30,8 @@ except ImportError:
     pass  # just for type annotations
 
 try:
+    from kubernetes import client, watch
     from kubernetes.client.rest import ApiException
-    from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
-    from kubernetes import watch
 except ImportError:
     class ApiException(Exception):  # type: ignore
         status = 0
@@ -108,8 +108,8 @@ class KubernetesResource(object):
     def _fetch(self):
         """ Execute the requested api method as a one-off fetch"""
         response = self.api_func(**self.kwargs)
-        # metadata is a V1ListMeta object type
-        metadata = response.metadata  # type: V1ListMeta
+        # metadata is a client.V1ListMeta object type
+        metadata = response.metadata  # type: client.V1ListMeta
         self._items = {item.metadata.name: item for item in response.items}
         log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
         return metadata.resource_version
@@ -183,21 +183,22 @@ class KubernetesResource(object):
 
 
 class RookCluster(object):
-    def __init__(self, k8s, rook_env):
+    def __init__(self, coreV1_api, batchV1_api, rook_env):
         self.rook_env = rook_env  # type: RookEnv
-        self.k8s = k8s  # type: CoreV1Api
+        self.coreV1_api = coreV1_api  # client.CoreV1Api
+        self.batchV1_api = batchV1_api
 
         #  TODO: replace direct k8s calls with Rook API calls
         # when they're implemented
-        self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+        self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
                                                  namespace=self.rook_env.operator_namespace,
                                                  label_selector="app=rook-discover")
 
-        self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+        self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
                                             namespace=self.rook_env.namespace,
                                             label_selector="rook_cluster={0}".format(
                                                 self.rook_env.cluster_name))
-        self.nodes = KubernetesResource(self.k8s.list_node)
+        self.nodes = KubernetesResource(self.coreV1_api.list_node)
 
     def rook_url(self, path):
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
@@ -208,7 +209,7 @@ class RookCluster(object):
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))
 
-        return self.k8s.api_client.call_api(
+        return self.coreV1_api.api_client.call_api(
             full_path,
             verb,
             auth_settings=['BearerToken'],
@@ -240,7 +241,7 @@ class RookCluster(object):
 
         try:
             result = [i for i in self.inventory_maps.items if predicate(i)]
-        except ApiException as e:
+        except ApiException as dummy_e:
             log.exception("Failed to fetch device metadata")
             raise
 
@@ -287,7 +288,7 @@ class RookCluster(object):
                         rook_file_system=<self.fs_name>
         """
         def predicate(item):
-            # type: (V1Pod) -> bool
+            # type: (client.V1Pod) -> bool
             metadata = item.metadata
             if service_type is not None:
                 if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
@@ -361,10 +362,10 @@ class RookCluster(object):
             daemon_id = d['metadata']['labels']['ceph_daemon_id']
             name = daemon_type + '.' + daemon_id
             if name in names:
-                self.k8s.delete_namespaced_pod(
+                self.coreV1_api.delete_namespaced_pod(
                     d['metadata']['name'],
                     self.rook_env.namespace,
-                    body=V1DeleteOptions()
+                    body=client.V1DeleteOptions()
                 )
                 num += 1
         return "Removed %d pods" % num
@@ -652,3 +653,120 @@ class RookCluster(object):
                 self.rook_api_post("{}/".format(crd_name),
                                    body=new.to_json())
             return "Created"
+    def get_ceph_image(self) -> str:
+        try:
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="app=rook-ceph-mon",
+                                                               timeout_seconds=10)
+            if api_response.items:
+                return api_response.items[-1].spec.containers[0].image
+            else:
+                raise orchestrator.OrchestratorError(
+                        "Error getting ceph image. Cluster without monitors")
+        except ApiException as e:
+            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
+        operation_id = str(hash(loc))
+        message = ""
+
+        # job definition
+        job_metadata = client.V1ObjectMeta(name=operation_id,
+                                           namespace= self.rook_env.namespace,
+                                           labels={"ident": operation_id})
+        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
+        pod_container = client.V1Container(name="ceph-lsmcli-command",
+                                           security_context=client.V1SecurityContext(privileged=True),
+                                           image=self.get_ceph_image(),
+                                           command=["lsmcli",],
+                                           args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'),
+                                                 '--path', loc.path or loc.dev,],
+                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+        pod_spec = client.V1PodSpec(containers=[pod_container],
+                                    active_deadline_seconds=30, # Max time to terminate pod
+                                    restart_policy="Never",
+                                    node_selector= {"kubernetes.io/hostname": loc.host},
+                                    volumes=[client.V1Volume(name="devices",
+                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
+                                             client.V1Volume(name="run-udev",
+                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+                                                  spec=pod_spec)
+        job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
+                                    ttl_seconds_after_finished=10, # Alfa. Lifetime after finishing (either Complete or Failed)
+                                    backoff_limit=0,
+                                    template=pod_template)
+        job = client.V1Job(api_version="batch/v1",
+                           kind="Job",
+                           metadata=job_metadata,
+                           spec=job_spec)
+
+        # delete previous job if it exists
+        try:
+            try:
+                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                      self.rook_env.namespace,
+                                                                      propagation_policy="Background")
+            except ApiException as e:
+                if e.status != 404: # No problem if the job does not exist
+                    raise
+
+            # wait until the job is not present
+            deleted = False
+            retries = 0
+            while not deleted and retries < 10:
+                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+                                                                    label_selector="ident=%s" % operation_id,
+                                                                    timeout_seconds=10)
+                deleted = not api_response.items
+                if retries > 5:
+                    sleep(0.1)
+                ++retries
+            if retries == 10 and not deleted:
+                raise orchestrator.OrchestratorError(
+                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
+                            on, loc.host, loc.path or loc.dev, operation_id))
+
+            # create the job
+            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+            # get the result
+            finished = False
+            while not finished:
+                api_response = self.batchV1_api.read_namespaced_job(operation_id,
+                                                                    self.rook_env.namespace)
+                finished = api_response.status.succeeded or api_response.status.failed
+                if finished:
+                    message = api_response.status.conditions[-1].message
+
+            # get the result of the lsmcli command
+            api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                             label_selector="ident=%s" % operation_id,
+                                                             timeout_seconds=10)
+            if api_response.items:
+                pod_name = api_response.items[-1].metadata.name
+                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+                                                                  self.rook_env.namespace)
+
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(e))
+            raise
+
+        # Finally, delete the job.
+        # The job uses <ttl_seconds_after_finished>. This makes that the TTL controller delete automatically the job.
+        # This feature is in Alpha state, so extra explicit delete operations trying to delete the Job has been used strategically
+        try:
+            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                  self.rook_env.namespace,
+                                                                  propagation_policy="Background")
+        except ApiException as e:
+            if e.status != 404: # No problem if the job does not exist
+                raise
+
+        return message
+
+    def blink_light(self, ident_fault, on, locs):
+        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
+        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]