]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds/quiesce: agent: avoid a race condition with rapid db updates 56956/head
authorLeonid Usov <leonid.usov@ibm.com>
Wed, 17 Apr 2024 11:49:34 +0000 (14:49 +0300)
committerLeonid Usov <leonid.usov@ibm.com>
Wed, 17 Apr 2024 13:52:38 +0000 (16:52 +0300)
When new roots begin processing but don't yet make it into the
currently tracked set, there is a window for the next update
with the same roots to treat them as new.

We fix it by simplifying the agent model, getting rid of
the intermediate `working` set. Since we never remove or add
items into the current roots collection, it's safe to update the
current set directly from the pending set.

The race was due to the fact that `db_update()` relied on the `current`
to deduce new roots into `pending`, while the same new root
could have already been seen and posted into the `working` set.
This would lead to submitting the same new root twice.
Without the `working` set such race isn't possible.

Fixes: https://tracker.ceph.com/issues/65545
Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
src/mds/MDSRankQuiesce.cc
src/mds/QuiesceAgent.cc
src/mds/QuiesceAgent.h
src/test/mds/TestQuiesceAgent.cc

index 004e153648313aa9fcee5eda0d20ba91475b64d5..539976cb367c24fadfb2e9d5220cc4c458b57e3f 100644 (file)
@@ -518,16 +518,9 @@ void MDSRank::quiesce_agent_setup() {
 
       if (!inserted) {
         dout(3) << "duplicate quiesce request for root '" << it->first << "'" << dendl;
-        // we must update the request id so that old one can't cancel this request.
-        it->second.first = req_id;
-        if (it->second.second) {
-          it->second.second->complete(-EINTR);
-          it->second.second = c;
-        } else {
-          // if we have no context, it means we've completed it
-          // since we weren't inserted, we must have successfully quiesced
-          c->complete(0);
-        }
+        // report error for the duplicate request, just as MDCache would do
+        c->complete(-EINPROGRESS);
+        return std::nullopt;
       } else if (debug_rank && (debug_rank != whoami)) {
         // the root was pinned to a different rank
         // we should acknowledge the quiesce regardless of the other flags
index 1b5dfe44460e05195106de261d058c2f636a8705..6bc6770e31046f69c2258a5d07935124e4178ccb 100644 (file)
@@ -87,36 +87,38 @@ bool QuiesceAgent::db_update(QuiesceMap& map)
 }
 
 void* QuiesceAgent::agent_thread_main() {
-  working.clear();
-  std::unique_lock lock(agent_mutex);
+  std::unique_lock agent_lock(agent_mutex);
+
+  while (!stop_agent_thread) {
+    TrackedRootsVersion old;
 
-  while(!stop_agent_thread) {
     if (pending.armed) {
-      working.roots.swap(pending.roots);
-      working.db_version = pending.db_version;
-    } else {
-      // copy current roots
-      working.roots = current.roots;
-      working.db_version = current.db_version;
+      std::swap(old, current);
+      current.roots.swap(pending.roots);
+      current.db_version = pending.db_version;
     }
 
     dout(20)
-        << "current = " << current.db_version
-        << ", working = " << working.db_version
-        << ", pending = " << pending.db_version << dendl;
-
-    current.armed = false;
-    working.armed = true;
+        << "old = " << old.db_version
+        << ", current = " << current.db_version
+        << dendl;
 
-    // it's safe to clear the pending roots under lock because it shouldn't
+    // it's safe to clear the pending roots under agent_lock because it shouldn't
     // ever hold a last shared ptr to quiesced tracked roots, causing their destructors to run cancel.
     pending.clear();
-    lock.unlock();
+    current.armed = true;
+    upkeep_needed = false;
+
+    // for somebody waiting for the internal state to progress
+    agent_cond.notify_all();
+    agent_lock.unlock();
+
+    _agent_thread_will_work();
 
-    QuiesceMap ack(working.db_version);
+    QuiesceMap ack(current.db_version);
   
     // upkeep what we believe is the current state.
-    for (auto& [root, info] : working.roots) {
+    for (auto& [root, info] : current.roots) {
 
       info->lock();
       bool should_quiesce = info->should_quiesce();
@@ -141,7 +143,7 @@ void* QuiesceAgent::agent_thread_main() {
             info->unlock();
 
             // TODO: capturing QuiesceAgent& `this` is potentially dangerous
-            //       the assumption is that since the root pointer is weak
+            //       the assumption is that since the tracked root pointer is weak
             //       it will have been deleted by the QuiesceAgent shutdown sequence
             set_upkeep_needed();
           }
@@ -165,16 +167,10 @@ void* QuiesceAgent::agent_thread_main() {
       }
     }
 
-    lock.lock();
+    _agent_thread_did_work();
 
-    bool new_version = current.db_version < working.db_version;
-    current.roots.swap(working.roots);
-    current.db_version = working.db_version;
-
-    lock.unlock();
-
-    // clear the old roots and send the ack outside of the lock
-    working.roots.clear();
+    // send the ack and clear the old roots outside of the lock
+    bool new_version = current.db_version != old.db_version;
     if (new_version || !ack.roots.empty()) {
       dout(20) << "asyncrhonous ack for " << (new_version ? "a new" : "the current") << " version: " << ack << dendl;
       int rc = quiesce_control.agent_ack(std::move(ack));
@@ -182,20 +178,19 @@ void* QuiesceAgent::agent_thread_main() {
         dout(3) << "got error: " << rc << " trying to send " << ack << dendl;
       }
     }
+    old.clear();
     ack.clear();
 
-    lock.lock();
-
-    // notify that we're done working on this version and all acks (if any) were sent
-    working.clear();
+    agent_lock.lock();
 
+    current.armed = false;
     // a new pending version could be set while we weren't locked
     // if that's the case just go for another pass
     // otherwise, wait for updates
-    if (!pending.armed && !current.armed && !stop_agent_thread) {
+    while (!pending.armed && !current.armed && !upkeep_needed && !stop_agent_thread) {
       // for somebody waiting for the thread to idle
       agent_cond.notify_all();
-      agent_cond.wait(lock);
+      agent_cond.wait(agent_lock);
     }
   }
   agent_cond.notify_all();
@@ -206,13 +201,11 @@ void QuiesceAgent::set_pending_roots(QuiesceDbVersion version, TrackedRoots&& ne
 {
   std::unique_lock l(agent_mutex);
 
-  auto actual_version = std::max(current.db_version, working.db_version);
-  bool rollback = actual_version > version;
-  
+  bool rollback = current.db_version > version;
+
   if (rollback) {
     dout(5) << "version rollback to " << version 
-      << ". current = " << current.db_version 
-      << ", working = " << working.db_version 
+      << ". current = " << current.db_version
       << ", pending = " << pending.db_version << dendl;
   }
 
@@ -230,10 +223,9 @@ void QuiesceAgent::set_upkeep_needed()
 
   dout(20)
       << "current = " << current.db_version
-      << ", working = " << working.db_version
       << ", pending = " << pending.db_version << dendl;
 
-  current.armed = true;
+  upkeep_needed = true;
   agent_cond.notify_all();
 }
 
index 4b1ef84b4a54bbd40ac33c0807aa353065f69337..57460ee946aa16b19898ef69f4aa42677d79d348 100644 (file)
@@ -33,7 +33,7 @@ class QuiesceAgent {
       agent_thread.create("quiesce.agt");
     };
 
-    ~QuiesceAgent() {
+    virtual ~QuiesceAgent() {
       shutdown();
     }
 
@@ -213,20 +213,28 @@ class QuiesceAgent {
     operator<<(std::basic_ostream<CharT, Traits>& os, const QuiesceAgent::TrackedRootsVersion& tr);
 
     TrackedRootsVersion current;
-    TrackedRootsVersion working;
     TrackedRootsVersion pending;
 
     std::mutex agent_mutex;
     std::condition_variable agent_cond;
     bool stop_agent_thread;
+    bool upkeep_needed;
   
     template<class L>
     QuiesceDbVersion await_idle_locked(L &lock) {
-      agent_cond.wait(lock, [this] {
-        return !(current.armed || working.armed || pending.armed);
+      return await_phase_locked(lock, false, false);
+    }
+
+    template <class L>
+    QuiesceDbVersion await_phase_locked(L& lock, bool pending_armed, bool current_armed)
+    {
+      agent_cond.wait(lock, [=, this] {
+        return ( !upkeep_needed
+          && current.armed == current_armed 
+          && pending.armed == pending_armed);
       });
 
-      return current.db_version;
+      return std::max(current.db_version, pending.db_version);
     }
 
     void set_pending_roots(QuiesceDbVersion db_version, TrackedRoots&& new_roots);
@@ -249,4 +257,8 @@ class QuiesceAgent {
     } agent_thread;
 
     void* agent_thread_main();
+
+    virtual void _agent_thread_will_work() { }
+    virtual void _agent_thread_did_work() { }
+
 };
index ae95115895459616ecbb9874cc43d4d2009b6fce..d32d48b9675512854741f69ecd2a01a9a7cb57a9 100644 (file)
@@ -50,7 +50,7 @@ class QuiesceAgentTest : public testing::Test {
       QuiesceDbVersion get_latest_version()
       {
         std::lock_guard l(agent_mutex);
-        return std::max({current.db_version, working.db_version, pending.db_version});
+        return std::max(current.db_version, pending.db_version);
       }
       TrackedRoots& mutable_tracked_roots() {
         return current.roots;
@@ -60,6 +60,16 @@ class QuiesceAgentTest : public testing::Test {
         std::unique_lock l(agent_mutex);
         return await_idle_locked(l);
       }
+
+      using TRV = TrackedRootsVersion;
+      std::optional<std::function<void(TRV& pending, TRV& current)>> before_work;
+
+      void _agent_thread_will_work() {
+        auto f = before_work;
+        if (f) {
+          (*f)(pending, current);
+        }
+      }
     };
     QuiesceMap latest_ack;
     std::unordered_map<QuiesceRoot, QuiescingRoot> quiesce_requests;
@@ -98,19 +108,12 @@ class QuiesceAgentTest : public testing::Test {
         auto [it, inserted] = quiesce_requests.try_emplace(r, req_id, c);
 
         if (!inserted) {
-          // we must update the request id so that old one can't cancel this request.
-          it->second.first = req_id;
-          if (it->second.second) {
-            it->second.second->complete(-EINTR);
-            it->second.second = c;
-          } else {
-            // if we have no context, it means we've completed it
-            // since we weren't inserted, we must have successfully quiesced
-            c->complete(0);
-          }
+          // it's a conflict that MDCache doesn't deal with
+          c->complete(-EINPROGRESS);
+          return req_id;
+        } else {
+          return it->second.first;
         }
-
-        return it->second.first;
       };
       
       ci.cancel_request = [this](RequestHandle h) {
@@ -171,10 +174,10 @@ class QuiesceAgentTest : public testing::Test {
     }
 
     template <class _Rep = std::chrono::seconds::rep, class _Period = std::chrono::seconds::period, typename D = std::chrono::duration<_Rep, _Period>>
-    bool await_idle_v(QuiesceDbVersion version, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
+    bool await_idle_v(QuiesceSetVersion v, D timeout = std::chrono::duration_cast<D>(std::chrono::seconds(10)))
     {
-      return timed_run(timeout, [this, version] {
-        while (version > agent->await_idle()) { };
+      return timed_run(timeout, [this, v] {
+        while (QuiesceDbVersion {1, v} > agent->await_idle()) { };
       });
     }
 
@@ -475,32 +478,35 @@ TEST_F(QuiesceAgentTest, DuplicateQuiesceRequest) {
 
   EXPECT_TRUE(await_idle());
 
-  // now we should have seen the ack with root2 quiesced
+  // root1 and root2 are still registered internally
+  // so it should result in a failure to quiesce them again
   EXPECT_EQ(3, latest_ack.db_version);
-  EXPECT_EQ(1, latest_ack.roots.size());
-  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(2, latest_ack.roots.size());
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
 
   // the actual state of the pinned objects shouldn't have changed
   EXPECT_EQ(QS_QUIESCED, pinned1->get_actual_state());
-  EXPECT_EQ(QS_FAILED, pinned2->get_actual_state());
+  EXPECT_EQ(QS_QUIESCING, pinned2->get_actual_state());
 
   EXPECT_EQ(0, *pinned1->quiesce_result);
-  EXPECT_EQ(-EINTR, *pinned2->quiesce_result);
+  EXPECT_FALSE(pinned2->quiesce_result.has_value());
 
-  // releasing the pinned objects will attempt to cancel, but that shouldn't interfere with the current state
+  // releasing the pinned objects should cancel and remove from internal requests
   pinned1.reset();
   pinned2.reset();
 
-  EXPECT_TRUE(quiesce_requests.contains("root1"));
-  EXPECT_TRUE(quiesce_requests.contains("root2"));
+  EXPECT_FALSE(quiesce_requests.contains("root1"));
+  EXPECT_FALSE(quiesce_requests.contains("root2"));
 
-  EXPECT_TRUE(complete_quiesce("root2"));
+  EXPECT_TRUE(complete_quiesce("root3"));
 
   EXPECT_TRUE(await_idle());
   EXPECT_EQ(3, latest_ack.db_version);
-  EXPECT_EQ(2, latest_ack.roots.size());
-  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root1").state);
-  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root2").state);
+  EXPECT_EQ(3, latest_ack.roots.size());
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root1").state);
+  EXPECT_EQ(QS_FAILED, latest_ack.roots.at("root2").state);
+  EXPECT_EQ(QS_QUIESCED, latest_ack.roots.at("root3").state);
 }
 
 TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
@@ -543,3 +549,51 @@ TEST_F(QuiesceAgentTest, TimeoutBeforeComplete)
     EXPECT_EQ(0, tracked.size());
   }
 }
+
+
+TEST_F(QuiesceAgentTest, RapidDbUpdates)
+{
+  // This validates that the same new root that happens to be reported
+  // more than once before we have chance to process it is not submitted
+  // multiple times
+
+  // set a handler that will post v2 whlie we're working on v1
+  agent->before_work = [this](TestQuiesceAgent::TRV& p, TestQuiesceAgent::TRV& c) {
+    if (c.db_version.set_version != 1) {
+      return;
+    }
+    agent->before_work.reset();
+    auto ack = update(2, {
+                             { "root1", QS_QUIESCING },
+                             { "root2", QS_QUIESCING },
+                         });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(2, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  };
+
+  {
+    auto ack = update(1, {
+                             { "root1", QS_QUIESCING },
+                         });
+
+    ASSERT_TRUE(ack.has_value());
+    EXPECT_EQ(1, ack->db_version);
+    EXPECT_EQ(0, ack->roots.size());
+  }
+
+  EXPECT_TRUE(await_idle_v(2));
+
+  // nothing should be in the ack
+  // if we incorrectly submit root1 twice
+  // then it should be repored here as FAILED
+  EXPECT_EQ(2, latest_ack.db_version);
+  EXPECT_EQ(0, latest_ack.roots.size());
+
+  {
+    auto tracked = agent->tracked_roots();
+    EXPECT_EQ(2, tracked.size());
+  }
+}
+