]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: EC Optimizations: Share pwlc between peers
authorBill Scales <bill_scales@uk.ibm.com>
Wed, 26 Mar 2025 08:30:32 +0000 (08:30 +0000)
committerAlex Ainscow <aainscow@uk.ibm.com>
Tue, 22 Apr 2025 07:04:24 +0000 (08:04 +0100)
Optimized EC pools add partial_writes_last_complete (pwlc) data to
pg_info_t to track shards that were not updated because of partial
writes. During peering the primary collects the info structure from
all the replica/strays and then having reconciled the log can send
the info back to peers. Different shards may have newer/older
versions of pwlc, the primary merges these together to create
the definitive copy and then redistributes this to the other shards.

The primary also adjusts the last_update and last_complete values
in the info structure received from peers using the pwlc data to
advance these where shards were not updated because of a partial
write.

Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
src/osd/PeeringState.cc
src/osd/PeeringState.h

index e3d10e4024e8e5b473a3f498a295f0eba2fabe5e..ad6e9283141635bd9611f95f531112234f216185 100644 (file)
@@ -321,6 +321,78 @@ void PeeringState::query_unfound(Formatter *f, string state)
   return;
 }
 
+void PeeringState::update_peer_info(const pg_shard_t &from,
+                                   const pg_info_t &oinfo)
+{
+  if (!oinfo.partial_writes_last_complete.empty()) {
+    bool updated = false;
+    // oinfo includes partial_writes_last_complete data.
+    // Merge this with our copy keeping the most up to date versions
+    for (const auto & [shard, versionrange] :
+          oinfo.partial_writes_last_complete) {
+      auto & [ofromversion, otoversion] = versionrange;
+      if (info.partial_writes_last_complete.contains(shard)) {
+       auto & [fromversion, toversion] =
+         info.partial_writes_last_complete[shard];
+       // Prefer pwlc with a newer toversion, if toversion matches prefer an
+       // older fromversion.
+       if ((otoversion > toversion) ||
+           ((otoversion == toversion) && (ofromversion < fromversion))) {
+         if (!updated) {
+           updated = true;
+           psdout(10) << "osd." << from
+                      << " has pwlc=" << oinfo.partial_writes_last_complete
+                      << dendl;
+         }
+          psdout(10) << "osd." << from << " updating shard " << shard << dendl;
+         info.partial_writes_last_complete[shard] = versionrange;
+       }
+      } else {
+       if (!updated) {
+         updated = true;
+         psdout(10) << "osd." << from
+                    << " has pwlc=" << oinfo.partial_writes_last_complete
+                    << dendl;
+       }
+        psdout(10) << "osd." << from << " setting shard " << shard << dendl;
+       info.partial_writes_last_complete[shard] = versionrange;
+      }
+    }
+    if (updated) {
+      psdout(10) << "pwlc=" << info.partial_writes_last_complete << dendl;
+    }
+  }
+  // 3 cases:
+  // We are the primary - from is the shard that sent the oinfo
+  // We are a replica - from is the primary, it will not have pwlc infomation
+  // Merge - from is pg_whoami, oinfo is a source pg that is being merged
+  if ((from != pg_whoami) &&
+      info.partial_writes_last_complete.contains(from.shard)) {
+    // Check if last_complete and last_update can be advanced based on
+    // knowledge of partial_writes
+    const auto & [fromversion, toversion] =
+      info.partial_writes_last_complete[from.shard];
+    if (toversion > peer_info[from].last_complete) {
+      if (fromversion <= peer_info[from].last_complete) {
+       psdout(10) << "osd." << from << " has last_complete "
+                  << peer_info[from].last_complete
+                  << " but pwlc says its at " << toversion
+                  << dendl;
+       peer_info[from].last_complete = toversion;
+       if (toversion > peer_info[from].last_update) {
+         peer_info[from].last_update = toversion;
+       }
+      } else {
+       psdout(10) << "osd." << from << " has last_complete "
+                  << peer_info[from].last_complete
+                  << " cannot apply pwlc from " << fromversion
+                  << " to " << toversion
+                  << dendl;
+      }
+    }
+  }
+}
+
 bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t &notify)
 {
   const pg_info_t &oinfo = notify.info;
@@ -342,6 +414,7 @@ bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t
   psdout(10) << " got osd." << from << " " << oinfo << dendl;
   ceph_assert(is_primary());
   peer_info[from] = oinfo;
+  update_peer_info(from, oinfo);
   might_have_unfound.insert(from);
 
   update_history(oinfo.history);
@@ -2567,6 +2640,14 @@ bool PeeringState::search_for_missing(
       oinfo.last_update != eversion_t()) {
     pg_info_t tinfo(oinfo);
     tinfo.pgid.shard = pg_whoami.shard;
+    // add partial write from our info
+    tinfo.partial_writes_last_complete = info.partial_writes_last_complete;
+    if (!tinfo.partial_writes_last_complete.empty()) {
+      psdout(20) << "sending info to " << from
+                << " pwcl=" << tinfo.partial_writes_last_complete
+                << " info=" << tinfo
+                << dendl;
+    }
     ctx.send_info(
       from.osd,
       spg_t(info.pgid.pgid, from.shard),
@@ -2820,6 +2901,12 @@ void PeeringState::activate(
        if (!pi.is_empty()) {
          psdout(10) << "activate peer osd." << peer
                     << " is up to date, queueing in pending_activators" << dendl;
+          if (!info.partial_writes_last_complete.empty()) {
+           psdout(20) << "sending info to " << peer
+                      << " pwcl=" << info.partial_writes_last_complete
+                      << " info=" << info
+                      << dendl;
+         }
          ctx.send_info(
            peer.osd,
            spg_t(info.pgid.pgid, peer.shard),
@@ -3098,6 +3185,7 @@ void PeeringState::proc_master_log(
   // non-divergent).
   merge_log(t, oinfo, std::move(olog), from);
   peer_info[from] = oinfo;
+  update_peer_info(from, oinfo);
   psdout(10) << " peer osd." << from << " now " << oinfo
             << " " << omissing << dendl;
   might_have_unfound.insert(from);
@@ -3130,6 +3218,7 @@ void PeeringState::proc_replica_log(
   pg_log.proc_replica_log(oinfo, olog, omissing, from);
 
   peer_info[from] = oinfo;
+  update_peer_info(from, oinfo);
   psdout(10) << " peer osd." << from << " now "
             << oinfo << " " << omissing << dendl;
   might_have_unfound.insert(from);
@@ -6489,6 +6578,10 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   pg_info_t i = ps->info;
   i.history.last_epoch_started = evt.activation_epoch;
   i.history.last_interval_started = i.history.same_interval_since;
+  if (!i.partial_writes_last_complete.empty()) {
+    psdout(20) << "sending info to " << ps->get_primary() << " pwcl="
+             << i.partial_writes_last_complete << " info=" << i << dendl;
+  }
   rctx.send_info(
     ps->get_primary().osd,
     spg_t(ps->info.pgid.pgid, ps->get_primary().shard),
@@ -6538,6 +6631,7 @@ boost::statechart::result PeeringState::ReplicaActive::react(const MLogRec& loge
   psdout(10) << "received log from " << logevt.from << dendl;
   ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
   ps->merge_log(t, logevt.msg->info, std::move(logevt.msg->log), logevt.from);
+  ps->update_peer_info(logevt.from, logevt.msg->info);
   ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
   if (logevt.msg->lease) {
     ps->proc_lease(*logevt.msg->lease);
@@ -6651,6 +6745,7 @@ boost::statechart::result PeeringState::Stray::react(const MLogRec& logevt)
     ps->pg_log.reset_backfill();
   } else {
     ps->merge_log(t, msg->info, std::move(msg->log), logevt.from);
+    ps->update_peer_info(logevt.from, msg->info);
   }
   if (logevt.msg->lease) {
     ps->proc_lease(*logevt.msg->lease);
@@ -6681,7 +6776,8 @@ boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt)
 
   ceph_assert(infoevt.info.last_update == ps->info.last_update);
   ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
-
+  // Update pwlc
+  ps->update_peer_info(infoevt.from, infoevt.info);
   post_event(Activate(infoevt.info.last_epoch_started));
   return transit<ReplicaActive>();
 }
@@ -7518,6 +7614,7 @@ boost::statechart::result PeeringState::WaitUpThru::react(const MLogRec& logevt)
   psdout(10) << "Noting missing from osd." << logevt.from << dendl;
   ps->peer_missing[logevt.from].claim(std::move(logevt.msg->missing));
   ps->peer_info[logevt.from] = logevt.msg->info;
+  ps->update_peer_info(logevt.from, logevt.msg->info);
   return discard_event();
 }
 
index ce00852d41c46d82d17116ff5072e13abed0fa0b..6cc40c4d4eb47b7a3fcc5a9b4a54ccb6150e1489 100644 (file)
@@ -1580,6 +1580,7 @@ public:
 
   void update_heartbeat_peers();
   void query_unfound(Formatter *f, std::string state);
+  void update_peer_info(const pg_shard_t &from, const pg_info_t &oinfo);
   bool proc_replica_notify(const pg_shard_t &from, const pg_notify_t &notify);
   void remove_down_peer_info(const OSDMapRef &osdmap);
   void check_recovery_sources(const OSDMapRef& map);