import logging
import sys
from ipaddress import ip_address
-from threading import Lock, Condition, Event
+from threading import Lock, Condition
from typing import no_type_check, NewType
import urllib
from functools import wraps
def __str__(self) -> str:
return "{0} ({1})".format(self.errno, self.error_str)
+
class RTimer(Timer):
"""
recurring timer variant of Timer
logger.error("task exception: %s", e)
raise
+
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
start = time.time()
return fs_list
-
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
"""
:param organisation: String representing the Organisation(O) RDN (default='Ceph')
:param common_name: String representing the Common Name(CN) RDN (default='mgr')
- :param dname: Optional dictionary containing RDNs to use for crt/key generation
+ :param dname: Optional dictionary containing RDNs to use for crt/key generation
:return: ssl crt and key in utf-8 format
raise ServerConfigException(
'Invalid certificate {}: {}'.format(cert_fname, str(e)))
-def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
+
+def get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
"""Basic validation of a ca cert"""
from OpenSSL import crypto, SSL
except (ValueError, crypto.Error) as e:
raise ServerConfigException(f'Invalid certificate key: {e}')
+
def verify_tls(crt, key):
# type: (str, str) -> None
verify_cacrt_content(crt)
raise ServerConfigException(f'Invalid cert/key pair: {e}')
-
def verify_tls_files(cert_fname, pkey_fname):
# type: (str, str) -> None
"""Basic checks for TLS certificate and key files
return 0.0
return rates[-1][1]
+
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
""" Rates from time series data
return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in
_pairwise(data)]
+
def name_to_config_section(name: str) -> ConfEntity:
"""
Map from daemon names to ceph entity names (as seen in config)
if n < datetime.timedelta(hours=48):
return str(int(n.total_seconds()) // 3600) + 'h'
if n < datetime.timedelta(days=14):
- return str(int(n.total_seconds()) // (3600*24)) + 'd'
- if n < datetime.timedelta(days=7*12):
- return str(int(n.total_seconds()) // (3600*24*7)) + 'w'
- if n < datetime.timedelta(days=365*2):
- return str(int(n.total_seconds()) // (3600*24*30)) + 'M'
- return str(int(n.total_seconds()) // (3600*24*365)) + 'y'
+ return str(int(n.total_seconds()) // (3600 * 24)) + 'd'
+ if n < datetime.timedelta(days=7 * 12):
+ return str(int(n.total_seconds()) // (3600 * 24 * 7)) + 'w'
+ if n < datetime.timedelta(days=365 * 2):
+ return str(int(n.total_seconds()) // (3600 * 24 * 30)) + 'M'
+ return str(int(n.total_seconds()) // (3600 * 24 * 365)) + 'y'
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]: