dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
db_thread_should_exit = false;
quiesce_db_thread.create("quiesce_db_mgr");
- } else {
+ } else if (quiesce_db_thread.is_started()) {
submit_condition.notify_all();
}
if (inject_request) {
- pending_requests.push_front(inject_request);
+ if (will_participate || quiesce_db_thread.is_started()) {
+ pending_requests.push_front(inject_request);
+ } else {
+ inject_request->complete(ENOTTY);
+ }
}
if (will_participate) {
cluster_membership = new_membership;
} else {
cluster_membership.reset();
+ db_thread_should_clear_db = true;
}
std::lock_guard lc(agent_mutex);
}
}
-std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool> QuiesceDbManager::membership_upkeep()
+std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool>
+QuiesceDbManager::membership_upkeep()
{
+ if (db_thread_should_clear_db) {
+ dout(5) << "a reset of the db has been requested" << dendl;
+ db_thread_should_clear_db = false;
+ membership.epoch = 0;
+ // clear the peers to bootstrap from scratch if we are the leader
+ peers.clear();
+ // reset the db
+ db.clear();
+ // not clearing awaits and requests, they will be handled below
+ }
+
if (cluster_membership && cluster_membership->epoch == membership.epoch) {
// no changes
return {true, db_thread_should_exit};
for (auto peer : cluster_membership->members) {
peers.try_emplace(peer);
}
-
- if (db.set_version == 0) {
- db.time_zero = QuiesceClock::now();
- db.sets.clear();
- }
-
} else {
peers.clear();
// abort awaits with EINPROGRESS
if (cluster_membership) {
membership = *cluster_membership;
dout(15) << "Updated membership" << dendl;
- } else {
- membership.epoch = 0;
- peers.clear();
- awaits.clear();
- db.clear();
}
return { cluster_membership.has_value(), db_thread_should_exit };
db.time_zero = time_zero;
if (db.set_version > update.db_version.set_version) {
- dout(3) << "got an older version of DB from the leader: " << db.set_version << " > " << update.db_version.set_version << dendl;
+ dout(3) << "got an older version of DB from the leader: " << update.db_version.set_version << " < " << db.set_version << dendl;
dout(3) << "discarding the DB" << dendl;
db.clear();
} else {
ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
// we expect the db to be populated since the new leader must have discovered newer versions
- // we expect the sets to become quiescing since there's at least one member that's not acking (the new one)
+ // we expect the sets to become quiesced since all members are now acking
EXPECT_EQ(OK(), run_request([](auto& r) {
r.set_id = "set1";
r.await = sec(1);
});
// add back a quiescing peer
- ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3)}));
-
- EXPECT_EQ(OK(), run_request([](auto& r) {}));
- ASSERT_EQ(2, last_request->response.sets.size());
- EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);
- EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set2").rstate.state);
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2), mds_gid_t(3) }));
EXPECT_EQ(std::future_status::ready, did_ack3.wait_for(std::chrono::milliseconds(2000)));
-
EXPECT_EQ(OK(), run_request([](auto& r) {}));
ASSERT_EQ(2, last_request->response.sets.size());
EXPECT_EQ(QS_QUIESCED, last_request->response.sets.at("set1").rstate.state);