mgr/rbd_support: recover from rados client blocklisting
author Ramana Raja <rraja@redhat.com>
Wed, 15 Feb 2023 15:12:54 +0000 (10:12 -0500)
committer Ramana Raja <rraja@redhat.com>
Tue, 8 Aug 2023 20:26:10 +0000 (16:26 -0400)
In certain scenarios the OSDs were slow to process RBD requests.
This led to the rbd_support module's RBD client not being able to
gracefully hand over an RBD exclusive lock to another RBD client.
After the condition persisted for some time, the other RBD client
forcefully acquired the lock by blocklisting the rbd_support module's
RBD client, and consequently blocklisted the module's RADOS client.
The rbd_support module stopped working. To recover the module, the
entire mgr service had to be restarted, which reloaded all the other
mgr modules.

Instead of recovering the rbd_support module from client blocklisting
by disrupting the other mgr modules, recover the module automatically
without restarting the mgr service. When the client gets blocklisted,
shut down the module's handlers and the blocklisted client, create a
new RADOS client for the module, and start new handlers.
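
For reference, a minimal sketch of this recovery pattern (not the
actual mgr code; new_rados_client() and do_work() below are
hypothetical stand-ins for the mgr's client registration and a
handler's periodic work, and the real changes follow in the diff).
A blocklisted client makes librados calls raise
rados.ConnectionShutdown, so each worker thread doubles as a detector:

    import threading
    import time

    import rados

    def new_rados_client():
        # hypothetical stand-in; in the mgr a new client is created and
        # registered in the MgrMap via the 'rados' property attribute
        return rados.Rados()

    class Handler:
        def __init__(self, module):
            self.module = module
            self.stop_thread = False
            self.thread = threading.Thread(target=self.run)
            self.thread.start()

        def do_work(self):
            # placeholder for periodic work; once the client is
            # blocklisted, librados calls raise rados.ConnectionShutdown
            time.sleep(1)

        def run(self):
            try:
                while not self.stop_thread:
                    self.do_work()
            except rados.ConnectionShutdown:
                # wake the module's recovery thread instead of dying silently
                self.module.client_blocklisted.set()

        def shutdown(self):
            self.stop_thread = True
            if self.thread.is_alive():
                self.thread.join()

    class Module:
        def __init__(self):
            self.client_blocklisted = threading.Event()
            self.recovery_thread = threading.Thread(target=self.recover)
            self.recovery_thread.start()
            self.setup()

        def setup(self):
            self.rados = new_rados_client()
            self.handlers = [Handler(self)]

        def recover(self):
            # block until a handler reports blocklisting, then rebuild
            while True:
                self.client_blocklisted.wait()
                for handler in self.handlers:
                    handler.shutdown()
                self.rados.shutdown()  # drop the blocklisted client
                self.client_blocklisted.clear()
                self.setup()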

Fixes: https://tracker.ceph.com/issues/56724
Signed-off-by: Ramana Raja <rraja@redhat.com>
(cherry picked from commit cc0468738e5ddb98f7ac10b50e54446197b9c9a0)

Conflicts:
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/perf.py
src/pybind/mgr/rbd_support/task.py
src/pybind/mgr/rbd_support/trash_purge_schedule.py
 - Above conflicts were due to commit e4a16e2
   ("mgr/rbd_support: add type annotation") and commit dcb51b0
   ("mgr/rbd_support: define commands using CLICommand") not being
   present in pacific

src/pybind/mgr/mgr_module.py
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/perf.py
src/pybind/mgr/rbd_support/schedule.py
src/pybind/mgr/rbd_support/task.py
src/pybind/mgr/rbd_support/trash_purge_schedule.py

src/pybind/mgr/mgr_module.py
index c827c4feeb68c59cdafd48847cb8cbaeb2bff15b..d70e9113babc9811902ab7236635517439c93931 100644
@@ -1075,6 +1075,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             addrs = self._rados.get_addrs()
             self._rados.shutdown()
             self._ceph_unregister_client(addrs)
+            self._rados = None
 
     @API.expose
     def get(self, data_name: str) -> Any:
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
index 71dd40484937a43ffb3db1cce310c59b6614d996..9d59627e6f0e82cb7c702e33ef66cd96957676a9 100644
@@ -40,6 +40,7 @@ class CreateSnapshotRequests:
         self.wait_for_pending()
 
     def wait_for_pending(self):
+        self.log.debug("CreateSnapshotRequests.wait_for_pending")
         with self.lock:
             while self.pending:
                 self.condition.wait()
@@ -305,7 +306,6 @@ class MirrorSnapshotScheduleHandler:
 
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     def __init__(self, module):
         self.module = module
@@ -315,16 +315,23 @@ class MirrorSnapshotScheduleHandler:
 
         self.init_schedule_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
-    def _cleanup(self):
+    def shutdown(self):
+        self.log.info("MirrorSnapshotScheduleHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
+            self.thread.join()
         self.create_snapshot_requests.wait_for_pending()
+        self.log.info("MirrorSnapshotScheduleHandler: shut down")
 
     def run(self):
         try:
             self.log.info("MirrorSnapshotScheduleHandler: starting")
-            while True:
+            while not self.stop_thread:
                 refresh_delay = self.refresh_images()
                 with self.lock:
                     (image_spec, wait_time) = self.dequeue()
@@ -336,6 +343,9 @@ class MirrorSnapshotScheduleHandler:
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace, image_id)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -422,6 +432,8 @@ class MirrorSnapshotScheduleHandler:
                     self.log.debug(
                         "load_pool_images: adding image {}".format(name))
                     images[pool_id][namespace][image_id] = name
+        except rbd.ConnectionShutdown:
+            raise
         except Exception as e:
             self.log.error(
                 "load_pool_images: exception when scanning pool {}: {}".format(
src/pybind/mgr/rbd_support/module.py
index 82bd06e6238d1035d65542e7571723e906b4ce48..3e2193702a242af7334262350773e4589710055f 100644
@@ -8,6 +8,7 @@ import rbd
 import traceback
 
 from mgr_module import MgrModule
+from threading import Thread, Event
 
 from .common import NotAuthorizedError
 from .mirror_snapshot_schedule import MirrorSnapshotScheduleHandler
@@ -156,13 +157,50 @@ class Module(MgrModule):
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
+        self.client_blocklisted = Event()
+        self.recovery_thread = Thread(target=self.run)
+        self.recovery_thread.start()
+        self.setup()
+
+    def setup(self):
+        self.log.info("starting setup")
+        # new client is created and registered in the MgrMap implicitly
+        # as 'rados' is a property attribute.
         self.rados.wait_for_latest_osdmap()
         self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
         self.perf = PerfHandler(self)
         self.task = TaskHandler(self)
         self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
+        self.log.info("setup complete")
+        self.module_ready = True
+
+    def run(self):
+        self.log.info("recovery thread starting")
+        try:
+            while True:
+                # block until rados client is blocklisted
+                self.client_blocklisted.wait()
+                self.log.info("restarting")
+                self.shutdown()
+                self.client_blocklisted.clear()
+                self.setup()
+                self.log.info("restarted")
+        except Exception as ex:
+            self.log.fatal("Fatal runtime error: {}\n{}".format(
+                ex, traceback.format_exc()))
+
+    def shutdown(self):
+        self.module_ready = False
+        self.mirror_snapshot_schedule.shutdown()
+        self.trash_purge_schedule.shutdown()
+        self.task.shutdown()
+        self.perf.shutdown()
+        # shut down client and deregister it from MgrMap
+        super().shutdown()
 
     def handle_command(self, inbuf, cmd):
+        if not self.module_ready:
+            return -errno.EAGAIN, "", ""
         # ensure we have latest pools available
         self.rados.wait_for_latest_osdmap()
 
@@ -188,6 +226,10 @@ class Module(MgrModule):
                     ex, traceback.format_exc()))
                 raise
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown) as ex:
+            self.log.debug("handle_command: client blocklisted")
+            self.client_blocklisted.set()
+            return -errno.EAGAIN, "", str(ex)
         except rados.Error as ex:
             return -ex.errno, "", str(ex)
         except rbd.OSError as ex:
src/pybind/mgr/rbd_support/perf.py
index c5accf1140c5584221f6509305eabc5df47facc5..d8f863fcb416ab707f4d254a91e2df11ce50405d 100644
@@ -45,7 +45,6 @@ class PerfHandler:
     lock = Lock()
     query_condition = Condition(lock)
     refresh_condition = Condition(lock)
-    thread = None
 
     image_name_cache = {}
     image_name_refresh_time = datetime.fromtimestamp(0)
@@ -89,13 +88,22 @@ class PerfHandler:
         self.module = module
         self.log = module.log
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
+    def shutdown(self):
+        self.log.info("PerfHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("PerfHandler: joining thread")
+            self.thread.join()
+        self.log.info("PerfHandler: shut down")
+
     def run(self):
         try:
             self.log.info("PerfHandler: starting")
-            while True:
+            while not self.stop_thread:
                 with self.lock:
                     self.scrub_expired_queries()
                     self.process_raw_osd_perf_counters()
@@ -106,6 +114,9 @@ class PerfHandler:
 
                 self.log.debug("PerfHandler: tick")
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("PerfHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
src/pybind/mgr/rbd_support/schedule.py
index 167514bbbfa44bd8a815e6f58ad7cd7c4cde9732..d9ffb6f72316b49af1cfde3353267cb8cbc7d1aa 100644
@@ -376,6 +376,8 @@ class Schedules:
                 with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                     self.load_from_pool(ioctx, namespace_validator,
                                         image_validator)
+            except rados.ConnectionShutdown:
+                raise
             except rados.Error as e:
                 self.handler.log.error(
                     "Failed to load schedules for pool {}: {}".format(
src/pybind/mgr/rbd_support/task.py
index ff096fd9bd5e3a1ffe1a6be01bb6ebda43d3987a..cfd37c7d158983ca5c73b68bbd0c09f2ab5c194b 100644
@@ -138,7 +138,6 @@ class Task:
 class TaskHandler:
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     in_progress_task = None
     tasks_by_sequence = dict()
@@ -155,6 +154,7 @@ class TaskHandler:
         with self.lock:
             self.init_task_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
@@ -176,10 +176,18 @@ class TaskHandler:
         return (match.group(1) or self.default_pool_name, match.group(2) or '',
                 match.group(3))
 
+    def shutdown(self):
+        self.log.info("TaskHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("TaskHandler: joining thread")
+            self.thread.join()
+        self.log.info("TaskHandler: shut down")
+
     def run(self):
         try:
             self.log.info("TaskHandler: starting")
-            while True:
+            while not self.stop_thread:
                 with self.lock:
                     now = datetime.now()
                     for sequence in sorted([sequence for sequence, task
@@ -190,6 +198,9 @@ class TaskHandler:
                     self.condition.wait(5)
                     self.log.debug("TaskHandler: tick")
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("TaskHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -403,6 +414,9 @@ class TaskHandler:
                 self.complete_progress(task)
                 self.remove_task(None, task)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            raise
+
         except (rados.Error, rbd.Error) as e:
             self.log.error("execute_task: {}".format(e))
             task.retry_message = "{}".format(e)
src/pybind/mgr/rbd_support/trash_purge_schedule.py
index df6221f53fb3d68aa6985ab973dd78ca3f9441be..56ad24876088e891815085a69327536536bf79d1 100644
@@ -19,7 +19,6 @@ class TrashPurgeScheduleHandler:
 
     lock = Lock()
     condition = Condition(lock)
-    thread = None
 
     def __init__(self, module):
         self.module = module
@@ -28,13 +27,22 @@ class TrashPurgeScheduleHandler:
 
         self.init_schedule_queue()
 
+        self.stop_thread = False
         self.thread = Thread(target=self.run)
         self.thread.start()
 
+    def shutdown(self):
+        self.log.info("TrashPurgeScheduleHandler: shutting down")
+        self.stop_thread = True
+        if self.thread.is_alive():
+            self.log.debug("TrashPurgeScheduleHandler: joining thread")
+            self.thread.join()
+        self.log.info("TrashPurgeScheduleHandler: shut down")
+
     def run(self):
         try:
             self.log.info("TrashPurgeScheduleHandler: starting")
-            while True:
+            while not self.stop_thread:
                 refresh_delay = self.refresh_pools()
                 with self.lock:
                     (ns_spec, wait_time) = self.dequeue()
@@ -46,6 +54,9 @@ class TrashPurgeScheduleHandler:
                 with self.lock:
                     self.enqueue(datetime.now(), pool_id, namespace)
 
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
+            self.module.client_blocklisted.set()
         except Exception as ex:
             self.log.fatal("Fatal runtime error: {}\n{}".format(
                 ex, traceback.format_exc()))
@@ -55,6 +66,8 @@ class TrashPurgeScheduleHandler:
             with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                 ioctx.set_namespace(namespace)
                 rbd.RBD().trash_purge(ioctx, datetime.now())
+        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+            raise
         except Exception as e:
             self.log.error("exception when purging {}/{}: {}".format(
                 pool_id, namespace, e))
@@ -115,6 +128,8 @@ class TrashPurgeScheduleHandler:
             pool_namespaces += rbd.RBD().namespace_list(ioctx)
         except rbd.OperationNotSupported:
             self.log.debug("namespaces not supported")
+        except rbd.ConnectionShutdown:
+            raise
         except Exception as e:
             self.log.error("exception when scanning pool {}: {}".format(
                 pool_name, e))