git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Optimized EC don't apply pwlc for divergent writes
author Bill Scales <bill_scales@uk.ibm.com>
Wed, 30 Jul 2025 11:44:10 +0000 (12:44 +0100)
committer Alex Ainscow <aainscow@uk.ibm.com>
Wed, 17 Sep 2025 08:43:26 +0000 (09:43 +0100)
Split the pwlc epoch into a separate variable so that both the
epoch and the version number are used when checking whether
last_update is within a pwlc range. This ensures that pwlc is
not applied to a shard that has a divergent write, while the
separate epoch still tracks the most recent update of pwlc.

Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
(cherry picked from commit d634f824f229677aa6df7dded57352f7a59f3597)

src/osd/PGBackend.cc
src/osd/PeeringState.cc
src/osd/osd_types.cc
src/osd/osd_types.h

index 6e43dbd9b80d90183b653aa21df13a966d581755..a1c26fc69830c2304d68004ded22ddc7485b5ce8 100644 (file)
@@ -420,13 +420,19 @@ void PGBackend::partial_write(
   if (entry.written_shards.empty() && info->partial_writes_last_complete.empty()) {
     return;
   }
+  const pg_pool_t &pool = get_parent()->get_pool();
+  if (pool.is_nonprimary_shard(get_parent()->whoami_shard().shard)) {
+    // Don't update pwlc on nonprimary shards because they only
+    // observe writes that update their shard
+    return;
+  }
   auto dpp = get_parent()->get_dpp();
   ldpp_dout(dpp, 20) << __func__ << " version=" << entry.version
                     << " written_shards=" << entry.written_shards
-                    << " pwlc=" << info->partial_writes_last_complete
+                    << " pwlc=e" << info->partial_writes_last_complete_epoch
+                    << ":" << info->partial_writes_last_complete
                     << " previous_version=" << previous_version
                     << dendl;
-  const pg_pool_t &pool = get_parent()->get_pool();
   for (shard_id_t shard : pool.nonprimary_shards) {
     auto pwlc_iter = info->partial_writes_last_complete.find(shard);
     if (!entry.is_written_shard(shard)) {
@@ -434,7 +440,7 @@ void PGBackend::partial_write(
        // 1st partial write since all logs were updated
        info->partial_writes_last_complete[shard] =
          std::pair(previous_version, entry.version);
-
+        info->partial_writes_last_complete_epoch = get_osdmap_epoch();
        continue;
       }
       auto &&[old_v,  new_v] = pwlc_iter->second;
@@ -444,7 +450,7 @@ void PGBackend::partial_write(
          // invalid
          ldpp_dout(dpp, 20) << __func__ << " pwlc invalid " << shard
                           << dendl;
-       } else if (old_v.version >= entry.version.version) {
+       } else if (old_v >= entry.version) {
          // Abnormal case - consider_adjusting_pwlc may advance pwlc
          // during peering because all shards have updates but these
          // have not been marked complete. At the end of peering
@@ -455,10 +461,12 @@ void PGBackend::partial_write(
        } else {
          old_v = previous_version;
          new_v = entry.version;
+         info->partial_writes_last_complete_epoch = get_osdmap_epoch();
        }
       } else if (new_v == previous_version) {
        // Subsequent partial write, contiguous versions
        new_v = entry.version;
+       info->partial_writes_last_complete_epoch = get_osdmap_epoch();
       } else {
        // Subsequent partial write, discontiguous versions
        ldpp_dout(dpp, 20) << __func__ << " cannot update shard " << shard
@@ -471,17 +479,19 @@ void PGBackend::partial_write(
        // shard is backfilling or in async recovery, pwlc is invalid
        ldpp_dout(dpp, 20) << __func__ << " pwlc invalid " << shard
                           << dendl;
-      } else if (old_v.version >= entry.version.version) {
+      } else if (old_v >= entry.version) {
        // Abnormal case - see above
        ldpp_dout(dpp, 20) << __func__ << " pwlc is ahead of entry " << shard
                           << dendl;
       } else {
        old_v = new_v = entry.version;
+       info->partial_writes_last_complete_epoch = get_osdmap_epoch();
       }
     }
   }
-  ldpp_dout(dpp, 20) << __func__ << " after pwlc="
-                    << info->partial_writes_last_complete << dendl;
+  ldpp_dout(dpp, 20) << __func__ << " after pwlc=e"
+                    << info->partial_writes_last_complete_epoch
+                    << ":" << info->partial_writes_last_complete << dendl;
 }
 
 void PGBackend::remove(
index 093dc38d882c78ea314e10c24cf94ff82034f160..3672f2a0be55708eeefd61e4b41c4cb3d5342f33 100644 (file)
@@ -331,7 +331,7 @@ void PeeringState::apply_pwlc(const std::pair<eversion_t, eversion_t> pwlc,
   // knowledge of partial_writes
   const auto & [fromversion, toversion] = pwlc;
   if (toversion > info.last_update) {
-    if (fromversion.version <= info.last_update.version) {
+    if (fromversion <= info.last_update) {
       if (info.last_complete == info.last_update) {
        psdout(10) << "osd." << shard << " has last_complete"
                   << "=last_update " << info.last_update
@@ -368,8 +368,9 @@ void PeeringState::update_peer_info(const pg_shard_t &from,
 {
   // Merge pwlc information from another shard into
   // info.partial_writes_last_complete keeping the newest
-  // updates
-  if (!oinfo.partial_writes_last_complete.empty()) {
+  // updates. Ignore pwlc from nonprimary shards.
+  if (!oinfo.partial_writes_last_complete.empty()&&
+      !pool.info.is_nonprimary_shard(from.shard)) {
     bool updated = false;
     // oinfo includes partial_writes_last_complete data.
     // Merge this with our copy keeping the most up to date versions
@@ -379,12 +380,15 @@ void PeeringState::update_peer_info(const pg_shard_t &from,
       if (info.partial_writes_last_complete.contains(shard)) {
        auto & [fromversion, toversion] =
          info.partial_writes_last_complete[shard];
-       // Prefer pwlc with a newer toversion, if toversion matches prefer an
-       // older fromversion.
-       if ((ofromversion.epoch > fromversion.epoch) ||
-           ((ofromversion.epoch == fromversion.epoch) && (otoversion > toversion)) ||
-           ((ofromversion.epoch == fromversion.epoch) && (otoversion == toversion) &&
-            (ofromversion.version < fromversion.version))) {
+       // Prefer pwlc with a newer epoch, then pwlc with a newer
+       // toversion, then pwlc with an older fromversion.
+       bool newer_epoch = (oinfo.partial_writes_last_complete_epoch >
+                           info.partial_writes_last_complete_epoch);
+       bool same_epoch = (oinfo.partial_writes_last_complete_epoch ==
+                           info.partial_writes_last_complete_epoch);
+       if (newer_epoch ||
+           (same_epoch && (otoversion > toversion)) ||
+           (same_epoch && (otoversion == toversion) && (ofromversion < fromversion))) {
          if (!updated) {
            updated = true;
            psdout(10) << "osd." << from
@@ -408,6 +412,10 @@ void PeeringState::update_peer_info(const pg_shard_t &from,
     if (updated) {
       psdout(10) << "pwlc=" << info.partial_writes_last_complete << dendl;
     }
+    // Update last updated epoch
+    info.partial_writes_last_complete_epoch = std::max(
+        info.partial_writes_last_complete_epoch,
+        oinfo.partial_writes_last_complete_epoch);
   }
   // 3 cases:
   // 1. This is the primary, from is the shard that sent the oinfo which may
@@ -2756,12 +2764,14 @@ bool PeeringState::search_for_missing(
     tinfo.pgid.shard = pg_whoami.shard;
     // add partial write from our info
     tinfo.partial_writes_last_complete = info.partial_writes_last_complete;
+    tinfo.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
     if (info.partial_writes_last_complete.contains(from.shard)) {
       apply_pwlc(info.partial_writes_last_complete[from.shard], from, tinfo);
     }
     if (!tinfo.partial_writes_last_complete.empty()) {
       psdout(20) << "sending info to " << from
-                << " pwlc=" << tinfo.partial_writes_last_complete
+                << " pwlc=e" << tinfo.partial_writes_last_complete_epoch
+                << ":" << tinfo.partial_writes_last_complete
                 << " info=" << tinfo
                 << dendl;
     }
@@ -3020,7 +3030,8 @@ void PeeringState::activate(
                     << " is up to date, queueing in pending_activators" << dendl;
           if (!info.partial_writes_last_complete.empty()) {
            psdout(20) << "sending info to " << peer
-                      << " pwlc=" << info.partial_writes_last_complete
+                      << " pwlc=e" << info.partial_writes_last_complete_epoch
+                      << ":" << info.partial_writes_last_complete
                       << " info=" << info
                       << dendl;
          }
@@ -3057,6 +3068,7 @@ void PeeringState::activate(
                               << " to " << info.last_update;
 
        pi.partial_writes_last_complete = info.partial_writes_last_complete;
+       pi.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
        pi.last_update = info.last_update;
        pi.last_complete = info.last_update;
        pi.set_last_backfill(hobject_t());
@@ -3336,12 +3348,10 @@ void PeeringState::consider_rollback_pwlc(eversion_t last_complete)
       psdout(10) << "shard " << shard << " pwlc rolled back to "
                 << info.partial_writes_last_complete[shard] << dendl;
     }
-    // Always assign the current epoch to the version number so that
-    // pwlc adjustments made by the whole proc_master_log process
-    // are recognized as the newest updates
-    info.partial_writes_last_complete[shard].first.epoch =
-      get_osdmap_epoch();
   }
+  // Update the epoch so that pwlc adjustments made by the whole
+  // proc_master_log process are recognized as the newest updates
+  info.partial_writes_last_complete_epoch = get_osdmap_epoch();
 }
 
 void PeeringState::proc_master_log(
@@ -3689,6 +3699,7 @@ void PeeringState::split_into(
 
   // fix up pwlc - it may refer to log entries that are no longer in the log
   child->info.partial_writes_last_complete = info.partial_writes_last_complete;
+  child->info.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
   pg_log.split_pwlc(info);
   child->pg_log.split_pwlc(child->info);
 
@@ -3857,9 +3868,10 @@ void PeeringState::merge_from(
           info.partial_writes_last_complete) {
        auto &&[old_v,  new_v] = versionrange;
        old_v = new_v = info.last_update;
-       old_v.epoch = get_osdmap_epoch();
       }
-      psdout(10) << "merged pwlc=" << info.partial_writes_last_complete << dendl;
+      info.partial_writes_last_complete_epoch = get_osdmap_epoch();
+      psdout(10) << "merged pwlc=e" << info.partial_writes_last_complete_epoch
+                << ":" << info.partial_writes_last_complete << dendl;
     }
   }
 
@@ -4685,6 +4697,7 @@ void PeeringState::append_log(
       fromversion.version = eversion_t::max().version;
       toversion = fromversion;
     }
+    info.partial_writes_last_complete_epoch = 0;
   }
 
   for (auto p = logv.begin(); p != logv.end(); ++p) {
@@ -6925,8 +6938,10 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   i.history.last_epoch_started = evt.activation_epoch;
   i.history.last_interval_started = i.history.same_interval_since;
   if (!i.partial_writes_last_complete.empty()) {
-    psdout(20) << "sending info to " << ps->get_primary() << " pwlc="
-             << i.partial_writes_last_complete << " info=" << i << dendl;
+    psdout(20) << "sending info to " << ps->get_primary() << " pwlc=e"
+              << i.partial_writes_last_complete_epoch
+              << ":" << i.partial_writes_last_complete
+              << " info=" << i << dendl;
   }
   rctx.send_info(
     ps->get_primary().osd,
@@ -7145,11 +7160,12 @@ boost::statechart::result PeeringState::Stray::react(const MInfoRec& infoevt)
     psdout(20) << "info from osd." << infoevt.from
               << " last_update=" << infoevt.info.last_update
               << " last_complete=" << infoevt.info.last_complete
-              << " pwlc=" << pwlc
+              << " pwlc=e" << infoevt.info.partial_writes_last_complete_epoch
+              << ":" << pwlc
               << " our last_update=" << ps->info.last_update << dendl;
     // Our last update must be in the range described by partial write
     // last_complete
-    ceph_assert(ps->info.last_update.version >= pwlc.first.version);
+    ceph_assert(ps->info.last_update >= pwlc.first);
     // Last complete must match the partial write last_update
     ceph_assert(pwlc.second == infoevt.info.last_update);
   } else {
index 734dfd003fed33b287224b4723989ce9a1ab7520..204edceb0789ac4500288a2a6bd239b8dcacac97 100644 (file)
@@ -3627,7 +3627,7 @@ void pg_history_t::generate_test_instances(list<pg_history_t*>& o)
 
 void pg_info_t::encode(ceph::buffer::list &bl) const
 {
-  ENCODE_START(33, 26, bl);
+  ENCODE_START(34, 26, bl);
   encode(pgid.pgid, bl);
   encode(last_update, bl);
   encode(last_complete, bl);
@@ -3644,12 +3644,13 @@ void pg_info_t::encode(ceph::buffer::list &bl) const
   encode(true, bl); // was last_backfill_bitwise
   encode(last_interval_started, bl);
   encode(partial_writes_last_complete, bl);
+  encode(partial_writes_last_complete_epoch, bl);
   ENCODE_FINISH(bl);
 }
 
 void pg_info_t::decode(ceph::buffer::list::const_iterator &bl)
 {
-  DECODE_START(33, bl);
+  DECODE_START(34, bl);
   decode(pgid.pgid, bl);
   decode(last_update, bl);
   decode(last_complete, bl);
@@ -3681,6 +3682,9 @@ void pg_info_t::decode(ceph::buffer::list::const_iterator &bl)
   if (struct_v >= 33) {
     decode(partial_writes_last_complete, bl);
   }
+  if (struct_v >= 34) {
+    decode(partial_writes_last_complete_epoch, bl);
+  }
   DECODE_FINISH(bl);
 }
 
@@ -3705,6 +3709,7 @@ void pg_info_t::dump(Formatter *f) const
     f->close_section();
   }
   f->close_section();
+  f->dump_stream("partial_writes_last_complete_epoch") << partial_writes_last_complete_epoch;
   f->open_array_section("purged_snaps");
   for (interval_set<snapid_t>::const_iterator i=purged_snaps.begin();
        i != purged_snaps.end();
index 58d83be0d315b1ce6c5dc73fc5b8d24f920fde7f..b14686b77e261782cd10f744b4ef4633ebde5db4 100644 (file)
@@ -3060,6 +3060,7 @@ struct pg_info_t {
 
   std::map<shard_id_t,std::pair<eversion_t, eversion_t>>
     partial_writes_last_complete; ///< last_complete for shards not modified by a partial write
+  epoch_t partial_writes_last_complete_epoch; ///< epoch when pwlc was last updated
 
   pg_stat_t stats;
 
@@ -3078,6 +3079,7 @@ struct pg_info_t {
       l.last_backfill == r.last_backfill &&
       l.purged_snaps == r.purged_snaps &&
       l.partial_writes_last_complete == r.partial_writes_last_complete &&
+      l.partial_writes_last_complete_epoch == r.partial_writes_last_complete_epoch &&
       l.stats == r.stats &&
       l.history == r.history &&
       l.hit_set == r.hit_set;
@@ -3087,7 +3089,8 @@ struct pg_info_t {
     : last_epoch_started(0),
       last_interval_started(0),
       last_user_version(0),
-      last_backfill(hobject_t::get_max())
+      last_backfill(hobject_t::get_max()),
+      partial_writes_last_complete_epoch(0)
   { }
   // cppcheck-suppress noExplicitConstructor
   pg_info_t(spg_t p)
@@ -3095,7 +3098,8 @@ struct pg_info_t {
       last_epoch_started(0),
       last_interval_started(0),
       last_user_version(0),
-      last_backfill(hobject_t::get_max())
+      last_backfill(hobject_t::get_max()),
+      partial_writes_last_complete_epoch(0)
   { }
   
   void set_last_backfill(hobject_t pos) {
@@ -3155,6 +3159,7 @@ struct pg_fast_info_t {
   eversion_t last_complete;
   version_t last_user_version;
   std::map<shard_id_t,std::pair<eversion_t,eversion_t>> partial_writes_last_complete;
+  epoch_t partial_writes_last_complete_epoch;
   struct { // pg_stat_t stats
     eversion_t version;
     version_t reported_seq;
@@ -3185,6 +3190,7 @@ struct pg_fast_info_t {
     last_complete = info.last_complete;
     last_user_version = info.last_user_version;
     partial_writes_last_complete = info.partial_writes_last_complete;
+    partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
     stats.version = info.stats.version;
     stats.reported_seq = info.stats.reported_seq;
     stats.last_fresh = info.stats.last_fresh;
@@ -3212,6 +3218,7 @@ struct pg_fast_info_t {
     info->last_complete = last_complete;
     info->last_user_version = last_user_version;
     info->partial_writes_last_complete = partial_writes_last_complete;
+    info->partial_writes_last_complete_epoch = partial_writes_last_complete_epoch;
     info->stats.version = stats.version;
     info->stats.reported_seq = stats.reported_seq;
     info->stats.last_fresh = stats.last_fresh;
@@ -3235,7 +3242,7 @@ struct pg_fast_info_t {
   }
 
   void encode(ceph::buffer::list& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     encode(last_update, bl);
     encode(last_complete, bl);
     encode(last_user_version, bl);
@@ -3258,10 +3265,11 @@ struct pg_fast_info_t {
     encode(stats.stats.sum.num_wr_kb, bl);
     encode(stats.stats.sum.num_objects_dirty, bl);
     encode(partial_writes_last_complete, bl);
+    encode(partial_writes_last_complete_epoch, bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator& p) {
-    DECODE_START(2, p);
+    DECODE_START(3, p);
     decode(last_update, p);
     decode(last_complete, p);
     decode(last_user_version, p);
@@ -3285,6 +3293,8 @@ struct pg_fast_info_t {
     decode(stats.stats.sum.num_objects_dirty, p);
     if (struct_v >= 2)
       decode(partial_writes_last_complete, p);
+    if (struct_v >= 3)
+      decode(partial_writes_last_complete_epoch, p);
     DECODE_FINISH(p);
   }
   void dump(ceph::Formatter *f) const {
@@ -3301,6 +3311,7 @@ struct pg_fast_info_t {
       f->close_section();
     }
     f->close_section();
+    f->dump_stream("partial_writes_last_complete_epoch") << partial_writes_last_complete_epoch;
     f->open_object_section("stats");
     f->dump_stream("version") << stats.version;
     f->dump_unsigned("reported_seq", stats.reported_seq);