From: Bill Scales Date: Wed, 26 Mar 2025 08:30:32 +0000 (+0000) Subject: osd: EC Optimizations: Share pwlc between peers X-Git-Tag: testing/wip-rishabh-testing-20250426.123842-debug~35^2~18 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=dbfe6b0a4f2f6d1c44f0da11d4c83fb1c12f5434;p=ceph-ci.git osd: EC Optimizations: Share pwlc between peers Optimized EC pools add partial_writes_last_complete (pwlc) data to pg_info_t to track shards that were not updated because of partial writes. During peering the primary collects the info structure from all the replica/strays and then having reconciled the log can send the info back to peers. Different shards may have newer/older versions of pwlc, the primary merges these together to create the definitive copy and then redistributes this to the other shards. The primary also adjusts the last_update and last_complete values in the info structure received from peers using the pwlc data to advance these where shards were not updated because of a partial write. Signed-off-by: Bill Scales --- diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index e3d10e4024e..ad6e9283141 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -321,6 +321,78 @@ void PeeringState::query_unfound(Formatter *f, string state) return; } +void PeeringState::update_peer_info(const pg_shard_t &from, + const pg_info_t &oinfo) +{ + if (!oinfo.partial_writes_last_complete.empty()) { + bool updated = false; + // oinfo includes partial_writes_last_complete data. + // Merge this with our copy keeping the most up to date versions + for (const auto & [shard, versionrange] : + oinfo.partial_writes_last_complete) { + auto & [ofromversion, otoversion] = versionrange; + if (info.partial_writes_last_complete.contains(shard)) { + auto & [fromversion, toversion] = + info.partial_writes_last_complete[shard]; + // Prefer pwlc with a newer toversion, if toversion matches prefer an + // older fromversion. + if ((otoversion > toversion) || + ((otoversion == toversion) && (ofromversion < fromversion))) { + if (!updated) { + updated = true; + psdout(10) << "osd." << from + << " has pwlc=" << oinfo.partial_writes_last_complete + << dendl; + } + psdout(10) << "osd." << from << " updating shard " << shard << dendl; + info.partial_writes_last_complete[shard] = versionrange; + } + } else { + if (!updated) { + updated = true; + psdout(10) << "osd." << from + << " has pwlc=" << oinfo.partial_writes_last_complete + << dendl; + } + psdout(10) << "osd." << from << " setting shard " << shard << dendl; + info.partial_writes_last_complete[shard] = versionrange; + } + } + if (updated) { + psdout(10) << "pwlc=" << info.partial_writes_last_complete << dendl; + } + } + // 3 cases: + // We are the primary - from is the shard that sent the oinfo + // We are a replica - from is the primary, it will not have pwlc infomation + // Merge - from is pg_whoami, oinfo is a source pg that is being merged + if ((from != pg_whoami) && + info.partial_writes_last_complete.contains(from.shard)) { + // Check if last_complete and last_update can be advanced based on + // knowledge of partial_writes + const auto & [fromversion, toversion] = + info.partial_writes_last_complete[from.shard]; + if (toversion > peer_info[from].last_complete) { + if (fromversion <= peer_info[from].last_complete) { + psdout(10) << "osd." << from << " has last_complete " + << peer_info[from].last_complete + << " but pwlc says its at " << toversion + << dendl; + peer_info[from].last_complete = toversion; + if (toversion > peer_info[from].last_update) { + peer_info[from].last_update = toversion; + } + } else { + psdout(10) << "osd." << from << " has last_complete " + << peer_info[from].last_complete + << " cannot apply pwlc from " << fromversion + << " to " << toversion + << dendl; + } + } + } +} + bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t ¬ify) { const pg_info_t &oinfo = notify.info; @@ -342,6 +414,7 @@ bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t psdout(10) << " got osd." << from << " " << oinfo << dendl; ceph_assert(is_primary()); peer_info[from] = oinfo; + update_peer_info(from, oinfo); might_have_unfound.insert(from); update_history(oinfo.history); @@ -2567,6 +2640,14 @@ bool PeeringState::search_for_missing( oinfo.last_update != eversion_t()) { pg_info_t tinfo(oinfo); tinfo.pgid.shard = pg_whoami.shard; + // add partial write from our info + tinfo.partial_writes_last_complete = info.partial_writes_last_complete; + if (!tinfo.partial_writes_last_complete.empty()) { + psdout(20) << "sending info to " << from + << " pwcl=" << tinfo.partial_writes_last_complete + << " info=" << tinfo + << dendl; + } ctx.send_info( from.osd, spg_t(info.pgid.pgid, from.shard), @@ -2820,6 +2901,12 @@ void PeeringState::activate( if (!pi.is_empty()) { psdout(10) << "activate peer osd." << peer << " is up to date, queueing in pending_activators" << dendl; + if (!info.partial_writes_last_complete.empty()) { + psdout(20) << "sending info to " << peer + << " pwcl=" << info.partial_writes_last_complete + << " info=" << info + << dendl; + } ctx.send_info( peer.osd, spg_t(info.pgid.pgid, peer.shard), @@ -3098,6 +3185,7 @@ void PeeringState::proc_master_log( // non-divergent). merge_log(t, oinfo, std::move(olog), from); peer_info[from] = oinfo; + update_peer_info(from, oinfo); psdout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; might_have_unfound.insert(from); @@ -3130,6 +3218,7 @@ void PeeringState::proc_replica_log( pg_log.proc_replica_log(oinfo, olog, omissing, from); peer_info[from] = oinfo; + update_peer_info(from, oinfo); psdout(10) << " peer osd." << from << " now " << oinfo << " " << omissing << dendl; might_have_unfound.insert(from); @@ -6489,6 +6578,10 @@ boost::statechart::result PeeringState::ReplicaActive::react( pg_info_t i = ps->info; i.history.last_epoch_started = evt.activation_epoch; i.history.last_interval_started = i.history.same_interval_since; + if (!i.partial_writes_last_complete.empty()) { + psdout(20) << "sending info to " << ps->get_primary() << " pwcl=" + << i.partial_writes_last_complete << " info=" << i << dendl; + } rctx.send_info( ps->get_primary().osd, spg_t(ps->info.pgid.pgid, ps->get_primary().shard), @@ -6538,6 +6631,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& loge psdout(10) << "received log from " << logevt.from << dendl; ObjectStore::Transaction &t = context().get_cur_transaction(); ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from); + ps->update_peer_info(logevt.from, logevt.msg->info); ceph_assert(ps->pg_log.get_head() == ps->info.last_update); if (logevt.msg->lease) { ps->proc_lease(*logevt.msg->lease); @@ -6651,6 +6745,7 @@ boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt) ps->pg_log.reset_backfill(); } else { ps->merge_log(t, msg->info, std::move(msg->log), logevt.from); + ps->update_peer_info(logevt.from, msg->info); } if (logevt.msg->lease) { ps->proc_lease(*logevt.msg->lease); @@ -6681,7 +6776,8 @@ boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt) ceph_assert(infoevt.info.last_update == ps->info.last_update); ceph_assert(ps->pg_log.get_head() == ps->info.last_update); - + // Update pwlc + ps->update_peer_info(infoevt.from, infoevt.info); post_event(Activate(infoevt.info.last_epoch_started)); return transit(); } @@ -7518,6 +7614,7 @@ boost::statechart::result PeeringState::WaitUpThru::react(const MLogRec& logevt) psdout(10) << "Noting missing from osd." << logevt.from << dendl; ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing)); ps->peer_info[logevt.from] = logevt.msg->info; + ps->update_peer_info(logevt.from, logevt.msg->info); return discard_event(); } diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index ce00852d41c..6cc40c4d4eb 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1580,6 +1580,7 @@ public: void update_heartbeat_peers(); void query_unfound(Formatter *f, std::string state); + void update_peer_info(const pg_shard_t &from, const pg_info_t &oinfo); bool proc_replica_notify(const pg_shard_t &from, const pg_notify_t ¬ify); void remove_down_peer_info(const OSDMapRef &osdmap); void check_recovery_sources(const OSDMapRef& map);