return;
}
+void PeeringState::update_peer_info(const pg_shard_t &from,
+ const pg_info_t &oinfo)
+{
+ if (!oinfo.partial_writes_last_complete.empty()) {
+ bool updated = false;
+ // oinfo includes partial_writes_last_complete data.
+ // Merge this with our copy keeping the most up to date versions
+ for (const auto & [shard, versionrange] :
+ oinfo.partial_writes_last_complete) {
+ auto & [ofromversion, otoversion] = versionrange;
+ if (info.partial_writes_last_complete.contains(shard)) {
+ auto & [fromversion, toversion] =
+ info.partial_writes_last_complete[shard];
+ // Prefer pwlc with a newer toversion, if toversion matches prefer an
+ // older fromversion.
+ if ((otoversion > toversion) ||
+ ((otoversion == toversion) && (ofromversion < fromversion))) {
+ if (!updated) {
+ updated = true;
+ psdout(10) << "osd." << from
+ << " has pwlc=" << oinfo.partial_writes_last_complete
+ << dendl;
+ }
+ psdout(10) << "osd." << from << " updating shard " << shard << dendl;
+ info.partial_writes_last_complete[shard] = versionrange;
+ }
+ } else {
+ if (!updated) {
+ updated = true;
+ psdout(10) << "osd." << from
+ << " has pwlc=" << oinfo.partial_writes_last_complete
+ << dendl;
+ }
+ psdout(10) << "osd." << from << " setting shard " << shard << dendl;
+ info.partial_writes_last_complete[shard] = versionrange;
+ }
+ }
+ if (updated) {
+ psdout(10) << "pwlc=" << info.partial_writes_last_complete << dendl;
+ }
+ }
+ // 3 cases:
+ // We are the primary - from is the shard that sent the oinfo
+ // We are a replica - from is the primary, it will not have pwlc infomation
+ // Merge - from is pg_whoami, oinfo is a source pg that is being merged
+ if ((from != pg_whoami) &&
+ info.partial_writes_last_complete.contains(from.shard)) {
+ // Check if last_complete and last_update can be advanced based on
+ // knowledge of partial_writes
+ const auto & [fromversion, toversion] =
+ info.partial_writes_last_complete[from.shard];
+ if (toversion > peer_info[from].last_complete) {
+ if (fromversion <= peer_info[from].last_complete) {
+ psdout(10) << "osd." << from << " has last_complete "
+ << peer_info[from].last_complete
+ << " but pwlc says its at " << toversion
+ << dendl;
+ peer_info[from].last_complete = toversion;
+ if (toversion > peer_info[from].last_update) {
+ peer_info[from].last_update = toversion;
+ }
+ } else {
+ psdout(10) << "osd." << from << " has last_complete "
+ << peer_info[from].last_complete
+ << " cannot apply pwlc from " << fromversion
+ << " to " << toversion
+ << dendl;
+ }
+ }
+ }
+}
+
bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t ¬ify)
{
const pg_info_t &oinfo = notify.info;
psdout(10) << " got osd." << from << " " << oinfo << dendl;
ceph_assert(is_primary());
peer_info[from] = oinfo;
+ update_peer_info(from, oinfo);
might_have_unfound.insert(from);
update_history(oinfo.history);
oinfo.last_update != eversion_t()) {
pg_info_t tinfo(oinfo);
tinfo.pgid.shard = pg_whoami.shard;
+ // add partial write from our info
+ tinfo.partial_writes_last_complete = info.partial_writes_last_complete;
+ if (!tinfo.partial_writes_last_complete.empty()) {
+ psdout(20) << "sending info to " << from
+ << " pwcl=" << tinfo.partial_writes_last_complete
+ << " info=" << tinfo
+ << dendl;
+ }
ctx.send_info(
from.osd,
spg_t(info.pgid.pgid, from.shard),
if (!pi.is_empty()) {
psdout(10) << "activate peer osd." << peer
<< " is up to date, queueing in pending_activators" << dendl;
+ if (!info.partial_writes_last_complete.empty()) {
+ psdout(20) << "sending info to " << peer
+ << " pwcl=" << info.partial_writes_last_complete
+ << " info=" << info
+ << dendl;
+ }
ctx.send_info(
peer.osd,
spg_t(info.pgid.pgid, peer.shard),
// non-divergent).
merge_log(t, oinfo, std::move(olog), from);
peer_info[from] = oinfo;
+ update_peer_info(from, oinfo);
psdout(10) << " peer osd." << from << " now " << oinfo
<< " " << omissing << dendl;
might_have_unfound.insert(from);
pg_log.proc_replica_log(oinfo, olog, omissing, from);
peer_info[from] = oinfo;
+ update_peer_info(from, oinfo);
psdout(10) << " peer osd." << from << " now "
<< oinfo << " " << omissing << dendl;
might_have_unfound.insert(from);
pg_info_t i = ps->info;
i.history.last_epoch_started = evt.activation_epoch;
i.history.last_interval_started = i.history.same_interval_since;
+ if (!i.partial_writes_last_complete.empty()) {
+ psdout(20) << "sending info to " << ps->get_primary() << " pwcl="
+ << i.partial_writes_last_complete << " info=" << i << dendl;
+ }
rctx.send_info(
ps->get_primary().osd,
spg_t(ps->info.pgid.pgid, ps->get_primary().shard),
psdout(10) << "received log from " << logevt.from << dendl;
ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from);
+ ps->update_peer_info(logevt.from, logevt.msg->info);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
if (logevt.msg->lease) {
ps->proc_lease(*logevt.msg->lease);
ps->pg_log.reset_backfill();
} else {
ps->merge_log(t, msg->info, std::move(msg->log), logevt.from);
+ ps->update_peer_info(logevt.from, msg->info);
}
if (logevt.msg->lease) {
ps->proc_lease(*logevt.msg->lease);
ceph_assert(infoevt.info.last_update == ps->info.last_update);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
-
+ // Update pwlc
+ ps->update_peer_info(infoevt.from, infoevt.info);
post_event(Activate(infoevt.info.last_epoch_started));
return transit<ReplicaActive>();
}
psdout(10) << "Noting missing from osd." << logevt.from << dendl;
ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing));
ps->peer_info[logevt.from] = logevt.msg->info;
+ ps->update_peer_info(logevt.from, logevt.msg->info);
return discard_event();
}