From 2ed1dac04004dc39f5fe3b73ee4ad7d75b886d79 Mon Sep 17 00:00:00 2001 From: Sebastian Wagner Date: Mon, 1 Apr 2019 15:25:33 +0200 Subject: [PATCH] mgr/rook: Support other system namespaces Fixes http://tracker.ceph.com/issues/38799 Signed-off-by: Sebastian Wagner --- src/pybind/mgr/rook/module.py | 61 +++++++++++++++++------------ src/pybind/mgr/rook/rook_cluster.py | 59 ++++++++++++---------------- 2 files changed, 60 insertions(+), 60 deletions(-) diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index ec96f34679ea9..23e2dd6ec4c34 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -134,6 +134,25 @@ def deferred_read(f): return wrapper +class RookEnv(object): + def __init__(self): + # POD_NAMESPACE already exist for Rook 0.9 + pod_namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph') + self.cluster_ns = os.environ.get('ROOK_CLUSTER_NS', pod_namespace) + + # ROOK_CLUSTER_NAME was a previously used env var name. + rook_cluster_name = os.environ.get('ROOK_CLUSTER_NAME', pod_namespace) + # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0 + self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', rook_cluster_name) + + self.operator_ns = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system") + self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1') + self.api_name = "ceph.rook.io/" + self.crd_version + + def api_version_match(self): + return self.crd_version == 'v1' + + class RookOrchestrator(MgrModule, orchestrator.Orchestrator): MODULE_OPTIONS = [ # TODO: configure k8s API addr instead of assuming local @@ -190,19 +209,20 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): @staticmethod def can_run(): - if kubernetes_imported: - return True, "" - else: + if not kubernetes_imported: return False, "`kubernetes` python module not found" + if not RookEnv().api_version_match(): + return False, "Rook version unsupported." + return True, '' def available(self): if not kubernetes_imported: return False, "`kubernetes` python module not found" - elif not self._in_cluster_name: + elif not self._rook_env.cluster_ns: return False, "ceph-mgr not running in Rook cluster" try: - self.k8s.list_namespaced_pod(self.rook_cluster.cluster_name) + self.k8s.list_namespaced_pod(self._rook_env.cluster_name) except ApiException as e: return False, "Cannot reach Kubernetes API: {}".format(e) else: @@ -214,6 +234,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized = threading.Event() self._k8s = None self._rook_cluster = None + self._rook_env = RookEnv() self._shutdown = threading.Event() @@ -227,35 +248,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): @property def rook_cluster(self): + # type: () -> RookCluster self._initialized.wait() return self._rook_cluster - @property - def _in_cluster_name(self): - """ - Check if we appear to be running inside a Kubernetes/Rook - cluster - - :return: str - """ - if 'POD_NAMESPACE' in os.environ: - return os.environ['POD_NAMESPACE'] - if 'ROOK_CLUSTER_NAME' in os.environ: - return os.environ['ROOK_CLUSTER_NAME'] - def serve(self): # For deployed clusters, we should always be running inside # a Rook cluster. For development convenience, also support # running outside (reading ~/.kube config) - if self._in_cluster_name: + if self._rook_env.cluster_name: config.load_incluster_config() - cluster_name = self._in_cluster_name + cluster_name = self._rook_env.cluster_name else: self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~") config.load_kube_config() - cluster_name = "rook" + cluster_name = "rook-ceph" # So that I can do port forwarding from my workstation - jcsp from kubernetes.client import configuration @@ -275,7 +284,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._rook_cluster = RookCluster( self._k8s, - cluster_name) + self._rook_env) self._initialized.set() @@ -334,11 +343,11 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): return result @deferred_read - def describe_service(self, service_type, service_id, nodename): + def describe_service(self, service_type=None, service_id=None, node_name=None): assert service_type in ("mds", "osd", "mgr", "mon", "nfs", None), service_type + " unsupported" - pods = self.rook_cluster.describe_pods(service_type, service_id, nodename) + pods = self.rook_cluster.describe_pods(service_type, service_id, node_name) result = [] for p in pods: @@ -436,8 +445,8 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): def is_complete(): # Find OSD pods on this host pod_osd_ids = set() - pods = self._k8s.list_namespaced_pod("rook-ceph", - label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd", + pods = self._k8s.list_namespaced_pod(self._rook_env.cluster_ns, + label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name), field_selector="spec.nodeName={0}".format( drive_group.hosts(all_hosts)[0] )).items diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py index d229c13e36667..060d001318a44 100644 --- a/src/pybind/mgr/rook/rook_cluster.py +++ b/src/pybind/mgr/rook/rook_cluster.py @@ -6,11 +6,12 @@ call methods. This module is runnable outside of ceph-mgr, useful for testing. """ - -from six.moves.urllib.parse import urljoin # pylint: disable=import-error import logging import json from contextlib import contextmanager +from typing import List + +from six.moves.urllib.parse import urljoin # pylint: disable=import-error # Optional kubernetes imports to enable MgrModule.can_run # to behave cleanly. @@ -21,15 +22,12 @@ except ImportError: try: import orchestrator + from rook.module import RookEnv except ImportError: pass # just used for type checking. -ROOK_SYSTEM_NS = "rook-ceph-system" -ROOK_API_VERSION = "v1" -ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION - -log = logging.getLogger('rook') +log = logging.getLogger(__name__) class ApplyException(Exception): @@ -41,20 +39,13 @@ class ApplyException(Exception): class RookCluster(object): - def __init__(self, k8s, cluster_name): - self.cluster_name = cluster_name + def __init__(self, k8s, rook_env): + self.rook_env = rook_env # type: RookEnv self.k8s = k8s - @property - def rook_namespace(self): - # For the moment, assume Rook NS always equal to cluster name - # (this is also assumed some places in Rook source, may - # be formalized at some point) - return self.cluster_name - def rook_url(self, path): prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % ( - ROOK_API_VERSION, self.rook_namespace) + self.rook_env.crd_version, self.rook_env.cluster_ns) return urljoin(prefix, path) def rook_api_call(self, verb, path, **kwargs): @@ -96,10 +87,10 @@ class RookCluster(object): try: result = self.k8s.list_namespaced_config_map( - ROOK_SYSTEM_NS, + self.rook_env.operator_ns, label_selector=label_selector) except ApiException as e: - log.warning("Failed to fetch device metadata: {0}".format(e)) + log.exception("Failed to fetch device metadata: {0}".format(e)) raise nodename_to_devices = {} @@ -149,7 +140,7 @@ class RookCluster(object): # Label filter is rook_cluster= # rook_file_system= - label_filter = "rook_cluster={0}".format(self.cluster_name) + label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name) if service_type != None: label_filter += ",app=rook-ceph-{0}".format(service_type) if service_id != None: @@ -171,10 +162,10 @@ class RookCluster(object): field_filter = "" if nodename != None: - field_filter = "spec.nodeName={0}".format(nodename); + field_filter = "spec.nodeName={0}".format(nodename) pods = self.k8s.list_namespaced_pod( - self.rook_namespace, + self.rook_env.cluster_ns, label_selector=label_filter, field_selector=field_filter) @@ -213,11 +204,11 @@ class RookCluster(object): # to action. rook_fs = { - "apiVersion": ROOK_API_NAME, + "apiVersion": self.rook_env.api_name, "kind": "CephFilesystem", "metadata": { "name": spec.name, - "namespace": self.rook_namespace + "namespace": self.rook_env.cluster_ns }, "spec": { "onlyManageDaemons": True, @@ -238,11 +229,11 @@ class RookCluster(object): # to action. rook_nfsgw = { - "apiVersion": ROOK_API_NAME, + "apiVersion": self.rook_env.api_name, "kind": "CephNFS", "metadata": { "name": spec.name, - "namespace": self.rook_namespace + "namespace": self.rook_env.cluster_ns }, "spec": { "rados": { @@ -262,11 +253,11 @@ class RookCluster(object): def add_objectstore(self, spec): rook_os = { - "apiVersion": ROOK_API_NAME, + "apiVersion": self.rook_env.api_name, "kind": "CephObjectStore", "metadata": { "name": spec.name, - "namespace": self.rook_namespace + "namespace": self.rook_env.cluster_ns }, "spec": { "metaDataPool": { @@ -316,7 +307,7 @@ class RookCluster(object): def can_create_osd(self): current_cluster = self.rook_api_get( - "cephclusters/{0}".format(self.cluster_name)) + "cephclusters/{0}".format(self.rook_env.cluster_name)) use_all_nodes = current_cluster['spec'].get('useAllNodes', False) # If useAllNodes is set, then Rook will not be paying attention @@ -325,7 +316,7 @@ class RookCluster(object): def node_exists(self, node_name): try: - self.k8s.read_node(node_name) + self.k8s.read_node(node_name, exact=False, export=True) except ApiException as e: if e.status == 404: return False @@ -339,7 +330,7 @@ class RookCluster(object): try: self.rook_api_patch( - "cephclusters/{0}".format(self.cluster_name), + "cephclusters/{0}".format(self.rook_env.cluster_name), body=patch) except ApiException as e: log.exception("API exception: {0}".format(e)) @@ -362,7 +353,7 @@ class RookCluster(object): return "Updated NFS server count for {0} to {1}".format(svc_id, newcount) def add_osds(self, drive_group, all_hosts): - # type: (orchestrator.DriveGroupSpec, List[str]) -> None + # type: (orchestrator.DriveGroupSpec, List[str]) -> str """ Rook currently (0.8) can only do single-drive OSDs, so we treat all drive groups as just a list of individual OSDs. @@ -381,7 +372,7 @@ class RookCluster(object): # storeType: bluestore current_cluster = self.rook_api_get( - "cephclusters/{0}".format(self.cluster_name)) + "cephclusters/{0}".format(self.rook_env.cluster_name)) patch = [] @@ -440,7 +431,7 @@ class RookCluster(object): try: self.rook_api_patch( - "cephclusters/{0}".format(self.cluster_name), + "cephclusters/{0}".format(self.rook_env.cluster_name), body=patch) except ApiException as e: log.exception("API exception: {0}".format(e)) -- 2.39.5