From: Leonid Usov Date: Wed, 6 Mar 2024 16:06:50 +0000 (+0200) Subject: mds/quiesce-db: keep the db thread alive until shutdown X-Git-Tag: testing/wip-root-testing-20240411.174241~99^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=aba3c1d240c873fc0cd3e1d30cd86b2dbb71f24b;p=ceph-ci.git mds/quiesce-db: keep the db thread alive until shutdown With the change we can now avoid having to join it during the membership update, preventing potential deadlocks Signed-off-by: Leonid Usov (cherry picked from commit 8b896a9e145796119f0451201dc4d53ddac97db0) --- diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index c14ca9e1cb1..a2bab2802a8 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -864,17 +864,15 @@ void MDSRankDispatcher::shutdown() progress_thread.shutdown(); - if (quiesce_db_manager) { - // shutdown the manager - quiesce_db_manager->update_membership({}); - } - // release mds_lock for finisher/messenger threads (e.g. // MDSDaemon::ms_handle_reset called from Messenger). mds_lock.unlock(); // shut down messenger messenger->shutdown(); + + // the quiesce db membership is + // managed by the mds map update, no need to address that here if (quiesce_agent) { // reset any tracked roots quiesce_agent->shutdown(); diff --git a/src/mds/QuiesceDbManager.cc b/src/mds/QuiesceDbManager.cc index ca69fef73fd..6fccaacf10c 100644 --- a/src/mds/QuiesceDbManager.cc +++ b/src/mds/QuiesceDbManager.cc @@ -22,7 +22,7 @@ #define dout_context g_ceph_context #define dout_subsys ceph_subsys_mds_quiesce #undef dout_prefix -#define dout_prefix *_dout << "quiesce.mgr <" << __func__ << "> " +#define dout_prefix *_dout << "quiesce.mgr." 
<< membership.me << " <" << __func__ << "> " #undef dout #define dout(lvl) \ @@ -54,23 +54,23 @@ static QuiesceTimeInterval time_distance(QuiesceTimePoint lhs, QuiesceTimePoint bool QuiesceDbManager::db_thread_has_work() const { - return false + return db_thread_should_exit || pending_acks.size() > 0 || pending_requests.size() > 0 || pending_db_updates.size() > 0 || (agent_callback.has_value() && agent_callback->if_newer < db_version()) - || (!cluster_membership.has_value() || cluster_membership->epoch != membership.epoch); + || (cluster_membership.has_value() && cluster_membership->epoch != membership.epoch); } void* QuiesceDbManager::quiesce_db_thread_main() { - db_thread_enter(); - std::unique_lock ls(submit_mutex); QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max(); QuiesceDbVersion last_acked = {0, 0}; - while (true) { + dout(5) << "Entering the main thread" << dendl; + bool keep_working = true; + while (keep_working) { auto db_age = db.get_age(); @@ -78,11 +78,10 @@ void* QuiesceDbManager::quiesce_db_thread_main() submit_condition.wait_for(ls, next_event_at_age - db_age); } - if (!membership_upkeep()) { - break; - } + auto [is_member, should_exit] = membership_upkeep(); + keep_working = !should_exit; - { + if (is_member) { decltype(pending_acks) acks(std::move(pending_acks)); decltype(pending_requests) requests(std::move(pending_requests)); decltype(pending_db_updates) db_updates(std::move(pending_db_updates)); @@ -105,6 +104,10 @@ void* QuiesceDbManager::quiesce_db_thread_main() } else { next_event_at_age = replica_upkeep(std::move(db_updates)); } + } else { + ls.unlock(); + dout(15) << "not a cluster member, keeping idle " << dendl; + next_event_at_age = QuiesceTimeInterval::max(); } complete_requests(); @@ -131,7 +134,7 @@ void* QuiesceDbManager::quiesce_db_thread_main() } } - if (send_ack) { + if (is_member && send_ack) { auto db_version = quiesce_map.db_version; dout(20) << "synchronous agent ack: " << quiesce_map << dendl; auto rc = 
membership.send_ack(std::move(quiesce_map)); @@ -148,7 +151,7 @@ void* QuiesceDbManager::quiesce_db_thread_main() ls.unlock(); - db_thread_exit(); + dout(5) << "Exiting the main thread" << dendl; return 0; } @@ -160,43 +163,36 @@ void QuiesceDbManager::update_membership(const QuiesceClusterMembership& new_mem bool will_participate = new_membership.members.contains(new_membership.me); dout(20) << "will participate: " << std::boolalpha << will_participate << std::noboolalpha << dendl; - if (cluster_membership && !will_participate) { - // stop the thread - cluster_membership.reset(); + if (will_participate && !quiesce_db_thread.is_started()) { + // start the thread + dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl; + db_thread_should_exit = false; + quiesce_db_thread.create("quiesce_db_mgr"); + } else { submit_condition.notify_all(); - lock.unlock(); - ceph_assert(quiesce_db_thread.is_started()); - dout(5) << "stopping the db mgr thread at epoch: " << new_membership.epoch << dendl; - quiesce_db_thread.join(); - } else if (will_participate) { - if (!cluster_membership) { - // start the thread - dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl; - quiesce_db_thread.create("quiesce_db_mgr"); - } else { - submit_condition.notify_all(); - } - if (inject_request) { - pending_requests.push_front(inject_request); - } + } + + if (inject_request) { + pending_requests.push_front(inject_request); + } + + if (will_participate) { cluster_membership = new_membership; - - std::lock_guard lc(agent_mutex); - if (agent_callback) { - agent_callback->if_newer = {0, 0}; - } + } else { + cluster_membership.reset(); } - if (!will_participate && inject_request) { - inject_request->complete(-EPERM); + std::lock_guard lc(agent_mutex); + if (agent_callback) { + agent_callback->if_newer = {0, 0}; } } -bool QuiesceDbManager::membership_upkeep() +std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool> QuiesceDbManager::membership_upkeep() { if (cluster_membership && 
cluster_membership->epoch == membership.epoch) { // no changes - return true; + return {true, db_thread_should_exit}; } bool was_leader = membership.epoch > 0 && membership.leader == membership.me; @@ -206,7 +202,7 @@ bool QuiesceDbManager::membership_upkeep() << std::boolalpha << was_leader << "->" << is_leader << std::noboolalpha << " members:" << cluster_membership->members << dendl; } else { - dout(10) << "shutdown! was_leader: " << was_leader << dendl; + dout(10) << "not a member! was_leader: " << was_leader << dendl; } if (is_leader) { @@ -238,18 +234,24 @@ bool QuiesceDbManager::membership_upkeep() done_requests[await_ctx.req_ctx] = EINPROGRESS; } awaits.clear(); - // reject pending requests + // reject pending requests as not leader while (!pending_requests.empty()) { - done_requests[pending_requests.front()] = EPERM; + done_requests[pending_requests.front()] = ENOTTY; pending_requests.pop_front(); } } if (cluster_membership) { membership = *cluster_membership; + dout(15) << "Updated membership" << dendl; + } else { + membership.epoch = 0; + peers.clear(); + awaits.clear(); + db.clear(); } - return cluster_membership.has_value(); + return { cluster_membership.has_value(), db_thread_should_exit }; } QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates)&& db_updates) @@ -291,7 +293,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates if (db.set_version > update.db_version.set_version) { dout(3) << "got an older version of DB from the leader: " << db.set_version << " > " << update.db_version.set_version << dendl; dout(3) << "discarding the DB" << dendl; - db.reset(); + db.clear(); } else { for (auto& [qs_id, qs] : update.sets) { db.sets.insert_or_assign(qs_id, std::move(qs)); diff --git a/src/mds/QuiesceDbManager.h b/src/mds/QuiesceDbManager.h index 08c8392d981..98d0b84fc24 100644 --- a/src/mds/QuiesceDbManager.h +++ b/src/mds/QuiesceDbManager.h @@ -49,7 +49,19 @@ class QuiesceDbManager { 
QuiesceDbManager() : quiesce_db_thread(this) {}; virtual ~QuiesceDbManager() { + shutdown(); + } + + void shutdown() { update_membership({}); + + if (quiesce_db_thread.is_started()) { + submit_mutex.lock(); + db_thread_should_exit = true; + submit_condition.notify_all(); + submit_mutex.unlock(); + quiesce_db_thread.join(); + } } // This will reset the manager state @@ -191,6 +203,7 @@ class QuiesceDbManager { std::queue pending_db_updates; std::queue pending_acks; std::deque pending_requests; + bool db_thread_should_exit = false; class QuiesceDbThread : public Thread { public: @@ -220,7 +233,7 @@ class QuiesceDbManager { QuiesceTimeInterval get_age() const { return QuiesceClock::now() - time_zero; } - void reset() { + void clear() { set_version = 0; sets.clear(); time_zero = QuiesceClock::now(); @@ -257,23 +270,11 @@ class QuiesceDbManager { std::unordered_map done_requests; void* quiesce_db_thread_main(); - - void db_thread_enter() { - // this will invalidate the membership, see membership_upkeep() - membership.epoch = 0; - peers.clear(); - awaits.clear(); - done_requests.clear(); - db.reset(); - } - - void db_thread_exit() { - complete_requests(); - } - bool db_thread_has_work() const; - bool membership_upkeep(); + using IsMemberBool = bool; + using ShouldExitBool = bool; + std::pair<IsMemberBool, ShouldExitBool> membership_upkeep(); QuiesceTimeInterval replica_upkeep(decltype(pending_db_updates)&& db_updates); bool leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age); diff --git a/src/test/mds/TestQuiesceDb.cc b/src/test/mds/TestQuiesceDb.cc index f930f6c042a..2ffba977883 100644 --- a/src/test/mds/TestQuiesceDb.cc +++ b/src/test/mds/TestQuiesceDb.cc @@ -230,7 +230,7 @@ class QuiesceDbTest: public testing::Test { response.clear(); request.reset(c); - int rr = -1; + int rr = -ENOTTY; for (auto& [rank, mgr] : parent.managers) { if (!(rr = mgr->submit_request(this))) { @@ -238,10 +238,8 @@ class QuiesceDbTest: public testing::Test { } } - if (rr == 
EPERM) { - // change the error to something never returned for a request - // EPIPE seems reasonable as we couldn't find the leader to send the command to - complete(EPIPE); + if (rr) { + complete(rr); return false; } @@ -358,6 +356,7 @@ TEST_F(QuiesceDbTest, ManagerStartup) { ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {})); ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(2) })); ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {})); + managers[mds_gid_t(2)]->shutdown(); ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) })); ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {})); } @@ -1328,9 +1327,9 @@ TEST_F(QuiesceDbTest, LeaderShutdown) ASSERT_EQ(managers.at(mds_gid_t(1))->internal_pending_requests().size(), pending_requests.size()); - // reset the membership of the manager + // shutdown the manager // this will block until the db thread exits - managers.at(mds_gid_t(1))->update_membership({}); + managers.at(mds_gid_t(1))->shutdown(); // as of now all requests must have finished while(!outstanding_awaits.empty()) { @@ -1341,7 +1340,7 @@ TEST_F(QuiesceDbTest, LeaderShutdown) while (!pending_requests.empty()) { auto& r = *pending_requests.front(); - EXPECT_EQ(ERR(EPERM), r.check_result()); + EXPECT_EQ(ERR(ENOTTY), r.check_result()); pending_requests.pop(); } }