]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/test: Add EC peering test infrastructure and recovery test cases 68697/head
authorAlex Ainscow <aainscow@uk.ibm.com>
Mon, 27 Apr 2026 13:24:45 +0000 (14:24 +0100)
committerAlex Ainscow <aainscow@uk.ibm.com>
Thu, 30 Apr 2026 15:24:25 +0000 (16:24 +0100)
This commit enhances the EC peering test framework and adds test cases
for erasure-coded pool recovery scenarios:

NOTE: Many of the tests cases are disabled as they recreate certain
problems. Later commits will enable these tests and fix the production
issues, but under different PRs.

Test Infrastructure Improvements:
- Add MockStore wrapper with read error injection capabilities for testing
  error handling in EC recovery
- Enhance ECPeeringTestFixture with recovery callback verification
- Add support for pg_upmap to better simulate OSD placement
- Implement write_attribute() for testing partial vs full stripe writes
- Add read_shard_object_info() to verify on-disk version consistency
- Improve logging with missing object stats (m=, u=, mbc=)
- Add support for doing object recovery in Fast EC.
- Add set_config() helper for runtime configuration changes
- Preserve xinfo features when marking OSDs up/down
- Fix pg_temp handling for EC pools with optimizations

Mock Object Enhancements:
- Update MockPGBackendListener with recovery callback tracking
- Add on_local_recover, on_peer_recover, on_global_recover tracking
- Implement proper stats publishing (pg_stats_publish)
- Add is_missing_object() implementation
- Enhance should_send_op() with async_recovery_target logic
- Add apply_stats() to update PeeringState statistics

Test Cases Added:
- ECRecoveryTest: Verifies recovery with missing objects after OSD failure
- ECSequentialOSDFailoverTest: Tests sequential OSD failure/recovery cycles
- MultiObjectRecoveryReadCrash: Reproduces bug #75432 (multi-object reads)
- RollbackVersionMismatch: Reproduces bug #76213 (version mismatch)
- RollbackAfterMixedBlockedWrites: Reproduces bug #75211 (rollback issues)

These tests validate EC recovery mechanisms including:
- Object version tracking across shards
- Recovery callback invocation (local, peer, global)
- Handling of read errors during recovery
- Rollback behavior after blocked writes
- Multi-object recovery with partial failures

Assisted-by: IBM Bob, using Claude Sonnet
Signed-off-by: Alex Ainscow <aainscow@uk.ibm.com>
src/test/osd/ECPeeringTestFixture.cc
src/test/osd/ECPeeringTestFixture.h
src/test/osd/MockPGBackendListener.h
src/test/osd/MockStore.h [new file with mode: 0644]
src/test/osd/OSDMapTestHelpers.h
src/test/osd/PGBackendTestFixture.cc
src/test/osd/PGBackendTestFixture.h
src/test/osd/TestECFailoverWithPeering.cc

index 8b96fa78bb018a3e3aad43e0d178fe58eafb93d4..172aa9a8dfb60fe924957771b1e578435f539d46 100644 (file)
@@ -22,7 +22,19 @@ std::ostream& ECPeeringTestFixture::ShardDpp::gen_prefix(std::ostream& out) cons
   out << "shard " << shard << ": ";
   if (fixture->shard_peering_states.contains(shard)) {
     PeeringState *ps = fixture->shard_peering_states[shard].get();
-    out << *ps << " ";
+    out << *ps;
+
+    // Add missing stats like PG::operator<< does (mimics production code)
+    out << " m=" << ps->get_num_missing();
+    if (ps->is_primary()) {
+      uint64_t unfound = ps->get_num_unfound();
+      out << " u=" << unfound;
+    }
+    if (!ps->is_clean()) {
+      out << " mbc=" << ps->get_missing_by_count();
+    }
+
+    out << " ";
   }
   return out;
 }
@@ -41,6 +53,23 @@ ECPeeringTestFixture::ECPeeringTestFixture()
 
 void ECPeeringTestFixture::SetUp() {
   PGBackendTestFixture::SetUp();
+
+  // The harness does not use CRUSH, so we must set an upmap.  Choose the upmap
+  // to have shard == osd.
+  {
+    std::vector<int> initial_acting;
+    for (int i = 0; i < k + m; ++i) {
+      initial_acting.push_back(i);
+    }
+    OSDMap::Incremental inc(osdmap->get_epoch() + 1);
+    inc.fsid = osdmap->get_fsid();
+    inc.new_pg_upmap[pgid] =
+      mempool::osdmap::vector<int32_t>(
+        initial_acting.begin(),
+        initial_acting.end());
+    osdmap->apply_incremental(inc);
+  }
+
   for (int i = 0; i < k + m; i++) {
     create_peering_state(i);
   }
@@ -84,7 +113,8 @@ void ECPeeringTestFixture::SetUp() {
   messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LOG, peering_handler);
   messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE, peering_handler);
   messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE_ACK, peering_handler);
-  
+  messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_RECOVERY_RESERVE, peering_handler);
+
   // Register idle callback to check for buffered messages
   event_loop->register_idle_callback([this]() -> bool {
     bool found_messages = false;
@@ -110,6 +140,11 @@ void ECPeeringTestFixture::TearDown() {
   PGBackendTestFixture::TearDown();
 }
 
+void ECPeeringTestFixture::set_config(const std::string& option, const std::string& value) {
+  g_ceph_context->_conf.set_val(option, value);
+  g_ceph_context->_conf.apply_changes(nullptr);
+}
+
 PeeringState* ECPeeringTestFixture::get_peering_state(int shard) {
   ceph_assert(shard >= 0 && shard < k + m);
   auto it = shard_peering_states.find(shard);
@@ -322,6 +357,13 @@ void ECPeeringTestFixture::unsuspend_primary_to_osd(int to_osd) {
   }
 }
 
+void ECPeeringTestFixture::inject_read_error_for_shard(const std::string& obj_name, int shard, int error_code) {
+  hobject_t hoid(object_t(obj_name), "", CEPH_NOSNAP, 0, pool_id, "");
+  ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(shard));
+
+  store->inject_read_error(ghoid, error_code);
+}
+
 PeeringState* ECPeeringTestFixture::create_peering_state(int shard)
 {
   const pg_pool_t& pi = get_pool();
@@ -454,15 +496,23 @@ bool ECPeeringTestFixture::new_epoch(bool if_required)
       std::vector<int> up_osds;
       int up_primary = -1;
       osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, nullptr, nullptr);
-      
+
       std::vector<int> acting_temp = listener->next_acting;
       if (acting_temp.empty()) {
         acting_temp = up_osds;
       }
 
+      // For EC pools with optimizations, transform to primaryfirst order before
+      // storing in pg_temp. This matches what the real monitor does and what
+      // _get_temp_osds() expects when it calls pgtemp_undo_primaryfirst().
+      const pg_pool_t* pool = osdmap->get_pg_pool(this->pgid.pool());
+      if (pool && pool->allows_ecoptimizations()) {
+        acting_temp = osdmap->pgtemp_primaryfirst(*pool, acting_temp);
+      }
+
       pending_inc.new_pg_temp[this->pgid] =
-        mempool::osdmap::vector<int>(acting_temp.begin(), acting_temp.end());
-      
+      mempool::osdmap::vector<int32_t>(acting_temp.begin(), acting_temp.end());
+
       listener->pg_temp_wanted = false;
       did_work = true;
     }
@@ -541,3 +591,181 @@ void ECPeeringTestFixture::advance_epoch()
   update_osdmap_with_peering(new_osdmap);
 }
 
+void ECPeeringTestFixture::run_recovery_and_verify_callbacks(
+  const std::string& obj_name,
+  int removed_osd,
+  const std::string& expected_data)
+{
+  // Delegate to the parallel version with a single object
+  run_parallel_recovery_and_verify_callbacks(
+    {obj_name},
+    removed_osd,
+    {expected_data});
+}
+
+void ECPeeringTestFixture::run_parallel_recovery_and_verify_callbacks(
+  const std::vector<std::string>& obj_names,
+  int target_osd,
+  const std::vector<std::string>& expected_data)
+{
+  // Verify we have matching sizes
+  ASSERT_EQ(obj_names.size(), expected_data.size())
+    << "obj_names and expected_data must have the same size";
+
+  // Get the actual primary from the OSDMap
+  int primary_shard = get_primary_shard_from_osdmap();
+  if (primary_shard < 0 || primary_shard == CRUSH_ITEM_NONE) {
+    // No valid primary, cannot run recovery
+    return;
+  }
+  auto primary_ps = get_peering_state(primary_shard);
+  pg_shard_t target_shard(target_osd, shard_id_t(target_osd));
+
+  std::cout << "\n=== Starting Parallel Recovery for " << obj_names.size()
+            << " objects ===" << std::endl;
+
+  // Step 1: Verify all objects are in the missing set and prepare recovery
+  std::vector<hobject_t> hoids;
+  std::vector<ObjectContextRef> obcs;
+  std::vector<pg_missing_item> missing_items;
+
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    hobject_t hoid = make_test_object(obj_names[i]);
+    hoids.push_back(hoid);
+
+    pg_missing_item missing_item;
+
+    // Check if the target OSD is the current primary
+    // If so, check the primary's own missing set; otherwise check peer_missing
+    if (target_osd == primary_shard) {
+      // The target OSD became primary again after coming back up
+      // Check the primary's own missing set
+      const pg_missing_t& primary_missing = primary_ps->get_pg_log().get_missing();
+      ASSERT_TRUE(primary_missing.have_missing())
+        << "Primary OSD " << target_osd << " should have missing objects after coming back up";
+
+      ASSERT_TRUE(primary_missing.is_missing(hoid, &missing_item))
+        << "Object " << obj_names[i] << " should be in primary " << target_osd << "'s missing set";
+
+      std::cout << "  OSD " << target_osd << " is the primary and has object " << obj_names[i] << " in its own missing set" << std::endl;
+    } else {
+
+      // The target OSD is a peer, check peer_missing
+      const auto& peer_missing_map = primary_ps->get_peer_missing();
+      auto peer_missing_it = peer_missing_map.find(target_shard);
+      ASSERT_NE(peer_missing_it, peer_missing_map.end())
+        << "Primary should have peer_missing entry for OSD " << target_osd;
+
+      const pg_missing_t& peer_missing = peer_missing_it->second;
+      ASSERT_TRUE(peer_missing.have_missing())
+        << "Peer OSD " << target_osd << " should have missing objects after coming back up";
+
+      ASSERT_TRUE(peer_missing.is_missing(hoid, &missing_item))
+        << "Object " << obj_names[i] << " should be in peer " << target_osd << "'s missing set";
+
+      auto target_ps = get_peering_state(target_osd);
+      const pg_missing_t& target_missing = target_ps->get_pg_log().get_missing();
+      ASSERT_TRUE(target_missing.have_missing())
+        << "Target OSD " << target_osd << " should have missing objects after coming back up";
+
+      pg_missing_item target_missing_item;
+      ASSERT_TRUE(target_missing.is_missing(hoid, &target_missing_item))
+        << "Object " << obj_names[i] << " should be in peer " << target_osd << "'s missing set";
+
+      ASSERT_EQ(target_missing_item, missing_item) << "Missing on shard and primary should match";
+
+
+      std::cout << "  OSD " << target_osd << " is a peer and has object " << obj_names[i] << " in peer_missing" << std::endl;
+    }
+
+    missing_items.push_back(missing_item);
+
+    // Create OBC for this object
+    ObjectContextRef obc = get_or_create_obc(hoid, true, expected_data[i].length());
+    ASSERT_FALSE(obc->attr_cache.empty())
+      << "OBC attr_cache must be populated for recovery of " << obj_names[i];
+    obcs.push_back(obc);
+  }
+
+  // Reset recovery callback tracker before starting recovery
+  auto* primary_listener = get_primary_listener();
+  primary_listener->recovery_tracker.reset();
+
+  // Step 2: Open a single recovery operation handle
+  std::cout << "\n  Opening single recovery operation for all objects..." << std::endl;
+  PGBackend::RecoveryHandle *h = get_primary_backend()->open_recovery_op();
+
+  // Step 3: Queue ALL objects for recovery in this single operation
+  // This is the key difference - all objects share the same recovery operation
+  std::cout << "  Queuing all " << obj_names.size() << " objects for parallel recovery..." << std::endl;
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    std::cout << "    Queuing object " << obj_names[i] << " (hoid: " << hoids[i] << ")" << std::endl;
+    int r = get_primary_backend()->recover_object(
+      hoids[i],
+      missing_items[i].need,
+      ObjectContextRef(),
+      obcs[i],
+      h);
+    ASSERT_EQ(0, r) << "recover_object should successfully queue " << obj_names[i];
+  }
+
+  // Step 4: Run the recovery operation ONCE for all objects
+  // This processes all queued recoveries together in a single operation
+  std::cout << "\n  Running single recovery operation for all queued objects..." << std::endl;
+  std::cout << "  (This is where Bug 75432 would trigger if present)" << std::endl;
+  get_primary_backend()->run_recovery_op(h, 10);  // priority = 10
+  event_loop->run_until_idle();
+
+  // Step 5: Verify recovery callbacks and data for all objects
+  std::cout << "\n  === Recovery Callback Verification ===" << std::endl;
+  std::cout << "  on_local_recover calls: " << primary_listener->recovery_tracker.on_local_recover_calls << std::endl;
+  std::cout << "  on_peer_recover calls: " << primary_listener->recovery_tracker.on_peer_recover_calls.size() << " peers" << std::endl;
+  std::cout << "  on_global_recover calls: " << primary_listener->recovery_tracker.on_global_recover_calls << std::endl;
+
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    std::cout << "\n  Verifying object " << obj_names[i] << "..." << std::endl;
+
+    // Verify recovery callback was called for this object
+    bool callback_found = false;
+    if (target_osd == primary_shard) {
+      // Local recovery
+      for (const auto& obj : primary_listener->recovery_tracker.on_local_recover_objects) {
+        if (obj == hoids[i]) {
+          callback_found = true;
+          break;
+        }
+      }
+      EXPECT_TRUE(callback_found)
+        << "on_local_recover should be called for " << obj_names[i];
+    } else {
+      // Peer recovery
+      for (const auto& [peer, obj] : primary_listener->recovery_tracker.on_peer_recover_objects) {
+        if (peer == target_shard && obj == hoids[i]) {
+          callback_found = true;
+          break;
+        }
+      }
+      EXPECT_TRUE(callback_found)
+        << "on_peer_recover should be called for " << obj_names[i];
+    }
+
+    // Verify the recovered data
+    bufferlist read_bl;
+    int r = read_object(obj_names[i], 0, expected_data[i].length(),
+                       read_bl, expected_data[i].length());
+    EXPECT_EQ(r, (int)expected_data[i].length())
+      << "Should read full object " << obj_names[i];
+
+    std::string read_data(read_bl.c_str(), read_bl.length());
+    EXPECT_EQ(read_data, expected_data[i])
+      << "Recovered data should match for " << obj_names[i];
+
+    std::cout << "  ✓ Object " << obj_names[i] << " recovered successfully" << std::endl;
+  }
+
+  // Verify on_global_recover was called for all objects
+  EXPECT_EQ((int)obj_names.size(), primary_listener->recovery_tracker.on_global_recover_calls)
+    << "on_global_recover should be called once for each object";
+
+  std::cout << "\n  === All parallel recovery callbacks and data verified successfully ===" << std::endl;
+}
index 2cad715cd3de373ae0101e62c6b60fb5367fd03b..e5a56bf9988c1d9f772ffb82998eb1d04d9d8f98 100644 (file)
@@ -87,6 +87,14 @@ public:
   void event_advance_map();
   void event_activate_map();
   
+  /**
+   * set_config - Set a configuration option for testing
+   *
+   * @param option The configuration option name
+   * @param value The value to set
+   */
+  void set_config(const std::string& option, const std::string& value);
+  
 private:
   /**
    * dispatch_buffered_messages - Check for and dispatch any buffered messages
@@ -184,5 +192,58 @@ public:
    * @param to_osd The OSD number to unblock messages to (from the primary)
    */
   void unsuspend_primary_to_osd(int to_osd);
+
+  /**
+   * Inject a read error for a specific object on a specific shard's store.
+   * The error will be returned on the next read() call for this object,
+   * then automatically cleared.
+   *
+   * @param obj_name The name of the object to inject an error for
+   * @param shard The shard number whose store should return the error
+   * @param error_code The error code to return (should be negative, e.g., -EIO)
+   */
+  void inject_read_error_for_shard(const std::string& obj_name, int shard, int error_code);
+
+  /**
+   * run_recovery_and_verify_callbacks - Run recovery for an object and verify callbacks
+   *
+   * This helper function encapsulates the complete EC recovery flow:
+   * 1. Verifies the object is in the peer's missing set
+   * 2. Runs the recovery operation
+   * 3. Verifies all recovery callbacks were invoked correctly
+   * 4. Verifies PeeringState was updated correctly
+   *
+   * @param obj_name The name of the object to recover
+   * @param removed_osd The OSD that was down and needs recovery
+   * @param expected_data The expected data content after recovery
+   */
+  void run_recovery_and_verify_callbacks(
+    const std::string& obj_name,
+    int removed_osd,
+    const std::string& expected_data);
+
+  /**
+   * run_parallel_recovery_and_verify_callbacks - Run parallel recovery for multiple objects
+   *
+   * This helper function recovers multiple objects in parallel within a single recovery
+   * operation. This is the key difference from run_recovery_and_verify_callbacks which
+   * recovers objects sequentially (one at a time).
+   *
+   * The parallel recovery flow:
+   * 1. Calls recover_object() for ALL objects first (queues them)
+   * 2. Calls run_recovery_op() ONCE to process all queued recoveries together
+   * 3. Verifies callbacks and data for all objects
+   *
+   * This reproduces Bug 75432 where multiple objects in a single operation can cause
+   * assertion failures when some complete while others need resend.
+   *
+   * @param obj_names Vector of object names to recover in parallel
+   * @param target_osd The OSD that was down and needs recovery
+   * @param expected_data Vector of expected data content (must match obj_names size)
+   */
+  void run_parallel_recovery_and_verify_callbacks(
+    const std::vector<std::string>& obj_names,
+    int target_osd,
+    const std::vector<std::string>& expected_data);
 };
 
index e0e6a581adce2e6f04cfa1933ad433b539d0740e..03b22352f1398702fc5b925886a1956c7c5687b6 100644 (file)
@@ -65,6 +65,28 @@ public:
   OpTracker *op_tracker = nullptr;
   PerfCounters *perf_logger = nullptr;
 
+  // Recovery callback tracking
+  struct RecoveryCallbackTracker {
+    int on_local_recover_calls = 0;
+    std::vector<hobject_t> on_local_recover_objects;
+
+    std::map<pg_shard_t, int> on_peer_recover_calls;
+    std::vector<std::pair<pg_shard_t, hobject_t>> on_peer_recover_objects;
+
+    int on_global_recover_calls = 0;
+    std::vector<hobject_t> on_global_recover_objects;
+
+    void reset() {
+      on_local_recover_calls = 0;
+      on_local_recover_objects.clear();
+      on_peer_recover_calls.clear();
+      on_peer_recover_objects.clear();
+      on_global_recover_calls = 0;
+      on_global_recover_objects.clear();
+    }
+  };
+  RecoveryCallbackTracker recovery_tracker;
+
   MockPGBackendListener(OSDMapRef osdmap, int64_t pool_id, DoutPrefixProvider *dpp, pg_shard_t pg_whoami, PeeringState *ps = nullptr) :
     osdmap(osdmap), pool_id(pool_id), log(g_ceph_context), dpp(dpp), pg_whoami(pg_whoami), peering_state(ps) {
     // Create a full OSD PerfCounters using the standard build_osd_logger function.
@@ -108,22 +130,58 @@ public:
   // Recovery callbacks
   void on_local_recover(
     const hobject_t &oid,
-    const ObjectRecoveryInfo &recovery_info,
+    const ObjectRecoveryInfo &_recovery_info,
     ObjectContextRef obc,
     bool is_delete,
     ObjectStore::Transaction *t) override {
+    recovery_tracker.on_local_recover_calls++;
+    recovery_tracker.on_local_recover_objects.push_back(oid);
+
+    // Make a copy of recovery_info as we may need to modify it
+    ObjectRecoveryInfo recovery_info(_recovery_info);
+    if (!is_delete && peering_state &&
+        peering_state->get_pg_log().get_missing().is_missing(recovery_info.soid) &&
+        peering_state->get_pg_log().get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
+      ceph_assert(pgb_is_primary());
+    }
+
+    // Call into PeeringState to update recovery state
+    if (peering_state) {
+      peering_state->recover_got(recovery_info.soid, recovery_info.version, is_delete, *t);
+    }
+
+    // Register transaction callbacks (similar to PrimaryLogPG::on_local_recover)
+    // Note: In the mock, we don't track active_pushes or handle all the same callbacks,
+    // but we should register basic callbacks if needed by tests
+    if (peering_state && pgb_is_primary()) {
+      if (!is_delete && obc) {
+        obc->obs.exists = true;
+        obc->obs.oi = recovery_info.oi;
+      }
+    }
   }
 
   void on_global_recover(
     const hobject_t &oid,
     const object_stat_sum_t &stat_diff,
     bool is_delete) override {
+    recovery_tracker.on_global_recover_calls++;
+    recovery_tracker.on_global_recover_objects.push_back(oid);
+
+    // Call into PeeringState to mark object as fully recovered
+    if (peering_state) {
+      peering_state->object_recovered(oid, stat_diff);
+    }
   }
 
   void on_peer_recover(
     pg_shard_t peer,
     const hobject_t &oid,
     const ObjectRecoveryInfo &recovery_info) override {
+    recovery_tracker.on_peer_recover_calls[peer]++;
+    recovery_tracker.on_peer_recover_objects.push_back({peer, oid});
+
+    // Call into PeeringState to update peer missing state
     if (peering_state) {
       peering_state->on_peer_recover(peer, oid, recovery_info.version);
     }
@@ -140,6 +198,10 @@ public:
   void apply_stats(
     const hobject_t &soid,
     const object_stat_sum_t &delta_stats) override {
+    // Mimic PrimaryLogPG::apply_stats() - apply stats to PeeringState
+    if (peering_state) {
+      peering_state->apply_op_stats(soid, delta_stats);
+    }
   }
 
   void on_failed_pull(
@@ -254,10 +316,16 @@ public:
 
   // Shard information
   const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+    if (peering_state) {
+      return peering_state->get_acting_recovery_backfill();
+    }
     return shardset;
   }
 
-  const shard_id_set &get_acting_recovery_backfill_shard_id_set() const {
+  const shard_id_set &get_acting_recovery_backfill_shard_id_set() const override {
+    if (peering_state) {
+      return peering_state->get_acting_recovery_backfill_shard_id_set();
+    }
     return acting_recovery_backfill_shard_id_set;
   }
 
@@ -392,6 +460,31 @@ public:
   }
 
   bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override {
+    // If we're sending to ourselves (primary), always send
+    if (peer == pg_whoami)
+      return true;
+
+    // If we have a peering_state, use it to check async_recovery_targets
+    if (peering_state) {
+      // Check if peer is an async_recovery_target with this object missing
+      if (peering_state->is_async_recovery_target(peer)) {
+        const pg_missing_t &peer_missing = peering_state->get_peer_missing(peer);
+        if (peer_missing.is_missing(hoid)) {
+          // Object is missing on async_recovery_target, send empty transaction
+          return false;
+        }
+      }
+
+      // Check backfill logic
+      if (peering_state->is_backfill_target(peer)) {
+        const pg_info_t &peer_info = peering_state->get_peer_info(peer);
+        // If object is beyond peer's last_backfill, don't send full transaction
+        if (hoid > peer_info.last_backfill) {
+          return false;
+        }
+      }
+    }
+
     return true;
   }
 
@@ -672,7 +765,12 @@ public:
   }
 
   bool is_missing_object(const hobject_t& oid) const override {
-    return false;
+    // Check if object is in the missing set (same as PrimaryLogPG::is_missing_object)
+    if (peering_state) {
+      return peering_state->get_pg_log().get_missing().get_items().count(oid);
+    }
+    // Fallback for tests without peering_state
+    return log.get_missing().get_items().count(oid);
   }
   void send_message_osd_cluster(
     int osd, MOSDPGPush* msg, epoch_t from_epoch) override {
diff --git a/src/test/osd/MockStore.h b/src/test/osd/MockStore.h
new file mode 100644 (file)
index 0000000..572dfed
--- /dev/null
@@ -0,0 +1,109 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "os/memstore/MemStore.h"
+#include <map>
+#include <mutex>
+
+/**
+ * MockMemStore - MemStore wrapper with error injection capabilities
+ *
+ * This class extends MemStore to allow injecting read errors for specific
+ * objects. This is useful for testing error handling in EC recovery scenarios.
+ *
+ * Error injection is one-time: after an error is injected and returned,
+ * it is automatically cleared so subsequent reads succeed.
+ */
+class MockStore : public MemStore {
+private:
+  /// Map of object -> error code to inject on next read
+  std::map<ghobject_t, int> injected_read_errors;
+  
+  /// Mutex to protect injected_read_errors map
+  std::mutex error_injection_mutex;
+
+public:
+  MockStore(CephContext *cct, const std::string& path)
+    : MemStore(cct, path) {}
+  
+  ~MockStore() override = default;
+
+  /**
+   * Inject a read error for a specific object.
+   * The error will be returned on the next read() call for this object,
+   * then automatically cleared.
+   *
+   * @param oid The object to inject an error for
+   * @param error_code The error code to return (should be negative, e.g., -EIO)
+   */
+  void inject_read_error(const ghobject_t& oid, int error_code) {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors[oid] = error_code;
+  }
+
+  /**
+   * Clear any injected read error for a specific object.
+   *
+   * @param oid The object to clear the error for
+   */
+  void clear_read_error(const ghobject_t& oid) {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors.erase(oid);
+  }
+
+  /**
+   * Clear all injected read errors.
+   */
+  void clear_all_read_errors() {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors.clear();
+  }
+
+  /**
+   * Override read() to check for injected errors before calling parent.
+   * If an error is injected for this object, return it and clear the injection.
+   * Otherwise, call the parent MemStore::read().
+   */
+  int read(
+    CollectionHandle &c,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    ceph::buffer::list& bl,
+    uint32_t op_flags = 0) override
+  {
+    // Check if we should inject an error for this object
+    int error_code = 0;
+    {
+      std::lock_guard<std::mutex> lock(error_injection_mutex);
+      auto it = injected_read_errors.find(oid);
+      if (it != injected_read_errors.end()) {
+        error_code = it->second;
+        // Clear the error after using it (one-time injection)
+        injected_read_errors.erase(it);
+      }
+    }
+
+    // If we have an injected error, return it
+    if (error_code != 0) {
+      return error_code;
+    }
+
+    // Otherwise, call the parent implementation
+    return MemStore::read(c, oid, offset, len, bl, op_flags);
+  }
+};
index f3904acb99a1b54b1528bd0457dcdd455ecba1fb..ef3753a2e1ca90ed1e06a43eb6acd61f94f5090c 100644 (file)
@@ -286,6 +286,12 @@ public:
     OSDMap::Incremental inc(osdmap.get_epoch() + 1);
     inc.fsid = osdmap.get_fsid();
     inc.new_state[osd_id] = CEPH_OSD_EXISTS;  // Mark as down (exists but not UP)
+
+    // Preserve xinfo features when marking OSD down
+    // This is critical for peering to work correctly with feature checks
+    const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+    inc.new_xinfo[osd_id] = existing_xinfo;
+
     osdmap.apply_incremental(inc);
   }
   
@@ -306,6 +312,12 @@ public:
     OSDMap::Incremental inc(osdmap.get_epoch() + 1);
     inc.fsid = osdmap.get_fsid();
     inc.new_state[osd_id] = CEPH_OSD_EXISTS | CEPH_OSD_UP;
+    
+    // Preserve xinfo features when marking OSD up
+    // This is critical for peering to work correctly with feature checks
+    const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+    inc.new_xinfo[osd_id] = existing_xinfo;
+    
     osdmap.apply_incremental(inc);
   }
   
@@ -327,6 +339,11 @@ public:
     inc.fsid = osdmap.get_fsid();
     for (int osd_id : osd_ids) {
       inc.new_state[osd_id] = CEPH_OSD_EXISTS;  // Mark as down (exists but not UP)
+
+      // Preserve xinfo features when marking OSD down
+      // This is critical for peering to work correctly with feature checks
+      const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+      inc.new_xinfo[osd_id] = existing_xinfo;
     }
     osdmap.apply_incremental(inc);
   }
index 39b1fc9a3c1e4c79db35e7238007355cfb703d2f..aeff882f72d9fc580197b81e6434240d85fd4daa 100644 (file)
@@ -21,6 +21,8 @@
 #include "messages/MOSDECSubOpReadReply.h"
 #include "messages/MOSDRepOp.h"
 #include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
 
 void PGBackendTestFixture::setup_ec_pool()
 {
@@ -196,6 +198,8 @@ void PGBackendTestFixture::setup_ec_pool()
   make_backend_handler.template operator()<MOSDECSubOpWriteReply>(MSG_OSD_EC_WRITE_REPLY);
   make_backend_handler.template operator()<MOSDECSubOpRead>(MSG_OSD_EC_READ);
   make_backend_handler.template operator()<MOSDECSubOpReadReply>(MSG_OSD_EC_READ_REPLY);
+  make_backend_handler.template operator()<MOSDPGPush>(MSG_OSD_PG_PUSH);
+  make_backend_handler.template operator()<MOSDPGPushReply>(MSG_OSD_PG_PUSH_REPLY);
 
   for (int i = 0; i < num_osds; i++) {
     listeners[i]->set_messenger(messenger.get());
@@ -747,8 +751,12 @@ void PGBackendTestFixture::update_osdmap(
   }
 
   // Step 3: Clear all attr_caches before on_change()
-  // The cached OI attributes may be stale after a peering event
+  // The cached OI attributes may be stale after a peering event.
+  // Also drop any stale outstanding write tracking: once we enter a new
+  // interval, blocked/in-flight writes from the previous interval should no
+  // longer prevent OBC reloading for rollback/recovery verification.
   clear_all_attr_caches();
+  outstanding_writes.clear();
 
   // Step 4: Schedule on_change() calls as event loop actions
   // This allows them to be delayed and processed after the new epoch
@@ -783,3 +791,107 @@ void PGBackendTestFixture::clear_all_attr_caches()
     }
   }
 }
+
+
+int PGBackendTestFixture::write_attribute(
+  const std::string& obj_name,
+  const std::string& attr_name,
+  const std::string& attr_value,
+  bool force_all_shards)
+{
+  hobject_t hoid = make_test_object(obj_name);
+  PGTransactionUPtr pg_t = std::make_unique<PGTransaction>();
+  
+  ObjectContextRef obc = get_or_create_obc(hoid, true, 0);
+  pg_t->obc_map[hoid] = obc;
+  
+  outstanding_writes[hoid]++;
+  
+  eversion_t prior_version = obc->obs.oi.version;
+  eversion_t at_version = get_next_version();
+  
+  object_info_t new_oi = obc->obs.oi;
+  new_oi.version = at_version;
+  new_oi.prior_version = prior_version;
+  
+  {
+    bufferlist oi_bl;
+    new_oi.encode(oi_bl, osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+    pg_t->setattr(hoid, OI_ATTR, oi_bl);
+  }
+  
+  {
+    bufferlist attr_bl;
+    attr_bl.append(attr_value);
+    pg_t->setattr(hoid, attr_name, attr_bl);
+  }
+  
+  obc->obs.oi = new_oi;
+  
+  std::vector<pg_log_entry_t> log_entries;
+  pg_log_entry_t entry;
+  entry.op = pg_log_entry_t::MODIFY;
+  entry.soid = hoid;
+  entry.version = at_version;
+  entry.prior_version = prior_version;
+  log_entries.push_back(entry);
+  
+  object_stat_sum_t delta_stats;
+  
+  auto write_complete = [this, hoid, obc, prior_version](int r) {
+    if (outstanding_writes[hoid] > 0) {
+      outstanding_writes[hoid]--;
+      if (outstanding_writes[hoid] == 0) {
+        outstanding_writes.erase(hoid);
+      }
+    }
+    
+    if (r != 0 && r != -EINPROGRESS) {
+      obc->obs.oi.version = prior_version;
+      obc->attr_cache.clear();
+      outstanding_writes.erase(hoid);
+    }
+  };
+  
+  // Control first_write_in_interval to simulate different write patterns
+  if (force_all_shards && pool_type == EC) {
+    PGBackend* primary_backend = get_primary_backend();
+    if (primary_backend) {
+      primary_backend->on_change();
+    }
+  }
+  
+  int result = do_transaction_and_complete(
+    hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete);
+  
+  return result;
+}
+
+object_info_t PGBackendTestFixture::read_shard_object_info(
+  const std::string& obj_name,
+  int shard)
+{
+  hobject_t hoid = make_test_object(obj_name);
+  ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(shard));
+  
+  auto ch_it = chs.find(shard);
+  if (ch_it == chs.end()) {
+    std::cerr << "ERROR: No collection handle for shard " << shard << std::endl;
+    return object_info_t(hoid);
+  }
+  
+  ceph::buffer::ptr value_ptr;
+  int r = store->getattr(ch_it->second, ghoid, OI_ATTR, value_ptr);
+  if (r < 0) {
+    std::cerr << "ERROR: Failed to read OI_ATTR from shard " << shard 
+              << ": " << cpp_strerror(r) << std::endl;
+    return object_info_t(hoid);
+  }
+  
+  bufferlist bl;
+  bl.append(value_ptr);
+  auto p = bl.cbegin();
+  object_info_t oi;
+  oi.decode(p);
+  return oi;
+}
index cab26ba58f8f7ce785e3b3c6cada0be93a545106..0269f9c3c2653da5ae46f9d11356b3cbba29402f 100644 (file)
@@ -28,6 +28,7 @@
 #include "test/osd/MockMessenger.h"
 #include "common/TrackedOp.h"
 #include "os/memstore/MemStore.h"
+#include "test/osd/MockStore.h"
 #include "osd/ECSwitch.h"
 #include "osd/ECExtentCache.h"
 #include "osd/ReplicatedBackend.h"
@@ -58,7 +59,7 @@ protected:
   // Default includes both OVERWRITES and OPTIMIZATIONS flags.
   uint64_t pool_flags = pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS;
   
-  std::unique_ptr<MemStore> store;
+  std::unique_ptr<MockStore> store;
   std::string data_dir;
   ObjectStore::CollectionHandle ch;
   coll_t coll;
@@ -148,8 +149,8 @@ public:
     }
     ASSERT_EQ(0, r);
     
-    // Create MemStore - contexts are stolen by MockPGBackendListener, so we don't need manual_finisher
-    store.reset(new MemStore(g_ceph_context, data_dir));
+    // Create MockMemStore - contexts are stolen by MockPGBackendListener, so we don't need manual_finisher
+    store.reset(new MockStore(g_ceph_context, data_dir));
     ASSERT_TRUE(store);
     ASSERT_EQ(0, store->mkfs());
     ASSERT_EQ(0, store->mount());
@@ -282,45 +283,53 @@ public:
   /// Unlike make_object_context(), this method reuses OBCs for the same
   /// object across operations, which is essential for attr_cache continuity
   /// in EC pools.
+  /// @param primary_shard The shard ID to read attributes from (for EC pools)
   ObjectContextRef get_or_create_obc(
     const hobject_t& hoid,
     bool exists = false,
-    uint64_t size = 0)
+    uint64_t size = 0,
+    int primary_shard = 0)
   {
     auto it = object_contexts.find(hoid);
+    ObjectContextRef obc;
+
     if (it != object_contexts.end()) {
-      return it->second;
+      obc = it->second;
+    } else {
+      obc = make_object_context(hoid, exists, size);
+      object_contexts[hoid] = obc;
     }
-    ObjectContextRef obc = make_object_context(hoid, exists, size);
-    
+
     // If the object exists and this is an EC pool, populate attr_cache with
-    // ALL attributes from disk. This matches production behavior where the OBC
-    // is loaded with all xattrs from the object store.
-    if (exists && pool_type == EC && store && !chs.empty()) {
+    // ALL attributes from disk if not already populated. This matches production
+    // behavior where the OBC is loaded with all xattrs from the object store.
+    // In EC, attributes are stored per-shard, so we must read from the specified shard.
+    if (exists && pool_type == EC && store && !chs.empty() && obc->attr_cache.empty()) {
       auto writes_it = outstanding_writes.find(hoid);
       bool has_outstanding_writes = (writes_it != outstanding_writes.end() && writes_it->second > 0);
-      
-      // Only read from disk if there are no outstanding writes
-      if (!has_outstanding_writes) {
-        ObjectStore::CollectionHandle ch_primary = chs[0];
-        if (ch_primary) {
-          ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
-          std::map<std::string, ceph::buffer::ptr, std::less<>> attrs;
-          int r = store->getattrs(ch_primary, ghoid, attrs);
-          
-          if (r >= 0) {
-            // Successfully read all attributes from disk - populate the cache
-            for (auto& [key, value_ptr] : attrs) {
-              bufferlist bl;
-              bl.append(value_ptr);
-              obc->attr_cache[key] = std::move(bl);
-            }
+
+      // Cannot read from disk if there are outstanding writes - test bug
+      ceph_assert(!has_outstanding_writes);
+
+      // For EC pools, attributes are stored with the shard ID in the ghobject_t
+      ceph_assert(primary_shard >= 0 && primary_shard < (int)chs.size());
+      ObjectStore::CollectionHandle ch = chs[primary_shard];
+      if (ch) {
+        ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(primary_shard));
+        std::map<std::string, ceph::buffer::ptr, std::less<>> attrs;
+        int r = store->getattrs(ch, ghoid, attrs);
+
+        if (r >= 0) {
+          // Successfully read all attributes from disk - populate the cache
+          for (auto& [key, value_ptr] : attrs) {
+            bufferlist bl;
+            bl.append(value_ptr);
+            obc->attr_cache[key] = std::move(bl);
           }
         }
       }
     }
-    
-    object_contexts[hoid] = obc;
+
     return obc;
   }
   
@@ -458,5 +467,44 @@ public:
    */
   void clear_all_attr_caches();
 
+  /**
+   * Write attributes to an object with control over first_write_in_interval.
+   *
+   * This simulates different types of writes in EC pools:
+   * - force_all_shards=true: Simulates first_write_in_interval=true, causing
+   *   all_shards_written() which updates ALL shards (data + parity)
+   * - force_all_shards=false: Simulates first_write_in_interval=false, causing
+   *   only PRIMARY shards (shard 0 + parity shards) to be updated
+   *
+   * This is useful for testing EC rollback scenarios where version mismatches
+   * can occur between primary and non-primary shards.
+   *
+   * @param obj_name Name of the object
+   * @param attr_name Name of the attribute to write
+   * @param attr_value Value of the attribute
+   * @param force_all_shards If true, forces all shards to be written
+   * @return Result code (0 on success, -EINPROGRESS if blocked, negative on error)
+   */
+  int write_attribute(
+    const std::string& obj_name,
+    const std::string& attr_name,
+    const std::string& attr_value,
+    bool force_all_shards);
+
+  /**
+   * Read object_info_t directly from the ObjectStore for a specific shard.
+   *
+   * This bypasses the OBC cache and reads the actual on-disk state,
+   * which is useful for verifying version consistency across shards
+   * after rollback or peering events.
+   *
+   * @param obj_name Name of the object
+   * @param shard Shard ID to read from
+   * @return The object_info_t decoded from the shard's OI_ATTR
+   */
+  object_info_t read_shard_object_info(
+    const std::string& obj_name,
+    int shard);
+
 };
 
index 6a6a3d3476182a85fb31755192ddbf10712f29af..2ac088d3975ad15133ca93cc29701a09ac881dab 100644 (file)
@@ -16,6 +16,7 @@
 #include <gtest/gtest.h>
 #include "test/osd/ECPeeringTestFixture.h"
 #include "test/osd/TestCommon.h"
+#include "osd/ECSwitch.h"
 
 using namespace std;
 
@@ -358,7 +359,453 @@ TEST_P(
 
   std::cout << "\n=== RollbackAfterOSDFailure Test Complete ===" << std::endl;
 }
+/**
+ * ECRecoveryTest - Test EC recovery scenario with missing objects
+ *
+ * This test verifies the EC recovery mechanism by:
+ * 1. Writing and verifying an object
+ * 2. Removing an OSD from the acting set (simulating OSD failure)
+ * 3. Performing an overwrite to the object (creating a version mismatch)
+ * 4. Adding the OSD back to the acting set
+ * 5. Inspecting the missing list to verify the object is marked as missing
+ * 6. Demonstrating that the primary can open a recovery operation
+ *
+ * The test runs multiple times, once for each OSD to fail:
+ * - OSD 1 (always)
+ */
+TEST_P(TestECFailoverWithPeering, ECRecoveryTest) {
+  ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+  std::vector<int> osds_to_test;
+  osds_to_test.push_back(1); // Non-primary
+  osds_to_test.push_back(0); // Primary
+  osds_to_test.push_back(k); // First coding shard
+
+  // Run the test for each OSD
+  for (int removed_osd : osds_to_test) {
+    const std::string obj_name = "test_ec_recovery_osd" + std::to_string(removed_osd);
+    const size_t data_size = stripe_unit * k;  // One full stripe.
+    std::string pattern_a(data_size, 'A');
+    std::string pattern_b(data_size, 'B');
+
+    create_and_write_verify(obj_name, pattern_a);
+    mark_osd_down(removed_osd);
+    write_verify(obj_name, 0, pattern_b, data_size);
+    mark_osd_up(removed_osd);
+
+    // Use the fixture helper to run recovery and verify callbacks
+    run_recovery_and_verify_callbacks(obj_name, removed_osd, pattern_b);
+
+    std::cout << "=== Recovery test with OSD " << removed_osd << " completed successfully ===" << std::endl;
+  }
+}
+
+/**
+ * ECSequentialOSDFailoverTest - Test sequential OSD failure and recovery
+ *
+ * This test verifies the EC recovery mechanism by sequentially failing and
+ * recovering each OSD in the cluster:
+ * 1. Create an object and write initial data
+ * 2. For each OSD (0 to (k+m)*num_zones - 1):
+ *    a. Fail the OSD
+ *    b. Write new data to the object (overwrite)
+ *    c. Recover the OSD
+ *    d. Verify recovery completes
+ * 3. Verify final data is correct
+ *
+ * Unlike ECRecoveryTest which creates a new object for each OSD failure,
+ * this test performs a new write to the same object on each cycle.
+ */
+TEST_P(TestECFailoverWithPeering, ECSequentialOSDFailoverTest) {
+  ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+  const std::string obj_name = "test_sequential_failover";
+  const size_t data_size = stripe_unit * k;  // One full stripe
+
+  // Calculate total number of OSDs to test
+  int total_osds = (k + m);
+
+  std::cout << "\n=== Testing sequential OSD failover for " << total_osds
+            << " OSDs (k=" << k << ", m=" << m << ") ===" << std::endl;
+
+  // Create object with initial pattern
+  std::string initial_pattern(data_size, 'A');
+  create_and_write_verify(obj_name, initial_pattern);
+
+  // Cycle through each OSD, failing and recovering it
+  for (int osd_to_fail = 0; osd_to_fail < total_osds; osd_to_fail++) {
+    char pattern_char = 'B' + (osd_to_fail % 25);  // Cycle through B-Z, then wrap
+    std::string cycle_pattern(data_size, pattern_char);
+    mark_osd_down(osd_to_fail);
+    write_verify(obj_name, 0, cycle_pattern, data_size);
+    mark_osd_up(osd_to_fail);
+    run_recovery_and_verify_callbacks(obj_name, osd_to_fail, cycle_pattern);
+  }
+
+  std::cout << "\n=== Sequential OSD failover test completed successfully ===" << std::endl;
+}
+
+/**
+ * ECZoneRecoveryTest - Test zone-level EC recovery scenario (zone 0 fails first)
+ *
+ * This test reproduces a bug whereby a full write, following a partial write
+ * will rollback to an OI with an incorrect previous version.
+ *
+ * Recreate https://tracker.ceph.com/issues/76213
+ */
+TEST_P(TestECFailoverWithPeering, DISABLED_RollbackVersionMismatch) {
+  if (k < 3) {
+    GTEST_SKIP() << "SnapshotTrimRollbackVersionMismatch requires at least 3 data shards";
+  }
+
+  ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+  const std::string obj_name = "test_attr_rollback";
+  int temp_failing_shard = 2;     // Temporarily fail shard 2 for peering interval change
+
+  create_and_write_verify(obj_name, "initial_data");
+  eversion_t v1 = read_shard_object_info(obj_name, 0).version;
+  ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+  ASSERT_EQ(v1, read_shard_object_info(obj_name, k).version);
+
+  int result = write_attribute(obj_name, "test_attr", "value1", false);
+  ASSERT_EQ(0, result);
+  event_loop->run_until_idle();
 
+  eversion_t v2 = read_shard_object_info(obj_name, 0).version;
+  ASSERT_GT(v2, v1);
+  ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+  ASSERT_EQ(v2, read_shard_object_info(obj_name, k).version);
+
+  suspend_primary_to_osd(k);
+  result = write_attribute(obj_name, "test_attr", "value2", true);
+  ASSERT_NE(0, result);
+  mark_osd_down(temp_failing_shard);
+  unsuspend_primary_to_osd(k);
+  event_loop->run_until_idle();
+  ASSERT_EQ(v2, read_shard_object_info(obj_name, 0).version);
+  ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+  ASSERT_EQ(v2, read_shard_object_info(obj_name, k).version);
+
+}
+
+/**
+ * TEST: MultiObjectRecoveryReadCrash
+ *
+ * This test reproduces Bug 75432: Assertion failure in ECCommon::ReadPipeline::do_read_op()
+ * when handling multi-object EC reads with partial failures.
+ *
+ * The bug occurs when:
+ * 1. Multiple objects of different sizes are read simultaneously
+ * 2. Smaller objects complete successfully (shard_reads cleared)
+ * 3. A larger object needs additional reads due to a shard failure (need_resend = true)
+ * 4. do_read_op() is called with both completed and incomplete objects
+ */
+TEST_P(TestECFailoverWithPeering, MultiObjectRecoveryReadCrash) {
+  // This test requires k >= 3 and m >= 2
+  if (k < 3 || m < 2) {
+    GTEST_SKIP() << "Test requires k >= 3 and m >= 2";
+  }
+
+  ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+  // Create objects of different sizes with initial pattern
+  const std::string obj1_name = "crash_test_obj1";
+  const std::string obj1_pattern_a(stripe_unit, 'A');  // 1 chunk
+
+  const std::string obj2_name = "crash_test_obj2";
+  const std::string obj2_pattern_a(2 * stripe_unit, 'A');  // 2 chunks
+
+  const std::string obj3_name = "crash_test_obj3";
+  const std::string obj3_pattern_a(3 * stripe_unit, 'A');  // 3 chunks
+
+  // Write initial pattern to all objects
+  int result = create_and_write(obj1_name, obj1_pattern_a);
+  EXPECT_EQ(result, 0) << "First object write should complete";
+
+  result = create_and_write(obj2_name, obj2_pattern_a);
+  EXPECT_EQ(result, 0) << "Second object write should complete";
+
+  result = create_and_write(obj3_name, obj3_pattern_a);
+  EXPECT_EQ(result, 0) << "Third object write should complete";
+
+  EXPECT_TRUE(all_shards_clean()) << "All shards should be clean";
+
+  // Mark shard 1 as down - this will require recovery
+  int failed_osd = 1;
+  mark_osd_down(failed_osd);
+
+  // Write new pattern to all objects while OSD 1 is down
+  // This creates objects that need recovery on OSD 1
+  const std::string obj1_pattern_b(stripe_unit, 'B');
+  const std::string obj2_pattern_b(2 * stripe_unit, 'B');
+  const std::string obj3_pattern_b(3 * stripe_unit, 'B');
+
+  result = write(obj1_name, 0, obj1_pattern_b, obj1_pattern_b.length());
+  EXPECT_EQ(result, 0) << "First object update should complete";
+
+  result = write(obj2_name, 0, obj2_pattern_b, obj2_pattern_b.length());
+  EXPECT_EQ(result, 0) << "Second object update should complete";
+
+  result = write(obj3_name, 0, obj3_pattern_b, obj3_pattern_b.length());
+  EXPECT_EQ(result, 0) << "Third object update should complete";
+
+  // Bring OSD back up to trigger peering
+  // Peering will detect that OSD 1 has stale data and populate peer_missing
+  mark_osd_up(failed_osd);
+
+  // Inject read error on shard 2 for object 3 only
+  // This will cause object 3's recovery to fail and need resend
+  inject_read_error_for_shard(obj3_name, 2, -EIO);
+
+  // Now trigger recovery for all 3 objects simultaneously
+  // This is the key: recovery reads multiple objects in a single operation
+  // obj1: 1 chunk - reads shard 0 only -> succeeds -> shard_reads cleared
+  // obj2: 2 chunks - reads shards 0, k -> succeeds -> shard_reads cleared
+  // obj3: 3 chunks - reads shards 0, 2, k -> shard 2 fails -> needs resend
+  // BUG: do_read_op() called with obj1/obj2 having empty shard_reads
+
+  std::cout << "Starting recovery for all 3 objects..." << std::endl;
+
+  run_recovery_and_verify_callbacks(obj1_name, failed_osd, obj1_pattern_b);
+  run_recovery_and_verify_callbacks(obj2_name, failed_osd, obj2_pattern_b);
+  run_recovery_and_verify_callbacks(obj3_name, failed_osd, obj3_pattern_b);
+
+  // If the bug is present, we'll crash before getting here
+  // If the bug is fixed, recovery should complete successfully
+  std::cout << "Recovery completed for all objects" << std::endl;
+
+  SUCCEED() << "Multi-object recovery completed without crash";
+}
+
+/**
+ * TEST: MultiObjectParallelRecoveryCrash
+ *
+ * This test reproduces Bug 75432 by recovering multiple objects in parallel
+ * within a single recovery operation (not sequentially).
+ *
+ * The bug occurs when:
+ * 1. Multiple objects are recovered in a single operation (parallel recovery)
+ * 2. Smaller objects complete successfully (shard_reads cleared)
+ * 3. A larger object needs additional reads due to a shard failure (need_resend = true)
+ * 4. do_read_op() is called with both completed and incomplete objects
+ *
+ * Recreate for tracker https://tracker.ceph.com/issues/75432
+ *
+ * Expected behavior WITH fix: Test completes successfully.
+ */
+TEST_P(TestECFailoverWithPeering, DISABLED_MultiObjectParallelRecoveryCrash) {
+  // This test requires k >= 3 and m >= 2
+  if (k < 3 || m < 2) {
+    GTEST_SKIP() << "Test requires k >= 3 and m >= 2";
+  }
+
+  ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+  // Create objects of different sizes with initial pattern
+  const std::string obj1_name = "crash_test_obj1";
+  const std::string obj1_pattern_a(stripe_unit, 'A');  // 1 chunk
+
+  const std::string obj2_name = "crash_test_obj2";
+  const std::string obj2_pattern_a(2 * stripe_unit, 'A');  // 2 chunks
+
+  const std::string obj3_name = "crash_test_obj3";
+  const std::string obj3_pattern_a(3 * stripe_unit, 'A');  // 3 chunks
+
+  // Write initial pattern to all objects
+  int result = create_and_write(obj1_name, obj1_pattern_a);
+  EXPECT_EQ(result, 0) << "First object write should complete";
+
+  result = create_and_write(obj2_name, obj2_pattern_a);
+  EXPECT_EQ(result, 0) << "Second object write should complete";
+
+  result = create_and_write(obj3_name, obj3_pattern_a);
+  EXPECT_EQ(result, 0) << "Third object write should complete";
+
+  EXPECT_TRUE(all_shards_clean()) << "All shards should be clean";
+
+  // Mark shard 1 as down - this will require recovery
+  int failed_osd = 1;
+  mark_osd_down(failed_osd);
+
+  // Write new pattern to all objects while OSD 1 is down
+  // This creates objects that need recovery on OSD 1
+  const std::string obj1_pattern_b(stripe_unit, 'B');
+  const std::string obj2_pattern_b(2 * stripe_unit, 'B');
+  const std::string obj3_pattern_b(3 * stripe_unit, 'B');
+
+  result = write(obj1_name, 0, obj1_pattern_b, obj1_pattern_b.length());
+  EXPECT_EQ(result, 0) << "First object update should complete";
+
+  result = write(obj2_name, 0, obj2_pattern_b, obj2_pattern_b.length());
+  EXPECT_EQ(result, 0) << "Second object update should complete";
+
+  result = write(obj3_name, 0, obj3_pattern_b, obj3_pattern_b.length());
+  EXPECT_EQ(result, 0) << "Third object update should complete";
+
+  // Bring OSD back up to trigger peering
+  // Peering will detect that OSD 1 has stale data and populate peer_missing
+  mark_osd_up(failed_osd);
+
+  // Inject read error on shard 2 for object 3 only
+  // This will cause object 3's recovery to fail and need resend
+  inject_read_error_for_shard(obj3_name, 2, -EIO);
+
+  // Now trigger recovery for all 3 objects in parallel (single operation)
+  // This is the key difference from the sequential test
+  std::cout << "Starting parallel recovery for all 3 objects..." << std::endl;
+
+  std::vector<std::string> obj_names = {obj1_name, obj2_name, obj3_name};
+  std::vector<std::string> expected_data = {obj1_pattern_b, obj2_pattern_b, obj3_pattern_b};
+  run_parallel_recovery_and_verify_callbacks(obj_names, failed_osd, expected_data);
+
+  // If the bug is present, we'll crash before getting here
+  // If the bug is fixed, recovery should complete successfully
+  std::cout << "Parallel recovery completed for all objects" << std::endl;
+
+  SUCCEED() << "Multi-object parallel recovery completed without crash";
+}
+
+/**
+ * Test rollback after a sequence of blocked full-stripe and chunk writes.
+ * Recreate for tracker https://tracker.ceph.com/issues/75211
+ */
+TEST_P(
+  TestECFailoverWithPeering,
+  DISABLED_RollbackAfterMixedBlockedWritesWithOSDFailure
+) {
+  if (m < 2) {
+    GTEST_SKIP() << "RollbackAfterMixedBlockedWritesWithOSDFailure requires m >= 2";
+  }
+
+  // Set osd_async_recovery_min_cost to 0 to ensure even single-object
+  // recovery uses async recovery. This is necessary because the test
+  // harness doesn't block writes during synchronous recovery, which
+  // would cause writes to missing objects to crash.
+  set_config("osd_async_recovery_min_cost", "0");
+
+  const int blocked_shard = k + 1;
+  const int recovery_target_shard = 1;
+  const std::string obj_name = "test_mixed_blocked_writes";
+  const size_t full_stripe_size = stripe_unit * k;
+  const std::string pattern_p1(full_stripe_size, 'A');
+  const std::string pattern_p2(full_stripe_size, 'B');
+
+  // Trigger an async recovery on shard 1.
+  mark_osd_down(recovery_target_shard);
+  create_and_write_verify(obj_name, pattern_p1);
+  mark_osd_up(recovery_target_shard);
+
+  // Create a dummy object. This is purely here to be the first write in a
+  // new interval, which has some special behavior.
+  create_and_write_verify("dummy", pattern_p1);
+
+  // This has the effect of preventing ops from completing.
+  suspend_primary_to_osd(blocked_shard);
+
+  // Force next partial write to go to all shards (including non-primary)
+  // This uses a side effect of call_write_ordered() which causes the next op
+  // to be sent to all shards, even if it is a partial write.
+  ECSwitch* ec_switch = dynamic_cast<ECSwitch*>(get_primary_backend());
+  ASSERT_NE(nullptr, ec_switch) << "Primary backend must be ECSwitch";
+  ec_switch->call_write_ordered([] {});
+
+  // This is a partial write that will be sent to all shards due to the above
+  // above mechanism. NOTE: This is different to the force_all_shards boolean
+  // below, which generates a full write, rather than a partial write sent to
+  // all shards!
+  int result = write_attribute(obj_name, "test_attr", "value2", false);
+  ASSERT_EQ(-EINPROGRESS, result);
+
+  // Add a full write. In the defect, the diverge log "merge" code ended up
+  // using this version in the missing list - which is wrong.
+  result = write(obj_name, 0, pattern_p2, full_stripe_size);
+  ASSERT_EQ(-EINPROGRESS, result);
+
+  // Mark an otherwise-uninvolved shard as down to trigger the rollback of
+  // above
+  mark_osd_down(2);
+  unsuspend_primary_to_osd(blocked_shard);
+  event_loop->run_until_idle();
+
+  // Now run the recovery - the target shard asserts it is being written with
+  // the object version it is expecting. In the defect, this assert failed.
+  run_recovery_and_verify_callbacks(obj_name, recovery_target_shard, pattern_p1);
+
+  // Undo our config change!
+  set_config("osd_async_recovery_min_cost", "100");
+}
+
+/**
+ * Test rollback after a sequence of blocked full-stripe and chunk writes.
+ * This is a similar scenario to the previous test, but we force the shard
+ * to do a sync, rather than async recovery at the end.
+ * Recreate for tracker https://tracker.ceph.com/issues/75211
+ */
+TEST_P(
+  TestECFailoverWithPeering,
+  DISABLED_RollbackAfterMixedBlockedWritesWithOSDFailure2
+) {
+  if (m < 2) {
+    GTEST_SKIP() << "RollbackAfterMixedBlockedWritesWithOSDFailure requires m >= 2";
+  }
+
+  // Set osd_async_recovery_min_cost to 0 to ensure even single-object
+  // recovery uses async recovery. This is necessary because the test
+  // harness doesn't block writes during synchronous recovery, which
+  // would cause writes to missing objects to crash.
+  set_config("osd_async_recovery_min_cost", "0");
+
+  const int blocked_shard = k + 1;
+  const int recovery_target_shard = 1;
+  const std::string obj_name = "test_mixed_blocked_writes";
+  const size_t full_stripe_size = stripe_unit * k;
+  const std::string pattern_p1(full_stripe_size, 'A');
+  const std::string pattern_p2(full_stripe_size, 'B');
+
+  // Trigger an async recovery on shard 1.
+  mark_osd_down(recovery_target_shard);
+  create_and_write_verify(obj_name, pattern_p1);
+  mark_osd_up(recovery_target_shard);
+
+  // Create a dummy object. This is purely here to be the first write in a
+  // new interval, which has some special behavior.
+  create_and_write_verify("dummy", pattern_p1);
+
+  // This has the effect of preventing ops from completing.
+  suspend_primary_to_osd(blocked_shard);
+
+  // Force next partial write to go to all shards (including non-primary)
+  // This uses a side effect of call_write_ordered() which causes the next op
+  // to be sent to all shards, even if it is a partial write.
+  ECSwitch* ec_switch = dynamic_cast<ECSwitch*>(get_primary_backend());
+  ASSERT_NE(nullptr, ec_switch) << "Primary backend must be ECSwitch";
+  ec_switch->call_write_ordered([] {});
+
+  // This is a partial write that will be sent to all shards due to the above
+  // above mechanism. NOTE: This is different to the force_all_shards boolean
+  // below, which generates a full write, rather than a partial write sent to
+  // all shards!
+  int result = write_attribute(obj_name, "test_attr", "value2", false);
+  ASSERT_EQ(-EINPROGRESS, result);
+
+  // Add a full write. In the defect, the diverge log "merge" code ended up
+  // using this version in the missing list - which is wrong.
+  result = write(obj_name, 0, pattern_p2, full_stripe_size);
+  ASSERT_EQ(-EINPROGRESS, result);
+
+  set_config("osd_async_recovery_min_cost", "100");
+
+  // Mark an otherwise-uninvolved shard as down to trigger the rollback of
+  // above
+  mark_osd_down(2);
+  unsuspend_primary_to_osd(blocked_shard);
+  event_loop->run_until_idle();
+
+  // Now run the recovery - the target shard asserts it is being written with
+  // the object version it is expecting. In the defect, this assert failed.
+  run_recovery_and_verify_callbacks(obj_name, recovery_target_shard, pattern_p1);
+}
 
 // ---------------------------------------------------------------------------
 // Instantiate TestECFailoverWithPeering with EC configurations