In certain scenarios the OSDs were slow to process RBD requests.
This led to the rbd_support module's RBD client not being able to
gracefully hand over an RBD exclusive lock to another RBD client.
After the condition persisted for some time, the other RBD client
forcefully acquired the lock by blocklisting the rbd_support module's
RBD client, and consequently blocklisted the module's RADOS client.
The rbd_support module stopped working. To recover the module, the
entire mgr service had to be restarted, which also reloaded all other
mgr modules.

Instead of recovering the rbd_support module from client blocklisting
by disrupting other mgr modules, recover the module automatically
without restarting the mgr service. When the module's RADOS client is
blocklisted, shut down the module's handlers and the blocklisted
client, create a new RADOS client for the module, and start new
handlers using the new client.
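
In outline, the fix is an event-driven restart loop inside the module
itself. Below is a minimal sketch of that pattern; the names and the
stand-in bodies for client and handler creation are illustrative, not
the module's actual API:

    import threading

    class SelfHealingModule:
        def __init__(self):
            self.client_blocklisted = threading.Event()
            threading.Thread(target=self._recovery_loop, daemon=True).start()
            self._setup()

        def _setup(self):
            # Hypothetical stand-ins: create a fresh RADOS client and
            # restart the per-feature handler threads against it.
            self.client = object()
            self.handlers = []

        def _shutdown(self):
            # Stop handler threads and drop the blocklisted client.
            self.handlers = []
            self.client = None

        def _recovery_loop(self):
            while True:
                self.client_blocklisted.wait()   # set by any handler that
                self._shutdown()                 # catches ConnectionShutdown
                self.client_blocklisted.clear()
                self._setup()

A handler that catches a ConnectionShutdown only sets the event and
exits; the recovery loop performs the teardown and re-creation.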
Fixes: https://tracker.ceph.com/issues/56724
Signed-off-by: Ramana Raja <rraja@redhat.com>
(cherry picked from commit cc0468738e5ddb98f7ac10b50e54446197b9c9a0)
Conflicts:
src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
src/pybind/mgr/rbd_support/module.py
src/pybind/mgr/rbd_support/perf.py
src/pybind/mgr/rbd_support/task.py
src/pybind/mgr/rbd_support/trash_purge_schedule.py
- Above conflicts were due to commit e4a16e2 ("mgr/rbd_support: add type annotation") not in pacific
- Above conflicts were due to commit dcb51b0 ("mgr/rbd_support: define commands using CLICommand") not in pacific
addrs = self._rados.get_addrs()
self._rados.shutdown()
self._ceph_unregister_client(addrs)
+ self._rados = None
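
Clearing self._rados matters because, as the setup() comment later in
the patch notes, 'rados' is a property attribute: the next access
creates and registers a new client. A hedged sketch of that
lazy-recreation idiom (the property body and helper are assumptions,
not mgr_module's actual code):

    class ModuleBase:
        def __init__(self):
            self._rados = None

        @property
        def rados(self):
            # After shutdown sets _rados to None, the next access
            # builds and registers a fresh client in the MgrMap.
            if self._rados is None:
                self._rados = self._open_and_register_client()
            return self._rados

        def _open_and_register_client(self):
            return object()  # stand-in for connecting a new RADOS client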
@API.expose
def get(self, data_name: str) -> Any:
self.wait_for_pending()
def wait_for_pending(self):
+ self.log.debug("CreateSnapshotRequests.wait_for_pending")
with self.lock:
while self.pending:
self.condition.wait()
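
wait_for_pending() blocks until every in-flight snapshot-create
request has completed. A minimal, runnable sketch of that
Lock/Condition pattern (add_request and finish_request are
illustrative names, not the actual methods):

    from threading import Lock, Condition

    class PendingTracker:
        def __init__(self):
            self.lock = Lock()
            self.condition = Condition(self.lock)
            self.pending = set()

        def add_request(self, key):
            with self.lock:
                self.pending.add(key)

        def finish_request(self, key):
            with self.lock:
                self.pending.discard(key)
                if not self.pending:
                    self.condition.notify_all()

        def wait_for_pending(self):
            with self.lock:
                while self.pending:      # guard against spurious wakeups
                    self.condition.wait()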
lock = Lock()
condition = Condition(lock)
- thread = None
def __init__(self, module):
self.module = module
self.init_schedule_queue()
+ self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
- def _cleanup(self):
+ def shutdown(self):
+ self.log.info("MirrorSnapshotScheduleHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
+ self.thread.join()
self.create_snapshot_requests.wait_for_pending()
+ self.log.info("MirrorSnapshotScheduleHandler: shut down")
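
All four handlers use the same cooperative-shutdown idiom: flip a stop
flag, then join the worker thread. A condensed, runnable sketch (the
one-second sleep stands in for the handlers' condition waits):

    import time
    from threading import Thread

    class Handler:
        def __init__(self):
            self.stop_thread = False
            self.thread = Thread(target=self.run)
            self.thread.start()

        def run(self):
            # Checking the flag each tick bounds shutdown latency by the
            # tick interval; the real handlers wait on a Condition with a
            # timeout rather than sleeping.
            while not self.stop_thread:
                time.sleep(1)

        def shutdown(self):
            self.stop_thread = True
            if self.thread.is_alive():
                self.thread.join()  # returns once run() observes the flag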
def run(self):
try:
self.log.info("MirrorSnapshotScheduleHandler: starting")
- while True:
+ while not self.stop_thread:
refresh_delay = self.refresh_images()
with self.lock:
(image_spec, wait_time) = self.dequeue()
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, image_id)
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
+ self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
self.log.debug(
"load_pool_images: adding image {}".format(name))
images[pool_id][namespace][image_id] = name
+ except rbd.ConnectionShutdown:
+ raise
except Exception as e:
self.log.error(
"load_pool_images: exception when scanning pool {}: {}".format(
import traceback
from mgr_module import MgrModule
+from threading import Thread, Event
from .common import NotAuthorizedError
from .mirror_snapshot_schedule import MirrorSnapshotScheduleHandler
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
+ self.client_blocklisted = Event()
+ self.recovery_thread = Thread(target=self.run)
+ self.recovery_thread.start()
+ self.setup()
+
+ def setup(self):
+ self.log.info("starting setup")
+ # new client is created and registered in the MgrMap implicitly
+ # as 'rados' is a property attribute.
self.rados.wait_for_latest_osdmap()
self.mirror_snapshot_schedule = MirrorSnapshotScheduleHandler(self)
self.perf = PerfHandler(self)
self.task = TaskHandler(self)
self.trash_purge_schedule = TrashPurgeScheduleHandler(self)
+ self.log.info("setup complete")
+ self.module_ready = True
+
+ def run(self):
+ self.log.info("recovery thread starting")
+ try:
+ while True:
+ # block until rados client is blocklisted
+ self.client_blocklisted.wait()
+ self.log.info("restarting")
+ self.shutdown()
+ self.client_blocklisted.clear()
+ self.setup()
+ self.log.info("restarted")
+ except Exception as ex:
+ self.log.fatal("Fatal runtime error: {}\n{}".format(
+ ex, traceback.format_exc()))
+
+ def shutdown(self):
+ self.module_ready = False
+ self.mirror_snapshot_schedule.shutdown()
+ self.trash_purge_schedule.shutdown()
+ self.task.shutdown()
+ self.perf.shutdown()
+ # shut down client and deregister it from MgrMap
+ super().shutdown()
def handle_command(self, inbuf, cmd):
+ if not self.module_ready:
+ return -errno.EAGAIN, "", ""
# ensure we have latest pools available
self.rados.wait_for_latest_osdmap()
ex, traceback.format_exc()))
raise
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown) as ex:
+ self.log.debug("handle_command: client blocklisted")
+ self.client_blocklisted.set()
+ return -errno.EAGAIN, "", str(ex)
except rados.Error as ex:
return -ex.errno, "", str(ex)
except rbd.OSError as ex:
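
handle_command turns a blocklisted client into -EAGAIN so the caller
retries after the module restarts itself, and signals the recovery
thread as a side effect. A hedged sketch of that mapping (dispatch and
ConnectionShutdown are hypothetical stand-ins):

    import errno
    from threading import Event

    class ConnectionShutdown(Exception):
        """Stand-in for rados/rbd ConnectionShutdown."""

    client_blocklisted = Event()

    def handle_command(dispatch, inbuf, cmd):
        try:
            return dispatch(inbuf, cmd)
        except ConnectionShutdown as ex:
            client_blocklisted.set()           # wake the recovery thread
            return -errno.EAGAIN, "", str(ex)  # caller should retry shortly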
lock = Lock()
query_condition = Condition(lock)
refresh_condition = Condition(lock)
- thread = None
image_name_cache = {}
image_name_refresh_time = datetime.fromtimestamp(0)
self.module = module
self.log = module.log
+ self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
+ def shutdown(self):
+ self.log.info("PerfHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("PerfHandler: joining thread")
+ self.thread.join()
+ self.log.info("PerfHandler: shut down")
+
def run(self):
try:
self.log.info("PerfHandler: starting")
- while True:
+ while not self.stop_thread:
with self.lock:
self.scrub_expired_queries()
self.process_raw_osd_perf_counters()
self.log.debug("PerfHandler: tick")
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("PerfHandler: client blocklisted")
+ self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
self.load_from_pool(ioctx, namespace_validator,
image_validator)
+ except rados.ConnectionShutdown:
+ raise
except rados.Error as e:
self.handler.log.error(
"Failed to load schedules for pool {}: {}".format(
class TaskHandler:
lock = Lock()
condition = Condition(lock)
- thread = None
in_progress_task = None
tasks_by_sequence = dict()
with self.lock:
self.init_task_queue()
+ self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
return (match.group(1) or self.default_pool_name, match.group(2) or '',
match.group(3))
+ def shutdown(self):
+ self.log.info("TaskHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("TaskHandler: joining thread")
+ self.thread.join()
+ self.log.info("TaskHandler: shut down")
+
def run(self):
try:
self.log.info("TaskHandler: starting")
- while True:
+ while not self.stop_thread:
with self.lock:
now = datetime.now()
for sequence in sorted([sequence for sequence, task
self.condition.wait(5)
self.log.debug("TaskHandler: tick")
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("TaskHandler: client blocklisted")
+ self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
self.complete_progress(task)
self.remove_task(None, task)
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ raise
+
except (rados.Error, rbd.Error) as e:
self.log.error("execute_task: {}".format(e))
task.retry_message = "{}".format(e)
lock = Lock()
condition = Condition(lock)
- thread = None
def __init__(self, module):
self.module = module
self.init_schedule_queue()
+ self.stop_thread = False
self.thread = Thread(target=self.run)
self.thread.start()
+ def shutdown(self):
+ self.log.info("TrashPurgeScheduleHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("TrashPurgeScheduleHandler: joining thread")
+ self.thread.join()
+ self.log.info("TrashPurgeScheduleHandler: shut down")
+
def run(self):
try:
self.log.info("TrashPurgeScheduleHandler: starting")
- while True:
+ while not self.stop_thread:
refresh_delay = self.refresh_pools()
with self.lock:
(ns_spec, wait_time) = self.dequeue()
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace)
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
+ self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
ioctx.set_namespace(namespace)
rbd.RBD().trash_purge(ioctx, datetime.now())
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ raise
except Exception as e:
self.log.error("exception when purging {}/{}: {}".format(
pool_id, namespace, e))
pool_namespaces += rbd.RBD().namespace_list(ioctx)
except rbd.OperationNotSupported:
self.log.debug("namespaces not supported")
+ except rbd.ConnectionShutdown:
+ raise
except Exception as e:
self.log.error("exception when scanning pool {}: {}".format(
pool_name, e))