#adds the given target/script as a test, to check the target, and sets necessary environment variables
function(add_ceph_test test_name test_path)
add_test(NAME ${test_name} COMMAND ${test_path} ${ARGN}
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
COMMAND_EXPAND_LISTS)
if(TARGET ${test_name})
add_dependencies(tests ${test_name})
# unittest_peeringstate
add_executable(unittest_peeringstate
TestPeeringState.cc
+ $<TARGET_OBJECTS:pg_backend_test_fixture>
+ $<TARGET_OBJECTS:ec_peering_test_fixture>
)
add_ceph_unittest(unittest_peeringstate)
target_link_libraries(unittest_peeringstate osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES})
add_library(pg_backend_test_fixture OBJECT
PGBackendTestFixture.cc
)
-target_link_libraries(pg_backend_test_fixture osd os global)
+target_link_libraries(pg_backend_test_fixture osd os global GTest::GTest)
# ec_peering_test_fixture: object library for ECPeeringTestFixture implementation
add_library(ec_peering_test_fixture OBJECT
ECPeeringTestFixture.cc
+ MockPeeringListener.cc
)
-target_link_libraries(ec_peering_test_fixture osd os global)
+target_link_libraries(ec_peering_test_fixture osd os global GTest::GTest)
# unittest_backend_basics (replaces unittest_ecbasics + unittest_replicatedbasics)
add_executable(unittest_backend_basics
)
add_ceph_unittest(unittest_ecfailover_with_peering)
target_link_libraries(unittest_ecfailover_with_peering osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES})
-add_dependencies(unittest_ecfailover_with_peering ec_isa)
+add_dependencies(unittest_ecfailover_with_peering ec_isa ec_jerasure)
# unittest_hitset
add_executable(unittest_hitset
hitset.cc
*/
#include "test/osd/ECPeeringTestFixture.h"
+#include "test/osd/MockECRecPred.h"
+#include "test/osd/MockECReadPred.h"
+
+// ShardDpp implementation
+// Debug-log prefix for this shard: always "shard N: ", plus the shard's
+// current PeeringState once create_peering_state() has populated it.
+std::ostream& ECPeeringTestFixture::ShardDpp::gen_prefix(std::ostream& out) const {
+ out << "shard " << shard << ": ";
+ if (fixture->shard_peering_states.contains(shard)) {
+ PeeringState *ps = fixture->shard_peering_states[shard].get();
+ out << *ps << " ";
+ }
+ return out;
+}
+
+// Factory for the EC recoverability predicate (parameterized by k, m).
+// Returns a heap-allocated object handed to PeeringState::set_backend_predicates
+// in create_peering_state(); NOTE(review): assumes PeeringState takes
+// ownership of the raw pointer — confirm.
+IsPGRecoverablePredicate* ECPeeringTestFixture::get_is_recoverable_predicate() {
+ return new MockECRecPred(k, m);
+}
+
+// Factory for the EC readability predicate (parameterized by k, m).
+// Same ownership convention as get_is_recoverable_predicate().
+IsPGReadablePredicate* ECPeeringTestFixture::get_is_readable_predicate() {
+ return new MockECReadPred(k, m);
+}
+
+// Construct as an EC-backed fixture; all per-shard peering setup is
+// deferred to SetUp().
+ECPeeringTestFixture::ECPeeringTestFixture()
+ : PGBackendTestFixture(PGBackendTestFixture::EC) {
+}
+
+// Build the full per-shard peering environment on top of the base fixture:
+// one PeeringState per shard (k+m of them), a shared handler for every
+// MOSDPeeringOp message type, an idle callback that flushes buffered
+// peering messages, and finally the initial peering cycle so that all
+// shards reach active state before any test body runs.
+void ECPeeringTestFixture::SetUp() {
+ PGBackendTestFixture::SetUp();
+ for (int i = 0; i < k + m; i++) {
+ create_peering_state(i);
+ }
+
+ // Override epoch getter to use shard_peering_listeners instead of base class listeners
+ // (which are moved into shard_peering_listeners during create_peering_state)
+ messenger->set_epoch_getter([this](int osd) -> epoch_t {
+ auto it = shard_peering_listeners.find(osd);
+ if (it != shard_peering_listeners.end()) {
+ return it->second->get_osdmap_epoch();
+ }
+ // Fallback to test fixture's osdmap
+ return osdmap->get_epoch();
+ });
+
+ // Register handlers for peering messages (MOSDPeeringOp)
+ // All peering messages (Query, Notify, Info, Log) use the same handler pattern
+ // since they all inherit from MOSDPeeringOp and use get_event()
+ auto peering_handler = [this](int from_osd, int to_osd, MOSDPeeringOp* op) -> bool {
+ // Message is already correctly typed as MOSDPeeringOp*
+ ceph_assert(op);
+
+ // Get the peering event from the message
+ PGPeeringEventRef evt_ref(op->get_event());
+
+ // Handle the event on the destination shard's peering state
+ PeeringCtx* ctx = get_peering_ctx(to_osd);
+ auto ps = get_peering_state(to_osd);
+ ps->handle_event(evt_ref, ctx);
+
+ // Any transaction accumulated while handling the event is queued
+ // against the destination shard's store.
+ auto t = ctx->transaction.claim_and_reset();
+ int r = queue_transaction_helper(to_osd, std::move(t));
+ ceph_assert( r >= 0 );
+ return true; // Message handled
+ };
+
+ // Register the same handler for all peering message types
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_QUERY2, peering_handler);
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_NOTIFY2, peering_handler);
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_INFO2, peering_handler);
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LOG, peering_handler);
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE, peering_handler);
+ messenger->register_typed_handler<MOSDPeeringOp>(MSG_OSD_PG_LEASE_ACK, peering_handler);
+
+ // Register idle callback to check for buffered messages
+ event_loop->register_idle_callback([this]() -> bool {
+ bool found_messages = false;
+ // Check all PeeringCtx objects for buffered messages
+ for (auto& [osd, ctx] : shard_peering_ctxs) {
+ if (!ctx->message_map.empty()) {
+ dispatch_buffered_messages(osd, ctx.get());
+ found_messages = true;
+ }
+ }
+ return found_messages;
+ });
+
+ // Run initial peering cycle to get all shards to active state
+ run_first_peering();
+}
+
+// Destroy the per-shard peering objects before base-class teardown.
+// States are cleared first; the listeners and dpps they reference via raw
+// pointers (see create_peering_state) are then released.
+void ECPeeringTestFixture::TearDown() {
+ shard_peering_states.clear();
+ shard_peering_ctxs.clear();
+ shard_peering_listeners.clear();
+ shard_dpps.clear();
+ PGBackendTestFixture::TearDown();
+}
+
+// Look up the PeeringState for a shard. Asserts the index is within
+// [0, k+m) and that create_peering_state() already populated the entry.
+PeeringState* ECPeeringTestFixture::get_peering_state(int shard) {
+ ceph_assert(shard >= 0 && shard < k + m);
+ auto it = shard_peering_states.find(shard);
+ ceph_assert(it != shard_peering_states.end());
+ ceph_assert(it->second != nullptr);
+ return it->second.get();
+}
+
+// Look up the PeeringCtx for a shard; asserts the shard is valid and the
+// context exists (created alongside the shard's PeeringState).
+PeeringCtx* ECPeeringTestFixture::get_peering_ctx(int shard) {
+ ceph_assert(shard >= 0 && shard < k + m);
+ auto it = shard_peering_ctxs.find(shard);
+ ceph_assert(it != shard_peering_ctxs.end());
+ ceph_assert(it->second != nullptr);
+ return it->second.get();
+}
+
+// Look up the MockPeeringListener for a shard; asserts the shard is valid
+// and the listener exists.
+MockPeeringListener* ECPeeringTestFixture::get_peering_listener(int shard) {
+ ceph_assert(shard >= 0 && shard < k + m);
+ auto it = shard_peering_listeners.find(shard);
+ ceph_assert(it != shard_peering_listeners.end());
+ ceph_assert(it->second != nullptr);
+ return it->second.get();
+}
+
+// Query the current OSDMap for this PG's acting primary.
+// Returns the primary's shard/OSD id, or -1 when no primary exists.
+int ECPeeringTestFixture::get_primary_shard_from_osdmap() const {
+ std::vector<int> acting_osds;
+ int acting_primary = -1;
+ osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
+ return acting_primary;
+}
+
+// Return the primary shard's backend listener, or nullptr when the OSDMap
+// reports no primary or the listener chain is not fully populated.
+MockPGBackendListener* ECPeeringTestFixture::get_primary_listener() {
+ int primary_shard = get_primary_shard_from_osdmap();
+ if (primary_shard < 0) {
+ return nullptr;
+ }
+
+ auto it = shard_peering_listeners.find(primary_shard);
+ if (it != shard_peering_listeners.end() && it->second &&
+ it->second->backend_listener) {
+ // Assert that the backend listener agrees it's primary
+ ceph_assert(it->second->backend_listener->pgb_is_primary());
+ return it->second->backend_listener.get();
+ }
+ return nullptr;
+}
+
+// Return the primary shard's PGBackend, taken from the base class's
+// backends map (the instance wired to the event loop / message routers),
+// or nullptr when there is no primary or no such backend.
+PGBackend* ECPeeringTestFixture::get_primary_backend() {
+ int primary_shard = get_primary_shard_from_osdmap();
+ if (primary_shard < 0) {
+ return nullptr;
+ }
+
+ auto listener_it = shard_peering_listeners.find(primary_shard);
+ if (listener_it != shard_peering_listeners.end() && listener_it->second &&
+ listener_it->second->backend_listener) {
+ // Assert that the backend listener agrees it's primary
+ ceph_assert(listener_it->second->backend_listener->pgb_is_primary());
+
+ // Return the backend from the base class's backends map, not from
+ // the peering listener, because the base class backend is connected
+ // to the event loop and message routers
+ auto backend_it = backends.find(primary_shard);
+ return (backend_it != backends.end()) ? backend_it->second.get() : nullptr;
+ }
+ return nullptr;
+}
+
+// Deliver the PeeringState::Initialize event to every live shard in the
+// acting set (failed slots are CRUSH_ITEM_NONE and skipped), then drain
+// the event loop so resulting work is processed.
+void ECPeeringTestFixture::event_initialize() {
+ // Get acting set from OSDMap
+ std::vector<int> acting_osds;
+ int acting_primary = -1;
+ osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
+
+ for (int shard : acting_osds) {
+ // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
+ if (shard == CRUSH_ITEM_NONE) {
+ continue;
+ }
+ auto evt = std::make_shared<PGPeeringEvent>(
+ osdmap->get_epoch(),
+ osdmap->get_epoch(),
+ PeeringState::Initialize());
+
+ get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard));
+ }
+ event_loop->run_until_idle();
+}
+
+// Schedule an advance_map call on every shard that has a peering context,
+// computing the up/acting sets from the captured OSDMap inside each
+// scheduled lambda, then drain the event loop.
+void ECPeeringTestFixture::event_advance_map() {
+ // Capture the current osdmap and pgid for use in the lambda
+ OSDMapRef current_osdmap = osdmap;
+ pg_t current_pgid = this->pgid;
+
+ // Schedule advance_map events for each shard instead of running directly
+ for (auto& [shard, ctx] : shard_peering_ctxs) {
+ PeeringState* ps = shard_peering_states.at(shard).get();
+ OSDMapRef lastmap = ps->get_osdmap();
+ PeeringCtx* peering_ctx = ctx.get();
+
+ event_loop->schedule_peering_event(shard, [ps, current_osdmap, lastmap, current_pgid, peering_ctx]() {
+ // Get up/acting sets from OSDMap inside the lambda
+ std::vector<int> up_osds, acting_osds;
+ int up_primary = -1, acting_primary = -1;
+ current_osdmap->pg_to_up_acting_osds(current_pgid, &up_osds, &up_primary, &acting_osds, &acting_primary);
+
+ ps->advance_map(
+ current_osdmap, lastmap, up_osds, up_primary, acting_osds, acting_primary,
+ *peering_ctx);
+ });
+ }
+ event_loop->run_until_idle();
+}
+
+// Schedule an activate_map call on every shard with a peering context,
+// then drain the event loop. Companion to event_advance_map().
+void ECPeeringTestFixture::event_activate_map() {
+ // Schedule activate_map events for each shard instead of running directly
+ for (auto& [shard, ctx] : shard_peering_ctxs) {
+ PeeringState* ps = shard_peering_states.at(shard).get();
+ PeeringCtx* peering_ctx = ctx.get();
+
+ event_loop->schedule_peering_event(shard, [ps, peering_ctx]() {
+ ps->activate_map(*peering_ctx);
+ });
+ }
+ event_loop->run_until_idle();
+}
+
+// Flush every message buffered in ctx->message_map through the mock
+// messenger (from from_shard to each target OSD), then clear the map.
+// Invoked from the event loop's idle callback registered in SetUp().
+void ECPeeringTestFixture::dispatch_buffered_messages(int from_shard, PeeringCtx* ctx) {
+ ceph_assert(messenger);
+ ceph_assert(ctx);
+
+ // Check if there are any buffered messages in the context
+ for (auto& [target_osd, msg_list] : ctx->message_map) {
+ for (auto& msg : msg_list) {
+ // Route the message through the messenger
+ // msg is a MessageRef (boost::intrusive_ptr<Message>), need to get raw pointer
+ // MockMessenger will set the connection when it processes the message
+ messenger->send_message(from_shard, target_osd, msg.get());
+ }
+ msg_list.clear();
+ }
+ ctx->message_map.clear();
+}
+
+// True when every live shard in the acting set (CRUSH_ITEM_NONE slots
+// skipped) reports an active PeeringState.
+bool ECPeeringTestFixture::all_shards_active() {
+ // Get acting set from OSDMap
+ std::vector<int> acting_osds;
+ int acting_primary = -1;
+ osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
+
+ for (int shard : acting_osds) {
+ // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
+ if (shard == CRUSH_ITEM_NONE) {
+ continue;
+ }
+ if (!get_peering_state(shard)->is_active()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+// Only the acting primary is consulted, since (per the header) only the
+// primary tracks PG_STATE_CLEAN in EC pools. False when no primary exists.
+bool ECPeeringTestFixture::all_shards_clean() {
+ // Get primary from OSDMap
+ std::vector<int> acting_osds;
+ int acting_primary = -1;
+ osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
+
+ if (acting_primary >= 0 && acting_primary != CRUSH_ITEM_NONE) {
+ return get_peering_state(acting_primary)->is_clean();
+ }
+ return false;
+}
+
+// Human-readable current peering state name for a shard (asserts on an
+// invalid shard, via get_peering_state).
+std::string ECPeeringTestFixture::get_state_name(int shard) {
+ return get_peering_state(shard)->get_current_state();
+}
+
+// Suspend delivery of events destined for an OSD (simulates temporary
+// unavailability); no-op if the event loop is not set up.
+void ECPeeringTestFixture::suspend_osd(int osd) {
+ if (event_loop) {
+ event_loop->suspend_to_osd(osd);
+ }
+}
+
+// Resume delivery to a previously suspended OSD; queued events are then
+// processed by the event loop. No-op without an event loop.
+void ECPeeringTestFixture::unsuspend_osd(int osd) {
+ if (event_loop) {
+ event_loop->unsuspend_to_osd(osd);
+ }
+}
+
+// True when the event loop exists and delivery to this OSD is suspended.
+bool ECPeeringTestFixture::is_osd_suspended(int osd) {
+ return event_loop && event_loop->is_to_osd_suspended(osd);
+}
+
+// Suspend only the primary->to_osd direction of message delivery; no-op
+// when there is no event loop or no current primary.
+void ECPeeringTestFixture::suspend_primary_to_osd(int to_osd) {
+ if (event_loop) {
+ int primary = get_primary_shard_from_osdmap();
+ if (primary >= 0) {
+ event_loop->suspend_from_to_osd(primary, to_osd);
+ }
+ }
+}
+
+// Resume the primary->to_osd direction of message delivery; no-op when
+// there is no event loop or no current primary.
+void ECPeeringTestFixture::unsuspend_primary_to_osd(int to_osd) {
+ if (event_loop) {
+ int primary = get_primary_shard_from_osdmap();
+ if (primary >= 0) {
+ event_loop->unsuspend_from_to_osd(primary, to_osd);
+ }
+ }
+}
PeeringState* ECPeeringTestFixture::create_peering_state(int shard)
{
shard_dpps[shard] = std::make_unique<ShardDpp>(g_ceph_context, this, shard);
+ // Construct MockPeeringListener, transferring ownership of the backend
+ // listener created by setup_ec_pool() directly. No throw-away construction.
shard_peering_listeners[shard] = std::make_unique<MockPeeringListener>(
- osdmap, pool_id, shard_dpps[shard].get(), pg_whoami);
- shard_peering_listeners[shard]->current_epoch = osdmap->get_epoch();
-
- shard_peering_listeners[shard]->queue_transaction_callback =
+ osdmap, pool_id, shard_dpps[shard].get(), pg_whoami,
+ std::move(listeners[shard]),
+ store.get(), colls[shard], chs[shard]);
+
+ auto& pl = shard_peering_listeners[shard];
+ pl->current_epoch = osdmap->get_epoch();
+ pl->set_messenger(messenger.get());
+ pl->set_event_loop(event_loop.get());
+ pl->set_fixture(this);
+ pl->backend_listener->set_messenger(messenger.get());
+
+ pl->queue_transaction_callback =
[this, shard](ObjectStore::Transaction&& t) -> int {
return queue_transaction_helper(shard, std::move(t));
};
- // Transfer ownership of the backend listener from the base class listeners[]
- // map into the peering listener. The factory (set in our constructor) already
- // recorded a raw pointer in backend_listeners[] so we know which entry to move.
- // After the move, listeners[shard] holds a null unique_ptr; TearDown() already
- // guards against that with "if (list)".
- shard_peering_listeners[shard]->backend_listener = std::move(listeners[shard]);
- shard_peering_listeners[shard]->coll = colls[shard];
- shard_peering_listeners[shard]->ch = chs[shard];
-
- // Recreate backend with the correct backend_listener pointer.
- // The MockPeeringListener constructor created backend with the temporary
- // backend_listener it allocated internally, but we just replaced backend_listener
- // with the one from the base class listeners[] map. We must recreate backend
- // so its parent pointer points to the new backend_listener, not the destroyed one.
- shard_peering_listeners[shard]->backend = std::make_unique<MockPGBackend>(
- g_ceph_context,
- shard_peering_listeners[shard]->backend_listener.get(),
- nullptr,
- colls[shard],
- chs[shard]);
-
spg_t spgid(pgid, shard_id_t(shard));
auto ps = std::make_unique<PeeringState>(
g_ceph_context,
osdmap,
PG_FEATURE_CLASSIC_ALL,
shard_dpps[shard].get(),
- shard_peering_listeners[shard].get());
+ pl.get());
+
+ pl->ps = ps.get();
- shard_peering_listeners[shard]->ps = ps.get();
-
ps->set_backend_predicates(
get_is_readable_predicate(),
get_is_recoverable_predicate());
shard_peering_states[shard] = std::move(ps);
- shard_peering_listeners[shard]->backend_listener->set_peering_state(shard_peering_states[shard].get());
+ pl->backend_listener->set_peering_state(shard_peering_states[shard].get());
shard_peering_ctxs[shard] = std::make_unique<PeeringCtx>();
return shard_peering_states[shard].get();
OSDMapRef old_osdmap = osdmap;
update_osdmap(new_osdmap, new_primary);
+ new_epoch(false);
+ new_epoch_loop();
+}
- // Update peering listeners for ALL shards (even failed ones need epoch updates)
- for (auto& [shard, listener] : shard_peering_listeners) {
- listener->current_epoch = new_osdmap->get_epoch();
- }
-
- // Get primary from OSDMap for advance_map calls using base class pgid member
- std::vector<int> up_osds, acting_osds;
- int up_primary = -1, acting_primary = -1;
- osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary);
-
- // Call advance_map on ALL shards that have peering states, including failed ones
- // This ensures that failed OSDs are notified of map changes (e.g., primary failover)
- // Use the newly computed up_osds and acting_osds from the new OSDMap
- for (auto& [shard, ps] : shard_peering_states) {
- ps->advance_map(
- osdmap, old_osdmap, up_osds, up_primary, acting_osds, acting_primary,
- *get_peering_ctx(shard));
- }
-
- // Call activate_map on ALL shards that have peering states
- // This ensures failed OSDs properly transition state and notify their backends
- for (auto& [shard, ps] : shard_peering_states) {
- ps->activate_map(*get_peering_ctx(shard));
- }
-
- dispatch_all();
-
- // Handle up_thru requirements - keep creating new epochs until peering completes.
- // Note: For primary failover scenarios, full peering may not complete immediately.
- int max_iterations = 3;
+void ECPeeringTestFixture::new_epoch_loop() {
+ int max = 10;
do {
+ ceph_assert(--max);
event_advance_map();
event_activate_map();
- } while (new_epoch(true) && --max_iterations);
+ } while (new_epoch(true));
}
bool ECPeeringTestFixture::new_epoch(bool if_required)
if (acting_primary >= 0) {
auto& listener = shard_peering_listeners[acting_primary];
if (listener->pg_temp_wanted) {
- // Get up set from OSDMap
std::vector<int> up_osds;
int up_primary = -1;
osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, nullptr, nullptr);
if (acting_temp.empty()) {
acting_temp = up_osds;
}
-
- // Apply the pg_temp change that peering requested.
- // For EC pools with optimizations, transform to primaryfirst order
- // (this simulates what the monitor does in production).
- const pg_pool_t* pool = osdmap->get_pg_pool(this->pgid.pool());
- std::vector<int> pg_temp_acting = acting_temp;
- if (pool && pool->allows_ecoptimizations()) {
- pg_temp_acting = osdmap->pgtemp_primaryfirst(*pool, acting_temp);
- }
-
+
pending_inc.new_pg_temp[this->pgid] =
- mempool::osdmap::vector<int>(pg_temp_acting.begin(), pg_temp_acting.end());
+ mempool::osdmap::vector<int>(acting_temp.begin(), acting_temp.end());
listener->pg_temp_wanted = false;
did_work = true;
return true;
}
-void ECPeeringTestFixture::run_peering_cycle()
-{
+void ECPeeringTestFixture::run_first_peering() {
init_peering();
event_initialize();
- dispatch_all();
- event_advance_map();
- dispatch_all();
- event_activate_map();
- dispatch_all();
-
- // Handle up_thru requirements - keep creating new epochs until peering completes.
- int max_iterations = 10;
- for (int i = 0; i < max_iterations && !all_shards_active(); i++) {
- if (new_epoch(true)) {
- event_advance_map();
- dispatch_all();
- event_activate_map();
- dispatch_all();
- }
- }
+ new_epoch_loop();
}
int ECPeeringTestFixture::queue_transaction_helper(int shard, ObjectStore::Transaction&& t)
OSDMapTestHelpers::mark_osd_down(new_osdmap, osd_id);
update_osdmap_with_peering(new_osdmap);
- dispatch_all();
-
- // Process any pg_temp requests from peering (emulates monitor processing MOSDPGTemp)
- // This will apply the primaryfirst transformation if needed
- if (new_epoch(false)) {
- event_advance_map();
- dispatch_all();
- }
}
void ECPeeringTestFixture::mark_osd_up(int osd_id)
OSDMapTestHelpers::mark_osd_up(new_osdmap, osd_id);
update_osdmap_with_peering(new_osdmap);
- dispatch_all();
}
void ECPeeringTestFixture::mark_osds_down(const std::vector<int>& osd_ids)
OSDMapTestHelpers::mark_osds_down(new_osdmap, osd_ids);
update_osdmap_with_peering(new_osdmap);
- dispatch_all();
}
void ECPeeringTestFixture::advance_epoch()
OSDMapTestHelpers::advance_epoch(new_osdmap);
update_osdmap_with_peering(new_osdmap);
- dispatch_all();
}
#include <vector>
#include "test/osd/PGBackendTestFixture.h"
#include "test/osd/MockPeeringListener.h"
-#include "test/osd/MockConnection.h"
-#include "test/osd/MockECRecPred.h"
-#include "test/osd/MockECReadPred.h"
-#include "test/osd/OSDMapTestHelpers.h"
#include "osd/PeeringState.h"
-#include "messages/MOSDPeeringOp.h"
+#include "messages/MOSDPGNotify2.h"
+#include "test/osd/MockMessenger.h"
+
+// Forward declaration
+class ECPeeringTestFixture;
/**
* ECPeeringTestFixture - EC test fixture with full peering infrastructure
std::map<int, std::unique_ptr<PeeringCtx>> shard_peering_ctxs;
std::map<int, std::unique_ptr<MockPeeringListener>> shard_peering_listeners;
- std::map<int, std::list<MessageRef>> shard_messages;
- std::map<int, std::list<PGPeeringEventRef>> shard_events;
-
- // Raw-pointer map giving this fixture direct access to the backend listeners
- // created by the listener_factory. The pointers are valid for the lifetime
- // of the test because ownership is transferred to
- // shard_peering_listeners[i]->backend_listener in create_peering_state().
- std::map<int, MockPGBackendListener*> backend_listeners;
-
class ShardDpp : public NoDoutPrefix {
public:
ECPeeringTestFixture *fixture;
ShardDpp(CephContext *cct, ECPeeringTestFixture *f, int s)
: NoDoutPrefix(cct, ceph_subsys_osd), fixture(f), shard(s) {}
- std::ostream& gen_prefix(std::ostream& out) const override {
- out << "shard " << shard << ": ";
- if (fixture->shard_peering_states.contains(shard)) {
- PeeringState *ps = fixture->shard_peering_states[shard].get();
- out << *ps << " ";
- }
- return out;
- }
+ std::ostream& gen_prefix(std::ostream& out) const override;
};
std::map<int, std::unique_ptr<ShardDpp>> shard_dpps;
- IsPGRecoverablePredicate *get_is_recoverable_predicate() {
- return new MockECRecPred(k, m);
- }
-
- IsPGReadablePredicate *get_is_readable_predicate() {
- return new MockECReadPred(k, m);
- }
+ IsPGRecoverablePredicate *get_is_recoverable_predicate();
+ IsPGReadablePredicate *get_is_readable_predicate();
public:
- ECPeeringTestFixture() : PGBackendTestFixture(PGBackendTestFixture::EC) {
- // Install a listener_factory so that setup_ec_pool() creates listeners
- // that we can access directly (via backend_listeners[]) without needing
- // to steal ownership via release_listener().
- //
- // The factory records a raw pointer in backend_listeners[instance] and
- // returns the unique_ptr to the base class, which stores it in listeners[].
- // In create_peering_state() we then move that unique_ptr from listeners[]
- // into shard_peering_listeners[]->backend_listener, at which point the
- // raw pointer in backend_listeners[] remains valid (owned by the peering
- // listener for the rest of the test).
- listener_factory = [this](
- int instance,
- std::shared_ptr<OSDMap> om,
- int64_t pool_id,
- DoutPrefixProvider* dpp_arg,
- pg_shard_t whoami) -> std::unique_ptr<MockPGBackendListener>
- {
- auto bl = std::make_unique<MockPGBackendListener>(
- om, pool_id, dpp_arg, whoami);
- // Record raw pointer so tests can access the listener directly
- backend_listeners[instance] = bl.get();
- return bl;
- };
- }
-
- void SetUp() override {
- PGBackendTestFixture::SetUp();
- for (int i = 0; i < k + m; i++) {
- create_peering_state(i);
- }
- }
+ ECPeeringTestFixture();
+
+ int queue_transaction_helper(int shard, ObjectStore::Transaction&& t);
- void TearDown() override {
- shard_peering_states.clear();
- shard_peering_ctxs.clear();
- shard_peering_listeners.clear();
- shard_dpps.clear();
- shard_messages.clear();
- shard_events.clear();
- PGBackendTestFixture::TearDown();
- }
+ void SetUp() override;
+ void TearDown() override;
PeeringState* create_peering_state(int shard);
- PeeringState* get_peering_state(int shard) {
- ceph_assert(shard >= 0 && shard < k + m);
- auto it = shard_peering_states.find(shard);
- ceph_assert(it != shard_peering_states.end());
- ceph_assert(it->second != nullptr);
- return it->second.get();
- }
-
- PeeringCtx* get_peering_ctx(int shard) {
- ceph_assert(shard >= 0 && shard < k + m);
- auto it = shard_peering_ctxs.find(shard);
- ceph_assert(it != shard_peering_ctxs.end());
- ceph_assert(it->second != nullptr);
- return it->second.get();
- }
-
- MockPeeringListener* get_peering_listener(int shard) {
- ceph_assert(shard >= 0 && shard < k + m);
- auto it = shard_peering_listeners.find(shard);
- ceph_assert(it != shard_peering_listeners.end());
- ceph_assert(it->second != nullptr);
- return it->second.get();
- }
+ PeeringState* get_peering_state(int shard);
+ PeeringCtx* get_peering_ctx(int shard);
+ MockPeeringListener* get_peering_listener(int shard);
/**
* Query the OSDMap to determine which shard is the primary.
*
* @return The shard ID of the primary, or -1 if no primary exists
*/
- int get_primary_shard_from_osdmap() const {
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
- return acting_primary;
- }
+ int get_primary_shard_from_osdmap() const;
// Override base class methods to work with peering fixture's structure
- MockPGBackendListener* get_primary_listener() override {
- int primary_shard = get_primary_shard_from_osdmap();
- if (primary_shard < 0) {
- return nullptr;
- }
-
- auto it = shard_peering_listeners.find(primary_shard);
- if (it != shard_peering_listeners.end() && it->second &&
- it->second->backend_listener) {
- // Assert that the backend listener agrees it's primary
- ceph_assert(it->second->backend_listener->pgb_is_primary());
- return it->second->backend_listener.get();
- }
- return nullptr;
- }
-
- PGBackend* get_primary_backend() override {
- int primary_shard = get_primary_shard_from_osdmap();
- if (primary_shard < 0) {
- return nullptr;
- }
-
- auto listener_it = shard_peering_listeners.find(primary_shard);
- if (listener_it != shard_peering_listeners.end() && listener_it->second &&
- listener_it->second->backend_listener) {
- // Assert that the backend listener agrees it's primary
- ceph_assert(listener_it->second->backend_listener->pgb_is_primary());
-
- // Return the backend from the base class's backends map, not from
- // the peering listener, because the base class backend is connected
- // to the event loop and message routers
- auto backend_it = backends.find(primary_shard);
- return (backend_it != backends.end()) ? backend_it->second.get() : nullptr;
- }
- return nullptr;
- }
+ MockPGBackendListener* get_primary_listener() override;
+ PGBackend* get_primary_backend() override;
void init_peering(bool dne = false);
-
- void event_initialize() {
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- auto evt = std::make_shared<PGPeeringEvent>(
- osdmap->get_epoch(),
- osdmap->get_epoch(),
- PeeringState::Initialize());
-
- get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard));
- }
- }
-
- void event_advance_map() {
- // Get primary from OSDMap - query once before the loop
- std::vector<int> up_osds, acting_osds;
- int up_primary = -1, acting_primary = -1;
- osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary);
-
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- get_peering_state(shard)->advance_map(
- osdmap, osdmap, up_osds, up_primary, acting_osds, acting_primary,
- *get_peering_ctx(shard));
- }
- }
-
- void event_activate_map() {
- // Get acting set from OSDMap - must use same set as advance_map
- std::vector<int> up_osds, acting_osds;
- int up_primary = -1, acting_primary = -1;
- osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary);
-
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- get_peering_state(shard)->activate_map(*get_peering_ctx(shard));
- }
- }
+ void event_initialize();
+ void event_advance_map();
+ void event_activate_map();
private:
- // Dispatch all messages from a map<int, Container<MessageRef>>.
- // Templated to work with both std::vector (PeeringCtx::message_map) and
- // std::list (MockPeeringListener::messages).
- template <typename Container>
- bool dispatch_messages_from_map(int from_shard,
- std::map<int, Container>& msg_map) {
- bool did_work = false;
-
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- for (auto& [to_shard, msg_list] : msg_map) {
- if (std::find(acting_osds.begin(), acting_osds.end(), to_shard) == acting_osds.end()) {
- continue;
- }
-
- while (!msg_list.empty()) {
- MessageRef m = msg_list.front();
- msg_list.erase(msg_list.begin());
-
- // Cast to MOSDPeeringOp - all peering messages inherit from this.
- // Use dynamic_cast with assertion to catch unexpected message types.
- // Use m.get() (not m.detach()) to avoid leaking the raw pointer.
- MOSDPeeringOp *op = dynamic_cast<MOSDPeeringOp*>(m.get());
- ceph_assert(op != nullptr) /* message must be a MOSDPeeringOp */;
-
- // Set connection peer to the SENDER, not the destination
- ceph_msg_header h = op->get_header();
- h.src.num = from_shard;
- op->set_header(h);
-
- ConnectionRef conn = new MockConnection(from_shard);
- op->set_connection(conn);
-
- // get_event() returns a newly allocated PGPeeringEvent,
- // so we take ownership directly into a shared_ptr (matching OSD.cc pattern)
- PGPeeringEventRef evt_ref(op->get_event());
-
- get_peering_state(to_shard)->handle_event(
- evt_ref,
- get_peering_ctx(to_shard));
-
- did_work = true;
- }
- }
-
- return did_work;
- }
+ /**
+ * dispatch_buffered_messages - Check for and dispatch any buffered messages
+ *
+ * After handling a peering event, PeeringState may have buffered messages
+ * in the PeeringCtx that need to be dispatched. This function checks for
+ * such messages and routes them through the messenger.
+ */
+ void dispatch_buffered_messages(int from_shard, PeeringCtx* ctx);
public:
- bool dispatch_peering_messages(int from_shard) {
- auto* ctx = get_peering_ctx(from_shard);
- return dispatch_messages_from_map(from_shard, ctx->message_map);
- }
- bool dispatch_cluster_messages(int from_shard) {
- auto& listener = shard_peering_listeners[from_shard];
- return dispatch_messages_from_map(from_shard, listener->messages);
- }
-
- bool dispatch_all_peering_messages() {
- bool did_work = false;
- bool work_this_round;
-
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- do {
- work_this_round = false;
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- work_this_round |= dispatch_peering_messages(shard);
- }
- did_work |= work_this_round;
- } while (work_this_round);
-
- return did_work;
- }
-
- bool dispatch_events(int shard, bool stalled = false) {
- auto& listener = shard_peering_listeners[shard];
- std::list<PGPeeringEventRef>& event_queue =
- stalled ? listener->stalled_events : listener->events;
-
- if (event_queue.empty()) {
- return false;
- }
-
- bool did_work = false;
- while (!event_queue.empty()) {
- PGPeeringEventRef evt = event_queue.front();
- event_queue.pop_front();
-
- get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard));
- did_work = true;
- }
-
- return did_work;
- }
-
- bool dispatch_all_events(bool stalled = false) {
- bool did_work = false;
- bool work_this_round;
-
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- do {
- work_this_round = false;
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- work_this_round |= dispatch_events(shard, stalled);
- }
- did_work |= work_this_round;
- } while (work_this_round);
-
- return did_work;
- }
-
- bool dispatch_all_cluster_messages() {
- bool did_work = false;
- bool work_this_round;
-
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- do {
- work_this_round = false;
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- work_this_round |= dispatch_cluster_messages(shard);
- }
- did_work |= work_this_round;
- } while (work_this_round);
-
- return did_work;
- }
-
- bool dispatch_all() {
- bool did_work = false;
- bool work_this_round;
-
- do {
- work_this_round = false;
- work_this_round |= dispatch_all_peering_messages();
- work_this_round |= dispatch_all_cluster_messages();
- work_this_round |= dispatch_all_events();
- did_work |= work_this_round;
- } while (work_this_round);
-
- return did_work;
- }
-
// IMPORTANT: For EC pools, shard positions in acting array must be preserved.
// Failed OSDs should be replaced with CRUSH_ITEM_NONE, not removed.
void update_osdmap_with_peering(
std::shared_ptr<OSDMap> new_osdmap,
std::optional<pg_shard_t> new_primary = std::nullopt);
+ void new_epoch_loop();
bool new_epoch(bool if_required = false);
- int queue_transaction_helper(int shard, ObjectStore::Transaction&& t);
-
- void run_peering_cycle();
+ void run_first_peering();
// OSDMap manipulation helpers - these create a new epoch and trigger peering
*/
void advance_epoch();
- bool all_shards_active() {
- // Get acting set from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- for (int shard : acting_osds) {
- // Skip failed OSDs (marked as CRUSH_ITEM_NONE)
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- if (!get_peering_state(shard)->is_active()) {
- return false;
- }
- }
- return true;
- }
+ bool all_shards_active();
// In EC pools, only the primary tracks PG_STATE_CLEAN.
- bool all_shards_clean() {
- // Get primary from OSDMap
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary);
-
- if (acting_primary >= 0 && acting_primary != CRUSH_ITEM_NONE) {
- return get_peering_state(acting_primary)->is_clean();
- }
- return false;
- }
+ bool all_shards_clean();
- std::string get_state_name(int shard) {
- return get_peering_state(shard)->get_current_state();
- }
+ std::string get_state_name(int shard);
+
+ /**
+ * Suspend an OSD - queues events for this OSD without executing them.
+ * This simulates an OSD being temporarily unavailable.
+ * Events remain queued and will be processed when the OSD is unsuspended.
+ *
+ * @param osd The OSD number to suspend
+ */
+ void suspend_osd(int osd);
+
+ /**
+ * Unsuspend a previously suspended OSD.
+ * Queued events for this OSD will be processed on the next event loop iteration.
+ *
+ * @param osd The OSD number to unsuspend
+ */
+ void unsuspend_osd(int osd);
+
+ /**
+ * Check if an OSD is currently suspended.
+ *
+ * @param osd The OSD number to check
+ * @return true if the OSD is suspended, false otherwise
+ */
+ bool is_osd_suspended(int osd);
+
+ /**
+ * Suspend messages from the primary to a specific OSD.
+ * This blocks communication from the primary to the target OSD while
+ * allowing other communication to proceed normally.
+ *
+ * @param to_osd The OSD number to block messages to (from the primary)
+ */
+ void suspend_primary_to_osd(int to_osd);
+
+ /**
+ * Unsuspend messages from the primary to a specific OSD.
+ * Queued messages will be processed on the next event loop iteration.
+ *
+ * @param to_osd The OSD number to unblock messages to (from the primary)
+ */
+ void unsuspend_primary_to_osd(int to_osd);
};
#include <iostream>
#include <functional>
-#include <queue>
+#include <deque>
#include <map>
+#include <set>
#include <list>
#include <vector>
#include "include/types.h"
private:
struct Event {
EventType type;
- int osd; // -1 for generic events
+ int from_osd; // -1 for generic events or when not applicable
+ int to_osd; // -1 for generic events or when not applicable
GenericEvent callback;
- Event(EventType t, int o, GenericEvent cb)
- : type(t), osd(o), callback(std::move(cb)) {}
+ Event(EventType t, int from, int to, GenericEvent cb)
+ : type(t), from_osd(from), to_osd(to), callback(std::move(cb)) {}
};
- std::queue<Event> event_queue;
+ std::deque<Event> events;
bool verbose = false;
int events_executed = 0;
std::map<EventType, int> events_by_type;
+ std::set<int> suspended_from_osds;
+ std::set<int> suspended_to_osds;
+ std::set<std::pair<int, int>> suspended_from_to_osds;
+ int current_osd = -1;
+
+ // Map from OSD number to its queue of suspended events
+ std::map<int, std::deque<Event>> suspended_events;
+
+ // Callback to check for more work when idle (returns true if more work was generated)
+ std::list<std::function<bool()>> idle_callbacks;
static constexpr const char* event_type_name(EventType type) {
switch (type) {
EventLoop(bool verbose = false) : verbose(verbose) {}
void schedule_generic(GenericEvent event) {
- event_queue.emplace(EventType::GENERIC, -1, std::move(event));
+ events.emplace_back(EventType::GENERIC, -1, -1, std::move(event));
}
- void schedule_osd_message(int osd, GenericEvent callback) {
- event_queue.emplace(EventType::OSD_MESSAGE, osd, std::move(callback));
+ void schedule_osd_message(int from_osd, int to_osd, GenericEvent callback) {
+ ceph_assert(from_osd >= 0);
+ ceph_assert(to_osd >= 0);
+ events.emplace_back(EventType::OSD_MESSAGE, from_osd, to_osd, std::move(callback));
}
void schedule_transaction(int osd, GenericEvent callback) {
- event_queue.emplace(EventType::TRANSACTION, osd, std::move(callback));
+ ceph_assert(osd >= 0);
+ events.emplace_back(EventType::TRANSACTION, -1, osd, std::move(callback));
}
- void schedule_peering_message(int to_osd, GenericEvent callback) {
- event_queue.emplace(EventType::PEERING_MESSAGE, to_osd, std::move(callback));
+ void schedule_peering_message(int from_osd, int to_osd, GenericEvent callback) {
+ ceph_assert(from_osd >= 0);
+ ceph_assert(to_osd >= 0);
+ events.emplace_back(EventType::PEERING_MESSAGE, from_osd, to_osd, std::move(callback));
}
- void schedule_cluster_message(int to_osd, GenericEvent callback) {
- event_queue.emplace(EventType::CLUSTER_MESSAGE, to_osd, std::move(callback));
+ void schedule_cluster_message(int from_osd, int to_osd, GenericEvent callback) {
+ ceph_assert(from_osd >= 0);
+ ceph_assert(to_osd >= 0);
+ events.emplace_back(EventType::CLUSTER_MESSAGE, from_osd, to_osd, std::move(callback));
}
void schedule_peering_event(int osd, GenericEvent callback) {
- event_queue.emplace(EventType::PEERING_EVENT, osd, std::move(callback));
+ ceph_assert(osd >= 0);
+ events.emplace_back(EventType::PEERING_EVENT, -1, osd, std::move(callback));
}
bool has_events() const {
- return !event_queue.empty();
+ return !events.empty();
+ }
+
+ void register_idle_callback(std::function<bool()> callback) {
+ idle_callbacks.emplace_back(std::move(callback));
}
size_t queued_event_count() const {
- return event_queue.size();
+ return events.size();
}
int get_events_executed() const {
}
bool run_one() {
- if (event_queue.empty()) {
+ if (events.empty()) {
return false;
}
- Event event = std::move(event_queue.front());
- event_queue.pop();
+ Event event = std::move(events.front());
+ events.pop_front();
+
+ // Check if this event should be suspended based on from_osd, to_osd, or from-to pair
+ bool should_suspend = false;
+ int suspend_osd = -1;
+
+ if (event.from_osd >= 0 && is_from_osd_suspended(event.from_osd)) {
+ should_suspend = true;
+ suspend_osd = event.from_osd;
+ } else if (event.to_osd >= 0 && is_to_osd_suspended(event.to_osd)) {
+ should_suspend = true;
+ suspend_osd = event.to_osd;
+ } else if (event.from_osd >= 0 && event.to_osd >= 0 &&
+ is_from_to_osd_suspended(event.from_osd, event.to_osd)) {
+ should_suspend = true;
+ suspend_osd = event.to_osd; // Use to_osd for queue key
+ }
+
+ if (should_suspend) {
+ // Move to suspended queue
+ if (verbose) {
+ std::cout << " [Event " << (events_executed + 1) << "] "
+ << event_type_name(event.type);
+ if (event.from_osd >= 0 && event.to_osd >= 0) {
+ std::cout << " (OSD." << event.from_osd << " -> OSD." << event.to_osd << ")";
+ } else if (event.to_osd >= 0) {
+ std::cout << " (to OSD." << event.to_osd << ")";
+ } else if (event.from_osd >= 0) {
+ std::cout << " (from OSD." << event.from_osd << ")";
+ }
+ std::cout << " *** SUSPENDED - moving to suspended queue ***" << std::endl;
+ }
+ suspended_events[suspend_osd].push_back(std::move(event));
+ return true; // We processed an event (by suspending it)
+ }
+
+ // Print banner if switching to a different OSD
+ int active_osd = (event.to_osd >= 0) ? event.to_osd : event.from_osd;
+ if (active_osd >= 0 && active_osd != current_osd) {
+ current_osd = active_osd;
+ if (verbose) {
+ std::cout << "\n==== Processing events for OSD." << current_osd
+ << " ====" << std::endl;
+ }
+ }
if (verbose) {
std::cout << " [Event " << (events_executed + 1) << "] "
<< event_type_name(event.type);
- if (event.osd >= 0) {
- std::cout << " (OSD " << event.osd << ")";
+ if (event.from_osd >= 0 && event.to_osd >= 0) {
+ std::cout << " (OSD." << event.from_osd << " -> OSD." << event.to_osd << ")";
+ } else if (event.to_osd >= 0) {
+ std::cout << " (to OSD." << event.to_osd << ")";
+ } else if (event.from_osd >= 0) {
+ std::cout << " (from OSD." << event.from_osd << ")";
}
std::cout << " Executing..." << std::endl;
}
+ // Execute the event
event.callback();
events_executed++;
events_by_type[event.type]++;
}
if (verbose) {
- std::cout << "=== Executed " << executed << " events, "
- << event_queue.size() << " remaining ===" << std::endl;
+ std::cout << "=== Executed " << executed << " events, "
+ << events.size() << " remaining ===" << std::endl;
}
return executed;
}
-
+
+ // Run every registered idle callback once.
+ // Returns true if any callback reported that it generated new work;
+ // run_until_idle() uses this to decide whether another drain pass is needed.
+ bool do_idle_callbacks() {
+ bool new_work = false;
+ // Iterate by const reference: taking the std::function by value would
+ // copy the callable (and any captured state) on every iteration.
+ for (const auto& cb : idle_callbacks) {
+ if (cb()) {
+ new_work = true;
+ }
+ }
+
+ return new_work;
+ }
+
/**
- * Run until the queue is empty or max_events is reached.
- * Returns -1 if max_events was reached before the queue emptied.
+ * Run until the event queue is empty and the idle callbacks generate no
+ * further work. Each outer pass drains the queue, then gives
+ * do_idle_callbacks() a chance to produce more events; we loop while it does.
+ * max_events bounds the number of drain PASSES (not individual events);
+ * exceeding it trips ceph_assert, treating a non-converging loop as a bug.
 */
- int run_until_idle(int max_events = 0) {
+ void run_until_idle(int max_events = 10000) {
if (verbose) {
std::cout << "\n=== Running until idle";
if (max_events > 0) {
}
std::cout << " ===" << std::endl;
}
-
- int executed = 0;
- while (has_events()) {
- if (max_events > 0 && executed >= max_events) {
- if (verbose) {
- std::cout << "=== Max events (" << max_events << ") reached, "
- << event_queue.size() << " events remaining ===" << std::endl;
- }
- return -1; // Timeout
+
+ do {
+ // Pre-decrement pass guard: fires once max_events passes are exhausted.
+ // NOTE(review): a caller passing 0 wraps to -1 (truthy) and effectively
+ // disables the guard — confirm that is intended.
+ ceph_assert(--max_events);
+ while (has_events()) {
+ run_one();
}
-
- run_one();
- executed++;
- }
-
+ } while (do_idle_callbacks());
+ }
+
+ void clear() {
+ events.clear();
+ suspended_events.clear();
+ }
+
+ void set_verbose(bool v) {
+ verbose = v;
+ }
+
+ // OSD management methods
+ /**
+ * Suspend events FROM an OSD - events originating from this OSD will be queued
+ * but not executed until the OSD is unsuspended.
+ */
+ void suspend_from_osd(int osd) {
+ suspended_from_osds.insert(osd);
if (verbose) {
- std::cout << "=== Idle: Executed " << executed << " events ===" << std::endl;
- print_stats();
+ std::cout << "*** Events FROM OSD." << osd << " marked as SUSPENDED ***" << std::endl;
}
+ }
+
+ /**
+ * Unsuspend events FROM an OSD - queued events from this OSD will be processed
+ * on subsequent run_one() calls.
+ */
+ void unsuspend_from_osd(int osd) {
+ suspended_from_osds.erase(osd);
- return executed;
+ // Move all suspended events for this OSD back to the main queue
+ auto it = suspended_events.find(osd);
+ if (it != suspended_events.end()) {
+ if (verbose) {
+ std::cout << "*** Events FROM OSD." << osd << " marked as UNSUSPENDED - restoring "
+ << it->second.size() << " suspended events ***" << std::endl;
+ }
+
+ // Append suspended events to the main queue
+ for (auto& event : it->second) {
+ events.push_back(std::move(event));
+ }
+
+ suspended_events.erase(it);
+ } else {
+ if (verbose) {
+ std::cout << "*** Events FROM OSD." << osd << " marked as UNSUSPENDED ***" << std::endl;
+ }
+ }
}
/**
- * Run until a condition is met, idle, or max_events is reached.
- * The condition is checked after each event execution.
- * Returns -1 if max_events was reached.
+ * Suspend events TO an OSD - events destined for this OSD will be queued
+ * but not executed until the OSD is unsuspended.
*/
- int run_until(int max_events, std::function<bool()> condition) {
+ void suspend_to_osd(int osd) {
+ suspended_to_osds.insert(osd);
if (verbose) {
- std::cout << "\n=== Running until condition";
- if (max_events > 0) {
- std::cout << " (max " << max_events << " events)";
- }
- std::cout << " ===" << std::endl;
+ std::cout << "*** Events TO OSD." << osd << " marked as SUSPENDED ***" << std::endl;
}
+ }
+
+ /**
+ * Unsuspend events TO an OSD - queued events to this OSD will be processed
+ * on subsequent run_one() calls.
+ */
+ void unsuspend_to_osd(int osd) {
+ suspended_to_osds.erase(osd);
- int executed = 0;
- while (has_events()) {
- if (max_events > 0 && executed >= max_events) {
- if (verbose) {
- std::cout << "=== Max events (" << max_events << ") reached ===" << std::endl;
- }
- return -1; // Timeout
+ // Move all suspended events for this OSD back to the main queue
+ auto it = suspended_events.find(osd);
+ if (it != suspended_events.end()) {
+ if (verbose) {
+ std::cout << "*** Events TO OSD." << osd << " marked as UNSUSPENDED - restoring "
+ << it->second.size() << " suspended events ***" << std::endl;
}
- run_one();
- executed++;
+ // Append suspended events to the main queue
+ for (auto& event : it->second) {
+ events.push_back(std::move(event));
+ }
- if (condition()) {
- if (verbose) {
- std::cout << "=== Condition met after " << executed << " events ===" << std::endl;
- }
- return executed;
+ suspended_events.erase(it);
+ } else {
+ if (verbose) {
+ std::cout << "*** Events TO OSD." << osd << " marked as UNSUSPENDED ***" << std::endl;
}
}
-
+ }
+
+ /**
+ * Suspend events FROM one OSD TO another OSD - events from from_osd to to_osd
+ * will be queued but not executed until unsuspended.
+ */
+ void suspend_from_to_osd(int from_osd, int to_osd) {
+ suspended_from_to_osds.insert({from_osd, to_osd});
if (verbose) {
- std::cout << "=== Idle: Executed " << executed << " events, condition not met ===" << std::endl;
+ std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." << to_osd
+ << " marked as SUSPENDED ***" << std::endl;
}
-
- return executed;
}
- void clear() {
- while (!event_queue.empty()) {
- event_queue.pop();
+ /**
+ * Unsuspend events FROM one OSD TO another OSD - queued events will be processed
+ * on subsequent run_one() calls.
+ */
+ void unsuspend_from_to_osd(int from_osd, int to_osd) {
+ suspended_from_to_osds.erase({from_osd, to_osd});
+
+ // Move all suspended events for this OSD pair back to the main queue
+ auto it = suspended_events.find(to_osd);
+ if (it != suspended_events.end()) {
+ if (verbose) {
+ std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." << to_osd
+ << " marked as UNSUSPENDED - restoring "
+ << it->second.size() << " suspended events ***" << std::endl;
+ }
+
+ // Append suspended events to the main queue
+ for (auto& event : it->second) {
+ events.push_back(std::move(event));
+ }
+
+ suspended_events.erase(it);
+ } else {
+ if (verbose) {
+ std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." << to_osd
+ << " marked as UNSUSPENDED ***" << std::endl;
+ }
}
}
- void set_verbose(bool v) {
- verbose = v;
+ // True if events originating FROM this OSD are currently suspended.
+ bool is_from_osd_suspended(int osd) const {
+ return suspended_from_osds.contains(osd);
+ }
+
+ // True if events destined TO this OSD are currently suspended.
+ bool is_to_osd_suspended(int osd) const {
+ return suspended_to_osds.contains(osd);
+ }
+
+ // True if events on this specific (from, to) edge are currently suspended.
+ bool is_from_to_osd_suspended(int from_osd, int to_osd) const {
+ return suspended_from_to_osds.contains({from_osd, to_osd});
+ }
+
+ // Number of events currently parked on the suspended queue for one OSD.
+ size_t get_suspended_event_count(int osd) const {
+ if (auto it = suspended_events.find(osd); it != suspended_events.end()) {
+ return it->second.size();
+ }
+ return 0;
+ }
+
+ // Total number of parked events across all per-OSD suspended queues.
+ size_t get_total_suspended_events() const {
+ size_t total = 0;
+ for (const auto& [osd, queue] : suspended_events) {
+ total += queue.size();
+ }
+ return total;
+ }
void print_stats() const {
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <functional>
+#include <vector>
+#include "msg/Message.h"
+#include "test/osd/EventLoop.h"
+#include "test/osd/MockConnection.h"
+#include "common/dout.h"
+#include "common/TrackedOp.h"
+#include "osd/OpRequest.h"
+#include "msg/async/frames_v2.h"
+
+#define dout_subsys ceph_subsys_osd
+
+/**
+ * MockMessenger - Routes messages through EventLoop with registered handlers
+ *
+ * This class provides a simple message routing mechanism for OSD tests.
+ * Event handlers are registered as lambdas that process messages, and
+ * messages are scheduled on the EventLoop for asynchronous processing.
+ *
+ * Epoch Handling:
+ * - Messages are tagged with the sender's epoch when sent
+ * - Messages are dropped if the receiver's current epoch doesn't match
+ * - This simulates real OSD behavior where stale messages are ignored
+ *
+ * OpRequest Handling:
+ * - Messages are wrapped in OpRequestRef before being passed to handlers
+ * - This ensures proper lifetime management and prevents use-after-free issues
+ * - Handlers receive OpRequestRef instead of raw Message* pointers
+ */
+class MockMessenger {
+public:
+ using MessageHandler = std::function<bool(int from_osd, int to_osd, Message*)>;
+ using EpochGetter = std::function<epoch_t(int osd)>;
+
+private:
+ EventLoop* event_loop = nullptr;
+ std::map<int, MessageHandler> handlers;
+ EpochGetter epoch_getter;
+ DoutPrefixProvider *dpp = nullptr;
+
+public:
+ MockMessenger(EventLoop* loop, CephContext* cct, DoutPrefixProvider *dpp = nullptr)
+ : event_loop(loop), dpp(dpp) {
+ ceph_assert(event_loop != nullptr);
+ ceph_assert(cct != nullptr);
+ }
+
+ /**
+ * Set the epoch getter callback.
+ * This callback is used to get the current epoch for a given OSD.
+ *
+ * @param getter Lambda that accepts an OSD number and returns its current epoch
+ */
+ void set_epoch_getter(EpochGetter getter) {
+ epoch_getter = std::move(getter);
+ }
+
+ /**
+ * Set the DoutPrefixProvider for logging
+ */
+ void set_dpp(DoutPrefixProvider *d) {
+ dpp = d;
+ }
+
+ /**
+ * Register the handler for one message type. Exactly one handler may be
+ * registered per type (re-registration trips the assert below). Dispatch
+ * in send_message() looks the handler up by exact type code only —
+ * there is no fallback/catch-all lookup; a missing handler is fatal there.
+ *
+ * @param type Message type code (e.g. MSG_OSD_EC_WRITE)
+ * @param handler Lambda that accepts (from_osd, to_osd, Message*) and returns true if handled
+ * Handler is responsible for wrapping the message in appropriate tracker/reference type
+ */
+ void register_handler(int type, MessageHandler handler) {
+ ceph_assert(!handlers.contains(type));
+ handlers.emplace(type, std::move(handler));
+ }
+
+ /**
+ * Register a type-safe handler for a specific message type.
+ * Uses C++20 concepts to ensure type safety at compile time.
+ * The handler receives the correctly-typed message pointer.
+ *
+ * @tparam MsgType The specific message type (e.g., MOSDECSubOpWrite)
+ * @param msg_type The message type code (e.g., MSG_OSD_EC_WRITE)
+ * @param handler Lambda that accepts (from_osd, to_osd, MsgType*) and returns true if handled
+ */
+ template<typename MsgType>
+ requires std::derived_from<MsgType, Message>
+ void register_typed_handler(int msg_type, std::function<bool(int, int, MsgType*)> handler) {
+ // Wrap the typed handler in a generic handler that performs the cast
+ register_handler(msg_type, [handler](int from_osd, int to_osd, Message* m) -> bool {
+ MsgType* typed_msg = dynamic_cast<MsgType*>(m);
+ if (!typed_msg) {
+ return false; // Wrong type, let other handlers try
+ }
+ return handler(from_osd, to_osd, typed_msg);
+ });
+ }
+
+ /**
+ * Send a message from one OSD to another.
+ * The message will be scheduled on the EventLoop and processed by
+ * registered handlers. The RECEIVER's current epoch is captured at send
+ * time; the message is dropped at delivery time if the receiver's epoch
+ * has changed since (simulating real OSDs ignoring stale messages).
+ * Handlers receive the raw Message* and are responsible for wrapping it in
+ * appropriate tracker/reference types (e.g., OpRequestRef).
+ * Panics if no handler processes the message.
+ *
+ * @param from_osd Source OSD number
+ * @param to_osd Destination OSD number
+ * @param m Message to send (takes ownership)
+ */
+ void send_message(int from_osd, int to_osd, Message* m) {
+ ceph_assert(from_osd >= 0);
+ ceph_assert(to_osd >= 0);
+ ceph_assert(m != nullptr);
+
+ // Wrap in MessageRef to manage lifetime
+ MessageRef mref(m);
+
+ // Capture the receiver's epoch at send time for epoch checking
+ epoch_t send_epoch = 0;
+ if (epoch_getter) {
+ send_epoch = epoch_getter(to_osd);
+ }
+
+ // Set the message header's source to the sender OSD BEFORE encoding
+ // This is critical for peering messages - MOSDPGInfo2::get_event() uses
+ // get_source().num() to construct the pg_shard_t for MInfoRec
+ ceph_msg_header h = m->get_header();
+ h.src.num = from_osd;
+ m->set_header(h);
+
+ if (dpp) {
+ ldpp_dout(dpp, 10) << "MockMessenger: Scheduling message from OSD." << from_osd
+ << " to OSD." << to_osd << " (type: " << m->get_type()
+ << ", epoch: " << send_epoch << ")" << dendl;
+ }
+
+ // Encode the message payload to prepare it for transmission
+ // This ensures internal structures like txn_payload are properly serialized
+ m->encode(CEPH_FEATURES_ALL, 0);
+
+ // Copy the message components to simulate network transmission
+ // This creates independent copies that can be decoded into a new message object
+ // on the receiver side, avoiding use-after-free issues
+
+ ceph_msg_header &header = m->get_header();
+ ceph_msg_footer &footer = m->get_footer();
+
+ ceph_msg_header2 header2{header.seq, header.tid,
+ header.type, header.priority,
+ header.version,
+ ceph_le32(0), header.data_off,
+ ceph_le64(0),
+ footer.flags, header.compat_version,
+ header.reserved};
+
+ auto mf = ceph::msgr::v2::MessageFrame::Encode(
+ header2,
+ m->get_payload(),
+ m->get_middle(),
+ m->get_data());
+
+
+ // Schedule event on EventLoop with the copied message components
+ // NOTE(review): the lambda captures `this`; the MockMessenger must outlive
+ // every queued event (or EventLoop::clear() must be called first) — confirm
+ // the fixture's teardown order guarantees this.
+ event_loop->schedule_osd_message(from_osd, to_osd,
+ [this, header, footer, mf, from_osd, to_osd, send_epoch]() mutable {
+ // Get the receiver's current epoch when processing
+ epoch_t current_epoch = 0;
+ if (epoch_getter) {
+ current_epoch = epoch_getter(to_osd);
+ }
+
+ // Drop messages from different epochs (both old and new)
+ if (current_epoch != send_epoch) {
+ if (dpp) {
+ ldpp_dout(dpp, 10) << "MockMessenger: Dropping message from OSD." << from_osd
+ << " to OSD." << to_osd << " (type: " << header.type
+ << ") - epoch mismatch: send_epoch=" << send_epoch
+ << " current_epoch=" << current_epoch << dendl;
+ }
+ return;
+ }
+
+ // Decode the message components into a new message object
+ // This uses the proper decode_message() function that real messengers use,
+ // which supports the new footer format with message authentication
+ // NOTE(review): the raw `new MockConnection(from_osd)` is presumably
+ // adopted into a refcounted ConnectionRef by decode_message — confirm,
+ // otherwise this leaks one connection per delivered message.
+ Message *decoded_msg = decode_message(g_ceph_context, 0, header, footer,
+ mf.front(), mf.middle(), mf.data(), new MockConnection(from_osd));
+
+ ceph_assert(decoded_msg);
+
+ // Dispatch to the handler registered for this exact type; there is no
+ // catch-all fallback — a missing handler is a fatal assert below.
+ if (!handlers.contains(header.type)) {
+ std::cerr << "ERROR: No handler registered for message type " << header.type
+ << " (0x" << std::hex << header.type << std::dec << ")" << std::endl;
+ std::cerr << "Registered handlers: ";
+ for (const auto& [type, _] : handlers) {
+ std::cerr << type << " (0x" << std::hex << type << std::dec << "), ";
+ }
+ std::cerr << std::endl;
+ }
+ ceph_assert(handlers.contains(header.type));
+ // NOTE(review): the handler call is a side effect inside ceph_assert;
+ // this relies on ceph_assert never being compiled out — confirm.
+ ceph_assert(handlers.at(header.type)(from_osd, to_osd, decoded_msg));
+ });
+ }
+
+ /**
+ * Get the number of registered handlers
+ */
+ size_t handler_count() const {
+ return handlers.size();
+ }
+
+ /**
+ * Clear all registered handlers
+ */
+ void clear_handlers() {
+ handlers.clear();
+ }
+
+};
+
+// Made with Bob
#include "global/global_context.h"
#include "test/osd/MockConnection.h"
#include "test/osd/EventLoop.h"
+#include "test/osd/MockMessenger.h"
#include "osd/OpRequest.h"
// MockPGBackendListener - mock PGBackend::Listener and ECListener for multi-instance testing.
ObjectStore *store = nullptr;
ObjectStore::CollectionHandle ch;
EventLoop *event_loop = nullptr;
- std::function<bool(OpRequestRef)> handle_message_callback;
- std::map<int, std::function<bool(OpRequestRef)>> *message_router = nullptr;
+ MockMessenger *messenger = nullptr;
OpTracker *op_tracker = nullptr;
PerfCounters *perf_logger = nullptr;
peering_state = ps;
}
- void set_handle_message_callback(std::function<bool(OpRequestRef)> cb) {
- handle_message_callback = cb;
- }
-
- void set_message_router(std::map<int, std::function<bool(OpRequestRef)>> *router) {
- message_router = router;
+ void set_messenger(MockMessenger *m) {
+ messenger = m;
}
// Debugging
return c;
}
- // Routes messages through EventLoop for asynchronous EC message processing.
+ // Routes messages through MockMessenger for asynchronous message processing.
void send_message(int to_osd, Message *m) override {
MessageRef mref(m);
sent_messages.push_back(mref);
sent_messages_with_dest.push_back({to_osd, mref});
- if (event_loop && op_tracker && message_router) {
+ if (messenger) {
// Capture the sender's OSD ID
int from_osd = pg_whoami.osd;
- // IMPORTANT: Encode the message payload to simulate network transmission
- // This ensures that txn_payload is moved to the middle section for MOSDRepOp messages
- // Without this, Transaction::decode will fail because the message structure is incomplete
- mref->encode_payload(CEPH_FEATURES_ALL);
-
- event_loop->schedule_osd_message(to_osd, [this, mref, to_osd, from_osd]() {
- if (!mref->get_connection()) {
- // Set connection peer to the SENDER, not the destination
- ConnectionRef conn = new MockConnection(from_osd);
- mref->set_connection(conn);
- }
- OpRequestRef op = op_tracker->create_request<OpRequest>(mref.get());
-
- // Route to the correct shard's backend using the message router
- auto it = message_router->find(to_osd);
- if (it != message_router->end()) {
- it->second(op);
- }
- });
+ // Use MockMessenger to route the message with epoch tracking
+ // MockMessenger handles encoding, MockConnection setup, and epoch capture
+ messenger->send_message(from_osd, to_osd, m);
}
}
const pg_missing_const_i * maybe_get_shard_missing(
pg_shard_t peer) const override {
if (peering_state) {
- if (peer == peering_state->get_primary()) {
+ if (peer == primary_shard()) {
return &peering_state->get_pg_log().get_missing();
} else {
auto i = peering_state->get_peer_missing().find(peer);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "test/osd/MockPeeringListener.h"
+#include "test/osd/ECPeeringTestFixture.h"
+#include "test/osd/EventLoop.h"
+
+// Implementation of MockPeeringListener::request_local_background_io_reservation
+// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors
+// Note: `priority` is accepted for interface compatibility but ignored by this mock.
+void MockPeeringListener::request_local_background_io_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
+ // If the test has configured an event loop (i.e. ECPeeringTestFixture),
+ // then use the event loop to run this event, rather than putting it on the queue.
+ // The event-loop path takes precedence over inject_event_stall (else-if below).
+ if (event_loop && fixture) {
+ // Schedule the event through the event loop for deterministic execution
+ // (converts the unique ref into a shared ref so the lambda can copy it).
+ PGPeeringEventRef evt_ref = std::move(on_grant);
+ int shard = pg_whoami.osd;
+ event_loop->schedule_peering_event(shard, [this, evt_ref, shard]() {
+ fixture->get_peering_state(shard)->handle_event(evt_ref, fixture->get_peering_ctx(shard));
+ });
+ } else if (inject_event_stall) {
+ stalled_events.push_back(std::move(on_grant));
+ } else {
+ events.push_back(std::move(on_grant));
+ }
+ // on_preempt is retained only when inject_keep_preempt is set; otherwise
+ // the unique ref is destroyed here (the mock never delivers preemption).
+ if (inject_keep_preempt) {
+ stalled_events.push_back(std::move(on_preempt));
+ }
+ io_reservations_requested++;
+}
+
+// Implementation of MockPeeringListener::request_remote_recovery_reservation
+// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors
+// Note: `priority` is accepted for interface compatibility but ignored by this mock.
+// Mirrors request_local_background_io_reservation's grant/preempt handling.
+void MockPeeringListener::request_remote_recovery_reservation(
+ unsigned priority,
+ PGPeeringEventURef on_grant,
+ PGPeeringEventURef on_preempt) {
+ // If the test has configured an event loop (i.e. ECPeeringTestFixture),
+ // then use the event loop to run this event, rather than putting it on the queue.
+ // The event-loop path takes precedence over inject_event_stall (else-if below).
+ if (event_loop && fixture) {
+ // Schedule the event through the event loop for deterministic execution
+ // (converts the unique ref into a shared ref so the lambda can copy it).
+ PGPeeringEventRef evt_ref = std::move(on_grant);
+ int shard = pg_whoami.osd;
+ event_loop->schedule_peering_event(shard, [this, evt_ref, shard]() {
+ fixture->get_peering_state(shard)->handle_event(evt_ref, fixture->get_peering_ctx(shard));
+ });
+ } else if (inject_event_stall) {
+ stalled_events.push_back(std::move(on_grant));
+ } else {
+ events.push_back(std::move(on_grant));
+ }
+ // on_preempt is retained only when inject_keep_preempt is set; otherwise
+ // the unique ref is destroyed here (the mock never delivers preemption).
+ if (inject_keep_preempt) {
+ stalled_events.push_back(std::move(on_preempt));
+ }
+ remote_recovery_reservations_requested++;
+}
+
+// Implementation of MockPeeringListener::schedule_event_on_commit
+// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors
+// Note: the transaction `t` is never inspected; this mock schedules the
+// "on commit" event immediately, as if the commit had already completed.
+void MockPeeringListener::schedule_event_on_commit(
+ ObjectStore::Transaction &t,
+ PGPeeringEventRef on_commit) {
+ // If the test has configured an event loop (i.e. ECPeeringTestFixture),
+ // then use the event loop to run this event, rather than putting it on the queue.
+ // The event-loop path takes precedence over inject_event_stall (else-if below).
+ if (event_loop && fixture) {
+ // Schedule the event through the event loop for deterministic execution
+ int shard = pg_whoami.osd;
+ event_loop->schedule_peering_event(shard, [this, on_commit, shard]() {
+ fixture->get_peering_state(shard)->handle_event(on_commit, fixture->get_peering_ctx(shard));
+ });
+ } else if (inject_event_stall) {
+ stalled_events.push_back(std::move(on_commit));
+ } else {
+ events.push_back(std::move(on_commit));
+ }
+ events_on_commit_scheduled++;
+}
+
+// Implementation of MockPeeringListener::on_activate_complete
+// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors
+// Queues exactly one follow-up peering event based on the PG's state:
+// DoRecovery, RequestBackfill, (optionally DoPoolMigration), or
+// AllReplicasRecovered; then records that activation completed.
+void MockPeeringListener::on_activate_complete() {
+ // NOTE(review): dout(0) is the highest-priority log level; presumably a
+ // debug leftover — consider dout(10) to match the rest of this function.
+ dout(0) << __func__ << dendl;
+
+ // Helper lambda to schedule an event, using the same three-way routing as
+ // the reservation methods: event loop, stalled queue, or plain queue.
+ auto schedule_event = [this](PGPeeringEventRef evt) {
+ if (event_loop && fixture) {
+ // Use event loop for deterministic execution
+ int shard = pg_whoami.osd;
+ event_loop->schedule_peering_event(shard, [this, evt, shard]() {
+ fixture->get_peering_state(shard)->handle_event(evt, fixture->get_peering_ctx(shard));
+ });
+ } else if (inject_event_stall) {
+ stalled_events.push_back(evt);
+ } else {
+ events.push_back(evt);
+ }
+ };
+
+ if (ps->needs_recovery()) {
+ dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
+ schedule_event(std::make_shared<PGPeeringEvent>(
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::DoRecovery()));
+ } else if (ps->needs_backfill()) {
+ dout(10) << "activate queueing backfill" << dendl;
+ schedule_event(std::make_shared<PGPeeringEvent>(
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::RequestBackfill()));
+// NOTE(review): `#if POOL_MIGRATION` silently evaluates to 0 when the macro
+// is undefined — confirm the macro is always defined, or `#ifdef` is meant.
+#if POOL_MIGRATION
+ } else if (ps->needs_pool_migration()) {
+ dout(10) << "activate queueing pool migration" << dendl;
+ schedule_event(std::make_shared<PGPeeringEvent>(
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::DoPoolMigration()));
+#endif
+ } else {
+ dout(10) << "activate all replicas clean, no recovery" << dendl;
+ schedule_event(std::make_shared<PGPeeringEvent>(
+ get_osdmap_epoch(),
+ get_osdmap_epoch(),
+ PeeringState::AllReplicasRecovered()));
+ }
+ activate_complete_called = true;
+}
+
#include "MockPGBackendListener.h"
#include "MockPGBackend.h"
#include "MockPGLogEntryHandler.h"
+#include "MockMessenger.h"
#include "global/global_context.h"
+// Forward declarations
+class EventLoop;
+class ECPeeringTestFixture;
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_osd
PerfCounters* logger_perf;
std::vector<int> next_acting;
+ // MockMessenger for routing cluster messages (optional - used by ECPeeringTestFixture)
+ MockMessenger* messenger = nullptr;
+
+ // EventLoop for routing events (optional - used by ECPeeringTestFixture)
+ class EventLoop* event_loop = nullptr;
+
+ // Fixture pointer for accessing peering state and context (optional - used by ECPeeringTestFixture)
+ class ECPeeringTestFixture* fixture = nullptr;
+
#ifdef WITH_CRIMSON
- // Per OSD state
+ // Per OSD state - kept for backward compatibility with TestPeeringState
+ // When messenger is set, messages are routed through it instead
std::map<int,std::list<MessageURef>> messages;
#else
- // Per OSD state
+ // Per OSD state - kept for backward compatibility with TestPeeringState
+ // When messenger is set, messages are routed through it instead
std::map<int,std::list<MessageRef>> messages;
#endif
std::vector<HeartbeatStampsRef> hb_stamps;
logger_perf = build_osd_logger(g_ceph_context);
g_ceph_context->get_perfcounters_collection()->add(logger_perf);
}
+ /// Constructor for ECPeeringTestFixture: accepts a pre-created backend listener
+ /// instead of creating one internally. This avoids the throw-away construction
+ /// pattern where the internally-created listener would be immediately replaced.
+ /// NOTE(review): osdmap, pool_id and dpp are accepted but not referenced in
+ /// this body — confirm they are intentionally unused (signature parity with
+ /// the primary constructor) rather than meant to be stored or forwarded.
+ MockPeeringListener(OSDMapRef osdmap,
+ int64_t pool_id,
+ DoutPrefixProvider *dpp,
+ pg_shard_t pg_whoami,
+ std::unique_ptr<MockPGBackendListener> bl,
+ ObjectStore *object_store,
+ coll_t coll_arg,
+ ObjectStore::CollectionHandle ch_arg)
+ : pg_whoami(pg_whoami),
+ backend_listener(std::move(bl)),
+ coll(coll_arg),
+ ch(ch_arg)
+ {
+ // Build the mock backend around the supplied listener/store/collection;
+ // the listener's ownership has already moved into backend_listener above.
+ backend = std::make_unique<MockPGBackend>(
+ g_ceph_context, backend_listener.get(), object_store, coll, ch);
+ // Register recovery-state and OSD perf counters with the global context,
+ // mirroring the other constructor (see the logger_perf setup above).
+ recoverystate_perf = build_recoverystate_perf(g_ceph_context);
+ g_ceph_context->get_perfcounters_collection()->add(recoverystate_perf);
+ logger_perf = build_osd_logger(g_ceph_context);
+ g_ceph_context->get_perfcounters_collection()->add(logger_perf);
+ }
+
~MockPeeringListener() {
if (recoverystate_perf) {
#ifdef WITH_CRIMSON
void send_cluster_message(
int osd, MessageURef m, epoch_t epoch, bool share_map_update=false) override {
+ // share_map_update is not referenced by this mock implementation.
- dout(0) << "send_cluster_message to " << osd << " " << m << dendl;
- messages[osd].push_back(m);
+ dout(0) << "send_cluster_message to " << osd << " " << m << " epoch " << epoch << dendl;
+ if (messenger) {
+ // Use MockMessenger for EventLoop-based routing with epoch tracking
+ // NOTE(review): m.detach() hands the message to the messenger as a raw
+ // pointer — confirm MockMessenger::send_message takes ownership.
+ messenger->send_message(pg_whoami.osd, osd, m.detach());
+ } else {
+ // Fall back to direct message queue for TestPeeringState compatibility
+ messages[osd].push_back(m);
+ }
messages_sent++;
}
#else
void send_cluster_message(
int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) override {
+ // share_map_update is not referenced by this mock implementation.
- dout(0) << "send_cluster_message to " << osd << " " << m << dendl;
- messages[osd].push_back(m);
+ dout(0) << "send_cluster_message to " << osd << " " << m << " epoch " << epoch << dendl;
+ if (messenger) {
+ // Use MockMessenger for EventLoop-based routing with epoch tracking
+ // NOTE(review): m.detach() hands the message to the messenger as a raw
+ // pointer — confirm MockMessenger::send_message takes ownership.
+ messenger->send_message(pg_whoami.osd, osd, m.detach());
+ } else {
+ // Fall back to direct message queue for TestPeeringState compatibility
+ messages[osd].push_back(m);
+ }
messages_sent++;
}
#endif
+ /// Inject the MockMessenger used to route cluster messages.
+ /// Non-owning pointer; the caller retains ownership.
+ void set_messenger(MockMessenger* m) {
+ messenger = m;
+ }
+
+ /// Inject the EventLoop used for deterministic event scheduling.
+ /// Non-owning pointer; the caller retains ownership.
+ void set_event_loop(EventLoop* el) {
+ event_loop = el;
+ }
+
+ /// Back-pointer to the owning fixture; on_activate_complete() uses it to
+ /// reach per-shard peering state/context. Non-owning.
+ void set_fixture(ECPeeringTestFixture* f) {
+ fixture = f;
+ }
+
void send_pg_created(pg_t pgid) override {
pg_created_sent = true;
}
void request_local_background_io_reservation(
unsigned priority,
PGPeeringEventURef on_grant,
- PGPeeringEventURef on_preempt) override {
- if (inject_event_stall) {
- stalled_events.push_back(std::move(on_grant));
- } else {
- events.push_back(std::move(on_grant));
- }
- if (inject_keep_preempt) {
- stalled_events.push_back(std::move(on_preempt));
- }
- io_reservations_requested++;
- }
+ PGPeeringEventURef on_preempt) override;
void update_local_background_io_priority(
unsigned priority) override {
void request_remote_recovery_reservation(
unsigned priority,
PGPeeringEventURef on_grant,
- PGPeeringEventURef on_preempt) override {
- if (inject_event_stall) {
- stalled_events.push_back(std::move(on_grant));
- } else {
- events.push_back(std::move(on_grant));
- }
- if (inject_keep_preempt) {
- stalled_events.push_back(std::move(on_preempt));
- }
- remote_recovery_reservations_requested++;
- }
+ PGPeeringEventURef on_preempt) override;
void cancel_remote_recovery_reservation() override {
remote_recovery_reservation_cancelled = true;
void schedule_event_on_commit(
ObjectStore::Transaction &t,
- PGPeeringEventRef on_commit) override {
- if (inject_event_stall) {
- stalled_events.push_back(std::move(on_commit));
- } else {
- events.push_back(std::move(on_commit));
- }
- events_on_commit_scheduled++;
- }
+ PGPeeringEventRef on_commit) override;
void update_heartbeat_peers(std::set<int> peers) override {
heartbeat_peers_updated = true;
return OstreamTemp(CLOG_DEBUG, nullptr);
}
- void on_activate_complete() override {
- dout(0) << __func__ << dendl;
- std::list<PGPeeringEventRef> *event_queue;
- if (inject_event_stall) {
- event_queue = &stalled_events;
- } else {
- event_queue = &events;
- }
-
- if (ps->needs_recovery()) {
- dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
- event_queue->push_back(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::DoRecovery()));
- } else if (ps->needs_backfill()) {
- dout(10) << "activate queueing backfill" << dendl;
- event_queue->push_back(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::RequestBackfill()));
-#if POOL_MIGRATION
- } else if (ps->needs_pool_migration()) {
- dout(10) << "activate queueing pool migration" << dendl;
- event_queue->push_back(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::DoPoolMigration()));
-#endif
- } else {
- dout(10) << "activate all replicas clean, no recovery" << dendl;
- event_queue->push_back(
- std::make_shared<PGPeeringEvent>(
- get_osdmap_epoch(),
- get_osdmap_epoch(),
- PeeringState::AllReplicasRecovered()));
- }
- activate_complete_called = true;
- }
+ void on_activate_complete() override;
void on_activate_committed() override {
activate_committed_called = true;
int k,
int m,
uint64_t stripe_width,
- uint64_t flags,
- int64_t pool_id = 0)
+ uint64_t flags)
{
pg_pool_t pool;
pool.type = pg_pool_t::TYPE_ERASURE;
+
pool.size = k + m;
+
pool.min_size = k;
pool.crush_rule = 0;
pool.erasure_code_profile = "default";
#include "test/osd/PGBackendTestFixture.h"
#include "common/errno.h"
+#include "messages/MOSDECSubOpWrite.h"
+#include "messages/MOSDECSubOpWriteReply.h"
+#include "messages/MOSDECSubOpRead.h"
+#include "messages/MOSDECSubOpReadReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
void PGBackendTestFixture::setup_ec_pool()
{
CephContext *cct = g_ceph_context;
+ int num_osds = k + m;
+
osdmap = std::make_shared<OSDMap>();
- osdmap->set_max_osd(k + m);
+ osdmap->set_max_osd(num_osds);
- for (int i = 0; i < k + m; i++) {
+ for (int i = 0; i < num_osds; i++) {
osdmap->set_state(i, CEPH_OSD_EXISTS);
osdmap->set_weight(i, CEPH_OSD_OUT);
osdmap->crush->set_item_name(i, "osd." + std::to_string(i));
OSDMap::Incremental inc(osdmap->get_epoch() + 1);
inc.fsid = osdmap->get_fsid();
- for (int i = 0; i < k + m; i++) {
+ for (int i = 0; i < num_osds; i++) {
inc.new_state[i] = CEPH_OSD_UP;
inc.new_weight[i] = CEPH_OSD_IN;
// This will properly calculate up_osd_features
osdmap->apply_incremental(inc);
- pg_pool_t pool = OSDMapTestHelpers::create_ec_pool(k, m, stripe_unit * k, pool_flags, pool_id);
+ pg_pool_t pool = OSDMapTestHelpers::create_ec_pool(k, m, stripe_unit * k, pool_flags);
OSDMapTestHelpers::add_pool(osdmap, pool_id, pool);
pgid = pg_t(0, pool_id);
spgid = spg_t(pgid, shard_id_t(0));
- OSDMapTestHelpers::setup_ec_pg(osdmap, pgid, k, m, 0);
+ std::vector<int> acting;
+ for (int i = 0; i < num_osds; i++) {
+ acting.push_back(i);
+ }
+ OSDMapTestHelpers::set_pg_acting(osdmap, pgid, acting);
// Finalize the CRUSH map to calculate working_size
// This is required for crush_init_workspace() to work correctly
}
ObjectStore::Transaction t;
- for (int i = 0; i < k + m; i++) {
+ for (int i = 0; i < num_osds; i++) {
spg_t shard_spgid(pgid, shard_id_t(i));
coll_t shard_coll(shard_spgid);
auto shard_ch = store->create_new_collection(shard_coll);
const pg_pool_t* pool_ptr = OSDMapTestHelpers::get_pool(osdmap, pool_id);
ceph_assert(pool_ptr != nullptr);
- for (int i = 0; i < k + m; i++) {
- std::unique_ptr<MockPGBackendListener> shard_listener;
- if (listener_factory) {
- shard_listener = listener_factory(
- i,
- osdmap,
- pool_id,
- dpp.get(),
- pg_shard_t(i, shard_id_t(i)));
- } else {
- shard_listener = std::make_unique<MockPGBackendListener>(
- osdmap,
- pool_id,
- dpp.get(),
- pg_shard_t(i, shard_id_t(i))
- );
- }
+ for (int i = 0; i < num_osds; i++) {
+ auto shard_listener = std::make_unique<MockPGBackendListener>(
+ osdmap, pool_id, dpp.get(), pg_shard_t(i, shard_id_t(i)));
// Initialize the listener's own info.pgid so OSDMap queries work
shard_listener->info.pgid = spg_t(pgid, shard_id_t(i));
- for (int j = 0; j < k + m; j++) {
+ for (int j = 0; j < num_osds; j++) {
shard_listener->shardset.insert(pg_shard_t(j, shard_id_t(j)));
shard_listener->acting_recovery_backfill_shard_id_set.insert(shard_id_t(j));
shard_listener->set_store(store.get(), chs[i]);
shard_listener->set_event_loop(event_loop.get());
- shard_listener->set_op_tracker(op_tracker.get());
auto shard_lru = std::make_unique<ECExtentCache::LRU>(1024 * 1024 * 100);
auto shard_ec_switch = std::make_unique<ECSwitch>(
backends[i] = std::move(shard_ec_switch);
}
- for (int i = 0; i < k + m; i++) {
- message_router[i] = [this, i](OpRequestRef op) -> bool {
- return backends[i]->_handle_message(op);
- };
- }
-
- for (int i = 0; i < k + m; i++) {
- listeners[i]->set_message_router(&message_router);
- listeners[i]->set_handle_message_callback(
- [this, i](OpRequestRef op) -> bool {
- return backends[i]->_handle_message(op);
+ // Create MockMessenger and register a single handler that routes to backends
+ messenger = std::make_unique<MockMessenger>(event_loop.get(), cct);
+
+ // Set up epoch getter for MockMessenger to enable epoch-based message filtering
+ messenger->set_epoch_getter([this](int osd) -> epoch_t {
+ // Get the epoch from the listener's osdmap
+ auto it = listeners.find(osd);
+ if (it != listeners.end()) {
+ return it->second->pgb_get_osdmap_epoch();
+ }
+ // If listener doesn't exist yet, use the test fixture's osdmap
+ return osdmap->get_epoch();
+ });
+
+ // Create an OpTracker for wrapping messages in OpRequestRef
+ // This is needed because PGBackend::_handle_message expects OpRequestRef
+ // Store as member variable so it can be properly shut down in TearDown()
+ op_tracker = std::make_shared<OpTracker>(cct, true, 1);
+
+ // Helper lambda to create a typed handler that wraps messages and routes to backends
+ auto make_backend_handler = [this]<typename MsgType>(int msg_type) {
+ messenger->register_typed_handler<MsgType>(msg_type,
+ [this](int from_osd, int to_osd, MsgType* m) -> bool {
+ auto it = backends.find(to_osd);
+ ceph_assert(it != backends.end());
+ OpRequestRef op = this->op_tracker->create_request<OpRequest, Message*>(m);
+ return it->second->_handle_message(op);
});
+ };
+
+ // Register typed handlers for all EC message types
+ make_backend_handler.template operator()<MOSDECSubOpWrite>(MSG_OSD_EC_WRITE);
+ make_backend_handler.template operator()<MOSDECSubOpWriteReply>(MSG_OSD_EC_WRITE_REPLY);
+ make_backend_handler.template operator()<MOSDECSubOpRead>(MSG_OSD_EC_READ);
+ make_backend_handler.template operator()<MOSDECSubOpReadReply>(MSG_OSD_EC_READ_REPLY);
+
+ for (int i = 0; i < num_osds; i++) {
+ listeners[i]->set_messenger(messenger.get());
}
}
ceph_assert(pool_ptr != nullptr);
for (int i = 0; i < num_replicas; i++) {
- std::unique_ptr<MockPGBackendListener> replica_listener;
- if (listener_factory) {
- replica_listener = listener_factory(
- i,
- osdmap,
- pool_id,
- dpp.get(),
- pg_shard_t(i, shard_id_t::NO_SHARD));
- } else {
- replica_listener = std::make_unique<MockPGBackendListener>(
- osdmap,
- pool_id,
- dpp.get(),
- pg_shard_t(i, shard_id_t::NO_SHARD)
- );
- }
+ auto replica_listener = std::make_unique<MockPGBackendListener>(
+ osdmap, pool_id, dpp.get(), pg_shard_t(i, shard_id_t::NO_SHARD));
// Initialize the listener's own info.pgid so OSDMap queries work
replica_listener->info.pgid = spg_t(pgid, shard_id_t::NO_SHARD);
replica_listener->set_store(store.get(), chs[i]);
replica_listener->set_event_loop(event_loop.get());
- replica_listener->set_op_tracker(op_tracker.get());
auto replica_backend = std::make_unique<ReplicatedBackend>(
replica_listener.get(), colls[i], chs[i], store.get(), cct);
backends[i] = std::move(replica_backend);
}
- for (int i = 0; i < num_replicas; i++) {
- message_router[i] = [this, i](OpRequestRef op) -> bool {
- return backends[i]->_handle_message(op);
- };
- }
+ // Create MockMessenger and register a single handler that routes to backends
+ messenger = std::make_unique<MockMessenger>(event_loop.get(), cct);
+
+ // Set up epoch getter for MockMessenger to enable epoch-based message filtering
+ messenger->set_epoch_getter([this](int osd) -> epoch_t {
+ // Get the epoch from the listener's osdmap
+ auto it = listeners.find(osd);
+ if (it != listeners.end()) {
+ return it->second->pgb_get_osdmap_epoch();
+ }
+ // If listener doesn't exist yet, use the test fixture's osdmap
+ return osdmap->get_epoch();
+ });
+
+ // Create an OpTracker for wrapping messages in OpRequestRef
+ // This is needed because PGBackend::_handle_message expects OpRequestRef
+ // Store as member variable so it can be properly shut down in TearDown()
+ op_tracker = std::make_shared<OpTracker>(cct, true, 1);
+
+ // Helper lambda to create a typed handler that wraps messages and routes to backends
+ auto make_backend_handler = [this]<typename MsgType>(int msg_type) {
+ messenger->register_typed_handler<MsgType>(msg_type,
+ [this](int from_osd, int to_osd, MsgType* m) -> bool {
+ auto it = backends.find(to_osd);
+ ceph_assert(it != backends.end());
+ OpRequestRef op = this->op_tracker->create_request<OpRequest, Message*>(m);
+ return it->second->_handle_message(op);
+ });
+ };
+
+ // Register typed handlers for replicated backend message types
+ make_backend_handler.template operator()<MOSDRepOp>(MSG_OSD_REPOP);
+ make_backend_handler.template operator()<MOSDRepOpReply>(MSG_OSD_REPOPREPLY);
for (int i = 0; i < num_replicas; i++) {
- listeners[i]->set_message_router(&message_router);
- listeners[i]->set_handle_message_callback(
- [this, i](OpRequestRef op) -> bool {
- return backends[i]->_handle_message(op);
- });
+ listeners[i]->set_messenger(messenger.get());
}
}
PGTransactionUPtr pg_t,
const object_stat_sum_t& delta_stats,
const eversion_t& at_version,
- std::vector<pg_log_entry_t> log_entries)
+ std::vector<pg_log_entry_t> log_entries,
+ std::function<void(int)> on_write_complete)
{
eversion_t trim_to(0, 0);
eversion_t pg_committed_to(0, 0);
bool completed = false;
int completion_result = -1;
- Context *on_complete = new LambdaContext([&completed, &completion_result](int r) {
+ Context *on_complete = new LambdaContext([&completed, &completion_result, on_write_complete](int r) {
completed = true;
completion_result = r;
+ // Call the write-specific completion lambda if provided
+ if (on_write_complete) {
+ on_write_complete(r);
+ }
});
- ceph_tid_t tid = 1;
+ ceph_tid_t tid = next_tid++;
osd_reqid_t reqid(entity_name_t::OSD(0), 0, tid);
PGBackend* primary_backend = get_primary_backend();
OpRequestRef()
);
- event_loop->run_until_idle(10000);
+ event_loop->run_until_idle();
if (!completed) {
- throw std::runtime_error("Transaction did not complete within timeout");
+ completion_result = -EINPROGRESS;
}
return completion_result;
int PGBackendTestFixture::create_and_write(
const std::string& obj_name,
- const std::string& data,
- const eversion_t& at_version)
+ const std::string& data)
{
+ // Auto-generate version
+ eversion_t at_version = get_next_version();
+
hobject_t hoid = make_test_object(obj_name);
PGTransactionUPtr pg_t = std::make_unique<PGTransaction>();
pg_t->create(hoid);
- ObjectContextRef obc = make_object_context(hoid, false, 0);
+ // Use persistent OBC so attr_cache is maintained across operations
+ ObjectContextRef obc = get_or_create_obc(hoid, false, 0);
pg_t->obc_map[hoid] = obc;
+ // Note: We do NOT pre-seed attr_cache here. For a new object, attr_cache
+ // should be empty. ECTransaction::attr_updates() will update attr_cache
+ // with the new OI from PGTransaction::attr_updates during the transaction.
+
+ // Track outstanding write
+ outstanding_writes[hoid]++;
+
bufferlist bl;
bl.append(data);
pg_t->write(hoid, 0, bl.length(), bl);
delta_stats.num_objects = 1;
delta_stats.num_bytes = bl.length();
+ // Build the NEW OI that finish_ctx() would produce
+ object_info_t new_oi = obc->obs.oi;
+ new_oi.version = at_version;
+ new_oi.prior_version = obc->obs.oi.version;
+ new_oi.size = bl.length();
+
+ // Encode new OI and put into PGTransaction as an attr update.
+ // This matches PrimaryLogPG::finish_ctx() lines 9127-9130,9142.
+ {
+ bufferlist oi_bl;
+ new_oi.encode(oi_bl,
+ osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+ pg_t->setattr(hoid, OI_ATTR, oi_bl);
+ }
+
+ // snapset
+ if (hoid.snap == CEPH_NOSNAP) {
+ bufferlist bss;
+ encode(SnapSet(), bss);
+ pg_t->setattr(hoid, SS_ATTR, bss);
+ }
+
+ // Update OBC obs to new state BEFORE submitting the transaction.
+ // This matches PrimaryLogPG::finish_ctx() line 9187: ctx->obc->obs = ctx->new_obs
+ // At this point: obc->obs.oi has NEW state, obc->attr_cache[OI_ATTR] has OLD state.
+ obc->obs.oi = new_oi;
+ obc->obs.exists = true;
+
std::vector<pg_log_entry_t> log_entries;
pg_log_entry_t entry;
entry.mark_unrollbackable();
entry.prior_version = eversion_t(0, 0);
log_entries.push_back(entry);
- int result = do_transaction_and_complete(
- hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries));
+ // Create completion lambda for write-specific cleanup
+ auto write_complete = [this, hoid, obc](int r) {
+ // Note: we do NOT update obc->obs after completion — it was already
+ // updated above before submit, matching PrimaryLogPG behavior.
+ // ECTransaction::attr_updates() will have updated attr_cache[OI_ATTR]
+ // to the new encoded OI during the transaction.
+
+ // Decrement outstanding writes counter
+ if (outstanding_writes[hoid] > 0) {
+ outstanding_writes[hoid]--;
+ // Clean up the counter if it reaches 0, but don't clear attr_cache here.
+ // The attr_cache will be cleared on on_change() events.
+ if (outstanding_writes[hoid] == 0) {
+ outstanding_writes.erase(hoid);
+ }
+ }
- if (result == 0) {
- obc->obs.exists = true;
- obc->obs.oi.size = bl.length();
- obc->obs.oi.version = at_version;
- }
+ if (r != 0 && r != -EINPROGRESS) {
+ // Transaction failed — roll back OBC state.
+ // In production this would be handled differently, but for tests
+ // we just reset to a clean state.
+ obc->obs.oi = object_info_t(hoid);
+ obc->obs.exists = false;
+ obc->attr_cache.clear();
+ outstanding_writes.erase(hoid);
+ }
+ };
+
+ int result = do_transaction_and_complete(
+ hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete);
return result;
}
+// Read ObjectInfo from the store for an existing object. Returns an
+// ObjectContext with the decoded OI, or a default (exists=false) context
+// when the object or its OI attribute is absent — matching the contract
+// documented in the header, instead of asserting on a missing object.
+ObjectContextRef PGBackendTestFixture::get_object_context(
+ const hobject_t& hoid)
+{
+ PGBackend* primary_backend = get_primary_backend();
+ // Start from a default, non-existent context; upgraded below if the OI
+ // attribute can be read from the store.
+ ObjectContextRef obc = std::make_shared<ObjectContext>();
+ obc->obs.oi = object_info_t(hoid);
+ obc->obs.exists = false;
+ obc->ssc = nullptr;
+
+ // Try to read the ObjectInfo from the store
+ ghobject_t ghoid(hoid, ghobject_t::NO_GEN, primary_backend->get_parent()->whoami_shard().shard);
+ ceph::buffer::ptr value_ptr;
+ int r = store->getattr(ch, ghoid, OI_ATTR, value_ptr);
+ if (r < 0 || value_ptr.length() == 0) {
+ // Object (or its OI attr) does not exist: return the default context
+ // rather than crashing the whole test run on a ceph_assert.
+ return obc;
+ }
+
+ bufferlist bl;
+ bl.append(value_ptr);
+ auto p = bl.cbegin();
+ obc->obs.oi.decode(p);
+ obc->obs.exists = true;
+
+ return obc;
+}
+
int PGBackendTestFixture::write(
const std::string& obj_name,
uint64_t offset,
const std::string& data,
- const eversion_t& prior_version,
- const eversion_t& at_version,
uint64_t object_size)
{
hobject_t hoid = make_test_object(obj_name);
PGTransactionUPtr pg_t = std::make_unique<PGTransaction>();
- ObjectContextRef obc = make_object_context(hoid, true, object_size);
- obc->obs.oi.version = prior_version;
+ ObjectContextRef obc = get_or_create_obc(hoid, true, object_size);
pg_t->obc_map[hoid] = obc;
+ // Track outstanding write
+ outstanding_writes[hoid]++;
+
bufferlist bl;
bl.append(data);
pg_t->write(hoid, offset, bl.length(), bl);
- object_stat_sum_t delta_stats;
uint64_t new_size = std::max(object_size, offset + bl.length());
+
+ object_stat_sum_t delta_stats;
if (new_size > object_size) {
delta_stats.num_bytes = new_size - object_size;
} else {
delta_stats.num_bytes = 0;
}
+ // Prior version comes from the object's current version
+ eversion_t prior_version = obc->obs.oi.version;
+ eversion_t at_version = get_next_version();
+
+ // Build the NEW OI
+ object_info_t new_oi = obc->obs.oi;
+ new_oi.version = at_version;
+ new_oi.prior_version = prior_version;
+ new_oi.size = new_size;
+
+ // Encode new OI into PGTransaction
+ {
+ bufferlist oi_bl;
+ new_oi.encode(oi_bl,
+ osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
+ pg_t->setattr(hoid, OI_ATTR, oi_bl);
+ }
+
+ // Update OBC obs to new state BEFORE submitting
+ obc->obs.oi = new_oi;
+
std::vector<pg_log_entry_t> log_entries;
pg_log_entry_t entry;
// Don't mark as unrollbackable - partial writes need rollback support
entry.prior_version = prior_version;
log_entries.push_back(entry);
- int result = do_transaction_and_complete(
- hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries));
+ // Create completion lambda for write-specific cleanup
+ auto write_complete = [this, hoid, obc, prior_version, object_size](int r) {
+ // Decrement outstanding writes counter
+ if (outstanding_writes[hoid] > 0) {
+ outstanding_writes[hoid]--;
+ // Clean up the counter if it reaches 0, but don't clear attr_cache here.
+ // The attr_cache will be cleared on on_change() events.
+ if (outstanding_writes[hoid] == 0) {
+ outstanding_writes.erase(hoid);
+ }
+ }
- if (result == 0) {
- obc->obs.oi.size = new_size;
- obc->obs.oi.version = at_version;
- }
+ if (r != 0 && r != -EINPROGRESS) {
+ // Roll back OBC on failure
+ obc->obs.oi.version = prior_version;
+ obc->obs.oi.size = object_size;
+ obc->attr_cache.clear();
+ outstanding_writes.erase(hoid);
+ }
+ };
+
+ int result = do_transaction_and_complete(
+ hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete);
return result;
}
false
);
- event_loop->run_until_idle(10000);
+ event_loop->run_until_idle();
- if (!completed) {
- throw std::runtime_error("Read operation did not complete within timeout");
- }
+ ceph_assert(completed);
return completion_result;
} else {
}
}
+// Read expected_data.length() bytes of obj_name at `offset` and assert
+// (via gtest EXPECT_*) that the read succeeds and the bytes match.
+void PGBackendTestFixture::verify_object(
+ const std::string& obj_name,
+ const std::string& expected_data,
+ size_t offset,
+ size_t object_size)
+{
+ const size_t want = expected_data.length();
+ bufferlist bl;
+ const int rc = read_object(obj_name, offset, want, bl, object_size);
+
+ EXPECT_GE(rc, 0) << "Read should complete successfully";
+ EXPECT_EQ(bl.length(), want) << "Read data length should match";
+
+ // Only compare contents when the lengths agree; a mismatch has already
+ // been recorded by the length EXPECT above.
+ if (bl.length() == want) {
+ EXPECT_EQ(std::string(bl.c_str(), bl.length()), expected_data) << "Data should match";
+ }
+}
+
+// Create obj_name with `data`, expect success, then read it back and check
+// the contents byte-for-byte.
+void PGBackendTestFixture::create_and_write_verify(
+ const std::string& obj_name,
+ const std::string& data)
+{
+ const int rc = create_and_write(obj_name, data);
+ EXPECT_GE(rc, 0) << "Write should complete successfully";
+
+ // Always verify - tests should only use this helper when success is expected
+ verify_object(obj_name, data, 0, data.length());
+}
+
+// Write `data` at `offset` into obj_name, expect success, then read the same
+// range back and verify it. When non-empty, context_msg is appended to every
+// assertion message to identify the call site.
+void PGBackendTestFixture::write_verify(
+ const std::string& obj_name,
+ size_t offset,
+ const std::string& data,
+ size_t object_size,
+ const std::string& context_msg)
+{
+ std::string msg_suffix;
+ if (!context_msg.empty()) {
+ msg_suffix = " (" + context_msg + ")";
+ }
+
+ const int wr = write(obj_name, offset, data, object_size);
+ EXPECT_GE(wr, 0) << "Write should complete successfully" << msg_suffix;
+
+ // Always verify - tests should only use this helper when success is expected
+ const size_t full_size = std::max(object_size, offset + data.length());
+ bufferlist bl;
+ const int rd = read_object(obj_name, offset, data.length(), bl, full_size);
+
+ EXPECT_GE(rd, 0) << "Read should complete successfully" << msg_suffix;
+ EXPECT_EQ(bl.length(), data.length()) << "Read data length should match" << msg_suffix;
+
+ if (bl.length() == data.length()) {
+ EXPECT_EQ(std::string(bl.c_str(), bl.length()), data) << "Written data should match" << msg_suffix;
+ }
+}
+
// ---------------------------------------------------------------------------
// NOTE: update_osdmap() intentionally does NOT reconcile listener acting sets
//
std::shared_ptr<OSDMap> new_osdmap,
std::optional<pg_shard_t> new_primary)
{
- // Step 1: Call on_change() on all backends to clear in-flight operations
- for (auto& [instance, be] : backends) {
- if (be) {
- be->on_change();
- }
- }
-
- // Step 2: Update the osdmap reference
+ // Step 1: Update the osdmap reference first
osdmap = new_osdmap;
- // Step 3: Update the osdmap in all listeners
+ // Step 2: Update the osdmap in all listeners
for (auto& [instance, list] : listeners) {
if (list) {
list->osdmap = new_osdmap;
}
}
+
+ // Step 3: Clear all attr_caches before on_change()
+ // The cached OI attributes may be stale after a peering event
+ clear_all_attr_caches();
+
+ // Step 4: Schedule on_change() calls as event loop actions
+ // This allows them to be delayed and processed after the new epoch
+ for (auto& [instance, be] : backends) {
+ if (be) {
+ PGBackend* backend_ptr = be.get();
+ event_loop->schedule_peering_event(instance, [backend_ptr]() {
+ backend_ptr->on_change();
+ });
+ }
+ }
+ event_loop->run_until_idle();
}
void PGBackendTestFixture::cleanup_data_dir()
}
}
+// Invalidate the cached xattrs on every tracked ObjectContext. Called from
+// update_osdmap(), where cached attributes may be stale after a peering
+// event.
+void PGBackendTestFixture::clear_all_attr_caches()
+{
+ for (auto& entry : object_contexts) {
+ ObjectContextRef& obc = entry.second;
+ if (!obc) {
+ continue;
+ }
+ obc->attr_cache.clear();
+ }
+}
#include "test/osd/MockErasureCode.h"
#include "test/osd/MockPGBackendListener.h"
#include "test/osd/EventLoop.h"
+#include "test/osd/MockMessenger.h"
#include "common/TrackedOp.h"
#include "os/memstore/MemStore.h"
#include "osd/ECSwitch.h"
coll_t coll;
std::shared_ptr<OSDMap> osdmap;
- std::unique_ptr<OpTracker> op_tracker;
std::unique_ptr<EventLoop> event_loop;
- std::map<int, std::function<bool(OpRequestRef)>> message_router;
+ std::unique_ptr<MockMessenger> messenger;
std::map<int, std::unique_ptr<MockPGBackendListener>> listeners;
std::map<int, std::unique_ptr<PGBackend>> backends;
std::map<int, coll_t> colls;
std::map<int, ObjectStore::CollectionHandle> chs;
- /**
- * Optional listener factory callback.
- *
- * If set, setup_ec_pool() and setup_replicated_pool() will call this
- * factory instead of constructing MockPGBackendListener directly.
- * The factory receives the instance index and the parameters needed to
- * construct the listener, and must return a unique_ptr to the new
- * MockPGBackendListener. The returned object is stored in listeners[i]
- * as usual, so ownership stays with the base class.
- *
- * Derived classes (e.g. ECPeeringTestFixture) can set this in their
- * constructor to gain direct access to the created listeners without
- * needing to steal ownership via release_listener().
- */
- std::function<std::unique_ptr<MockPGBackendListener>(
- int instance,
- std::shared_ptr<OSDMap> osdmap,
- int64_t pool_id,
- DoutPrefixProvider* dpp,
- pg_shard_t whoami)> listener_factory;
-
+ /// Persistent OBC storage - emulates PrimaryLogPG's object_contexts LRU.
+ /// Keyed by hobject_t, values are shared_ptr so the same OBC is reused
+ /// across sequential operations on the same object. This is critical for
+ /// EC attr_cache continuity.
+ std::map<hobject_t, ObjectContextRef> object_contexts;
+
+ /// Track outstanding writes per object. When this reaches 0, we can safely
+ /// clear attr_cache (as there are no in-flight writes that might have stale
+ /// cached OI data).
+ std::map<hobject_t, int> outstanding_writes;
+
+ // OpTracker for wrapping messages in OpRequestRef
+ std::shared_ptr<OpTracker> op_tracker;
+
ceph::ErasureCodeInterfaceRef ec_impl;
std::map<int, std::unique_ptr<ECExtentCache::LRU>> lrus;
int k = 4; // data chunks
uint64_t stripe_unit = 4096; // aka chunk_size
std::string ec_plugin = "isa";
std::string ec_technique = "reed_sol_van";
-
+
int num_replicas = 3;
int min_size = 2;
pg_t pgid;
spg_t spgid;
+ // Transaction ID counter - increments with each transaction
+ ceph_tid_t next_tid = 1;
+
+ // Version counter for auto-generating versions in write* functions
+ // The epoch comes from osdmap, this tracks the second version number
+ uint64_t next_version = 1;
+
class TestDpp : public NoDoutPrefix {
public:
TestDpp(CephContext *cct) : NoDoutPrefix(cct, ceph_subsys_osd) {}
CephContext *cct = g_ceph_context;
dpp = std::make_unique<TestDpp>(cct);
event_loop = std::make_unique<EventLoop>(false);
- op_tracker = std::make_unique<OpTracker>(cct, false, 1);
if (pool_type == EC) {
setup_ec_pool();
}
void TearDown() override {
- // 0. Process any remaining events in the EventLoop.
- // If the test passed, orphaned events indicate a bug - warn and skip draining
- // so the test fails loudly. If the test already failed, drain silently to
- // allow the rest of TearDown to complete without cascading errors.
+
+ // Drain leftover EventLoop events. Orphaned events after a passing test
+ // indicate a bug, so fail loudly first, then drain so the rest of
+ // TearDown can proceed without cascading errors.
if (event_loop) {
if (event_loop->has_events()) {
if (!HasFailure()) {
ADD_FAILURE() << "TearDown: " << event_loop->queued_event_count()
<< " orphaned events remain after a passing test";
}
- event_loop->run_until_idle(1000);
+ event_loop->run_until_idle();
}
}
-
- // 1. Clean up all backend instances (polymorphic cleanup)
- // Note: We skip calling on_change() during teardown as it may access
- // invalid state. The backends will be destroyed anyway.
+
+ // Shut down the OpTracker before destroying backends.
+ // NOTE(review): this reorders shutdown relative to the previous code
+ // (op_tracker used to be reset after backends/listeners) — presumably so
+ // tracked OpRequests are released before the backends they reference go
+ // away; confirm this ordering is intentional.
+ if (op_tracker) {
+ op_tracker->on_shutdown();
+ op_tracker.reset();
+ }
+
+ // Destroy backends, then fixture-owned per-object state.
backends.clear();
+ object_contexts.clear();
+ outstanding_writes.clear();
- // 2. Clean up EC-specific resources
+ // EC-specific resources.
if (pool_type == EC) {
lrus.clear();
ec_impl.reset();
}
-
- // 3. Clean up listeners
+
listeners.clear();
-
- // 4. Reset op tracker (call on_shutdown first)
- if (op_tracker) {
- op_tracker->on_shutdown();
- op_tracker.reset();
- }
-
- // 5. Reset all collection handles
+ // Release all collection handles before unmounting the store.
chs.clear();
colls.clear();
if (ch) {
ch.reset();
}
-
- // 6. Unmount and destroy the store
+
if (store) {
store->umount();
store.reset();
}
-
- // 7. Clean up the test directory
+
cleanup_data_dir();
}
return obc;
}
+ /// Get an existing OBC or create a new one.
+ /// Unlike make_object_context(), this method reuses OBCs for the same
+ /// object across operations, which is essential for attr_cache continuity
+ /// in EC pools.
+ ObjectContextRef get_or_create_obc(
+ const hobject_t& hoid,
+ bool exists = false,
+ uint64_t size = 0)
+ {
+ // Reuse path: one shared OBC per object, emulating PrimaryLogPG's
+ // object_contexts LRU (see the object_contexts member above).
+ auto it = object_contexts.find(hoid);
+ if (it != object_contexts.end()) {
+ return it->second;
+ }
+ ObjectContextRef obc = make_object_context(hoid, exists, size);
+
+ // If the object exists and this is an EC pool, populate attr_cache with
+ // ALL attributes from disk. This matches production behavior where the OBC
+ // is loaded with all xattrs from the object store.
+ if (exists && pool_type == EC && store && !chs.empty()) {
+ auto writes_it = outstanding_writes.find(hoid);
+ bool has_outstanding_writes = (writes_it != outstanding_writes.end() && writes_it->second > 0);
+
+ // Only read from disk if there are no outstanding writes
+ // (an in-flight write's attrs may not have landed yet, so the cache
+ // would be seeded with stale data).
+ if (!has_outstanding_writes) {
+ ObjectStore::CollectionHandle ch_primary = chs[0];
+ if (ch_primary) {
+ // NOTE(review): addresses the object with shard NO_SHARD via chs[0];
+ // confirm this matches how setup_ec_pool() lays out per-shard
+ // objects in the MemStore.
+ ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
+ std::map<std::string, ceph::buffer::ptr, std::less<>> attrs;
+ int r = store->getattrs(ch_primary, ghoid, attrs);
+
+ if (r >= 0) {
+ // Successfully read all attributes from disk - populate the cache
+ for (auto& [key, value_ptr] : attrs) {
+ bufferlist bl;
+ bl.append(value_ptr);
+ obc->attr_cache[key] = std::move(bl);
+ }
+ }
+ // getattrs failure is tolerated: the cache simply starts empty.
+ }
+ }
+ }
+
+ object_contexts[hoid] = obc;
+ return obc;
+ }
+
+ /**
+ * Set the next version number for auto-generation.
+ * This can be used by tests after rollback to set the version to a specific value.
+ * The epoch will still come from the osdmap.
+ */
+ void set_next_version(uint64_t version) {
+ // Only the counter is replaced; get_next_version() pairs it with the
+ // current osdmap epoch at call time.
+ next_version = version;
+ }
+
+ /**
+ * Return the next auto-generated version: the current osdmap epoch paired
+ * with a monotonically increasing counter (post-incremented per call).
+ */
+ eversion_t get_next_version() {
+ return eversion_t(osdmap->get_epoch(), next_version++);
+ }
+
+ /**
+ * Read ObjectInfo from the store for an existing object.
+ * Returns an ObjectContext with the decoded ObjectInfo, or a new
+ * ObjectContext with default values if the object doesn't exist.
+ */
+ ObjectContextRef get_object_context(
+ const hobject_t& hoid);
+
int do_transaction_and_complete(
const hobject_t& hoid,
PGTransactionUPtr pg_t,
const object_stat_sum_t& delta_stats,
const eversion_t& at_version,
- std::vector<pg_log_entry_t> log_entries);
+ std::vector<pg_log_entry_t> log_entries,
+ std::function<void(int)> on_write_complete = nullptr);
virtual int create_and_write(
const std::string& obj_name,
- const std::string& data,
- const eversion_t& at_version = eversion_t(1, 1));
+ const std::string& data);
public:
const std::string& obj_name,
uint64_t offset,
const std::string& data,
- const eversion_t& prior_version,
- const eversion_t& at_version,
uint64_t object_size);
int read_object(
bufferlist& out_data,
uint64_t object_size);
+ /**
+ * Read an object and verify that its contents match expected data.
+ *
+ * This helper function combines read_object with assertions to verify:
+ * 1. The read operation completes successfully (result >= 0)
+ * 2. The read data length matches expected length
+ * 3. The read data content matches expected content
+ *
+ * @param obj_name Name of the object to read
+ * @param expected_data Expected data content
+ * @param offset Offset to read from
+ * @param object_size Total size of the object being read
+ */
+ void verify_object(
+ const std::string& obj_name,
+ const std::string& expected_data,
+ size_t offset,
+ size_t object_size);
+
+ /**
+ * Create and write an object, then verify it was written correctly.
+ *
+ * This helper function combines create_and_write with verify_object to:
+ * 1. Create and write the object
+ * 2. Verify the write completed successfully (result == 0)
+ * 3. Read back and verify the data matches
+ *
+ * @param obj_name Name of the object to create and write
+ * @param data Data to write
+ */
+ void create_and_write_verify(
+ const std::string& obj_name,
+ const std::string& data);
+
+ /**
+ * Write to an object (potentially with offset), then verify the write succeeded.
+ *
+ * This helper function combines write with verification to:
+ * 1. Write data at the specified offset
+ * 2. Verify the write completed successfully (result == 0)
+ * 3. Read back and verify the written data matches
+ *
+ * @param obj_name Name of the object to write
+ * @param offset Offset to write at
+ * @param data Data to write
+ * @param object_size Current size of the object
+ * @param context_msg Optional context message to append to assertion messages
+ * Note: returns void; failures surface through test assertions.
+ */
+ void write_verify(
+ const std::string& obj_name,
+ size_t offset,
+ const std::string& data,
+ size_t object_size,
+ const std::string& context_msg = "");
+
/**
* Update the OSDMap and trigger backend cleanup.
*
std::shared_ptr<OSDMap> new_osdmap,
std::optional<pg_shard_t> new_primary = std::nullopt);
+ /**
+ * Clear attr_cache for all objects.
+ * Called from on_change() to invalidate cached attributes that might be stale
+ * after a peering event or OSDMap change.
+ */
+ void clear_all_attr_caches();
+
};
void SetUp() override {
PGBackendTestFixture::SetUp();
}
+
+ /**
+ * Simulate failure of multiple OSDs by marking them down in the OSDMap.
+ * This is similar to TestECFailover::simulate_osd_failure but handles
+ * multiple failures at once.
+ *
+ * @param failed_osds OSD ids to mark down; each id is also used as the
+ * shard id when updating listener shardsets (this fixture maps osd i
+ * to shard i one-to-one).
+ */
+ void simulate_multiple_osd_failures(const std::vector<int>& failed_osds) {
+ auto new_osdmap = std::make_shared<OSDMap>();
+ new_osdmap->deepish_copy_from(*osdmap);
+
+ // Build new acting set with failed OSDs replaced by CRUSH_ITEM_NONE
+ std::vector<int> new_acting;
+ int total_osds = k + m;
+
+ for (int i = 0; i < total_osds; i++) {
+ bool is_failed = std::find(failed_osds.begin(), failed_osds.end(), i) != failed_osds.end();
+ new_acting.push_back(is_failed ? CRUSH_ITEM_NONE : i);
+ }
+
+ // Get the pool to use pgtemp_primaryfirst transformation
+ const pg_pool_t* pool = new_osdmap->get_pg_pool(pgid.pool());
+ ceph_assert(pool != nullptr);
+
+ // For EC pools with optimizations, pgtemp_primaryfirst reorders the acting set
+ std::vector<int> transformed_acting = new_osdmap->pgtemp_primaryfirst(*pool, new_acting);
+
+ // Use OSDMap::Incremental to set pg_temp and mark OSDs as down
+ OSDMap::Incremental inc(new_osdmap->get_epoch() + 1);
+ inc.fsid = new_osdmap->get_fsid();
+
+ for (int failed_osd : failed_osds) {
+ inc.new_state[failed_osd] = CEPH_OSD_EXISTS; // Mark as down (exists but not UP)
+ }
+
+ // Convert to mempool vector for pg_temp
+ mempool::osdmap::vector<int> pg_temp_vec(transformed_acting.begin(), transformed_acting.end());
+ inc.new_pg_temp[pgid] = pg_temp_vec;
+
+ new_osdmap->apply_incremental(inc);
+
+ // Finalize the CRUSH map
+ new_osdmap->crush->finalize();
+
+ // Update listener shardsets to remove failed shards
+ // NOTE(review): assumes osd id == shard id for each failed OSD; holds for
+ // this fixture's 1:1 layout — confirm if the mapping ever changes.
+ for (int failed_osd : failed_osds) {
+ pg_shard_t failed_shard(failed_osd, shard_id_t(failed_osd));
+ for (auto& [instance_id, list] : listeners) {
+ list->shardset.erase(failed_shard);
+ list->acting_recovery_backfill_shard_id_set.erase(shard_id_t(failed_osd));
+ }
+ }
+
+ // update_osdmap will query the OSDMap to determine the primary
+ update_osdmap(new_osdmap);
+ }
};
// ---------------------------------------------------------------------------
std::string test_data(param.size, param.fill);
std::string obj_name = "test_backend_" + backend_config.label + "_" + param.label;
- // Execute create+write operation
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << param.label << " write should complete successfully";
+ // Execute create+write operation and verify
+ create_and_write_verify(obj_name, test_data);
// Verify messages were sent to replicas/shards
auto* primary_listener = get_primary_listener();
primary_listener->sent_messages.clear();
primary_listener->sent_messages_with_dest.clear();
- // Perform the read operation
- bufferlist read_data;
- int read_result = read_object(
- obj_name,
- 0, // offset
- test_data.length(), // length
- read_data,
- test_data.length() // object_size
- );
-
- EXPECT_GE(read_result, 0) << param.label << " read should complete successfully";
-
- // Verify data length
- ASSERT_EQ(read_data.length(), test_data.length())
- << param.label << " read data length should match written data length";
-
- // Verify data content
- std::string read_string(read_data.c_str(), read_data.length());
- EXPECT_EQ(read_string, test_data)
- << param.label << " read data should match written data";
+ // Verify object can be read back correctly
+ verify_object(obj_name, test_data, 0, test_data.size());
// For EC backends: verify read messages were sent to shards
if (backend_config.pool_type == EC) {
// Create initial data filled with the parameterized fill character
std::string initial_data(initial_size, param.fill);
- int result = create_and_write(obj_name, initial_data, eversion_t(1, 1));
+ int result = create_and_write(obj_name, initial_data);
EXPECT_EQ(result, 0) << param.label << " initial write should complete successfully";
// Partial write data uses the next fill character (wraps around 'z' -> 'a')
obj_name,
partial_offset,
partial_data,
- eversion_t(1, 1), // prior_version
- eversion_t(1, 2), // at_version
initial_size // object_size
);
EXPECT_EQ(result, 0) << param.label << " partial write should complete successfully";
const std::string obj_name = "test_failover_object";
const std::string test_data = "Initial data before OSDMap change";
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Initial write should complete successfully";
-
- bufferlist read_data;
- int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length());
- EXPECT_GE(read_result, 0) << "Read should complete successfully";
- ASSERT_EQ(read_data.length(), test_data.length());
+ // Write and verify initial data
+ create_and_write_verify(obj_name, test_data);
auto new_osdmap = std::make_shared<OSDMap>();
new_osdmap->deepish_copy_from(*osdmap);
ASSERT_TRUE(primary_listener != nullptr) << "Primary listener should exist";
EXPECT_EQ(primary_listener->osdmap, new_osdmap) << "Listener OSDMap should be updated";
- bufferlist read_data2;
- read_result = read_object(obj_name, 0, test_data.length(), read_data2, test_data.length());
- EXPECT_GE(read_result, 0) << "Read after OSDMap update should complete successfully";
- ASSERT_EQ(read_data2.length(), test_data.length());
-
- std::string read_string(read_data2.c_str(), read_data2.length());
- EXPECT_EQ(read_string, test_data) << "Data should match after OSDMap update";
+ // Verify data can still be read after OSDMap update
+ verify_object(obj_name, test_data, 0, test_data.size());
}
TEST_P(TestECFailover, PrimaryFailover) {
const std::string obj_name = "test_primary_failover";
const std::string test_data = "Data written before primary failover";
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Initial write should complete successfully";
-
- bufferlist read_data;
- int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length());
- EXPECT_GE(read_result, 0) << "Read should complete successfully";
- ASSERT_EQ(read_data.length(), test_data.length());
-
- std::string read_string(read_data.c_str(), read_data.length());
- EXPECT_EQ(read_string, test_data) << "Data should match before failover";
+ // Write and verify initial data
+ create_and_write_verify(obj_name, test_data);
EXPECT_TRUE(listeners[0]->pgb_is_primary())
<< "Instance 0 should be primary before failover";
EXPECT_EQ(new_primary_backend, backends[expected_new_primary].get())
<< "get_primary_backend() should return the new primary";
- bufferlist read_data_after;
- int read_result_after = read_object(obj_name, 0, test_data.length(), read_data_after, test_data.length());
- EXPECT_GE(read_result_after, 0) << "Degraded read should complete successfully after failover";
- ASSERT_EQ(read_data_after.length(), test_data.length());
-
- std::string read_string_after(read_data_after.c_str(), read_data_after.length());
- EXPECT_EQ(read_string_after, test_data) << "Data should match after failover with EC reconstruction";
+ // Verify degraded read works after failover with EC reconstruction
+ verify_object(obj_name, test_data, 0, test_data.size());
EXPECT_TRUE(new_primary_listener != nullptr) << "Primary listener should exist after failover";
EXPECT_GT(new_primary_listener->osdmap->get_epoch(), 1)
#include <gtest/gtest.h>
#include "test/osd/ECPeeringTestFixture.h"
+#include "test/osd/TestCommon.h"
using namespace std;
-class TestECFailoverWithPeering : public ECPeeringTestFixture {
+/**
+ * TestECFailoverWithPeering - parameterized EC peering and failover tests.
+ *
+ * This fixture is parameterized over BackendConfig to test multiple EC
+ * configurations (different k/m values, stripe units, plugins, and optimizations).
+ * Only EC configurations are tested since peering and failover are EC-specific.
+ */
+class TestECFailoverWithPeering : public ECPeeringTestFixture,
+ public ::testing::WithParamInterface<BackendConfig> {
public:
TestECFailoverWithPeering() : ECPeeringTestFixture() {
- k = 4;
- m = 2;
- stripe_unit = 4096;
- ec_plugin = "isa";
- ec_technique = "reed_sol_van";
+ const auto& config = GetParam();
+ k = config.k;
+ m = config.m;
+ stripe_unit = config.stripe_unit;
+ ec_plugin = config.ec_plugin;
+ ec_technique = config.ec_technique;
+ pool_flags = config.pool_flags;
+ }
+
+ void SetUp() override {
+ ECPeeringTestFixture::SetUp();
}
};
-TEST_F(TestECFailoverWithPeering, BasicPeeringCycle) {
- run_peering_cycle();
-
- EXPECT_TRUE(all_shards_active()) << "All shards should be active after peering";
-
- // Note: In EC pools, only the primary tracks PG_STATE_CLEAN.
- // Replicas are in ReplicaActive state and don't set the CLEAN flag.
- // Get acting_primary from OSDMap
+TEST_P(TestECFailoverWithPeering, BasicPeeringCycle) {
pg_t pgid = get_peering_state(0)->get_info().pgid.pgid;
std::vector<int> acting_osds;
int acting_primary = -1;
}
}
-TEST_F(TestECFailoverWithPeering, WriteWithPeering) {
- run_peering_cycle();
- ASSERT_TRUE(all_shards_active()) << "Peering must complete before write";
-
+TEST_P(TestECFailoverWithPeering, WriteWithPeering) {
+
const std::string obj_name = "test_write_with_peering";
const std::string test_data = "Data written with full peering support";
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Write should complete successfully";
-
- bufferlist read_data;
- int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length());
- EXPECT_GE(read_result, 0) << "Read should complete successfully";
- ASSERT_EQ(read_data.length(), test_data.length());
-
- std::string read_string(read_data.c_str(), read_data.length());
- EXPECT_EQ(read_string, test_data) << "Data should match";
-
+ create_and_write_verify(obj_name, test_data);
+
auto* primary_ps = get_peering_state(0);
EXPECT_GT(primary_ps->get_pg_log().get_log().log.size(), 0)
<< "Primary should have log entries after write";
}
-TEST_F(TestECFailoverWithPeering, OSDFailureWithPeering) {
- run_peering_cycle();
+TEST_P(TestECFailoverWithPeering, OSDFailureWithPeering) {
ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
const std::string obj_name = "test_osd_failure";
- // Write 16KB but read only 8KB to force reconstruction when shard 1 is down
- const std::string test_data(16384, 'X'); // 16KB write
- const size_t read_length = 8192; // 8KB read
-
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Initial write should complete";
-
- // Pre-failover read: measure baseline message count with all OSDs up
- // Clear message counters first
- for (auto& [shard, listener] : backend_listeners) {
- listener->sent_messages.clear();
- }
-
+ uint64_t object_size = k * stripe_unit;
+ const std::string test_data_full(object_size, 'X');
+ const size_t read_length = 2 * stripe_unit;
+ const std::string test_data_read(read_length, 'X');
+ int failed_osd = 1; // Fail shard 1 which contains part of the data
+
+ create_and_write_verify(obj_name, test_data_full);
+ event_loop->reset_stats();
bufferlist pre_failover_read;
- int pre_read_result = read_object(obj_name, 0, read_length,
- pre_failover_read, test_data.length());
- EXPECT_GE(pre_read_result, 0) << "Pre-failover read should complete";
-
- // Count messages sent during pre-failover read
- size_t pre_failover_msg_count = 0;
- for (auto& [shard, listener] : backend_listeners) {
- pre_failover_msg_count += listener->sent_messages.size();
- }
+ verify_object(obj_name, test_data_read, 0, object_size);
+ EXPECT_EQ(4, event_loop->get_stats_by_type().at(EventLoop::EventType::OSD_MESSAGE));
- int failed_osd = 1; // Fail shard 1 which contains part of the data
-
// Use fixture helper to mark OSD as down
mark_osd_down(failed_osd);
- // Primary (OSD 0) should remain active after non-primary OSD failure
- auto* primary_ps = get_peering_state(0);
- std::string primary_state = get_state_name(0);
- EXPECT_TRUE(primary_state.find("Peering") != std::string::npos ||
- primary_state.find("Active") != std::string::npos)
- << "Primary should be peering or active after OSD failure, got: " << primary_state;
-
- EXPECT_TRUE(primary_ps->get_acting_recovery_backfill().count(pg_shard_t(failed_osd, shard_id_t(failed_osd))) == 0)
- << "Failed OSD should not be in acting set";
-
- // Clear message counters before post-failover read
- for (auto& [shard, listener] : backend_listeners) {
- listener->sent_messages.clear();
- }
-
- // Post-failover read: verify EC reconstruction works with one OSD down
- bufferlist post_failover_read;
- int post_read_result = read_object(obj_name, 0, read_length,
- post_failover_read, test_data.length());
- EXPECT_GE(post_read_result, 0) << "Read should complete successfully after OSD failure";
- ASSERT_EQ(post_failover_read.length(), read_length)
- << "Read length should match after OSD failure";
-
- std::string read_string(post_failover_read.c_str(), post_failover_read.length());
- std::string expected_data(read_length, 'X');
- EXPECT_EQ(read_string, expected_data)
- << "Data should be correctly reconstructed via EC after OSD failure";
-
- // Count messages sent during post-failover read
- size_t post_failover_msg_count = 0;
- for (auto& [shard, listener] : backend_listeners) {
- post_failover_msg_count += listener->sent_messages.size();
- }
-
- // This is an 8k read of a 16k object in a 4+2 array. This means that if shard 1
- // is missing, then this should result in 4 reads, rather than 2 to recover.
- EXPECT_GT(post_failover_msg_count, pre_failover_msg_count)
- << "Post-failover read should complete successfully "
- << "(pre: " << pre_failover_msg_count << ", post: " << post_failover_msg_count << ")";
+ // Reset EventLoop stats before post-failover read
+ event_loop->reset_stats();
+ verify_object(obj_name, test_data_read, 0, object_size);
+ EXPECT_EQ(k * 2, event_loop->get_stats_by_type().at(EventLoop::EventType::OSD_MESSAGE));
}
-TEST_F(TestECFailoverWithPeering, PrimaryFailoverWithPeering) {
- run_peering_cycle();
+TEST_P(TestECFailoverWithPeering, PrimaryFailoverWithPeering) {
ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
const std::string obj_name = "test_primary_failover";
const std::string test_data = "Data before primary failover";
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Initial write should complete";
-
- EXPECT_TRUE(get_peering_listener(0)->backend_listener->pgb_is_primary())
- << "Shard 0 should be primary initially";
+ create_and_write_verify(obj_name, test_data);
// Mark OSD 0 (the initial primary) as down
// PeeringState will automatically determine the new primary
<< "New primary should be in Active state";
// Verify reads work after primary failover (with EC reconstruction)
- bufferlist read_data;
- int read_result = read_object(obj_name, 0, test_data.length(),
- read_data, test_data.length());
- EXPECT_GE(read_result, 0) << "Read should complete successfully after primary failover";
- ASSERT_EQ(read_data.length(), test_data.length())
- << "Read length should match after primary failover";
-
- std::string read_string(read_data.c_str(), read_data.length());
- EXPECT_EQ(read_string, test_data)
- << "Data should be correctly reconstructed via EC after primary failover";
+ verify_object(obj_name, test_data, 0, test_data.length());
}
-TEST_F(TestECFailoverWithPeering, MultipleOSDFailuresWithPeering) {
- run_peering_cycle();
+TEST_P(TestECFailoverWithPeering, MultipleOSDFailuresWithPeering) {
+ // This test only runs for configurations with m=2
+ if (m != 2) {
+ GTEST_SKIP() << "MultipleOSDFailuresWithPeering only runs for m=2";
+ }
+
ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
const std::string obj_name = "test_multiple_failures";
const std::string test_data = "Data before multiple failures";
- int result = create_and_write(obj_name, test_data);
- EXPECT_EQ(result, 0) << "Initial write should complete";
+ create_and_write_verify(obj_name, test_data);
std::vector<int> failed_osds = {1, 2}; // Fail 2 data shards
ASSERT_EQ(failed_osds.size(), static_cast<size_t>(m))
<< "Primary should be operational, got: " << primary_state;
}
-TEST_F(TestECFailoverWithPeering, PeeringWithLogDivergence) {
- run_peering_cycle();
- ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
-
- const std::string pre_div_obj = "test_pre_divergence";
- const std::string pre_div_data = "Data written before divergence";
-
- int result = create_and_write(pre_div_obj, pre_div_data, eversion_t(1, 1));
- EXPECT_EQ(result, 0) << "Pre-divergence write should complete";
-
- auto* primary_ps = get_peering_state(0);
- size_t initial_log_size = primary_ps->get_pg_log().get_log().log.size();
- EXPECT_GT(initial_log_size, 0) << "Primary should have log entries after pre-divergence write";
-
- // Note: get_pg_log().get_log().head reflects the log entries added via append_log
- eversion_t pre_div_log_head = primary_ps->get_pg_log().get_log().head;
- EXPECT_GT(pre_div_log_head.version, 0u) << "PG log head should be non-zero after write";
-
- const std::string post_div_obj = "test_post_divergence";
- const std::string post_div_data = "Data written after divergence point";
-
- result = create_and_write(post_div_obj, post_div_data, eversion_t(1, 2));
- EXPECT_EQ(result, 0) << "Post-divergence write should complete";
-
- eversion_t post_div_log_head = primary_ps->get_pg_log().get_log().head;
- EXPECT_GT(post_div_log_head.version, pre_div_log_head.version)
- << "PG log head should advance after post-divergence write";
-
- size_t log_size_after_writes = primary_ps->get_pg_log().get_log().log.size();
- EXPECT_GE(log_size_after_writes, initial_log_size)
- << "Primary log should have at least as many entries after second write";
-
- // Trigger a new peering cycle by advancing the map to simulate re-peering
- // after a shard had a divergent log.
- advance_epoch();
-
- std::string primary_state = get_state_name(0);
- ASSERT_TRUE(all_shards_active() ||
- primary_state.find("Recovery") != std::string::npos ||
- primary_state.find("Peering") != std::string::npos)
- << "Shards should be active, recovering, or peering after map advance, got: "
- << primary_state;
-
- // --- Verify pre-divergence data is readable and correct ---
- bufferlist pre_div_read;
- int read_result = read_object(pre_div_obj, 0, pre_div_data.length(),
- pre_div_read, pre_div_data.length());
- EXPECT_GE(read_result, 0) << "Pre-divergence object should be readable after reconciliation";
- ASSERT_EQ(pre_div_read.length(), pre_div_data.length())
- << "Pre-divergence read length should match";
- {
- std::string read_str(pre_div_read.c_str(), pre_div_read.length());
- EXPECT_EQ(read_str, pre_div_data)
- << "Pre-divergence data should match after log reconciliation";
- }
-
- // --- Verify post-divergence data is readable and correct ---
- bufferlist post_div_read;
- read_result = read_object(post_div_obj, 0, post_div_data.length(),
- post_div_read, post_div_data.length());
- EXPECT_GE(read_result, 0) << "Post-divergence object should be readable after reconciliation";
- ASSERT_EQ(post_div_read.length(), post_div_data.length())
- << "Post-divergence read length should match";
- {
- std::string read_str(post_div_read.c_str(), post_div_read.length());
- EXPECT_EQ(read_str, post_div_data)
- << "Post-divergence data should match after log reconciliation";
- }
-
- // After peering, the primary's PG log head should reflect all writes.
- eversion_t primary_log_head = primary_ps->get_pg_log().get_log().head;
- EXPECT_EQ(primary_log_head, post_div_log_head)
- << "Primary PG log head should reflect all writes after reconciliation";
-
- pg_t pgid = get_peering_state(0)->get_info().pgid.pgid;
- std::vector<int> acting_osds;
- int acting_primary = -1;
- osdmap->pg_to_acting_osds(pgid, &acting_osds, &acting_primary);
-
- for (int shard : acting_osds) {
- if (shard == CRUSH_ITEM_NONE) {
- continue;
- }
- auto* shard_ps = get_peering_state(shard);
- if (shard_ps->is_active()) {
- eversion_t shard_info_last_update = shard_ps->get_info().last_update;
- if (shard == acting_primary) {
- EXPECT_EQ(shard_info_last_update, post_div_log_head)
- << "Primary shard info.last_update should match post-divergence log head";
- } else {
- EXPECT_LE(shard_info_last_update, post_div_log_head)
- << "Shard " << shard << " info.last_update should not exceed primary's log head";
- }
- }
- }
-
- // Verify the formerly-failed shard's PG log is accessible and consistent.
- // We use the last data shard (k-1) as the "formerly-failed" shard to check.
- int reconciled_shard = k - 1;
- if (reconciled_shard >= 0 && reconciled_shard < k + m) {
- auto* reconciled_ps = get_peering_state(reconciled_shard);
- size_t reconciled_log_size = reconciled_ps->get_pg_log().get_log().log.size();
- auto* primary_ps_check = get_peering_state(acting_primary);
- size_t primary_log_size = primary_ps_check->get_pg_log().get_log().log.size();
- EXPECT_LE(reconciled_log_size, primary_log_size)
- << "Reconciled shard " << reconciled_shard
- << " log size should not exceed primary's log size";
-
- if (reconciled_ps->is_active()) {
- eversion_t reconciled_info_lu = reconciled_ps->get_info().last_update;
- EXPECT_LE(reconciled_info_lu, post_div_log_head)
- << "Reconciled shard " << reconciled_shard
- << " info.last_update should not exceed primary's log head after log reconciliation";
- }
- }
-}
-
-TEST_F(TestECFailoverWithPeering, RecoveryWithPeering) {
- run_peering_cycle();
+TEST_P(TestECFailoverWithPeering, RecoveryWithPeering) {
ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
const std::string obj1_name = "test_recovery_obj1";
const std::string obj2_name = "test_recovery_obj2";
const std::string obj2_data = "Second object data for recovery test";
- int result = create_and_write(obj1_name, obj1_data, eversion_t(1, 1));
+ int result = create_and_write(obj1_name, obj1_data);
EXPECT_EQ(result, 0) << "First pre-failure write should complete";
- result = create_and_write(obj2_name, obj2_data, eversion_t(1, 2));
+ result = create_and_write(obj2_name, obj2_data);
EXPECT_EQ(result, 0) << "Second pre-failure write should complete";
EXPECT_TRUE(all_shards_clean()) << "All shards should be clean before recovery test";
const std::string post_recovery_obj = "test_post_recovery";
const std::string post_recovery_data = "Data written after OSD failure and recovery";
- result = create_and_write(post_recovery_obj, post_recovery_data, eversion_t(1, 3));
+ result = create_and_write(post_recovery_obj, post_recovery_data);
EXPECT_EQ(result, 0) << "Write after OSD failure should complete successfully";
bufferlist post_recovery_read;
<< "on_activate_complete should have been called during peering";
}
+// ---------------------------------------------------------------------------
+// EC backend configurations for parameterized tests
+// ---------------------------------------------------------------------------
+
+namespace {
+
+/**
+ * EC-only backend configurations for TestECFailoverWithPeering.
+ * These configurations vary:
+ * - k/m ratios (2+1, 4+2, 8+3)
+ * - stripe units (4k, 8k, 16k)
+ * - plugins (isa, jerasure)
+ * All entries enable EC overwrites and EC optimizations; non-optimized
+ * EC and multi-zone layouts are not currently covered here.
+ */
+const std::vector<BackendConfig> kECPeeringConfigs = {
+ // ISA plugin with optimizations (modern EC)
+ {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 4, 2, "EC_ISA_Opt_k4m2_su4k"},
+ {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 8192, 4, 2, "EC_ISA_Opt_k4m2_su8k"},
+ {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 16384, 4, 2, "EC_ISA_Opt_k4m2_su16k"},
+ {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 2, 1, "EC_ISA_Opt_k2m1_su4k"},
+ {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 8, 3, "EC_ISA_Opt_k8m3_su4k"},
+
+ // Jerasure plugin with optimizations (modern EC)
+ {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 4, 2, "EC_Jerasure_Opt_k4m2_su4k"},
+ {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 8192, 4, 2, "EC_Jerasure_Opt_k4m2_su8k"},
+ {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 16384, 4, 2, "EC_Jerasure_Opt_k4m2_su16k"},
+ {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 2, 1, "EC_Jerasure_Opt_k2m1_su4k"},
+ {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 8, 3, "EC_Jerasure_Opt_k8m3_su4k"},
+};
+
+} // namespace
+
+/**
+ * Test rollback of in-flight writes after an OSD failure.
+ *
+ * Scenario:
+ * 1. Write a full stripe with pattern A and verify it (committed to all shards)
+ * 2. Suspend primary traffic to shard 1, then issue full-stripe writes of
+ *    patterns B and C; both return -EINPROGRESS and cannot complete
+ * 3. Mark the last shard (k+m-1) down, forcing a new peering cycle
+ * 4. Unsuspend shard 1 and run the event loop until idle
+ * 5. Verify all shards are active and the object still reads back pattern A,
+ *    i.e. the uncommitted writes were rolled back during peering
+ *
+ * This also verifies the test infrastructure handles OSD failures without
+ * leaving OSDs in a suspended state that would block teardown.
+ */
+TEST_P(
+ TestECFailoverWithPeering,
+ RollbackAfterOSDFailure
+) {
+ int failing_shard = k + m - 1; // last shard in the layout
+ int blocked_shard = 1; // shard whose primary traffic is suspended
+ const std::string obj_name = "test";
+ const size_t data_size = stripe_unit * k; // One full stripe.
+ std::string pattern_a(data_size, 'A');
+ std::string pattern_b(data_size, 'B');
+ std::string pattern_c(data_size, 'C');
+
+ ASSERT_TRUE(all_shards_active()) << "Initial peering must complete";
+
+ // Commit pattern A everywhere, then block primary->shard traffic so the
+ // following writes cannot complete on all shards.
+ create_and_write_verify(obj_name, pattern_a);
+ suspend_primary_to_osd(blocked_shard);
+ int result = write(obj_name, 0, pattern_b, data_size);
+ ASSERT_EQ(-EINPROGRESS, result);
+ result = write(obj_name, 0, pattern_c, data_size);
+ ASSERT_EQ(-EINPROGRESS, result);
+
+ // Fail a shard to force a new peering cycle, then release the blocked
+ // shard and drain all outstanding events.
+ mark_osd_down(failing_shard);
+ unsuspend_primary_to_osd(blocked_shard);
+ event_loop->run_until_idle();
+
+ // Ensure all shards have completed peering and applied rollback transactions
+ ASSERT_TRUE(all_shards_active()) << "All shards should be active after peering";
+
+ // The in-flight writes (B, C) must have been rolled back; A remains.
+ verify_object(obj_name, pattern_a, 0, pattern_a.length());
+}
+
+
+// ---------------------------------------------------------------------------
+// Instantiate TestECFailoverWithPeering with EC configurations
+// ---------------------------------------------------------------------------
+
+INSTANTIATE_TEST_SUITE_P(
+ ECConfigs,
+ TestECFailoverWithPeering,
+ ::testing::ValuesIn(kECPeeringConfigs),
+ // Use each config's label as the per-instance test name suffix.
+ [](const ::testing::TestParamInfo<BackendConfig>& info) {
+ return info.param.label;
+ }
+);
+