def get_sn(self) -> str:
raise NotImplementedError()
+ def get_led(self) -> Dict[str, Any]:
+ raise NotImplementedError()
+
+ def set_led(self, data: Dict[str, str]) -> int:
+ raise NotImplementedError()
+
def get_host(self) -> str:
return socket.gethostname()
from .redfishdellsystem import RedfishDellSystem
from .reporter import Reporter
from .util import Config, Logger
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
from .basesystem import BaseSystem
import sys
import argparse
def firmwares(self) -> Dict[str, Any]:
return {'firmwares': self.backend.get_firmwares()}
+ def _cp_dispatch(self, vpath: List[str]) -> "API":
+ if vpath[0] == 'led':
+ if cherrypy.request.method == 'GET':
+ return self.get_led
+ if cherrypy.request.method == 'PATCH':
+ return self.set_led
+ return self
+
@cherrypy.expose
+ @cherrypy.tools.allow(methods=['GET'])
@cherrypy.tools.json_out()
+ def get_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
+ return self.backend.get_led()
+
+ @cherrypy.expose
+ @cherrypy.tools.allow(methods=['PATCH'])
@cherrypy.tools.json_in()
- def index(self, endpoint: str) -> Dict[str, Any]:
- kw = dict(endpoint=endpoint)
- result = self.common(**kw)
+ @cherrypy.tools.json_out()
+ @cherrypy.tools.auth_basic(on=True)
+ def set_led(self, **kw: Dict[str, Any]) -> Dict[str, Any]:
+ data = cherrypy.request.json
+ rc = self.backend.set_led(data)
+
+ if rc != 200:
+ cherrypy.response.status = rc
+ result = {"state": f"error: please, verify the data you sent."}
+ else:
+ result = {"state": data["state"].lower()}
return result
def stop(self) -> None:
self.log: Logger = Logger(__name__)
self.log.logger.info(f"Initializing redfish client {__name__}")
self.host: str = f"https://{host}:{port}"
- self.token: Dict[str, str] = {}
+ self.token: str = ''
self.location: str = ''
def login(self) -> None:
msg = f"Can't log in to {self.host} as '{self.username}': {e}"
self.log.logger.error(msg)
raise RuntimeError
- self.token = {"X-Auth-Token": _headers['X-Auth-Token']}
+ self.token = _headers['X-Auth-Token']
self.location = _headers['Location']
def is_logged_in(self) -> bool:
self.log.logger.debug(f"Checking token validity for {self.host}")
- if not self.location or not self.token.get('X-Auth-Token'):
+ if not self.location or not self.token:
self.log.logger.debug(f"No token found for {self.host}.")
return False
- headers = {"X-Auth-Token": self.token['X-Auth-Token']}
+ headers = {"X-Auth-Token": self.token}
try:
_headers, _data, _status_code = self.query(headers=headers,
endpoint=self.location)
def logout(self) -> Dict[str, Any]:
try:
_, _data, _status_code = self.query(method='DELETE',
- headers=self.token,
+ headers={"X-Auth-Token": self.token},
endpoint=self.location)
except URLError:
self.log.logger.error(f"Can't log out from {self.host}")
if self.PREFIX not in path:
path = f"{self.PREFIX}{path}"
try:
- _, result, _status_code = self.query(headers=self.token,
- endpoint=path)
+ _, result, _status_code = self.query(endpoint=path)
result_json = json.loads(result)
return result_json
except URLError as e:
endpoint: str = '',
timeout: int = 10) -> Tuple[Dict[str, str], str, int]:
url = f'{self.host}{endpoint}'
-
+ _headers = headers.copy() if headers else {}
+ if self.token:
+ _headers['X-Auth-Token'] = self.token
+ if not _headers.get('Content-Type') and method in ['POST', 'PUT', 'PATCH']:
+ _headers['Content-Type'] = 'application/json'
# ssl_ctx = ssl.create_default_context()
# ssl_ctx.check_hostname = True
# ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx = ssl._create_unverified_context()
_data = bytes(data, 'ascii') if data else None
try:
- req = Request(url, _data, headers=headers, method=method)
+ req = Request(url, _data, headers=_headers, method=method)
with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
response_str = response.read()
response_headers = response.headers
+import json
from .baseredfishsystem import BaseRedfishSystem
from .util import Logger, normalize_dict, to_snake_case
from typing import Dict, Any, List
def get_fans(self) -> Dict[str, Dict[str, Dict]]:
return self._sys['fans']
+ def get_led(self) -> Dict[str, Dict[str, Dict]]:
+ endpoint = f"/redfish/v1/{self.chassis_endpoint}"
+ result = self.client.query(method='GET',
+ endpoint=endpoint,
+ timeout=10)
+ response_json = json.loads(result[1])
+ mapper = {
+ 'true': 'on',
+ 'false': 'off'
+ }
+ if result[2] == 200:
+ return {"state": mapper[str(response_json['LocationIndicatorActive']).lower()]}
+ else:
+ return {"error": "Couldn't retrieve enclosure LED status."}
+
+ def set_led(self, data: Dict[str, str]) -> int:
+ # '{"IndicatorLED": "Lit"}' -> LocationIndicatorActive = false
+ # '{"IndicatorLED": "Blinking"}' -> LocationIndicatorActive = true
+ mapper = {
+ "on": 'Blinking',
+ "off": 'Lit'
+ }
+ try:
+ _data = {
+ "IndicatorLED": mapper[data["state"].lower()]
+ }
+ _, response, status = self.client.query(
+ data=json.dumps(_data),
+ method='PATCH',
+ endpoint=f"/redfish/v1{self.chassis_endpoint}"
+ )
+ except KeyError:
+ status = 400
+ result = status
+ return result
+
def _update_network(self) -> None:
fields = ['Description', 'Name', 'SpeedMbps', 'Status']
self.log.logger.debug('Updating network')
self._sys['firmwares'] = self.build_common_data(data=self._system['UpdateService'],
fields=fields,
path='FirmwareInventory')
- self.log.logger.warning(f"guits-debug1:{self._sys['firmwares']}")
import tempfile
import threading
import time
+import base64
from orchestrator import DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.ssl_cert_utils import SSLCerts
from mgr_util import test_port_allocation, PortAlreadyInUse
+from urllib.request import urlopen, Request
+from urllib.error import HTTPError, URLError
-from typing import Any, Dict, List, Set, TYPE_CHECKING, Optional
+from typing import Any, Dict, List, Set, Tuple, TYPE_CHECKING, Optional
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
def configure(self) -> None:
self.host_data = HostData(self.mgr, self.server_port, self.server_addr)
- self.node_proxy = NodeProxy(self.mgr)
self.configure_tls(self.host_data)
+ self.node_proxy = NodeProxy(self.mgr)
self.configure_routes()
self.find_free_port()
class NodeProxy:
def __init__(self, mgr: "CephadmOrchestrator"):
self.mgr = mgr
+ self.ssl_root_crt = self.mgr.http_server.agent.ssl_certs.get_root_cert()
+ self.ssl_ctx = ssl.create_default_context()
+ self.ssl_ctx.check_hostname = True
+ self.ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+ self.ssl_ctx.load_verify_locations(cadata=self.ssl_root_crt)
def _cp_dispatch(self, vpath: List[str]) -> "NodeProxy":
if len(vpath) == 2:
def validate_node_proxy_data(self, data: Dict[str, Any]) -> bool:
self.validate_msg = 'valid node-proxy data received.'
cherrypy.response.status = 200
- if 'cephx' not in data:
- cherrypy.response.status = 400
- self.validate_msg = 'The field \'host\' must be provided.'
- elif 'secret' not in data['cephx']:
- cherrypy.response.status = 400
- self.validate_msg = 'The agent keyring must be provided.'
- elif not self.mgr.agent_cache.agent_keys.get(data['cephx']['name']):
+ try:
+ if 'cephx' not in data.keys():
+ cherrypy.response.status = 400
+ self.validate_msg = 'The field \'cephx\' must be provided.'
+ elif 'name' not in data['cephx'].keys():
+ cherrypy.response.status = 400
+ self.validate_msg = 'The field \'host\' must be provided.'
+ elif 'secret' not in data['cephx'].keys():
+ cherrypy.response.status = 400
+ self.validate_msg = 'The agent keyring must be provided.'
+ elif not self.mgr.agent_cache.agent_keys.get(data['cephx']['name']):
+ cherrypy.response.status = 400
+ self.validate_msg = f'Make sure the agent is running on {data["cephx"]["name"]}'
+ elif data['cephx']['secret'] != self.mgr.agent_cache.agent_keys[data['cephx']['name']]:
+ cherrypy.response.status = 403
+ self.validate_msg = f'Got wrong keyring from agent on host {data["cephx"]["name"]}.'
+ except AttributeError:
cherrypy.response.status = 400
- self.validate_msg = f'Make sure the agent is running on {data["cephx"]["name"]}'
- elif data['cephx']['secret'] != self.mgr.agent_cache.agent_keys[data['cephx']['name']]:
- cherrypy.response.status = 403
- self.validate_msg = f'Got wrong keyring from agent on host {data["cephx"]["name"]}.'
+ self.validate_msg = 'Malformed data received.'
return cherrypy.response.status == 200
return results
+ def query_endpoint(self,
+ addr: str = '',
+ port: str = '',
+ method: Optional[str] = None,
+ headers: Optional[Dict[str, str]] = {},
+ data: Optional[bytes] = None,
+ endpoint: str = '',
+ ssl_ctx: Optional[Any] = None) -> Tuple[int, Dict[str, Any]]:
+ url = f'https://{addr}:{port}{endpoint}'
+ _headers = headers
+ response_json = {}
+ if not _headers.get('Content-Type'):
+ # default to application/json if nothing provided
+ _headers['Content-Type'] = 'application/json'
+ _data = bytes(data, 'ascii') if data else None
+ try:
+ req = Request(url, _data, _headers, method=method)
+ with urlopen(req, context=ssl_ctx) as response:
+ response_str = response.read()
+ response_json = json.loads(response_str)
+ response_status = response.status
+ except HTTPError as e:
+ self.mgr.log.debug(f"{e.code} {e.reason}")
+ response_status = e.code
+ except URLError as e:
+ self.mgr.log.debug(f"{e.reason}")
+ raise
+ except Exception as e:
+ self.mgr.log.error(f"{e}")
+ raise
+ return (response_status, response_json)
+
+ @cherrypy.expose
+ @cherrypy.tools.allow(methods=['GET', 'PATCH'])
+ @cherrypy.tools.json_in()
+ @cherrypy.tools.json_out()
+ def led(self, **kw: Any) -> Dict[str, Any]:
+ method: str = cherrypy.request.method
+ hostname: Optional[str] = kw.get('hostname')
+ headers: Dict[str, str] = {}
+
+ if not hostname:
+ msg: str = "listing enclosure LED status for all nodes is not implemented."
+ self.mgr.log.debug(msg)
+ raise cherrypy.HTTPError(501, msg)
+
+ addr = self.mgr.inventory.get_addr(hostname)
+
+ if method == 'PATCH':
+ # TODO(guits): need to check the request is authorized
+ # allowing a specific keyring only ? (client.admin or client.agent.. ?)
+ data: str = json.dumps(cherrypy.request.json)
+ username = self.mgr.node_proxy.idrac[hostname]['username']
+ password = self.mgr.node_proxy.idrac[hostname]['password']
+ auth = f"{username}:{password}".encode("utf-8")
+ auth = base64.b64encode(auth).decode("utf-8")
+ headers = {"Authorization": f"Basic {auth}"}
+
+ try:
+ status, result = self.query_endpoint(data=data,
+ headers=headers,
+ addr=addr,
+ method=method,
+ port=8080,
+ endpoint='/led',
+ ssl_ctx=self.ssl_ctx)
+ except URLError as e:
+ raise cherrypy.HTTPError(502, f"{e}")
+ cherrypy.response.status = status
+ return result
+
@cherrypy.expose
@cherrypy.tools.allow(methods=['GET'])
@cherrypy.tools.json_out()