self.start_client()
def start_client(self) -> None:
- if not self.client:
- self.client = RedFishClient(host=self.host, username=self.username, password=self.password)
self.client.login()
+ self.start_update_loop()
def start_update_loop(self) -> None:
self.run = True
def update(self) -> None:
# this loop can have:
# - caching logic
- try:
- while self.run:
- self.log.logger.debug("waiting for a lock.")
- self.lock.acquire()
- self.log.logger.debug("lock acquired.")
- try:
- self._update_system()
- # following calls in theory can be done in parallel
- self._update_metadata()
- self._update_memory()
- self._update_power()
- self._update_fans()
- self._update_network()
- self._update_processors()
- self._update_storage()
- self.data_ready = True
- sleep(5)
- finally:
- self.lock.release()
- self.log.logger.debug("lock released.")
- # Catching 'Exception' is probably not a good idea (devel only)
- except Exception as e:
- self.log.logger.error(f"Error detected, logging out from redfish api.\n{e}")
- self.client.logout()
- raise
+ while self.run:
+ self.log.logger.debug("waiting for a lock.")
+ self.lock.acquire()
+ self.log.logger.debug("lock acquired.")
+ try:
+ self._update_system()
+ # following calls in theory can be done in parallel
+ self._update_metadata()
+ self._update_memory()
+ self._update_power()
+ self._update_fans()
+ self._update_network()
+ self._update_processors()
+ self._update_storage()
+ self.data_ready = True
+ sleep(5)
+ except RuntimeError as e:
+ self.run = False
+ self.log.logger.error(f"Error detected, trying to gracefully log out from redfish api.\n{e}")
+ self.client.logout()
+ finally:
+ self.lock.release()
+ self.log.logger.debug("lock released.")
def flush(self) -> None:
self.log.logger.info("Acquiring lock to flush data.")
@retry(retries=10, delay=2)
def _get_path(self, path: str) -> Dict:
- result = self.client.get_path(path)
+ try:
+ result = self.client.get_path(path)
+ except RuntimeError:
+ raise
if result is None:
self.log.logger.error(f"The client reported an error when getting path: {path}")
raise RuntimeError(f"Could not get path: {path}")
class RedFishClient(BaseClient):
PREFIX = '/redfish/v1/'
+
def __init__(self,
host: str = "",
port: str = "443",
self.token: Dict[str, str] = {}
self.location: str = ''
- def login(self) -> Dict[str, Any]:
- self.log.logger.info(f"Logging in to {self.host} as '{self.username}'")
- idrac_credentials = json.dumps({"UserName": self.username, "Password": self.password})
- headers = {"Content-Type": "application/json"}
+ def login(self) -> None:
+ if not self.is_logged_in():
+ self.log.logger.info("Logging in to "
+ f"{self.host} as '{self.username}'")
+ idrac_credentials = json.dumps({"UserName": self.username,
+ "Password": self.password})
+ headers = {"Content-Type": "application/json"}
+
+ try:
+ _headers, _data, _status_code = self.query(data=idrac_credentials,
+ headers=headers,
+ endpoint='/redfish/v1/SessionService/Sessions/')
+ if _status_code != 201:
+ self.log.logger.error(f"Can't log in to {self.host} as '{self.username}': {_status_code}")
+ raise RuntimeError
+ except URLError as e:
+ msg = f"Can't log in to {self.host} as '{self.username}': {e}"
+ self.log.logger.error(msg)
+ raise RuntimeError
+ self.token = {"X-Auth-Token": _headers['X-Auth-Token']}
+ self.location = _headers['Location']
+ def is_logged_in(self) -> bool:
+ self.log.logger.debug(f"Checking token validity for {self.host}")
+ if not self.location or not self.token.get('X-Auth-Token'):
+ self.log.logger.debug(f"No token found for {self.host}.")
+ return False
+ headers = {"X-Auth-Token": self.token['X-Auth-Token']}
try:
- _headers, _data = self.query(data=idrac_credentials,
- headers=headers,
- endpoint='/redfish/v1/SessionService/Sessions/')
+ _headers, _data, _status_code = self.query(headers=headers,
+ endpoint=self.location)
except URLError as e:
- self.log.logger.error(f"Can't log in to {self.host} as '{self.username}'.\n{e}")
- return {}
- self.token = {"X-Auth-Token": _headers['X-Auth-Token']}
- self.location = _headers['Location']
-
- return json.loads(_data)
+ self.log.logger.error("Can't check token "
+ f"validity for {self.host}: {e}")
+ raise RuntimeError
+ return _status_code == 200
def logout(self) -> Dict[str, Any]:
try:
- _, _data = self.query(method='DELETE', headers=self.token, endpoint=self.location)
- except URLError as e:
+ _, _data, _status_code = self.query(method='DELETE',
+ headers=self.token,
+ endpoint=self.location)
+ except URLError:
self.log.logger.error(f"Can't log out from {self.host}")
return {}
if self.PREFIX not in path:
path = f"{self.PREFIX}{path}"
try:
- _, result = self.query(headers=self.token, endpoint=path)
+ _, result, _status_code = self.query(headers=self.token,
+ endpoint=path)
result_json = json.loads(result)
return result_json
except URLError as e:
self.log.logger.error(f"Can't get path {path}:\n{e}")
- return {}
+ raise RuntimeError
def query(self,
data: Optional[str] = None,
headers: Dict[str, str] = {},
method: Optional[str] = None,
- endpoint: str = '') -> Tuple[Dict[str, str], str]:
+ endpoint: str = '',
+ timeout: int = 10) -> Tuple[Dict[str, str], str, int]:
url = f'{self.host}{endpoint}'
# ssl_ctx = ssl.create_default_context()
_data = bytes(data, 'ascii') if data else None
try:
req = Request(url, _data, headers=headers, method=method)
- send_time = time.monotonic()
- with urlopen(req, context=ssl_ctx) as response:
+ with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
response_str = response.read()
response_headers = response.headers
- response_json = json.loads(response_str)
- total_request_time = datetime.timedelta(seconds=(time.monotonic() - send_time)).total_seconds()
- except Exception as e:
- self.log.logger.error(f"{e}")
+ except URLError as e:
+ self.log.logger.debug(f"{e}")
raise
- return response_headers, response_str
+ return response_headers, response_str, response.status