struct QuiesceDbPeerAck {
QuiesceInterface::PeerId origin;
QuiesceMap diff_map;
+
+ QuiesceDbPeerAck() = default;
+ QuiesceDbPeerAck(QuiesceDbPeerAck const&) = default;
+ QuiesceDbPeerAck(QuiesceDbPeerAck &&) = default;
+ QuiesceDbPeerAck(QuiesceInterface::PeerId origin, std::convertible_to<QuiesceMap> auto&& diff_map)
+ : origin(origin)
+ , diff_map(std::forward<QuiesceMap>(diff_map))
+ {}
+
+ QuiesceDbPeerAck& operator=(QuiesceDbPeerAck const&) = default;
+ QuiesceDbPeerAck& operator=(QuiesceDbPeerAck&&) = default;
};
template <class CharT, class Traits>
// we're good to process things
next_event_at_age = leader_upkeep(std::move(acks), std::move(requests));
} else {
- // not yet there. Put the requests back onto the queue and wait for updates
+ // not yet there. Put the acks and requests back onto the queue and wait for updates
ls.lock();
while (!requests.empty()) {
pending_requests.emplace_front(std::move(requests.back()));
requests.pop_back();
}
+ while (!acks.empty()) {
+ pending_acks.emplace_front(std::move(acks.back()));
+ acks.pop_back();
+ }
if (pending_db_updates.empty()) {
- dout(5) << "bootstrap: waiting for peer updates with timeout " << bootstrap_delay << dendl;
+ dout(5) << "bootstrap: waiting for new peers with pending acks: " << pending_acks.size()
+ << " requests: " << pending_requests.size()
+ << ". Wait timeout: " << bootstrap_delay << dendl;
submit_condition.wait_for(ls, bootstrap_delay);
}
continue;
while (!acks.empty()) {
auto& [from, diff_map] = acks.front();
leader_record_ack(from, std::move(diff_map));
- acks.pop();
+ acks.pop_front();
}
// process requests
}
}
}
+
// non-zero result codes are all errors
+ dout(10) << "completing request '" << req->request << " with rc: " << -res << dendl;
req->complete(-res);
}
done_requests.clear();
continue;
}
reported_state = pr_state.state;
- reporting_peers.insert({pr_state.state, {peer, info.diff_map.db_version}});
}
// but we only consider the peer up to date given the version
up_to_date_peers++;
}
- min_reported_state = std::min(min_reported_state, reported_state);
- max_reported_state = std::max(max_reported_state, reported_state);
+ // we keep track of reported states only if the peer actually said something
+ // even if for an older version
+ if (info.diff_map.db_version.set_version > 0) {
+ reporting_peers.insert({ reported_state, { peer, info.diff_map.db_version } });
+ min_reported_state = std::min(min_reported_state, reported_state);
+ max_reported_state = std::max(max_reported_state, reported_state);
+ }
}
if (min_reported_state == QS__MAX) {
for (auto it = awaits.begin(); it != awaits.end();) {
auto & [set_id, actx] = *it;
Db::Sets::const_iterator set_it = db.sets.find(set_id);
+ QuiesceState set_state = QS__INVALID;
int rc = db.get_age() >= actx.expire_at_age ? EINPROGRESS : EBUSY;
if (set_it == db.sets.cend()) {
rc = ENOENT;
} else {
- auto const & set = set_it->second;
-
- switch(set.rstate.state) {
+ auto const& set = set_it->second;
+ set_state = set.rstate.state;
+ switch(set_state) {
case QS_CANCELED:
rc = ECANCELED;
break;
}
if (rc != EBUSY) {
- dout(10) << "completing an await for the set '" << set_id << "' with rc: " << rc << dendl;
done_requests[actx.req_ctx] = rc;
it = awaits.erase(it);
} else {
return -ESTALE;
}
- pending_acks.push(std::move(ack));
+ pending_acks.emplace_back(std::move(ack));
submit_condition.notify_all();
return 0;
}
if (cluster_membership->leader == cluster_membership->me) {
// local delivery
- pending_acks.push({ cluster_membership->me, std::move(diff_map) });
+ pending_acks.emplace_back(cluster_membership->me, std::move(diff_map));
submit_condition.notify_all();
} else {
// send to the leader outside of the lock
std::optional<AgentCallback> agent_callback;
std::optional<QuiesceClusterMembership> cluster_membership;
std::queue<QuiesceDbPeerListing> pending_db_updates;
- std::queue<QuiesceDbPeerAck> pending_acks;
+ std::deque<QuiesceDbPeerAck> pending_acks;
std::deque<RequestContext*> pending_requests;
bool db_thread_should_exit = false;
bool db_thread_should_clear_db = true;