From 2a3faf1a0b65fbd62e6be688b702af94103b0106 Mon Sep 17 00:00:00 2001 From: Leonid Usov Date: Wed, 17 Apr 2024 14:49:34 +0300 Subject: [PATCH] mds/quiesce: agent: avoid a race condition with rapid db updates When new roots begin processing but don't yet make it into the currently tracked set, there is a window for the next update with the same roots to treat them as new. We fix it by simplifying the agent model, getting rid of the intermediate `working` set. Since we never remove or add items into the current roots collection, it's safe to update the current set directly from the pending set. The race was due to the fact that `db_update()` relied on the `current` to deduce new roots into `pending`, while the same new root could have already been seen and posted into the `working` set. This would lead to submitting the same new root twice. Without the `working` set such race isn't possible. Fixes: https://tracker.ceph.com/issues/65545 Signed-off-by: Leonid Usov --- src/mds/MDSRankQuiesce.cc | 13 +--- src/mds/QuiesceAgent.cc | 76 ++++++++++----------- src/mds/QuiesceAgent.h | 22 +++++-- src/test/mds/TestQuiesceAgent.cc | 110 +++++++++++++++++++++++-------- 4 files changed, 136 insertions(+), 85 deletions(-) diff --git a/src/mds/MDSRankQuiesce.cc b/src/mds/MDSRankQuiesce.cc index 004e153648313..539976cb367c2 100644 --- a/src/mds/MDSRankQuiesce.cc +++ b/src/mds/MDSRankQuiesce.cc @@ -518,16 +518,9 @@ void MDSRank::quiesce_agent_setup() { if (!inserted) { dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl; - // we must update the request id so that old one can't cancel this request. - it->second.first = req_id; - if (it->second.second) { - it->second.second->complete(-EINTR); - it->second.second = c; - } else { - // if we have no context, it means we've completed it - // since we weren't inserted, we must have successfully quiesced - c->complete(0); - } + // report error for the duplicate request, just as MDCache would do + c->complete(-EINPROGRESS); + return std::nullopt; } else if (debug_rank && (debug_rank != whoami)) { // the root was pinned to a different rank // we should acknowledge the quiesce regardless of the other flags diff --git a/src/mds/QuiesceAgent.cc b/src/mds/QuiesceAgent.cc index 1b5dfe44460e0..6bc6770e31046 100644 --- a/src/mds/QuiesceAgent.cc +++ b/src/mds/QuiesceAgent.cc @@ -87,36 +87,38 @@ bool QuiesceAgent::db_update(QuiesceMap& map) } void* QuiesceAgent::agent_thread_main() { - working.clear(); - std::unique_lock lock(agent_mutex); + std::unique_lock agent_lock(agent_mutex); + + while (!stop_agent_thread) { + TrackedRootsVersion old; - while(!stop_agent_thread) { if (pending.armed) { - working.roots.swap(pending.roots); - working.db_version = pending.db_version; - } else { - // copy current roots - working.roots = current.roots; - working.db_version = current.db_version; + std::swap(old, current); + current.roots.swap(pending.roots); + current.db_version = pending.db_version; } dout(20) - << "current = " << current.db_version - << ", working = " << working.db_version - << ", pending = " << pending.db_version << dendl; - - current.armed = false; - working.armed = true; + << "old = " << old.db_version + << ", current = " << current.db_version + << dendl; - // it's safe to clear the pending roots under lock because it shouldn't + // it's safe to clear the pending roots under agent_lock because it shouldn't // ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel. pending.clear(); - lock.unlock(); + current.armed = true; + upkeep_needed = false; + + // for somebody waiting for the internal state to progress + agent_cond.notify_all(); + agent_lock.unlock(); + + _agent_thread_will_work(); - QuiesceMap ack(working.db_version); + QuiesceMap ack(current.db_version); // upkeep what we believe is the current state. - for (auto& [root, info] : working.roots) { + for (auto& [root, info] : current.roots) { info->lock(); bool should_quiesce = info->should_quiesce(); @@ -141,7 +143,7 @@ void* QuiesceAgent::agent_thread_main() { info->unlock(); // TODO: capturing QuiesceAgent& `this` is potentially dangerous - // the assumption is that since the root pointer is weak + // the assumption is that since the tracked root pointer is weak // it will have been deleted by the QuiesceAgent shutdown sequence set_upkeep_needed(); } @@ -165,16 +167,10 @@ void* QuiesceAgent::agent_thread_main() { } } - lock.lock(); + _agent_thread_did_work(); - bool new_version = current.db_version < working.db_version; - current.roots.swap(working.roots); - current.db_version = working.db_version; - - lock.unlock(); - - // clear the old roots and send the ack outside of the lock - working.roots.clear(); + // send the ack and clear the old roots outside of the lock + bool new_version = current.db_version != old.db_version; if (new_version || !ack.roots.empty()) { dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl; int rc = quiesce_control.agent_ack(std::move(ack)); @@ -182,20 +178,19 @@ void* QuiesceAgent::agent_thread_main() { dout(3) << "got error: " << rc << " trying to send " << ack << dendl; } } + old.clear(); ack.clear(); - lock.lock(); - - // notify that we're done working on this version and all acks (if any) were sent - working.clear(); + agent_lock.lock(); + current.armed = false; // a new pending version could be set while we weren't locked // if that's the case just go for another pass // otherwise, wait for updates - if (!pending.armed && !current.armed && !stop_agent_thread) { + while (!pending.armed && !current.armed && !upkeep_needed && !stop_agent_thread) { // for somebody waiting for the thread to idle agent_cond.notify_all(); - agent_cond.wait(lock); + agent_cond.wait(agent_lock); } } agent_cond.notify_all(); @@ -206,13 +201,11 @@ void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& ne { std::unique_lock l(agent_mutex); - auto actual_version = std::max(current.db_version, working.db_version); - bool rollback = actual_version > version; - + bool rollback = current.db_version > version; + if (rollback) { dout(5) << "version rollback to " << version - << ". current = " << current.db_version - << ", working = " << working.db_version + << ". current = " << current.db_version << ", pending = " << pending.db_version << dendl; } @@ -230,10 +223,9 @@ void QuiesceAgent::set_upkeep_needed() dout(20) << "current = " << current.db_version - << ", working = " << working.db_version << ", pending = " << pending.db_version << dendl; - current.armed = true; + upkeep_needed = true; agent_cond.notify_all(); } diff --git a/src/mds/QuiesceAgent.h b/src/mds/QuiesceAgent.h index 4b1ef84b4a54b..57460ee946aa1 100644 --- a/src/mds/QuiesceAgent.h +++ b/src/mds/QuiesceAgent.h @@ -33,7 +33,7 @@ class QuiesceAgent { agent_thread.create("quiesce.agt"); }; - ~QuiesceAgent() { + virtual ~QuiesceAgent() { shutdown(); } @@ -213,20 +213,28 @@ class QuiesceAgent { operator<<(std::basic_ostream& os, const QuiesceAgent::TrackedRootsVersion& tr); TrackedRootsVersion current; - TrackedRootsVersion working; TrackedRootsVersion pending; std::mutex agent_mutex; std::condition_variable agent_cond; bool stop_agent_thread; + bool upkeep_needed; template QuiesceDbVersion await_idle_locked(L &lock) { - agent_cond.wait(lock, [this] { - return !(current.armed || working.armed || pending.armed); + return await_phase_locked(lock, false, false); + } + + template + QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed) + { + agent_cond.wait(lock, [=, this] { + return ( !upkeep_needed + && current.armed == current_armed + && pending.armed == pending_armed); }); - return current.db_version; + return std::max(current.db_version, pending.db_version); } void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots); @@ -249,4 +257,8 @@ class QuiesceAgent { } agent_thread; void* agent_thread_main(); + + virtual void _agent_thread_will_work() { } + virtual void _agent_thread_did_work() { } + }; diff --git a/src/test/mds/TestQuiesceAgent.cc b/src/test/mds/TestQuiesceAgent.cc index ae95115895459..d32d48b967551 100644 --- a/src/test/mds/TestQuiesceAgent.cc +++ b/src/test/mds/TestQuiesceAgent.cc @@ -50,7 +50,7 @@ class QuiesceAgentTest : public testing::Test { QuiesceDbVersion get_latest_version() { std::lock_guard l(agent_mutex); - return std::max({current.db_version, working.db_version, pending.db_version}); + return std::max(current.db_version, pending.db_version); } TrackedRoots& mutable_tracked_roots() { return current.roots; @@ -60,6 +60,16 @@ class QuiesceAgentTest : public testing::Test { std::unique_lock l(agent_mutex); return await_idle_locked(l); } + + using TRV = TrackedRootsVersion; + std::optional> before_work; + + void _agent_thread_will_work() { + auto f = before_work; + if (f) { + (*f)(pending, current); + } + } }; QuiesceMap latest_ack; std::unordered_map quiesce_requests; @@ -98,19 +108,12 @@ class QuiesceAgentTest : public testing::Test { auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c); if (!inserted) { - // we must update the request id so that old one can't cancel this request. - it->second.first = req_id; - if (it->second.second) { - it->second.second->complete(-EINTR); - it->second.second = c; - } else { - // if we have no context, it means we've completed it - // since we weren't inserted, we must have successfully quiesced - c->complete(0); - } + // it's a conflict that MDCache doesn't deal with + c->complete(-EINPROGRESS); + return req_id; + } else { + return it->second.first; } - - return it->second.first; }; ci.cancel_request = [this](RequestHandle h) { @@ -171,10 +174,10 @@ class QuiesceAgentTest : public testing::Test { } template > - bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast(std::chrono::seconds(10))) + bool await_idle_v(QuiesceSetVersion v, D timeout = std::chrono::duration_cast(std::chrono::seconds(10))) { - return timed_run(timeout, [this, version] { - while (version > agent->await_idle()) { }; + return timed_run(timeout, [this, v] { + while (QuiesceDbVersion {1, v} > agent->await_idle()) { }; }); } @@ -475,32 +478,35 @@ TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) { EXPECT_TRUE(await_idle()); - // now we should have seen the ack with root2 quiesced + // root1 and root2 are still registered internally + // so it should result in a failure to quiesce them again EXPECT_EQ(3, latest_ack.db_version); - EXPECT_EQ(1, latest_ack.roots.size()); - EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state); + EXPECT_EQ(2, latest_ack.roots.size()); + EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state); + EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state); // the actual state of the pinned objects shouldn't have changed EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state()); - EXPECT_EQ(QS_FAILED, pinned2->get_actual_state()); + EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state()); EXPECT_EQ(0, *pinned1->quiesce_result); - EXPECT_EQ(-EINTR, *pinned2->quiesce_result); + EXPECT_FALSE(pinned2->quiesce_result.has_value()); - // releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state + // releasing the pinned objects should cancel and remove from internal requests pinned1.reset(); pinned2.reset(); - EXPECT_TRUE(quiesce_requests.contains("root1")); - EXPECT_TRUE(quiesce_requests.contains("root2")); + EXPECT_FALSE(quiesce_requests.contains("root1")); + EXPECT_FALSE(quiesce_requests.contains("root2")); - EXPECT_TRUE(complete_quiesce("root2")); + EXPECT_TRUE(complete_quiesce("root3")); EXPECT_TRUE(await_idle()); EXPECT_EQ(3, latest_ack.db_version); - EXPECT_EQ(2, latest_ack.roots.size()); - EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state); - EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state); + EXPECT_EQ(3, latest_ack.roots.size()); + EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state); + EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state); + EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state); } TEST_F(QuiesceAgentTest, TimeoutBeforeComplete) @@ -543,3 +549,51 @@ TEST_F(QuiesceAgentTest, TimeoutBeforeComplete) EXPECT_EQ(0, tracked.size()); } } + + +TEST_F(QuiesceAgentTest, RapidDbUpdates) +{ + // This validates that the same new root that happens to be reported + // more than once before we have chance to process it is not submitted + // multiple times + + // set a handler that will post v2 whlie we're working on v1 + agent->before_work = [this](TestQuiesceAgent::TRV& p, TestQuiesceAgent::TRV& c) { + if (c.db_version.set_version != 1) { + return; + } + agent->before_work.reset(); + auto ack = update(2, { + { "root1", QS_QUIESCING }, + { "root2", QS_QUIESCING }, + }); + + ASSERT_TRUE(ack.has_value()); + EXPECT_EQ(2, ack->db_version); + EXPECT_EQ(0, ack->roots.size()); + }; + + { + auto ack = update(1, { + { "root1", QS_QUIESCING }, + }); + + ASSERT_TRUE(ack.has_value()); + EXPECT_EQ(1, ack->db_version); + EXPECT_EQ(0, ack->roots.size()); + } + + EXPECT_TRUE(await_idle_v(2)); + + // nothing should be in the ack + // if we incorrectly submit root1 twice + // then it should be repored here as FAILED + EXPECT_EQ(2, latest_ack.db_version); + EXPECT_EQ(0, latest_ack.roots.size()); + + { + auto tracked = agent->tracked_roots(); + EXPECT_EQ(2, tracked.size()); + } +} + -- 2.39.5