else:
from threading import _Timer as Timer
-from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic
+from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
T = TypeVar('T')
if TYPE_CHECKING:
logger.error("task exception: %s", e)
raise
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Acquire ``lock``, retrying with the given per-attempt ``timeout`` (in
    seconds) and logging, so that a stuck lock surfaces in the logs instead
    of hanging silently.

    Logs a "possible deadlock" message once after WARN_AFTER seconds of
    failed attempts, then keeps retrying. The lock is always released on
    exit, even if the managed block raises.

    :param lock: the threading.Lock (or compatible) to acquire
    :param timeout: seconds to wait per acquire attempt before re-logging
    """
    start = time.time()
    WARN_AFTER = 30  # seconds before the one-time deadlock warning
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        # acquire() returns True on success; loop only exits once held,
        # so no separate "locked" flag is needed.
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
    try:
        yield
    finally:
        # release even when the body raises -- the original released the
        # lock only on the non-exception path, leaking it on errors
        lock.release()
class CephfsConnectionPool(object):
class Connection(object):
def __init__(self, mgr: Module_T, fs_name: str):
from contextlib import contextmanager
import cephfs
+from mgr_util import lock_timeout_log
from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
# to persist the new state, async cloner accesses the volume in exclusive mode.
# accessing the volume in exclusive mode here would lead to deadlock.
assert track_idx is not None
- with self.lock:
+ with lock_timeout_log(self.lock):
with open_volume_lockless(self.vc, volname) as fs_handle:
with open_group(fs_handle, self.vc.volspec, groupname) as group:
with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
import threading
import traceback
from collections import deque
+from mgr_util import lock_timeout_log
from .exception import NotImplementedException
def run(self):
log.debug("tick thread {} starting".format(self.name))
- with self.lock:
+ with lock_timeout_log(self.lock):
while not self.stopping.is_set():
c = len(self.threads)
if c > self.nr_concurrent_jobs:
queue a volume for asynchronous job execution.
"""
log.info("queuing job for volume '{0}'".format(volname))
- with self.lock:
+ with lock_timeout_log(self.lock):
if not volname in self.q:
self.q.append(volname)
self.jobs[volname] = []
return canceled
def cancel_job(self, volname, job):
    """
    cancel a single executing job for a given volume.

    Takes the queue lock (via lock_timeout_log, which logs a warning on a
    suspected deadlock) and delegates to _cancel_job.
    """
    with lock_timeout_log(self.lock):
        return self._cancel_job(volname, job)
def cancel_jobs(self, volname):
    """
    cancel all executing jobs for a given volume.

    Takes the queue lock (via lock_timeout_log, which logs a warning on a
    suspected deadlock) and delegates to _cancel_jobs.
    """
    with lock_timeout_log(self.lock):
        self._cancel_jobs(volname)
def cancel_all_jobs(self):
    """
    cancel all executing jobs for all volumes.
    """
    # snapshot the queue with list() so _cancel_jobs may mutate self.q
    # while we iterate
    with lock_timeout_log(self.lock):
        for volname in list(self.q):
            self._cancel_jobs(volname)