from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple
+
try:
- from typing import List, Dict, Optional, Callable, Any
from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
pass # just for type checking
# https://github.com/kubernetes-client/python/issues/895
from kubernetes.client.models.v1_container_image import V1ContainerImage
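+# The monkeypatch below loosens the generated setter: some clusters report a
+# null ``names`` field, which the stock V1ContainerImage setter rejects (see
+# the issue linked above).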
- def names(self, names):
+ def names(self: Any, names: Any) -> None:
self._names = names
V1ContainerImage.names = V1ContainerImage.names.setter(names)
from .rook_cluster import RookCluster
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
+
+
-class RookCompletion(orchestrator.Completion):
- def evaluate(self):
+class RookCompletion(orchestrator.Completion[T]):
+ def evaluate(self) -> None:
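+ # finalize(None) resolves the completion, which in turn runs its
+ # on_complete callback; this is how @deferred_read-wrapped methods execute.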
self.finalize(None)
def deferred_read(f):
- # type: (Callable) -> Callable[..., RookCompletion]
+ # type: (Callable[..., T]) -> Callable[..., RookCompletion[T]]
+
+ # See https://stackoverflow.com/questions/65936408/typing-function-when-decorator-change-generic-return-type
"""
Decorator to make RookOrchestrator methods return
a completion object that runs the wrapped method when evaluated.
"""
@functools.wraps(f)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> RookCompletion[T]:
return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
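+# Usage sketch: methods below such as get_inventory() are wrapped with
+# @deferred_read, so calling them only builds a RookCompletion; the wrapped
+# body runs once the completion is evaluated.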
-def write_completion(on_complete, # type: Callable
- message, # type: str
- mgr,
- calc_percent=None # type: Optional[Callable[[], RookCompletion]]
- ):
- # type: (...) -> RookCompletion
+def write_completion(on_complete: Callable[[], T],
+ message: str,
+ mgr: 'RookOrchestrator',
+ calc_percent: Optional[Callable[[], RookCompletion]] = None
+ ) -> RookCompletion[T]:
return RookCompletion.with_progress(
message=message,
mgr=mgr,
class RookEnv(object):
- def __init__(self):
+ def __init__(self) -> None:
# POD_NAMESPACE already exists for Rook 0.9
self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
self.api_name = "ceph.rook.io/" + self.crd_version
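+ # Example (from the defaults above): in a Rook-deployed mgr pod,
+ # POD_NAMESPACE is typically 'rook-ceph' and the CRD version 'v1',
+ # giving api_name == 'ceph.rook.io/v1'.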
- def api_version_match(self):
+ def api_version_match(self) -> bool:
return self.crd_version == 'v1'
- def has_namespace(self):
+ def has_namespace(self) -> bool:
return 'POD_NAMESPACE' in os.environ
p.evaluate()
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
if not kubernetes_imported:
return False, "`kubernetes` python module not found"
if not RookEnv().api_version_match():
return False, "Rook version unsupported."
return True, ''
- def available(self):
+ def available(self) -> Tuple[bool, str]:
if not kubernetes_imported:
return False, "`kubernetes` python module not found"
elif not self._rook_env.has_namespace():
else:
return True, ""
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(RookOrchestrator, self).__init__(*args, **kwargs)
self._initialized = threading.Event()
- self._k8s_CoreV1_api = None
- self._k8s_BatchV1_api = None
- self._rook_cluster = None
+ self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
+ self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
+ self._rook_cluster: Optional[RookCluster] = None
self._rook_env = RookEnv()
self._shutdown = threading.Event()
self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
- def shutdown(self):
+ def shutdown(self) -> None:
self._shutdown.set()
@property
assert self._rook_cluster is not None
return self._rook_cluster
- def serve(self):
+ def serve(self) -> None:
# For deployed clusters, we should always be running inside
# a Rook cluster. For development convenience, also support
# running outside (reading ~/.kube config)
self._shutdown.wait(5)
- def cancel_completions(self):
+ def cancel_completions(self) -> None:
for p in self.all_progress_references:
p.fail()
self.all_progress_references.clear()
@deferred_read
- def get_inventory(self, host_filter=None, refresh=False):
+ def get_inventory(self,
+ host_filter: Optional[orchestrator.InventoryFilter] = None,
+ refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
if host_filter and host_filter.hosts:
# Explicit host list
# it into RookCluster.get_discovered_devices
raise NotImplementedError()
- devs = self.rook_cluster.get_discovered_devices(host_list)
+ discovered_devs = self.rook_cluster.get_discovered_devices(host_list)
result = []
- for host_name, host_devs in devs.items():
+ for host_name, host_devs in discovered_devs.items():
devs = []
for d in host_devs:
if 'cephVolumeData' in d and d['cephVolumeData']:
return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
@deferred_read
- def describe_service(self, service_type=None, service_name=None,
- refresh=False):
+ def describe_service(self,
+ service_type: Optional[str] = None,
+ service_name: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.ServiceDescription]:
now = datetime.datetime.utcnow()
# CephCluster
service.container_image_id = dd.container_image_id
if not service.container_image_name:
service.container_image_name = dd.container_image_name
- if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
+ if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
service.last_refresh = dd.last_refresh
- if not service.created or dd.created < service.created:
+ if service.created is None or dd.created is None or dd.created < service.created:
service.created = dd.created
return [v for k, v in spec.items()]
@deferred_read
- def list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
- refresh=False):
+ def list_daemons(self,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.DaemonDescription]:
return self._list_daemons(service_name=service_name,
daemon_type=daemon_type,
daemon_id=daemon_id,
host=host,
refresh=refresh)
- def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
- refresh=False):
+ def _list_daemons(self,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.DaemonDescription]:
pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
self.log.debug('pods %s' % pods)
result = []
return result
- def _service_add_decorate(self, typename, spec, func):
+ def _service_add_decorate(self, typename: str, spec: ServiceSpecT, func: Callable[[ServiceSpecT], T]) -> RookCompletion[T]:
return write_completion(
on_complete=lambda : func(spec),
message="Creating {} services for {}".format(typename, spec.service_id),
mgr=self
)
- def _service_rm_decorate(self, typename, name, func):
+ def _service_rm_decorate(self, typename: str, name: str, func: Callable[[], T]) -> RookCompletion[T]:
return write_completion(
on_complete=lambda : func(),
message="Removing {} services for {}".format(typename, name),
mgr=self
)
- def remove_service(self, service_name):
+ def remove_service(self, service_name: str) -> RookCompletion[str]:
service_type, service_name = service_name.split('.', 1)
if service_type == 'mds':
return self._service_rm_decorate(
return self._service_rm_decorate(
'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
)
+ else:
+ raise orchestrator.OrchestratorError(f'Service type {service_type} not supported')
def apply_mon(self, spec):
- # type: (ServiceSpec) -> RookCompletion
+ # type: (ServiceSpec) -> RookCompletion[str]
if spec.placement.hosts or spec.placement.label:
raise RuntimeError("Host list or label is not supported by rook.")
)
def apply_mds(self, spec):
- # type: (ServiceSpec) -> RookCompletion
+ # type: (ServiceSpec) -> RookCompletion[str]
return self._service_add_decorate('MDS', spec,
self.rook_cluster.apply_filesystem)
def apply_rgw(self, spec):
- # type: (RGWSpec) -> RookCompletion
+ # type: (RGWSpec) -> RookCompletion[str]
return self._service_add_decorate('RGW', spec,
self.rook_cluster.apply_objectstore)
def apply_nfs(self, spec):
- # type: (NFSServiceSpec) -> RookCompletion
+ # type: (NFSServiceSpec) -> RookCompletion[str]
return self._service_add_decorate("NFS", spec,
self.rook_cluster.apply_nfsgw)
- def remove_daemons(self, names):
+ def remove_daemons(self, names: List[str]) -> RookCompletion[List[str]]:
return write_completion(
lambda: self.rook_cluster.remove_pods(names),
"Removing daemons {}".format(','.join(names)),
)
def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> RookCompletion
+ # type: (DriveGroupSpec) -> RookCompletion[str]
""" Creates OSDs from a drive group specification.
$: ceph orch osd create -i <dg.file>
)
@deferred_read
- def has_osds(matching_hosts):
+ def has_osds(matching_hosts: List[str]) -> bool:
# Find OSD pods on this host
pod_osd_ids = set()
from urllib3.exceptions import ProtocolError
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec
from mgr_util import merge_dicts
-try:
- from typing import Optional
-except ImportError:
- pass # just for type annotations
+from typing import Optional, TypeVar, List, Callable, Any, cast, Generic, \
+ Iterable, Dict, Iterator, Type
try:
from kubernetes import client, watch
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl
+from .rook_client._helper import CrdClass
import orchestrator
try:
from rook.module import RookEnv
- from typing import List, Dict
except ImportError:
pass # just used for type checking.
+
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
log = logging.getLogger(__name__)
-def _urllib3_supports_read_chunked():
+def __urllib3_supports_read_chunked() -> bool:
# CentOS 7 ships a urllib3 that is older than the version required
# by kubernetes-client
try:
return False
-_urllib3_supports_read_chunked = _urllib3_supports_read_chunked()
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
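+# Note: the probe keeps a distinct (dunder-prefixed) name so that rebinding the
+# public module-level name to its boolean result is not flagged by mypy as a
+# redefinition.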
class ApplyException(orchestrator.OrchestratorError):
"""
"""
-def threaded(f):
- def wrapper(*args, **kwargs):
+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
+ def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
t = threading.Thread(target=f, args=args, kwargs=kwargs)
t.start()
return t
- return wrapper
+ return cast(Callable[..., threading.Thread], wrapper)
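+# The wrapper returns the started Thread so callers can keep a handle on it
+# (KubernetesResource stores its watcher thread in ``self.thread`` below).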
-class KubernetesResource(object):
- def __init__(self, api_func, **kwargs):
+class KubernetesResource(Generic[T]):
+ def __init__(self, api_func: Callable, **kwargs: Any) -> None:
"""
Generic wrapper around a Kubernetes list API call; caches the items and keeps them up to date via a watch
self.api_func = api_func
# ``_items`` is accessed by different threads. I assume assignment is atomic.
- self._items = dict()
+ self._items: Dict[str, T] = dict()
self.thread = None # type: Optional[threading.Thread]
- self.exception = None
+ self.exception: Optional[Exception] = None
if not _urllib3_supports_read_chunked:
logging.info('urllib3 is too old. Fallback to full fetches')
- def _fetch(self):
+ def _fetch(self) -> str:
""" Execute the requested api method as a one-off fetch"""
response = self.api_func(**self.kwargs)
# metadata is a client.V1ListMeta object type
return metadata.resource_version
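+ # The resource_version of this initial list is meant to seed the _watch()
+ # worker (see below) so events are consumed from that snapshot onwards.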
@property
- def items(self):
+ def items(self) -> Iterable[T]:
"""
Returns the items of the request.
Creates the watcher as a side effect.
return self._items.values()
@threaded
- def _watch(self, res_ver):
+ def _watch(self, res_ver: Optional[str]) -> None:
""" worker thread that runs the kubernetes watch """
self.exception = None
class RookCluster(object):
- def __init__(self, coreV1_api, batchV1_api, rook_env):
+ # `kubernetes` may be missing at import time, so client.CoreV1Api is only
+ # referenced via string annotations here; mgr/rook must stay importable without it.
+ def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented
- self.inventory_maps = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+ self.inventory_maps: KubernetesResource[client.V1ConfigMapList] = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
namespace=self.rook_env.operator_namespace,
label_selector="app=rook-discover")
- self.rook_pods = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+ self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
label_selector="rook_cluster={0}".format(
self.rook_env.namespace))
- self.nodes = KubernetesResource(self.coreV1_api.list_node)
+ self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
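+ # Cached cluster state, kept up to date by the KubernetesResource watches:
+ #  - inventory_maps: per-node device inventory published by rook-discover
+ #  - rook_pods: pods labelled as belonging to this Rook/Ceph cluster
+ #  - nodes: all Kubernetes nodes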
- def rook_url(self, path):
+ def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
self.rook_env.crd_version, self.rook_env.namespace)
return urljoin(prefix, path)
- def rook_api_call(self, verb, path, **kwargs):
+ def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
full_path = self.rook_url(path)
log.debug("[%s] %s" % (verb, full_path))
_preload_content=True,
**kwargs)
- def rook_api_get(self, path, **kwargs):
+ def rook_api_get(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("GET", path, **kwargs)
- def rook_api_delete(self, path):
+ def rook_api_delete(self, path: str) -> Any:
return self.rook_api_call("DELETE", path)
- def rook_api_patch(self, path, **kwargs):
+ def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("PATCH", path,
header_params={"Content-Type": "application/json-patch+json"},
**kwargs)
- def rook_api_post(self, path, **kwargs):
+ def rook_api_post(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("POST", path, **kwargs)
- def get_discovered_devices(self, nodenames=None):
- def predicate(item):
+ def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, dict]:
+ def predicate(item: client.V1ConfigMapList) -> bool:
if nodenames is not None:
return item.metadata.labels['rook.io/node'] in nodenames
else:
return nodename_to_devices
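+ # Illustrative shape of the result (exact keys come from rook-discover):
+ #   {'node-a': [{'name': '/dev/vdb', 'cephVolumeData': '...', ...}, ...]}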
- def get_nfs_conf_url(self, nfs_cluster, instance):
+ def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
#
# Fetch cephnfs object for "nfs_cluster" and then return a rados://
# URL for the instance within that cluster. If the fetch fails, just
url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
return url
- def describe_pods(self, service_type, service_id, nodename):
+ def describe_pods(self,
+ service_type: Optional[str],
+ service_id: Optional[str],
+ nodename: Optional[str]) -> List[Dict[str, Any]]:
"""
Query the k8s API for the deployments and containers related to this
filesystem
return pods_summary
- def remove_pods(self, names):
+ def remove_pods(self, names: List[str]) -> List[str]:
pods = [i for i in self.rook_pods.items]
- num = 0
for p in pods:
d = p.to_dict()
daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
self.rook_env.namespace,
body=client.V1DeleteOptions()
)
- num += 1
- return "Removed %d pods" % num
+ return [f'Removed Pod {n}' for n in names]
- def get_node_names(self):
+ def get_node_names(self) -> List[str]:
return [i.metadata.name for i in self.nodes.items]
@contextmanager
- def ignore_409(self, what):
+ def ignore_409(self, what: str) -> Iterator[None]:
try:
yield
except ApiException as e:
else:
raise
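+ # Typical use (sketch): wrap an idempotent create so a 409 Conflict is
+ # logged and swallowed, e.g.
+ #   with self.ignore_409("CephFilesystem mycephfs already exists"):
+ #       self.rook_api_post(...)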
- def apply_filesystem(self, spec: ServiceSpec) -> None:
+ def apply_filesystem(self, spec: ServiceSpec) -> str:
# TODO use spec.placement
# TODO warn if spec.extended has entries we don't know how
# to action.
)
)
)
+ assert spec.service_id is not None
return self._create_or_patch(
cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
_update_fs, _create_fs)
- def apply_objectstore(self, spec):
+ def apply_objectstore(self, spec: RGWSpec) -> str:
# FIXME: service_id is $realm.$zone, but rook uses realm
# $crname and zone $crname. The '.' will confuse kubernetes.
# For now, assert that realm==zone.
+ assert spec.service_id is not None
(realm, zone) = spec.service_id.split('.', 1)
assert realm == zone
assert spec.subcluster is None
cos.CephObjectStore, 'cephobjectstores', name,
_update_zone, _create_zone)
- def apply_nfsgw(self, spec: NFSServiceSpec) -> None:
+ def apply_nfsgw(self, spec: NFSServiceSpec) -> str:
# TODO use spec.placement
# TODO warn if spec.extended has entries we don't know how
# to action.
return rook_nfsgw
+ assert spec.service_id is not None
return self._create_or_patch(cnfs.CephNFS, 'cephnfses', spec.service_id,
_update_nfs, _create_nfs)
- def rm_service(self, rooktype, service_id):
+ def rm_service(self, rooktype: str, service_id: str) -> str:
objpath = "{0}/{1}".format(rooktype, service_id)
else:
raise
- def can_create_osd(self):
+ return f'Removed {objpath}'
+
+ def can_create_osd(self) -> bool:
current_cluster = self.rook_api_get(
"cephclusters/{0}".format(self.rook_env.cluster_name))
use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
# to anything we put in 'nodes', so can't do OSD creation.
return not use_all_nodes
- def node_exists(self, node_name):
+ def node_exists(self, node_name: str) -> bool:
return node_name in self.get_node_names()
- def update_mon_count(self, newcount):
+ def update_mon_count(self, newcount: Optional[int]) -> str:
def _update_mon_count(current, new):
# type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ if newcount is None:
+ raise orchestrator.OrchestratorError('unable to set mon count to None')
new.spec.mon.count = newcount
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)
- def _patch(self, crd, crd_name, cr_name, func):
+ def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)
)
return "Success"
- def _create_or_patch(self, crd, crd_name, cr_name, update_func, create_func):
+ def _create_or_patch(self,
+ crd: Type,
+ crd_name: str,
+ cr_name: str,
+ update_func: Callable[[CrdClassT], CrdClassT],
+ create_func: Callable[[], CrdClassT]) -> str:
try:
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)