]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/volumes: log mutex locks to help debug deadlocks
authorPatrick Donnelly <pdonnell@redhat.com>
Mon, 22 Mar 2021 18:48:46 +0000 (11:48 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Thu, 22 Apr 2021 16:17:41 +0000 (09:17 -0700)
There is a hang in get_job which is holding the mutex [1]. This debug
output is meant to help find this issue in upstream QA logs.

[1] https://tracker.ceph.com/issues/49605#note-5
Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
(cherry picked from commit cf2a1ad651208242868a2a7fdf25006f30ad9cc8)

src/pybind/mgr/mgr_util.py
src/pybind/mgr/volumes/fs/async_cloner.py
src/pybind/mgr/volumes/fs/async_job.py

index e30359952501520bf2182b88762f89103e32f1cc..be034235188825195b9fcb0cea0f0b03f57449f7 100644 (file)
@@ -19,7 +19,7 @@ if sys.version_info >= (3, 3):
 else:
     from threading import _Timer as Timer
 
-from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic
+from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
 T = TypeVar('T')
 
 if TYPE_CHECKING:
@@ -73,6 +73,26 @@ class RTimer(Timer):
             logger.error("task exception: %s", e)
             raise
 
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Acquire *lock*, logging every attempt, and release it on exit.

    Repeatedly calls ``lock.acquire(timeout=timeout)`` (timeout in seconds),
    emitting a DEBUG message per attempt so that stuck lock holders can be
    diagnosed from logs.  If acquisition has not succeeded after WARN_AFTER
    seconds, a single "possible deadlock" message is logged at INFO level.

    :param lock: the ``threading.Lock`` (or compatible) to acquire
    :param timeout: per-attempt acquire timeout, in seconds
    """
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
    try:
        # The lock is always held here (the loop above only exits on a
        # successful acquire).  Use try/finally so an exception raised in
        # the managed block cannot leak the lock: the original code only
        # released after a normal return from yield.
        yield
    finally:
        lock.release()
 class CephfsConnectionPool(object):
     class Connection(object):
         def __init__(self, mgr: Module_T, fs_name: str):
index c042e624c58ca62212b553fcf736753163be16cc..28c0f385d32c8051c3f9d6452683c82f340592f8 100644 (file)
@@ -6,6 +6,7 @@ import logging
 from contextlib import contextmanager
 
 import cephfs
+from mgr_util import lock_timeout_log
 
 from .async_job import AsyncJobs
 from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
@@ -329,7 +330,7 @@ class Cloner(AsyncJobs):
             # to persist the new state, async cloner accesses the volume in exclusive mode.
             # accessing the volume in exclusive mode here would lead to deadlock.
             assert track_idx is not None
-            with self.lock:
+            with lock_timeout_log(self.lock):
                 with open_volume_lockless(self.vc, volname) as fs_handle:
                     with open_group(fs_handle, self.vc.volspec, groupname) as group:
                         with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
index 095ecdd36f8c1dce189cc8a40e275b2b015d6fb1..6f7741fc99d8def00e7ea2cb7d03a0aa045f96f2 100644 (file)
@@ -4,6 +4,7 @@ import logging
 import threading
 import traceback
 from collections import deque
+from mgr_util import lock_timeout_log
 
 from .exception import NotImplementedException
 
@@ -127,7 +128,7 @@ class AsyncJobs(threading.Thread):
 
     def run(self):
         log.debug("tick thread {} starting".format(self.name))
-        with self.lock:
+        with lock_timeout_log(self.lock):
             while not self.stopping.is_set():
                 c = len(self.threads)
                 if c > self.nr_concurrent_jobs:
@@ -211,7 +212,7 @@ class AsyncJobs(threading.Thread):
         queue a volume for asynchronous job execution.
         """
         log.info("queuing job for volume '{0}'".format(volname))
-        with self.lock:
+        with lock_timeout_log(self.lock):
             if not volname in self.q:
                 self.q.append(volname)
                 self.jobs[volname] = []
@@ -263,21 +264,21 @@ class AsyncJobs(threading.Thread):
         return canceled
 
     def cancel_job(self, volname, job):
-        with self.lock:
+        with lock_timeout_log(self.lock):
             return self._cancel_job(volname, job)
 
     def cancel_jobs(self, volname):
         """
         cancel all executing jobs for a given volume.
         """
-        with self.lock:
+        with lock_timeout_log(self.lock):
             self._cancel_jobs(volname)
 
     def cancel_all_jobs(self):
         """
         call all executing jobs for all volumes.
         """
-        with self.lock:
+        with lock_timeout_log(self.lock):
             for volname in list(self.q):
                 self._cancel_jobs(volname)