-import cherrypy
+import cherrypy # type: ignore
from urllib.error import HTTPError
-from cherrypy._cpserver import Server
+from cherrypy._cpserver import Server # type: ignore
from threading import Thread, Event
from typing import Dict, Any, List
-from ceph_node_proxy.util import Config, Logger, write_tmp_file
+from ceph_node_proxy.util import Config, get_logger, write_tmp_file
from ceph_node_proxy.basesystem import BaseSystem
from ceph_node_proxy.reporter import Reporter
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
- from ceph_node_proxy.main import NodeProxy
+ from ceph_node_proxy.main import NodeProxyManager
@cherrypy.tools.auth_basic(on=True)
@cherrypy.expose
def start(self) -> Dict[str, str]:
- self.api.backend.start_client()
- # self.backend.start_update_loop()
+ self.api.backend.start()
self.api.reporter.run()
return {'ok': 'node-proxy daemon started'}
return {'ok': 'node-proxy config reloaded'}
def _stop(self) -> None:
- self.api.backend.stop_update_loop()
- self.api.backend.client.logout()
- self.api.reporter.stop()
+ self.api.backend.shutdown()
+ self.api.reporter.shutdown()
@cherrypy.expose
def stop(self) -> Dict[str, str]:
addr: str = '0.0.0.0',
port: int = 0) -> None:
super().__init__()
- self.log = Logger(__name__)
+ self.log = get_logger(__name__)
self.backend = backend
self.reporter = reporter
self.config = config
- self.socket_port = self.config.__dict__['server']['port'] if not port else port
+ self.socket_port = self.config.__dict__['api']['port'] if not port else port
self.socket_host = addr
self.subscribe()
if 'force' not in data.keys():
msg = "The key 'force' wasn't passed."
- self.log.logger.debug(msg)
+ self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
try:
- result: int = self.backend.shutdown(force=data['force'])
+ result: int = self.backend.shutdown_host(force=data['force'])
except HTTPError as e:
raise cherrypy.HTTPError(e.code, e.reason)
return result
if not led_type:
msg = "the led type must be provided (either 'chassis' or 'drive')."
- self.log.logger.debug(msg)
+ self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
if led_type == 'drive':
id_drive_required = not id_drive
if id_drive_required or id_drive not in self.backend.get_storage():
msg = 'A valid device ID must be provided.'
- self.log.logger.debug(msg)
+ self.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
try:
if 'state' not in data or data['state'] not in ['on', 'off']:
msg = "Invalid data. 'state' must be provided and have a valid value (on|off)."
- self.log.logger.error(msg)
+ self.log.error(msg)
raise cherrypy.HTTPError(400, msg)
func: Any = (self.backend.device_led_on if led_type == 'drive' and data['state'] == 'on' else
class NodeProxyApi(Thread):
- def __init__(self,
- node_proxy: 'NodeProxy',
- username: str,
- password: str,
- ssl_crt: str,
- ssl_key: str) -> None:
+ def __init__(self, node_proxy_mgr: 'NodeProxyManager') -> None:
super().__init__()
- self.log = Logger(__name__)
+ self.log = get_logger(__name__)
self.cp_shutdown_event = Event()
- self.node_proxy = node_proxy
- self.username = username
- self.password = password
- self.ssl_crt = ssl_crt
- self.ssl_key = ssl_key
- self.api = API(self.node_proxy.system,
- self.node_proxy.reporter_agent,
- self.node_proxy.config)
+ self.node_proxy_mgr = node_proxy_mgr
+ self.username = self.node_proxy_mgr.username
+ self.password = self.node_proxy_mgr.password
+ self.ssl_crt = self.node_proxy_mgr.api_ssl_crt
+ self.ssl_key = self.node_proxy_mgr.api_ssl_key
+ self.system = self.node_proxy_mgr.system
+ self.reporter_agent = self.node_proxy_mgr.reporter_agent
+ self.config = self.node_proxy_mgr.config
+ self.api = API(self.system, self.reporter_agent, self.config)
def check_auth(self, realm: str, username: str, password: str) -> bool:
return self.username == username and \
self.password == password
def shutdown(self) -> None:
- self.log.logger.info('Stopping node-proxy API...')
+ self.log.info('Stopping node-proxy API...')
self.cp_shutdown_event.set()
def run(self) -> None:
- self.log.logger.info('node-proxy API configuration...')
+ self.log.info('node-proxy API configuration...')
cherrypy.config.update({
'environment': 'production',
'engine.autoreload.on': False,
cherrypy.server.unsubscribe()
try:
cherrypy.engine.start()
- self.log.logger.info('node-proxy API started.')
+ self.log.info('node-proxy API started.')
self.cp_shutdown_event.wait()
self.cp_shutdown_event.clear()
- cherrypy.engine.stop()
+ cherrypy.engine.exit()
cherrypy.server.httpserver = None
- self.log.logger.info('node-proxy API shutdown.')
+ self.log.info('node-proxy API shutdown.')
except Exception as e:
- self.log.logger.error(f'node-proxy API error: {e}')
+ self.log.error(f'node-proxy API error: {e}')
import json
from ceph_node_proxy.basesystem import BaseSystem
from ceph_node_proxy.redfish_client import RedFishClient
-from threading import Thread, Lock
from time import sleep
-from ceph_node_proxy.util import Logger, retry
+from ceph_node_proxy.util import get_logger
from typing import Dict, Any, List, Callable, Union
from urllib.error import HTTPError, URLError
self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
'/UpdateService'])
self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
- self.log = Logger(__name__)
+ self.log = get_logger(__name__)
self.host: str = kw['host']
self.port: str = kw['port']
self.username: str = kw['username']
self.password: str = kw['password']
# move the following line (class attribute?)
self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
- self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
-
- self.run: bool = False
- self.thread: Thread
+ self.log.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
self.data_ready: bool = False
self.previous_data: Dict = {}
- self.lock: Lock = Lock()
self.data: Dict[str, Dict[str, Any]] = {}
self._system: Dict[str, Dict[str, Any]] = {}
self._sys: Dict[str, Any] = {}
'firmwares'])
self.update_funcs: List[Callable] = []
for component in self.component_list:
- self.log.logger.debug(f'adding: {component} to hw component gathered list.')
+ self.log.debug(f'adding: {component} to hw component gathered list.')
func = f'_update_{component}'
if hasattr(self, func):
f = getattr(self, func)
self.update_funcs.append(f)
- self.start_client()
-
- def start_client(self) -> None:
+ def main(self) -> None:
+ self.stop = False
self.client.login()
- self.start_update_loop()
-
- def start_update_loop(self) -> None:
- self.run = True
- self.thread = Thread(target=self.update)
- self.thread.start()
-
- def stop_update_loop(self) -> None:
- self.run = False
- self.thread.join()
-
- def update(self) -> None:
- # this loop can have:
- # - caching logic
- while self.run:
- self.log.logger.debug('waiting for a lock in the update loop.')
- self.lock.acquire()
- self.log.logger.debug('lock acquired in the update loop.')
- try:
- self._update_system()
- self._update_sn()
-
- with concurrent.futures.ThreadPoolExecutor() as executor:
- executor.map(lambda f: f(), self.update_funcs)
-
- self.data_ready = True
- except RuntimeError as e:
- self.run = False
- self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
- self.client.logout()
- finally:
- self.lock.release()
- sleep(5)
- self.log.logger.debug('lock released in the update loop.')
+ while not self.stop:
+ self.log.debug('waiting for a lock in the update loop.')
+ with self.lock:
+ if not self.pending_shutdown:
+ self.log.debug('lock acquired in the update loop.')
+ try:
+ self._update_system()
+ self._update_sn()
+
+ with concurrent.futures.ThreadPoolExecutor() as executor:
+ executor.map(lambda f: f(), self.update_funcs)
+
+ self.data_ready = True
+ except RuntimeError as e:
+ self.stop = True
+ self.log.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
+ self.client.logout()
+ raise
+ sleep(5)
+ self.log.debug('lock released in the update loop.')
+ self.log.debug('exiting update loop.')
+ raise SystemExit(0)
def flush(self) -> None:
- self.log.logger.debug('Acquiring lock to flush data.')
+ self.log.debug('Acquiring lock to flush data.')
self.lock.acquire()
- self.log.logger.debug('Lock acquired, flushing data.')
+ self.log.debug('Lock acquired, flushing data.')
self._system = {}
self.previous_data = {}
- self.log.logger.info('Data flushed.')
+ self.log.info('Data flushed.')
self.data_ready = False
- self.log.logger.debug('Data marked as not ready.')
+ self.log.debug('Data marked as not ready.')
self.lock.release()
- self.log.logger.debug('Released the lock after flushing data.')
+ self.log.debug('Released the lock after flushing data.')
- @retry(retries=10, delay=2)
+ # @retry(retries=10, delay=2)
def _get_path(self, path: str) -> Dict:
+ result: Dict[str, Any] = {}
try:
- result = self.client.get_path(path)
+ if not self.pending_shutdown:
+ self.log.debug(f'Getting path: {path}')
+ result = self.client.get_path(path)
+ else:
+ self.log.debug(f'Pending shutdown, aborting query to {path}')
except RuntimeError:
raise
if result is None:
- self.log.logger.error(f'The client reported an error when getting path: {path}')
+ self.log.error(f'The client reported an error when getting path: {path}')
raise RuntimeError(f'Could not get path: {path}')
return result
endpoint=endpoint,
timeout=10)
except HTTPError as e:
- self.log.logger.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
+ self.log.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
raise
response_json = json.loads(result[1])
_result: Dict[str, Any] = {'http_code': result[2]}
endpoint=self._sys['storage'][device]['redfish_endpoint']
)
except (HTTPError, KeyError) as e:
- self.log.logger.error(f"Couldn't set the ident device LED for device '{device}': {e}")
+ self.log.error(f"Couldn't set the ident device LED for device '{device}': {e}")
raise
return status
endpoint=endpoint,
timeout=10)
except HTTPError as e:
- self.log.logger.error(f"Couldn't get the ident chassis LED status: {e}")
+ self.log.error(f"Couldn't get the ident chassis LED status: {e}")
raise
response_json = json.loads(result[1])
_result: Dict[str, Any] = {'http_code': result[2]}
endpoint=f'/redfish/v1{self.chassis_endpoint}'
)
except HTTPError as e:
- self.log.logger.error(f"Couldn't set the ident chassis LED: {e}")
+ self.log.error(f"Couldn't set the ident chassis LED: {e}")
raise
return status
- def shutdown(self, force: bool = False) -> int:
+ def shutdown_host(self, force: bool = False) -> int:
reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'
try:
job_id: str = self.create_reboot_job(reboot_type)
status = self.schedule_reboot_job(job_id)
except (HTTPError, KeyError) as e:
- self.log.logger.error(f"Couldn't create the reboot job: {e}")
+ self.log.error(f"Couldn't create the reboot job: {e}")
raise
return status
job_id: str = self.create_reboot_job('PowerCycle')
status = self.schedule_reboot_job(job_id)
except (HTTPError, URLError) as e:
- self.log.logger.error(f"Couldn't perform power cycle: {e}")
+ self.log.error(f"Couldn't perform power cycle: {e}")
raise
return status
)
job_id: str = headers['Location'].split('/')[-1]
except (HTTPError, URLError) as e:
- self.log.logger.error(f"Couldn't create the reboot job: {e}")
+ self.log.error(f"Couldn't create the reboot job: {e}")
raise
return job_id
endpoint=self.setup_job_queue_endpoint
)
except (HTTPError, KeyError) as e:
- self.log.logger.error(f"Couldn't schedule the reboot job: {e}")
+ self.log.error(f"Couldn't schedule the reboot job: {e}")
raise
return status
import socket
-from ceph_node_proxy.util import Config
+from threading import Lock
+from ceph_node_proxy.util import Config, get_logger, BaseThread
from typing import Dict, Any
from ceph_node_proxy.baseclient import BaseClient
-class BaseSystem:
+class BaseSystem(BaseThread):
def __init__(self, **kw: Any) -> None:
+ super().__init__()
+ self.lock: Lock = Lock()
self._system: Dict = {}
- self.config: Config = kw['config']
+ self.config: Config = kw.get('config', {})
self.client: BaseClient
+ self.log = get_logger(__name__)
+
+ def main(self) -> None:
+ raise NotImplementedError()
def get_system(self) -> Dict[str, Any]:
raise NotImplementedError()
def get_host(self) -> str:
return socket.gethostname()
- def start_update_loop(self) -> None:
- raise NotImplementedError()
-
def stop_update_loop(self) -> None:
raise NotImplementedError()
- def start_client(self) -> None:
- raise NotImplementedError()
-
def flush(self) -> None:
raise NotImplementedError()
- def shutdown(self, force: bool = False) -> int:
+ def shutdown_host(self, force: bool = False) -> int:
raise NotImplementedError()
def powercycle(self) -> int:
-from threading import Thread
from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
from ceph_node_proxy.api import NodeProxyApi
from ceph_node_proxy.reporter import Reporter
-from ceph_node_proxy.util import Config, Logger, http_req, write_tmp_file
+from ceph_node_proxy.util import Config, get_logger, http_req, write_tmp_file, CONFIG
from typing import Dict, Any, Optional
import argparse
-import traceback
-import logging
import os
import ssl
import json
import time
+import signal
-logger = logging.getLogger(__name__)
-
-DEFAULT_CONFIG = {
- 'reporter': {
- 'check_interval': 5,
- 'push_data_max_retries': 30,
- 'endpoint': 'https://127.0.0.1:7150/node-proxy/data',
- },
- 'system': {
- 'refresh_interval': 5
- },
- 'server': {
- 'port': 8080,
- },
- 'logging': {
- 'level': 20,
- }
-}
-
-
-class NodeProxyManager(Thread):
- def __init__(self,
- mgr_host: str,
- cephx_name: str,
- cephx_secret: str,
- ca_path: str,
- api_ssl_crt: str,
- api_ssl_key: str,
- mgr_agent_port: int = 7150):
- super().__init__()
- self.mgr_host = mgr_host
- self.cephx_name = cephx_name
- self.cephx_secret = cephx_secret
- self.ca_path = ca_path
- self.api_ssl_crt = api_ssl_crt
- self.api_ssl_key = api_ssl_key
- self.mgr_agent_port = str(mgr_agent_port)
- self.stop = False
+
+class NodeProxyManager:
+ def __init__(self, **kw: Any) -> None:
+ self.exc: Optional[Exception] = None
+ self.log = get_logger(__name__)
+ self.mgr_host: str = kw['mgr_host']
+ self.cephx_name: str = kw['cephx_name']
+ self.cephx_secret: str = kw['cephx_secret']
+ self.ca_path: str = kw['ca_path']
+ self.api_ssl_crt: str = kw['api_ssl_crt']
+ self.api_ssl_key: str = kw['api_ssl_key']
+ self.mgr_agent_port: str = str(kw['mgr_agent_port'])
+ self.stop: bool = False
self.ssl_ctx = ssl.create_default_context()
self.ssl_ctx.check_hostname = True
self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
self.ssl_ctx.load_verify_locations(self.ca_path)
+ self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
+ self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
+ self.cephx = {'cephx': {'name': self.cephx_name,
+ 'secret': self.cephx_secret}}
+ self.config = Config('/etc/ceph/node-proxy.yml', config=CONFIG)
def run(self) -> None:
self.init()
self.loop()
def init(self) -> None:
- node_proxy_meta = {
- 'cephx': {
- 'name': self.cephx_name,
- 'secret': self.cephx_secret
- }
- }
+ self.init_system()
+ self.init_reporter()
+ self.init_api()
+
+ def fetch_oob_details(self) -> Dict[str, str]:
headers, result, status = http_req(hostname=self.mgr_host,
port=self.mgr_agent_port,
- data=json.dumps(node_proxy_meta),
+ data=json.dumps(self.cephx),
endpoint='/node-proxy/oob',
ssl_ctx=self.ssl_ctx)
if status != 200:
msg = f'No out of band tool details could be loaded: {status}, {result}'
- logger.debug(msg)
+ self.log.debug(msg)
raise RuntimeError(msg)
result_json = json.loads(result)
- kwargs = {
+ oob_details: Dict[str, str] = {
'host': result_json['result']['addr'],
'username': result_json['result']['username'],
'password': result_json['result']['password'],
- 'cephx': node_proxy_meta['cephx'],
- 'mgr_host': self.mgr_host,
- 'mgr_agent_port': self.mgr_agent_port,
- 'api_ssl_crt': self.api_ssl_crt,
- 'api_ssl_key': self.api_ssl_key
+ 'port': result_json['result'].get('port', '443')
}
- if result_json['result'].get('port'):
- kwargs['port'] = result_json['result']['port']
-
- self.node_proxy: NodeProxy = NodeProxy(**kwargs)
- self.node_proxy.start()
-
- def loop(self) -> None:
- while not self.stop:
- try:
- status = self.node_proxy.check_status()
- label = 'Ok' if status else 'Critical'
- logger.debug(f'node-proxy status: {label}')
- except Exception as e:
- logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
- time.sleep(120)
- self.init()
- else:
- logger.debug('node-proxy alive, next check in 60sec.')
- time.sleep(60)
-
- def shutdown(self) -> None:
- self.stop = True
- # if `self.node_proxy.shutdown()` is called before self.start(), it will fail.
- if self.__dict__.get('node_proxy'):
- self.node_proxy.shutdown()
+ return oob_details
-
-class NodeProxy(Thread):
- def __init__(self, **kw: Any) -> None:
- super().__init__()
- self.username: str = kw.get('username', '')
- self.password: str = kw.get('password', '')
- self.host: str = kw.get('host', '')
- self.port: int = kw.get('port', 443)
- self.cephx: Dict[str, Any] = kw.get('cephx', {})
- self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
- self.mgr_host: str = kw.get('mgr_host', '')
- self.mgr_agent_port: str = kw.get('mgr_agent_port', '')
- self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
- self.api_ssl_crt: str = kw.get('api_ssl_crt', '')
- self.api_ssl_key: str = kw.get('api_ssl_key', '')
- self.exc: Optional[Exception] = None
- self.log = Logger(__name__)
-
- def run(self) -> None:
+ def init_system(self) -> None:
+ oob_details = self.fetch_oob_details()
+ self.username: str = oob_details['username']
+ self.password: str = oob_details['password']
try:
- self.main()
- except Exception as e:
- self.exc = e
- return
-
- def shutdown(self) -> None:
- self.log.logger.info('Shutting down node-proxy...')
- self.system.client.logout()
- self.system.stop_update_loop()
- self.reporter_agent.stop()
-
- def check_status(self) -> bool:
- if self.__dict__.get('system') and not self.system.run:
- raise RuntimeError('node-proxy encountered an error.')
- if self.exc:
- traceback.print_tb(self.exc.__traceback__)
- self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
- raise self.exc
- return True
-
- def main(self) -> None:
- # TODO: add a check and fail if host/username/password/data aren't passed
- self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
- self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])
-
- # create the redfish system and the obsever
- self.log.logger.info('Server initialization...')
- try:
- self.system = RedfishDellSystem(host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
+ self.system = RedfishDellSystem(host=oob_details['host'],
+ port=oob_details['port'],
+ username=oob_details['username'],
+ password=oob_details['password'],
config=self.config)
+ self.system.start()
except RuntimeError:
- self.log.logger.error("Can't initialize the redfish system.")
+ self.log.error("Can't initialize the redfish system.")
raise
+ def init_reporter(self) -> None:
try:
self.reporter_agent = Reporter(self.system,
self.cephx,
reporter_hostname=self.mgr_host,
reporter_port=self.mgr_agent_port,
reporter_endpoint=self.reporter_endpoint)
- self.reporter_agent.run()
+ self.reporter_agent.start()
except RuntimeError:
- self.log.logger.error("Can't initialize the reporter.")
+ self.log.error("Can't initialize the reporter.")
raise
+ def init_api(self) -> None:
try:
- self.log.logger.info('Starting node-proxy API...')
- self.api = NodeProxyApi(self,
- username=self.username,
- password=self.password,
- ssl_crt=self.api_ssl_crt,
- ssl_key=self.api_ssl_key)
+ self.log.info('Starting node-proxy API...')
+ self.api = NodeProxyApi(self)
self.api.start()
except Exception as e:
- self.log.logger.error(f"Can't start node-proxy API: {e}")
+ self.log.error(f"Can't start node-proxy API: {e}")
raise
+ def loop(self) -> None:
+ while not self.stop:
+ for thread in [self.system, self.reporter_agent]:
+ try:
+ status = thread.check_status()
+ label = 'Ok' if status else 'Critical'
+ self.log.debug(f'{thread} status: {label}')
+ except Exception as e:
+ self.log.error(f'{thread} not running: {e.__class__.__name__}: {e}')
+ thread.shutdown()
+ self.init_system()
+ self.init_reporter()
+ self.log.debug('All threads are alive, next check in 20sec.')
+ time.sleep(20)
+
+ def shutdown(self) -> None:
+ self.stop = True
+ # if `self.system.shutdown()` is called before self.start(), it will fail.
+ if hasattr(self, 'api'):
+ self.api.shutdown()
+ if hasattr(self, 'reporter_agent'):
+ self.reporter_agent.shutdown()
+ if hasattr(self, 'system'):
+ self.system.shutdown()
+
+
+def handler(signum: Any, frame: Any, t_mgr: 'NodeProxyManager') -> None:
+ t_mgr.system.pending_shutdown = True
+ t_mgr.log.info('SIGTERM caught, shutting down threads...')
+ t_mgr.shutdown()
+ t_mgr.log.info('Logging out from RedFish API')
+ t_mgr.system.client.logout()
+ raise SystemExit(0)
+
def main() -> None:
parser = argparse.ArgumentParser(
help='path of config file in json format',
required=True
)
+ parser.add_argument(
+ '--debug',
+ help='increase logging verbosity (debug level)',
+ action='store_true',
+ )
args = parser.parse_args()
+ if args.debug:
+ CONFIG['logging']['level'] = 10
if not os.path.exists(args.config):
raise Exception(f'No config file found at provided config path: {args.config}')
listener_key = config['listener.key']
name = config['name']
- f = write_tmp_file(root_cert,
- prefix_name='cephadm-endpoint-root-cert')
+ ca_file = write_tmp_file(root_cert,
+ prefix_name='cephadm-endpoint-root-cert')
node_proxy_mgr = NodeProxyManager(mgr_host=target_ip,
cephx_name=name,
cephx_secret=keyring,
mgr_agent_port=target_port,
- ca_path=f.name,
+ ca_path=ca_file.name,
api_ssl_crt=listener_cert,
api_ssl_key=listener_key)
- if not node_proxy_mgr.is_alive():
- node_proxy_mgr.start()
+ signal.signal(signal.SIGTERM,
+ lambda signum, frame: handler(signum, frame, node_proxy_mgr))
+ node_proxy_mgr.run()
if __name__ == '__main__':
import json
from urllib.error import HTTPError, URLError
from ceph_node_proxy.baseclient import BaseClient
-from ceph_node_proxy.util import Logger, http_req
+from ceph_node_proxy.util import get_logger, http_req
from typing import Dict, Any, Tuple, Optional
from http.client import HTTPMessage
username: str = '',
password: str = ''):
super().__init__(host, username, password)
- self.log: Logger = Logger(__name__)
- self.log.logger.info(f'Initializing redfish client {__name__}')
+ self.log = get_logger(__name__)
+ self.log.info(f'Initializing redfish client {__name__}')
self.host: str = host
self.port: str = port
self.url: str = f'https://{self.host}:{self.port}'
def login(self) -> None:
if not self.is_logged_in():
- self.log.logger.info('Logging in to '
- f"{self.url} as '{self.username}'")
+ self.log.info('Logging in to '
+ f"{self.url} as '{self.username}'")
oob_credentials = json.dumps({'UserName': self.username,
'Password': self.password})
headers = {'Content-Type': 'application/json'}
headers=headers,
endpoint='/redfish/v1/SessionService/Sessions/')
if _status_code != 201:
- self.log.logger.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
+ self.log.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
raise RuntimeError
except URLError as e:
msg = f"Can't log in to {self.url} as '{self.username}': {e}"
- self.log.logger.error(msg)
+ self.log.error(msg)
raise RuntimeError
self.token = _headers['X-Auth-Token']
self.location = _headers['Location']
def is_logged_in(self) -> bool:
- self.log.logger.debug(f'Checking token validity for {self.url}')
+ self.log.debug(f'Checking token validity for {self.url}')
if not self.location or not self.token:
- self.log.logger.debug(f'No token found for {self.url}.')
+ self.log.debug(f'No token found for {self.url}.')
return False
headers = {'X-Auth-Token': self.token}
try:
_headers, _data, _status_code = self.query(headers=headers,
endpoint=self.location)
except URLError as e:
- self.log.logger.error("Can't check token "
- f'validity for {self.url}: {e}')
+ self.log.error("Can't check token "
+ f'validity for {self.url}: {e}')
raise
return _status_code == 200
endpoint=self.location)
result = json.loads(_data)
except URLError:
- self.log.logger.error(f"Can't log out from {self.url}")
+ self.log.error(f"Can't log out from {self.url}")
self.location = ''
self.token = ''
result_json = json.loads(result)
return result_json
except URLError as e:
- self.log.logger.error(f"Can't get path {path}:\n{e}")
+ self.log.error(f"Can't get path {path}:\n{e}")
raise RuntimeError
def query(self,
return response_headers, response_str, response_status
except (HTTPError, URLError) as e:
- self.log.logger.debug(f'{e}')
+ self.log.debug(f'{e}')
raise
from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
-from ceph_node_proxy.util import Logger, normalize_dict, to_snake_case
+from ceph_node_proxy.util import get_logger, normalize_dict, to_snake_case
from typing import Dict, Any, List
class RedfishDellSystem(BaseRedfishSystem):
def __init__(self, **kw: Any) -> None:
super().__init__(**kw)
- self.log = Logger(__name__)
+ self.log = get_logger(__name__)
self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService'
self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'
try:
result[member_id][to_snake_case(field)] = member_info[field]
except KeyError:
- self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}')
+ self.log.warning(f'Could not find field: {field} in member_info: {member_info}')
return normalize_dict(result)
try:
result[_id][to_snake_case(field)] = member_elt[field]
except KeyError:
- self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}')
+ self.log.warning(f'Could not find field: {field} in data: {data[elt]}')
return normalize_dict(result)
def get_sn(self) -> str:
def _update_network(self) -> None:
fields = ['Description', 'Name', 'SpeedMbps', 'Status']
- self.log.logger.debug('Updating network')
+ self.log.debug('Updating network')
self._sys['network'] = self.build_common_data(data=self._system['Systems'],
fields=fields,
path='EthernetInterfaces')
'Model',
'Status',
'Manufacturer']
- self.log.logger.debug('Updating processors')
+ self.log.debug('Updating processors')
self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
fields=fields,
path='Processors')
'PhysicalLocation']
entities = self.get_members(data=self._system['Systems'],
path='Storage')
- self.log.logger.debug('Updating storage')
+ self.log.debug('Updating storage')
result: Dict[str, Dict[str, Dict]] = dict()
for entity in entities:
for drive in entity['Drives']:
self._sys['storage'] = normalize_dict(result)
def _update_sn(self) -> None:
- self.log.logger.debug('Updating serial number')
+ self.log.debug('Updating serial number')
self._sys['SKU'] = self._system['Systems']['SKU']
def _update_memory(self) -> None:
'MemoryDeviceType',
'CapacityMiB',
'Status']
- self.log.logger.debug('Updating memory')
+ self.log.debug('Updating memory')
self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
fields=fields,
path='Memory')
'Status'
]
}
- self.log.logger.debug('Updating powersupplies')
+ self.log.debug('Updating powersupplies')
self._sys['power'] = self.build_chassis_data(fields, 'Power')
def _update_fans(self) -> None:
'Status'
],
}
- self.log.logger.debug('Updating fans')
+ self.log.debug('Updating fans')
self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')
def _update_firmwares(self) -> None:
'Updateable',
'Status',
]
- self.log.logger.debug('Updating firmwares')
+ self.log.debug('Updating firmwares')
self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
fields=fields,
path='FirmwareInventory')
-from threading import Thread
import time
import json
-from ceph_node_proxy.util import Logger, http_req
+from ceph_node_proxy.util import get_logger, http_req, BaseThread
from urllib.error import HTTPError, URLError
from typing import Dict, Any
-class Reporter:
+class Reporter(BaseThread):
def __init__(self,
system: Any,
cephx: Dict[str, Any],
reporter_hostname: str = '',
reporter_port: str = '443',
reporter_endpoint: str = '/node-proxy/data') -> None:
+ super().__init__()
self.system = system
self.data: Dict[str, Any] = {}
- self.finish = False
+ self.stop: bool = False
self.cephx = cephx
- self.data['cephx'] = self.cephx
+ self.data['cephx'] = self.cephx['cephx']
self.reporter_scheme: str = reporter_scheme
self.reporter_hostname: str = reporter_hostname
self.reporter_port: str = reporter_port
self.reporter_endpoint: str = reporter_endpoint
- self.log = Logger(__name__)
+ self.log = get_logger(__name__)
self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
f'{reporter_port}{reporter_endpoint}')
- self.log.logger.info(f'Reporter url set to {self.reporter_url}')
+ self.log.info(f'Reporter url set to {self.reporter_url}')
- def stop(self) -> None:
- self.finish = True
- self.thread.join()
-
- def run(self) -> None:
- self.thread = Thread(target=self.loop)
- self.thread.start()
-
- def loop(self) -> None:
- while not self.finish:
+ def main(self) -> None:
+ while not self.stop:
# Any logic to avoid sending the all the system
# information every loop can go here. In a real
# scenario probably we should just send the sub-parts
# that have changed to minimize the traffic in
# dense clusters
- self.log.logger.debug('waiting for a lock in reporter loop.')
- self.system.lock.acquire()
- self.log.logger.debug('lock acquired in reporter loop.')
- if self.system.data_ready:
- self.log.logger.debug('data ready to be sent to the mgr.')
- if not self.system.get_system() == self.system.previous_data:
- self.log.logger.info('data has changed since last iteration.')
- self.data['patch'] = self.system.get_system()
- try:
- # TODO: add a timeout parameter to the reporter in the config file
- self.log.logger.info(f'sending data to {self.reporter_url}')
- http_req(hostname=self.reporter_hostname,
- port=self.reporter_port,
- method='POST',
- headers={'Content-Type': 'application/json'},
- endpoint=self.reporter_endpoint,
- scheme=self.reporter_scheme,
- data=json.dumps(self.data))
- except (HTTPError, URLError) as e:
- self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}")
- # Need to add a new parameter 'max_retries' to the reporter if it can't
- # send the data for more than x times, maybe the daemon should stop altogether
- else:
- self.system.previous_data = self.system.get_system()
- else:
- self.log.logger.debug('no diff, not sending data to the mgr.')
- self.system.lock.release()
- self.log.logger.debug('lock released in reporter loop.')
- time.sleep(5)
+ self.log.debug('waiting for a lock in reporter loop.')
+ with self.system.lock:
+ if not self.system.pending_shutdown:
+ self.log.debug('lock acquired in reporter loop.')
+ if self.system.data_ready:
+ self.log.debug('data ready to be sent to the mgr.')
+ if not self.system.get_system() == self.system.previous_data:
+ self.log.info('data has changed since last iteration.')
+ self.data['patch'] = self.system.get_system()
+ try:
+ # TODO: add a timeout parameter to the reporter in the config file
+ self.log.info(f'sending data to {self.reporter_url}')
+ http_req(hostname=self.reporter_hostname,
+ port=self.reporter_port,
+ method='POST',
+ headers={'Content-Type': 'application/json'},
+ endpoint=self.reporter_endpoint,
+ scheme=self.reporter_scheme,
+ data=json.dumps(self.data))
+ except (HTTPError, URLError) as e:
+ self.log.error(f"The reporter couldn't send data to the mgr: {e}")
+ raise
+ # Need to add a new parameter 'max_retries' to the reporter if it can't
+ # send the data for more than x times, maybe the daemon should stop altogether
+ else:
+ self.system.previous_data = self.system.get_system()
+ else:
+ self.log.debug('no diff, not sending data to the mgr.')
+ time.sleep(5)
+ self.log.debug('lock released in reporter loop.')
+ self.log.debug('exiting reporter loop.')
+ raise SystemExit(0)
import time
import re
import ssl
+import traceback
+import threading
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
-from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple
-
-
-class Logger:
- _Logger: List['Logger'] = []
-
- def __init__(self, name: str, level: int = logging.INFO):
- self.name = name
- self.level = level
-
- Logger._Logger.append(self)
- self.logger = self.get_logger()
-
- def get_logger(self) -> logging.Logger:
- logger = logging.getLogger(self.name)
- logger.setLevel(self.level)
- handler = logging.StreamHandler()
- handler.setLevel(self.level)
- fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(fmt)
- logger.handlers.clear()
- logger.addHandler(handler)
- logger.propagate = False
-
- return logger
+from typing import Dict, Callable, Any, Optional, MutableMapping, Tuple, Union
+
+
+CONFIG: Dict[str, Any] = {
+ 'reporter': {
+ 'check_interval': 5,
+ 'push_data_max_retries': 30,
+ 'endpoint': 'https://%(mgr_host):%(mgr_port)/node-proxy/data',
+ },
+ 'system': {
+ 'refresh_interval': 5
+ },
+ 'api': {
+ 'port': 9456,
+ },
+ 'logging': {
+ 'level': logging.INFO,
+ }
+}
+
+
+def get_logger(name: str, level: Union[int, str] = logging.NOTSET) -> logging.Logger:
+ if level == logging.NOTSET:
+ log_level = CONFIG['logging']['level']
+ logger = logging.getLogger(name)
+ logger.setLevel(log_level)
+ handler = logging.StreamHandler()
+ handler.setLevel(log_level)
+ fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ handler.setFormatter(fmt)
+ logger.handlers.clear()
+ logger.addHandler(handler)
+ logger.propagate = False
+
+ return logger
+
+
+logger = get_logger(__name__)
class Config:
-
def __init__(self,
config_file: str = '/etc/ceph/node-proxy.yaml',
- default_config: Dict[str, Any] = {}) -> None:
+ config: Dict[str, Any] = {}) -> None:
self.config_file = config_file
- self.default_config = default_config
+ self.config = config
self.load_config()
with open(self.config_file, 'r') as f:
self.config = yaml.safe_load(f)
else:
- self.config = self.default_config
+ self.config = self.config
- for k, v in self.default_config.items():
+ for k, v in self.config.items():
if k not in self.config.keys():
self.config[k] = v
for k, v in self.config.items():
setattr(self, k, v)
- # TODO: need to be improved
- for _l in Logger._Logger:
- _l.logger.setLevel(self.logging['level']) # type: ignore
- _l.logger.handlers[0].setLevel(self.logging['level']) # type: ignore
-
def reload(self, config_file: str = '') -> None:
if config_file != '':
self.config_file = config_file
self.load_config()
-log = Logger(__name__)
+class BaseThread(threading.Thread):
+ def __init__(self) -> None:
+ super().__init__()
+ self.exc: Optional[Exception] = None
+ self.stop: bool = False
+ self.daemon = True
+ self.name = self.__class__.__name__
+ self.log: logging.Logger = get_logger(__name__)
+ self.pending_shutdown: bool = False
+
+ def run(self) -> None:
+ logger.info(f'Starting {self.name}')
+ try:
+ self.main()
+ except Exception as e:
+ self.exc = e
+ return
+
+ def shutdown(self) -> None:
+ self.stop = True
+ self.pending_shutdown = True
+
+ def check_status(self) -> bool:
+ logger.debug(f'Checking status of {self.name}')
+ if self.exc:
+ traceback.print_tb(self.exc.__traceback__)
+ logger.error(f'Caught exception: {self.exc.__class__.__name__}')
+ raise self.exc
+ if not self.is_alive():
+ logger.info(f'{self.name} not alive')
+ self.start()
+ return True
+
+ def main(self) -> None:
+ raise NotImplementedError()
def to_snake_case(name: str) -> str:
_tries = retries
while _tries > 1:
try:
- log.logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
+ logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
return f(*args, **kwargs)
except exceptions:
time.sleep(delay)
_tries -= 1
- log.logger.warn('{} has failed after {} tries'.format(f, retries))
+ logger.warn('{} has failed after {} tries'.format(f, retries))
return f(*args, **kwargs)
return _retry
return decorator
daemon_type = 'node-proxy'
# TODO: update this if we make node-proxy an executable
- entrypoint = 'python3'
+ entrypoint = '/usr/sbin/ceph-node-proxy'
required_files = ['node-proxy.json']
@classmethod
# the config in _get_container_mounts above. They
# will both need to be updated when we have a proper
# location in the container for node-proxy
- args.extend(['/usr/share/ceph/ceph_node_proxy/main.py', '--config', '/usr/share/ceph/node-proxy.json'])
+ args.extend(['--config', '/usr/share/ceph/node-proxy.json'])
def validate(self):
# type: () -> None
def container(self, ctx: CephadmContext) -> CephContainer:
# So the container can modprobe iscsi_target_mod and have write perms
# to configfs we need to make this a privileged container.
- ctr = daemon_to_container(ctx, self, privileged=True, envs=['PYTHONPATH=$PYTHONPATH:/usr/share/ceph'])
+ ctr = daemon_to_container(ctx, self, privileged=True)
return to_deployment_container(ctx, ctr)
def config_and_keyring(
try:
headers, result, status = http_req(hostname=addr,
- port='8080',
+ port='9456',
headers=header,
method=method,
data=json.dumps(payload),
try:
headers, result, status = http_req(hostname=addr,
- port='8080',
+ port='9456',
headers=header,
method=method,
data=json.dumps(payload),
try:
headers, result, status = http_req(hostname=addr,
- port='8080',
+ port='9456',
headers=header,
data=json.dumps(payload),
endpoint=endpoint,
try:
headers, result, status = http_req(hostname=addr,
- port='8080',
+ port='9456',
headers=header,
data="{}",
endpoint=endpoint,