import time
import enum
from collections import namedtuple
-import tempfile
+from tempfile import NamedTemporaryFile
from mgr_module import CLIReadCommand, MgrModule, MgrStandbyModule, PG_STATES, Option, ServiceInfoT, HandleCommandResult, CLIWriteCommand
from mgr_util import get_default_addr, profile_method, build_url
try:
security_config = json.loads(out)
if security_config.get('security_enabled', False):
- self.setup_tls_using_cephadm(server_addr, server_port)
+ self.setup_tls_config(server_addr, server_port)
return
except Exception as e:
self.log.exception(f'Failed to setup cephadm based secure monitoring stack: {e}\n',
'Falling back to default configuration')
- # In any error fallback to plain http mode
- self.setup_default_config(server_addr, server_port)
-
def setup_default_config(self, server_addr: str, server_port: int) -> None:
cherrypy.config.update({
'server.socket_host': server_addr,
self.set_uri(build_url(scheme='http', host=self.get_server_addr(),
port=server_port, path='/'))
- def setup_tls_using_cephadm(self, server_addr: str, server_port: int) -> None:
+ def setup_tls_config(self, server_addr: str, server_port: int) -> None:
+ # Temporarily disabling the verify function due to issues.
+ # Please check verify_tls_files below for more information.
+ # from mgr_util import verify_tls_files
cmd = {'prefix': 'orch certmgr generate-certificates',
'module_name': 'prometheus',
'format': 'json'}
return
cert_key = json.loads(out)
- self.cert_file = tempfile.NamedTemporaryFile()
+ self.cert_file = NamedTemporaryFile()
self.cert_file.write(cert_key['cert'].encode('utf-8'))
self.cert_file.flush() # cert_tmp must not be gc'ed
- self.key_file = tempfile.NamedTemporaryFile()
+ self.key_file = NamedTemporaryFile()
self.key_file.write(cert_key['key'].encode('utf-8'))
self.key_file.flush() # pkey_tmp must not be gc'ed
+ # Temporarily disabling the verify function due to issues:
+ # See https://github.com/pyca/bcrypt/issues/694 for details.
+ # Re-enable once the issue is resolved.
+ # verify_tls_files(self.cert_file.name, self.key_file.name)
cert_file_path, key_file_path = self.cert_file.name, self.key_file.name
cherrypy.config.update({
import functools
import os
import json
+import base64
+import time
+from typing import Optional, Dict, Union, Tuple, Type, Optional
+from functools import wraps
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now
-from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING
+from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple, TYPE_CHECKING, cast
try:
from ceph.deployment.drive_group import DriveGroupSpec
try:
from kubernetes import client, config
- from kubernetes.client.rest import ApiException
+ from kubernetes.client import ApiException, CoreV1Api, V1Secret
kubernetes_imported = True
kubernetes_imported = False
client = None
config = None
+ ApiException = Exception
+ CoreV1Api = None
+ V1Secret = object
from mgr_module import MgrModule, Option, NFS_POOL_NAME
import orchestrator
FuncT = TypeVar('FuncT', bound=Callable)
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)
def retry(
    on_exception: Union[Type[Exception], Tuple[Type[Exception], ...]],
    tries: int = 3,
    delay: int = 1,
    backoff: int = 2,
    max_delay: int = 60,
    logger: Optional[logging.Logger] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that retries a callable with exponential backoff.

    The wrapped callable is invoked up to ``tries`` times. When it raises
    one of ``on_exception`` the wrapper sleeps and tries again; any other
    exception propagates immediately. Once every attempt has failed, the
    exception raised by the last attempt is re-raised unchanged.

    :param on_exception: exception class (or tuple of classes) that
        triggers a retry.
    :param tries: total number of attempts, including the first one.
        Must be at least 1.
    :param delay: seconds to sleep after the first failed attempt.
    :param backoff: factor the sleep time is multiplied by after every
        failed attempt.
    :param max_delay: upper bound, in seconds, for a single sleep.
    :param logger: optional logger; when given, every failed attempt is
        reported with a warning.
    :raises ValueError: if ``tries`` is smaller than 1.
    :return: a decorator to apply to the callable that should be retried.
    """
    # Without at least one attempt there is neither a result to return nor
    # an exception to re-raise, so reject that configuration right away
    # instead of failing later with an unrelated error.
    if tries < 1:
        raise ValueError(f"'tries' must be at least 1, got {tries}")

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            wait = delay
            # tries >= 1 is guaranteed above, so every pass through this
            # loop ends in either a return or a raise.
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except on_exception as e:
                    if logger:
                        logger.warning(
                            f"Retry #{attempt}/{tries} after exception in '{func.__name__}': {e}"
                        )
                    if attempt == tries:
                        # Attempts exhausted: hand the last error to the
                        # caller with its original traceback.
                        raise
                    # Never sleep longer than max_delay, however large the
                    # backoff has grown.
                    time.sleep(min(wait, max_delay))
                    wait *= backoff
        return wrapper
    return decorator
class RookEnv(object):
def __init__(self) -> None:
default='local',
desc='storage class name for LSO-discovered PVs',
),
+ Option(
+ 'secure_monitoring_stack',
+ type='bool',
+ default=False,
+ desc='Enable TLS security for all the monitoring stack daemons'
+ ),
+ Option(
+ 'prometheus_tls_secret_name',
+ type='str',
+ default='rook-ceph-prometheus-server-tls',
+ desc='name of tls secret in k8s for prometheus',
+ )
]
@staticmethod
@handle_orch_error
def get_security_config(self) -> Dict[str, bool]:
    """Report the security settings of the monitoring stack.

    ``security_enabled`` mirrors the ``secure_monitoring_stack`` option of
    the ``rook`` module (``False`` when unset); ``mgmt_gw_enabled`` is
    always reported as ``False``.
    """
    security_enabled = cast(
        bool, self.get_module_option_ex('rook', 'secure_monitoring_stack', False)
    )
    security_config = {
        'security_enabled': security_enabled,
        'mgmt_gw_enabled': False
    }
    return security_config
@handle_orch_error
def remove_service(self, service_name: str, force: bool = False) -> str:
except Exception as e:
logging.error(e)
return OrchResult(None, Exception("Unable to zap device: " + str(e.with_traceback(None))))
- return OrchResult(f'{path} on {host} zapped')
+ return OrchResult(f'{path} on {host} zapped')
@handle_orch_error
def apply_mon(self, spec):
@handle_orch_error
def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
    """Return an empty mapping; all three arguments are ignored."""
    return dict()
+
# Retry with exponential backoff on Kubernetes API failures. With these
# settings the worst case sleeps 1 + 2 + 4 + 8 + 16 + 32 = 63 seconds in
# total before the last ApiException reaches the caller.
@retry(on_exception=ApiException, tries=7, delay=1, backoff=2, max_delay=60)
def fetch_k8s_secret(self, secret_name: str) -> Optional[V1Secret]:
    """Read the secret ``secret_name`` from the Rook namespace.

    :param secret_name: name of the Kubernetes secret to read.
    :return: the object returned by ``read_namespaced_secret``, or ``None``
        when the CoreV1Api client is not initialized or the read fails
        with an error other than ``ApiException``.
    :raises ApiException: when the API call still fails after all retry
        attempts.
    """
    if self._k8s_CoreV1_api is None:
        logging.warning("CoreV1Api client is not initialized, returning None.")
        return None

    try:
        return self._k8s_CoreV1_api.read_namespaced_secret(
            name=secret_name,
            namespace=self._rook_env.namespace
        )
    except ApiException as e:
        # Must propagate: the @retry decorator only retries on exceptions
        # that escape this function, and callers handle ApiException
        # explicitly. Swallowing it here would disable both.
        logging.warning(f"Failed to fetch secret '{secret_name}': {e}")
        raise
    except Exception as e:
        # Anything else is not retried; keep the best-effort contract.
        # NOTE: when the kubernetes package is unavailable ApiException is
        # aliased to Exception, so this branch is never reached.
        logging.warning(f"Failed to fetch secret '{secret_name}': {e}")
        return None
+
@handle_orch_error
def generate_certificates(self, module_name: str) -> Optional[Dict[str, str]]:
    """Return the TLS certificate and key stored for ``module_name``.

    The pair is read from the Kubernetes secret named by the module option
    ``<module_name>_tls_secret_name``; its ``tls.crt`` and ``tls.key``
    entries are base64-decoded and returned as UTF-8 text.

    :param module_name: module requesting certificates; only
        ``prometheus`` is supported.
    :return: ``{'cert': <certificate>, 'key': <private key>}``.
    :raises orchestrator.OrchestratorError: if the module is unsupported,
        the secret cannot be fetched, or it holds no usable certificate
        and key.
    """
    supported_modules = ['prometheus']
    if module_name not in supported_modules:
        raise orchestrator.OrchestratorError(f'Unsupported module {module_name}. Supported module are: {supported_modules}')

    secret_name = self.get_module_option(f'{module_name}_tls_secret_name')
    try:
        api_response = self.fetch_k8s_secret(secret_name)
    except ApiException as e:
        raise orchestrator.OrchestratorError(f'Unable to get certificates for {module_name}, error: {e}') from e

    if api_response is None:
        raise orchestrator.OrchestratorError(f'Unable to get certificates for {module_name}')

    # A secret created without any entries carries no data mapping at all;
    # treat it like an empty one so it is reported as unparsable below
    # rather than failing with AttributeError.
    secret_data = api_response.data or {}
    try:
        cert = base64.b64decode(secret_data.get('tls.crt', '')).decode('utf-8')
        key = base64.b64decode(secret_data.get('tls.key', '')).decode('utf-8')
    except ValueError as e:
        # binascii.Error (malformed base64) and UnicodeDecodeError are both
        # ValueError subclasses.
        raise orchestrator.OrchestratorError(f'Unable to parse certificates for {module_name} module') from e

    if not cert or not key:
        raise orchestrator.OrchestratorError(f'Unable to parse certificates for {module_name} module')

    return {'cert': cert, 'key': key}