else:
from threading import _Timer as Timer
-try:
- from typing import Tuple, Any, Callable, Optional, Dict
-except ImportError:
- TYPE_CHECKING = False # just for type checking
+from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic
+T = TypeVar('T')
+
+if TYPE_CHECKING:
+ from mgr_module import MgrModule
+
+Module_T = TypeVar('Module_T', bound="MgrModule")
(
BLACK,
class CephfsConnectionException(Exception):
- def __init__(self, error_code, error_message):
+ def __init__(self, error_code: int, error_message: str):
self.errno = error_code
self.error_str = error_message
- def to_tuple(self):
+ def to_tuple(self) -> Tuple[int, str, str]:
return self.errno, "", self.error_str
- def __str__(self):
+ def __str__(self) -> str:
return "{0} ({1})".format(self.errno, self.error_str)
class CephfsConnectionPool(object):
class Connection(object):
- def __init__(self, mgr, fs_name):
- self.fs = None
+ def __init__(self, mgr: Module_T, fs_name: str):
+ self.fs: Optional["cephfs.LibCephFS"] = None
self.mgr = mgr
self.fs_name = fs_name
self.ops_in_progress = 0
self.last_used = time.time()
self.fs_id = self.get_fs_id()
- def get_fs_id(self):
+ def get_fs_id(self) -> int:
fs_map = self.mgr.get('fs_map')
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == self.fs_name:
raise CephfsConnectionException(
-errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
- def get_fs_handle(self):
+ def get_fs_handle(self) -> "cephfs.LibCephFS":
self.last_used = time.time()
self.ops_in_progress += 1
return self.fs
- def put_fs_handle(self, notify):
+ def put_fs_handle(self, notify: Callable) -> None:
assert self.ops_in_progress > 0
self.ops_in_progress -= 1
if self.ops_in_progress == 0:
notify()
- def del_fs_handle(self, waiter):
+ def del_fs_handle(self, waiter: Optional[Callable]) -> None:
if waiter:
while self.ops_in_progress != 0:
waiter()
else:
self.abort()
- def is_connection_valid(self):
+ def is_connection_valid(self) -> bool:
fs_id = None
try:
fs_id = self.get_fs_id()
logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
return self.fs_id == fs_id
- def is_connection_idle(self, timeout):
+ def is_connection_idle(self, timeout: float) -> bool:
return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
- def connect(self):
+ def connect(self) -> None:
assert self.ops_in_progress == 0
logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
self.mgr._ceph_register_client(self.fs.get_addrs())
- def disconnect(self):
+ def disconnect(self) -> None:
try:
assert self.fs
assert self.ops_in_progress == 0
logger.debug("disconnect: ({0})".format(e))
raise
- def abort(self):
+ def abort(self) -> None:
assert self.fs
assert self.ops_in_progress == 0
logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
CONNECTION_IDLE_INTERVAL = 60.0 # seconds
- def __init__(self, mgr):
+ def __init__(self, mgr: Module_T):
self.mgr = mgr
- self.connections = {}
+ self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
self.lock = Lock()
self.cond = Condition(self.lock)
self.timer_task = CephfsConnectionPool.RTimer(
self.cleanup_connections)
self.timer_task.start()
- def cleanup_connections(self):
+ def cleanup_connections(self) -> None:
with self.lock:
logger.info("scanning for idle connections..")
idle_fs = [fs_name for fs_name, conn in self.connections.items()
logger.info("cleaning up connection for '{}'".format(fs_name))
self._del_fs_handle(fs_name)
- def get_fs_handle(self, fs_name):
+ def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
with self.lock:
conn = None
try:
self.connections[fs_name] = conn
return conn.get_fs_handle()
- def put_fs_handle(self, fs_name):
+ def put_fs_handle(self, fs_name: str) -> None:
with self.lock:
conn = self.connections.get(fs_name, None)
if conn:
conn.put_fs_handle(notify=lambda: self.cond.notifyAll())
- def _del_fs_handle(self, fs_name, wait=False):
+ def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
conn = self.connections.pop(fs_name, None)
if conn:
conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())
- def del_fs_handle(self, fs_name, wait=False):
+ def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
with self.lock:
self._del_fs_handle(fs_name, wait)
- def del_all_handles(self):
+ def del_all_handles(self) -> None:
with self.lock:
for fs_name in list(self.connections.keys()):
logger.info("waiting for pending ops for '{}'".format(fs_name))
assert len(self.connections) == 0
-class CephfsClient(object):
- def __init__(self, mgr):
+class CephfsClient(Generic[Module_T]):
+ def __init__(self, mgr: Module_T):
self.mgr = mgr
self.stopping = Event()
self.connection_pool = CephfsConnectionPool(self.mgr)
- def is_stopping(self):
+ def is_stopping(self) -> bool:
return self.stopping.is_set()
- def shutdown(self):
+ def shutdown(self) -> None:
logger.info("shutting down")
# first, note that we're shutting down
self.stopping.set()
# second, delete all libcephfs handles from connection pool
self.connection_pool.del_all_handles()
- def get_fs(self, fs_name):
+ def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
fs_map = self.mgr.get('fs_map')
for fs in fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == fs_name:
return fs
return None
- def get_mds_names(self, fs_name):
+ def get_mds_names(self, fs_name: str) -> List[str]:
fs = self.get_fs(fs_name)
if fs is None:
return []
return [mds['name'] for mds in fs['mdsmap']['info'].values()]
- def get_metadata_pool(self, fs_name):
+ def get_metadata_pool(self, fs_name: str) -> Optional[str]:
fs = self.get_fs(fs_name)
if fs:
return fs['mdsmap']['metadata_pool']
@contextlib.contextmanager
-def open_filesystem(fsc, fs_name):
+def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
"""
Open a volume with shared access.
This API is to be used as a context manager.
fsc.connection_pool.put_fs_handle(fs_name)
-def colorize(msg, color, dark=False):
+def colorize(msg: str, color: int, dark: bool = False) -> str:
"""
Decorate `msg` with escape sequences to give the requested color
"""
+ msg + RESET_SEQ
-def bold(msg):
+def bold(msg: str) -> str:
"""
Decorate `msg` with escape sequences to make it appear bold
"""
return BOLD_SEQ + msg + RESET_SEQ
-def format_units(n, width, colored, decimal):
+def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
"""
Format a number without units, so as to fit into `width` characters, substituting
an appropriate unit suffix.
return formatted
-def format_dimless(n, width, colored=False):
+def format_dimless(n: int, width: int, colored: bool = False) -> str:
return format_units(n, width, colored, decimal=True)
-def format_bytes(n, width, colored=False):
+def format_bytes(n: int, width: int, colored: bool = False) -> str:
return format_units(n, width, colored, decimal=False)
-def merge_dicts(*args):
- # type: (dict) -> dict
+def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
"""
>>> merge_dicts({1:2}, {3:4})
{1: 2, 3: 4}
def get_default_addr():
# type: () -> str
- def is_ipv6_enabled():
+ def is_ipv6_enabled() -> bool:
try:
sock = socket.socket(socket.AF_INET6)
with contextlib.closing(sock):
pkey_fname, cert_fname, str(e)))
-def get_most_recent_rate(rates):
+def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
""" Get most recent rate from rates
:param rates: The derivative between all time series data points [time in seconds, value]
return 0.0
return rates[-1][1]
-def get_time_series_rates(data):
+def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
""" Rates from time series data
:param data: Time series data [time in seconds, value]
data = _filter_time_series(data)
if not data:
return []
- return [(data2[0], _derivative(data1, data2)) for data1, data2 in
+ return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in
_pairwise(data)]
-def _filter_time_series(data):
+def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
""" Filters time series data
Filters out samples with the same timestamp in given time series data.
return filtered
-def _derivative(p1, p2):
+def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
""" Derivative between two time series data points
:param p1: Time series data [time in seconds, value]
return (p2[1] - p1[1]) / float(p2[0] - p1[0])
-def _pairwise(iterable):
+def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
it = iter(iterable)
a = next(it, None)
a = b
-def to_pretty_timedelta(n):
+def to_pretty_timedelta(n: datetime.timedelta) -> str:
if n < datetime.timedelta(seconds=120):
return str(n.seconds) + 's'
if n < datetime.timedelta(minutes=120):
return str(n.days // 365) + 'y'
-def profile_method(skip_attribute=False):
+def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""
Decorator for methods of the Module class. Logs the name of the given
function f with the time it takes to execute it.
"""
- def outer(f):
+ def outer(f: Callable[..., T]) -> Callable[..., T]:
@wraps(f)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> T:
self = args[0]
t = time.time()
self.log.debug('Starting method {}.'.format(f.__name__))