self._initialized = threading.Event()
self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
+ self._k8s_CustomObjects_api: Optional[client.CustomObjectsApi] = None
self._rook_cluster: Optional[RookCluster] = None
self._rook_env = RookEnv()
self._k8s_CoreV1_api = client.CoreV1Api()
self._k8s_BatchV1_api = client.BatchV1Api()
+ self._k8s_CustomObjects_api = client.CustomObjectsApi()
try:
# XXX mystery hack -- I need to do an API call from
self._rook_cluster = RookCluster(
self._k8s_CoreV1_api,
self._k8s_BatchV1_api,
+ self._k8s_CustomObjects_api,
self._rook_env)
self._initialized.set()
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
- def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', rook_env: 'RookEnv'):
+ def __init__(self, coreV1_api: 'client.CoreV1Api', batchV1_api: 'client.BatchV1Api', customObjects_api: 'client.CustomObjectsApi', rook_env: 'RookEnv'):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.batchV1_api = batchV1_api
+ self.customObjects_api = customObjects_api
# TODO: replace direct k8s calls with Rook API calls
# when they're implemented