-__version__ = '1.0.0'
-__release__ = 'squid'
+__version__ = "1.0.0"
+__release__ = "tentacle"
-import cherrypy # type: ignore
+from threading import Event, Thread
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
from urllib.error import HTTPError
+
+import cherrypy # type: ignore
from cherrypy._cpserver import Server # type: ignore
-from threading import Thread, Event
-from typing import Dict, Any, List
+
from ceph_node_proxy.protocols import SystemBackend
-from ceph_node_proxy.util import Config, get_logger, write_tmp_file
from ceph_node_proxy.reporter import Reporter
-from typing import TYPE_CHECKING, Optional
+from ceph_node_proxy.util import Config, get_logger, write_tmp_file
if TYPE_CHECKING:
from ceph_node_proxy.main import NodeProxyManager
# Admin endpoints (start/stop/reload) are not mounted by default.
# To enable, mount: cherrypy.tree.mount(Admin(api), '/admin', config=config)
@cherrypy.tools.auth_basic(on=True)
-@cherrypy.tools.allow(methods=['PUT'])
+@cherrypy.tools.allow(methods=["PUT"])
@cherrypy.tools.json_out()
-class Admin():
- def __init__(self, api: 'API') -> None:
+class Admin:
+ def __init__(self, api: "API") -> None:
self.api = api
@cherrypy.expose
def start(self) -> Dict[str, str]:
self.api.backend.start()
self.api.reporter.run()
- return {'ok': 'node-proxy daemon started'}
+ return {"ok": "node-proxy daemon started"}
@cherrypy.expose
def reload(self) -> Dict[str, str]:
self.api.config.reload()
- return {'ok': 'node-proxy config reloaded'}
+ return {"ok": "node-proxy config reloaded"}
def _stop(self) -> None:
self.api.backend.shutdown()
@cherrypy.expose
def stop(self) -> Dict[str, str]:
self._stop()
- return {'ok': 'node-proxy daemon stopped'}
+ return {"ok": "node-proxy daemon stopped"}
@cherrypy.expose
def shutdown(self) -> Dict[str, str]:
self._stop()
cherrypy.engine.exit()
- return {'ok': 'Server shutdown.'}
+ return {"ok": "Server shutdown."}
@cherrypy.expose
def flush(self) -> Dict[str, str]:
self.api.backend.flush()
- return {'ok': 'node-proxy data flushed'}
+ return {"ok": "node-proxy data flushed"}
class API(Server):
- def __init__(self,
- backend: SystemBackend,
- reporter: 'Reporter',
- config: 'Config',
- addr: str = '0.0.0.0',
- port: int = 0) -> None:
+ def __init__(
+ self,
+ backend: SystemBackend,
+ reporter: "Reporter",
+ config: "Config",
+ addr: str = "0.0.0.0",
+ port: int = 0,
+ ) -> None:
super().__init__()
self.log = get_logger(__name__)
self.backend = backend
self.reporter = reporter
self.config = config
- self.socket_port = port if port else self.config.get('api', {}).get('port', 9456)
+ self.socket_port = (
+ port if port else self.config.get("api", {}).get("port", 9456)
+ )
self.socket_host = addr
self.subscribe()
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def memory(self) -> Dict[str, Any]:
- return {'memory': self.backend.get_memory()}
+ return {"memory": self.backend.get_memory()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def network(self) -> Dict[str, Any]:
- return {'network': self.backend.get_network()}
+ return {"network": self.backend.get_network()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def processors(self) -> Dict[str, Any]:
- return {'processors': self.backend.get_processors()}
+ return {"processors": self.backend.get_processors()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def storage(self) -> Dict[str, Any]:
- return {'storage': self.backend.get_storage()}
+ return {"storage": self.backend.get_storage()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def power(self) -> Dict[str, Any]:
- return {'power': self.backend.get_power()}
+ return {"power": self.backend.get_power()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def fans(self) -> Dict[str, Any]:
- return {'fans': self.backend.get_fans()}
+ return {"fans": self.backend.get_fans()}
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def firmwares(self) -> Dict[str, Any]:
- return {'firmwares': self.backend.get_firmwares()}
+ return {"firmwares": self.backend.get_firmwares()}
- def _cp_dispatch(self, vpath: List[str]) -> 'API':
- if vpath[0] == 'led' and len(vpath) > 1: # /led/{type}/{id}
+ def _cp_dispatch(self, vpath: List[str]) -> "API":
+ if vpath[0] == "led" and len(vpath) > 1: # /led/{type}/{id}
_type = vpath[1]
- cherrypy.request.params['type'] = _type
+ cherrypy.request.params["type"] = _type
vpath.pop(1) # /led/{id} or # /led
- if _type == 'drive' and len(vpath) > 1: # /led/{id}
+ if _type == "drive" and len(vpath) > 1: # /led/{id}
_id = vpath[1]
vpath.pop(1) # /led
- cherrypy.request.params['id'] = _id
- vpath[0] = '_led'
+ cherrypy.request.params["id"] = _id
+ vpath[0] = "_led"
# /<endpoint>
return self
@cherrypy.expose
- @cherrypy.tools.allow(methods=['POST'])
+ @cherrypy.tools.allow(methods=["POST"])
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.auth_basic(on=True)
def shutdown(self, **kw: Any) -> int:
data: Dict[str, bool] = cherrypy.request.json
- if 'force' not in data.keys():
+ if "force" not in data.keys():
msg = "The key 'force' wasn't passed."
self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
try:
- result: int = self.backend.shutdown_host(force=data['force'])
+ result: int = self.backend.shutdown_host(force=data["force"])
except HTTPError as e:
raise cherrypy.HTTPError(e.code, e.reason)
return result
@cherrypy.expose
- @cherrypy.tools.allow(methods=['POST'])
+ @cherrypy.tools.allow(methods=["POST"])
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.auth_basic(on=True)
return result
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET', 'PATCH'])
+ @cherrypy.tools.allow(methods=["GET", "PATCH"])
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.auth_basic(on=True)
def _led(self, **kw: Any) -> Dict[str, Any]:
method: str = cherrypy.request.method
- led_type: Optional[str] = kw.get('type')
- id_drive: Optional[str] = kw.get('id')
+ led_type: Optional[str] = kw.get("type")
+ id_drive: Optional[str] = kw.get("id")
result: Dict[str, Any] = dict()
if not led_type:
self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
- if led_type == 'drive':
+ if led_type == "drive":
id_drive_required = not id_drive
if id_drive_required or id_drive not in self.backend.get_storage():
- msg = 'A valid device ID must be provided.'
+ msg = "A valid device ID must be provided."
self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
try:
- if method == 'PATCH':
+ if method == "PATCH":
data: Dict[str, Any] = cherrypy.request.json
- if 'state' not in data or data['state'] not in ['on', 'off']:
+ if "state" not in data or data["state"] not in ["on", "off"]:
msg = "Invalid data. 'state' must be provided and have a valid value (on|off)."
self.log.error(msg)
raise cherrypy.HTTPError(400, msg)
- func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else
- self.backend.device_led_off if led_type == 'drive' and data['state'] == 'off' else
- self.backend.chassis_led_on if led_type != 'drive' and data['state'] == 'on' else
- self.backend.chassis_led_off if led_type != 'drive' and data['state'] == 'off' else None)
+ func: Any = (
+ self.backend.device_led_on
+ if led_type == "drive" and data["state"] == "on"
+ else (
+ self.backend.device_led_off
+ if led_type == "drive" and data["state"] == "off"
+ else (
+ self.backend.chassis_led_on
+ if led_type != "drive" and data["state"] == "on"
+ else (
+ self.backend.chassis_led_off
+ if led_type != "drive" and data["state"] == "off"
+ else None
+ )
+ )
+ )
+ )
else:
- func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led
+ func = (
+ self.backend.get_device_led
+ if led_type == "drive"
+ else self.backend.get_chassis_led
+ )
- result = func(id_drive) if led_type == 'drive' else func()
+ result = func(id_drive) if led_type == "drive" else func()
except HTTPError as e:
raise cherrypy.HTTPError(e.code, e.reason)
return result
@cherrypy.expose
- @cherrypy.tools.allow(methods=['GET'])
+ @cherrypy.tools.allow(methods=["GET"])
@cherrypy.tools.json_out()
def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
return self.backend.get_led()
@cherrypy.expose
- @cherrypy.tools.allow(methods=['PATCH'])
+ @cherrypy.tools.allow(methods=["PATCH"])
@cherrypy.tools.json_in()
@cherrypy.tools.json_out()
@cherrypy.tools.auth_basic(on=True)
if rc != 200:
cherrypy.response.status = rc
- result = {'state': 'error: please, verify the data you sent.'}
+ result = {"state": "error: please, verify the data you sent."}
else:
- result = {'state': data['state'].lower()}
+ result = {"state": data["state"].lower()}
return result
def stop(self) -> None:
class NodeProxyApi(Thread):
- def __init__(self, node_proxy_mgr: 'NodeProxyManager') -> None:
+ def __init__(self, node_proxy_mgr: "NodeProxyManager") -> None:
super().__init__()
self.log = get_logger(__name__)
self.cp_shutdown_event = Event()
self.api = API(self.system, self.reporter_agent, self.config)
def check_auth(self, realm: str, username: str, password: str) -> bool:
- return self.username == username and \
- self.password == password
+ return self.username == username and self.password == password
def shutdown(self) -> None:
- self.log.info('Stopping node-proxy API...')
+ self.log.info("Stopping node-proxy API...")
self.cp_shutdown_event.set()
def run(self) -> None:
- self.log.info('node-proxy API configuration...')
- cherrypy.config.update({
- 'environment': 'production',
- 'engine.autoreload.on': False,
- 'log.screen': True,
- })
- config = {'/': {
- 'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'),
- 'tools.trailing_slash.on': False,
- 'tools.auth_basic.realm': 'localhost',
- 'tools.auth_basic.checkpassword': self.check_auth
- }}
- cherrypy.tree.mount(self.api, '/', config=config)
+ self.log.info("node-proxy API configuration...")
+ cherrypy.config.update(
+ {
+ "environment": "production",
+ "engine.autoreload.on": False,
+ "log.screen": True,
+ }
+ )
+ config = {
+ "/": {
+ "request.methods_with_bodies": ("POST", "PUT", "PATCH"),
+ "tools.trailing_slash.on": False,
+ "tools.auth_basic.realm": "localhost",
+ "tools.auth_basic.checkpassword": self.check_auth,
+ }
+ }
+ cherrypy.tree.mount(self.api, "/", config=config)
# cherrypy.tree.mount(admin, '/admin', config=config)
- ssl_crt = write_tmp_file(self.ssl_crt,
- prefix_name='listener-crt-')
- ssl_key = write_tmp_file(self.ssl_key,
- prefix_name='listener-key-')
+ ssl_crt = write_tmp_file(self.ssl_crt, prefix_name="listener-crt-")
+ ssl_key = write_tmp_file(self.ssl_key, prefix_name="listener-key-")
self.api.ssl_certificate = ssl_crt.name
self.api.ssl_private_key = ssl_key.name
cherrypy.server.unsubscribe()
try:
cherrypy.engine.start()
- self.log.info('node-proxy API started.')
+ self.log.info("node-proxy API started.")
self.cp_shutdown_event.wait()
self.cp_shutdown_event.clear()
cherrypy.engine.exit()
cherrypy.server.httpserver = None
- self.log.info('node-proxy API shutdown.')
+ self.log.info("node-proxy API shutdown.")
except Exception as e:
- self.log.error(f'node-proxy API error: {e}')
+ self.log.error(f"node-proxy API error: {e}")
+from typing import Any
+
from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
from ceph_node_proxy.util import get_logger
-from typing import Any
class AtollonSystem(BaseRedfishSystem):
-from typing import Dict, Any
+from typing import Any, Dict
class BaseClient:
- def __init__(self,
- host: str,
- username: str,
- password: str) -> None:
+ def __init__(self, host: str, username: str, password: str) -> None:
self.host = host
self.username = username
self.password = password
import concurrent.futures
import dataclasses
+from time import sleep
+from typing import Any, Callable, Dict, List, Optional
+
from ceph_node_proxy.basesystem import BaseSystem
from ceph_node_proxy.redfish import (
ComponentUpdateSpec,
update_component,
)
from ceph_node_proxy.redfish_client import RedFishClient
-from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
-from time import sleep
-from typing import Dict, Any, List, Callable, Optional
+from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case
class BaseRedfishSystem(BaseSystem):
- NETWORK_FIELDS: List[str] = ['Description', 'Name', 'SpeedMbps', 'Status']
+ NETWORK_FIELDS: List[str] = ["Description", "Name", "SpeedMbps", "Status"]
PROCESSORS_FIELDS: List[str] = [
- 'Description', 'TotalCores', 'TotalThreads', 'ProcessorType', 'Model', 'Status', 'Manufacturer',
+ "Description",
+ "TotalCores",
+ "TotalThreads",
+ "ProcessorType",
+ "Model",
+ "Status",
+ "Manufacturer",
+ ]
+ MEMORY_FIELDS: List[str] = [
+ "Description",
+ "MemoryDeviceType",
+ "CapacityMiB",
+ "Status",
]
- MEMORY_FIELDS: List[str] = ['Description', 'MemoryDeviceType', 'CapacityMiB', 'Status']
- POWER_FIELDS: List[str] = ['Name', 'Model', 'Manufacturer', 'Status']
- FANS_FIELDS: List[str] = ['Name', 'PhysicalContext', 'Status']
+ POWER_FIELDS: List[str] = ["Name", "Model", "Manufacturer", "Status"]
+ FANS_FIELDS: List[str] = ["Name", "PhysicalContext", "Status"]
FIRMWARES_FIELDS: List[str] = [
- 'Name', 'Description', 'ReleaseDate', 'Version', 'Updateable', 'Status',
+ "Name",
+ "Description",
+ "ReleaseDate",
+ "Version",
+ "Updateable",
+ "Status",
]
COMPONENT_SPECS: Dict[str, ComponentUpdateSpec] = {
- 'network': ComponentUpdateSpec('systems', 'EthernetInterfaces', NETWORK_FIELDS, None),
- 'processors': ComponentUpdateSpec('systems', 'Processors', PROCESSORS_FIELDS, None),
- 'memory': ComponentUpdateSpec('systems', 'Memory', MEMORY_FIELDS, None),
+ "network": ComponentUpdateSpec(
+ "systems", "EthernetInterfaces", NETWORK_FIELDS, None
+ ),
+ "processors": ComponentUpdateSpec(
+ "systems", "Processors", PROCESSORS_FIELDS, None
+ ),
+ "memory": ComponentUpdateSpec("systems", "Memory", MEMORY_FIELDS, None),
# Power supplies: Chassis/.../PowerSubsystem/PowerSupplies (not like other components: like Systems/.../Memory)
- 'power': ComponentUpdateSpec('chassis', 'PowerSubsystem/PowerSupplies', POWER_FIELDS, None),
- 'fans': ComponentUpdateSpec('chassis', 'Thermal', FANS_FIELDS, 'Fans'),
- 'firmwares': ComponentUpdateSpec('update_service', 'FirmwareInventory', FIRMWARES_FIELDS, None),
+ "power": ComponentUpdateSpec(
+ "chassis", "PowerSubsystem/PowerSupplies", POWER_FIELDS, None
+ ),
+ "fans": ComponentUpdateSpec("chassis", "Thermal", FANS_FIELDS, "Fans"),
+ "firmwares": ComponentUpdateSpec(
+ "update_service", "FirmwareInventory", FIRMWARES_FIELDS, None
+ ),
}
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
self.log = get_logger(__name__)
- self.host: str = kw['host']
- self.port: str = kw['port']
- self.username: str = kw['username']
- self.password: str = kw['password']
+ self.host: str = kw["host"]
+ self.port: str = kw["port"]
+ self.username: str = kw["username"]
+ self.password: str = kw["password"]
# move the following line (class attribute?)
self.client: RedFishClient = RedFishClient(
- host=self.host, port=self.port, username=self.username, password=self.password
+ host=self.host,
+ port=self.port,
+ username=self.username,
+ password=self.password,
)
self.endpoints: EndpointMgr = EndpointMgr(self.client)
- self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
+ self.log.info(
+ f"redfish system initialization, host: {self.host}, user: {self.username}"
+ )
self.data_ready: bool = False
self.previous_data: Dict = {}
self.data: Dict[str, Dict[str, Any]] = {}
self._system: Dict[str, Dict[str, Any]] = {}
self._sys: Dict[str, Any] = {}
- self.job_service_endpoint: str = ''
- self.create_reboot_job_endpoint: str = ''
- self.setup_job_queue_endpoint: str = ''
- self.component_list: List[str] = kw.get('component_list', ['memory',
- 'power',
- 'fans',
- 'network',
- 'processors',
- 'storage',
- 'firmwares'])
+ self.job_service_endpoint: str = ""
+ self.create_reboot_job_endpoint: str = ""
+ self.setup_job_queue_endpoint: str = ""
+ self.component_list: List[str] = kw.get(
+ "component_list",
+ [
+ "memory",
+ "power",
+ "fans",
+ "network",
+ "processors",
+ "storage",
+ "firmwares",
+ ],
+ )
self.update_funcs: List[Callable] = []
for component in self.component_list:
- self.log.debug(f'adding: {component} to hw component gathered list.')
- func = f'_update_{component}'
+ self.log.debug(f"adding: {component} to hw component gathered list.")
+ func = f"_update_{component}"
if hasattr(self, func):
f = getattr(self, func)
self.update_funcs.append(f)
- def update(self,
- collection: str,
- component: str,
- path: str,
- fields: List[str],
- attribute: Optional[str] = None) -> None:
+ def update(
+ self,
+ collection: str,
+ component: str,
+ path: str,
+ fields: List[str],
+ attribute: Optional[str] = None,
+ ) -> None:
update_component(
- self.endpoints, collection, component, path, fields,
- self._sys, self.log, attribute=attribute,
+ self.endpoints,
+ collection,
+ component,
+ path,
+ fields,
+ self._sys,
+ self.log,
+ attribute=attribute,
)
def main(self) -> None:
self.endpoints.init()
while not self.stop:
- self.log.debug('waiting for a lock in the update loop.')
+ self.log.debug("waiting for a lock in the update loop.")
with self.lock:
if not self.pending_shutdown:
- self.log.debug('lock acquired in the update loop.')
+ self.log.debug("lock acquired in the update loop.")
try:
self._update_system()
self._update_sn()
self.data_ready = True
except RuntimeError as e:
self.stop = True
- self.log.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
+ self.log.error(
+ f"Error detected, trying to gracefully log out from redfish api.\n{e}"
+ )
self.client.logout()
raise
sleep(5)
- self.log.debug('lock released in the update loop.')
- self.log.debug('exiting update loop.')
+ self.log.debug("lock released in the update loop.")
+ self.log.debug("exiting update loop.")
raise SystemExit(0)
def flush(self) -> None:
- self.log.debug('Acquiring lock to flush data.')
+ self.log.debug("Acquiring lock to flush data.")
self.lock.acquire()
- self.log.debug('Lock acquired, flushing data.')
+ self.log.debug("Lock acquired, flushing data.")
self._system = {}
self.previous_data = {}
- self.log.info('Data flushed.')
+ self.log.info("Data flushed.")
self.data_ready = False
- self.log.debug('Data marked as not ready.')
+ self.log.debug("Data marked as not ready.")
self.lock.release()
- self.log.debug('Released the lock after flushing data.')
+ self.log.debug("Released the lock after flushing data.")
# @retry(retries=10, delay=2)
def _get_path(self, path: str) -> Dict:
result: Dict[str, Any] = {}
try:
if not self.pending_shutdown:
- self.log.debug(f'Getting path: {path}')
+ self.log.debug(f"Getting path: {path}")
result = self.client.get_path(path)
else:
- self.log.debug(f'Pending shutdown, aborting query to {path}')
+ self.log.debug(f"Pending shutdown, aborting query to {path}")
except RuntimeError:
raise
return result
def get_members(self, data: Dict[str, Any], path: str) -> List:
- return [self._get_path(member['@odata.id']) for member in data['Members']]
+ return [self._get_path(member["@odata.id"]) for member in data["Members"]]
def get_system(self) -> Dict[str, Any]:
result = {
- 'host': self.get_host(),
- 'sn': self.get_sn(),
- 'status': {
- 'storage': self.get_storage(),
- 'processors': self.get_processors(),
- 'network': self.get_network(),
- 'memory': self.get_memory(),
- 'power': self.get_power(),
- 'fans': self.get_fans()
+ "host": self.get_host(),
+ "sn": self.get_sn(),
+ "status": {
+ "storage": self.get_storage(),
+ "processors": self.get_processors(),
+ "network": self.get_network(),
+ "memory": self.get_memory(),
+ "power": self.get_power(),
+ "fans": self.get_fans(),
},
- 'firmwares': self.get_firmwares(),
+ "firmwares": self.get_firmwares(),
}
return result
def _update_system(self) -> None:
- system_members: Dict[str, Any] = self.endpoints['systems'].get_members_data()
- update_service_members: Endpoint = self.endpoints['update_service']
+ system_members: Dict[str, Any] = self.endpoints["systems"].get_members_data()
+ update_service_members: Endpoint = self.endpoints["update_service"]
for member, data in system_members.items():
self._system[member] = data
self._system[update_service_members.id] = update_service_members.data
def get_sn(self) -> str:
- return str(self._sys.get('SKU', ''))
+ return str(self._sys.get("SKU", ""))
def get_status(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('status', {}))
+ return dict(self._sys.get("status", {}))
def get_memory(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('memory', {}))
+ return dict(self._sys.get("memory", {}))
def get_processors(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('processors', {}))
+ return dict(self._sys.get("processors", {}))
def get_network(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('network', {}))
+ return dict(self._sys.get("network", {}))
def get_storage(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('storage', {}))
+ return dict(self._sys.get("storage", {}))
def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('firmwares', {}))
+ return dict(self._sys.get("firmwares", {}))
def get_power(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('power', {}))
+ return dict(self._sys.get("power", {}))
def get_fans(self) -> Dict[str, Dict[str, Dict]]:
- return dict(self._sys.get('fans', {}))
+ return dict(self._sys.get("fans", {}))
def get_component_spec_overrides(self) -> Dict[str, Dict[str, Any]]:
return {}
return dataclasses.replace(spec, **overrides)
def _run_update(self, component: str) -> None:
- self.log.debug(f'Updating {component}')
+ self.log.debug(f"Updating {component}")
spec = self.get_update_spec(component)
- self.update(spec.collection, component, spec.path, spec.fields, attribute=spec.attribute)
+ self.update(
+ spec.collection, component, spec.path, spec.fields, attribute=spec.attribute
+ )
def _update_network(self) -> None:
- self._run_update('network')
+ self._run_update("network")
def _update_processors(self) -> None:
- self._run_update('processors')
+ self._run_update("processors")
def _update_storage(self) -> None:
- fields = ['Description',
- 'CapacityBytes',
- 'Model', 'Protocol',
- 'LocationIndicatorActive',
- 'SerialNumber', 'Status',
- 'PhysicalLocation']
+ fields = [
+ "Description",
+ "CapacityBytes",
+ "Model",
+ "Protocol",
+ "LocationIndicatorActive",
+ "SerialNumber",
+ "Status",
+ "PhysicalLocation",
+ ]
result: Dict[str, Dict[str, Dict]] = dict()
- self.log.debug('Updating storage')
- members_names = self.endpoints['systems'].get_members_names()
+ self.log.debug("Updating storage")
+ members_names = self.endpoints["systems"].get_members_names()
for member in members_names:
result[member] = {}
- members_data = self.endpoints['systems'][member]['Storage'].get_members_data()
+ members_data = self.endpoints["systems"][member][
+ "Storage"
+ ].get_members_data()
for entity in members_data:
- for drive in members_data[entity]['Drives']:
- data: Dict[str, Any] = Endpoint(drive['@odata.id'], self.endpoints.client).data
- drive_id = data['Id']
+ for drive in members_data[entity]["Drives"]:
+ data: Dict[str, Any] = Endpoint(
+ drive["@odata.id"], self.endpoints.client
+ ).data
+ drive_id = data["Id"]
result[member][drive_id] = dict()
- result[member][drive_id]['redfish_endpoint'] = data['@odata.id']
+ result[member][drive_id]["redfish_endpoint"] = data["@odata.id"]
for field in fields:
result[member][drive_id][to_snake_case(field)] = data.get(field)
- result[member][drive_id]['entity'] = entity
- self._sys['storage'] = normalize_dict(result)
+ result[member][drive_id]["entity"] = entity
+ self._sys["storage"] = normalize_dict(result)
def _update_sn(self) -> None:
serials: List[str] = []
- self.log.debug('Updating serial number')
- data: Dict[str, Any] = self.endpoints['systems'].get_members_data()
+ self.log.debug("Updating serial number")
+ data: Dict[str, Any] = self.endpoints["systems"].get_members_data()
for sys in data.keys():
- serials.append(data[sys]['SKU'])
- self._sys['SKU'] = ','.join(serials)
+ serials.append(data[sys]["SKU"])
+ self._sys["SKU"] = ",".join(serials)
def _update_memory(self) -> None:
- self._run_update('memory')
+ self._run_update("memory")
def _update_power(self) -> None:
- self._run_update('power')
+ self._run_update("power")
def _update_fans(self) -> None:
- self._run_update('fans')
+ self._run_update("fans")
def _update_firmwares(self) -> None:
- self._run_update('firmwares')
+ self._run_update("firmwares")
def device_led_on(self, device: str) -> int:
raise NotImplementedError()
import socket
from threading import Lock
-from ceph_node_proxy.util import Config, get_logger, BaseThread
-from typing import Dict, Any, Optional, Union
+from typing import Any, Dict, Optional, Union
+
from ceph_node_proxy.baseclient import BaseClient
+from ceph_node_proxy.util import BaseThread, Config, get_logger
class BaseSystem(BaseThread):
super().__init__()
self.lock: Lock = Lock()
self._system: Dict = {}
- self.config: Optional[Union[Config, Dict[str, Any]]] = kw.get('config')
+ self.config: Optional[Union[Config, Dict[str, Any]]] = kw.get("config")
self.client: BaseClient
self.log = get_logger(__name__)
from ceph_node_proxy.main import NodeProxyManager
-def create_node_proxy_manager(cephadm_config: CephadmConfig) -> 'NodeProxyManager':
+def create_node_proxy_manager(cephadm_config: CephadmConfig) -> "NodeProxyManager":
"""
Build NodeProxyManager from cephadm bootstrap config.
Creates temporary CA file and loads node-proxy YAML config.
ca_file = write_tmp_file(
cephadm_config.root_cert_pem,
- prefix_name='cephadm-endpoint-root-cert-',
+ prefix_name="cephadm-endpoint-root-cert-",
)
config = get_node_proxy_config(
path=cephadm_config.node_proxy_config_path,
from ceph_node_proxy.util import DEFAULTS, Config
-
REQUIRED_CEPHADM_KEYS = (
- 'target_ip',
- 'target_port',
- 'keyring',
- 'root_cert.pem',
- 'listener.crt',
- 'listener.key',
- 'name',
+ "target_ip",
+ "target_port",
+ "keyring",
+ "root_cert.pem",
+ "listener.crt",
+ "listener.key",
+ "name",
)
@dataclass
class CephadmConfig:
"""Parsed cephadm bootstrap config (from --config JSON file)"""
+
target_ip: str
target_port: str
keyring: str
def from_dict(cls, data: Dict[str, Any]) -> CephadmConfig:
for key in REQUIRED_CEPHADM_KEYS:
if key not in data:
- raise ValueError(f'Missing required cephadm config key: {key}')
+ raise ValueError(f"Missing required cephadm config key: {key}")
# Normalize key with dot to attribute name
- node_proxy_config_path = data.get('node_proxy_config') or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+ node_proxy_config_path = data.get("node_proxy_config") or os.environ.get(
+ "NODE_PROXY_CONFIG", "/etc/ceph/node-proxy.yml"
+ )
assert node_proxy_config_path is not None
return cls(
- target_ip=data['target_ip'],
- target_port=data['target_port'],
- keyring=data['keyring'],
- root_cert_pem=data['root_cert.pem'],
- listener_crt=data['listener.crt'],
- listener_key=data['listener.key'],
- name=data['name'],
+ target_ip=data["target_ip"],
+ target_port=data["target_port"],
+ keyring=data["keyring"],
+ root_cert_pem=data["root_cert.pem"],
+ listener_crt=data["listener.crt"],
+ listener_key=data["listener.key"],
+ name=data["name"],
node_proxy_config_path=node_proxy_config_path,
)
Raises FileNotFoundError if path does not exist, ValueError if invalid.
"""
if not os.path.exists(path):
- raise FileNotFoundError(f'Config file not found: {path}')
- with open(path, 'r') as f:
+ raise FileNotFoundError(f"Config file not found: {path}")
+ with open(path, "r") as f:
try:
data = json.load(f)
except json.JSONDecodeError as e:
- raise ValueError(f'Invalid JSON config: {e}') from e
+ raise ValueError(f"Invalid JSON config: {e}") from e
if not isinstance(data, dict):
- raise ValueError('Config must be a JSON object')
+ raise ValueError("Config must be a JSON object")
return CephadmConfig.from_dict(data)
path: Optional[str] = None,
defaults: Optional[Dict[str, Any]] = None,
) -> Config:
- effective_path = path or os.environ.get('NODE_PROXY_CONFIG', '/etc/ceph/node-proxy.yml')
+ effective_path = path or os.environ.get(
+ "NODE_PROXY_CONFIG", "/etc/ceph/node-proxy.yml"
+ )
assert effective_path is not None
return Config(effective_path, defaults=defaults or DEFAULTS)
-from ceph_node_proxy.api import NodeProxyApi
-from ceph_node_proxy.bootstrap import create_node_proxy_manager
-from ceph_node_proxy.config import load_cephadm_config
-from ceph_node_proxy.registry import get_system_class
-from ceph_node_proxy.reporter import Reporter
-from ceph_node_proxy.util import Config, DEFAULTS, get_logger, http_req
-from tempfile import _TemporaryFileWrapper
-from urllib.error import HTTPError
-from typing import Dict, Any, Optional
-
import argparse
import json
import os
import signal
import ssl
import time
+from tempfile import _TemporaryFileWrapper
+from typing import Any, Dict, Optional
+from urllib.error import HTTPError
+
+from ceph_node_proxy.api import NodeProxyApi
+from ceph_node_proxy.bootstrap import create_node_proxy_manager
+from ceph_node_proxy.config import load_cephadm_config
+from ceph_node_proxy.registry import get_system_class
+from ceph_node_proxy.reporter import Reporter
+from ceph_node_proxy.util import DEFAULTS, Config, get_logger, http_req
class NodeProxyManager:
mgr_agent_port: str,
config: Optional[Config] = None,
config_path: Optional[str] = None,
- reporter_scheme: str = 'https',
- reporter_endpoint: str = '/node-proxy/data',
+ reporter_scheme: str = "https",
+ reporter_endpoint: str = "/node-proxy/data",
) -> None:
self.exc: Optional[Exception] = None
self.log = get_logger(__name__)
self.ssl_ctx.load_verify_locations(self.ca_path)
self.reporter_scheme = reporter_scheme
self.reporter_endpoint = reporter_endpoint
- self.cephx = {'cephx': {'name': self.cephx_name,
- 'secret': self.cephx_secret}}
+ self.cephx = {"cephx": {"name": self.cephx_name, "secret": self.cephx_secret}}
if config is not None:
self.config = config
else:
path = (
- config_path or os.environ.get('NODE_PROXY_CONFIG')
- or '/etc/ceph/node-proxy.yml'
+ config_path
+ or os.environ.get("NODE_PROXY_CONFIG")
+ or "/etc/ceph/node-proxy.yml"
)
self.config = Config(path, defaults=DEFAULTS)
- self.username: str = ''
- self.password: str = ''
+ self.username: str = ""
+ self.password: str = ""
self._ca_temp_file: Optional[_TemporaryFileWrapper[Any]] = None
def run(self) -> None:
def fetch_oob_details(self) -> Dict[str, str]:
try:
- headers, result, status = http_req(hostname=self.mgr_host,
- port=self.mgr_agent_port,
- data=json.dumps(self.cephx),
- endpoint='/node-proxy/oob',
- ssl_ctx=self.ssl_ctx)
+ headers, result, status = http_req(
+ hostname=self.mgr_host,
+ port=self.mgr_agent_port,
+ data=json.dumps(self.cephx),
+ endpoint="/node-proxy/oob",
+ ssl_ctx=self.ssl_ctx,
+ )
except HTTPError as e:
- msg = f'No out of band tool details could be loaded: {e.code}, {e.reason}'
+ msg = f"No out of band tool details could be loaded: {e.code}, {e.reason}"
self.log.debug(msg)
raise
result_json = json.loads(result)
oob_details: Dict[str, str] = {
- 'host': result_json['result']['addr'],
- 'username': result_json['result']['username'],
- 'password': result_json['result']['password'],
- 'port': result_json['result'].get('port', '443')
+ "host": result_json["result"]["addr"],
+ "username": result_json["result"]["username"],
+ "password": result_json["result"]["password"],
+ "port": result_json["result"].get("port", "443"),
}
return oob_details
def init_system(self) -> None:
try:
oob_details = self.fetch_oob_details()
- self.username = oob_details['username']
- self.password = oob_details['password']
+ self.username = oob_details["username"]
+ self.password = oob_details["password"]
except HTTPError:
- self.log.warning('No oob details could be loaded, exiting...')
+ self.log.warning("No oob details could be loaded, exiting...")
raise SystemExit(1)
try:
- vendor = self.config.get('system', {}).get('vendor', 'generic')
+ vendor = self.config.get("system", {}).get("vendor", "generic")
system_cls = get_system_class(vendor)
- self.system = system_cls(host=oob_details['host'],
- port=oob_details['port'],
- username=oob_details['username'],
- password=oob_details['password'],
- config=self.config)
+ self.system = system_cls(
+ host=oob_details["host"],
+ port=oob_details["port"],
+ username=oob_details["username"],
+ password=oob_details["password"],
+ config=self.config,
+ )
self.system.start()
except RuntimeError:
self.log.error("Can't initialize the redfish system.")
def init_reporter(self) -> None:
try:
- max_retries = self.config.get('reporter', {}).get('push_data_max_retries', 30)
- self.reporter_agent = Reporter(self.system,
- self.cephx,
- reporter_scheme=self.reporter_scheme,
- reporter_hostname=self.mgr_host,
- reporter_port=self.mgr_agent_port,
- reporter_endpoint=self.reporter_endpoint,
- max_retries=max_retries)
+ max_retries = self.config.get("reporter", {}).get(
+ "push_data_max_retries", 30
+ )
+ self.reporter_agent = Reporter(
+ self.system,
+ self.cephx,
+ reporter_scheme=self.reporter_scheme,
+ reporter_hostname=self.mgr_host,
+ reporter_port=self.mgr_agent_port,
+ reporter_endpoint=self.reporter_endpoint,
+ max_retries=max_retries,
+ )
self.reporter_agent.start()
except RuntimeError:
self.log.error("Can't initialize the reporter.")
def init_api(self) -> None:
try:
- self.log.info('Starting node-proxy API...')
+ self.log.info("Starting node-proxy API...")
self.api = NodeProxyApi(self)
self.api.start()
except Exception as e:
try:
for thread in [self.system, self.reporter_agent]:
status = thread.check_status()
- label = 'Ok' if status else 'Critical'
- self.log.debug(f'{thread} status: {label}')
+ label = "Ok" if status else "Critical"
+ self.log.debug(f"{thread} status: {label}")
consecutive_failures = 0
check_interval = min_interval
- self.log.debug('All threads are alive, next check in %ds.', check_interval)
+ self.log.debug(
+ "All threads are alive, next check in %ds.", check_interval
+ )
except Exception as e:
consecutive_failures += 1
self.log.error(
- f'{consecutive_failures} failure(s): thread not running: '
- f'{e.__class__.__name__}: {e}'
+ f"{consecutive_failures} failure(s): thread not running: "
+ f"{e.__class__.__name__}: {e}"
)
for thread in [self.system, self.reporter_agent]:
thread.shutdown()
self.init_system()
self.init_reporter()
check_interval = min(int(check_interval * backoff_factor), max_interval)
- self.log.debug('Next check in %ds (backoff).', check_interval)
+ self.log.debug("Next check in %ds (backoff).", check_interval)
time.sleep(check_interval)
def shutdown(self) -> None:
self.stop = True
# if `self.system.shutdown()` is called before self.start(), it will fail.
- if hasattr(self, 'api'):
+ if hasattr(self, "api"):
self.api.shutdown()
- if hasattr(self, 'reporter_agent'):
+ if hasattr(self, "reporter_agent"):
self.reporter_agent.shutdown()
- if hasattr(self, 'system'):
+ if hasattr(self, "system"):
self.system.shutdown()
-def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None:
- if hasattr(t_mgr, 'system') and t_mgr.system is not None:
+def handler(signum: Any, frame: Any, t_mgr: "NodeProxyManager") -> None:
+ if hasattr(t_mgr, "system") and t_mgr.system is not None:
t_mgr.system.pending_shutdown = True
- t_mgr.log.info('SIGTERM caught, shutting down threads...')
+ t_mgr.log.info("SIGTERM caught, shutting down threads...")
t_mgr.shutdown()
- if hasattr(t_mgr, 'system') and t_mgr.system is not None and hasattr(t_mgr.system, 'client') and t_mgr.system.client is not None:
- t_mgr.log.info('Logging out from RedFish API')
+ if (
+ hasattr(t_mgr, "system")
+ and t_mgr.system is not None
+ and hasattr(t_mgr.system, "client")
+ and t_mgr.system.client is not None
+ ):
+ t_mgr.log.info("Logging out from RedFish API")
t_mgr.system.client.logout()
raise SystemExit(0)
def main() -> None:
parser = argparse.ArgumentParser(
- description='Ceph Node-Proxy for HW Monitoring',
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ description="Ceph Node-Proxy for HW Monitoring",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
parser.add_argument(
- '--config',
- help='path of config file in json format',
- required=True
+ "--config", help="path of config file in json format", required=True
)
parser.add_argument(
- '--debug',
- help='increase logging verbosity (debug level)',
- action='store_true',
+ "--debug",
+ help="increase logging verbosity (debug level)",
+ action="store_true",
)
args = parser.parse_args()
if args.debug:
- DEFAULTS['logging']['level'] = 10
+ DEFAULTS["logging"]["level"] = 10
try:
cephadm_config = load_cephadm_config(args.config)
except FileNotFoundError as e:
- raise SystemExit(f'Config error: {e}')
+ raise SystemExit(f"Config error: {e}")
except ValueError as e:
- raise SystemExit(f'Config error: {e}')
+ raise SystemExit(f"Config error: {e}")
node_proxy_mgr = create_node_proxy_manager(cephadm_config)
- signal.signal(signal.SIGTERM,
- lambda signum, frame: handler(signum, frame, node_proxy_mgr))
+ signal.signal(
+ signal.SIGTERM, lambda signum, frame: handler(signum, frame, node_proxy_mgr)
+ )
node_proxy_mgr.run()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
@runtime_checkable
class SystemBackend(Protocol):
- def get_memory(self) -> Dict[str, Any]:
- ...
+ def get_memory(self) -> Dict[str, Any]: ...
- def get_network(self) -> Dict[str, Any]:
- ...
+ def get_network(self) -> Dict[str, Any]: ...
- def get_storage(self) -> Dict[str, Any]:
- ...
+ def get_storage(self) -> Dict[str, Any]: ...
- def get_processors(self) -> Dict[str, Any]:
- ...
+ def get_processors(self) -> Dict[str, Any]: ...
- def get_power(self) -> Dict[str, Any]:
- ...
+ def get_power(self) -> Dict[str, Any]: ...
- def get_fans(self) -> Dict[str, Any]:
- ...
+ def get_fans(self) -> Dict[str, Any]: ...
- def get_firmwares(self) -> Dict[str, Any]:
- ...
+ def get_firmwares(self) -> Dict[str, Any]: ...
- def get_led(self) -> Dict[str, Any]:
- ...
+ def get_led(self) -> Dict[str, Any]: ...
- def set_led(self, data: Dict[str, str]) -> int:
- ...
+ def set_led(self, data: Dict[str, str]) -> int: ...
- def get_chassis_led(self) -> Dict[str, Any]:
- ...
+ def get_chassis_led(self) -> Dict[str, Any]: ...
- def chassis_led_on(self) -> int:
- ...
+ def chassis_led_on(self) -> int: ...
- def chassis_led_off(self) -> int:
- ...
+ def chassis_led_off(self) -> int: ...
- def get_device_led(self, device: str) -> Dict[str, Any]:
- ...
+ def get_device_led(self, device: str) -> Dict[str, Any]: ...
- def device_led_on(self, device: str) -> int:
- ...
+ def device_led_on(self, device: str) -> int: ...
- def device_led_off(self, device: str) -> int:
- ...
+ def device_led_off(self, device: str) -> int: ...
- def shutdown_host(self, force: bool = False) -> int:
- ...
+ def shutdown_host(self, force: bool = False) -> int: ...
- def powercycle(self) -> int:
- ...
+ def powercycle(self) -> int: ...
- def flush(self) -> None:
- ...
+ def flush(self) -> None: ...
- def start(self) -> None:
- ...
+ def start(self) -> None: ...
- def shutdown(self) -> None:
- ...
+ def shutdown(self) -> None: ...
@runtime_checkable
pending_shutdown: bool
previous_data: Dict[str, Any]
- def get_system(self) -> Dict[str, Any]:
- ...
+ def get_system(self) -> Dict[str, Any]: ...
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
+from urllib.error import HTTPError, URLError
from ceph_node_proxy.redfish_client import RedFishClient
-from ceph_node_proxy.util import get_logger, to_snake_case, normalize_dict
-from urllib.error import HTTPError, URLError
+from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case
@dataclass
class EndpointMgr:
"""Manages Redfish root endpoints (Systems, Chassis, etc.) discovered from the service root."""
- NAME: str = 'EndpointMgr'
+ NAME: str = "EndpointMgr"
- def __init__(self,
- client: RedFishClient,
- prefix: str = RedFishClient.PREFIX) -> None:
- self.log = get_logger(f'{__name__}:{EndpointMgr.NAME}')
+ def __init__(
+ self, client: RedFishClient, prefix: str = RedFishClient.PREFIX
+ ) -> None:
+ self.log = get_logger(f"{__name__}:{EndpointMgr.NAME}")
self.prefix: str = prefix
self.client: RedFishClient = client
- self._endpoints: Dict[str, 'Endpoint'] = {}
- self._session_url: str = ''
+ self._endpoints: Dict[str, "Endpoint"] = {}
+ self._session_url: str = ""
def __getitem__(self, index: str) -> Endpoint:
if index not in self._endpoints:
- raise KeyError(f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}")
+ raise KeyError(
+ f"'{index}' is not a valid endpoint. Available: {list(self._endpoints.keys())}"
+ )
return self._endpoints[index]
def get(self, name: str, default: Any = None) -> Any:
json_data: Dict[str, Any] = json.loads(_data)
for k, v in json_data.items():
- if isinstance(v, dict) and '@odata.id' in v:
+ if isinstance(v, dict) and "@odata.id" in v:
name: str = to_snake_case(k)
- url: str = v['@odata.id']
- self.log.info(f'entrypoint found: {name} = {url}')
+ url: str = v["@odata.id"]
+ self.log.info(f"entrypoint found: {name} = {url}")
self._endpoints[name] = Endpoint(url, self.client)
try:
- self._session_url = json_data['Links']['Sessions']['@odata.id']
+ self._session_url = json_data["Links"]["Sessions"]["@odata.id"]
except (KeyError, TypeError):
- self.log.warning('Session URL not found in root response')
- self._session_url = ''
+ self.log.warning("Session URL not found in root response")
+ self._session_url = ""
except (URLError, KeyError, json.JSONDecodeError) as e:
- msg = f'{error_msg}: {e}'
+ msg = f"{error_msg}: {e}"
self.log.error(msg)
raise RuntimeError(msg) from e
class Endpoint:
"""Single Redfish resource or collection; supports lazy child resolution and member listing."""
- NAME: str = 'Endpoint'
+ NAME: str = "Endpoint"
def __init__(self, url: str, client: RedFishClient) -> None:
- self.log = get_logger(f'{__name__}:{Endpoint.NAME}')
+ self.log = get_logger(f"{__name__}:{Endpoint.NAME}")
self.url: str = url
self.client: RedFishClient = client
- self._children: Dict[str, 'Endpoint'] = {}
+ self._children: Dict[str, "Endpoint"] = {}
self.data: Dict[str, Any] = self.get_data()
- self.id: str = ''
+ self.id: str = ""
self.members_names: List[str] = []
if self.has_members:
if self.data:
try:
- self.id = self.data['Id']
+ self.id = self.data["Id"]
except KeyError:
- self.id = self.data['@odata.id'].split('/')[-1]
+ self.id = self.data["@odata.id"].split("/")[-1]
else:
- self.log.warning(f'No data could be loaded for {self.url}')
+ self.log.warning(f"No data could be loaded for {self.url}")
- def __getitem__(self, key: str) -> 'Endpoint':
- if not isinstance(key, str) or not key or '/' in key:
+ def __getitem__(self, key: str) -> "Endpoint":
+ if not isinstance(key, str) or not key or "/" in key:
raise KeyError(key)
if key not in self._children:
def query(self, url: str) -> Dict[str, Any]:
data: Dict[str, Any] = {}
try:
- self.log.debug(f'Querying {url}')
+ self.log.debug(f"Querying {url}")
_, _data, _ = self.client.query(endpoint=url)
if not _data:
- self.log.warning(f'Empty response from {url}')
+ self.log.warning(f"Empty response from {url}")
else:
data = json.loads(_data)
except KeyError as e:
- self.log.error(f'KeyError while querying {url}: {e}')
+ self.log.error(f"KeyError while querying {url}: {e}")
except HTTPError as e:
- self.log.error(f'HTTP error while querying {url} - {e.code} - {e.reason}')
+ self.log.error(f"HTTP error while querying {url} - {e.code} - {e.reason}")
except json.JSONDecodeError as e:
- self.log.error(f'JSON decode error while querying {url}: {e}')
+ self.log.error(f"JSON decode error while querying {url}: {e}")
except Exception as e:
- self.log.error(f'Unexpected error while querying {url}: {type(e).__name__}: {e}')
+ self.log.error(
+ f"Unexpected error while querying {url}: {type(e).__name__}: {e}"
+ )
return data
def get_data(self) -> Dict[str, Any]:
def get_members_names(self) -> List[str]:
result: List[str] = []
if self.has_members:
- for member in self.data['Members']:
- name: str = member['@odata.id'].split('/')[-1]
+ for member in self.data["Members"]:
+ name: str = member["@odata.id"].split("/")[-1]
result.append(name)
return result
def get_name(self, endpoint: str) -> str:
- return endpoint.split('/')[-1]
+ return endpoint.split("/")[-1]
def get_members_endpoints(self) -> Dict[str, str]:
members: Dict[str, str] = {}
- self.log.debug(f'get_members_endpoints called on {self.url}, has_members={self.has_members}')
+ self.log.debug(
+ f"get_members_endpoints called on {self.url}, has_members={self.has_members}"
+ )
if self.has_members:
- url_parts = self.url.split('/redfish/v1/')
+ url_parts = self.url.split("/redfish/v1/")
if len(url_parts) > 1:
- base_path = '/redfish/v1/' + url_parts[1].split('/')[0]
+ base_path = "/redfish/v1/" + url_parts[1].split("/")[0]
else:
base_path = None
- for member in self.data['Members']:
- name = self.get_name(member['@odata.id'])
- endpoint_url = member['@odata.id']
- self.log.debug(f'Found member: {name} -> {endpoint_url}')
+ for member in self.data["Members"]:
+ name = self.get_name(member["@odata.id"])
+ endpoint_url = member["@odata.id"]
+ self.log.debug(f"Found member: {name} -> {endpoint_url}")
if base_path and not endpoint_url.startswith(base_path):
self.log.warning(
- f'Member endpoint {endpoint_url} does not match base path {base_path} '
- f'from {self.url}. Skipping this member.'
+ f"Member endpoint {endpoint_url} does not match base path {base_path} "
+ f"from {self.url}. Skipping this member."
)
continue
if self.data:
name = self.get_name(self.url)
members[name] = self.url
- self.log.warning(f'No Members array, using endpoint itself: {name} -> {self.url}')
+ self.log.warning(
+ f"No Members array, using endpoint itself: {name} -> {self.url}"
+ )
else:
- self.log.debug(f'Endpoint {self.url} has no data and no Members array')
+ self.log.debug(f"Endpoint {self.url} has no data and no Members array")
return members
def get_members_data(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
- self.log.debug(f'get_members_data called on {self.url}, has_members={self.has_members}')
+ self.log.debug(
+ f"get_members_data called on {self.url}, has_members={self.has_members}"
+ )
if self.has_members:
- self.log.debug(f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}')
+ self.log.debug(
+ f'Endpoint {self.url} has Members array: {self.data.get("Members", [])}'
+ )
members_endpoints = self.get_members_endpoints()
if not members_endpoints:
self.log.warning(
- f'Endpoint {self.url} has Members array but no valid members after filtering. '
- f'Using endpoint itself as singleton resource.'
+ f"Endpoint {self.url} has Members array but no valid members after filtering. "
+ f"Using endpoint itself as singleton resource."
)
if self.data:
name = self.get_name(self.url)
result[name] = self.data
else:
for member, endpoint_url in members_endpoints.items():
- self.log.info(f'Fetching data for member: {member} at {endpoint_url}')
+ self.log.info(
+ f"Fetching data for member: {member} at {endpoint_url}"
+ )
result[member] = self.query(endpoint_url)
else:
- self.log.debug(f'Endpoint {self.url} has no Members array, returning own data')
+ self.log.debug(
+ f"Endpoint {self.url} has no Members array, returning own data"
+ )
if self.data:
name = self.get_name(self.url)
result[name] = self.data
else:
- self.log.warning(f'Endpoint {self.url} has no members and empty data')
+ self.log.warning(f"Endpoint {self.url} has no members and empty data")
return result
@property
def has_members(self) -> bool:
- return bool(self.data and 'Members' in self.data and isinstance(self.data['Members'], list))
+ return bool(
+ self.data
+ and "Members" in self.data
+ and isinstance(self.data["Members"], list)
+ )
def build_data(
try:
out[to_snake_case(field)] = d[field]
except KeyError:
- log.debug(f'Could not find field: {field} in data: {d}')
+ log.debug(f"Could not find field: {field} in data: {d}")
out[to_snake_case(field)] = None
return out
if attribute is not None:
data_items = data[attribute]
else:
- data_items = [{'MemberId': k, **v} for k, v in data.items()]
+ data_items = [{"MemberId": k, **v} for k, v in data.items()]
log.debug(f"build_data: data_items count={len(data_items)}")
for d in data_items:
- member_id = d.get('MemberId')
+ member_id = d.get("MemberId")
result[member_id] = {}
result[member_id] = process_data(member_id, fields, d)
except (KeyError, TypeError, AttributeError) as e:
def _resolve_path(endpoint: Endpoint, path: str) -> Endpoint:
"""Resolve an endpoint by traversing path segments (example: 'PowerSubsystem/PowerSupplies')."""
- parts = [p for p in path.split('/') if p]
+ parts = [p for p in path.split("/") if p]
current = endpoint
for part in parts:
current = current[part]
data = ep.get_members_data()
else:
data = ep.data
- result[member] = build_data(data=data, fields=fields, log=log, attribute=attribute)
+ result[member] = build_data(
+ data=data, fields=fields, log=log, attribute=attribute
+ )
except HTTPError as e:
- log.error(f'Error while updating {component}: {e}')
+ log.error(f"Error while updating {component}: {e}")
continue
_sys[component] = result
import json
+from http.client import HTTPMessage
+from typing import Any, Dict, Optional, Tuple
from urllib.error import HTTPError, URLError
+
from ceph_node_proxy.baseclient import BaseClient
from ceph_node_proxy.util import get_logger, http_req
-from typing import Dict, Any, Tuple, Optional
-from http.client import HTTPMessage
class RedFishClient(BaseClient):
- PREFIX = '/redfish/v1/'
+ PREFIX = "/redfish/v1/"
- def __init__(self,
- host: str = '',
- port: str = '443',
- username: str = '',
- password: str = ''):
+ def __init__(
+ self, host: str = "", port: str = "443", username: str = "", password: str = ""
+ ):
super().__init__(host, username, password)
self.log = get_logger(__name__)
- self.log.info(f'Initializing redfish client {__name__}')
+ self.log.info(f"Initializing redfish client {__name__}")
self.host: str = host
self.port: str = port
- self.url: str = f'https://{self.host}:{self.port}'
- self.token: str = ''
- self.location: str = ''
- self.session_service: str = ''
+ self.url: str = f"https://{self.host}:{self.port}"
+ self.token: str = ""
+ self.location: str = ""
+ self.session_service: str = ""
def sessionservice_discover(self) -> None:
_error_msg: str = "Can't discover SessionService url"
try:
_headers, _data, _status_code = self.query(endpoint=RedFishClient.PREFIX)
json_data: Dict[str, Any] = json.loads(_data)
- self.session_service = json_data['Links']['Sessions']['@odata.id']
+ self.session_service = json_data["Links"]["Sessions"]["@odata.id"]
except (URLError, KeyError) as e:
- msg = f'{_error_msg}: {e}'
+ msg = f"{_error_msg}: {e}"
self.log.error(msg)
raise RuntimeError
def login(self) -> None:
if not self.is_logged_in():
- self.log.debug('Discovering SessionService url...')
+ self.log.debug("Discovering SessionService url...")
self.sessionservice_discover()
- self.log.debug(f'SessionService url is {self.session_service}')
- self.log.info('Logging in to '
- f"{self.url} as '{self.username}'")
- oob_credentials = json.dumps({'UserName': self.username,
- 'Password': self.password})
- headers = {'Content-Type': 'application/json'}
- location_endpoint: str = ''
+ self.log.debug(f"SessionService url is {self.session_service}")
+ self.log.info("Logging in to " f"{self.url} as '{self.username}'")
+ oob_credentials = json.dumps(
+ {"UserName": self.username, "Password": self.password}
+ )
+ headers = {"Content-Type": "application/json"}
+ location_endpoint: str = ""
try:
- _headers, _data, _status_code = self.query(data=oob_credentials,
- headers=headers,
- endpoint=self.session_service)
+ _headers, _data, _status_code = self.query(
+ data=oob_credentials, headers=headers, endpoint=self.session_service
+ )
if _status_code != 201:
- self.log.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
+ self.log.error(
+ f"Can't log in to {self.url} as '{self.username}': {_status_code}"
+ )
raise RuntimeError
except URLError as e:
msg = f"Can't log in to {self.url} as '{self.username}': {e}"
self.log.error(msg)
raise RuntimeError
- self.token = _headers['X-Auth-Token']
- if _headers['Location'].startswith('http'):
+ self.token = _headers["X-Auth-Token"]
+ if _headers["Location"].startswith("http"):
# We assume the value has the following format:
# scheme://address:port/redfish/v1/SessionService/Session
location_endpoint = f"/{_headers['Location'].split('/', 3)[-1:][0]}"
else:
- location_endpoint = _headers['Location']
+ location_endpoint = _headers["Location"]
self.location = location_endpoint
- self.log.info(f'Logged in to {self.url}, Received header "Location": {self.location}')
+ self.log.info(
+ f'Logged in to {self.url}, Received header "Location": {self.location}'
+ )
def is_logged_in(self) -> bool:
- self.log.debug(f'Checking token validity for {self.url}')
+ self.log.debug(f"Checking token validity for {self.url}")
if not self.location or not self.token:
- self.log.debug(f'No token found for {self.url}.')
+ self.log.debug(f"No token found for {self.url}.")
return False
- headers = {'X-Auth-Token': self.token}
+ headers = {"X-Auth-Token": self.token}
try:
- _headers, _data, _status_code = self.query(headers=headers,
- endpoint=self.location)
+ _headers, _data, _status_code = self.query(
+ headers=headers, endpoint=self.location
+ )
except URLError as e:
- self.log.error("Can't check token "
- f'validity for {self.url}: {e}')
+ self.log.error("Can't check token " f"validity for {self.url}: {e}")
raise
return _status_code == 200
result: Dict[str, Any] = {}
try:
if self.is_logged_in():
- _, _data, _status_code = self.query(method='DELETE',
- headers={'X-Auth-Token': self.token},
- endpoint=self.location)
+ _, _data, _status_code = self.query(
+ method="DELETE",
+ headers={"X-Auth-Token": self.token},
+ endpoint=self.location,
+ )
result = json.loads(_data)
except URLError:
self.log.error(f"Can't log out from {self.url}")
- self.location = ''
- self.token = ''
+ self.location = ""
+ self.token = ""
return result
def get_path(self, path: str) -> Dict[str, Any]:
if self.PREFIX not in path:
- path = f'{self.PREFIX}{path}'
+ path = f"{self.PREFIX}{path}"
try:
_, result, _status_code = self.query(endpoint=path)
result_json: Dict[str, Any] = json.loads(result)
self.log.error(f"Can't get path {path}:\n{e}")
raise RuntimeError
- def query(self,
- data: Optional[str] = None,
- headers: Dict[str, str] = {},
- method: Optional[str] = None,
- endpoint: str = '',
- timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
+ def query(
+ self,
+ data: Optional[str] = None,
+ headers: Dict[str, str] = {},
+ method: Optional[str] = None,
+ endpoint: str = "",
+ timeout: int = 10,
+ ) -> Tuple[HTTPMessage, str, int]:
_headers = headers.copy() if headers else {}
if self.token:
- _headers['X-Auth-Token'] = self.token
- if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
- _headers['Content-Type'] = 'application/json'
+ _headers["X-Auth-Token"] = self.token
+ if not _headers.get("Content-Type") and method in ["POST", "PUT", "PATCH"]:
+ _headers["Content-Type"] = "application/json"
try:
- (response_headers,
- response_str,
- response_status) = http_req(hostname=self.host,
- port=self.port,
- endpoint=endpoint,
- headers=_headers,
- method=method,
- data=data,
- timeout=timeout)
+ response_headers, response_str, response_status = http_req(
+ hostname=self.host,
+ port=self.port,
+ endpoint=endpoint,
+ headers=_headers,
+ method=method,
+ data=data,
+ timeout=timeout,
+ )
return response_headers, response_str, response_status
except (HTTPError, URLError) as e:
- self.log.debug(f'endpoint={endpoint} err={e}')
+ self.log.debug(f"endpoint={endpoint} err={e}")
raise
import json
+from typing import Any, Dict, List, Union
+from urllib.error import HTTPError, URLError
+
from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
from ceph_node_proxy.util import get_logger
-from typing import Dict, Any, List, Union
-from urllib.error import HTTPError, URLError
class RedfishDellSystem(BaseRedfishSystem):
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
self.log = get_logger(__name__)
- self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService'
- self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
- self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'
+ self.job_service_endpoint: str = (
+ "/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService"
+ )
+ self.create_reboot_job_endpoint: str = (
+ f"{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob"
+ )
+ self.setup_job_queue_endpoint: str = (
+ f"{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue"
+ )
def device_led_on(self, device: str) -> int:
- data: Dict[str, bool] = {'LocationIndicatorActive': True}
+ data: Dict[str, bool] = {"LocationIndicatorActive": True}
try:
result = self.set_device_led(device, data)
except (HTTPError, KeyError):
return result
def device_led_off(self, device: str) -> int:
- data: Dict[str, bool] = {'LocationIndicatorActive': False}
+ data: Dict[str, bool] = {"LocationIndicatorActive": False}
try:
result = self.set_device_led(device, data)
except (HTTPError, KeyError):
return result
def chassis_led_on(self) -> int:
- data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
+ data: Dict[str, str] = {"IndicatorLED": "Blinking"}
result = self.set_chassis_led(data)
return result
def chassis_led_off(self) -> int:
- data: Dict[str, str] = {'IndicatorLED': 'Lit'}
+ data: Dict[str, str] = {"IndicatorLED": "Lit"}
result = self.set_chassis_led(data)
return result
def get_device_led(self, device: str) -> Dict[str, Any]:
- endpoint = self._sys['storage'][device]['redfish_endpoint']
+ endpoint = self._sys["storage"][device]["redfish_endpoint"]
try:
- result = self.client.query(method='GET',
- endpoint=endpoint,
- timeout=10)
+ result = self.client.query(method="GET", endpoint=endpoint, timeout=10)
except HTTPError as e:
- self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
+ self.log.error(
+ f"Couldn't get the ident device LED status for device '{device}': {e}"
+ )
raise
response_json = json.loads(result[1])
- _result: Dict[str, Any] = {'http_code': result[2]}
+ _result: Dict[str, Any] = {"http_code": result[2]}
if result[2] == 200:
- _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
+ _result["LocationIndicatorActive"] = response_json[
+ "LocationIndicatorActive"
+ ]
else:
- _result['LocationIndicatorActive'] = None
+ _result["LocationIndicatorActive"] = None
return _result
def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
try:
_, _, status = self.client.query(
data=json.dumps(data),
- method='PATCH',
- endpoint=self._sys['storage'][device]['redfish_endpoint']
+ method="PATCH",
+ endpoint=self._sys["storage"][device]["redfish_endpoint"],
)
except (HTTPError, KeyError) as e:
- self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}")
+ self.log.error(
+ f"Couldn't set the ident device LED for device '{device}': {e}"
+ )
raise
return status
def get_chassis_led(self) -> Dict[str, Any]:
- endpoint = list(self.endpoints['chassis'].get_members_endpoints().values())[0]
+ endpoint = list(self.endpoints["chassis"].get_members_endpoints().values())[0]
try:
- result = self.client.query(method='GET',
- endpoint=endpoint,
- timeout=10)
+ result = self.client.query(method="GET", endpoint=endpoint, timeout=10)
except HTTPError as e:
self.log.error(f"Couldn't get the ident chassis LED status: {e}")
raise
response_json = json.loads(result[1])
- _result: Dict[str, Any] = {'http_code': result[2]}
+ _result: Dict[str, Any] = {"http_code": result[2]}
if result[2] == 200:
- _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
+ _result["LocationIndicatorActive"] = response_json[
+ "LocationIndicatorActive"
+ ]
else:
- _result['LocationIndicatorActive'] = None
+ _result["LocationIndicatorActive"] = None
return _result
def set_chassis_led(self, data: Dict[str, str]) -> int:
try:
_, _, status = self.client.query(
data=json.dumps(data),
- method='PATCH',
- endpoint=list(self.endpoints['chassis'].get_members_endpoints().values())[0]
+ method="PATCH",
+ endpoint=list(
+ self.endpoints["chassis"].get_members_endpoints().values()
+ )[0],
)
except HTTPError as e:
self.log.error(f"Couldn't set the ident chassis LED: {e}")
return status
def shutdown_host(self, force: bool = False) -> int:
- reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'
+ reboot_type: str = (
+ "GracefulRebootWithForcedShutdown"
+ if force
+ else "GracefulRebootWithoutForcedShutdown"
+ )
try:
job_id: str = self.create_reboot_job(reboot_type)
def powercycle(self) -> int:
try:
- job_id: str = self.create_reboot_job('PowerCycle')
+ job_id: str = self.create_reboot_job("PowerCycle")
status = self.schedule_reboot_job(job_id)
except (HTTPError, URLError) as e:
self.log.error(f"Couldn't perform power cycle: {e}")
data: Dict[str, str] = dict(RebootJobType=reboot_type)
try:
headers, _, _ = self.client.query(
- data=json.dumps(data),
- endpoint=self.create_reboot_job_endpoint
+ data=json.dumps(data), endpoint=self.create_reboot_job_endpoint
)
- job_id: str = headers['Location'].split('/')[-1]
+ job_id: str = headers["Location"].split("/")[-1]
except (HTTPError, URLError) as e:
self.log.error(f"Couldn't create the reboot job: {e}")
raise
return job_id
def schedule_reboot_job(self, job_id: str) -> int:
- data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
+ data: Dict[str, Union[List[str], str]] = dict(
+ JobArray=[job_id], StartTimeInterval="TIME_NOW"
+ )
try:
_, _, status = self.client.query(
- data=json.dumps(data),
- endpoint=self.setup_job_queue_endpoint
+ data=json.dumps(data), endpoint=self.setup_job_queue_endpoint
)
except (HTTPError, KeyError) as e:
self.log.error(f"Couldn't schedule the reboot job: {e}")
from typing import Dict, Type
-from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
-
# Built-in implementations
from ceph_node_proxy.atollon import AtollonSystem
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
from ceph_node_proxy.util import get_logger
REDFISH_SYSTEM_CLASSES: Dict[str, Type[BaseRedfishSystem]] = {
- 'generic': BaseRedfishSystem,
- 'dell': RedfishDellSystem,
- 'atollon': AtollonSystem,
+ "generic": BaseRedfishSystem,
+ "dell": RedfishDellSystem,
+ "atollon": AtollonSystem,
}
logger = get_logger(__name__)
-import time
import json
-from ceph_node_proxy.protocols import SystemForReporter
-from ceph_node_proxy.util import get_logger, http_req, BaseThread
+import time
+from typing import Any, Dict
from urllib.error import HTTPError, URLError
-from typing import Dict, Any
+from ceph_node_proxy.protocols import SystemForReporter
+from ceph_node_proxy.util import BaseThread, get_logger, http_req
DEFAULT_MAX_RETRIES = 30
RETRY_SLEEP_SEC = 5
class Reporter(BaseThread):
- def __init__(self,
- system: SystemForReporter,
- cephx: Dict[str, Any],
- reporter_scheme: str = 'https',
- reporter_hostname: str = '',
- reporter_port: str = '443',
- reporter_endpoint: str = '/node-proxy/data',
- max_retries: int = DEFAULT_MAX_RETRIES) -> None:
+ def __init__(
+ self,
+ system: SystemForReporter,
+ cephx: Dict[str, Any],
+ reporter_scheme: str = "https",
+ reporter_hostname: str = "",
+ reporter_port: str = "443",
+ reporter_endpoint: str = "/node-proxy/data",
+ max_retries: int = DEFAULT_MAX_RETRIES,
+ ) -> None:
super().__init__()
self.system = system
self.data: Dict[str, Any] = {}
self.stop: bool = False
self.cephx = cephx
- self.data['cephx'] = self.cephx['cephx']
+ self.data["cephx"] = self.cephx["cephx"]
self.reporter_scheme: str = reporter_scheme
self.reporter_hostname: str = reporter_hostname
self.reporter_port: str = reporter_port
self.reporter_endpoint: str = reporter_endpoint
self.max_retries: int = max_retries
self.log = get_logger(__name__)
- self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
- f'{reporter_port}{reporter_endpoint}')
- self.log.info(f'Reporter url set to {self.reporter_url}')
+ self.reporter_url: str = (
+ f"{reporter_scheme}://{reporter_hostname}:"
+ f"{reporter_port}{reporter_endpoint}"
+ )
+ self.log.info(f"Reporter url set to {self.reporter_url}")
def _send_with_retries(self) -> bool:
"""Send data to mgr. Returns True on success, False after max_retries failures."""
for attempt in range(1, self.max_retries + 1):
try:
- self.log.info(f'sending data to {self.reporter_url} (attempt {attempt}/{self.max_retries})')
- http_req(hostname=self.reporter_hostname,
- port=self.reporter_port,
- method='POST',
- headers={'Content-Type': 'application/json'},
- endpoint=self.reporter_endpoint,
- scheme=self.reporter_scheme,
- data=json.dumps(self.data))
+ self.log.info(
+ f"sending data to {self.reporter_url} (attempt {attempt}/{self.max_retries})"
+ )
+ http_req(
+ hostname=self.reporter_hostname,
+ port=self.reporter_port,
+ method="POST",
+ headers={"Content-Type": "application/json"},
+ endpoint=self.reporter_endpoint,
+ scheme=self.reporter_scheme,
+ data=json.dumps(self.data),
+ )
return True
except (HTTPError, URLError) as e:
self.log.error(
def main(self) -> None:
while not self.stop:
- self.log.debug('waiting for a lock in reporter loop.')
+ self.log.debug("waiting for a lock in reporter loop.")
with self.system.lock:
if not self.system.pending_shutdown:
- self.log.debug('lock acquired in reporter loop.')
+ self.log.debug("lock acquired in reporter loop.")
if self.system.data_ready:
- self.log.debug('data ready to be sent to the mgr.')
+ self.log.debug("data ready to be sent to the mgr.")
if self.system.get_system() != self.system.previous_data:
- self.log.info('data has changed since last iteration.')
- self.data['patch'] = self.system.get_system()
+ self.log.info("data has changed since last iteration.")
+ self.data["patch"] = self.system.get_system()
if self._send_with_retries():
self.system.previous_data = self.system.get_system()
else:
self.log.error(
- f'Failed to send data after {self.max_retries} retries; '
- 'will retry on next cycle.'
+ f"Failed to send data after {self.max_retries} retries; "
+ "will retry on next cycle."
)
else:
- self.log.debug('no diff, not sending data to the mgr.')
- self.log.debug('lock released in reporter loop.')
+ self.log.debug("no diff, not sending data to the mgr.")
+ self.log.debug("lock released in reporter loop.")
time.sleep(5)
- self.log.debug('exiting reporter loop.')
+ self.log.debug("exiting reporter loop.")
raise SystemExit(0)
import logging
-import yaml
import os
-import time
import re
import ssl
-import traceback
import threading
+import time
+import traceback
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
+from typing import Any, Callable, Dict, MutableMapping, Optional, Tuple, Union
from urllib.error import HTTPError, URLError
-from urllib.request import urlopen, Request
-from typing import Dict, Callable, Any, Optional, MutableMapping, Tuple, Union
+from urllib.request import Request, urlopen
+import yaml
DEFAULTS: Dict[str, Any] = {
- 'reporter': {
- 'check_interval': 5,
- 'push_data_max_retries': 30,
- 'endpoint': 'https://%(mgr_host):%(mgr_port)/node-proxy/data',
+ "reporter": {
+ "check_interval": 5,
+ "push_data_max_retries": 30,
+ "endpoint": "https://%(mgr_host):%(mgr_port)/node-proxy/data",
},
- 'system': {
- 'refresh_interval': 5,
- 'vendor': 'generic',
+ "system": {
+ "refresh_interval": 5,
+ "vendor": "generic",
},
- 'api': {
- 'port': 9456,
+ "api": {
+ "port": 9456,
},
- 'logging': {
- 'level': logging.INFO,
+ "logging": {
+ "level": logging.INFO,
},
}
defaults = defaults or {}
if not os.path.exists(path):
return _deep_merge({}, defaults)
- with open(path, 'r') as f:
+ with open(path, "r") as f:
loaded = yaml.safe_load(f) or {}
return _deep_merge(defaults, loaded)
def get_logger(name: str, level: Union[int, str] = logging.NOTSET) -> logging.Logger:
log_level: Union[int, str] = level
if log_level == logging.NOTSET:
- log_level = DEFAULTS['logging']['level']
+ log_level = DEFAULTS["logging"]["level"]
logger = logging.getLogger(name)
logger.setLevel(log_level)
handler = logging.StreamHandler()
handler.setLevel(log_level)
- fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(fmt)
logger.handlers.clear()
logger.addHandler(handler)
self.pending_shutdown: bool = False
def run(self) -> None:
- logger.info(f'Starting {self.name}')
+ logger.info(f"Starting {self.name}")
try:
self.main()
except Exception as e:
self.pending_shutdown = True
def check_status(self) -> bool:
- logger.debug(f'Checking status of {self.name}')
+ logger.debug(f"Checking status of {self.name}")
if self.exc:
traceback.print_tb(self.exc.__traceback__)
- logger.error(f'Caught exception: {self.exc.__class__.__name__}')
+ logger.error(f"Caught exception: {self.exc.__class__.__name__}")
raise self.exc
if not self.is_alive():
- logger.info(f'{self.name} not alive')
+ logger.info(f"{self.name} not alive")
self.start()
return True
def to_snake_case(name: str) -> str:
- name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
- return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
+ name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
+ return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower()
def normalize_dict(test_dict: Dict) -> Dict:
res[key.lower()] = normalize_dict(test_dict[key])
else:
if test_dict[key] is None:
- test_dict[key] = 'unknown'
+ test_dict[key] = "unknown"
res[key.lower()] = test_dict[key]
return res
-def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
+def retry(
+ exceptions: Any = Exception, retries: int = 20, delay: int = 1
+) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
def _retry(*args: Any, **kwargs: Any) -> Any:
_tries = retries
while _tries > 1:
try:
- logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
+ logger.debug("{} {} attempt(s) left.".format(f, _tries - 1))
return f(*args, **kwargs)
except exceptions:
time.sleep(delay)
_tries -= 1
- logger.warn('{} has failed after {} tries'.format(f, retries))
+ logger.warn("{} has failed after {} tries".format(f, retries))
return f(*args, **kwargs)
+
return _retry
+
return decorator
-def http_req(hostname: str = '',
- port: str = '443',
- method: Optional[str] = None,
- headers: MutableMapping[str, str] = {},
- data: Optional[str] = None,
- endpoint: str = '/',
- scheme: str = 'https',
- ssl_verify: bool = False,
- timeout: Optional[int] = None,
- ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
+def http_req(
+ hostname: str = "",
+ port: str = "443",
+ method: Optional[str] = None,
+ headers: MutableMapping[str, str] = {},
+ data: Optional[str] = None,
+ endpoint: str = "/",
+ scheme: str = "https",
+ ssl_verify: bool = False,
+ timeout: Optional[int] = None,
+ ssl_ctx: Optional[Any] = None,
+) -> Tuple[Any, Any, Any]:
if not ssl_ctx:
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
else:
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
- url: str = f'{scheme}://{hostname}:{port}{endpoint}'
- _data = bytes(data, 'ascii') if data else None
+ url: str = f"{scheme}://{hostname}:{port}{endpoint}"
+ _data = bytes(data, "ascii") if data else None
_headers = headers
if data and not method:
- method = 'POST'
- if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
- _headers['Content-Type'] = 'application/json'
+ method = "POST"
+ if not _headers.get("Content-Type") and method in ["POST", "PATCH"]:
+ _headers["Content-Type"] = "application/json"
try:
req = Request(url, _data, _headers, method=method)
with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
except (HTTPError, URLError) as e:
# Log level is debug only.
# We let whatever calls `http_req()` catching and printing the error
- logger.debug(f'url={url} err={e}')
+ logger.debug(f"url={url} err={e}")
# handle error here if needed
raise
-def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper:
+def write_tmp_file(
+ data: str, prefix_name: str = "node-proxy-"
+) -> _TemporaryFileWrapper:
f = NamedTemporaryFile(prefix=prefix_name)
os.fchmod(f.fileno(), 0o600)
- f.write(data.encode('utf-8'))
+ f.write(data.encode("utf-8"))
f.flush()
return f