git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr/rbd_support: recover from rados client blocklisting
author Ramana Raja <rraja@redhat.com>
Wed, 15 Feb 2023 15:12:54 +0000 (10:12 -0500)
committer Ramana Raja <rraja@redhat.com>
Mon, 8 May 2023 20:45:33 +0000 (16:45 -0400)
In certain scenarios the OSDs were slow to process RBD requests.
This led to the rbd_support module's RBD client not being able to
gracefully hand over an RBD exclusive lock to another RBD client.
After the condition persisted for some time, the other RBD client
forcefully acquired the lock by blocklisting the rbd_support module's
RBD client, and consequently blocklisted the module's RADOS client. The
rbd_support module stopped working. To recover the module, the entire
mgr service had to be restarted which reloaded other mgr modules.

Instead of recovering the rbd_support module from client blocklisting
by being disruptive to other mgr modules, recover the module
automatically without restarting the mgr service. On client getting
blocklisted, shutdown the module's handlers and blocklisted client,
create a new rados client for the module, and start the new handlers.

Fixes: https://tracker.ceph.com/issues/56724
Signed-off-by: Ramana Raja <rraja@redhat.com>
src/pybind/mgr/mgr_module.py
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/perf.py
src/pybind/mgr/rbd_support/schedule.py
src/pybind/mgr/rbd_support/task.py
src/pybind/mgr/rbd_support/trash_purge_schedule.py

index 1db322334cbaa19ccf17cbfdb1054d4f71dc9590..2fabbae87c52ea60a926d5e248e3e23a2d89ddf0 100644 (file)
@@ -1342,6 +1342,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             addrs = self._rados.get_addrs()
             self._rados.shutdown()
             self._ceph_unregister_client(addrs)
+            self._rados = None
 
     @API.expose
     def get(self, data_name: str) -> Any:
index 395cbdc9fa92145be21cb53a067060b012888e09..921a65c094536ebe7da3c582196056173dd88d5e 100644 (file)
@@ -48,6 +48,7 @@ class CreateSnapshotRequests:
         self.wait_for_pending()
 
     def wait_for_pending(self) -> None:
+        self.log.debug("CreateSnapshotRequests.wait_for_pending")
         with self.lock:
             while self.pending:
                 self.condition.wait()
@@ -330,7 +331,6 @@ class MirrorSnapshotScheduleHandler:
 
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     def __init__(self, module: Any) -> None:
         self.module = module
@@ -340,16 +340,23 @@ class MirrorSnapshotScheduleHandler:
 
         self.init_schedule_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
-    def _cleanup(self) -> None:
+    def shutdown(self) -> None:
+        self.log.info("MirrorSnapshotScheduleHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
+            self.thread.join()
         self.create_snapshot_requests.wait_for_pending()
+        self.log.info("MirrorSnapshotScheduleHandler: shut down")
 
     def run(self) -> None:
         try:
             self.log.info("MirrorSnapshotScheduleHandler: starting")
-            while True:
+            while not self.stop_thread:
                 refresh_delay = self.refresh_images()
                 with self.lock:
                     (image_spec, wait_time) = self.dequeue()
@@ -361,6 +368,9 @@ class MirrorSnapshotScheduleHandler:
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace, image_id)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -451,6 +461,8 @@ class MirrorSnapshotScheduleHandler:
                     self.log.debug(
                         "load_pool_images: adding image {}".format(name))
                     images[pool_id][namespace][image_id] = name
+        except rbd.ConnectionShutdown:
+            raise
         except Exception as e:
             self.log.error(
                 "load_pool_images: exception when scanning pool {}: {}".format(
index 86bb385883052ab75768f0ccb905ea8fa8383792..a190930ecc604b22e81500af95eefdcfc6f8d768 100644 (file)
@@ -8,9 +8,11 @@ import functools
 import inspect
 import rados
 import rbd
+import traceback
 from typing import cast, Any, Callable, Optional, Tuple, TypeVar
 
 from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option
+from threading import Thread, Event
 
 from .common import NotAuthorizedError
 from .mirror_snapshot_schedule import image_validator, namespace_validator, \
@@ -35,6 +37,8 @@ FuncT = TypeVar('FuncT', bound=Callable)
 def with_latest_osdmap(func: FuncT) -> FuncT:
     @functools.wraps(func)
     def wrapper(self: 'Module', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+        if not self.module_ready:
+            return -errno.EAGAIN, "", ""
         # ensure we have latest pools available
         self.rados.wait_for_latest_osdmap()
         try:
@@ -46,6 +50,10 @@ def with_latest_osdmap(func: FuncT) -> FuncT:
                 # log the full traceback but don't send it to the CLI user
                 self.log.exception("Fatal runtime error: ")
                 raise
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown) as ex:
+            self.log.debug("with_latest_osdmap: client blocklisted")
+            self.client_blocklisted.set()
+            return -errno.EAGAIN, "", str(ex)
         except rados.Error as ex:
             return -ex.errno, "", str(ex)
         except rbd.OSError as ex:
@@ -74,11 +82,46 @@ class Module(MgrModule):
 
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
+        self.client_blocklisted = Event()
+        self.recovery_thread = Thread(target=self.run)
+        self.recovery_thread.start()
+        self.setup()
+
+    def setup(self) -> None:
+        self.log.info("starting setup")
+        # new client is created and registered in the MgrMap implicitly
+        # as 'rados' is a property attribute.
         self.rados.wait_for_latest_osdmap()
         self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
         self.perf = PerfHandler(self)
         self.task = TaskHandler(self)
         self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
+        self.log.info("setup complete")
+        self.module_ready = True
+
+    def run(self) -> None:
+        self.log.info("recovery thread starting")
+        try:
+            while True:
+                # block until rados client is blocklisted
+                self.client_blocklisted.wait()
+                self.log.info("restarting")
+                self.shutdown()
+                self.client_blocklisted.clear()
+                self.setup()
+                self.log.info("restarted")
+        except Exception as ex:
+            self.log.fatal("Fatal runtime error: {}\n{}".format(
+                ex, traceback.format_exc()))
+
+    def shutdown(self) -> None:
+        self.module_ready = False
+        self.mirror_snapshot_schedule.shutdown()
+        self.trash_purge_schedule.shutdown()
+        self.task.shutdown()
+        self.perf.shutdown()
+        # shut down client and deregister it from MgrMap
+        super().shutdown()
 
     @CLIWriteCommand('rbd mirror snapshot schedule add')
     @with_latest_osdmap
index 572d75f5b8b9b77b88d35e9aa19518c2a4b58fcd..4bcf0a18c56b71c043d78d9d451c0dc44518c680 100644 (file)
@@ -71,7 +71,6 @@ class PerfHandler:
     lock = Lock()
     query_condition = Condition(lock)
     refresh_condition = Condition(lock)
-    thread = None
 
     image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
     image_name_refresh_time = datetime.fromtimestamp(0)
@@ -118,13 +117,22 @@ class PerfHandler:
         self.module = module
         self.log = module.log
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
+    def shutdown(self) -> None:
+        self.log.info("PerfHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("PerfHandler: joining thread")
+            self.thread.join()
+        self.log.info("PerfHandler: shut down")
+
     def run(self) -> None:
         try:
             self.log.info("PerfHandler: starting")
-            while True:
+            while not self.stop_thread:
                 with self.lock:
                     self.scrub_expired_queries()
                     self.process_raw_osd_perf_counters()
@@ -135,6 +143,9 @@ class PerfHandler:
 
                 self.log.debug("PerfHandler: tick")
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("PerfHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
index 7a24d58ce773230d7c94b2f4a582b9bd5d940da3..c6ce99182afd474b8aa1836d36ede1432420d105 100644 (file)
@@ -418,6 +418,8 @@ class Schedules:
                 with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                     self.load_from_pool(ioctx, namespace_validator,
                                         image_validator)
+            except rados.ConnectionShutdown:
+                raise
             except rados.Error as e:
                 self.handler.log.error(
                     "Failed to load schedules for pool {}: {}".format(
index 8397ed7a0c2aa31866c504ad9720202b1b6b2a7e..7dba510baa781d415c996d021ae2770e279ee92e 100644 (file)
@@ -153,7 +153,6 @@ MigrationStatusT = Dict[str, str]
 class TaskHandler:
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     in_progress_task = None
     tasks_by_sequence: Dict[int, Task] = dict()
@@ -170,6 +169,7 @@ class TaskHandler:
         with self.lock:
             self.init_task_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
@@ -191,10 +191,18 @@ class TaskHandler:
         return (match.group(1) or self.default_pool_name, match.group(2) or '',
                 match.group(3))
 
+    def shutdown(self) -> None:
+        self.log.info("TaskHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("TaskHandler: joining thread")
+            self.thread.join()
+        self.log.info("TaskHandler: shut down")
+
     def run(self) -> None:
         try:
             self.log.info("TaskHandler: starting")
-            while True:
+            while not self.stop_thread:
                 with self.lock:
                     now = datetime.now()
                     for sequence in sorted([sequence for sequence, task
@@ -205,6 +213,9 @@ class TaskHandler:
                     self.condition.wait(5)
                     self.log.debug("TaskHandler: tick")
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("TaskHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -428,6 +439,9 @@ class TaskHandler:
                 self.complete_progress(task)
                 self.remove_task(None, task)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            raise
+
         except (rados.Error, rbd.Error) as e:
             self.log.error("execute_task: {}".format(e))
             task.retry_message = "{}".format(e)
index ca1f111e66c4f0d912036b9cc005a07efd244002..9cb349fcac0e7f729f43db07cf0862dc62a73766 100644 (file)
@@ -18,7 +18,6 @@ class TrashPurgeScheduleHandler:
 
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     def __init__(self, module: Any) -> None:
         self.module = module
@@ -27,13 +26,22 @@ class TrashPurgeScheduleHandler:
 
         self.init_schedule_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
+    def shutdown(self) -> None:
+        self.log.info("TrashPurgeScheduleHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("TrashPurgeScheduleHandler: joining thread")
+            self.thread.join()
+        self.log.info("TrashPurgeScheduleHandler: shut down")
+
     def run(self) -> None:
         try:
             self.log.info("TrashPurgeScheduleHandler: starting")
-            while True:
+            while not self.stop_thread:
                 refresh_delay = self.refresh_pools()
                 with self.lock:
                     (ns_spec, wait_time) = self.dequeue()
@@ -45,6 +53,9 @@ class TrashPurgeScheduleHandler:
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -54,6 +65,8 @@ class TrashPurgeScheduleHandler:
             with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                 ioctx.set_namespace(namespace)
                 rbd.RBD().trash_purge(ioctx, datetime.now())
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            raise
         except Exception as e:
             self.log.error("exception when purging {}/{}: {}".format(
                 pool_id, namespace, e))
@@ -114,6 +127,8 @@ class TrashPurgeScheduleHandler:
             pool_namespaces += rbd.RBD().namespace_list(ioctx)
         except rbd.OperationNotSupported:
             self.log.debug("namespaces not supported")
+        except rbd.ConnectionShutdown:
+            raise
         except Exception as e:
             self.log.error("exception when scanning pool {}: {}".format(
                 pool_name, e))