|| pending_acks.size() > 0
|| pending_requests.size() > 0
|| pending_db_updates.size() > 0
- || (agent_callback.has_value() && agent_callback->if_newer < db_version())
+ || (agent_callback.has_value() && agent_callback->if_newer < db.version())
|| (cluster_membership.has_value() && cluster_membership->epoch != membership.epoch);
}
ls.unlock();
if (membership.is_leader()) {
- if (leader_bootstrap(std::move(db_updates), next_event_at_age)) {
+ const QuiesceTimeInterval bootstrap_delay = leader_bootstrap(std::move(db_updates));
+ if (bootstrap_delay == QuiesceTimeInterval::zero()){
// we're good to process things
next_event_at_age = leader_upkeep(std::move(acks), std::move(requests));
} else {
- // not yet there. Put the requests back onto the queue
+ // not yet there. Put the requests back onto the queue and wait for updates
ls.lock();
while (!requests.empty()) {
pending_requests.emplace_front(std::move(requests.back()));
requests.pop_back();
}
+ if (pending_db_updates.empty()) {
+ dout(5) << "bootstrap: waiting for peer updates with timeout " << bootstrap_delay << dendl;
+ submit_condition.wait_for(ls, bootstrap_delay);
+ }
continue;
}
} else {
complete_requests();
// by default, only send ack if the version has changed
- bool send_ack = last_acked != db_version();
- QuiesceMap quiesce_map(db_version());
+ bool send_ack = last_acked != db.version();
+ QuiesceMap quiesce_map(db.version());
{
std::lock_guard lc(agent_mutex);
if (agent_callback) {
- if (agent_callback->if_newer < db_version()) {
- dout(20) << "notifying agent with db version " << db_version() << dendl;
+ if (agent_callback->if_newer < db.version()) {
+ dout(20) << "notifying agent with db version " << db.version() << dendl;
calculate_quiesce_map(quiesce_map);
send_ack = agent_callback->notify(quiesce_map);
- agent_callback->if_newer = db_version();
+ agent_callback->if_newer = db.version();
} else {
send_ack = false;
}
for (auto peer : cluster_membership->members) {
peers.try_emplace(peer);
}
+ // update the db epoch
+ db.epoch = cluster_membership->epoch;
} else {
peers.clear();
// abort awaits with EINPROGRESS
QuiesceDbListing &update = db_updates.back().db;
- if (update.db_version.epoch != membership.epoch) {
- dout(10) << "ignoring db update from another epoch: " << update.db_version << " != " << db_version() << dendl;
- return QuiesceTimeInterval::max();
- }
-
if (update.db_version.set_version == 0) {
- // this is a call from a leader
- // to upload our local db version
+ // this is a call from the leader to upload our local db version
update.sets = db.sets;
update.db_version.set_version = db.set_version;
update.db_age = db.get_age();
dout(10) << "significant db_time_zero change to " << time_zero << " from " << db.time_zero << dendl;
}
db.time_zero = time_zero;
+ db.epoch = update.db_version.epoch;
if (db.set_version > update.db_version.set_version) {
dout(3) << "got an older version of DB from the leader: " << update.db_version.set_version << " < " << db.set_version << dendl;
return QuiesceTimeInterval::max();
}
-bool QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_updates, QuiesceTimeInterval &next_event_at_age)
+QuiesceTimeInterval QuiesceDbManager::leader_bootstrap(decltype(pending_db_updates)&& db_updates)
{
+ const QuiesceTimeInterval PEER_DISCOVERY_INTERVAL = std::chrono::seconds(1);
+ QuiesceTimeInterval bootstrap_delay = PEER_DISCOVERY_INTERVAL;
+
// check that we've heard from all peers in this epoch
std::unordered_set<QuiesceInterface::PeerId> unknown_peers;
for (auto&& [peer, info] : peers) {
if (db.set_version < update.db_version.set_version) {
dout(3) << "preferring version from peer "
<< from << " (" << update.db_version
- << ") over mine (" << db_version() << ")"
+ << ") over mine (" << db.version() << ")"
<< " and incrementing it to collect acks" << dendl;
db.time_zero = QuiesceClock::now() - update.db_age;
db.set_version = update.db_version.set_version + 1;
db_updates.pop();
}
+ QuiesceTimePoint const now = QuiesceClock::now();
for (auto & peer: unknown_peers) {
PeerInfo & info = peers[peer];
- QuiesceTimePoint next_discovery = info.last_seen + std::chrono::seconds(1);
- if (info.last_seen == QuiesceClock::zero() || next_discovery < QuiesceClock::now()) {
+ QuiesceTimePoint next_discovery = info.last_activity + PEER_DISCOVERY_INTERVAL;
+ if (next_discovery < now) {
// send a discovery request to unknown peers
dout(5) << " sending a discovery request to " << peer << dendl;
membership.send_listing_to(peer, QuiesceDbListing(membership.epoch));
- info.last_seen = QuiesceClock::now();
- next_discovery = info.last_seen + std::chrono::seconds(1);
+ info.last_activity = now;
+ next_discovery = info.last_activity + PEER_DISCOVERY_INTERVAL;
}
- QuiesceTimeInterval next_discovery_at_age = next_discovery - db.time_zero;
+ // next_discovery >= now here: it was either not yet due, or has just been reset to now + PEER_DISCOVERY_INTERVAL
+ if (bootstrap_delay > next_discovery - now) {
+ bootstrap_delay = (next_discovery - now);
+ }
+ }
- next_event_at_age = std::min(next_event_at_age, next_discovery_at_age);
+ bool all_peers_known = unknown_peers.empty();
+
+ if (!all_peers_known) {
+ dout(10) << "unknown peers: " << unknown_peers << dendl;
}
- // true if all peers are known
- return unknown_peers.empty();
+ // pad the delay so that a wait of this length ends just past the earliest discovery deadline; the resend check above is a strict next_discovery < now
+ const QuiesceTimeInterval a_little_more = std::chrono::milliseconds(100);
+ return all_peers_known ? QuiesceTimeInterval::zero() : (bootstrap_delay + a_little_more);
}
QuiesceTimeInterval QuiesceDbManager::leader_upkeep(decltype(pending_acks)&& acks, decltype(pending_requests)&& requests)
r.clear();
if (membership.leader == membership.me) {
r.db_age = db.get_age();
- r.db_version = db_version();
+ r.db_version = db.version();
if (req->request.set_id) {
Db::Sets::const_iterator it = db.sets.find(*req->request.set_id);
auto & info = it->second;
- if (diff_map.db_version > db_version()) {
- dout(15) << "future version ack by peer " << from << " (" << diff_map.db_version << " > " << db_version() << ")" << dendl;
- if (diff_map.db_version.epoch > db_version().epoch && diff_map.db_version.set_version <= db_version().set_version) {
+ if (diff_map.db_version > db.version()) {
+ dout(15) << "future version ack by peer " << from << " (" << diff_map.db_version << " > " << db.version() << ")" << dendl;
+ if (diff_map.db_version.epoch > db.version().epoch && diff_map.db_version.set_version <= db.version().set_version) {
dout(15) << "my epoch is behind, ignoring this until my membership is updated" << dendl;
} else {
dout(5) << "will send the peer a full DB" << dendl;
} else {
dout(20) << "ack " << diff_map << " from peer " << from << dendl;
info.diff_map = std::move(diff_map);
- info.last_seen = QuiesceClock::now();
+ info.last_activity = QuiesceClock::now();
}
}
for (auto &[peer, sets]: peer_updates) {
QuiesceDbListing update;
update.db_age = db.get_age();
- update.db_version = db_version();
+ update.db_version = db.version();
std::ranges::copy(sets, std::inserter(update.sets, update.sets.end()));
dout(20) << "updating peer " << peer << " with " << sets.size()
std::multimap<QuiesceState, std::pair<QuiesceInterface::PeerId, QuiesceDbVersion>> reporting_peers;
for (auto& [peer, info] : peers) {
- // we consider the last bit of information we had from a given peer
- // however, we want to skip peers which haven't been bootstrapped yet
- if (info.diff_map.db_version.set_version == 0) {
- continue;
- }
+ // we consider the last bit of information we had from the peer
auto dit = info.diff_map.roots.find(root);
QuiesceState reported_state = set.get_requested_member_state();
void QuiesceDbManager::calculate_quiesce_map(QuiesceMap &map)
{
map.roots.clear();
- map.db_version = db_version();
+ map.db_version = db.version();
auto db_age = db.get_age();
for(auto & [set_id, set]: db.sets) {