logger = logging.getLogger(__name__)


@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """Acquire ``lock``, logging every attempt so stuck acquisitions are visible.

    Retries ``lock.acquire(timeout=timeout)`` indefinitely, emitting a debug
    message per attempt.  If the lock has not been obtained after WARN_AFTER
    seconds, a single "possible deadlock" message is logged at info level so
    the hang shows up in QA logs (see https://tracker.ceph.com/issues/49605).

    :param lock: the mutex to acquire; released again on context exit.
    :param timeout: per-attempt acquire timeout, in seconds.

    NOTE(review): the original patch released the lock only on a clean exit;
    an exception raised inside the ``with`` body is thrown in at the ``yield``
    and skipped the release, leaking the mutex.  The ``try/finally`` below
    guarantees the release on both paths.
    """
    WARN_AFTER = 30
    start = time.time()
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        # The loop only exits once the lock is actually held, so no
        # "did we lock it?" flag is needed afterwards.
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
    try:
        yield
    finally:
        lock.release()
--- a/src/pybind/mgr/volumes/fs/async_cloner.py +++ b/src/pybind/mgr/volumes/fs/async_cloner.py @@ -6,6 +6,7 @@ import logging from contextlib import contextmanager import cephfs +from mgr_util import lock_timeout_log from .async_job import AsyncJobs from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException @@ -329,7 +330,7 @@ class Cloner(AsyncJobs): # to persist the new state, async cloner accesses the volume in exclusive mode. # accessing the volume in exclusive mode here would lead to deadlock. assert track_idx is not None - with self.lock: + with lock_timeout_log(self.lock): with open_volume_lockless(self.vc, volname) as fs_handle: with open_group(fs_handle, self.vc.volspec, groupname) as group: with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: diff --git a/src/pybind/mgr/volumes/fs/async_job.py b/src/pybind/mgr/volumes/fs/async_job.py index 095ecdd36f8c1..6f7741fc99d8d 100644 --- a/src/pybind/mgr/volumes/fs/async_job.py +++ b/src/pybind/mgr/volumes/fs/async_job.py @@ -4,6 +4,7 @@ import logging import threading import traceback from collections import deque +from mgr_util import lock_timeout_log from .exception import NotImplementedException @@ -127,7 +128,7 @@ class AsyncJobs(threading.Thread): def run(self): log.debug("tick thread {} starting".format(self.name)) - with self.lock: + with lock_timeout_log(self.lock): while not self.stopping.is_set(): c = len(self.threads) if c > self.nr_concurrent_jobs: @@ -211,7 +212,7 @@ class AsyncJobs(threading.Thread): queue a volume for asynchronous job execution. 
""" log.info("queuing job for volume '{0}'".format(volname)) - with self.lock: + with lock_timeout_log(self.lock): if not volname in self.q: self.q.append(volname) self.jobs[volname] = [] @@ -263,21 +264,21 @@ class AsyncJobs(threading.Thread): return canceled def cancel_job(self, volname, job): - with self.lock: + with lock_timeout_log(self.lock): return self._cancel_job(volname, job) def cancel_jobs(self, volname): """ cancel all executing jobs for a given volume. """ - with self.lock: + with lock_timeout_log(self.lock): self._cancel_jobs(volname) def cancel_all_jobs(self): """ call all executing jobs for all volumes. """ - with self.lock: + with lock_timeout_log(self.lock): for volname in list(self.q): self._cancel_jobs(volname)