if (next_event_at_age <= db_age) {
break;
}
+ dout(20) << "db idle, age: " << db_age << " next_event_at_age: " << next_event_at_age << dendl;
auto timeout = std::min(max_wait, next_event_at_age - db_age);
- auto wait_result = submit_condition.wait_for(ls, timeout);
- if (std::cv_status::timeout == wait_result) {
- dout(20) << "db idle, age: " << db_age << dendl;
- }
+ submit_condition.wait_for(ls, timeout);
}
auto [is_member, should_exit] = membership_upkeep();
next_event_at_age = leader_upkeep(std::move(acks), std::move(requests));
} else {
// not yet there. Put the acks and requests back onto the queue and wait for updates
+ // We should mark the next event age in case we get caught up in the sleep above
+ next_event_at_age = db.get_age() + bootstrap_delay;
ls.lock();
while (!requests.empty()) {
pending_requests.emplace_front(std::move(requests.back()));
acks.pop_back();
}
if (pending_db_updates.empty()) {
+ // we are waiting here because if requests/acks aren't empty
+ // the code above will skip the sleep due to the `db_thread_has_work`
+ // returning true, causing a busy-loop of the quiesce manager thread.
+ // This sleep may be interrupted by the submit_condition, in which case
+ // we will re-consider everything and may end up here again, but with a shorter
+ // bootstrap_delay.
dout(5) << "bootstrap: waiting for new peers with pending acks: " << pending_acks.size()
<< " requests: " << pending_requests.size()
<< ". Wait timeout: " << bootstrap_delay << dendl;
submit_condition.notify_all();
return ++cluster_membership->epoch;
}
+ std::atomic<std::optional<bool>> has_work_override;
+ bool db_thread_has_work() const override {
+ if (auto has_work = has_work_override.load()) {
+ return *has_work;
+ }
+ return QuiesceDbManager::db_thread_has_work();
+ }
+
+ void spurious_submit_wakeup()
+ {
+ std::lock_guard l(submit_mutex);
+ submit_condition.notify_all();
+ }
};
epoch_t epoch = 0;
return promise.get_future();
}
+ using ListingHook = std::function<bool(QuiesceInterface::PeerId, QuiesceDbListing&)>;
+ std::list<std::pair<ListingHook, std::promise<void>>> listing_hooks;
+
+ std::future<void> add_listing_hook(ListingHook&& predicate)
+ {
+ std::lock_guard l(comms_mutex);
+ auto&& [_, promise] = listing_hooks.emplace_back(predicate, std::promise<void> {});
+ return promise.get_future();
+ }
+
void SetUp() override {
for (QuiesceInterface::PeerId r = mds_gid_t(1); r < mds_gid_t(11); r++) {
managers[r].reset(new TestQuiesceDbManager());
std::unique_lock l(comms_mutex);
if (epoch == this->epoch) {
if (this->managers.contains(recipient)) {
+ std::queue<std::promise<void>> done_hooks;
dout(10) << "listing from " << me << " (leader=" << leader << ") to " << recipient << " for version " << listing.db_version << " with " << listing.sets.size() << " sets" << dendl;
+ for (auto it = listing_hooks.begin(); it != listing_hooks.end();) {
+ if (it->first(recipient, listing)) {
+ done_hooks.emplace(std::move(it->second));
+ it = listing_hooks.erase(it);
+ } else {
+ it++;
+ }
+ }
+
ceph::bufferlist bl;
encode(listing, bl);
listing.clear();
this->managers[recipient]->submit_peer_listing({me, std::move(listing)});
comms_cond.notify_all();
+ l.unlock();
+ while (!done_hooks.empty()) {
+ done_hooks.front().set_value();
+ done_hooks.pop();
+ }
return 0;
}
}
}
}
+/* ================================================================ */
+TEST_F(QuiesceDbTest, MultiRankBootstrap)
+{
+ // create a cluster with a peer that doesn't process messages
+ managers.at(mds_gid_t(2))->has_work_override = false;
+ ASSERT_NO_FATAL_FAILURE(configure_cluster({ mds_gid_t(1), mds_gid_t(2) }));
+
+ const QuiesceTimeInterval PEER_DISCOVERY_INTERVAL = std::chrono::milliseconds(1100);
+
+ // we should be now in the bootstrap loop,
+ // which should send discoveries to silent peers
+ // once in PEER_DISCOVERY_INTERVAL
+ for (int i = 0; i < 5; i++) {
+
+ if (i > 2) {
+ // through a wrench by disrupting the wait sleep in the bootstrap flow
+ managers.at(mds_gid_t(1))->spurious_submit_wakeup();
+ }
+
+ // wait for the next peer discovery request
+ auto saw_discovery = add_listing_hook([](auto recipient, auto const& listing) {
+ return recipient == mds_gid_t(2) && listing.db_version.set_version == 0;
+ });
+
+ EXPECT_EQ(std::future_status::ready, saw_discovery.wait_for(PEER_DISCOVERY_INTERVAL + std::chrono::milliseconds(100)));
+ }
+}
+
/* ================================================================ */
TEST_F(QuiesceDbTest, MultiRankQuiesce)
{