git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds/quiesce-db: keep the db thread alive until shutdown 54485/head
author: Leonid Usov <leonid.usov@ibm.com>
Wed, 6 Mar 2024 16:06:50 +0000 (18:06 +0200)
committer: Leonid Usov <leonid.usov@ibm.com>
Wed, 6 Mar 2024 16:07:19 +0000 (18:07 +0200)
With this change, we no longer have to join the db thread during the membership update, which prevents potential deadlocks.

Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
src/mds/MDSRank.cc
src/mds/QuiesceDbManager.cc
src/mds/QuiesceDbManager.h
src/test/mds/TestQuiesceDb.cc

index c14ca9e1cb1e364af1df6fbb7533067204b58c07..a2bab2802a827612da9a9a48c886eeb8bc8808f4 100644 (file)
@@ -864,17 +864,15 @@ void MDSRankDispatcher::shutdown()
 
   progress_thread.shutdown();
 
-  if (quiesce_db_manager) {
-    // shutdown the manager
-    quiesce_db_manager->update_membership({});
-  }
-
   // release mds_lock for finisher/messenger threads (e.g.
   // MDSDaemon::ms_handle_reset called from Messenger).
   mds_lock.unlock();
 
   // shut down messenger
   messenger->shutdown();
+
+  // the quiesce db membership is
+  // managed by the mds map update, no need to address that here
   if (quiesce_agent) {
     // reset any tracked roots
     quiesce_agent->shutdown();
index ca69fef73fdaf2e45289e1f37595c1002ee74975..6fccaacf10c475c97a1b57b8f286796de3a8fe43 100644 (file)
@@ -22,7 +22,7 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_mds_quiesce
 #undef dout_prefix
-#define dout_prefix *_dout << "quiesce.mgr <" << __func__ << "> "
+#define dout_prefix *_dout << "quiesce.mgr." << membership.me << " <" << __func__ << "> "
 
 #undef dout
 #define dout(lvl)                                                        \
@@ -54,23 +54,23 @@ static QuiesceTimeInterval time_distance(QuiesceTimePoint lhs, QuiesceTimePoint
 
 bool QuiesceDbManager::db_thread_has_work() const
 {
-  return false
+  return db_thread_should_exit
       || pending_acks.size() > 0
       || pending_requests.size() > 0
       || pending_db_updates.size() > 0
       || (agent_callback.has_value() && agent_callback->if_newer < db_version())
-      || (!cluster_membership.has_value() || cluster_membership->epoch != membership.epoch);
+      || (cluster_membership.has_value() && cluster_membership->epoch != membership.epoch);
 }
 
 void* QuiesceDbManager::quiesce_db_thread_main()
 {
-  db_thread_enter();
-
   std::unique_lock ls(submit_mutex);
   QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
   QuiesceDbVersion last_acked = {0, 0};
 
-  while (true) {
+  dout(5) << "Entering the main thread" << dendl;
+  bool keep_working = true;
+  while (keep_working) {
 
     auto db_age = db.get_age();
 
@@ -78,11 +78,10 @@ void* QuiesceDbManager::quiesce_db_thread_main()
       submit_condition.wait_for(ls, next_event_at_age - db_age);
     }
 
-    if (!membership_upkeep()) {
-      break;
-    }
+    auto [is_member, should_exit] = membership_upkeep();
+    keep_working = !should_exit;
 
-    {
+    if (is_member) {
       decltype(pending_acks) acks(std::move(pending_acks));
       decltype(pending_requests) requests(std::move(pending_requests));
       decltype(pending_db_updates) db_updates(std::move(pending_db_updates));
@@ -105,6 +104,10 @@ void* QuiesceDbManager::quiesce_db_thread_main()
       } else {
         next_event_at_age = replica_upkeep(std::move(db_updates));
       }
+    } else {
+      ls.unlock();
+      dout(15) << "not a cluster member, keeping idle " << dendl;
+      next_event_at_age = QuiesceTimeInterval::max();
     }
   
     complete_requests();
@@ -131,7 +134,7 @@ void* QuiesceDbManager::quiesce_db_thread_main()
       }
     }
 
-    if (send_ack) {
+    if (is_member && send_ack) {
       auto db_version = quiesce_map.db_version;
       dout(20) << "synchronous agent ack: " << quiesce_map << dendl;
       auto rc = membership.send_ack(std::move(quiesce_map));
@@ -148,7 +151,7 @@ void* QuiesceDbManager::quiesce_db_thread_main()
 
   ls.unlock();
 
-  db_thread_exit();
+  dout(5) << "Exiting the main thread" << dendl;
 
   return 0;
 }
@@ -160,43 +163,36 @@ void QuiesceDbManager::update_membership(const QuiesceClusterMembership& new_mem
   bool will_participate = new_membership.members.contains(new_membership.me);
   dout(20) << "will participate: " << std::boolalpha << will_participate << std::noboolalpha << dendl;
 
-  if (cluster_membership && !will_participate) {
-    // stop the thread
-    cluster_membership.reset();
+  if (will_participate && !quiesce_db_thread.is_started()) {
+    // start the thread
+    dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
+    db_thread_should_exit = false;
+    quiesce_db_thread.create("quiesce_db_mgr");
+  } else {
     submit_condition.notify_all();
-    lock.unlock();
-    ceph_assert(quiesce_db_thread.is_started());
-    dout(5) << "stopping the db mgr thread at epoch: " << new_membership.epoch << dendl;
-    quiesce_db_thread.join();
-  } else if (will_participate) {
-    if (!cluster_membership) {
-      // start the thread
-      dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
-      quiesce_db_thread.create("quiesce_db_mgr");
-    } else {
-      submit_condition.notify_all();
-    }
-    if (inject_request) {
-      pending_requests.push_front(inject_request);
-    }
+  }
+
+  if (inject_request) {
+    pending_requests.push_front(inject_request);
+  }
+
+  if (will_participate) {
     cluster_membership = new_membership;
-    
-    std::lock_guard lc(agent_mutex);
-    if (agent_callback) {
-        agent_callback->if_newer = {0, 0};
-    }
+  } else {
+    cluster_membership.reset();
   }
 
-  if (!will_participate && inject_request) {
-    inject_request->complete(-EPERM);
+  std::lock_guard lc(agent_mutex);
+  if (agent_callback) {
+      agent_callback->if_newer = {0, 0};
   }
 }
 
-bool QuiesceDbManager::membership_upkeep()
+std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool> QuiesceDbManager::membership_upkeep()
 {
   if (cluster_membership && cluster_membership->epoch == membership.epoch) {
     // no changes
-    return true;
+    return {true, db_thread_should_exit};
   }
 
   bool was_leader = membership.epoch > 0 && membership.leader == membership.me;
@@ -206,7 +202,7 @@ bool QuiesceDbManager::membership_upkeep()
       << std::boolalpha << was_leader << "->" << is_leader << std::noboolalpha
       << " members:" << cluster_membership->members << dendl;
   } else {
-    dout(10) << "shutdown! was_leader: " << was_leader << dendl;
+    dout(10) << "not a member! was_leader: " << was_leader << dendl;
   }
 
   if (is_leader) {
@@ -238,18 +234,24 @@ bool QuiesceDbManager::membership_upkeep()
       done_requests[await_ctx.req_ctx] = EINPROGRESS;
     }
     awaits.clear();
-    // reject pending requests
+    // reject pending requests as not leader
     while (!pending_requests.empty()) {
-      done_requests[pending_requests.front()] = EPERM;
+      done_requests[pending_requests.front()] = ENOTTY;
       pending_requests.pop_front();
     }
   }
 
   if (cluster_membership) {
     membership = *cluster_membership;
+    dout(15) << "Updated membership" << dendl;
+  } else {
+    membership.epoch = 0;
+    peers.clear();
+    awaits.clear();
+    db.clear();
   }
 
-  return cluster_membership.has_value();
+  return { cluster_membership.has_value(), db_thread_should_exit };
 }
 
 QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates)&& db_updates)
@@ -291,7 +293,7 @@ QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates
   if (db.set_version > update.db_version.set_version) {
     dout(3) << "got an older version of DB from the leader: " << db.set_version << " > " << update.db_version.set_version << dendl;
     dout(3) << "discarding the DB" << dendl;
-    db.reset();
+    db.clear();
   } else {
     for (auto& [qs_id, qs] : update.sets) {
       db.sets.insert_or_assign(qs_id, std::move(qs));
index 08c8392d9815243848f6fc889355be89b4bfddfb..98d0b84fc24b6d8894ad97e24b1ffd7d781e066a 100644 (file)
@@ -49,7 +49,19 @@ class QuiesceDbManager {
     QuiesceDbManager() : quiesce_db_thread(this) {};
     virtual ~QuiesceDbManager()
     {
+      shutdown();
+    }
+
+    void shutdown() {
       update_membership({});
+
+      if (quiesce_db_thread.is_started()) {
+        submit_mutex.lock();
+        db_thread_should_exit = true;
+        submit_condition.notify_all();
+        submit_mutex.unlock();
+        quiesce_db_thread.join();
+      }
     }
 
     // This will reset the manager state
@@ -191,6 +203,7 @@ class QuiesceDbManager {
     std::queue<QuiesceDbPeerListing> pending_db_updates;
     std::queue<QuiesceDbPeerAck> pending_acks;
     std::deque<RequestContext*> pending_requests;
+    bool db_thread_should_exit = false;
 
     class QuiesceDbThread : public Thread {
       public:
@@ -220,7 +233,7 @@ class QuiesceDbManager {
       QuiesceTimeInterval get_age() const {
         return QuiesceClock::now() - time_zero;
       }
-      void reset() { 
+      void clear() { 
         set_version = 0; 
         sets.clear();
         time_zero = QuiesceClock::now();
@@ -257,23 +270,11 @@ class QuiesceDbManager {
     std::unordered_map<RequestContext*, int> done_requests;
 
     void* quiesce_db_thread_main();
-
-    void db_thread_enter() {
-      // this will invalidate the membership, see membership_upkeep()
-      membership.epoch = 0;
-      peers.clear();
-      awaits.clear();
-      done_requests.clear();
-      db.reset();
-    }
-
-    void db_thread_exit() {
-      complete_requests();
-    }
-
     bool db_thread_has_work() const;
 
-    bool membership_upkeep();
+    using IsMemberBool = bool;
+    using ShouldExitBool = bool;
+    std::pair<IsMemberBool, ShouldExitBool> membership_upkeep();
 
     QuiesceTimeInterval replica_upkeep(decltype(pending_db_updates)&& db_updates);
     bool leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age);
index f930f6c042a6b14f3fcc47b4dd5d12884828949f..2ffba9778831e5bce0cd778ee176028d74358462 100644 (file)
@@ -230,7 +230,7 @@ class QuiesceDbTest: public testing::Test {
         response.clear();
         request.reset(c);
 
-        int rr = -1;
+        int rr = -ENOTTY;
 
         for (auto& [rank, mgr] : parent.managers) {
           if (!(rr = mgr->submit_request(this))) {
@@ -238,10 +238,8 @@ class QuiesceDbTest: public testing::Test {
           }
         }
 
-        if (rr == EPERM) {
-          // change the error to something never returned for a request
-          // EPIPE seems reasonable as we couldn't find the leader to send the command to
-          complete(EPIPE);
+        if (rr) {
+          complete(rr);
           return false;
         }
 
@@ -358,6 +356,7 @@ TEST_F(QuiesceDbTest, ManagerStartup) {
   ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
   ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(2) }));
   ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
+  managers[mds_gid_t(2)]->shutdown();
   ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
   ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
 }
@@ -1328,9 +1327,9 @@ TEST_F(QuiesceDbTest, LeaderShutdown)
 
   ASSERT_EQ(managers.at(mds_gid_t(1))->internal_pending_requests().size(), pending_requests.size());
 
-  // reset the membership of the manager
+  // shutdown the manager
   // this will block until the db thread exits
-  managers.at(mds_gid_t(1))->update_membership({});
+  managers.at(mds_gid_t(1))->shutdown();
 
   // as of now all requests must have finished
   while(!outstanding_awaits.empty()) {
@@ -1341,7 +1340,7 @@ TEST_F(QuiesceDbTest, LeaderShutdown)
 
   while (!pending_requests.empty()) {
     auto& r = *pending_requests.front();
-    EXPECT_EQ(ERR(EPERM), r.check_result());
+    EXPECT_EQ(ERR(ENOTTY), r.check_result());
     pending_requests.pop();
   }
 }