def _fetch(self) -> str:
""" Execute the requested api method as a one-off fetch"""
response = self.api_func(**self.kwargs)
- # try-except pattern because customObjectApi objects aren't subscriptable
- try:
- metadata = response['metadata']
- self._items = {item['metadata']['name']: item for item in response['items']}
- log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
- return metadata['resourceVersion']
- except TypeError:
- metadata = response.metadata
- self._items = {item.metadata.name: item for item in response.items}
- log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
- return metadata.resource_version
+ metadata = response.metadata
+ self._items = {item.metadata.name: item for item in response.items}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata.resource_version
@property
def items(self) -> Iterable[T]:
return self._items.values()
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item.metadata.name
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
@threaded
def _watch(self, res_ver: Optional[str]) -> None:
""" worker thread that runs the kubernetes watch """
**self.kwargs):
self.health = ''
item = event['object']
- try:
- name = item['metadata']['name']
- except AttributeError:
- raise AttributeError(
- "{} doesn't contain a metadata.name. Unable to track changes".format(
- self.api_func))
- except TypeError:
- name = item.metadata.name
+ name = self.get_item_name(item)
log.info('{} event: {}'.format(event['type'], name))
self.exception = e
raise
+class KubernetesCustomResource(KubernetesResource):
+ def _fetch(self) -> str:
+ response = self.api_func(**self.kwargs)
+ metadata = response['metadata']
+ self._items = {item['metadata']['name']: item for item in response['items']}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata['resourceVersion']
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item['metadata']['name']
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.