for the corresponding change to appear in the
Ceph cluster (slow)
- Right now, wre calling the k8s API synchronously.
+ Right now, we are calling the k8s API synchronously.
"""
MODULE_OPTIONS = [
super(RookOrchestrator, self).__init__(*args, **kwargs)
self._initialized = threading.Event()
- self._k8s = None
+ self._k8s_CoreV1_api = None
+ self._k8s_BatchV1_api = None
self._rook_cluster = None
self._rook_env = RookEnv()
def k8s(self):
# type: () -> client.CoreV1Api
self._initialized.wait()
- assert self._k8s is not None
- return self._k8s
+ assert self._k8s_CoreV1_api is not None
+ return self._k8s_CoreV1_api
@property
def rook_cluster(self):
from kubernetes.client import configuration
configuration.verify_ssl = False
- self._k8s = client.CoreV1Api()
+ self._k8s_CoreV1_api = client.CoreV1Api()
+ self._k8s_BatchV1_api = client.BatchV1Api()
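+ # The BatchV1 API client is used later to create the Jobs that toggle device lights.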
try:
# XXX mystery hack -- I need to do an API call from
# this context, or subsequent API usage from handle_command
# fails with SSLError('bad handshake'). Suspect some kind of
# thread context setup in SSL lib?
- self._k8s.list_namespaced_pod(cluster_name)
+ self._k8s_CoreV1_api.list_namespaced_pod(cluster_name)
except ApiException:
# Ignore here to make self.available() fail with a proper error message
pass
self._rook_cluster = RookCluster(
- self._k8s,
+ self._k8s_CoreV1_api,
+ self._k8s_BatchV1_api,
self._rook_env)
self._initialized.set()
c = self.get_hosts().then(execute)
return c
+
+ def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> RookCompletion:
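+ """Turn a device's ident or fault light on or off by running a short-lived Job on each device's host."""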
+ return write_completion(
+ on_complete=lambda: self.rook_cluster.blink_light(
+ ident_fault, on, locs),
+ message="Switching <{}> identification light in {}".format(
+ on, ",".join(["{}:{}".format(loc.host, loc.dev) for loc in locs])),
+ mgr=self
+ )
import logging
import json
from contextlib import contextmanager
+from time import sleep
import jsonpatch
from six.moves.urllib.parse import urljoin # pylint: disable=import-error
pass # just for type annotations
try:
+ from kubernetes import client, watch
from kubernetes.client.rest import ApiException
- from kubernetes.client import V1ListMeta, CoreV1Api, V1Pod, V1DeleteOptions
- from kubernetes import watch
except ImportError:
class ApiException(Exception): # type: ignore
status = 0
def _fetch(self):
""" Execute the requested api method as a one-off fetch"""
response = self.api_func(**self.kwargs)
- # metadata is a V1ListMeta object type
- metadata = response.metadata # type: V1ListMeta
+ # metadata is a client.V1ListMeta object type
+ metadata = response.metadata # type: client.V1ListMeta
self._items = {item.metadata.name: item for item in response.items}
log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
return metadata.resource_version
class RookCluster(object):
- def __init__(self, k8s, rook_env):
+ def __init__(self, coreV1_api, batchV1_api, rook_env):
self.rook_env = rook_env # type: RookEnv
- self.k8s = k8s # type: CoreV1Api
+ self.coreV1_api = coreV1_api # type: client.CoreV1Api
+ self.batchV1_api = batchV1_api # type: client.BatchV1Api
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented
- self.inventory_maps = KubernetesResource(self.k8s.list_namespaced_config_map,
+ self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
namespace=self.rook_env.operator_namespace,
label_selector="app=rook-discover")
- self.rook_pods = KubernetesResource(self.k8s.list_namespaced_pod,
+ self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
label_selector="rook_cluster={0}".format(
self.rook_env.cluster_name))
- self.nodes = KubernetesResource(self.k8s.list_node)
+ self.nodes = KubernetesResource(self.coreV1_api.list_node)
def rook_url(self, path):
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
full_path = self.rook_url(path)
log.debug("[%s] %s" % (verb, full_path))
- return self.k8s.api_client.call_api(
+ return self.coreV1_api.api_client.call_api(
full_path,
verb,
auth_settings=['BearerToken'],
try:
result = [i for i in self.inventory_maps.items if predicate(i)]
- except ApiException as e:
+ except ApiException as dummy_e:
log.exception("Failed to fetch device metadata")
raise
rook_file_system=<self.fs_name>
"""
def predicate(item):
- # type: (V1Pod) -> bool
+ # type: (client.V1Pod) -> bool
metadata = item.metadata
if service_type is not None:
if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
daemon_id = d['metadata']['labels']['ceph_daemon_id']
name = daemon_type + '.' + daemon_id
if name in names:
- self.k8s.delete_namespaced_pod(
+ self.coreV1_api.delete_namespaced_pod(
d['metadata']['name'],
self.rook_env.namespace,
- body=V1DeleteOptions()
+ body=client.V1DeleteOptions()
)
num += 1
return "Removed %d pods" % num
self.rook_api_post("{}/".format(crd_name),
body=new.to_json())
return "Created"
+ def get_ceph_image(self) -> str:
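+ """Return the ceph container image in use, taken from a running rook-ceph-mon pod."""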
+ try:
+ api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+ label_selector="app=rook-ceph-mon",
+ timeout_seconds=10)
+ if api_response.items:
+ return api_response.items[-1].spec.containers[0].image
+ else:
+ raise orchestrator.OrchestratorError(
+ "Error getting ceph image. Cluster without monitors")
+ except ApiException as e:
+ raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+ def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
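+ """Run an `lsmcli` command that toggles the ident/fault LED of one device.
+ The job name is a hash of the device location, so each host/device pair maps to
+ a stable name: any leftover job with that name is deleted first, then a new Job
+ is created, polled until it succeeds or fails, and the pod log (or the job's
+ condition message) is returned as the result.
+ """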
+ operation_id = str(hash(loc))
+ message = ""
+
+ # job definition
+ job_metadata = client.V1ObjectMeta(name=operation_id,
+ namespace=self.rook_env.namespace,
+ labels={"ident": operation_id})
+ pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
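+ # The command runs in a privileged container built from the cluster's own ceph image,
+ # pinned to the device's host, with /dev and /run/udev mounted so lsmcli can reach the device.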
+ pod_container = client.V1Container(name="ceph-lsmcli-command",
+ security_context=client.V1SecurityContext(privileged=True),
+ image=self.get_ceph_image(),
+ command=["lsmcli",],
+ args=['local-disk-%s-led-%s' % (ident_fault, 'on' if on else 'off'),
+ '--path', loc.path or loc.dev,],
+ volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+ client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+ pod_spec = client.V1PodSpec(containers=[pod_container],
+ active_deadline_seconds=30, # Max time to terminate pod
+ restart_policy="Never",
+ node_selector= {"kubernetes.io/hostname": loc.host},
+ volumes=[client.V1Volume(name="devices",
+ host_path=client.V1HostPathVolumeSource(path="/dev")),
+ client.V1Volume(name="run-udev",
+ host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+ pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+ spec=pod_spec)
+ job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
+ ttl_seconds_after_finished=10, # Alpha. Lifetime after finishing (either Complete or Failed)
+ backoff_limit=0,
+ template=pod_template)
+ job = client.V1Job(api_version="batch/v1",
+ kind="Job",
+ metadata=job_metadata,
+ spec=job_spec)
+
+ # delete previous job if it exists
+ try:
+ try:
+ api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+ self.rook_env.namespace,
+ propagation_policy="Background")
+ except ApiException as e:
+ if e.status != 404: # No problem if the job does not exist
+ raise
+
+ # wait until the job is not present
+ deleted = False
+ retries = 0
+ while not deleted and retries < 10:
+ api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+ label_selector="ident=%s" % operation_id,
+ timeout_seconds=10)
+ deleted = not api_response.items
+ if retries > 5:
+ sleep(0.1)
+ retries += 1
+ if retries == 10 and not deleted:
+ raise orchestrator.OrchestratorError(
+ "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
+ on, loc.host, loc.path or loc.dev, operation_id))
+
+ # create the job
+ api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+ # get the result
+ finished = False
+ while not finished:
+ api_response = self.batchV1_api.read_namespaced_job(operation_id,
+ self.rook_env.namespace)
+ finished = api_response.status.succeeded or api_response.status.failed
+ if finished:
+ message = api_response.status.conditions[-1].message
+
+ # get the result of the lsmcli command
+ api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+ label_selector="ident=%s" % operation_id,
+ timeout_seconds=10)
+ if api_response.items:
+ pod_name = api_response.items[-1].metadata.name
+ message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+ self.rook_env.namespace)
+
+ except ApiException as e:
+ log.exception('K8s API failed. {}'.format(e))
+ raise
+
+ # Finally, delete the job.
+ # The job sets <ttl_seconds_after_finished>, so the TTL controller should delete it automatically once it finishes.
+ # That feature is still in Alpha, though, so an explicit delete is issued here as well.
+ try:
+ api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+ self.rook_env.namespace,
+ propagation_policy="Background")
+ except ApiException as e:
+ if e.status != 404: # No problem if the job does not exist
+ raise
+
+ return message
+
+ def blink_light(self, ident_fault, on, locs):
+ # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
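+ """Run one LED job per device location and return each job's result message."""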
+ return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]