dout(15) << "my epoch is behind, ignoring this until my membership is updated" << dendl;
} else {
dout(5) << "will send the peer a full DB" << dendl;
- info.diff_map.clear();
+ info.clear();
}
} else {
dout(20) << "ack " << diff_map << " from peer " << from << dendl;
QuiesceTimeInterval QuiesceDbManager::leader_upkeep_db()
{
- std::map<QuiesceInterface::PeerId, std::deque<std::reference_wrapper<Db::Sets::value_type>>> peer_updates;
-
QuiesceTimeInterval next_event_at_age = QuiesceTimeInterval::max();
QuiesceSetVersion max_set_version = db.set_version;
+ struct PeerUpdate {
+ QuiesceInterface::PeerId peer;
+ PeerInfo& info;
+ std::deque<std::reference_wrapper<Db::Sets::value_type>> set_refs;
+ PeerUpdate(QuiesceInterface::PeerId peer, PeerInfo& info)
+ : peer(peer)
+ , info(info)
+ {}
+
+ QuiesceSetVersion known_set_version() const
+ {
+ return info.diff_map.db_version.set_version;
+ }
+ };
+
+ // populate peer_updates with peers except me
+ std::vector<PeerUpdate> peer_updates;
+ for (auto& [peer, info]: peers) {
+ // no need to replicate to myself
+ if (peer != membership.me) {
+ peer_updates.emplace_back(peer, info);
+ }
+ }
+
for(auto & set_it: db.sets) {
auto & [set_id, set] = set_it;
auto next_set_event_at_age = leader_upkeep_set(set_it);
max_set_version = std::max(max_set_version, set.version);
next_event_at_age = std::min(next_event_at_age, next_set_event_at_age);
- for(auto const & [peer, info]: peers) {
+ for(auto & peer_update: peer_updates) {
// update remote peers if their version is lower than this set's
- // don't update myself
- if (peer == membership.me) {
- continue;
- }
- if (info.diff_map.db_version.set_version < set.version) {
- peer_updates[peer].emplace_back(set_it);
+ if (peer_update.known_set_version() < set.version) {
+ peer_update.set_refs.emplace_back(set_it);
}
}
}
db.set_version = max_set_version;
// update the peers
- for (auto &[peer, sets]: peer_updates) {
- QuiesceDbListing update;
- update.db_age = db.get_age();
- update.db_version = db.version();
- std::ranges::copy(sets, std::inserter(update.sets, update.sets.end()));
+ const auto now = QuiesceClock::now();
+ static const QuiesceTimeInterval PEER_REPEATED_UPDATE_INTERVAL = std::chrono::seconds(1);
+ for (auto const & peer_update: peer_updates) {
+ if (peer_update.info.last_sent_version == db.version()) {
+ if (now < (peer_update.info.last_activity + PEER_REPEATED_UPDATE_INTERVAL)) {
+ // don't spam the peer with the same version
+ continue;
+ }
+ dout(5) << "repeated update of the peer " << peer_update.peer << " with version " << db.version() << dendl;
+ }
+
+ QuiesceDbListing listing;
+ listing.db_age = db.get_age();
+ listing.db_version = db.version();
+ std::ranges::copy(peer_update.set_refs, std::inserter(listing.sets, listing.sets.end()));
- dout(20) << "updating peer " << peer << " with " << sets.size()
+ dout(20) << "updating peer " << peer_update.peer << " with " << peer_update.set_refs.size()
<< " sets modified in db version range ("
- << peers[peer].diff_map.db_version << ".." << db.set_version << "]" << dendl;
+ << peer_update.known_set_version() << ".." << db.set_version << "]" << dendl;
- auto rc = membership.send_listing_to(peer, std::move(update));
+ auto rc = membership.send_listing_to(peer_update.peer, std::move(listing));
if (rc != 0) {
dout(1) << "ERROR (" << rc << ") trying to replicate db version "
- << db.set_version << " with " << sets.size()
- << " sets to the peer " << peer << dendl;
+ << db.set_version << " with " << peer_update.set_refs.size()
+ << " sets to the peer " << peer_update.peer << dendl;
+ } else {
+ peer_update.info.last_activity = now;
+ peer_update.info.last_sent_version = db.version();
}
}