return wrapper
+class RookEnv(object):
+ def __init__(self):
+ # POD_NAMESPACE already exist for Rook 0.9
+ pod_namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
+ self.cluster_ns = os.environ.get('ROOK_CLUSTER_NS', pod_namespace)
+
+ # ROOK_CLUSTER_NAME was a previously used env var name.
+ rook_cluster_name = os.environ.get('ROOK_CLUSTER_NAME', pod_namespace)
+ # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0
+ self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', rook_cluster_name)
+
+ self.operator_ns = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+ self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
+ self.api_name = "ceph.rook.io/" + self.crd_version
+
+ def api_version_match(self):
+ return self.crd_version == 'v1'
+
+
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
MODULE_OPTIONS = [
# TODO: configure k8s API addr instead of assuming local
@staticmethod
def can_run():
- if kubernetes_imported:
- return True, ""
- else:
+ if not kubernetes_imported:
return False, "`kubernetes` python module not found"
+ if not RookEnv().api_version_match():
+ return False, "Rook version unsupported."
+ return True, ''
def available(self):
if not kubernetes_imported:
return False, "`kubernetes` python module not found"
- elif not self._in_cluster_name:
+ elif not self._rook_env.cluster_ns:
return False, "ceph-mgr not running in Rook cluster"
try:
- self.k8s.list_namespaced_pod(self.rook_cluster.cluster_name)
+ self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
except ApiException as e:
return False, "Cannot reach Kubernetes API: {}".format(e)
else:
self._initialized = threading.Event()
self._k8s = None
self._rook_cluster = None
+ self._rook_env = RookEnv()
self._shutdown = threading.Event()
@property
def rook_cluster(self):
+ # type: () -> RookCluster
self._initialized.wait()
return self._rook_cluster
- @property
- def _in_cluster_name(self):
- """
- Check if we appear to be running inside a Kubernetes/Rook
- cluster
-
- :return: str
- """
- if 'POD_NAMESPACE' in os.environ:
- return os.environ['POD_NAMESPACE']
- if 'ROOK_CLUSTER_NAME' in os.environ:
- return os.environ['ROOK_CLUSTER_NAME']
-
def serve(self):
# For deployed clusters, we should always be running inside
# a Rook cluster. For development convenience, also support
# running outside (reading ~/.kube config)
- if self._in_cluster_name:
+ if self._rook_env.cluster_name:
config.load_incluster_config()
- cluster_name = self._in_cluster_name
+ cluster_name = self._rook_env.cluster_name
else:
self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
config.load_kube_config()
- cluster_name = "rook"
+ cluster_name = "rook-ceph"
# So that I can do port forwarding from my workstation - jcsp
from kubernetes.client import configuration
self._rook_cluster = RookCluster(
self._k8s,
- cluster_name)
+ self._rook_env)
self._initialized.set()
return result
@deferred_read
- def describe_service(self, service_type, service_id, nodename):
+ def describe_service(self, service_type=None, service_id=None, node_name=None):
assert service_type in ("mds", "osd", "mgr", "mon", "nfs", None), service_type + " unsupported"
- pods = self.rook_cluster.describe_pods(service_type, service_id, nodename)
+ pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)
result = []
for p in pods:
def is_complete():
# Find OSD pods on this host
pod_osd_ids = set()
- pods = self._k8s.list_namespaced_pod("rook-ceph",
- label_selector="rook_cluster=rook-ceph,app=rook-ceph-osd",
+ pods = self._k8s.list_namespaced_pod(self._rook_env.cluster_ns,
+ label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
field_selector="spec.nodeName={0}".format(
drive_group.hosts(all_hosts)[0]
)).items
This module is runnable outside of ceph-mgr, useful for testing.
"""
-
-from six.moves.urllib.parse import urljoin # pylint: disable=import-error
import logging
import json
from contextlib import contextmanager
+from typing import List
+
+from six.moves.urllib.parse import urljoin # pylint: disable=import-error
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
try:
import orchestrator
+ from rook.module import RookEnv
except ImportError:
pass # just used for type checking.
-ROOK_SYSTEM_NS = "rook-ceph-system"
-ROOK_API_VERSION = "v1"
-ROOK_API_NAME = "ceph.rook.io/%s" % ROOK_API_VERSION
-
-log = logging.getLogger('rook')
+log = logging.getLogger(__name__)
class ApplyException(Exception):
class RookCluster(object):
- def __init__(self, k8s, cluster_name):
- self.cluster_name = cluster_name
+ def __init__(self, k8s, rook_env):
+ self.rook_env = rook_env # type: RookEnv
self.k8s = k8s
- @property
- def rook_namespace(self):
- # For the moment, assume Rook NS always equal to cluster name
- # (this is also assumed some places in Rook source, may
- # be formalized at some point)
- return self.cluster_name
-
def rook_url(self, path):
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
- ROOK_API_VERSION, self.rook_namespace)
+ self.rook_env.crd_version, self.rook_env.cluster_ns)
return urljoin(prefix, path)
def rook_api_call(self, verb, path, **kwargs):
try:
result = self.k8s.list_namespaced_config_map(
- ROOK_SYSTEM_NS,
+ self.rook_env.operator_ns,
label_selector=label_selector)
except ApiException as e:
- log.warning("Failed to fetch device metadata: {0}".format(e))
+ log.exception("Failed to fetch device metadata: {0}".format(e))
raise
nodename_to_devices = {}
# Label filter is rook_cluster=<cluster name>
# rook_file_system=<self.fs_name>
- label_filter = "rook_cluster={0}".format(self.cluster_name)
+ label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
if service_type != None:
label_filter += ",app=rook-ceph-{0}".format(service_type)
if service_id != None:
field_filter = ""
if nodename != None:
- field_filter = "spec.nodeName={0}".format(nodename);
+ field_filter = "spec.nodeName={0}".format(nodename)
pods = self.k8s.list_namespaced_pod(
- self.rook_namespace,
+ self.rook_env.cluster_ns,
label_selector=label_filter,
field_selector=field_filter)
# to action.
rook_fs = {
- "apiVersion": ROOK_API_NAME,
+ "apiVersion": self.rook_env.api_name,
"kind": "CephFilesystem",
"metadata": {
"name": spec.name,
- "namespace": self.rook_namespace
+ "namespace": self.rook_env.cluster_ns
},
"spec": {
"onlyManageDaemons": True,
# to action.
rook_nfsgw = {
- "apiVersion": ROOK_API_NAME,
+ "apiVersion": self.rook_env.api_name,
"kind": "CephNFS",
"metadata": {
"name": spec.name,
- "namespace": self.rook_namespace
+ "namespace": self.rook_env.cluster_ns
},
"spec": {
"rados": {
def add_objectstore(self, spec):
rook_os = {
- "apiVersion": ROOK_API_NAME,
+ "apiVersion": self.rook_env.api_name,
"kind": "CephObjectStore",
"metadata": {
"name": spec.name,
- "namespace": self.rook_namespace
+ "namespace": self.rook_env.cluster_ns
},
"spec": {
"metaDataPool": {
def can_create_osd(self):
current_cluster = self.rook_api_get(
- "cephclusters/{0}".format(self.cluster_name))
+ "cephclusters/{0}".format(self.rook_env.cluster_name))
use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
# If useAllNodes is set, then Rook will not be paying attention
def node_exists(self, node_name):
try:
- self.k8s.read_node(node_name)
+ self.k8s.read_node(node_name, exact=False, export=True)
except ApiException as e:
if e.status == 404:
return False
try:
self.rook_api_patch(
- "cephclusters/{0}".format(self.cluster_name),
+ "cephclusters/{0}".format(self.rook_env.cluster_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
def add_osds(self, drive_group, all_hosts):
- # type: (orchestrator.DriveGroupSpec, List[str]) -> None
+ # type: (orchestrator.DriveGroupSpec, List[str]) -> str
"""
Rook currently (0.8) can only do single-drive OSDs, so we
treat all drive groups as just a list of individual OSDs.
# storeType: bluestore
current_cluster = self.rook_api_get(
- "cephclusters/{0}".format(self.cluster_name))
+ "cephclusters/{0}".format(self.rook_env.cluster_name))
patch = []
try:
self.rook_api_patch(
- "cephclusters/{0}".format(self.cluster_name),
+ "cephclusters/{0}".format(self.rook_env.cluster_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))