out << "shard " << shard << ": ";
if (fixture->shard_peering_states.contains(shard)) {
PeeringState *ps = fixture->shard_peering_states[shard].get();
- out << *ps << " ";
+ out << *ps;
+
+ // Add missing stats like PG::operator<< does (mimics production code)
+ out << " m=" << ps->get_num_missing();
+ if (ps->is_primary()) {
+ uint64_t unfound = ps->get_num_unfound();
+ out << " u=" << unfound;
+ }
+ if (!ps->is_clean()) {
+ out << " mbc=" << ps->get_missing_by_count();
+ }
+
+ out << " ";
}
return out;
}
void ECPeeringTestFixture::SetUp() {
PGBackendTestFixture::SetUp();
+
+ // The harness does not use CRUSH, so we must set an upmap. Choose the upmap
+ // to have shard == osd.
+ {
+ std::vector<int> initial_acting;
+ for (int i = 0; i < k + m; ++i) {
+ initial_acting.push_back(i);
+ }
+ OSDMap::Incremental inc(osdmap->get_epoch() + 1);
+ inc.fsid = osdmap->get_fsid();
+ inc.new_pg_upmap[pgid] =
+ mempool::osdmap::vector<int32_t>(
+ initial_acting.begin(),
+ initial_acting.end());
+ osdmap->apply_incremental(inc);
+ }
+
for (int i = 0; i < k + m; i++) {
create_peering_state(i);
}
messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LOG, peering_handler);
messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE, peering_handler);
messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE_ACK, peering_handler);
-
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_RECOVERY_RESERVE, peering_handler);
+
// Register idle callback to check for buffered messages
event_loop->register_idle_callback([this]() -> bool {
bool found_messages = false;
PGBackendTestFixture::TearDown();
}
+// Set a config option on the global CephContext and apply it immediately so
+// the change is visible to code under test within the same call.
+void ECPeeringTestFixture::set_config(const std::string& option, const std::string& value) {
+  g_ceph_context->_conf.set_val(option, value);
+  g_ceph_context->_conf.apply_changes(nullptr);
+}
+
PeeringState* ECPeeringTestFixture::get_peering_state(int shard) {
ceph_assert(shard >= 0 && shard < k + m);
auto it = shard_peering_states.find(shard);
}
}
+// Inject a one-shot read error on a single shard's copy of obj_name.
+// The ghobject_t is shard-qualified, so only reads against that shard's
+// on-disk object fail; the error is cleared after it fires (see MockStore).
+void ECPeeringTestFixture::inject_read_error_for_shard(const std::string& obj_name, int shard, int error_code) {
+  hobject_t hoid(object_t(obj_name), "", CEPH_NOSNAP, 0, pool_id, "");
+  ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(shard));
+
+  store->inject_read_error(ghoid, error_code);
+}
+
PeeringState* ECPeeringTestFixture::create_peering_state(int shard)
{
const pg_pool_t& pi = get_pool();
std::vector<int> up_osds;
int up_primary = -1;
osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, nullptr, nullptr);
-
+
std::vector<int> acting_temp = listener->next_acting;
if (acting_temp.empty()) {
acting_temp = up_osds;
}
+ // For EC pools with optimizations, transform to primaryfirst order before
+ // storing in pg_temp. This matches what the real monitor does and what
+ // _get_temp_osds() expects when it calls pgtemp_undo_primaryfirst().
+ const pg_pool_t* pool = osdmap->get_pg_pool(this->pgid.pool());
+ if (pool && pool->allows_ecoptimizations()) {
+ acting_temp = osdmap->pgtemp_primaryfirst(*pool, acting_temp);
+ }
+
pending_inc.new_pg_temp[this->pgid] =
- mempool::osdmap::vector<int>(acting_temp.begin(), acting_temp.end());
-
+ mempool::osdmap::vector<int32_t>(acting_temp.begin(), acting_temp.end());
+
listener->pg_temp_wanted = false;
did_work = true;
}
update_osdmap_with_peering(new_osdmap);
}
+// Single-object convenience wrapper: semantics are identical to the parallel
+// helper invoked with one-element vectors.
+void ECPeeringTestFixture::run_recovery_and_verify_callbacks(
+  const std::string& obj_name,
+  int removed_osd,
+  const std::string& expected_data)
+{
+  // Delegate to the parallel version with a single object
+  run_parallel_recovery_and_verify_callbacks(
+    {obj_name},
+    removed_osd,
+    {expected_data});
+}
+
+// Recover several objects inside ONE recovery operation and verify the
+// recovery callbacks plus the recovered data for each object.
+void ECPeeringTestFixture::run_parallel_recovery_and_verify_callbacks(
+  const std::vector<std::string>& obj_names,
+  int target_osd,
+  const std::vector<std::string>& expected_data)
+{
+  // Verify we have matching sizes
+  ASSERT_EQ(obj_names.size(), expected_data.size())
+    << "obj_names and expected_data must have the same size";
+
+  // Get the actual primary from the OSDMap
+  int primary_shard = get_primary_shard_from_osdmap();
+  // NOTE(review): returning silently when there is no valid primary could
+  // mask a setup failure — consider FAIL() here; confirm intended.
+  if (primary_shard < 0 || primary_shard == CRUSH_ITEM_NONE) {
+    // No valid primary, cannot run recovery
+    return;
+  }
+  auto primary_ps = get_peering_state(primary_shard);
+  // SetUp() installs an upmap with shard == osd, so using target_osd as the
+  // shard id here is valid.
+  pg_shard_t target_shard(target_osd, shard_id_t(target_osd));
+
+  std::cout << "\n=== Starting Parallel Recovery for " << obj_names.size()
+            << " objects ===" << std::endl;
+
+  // Step 1: Verify all objects are in the missing set and prepare recovery
+  std::vector<hobject_t> hoids;
+  std::vector<ObjectContextRef> obcs;
+  std::vector<pg_missing_item> missing_items;
+
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    hobject_t hoid = make_test_object(obj_names[i]);
+    hoids.push_back(hoid);
+
+    pg_missing_item missing_item;
+
+    // Check if the target OSD is the current primary
+    // If so, check the primary's own missing set; otherwise check peer_missing
+    if (target_osd == primary_shard) {
+      // The target OSD became primary again after coming back up
+      // Check the primary's own missing set
+      const pg_missing_t& primary_missing = primary_ps->get_pg_log().get_missing();
+      ASSERT_TRUE(primary_missing.have_missing())
+        << "Primary OSD " << target_osd << " should have missing objects after coming back up";
+
+      ASSERT_TRUE(primary_missing.is_missing(hoid, &missing_item))
+        << "Object " << obj_names[i] << " should be in primary " << target_osd << "'s missing set";
+
+      std::cout << "  OSD " << target_osd << " is the primary and has object " << obj_names[i] << " in its own missing set" << std::endl;
+    } else {
+
+      // The target OSD is a peer, check peer_missing
+      const auto& peer_missing_map = primary_ps->get_peer_missing();
+      auto peer_missing_it = peer_missing_map.find(target_shard);
+      ASSERT_NE(peer_missing_it, peer_missing_map.end())
+        << "Primary should have peer_missing entry for OSD " << target_osd;
+
+      const pg_missing_t& peer_missing = peer_missing_it->second;
+      ASSERT_TRUE(peer_missing.have_missing())
+        << "Peer OSD " << target_osd << " should have missing objects after coming back up";
+
+      ASSERT_TRUE(peer_missing.is_missing(hoid, &missing_item))
+        << "Object " << obj_names[i] << " should be in peer " << target_osd << "'s missing set";
+
+      // Cross-check: the peer's own PG log must agree with the primary's
+      // peer_missing view of it.
+      auto target_ps = get_peering_state(target_osd);
+      const pg_missing_t& target_missing = target_ps->get_pg_log().get_missing();
+      ASSERT_TRUE(target_missing.have_missing())
+        << "Target OSD " << target_osd << " should have missing objects after coming back up";
+
+      pg_missing_item target_missing_item;
+      ASSERT_TRUE(target_missing.is_missing(hoid, &target_missing_item))
+        << "Object " << obj_names[i] << " should be in peer " << target_osd << "'s missing set";
+
+      ASSERT_EQ(target_missing_item, missing_item) << "Missing on shard and primary should match";
+
+
+      std::cout << "  OSD " << target_osd << " is a peer and has object " << obj_names[i] << " in peer_missing" << std::endl;
+    }
+
+    missing_items.push_back(missing_item);
+
+    // Create OBC for this object
+    ObjectContextRef obc = get_or_create_obc(hoid, true, expected_data[i].length());
+    ASSERT_FALSE(obc->attr_cache.empty())
+      << "OBC attr_cache must be populated for recovery of " << obj_names[i];
+    obcs.push_back(obc);
+  }
+
+  // Reset recovery callback tracker before starting recovery
+  auto* primary_listener = get_primary_listener();
+  primary_listener->recovery_tracker.reset();
+
+  // Step 2: Open a single recovery operation handle
+  std::cout << "\n  Opening single recovery operation for all objects..." << std::endl;
+  PGBackend::RecoveryHandle *h = get_primary_backend()->open_recovery_op();
+
+  // Step 3: Queue ALL objects for recovery in this single operation
+  // This is the key difference - all objects share the same recovery operation
+  std::cout << "  Queuing all " << obj_names.size() << " objects for parallel recovery..." << std::endl;
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    std::cout << "    Queuing object " << obj_names[i] << " (hoid: " << hoids[i] << ")" << std::endl;
+    int r = get_primary_backend()->recover_object(
+      hoids[i],
+      missing_items[i].need,
+      ObjectContextRef(),
+      obcs[i],
+      h);
+    ASSERT_EQ(0, r) << "recover_object should successfully queue " << obj_names[i];
+  }
+
+  // Step 4: Run the recovery operation ONCE for all objects
+  // This processes all queued recoveries together in a single operation
+  std::cout << "\n  Running single recovery operation for all queued objects..." << std::endl;
+  std::cout << "  (This is where Bug 75432 would trigger if present)" << std::endl;
+  get_primary_backend()->run_recovery_op(h, 10);  // priority = 10
+  event_loop->run_until_idle();
+
+  // Step 5: Verify recovery callbacks and data for all objects
+  std::cout << "\n  === Recovery Callback Verification ===" << std::endl;
+  std::cout << "  on_local_recover calls: " << primary_listener->recovery_tracker.on_local_recover_calls << std::endl;
+  std::cout << "  on_peer_recover calls: " << primary_listener->recovery_tracker.on_peer_recover_calls.size() << " peers" << std::endl;
+  std::cout << "  on_global_recover calls: " << primary_listener->recovery_tracker.on_global_recover_calls << std::endl;
+
+  for (size_t i = 0; i < obj_names.size(); ++i) {
+    std::cout << "\n  Verifying object " << obj_names[i] << "..." << std::endl;
+
+    // Verify recovery callback was called for this object
+    bool callback_found = false;
+    if (target_osd == primary_shard) {
+      // Local recovery
+      for (const auto& obj : primary_listener->recovery_tracker.on_local_recover_objects) {
+        if (obj == hoids[i]) {
+          callback_found = true;
+          break;
+        }
+      }
+      EXPECT_TRUE(callback_found)
+        << "on_local_recover should be called for " << obj_names[i];
+    } else {
+      // Peer recovery
+      for (const auto& [peer, obj] : primary_listener->recovery_tracker.on_peer_recover_objects) {
+        if (peer == target_shard && obj == hoids[i]) {
+          callback_found = true;
+          break;
+        }
+      }
+      EXPECT_TRUE(callback_found)
+        << "on_peer_recover should be called for " << obj_names[i];
+    }
+
+    // Verify the recovered data
+    bufferlist read_bl;
+    int r = read_object(obj_names[i], 0, expected_data[i].length(),
+                        read_bl, expected_data[i].length());
+    EXPECT_EQ(r, (int)expected_data[i].length())
+      << "Should read full object " << obj_names[i];
+
+    std::string read_data(read_bl.c_str(), read_bl.length());
+    EXPECT_EQ(read_data, expected_data[i])
+      << "Recovered data should match for " << obj_names[i];
+
+    std::cout << "  ✓ Object " << obj_names[i] << " recovered successfully" << std::endl;
+  }
+
+  // Verify on_global_recover was called for all objects
+  EXPECT_EQ((int)obj_names.size(), primary_listener->recovery_tracker.on_global_recover_calls)
+    << "on_global_recover should be called once for each object";
+
+  std::cout << "\n  === All parallel recovery callbacks and data verified successfully ===" << std::endl;
+}
void event_advance_map();
void event_activate_map();
+ /**
+ * set_config - Set a configuration option for testing
+ *
+ * @param option The configuration option name
+ * @param value The value to set
+ */
+ void set_config(const std::string& option, const std::string& value);
+
private:
/**
* dispatch_buffered_messages - Check for and dispatch any buffered messages
* @param to_osd The OSD number to unblock messages to (from the primary)
*/
void unsuspend_primary_to_osd(int to_osd);
+
+ /**
+ * Inject a read error for a specific object on a specific shard's store.
+ * The error will be returned on the next read() call for this object,
+ * then automatically cleared.
+ *
+ * @param obj_name The name of the object to inject an error for
+ * @param shard The shard number whose store should return the error
+ * @param error_code The error code to return (should be negative, e.g., -EIO)
+ */
+ void inject_read_error_for_shard(const std::string& obj_name, int shard, int error_code);
+
+ /**
+ * run_recovery_and_verify_callbacks - Run recovery for an object and verify callbacks
+ *
+ * This helper function encapsulates the complete EC recovery flow:
+ * 1. Verifies the object is in the peer's missing set
+ * 2. Runs the recovery operation
+ * 3. Verifies all recovery callbacks were invoked correctly
+ * 4. Verifies PeeringState was updated correctly
+ *
+ * @param obj_name The name of the object to recover
+ * @param removed_osd The OSD that was down and needs recovery
+ * @param expected_data The expected data content after recovery
+ */
+ void run_recovery_and_verify_callbacks(
+ const std::string& obj_name,
+ int removed_osd,
+ const std::string& expected_data);
+
+ /**
+ * run_parallel_recovery_and_verify_callbacks - Run parallel recovery for multiple objects
+ *
+ * This helper function recovers multiple objects in parallel within a single recovery
+ * operation. This is the key difference from run_recovery_and_verify_callbacks which
+ * recovers objects sequentially (one at a time).
+ *
+ * The parallel recovery flow:
+ * 1. Calls recover_object() for ALL objects first (queues them)
+ * 2. Calls run_recovery_op() ONCE to process all queued recoveries together
+ * 3. Verifies callbacks and data for all objects
+ *
+ * This reproduces Bug 75432 where multiple objects in a single operation can cause
+ * assertion failures when some complete while others need resend.
+ *
+ * @param obj_names Vector of object names to recover in parallel
+ * @param target_osd The OSD that was down and needs recovery
+ * @param expected_data Vector of expected data content (must match obj_names size)
+ */
+ void run_parallel_recovery_and_verify_callbacks(
+ const std::vector<std::string>& obj_names,
+ int target_osd,
+ const std::vector<std::string>& expected_data);
};
OpTracker *op_tracker = nullptr;
PerfCounters *perf_logger = nullptr;
+  // Recovery callback tracking: counters and per-object records that tests
+  // inspect after a recovery run to assert which callbacks fired and for what.
+  struct RecoveryCallbackTracker {
+    // on_local_recover: total invocations and the objects recovered locally.
+    int on_local_recover_calls = 0;
+    std::vector<hobject_t> on_local_recover_objects;
+
+    // on_peer_recover: per-peer invocation counts and (peer, object) pairs.
+    std::map<pg_shard_t, int> on_peer_recover_calls;
+    std::vector<std::pair<pg_shard_t, hobject_t>> on_peer_recover_objects;
+
+    // on_global_recover: total invocations and the fully-recovered objects.
+    int on_global_recover_calls = 0;
+    std::vector<hobject_t> on_global_recover_objects;
+
+    // Clear all counters and records; call before each tracked recovery.
+    void reset() {
+      on_local_recover_calls = 0;
+      on_local_recover_objects.clear();
+      on_peer_recover_calls.clear();
+      on_peer_recover_objects.clear();
+      on_global_recover_calls = 0;
+      on_global_recover_objects.clear();
+    }
+  };
+  RecoveryCallbackTracker recovery_tracker;
+
+
MockPGBackendListener(OSDMapRef osdmap, int64_t pool_id, DoutPrefixProvider *dpp, pg_shard_t pg_whoami, PeeringState *ps = nullptr) :
osdmap(osdmap), pool_id(pool_id), log(g_ceph_context), dpp(dpp), pg_whoami(pg_whoami), peering_state(ps) {
// Create a full OSD PerfCounters using the standard build_osd_logger function.
// Recovery callbacks
  // Record a local recovery, update PeeringState, and refresh the OBC.
  void on_local_recover(
    const hobject_t &oid,
-    const ObjectRecoveryInfo &recovery_info,
+    const ObjectRecoveryInfo &_recovery_info,
    ObjectContextRef obc,
    bool is_delete,
    ObjectStore::Transaction *t) override {
+    recovery_tracker.on_local_recover_calls++;
+    recovery_tracker.on_local_recover_objects.push_back(oid);
+
+    // Make a copy of recovery_info as we may need to modify it
+    ObjectRecoveryInfo recovery_info(_recovery_info);
+    // NOTE(review): sanity check — when the log still needs a version newer
+    // than the one recovered, only the primary may be in this state; confirm
+    // this mirrors the production PrimaryLogPG check.
+    if (!is_delete && peering_state &&
+      peering_state->get_pg_log().get_missing().is_missing(recovery_info.soid) &&
+      peering_state->get_pg_log().get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
+      ceph_assert(pgb_is_primary());
+    }
+
+    // Call into PeeringState to update recovery state
+    if (peering_state) {
+      peering_state->recover_got(recovery_info.soid, recovery_info.version, is_delete, *t);
+    }
+
+    // Register transaction callbacks (similar to PrimaryLogPG::on_local_recover)
+    // Note: In the mock, we don't track active_pushes or handle all the same callbacks,
+    // but we should register basic callbacks if needed by tests
+    if (peering_state && pgb_is_primary()) {
+      if (!is_delete && obc) {
+        obc->obs.exists = true;
+        obc->obs.oi = recovery_info.oi;
+      }
+    }
  }
  // Record a completed (all-shards) recovery and mark it in PeeringState.
  void on_global_recover(
    const hobject_t &oid,
    const object_stat_sum_t &stat_diff,
    bool is_delete) override {
+    // Record the callback so tests can assert one call per recovered object.
+    recovery_tracker.on_global_recover_calls++;
+    recovery_tracker.on_global_recover_objects.push_back(oid);
+
+    // NOTE(review): is_delete is not consulted here — confirm the mock needs
+    // no delete-specific handling.
+    // Call into PeeringState to mark object as fully recovered
+    if (peering_state) {
+      peering_state->object_recovered(oid, stat_diff);
+    }
  }
void on_peer_recover(
pg_shard_t peer,
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info) override {
+ recovery_tracker.on_peer_recover_calls[peer]++;
+ recovery_tracker.on_peer_recover_objects.push_back({peer, oid});
+
+ // Call into PeeringState to update peer missing state
if (peering_state) {
peering_state->on_peer_recover(peer, oid, recovery_info.version);
}
  // Forward op stat deltas to PeeringState, as PrimaryLogPG does.
  void apply_stats(
    const hobject_t &soid,
    const object_stat_sum_t &delta_stats) override {
+    // Mimic PrimaryLogPG::apply_stats() - apply stats to PeeringState
+    // No-op when no PeeringState is attached (pure backend tests).
+    if (peering_state) {
+      peering_state->apply_op_stats(soid, delta_stats);
+    }
  }
void on_failed_pull(
// Shard information
  // Prefer the live PeeringState's view; fall back to the static test set.
  const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+    if (peering_state) {
+      return peering_state->get_acting_recovery_backfill();
+    }
    return shardset;
  }
-  const shard_id_set &get_acting_recovery_backfill_shard_id_set() const {
+  // Now marked override (the base declares this virtual); prefer the live
+  // PeeringState's view, falling back to the static test member.
+  const shard_id_set &get_acting_recovery_backfill_shard_id_set() const override {
+    if (peering_state) {
+      return peering_state->get_acting_recovery_backfill_shard_id_set();
+    }
    return acting_recovery_backfill_shard_id_set;
  }
}
  // Decide whether a full transaction should be sent to a peer, mirroring the
  // production async-recovery and backfill gating.
  bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override {
+    // If we're sending to ourselves (primary), always send
+    if (peer == pg_whoami)
+      return true;
+
+    // If we have a peering_state, use it to check async_recovery_targets
+    if (peering_state) {
+      // Check if peer is an async_recovery_target with this object missing
+      if (peering_state->is_async_recovery_target(peer)) {
+        const pg_missing_t &peer_missing = peering_state->get_peer_missing(peer);
+        if (peer_missing.is_missing(hoid)) {
+          // Object is missing on async_recovery_target, send empty transaction
+          return false;
+        }
+      }
+
+      // Check backfill logic
+      if (peering_state->is_backfill_target(peer)) {
+        const pg_info_t &peer_info = peering_state->get_peer_info(peer);
+        // If object is beyond peer's last_backfill, don't send full transaction
+        if (hoid > peer_info.last_backfill) {
+          return false;
+        }
+      }
+    }
+
+    // Default: send the full transaction.
    return true;
  }
}
  // Report whether oid is in the missing set rather than always-false.
  bool is_missing_object(const hobject_t& oid) const override {
-    return false;
+    // Check if object is in the missing set (same as PrimaryLogPG::is_missing_object)
+    if (peering_state) {
+      return peering_state->get_pg_log().get_missing().get_items().count(oid);
+    }
+    // Fallback for tests without peering_state
+    return log.get_missing().get_items().count(oid);
  }
void send_message_osd_cluster(
int osd, MOSDPGPush* msg, epoch_t from_epoch) override {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include "os/memstore/MemStore.h"
+#include <map>
+#include <mutex>
+
+/**
+ * MockMemStore - MemStore wrapper with error injection capabilities
+ *
+ * This class extends MemStore to allow injecting read errors for specific
+ * objects. This is useful for testing error handling in EC recovery scenarios.
+ *
+ * Error injection is one-time: after an error is injected and returned,
+ * it is automatically cleared so subsequent reads succeed.
+ */
+class MockStore : public MemStore {
+private:
+  /// Map of object -> error code to inject on next read
+  std::map<ghobject_t, int> injected_read_errors;
+
+  /// Mutex to protect injected_read_errors map
+  std::mutex error_injection_mutex;
+
+public:
+  MockStore(CephContext *cct, const std::string& path)
+    : MemStore(cct, path) {}
+
+  ~MockStore() override = default;
+
+  /**
+   * Inject a read error for a specific object.
+   * The error will be returned on the next read() call for this object,
+   * then automatically cleared.
+   *
+   * @param oid The object to inject an error for
+   * @param error_code The error code to return (should be negative, e.g., -EIO)
+   */
+  void inject_read_error(const ghobject_t& oid, int error_code) {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors[oid] = error_code;
+  }
+
+  /**
+   * Clear any injected read error for a specific object.
+   *
+   * @param oid The object to clear the error for
+   */
+  void clear_read_error(const ghobject_t& oid) {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors.erase(oid);
+  }
+
+  /**
+   * Clear all injected read errors.
+   */
+  void clear_all_read_errors() {
+    std::lock_guard<std::mutex> lock(error_injection_mutex);
+    injected_read_errors.clear();
+  }
+
+  /**
+   * Override read() to check for injected errors before calling parent.
+   * If an error is injected for this object, return it and clear the injection.
+   * Otherwise, call the parent MemStore::read().
+   *
+   * NOTE(review): a default argument on a virtual override binds by static
+   * type — confirm this default matches the MemStore::read declaration.
+   */
+  int read(
+    CollectionHandle &c,
+    const ghobject_t& oid,
+    uint64_t offset,
+    size_t len,
+    ceph::buffer::list& bl,
+    uint32_t op_flags = 0) override
+  {
+    // Check if we should inject an error for this object
+    int error_code = 0;
+    {
+      // Lock scope kept minimal: only the map lookup/erase is protected.
+      std::lock_guard<std::mutex> lock(error_injection_mutex);
+      auto it = injected_read_errors.find(oid);
+      if (it != injected_read_errors.end()) {
+        error_code = it->second;
+        // Clear the error after using it (one-time injection)
+        injected_read_errors.erase(it);
+      }
+    }
+
+    // If we have an injected error, return it
+    if (error_code != 0) {
+      return error_code;
+    }
+
+    // Otherwise, call the parent implementation
+    return MemStore::read(c, oid, offset, len, bl, op_flags);
+  }
+};
OSDMap::Incremental inc(osdmap.get_epoch() + 1);
inc.fsid = osdmap.get_fsid();
inc.new_state[osd_id] = CEPH_OSD_EXISTS; // Mark as down (exists but not UP)
+
+ // Preserve xinfo features when marking OSD down
+ // This is critical for peering to work correctly with feature checks
+ const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+ inc.new_xinfo[osd_id] = existing_xinfo;
+
osdmap.apply_incremental(inc);
}
OSDMap::Incremental inc(osdmap.get_epoch() + 1);
inc.fsid = osdmap.get_fsid();
inc.new_state[osd_id] = CEPH_OSD_EXISTS | CEPH_OSD_UP;
+
+ // Preserve xinfo features when marking OSD up
+ // This is critical for peering to work correctly with feature checks
+ const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+ inc.new_xinfo[osd_id] = existing_xinfo;
+
osdmap.apply_incremental(inc);
}
inc.fsid = osdmap.get_fsid();
for (int osd_id : osd_ids) {
inc.new_state[osd_id] = CEPH_OSD_EXISTS; // Mark as down (exists but not UP)
+
+ // Preserve xinfo features when marking OSD down
+ // This is critical for peering to work correctly with feature checks
+ const osd_xinfo_t& existing_xinfo = osdmap.get_xinfo(osd_id);
+ inc.new_xinfo[osd_id] = existing_xinfo;
}
osdmap.apply_incremental(inc);
}
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
void PGBackendTestFixture::setup_ec_pool()
{
make_backend_handler.template operator()<MOSDECSubOpWriteReply>(MSG_OSD_EC_WRITE_REPLY);
make_backend_handler.template operator()<MOSDECSubOpRead>(MSG_OSD_EC_READ);
make_backend_handler.template operator()<MOSDECSubOpReadReply>(MSG_OSD_EC_READ_REPLY);
+ make_backend_handler.template operator()<MOSDPGPush>(MSG_OSD_PG_PUSH);
+ make_backend_handler.template operator()<MOSDPGPushReply>(MSG_OSD_PG_PUSH_REPLY);
for (int i = 0; i < num_osds; i++) {
listeners[i]->set_messenger(messenger.get());
}
// Step 3: Clear all attr_caches before on_change()
- // The cached OI attributes may be stale after a peering event
+ // The cached OI attributes may be stale after a peering event.
+ // Also drop any stale outstanding write tracking: once we enter a new
+ // interval, blocked/in-flight writes from the previous interval should no
+ // longer prevent OBC reloading for rollback/recovery verification.
clear_all_attr_caches();
+ outstanding_writes.clear();
// Step 4: Schedule on_change() calls as event loop actions
// This allows them to be delayed and processed after the new epoch
}
}
}
+
+
+// Write a single xattr to obj_name via a PG transaction, bumping the object's
+// version (OI_ATTR is re-encoded alongside the user attr). See the header
+// declaration for the force_all_shards semantics.
+int PGBackendTestFixture::write_attribute(
+  const std::string& obj_name,
+  const std::string& attr_name,
+  const std::string& attr_value,
+  bool force_all_shards)
+{
+  hobject_t hoid = make_test_object(obj_name);
+  PGTransactionUPtr pg_t = std::make_unique<PGTransaction>();
+
+  ObjectContextRef obc = get_or_create_obc(hoid, true, 0);
+  pg_t->obc_map[hoid] = obc;
+
+  // Track the in-flight write; OBC disk reloads are blocked while non-zero.
+  outstanding_writes[hoid]++;
+
+  eversion_t prior_version = obc->obs.oi.version;
+  eversion_t at_version = get_next_version();
+
+  object_info_t new_oi = obc->obs.oi;
+  new_oi.version = at_version;
+  new_oi.prior_version = prior_version;
+
+  {
+    bufferlist oi_bl;
+    new_oi.encode(oi_bl, osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+    pg_t->setattr(hoid, OI_ATTR, oi_bl);
+  }
+
+  {
+    bufferlist attr_bl;
+    attr_bl.append(attr_value);
+    pg_t->setattr(hoid, attr_name, attr_bl);
+  }
+
+  obc->obs.oi = new_oi;
+
+  std::vector<pg_log_entry_t> log_entries;
+  pg_log_entry_t entry;
+  entry.op = pg_log_entry_t::MODIFY;
+  entry.soid = hoid;
+  entry.version = at_version;
+  entry.prior_version = prior_version;
+  log_entries.push_back(entry);
+
+  object_stat_sum_t delta_stats;
+
+  // Completion callback: unwind the outstanding-write counter; on a hard
+  // error, roll the cached OI version back and drop stale cached attrs.
+  auto write_complete = [this, hoid, obc, prior_version](int r) {
+    if (outstanding_writes[hoid] > 0) {
+      outstanding_writes[hoid]--;
+      if (outstanding_writes[hoid] == 0) {
+        outstanding_writes.erase(hoid);
+      }
+    }
+
+    if (r != 0 && r != -EINPROGRESS) {
+      obc->obs.oi.version = prior_version;
+      obc->attr_cache.clear();
+      // NOTE(review): this erases tracking for ALL writes to hoid even if
+      // other writes are still outstanding — confirm intended.
+      outstanding_writes.erase(hoid);
+    }
+  };
+
+  // Control first_write_in_interval to simulate different write patterns
+  if (force_all_shards && pool_type == EC) {
+    PGBackend* primary_backend = get_primary_backend();
+    if (primary_backend) {
+      // on_change() resets the backend's interval state so the next write is
+      // treated as the first write in the interval (touches all shards).
+      primary_backend->on_change();
+    }
+  }
+
+  int result = do_transaction_and_complete(
+    hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete);
+
+  return result;
+}
+
+// Read and decode OI_ATTR directly from one shard's object store, bypassing
+// the OBC cache. Returns a default-constructed object_info_t (carrying only
+// the hoid) if the shard's collection handle or the attr cannot be read.
+object_info_t PGBackendTestFixture::read_shard_object_info(
+  const std::string& obj_name,
+  int shard)
+{
+  hobject_t hoid = make_test_object(obj_name);
+  ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(shard));
+
+  auto ch_it = chs.find(shard);
+  if (ch_it == chs.end()) {
+    std::cerr << "ERROR: No collection handle for shard " << shard << std::endl;
+    return object_info_t(hoid);
+  }
+
+  ceph::buffer::ptr value_ptr;
+  int r = store->getattr(ch_it->second, ghoid, OI_ATTR, value_ptr);
+  if (r < 0) {
+    std::cerr << "ERROR: Failed to read OI_ATTR from shard " << shard
+              << ": " << cpp_strerror(r) << std::endl;
+    return object_info_t(hoid);
+  }
+
+  bufferlist bl;
+  bl.append(value_ptr);
+  auto p = bl.cbegin();
+  object_info_t oi;
+  oi.decode(p);
+  return oi;
+}
#include "test/osd/MockMessenger.h"
#include "common/TrackedOp.h"
#include "os/memstore/MemStore.h"
+#include "test/osd/MockStore.h"
#include "osd/ECSwitch.h"
#include "osd/ECExtentCache.h"
#include "osd/ReplicatedBackend.h"
// Default includes both OVERWRITES and OPTIMIZATIONS flags.
uint64_t pool_flags = pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS;
- std::unique_ptr<MemStore> store;
+ std::unique_ptr<MockStore> store;
std::string data_dir;
ObjectStore::CollectionHandle ch;
coll_t coll;
}
ASSERT_EQ(0, r);
- // Create MemStore - contexts are stolen by MockPGBackendListener, so we don't need manual_finisher
- store.reset(new MemStore(g_ceph_context, data_dir));
+ // Create MockMemStore - contexts are stolen by MockPGBackendListener, so we don't need manual_finisher
+ store.reset(new MockStore(g_ceph_context, data_dir));
ASSERT_TRUE(store);
ASSERT_EQ(0, store->mkfs());
ASSERT_EQ(0, store->mount());
/// Unlike make_object_context(), this method reuses OBCs for the same
/// object across operations, which is essential for attr_cache continuity
/// in EC pools.
+ /// @param primary_shard The shard ID to read attributes from (for EC pools)
  // Return (or create and cache) the OBC for hoid; optionally loads the full
  // attr set from the given shard's store for EC pools.
  ObjectContextRef get_or_create_obc(
    const hobject_t& hoid,
    bool exists = false,
-    uint64_t size = 0)
+    uint64_t size = 0,
+    int primary_shard = 0)
  {
    auto it = object_contexts.find(hoid);
+    ObjectContextRef obc;
+
+    // Reuse a cached OBC when present so attr_cache survives across ops.
    if (it != object_contexts.end()) {
-      return it->second;
+      obc = it->second;
+    } else {
+      obc = make_object_context(hoid, exists, size);
+      object_contexts[hoid] = obc;
    }
-    ObjectContextRef obc = make_object_context(hoid, exists, size);
-
+
    // If the object exists and this is an EC pool, populate attr_cache with
-    // ALL attributes from disk. This matches production behavior where the OBC
-    // is loaded with all xattrs from the object store.
-    if (exists && pool_type == EC && store && !chs.empty()) {
+    // ALL attributes from disk if not already populated. This matches production
+    // behavior where the OBC is loaded with all xattrs from the object store.
+    // In EC, attributes are stored per-shard, so we must read from the specified shard.
+    if (exists && pool_type == EC && store && !chs.empty() && obc->attr_cache.empty()) {
      auto writes_it = outstanding_writes.find(hoid);
      bool has_outstanding_writes = (writes_it != outstanding_writes.end() && writes_it->second > 0);
-
-      // Only read from disk if there are no outstanding writes
-      if (!has_outstanding_writes) {
-        ObjectStore::CollectionHandle ch_primary = chs[0];
-        if (ch_primary) {
-          ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
-          std::map<std::string, ceph::buffer::ptr, std::less<>> attrs;
-          int r = store->getattrs(ch_primary, ghoid, attrs);
-
-          if (r >= 0) {
-            // Successfully read all attributes from disk - populate the cache
-            for (auto& [key, value_ptr] : attrs) {
-              bufferlist bl;
-              bl.append(value_ptr);
-              obc->attr_cache[key] = std::move(bl);
-            }
+
+      // Cannot read from disk if there are outstanding writes - test bug
+      ceph_assert(!has_outstanding_writes);
+
+      // For EC pools, attributes are stored with the shard ID in the ghobject_t
+      ceph_assert(primary_shard >= 0 && primary_shard < (int)chs.size());
+      ObjectStore::CollectionHandle ch = chs[primary_shard];
+      if (ch) {
+        ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t(primary_shard));
+        std::map<std::string, ceph::buffer::ptr, std::less<>> attrs;
+        int r = store->getattrs(ch, ghoid, attrs);
+
+        if (r >= 0) {
+          // Successfully read all attributes from disk - populate the cache
+          for (auto& [key, value_ptr] : attrs) {
+            bufferlist bl;
+            bl.append(value_ptr);
+            obc->attr_cache[key] = std::move(bl);
          }
        }
      }
    }
-
-    object_contexts[hoid] = obc;
+
    return obc;
  }
*/
void clear_all_attr_caches();
+ /**
+ * Write attributes to an object with control over first_write_in_interval.
+ *
+ * This simulates different types of writes in EC pools:
+ * - force_all_shards=true: Simulates first_write_in_interval=true, causing
+ * all_shards_written() which updates ALL shards (data + parity)
+ * - force_all_shards=false: Simulates first_write_in_interval=false, causing
+ * only PRIMARY shards (shard 0 + parity shards) to be updated
+ *
+ * This is useful for testing EC rollback scenarios where version mismatches
+ * can occur between primary and non-primary shards.
+ *
+ * @param obj_name Name of the object
+ * @param attr_name Name of the attribute to write
+ * @param attr_value Value of the attribute
+ * @param force_all_shards If true, forces all shards to be written
+ * @return Result code (0 on success, -EINPROGRESS if blocked, negative on error)
+ */
+ int write_attribute(
+ const std::string& obj_name,
+ const std::string& attr_name,
+ const std::string& attr_value,
+ bool force_all_shards);
+
+ /**
+ * Read object_info_t directly from the ObjectStore for a specific shard.
+ *
+ * This bypasses the OBC cache and reads the actual on-disk state,
+ * which is useful for verifying version consistency across shards
+ * after rollback or peering events.
+ *
+ * @param obj_name Name of the object
+ * @param shard Shard ID to read from
+ * @return The object_info_t decoded from the shard's OI_ATTR
+ */
+ object_info_t read_shard_object_info(
+ const std::string& obj_name,
+ int shard);
+
};
#include <gtest/gtest.h>
#include "test/osd/ECPeeringTestFixture.h"
#include "test/osd/TestCommon.h"
+#include "osd/ECSwitch.h"
using namespace std;
std::cout << "\n=== RollbackAfterOSDFailure Test Complete ===" << std::endl;
}
+/**
+ * ECRecoveryTest - Test EC recovery scenario with missing objects
+ *
+ * This test verifies the EC recovery mechanism by:
+ * 1. Writing and verifying an object
+ * 2. Removing an OSD from the acting set (simulating OSD failure)
+ * 3. Performing an overwrite to the object (creating a version mismatch)
+ * 4. Adding the OSD back to the acting set
+ * 5. Inspecting the missing list to verify the object is marked as missing
+ * 6. Demonstrating that the primary can open a recovery operation
+ *
+ * The test runs multiple times, once for each OSD to fail:
+ * - OSD 1 (a non-primary data shard)
+ * - OSD 0 (the primary)
+ * - OSD k (the first coding/parity shard)
+ */
+TEST_P(TestECFailoverWithPeering, ECRecoveryTest) {
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ std::vector<int> osds_to_test;
+ osds_to_test.push_back(1); // Non-primary
+ osds_to_test.push_back(0); // Primary
+ osds_to_test.push_back(k); // First coding shard
+
+ // Run the test for each OSD
+ for (int removed_osd : osds_to_test) {
+ // A fresh object per failed OSD so each iteration is independent.
+ const std::string obj_name = "test_ec_recovery_osd" + std::to_string(removed_osd);
+ const size_t data_size = stripe_unit * k; // One full stripe.
+ std::string pattern_a(data_size, 'A');
+ std::string pattern_b(data_size, 'B');
+
+ // Overwrite while the OSD is down, so on return it holds stale data and
+ // peering must mark the object missing on that shard.
+ create_and_write_verify(obj_name, pattern_a);
+ mark_osd_down(removed_osd);
+ write_verify(obj_name, 0, pattern_b, data_size);
+ mark_osd_up(removed_osd);
+
+ // Use the fixture helper to run recovery and verify callbacks
+ run_recovery_and_verify_callbacks(obj_name, removed_osd, pattern_b);
+
+ std::cout << "=== Recovery test with OSD " << removed_osd << " completed successfully ===" << std::endl;
+ }
+}
+
+/**
+ * ECSequentialOSDFailoverTest - Test sequential OSD failure and recovery
+ *
+ * This test verifies the EC recovery mechanism by sequentially failing and
+ * recovering each OSD in the cluster:
+ * 1. Create an object and write initial data
+ * 2. For each OSD (0 to k+m-1):
+ *    a. Fail the OSD
+ *    b. Write new data to the object (overwrite)
+ *    c. Recover the OSD
+ *    d. Verify recovery completes
+ * 3. Verify final data is correct
+ *
+ * Unlike ECRecoveryTest which creates a new object for each OSD failure,
+ * this test performs a new write to the same object on each cycle.
+ */
+TEST_P(TestECFailoverWithPeering, ECSequentialOSDFailoverTest) {
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ const std::string obj_name = "test_sequential_failover";
+ const size_t data_size = stripe_unit * k; // One full stripe
+
+ // Calculate total number of OSDs to test
+ int total_osds = (k + m);
+
+ std::cout << "\n=== Testing sequential OSD failover for " << total_osds
+ << " OSDs (k=" << k << ", m=" << m << ") ===" << std::endl;
+
+ // Create object with initial pattern
+ std::string initial_pattern(data_size, 'A');
+ create_and_write_verify(obj_name, initial_pattern);
+
+ // Cycle through each OSD, failing and recovering it
+ for (int osd_to_fail = 0; osd_to_fail < total_osds; osd_to_fail++) {
+ // Distinct fill byte per cycle so a stale shard is detectable.
+ char pattern_char = 'B' + (osd_to_fail % 25); // Cycle through B-Z, then wrap
+ std::string cycle_pattern(data_size, pattern_char);
+ mark_osd_down(osd_to_fail);
+ write_verify(obj_name, 0, cycle_pattern, data_size);
+ mark_osd_up(osd_to_fail);
+ run_recovery_and_verify_callbacks(obj_name, osd_to_fail, cycle_pattern);
+ }
+
+ std::cout << "\n=== Sequential OSD failover test completed successfully ===" << std::endl;
+}
+
+/**
+ * RollbackVersionMismatch - Test OI version consistency across shards after
+ * a blocked write is rolled back by an interval change.
+ *
+ * This test reproduces a bug whereby a full write, following a partial write
+ * will rollback to an OI with an incorrect previous version.
+ *
+ * Recreate https://tracker.ceph.com/issues/76213
+ */
+TEST_P(TestECFailoverWithPeering, DISABLED_RollbackVersionMismatch) {
+ if (k < 3) {
+ GTEST_SKIP() << "RollbackVersionMismatch requires at least 3 data shards";
+ }
+
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ const std::string obj_name = "test_attr_rollback";
+ int temp_failing_shard = 2; // Temporarily fail shard 2 for peering interval change
+
+ // Baseline: after a full write every shard carries the same OI version.
+ create_and_write_verify(obj_name, "initial_data");
+ eversion_t v1 = read_shard_object_info(obj_name, 0).version;
+ ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+ ASSERT_EQ(v1, read_shard_object_info(obj_name, k).version);
+
+ // Partial write (force_all_shards=false): only the primary data shard and
+ // the parity shards advance; shard 1 legitimately stays at v1.
+ int result = write_attribute(obj_name, "test_attr", "value1", false);
+ ASSERT_EQ(0, result);
+ event_loop->run_until_idle();
+ eversion_t v2 = read_shard_object_info(obj_name, 0).version;
+ ASSERT_GT(v2, v1);
+ ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+ ASSERT_EQ(v2, read_shard_object_info(obj_name, k).version);
+
+ // Block shard k so the next (full) write cannot complete, then force an
+ // interval change to roll that write back.
+ suspend_primary_to_osd(k);
+ result = write_attribute(obj_name, "test_attr", "value2", true);
+ ASSERT_NE(0, result);
+ mark_osd_down(temp_failing_shard);
+ unsuspend_primary_to_osd(k);
+ event_loop->run_until_idle();
+ // After rollback the shards must be back at their pre-write versions.
+ ASSERT_EQ(v2, read_shard_object_info(obj_name, 0).version);
+ ASSERT_EQ(v1, read_shard_object_info(obj_name, 1).version);
+ ASSERT_EQ(v2, read_shard_object_info(obj_name, k).version);
+}
+
+/**
+ * TEST: MultiObjectRecoveryReadCrash
+ *
+ * This test reproduces Bug 75432: Assertion failure in ECCommon::ReadPipeline::do_read_op()
+ * when handling multi-object EC reads with partial failures.
+ *
+ * The bug occurs when:
+ * 1. Multiple objects of different sizes are read simultaneously
+ * 2. Smaller objects complete successfully (shard_reads cleared)
+ * 3. A larger object needs additional reads due to a shard failure (need_resend = true)
+ * 4. do_read_op() is called with both completed and incomplete objects
+ */
+TEST_P(TestECFailoverWithPeering, MultiObjectRecoveryReadCrash) {
+ // This test requires k >= 3 and m >= 2
+ if (k < 3 || m < 2) {
+ GTEST_SKIP() << "Test requires k >= 3 and m >= 2";
+ }
+
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ // Create objects of different sizes with initial pattern
+ const std::string obj1_name = "crash_test_obj1";
+ const std::string obj1_pattern_a(stripe_unit, 'A'); // 1 chunk
+
+ const std::string obj2_name = "crash_test_obj2";
+ const std::string obj2_pattern_a(2 * stripe_unit, 'A'); // 2 chunks
+
+ const std::string obj3_name = "crash_test_obj3";
+ const std::string obj3_pattern_a(3 * stripe_unit, 'A'); // 3 chunks
+
+ // Write initial pattern to all objects
+ int result = create_and_write(obj1_name, obj1_pattern_a);
+ EXPECT_EQ(result, 0) << "First object write should complete";
+
+ result = create_and_write(obj2_name, obj2_pattern_a);
+ EXPECT_EQ(result, 0) << "Second object write should complete";
+
+ result = create_and_write(obj3_name, obj3_pattern_a);
+ EXPECT_EQ(result, 0) << "Third object write should complete";
+
+ EXPECT_TRUE(all_shards_clean()) << "All shards should be clean";
+
+ // Mark shard 1 as down - this will require recovery
+ int failed_osd = 1;
+ mark_osd_down(failed_osd);
+
+ // Write new pattern to all objects while OSD 1 is down
+ // This creates objects that need recovery on OSD 1
+ const std::string obj1_pattern_b(stripe_unit, 'B');
+ const std::string obj2_pattern_b(2 * stripe_unit, 'B');
+ const std::string obj3_pattern_b(3 * stripe_unit, 'B');
+
+ result = write(obj1_name, 0, obj1_pattern_b, obj1_pattern_b.length());
+ EXPECT_EQ(result, 0) << "First object update should complete";
+
+ result = write(obj2_name, 0, obj2_pattern_b, obj2_pattern_b.length());
+ EXPECT_EQ(result, 0) << "Second object update should complete";
+
+ result = write(obj3_name, 0, obj3_pattern_b, obj3_pattern_b.length());
+ EXPECT_EQ(result, 0) << "Third object update should complete";
+
+ // Bring OSD back up to trigger peering
+ // Peering will detect that OSD 1 has stale data and populate peer_missing
+ mark_osd_up(failed_osd);
+
+ // Inject read error on shard 2 for object 3 only
+ // This will cause object 3's recovery to fail and need resend
+ inject_read_error_for_shard(obj3_name, 2, -EIO);
+
+ // Now trigger recovery for all 3 objects simultaneously
+ // This is the key: recovery reads multiple objects in a single operation
+ // obj1: 1 chunk - reads shard 0 only -> succeeds -> shard_reads cleared
+ // obj2: 2 chunks - reads shards 0, k -> succeeds -> shard_reads cleared
+ // obj3: 3 chunks - reads shards 0, 2, k -> shard 2 fails -> needs resend
+ // BUG: do_read_op() called with obj1/obj2 having empty shard_reads
+
+ std::cout << "Starting recovery for all 3 objects..." << std::endl;
+
+ // NOTE(review): recovery is driven per-object here (three sequential
+ // helper calls); whether this actually batches into a single read op
+ // depends on the fixture helper — see the parallel variant of this test.
+ run_recovery_and_verify_callbacks(obj1_name, failed_osd, obj1_pattern_b);
+ run_recovery_and_verify_callbacks(obj2_name, failed_osd, obj2_pattern_b);
+ run_recovery_and_verify_callbacks(obj3_name, failed_osd, obj3_pattern_b);
+
+ // If the bug is present, we'll crash before getting here
+ // If the bug is fixed, recovery should complete successfully
+ std::cout << "Recovery completed for all objects" << std::endl;
+
+ SUCCEED() << "Multi-object recovery completed without crash";
+}
+
+/**
+ * TEST: MultiObjectParallelRecoveryCrash
+ *
+ * This test reproduces Bug 75432 by recovering multiple objects in parallel
+ * within a single recovery operation (not sequentially).
+ *
+ * The bug occurs when:
+ * 1. Multiple objects are recovered in a single operation (parallel recovery)
+ * 2. Smaller objects complete successfully (shard_reads cleared)
+ * 3. A larger object needs additional reads due to a shard failure (need_resend = true)
+ * 4. do_read_op() is called with both completed and incomplete objects
+ *
+ * Recreate for tracker https://tracker.ceph.com/issues/75432
+ *
+ * Expected behavior WITH fix: Test completes successfully.
+ */
+TEST_P(TestECFailoverWithPeering, DISABLED_MultiObjectParallelRecoveryCrash) {
+ // This test requires k >= 3 and m >= 2
+ if (k < 3 || m < 2) {
+ GTEST_SKIP() << "Test requires k >= 3 and m >= 2";
+ }
+
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ // Create objects of different sizes with initial pattern
+ const std::string obj1_name = "crash_test_obj1";
+ const std::string obj1_pattern_a(stripe_unit, 'A'); // 1 chunk
+
+ const std::string obj2_name = "crash_test_obj2";
+ const std::string obj2_pattern_a(2 * stripe_unit, 'A'); // 2 chunks
+
+ const std::string obj3_name = "crash_test_obj3";
+ const std::string obj3_pattern_a(3 * stripe_unit, 'A'); // 3 chunks
+
+ // Write initial pattern to all objects
+ int result = create_and_write(obj1_name, obj1_pattern_a);
+ EXPECT_EQ(result, 0) << "First object write should complete";
+
+ result = create_and_write(obj2_name, obj2_pattern_a);
+ EXPECT_EQ(result, 0) << "Second object write should complete";
+
+ result = create_and_write(obj3_name, obj3_pattern_a);
+ EXPECT_EQ(result, 0) << "Third object write should complete";
+
+ EXPECT_TRUE(all_shards_clean()) << "All shards should be clean";
+
+ // Mark shard 1 as down - this will require recovery
+ int failed_osd = 1;
+ mark_osd_down(failed_osd);
+
+ // Write new pattern to all objects while OSD 1 is down
+ // This creates objects that need recovery on OSD 1
+ const std::string obj1_pattern_b(stripe_unit, 'B');
+ const std::string obj2_pattern_b(2 * stripe_unit, 'B');
+ const std::string obj3_pattern_b(3 * stripe_unit, 'B');
+
+ result = write(obj1_name, 0, obj1_pattern_b, obj1_pattern_b.length());
+ EXPECT_EQ(result, 0) << "First object update should complete";
+
+ result = write(obj2_name, 0, obj2_pattern_b, obj2_pattern_b.length());
+ EXPECT_EQ(result, 0) << "Second object update should complete";
+
+ result = write(obj3_name, 0, obj3_pattern_b, obj3_pattern_b.length());
+ EXPECT_EQ(result, 0) << "Third object update should complete";
+
+ // Bring OSD back up to trigger peering
+ // Peering will detect that OSD 1 has stale data and populate peer_missing
+ mark_osd_up(failed_osd);
+
+ // Inject read error on shard 2 for object 3 only
+ // This will cause object 3's recovery to fail and need resend
+ inject_read_error_for_shard(obj3_name, 2, -EIO);
+
+ // Now trigger recovery for all 3 objects in parallel (single operation)
+ // This is the key difference from the sequential test
+ std::cout << "Starting parallel recovery for all 3 objects..." << std::endl;
+
+ // All three objects are handed to the fixture in one call so their reads
+ // are issued within the same recovery operation.
+ std::vector<std::string> obj_names = {obj1_name, obj2_name, obj3_name};
+ std::vector<std::string> expected_data = {obj1_pattern_b, obj2_pattern_b, obj3_pattern_b};
+ run_parallel_recovery_and_verify_callbacks(obj_names, failed_osd, expected_data);
+
+ // If the bug is present, we'll crash before getting here
+ // If the bug is fixed, recovery should complete successfully
+ std::cout << "Parallel recovery completed for all objects" << std::endl;
+
+ SUCCEED() << "Multi-object parallel recovery completed without crash";
+}
+
+/**
+ * Test rollback after a sequence of blocked full-stripe and chunk writes.
+ * Recreate for tracker https://tracker.ceph.com/issues/75211
+ */
+TEST_P(
+ TestECFailoverWithPeering,
+ DISABLED_RollbackAfterMixedBlockedWritesWithOSDFailure
+) {
+ if (m < 2) {
+ GTEST_SKIP() << "RollbackAfterMixedBlockedWritesWithOSDFailure requires m >= 2";
+ }
+
+ // Set osd_async_recovery_min_cost to 0 to ensure even single-object
+ // recovery uses async recovery. This is necessary because the test
+ // harness doesn't block writes during synchronous recovery, which
+ // would cause writes to missing objects to crash.
+ set_config("osd_async_recovery_min_cost", "0");
+
+ const int blocked_shard = k + 1;
+ const int recovery_target_shard = 1;
+ const std::string obj_name = "test_mixed_blocked_writes";
+ const size_t full_stripe_size = stripe_unit * k;
+ const std::string pattern_p1(full_stripe_size, 'A');
+ const std::string pattern_p2(full_stripe_size, 'B');
+
+ // Trigger an async recovery on shard 1.
+ mark_osd_down(recovery_target_shard);
+ create_and_write_verify(obj_name, pattern_p1);
+ mark_osd_up(recovery_target_shard);
+
+ // Create a dummy object. This is purely here to be the first write in a
+ // new interval, which has some special behavior.
+ create_and_write_verify("dummy", pattern_p1);
+
+ // This has the effect of preventing ops from completing.
+ suspend_primary_to_osd(blocked_shard);
+
+ // Force next partial write to go to all shards (including non-primary)
+ // This uses a side effect of call_write_ordered() which causes the next op
+ // to be sent to all shards, even if it is a partial write.
+ ECSwitch* ec_switch = dynamic_cast<ECSwitch*>(get_primary_backend());
+ ASSERT_NE(nullptr, ec_switch) << "Primary backend must be ECSwitch";
+ ec_switch->call_write_ordered([] {});
+
+ // This is a partial write that will be sent to all shards due to the
+ // above mechanism. NOTE: This is different to the force_all_shards boolean
+ // below, which generates a full write, rather than a partial write sent to
+ // all shards!
+ int result = write_attribute(obj_name, "test_attr", "value2", false);
+ ASSERT_EQ(-EINPROGRESS, result);
+
+ // Add a full write. In the defect, the diverge log "merge" code ended up
+ // using this version in the missing list - which is wrong.
+ result = write(obj_name, 0, pattern_p2, full_stripe_size);
+ ASSERT_EQ(-EINPROGRESS, result);
+
+ // Mark an otherwise-uninvolved shard as down to trigger the rollback of
+ // above
+ mark_osd_down(2);
+ unsuspend_primary_to_osd(blocked_shard);
+ event_loop->run_until_idle();
+
+ // Now run the recovery - the target shard asserts it is being written with
+ // the object version it is expecting. In the defect, this assert failed.
+ run_recovery_and_verify_callbacks(obj_name, recovery_target_shard, pattern_p1);
+
+ // Undo our config change!
+ set_config("osd_async_recovery_min_cost", "100");
+}
+
+/**
+ * Test rollback after a sequence of blocked full-stripe and chunk writes.
+ * This is a similar scenario to the previous test, but we force the shard
+ * to do a sync, rather than async recovery at the end.
+ * Recreate for tracker https://tracker.ceph.com/issues/75211
+ */
+TEST_P(
+ TestECFailoverWithPeering,
+ DISABLED_RollbackAfterMixedBlockedWritesWithOSDFailure2
+) {
+ if (m < 2) {
+ GTEST_SKIP() << "RollbackAfterMixedBlockedWritesWithOSDFailure2 requires m >= 2";
+ }
+
+ // Set osd_async_recovery_min_cost to 0 to ensure even single-object
+ // recovery uses async recovery. This is necessary because the test
+ // harness doesn't block writes during synchronous recovery, which
+ // would cause writes to missing objects to crash.
+ set_config("osd_async_recovery_min_cost", "0");
+
+ const int blocked_shard = k + 1;
+ const int recovery_target_shard = 1;
+ const std::string obj_name = "test_mixed_blocked_writes";
+ const size_t full_stripe_size = stripe_unit * k;
+ const std::string pattern_p1(full_stripe_size, 'A');
+ const std::string pattern_p2(full_stripe_size, 'B');
+
+ // Trigger an async recovery on shard 1.
+ mark_osd_down(recovery_target_shard);
+ create_and_write_verify(obj_name, pattern_p1);
+ mark_osd_up(recovery_target_shard);
+
+ // Create a dummy object. This is purely here to be the first write in a
+ // new interval, which has some special behavior.
+ create_and_write_verify("dummy", pattern_p1);
+
+ // This has the effect of preventing ops from completing.
+ suspend_primary_to_osd(blocked_shard);
+
+ // Force next partial write to go to all shards (including non-primary)
+ // This uses a side effect of call_write_ordered() which causes the next op
+ // to be sent to all shards, even if it is a partial write.
+ ECSwitch* ec_switch = dynamic_cast<ECSwitch*>(get_primary_backend());
+ ASSERT_NE(nullptr, ec_switch) << "Primary backend must be ECSwitch";
+ ec_switch->call_write_ordered([] {});
+
+ // This is a partial write that will be sent to all shards due to the
+ // above mechanism. NOTE: This is different to the force_all_shards boolean
+ // below, which generates a full write, rather than a partial write sent to
+ // all shards!
+ int result = write_attribute(obj_name, "test_attr", "value2", false);
+ ASSERT_EQ(-EINPROGRESS, result);
+
+ // Add a full write. In the defect, the diverge log "merge" code ended up
+ // using this version in the missing list - which is wrong.
+ result = write(obj_name, 0, pattern_p2, full_stripe_size);
+ ASSERT_EQ(-EINPROGRESS, result);
+
+ // Restore the default cost before triggering the interval change, so the
+ // final recovery below runs synchronously rather than asynchronously.
+ set_config("osd_async_recovery_min_cost", "100");
+
+ // Mark an otherwise-uninvolved shard as down to trigger the rollback of
+ // above
+ mark_osd_down(2);
+ unsuspend_primary_to_osd(blocked_shard);
+ event_loop->run_until_idle();
+
+ // Now run the recovery - the target shard asserts it is being written with
+ // the object version it is expecting. In the defect, this assert failed.
+ run_recovery_and_verify_callbacks(obj_name, recovery_target_shard, pattern_p1);
+}
// ---------------------------------------------------------------------------
// Instantiate TestECFailoverWithPeering with EC configurations