#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds_quiesce
#undef dout_prefix
-#define dout_prefix *_dout << "quiesce.mgr <" << __func__ << "> "
+#define dout_prefix *_dout << "quiesce.mgr." << membership.me << " <" << __func__ << "> "
#undef dout
#define dout(lvl) \
bool QuiesceDbManager::db_thread_has_work() const
{
- return false
+ return db_thread_should_exit
|| pending_acks.size() > 0
|| pending_requests.size() > 0
|| pending_db_updates.size() > 0
|| (agent_callback.has_value() && agent_callback->if_newer < db_version())
- || (!cluster_membership.has_value() || cluster_membership->epoch != membership.epoch);
+ || (cluster_membership.has_value() && cluster_membership->epoch != membership.epoch);
}
void* QuiesceDbManager::quiesce_db_thread_main()
{
- db_thread_enter();
-
std::unique_lock ls(submit_mutex);
QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
QuiesceDbVersion last_acked = {0, 0};
- while (true) {
+ dout(5) << "Entering the main thread" << dendl;
+ bool keep_working = true;
+ while (keep_working) {
auto db_age = db.get_age();
submit_condition.wait_for(ls, next_event_at_age - db_age);
}
- if (!membership_upkeep()) {
- break;
- }
+ auto [is_member, should_exit] = membership_upkeep();
+ keep_working = !should_exit;
- {
+ if (is_member) {
decltype(pending_acks) acks(std::move(pending_acks));
decltype(pending_requests) requests(std::move(pending_requests));
decltype(pending_db_updates) db_updates(std::move(pending_db_updates));
} else {
next_event_at_age = replica_upkeep(std::move(db_updates));
}
+ } else {
+ ls.unlock();
+ dout(15) << "not a cluster member, keeping idle " << dendl;
+ next_event_at_age = QuiesceTimeInterval::max();
}
complete_requests();
}
}
- if (send_ack) {
+ if (is_member && send_ack) {
auto db_version = quiesce_map.db_version;
dout(20) << "synchronous agent ack: " << quiesce_map << dendl;
auto rc = membership.send_ack(std::move(quiesce_map));
ls.unlock();
- db_thread_exit();
+ dout(5) << "Exiting the main thread" << dendl;
return 0;
}
bool will_participate = new_membership.members.contains(new_membership.me);
dout(20) << "will participate: " << std::boolalpha << will_participate << std::noboolalpha << dendl;
- if (cluster_membership && !will_participate) {
- // stop the thread
- cluster_membership.reset();
+ if (will_participate && !quiesce_db_thread.is_started()) {
+ // start the thread
+ dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
+ db_thread_should_exit = false;
+ quiesce_db_thread.create("quiesce_db_mgr");
+ } else {
submit_condition.notify_all();
- lock.unlock();
- ceph_assert(quiesce_db_thread.is_started());
- dout(5) << "stopping the db mgr thread at epoch: " << new_membership.epoch << dendl;
- quiesce_db_thread.join();
- } else if (will_participate) {
- if (!cluster_membership) {
- // start the thread
- dout(5) << "starting the db mgr thread at epoch: " << new_membership.epoch << dendl;
- quiesce_db_thread.create("quiesce_db_mgr");
- } else {
- submit_condition.notify_all();
- }
- if (inject_request) {
- pending_requests.push_front(inject_request);
- }
+ }
+
+ if (inject_request) {
+ pending_requests.push_front(inject_request);
+ }
+
+ if (will_participate) {
cluster_membership = new_membership;
-
- std::lock_guard lc(agent_mutex);
- if (agent_callback) {
- agent_callback->if_newer = {0, 0};
- }
+ } else {
+ cluster_membership.reset();
}
- if (!will_participate && inject_request) {
- inject_request->complete(-EPERM);
+ std::lock_guard lc(agent_mutex);
+ if (agent_callback) {
+ agent_callback->if_newer = {0, 0};
}
}
-bool QuiesceDbManager::membership_upkeep()
+std::pair<QuiesceDbManager::IsMemberBool, QuiesceDbManager::ShouldExitBool> QuiesceDbManager::membership_upkeep()
{
if (cluster_membership && cluster_membership->epoch == membership.epoch) {
// no changes
- return true;
+ return {true, db_thread_should_exit};
}
bool was_leader = membership.epoch > 0 && membership.leader == membership.me;
<< std::boolalpha << was_leader << "->" << is_leader << std::noboolalpha
<< " members:" << cluster_membership->members << dendl;
} else {
- dout(10) << "shutdown! was_leader: " << was_leader << dendl;
+ dout(10) << "not a member! was_leader: " << was_leader << dendl;
}
if (is_leader) {
done_requests[await_ctx.req_ctx] = EINPROGRESS;
}
awaits.clear();
- // reject pending requests
+ // reject pending requests as not leader
while (!pending_requests.empty()) {
- done_requests[pending_requests.front()] = EPERM;
+ done_requests[pending_requests.front()] = ENOTTY;
pending_requests.pop_front();
}
}
if (cluster_membership) {
membership = *cluster_membership;
+ dout(15) << "Updated membership" << dendl;
+ } else {
+ membership.epoch = 0;
+ peers.clear();
+ awaits.clear();
+ db.clear();
}
- return cluster_membership.has_value();
+ return { cluster_membership.has_value(), db_thread_should_exit };
}
QuiesceTimeInterval QuiesceDbManager::replica_upkeep(decltype(pending_db_updates)&& db_updates)
if (db.set_version > update.db_version.set_version) {
dout(3) << "got an older version of DB from the leader: " << db.set_version << " > " << update.db_version.set_version << dendl;
dout(3) << "discarding the DB" << dendl;
- db.reset();
+ db.clear();
} else {
for (auto& [qs_id, qs] : update.sets) {
db.sets.insert_or_assign(qs_id, std::move(qs));
QuiesceDbManager() : quiesce_db_thread(this) {};
virtual ~QuiesceDbManager()
{
+ shutdown();
+ }
+
+ void shutdown() {
update_membership({});
+
+ if (quiesce_db_thread.is_started()) {
+ submit_mutex.lock();
+ db_thread_should_exit = true;
+ submit_condition.notify_all();
+ submit_mutex.unlock();
+ quiesce_db_thread.join();
+ }
}
// This will reset the manager state
std::queue<QuiesceDbPeerListing> pending_db_updates;
std::queue<QuiesceDbPeerAck> pending_acks;
std::deque<RequestContext*> pending_requests;
+ bool db_thread_should_exit = false;
class QuiesceDbThread : public Thread {
public:
QuiesceTimeInterval get_age() const {
return QuiesceClock::now() - time_zero;
}
- void reset() {
+ void clear() {
set_version = 0;
sets.clear();
time_zero = QuiesceClock::now();
std::unordered_map<RequestContext*, int> done_requests;
void* quiesce_db_thread_main();
-
- void db_thread_enter() {
- // this will invalidate the membership, see membership_upkeep()
- membership.epoch = 0;
- peers.clear();
- awaits.clear();
- done_requests.clear();
- db.reset();
- }
-
- void db_thread_exit() {
- complete_requests();
- }
-
bool db_thread_has_work() const;
- bool membership_upkeep();
+ using IsMemberBool = bool;
+ using ShouldExitBool = bool;
+ std::pair<IsMemberBool, ShouldExitBool> membership_upkeep();
QuiesceTimeInterval replica_upkeep(decltype(pending_db_updates)&& db_updates);
bool leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age);
response.clear();
request.reset(c);
- int rr = -1;
+ int rr = -ENOTTY;
for (auto& [rank, mgr] : parent.managers) {
if (!(rr = mgr->submit_request(this))) {
}
}
- if (rr == EPERM) {
- // change the error to something never returned for a request
- // EPIPE seems reasonable as we couldn't find the leader to send the command to
- complete(EPIPE);
+ if (rr) {
+ complete(rr);
return false;
}
ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(2) }));
ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
+ managers[mds_gid_t(2)]->shutdown();
ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
ASSERT_EQ(OK(), run_request_for(100, [](auto& r) {}));
}
ASSERT_EQ(managers.at(mds_gid_t(1))->internal_pending_requests().size(), pending_requests.size());
- // reset the membership of the manager
+ // shutdown the manager
// this will block until the db thread exits
- managers.at(mds_gid_t(1))->update_membership({});
+ managers.at(mds_gid_t(1))->shutdown();
// as of now all requests must have finished
while(!outstanding_awaits.empty()) {
while (!pending_requests.empty()) {
auto& r = *pending_requests.front();
- EXPECT_EQ(ERR(EPERM), r.check_result());
+ EXPECT_EQ(ERR(ENOTTY), r.check_result());
pending_requests.pop();
}
}