import redfish
import sys
from util import Logger
+from typing import Dict
log = Logger(__name__)
PREFIX = '/redfish/v1'
- def __init__(self, host, username, password):
+ def __init__(self,
+ host: str,
+ username: str,
+ password: str) -> None:
log.logger.info("redfish client initialization...")
self.host = host
self.username = username
self.password = password
- self.redfish_obj = None
+ self.redfish_obj: 'redfish.redfish_client' = None
- def login(self):
+ def login(self) -> 'redfish.redfish_client':
self.redfish_obj = redfish.redfish_client(base_url=self.host,
username=self.username,
password=self.password,
log.logger.error(f"Server not reachable or does not support RedFish:\n{e}")
sys.exit(1)
- def get_path(self, path):
+ def get_path(self, path: str) -> Dict:
try:
if self.PREFIX not in path:
path = f"{self.PREFIX}{path}"
log.logger.error(f"Error detected.\n{e}")
pass
- def logout(self):
+ def logout(self) -> None:
log.logger.info('logging out...')
self.redfish_obj.logout()
from redfish_system import RedfishSystem
from util import Logger, normalize_dict
+from typing import Dict, Any
log = Logger(__name__)
class RedfishDell(RedfishSystem):
- def __init__(self, **kw):
+ def __init__(self, **kw: Any) -> None:
if kw.get('system_endpoint') is None:
kw['system_endpoint'] = '/Systems/System.Embedded.1'
super().__init__(**kw)
- def _update_network(self):
+ def _update_network(self) -> None:
net_path = self._system['EthernetInterfaces']['@odata.id']
log.logger.info("Updating network")
network_info = self.client.get_path(net_path)
self._system['network'] = {}
- result = dict()
+ result: Dict[str, Dict[str, Dict]] = dict()
for interface in network_info['Members']:
interface_path = interface['@odata.id']
interface_info = self.client.get_path(interface_path)
result[interface_id]['status'] = interface_info['Status']
self._system['network'] = normalize_dict(result)
- def _update_processors(self):
+ def _update_processors(self) -> None:
cpus_path = self._system['Processors']['@odata.id']
log.logger.info("Updating processors")
cpus_info = self.client.get_path(cpus_path)
self._system['processors'] = {}
- result = dict()
+ result: Dict[str, Dict[str, Dict]] = dict()
for cpu in cpus_info['Members']:
cpu_path = cpu['@odata.id']
cpu_info = self.client.get_path(cpu_path)
result[cpu_id]['manufacturer'] = cpu_info['Manufacturer']
self._system['processors'] = normalize_dict(result)
- def _update_storage(self):
+ def _update_storage(self) -> None:
storage_path = self._system['Storage']['@odata.id']
log.logger.info("Updating storage")
storage_info = self.client.get_path(storage_path)
- result = dict()
+ result: Dict[str, Dict[str, Dict]] = dict()
for storage in storage_info['Members']:
entity_path = storage['@odata.id']
entity_info = self.client.get_path(entity_path)
result[drive_id]['location'] = drive_info['PhysicalLocation']
self._system['storage'] = normalize_dict(result)
- def _update_metadata(self):
+ def _update_metadata(self) -> None:
log.logger.info("Updating metadata")
pass
- def _update_memory(self):
+ def _update_memory(self) -> None:
log.logger.info("Updating memory")
pass
- def _update_power(self):
+ def _update_power(self) -> None:
log.logger.info("Updating power")
pass
from threading import Thread, Lock
from time import sleep
from util import Logger
+from typing import Dict, Any
log = Logger(__name__)
class RedfishSystem(System):
- def __init__(self, **kw):
+ def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
- self.host = kw.get('host')
- self.username = kw.get('username')
- self.password = kw.get('password')
+ self.host: str = kw['host']
+ self.username: str = kw['username']
+ self.password: str = kw['password']
self.system_endpoint = kw.get('system_endpoint', '/Systems/1')
log.logger.info(f"redfish system initialization, host: {self.host}, user: {self.username}")
self.client = RedFishClient(self.host, self.username, self.password)
- self._system = {}
- self.run = False
- self.thread = None
+ self._system: Dict[str, Dict[str, Any]] = {}
+ self.run: bool = False
+ self.thread: Thread
self.start_client()
- self.data_ready = False
- self.previous_data = {}
- self.lock = Lock()
+ self.data_ready: bool = False
+ self.previous_data: Dict = {}
+ self.lock: Lock = Lock()
- def start_client(self):
+ def start_client(self) -> None:
log.logger.info(f"redfish system initialization, host: {self.host}, user: {self.username}")
self.client = RedFishClient(self.host, self.username, self.password)
self.client.login()
- def get_system(self):
+ def get_system(self) -> Dict[str, Dict[str, Dict]]:
result = {
'storage': self.get_storage(),
'processors': self.get_processors(),
}
return result
- def get_status(self):
+ def get_status(self) -> Dict[str, Dict[str, Dict]]:
return self._system['status']
- def get_metadata(self):
+ def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
return self._system['metadata']
- def get_memory(self):
+ def get_memory(self) -> Dict[str, Dict[str, Dict]]:
return self._system['memory']
- def get_power(self):
+ def get_power(self) -> Dict[str, Dict[str, Dict]]:
return self._system['power']
- def get_processors(self):
+ def get_processors(self) -> Dict[str, Dict[str, Dict]]:
return self._system['processors']
- def get_network(self):
+ def get_network(self) -> Dict[str, Dict[str, Dict]]:
return self._system['network']
- def get_storage(self):
+ def get_storage(self) -> Dict[str, Dict[str, Dict]]:
return self._system['storage']
- def _update_system(self):
+ def _update_system(self) -> None:
redfish_system = self.client.get_path(self.system_endpoint)
self._system = {**redfish_system, **self._system}
- def _update_metadata(self):
+ def _update_metadata(self) -> None:
raise NotImplementedError()
- def _update_memory(self):
+ def _update_memory(self) -> None:
raise NotImplementedError()
- def _update_power(self):
+ def _update_power(self) -> None:
raise NotImplementedError()
- def _update_network(self):
+ def _update_network(self) -> None:
raise NotImplementedError()
- def _update_processors(self):
+ def _update_processors(self) -> None:
raise NotImplementedError()
- def _update_storage(self):
+ def _update_storage(self) -> None:
raise NotImplementedError()
- def start_update_loop(self):
+ def start_update_loop(self) -> None:
self.run = True
self.thread = Thread(target=self.update)
self.thread.start()
- def stop_update_loop(self):
+ def stop_update_loop(self) -> None:
self.run = False
self.thread.join()
- def update(self):
+ def update(self) -> None:
# this loop can have:
# - caching logic
try:
import requests
import time
from util import Logger
+from typing import Any
log = Logger(__name__)
class Reporter:
- def __init__(self, system, observer_url):
+ def __init__(self, system: Any, observer_url: str) -> None:
self.system = system
self.observer_url = observer_url
self.finish = False
- def stop(self):
+ def stop(self) -> None:
self.finish = True
self.thread.join()
- def run(self):
+ def run(self) -> None:
self.thread = Thread(target=self.loop)
self.thread.start()
- def loop(self):
+ def loop(self) -> None:
while not self.finish:
# Any logic to avoid sending the all the system
# information every loop can go here. In a real
from redfish_dell import RedfishDell
from reporter import Reporter
from util import Config, Logger
+from typing import Dict
import sys
# for devel purposes
exposed = True
@cherrypy.tools.json_out()
- def GET(self):
+ def GET(self) -> Dict[str, Dict[str, Dict]]:
return {'memory': system.get_memory()}
exposed = True
@cherrypy.tools.json_out()
- def GET(self):
+ def GET(self) -> Dict[str, Dict[str, Dict]]:
return {'network': system.get_network()}
exposed = True
@cherrypy.tools.json_out()
- def GET(self):
+ def GET(self) -> Dict[str, Dict[str, Dict]]:
return {'processors': system.get_processors()}
exposed = True
@cherrypy.tools.json_out()
- def GET(self):
+ def GET(self) -> Dict[str, Dict[str, Dict]]:
return {'storage': system.get_storage()}
exposed = True
@cherrypy.tools.json_out()
- def GET(self):
+ def GET(self) -> Dict[str, Dict[str, Dict]]:
return {'status': system.get_status()}
class Shutdown:
exposed = True
- def POST(self):
+ def POST(self) -> str:
_stop()
cherrypy.engine.exit()
return 'Server shutdown...'
-def _stop():
+def _stop() -> None:
system.stop_update_loop()
system.client.logout()
reporter_agent.stop()
class Start:
exposed = True
- def POST(self):
+ def POST(self) -> str:
system.start_client()
system.start_update_loop()
reporter_agent.run()
class Stop:
exposed = True
- def POST(self):
+ def POST(self) -> str:
_stop()
return 'node-proxy daemon stopped'
class ConfigReload:
exposed = True
- def __init__(self, config):
+ def __init__(self, config: cherrypy.config) -> None:
self.config = config
- def POST(self):
+ def POST(self) -> str:
self.config['node_proxy'].reload()
return 'node-proxy config reloaded'
stop = Stop()
config_reload = ConfigReload(cherrypy.config)
- def GET(self):
+ def GET(self) -> str:
return 'use /system'
if __name__ == '__main__':
cherrypy.config.update({
'node_proxy': config,
- 'server.socket_port': config.server['port']
+ 'server.socket_port': config.__dict__['server']['port']
})
c = {'/': {
'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'),
+from util import Config
+from typing import Dict, Any
+
+
class System:
- def __init__(self, **kw):
- self._system = {}
- self.config = kw['config']
+ def __init__(self, **kw: Any) -> None:
+ self._system: Dict = {}
+ self.config: Config = kw['config']
- def get_system(self):
+ def get_system(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_status(self):
+ def get_status(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_metadata(self):
+ def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_processors(self):
+ def get_processors(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_memory(self):
+ def get_memory(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_power(self):
+ def get_power(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_network(self):
+ def get_network(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
- def get_storage(self):
+ def get_storage(self) -> Dict[str, Dict[str, Dict]]:
raise NotImplementedError()
import logging
import yaml
import os
+from typing import Dict, List
-def normalize_dict(test_dict):
+def normalize_dict(test_dict: Dict) -> Dict:
res = dict()
for key in test_dict.keys():
if isinstance(test_dict[key], dict):
class Config:
def __init__(self,
- config_file='/etc/ceph/node-proxy.yaml',
- default_config={}):
+ config_file: str = '/etc/ceph/node-proxy.yaml',
+ default_config: Dict[str, Any] = {}) -> None:
self.config_file = config_file
self.default_config = default_config
self.load_config()
- def load_config(self):
+ def load_config(self) -> None:
if os.path.exists(self.config_file):
with open(self.config_file, 'r') as f:
self.config = yaml.safe_load(f)
# TODO: need to be improved
for _l in Logger._Logger:
- _l.logger.setLevel(self.logging['level'])
- _l.logger.handlers[0].setLevel(self.logging['level'])
+ _l.logger.setLevel(self.logging['level']) # type: ignore
+ _l.logger.handlers[0].setLevel(self.logging['level']) # type: ignore
- def reload(self, config_file=None):
+ def reload(self, config_file: str = '') -> None:
if config_file != '':
self.config_file = config_file
self.load_config()
class Logger:
- _Logger = []
+ _Logger: List['Logger'] = []
- def __init__(self, name, level=logging.INFO):
+ def __init__(self, name: str, level: int = logging.INFO):
self.name = name
self.level = level
Logger._Logger.append(self)
self.logger = self.get_logger()
- def get_logger(self):
+ def get_logger(self) -> logging.Logger:
logger = logging.getLogger(self.name)
logger.setLevel(self.level)
handler = logging.StreamHandler()