def __str__(self) -> str:
return "{0} ({1})".format(self.errno, self.error_str)
+class RTimer(Timer):
+ """
+ recurring timer variant of Timer
+ """
+ @no_type_check
+ def run(self):
+ try:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+ except Exception as e:
+ logger.error("task exception: %s", e)
+ raise
class CephfsConnectionPool(object):
class Connection(object):
logger.info("abort done from cephfs '{0}'".format(self.fs_name))
self.fs = None
- class RTimer(Timer):
- """
- recurring timer variant of Timer
- """
- @no_type_check
- def run(self):
- try:
- while not self.finished.is_set():
- self.finished.wait(self.interval)
- self.function(*self.args, **self.kwargs)
- self.finished.set()
- except Exception as e:
- logger.error("CephfsConnectionPool.RTimer: %s", e)
- raise
-
# TODO: make this configurable
TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
CONNECTION_IDLE_INTERVAL = 60.0 # seconds
self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
self.lock = Lock()
self.cond = Condition(self.lock)
- self.timer_task = CephfsConnectionPool.RTimer(
- CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
- self.cleanup_connections)
+ self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
+ self.cleanup_connections)
self.timer_task.start()
def cleanup_connections(self) -> None: