self._k8s_StorageV1_api: Optional[client.StorageV1Api] = None
self._rook_cluster: Optional[RookCluster] = None
self._rook_env = RookEnv()
+ self._k8s_AppsV1_api: Optional[client.AppsV1Api] = None
self.storage_class = self.get_module_option('storage_class')
self._shutdown = threading.Event()
self._k8s_BatchV1_api = client.BatchV1Api()
self._k8s_CustomObjects_api = client.CustomObjectsApi()
self._k8s_StorageV1_api = client.StorageV1Api()
+ self._k8s_AppsV1_api = client.AppsV1Api()
try:
# XXX mystery hack -- I need to do an API call from
self._k8s_BatchV1_api,
self._k8s_CustomObjects_api,
self._k8s_StorageV1_api,
+ self._k8s_AppsV1_api,
self._rook_env,
self.storage_class)
raise AttributeError(
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
-
@threaded
def _watch(self, res_ver: Optional[str]) -> None:
""" worker thread that runs the kubernetes watch """
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
- def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', customObjects_api: 'client.CustomObjectsApi', storageV1_api: 'client.StorageV1Api', rook_env: 'RookEnv', storage_class: 'str'):
+ def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', customObjects_api: 'client.CustomObjectsApi', storageV1_api: 'client.StorageV1Api', appsV1_api: 'client.AppsV1Api', rook_env: 'RookEnv', storage_class: 'str'):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
self.customObjects_api = customObjects_api
self.storageV1_api = storageV1_api # client.StorageV1Api
+ self.appsV1_api = appsV1_api # client.AppsV1Api
self.storage_class = storage_class # type: str
# TODO: replace direct k8s calls with Rook API calls