--- /dev/null
+import cherrypy
+from urllib.error import HTTPError
+from cherrypy._cpserver import Server
+from threading import Thread, Event
+from typing import Dict, Any, List
+from ceph_node_proxy.util import Config, Logger, write_tmp_file
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.reporter import Reporter
+from typing import TYPE_CHECKING, Optional
+
+if TYPE_CHECKING:
+ from ceph_node_proxy.main import NodeProxy
+
+
@cherrypy.tools.auth_basic(on=True)
@cherrypy.tools.allow(methods=['PUT'])
@cherrypy.tools.json_out()
class Admin():
    """Administrative endpoints for controlling the node-proxy daemon.

    All endpoints require basic auth and answer PUT only; every handler
    returns a small JSON acknowledgment.  (The mount of this app is
    currently commented out in NodeProxyApi.run().)
    """

    def __init__(self, api: 'API') -> None:
        self.api = api

    @cherrypy.expose
    def start(self) -> Dict[str, str]:
        """Start the backend client and the reporter."""
        self.api.backend.start_client()
        # self.backend.start_update_loop()
        self.api.reporter.run()
        return {'ok': 'node-proxy daemon started'}

    @cherrypy.expose
    def reload(self) -> Dict[str, str]:
        """Re-read the node-proxy configuration from disk."""
        self.api.config.reload()
        return {'ok': 'node-proxy config reloaded'}

    def _stop(self) -> None:
        """Shared teardown: stop polling, log out of the OOB API, stop reporting."""
        self.api.backend.stop_update_loop()
        self.api.backend.client.logout()
        self.api.reporter.stop()

    @cherrypy.expose
    def stop(self) -> Dict[str, str]:
        """Stop the daemon's background activity without exiting the server."""
        self._stop()
        return {'ok': 'node-proxy daemon stopped'}

    @cherrypy.expose
    def shutdown(self) -> Dict[str, str]:
        """Stop the daemon and terminate the cherrypy engine."""
        self._stop()
        cherrypy.engine.exit()
        return {'ok': 'Server shutdown.'}

    @cherrypy.expose
    def flush(self) -> Dict[str, str]:
        """Drop any cached hardware data held by the backend."""
        self.api.backend.flush()
        return {'ok': 'node-proxy data flushed'}
+
+
class API(Server):
    """CherryPy application exposing hardware data and controls over HTTPS.

    Read-only endpoints (memory, network, ...) are unauthenticated GETs;
    state-changing endpoints (shutdown, powercycle, LED PATCH) require
    basic auth.  Subclassing cpserver.Server lets this object be its own
    listening server (see cherrypy.server.unsubscribe() in NodeProxyApi).
    """

    def __init__(self,
                 backend: 'BaseSystem',
                 reporter: 'Reporter',
                 config: 'Config',
                 addr: str = '0.0.0.0',
                 port: int = 0) -> None:
        """Bind the API to its collaborators and subscribe it to the engine.

        :param backend: hardware abstraction providing the data/actions.
        :param reporter: agent pushing data to the ceph mgr.
        :param config: node-proxy configuration object.
        :param addr: listening address.
        :param port: listening port; 0 means "use the configured port".
        """
        super().__init__()
        self.log = Logger(__name__)
        self.backend = backend
        self.reporter = reporter
        self.config = config
        # An explicit port wins; otherwise fall back to the configured one.
        self.socket_port = self.config.__dict__['server']['port'] if not port else port
        self.socket_host = addr
        self.subscribe()

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def memory(self) -> Dict[str, Any]:
        return {'memory': self.backend.get_memory()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def network(self) -> Dict[str, Any]:
        return {'network': self.backend.get_network()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def processors(self) -> Dict[str, Any]:
        return {'processors': self.backend.get_processors()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def storage(self) -> Dict[str, Any]:
        return {'storage': self.backend.get_storage()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def power(self) -> Dict[str, Any]:
        return {'power': self.backend.get_power()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def fans(self) -> Dict[str, Any]:
        return {'fans': self.backend.get_fans()}

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def firmwares(self) -> Dict[str, Any]:
        return {'firmwares': self.backend.get_firmwares()}

    def _cp_dispatch(self, vpath: List[str]) -> 'API':
        """Route /led/{type}[/{id}] to the _led handler.

        Path segments are consumed into request params so cherrypy finally
        dispatches to the plain '_led' attribute.
        """
        if vpath[0] == 'led' and len(vpath) > 1:  # /led/{type}/{id}
            _type = vpath[1]
            cherrypy.request.params['type'] = _type
            vpath.pop(1)  # /led/{id} or # /led
            if _type == 'drive' and len(vpath) > 1:  # /led/{id}
                _id = vpath[1]
                vpath.pop(1)  # /led
                cherrypy.request.params['id'] = _id
            vpath[0] = '_led'
        # /<endpoint>
        return self

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def shutdown(self, **kw: Any) -> int:
        """Schedule a (forced or graceful) reboot via the backend.

        The JSON body must contain a boolean 'force' key.
        :raises cherrypy.HTTPError: 400 on missing key, or the backend's
            HTTP error code if the OOB request fails.
        """
        data: Dict[str, bool] = cherrypy.request.json

        # idiomatic membership test (was: `not in data.keys()`)
        if 'force' not in data:
            msg = "The key 'force' wasn't passed."
            self.log.logger.debug(msg)
            raise cherrypy.HTTPError(400, msg)
        try:
            result: int = self.backend.shutdown(force=data['force'])
        except HTTPError as e:
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['POST'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def powercycle(self, **kw: Any) -> int:
        """Schedule a power cycle via the backend."""
        try:
            result: int = self.backend.powercycle()
        except HTTPError as e:
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET', 'PATCH'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def _led(self, **kw: Any) -> Dict[str, Any]:
        """Get (GET) or set (PATCH) the chassis or a drive identification LED.

        Reached via _cp_dispatch; 'type' ('chassis'|'drive') and, for drives,
        'id' arrive as request params.  PATCH bodies must carry
        {'state': 'on'|'off'}.
        """
        method: str = cherrypy.request.method
        led_type: Optional[str] = kw.get('type')
        id_drive: Optional[str] = kw.get('id')
        result: Dict[str, Any] = dict()

        if not led_type:
            msg = "the led type must be provided (either 'chassis' or 'drive')."
            self.log.logger.debug(msg)
            raise cherrypy.HTTPError(400, msg)

        if led_type == 'drive':
            # a drive LED request needs a device ID the backend knows about
            if not id_drive or id_drive not in self.backend.get_storage():
                msg = 'A valid device ID must be provided.'
                self.log.logger.debug(msg)
                raise cherrypy.HTTPError(400, msg)

        try:
            if method == 'PATCH':
                data: Dict[str, Any] = cherrypy.request.json

                if 'state' not in data or data['state'] not in ['on', 'off']:
                    msg = "Invalid data. 'state' must be provided and have a valid value (on|off)."
                    self.log.logger.error(msg)
                    raise cherrypy.HTTPError(400, msg)

                # explicit two-axis dispatch replaces the former nested
                # conditional expression (whose trailing `else None` branch
                # was unreachable but would have crashed with a TypeError)
                if led_type == 'drive':
                    func: Any = (self.backend.device_led_on if data['state'] == 'on'
                                 else self.backend.device_led_off)
                else:
                    func = (self.backend.chassis_led_on if data['state'] == 'on'
                            else self.backend.chassis_led_off)
            else:
                func = self.backend.get_device_led if led_type == 'drive' else self.backend.get_chassis_led

            result = func(id_drive) if led_type == 'drive' else func()

        except HTTPError as e:
            raise cherrypy.HTTPError(e.code, e.reason)
        return result

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['GET'])
    @cherrypy.tools.json_out()
    def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
        return self.backend.get_led()

    @cherrypy.expose
    @cherrypy.tools.allow(methods=['PATCH'])
    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.auth_basic(on=True)
    def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
        """Set the LED state; mirrors the backend's HTTP status to the client."""
        data = cherrypy.request.json
        rc = self.backend.set_led(data)

        if rc != 200:
            cherrypy.response.status = rc
            result = {'state': 'error: please, verify the data you sent.'}
        else:
            result = {'state': data['state'].lower()}
        return result

    def stop(self) -> None:
        """Detach this server from the cherrypy bus before stopping it."""
        self.unsubscribe()
        super().stop()
+
+
class NodeProxyApi(Thread):
    """Thread hosting the cherrypy server that serves the `API` application.

    run() configures and starts the cherrypy engine, then blocks on
    cp_shutdown_event until shutdown() is called from another thread.
    """

    def __init__(self,
                 node_proxy: 'NodeProxy',
                 username: str,
                 password: str,
                 ssl_crt: str,
                 ssl_key: str) -> None:
        """Build the API app from the NodeProxy's collaborators.

        :param node_proxy: owning NodeProxy (provides system/reporter/config).
        :param username: basic-auth username for protected endpoints.
        :param password: basic-auth password for protected endpoints.
        :param ssl_crt: TLS certificate contents (PEM string, not a path).
        :param ssl_key: TLS private key contents (PEM string, not a path).
        """
        super().__init__()
        self.log = Logger(__name__)
        # set by shutdown(); run() waits on it to know when to stop the engine
        self.cp_shutdown_event = Event()
        self.node_proxy = node_proxy
        self.username = username
        self.password = password
        self.ssl_crt = ssl_crt
        self.ssl_key = ssl_key
        self.api = API(self.node_proxy.system,
                       self.node_proxy.reporter_agent,
                       self.node_proxy.config)

    def check_auth(self, realm: str, username: str, password: str) -> bool:
        """Basic-auth callback for cherrypy (realm is required by the signature, unused)."""
        return self.username == username and \
            self.password == password

    def shutdown(self) -> None:
        """Ask the API thread to stop; run() wakes up and tears the engine down."""
        self.log.logger.info('Stopping node-proxy API...')
        self.cp_shutdown_event.set()

    def run(self) -> None:
        """Configure cherrypy, start the engine and serve until shutdown."""
        self.log.logger.info('node-proxy API configuration...')
        cherrypy.config.update({
            'environment': 'production',
            'engine.autoreload.on': False,
            'log.screen': True,
        })
        config = {'/': {
            'request.methods_with_bodies': ('POST', 'PUT', 'PATCH'),
            'tools.trailing_slash.on': False,
            'tools.auth_basic.realm': 'localhost',
            'tools.auth_basic.checkpassword': self.check_auth
        }}
        cherrypy.tree.mount(self.api, '/', config=config)
        # cherrypy.tree.mount(admin, '/admin', config=config)

        # TLS material arrives as strings; cherrypy wants file paths.
        ssl_crt = write_tmp_file(self.ssl_crt,
                                 prefix_name='listener-crt-')
        ssl_key = write_tmp_file(self.ssl_key,
                                 prefix_name='listener-key-')

        self.api.ssl_certificate = ssl_crt.name
        self.api.ssl_private_key = ssl_key.name

        # Disable cherrypy's default server: the API object is itself a
        # cpserver.Server and subscribed itself in its constructor.
        cherrypy.server.unsubscribe()
        try:
            cherrypy.engine.start()
            self.log.logger.info('node-proxy API started.')
            self.cp_shutdown_event.wait()
            self.cp_shutdown_event.clear()
            cherrypy.engine.stop()
            cherrypy.server.httpserver = None
            self.log.logger.info('node-proxy API shutdown.')
        except Exception as e:
            # NOTE(review): broad catch keeps the thread from dying with an
            # unlogged traceback; failures are only logged, not re-raised.
            self.log.logger.error(f'node-proxy API error: {e}')
--- /dev/null
+from typing import Dict, Any
+
+
class BaseClient:
    """Abstract out-of-band management client.

    Concrete clients (e.g. RedFishClient) implement the session handling
    and path retrieval; this base only stores the connection credentials.
    """

    def __init__(self, host: str, username: str, password: str) -> None:
        self.host = host
        self.username = username
        self.password = password

    def login(self) -> None:
        """Open a session against the management endpoint."""
        raise NotImplementedError()

    def logout(self) -> Dict[str, Any]:
        """Close the session and return the endpoint's response payload."""
        raise NotImplementedError()

    def get_path(self, path: str) -> Dict:
        """Fetch the JSON resource at the given path."""
        raise NotImplementedError()
--- /dev/null
+import concurrent.futures
+import json
+from ceph_node_proxy.basesystem import BaseSystem
+from ceph_node_proxy.redfish_client import RedFishClient
+from threading import Thread, Lock
+from time import sleep
+from ceph_node_proxy.util import Logger, retry
+from typing import Dict, Any, List, Callable, Union
+from urllib.error import HTTPError, URLError
+
+
class BaseRedfishSystem(BaseSystem):
    """Redfish-backed system poller.

    Logs into the OOB endpoint, then runs a background thread that
    periodically refreshes hardware data into self._sys / self._system.
    Vendor subclasses implement the _update_* methods.
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__(**kw)
        self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
                                                                       '/UpdateService'])
        self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
        self.log = Logger(__name__)
        self.host: str = kw['host']
        self.port: str = kw['port']
        self.username: str = kw['username']
        self.password: str = kw['password']
        # move the following line (class attribute?)
        self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
        self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}')

        self.run: bool = False          # update loop keeps running while True
        self.thread: Thread
        self.data_ready: bool = False   # at least one full refresh completed
        self.previous_data: Dict = {}
        self.lock: Lock = Lock()        # guards _sys/_system against flush()
        self.data: Dict[str, Dict[str, Any]] = {}
        self._system: Dict[str, Dict[str, Any]] = {}
        self._sys: Dict[str, Any] = {}
        # vendor-specific endpoints, filled in by subclasses
        self.job_service_endpoint: str = ''
        self.create_reboot_job_endpoint: str = ''
        self.setup_job_queue_endpoint: str = ''

        self.start_client()

    def start_client(self) -> None:
        """Log in to the redfish API and start the refresh loop."""
        self.client.login()
        self.start_update_loop()

    def start_update_loop(self) -> None:
        self.run = True
        self.thread = Thread(target=self.update)
        self.thread.start()

    def stop_update_loop(self) -> None:
        self.run = False
        self.thread.join()

    def update(self) -> None:
        # this loop can have:
        # - caching logic
        while self.run:
            self.log.logger.debug('waiting for a lock in the update loop.')
            # `with` guarantees the lock is released even on unexpected
            # exceptions (the previous manual acquire/release could deadlock
            # if anything raised between acquire() and the try block).
            with self.lock:
                self.log.logger.debug('lock acquired in the update loop.')
                try:
                    self._update_system()
                    self._update_sn()
                    update_funcs = [self._update_memory,
                                    self._update_power,
                                    self._update_fans,
                                    self._update_network,
                                    self._update_processors,
                                    self._update_storage,
                                    self._update_firmwares]

                    with concurrent.futures.ThreadPoolExecutor() as executor:
                        executor.map(lambda f: f(), update_funcs)

                    self.data_ready = True
                    # NOTE(review): the lock is held during this sleep, so
                    # flush() can block up to 5s — confirm this is intended.
                    sleep(5)
                except RuntimeError as e:
                    self.run = False
                    self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
                    self.client.logout()
            self.log.logger.debug('lock released in the update loop.')

    def flush(self) -> None:
        """Drop cached data so the next refresh starts from scratch."""
        self.log.logger.debug('Acquiring lock to flush data.')
        with self.lock:
            self.log.logger.debug('Lock acquired, flushing data.')
            self._system = {}
            self.previous_data = {}
            self.log.logger.info('Data flushed.')
            self.data_ready = False
            self.log.logger.debug('Data marked as not ready.')
        self.log.logger.debug('Released the lock after flushing data.')

    @retry(retries=10, delay=2)
    def _get_path(self, path: str) -> Dict:
        """Fetch a redfish path; raises RuntimeError so @retry re-attempts."""
        result = self.client.get_path(path)
        if result is None:
            self.log.logger.error(f'The client reported an error when getting path: {path}')
            raise RuntimeError(f'Could not get path: {path}')
        return result

    def get_members(self, data: Dict[str, Any], path: str) -> List:
        """Resolve a collection reference and fetch each of its members."""
        _path = data[path]['@odata.id']
        _data = self._get_path(_path)
        return [self._get_path(member['@odata.id']) for member in _data['Members']]

    def get_system(self) -> Dict[str, Any]:
        """Return the full snapshot reported to the ceph mgr."""
        result = {
            'host': self.get_host(),
            'sn': self.get_sn(),
            'status': {
                'storage': self.get_storage(),
                'processors': self.get_processors(),
                'network': self.get_network(),
                'memory': self.get_memory(),
                'power': self.get_power(),
                'fans': self.get_fans()
            },
            'firmwares': self.get_firmwares(),
            'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'}  # TODO(guits): not ideal
        }
        return result

    def _update_system(self) -> None:
        """Refresh the raw per-endpoint data, keyed by top path segment."""
        for endpoint in self.common_endpoints:
            result = self.client.get_path(endpoint)
            _endpoint = endpoint.strip('/').split('/')[0]
            self._system[_endpoint] = result

    def _update_sn(self) -> None:
        raise NotImplementedError()

    def _update_memory(self) -> None:
        raise NotImplementedError()

    def _update_power(self) -> None:
        raise NotImplementedError()

    def _update_fans(self) -> None:
        raise NotImplementedError()

    def _update_network(self) -> None:
        raise NotImplementedError()

    def _update_processors(self) -> None:
        raise NotImplementedError()

    def _update_storage(self) -> None:
        raise NotImplementedError()

    def _update_firmwares(self) -> None:
        raise NotImplementedError()

    def _device_led_toggle(self, device: str, active: bool) -> int:
        # shared implementation for device_led_on/off; returns 0 on failure
        data: Dict[str, bool] = {'LocationIndicatorActive': active}
        try:
            result = self.set_device_led(device, data)
        except (HTTPError, KeyError):
            return 0
        return result

    def device_led_on(self, device: str) -> int:
        """Turn the drive's identification LED on; returns 0 on failure."""
        return self._device_led_toggle(device, True)

    def device_led_off(self, device: str) -> int:
        """Turn the drive's identification LED off; returns 0 on failure."""
        return self._device_led_toggle(device, False)

    def chassis_led_on(self) -> int:
        data: Dict[str, str] = {'IndicatorLED': 'Blinking'}
        result = self.set_chassis_led(data)
        return result

    def chassis_led_off(self) -> int:
        data: Dict[str, str] = {'IndicatorLED': 'Lit'}
        result = self.set_chassis_led(data)
        return result

    def get_device_led(self, device: str) -> Dict[str, Any]:
        """Report a drive's identification LED state and the HTTP status."""
        endpoint = self._sys['storage'][device]['redfish_endpoint']
        try:
            result = self.client.query(method='GET',
                                       endpoint=endpoint,
                                       timeout=10)
        except HTTPError as e:
            self.log.logger.error(f"Couldn't get the ident device LED status for device '{device}': {e}")
            raise
        response_json = json.loads(result[1])
        _result: Dict[str, Any] = {'http_code': result[2]}
        if result[2] == 200:
            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
        else:
            _result['LocationIndicatorActive'] = None
        return _result

    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
        try:
            _, response, status = self.client.query(
                data=json.dumps(data),
                method='PATCH',
                endpoint=self._sys['storage'][device]['redfish_endpoint']
            )
        except (HTTPError, KeyError) as e:
            self.log.logger.error(f"Couldn't set the ident device LED for device '{device}': {e}")
            raise
        return status

    def get_chassis_led(self) -> Dict[str, Any]:
        """Report the chassis identification LED state and the HTTP status."""
        # bugfix: no trailing slash after v1 — chassis_endpoint already starts
        # with '/' (matches set_chassis_led and get_system)
        endpoint = f'/redfish/v1{self.chassis_endpoint}'
        try:
            result = self.client.query(method='GET',
                                       endpoint=endpoint,
                                       timeout=10)
        except HTTPError as e:
            self.log.logger.error(f"Couldn't get the ident chassis LED status: {e}")
            raise
        response_json = json.loads(result[1])
        _result: Dict[str, Any] = {'http_code': result[2]}
        if result[2] == 200:
            _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
        else:
            _result['LocationIndicatorActive'] = None
        return _result

    def set_chassis_led(self, data: Dict[str, str]) -> int:
        # '{"IndicatorLED": "Lit"}'      -> LocationIndicatorActive = false
        # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
        try:
            _, response, status = self.client.query(
                data=json.dumps(data),
                method='PATCH',
                endpoint=f'/redfish/v1{self.chassis_endpoint}'
            )
        except HTTPError as e:
            self.log.logger.error(f"Couldn't set the ident chassis LED: {e}")
            raise
        return status

    def shutdown(self, force: bool = False) -> int:
        """Create and schedule a reboot job; force skips the graceful phase."""
        reboot_type: str = 'GracefulRebootWithForcedShutdown' if force else 'GracefulRebootWithoutForcedShutdown'

        try:
            job_id: str = self.create_reboot_job(reboot_type)
            status = self.schedule_reboot_job(job_id)
        except (HTTPError, KeyError) as e:
            self.log.logger.error(f"Couldn't create the reboot job: {e}")
            raise
        return status

    def powercycle(self) -> int:
        """Create and schedule a hard power-cycle job."""
        try:
            job_id: str = self.create_reboot_job('PowerCycle')
            status = self.schedule_reboot_job(job_id)
        except (HTTPError, URLError) as e:
            self.log.logger.error(f"Couldn't perform power cycle: {e}")
            raise
        return status

    def create_reboot_job(self, reboot_type: str) -> str:
        """POST a reboot job; the job id comes back in the Location header."""
        data: Dict[str, str] = dict(RebootJobType=reboot_type)
        try:
            headers, response, status = self.client.query(
                data=json.dumps(data),
                endpoint=self.create_reboot_job_endpoint
            )
            job_id: str = headers['Location'].split('/')[-1]
        except (HTTPError, URLError) as e:
            self.log.logger.error(f"Couldn't create the reboot job: {e}")
            raise
        return job_id

    def schedule_reboot_job(self, job_id: str) -> int:
        """Queue the created job for immediate execution."""
        data: Dict[str, Union[List[str], str]] = dict(JobArray=[job_id], StartTimeInterval='TIME_NOW')
        try:
            headers, response, status = self.client.query(
                data=json.dumps(data),
                endpoint=self.setup_job_queue_endpoint
            )
        except (HTTPError, KeyError) as e:
            self.log.logger.error(f"Couldn't schedule the reboot job: {e}")
            raise
        return status
--- /dev/null
+import socket
+from ceph_node_proxy.util import Config
+from typing import Dict, Any
+from ceph_node_proxy.baseclient import BaseClient
+
+
class BaseSystem:
    """Abstract hardware-system interface.

    Vendor backends (e.g. BaseRedfishSystem subclasses) implement the
    accessors and actions; only hostname resolution is provided here.
    """

    def __init__(self, **kw: Any) -> None:
        self._system: Dict = {}
        self.config: Config = kw['config']
        self.client: BaseClient

    # -- snapshot / metadata -------------------------------------------------

    def get_system(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def get_status(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    # -- per-component accessors ---------------------------------------------

    def get_processors(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_memory(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_fans(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_power(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_network(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_storage(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
        raise NotImplementedError()

    def get_sn(self) -> str:
        raise NotImplementedError()

    # -- identification LEDs -------------------------------------------------

    def get_led(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_led(self, data: Dict[str, str]) -> int:
        raise NotImplementedError()

    def get_chassis_led(self) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_chassis_led(self, data: Dict[str, str]) -> int:
        raise NotImplementedError()

    def device_led_on(self, device: str) -> int:
        raise NotImplementedError()

    def device_led_off(self, device: str) -> int:
        raise NotImplementedError()

    def get_device_led(self, device: str) -> Dict[str, Any]:
        raise NotImplementedError()

    def set_device_led(self, device: str, data: Dict[str, bool]) -> int:
        raise NotImplementedError()

    def chassis_led_on(self) -> int:
        raise NotImplementedError()

    def chassis_led_off(self) -> int:
        raise NotImplementedError()

    # -- lifecycle -----------------------------------------------------------

    def get_host(self) -> str:
        """Return the local hostname (the one concrete piece of behavior here)."""
        return socket.gethostname()

    def start_update_loop(self) -> None:
        raise NotImplementedError()

    def stop_update_loop(self) -> None:
        raise NotImplementedError()

    def start_client(self) -> None:
        raise NotImplementedError()

    def flush(self) -> None:
        raise NotImplementedError()

    def shutdown(self, force: bool = False) -> int:
        raise NotImplementedError()

    def powercycle(self) -> int:
        raise NotImplementedError()
--- /dev/null
+from threading import Thread
+from ceph_node_proxy.redfishdellsystem import RedfishDellSystem
+from ceph_node_proxy.api import NodeProxyApi
+from ceph_node_proxy.reporter import Reporter
+from ceph_node_proxy.util import Config, Logger, http_req, write_tmp_file
+from typing import Dict, Any, Optional
+
+import argparse
+import traceback
+import logging
+import os
+import ssl
+import json
+import time
+
logger = logging.getLogger(__name__)

# Defaults used when /etc/ceph/node-proxy.yml doesn't override a value
# (see Config in NodeProxy.main()).
DEFAULT_CONFIG = {
    'reporter': {
        'check_interval': 5,            # seconds between reporter pushes
        'push_data_max_retries': 30,    # retries before giving up a push
        'endpoint': 'https://127.0.0.1:7150/node-proxy/data',
    },
    'system': {
        'refresh_interval': 5           # seconds between hardware refreshes
    },
    'server': {
        'port': 8080,                   # node-proxy API listening port
    },
    'logging': {
        'level': 20,                    # numeric logging level (20 == INFO)
    }
}
+
+
class NodeProxyManager(Thread):
    """Supervisor thread for the node-proxy daemon.

    Registers with the ceph mgr agent to obtain the out-of-band
    credentials, starts a NodeProxy and restarts it whenever the health
    check fails.
    """

    def __init__(self,
                 mgr_host: str,
                 cephx_name: str,
                 cephx_secret: str,
                 ca_path: str,
                 api_ssl_crt: str,
                 api_ssl_key: str,
                 mgr_agent_port: int = 7150):
        super().__init__()
        self.mgr_host = mgr_host
        self.cephx_name = cephx_name
        self.cephx_secret = cephx_secret
        self.ca_path = ca_path
        self.api_ssl_crt = api_ssl_crt
        self.api_ssl_key = api_ssl_key
        self.mgr_agent_port = str(mgr_agent_port)
        self.stop = False
        # strict TLS towards the mgr agent, pinned to the provided CA bundle
        self.ssl_ctx = ssl.create_default_context()
        self.ssl_ctx.check_hostname = True
        self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
        self.ssl_ctx.load_verify_locations(self.ca_path)

    def run(self) -> None:
        self.init()
        self.loop()

    def init(self) -> None:
        """Fetch the OOB credentials from the mgr agent and spawn a NodeProxy."""
        node_proxy_meta = {
            'cephx': {
                'name': self.cephx_name,
                'secret': self.cephx_secret
            }
        }
        headers, result, status = http_req(hostname=self.mgr_host,
                                           port=self.mgr_agent_port,
                                           data=json.dumps(node_proxy_meta),
                                           endpoint='/node-proxy/oob',
                                           ssl_ctx=self.ssl_ctx)
        if status != 200:
            msg = f'No out of band tool details could be loaded: {status}, {result}'
            logger.debug(msg)
            raise RuntimeError(msg)

        oob_details = json.loads(result)['result']
        kwargs = {
            'host': oob_details['addr'],
            'username': oob_details['username'],
            'password': oob_details['password'],
            'cephx': node_proxy_meta['cephx'],
            'mgr_host': self.mgr_host,
            'mgr_agent_port': self.mgr_agent_port,
            'api_ssl_crt': self.api_ssl_crt,
            'api_ssl_key': self.api_ssl_key
        }
        if oob_details.get('port'):
            kwargs['port'] = oob_details['port']

        self.node_proxy: NodeProxy = NodeProxy(**kwargs)
        self.node_proxy.start()

    def loop(self) -> None:
        """Health-check the NodeProxy forever; re-init it after failures."""
        while not self.stop:
            try:
                status = self.node_proxy.check_status()
                label = 'Ok' if status else 'Critical'
                logger.debug(f'node-proxy status: {label}')
            except Exception as e:
                logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
                time.sleep(120)
                self.init()
            else:
                logger.debug('node-proxy alive, next check in 60sec.')
                time.sleep(60)

    def shutdown(self) -> None:
        self.stop = True
        # if `self.node_proxy.shutdown()` is called before self.start(), it will fail.
        if self.__dict__.get('node_proxy'):
            self.node_proxy.shutdown()
+
+
class NodeProxy(Thread):
    """Worker thread tying together the redfish system, reporter and API.

    main() builds the three collaborators; check_status() is polled by
    NodeProxyManager and re-raises any exception captured in run().
    """

    def __init__(self, **kw: Any) -> None:
        super().__init__()
        self.username: str = kw.get('username', '')
        self.password: str = kw.get('password', '')
        self.host: str = kw.get('host', '')
        self.port: int = kw.get('port', 443)
        self.cephx: Dict[str, Any] = kw.get('cephx', {})
        self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
        self.mgr_host: str = kw.get('mgr_host', '')
        self.mgr_agent_port: str = kw.get('mgr_agent_port', '')
        self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
        self.api_ssl_crt: str = kw.get('api_ssl_crt', '')
        self.api_ssl_key: str = kw.get('api_ssl_key', '')
        # holds any exception raised in run(); surfaced by check_status()
        self.exc: Optional[Exception] = None
        self.log = Logger(__name__)

    def run(self) -> None:
        # capture the exception instead of letting the thread die silently;
        # check_status() re-raises it in the supervisor's context
        try:
            self.main()
        except Exception as e:
            self.exc = e
            return

    def shutdown(self) -> None:
        """Stop the backend polling and the reporter, log out of the OOB API."""
        self.log.logger.info('Shutting down node-proxy...')
        self.system.client.logout()
        self.system.stop_update_loop()
        self.reporter_agent.stop()

    def check_status(self) -> bool:
        """Return True if healthy; raise if the thread or update loop died."""
        if self.__dict__.get('system') and not self.system.run:
            raise RuntimeError('node-proxy encountered an error.')
        if self.exc:
            traceback.print_tb(self.exc.__traceback__)
            self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
            raise self.exc
        return True

    def main(self) -> None:
        # TODO: add a check and fail if host/username/password/data aren't passed
        self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
        self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])

        # create the redfish system and the observer
        self.log.logger.info('Server initialization...')
        try:
            self.system = RedfishDellSystem(host=self.host,
                                            port=self.port,
                                            username=self.username,
                                            password=self.password,
                                            config=self.config)
        except RuntimeError:
            self.log.logger.error("Can't initialize the redfish system.")
            raise

        try:
            self.reporter_agent = Reporter(self.system,
                                           self.cephx,
                                           reporter_scheme=self.reporter_scheme,
                                           reporter_hostname=self.mgr_host,
                                           reporter_port=self.mgr_agent_port,
                                           reporter_endpoint=self.reporter_endpoint)
            self.reporter_agent.run()
        except RuntimeError:
            self.log.logger.error("Can't initialize the reporter.")
            raise

        try:
            self.log.logger.info('Starting node-proxy API...')
            self.api = NodeProxyApi(self,
                                    username=self.username,
                                    password=self.password,
                                    ssl_crt=self.api_ssl_crt,
                                    ssl_key=self.api_ssl_key)
            self.api.start()
        except Exception as e:
            self.log.logger.error(f"Can't start node-proxy API: {e}")
            raise
+
+
def main() -> None:
    """CLI entry point: parse arguments, load the JSON config, start the manager."""
    parser = argparse.ArgumentParser(
        description='Ceph Node-Proxy for HW Monitoring',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--config',
        help='path of config file in json format',
        required=True
    )
    args = parser.parse_args()

    if not os.path.exists(args.config):
        raise Exception(f'No config file found at provided config path: {args.config}')

    with open(args.config, 'r') as cfg_file:
        try:
            config = json.loads(cfg_file.read())
        except Exception as e:
            raise Exception(f'Failed to load json config: {str(e)}')

    target_ip = config['target_ip']
    target_port = config['target_port']
    keyring = config['keyring']
    root_cert = config['root_cert.pem']
    listener_cert = config['listener.crt']
    listener_key = config['listener.key']
    name = config['name']

    # the CA bundle must exist on disk for ssl.load_verify_locations()
    ca_file = write_tmp_file(root_cert,
                             prefix_name='cephadm-endpoint-root-cert')

    node_proxy_mgr = NodeProxyManager(mgr_host=target_ip,
                                      cephx_name=name,
                                      cephx_secret=keyring,
                                      mgr_agent_port=target_port,
                                      ca_path=ca_file.name,
                                      api_ssl_crt=listener_cert,
                                      api_ssl_key=listener_key)
    if not node_proxy_mgr.is_alive():
        node_proxy_mgr.start()
+
+if __name__ == '__main__':
+ main()
--- /dev/null
+import json
+from urllib.error import HTTPError, URLError
+from ceph_node_proxy.baseclient import BaseClient
+from ceph_node_proxy.util import Logger, http_req
+from typing import Dict, Any, Tuple, Optional
+from http.client import HTTPMessage
+
+
class RedFishClient(BaseClient):
    """HTTP client for a Redfish service with session-token handling."""

    PREFIX = '/redfish/v1/'

    def __init__(self,
                 host: str = '',
                 port: str = '443',
                 username: str = '',
                 password: str = ''):
        super().__init__(host, username, password)
        self.log: Logger = Logger(__name__)
        self.log.logger.info(f'Initializing redfish client {__name__}')
        self.host: str = host
        self.port: str = port
        self.url: str = f'https://{self.host}:{self.port}'
        self.token: str = ''      # X-Auth-Token of the active session
        self.location: str = ''   # session resource URL, needed for logout

    def login(self) -> None:
        """Open a Redfish session unless a valid one already exists.

        :raises RuntimeError: when the endpoint refuses the credentials or
            is unreachable (with the failure message attached).
        """
        if not self.is_logged_in():
            self.log.logger.info('Logging in to '
                                 f"{self.url} as '{self.username}'")
            oob_credentials = json.dumps({'UserName': self.username,
                                          'Password': self.password})
            headers = {'Content-Type': 'application/json'}

            try:
                _headers, _data, _status_code = self.query(data=oob_credentials,
                                                           headers=headers,
                                                           endpoint='/redfish/v1/SessionService/Sessions/')
                if _status_code != 201:
                    msg = f"Can't log in to {self.url} as '{self.username}': {_status_code}"
                    self.log.logger.error(msg)
                    # carry the message (was a bare `raise RuntimeError`)
                    raise RuntimeError(msg)
            except URLError as e:
                msg = f"Can't log in to {self.url} as '{self.username}': {e}"
                self.log.logger.error(msg)
                # keep the original cause chained for debugging
                raise RuntimeError(msg) from e
            self.token = _headers['X-Auth-Token']
            self.location = _headers['Location']

    def is_logged_in(self) -> bool:
        """Return True when the stored session token is still accepted."""
        self.log.logger.debug(f'Checking token validity for {self.url}')
        if not self.location or not self.token:
            self.log.logger.debug(f'No token found for {self.url}.')
            return False
        headers = {'X-Auth-Token': self.token}
        try:
            _headers, _data, _status_code = self.query(headers=headers,
                                                       endpoint=self.location)
        except URLError as e:
            self.log.logger.error("Can't check token "
                                  f'validity for {self.url}: {e}')
            raise
        return _status_code == 200

    def logout(self) -> Dict[str, Any]:
        """Delete the session (best-effort) and clear the cached token."""
        result: Dict[str, Any] = {}
        try:
            if self.is_logged_in():
                _, _data, _status_code = self.query(method='DELETE',
                                                    headers={'X-Auth-Token': self.token},
                                                    endpoint=self.location)
                result = json.loads(_data)
        except URLError:
            self.log.logger.error(f"Can't log out from {self.url}")

        self.location = ''
        self.token = ''

        return result

    def get_path(self, path: str) -> Dict[str, Any]:
        """GET a resource, prepending the Redfish prefix when missing.

        :raises RuntimeError: when the request fails (cause chained).
        """
        if self.PREFIX not in path:
            path = f'{self.PREFIX}{path}'
        try:
            _, result, _status_code = self.query(endpoint=path)
            result_json = json.loads(result)
            return result_json
        except URLError as e:
            self.log.logger.error(f"Can't get path {path}:\n{e}")
            raise RuntimeError(f"Can't get path {path}") from e

    def query(self,
              data: Optional[str] = None,
              headers: Optional[Dict[str, str]] = None,
              method: Optional[str] = None,
              endpoint: str = '',
              timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
        """Issue an HTTP request, injecting the session token and content type.

        `headers` defaults to None instead of the previous mutable `{}`
        default; behavior is unchanged since the dict was always copied.
        """
        _headers = headers.copy() if headers else {}
        if self.token:
            _headers['X-Auth-Token'] = self.token
        if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
            _headers['Content-Type'] = 'application/json'
        try:
            (response_headers,
             response_str,
             response_status) = http_req(hostname=self.host,
                                         port=self.port,
                                         endpoint=endpoint,
                                         headers=_headers,
                                         method=method,
                                         data=data,
                                         timeout=timeout)

            return response_headers, response_str, response_status
        except (HTTPError, URLError) as e:
            self.log.logger.debug(f'{e}')
            raise
--- /dev/null
+from ceph_node_proxy.baseredfishsystem import BaseRedfishSystem
+from ceph_node_proxy.util import Logger, normalize_dict, to_snake_case
+from typing import Dict, Any, List
+
+
+class RedfishDellSystem(BaseRedfishSystem):
    def __init__(self, **kw: Any) -> None:
        """Initialize the base redfish system, then set the Dell iDRAC job endpoints."""
        super().__init__(**kw)
        self.log = Logger(__name__)
        # Dell-specific (iDRAC) job service used for reboot/powercycle jobs
        self.job_service_endpoint: str = '/redfish/v1/Managers/iDRAC.Embedded.1/Oem/Dell/DellJobService'
        self.create_reboot_job_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.CreateRebootJob'
        self.setup_job_queue_endpoint: str = f'{self.job_service_endpoint}/Actions/DellJobService.SetupJobQueue'
+
+ def build_common_data(self,
+ data: Dict[str, Any],
+ fields: List,
+ path: str) -> Dict[str, Dict[str, Dict]]:
+ result: Dict[str, Dict[str, Dict]] = dict()
+ for member_info in self.get_members(data, path):
+ member_id = member_info['Id']
+ result[member_id] = dict()
+ for field in fields:
+ try:
+ result[member_id][to_snake_case(field)] = member_info[field]
+ except KeyError:
+ self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}')
+
+ return normalize_dict(result)
+
+ def build_chassis_data(self,
+ fields: Dict[str, List[str]],
+ path: str) -> Dict[str, Dict[str, Dict]]:
+ result: Dict[str, Dict[str, Dict]] = dict()
+ data = self._get_path(f'{self.chassis_endpoint}/{path}')
+
+ for elt, _fields in fields.items():
+ for member_elt in data[elt]:
+ _id = member_elt['MemberId']
+ result[_id] = dict()
+ for field in _fields:
+ try:
+ result[_id][to_snake_case(field)] = member_elt[field]
+ except KeyError:
+ self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}')
+ return normalize_dict(result)
+
+ def get_sn(self) -> str:
+ return self._sys['SKU']
+
+ def get_status(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['status']
+
+ def get_memory(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['memory']
+
+ def get_processors(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['processors']
+
+ def get_network(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['network']
+
+ def get_storage(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['storage']
+
+ def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['firmwares']
+
+ def get_power(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['power']
+
+ def get_fans(self) -> Dict[str, Dict[str, Dict]]:
+ return self._sys['fans']
+
+ def _update_network(self) -> None:
+ fields = ['Description', 'Name', 'SpeedMbps', 'Status']
+ self.log.logger.debug('Updating network')
+ self._sys['network'] = self.build_common_data(data=self._system['Systems'],
+ fields=fields,
+ path='EthernetInterfaces')
+
+ def _update_processors(self) -> None:
+ fields = ['Description',
+ 'TotalCores',
+ 'TotalThreads',
+ 'ProcessorType',
+ 'Model',
+ 'Status',
+ 'Manufacturer']
+ self.log.logger.debug('Updating processors')
+ self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
+ fields=fields,
+ path='Processors')
+
+ def _update_storage(self) -> None:
+ fields = ['Description',
+ 'CapacityBytes',
+ 'Model', 'Protocol',
+ 'SerialNumber', 'Status',
+ 'PhysicalLocation']
+ entities = self.get_members(data=self._system['Systems'],
+ path='Storage')
+ self.log.logger.debug('Updating storage')
+ result: Dict[str, Dict[str, Dict]] = dict()
+ for entity in entities:
+ for drive in entity['Drives']:
+ drive_path = drive['@odata.id']
+ drive_info = self._get_path(drive_path)
+ drive_id = drive_info['Id']
+ result[drive_id] = dict()
+ result[drive_id]['redfish_endpoint'] = drive['@odata.id']
+ for field in fields:
+ result[drive_id][to_snake_case(field)] = drive_info[field]
+ result[drive_id]['entity'] = entity['Id']
+ self._sys['storage'] = normalize_dict(result)
+
+ def _update_sn(self) -> None:
+ self.log.logger.debug('Updating serial number')
+ self._sys['SKU'] = self._system['Systems']['SKU']
+
+ def _update_memory(self) -> None:
+ fields = ['Description',
+ 'MemoryDeviceType',
+ 'CapacityMiB',
+ 'Status']
+ self.log.logger.debug('Updating memory')
+ self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
+ fields=fields,
+ path='Memory')
+
+ def _update_power(self) -> None:
+ fields = {
+ 'PowerSupplies': [
+ 'Name',
+ 'Model',
+ 'Manufacturer',
+ 'Status'
+ ]
+ }
+ self.log.logger.debug('Updating powersupplies')
+ self._sys['power'] = self.build_chassis_data(fields, 'Power')
+
+ def _update_fans(self) -> None:
+ fields = {
+ 'Fans': [
+ 'Name',
+ 'PhysicalContext',
+ 'Status'
+ ],
+ }
+ self.log.logger.debug('Updating fans')
+ self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')
+
+ def _update_firmwares(self) -> None:
+ fields = [
+ 'Name',
+ 'Description',
+ 'ReleaseDate',
+ 'Version',
+ 'Updateable',
+ 'Status',
+ ]
+ self.log.logger.debug('Updating firmwares')
+ self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
+ fields=fields,
+ path='FirmwareInventory')
--- /dev/null
+from threading import Thread
+import time
+import json
+from ceph_node_proxy.util import Logger, http_req
+from urllib.error import HTTPError, URLError
+from typing import Dict, Any
+
+
class Reporter:
    """Background thread that periodically pushes the system inventory
    (plus cephx credentials) to the ceph-mgr node-proxy endpoint."""

    def __init__(self,
                 system: Any,
                 cephx: Dict[str, Any],
                 reporter_scheme: str = 'https',
                 reporter_hostname: str = '',
                 reporter_port: str = '443',
                 reporter_endpoint: str = '/node-proxy/data') -> None:
        self.system = system
        self.data: Dict[str, Any] = {}
        self.finish = False
        self.cephx = cephx
        self.data['cephx'] = self.cephx
        self.reporter_scheme: str = reporter_scheme
        self.reporter_hostname: str = reporter_hostname
        self.reporter_port: str = reporter_port
        self.reporter_endpoint: str = reporter_endpoint
        self.log = Logger(__name__)
        # bug fix: the '://' separator was missing after the scheme,
        # which produced a malformed URL such as 'https:host:443/...'
        self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                  f'{reporter_port}{reporter_endpoint}')
        self.log.logger.info(f'Reporter url set to {self.reporter_url}')

    def stop(self) -> None:
        """Ask the loop to finish and wait for the thread to exit."""
        self.finish = True
        self.thread.join()

    def run(self) -> None:
        """Start the reporting loop in a background thread."""
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self) -> None:
        """Every 5s, POST the system data to the mgr if it changed."""
        while not self.finish:
            # Any logic to avoid sending the all the system
            # information every loop can go here. In a real
            # scenario probably we should just send the sub-parts
            # that have changed to minimize the traffic in
            # dense clusters
            self.log.logger.debug('waiting for a lock in reporter loop.')
            self.system.lock.acquire()
            self.log.logger.debug('lock acquired in reporter loop.')
            if self.system.data_ready:
                self.log.logger.info('data ready to be sent to the mgr.')
                if not self.system.get_system() == self.system.previous_data:
                    self.log.logger.info('data has changed since last iteration.')
                    self.data['patch'] = self.system.get_system()
                    try:
                        # TODO: add a timeout parameter to the reporter in the config file
                        self.log.logger.info(f'sending data to {self.reporter_url}')
                        http_req(hostname=self.reporter_hostname,
                                 port=self.reporter_port,
                                 method='POST',
                                 headers={'Content-Type': 'application/json'},
                                 endpoint=self.reporter_endpoint,
                                 scheme=self.reporter_scheme,
                                 data=json.dumps(self.data))
                    except (HTTPError, URLError) as e:
                        self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}")
                        # Need to add a new parameter 'max_retries' to the reporter if it can't
                        # send the data for more than x times, maybe the daemon should stop altogether
                    else:
                        # only remember the payload as delivered on success
                        self.system.previous_data = self.system.get_system()
                else:
                    self.log.logger.info('no diff, not sending data to the mgr.')
            self.system.lock.release()
            self.log.logger.debug('lock released in reporter loop.')
            time.sleep(5)
--- /dev/null
+import logging
+import yaml
+import os
+import time
+import re
+import ssl
+from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple
+
+
class Logger:
    """Thin wrapper around :mod:`logging` with a uniform stream format.

    Every instance registers itself in ``Logger._Logger`` so that
    ``Config`` can later re-apply a configured log level to all of them.
    """

    # registry of every Logger ever created (read by Config.load_config)
    _Logger: List['Logger'] = []

    def __init__(self, name: str, level: int = logging.INFO):
        self.name = name
        self.level = level

        Logger._Logger.append(self)
        self.logger = self.get_logger()

    def get_logger(self) -> logging.Logger:
        """Build and return a configured ``logging.Logger`` for ``self.name``."""
        configured = logging.getLogger(self.name)
        configured.setLevel(self.level)
        stream = logging.StreamHandler()
        stream.setLevel(self.level)
        stream.setFormatter(
            logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        # drop any pre-existing handlers so that repeated instantiation
        # for the same name never duplicates output lines
        configured.handlers.clear()
        configured.addHandler(stream)
        configured.propagate = False

        return configured
+
+
class Config:
    """Load node-proxy configuration from a YAML file, falling back to
    (and back-filling from) a default mapping; each top-level key is also
    exposed as an attribute of the instance."""

    def __init__(self,
                 config_file: str = '/etc/ceph/node-proxy.yaml',
                 default_config: Optional[Dict[str, Any]] = None) -> None:
        self.config_file = config_file
        # copy the defaults: a shared mutable default argument (previously
        # `default_config={}`) would leak state between instances
        self.default_config: Dict[str, Any] = dict(default_config or {})

        self.load_config()

    def load_config(self) -> None:
        """(Re)read the config file and refresh attributes and log levels."""
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r') as f:
                # safe_load() returns None for an empty document
                self.config = yaml.safe_load(f) or {}
        else:
            self.config = self.default_config

        # back-fill any option missing from the file with its default
        for k, v in self.default_config.items():
            if k not in self.config.keys():
                self.config[k] = v

        for k, v in self.config.items():
            setattr(self, k, v)

        # TODO: need to be improved
        # propagate the configured level to every Logger created so far;
        # skipped when no 'logging' section is available at all
        if hasattr(self, 'logging'):
            for _l in Logger._Logger:
                _l.logger.setLevel(self.logging['level'])  # type: ignore
                _l.logger.handlers[0].setLevel(self.logging['level'])  # type: ignore

    def reload(self, config_file: str = '') -> None:
        """Re-run load_config(), optionally switching to another file first."""
        if config_file != '':
            self.config_file = config_file
        self.load_config()
+
+
+log = Logger(__name__)
+
+
def to_snake_case(name: str) -> str:
    """Convert a CamelCase identifier to snake_case."""
    # first split an uppercase+lowercase run off whatever precedes it
    interim = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # then split remaining lower/digit-to-upper transitions and lowercase
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', interim).lower()
+
+
def normalize_dict(test_dict: Dict) -> Dict:
    """Return a copy of *test_dict* with every key lowercased,
    recursing into nested dicts (non-dict values are kept as-is)."""
    return {
        key.lower(): normalize_dict(value) if isinstance(value, dict) else value
        for key, value in test_dict.items()
    }
+
+
def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable:
    """Decorator retrying the wrapped callable on *exceptions*.

    Up to ``retries`` attempts are made in total, sleeping ``delay``
    seconds between failures; the final attempt is made outside the loop
    so its exception propagates to the caller.
    """
    def decorator(f: Callable) -> Callable:
        def _retry(*args: Any, **kwargs: Any) -> Any:
            _tries = retries
            while _tries > 1:
                try:
                    log.logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
                    return f(*args, **kwargs)
                except exceptions:
                    time.sleep(delay)
                    _tries -= 1
            # logger.warn() is a deprecated alias of warning()
            log.logger.warning('{} has failed after {} tries'.format(f, retries))
            # last chance: any exception raised here is the caller's to handle
            return f(*args, **kwargs)
        return _retry
    return decorator
+
+
def http_req(hostname: str = '',
             port: str = '443',
             method: Optional[str] = None,
             headers: MutableMapping[str, str] = {},
             data: Optional[str] = None,
             endpoint: str = '/',
             scheme: str = 'https',
             ssl_verify: bool = False,
             timeout: Optional[int] = None,
             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
    """Perform an HTTP(S) request.

    Returns a ``(headers, decoded body, status code)`` tuple; HTTP and
    URL errors are printed and re-raised. When no ``ssl_ctx`` is given a
    TLS client context is built, with certificate verification disabled
    unless ``ssl_verify`` is set.
    """
    if not ssl_ctx:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if not ssl_verify:
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED

    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
    # NOTE: the payload must be ASCII-encodable (json.dumps escapes
    # non-ASCII by default, so JSON payloads are safe)
    _data = bytes(data, 'ascii') if data else None
    # bug fix: work on a copy — the original assigned `_headers = headers`
    # and then wrote into it, mutating the caller's mapping and the shared
    # `{}` default argument across calls
    _headers = dict(headers)
    if data and not method:
        method = 'POST'
    if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
        _headers['Content-Type'] = 'application/json'
    try:
        req = Request(url, _data, _headers, method=method)
        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
            response_str = response.read()
            response_headers = response.headers
            response_code = response.code
        return response_headers, response_str.decode(), response_code
    except (HTTPError, URLError) as e:
        print(f'{e}')
        # handle error here if needed
        raise
+
+
def write_tmp_file(data: str, prefix_name: str = 'node-proxy-') -> _TemporaryFileWrapper:
    """Write *data* into a fresh named temporary file and return the open
    handle (the file disappears when the handle is closed)."""
    tmp = NamedTemporaryFile(prefix=prefix_name)
    # tighten permissions before any content is written
    os.fchmod(tmp.fileno(), 0o600)
    tmp.write(data.encode('utf-8'))
    tmp.flush()
    return tmp
from pathlib import Path
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
-from cephadmlib.node_proxy.main import NodeProxy
FuncT = TypeVar('FuncT', bound=Callable)
##################################
@register_daemon_form
class NodeProxy(ContainerDaemonForm):
    """Defines a node-proxy container"""

    daemon_type = 'node-proxy'
    # TODO: update this if we make node-proxy an executable
    entrypoint = 'python3'
    required_files = ['node-proxy.json']

    @classmethod
    def for_daemon_type(cls, daemon_type: str) -> bool:
        """Return True when this form handles *daemon_type* ('node-proxy')."""
        return cls.daemon_type == daemon_type

    def __init__(
        self,
        ctx: CephadmContext,
        ident: DaemonIdentity,
        config_json: Dict,
        image: str = DEFAULT_IMAGE,
    ):
        self.ctx = ctx
        self._identity = ident
        self.image = image

        # config-json options
        config = dict_get(config_json, 'node-proxy.json', {})
        self.files = {'node-proxy.json': config}

        # validate the supplied args
        self.validate()

    @classmethod
    def init(
        cls, ctx: CephadmContext, fsid: str, daemon_id: str
    ) -> 'NodeProxy':
        """Alternate constructor from an (fsid, daemon_id) pair."""
        return cls.create(
            ctx, DaemonIdentity(fsid, cls.daemon_type, daemon_id)
        )

    @classmethod
    def create(
        cls, ctx: CephadmContext, ident: DaemonIdentity
    ) -> 'NodeProxy':
        """Alternate constructor pulling config-json and image from *ctx*."""
        return cls(ctx, ident, fetch_configs(ctx), ctx.image)

    @property
    def identity(self) -> DaemonIdentity:
        """The daemon's (fsid, type, id) identity."""
        return self._identity

    @property
    def fsid(self) -> str:
        return self._identity.fsid

    @property
    def daemon_id(self) -> str:
        return self._identity.daemon_id

    def customize_container_mounts(
        self, ctx: CephadmContext, mounts: Dict[str, str]
    ) -> None:
        """Bind-mount the rendered node-proxy.json into the container."""
        data_dir = self.identity.data_dir(ctx.data_dir)
        # TODO: update this when we have the actual location
        # in the ceph container we are going to keep node-proxy
        mounts.update({os.path.join(data_dir, 'node-proxy.json'): '/usr/share/ceph/node-proxy.json:z'})

    def customize_process_args(self, ctx: CephadmContext, args: List[str]) -> None:
        # TODO: this corresponds with the mount location of
        # the config in _get_container_mounts above. They
        # will both need to be updated when we have a proper
        # location in the container for node-proxy
        args.extend(['/usr/share/ceph/ceph_node_proxy/main.py', '--config', '/usr/share/ceph/node-proxy.json'])

    def validate(self):
        # type: () -> None
        """Raise Error when fsid/daemon_id/image or a required file is invalid."""
        if not is_fsid(self.fsid):
            raise Error('not an fsid: %s' % self.fsid)
        if not self.daemon_id:
            raise Error('invalid daemon_id: %s' % self.daemon_id)
        if not self.image:
            raise Error('invalid image: %s' % self.image)
        # check for the required files
        if self.required_files:
            for fname in self.required_files:
                if fname not in self.files:
                    raise Error(
                        'required file missing from config-json: %s' % fname
                    )

    def get_daemon_name(self):
        # type: () -> str
        """Return '<daemon_type>.<daemon_id>'."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def get_container_name(self, desc=None):
        # type: (Optional[str]) -> str
        """Return the container name, optionally suffixed with *desc*."""
        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
        if desc:
            cname = '%s-%s' % (cname, desc)
        return cname

    def create_daemon_dirs(self, data_dir, uid, gid):
        # type: (str, int, int) -> None
        """Create files under the container data dir"""
        if not os.path.isdir(data_dir):
            raise OSError('data_dir is not a directory: %s' % (data_dir))

        logger.info('Writing node-proxy config...')
        # populate files from the config-json
        populate_files(data_dir, self.files, uid, gid)

    # NOTE(review): the comment below about iscsi_target_mod/configfs looks
    # copy-pasted from the iscsi daemon form — confirm whether node-proxy
    # actually needs a privileged container.
    def container(self, ctx: CephadmContext) -> CephContainer:
        # So the container can modprobe iscsi_target_mod and have write perms
        # to configfs we need to make this a privileged container.
        ctr = daemon_to_container(ctx, self, privileged=True, envs=['PYTHONPATH=$PYTHONPATH:/usr/share/ceph'])
        return to_deployment_container(ctx, ctr)

    def config_and_keyring(
        self, ctx: CephadmContext
    ) -> Tuple[Optional[str], Optional[str]]:
        """Return the (ceph.conf, keyring) pair for this daemon."""
        return get_config_and_keyring(ctx)

    def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
        """Return the (uid, gid) the container content should be owned by."""
        return extract_uid_gid(ctx)

    def default_entrypoint(self) -> str:
        """Entrypoint executable for the container ('python3')."""
        return self.entrypoint
+
+
@contextmanager
def write_new(
destination: Union[str, Path],
supported_daemons.append(CephadmAgent.daemon_type)
supported_daemons.append(SNMPGateway.daemon_type)
supported_daemons.extend(Tracing.components)
+ supported_daemons.append(NodeProxy.daemon_type)
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
sg = SNMPGateway.init(ctx, fsid, daemon_id)
sg.create_daemon_conf()
+ elif daemon_type == NodeProxy.daemon_type:
+ node_proxy = NodeProxy.init(ctx, fsid, daemon_id)
+ node_proxy.create_daemon_dirs(data_dir, uid, gid)
+
_write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid)
self.agent.ls_gatherer.wakeup()
self.agent.volume_gatherer.wakeup()
logger.debug(f'Got mgr message {data}')
- if 'node_proxy_oob_cmd' in data:
- if data['node_proxy_oob_cmd']['action'] in ['get_led', 'set_led']:
- conn.send(bytes(json.dumps(self.node_proxy_oob_cmd_result), 'utf-8'))
except Exception as e:
logger.error(f'Mgr Listener encountered exception: {e}')
def shutdown(self) -> None:
self.stop = True
- def validate_node_proxy_payload(self, data: Dict[str, Any]) -> None:
- if 'action' not in data.keys():
- raise RuntimeError('node-proxy oob command needs an action.')
- if data['action'] in ['get_led', 'set_led']:
- fields = ['type', 'id']
- if data['type'] not in ['chassis', 'drive']:
- raise RuntimeError('the LED type must be either "chassis" or "drive".')
- for field in fields:
- if field not in data.keys():
- raise RuntimeError('Received invalid node-proxy cmd.')
-
def handle_json_payload(self, data: Dict[Any, Any]) -> None:
- self.node_proxy_oob_cmd_result: Dict[str, Any] = {}
if 'counter' in data:
self.agent.ack = int(data['counter'])
if 'config' in data:
f.write(config[filename])
self.agent.pull_conf_settings()
self.agent.wakeup()
- elif 'node_proxy_shutdown' in data:
- logger.info('Received node_proxy_shutdown command.')
- self.agent.shutdown()
- elif 'node_proxy_oob_cmd' in data:
- node_proxy_cmd: Dict[str, Any] = data['node_proxy_oob_cmd']
- try:
- self.validate_node_proxy_payload(node_proxy_cmd)
- except RuntimeError as e:
- logger.error(f"Couldn't validate node-proxy payload:\n{node_proxy_cmd}\n{e}")
- raise
- logger.info(f'Received node_proxy_oob_cmd command: {node_proxy_cmd}')
- if node_proxy_cmd['action'] == 'get_led':
- if node_proxy_cmd['type'] == 'chassis':
- self.node_proxy_oob_cmd_result = self.agent.node_proxy_mgr.node_proxy.system.get_chassis_led()
- if node_proxy_cmd['action'] == 'set_led':
- if node_proxy_cmd['type'] == 'chassis':
- _data: Dict[str, Any] = json.loads(node_proxy_cmd['data'])
- _result: int = self.agent.node_proxy_mgr.node_proxy.system.set_chassis_led(_data)
- self.node_proxy_oob_cmd_result = {'http_code': _result}
else:
raise RuntimeError('No valid data received.')
-class NodeProxyManager(Thread):
- def __init__(self, agent: 'CephadmAgent', event: Event):
- super().__init__()
- self.agent = agent
- self.event = event
- self.stop = False
-
- def run(self) -> None:
- self.event.wait()
- self.ssl_ctx = self.agent.ssl_ctx
- self.init()
- self.loop()
-
- def init(self) -> None:
- node_proxy_meta = {
- 'cephx': {
- 'name': self.agent.host,
- 'secret': self.agent.keyring
- }
- }
- status, result = http_query(addr=self.agent.target_ip,
- port=self.agent.target_port,
- data=json.dumps(node_proxy_meta).encode('ascii'),
- endpoint='/node-proxy/oob',
- ssl_ctx=self.ssl_ctx)
- if status != 200:
- msg = f'No out of band tool details could be loaded: {status}, {result}'
- logger.debug(msg)
- raise RuntimeError(msg)
-
- result_json = json.loads(result)
- kwargs = {
- 'host': result_json['result']['addr'],
- 'username': result_json['result']['username'],
- 'password': result_json['result']['password'],
- 'cephx': node_proxy_meta['cephx'],
- 'mgr_target_ip': self.agent.target_ip,
- 'mgr_target_port': self.agent.target_port
- }
- if result_json['result'].get('port'):
- kwargs['port'] = result_json['result']['port']
-
- self.node_proxy: NodeProxy = NodeProxy(**kwargs)
- self.node_proxy.start()
-
- def loop(self) -> None:
- while not self.stop:
- try:
- status = self.node_proxy.check_status()
- label = 'Ok' if status else 'Critical'
- logger.debug(f'node-proxy status: {label}')
- except Exception as e:
- logger.error(f'node-proxy not running: {e.__class__.__name__}: {e}')
- time.sleep(120)
- self.init()
- else:
- logger.debug('node-proxy alive, next check in 60sec.')
- time.sleep(60)
-
- def shutdown(self) -> None:
- self.stop = True
- # if `self.node_proxy.shutdown()` is called before self.start(), it will fail.
- if self.__dict__.get('node_proxy'):
- self.node_proxy.shutdown()
-
-
class CephadmAgent():
daemon_type = 'agent'
self.ssl_ctx = ssl.create_default_context()
self.ssl_ctx.check_hostname = True
self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
- self.node_proxy_mgr_event = Event()
- self.node_proxy_mgr = NodeProxyManager(self, self.node_proxy_mgr_event)
def validate(self, config: Dict[str, str] = {}) -> None:
# check for the required files
def shutdown(self) -> None:
self.stop = True
- if self.node_proxy_mgr.is_alive():
- self.node_proxy_mgr.shutdown()
if self.mgr_listener.is_alive():
self.mgr_listener.shutdown()
if self.ls_gatherer.is_alive():
def run(self) -> None:
self.pull_conf_settings()
self.ssl_ctx.load_verify_locations(self.ca_path)
- # only after self.pull_conf_settings() was called we can actually start
- # node-proxy
- self.node_proxy_mgr_event.set()
try:
for _ in range(1001):
if not self.volume_gatherer.is_alive():
self.volume_gatherer.start()
- if not self.node_proxy_mgr.is_alive():
- self.node_proxy_mgr.start()
-
while not self.stop:
start_time = time.monotonic()
ack = self.ack
+++ /dev/null
-from typing import Dict, Any
-
-
-class BaseClient:
- def __init__(self,
- host: str,
- username: str,
- password: str) -> None:
- self.host = host
- self.username = username
- self.password = password
-
- def login(self) -> None:
- raise NotImplementedError()
-
- def logout(self) -> Dict[str, Any]:
- raise NotImplementedError()
-
- def get_path(self, path: str) -> Dict:
- raise NotImplementedError()
+++ /dev/null
-import concurrent.futures
-from .basesystem import BaseSystem
-from .redfish_client import RedFishClient
-from threading import Thread, Lock
-from time import sleep
-from .util import Logger, retry
-from typing import Dict, Any, List
-
-
-class BaseRedfishSystem(BaseSystem):
- def __init__(self, **kw: Any) -> None:
- super().__init__(**kw)
- self.common_endpoints: List[str] = kw.get('common_endpoints', ['/Systems/System.Embedded.1',
- '/UpdateService'])
- self.chassis_endpoint: str = kw.get('chassis_endpoint', '/Chassis/System.Embedded.1')
- self.log = Logger(__name__)
- self.host: str = kw['host']
- self.port: str = kw['port']
- self.username: str = kw['username']
- self.password: str = kw['password']
- # move the following line (class attribute?)
- self.client: RedFishClient = RedFishClient(host=self.host, port=self.port, username=self.username, password=self.password)
- self.log.logger.info(f'redfish system initialization, host: {self.host}, user: {self.username}')
-
- self.run: bool = False
- self.thread: Thread
- self.data_ready: bool = False
- self.previous_data: Dict = {}
- self.lock: Lock = Lock()
- self.data: Dict[str, Dict[str, Any]] = {}
- self._system: Dict[str, Dict[str, Any]] = {}
- self._sys: Dict[str, Any] = {}
- self.start_client()
-
- def start_client(self) -> None:
- self.client.login()
- self.start_update_loop()
-
- def start_update_loop(self) -> None:
- self.run = True
- self.thread = Thread(target=self.update)
- self.thread.start()
-
- def stop_update_loop(self) -> None:
- self.run = False
- self.thread.join()
-
- def update(self) -> None:
- # this loop can have:
- # - caching logic
- while self.run:
- self.log.logger.debug('waiting for a lock in the update loop.')
- self.lock.acquire()
- self.log.logger.debug('lock acquired in the update loop.')
- try:
- self._update_system()
- self._update_sn()
- update_funcs = [self._update_memory,
- self._update_power,
- self._update_fans,
- self._update_network,
- self._update_processors,
- self._update_storage,
- self._update_firmwares]
-
- with concurrent.futures.ThreadPoolExecutor() as executor:
- executor.map(lambda f: f(), update_funcs)
-
- self.data_ready = True
- sleep(5)
- except RuntimeError as e:
- self.run = False
- self.log.logger.error(f'Error detected, trying to gracefully log out from redfish api.\n{e}')
- self.client.logout()
- finally:
- self.lock.release()
- self.log.logger.debug('lock released in the update loop.')
-
- def flush(self) -> None:
- self.log.logger.debug('Acquiring lock to flush data.')
- self.lock.acquire()
- self.log.logger.debug('Lock acquired, flushing data.')
- self._system = {}
- self.previous_data = {}
- self.log.logger.info('Data flushed.')
- self.data_ready = False
- self.log.logger.debug('Data marked as not ready.')
- self.lock.release()
- self.log.logger.debug('Released the lock after flushing data.')
-
- @retry(retries=10, delay=2)
- def _get_path(self, path: str) -> Dict:
- try:
- result = self.client.get_path(path)
- except RuntimeError:
- raise
- if result is None:
- self.log.logger.error(f'The client reported an error when getting path: {path}')
- raise RuntimeError(f'Could not get path: {path}')
- return result
-
- def get_members(self, data: Dict[str, Any], path: str) -> List:
- _path = data[path]['@odata.id']
- _data = self._get_path(_path)
- return [self._get_path(member['@odata.id']) for member in _data['Members']]
-
- def get_system(self) -> Dict[str, Any]:
- result = {
- 'host': self.get_host(),
- 'sn': self.get_sn(),
- 'status': {
- 'storage': self.get_storage(),
- 'processors': self.get_processors(),
- 'network': self.get_network(),
- 'memory': self.get_memory(),
- 'power': self.get_power(),
- 'fans': self.get_fans()
- },
- 'firmwares': self.get_firmwares(),
- 'chassis': {'redfish_endpoint': f'/redfish/v1{self.chassis_endpoint}'} # TODO(guits): not ideal
- }
- return result
-
- def _update_system(self) -> None:
- for endpoint in self.common_endpoints:
- result = self.client.get_path(endpoint)
- _endpoint = endpoint.strip('/').split('/')[0]
- self._system[_endpoint] = result
-
- def _update_sn(self) -> None:
- raise NotImplementedError()
-
- def _update_memory(self) -> None:
- raise NotImplementedError()
-
- def _update_power(self) -> None:
- raise NotImplementedError()
-
- def _update_fans(self) -> None:
- raise NotImplementedError()
-
- def _update_network(self) -> None:
- raise NotImplementedError()
-
- def _update_processors(self) -> None:
- raise NotImplementedError()
-
- def _update_storage(self) -> None:
- raise NotImplementedError()
-
- def _update_firmwares(self) -> None:
- raise NotImplementedError()
+++ /dev/null
-import socket
-from .util import Config
-from typing import Dict, Any
-from .baseclient import BaseClient
-
-
-class BaseSystem:
- def __init__(self, **kw: Any) -> None:
- self._system: Dict = {}
- self.config: Config = kw['config']
- self.client: BaseClient
-
- def get_system(self) -> Dict[str, Any]:
- raise NotImplementedError()
-
- def get_status(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_metadata(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_processors(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_memory(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_fans(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_power(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_network(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_storage(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
- raise NotImplementedError()
-
- def get_sn(self) -> str:
- raise NotImplementedError()
-
- def get_led(self) -> Dict[str, Any]:
- raise NotImplementedError()
-
- def set_led(self, data: Dict[str, str]) -> int:
- raise NotImplementedError()
-
- def get_host(self) -> str:
- return socket.gethostname()
-
- def start_update_loop(self) -> None:
- raise NotImplementedError()
-
- def stop_update_loop(self) -> None:
- raise NotImplementedError()
-
- def start_client(self) -> None:
- raise NotImplementedError()
-
- def flush(self) -> None:
- raise NotImplementedError()
+++ /dev/null
-from threading import Thread
-from .redfishdellsystem import RedfishDellSystem
-from .reporter import Reporter
-from .util import Config, Logger
-from typing import Dict, Any, Optional
-import traceback
-
-DEFAULT_CONFIG = {
- 'reporter': {
- 'check_interval': 5,
- 'push_data_max_retries': 30,
- 'endpoint': 'https://127.0.0.1:7150/node-proxy/data',
- },
- 'system': {
- 'refresh_interval': 5
- },
- 'server': {
- 'port': 8080,
- },
- 'logging': {
- 'level': 20,
- }
-}
-
-
-class NodeProxy(Thread):
- def __init__(self, **kw: Any) -> None:
- super().__init__()
- self.username: str = kw.get('username', '')
- self.password: str = kw.get('password', '')
- self.host: str = kw.get('host', '')
- self.port: int = kw.get('port', 443)
- self.cephx: Dict[str, Any] = kw.get('cephx', {})
- self.reporter_scheme: str = kw.get('reporter_scheme', 'https')
- self.mgr_target_ip: str = kw.get('mgr_target_ip', '')
- self.mgr_target_port: str = kw.get('mgr_target_port', '')
- self.reporter_endpoint: str = kw.get('reporter_endpoint', '/node-proxy/data')
- self.exc: Optional[Exception] = None
- self.log = Logger(__name__)
-
- def run(self) -> None:
- try:
- self.main()
- except Exception as e:
- self.exc = e
- return
-
- def shutdown(self) -> None:
- self.log.logger.info('Shutting down node-proxy...')
- self.system.client.logout()
- self.system.stop_update_loop()
- self.reporter_agent.stop()
-
- def check_auth(self, realm: str, username: str, password: str) -> bool:
- return self.username == username and \
- self.password == password
-
- def check_status(self) -> bool:
- if self.__dict__.get('system') and not self.system.run:
- raise RuntimeError('node-proxy encountered an error.')
- if self.exc:
- traceback.print_tb(self.exc.__traceback__)
- self.log.logger.error(f'{self.exc.__class__.__name__}: {self.exc}')
- raise self.exc
- return True
-
- def main(self) -> None:
- # TODO: add a check and fail if host/username/password/data aren't passed
- self.config = Config('/etc/ceph/node-proxy.yml', default_config=DEFAULT_CONFIG)
- self.log = Logger(__name__, level=self.config.__dict__['logging']['level'])
-
- # create the redfish system and the obsever
- self.log.logger.info('Server initialization...')
- try:
- self.system = RedfishDellSystem(host=self.host,
- port=self.port,
- username=self.username,
- password=self.password,
- config=self.config)
- except RuntimeError:
- self.log.logger.error("Can't initialize the redfish system.")
- raise
-
- try:
- self.reporter_agent = Reporter(self.system,
- self.cephx,
- reporter_scheme=self.reporter_scheme,
- reporter_hostname=self.mgr_target_ip,
- reporter_port=self.mgr_target_port,
- reporter_endpoint=self.reporter_endpoint)
- self.reporter_agent.run()
- except RuntimeError:
- self.log.logger.error("Can't initialize the reporter.")
- raise
+++ /dev/null
-import json
-from urllib.error import HTTPError, URLError
-from .baseclient import BaseClient
-from .util import Logger, http_req
-from typing import Dict, Any, Tuple, Optional
-from http.client import HTTPMessage
-
-
-class RedFishClient(BaseClient):
- PREFIX = '/redfish/v1/'
-
- def __init__(self,
- host: str = '',
- port: str = '443',
- username: str = '',
- password: str = ''):
- super().__init__(host, username, password)
- self.log: Logger = Logger(__name__)
- self.log.logger.info(f'Initializing redfish client {__name__}')
- self.host: str = host
- self.port: str = port
- self.url: str = f'https://{self.host}:{self.port}'
- self.token: str = ''
- self.location: str = ''
-
- def login(self) -> None:
- if not self.is_logged_in():
- self.log.logger.info('Logging in to '
- f"{self.url} as '{self.username}'")
- oob_credentials = json.dumps({'UserName': self.username,
- 'Password': self.password})
- headers = {'Content-Type': 'application/json'}
-
- try:
- _headers, _data, _status_code = self.query(data=oob_credentials,
- headers=headers,
- endpoint='/redfish/v1/SessionService/Sessions/')
- if _status_code != 201:
- self.log.logger.error(f"Can't log in to {self.url} as '{self.username}': {_status_code}")
- raise RuntimeError
- except URLError as e:
- msg = f"Can't log in to {self.url} as '{self.username}': {e}"
- self.log.logger.error(msg)
- raise RuntimeError
- self.token = _headers['X-Auth-Token']
- self.location = _headers['Location']
-
- def is_logged_in(self) -> bool:
- self.log.logger.debug(f'Checking token validity for {self.url}')
- if not self.location or not self.token:
- self.log.logger.debug(f'No token found for {self.url}.')
- return False
- headers = {'X-Auth-Token': self.token}
- try:
- _headers, _data, _status_code = self.query(headers=headers,
- endpoint=self.location)
- except URLError as e:
- self.log.logger.error("Can't check token "
- f'validity for {self.url}: {e}')
- raise RuntimeError
- return _status_code == 200
-
- def logout(self) -> Dict[str, Any]:
- try:
- _, _data, _status_code = self.query(method='DELETE',
- headers={'X-Auth-Token': self.token},
- endpoint=self.location)
- except URLError:
- self.log.logger.error(f"Can't log out from {self.url}")
- return {}
-
- response_str = _data
-
- return json.loads(response_str)
-
- def get_path(self, path: str) -> Dict[str, Any]:
- if self.PREFIX not in path:
- path = f'{self.PREFIX}{path}'
- try:
- _, result, _status_code = self.query(endpoint=path)
- result_json = json.loads(result)
- return result_json
- except URLError as e:
- self.log.logger.error(f"Can't get path {path}:\n{e}")
- raise RuntimeError
-
- def query(self,
- data: Optional[str] = None,
- headers: Dict[str, str] = {},
- method: Optional[str] = None,
- endpoint: str = '',
- timeout: int = 10) -> Tuple[HTTPMessage, str, int]:
- _headers = headers.copy() if headers else {}
- if self.token:
- _headers['X-Auth-Token'] = self.token
- if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
- _headers['Content-Type'] = 'application/json'
- try:
- (response_headers,
- response_str,
- response_status) = http_req(hostname=self.host,
- port=self.port,
- endpoint=endpoint,
- headers=_headers,
- method=method,
- data=data,
- timeout=timeout)
-
- return response_headers, response_str, response_status
- except (HTTPError, URLError) as e:
- self.log.logger.debug(f'{e}')
- raise
+++ /dev/null
-import json
-from .baseredfishsystem import BaseRedfishSystem
-from .util import Logger, normalize_dict, to_snake_case
-from typing import Dict, Any, List
-
-
-class RedfishDellSystem(BaseRedfishSystem):
- def __init__(self, **kw: Any) -> None:
- super().__init__(**kw)
- self.log = Logger(__name__)
-
- def build_common_data(self,
- data: Dict[str, Any],
- fields: List,
- path: str) -> Dict[str, Dict[str, Dict]]:
- result: Dict[str, Dict[str, Dict]] = dict()
- for member_info in self.get_members(data, path):
- member_id = member_info['Id']
- result[member_id] = dict()
- for field in fields:
- try:
- result[member_id][to_snake_case(field)] = member_info[field]
- except KeyError:
- self.log.logger.warning(f'Could not find field: {field} in member_info: {member_info}')
-
- return normalize_dict(result)
-
- def build_chassis_data(self,
- fields: Dict[str, List[str]],
- path: str) -> Dict[str, Dict[str, Dict]]:
- result: Dict[str, Dict[str, Dict]] = dict()
- data = self._get_path(f'{self.chassis_endpoint}/{path}')
-
- for elt, _fields in fields.items():
- for member_elt in data[elt]:
- _id = member_elt['MemberId']
- result[_id] = dict()
- for field in _fields:
- try:
- result[_id][to_snake_case(field)] = member_elt[field]
- except KeyError:
- self.log.logger.warning(f'Could not find field: {field} in data: {data[elt]}')
- return normalize_dict(result)
-
- def get_sn(self) -> str:
- return self._sys['SKU']
-
- def get_status(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['status']
-
- def get_memory(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['memory']
-
- def get_processors(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['processors']
-
- def get_network(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['network']
-
- def get_storage(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['storage']
-
- def get_firmwares(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['firmwares']
-
- def get_power(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['power']
-
- def get_fans(self) -> Dict[str, Dict[str, Dict]]:
- return self._sys['fans']
-
- def _update_network(self) -> None:
- fields = ['Description', 'Name', 'SpeedMbps', 'Status']
- self.log.logger.debug('Updating network')
- self._sys['network'] = self.build_common_data(data=self._system['Systems'],
- fields=fields,
- path='EthernetInterfaces')
-
- def _update_processors(self) -> None:
- fields = ['Description',
- 'TotalCores',
- 'TotalThreads',
- 'ProcessorType',
- 'Model',
- 'Status',
- 'Manufacturer']
- self.log.logger.debug('Updating processors')
- self._sys['processors'] = self.build_common_data(data=self._system['Systems'],
- fields=fields,
- path='Processors')
-
- def _update_storage(self) -> None:
- fields = ['Description',
- 'CapacityBytes',
- 'Model', 'Protocol',
- 'SerialNumber', 'Status',
- 'PhysicalLocation']
- entities = self.get_members(data=self._system['Systems'],
- path='Storage')
- self.log.logger.debug('Updating storage')
- result: Dict[str, Dict[str, Dict]] = dict()
- for entity in entities:
- for drive in entity['Drives']:
- drive_path = drive['@odata.id']
- drive_info = self._get_path(drive_path)
- drive_id = drive_info['Id']
- result[drive_id] = dict()
- result[drive_id]['redfish_endpoint'] = drive['@odata.id']
- for field in fields:
- result[drive_id][to_snake_case(field)] = drive_info[field]
- result[drive_id]['entity'] = entity['Id']
- self._sys['storage'] = normalize_dict(result)
-
- def _update_sn(self) -> None:
- self.log.logger.debug('Updating serial number')
- self._sys['SKU'] = self._system['Systems']['SKU']
-
- def _update_memory(self) -> None:
- fields = ['Description',
- 'MemoryDeviceType',
- 'CapacityMiB',
- 'Status']
- self.log.logger.debug('Updating memory')
- self._sys['memory'] = self.build_common_data(data=self._system['Systems'],
- fields=fields,
- path='Memory')
-
- def _update_power(self) -> None:
- fields = {
- 'PowerSupplies': [
- 'Name',
- 'Model',
- 'Manufacturer',
- 'Status'
- ]
- }
- self.log.logger.debug('Updating powersupplies')
- self._sys['power'] = self.build_chassis_data(fields, 'Power')
-
- def _update_fans(self) -> None:
- fields = {
- 'Fans': [
- 'Name',
- 'PhysicalContext',
- 'Status'
- ],
- }
- self.log.logger.debug('Updating fans')
- self._sys['fans'] = self.build_chassis_data(fields, 'Thermal')
-
- def _update_firmwares(self) -> None:
- fields = [
- 'Name',
- 'Description',
- 'ReleaseDate',
- 'Version',
- 'Updateable',
- 'Status',
- ]
- self.log.logger.debug('Updating firmwares')
- self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
- fields=fields,
- path='FirmwareInventory')
-
- def get_chassis_led(self) -> Dict[str, Any]:
- endpoint = f'/redfish/v1/{self.chassis_endpoint}'
- result = self.client.query(method='GET',
- endpoint=endpoint,
- timeout=10)
- response_json = json.loads(result[1])
- _result: Dict[str, Any] = {'http_code': result[2]}
- if result[2] == 200:
- _result['LocationIndicatorActive'] = response_json['LocationIndicatorActive']
- else:
- _result['LocationIndicatorActive'] = None
- return _result
-
- def set_chassis_led(self, data: Dict[str, str]) -> int:
- # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false
- # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
- _, response, status = self.client.query(
- data=json.dumps(data),
- method='PATCH',
- endpoint=f'/redfish/v1{self.chassis_endpoint}'
- )
- return status
+++ /dev/null
-from threading import Thread
-import time
-import json
-from .util import Logger, http_req
-from urllib.error import HTTPError, URLError
-from typing import Dict, Any
-
-
-class Reporter:
- def __init__(self,
- system: Any,
- cephx: Dict[str, Any],
- reporter_scheme: str = 'https',
- reporter_hostname: str = '',
- reporter_port: str = '443',
- reporter_endpoint: str = '/node-proxy/data') -> None:
- self.system = system
- self.data: Dict[str, Any] = {}
- self.finish = False
- self.cephx = cephx
- self.data['cephx'] = self.cephx
- self.reporter_scheme: str = reporter_scheme
- self.reporter_hostname: str = reporter_hostname
- self.reporter_port: str = reporter_port
- self.reporter_endpoint: str = reporter_endpoint
- self.log = Logger(__name__)
- self.reporter_url: str = (f'{reporter_scheme}:{reporter_hostname}:'
- f'{reporter_port}{reporter_endpoint}')
- self.log.logger.info(f'Reporter url set to {self.reporter_url}')
-
- def stop(self) -> None:
- self.finish = True
- self.thread.join()
-
- def run(self) -> None:
- self.thread = Thread(target=self.loop)
- self.thread.start()
-
- def loop(self) -> None:
- while not self.finish:
- # Any logic to avoid sending the all the system
- # information every loop can go here. In a real
- # scenario probably we should just send the sub-parts
- # that have changed to minimize the traffic in
- # dense clusters
- self.log.logger.debug('waiting for a lock in reporter loop.')
- self.system.lock.acquire()
- self.log.logger.debug('lock acquired in reporter loop.')
- if self.system.data_ready:
- self.log.logger.info('data ready to be sent to the mgr.')
- if not self.system.get_system() == self.system.previous_data:
- self.log.logger.info('data has changed since last iteration.')
- self.data['patch'] = self.system.get_system()
- try:
- # TODO: add a timeout parameter to the reporter in the config file
- self.log.logger.info(f'sending data to {self.reporter_url}')
- http_req(hostname=self.reporter_hostname,
- port=self.reporter_port,
- method='POST',
- headers={'Content-Type': 'application/json'},
- endpoint=self.reporter_endpoint,
- scheme=self.reporter_scheme,
- data=json.dumps(self.data))
- except (HTTPError, URLError) as e:
- self.log.logger.error(f"The reporter couldn't send data to the mgr: {e}")
- # Need to add a new parameter 'max_retries' to the reporter if it can't
- # send the data for more than x times, maybe the daemon should stop altogether
- else:
- self.system.previous_data = self.system.get_system()
- else:
- self.log.logger.info('no diff, not sending data to the mgr.')
- self.system.lock.release()
- self.log.logger.debug('lock released in reporter loop.')
- time.sleep(5)
+++ /dev/null
-import logging
-import yaml
-import os
-import time
-import re
-import ssl
-from urllib.error import HTTPError, URLError
-from urllib.request import urlopen, Request
-from typing import Dict, List, Callable, Any, Optional, MutableMapping, Tuple
-
-
-class Logger:
- _Logger: List['Logger'] = []
-
- def __init__(self, name: str, level: int = logging.INFO):
- self.name = name
- self.level = level
-
- Logger._Logger.append(self)
- self.logger = self.get_logger()
-
- def get_logger(self) -> logging.Logger:
- logger = logging.getLogger(self.name)
- logger.setLevel(self.level)
- handler = logging.StreamHandler()
- handler.setLevel(self.level)
- fmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(fmt)
- logger.handlers.clear()
- logger.addHandler(handler)
- logger.propagate = False
-
- return logger
-
-
-class Config:
-
- def __init__(self,
- config_file: str = '/etc/ceph/node-proxy.yaml',
- default_config: Dict[str, Any] = {}) -> None:
- self.config_file = config_file
- self.default_config = default_config
-
- self.load_config()
-
- def load_config(self) -> None:
- if os.path.exists(self.config_file):
- with open(self.config_file, 'r') as f:
- self.config = yaml.safe_load(f)
- else:
- self.config = self.default_config
-
- for k, v in self.default_config.items():
- if k not in self.config.keys():
- self.config[k] = v
-
- for k, v in self.config.items():
- setattr(self, k, v)
-
- # TODO: need to be improved
- for _l in Logger._Logger:
- _l.logger.setLevel(self.logging['level']) # type: ignore
- _l.logger.handlers[0].setLevel(self.logging['level']) # type: ignore
-
- def reload(self, config_file: str = '') -> None:
- if config_file != '':
- self.config_file = config_file
- self.load_config()
-
-
-log = Logger(__name__)
-
-
-def to_snake_case(name: str) -> str:
- name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
- return re.sub('([a-z0-9])([A-Z])', r'\1_\2', name).lower()
-
-
-def normalize_dict(test_dict: Dict) -> Dict:
- res = dict()
- for key in test_dict.keys():
- if isinstance(test_dict[key], dict):
- res[key.lower()] = normalize_dict(test_dict[key])
- else:
- res[key.lower()] = test_dict[key]
- return res
-
-
-def retry(exceptions: Any = Exception, retries: int = 20, delay: int = 1) -> Callable:
- def decorator(f: Callable) -> Callable:
- def _retry(*args: str, **kwargs: Any) -> Callable:
- _tries = retries
- while _tries > 1:
- try:
- log.logger.debug('{} {} attempt(s) left.'.format(f, _tries - 1))
- return f(*args, **kwargs)
- except exceptions:
- time.sleep(delay)
- _tries -= 1
- log.logger.warn('{} has failed after {} tries'.format(f, retries))
- return f(*args, **kwargs)
- return _retry
- return decorator
-
-
-def http_req(hostname: str = '',
- port: str = '443',
- method: Optional[str] = None,
- headers: MutableMapping[str, str] = {},
- data: Optional[str] = None,
- endpoint: str = '/',
- scheme: str = 'https',
- ssl_verify: bool = False,
- timeout: Optional[int] = None,
- ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
-
- if not ssl_ctx:
- ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
- if not ssl_verify:
- ssl_ctx.check_hostname = False
- ssl_ctx.verify_mode = ssl.CERT_NONE
- else:
- ssl_ctx.verify_mode = ssl.CERT_REQUIRED
-
- url: str = f'{scheme}://{hostname}:{port}{endpoint}'
- _data = bytes(data, 'ascii') if data else None
-
- try:
- req = Request(url, _data, headers, method=method)
- with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
- response_str = response.read()
- response_headers = response.headers
- response_code = response.code
- return response_headers, response_str.decode(), response_code
- except (HTTPError, URLError) as e:
- print(f'{e}')
- # handle error here if needed
- raise
[mypy-kubernetes.*]
ignore_missing_imports = True
+[mypy-setuptools]
+ignore_missing_imports = True
# Make dashboard happy:
[mypy-coverage]
from orchestrator import DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
-from ceph.utils import datetime_now
+from ceph.utils import datetime_now, http_req
from ceph.deployment.inventory import Devices
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.ssl_cert_utils import SSLCerts
from mgr_util import test_port_allocation, PortAlreadyInUse
-from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
+from urllib.error import HTTPError, URLError
+from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional, MutableMapping
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
self.validate_node_proxy_data(data)
- host = data["cephx"]["name"]
- results['result'] = self.mgr.node_proxy_cache.oob.get(host)
+ # expecting name to be "node-proxy.<hostname>"
+ hostname = data['cephx']['name'][11:]
+ results['result'] = self.mgr.node_proxy_cache.oob.get(hostname, '')
if not results['result']:
raise cherrypy.HTTPError(400, 'The provided host has no iDrac details.')
return results
raise cherrypy.HTTPError(400, 'The field \'cephx\' must be provided.')
elif 'name' not in data['cephx'].keys():
cherrypy.response.status = 400
- raise cherrypy.HTTPError(400, 'The field \'host\' must be provided.')
- elif 'secret' not in data['cephx'].keys():
- raise cherrypy.HTTPError(400, 'The agent keyring must be provided.')
- elif not self.mgr.agent_cache.agent_keys.get(data['cephx']['name']):
- raise cherrypy.HTTPError(502, f'Make sure the agent is running on {data["cephx"]["name"]}')
- elif data['cephx']['secret'] != self.mgr.agent_cache.agent_keys[data['cephx']['name']]:
- raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {data["cephx"]["name"]}.')
+ raise cherrypy.HTTPError(400, 'The field \'name\' must be provided.')
+ # expecting name to be "node-proxy.<hostname>"
+ hostname = data['cephx']['name'][11:]
+ if 'secret' not in data['cephx'].keys():
+ raise cherrypy.HTTPError(400, 'The node-proxy keyring must be provided.')
+ elif not self.mgr.node_proxy_cache.keyrings.get(hostname, ''):
+ raise cherrypy.HTTPError(502, f'Make sure the node-proxy is running on {hostname}')
+ elif data['cephx']['secret'] != self.mgr.node_proxy_cache.keyrings[hostname]:
+ raise cherrypy.HTTPError(403, f'Got wrong keyring from agent on host {hostname}.')
except AttributeError:
raise cherrypy.HTTPError(400, 'Malformed data received.')
:rtype: dict[str, Any]
"""
method: str = cherrypy.request.method
+ header: MutableMapping[str, str] = {}
hostname: Optional[str] = kw.get('hostname')
led_type: Optional[str] = kw.get('type')
id_drive: Optional[str] = kw.get('id')
- data: Optional[str] = None
- # this method is restricted to 'GET' or 'PATCH'
- action: str = 'get_led' if method == 'GET' else 'set_led'
+ payload: Optional[Dict[str, str]] = None
+ endpoint: List[Any] = ['led', led_type]
+ device: str = id_drive if id_drive else ''
+
+ ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ ssl_ctx = ssl.create_default_context()
+ ssl_ctx.check_hostname = True
+ ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+ ssl_ctx.load_verify_locations(cadata=ssl_root_crt)
if not hostname:
msg: str = "listing enclosure LED status for all nodes is not implemented."
self.mgr.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
+ if led_type == 'drive':
+ endpoint.append(device)
+
if hostname not in self.mgr.node_proxy_cache.data.keys():
# TODO(guits): update unit test for this
msg = f"'{hostname}' not found."
self.mgr.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
+ addr: str = self.mgr.inventory.get_addr(hostname)
+
if method == 'PATCH':
# TODO(guits): need to check the request is authorized
# allowing a specific keyring only ? (client.admin or client.agent.. ?)
- data = json.dumps(cherrypy.request.json)
+ data: Dict[str, Any] = cherrypy.request.json
+ if 'state' not in data.keys():
+ msg = "'state' key not provided."
+ raise cherrypy.HTTPError(400, msg)
+ if 'keyring' not in data.keys():
+ msg = "'keyring' key must be provided."
+ raise cherrypy.HTTPError(400, msg)
+ if data['keyring'] != self.mgr.node_proxy_cache.keyrings.get(hostname):
+ msg = 'wrong keyring provided.'
+ raise cherrypy.HTTPError(401, msg)
+ payload = {}
+ payload['state'] = data['state']
if led_type == 'drive':
if id_drive not in self.mgr.node_proxy_cache.data[hostname]['status']['storage'].keys():
self.mgr.log.debug(msg)
raise cherrypy.HTTPError(400, msg)
- payload: Dict[str, Any] = {"node_proxy_oob_cmd":
- {"action": action,
- "type": led_type,
- "id": id_drive,
- "host": hostname,
- "data": data}}
- try:
- message_thread = AgentMessageThread(
- hostname, self.mgr.agent_cache.agent_ports[hostname], payload, self.mgr)
- message_thread.start()
- message_thread.join() # TODO(guits): Add a timeout?
- except KeyError:
- raise cherrypy.HTTPError(502, f"{hostname}'s agent not running, please check.")
- agent_response = message_thread.get_agent_response()
+ endpoint = f'/{"/".join(endpoint)}'
+ header = self.mgr.node_proxy.generate_auth_header(hostname)
+
try:
- response_json: Dict[str, Any] = json.loads(agent_response)
- except json.decoder.JSONDecodeError:
- cherrypy.response.status = 503
- else:
- cherrypy.response.status = response_json.get('http_code', 503)
- if cherrypy.response.status != 200:
- raise cherrypy.HTTPError(cherrypy.response.status, "Couldn't change the LED status.")
+ headers, result, status = http_req(hostname=addr,
+ port='8080',
+ headers=header,
+ method=method,
+ data=json.dumps(payload),
+ endpoint=endpoint,
+ ssl_ctx=ssl_ctx)
+ response_json = json.loads(result)
+ except HTTPError as e:
+ self.mgr.log.debug(e)
+ except URLError:
+ raise cherrypy.HTTPError(502, f'Make sure the node-proxy agent is deployed and running on {hostname}')
+
return response_json
@cherrypy.expose
host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr, daemon_spec)
message_thread.start()
- def _shutdown_node_proxy(self) -> None:
- hosts = set([h for h in self.mgr.cache.get_hosts() if
- (h in self.mgr.agent_cache.agent_ports and not self.mgr.agent_cache.messaging_agent(h))])
-
- for host in hosts:
- payload: Dict[str, Any] = {'node_proxy_shutdown': host}
- message_thread = AgentMessageThread(
- host, self.mgr.agent_cache.agent_ports[host], payload, self.mgr)
- message_thread.start()
-
def _request_ack_all_not_up_to_date(self) -> None:
self.mgr.agent_helpers._request_agent_acks(
set([h for h in self.mgr.cache.get_hosts() if
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
AGENT_CACHE_PREFIX = 'agent.'
-NODE_PROXY_CACHE_PREFIX = 'node_proxy/data'
+NODE_PROXY_CACHE_PREFIX = 'node_proxy'
class HostCacheStatus(enum.Enum):
self.mgr = mgr
self.data: Dict[str, Any] = {}
self.oob: Dict[str, Any] = {}
+ self.keyrings: Dict[str, str] = {}
def load(self) -> None:
- _oob = self.mgr.get_store('node_proxy/oob', "{}")
+ _oob = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/oob', '{}')
self.oob = json.loads(_oob)
- for k, v in self.mgr.get_store_prefix(NODE_PROXY_CACHE_PREFIX).items():
+ _keyrings = self.mgr.get_store(f'{NODE_PROXY_CACHE_PREFIX}/keyrings', '{}')
+ self.keyrings = json.loads(_keyrings)
+
+ for k, v in self.mgr.get_store_prefix(f'{NODE_PROXY_CACHE_PREFIX}/data').items():
host = k.split('/')[-1:][0]
if host not in self.mgr.inventory.keys():
# remove entry for host that no longer exists
- self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", None)
+ self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", None)
try:
self.oob.pop(host)
self.data.pop(host)
+ self.keyrings.pop(host)
except KeyError:
pass
continue
def save(self,
host: str = '',
data: Dict[str, Any] = {}) -> None:
- self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/{host}", json.dumps(data))
+ self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/data/{host}", json.dumps(data))
+
+ def update_oob(self, host: str, host_oob_info: Dict[str, str]) -> None:
+ self.oob[host] = host_oob_info
+ self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/oob", json.dumps(self.oob))
+
+ def update_keyring(self, host: str, key: str) -> None:
+ self.keyrings[host] = key
+ self.mgr.set_store(f"{NODE_PROXY_CACHE_PREFIX}/keyrings", json.dumps(self.keyrings))
def fullreport(self, **kw: Any) -> Dict[str, Any]:
"""
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory, NamedTemporaryFile
+from urllib.error import HTTPError
from threading import Event
from cephadm.service_discovery import ServiceDiscovery
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService, SNMPGatewayService, LokiService, PromtailService
from .services.jaeger import ElasticSearchService, JaegerAgentService, JaegerCollectorService, JaegerQueryService
+from .services.node_proxy import NodeProxy
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, AgentCache, EventStore, \
ClientKeyringStore, ClientKeyringSpec, TunedProfileStore, NodeProxyCache
default=3.0,
desc='Multiplied by agent refresh rate to calculate how long agent must not report before being marked down'
),
+ Option(
+ 'hw_monitoring',
+ type='bool',
+ default=False,
+ desc='Deploy hw monitoring daemon on every host.'
+ ),
Option(
'max_osd_draining_count',
type='int',
self.agent_refresh_rate = 0
self.agent_down_multiplier = 0.0
self.agent_starting_port = 0
+ self.hw_monitoring = False
self.service_discovery_port = 0
self.secure_monitoring_stack = False
self.apply_spec_fails: List[Tuple[str, str]] = []
PrometheusService, NodeExporterService, LokiService, PromtailService, CrashService, IscsiService,
IngressService, CustomContainerService, CephfsMirrorService, NvmeofService,
CephadmAgent, CephExporterService, SNMPGatewayService, ElasticSearchService,
- JaegerQueryService, JaegerAgentService, JaegerCollectorService
+ JaegerQueryService, JaegerAgentService, JaegerCollectorService, NodeProxy
]
# https://github.com/python/mypy/issues/8993
self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])
self.iscsi_service: IscsiService = cast(IscsiService, self.cephadm_services['iscsi'])
self.nvmeof_service: NvmeofService = cast(NvmeofService, self.cephadm_services['nvmeof'])
+ self.node_proxy_service: NodeProxy = cast(NodeProxy, self.cephadm_services['node-proxy'])
self.scheduled_async_actions: List[Callable] = []
self.http_server = CephadmHttpServer(self)
self.http_server.start()
+
+ self.node_proxy = NodeProxy(self)
+
self.agent_helpers = CephadmAgentHelpers(self)
if self.use_agent:
self.agent_helpers._apply_agent()
Generate a unique random service name
"""
suffix = daemon_type not in [
- 'mon', 'crash', 'ceph-exporter',
+ 'mon', 'crash', 'ceph-exporter', 'node-proxy',
'prometheus', 'node-exporter', 'grafana', 'alertmanager',
'container', 'agent', 'snmp-gateway', 'loki', 'promtail',
'elasticsearch', 'jaeger-collector', 'jaeger-agent', 'jaeger-query'
spec.oob['addr'] = spec.hostname
if not spec.oob.get('port'):
spec.oob['port'] = '443'
- data = json.loads(self.get_store('node_proxy/oob', '{}'))
- data[spec.hostname] = dict()
- data[spec.hostname]['addr'] = spec.oob['addr']
- data[spec.hostname]['port'] = spec.oob['port']
- data[spec.hostname]['username'] = spec.oob['username']
- data[spec.hostname]['password'] = spec.oob['password']
- self.set_store('node_proxy/oob', json.dumps(data))
+ host_oob_info = dict()
+ host_oob_info['addr'] = spec.oob['addr']
+ host_oob_info['port'] = spec.oob['port']
+ host_oob_info['username'] = spec.oob['username']
+ host_oob_info['password'] = spec.oob['password']
+ self.node_proxy_cache.update_oob(spec.hostname, host_oob_info)
# prime crush map?
if spec.location:
def add_host(self, spec: HostSpec) -> str:
return self._add_host(spec)
+ @handle_orch_error
+ def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]:
+ try:
+ result = self.node_proxy.led(light_type=light_type,
+ action=action,
+ hostname=hostname,
+ device=device)
+ except RuntimeError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+ except HTTPError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f"http error while querying node-proxy API: {e}")
+ return result
+
+ @handle_orch_error
+ def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> str:
+ if not yes_i_really_mean_it:
+ raise OrchestratorError("you must pass --yes-i-really-mean-it")
+
+ try:
+ self.node_proxy.shutdown(hostname, force)
+ except RuntimeError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+ except HTTPError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f"Can't shutdown node {hostname}: {e}")
+ return f'Shutdown scheduled on {hostname}'
+
+ @handle_orch_error
+ def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> str:
+ if not yes_i_really_mean_it:
+ raise OrchestratorError("you must pass --yes-i-really-mean-it")
+
+ try:
+ self.node_proxy.powercycle(hostname)
+ except RuntimeError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f'Make sure the node-proxy agent is deployed and running on {hostname}')
+ except HTTPError as e:
+ self.log.error(e)
+ raise OrchestratorValidationError(f"Can't perform powercycle on node {hostname}: {e}")
+ return f'Powercycle scheduled on {hostname}'
+
@handle_orch_error
def node_proxy_summary(self, hostname: Optional[str] = None) -> Dict[str, Any]:
return self.node_proxy_cache.summary(hostname=hostname)
pass
deps = sorted([self.get_mgr_ip(), server_port, root_cert,
str(self.device_enhanced_scan)])
+ elif daemon_type == 'node-proxy':
+ root_cert = ''
+ server_port = ''
+ try:
+ server_port = str(self.http_server.agent.server_port)
+ root_cert = self.http_server.agent.ssl_certs.get_root_cert()
+ except Exception:
+ pass
+ deps = sorted([self.get_mgr_ip(), server_port, root_cert])
elif daemon_type == 'iscsi':
if spec:
iscsi_spec = cast(IscsiServiceSpec, spec)
if self.mgr.agent_helpers._handle_use_agent_setting():
continue
+ if self.mgr.node_proxy_service.handle_hw_monitoring_setting():
+ continue
+
if self.mgr.upgrade.continue_upgrade():
continue
# the CephService class refers to service types, not daemon types
if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
return AuthEntity(f'client.{daemon_type}.{daemon_id}')
- elif daemon_type in ['crash', 'agent']:
+ elif daemon_type in ['crash', 'agent', 'node-proxy']:
if host == "":
raise OrchestratorError(
f'Host not provided to generate <{daemon_type}> auth entity name')
return daemon_spec
- def pre_remove(self, daemon: DaemonDescription) -> None:
- super().pre_remove(daemon)
-
- assert daemon.daemon_id is not None
- daemon_id: str = daemon.daemon_id
-
- logger.info('Removing agent %s...' % daemon_id)
-
- self.mgr.agent_helpers._shutdown_node_proxy()
-
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
agent = self.mgr.http_server.agent
try:
--- /dev/null
+import json
+import ssl
+import base64
+
+from urllib.error import HTTPError, URLError
+from typing import List, Any, Dict, Tuple, Optional, MutableMapping
+
+from .cephadmservice import CephadmDaemonDeploySpec, CephService
+from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
+from ceph.utils import http_req
+from orchestrator import OrchestratorError
+
+
class NodeProxy(CephService):
    """Deploy node-proxy daemons and relay out-of-band (OOB) hardware
    requests (ident LED, shutdown, powercycle) to them over HTTPS.

    The daemons re-use the cephadm agent HTTP endpoint for reporting, so
    deployment requires that endpoint to be up and its certs generated.
    """
    TYPE = 'node-proxy'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Create the daemon keyring and attach the final config/deps to the spec."""
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.http_server.agent:
            raise OrchestratorError('Cannot deploy node-proxy before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        # cache the keyring so the endpoint can authenticate this daemon later
        self.mgr.node_proxy_cache.update_keyring(host, keyring)

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        """Build the node-proxy.json config payload and the dependency list.

        node-proxy is re-using the agent endpoint and therefore needs
        similar checks to see if the endpoint is ready.

        :raises OrchestratorError: when the cephadm endpoint certs/port are
            not available yet.
        """
        self.agent_endpoint = self.mgr.http_server.agent
        try:
            assert self.agent_endpoint
            assert self.agent_endpoint.ssl_certs.get_root_cert()
            assert self.agent_endpoint.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy node-proxy daemons until cephadm endpoint has finished generating certs')

        listener_cert, listener_key = self.agent_endpoint.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
        cfg = {
            'target_ip': self.mgr.get_mgr_ip(),
            'target_port': self.agent_endpoint.server_port,
            'name': f'node-proxy.{daemon_spec.host}',
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.agent_endpoint.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }
        config = {'node-proxy.json': json.dumps(cfg)}

        # deps: a change in mgr IP, endpoint port or root cert requires redeploy
        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.agent_endpoint.server_port),
                               self.agent_endpoint.ssl_certs.get_root_cert()])

    def handle_hw_monitoring_setting(self) -> bool:
        """Apply or remove the node-proxy service spec depending on whether
        the hw_monitoring config option is set or not.

        :return: True when it either creates or deletes a spec, False otherwise.
        """
        if self.mgr.hw_monitoring:
            if 'node-proxy' not in self.mgr.spec_store:
                spec = ServiceSpec(
                    service_type='node-proxy',
                    placement=PlacementSpec(host_pattern='*')
                )
                self.mgr.spec_store.save(spec)
                return True
            return False
        else:
            if 'node-proxy' in self.mgr.spec_store:
                self.mgr.spec_store.rm('node-proxy')
                return True
            return False

    def get_ssl_ctx(self) -> ssl.SSLContext:
        """Return an SSL context that verifies peers against the cephadm root cert."""
        ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = True
        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
        ssl_ctx.load_verify_locations(cadata=ssl_root_crt)
        return ssl_ctx

    def _proxy_request(self,
                       hostname: str,
                       endpoint: str,
                       data: str,
                       method: Optional[str] = None) -> Dict[str, Any]:
        """Send an authenticated HTTPS request to the node-proxy agent on
        ``hostname`` and return the decoded JSON response.

        :raises HTTPError: propagated (after logging) on an HTTP error status.
        :raises RuntimeError: when the agent cannot be reached (URLError).
        """
        ssl_ctx: ssl.SSLContext = self.get_ssl_ctx()
        header: Dict[str, str] = self.generate_auth_header(hostname)
        addr: str = self.mgr.inventory.get_addr(hostname)
        try:
            headers, result, status = http_req(hostname=addr,
                                               port='8080',
                                               headers=header,
                                               method=method,
                                               data=data,
                                               endpoint=endpoint,
                                               ssl_ctx=ssl_ctx)
            return json.loads(result)
        except HTTPError as e:
            self.mgr.log.error(e)
            raise
        except URLError as e:
            raise RuntimeError(e)

    def led(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> Dict[str, Any]:
        """Set or query an ident LED through the node-proxy agent.

        :param light_type: 'chassis' or 'drive'.
        :param action: 'on'/'off' set the LED (PATCH); anything else reads it (GET).
        :param hostname: the host whose agent is contacted.
        :param device: the drive id (used when light_type == 'drive').
        """
        method: str = 'PATCH' if action in ['on', 'off'] else 'GET'
        payload: Optional[Dict[str, str]] = None
        segments: List[str] = ['led', light_type]

        if light_type == 'drive':
            # an empty path segment is kept when no device was given,
            # matching the agent's URL scheme
            segments.append(device if device else '')

        if method == 'PATCH':
            payload = dict(state=action)

        # separate str variable: don't rebind the List[str] above to a str
        endpoint: str = f'/{"/".join(segments)}'

        # NOTE: a GET still carries a 'null' JSON body (json.dumps(None)),
        # preserved for compatibility with the agent endpoint.
        return self._proxy_request(hostname, endpoint, json.dumps(payload), method=method)

    def generate_auth_header(self, hostname: str) -> Dict[str, str]:
        """Build a Basic-Auth header from the cached OOB credentials.

        :raises RuntimeError: when no OOB credentials are cached for the host.
        """
        try:
            username = self.mgr.node_proxy_cache.oob[hostname]['username']
            password = self.mgr.node_proxy_cache.oob[hostname]['password']
            auth: bytes = f'{username}:{password}'.encode('utf-8')
            auth_str: str = base64.b64encode(auth).decode('utf-8')
            header = {'Authorization': f'Basic {auth_str}'}
        except KeyError as e:
            self.mgr.log.error(f'Check oob information is provided for {hostname}.')
            raise RuntimeError(e)
        return header

    def shutdown(self, hostname: str, force: Optional[bool] = False) -> Dict[str, Any]:
        """Ask the node-proxy agent on ``hostname`` to shut the host down."""
        payload: Dict[str, Optional[bool]] = dict(force=force)
        # method omitted: http_req infers POST from the presence of data
        return self._proxy_request(hostname, '/shutdown', json.dumps(payload))

    def powercycle(self, hostname: str) -> Dict[str, Any]:
        """Ask the node-proxy agent on ``hostname`` to powercycle the host."""
        # empty JSON body; http_req infers POST from the presence of data
        return self._proxy_request(hostname, '/powercycle', "{}")
class TestNodeProxyEndpoint(helper.CPWebCase):
mgr = FakeMgr()
app = NodeProxyEndpoint(mgr)
- mgr.agent_cache.agent_keys = {"host01": "fake-secret01",
- "host02": "fake-secret02"}
+ mgr.node_proxy.keyrings = {"host01": "fake-secret01",
+ "host02": "fake-secret02"}
mgr.node_proxy.oob = {"host01": {"username": "oob-user01",
"password": "oob-pass01"},
"host02": {"username": "oob-user02",
self.assertStatus('400 Bad Request')
def test_oob_data_misses_secret_field(self):
- data = '{"cephx": {"name": "host01"}}'
+ data = '{"cephx": {"name": "node-proxy.host01"}}'
self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data)))])
self.assertStatus('400 Bad Request')
def test_oob_agent_not_running(self):
- data = '{"cephx": {"name": "host03", "secret": "fake-secret03"}}'
+ data = '{"cephx": {"name": "node-proxy.host03", "secret": "fake-secret03"}}'
self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data)))])
self.assertStatus('502 Bad Gateway')
def test_oob_wrong_keyring(self):
- data = '{"cephx": {"name": "host01", "secret": "wrong-keyring"}}'
+ data = '{"cephx": {"name": "node-proxy.host01", "secret": "wrong-keyring"}}'
self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data)))])
self.assertStatus('403 Forbidden')
def test_oob_ok(self):
- data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}'
+ data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}'
self.getPage("/oob", method="POST", body=data, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data)))])
self.assertStatus('200 OK')
def test_data_missing_patch(self):
- data = '{"cephx": {"name": "host01", "secret": "fake-secret01"}}'
+ data = '{"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}}'
self.getPage("/data", method="POST", body=data, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data)))])
self.assertStatus('400 Bad Request')
def test_data_raises_alert(self):
patch = node_proxy_data.full_set_with_critical
- data = {"cephx": {"name": "host01", "secret": "fake-secret01"}, "patch": patch}
+ data = {"cephx": {"name": "node-proxy.host01", "secret": "fake-secret01"}, "patch": patch}
data_str = json.dumps(data)
self.getPage("/data", method="POST", body=data_str, headers=[('Content-Type', 'application/json'),
('Content-Length', str(len(data_str)))])
CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES
# these daemon types use the ceph container image
-CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs']
+CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs', 'node-proxy']
# these daemons do not use the ceph image. There are other daemons
# that also don't use the ceph image, but we only care about those
"""
raise NotImplementedError()
    def hardware_light(self, light_type: str, action: str, hostname: str, device: Optional[str] = None) -> OrchResult[Dict[str, Any]]:
        """
        Light a chassis or device ident LED.

        :param light_type: led type (chassis or device).
        :param action: set or get status led.
        :param hostname: the name of the host.
        :param device: the device id (when light_type = 'device')
        :raises NotImplementedError: always; concrete orchestrator modules override this.
        """
        raise NotImplementedError()
+
    def hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> OrchResult[str]:
        """
        Reboot a host.

        :param hostname: the name of the host being rebooted.
        :param yes_i_really_mean_it: caller must opt in before the disruptive action runs.
        :raises NotImplementedError: always; concrete orchestrator modules override this.
        """
        raise NotImplementedError()
+
    def hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> OrchResult[str]:
        """
        Shutdown a host.

        :param hostname: the name of the host to shutdown.
        :param force: request a forced (immediate) shutdown.
        :param yes_i_really_mean_it: caller must opt in before the disruptive action runs.
        :raises NotImplementedError: always; concrete orchestrator modules override this.
        """
        raise NotImplementedError()
+
def hardware_status(self, hostname: Optional[str] = None, category: Optional[str] = 'summary') -> OrchResult[str]:
"""
Display hardware status.
'crashcollector': 'crash', # Specific Rook Daemon
'container': 'container',
'agent': 'agent',
+ 'node-proxy': 'node-proxy',
'snmp-gateway': 'snmp-gateway',
'elasticsearch': 'elasticsearch',
'jaeger-agent': 'jaeger-agent',
'crash': ['crash'],
'container': ['container'],
'agent': ['agent'],
+ 'node-proxy': ['node-proxy'],
'snmp-gateway': ['snmp-gateway'],
'elasticsearch': ['elasticsearch'],
'jaeger-agent': ['jaeger-agent'],
return table.get_string()
    class HardwareLightType(enum.Enum):
        # CLI-facing LED categories. NOTE: the CLI name 'device' maps to the
        # backend value 'drive'.
        chassis = 'chassis'
        device = 'drive'
+
    class HardwareLightAction(enum.Enum):
        # 'on'/'off' set the LED state; 'get' only reads it back.
        on = 'on'
        off = 'off'
        get = 'get'
+
+ @_cli_write_command('orch hardware light')
+ def _hardware_light(self,
+ light_type: HardwareLightType, action: HardwareLightAction,
+ hostname: str, device: Optional[str] = None) -> HandleCommandResult:
+ """Enable or Disable a device or chassis LED"""
+ if light_type == self.HardwareLightType.device and not device:
+ return HandleCommandResult(stderr='you must pass a device ID.',
+ retval=-errno.ENOENT)
+
+ completion = self.hardware_light(light_type.value, action.value, hostname, device)
+ data = raise_if_exception(completion)
+ output: str = ''
+ if action == self.HardwareLightAction.get:
+ status = 'on' if data["LocationIndicatorActive"] else 'off'
+ if light_type == self.HardwareLightType.device:
+ output = f'ident LED for {device} on {hostname} is: {status}'
+ else:
+ output = f'ident chassis LED for {hostname} is: {status}'
+ else:
+ pass
+ return HandleCommandResult(stdout=output)
+
    @_cli_write_command('orch hardware powercycle')
    def _hardware_powercycle(self, hostname: str, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
        """Reboot a host"""
        # delegate to the orchestrator backend; raise_if_exception turns a
        # backend failure into a command error before we read the result
        completion = self.hardware_powercycle(hostname, yes_i_really_mean_it=yes_i_really_mean_it)
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())
+
    @_cli_write_command('orch hardware shutdown')
    def _hardware_shutdown(self, hostname: str, force: Optional[bool] = False, yes_i_really_mean_it: bool = False) -> HandleCommandResult:
        """Shutdown a host"""
        # delegate to the orchestrator backend; raise_if_exception turns a
        # backend failure into a command error before we read the result
        completion = self.hardware_shutdown(hostname, force, yes_i_really_mean_it=yes_i_really_mean_it)
        raise_if_exception(completion)
        return HandleCommandResult(stdout=completion.result_str())
+
@_cli_write_command('orch host rm')
def _remove_host(self, hostname: str, force: bool = False, offline: bool = False) -> HandleCommandResult:
"""Remove a host"""
KNOWN_SERVICE_TYPES = 'alertmanager crash grafana iscsi nvmeof loki promtail mds mgr mon nfs ' \
'node-exporter osd prometheus rbd-mirror rgw agent ceph-exporter ' \
'container ingress cephfs-mirror snmp-gateway jaeger-tracing ' \
- 'elasticsearch jaeger-agent jaeger-collector jaeger-query'.split()
+ 'elasticsearch jaeger-agent jaeger-collector jaeger-query ' \
+ 'node-proxy'.split()
REQUIRES_SERVICE_ID = 'iscsi nvmeof mds nfs rgw container ingress '.split()
MANAGED_CONFIG_OPTIONS = [
'mds_join_fs',
import datetime
import re
import string
+import ssl
-from typing import Optional
+from typing import Optional, MutableMapping, Tuple, Any
+from urllib.error import HTTPError, URLError
+from urllib.request import urlopen, Request
+
+import logging
+
+log = logging.getLogger(__name__)
def datetime_now() -> datetime.datetime:
return False
return True
+
+
def http_req(hostname: str = '',
             port: str = '443',
             method: Optional[str] = None,
             headers: MutableMapping[str, str] = {},
             data: Optional[str] = None,
             endpoint: str = '/',
             scheme: str = 'https',
             ssl_verify: bool = False,
             timeout: Optional[int] = None,
             ssl_ctx: Optional[Any] = None) -> Tuple[Any, Any, Any]:
    """Issue an HTTP(S) request and return (headers, decoded body, status code).

    :param method: HTTP verb; defaults to POST when ``data`` is given, GET otherwise.
    :param headers: extra request headers; the mapping is never mutated.
    :param data: request body as a str (must be ASCII-encodable).
    :param ssl_verify: when building a default context, whether to verify the peer.
    :param ssl_ctx: pre-built SSL context; overrides ``ssl_verify``.
    :raises HTTPError: on an HTTP error status (logged, then re-raised).
    :raises URLError: when the host cannot be reached (logged, then re-raised).
    """
    if not ssl_ctx:
        ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if not ssl_verify:
            ssl_ctx.check_hostname = False
            ssl_ctx.verify_mode = ssl.CERT_NONE
        else:
            ssl_ctx.verify_mode = ssl.CERT_REQUIRED

    url: str = f'{scheme}://{hostname}:{port}{endpoint}'
    _data = bytes(data, 'ascii') if data else None
    # Copy the headers: the previous alias (`_headers = headers`) mutated both
    # the caller's dict and the shared mutable default `{}` in the signature,
    # leaking 'Content-Type' into later calls.
    _headers = dict(headers)
    if data and not method:
        method = 'POST'
    if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
        _headers['Content-Type'] = 'application/json'
    try:
        req = Request(url, _data, _headers, method=method)
        with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
            response_str = response.read()
            response_headers = response.headers
            response_code = response.code
        return response_headers, response_str.decode(), response_code
    except (HTTPError, URLError) as e:
        log.error(e)
        # handle error here if needed
        raise