From: Alex Ainscow Date: Sun, 29 Mar 2026 20:55:00 +0000 (+0100) Subject: Updates for review X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=dba51e2ad35dfb4738791d7079df2487ea3f60d9;p=ceph.git Updates for review Signed-off-by: Alex Ainscow --- diff --git a/cmake/modules/AddCephTest.cmake b/cmake/modules/AddCephTest.cmake index 65aafaf18839..cd8cc90676b0 100644 --- a/cmake/modules/AddCephTest.cmake +++ b/cmake/modules/AddCephTest.cmake @@ -3,6 +3,7 @@ #adds makes target/script into a test, test to check target, sets necessary environment variables function(add_ceph_test test_name test_path) add_test(NAME ${test_name} COMMAND ${test_path} ${ARGN} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} COMMAND_EXPAND_LISTS) if(TARGET ${test_name}) add_dependencies(tests ${test_name}) diff --git a/src/test/osd/CMakeLists.txt b/src/test/osd/CMakeLists.txt index 0b7f7c7dba0a..b6eb90c0007a 100644 --- a/src/test/osd/CMakeLists.txt +++ b/src/test/osd/CMakeLists.txt @@ -114,6 +114,8 @@ target_link_libraries(unittest_pglog osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBR # unittest_peeringstate add_executable(unittest_peeringstate TestPeeringState.cc + $ + $ ) add_ceph_unittest(unittest_peeringstate) target_link_libraries(unittest_peeringstate osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES}) @@ -121,13 +123,14 @@ target_link_libraries(unittest_peeringstate osd os global ${CMAKE_DL_LIBS} ${BLK add_library(pg_backend_test_fixture OBJECT PGBackendTestFixture.cc ) -target_link_libraries(pg_backend_test_fixture osd os global) +target_link_libraries(pg_backend_test_fixture osd os global GTest::GTest) # ec_peering_test_fixture: object library for ECPeeringTestFixture implementation add_library(ec_peering_test_fixture OBJECT ECPeeringTestFixture.cc + MockPeeringListener.cc ) -target_link_libraries(ec_peering_test_fixture osd os global) +target_link_libraries(ec_peering_test_fixture osd os global GTest::GTest) # unittest_backend_basics (replaces unittest_ecbasics + 
unittest_replicatedbasics) add_executable(unittest_backend_basics @@ -148,7 +151,7 @@ add_executable(unittest_ecfailover_with_peering ) add_ceph_unittest(unittest_ecfailover_with_peering) target_link_libraries(unittest_ecfailover_with_peering osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES}) -add_dependencies(unittest_ecfailover_with_peering ec_isa) +add_dependencies(unittest_ecfailover_with_peering ec_isa ec_jerasure) # unittest_hitset add_executable(unittest_hitset hitset.cc diff --git a/src/test/osd/ECPeeringTestFixture.cc b/src/test/osd/ECPeeringTestFixture.cc index 25f031dff6a7..8b96fa78bb01 100644 --- a/src/test/osd/ECPeeringTestFixture.cc +++ b/src/test/osd/ECPeeringTestFixture.cc @@ -14,6 +14,313 @@ */ #include "test/osd/ECPeeringTestFixture.h" +#include "test/osd/MockECRecPred.h" +#include "test/osd/MockECReadPred.h" + +// ShardDpp implementation +std::ostream& ECPeeringTestFixture::ShardDpp::gen_prefix(std::ostream& out) const { + out << "shard " << shard << ": "; + if (fixture->shard_peering_states.contains(shard)) { + PeeringState *ps = fixture->shard_peering_states[shard].get(); + out << *ps << " "; + } + return out; +} + +IsPGRecoverablePredicate* ECPeeringTestFixture::get_is_recoverable_predicate() { + return new MockECRecPred(k, m); +} + +IsPGReadablePredicate* ECPeeringTestFixture::get_is_readable_predicate() { + return new MockECReadPred(k, m); +} + +ECPeeringTestFixture::ECPeeringTestFixture() + : PGBackendTestFixture(PGBackendTestFixture::EC) { +} + +void ECPeeringTestFixture::SetUp() { + PGBackendTestFixture::SetUp(); + for (int i = 0; i < k + m; i++) { + create_peering_state(i); + } + + // Override epoch getter to use shard_peering_listeners instead of base class listeners + // (which are moved into shard_peering_listeners during create_peering_state) + messenger->set_epoch_getter([this](int osd) -> epoch_t { + auto it = shard_peering_listeners.find(osd); + if (it != shard_peering_listeners.end()) { + return it->second->get_osdmap_epoch(); + 
} + // Fallback to test fixture's osdmap + return osdmap->get_epoch(); + }); + + // Register handlers for peering messages (MOSDPeeringOp) + // All peering messages (Query, Notify, Info, Log) use the same handler pattern + // since they all inherit from MOSDPeeringOp and use get_event() + auto peering_handler = [this](int from_osd, int to_osd, MOSDPeeringOp* op) -> bool { + // Message is already correctly typed as MOSDPeeringOp* + ceph_assert(op); + + // Get the peering event from the message + PGPeeringEventRef evt_ref(op->get_event()); + + // Handle the event on the destination shard's peering state + PeeringCtx* ctx = get_peering_ctx(to_osd); + auto ps = get_peering_state(to_osd); + ps->handle_event(evt_ref, ctx); + + auto t = ctx->transaction.claim_and_reset(); + int r = queue_transaction_helper(to_osd, std::move(t)); + ceph_assert( r >= 0 ); + return true; // Message handled + }; + + // Register the same handler for all peering message types + messenger->register_typed_handler(MSG_OSD_PG_QUERY2, peering_handler); + messenger->register_typed_handler(MSG_OSD_PG_NOTIFY2, peering_handler); + messenger->register_typed_handler(MSG_OSD_PG_INFO2, peering_handler); + messenger->register_typed_handler(MSG_OSD_PG_LOG, peering_handler); + messenger->register_typed_handler(MSG_OSD_PG_LEASE, peering_handler); + messenger->register_typed_handler(MSG_OSD_PG_LEASE_ACK, peering_handler); + + // Register idle callback to check for buffered messages + event_loop->register_idle_callback([this]() -> bool { + bool found_messages = false; + // Check all PeeringCtx objects for buffered messages + for (auto& [osd, ctx] : shard_peering_ctxs) { + if (!ctx->message_map.empty()) { + dispatch_buffered_messages(osd, ctx.get()); + found_messages = true; + } + } + return found_messages; + }); + + // Run initial peering cycle to get all shards to active state + run_first_peering(); +} + +void ECPeeringTestFixture::TearDown() { + shard_peering_states.clear(); + shard_peering_ctxs.clear(); + 
shard_peering_listeners.clear(); + shard_dpps.clear(); + PGBackendTestFixture::TearDown(); +} + +PeeringState* ECPeeringTestFixture::get_peering_state(int shard) { + ceph_assert(shard >= 0 && shard < k + m); + auto it = shard_peering_states.find(shard); + ceph_assert(it != shard_peering_states.end()); + ceph_assert(it->second != nullptr); + return it->second.get(); +} + +PeeringCtx* ECPeeringTestFixture::get_peering_ctx(int shard) { + ceph_assert(shard >= 0 && shard < k + m); + auto it = shard_peering_ctxs.find(shard); + ceph_assert(it != shard_peering_ctxs.end()); + ceph_assert(it->second != nullptr); + return it->second.get(); +} + +MockPeeringListener* ECPeeringTestFixture::get_peering_listener(int shard) { + ceph_assert(shard >= 0 && shard < k + m); + auto it = shard_peering_listeners.find(shard); + ceph_assert(it != shard_peering_listeners.end()); + ceph_assert(it->second != nullptr); + return it->second.get(); +} + +int ECPeeringTestFixture::get_primary_shard_from_osdmap() const { + std::vector acting_osds; + int acting_primary = -1; + osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); + return acting_primary; +} + +MockPGBackendListener* ECPeeringTestFixture::get_primary_listener() { + int primary_shard = get_primary_shard_from_osdmap(); + if (primary_shard < 0) { + return nullptr; + } + + auto it = shard_peering_listeners.find(primary_shard); + if (it != shard_peering_listeners.end() && it->second && + it->second->backend_listener) { + // Assert that the backend listener agrees it's primary + ceph_assert(it->second->backend_listener->pgb_is_primary()); + return it->second->backend_listener.get(); + } + return nullptr; +} + +PGBackend* ECPeeringTestFixture::get_primary_backend() { + int primary_shard = get_primary_shard_from_osdmap(); + if (primary_shard < 0) { + return nullptr; + } + + auto listener_it = shard_peering_listeners.find(primary_shard); + if (listener_it != shard_peering_listeners.end() && listener_it->second && + 
listener_it->second->backend_listener) { + // Assert that the backend listener agrees it's primary + ceph_assert(listener_it->second->backend_listener->pgb_is_primary()); + + // Return the backend from the base class's backends map, not from + // the peering listener, because the base class backend is connected + // to the event loop and message routers + auto backend_it = backends.find(primary_shard); + return (backend_it != backends.end()) ? backend_it->second.get() : nullptr; + } + return nullptr; +} + +void ECPeeringTestFixture::event_initialize() { + // Get acting set from OSDMap + std::vector acting_osds; + int acting_primary = -1; + osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); + + for (int shard : acting_osds) { + // Skip failed OSDs (marked as CRUSH_ITEM_NONE) + if (shard == CRUSH_ITEM_NONE) { + continue; + } + auto evt = std::make_shared( + osdmap->get_epoch(), + osdmap->get_epoch(), + PeeringState::Initialize()); + + get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard)); + } + event_loop->run_until_idle(); +} + +void ECPeeringTestFixture::event_advance_map() { + // Capture the current osdmap and pgid for use in the lambda + OSDMapRef current_osdmap = osdmap; + pg_t current_pgid = this->pgid; + + // Schedule advance_map events for each shard instead of running directly + for (auto& [shard, ctx] : shard_peering_ctxs) { + PeeringState* ps = shard_peering_states.at(shard).get(); + OSDMapRef lastmap = ps->get_osdmap(); + PeeringCtx* peering_ctx = ctx.get(); + + event_loop->schedule_peering_event(shard, [ps, current_osdmap, lastmap, current_pgid, peering_ctx]() { + // Get up/acting sets from OSDMap inside the lambda + std::vector up_osds, acting_osds; + int up_primary = -1, acting_primary = -1; + current_osdmap->pg_to_up_acting_osds(current_pgid, &up_osds, &up_primary, &acting_osds, &acting_primary); + + ps->advance_map( + current_osdmap, lastmap, up_osds, up_primary, acting_osds, acting_primary, + *peering_ctx); + }); + } 
+ event_loop->run_until_idle(); +} + +void ECPeeringTestFixture::event_activate_map() { + // Schedule activate_map events for each shard instead of running directly + for (auto& [shard, ctx] : shard_peering_ctxs) { + PeeringState* ps = shard_peering_states.at(shard).get(); + PeeringCtx* peering_ctx = ctx.get(); + + event_loop->schedule_peering_event(shard, [ps, peering_ctx]() { + ps->activate_map(*peering_ctx); + }); + } + event_loop->run_until_idle(); +} + +void ECPeeringTestFixture::dispatch_buffered_messages(int from_shard, PeeringCtx* ctx) { + ceph_assert(messenger); + ceph_assert(ctx); + + // Check if there are any buffered messages in the context + for (auto& [target_osd, msg_list] : ctx->message_map) { + for (auto& msg : msg_list) { + // Route the message through the messenger + // msg is a MessageRef (boost::intrusive_ptr), need to get raw pointer + // MockMessenger will set the connection when it processes the message + messenger->send_message(from_shard, target_osd, msg.get()); + } + msg_list.clear(); + } + ctx->message_map.clear(); +} + +bool ECPeeringTestFixture::all_shards_active() { + // Get acting set from OSDMap + std::vector acting_osds; + int acting_primary = -1; + osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); + + for (int shard : acting_osds) { + // Skip failed OSDs (marked as CRUSH_ITEM_NONE) + if (shard == CRUSH_ITEM_NONE) { + continue; + } + if (!get_peering_state(shard)->is_active()) { + return false; + } + } + return true; +} + +bool ECPeeringTestFixture::all_shards_clean() { + // Get primary from OSDMap + std::vector acting_osds; + int acting_primary = -1; + osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); + + if (acting_primary >= 0 && acting_primary != CRUSH_ITEM_NONE) { + return get_peering_state(acting_primary)->is_clean(); + } + return false; +} + +std::string ECPeeringTestFixture::get_state_name(int shard) { + return get_peering_state(shard)->get_current_state(); +} + +void 
ECPeeringTestFixture::suspend_osd(int osd) { + if (event_loop) { + event_loop->suspend_to_osd(osd); + } +} + +void ECPeeringTestFixture::unsuspend_osd(int osd) { + if (event_loop) { + event_loop->unsuspend_to_osd(osd); + } +} + +bool ECPeeringTestFixture::is_osd_suspended(int osd) { + return event_loop && event_loop->is_to_osd_suspended(osd); +} + +void ECPeeringTestFixture::suspend_primary_to_osd(int to_osd) { + if (event_loop) { + int primary = get_primary_shard_from_osdmap(); + if (primary >= 0) { + event_loop->suspend_from_to_osd(primary, to_osd); + } + } +} + +void ECPeeringTestFixture::unsuspend_primary_to_osd(int to_osd) { + if (event_loop) { + int primary = get_primary_shard_from_osdmap(); + if (primary >= 0) { + event_loop->unsuspend_from_to_osd(primary, to_osd); + } + } +} PeeringState* ECPeeringTestFixture::create_peering_state(int shard) { @@ -23,36 +330,25 @@ PeeringState* ECPeeringTestFixture::create_peering_state(int shard) shard_dpps[shard] = std::make_unique(g_ceph_context, this, shard); + // Construct MockPeeringListener, transferring ownership of the backend + // listener created by setup_ec_pool() directly. No throw-away construction. 
shard_peering_listeners[shard] = std::make_unique( - osdmap, pool_id, shard_dpps[shard].get(), pg_whoami); - shard_peering_listeners[shard]->current_epoch = osdmap->get_epoch(); - - shard_peering_listeners[shard]->queue_transaction_callback = + osdmap, pool_id, shard_dpps[shard].get(), pg_whoami, + std::move(listeners[shard]), + store.get(), colls[shard], chs[shard]); + + auto& pl = shard_peering_listeners[shard]; + pl->current_epoch = osdmap->get_epoch(); + pl->set_messenger(messenger.get()); + pl->set_event_loop(event_loop.get()); + pl->set_fixture(this); + pl->backend_listener->set_messenger(messenger.get()); + + pl->queue_transaction_callback = [this, shard](ObjectStore::Transaction&& t) -> int { return queue_transaction_helper(shard, std::move(t)); }; - // Transfer ownership of the backend listener from the base class listeners[] - // map into the peering listener. The factory (set in our constructor) already - // recorded a raw pointer in backend_listeners[] so we know which entry to move. - // After the move, listeners[shard] holds a null unique_ptr; TearDown() already - // guards against that with "if (list)". - shard_peering_listeners[shard]->backend_listener = std::move(listeners[shard]); - shard_peering_listeners[shard]->coll = colls[shard]; - shard_peering_listeners[shard]->ch = chs[shard]; - - // Recreate backend with the correct backend_listener pointer. - // The MockPeeringListener constructor created backend with the temporary - // backend_listener it allocated internally, but we just replaced backend_listener - // with the one from the base class listeners[] map. We must recreate backend - // so its parent pointer points to the new backend_listener, not the destroyed one. 
- shard_peering_listeners[shard]->backend = std::make_unique( - g_ceph_context, - shard_peering_listeners[shard]->backend_listener.get(), - nullptr, - colls[shard], - chs[shard]); - spg_t spgid(pgid, shard_id_t(shard)); auto ps = std::make_unique( g_ceph_context, @@ -62,16 +358,16 @@ PeeringState* ECPeeringTestFixture::create_peering_state(int shard) osdmap, PG_FEATURE_CLASSIC_ALL, shard_dpps[shard].get(), - shard_peering_listeners[shard].get()); + pl.get()); + + pl->ps = ps.get(); - shard_peering_listeners[shard]->ps = ps.get(); - ps->set_backend_predicates( get_is_readable_predicate(), get_is_recoverable_predicate()); shard_peering_states[shard] = std::move(ps); - shard_peering_listeners[shard]->backend_listener->set_peering_state(shard_peering_states[shard].get()); + pl->backend_listener->set_peering_state(shard_peering_states[shard].get()); shard_peering_ctxs[shard] = std::make_unique(); return shard_peering_states[shard].get(); @@ -116,41 +412,17 @@ void ECPeeringTestFixture::update_osdmap_with_peering( OSDMapRef old_osdmap = osdmap; update_osdmap(new_osdmap, new_primary); + new_epoch(false); + new_epoch_loop(); +} - // Update peering listeners for ALL shards (even failed ones need epoch updates) - for (auto& [shard, listener] : shard_peering_listeners) { - listener->current_epoch = new_osdmap->get_epoch(); - } - - // Get primary from OSDMap for advance_map calls using base class pgid member - std::vector up_osds, acting_osds; - int up_primary = -1, acting_primary = -1; - osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary); - - // Call advance_map on ALL shards that have peering states, including failed ones - // This ensures that failed OSDs are notified of map changes (e.g., primary failover) - // Use the newly computed up_osds and acting_osds from the new OSDMap - for (auto& [shard, ps] : shard_peering_states) { - ps->advance_map( - osdmap, old_osdmap, up_osds, up_primary, acting_osds, acting_primary, - 
*get_peering_ctx(shard)); - } - - // Call activate_map on ALL shards that have peering states - // This ensures failed OSDs properly transition state and notify their backends - for (auto& [shard, ps] : shard_peering_states) { - ps->activate_map(*get_peering_ctx(shard)); - } - - dispatch_all(); - - // Handle up_thru requirements - keep creating new epochs until peering completes. - // Note: For primary failover scenarios, full peering may not complete immediately. - int max_iterations = 3; +void ECPeeringTestFixture::new_epoch_loop() { + int max = 10; do { + ceph_assert(--max); event_advance_map(); event_activate_map(); - } while (new_epoch(true) && --max_iterations); + } while (new_epoch(true)); } bool ECPeeringTestFixture::new_epoch(bool if_required) @@ -179,7 +451,6 @@ bool ECPeeringTestFixture::new_epoch(bool if_required) if (acting_primary >= 0) { auto& listener = shard_peering_listeners[acting_primary]; if (listener->pg_temp_wanted) { - // Get up set from OSDMap std::vector up_osds; int up_primary = -1; osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, nullptr, nullptr); @@ -188,18 +459,9 @@ bool ECPeeringTestFixture::new_epoch(bool if_required) if (acting_temp.empty()) { acting_temp = up_osds; } - - // Apply the pg_temp change that peering requested. - // For EC pools with optimizations, transform to primaryfirst order - // (this simulates what the monitor does in production). 
- const pg_pool_t* pool = osdmap->get_pg_pool(this->pgid.pool()); - std::vector pg_temp_acting = acting_temp; - if (pool && pool->allows_ecoptimizations()) { - pg_temp_acting = osdmap->pgtemp_primaryfirst(*pool, acting_temp); - } - + pending_inc.new_pg_temp[this->pgid] = - mempool::osdmap::vector(pg_temp_acting.begin(), pg_temp_acting.end()); + mempool::osdmap::vector(acting_temp.begin(), acting_temp.end()); listener->pg_temp_wanted = false; did_work = true; @@ -219,26 +481,10 @@ bool ECPeeringTestFixture::new_epoch(bool if_required) return true; } -void ECPeeringTestFixture::run_peering_cycle() -{ +void ECPeeringTestFixture::run_first_peering() { init_peering(); event_initialize(); - dispatch_all(); - event_advance_map(); - dispatch_all(); - event_activate_map(); - dispatch_all(); - - // Handle up_thru requirements - keep creating new epochs until peering completes. - int max_iterations = 10; - for (int i = 0; i < max_iterations && !all_shards_active(); i++) { - if (new_epoch(true)) { - event_advance_map(); - dispatch_all(); - event_activate_map(); - dispatch_all(); - } - } + new_epoch_loop(); } int ECPeeringTestFixture::queue_transaction_helper(int shard, ObjectStore::Transaction&& t) @@ -264,14 +510,6 @@ void ECPeeringTestFixture::mark_osd_down(int osd_id) OSDMapTestHelpers::mark_osd_down(new_osdmap, osd_id); update_osdmap_with_peering(new_osdmap); - dispatch_all(); - - // Process any pg_temp requests from peering (emulates monitor processing MOSDPGTemp) - // This will apply the primaryfirst transformation if needed - if (new_epoch(false)) { - event_advance_map(); - dispatch_all(); - } } void ECPeeringTestFixture::mark_osd_up(int osd_id) @@ -282,7 +520,6 @@ void ECPeeringTestFixture::mark_osd_up(int osd_id) OSDMapTestHelpers::mark_osd_up(new_osdmap, osd_id); update_osdmap_with_peering(new_osdmap); - dispatch_all(); } void ECPeeringTestFixture::mark_osds_down(const std::vector& osd_ids) @@ -293,7 +530,6 @@ void ECPeeringTestFixture::mark_osds_down(const 
std::vector& osd_ids) OSDMapTestHelpers::mark_osds_down(new_osdmap, osd_ids); update_osdmap_with_peering(new_osdmap); - dispatch_all(); } void ECPeeringTestFixture::advance_epoch() @@ -303,6 +539,5 @@ void ECPeeringTestFixture::advance_epoch() OSDMapTestHelpers::advance_epoch(new_osdmap); update_osdmap_with_peering(new_osdmap); - dispatch_all(); } diff --git a/src/test/osd/ECPeeringTestFixture.h b/src/test/osd/ECPeeringTestFixture.h index 5c33603cd8c6..2cad715cd3de 100644 --- a/src/test/osd/ECPeeringTestFixture.h +++ b/src/test/osd/ECPeeringTestFixture.h @@ -20,12 +20,12 @@ #include #include "test/osd/PGBackendTestFixture.h" #include "test/osd/MockPeeringListener.h" -#include "test/osd/MockConnection.h" -#include "test/osd/MockECRecPred.h" -#include "test/osd/MockECReadPred.h" -#include "test/osd/OSDMapTestHelpers.h" #include "osd/PeeringState.h" -#include "messages/MOSDPeeringOp.h" +#include "messages/MOSDPGNotify2.h" +#include "test/osd/MockMessenger.h" + +// Forward declaration +class ECPeeringTestFixture; /** * ECPeeringTestFixture - EC test fixture with full peering infrastructure @@ -41,15 +41,6 @@ protected: std::map> shard_peering_ctxs; std::map> shard_peering_listeners; - std::map> shard_messages; - std::map> shard_events; - - // Raw-pointer map giving this fixture direct access to the backend listeners - // created by the listener_factory. The pointers are valid for the lifetime - // of the test because ownership is transferred to - // shard_peering_listeners[i]->backend_listener in create_peering_state(). 
- std::map backend_listeners; - class ShardDpp : public NoDoutPrefix { public: ECPeeringTestFixture *fixture; @@ -58,94 +49,26 @@ protected: ShardDpp(CephContext *cct, ECPeeringTestFixture *f, int s) : NoDoutPrefix(cct, ceph_subsys_osd), fixture(f), shard(s) {} - std::ostream& gen_prefix(std::ostream& out) const override { - out << "shard " << shard << ": "; - if (fixture->shard_peering_states.contains(shard)) { - PeeringState *ps = fixture->shard_peering_states[shard].get(); - out << *ps << " "; - } - return out; - } + std::ostream& gen_prefix(std::ostream& out) const override; }; std::map> shard_dpps; - IsPGRecoverablePredicate *get_is_recoverable_predicate() { - return new MockECRecPred(k, m); - } - - IsPGReadablePredicate *get_is_readable_predicate() { - return new MockECReadPred(k, m); - } + IsPGRecoverablePredicate *get_is_recoverable_predicate(); + IsPGReadablePredicate *get_is_readable_predicate(); public: - ECPeeringTestFixture() : PGBackendTestFixture(PGBackendTestFixture::EC) { - // Install a listener_factory so that setup_ec_pool() creates listeners - // that we can access directly (via backend_listeners[]) without needing - // to steal ownership via release_listener(). - // - // The factory records a raw pointer in backend_listeners[instance] and - // returns the unique_ptr to the base class, which stores it in listeners[]. - // In create_peering_state() we then move that unique_ptr from listeners[] - // into shard_peering_listeners[]->backend_listener, at which point the - // raw pointer in backend_listeners[] remains valid (owned by the peering - // listener for the rest of the test). 
- listener_factory = [this]( - int instance, - std::shared_ptr om, - int64_t pool_id, - DoutPrefixProvider* dpp_arg, - pg_shard_t whoami) -> std::unique_ptr - { - auto bl = std::make_unique( - om, pool_id, dpp_arg, whoami); - // Record raw pointer so tests can access the listener directly - backend_listeners[instance] = bl.get(); - return bl; - }; - } - - void SetUp() override { - PGBackendTestFixture::SetUp(); - for (int i = 0; i < k + m; i++) { - create_peering_state(i); - } - } + ECPeeringTestFixture(); + + int queue_transaction_helper(int shard, ObjectStore::Transaction&& t); - void TearDown() override { - shard_peering_states.clear(); - shard_peering_ctxs.clear(); - shard_peering_listeners.clear(); - shard_dpps.clear(); - shard_messages.clear(); - shard_events.clear(); - PGBackendTestFixture::TearDown(); - } + void SetUp() override; + void TearDown() override; PeeringState* create_peering_state(int shard); - PeeringState* get_peering_state(int shard) { - ceph_assert(shard >= 0 && shard < k + m); - auto it = shard_peering_states.find(shard); - ceph_assert(it != shard_peering_states.end()); - ceph_assert(it->second != nullptr); - return it->second.get(); - } - - PeeringCtx* get_peering_ctx(int shard) { - ceph_assert(shard >= 0 && shard < k + m); - auto it = shard_peering_ctxs.find(shard); - ceph_assert(it != shard_peering_ctxs.end()); - ceph_assert(it->second != nullptr); - return it->second.get(); - } - - MockPeeringListener* get_peering_listener(int shard) { - ceph_assert(shard >= 0 && shard < k + m); - auto it = shard_peering_listeners.find(shard); - ceph_assert(it != shard_peering_listeners.end()); - ceph_assert(it->second != nullptr); - return it->second.get(); - } + PeeringState* get_peering_state(int shard); + PeeringCtx* get_peering_ctx(int shard); + MockPeeringListener* get_peering_listener(int shard); /** * Query the OSDMap to determine which shard is the primary. 
@@ -153,287 +76,39 @@ public: * * @return The shard ID of the primary, or -1 if no primary exists */ - int get_primary_shard_from_osdmap() const { - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - return acting_primary; - } + int get_primary_shard_from_osdmap() const; // Override base class methods to work with peering fixture's structure - MockPGBackendListener* get_primary_listener() override { - int primary_shard = get_primary_shard_from_osdmap(); - if (primary_shard < 0) { - return nullptr; - } - - auto it = shard_peering_listeners.find(primary_shard); - if (it != shard_peering_listeners.end() && it->second && - it->second->backend_listener) { - // Assert that the backend listener agrees it's primary - ceph_assert(it->second->backend_listener->pgb_is_primary()); - return it->second->backend_listener.get(); - } - return nullptr; - } - - PGBackend* get_primary_backend() override { - int primary_shard = get_primary_shard_from_osdmap(); - if (primary_shard < 0) { - return nullptr; - } - - auto listener_it = shard_peering_listeners.find(primary_shard); - if (listener_it != shard_peering_listeners.end() && listener_it->second && - listener_it->second->backend_listener) { - // Assert that the backend listener agrees it's primary - ceph_assert(listener_it->second->backend_listener->pgb_is_primary()); - - // Return the backend from the base class's backends map, not from - // the peering listener, because the base class backend is connected - // to the event loop and message routers - auto backend_it = backends.find(primary_shard); - return (backend_it != backends.end()) ? 
backend_it->second.get() : nullptr; - } - return nullptr; - } + MockPGBackendListener* get_primary_listener() override; + PGBackend* get_primary_backend() override; void init_peering(bool dne = false); - - void event_initialize() { - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - auto evt = std::make_shared( - osdmap->get_epoch(), - osdmap->get_epoch(), - PeeringState::Initialize()); - - get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard)); - } - } - - void event_advance_map() { - // Get primary from OSDMap - query once before the loop - std::vector up_osds, acting_osds; - int up_primary = -1, acting_primary = -1; - osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary); - - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - get_peering_state(shard)->advance_map( - osdmap, osdmap, up_osds, up_primary, acting_osds, acting_primary, - *get_peering_ctx(shard)); - } - } - - void event_activate_map() { - // Get acting set from OSDMap - must use same set as advance_map - std::vector up_osds, acting_osds; - int up_primary = -1, acting_primary = -1; - osdmap->pg_to_up_acting_osds(this->pgid, &up_osds, &up_primary, &acting_osds, &acting_primary); - - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - get_peering_state(shard)->activate_map(*get_peering_ctx(shard)); - } - } + void event_initialize(); + void event_advance_map(); + void event_activate_map(); private: - // Dispatch all messages from a map>. 
- // Templated to work with both std::vector (PeeringCtx::message_map) and - // std::list (MockPeeringListener::messages). - template - bool dispatch_messages_from_map(int from_shard, - std::map& msg_map) { - bool did_work = false; - - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - for (auto& [to_shard, msg_list] : msg_map) { - if (std::find(acting_osds.begin(), acting_osds.end(), to_shard) == acting_osds.end()) { - continue; - } - - while (!msg_list.empty()) { - MessageRef m = msg_list.front(); - msg_list.erase(msg_list.begin()); - - // Cast to MOSDPeeringOp - all peering messages inherit from this. - // Use dynamic_cast with assertion to catch unexpected message types. - // Use m.get() (not m.detach()) to avoid leaking the raw pointer. - MOSDPeeringOp *op = dynamic_cast(m.get()); - ceph_assert(op != nullptr) /* message must be a MOSDPeeringOp */; - - // Set connection peer to the SENDER, not the destination - ceph_msg_header h = op->get_header(); - h.src.num = from_shard; - op->set_header(h); - - ConnectionRef conn = new MockConnection(from_shard); - op->set_connection(conn); - - // get_event() returns a newly allocated PGPeeringEvent, - // so we take ownership directly into a shared_ptr (matching OSD.cc pattern) - PGPeeringEventRef evt_ref(op->get_event()); - - get_peering_state(to_shard)->handle_event( - evt_ref, - get_peering_ctx(to_shard)); - - did_work = true; - } - } - - return did_work; - } + /** + * dispatch_buffered_messages - Check for and dispatch any buffered messages + * + * After handling a peering event, PeeringState may have buffered messages + * in the PeeringCtx that need to be dispatched. This function checks for + * such messages and routes them through the messenger. 
+ */ + void dispatch_buffered_messages(int from_shard, PeeringCtx* ctx); public: - bool dispatch_peering_messages(int from_shard) { - auto* ctx = get_peering_ctx(from_shard); - return dispatch_messages_from_map(from_shard, ctx->message_map); - } - bool dispatch_cluster_messages(int from_shard) { - auto& listener = shard_peering_listeners[from_shard]; - return dispatch_messages_from_map(from_shard, listener->messages); - } - - bool dispatch_all_peering_messages() { - bool did_work = false; - bool work_this_round; - - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - do { - work_this_round = false; - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - work_this_round |= dispatch_peering_messages(shard); - } - did_work |= work_this_round; - } while (work_this_round); - - return did_work; - } - - bool dispatch_events(int shard, bool stalled = false) { - auto& listener = shard_peering_listeners[shard]; - std::list& event_queue = - stalled ? 
listener->stalled_events : listener->events; - - if (event_queue.empty()) { - return false; - } - - bool did_work = false; - while (!event_queue.empty()) { - PGPeeringEventRef evt = event_queue.front(); - event_queue.pop_front(); - - get_peering_state(shard)->handle_event(evt, get_peering_ctx(shard)); - did_work = true; - } - - return did_work; - } - - bool dispatch_all_events(bool stalled = false) { - bool did_work = false; - bool work_this_round; - - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - do { - work_this_round = false; - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - work_this_round |= dispatch_events(shard, stalled); - } - did_work |= work_this_round; - } while (work_this_round); - - return did_work; - } - - bool dispatch_all_cluster_messages() { - bool did_work = false; - bool work_this_round; - - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - do { - work_this_round = false; - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - work_this_round |= dispatch_cluster_messages(shard); - } - did_work |= work_this_round; - } while (work_this_round); - - return did_work; - } - - bool dispatch_all() { - bool did_work = false; - bool work_this_round; - - do { - work_this_round = false; - work_this_round |= dispatch_all_peering_messages(); - work_this_round |= dispatch_all_cluster_messages(); - work_this_round |= dispatch_all_events(); - did_work |= work_this_round; - } while (work_this_round); - - return did_work; - } - // IMPORTANT: For EC pools, shard positions in acting array must be preserved. // Failed OSDs should be replaced with CRUSH_ITEM_NONE, not removed. 
void update_osdmap_with_peering( std::shared_ptr new_osdmap, std::optional new_primary = std::nullopt); + void new_epoch_loop(); bool new_epoch(bool if_required = false); - int queue_transaction_helper(int shard, ObjectStore::Transaction&& t); - - void run_peering_cycle(); + void run_first_peering(); // OSDMap manipulation helpers - these create a new epoch and trigger peering @@ -461,39 +136,53 @@ public: */ void advance_epoch(); - bool all_shards_active() { - // Get acting set from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - for (int shard : acting_osds) { - // Skip failed OSDs (marked as CRUSH_ITEM_NONE) - if (shard == CRUSH_ITEM_NONE) { - continue; - } - if (!get_peering_state(shard)->is_active()) { - return false; - } - } - return true; - } + bool all_shards_active(); // In EC pools, only the primary tracks PG_STATE_CLEAN. - bool all_shards_clean() { - // Get primary from OSDMap - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(this->pgid, &acting_osds, &acting_primary); - - if (acting_primary >= 0 && acting_primary != CRUSH_ITEM_NONE) { - return get_peering_state(acting_primary)->is_clean(); - } - return false; - } + bool all_shards_clean(); - std::string get_state_name(int shard) { - return get_peering_state(shard)->get_current_state(); - } + std::string get_state_name(int shard); + + /** + * Suspend an OSD - queues events for this OSD without executing them. + * This simulates an OSD being temporarily unavailable. + * Events remain queued and will be processed when the OSD is unsuspended. + * + * @param osd The OSD number to suspend + */ + void suspend_osd(int osd); + + /** + * Unsuspend a previously suspended OSD. + * Queued events for this OSD will be processed on the next event loop iteration. + * + * @param osd The OSD number to unsuspend + */ + void unsuspend_osd(int osd); + + /** + * Check if an OSD is currently suspended. 
+ * + * @param osd The OSD number to check + * @return true if the OSD is suspended, false otherwise + */ + bool is_osd_suspended(int osd); + + /** + * Suspend messages from the primary to a specific OSD. + * This blocks communication from the primary to the target OSD while + * allowing other communication to proceed normally. + * + * @param to_osd The OSD number to block messages to (from the primary) + */ + void suspend_primary_to_osd(int to_osd); + + /** + * Unsuspend messages from the primary to a specific OSD. + * Queued messages will be processed on the next event loop iteration. + * + * @param to_osd The OSD number to unblock messages to (from the primary) + */ + void unsuspend_primary_to_osd(int to_osd); }; diff --git a/src/test/osd/EventLoop.h b/src/test/osd/EventLoop.h index 1167b82b4866..c54d3fb603da 100644 --- a/src/test/osd/EventLoop.h +++ b/src/test/osd/EventLoop.h @@ -17,8 +17,9 @@ #include #include -#include +#include #include +#include #include #include #include "include/types.h" @@ -50,17 +51,28 @@ public: private: struct Event { EventType type; - int osd; // -1 for generic events + int from_osd; // -1 for generic events or when not applicable + int to_osd; // -1 for generic events or when not applicable GenericEvent callback; - Event(EventType t, int o, GenericEvent cb) - : type(t), osd(o), callback(std::move(cb)) {} + Event(EventType t, int from, int to, GenericEvent cb) + : type(t), from_osd(from), to_osd(to), callback(std::move(cb)) {} }; - std::queue event_queue; + std::deque events; bool verbose = false; int events_executed = 0; std::map events_by_type; + std::set suspended_from_osds; + std::set suspended_to_osds; + std::set> suspended_from_to_osds; + int current_osd = -1; + + // Map from OSD number to its queue of suspended events + std::map> suspended_events; + + // Callback to check for more work when idle (returns true if more work was generated) + std::list> idle_callbacks; static constexpr const char* event_type_name(EventType type) { 
switch (type) { @@ -78,35 +90,47 @@ public: EventLoop(bool verbose = false) : verbose(verbose) {} void schedule_generic(GenericEvent event) { - event_queue.emplace(EventType::GENERIC, -1, std::move(event)); + events.emplace_back(EventType::GENERIC, -1, -1, std::move(event)); } - void schedule_osd_message(int osd, GenericEvent callback) { - event_queue.emplace(EventType::OSD_MESSAGE, osd, std::move(callback)); + void schedule_osd_message(int from_osd, int to_osd, GenericEvent callback) { + ceph_assert(from_osd >= 0); + ceph_assert(to_osd >= 0); + events.emplace_back(EventType::OSD_MESSAGE, from_osd, to_osd, std::move(callback)); } void schedule_transaction(int osd, GenericEvent callback) { - event_queue.emplace(EventType::TRANSACTION, osd, std::move(callback)); + ceph_assert(osd >= 0); + events.emplace_back(EventType::TRANSACTION, -1, osd, std::move(callback)); } - void schedule_peering_message(int to_osd, GenericEvent callback) { - event_queue.emplace(EventType::PEERING_MESSAGE, to_osd, std::move(callback)); + void schedule_peering_message(int from_osd, int to_osd, GenericEvent callback) { + ceph_assert(from_osd >= 0); + ceph_assert(to_osd >= 0); + events.emplace_back(EventType::PEERING_MESSAGE, from_osd, to_osd, std::move(callback)); } - void schedule_cluster_message(int to_osd, GenericEvent callback) { - event_queue.emplace(EventType::CLUSTER_MESSAGE, to_osd, std::move(callback)); + void schedule_cluster_message(int from_osd, int to_osd, GenericEvent callback) { + ceph_assert(from_osd >= 0); + ceph_assert(to_osd >= 0); + events.emplace_back(EventType::CLUSTER_MESSAGE, from_osd, to_osd, std::move(callback)); } void schedule_peering_event(int osd, GenericEvent callback) { - event_queue.emplace(EventType::PEERING_EVENT, osd, std::move(callback)); + ceph_assert(osd >= 0); + events.emplace_back(EventType::PEERING_EVENT, -1, osd, std::move(callback)); } bool has_events() const { - return !event_queue.empty(); + return !events.empty(); + } + + void 
register_idle_callback(std::function callback) { + idle_callbacks.emplace_back(std::move(callback)); } size_t queued_event_count() const { - return event_queue.size(); + return events.size(); } int get_events_executed() const { @@ -123,22 +147,71 @@ public: } bool run_one() { - if (event_queue.empty()) { + if (events.empty()) { return false; } - Event event = std::move(event_queue.front()); - event_queue.pop(); + Event event = std::move(events.front()); + events.pop_front(); + + // Check if this event should be suspended based on from_osd, to_osd, or from-to pair + bool should_suspend = false; + int suspend_osd = -1; + + if (event.from_osd >= 0 && is_from_osd_suspended(event.from_osd)) { + should_suspend = true; + suspend_osd = event.from_osd; + } else if (event.to_osd >= 0 && is_to_osd_suspended(event.to_osd)) { + should_suspend = true; + suspend_osd = event.to_osd; + } else if (event.from_osd >= 0 && event.to_osd >= 0 && + is_from_to_osd_suspended(event.from_osd, event.to_osd)) { + should_suspend = true; + suspend_osd = event.to_osd; // Use to_osd for queue key + } + + if (should_suspend) { + // Move to suspended queue + if (verbose) { + std::cout << " [Event " << (events_executed + 1) << "] " + << event_type_name(event.type); + if (event.from_osd >= 0 && event.to_osd >= 0) { + std::cout << " (OSD." << event.from_osd << " -> OSD." << event.to_osd << ")"; + } else if (event.to_osd >= 0) { + std::cout << " (to OSD." << event.to_osd << ")"; + } else if (event.from_osd >= 0) { + std::cout << " (from OSD." << event.from_osd << ")"; + } + std::cout << " *** SUSPENDED - moving to suspended queue ***" << std::endl; + } + suspended_events[suspend_osd].push_back(std::move(event)); + return true; // We processed an event (by suspending it) + } + + // Print banner if switching to a different OSD + int active_osd = (event.to_osd >= 0) ? 
event.to_osd : event.from_osd; + if (active_osd >= 0 && active_osd != current_osd) { + current_osd = active_osd; + if (verbose) { + std::cout << "\n==== Processing events for OSD." << current_osd + << " ====" << std::endl; + } + } if (verbose) { std::cout << " [Event " << (events_executed + 1) << "] " << event_type_name(event.type); - if (event.osd >= 0) { - std::cout << " (OSD " << event.osd << ")"; + if (event.from_osd >= 0 && event.to_osd >= 0) { + std::cout << " (OSD." << event.from_osd << " -> OSD." << event.to_osd << ")"; + } else if (event.to_osd >= 0) { + std::cout << " (to OSD." << event.to_osd << ")"; + } else if (event.from_osd >= 0) { + std::cout << " (from OSD." << event.from_osd << ")"; } std::cout << " Executing..." << std::endl; } + // Execute the event event.callback(); events_executed++; events_by_type[event.type]++; @@ -157,18 +230,29 @@ public: } if (verbose) { - std::cout << "=== Executed " << executed << " events, " - << event_queue.size() << " remaining ===" << std::endl; + std::cout << "=== Executed " << executed << " events, " + << events.size() << " remaining ===" << std::endl; } return executed; } - + + bool do_idle_callbacks() { + bool new_work = false; + for (auto cb : idle_callbacks) { + if (cb()) { + new_work = true; + } + } + + return new_work; + } + /** * Run until the queue is empty or max_events is reached. * Returns -1 if max_events was reached before the queue emptied. 
*/ - int run_until_idle(int max_events = 0) { + void run_until_idle(int max_events = 10000) { if (verbose) { std::cout << "\n=== Running until idle"; if (max_events > 0) { @@ -176,78 +260,168 @@ public: } std::cout << " ===" << std::endl; } - - int executed = 0; - while (has_events()) { - if (max_events > 0 && executed >= max_events) { - if (verbose) { - std::cout << "=== Max events (" << max_events << ") reached, " - << event_queue.size() << " events remaining ===" << std::endl; - } - return -1; // Timeout + + do { + ceph_assert(--max_events); + while (has_events()) { + run_one(); } - - run_one(); - executed++; - } - + } while (do_idle_callbacks()); + } + + void clear() { + events.clear(); + suspended_events.clear(); + } + + void set_verbose(bool v) { + verbose = v; + } + + // OSD management methods + /** + * Suspend events FROM an OSD - events originating from this OSD will be queued + * but not executed until the OSD is unsuspended. + */ + void suspend_from_osd(int osd) { + suspended_from_osds.insert(osd); if (verbose) { - std::cout << "=== Idle: Executed " << executed << " events ===" << std::endl; - print_stats(); + std::cout << "*** Events FROM OSD." << osd << " marked as SUSPENDED ***" << std::endl; } + } + + /** + * Unsuspend events FROM an OSD - queued events from this OSD will be processed + * on subsequent run_one() calls. + */ + void unsuspend_from_osd(int osd) { + suspended_from_osds.erase(osd); - return executed; + // Move all suspended events for this OSD back to the main queue + auto it = suspended_events.find(osd); + if (it != suspended_events.end()) { + if (verbose) { + std::cout << "*** Events FROM OSD." << osd << " marked as UNSUSPENDED - restoring " + << it->second.size() << " suspended events ***" << std::endl; + } + + // Append suspended events to the main queue + for (auto& event : it->second) { + events.push_back(std::move(event)); + } + + suspended_events.erase(it); + } else { + if (verbose) { + std::cout << "*** Events FROM OSD." 
<< osd << " marked as UNSUSPENDED ***" << std::endl; + } + } } /** - * Run until a condition is met, idle, or max_events is reached. - * The condition is checked after each event execution. - * Returns -1 if max_events was reached. + * Suspend events TO an OSD - events destined for this OSD will be queued + * but not executed until the OSD is unsuspended. */ - int run_until(int max_events, std::function condition) { + void suspend_to_osd(int osd) { + suspended_to_osds.insert(osd); if (verbose) { - std::cout << "\n=== Running until condition"; - if (max_events > 0) { - std::cout << " (max " << max_events << " events)"; - } - std::cout << " ===" << std::endl; + std::cout << "*** Events TO OSD." << osd << " marked as SUSPENDED ***" << std::endl; } + } + + /** + * Unsuspend events TO an OSD - queued events to this OSD will be processed + * on subsequent run_one() calls. + */ + void unsuspend_to_osd(int osd) { + suspended_to_osds.erase(osd); - int executed = 0; - while (has_events()) { - if (max_events > 0 && executed >= max_events) { - if (verbose) { - std::cout << "=== Max events (" << max_events << ") reached ===" << std::endl; - } - return -1; // Timeout + // Move all suspended events for this OSD back to the main queue + auto it = suspended_events.find(osd); + if (it != suspended_events.end()) { + if (verbose) { + std::cout << "*** Events TO OSD." << osd << " marked as UNSUSPENDED - restoring " + << it->second.size() << " suspended events ***" << std::endl; } - run_one(); - executed++; + // Append suspended events to the main queue + for (auto& event : it->second) { + events.push_back(std::move(event)); + } - if (condition()) { - if (verbose) { - std::cout << "=== Condition met after " << executed << " events ===" << std::endl; - } - return executed; + suspended_events.erase(it); + } else { + if (verbose) { + std::cout << "*** Events TO OSD." 
<< osd << " marked as UNSUSPENDED ***" << std::endl; } } - + } + + /** + * Suspend events FROM one OSD TO another OSD - events from from_osd to to_osd + * will be queued but not executed until unsuspended. + */ + void suspend_from_to_osd(int from_osd, int to_osd) { + suspended_from_to_osds.insert({from_osd, to_osd}); if (verbose) { - std::cout << "=== Idle: Executed " << executed << " events, condition not met ===" << std::endl; + std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." << to_osd + << " marked as SUSPENDED ***" << std::endl; } - - return executed; } - void clear() { - while (!event_queue.empty()) { - event_queue.pop(); + /** + * Unsuspend events FROM one OSD TO another OSD - queued events will be processed + * on subsequent run_one() calls. + */ + void unsuspend_from_to_osd(int from_osd, int to_osd) { + suspended_from_to_osds.erase({from_osd, to_osd}); + + // Move all suspended events for this OSD pair back to the main queue + auto it = suspended_events.find(to_osd); + if (it != suspended_events.end()) { + if (verbose) { + std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." << to_osd + << " marked as UNSUSPENDED - restoring " + << it->second.size() << " suspended events ***" << std::endl; + } + + // Append suspended events to the main queue + for (auto& event : it->second) { + events.push_back(std::move(event)); + } + + suspended_events.erase(it); + } else { + if (verbose) { + std::cout << "*** Events FROM OSD." << from_osd << " TO OSD." 
<< to_osd + << " marked as UNSUSPENDED ***" << std::endl; + } } } - void set_verbose(bool v) { - verbose = v; + bool is_from_osd_suspended(int osd) const { + return suspended_from_osds.find(osd) != suspended_from_osds.end(); + } + + bool is_to_osd_suspended(int osd) const { + return suspended_to_osds.find(osd) != suspended_to_osds.end(); + } + + bool is_from_to_osd_suspended(int from_osd, int to_osd) const { + return suspended_from_to_osds.find({from_osd, to_osd}) != suspended_from_to_osds.end(); + } + + size_t get_suspended_event_count(int osd) const { + auto it = suspended_events.find(osd); + return (it != suspended_events.end()) ? it->second.size() : 0; + } + + size_t get_total_suspended_events() const { + size_t total = 0; + for (const auto& pair : suspended_events) { + total += pair.second.size(); + } + return total; } void print_stats() const { diff --git a/src/test/osd/MockMessenger.h b/src/test/osd/MockMessenger.h new file mode 100644 index 000000000000..20a39b989b86 --- /dev/null +++ b/src/test/osd/MockMessenger.h @@ -0,0 +1,242 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include "msg/Message.h" +#include "test/osd/EventLoop.h" +#include "test/osd/MockConnection.h" +#include "common/dout.h" +#include "common/TrackedOp.h" +#include "osd/OpRequest.h" +#include "msg/async/frames_v2.h" + +#define dout_subsys ceph_subsys_osd + +/** + * MockMessenger - Routes messages through EventLoop with registered handlers + * + * This class provides a simple message routing mechanism for OSD tests. 
+ * Event handlers are registered as lambdas that process messages, and + * messages are scheduled on the EventLoop for asynchronous processing. + * + * Epoch Handling: + * - Messages are tagged with the receiver's epoch (from the epoch getter) when sent + * - Messages are dropped if the receiver's current epoch differs at delivery time + * - This simulates real OSD behavior where stale messages are ignored + * + * OpRequest Handling: + * - Messages are encoded on send and decoded into a fresh Message on delivery + * - This ensures proper lifetime management and prevents use-after-free issues + * - Handlers receive the raw Message* and wrap it themselves (e.g., in an OpRequestRef) + */ +class MockMessenger { +public: + using MessageHandler = std::function<bool(int, int, Message*)>; + using EpochGetter = std::function<epoch_t(int)>; + +private: + EventLoop* event_loop = nullptr; + std::map<int, MessageHandler> handlers; + EpochGetter epoch_getter; + DoutPrefixProvider *dpp = nullptr; + +public: + MockMessenger(EventLoop* loop, CephContext* cct, DoutPrefixProvider *dpp = nullptr) + : event_loop(loop), dpp(dpp) { + ceph_assert(event_loop != nullptr); + ceph_assert(cct != nullptr); + } + + /** + * Set the epoch getter callback. + * This callback is used to get the current epoch for a given OSD. + * + * @param getter Lambda that accepts an OSD number and returns its current epoch + */ + void set_epoch_getter(EpochGetter getter) { + epoch_getter = std::move(getter); + } + + /** + * Set the DoutPrefixProvider for logging + */ + void set_dpp(DoutPrefixProvider *d) { + dpp = d; + } + + /** + * Register an event handler for processing messages. + * Exactly one handler may be registered per message type (asserted). 
+ * + * @param type Message type code; dispatch looks up this exact type (no catch-all) + * @param handler Lambda that accepts (from_osd, to_osd, Message*) and returns true if handled + * Handler is responsible for wrapping the message in appropriate tracker/reference type + */ + void register_handler(int type, MessageHandler handler) { + ceph_assert(!handlers.contains(type)); + handlers.emplace(type, std::move(handler)); + } + + /** + * Register a type-safe handler for a specific message type. + * Uses C++20 concepts to ensure type safety at compile time. + * The handler receives the correctly-typed message pointer. + * + * @tparam MsgType The specific message type (e.g., MOSDECSubOpWrite) + * @param msg_type The message type code (e.g., MSG_OSD_EC_WRITE) + * @param handler Lambda that accepts (from_osd, to_osd, MsgType*) and returns true if handled + */ + template <typename MsgType> + requires std::derived_from<MsgType, Message> + void register_typed_handler(int msg_type, std::function<bool(int, int, MsgType*)> handler) { + // Wrap the typed handler in a generic handler that performs the cast + register_handler(msg_type, [handler](int from_osd, int to_osd, Message* m) -> bool { + MsgType* typed_msg = dynamic_cast<MsgType*>(m); + if (!typed_msg) { + return false; // Wrong type; the dispatcher asserts on a false return + } + return handler(from_osd, to_osd, typed_msg); + }); + } + + /** + * Send a message from one OSD to another. + * The message will be scheduled on the EventLoop and processed by + * registered handlers. The receiver's epoch is captured at send time. + * Messages are dropped if the receiver's epoch has changed by delivery time. + * Handlers receive the raw Message* and are responsible for wrapping it in + * appropriate tracker/reference types (e.g., OpRequestRef). + * Panics if no handler processes the message. 
+ * + * @param from_osd Source OSD number + * @param to_osd Destination OSD number + * @param m Message to send (takes ownership) + */ + void send_message(int from_osd, int to_osd, Message* m) { + ceph_assert(from_osd >= 0); + ceph_assert(to_osd >= 0); + ceph_assert(m != nullptr); + + // Wrap in MessageRef to manage lifetime + MessageRef mref(m); + + // Capture the receiver's epoch at send time for epoch checking + epoch_t send_epoch = 0; + if (epoch_getter) { + send_epoch = epoch_getter(to_osd); + } + + // Set the message header's source to the sender OSD BEFORE encoding + // This is critical for peering messages - MOSDPGInfo2::get_event() uses + // get_source().num() to construct the pg_shard_t for MInfoRec + ceph_msg_header h = m->get_header(); + h.src.num = from_osd; + m->set_header(h); + + if (dpp) { + ldpp_dout(dpp, 10) << "MockMessenger: Scheduling message from OSD." << from_osd + << " to OSD." << to_osd << " (type: " << m->get_type() + << ", epoch: " << send_epoch << ")" << dendl; + } + + // Encode the message payload to prepare it for transmission + // This ensures internal structures like txn_payload are properly serialized + m->encode(CEPH_FEATURES_ALL, 0); + + // Copy the message components to simulate network transmission + // This creates independent copies that can be decoded into a new message object + // on the receiver side, avoiding use-after-free issues + + ceph_msg_header &header = m->get_header(); + ceph_msg_footer &footer = m->get_footer(); + + ceph_msg_header2 header2{header.seq, header.tid, + header.type, header.priority, + header.version, + ceph_le32(0), header.data_off, + ceph_le64(0), + footer.flags, header.compat_version, + header.reserved}; + + auto mf = ceph::msgr::v2::MessageFrame::Encode( + header2, + m->get_payload(), + m->get_middle(), + m->get_data()); + + + // Schedule event on EventLoop with the copied message components + event_loop->schedule_osd_message(from_osd, to_osd, + [this, header, footer, mf, from_osd, to_osd, 
send_epoch]() mutable { + // Get the receiver's current epoch when processing + epoch_t current_epoch = 0; + if (epoch_getter) { + current_epoch = epoch_getter(to_osd); + } + + // Drop messages from different epochs (both old and new) + if (current_epoch != send_epoch) { + if (dpp) { + ldpp_dout(dpp, 10) << "MockMessenger: Dropping message from OSD." << from_osd + << " to OSD." << to_osd << " (type: " << header.type + << ") - epoch mismatch: send_epoch=" << send_epoch + << " current_epoch=" << current_epoch << dendl; + } + return; + } + + // Decode the message components into a new message object + // This uses the proper decode_message() function that real messengers use, + // which supports the new footer format with message authentication + Message *decoded_msg = decode_message(g_ceph_context, 0, header, footer, + mf.front(), mf.middle(), mf.data(), new MockConnection(from_osd)); + + ceph_assert(decoded_msg); + + // Look up the handler registered for this exact type (no catch-all fallback) + if (!handlers.contains(header.type)) { + std::cerr << "ERROR: No handler registered for message type " << header.type + << " (0x" << std::hex << header.type << std::dec << ")" << std::endl; + std::cerr << "Registered handlers: "; + for (const auto& [type, _] : handlers) { + std::cerr << type << " (0x" << std::hex << type << std::dec << "), "; + } + std::cerr << std::endl; + } + ceph_assert(handlers.contains(header.type)); + ceph_assert(handlers.at(header.type)(from_osd, to_osd, decoded_msg)); + }); + } + + /** + * Get the number of registered handlers + */ + size_t handler_count() const { + return handlers.size(); + } + + /** + * Clear all registered handlers + */ + void clear_handlers() { + handlers.clear(); + } + +}; + +// Made with Bob diff --git a/src/test/osd/MockPGBackendListener.h b/src/test/osd/MockPGBackendListener.h index 35049f4c0832..e0e6a581adce 100644 --- a/src/test/osd/MockPGBackendListener.h +++ b/src/test/osd/MockPGBackendListener.h @@ -32,6 +32,7 @@ 
#include 
"global/global_context.h" #include "test/osd/MockConnection.h" #include "test/osd/EventLoop.h" +#include "test/osd/MockMessenger.h" #include "osd/OpRequest.h" // MockPGBackendListener - mock PGBackend::Listener and ECListener for multi-instance testing. @@ -60,8 +61,7 @@ public: ObjectStore *store = nullptr; ObjectStore::CollectionHandle ch; EventLoop *event_loop = nullptr; - std::function handle_message_callback; - std::map> *message_router = nullptr; + MockMessenger *messenger = nullptr; OpTracker *op_tracker = nullptr; PerfCounters *perf_logger = nullptr; @@ -96,12 +96,8 @@ public: peering_state = ps; } - void set_handle_message_callback(std::function cb) { - handle_message_callback = cb; - } - - void set_message_router(std::map> *router) { - message_router = router; + void set_messenger(MockMessenger *m) { + messenger = m; } // Debugging @@ -182,35 +178,19 @@ public: return c; } - // Routes messages through EventLoop for asynchronous EC message processing. + // Routes messages through MockMessenger for asynchronous message processing. 
void send_message(int to_osd, Message *m) override { MessageRef mref(m); sent_messages.push_back(mref); sent_messages_with_dest.push_back({to_osd, mref}); - if (event_loop && op_tracker && message_router) { + if (messenger) { // Capture the sender's OSD ID int from_osd = pg_whoami.osd; - // IMPORTANT: Encode the message payload to simulate network transmission - // This ensures that txn_payload is moved to the middle section for MOSDRepOp messages - // Without this, Transaction::decode will fail because the message structure is incomplete - mref->encode_payload(CEPH_FEATURES_ALL); - - event_loop->schedule_osd_message(to_osd, [this, mref, to_osd, from_osd]() { - if (!mref->get_connection()) { - // Set connection peer to the SENDER, not the destination - ConnectionRef conn = new MockConnection(from_osd); - mref->set_connection(conn); - } - OpRequestRef op = op_tracker->create_request(mref.get()); - - // Route to the correct shard's backend using the message router - auto it = message_router->find(to_osd); - if (it != message_router->end()) { - it->second(op); - } - }); + // Use MockMessenger to route the message with epoch tracking + // MockMessenger handles encoding, MockConnection setup, and epoch capture + messenger->send_message(from_osd, to_osd, m); } } @@ -659,7 +639,7 @@ public: const pg_missing_const_i * maybe_get_shard_missing( pg_shard_t peer) const override { if (peering_state) { - if (peer == peering_state->get_primary()) { + if (peer == primary_shard()) { return &peering_state->get_pg_log().get_missing(); } else { auto i = peering_state->get_peer_missing().find(peer); diff --git a/src/test/osd/MockPeeringListener.cc b/src/test/osd/MockPeeringListener.cc new file mode 100644 index 000000000000..00a9567041b0 --- /dev/null +++ b/src/test/osd/MockPeeringListener.cc @@ -0,0 +1,141 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 
2026 IBM + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "test/osd/MockPeeringListener.h" +#include "test/osd/ECPeeringTestFixture.h" +#include "test/osd/EventLoop.h" + +// Implementation of MockPeeringListener::request_local_background_io_reservation +// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors +void MockPeeringListener::request_local_background_io_reservation( + unsigned priority, + PGPeeringEventURef on_grant, + PGPeeringEventURef on_preempt) { + // If the test has configured an event loop (i.e. ECPeeringTestFixture), + // then use the event loop to run this event, rather than putting it on the queue. + if (event_loop && fixture) { + // Schedule the event through the event loop for deterministic execution + PGPeeringEventRef evt_ref = std::move(on_grant); + int shard = pg_whoami.osd; + event_loop->schedule_peering_event(shard, [this, evt_ref, shard]() { + fixture->get_peering_state(shard)->handle_event(evt_ref, fixture->get_peering_ctx(shard)); + }); + } else if (inject_event_stall) { + stalled_events.push_back(std::move(on_grant)); + } else { + events.push_back(std::move(on_grant)); + } + if (inject_keep_preempt) { + stalled_events.push_back(std::move(on_preempt)); + } + io_reservations_requested++; +} + +// Implementation of MockPeeringListener::request_remote_recovery_reservation +// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors +void MockPeeringListener::request_remote_recovery_reservation( + unsigned priority, + PGPeeringEventURef on_grant, + PGPeeringEventURef on_preempt) { + // If the test has configured an event loop (i.e. ECPeeringTestFixture), + // then use the event loop to run this event, rather than putting it on the queue. 
+ if (event_loop && fixture) { + // Schedule the event through the event loop for deterministic execution + PGPeeringEventRef evt_ref = std::move(on_grant); + int shard = pg_whoami.osd; + event_loop->schedule_peering_event(shard, [this, evt_ref, shard]() { + fixture->get_peering_state(shard)->handle_event(evt_ref, fixture->get_peering_ctx(shard)); + }); + } else if (inject_event_stall) { + stalled_events.push_back(std::move(on_grant)); + } else { + events.push_back(std::move(on_grant)); + } + if (inject_keep_preempt) { + stalled_events.push_back(std::move(on_preempt)); + } + remote_recovery_reservations_requested++; +} + +// Implementation of MockPeeringListener::schedule_event_on_commit +// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors +void MockPeeringListener::schedule_event_on_commit( + ObjectStore::Transaction &t, + PGPeeringEventRef on_commit) { + // If the test has configured an event loop (i.e. ECPeeringTestFixture), + // then use the event loop to run this event, rather than putting it on the queue. 
+ if (event_loop && fixture) { + // Schedule the event through the event loop for deterministic execution + int shard = pg_whoami.osd; + event_loop->schedule_peering_event(shard, [this, on_commit, shard]() { + fixture->get_peering_state(shard)->handle_event(on_commit, fixture->get_peering_ctx(shard)); + }); + } else if (inject_event_stall) { + stalled_events.push_back(std::move(on_commit)); + } else { + events.push_back(std::move(on_commit)); + } + events_on_commit_scheduled++; +} + +// Implementation of MockPeeringListener::on_activate_complete +// This must be defined after ECPeeringTestFixture is fully defined to avoid incomplete type errors +void MockPeeringListener::on_activate_complete() { + dout(0) << __func__ << dendl; + + // Helper lambda to schedule an event + auto schedule_event = [this](PGPeeringEventRef evt) { + if (event_loop && fixture) { + // Use event loop for deterministic execution + int shard = pg_whoami.osd; + event_loop->schedule_peering_event(shard, [this, evt, shard]() { + fixture->get_peering_state(shard)->handle_event(evt, fixture->get_peering_ctx(shard)); + }); + } else if (inject_event_stall) { + stalled_events.push_back(evt); + } else { + events.push_back(evt); + } + }; + + if (ps->needs_recovery()) { + dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl; + schedule_event(std::make_shared( + get_osdmap_epoch(), + get_osdmap_epoch(), + PeeringState::DoRecovery())); + } else if (ps->needs_backfill()) { + dout(10) << "activate queueing backfill" << dendl; + schedule_event(std::make_shared( + get_osdmap_epoch(), + get_osdmap_epoch(), + PeeringState::RequestBackfill())); +#if POOL_MIGRATION + } else if (ps->needs_pool_migration()) { + dout(10) << "activate queueing pool migration" << dendl; + schedule_event(std::make_shared( + get_osdmap_epoch(), + get_osdmap_epoch(), + PeeringState::DoPoolMigration())); +#endif + } else { + dout(10) << "activate all replicas clean, no recovery" << dendl; + 
schedule_event(std::make_shared( + get_osdmap_epoch(), + get_osdmap_epoch(), + PeeringState::AllReplicasRecovered())); + } + activate_complete_called = true; +} + diff --git a/src/test/osd/MockPeeringListener.h b/src/test/osd/MockPeeringListener.h index 12c05151146a..ee80b2217424 100644 --- a/src/test/osd/MockPeeringListener.h +++ b/src/test/osd/MockPeeringListener.h @@ -25,8 +25,13 @@ #include "MockPGBackendListener.h" #include "MockPGBackend.h" #include "MockPGLogEntryHandler.h" +#include "MockMessenger.h" #include "global/global_context.h" +// Forward declarations +class EventLoop; +class ECPeeringTestFixture; + #define dout_context g_ceph_context #define dout_subsys ceph_subsys_osd @@ -44,11 +49,22 @@ class MockPeeringListener : public PeeringState::PeeringListener { PerfCounters* logger_perf; std::vector next_acting; + // MockMessenger for routing cluster messages (optional - used by ECPeeringTestFixture) + MockMessenger* messenger = nullptr; + + // EventLoop for routing events (optional - used by ECPeeringTestFixture) + class EventLoop* event_loop = nullptr; + + // Fixture pointer for accessing peering state and context (optional - used by ECPeeringTestFixture) + class ECPeeringTestFixture* fixture = nullptr; + #ifdef WITH_CRIMSON - // Per OSD state + // Per OSD state - kept for backward compatibility with TestPeeringState + // When messenger is set, messages are routed through it instead std::map> messages; #else - // Per OSD state + // Per OSD state - kept for backward compatibility with TestPeeringState + // When messenger is set, messages are routed through it instead std::map> messages; #endif std::vector hb_stamps; @@ -85,6 +101,30 @@ class MockPeeringListener : public PeeringState::PeeringListener { logger_perf = build_osd_logger(g_ceph_context); g_ceph_context->get_perfcounters_collection()->add(logger_perf); } + /// Constructor for ECPeeringTestFixture: accepts a pre-created backend listener + /// instead of creating one internally. 
This avoids the throw-away construction + /// pattern where the internally-created listener would be immediately replaced. + MockPeeringListener(OSDMapRef osdmap, + int64_t pool_id, + DoutPrefixProvider *dpp, + pg_shard_t pg_whoami, + std::unique_ptr bl, + ObjectStore *object_store, + coll_t coll_arg, + ObjectStore::CollectionHandle ch_arg) + : pg_whoami(pg_whoami), + backend_listener(std::move(bl)), + coll(coll_arg), + ch(ch_arg) + { + backend = std::make_unique( + g_ceph_context, backend_listener.get(), object_store, coll, ch); + recoverystate_perf = build_recoverystate_perf(g_ceph_context); + g_ceph_context->get_perfcounters_collection()->add(recoverystate_perf); + logger_perf = build_osd_logger(g_ceph_context); + g_ceph_context->get_perfcounters_collection()->add(logger_perf); + } + ~MockPeeringListener() { if (recoverystate_perf) { @@ -134,19 +174,43 @@ class MockPeeringListener : public PeeringState::PeeringListener { #ifdef WITH_CRIMSON void send_cluster_message( int osd, MessageURef m, epoch_t epoch, bool share_map_update=false) override { - dout(0) << "send_cluster_message to " << osd << " " << m << dendl; - messages[osd].push_back(m); + dout(0) << "send_cluster_message to " << osd << " " << m << " epoch " << epoch << dendl; + if (messenger) { + // Use MockMessenger for EventLoop-based routing with epoch tracking + messenger->send_message(pg_whoami.osd, osd, m.detach()); + } else { + // Fall back to direct message queue for TestPeeringState compatibility + messages[osd].push_back(m); + } messages_sent++; } #else void send_cluster_message( int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) override { - dout(0) << "send_cluster_message to " << osd << " " << m << dendl; - messages[osd].push_back(m); + dout(0) << "send_cluster_message to " << osd << " " << m << " epoch " << epoch << dendl; + if (messenger) { + // Use MockMessenger for EventLoop-based routing with epoch tracking + messenger->send_message(pg_whoami.osd, osd, m.detach()); + } else 
{ + // Fall back to direct message queue for TestPeeringState compatibility + messages[osd].push_back(m); + } messages_sent++; } #endif + void set_messenger(MockMessenger* m) { + messenger = m; + } + + void set_event_loop(EventLoop* el) { + event_loop = el; + } + + void set_fixture(ECPeeringTestFixture* f) { + fixture = f; + } + void send_pg_created(pg_t pgid) override { pg_created_sent = true; } @@ -203,17 +267,7 @@ class MockPeeringListener : public PeeringState::PeeringListener { void request_local_background_io_reservation( unsigned priority, PGPeeringEventURef on_grant, - PGPeeringEventURef on_preempt) override { - if (inject_event_stall) { - stalled_events.push_back(std::move(on_grant)); - } else { - events.push_back(std::move(on_grant)); - } - if (inject_keep_preempt) { - stalled_events.push_back(std::move(on_preempt)); - } - io_reservations_requested++; - } + PGPeeringEventURef on_preempt) override; void update_local_background_io_priority( unsigned priority) override { @@ -227,17 +281,7 @@ class MockPeeringListener : public PeeringState::PeeringListener { void request_remote_recovery_reservation( unsigned priority, PGPeeringEventURef on_grant, - PGPeeringEventURef on_preempt) override { - if (inject_event_stall) { - stalled_events.push_back(std::move(on_grant)); - } else { - events.push_back(std::move(on_grant)); - } - if (inject_keep_preempt) { - stalled_events.push_back(std::move(on_preempt)); - } - remote_recovery_reservations_requested++; - } + PGPeeringEventURef on_preempt) override; void cancel_remote_recovery_reservation() override { remote_recovery_reservation_cancelled = true; @@ -245,14 +289,7 @@ class MockPeeringListener : public PeeringState::PeeringListener { void schedule_event_on_commit( ObjectStore::Transaction &t, - PGPeeringEventRef on_commit) override { - if (inject_event_stall) { - stalled_events.push_back(std::move(on_commit)); - } else { - events.push_back(std::move(on_commit)); - } - events_on_commit_scheduled++; - } + 
PGPeeringEventRef on_commit) override; void update_heartbeat_peers(std::set peers) override { heartbeat_peers_updated = true; @@ -444,48 +481,7 @@ class MockPeeringListener : public PeeringState::PeeringListener { return OstreamTemp(CLOG_DEBUG, nullptr); } - void on_activate_complete() override { - dout(0) << __func__ << dendl; - std::list *event_queue; - if (inject_event_stall) { - event_queue = &stalled_events; - } else { - event_queue = &events; - } - - if (ps->needs_recovery()) { - dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl; - event_queue->push_back( - std::make_shared( - get_osdmap_epoch(), - get_osdmap_epoch(), - PeeringState::DoRecovery())); - } else if (ps->needs_backfill()) { - dout(10) << "activate queueing backfill" << dendl; - event_queue->push_back( - std::make_shared( - get_osdmap_epoch(), - get_osdmap_epoch(), - PeeringState::RequestBackfill())); -#if POOL_MIGRATION - } else if (ps->needs_pool_migration()) { - dout(10) << "activate queueing pool migration" << dendl; - event_queue->push_back( - std::make_shared( - get_osdmap_epoch(), - get_osdmap_epoch(), - PeeringState::DoPoolMigration())); -#endif - } else { - dout(10) << "activate all replicas clean, no recovery" << dendl; - event_queue->push_back( - std::make_shared( - get_osdmap_epoch(), - get_osdmap_epoch(), - PeeringState::AllReplicasRecovered())); - } - activate_complete_called = true; - } + void on_activate_complete() override; void on_activate_committed() override { activate_committed_called = true; diff --git a/src/test/osd/OSDMapTestHelpers.h b/src/test/osd/OSDMapTestHelpers.h index 916f7ccd7ac0..f3904acb99a1 100644 --- a/src/test/osd/OSDMapTestHelpers.h +++ b/src/test/osd/OSDMapTestHelpers.h @@ -178,12 +178,13 @@ public: int k, int m, uint64_t stripe_width, - uint64_t flags, - int64_t pool_id = 0) + uint64_t flags) { pg_pool_t pool; pool.type = pg_pool_t::TYPE_ERASURE; + pool.size = k + m; + pool.min_size = k; pool.crush_rule = 0; 
pool.erasure_code_profile = "default"; diff --git a/src/test/osd/PGBackendTestFixture.cc b/src/test/osd/PGBackendTestFixture.cc index c54bfc5d2d4e..39b1fc9a3c1e 100644 --- a/src/test/osd/PGBackendTestFixture.cc +++ b/src/test/osd/PGBackendTestFixture.cc @@ -15,15 +15,23 @@ #include "test/osd/PGBackendTestFixture.h" #include "common/errno.h" +#include "messages/MOSDECSubOpWrite.h" +#include "messages/MOSDECSubOpWriteReply.h" +#include "messages/MOSDECSubOpRead.h" +#include "messages/MOSDECSubOpReadReply.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" void PGBackendTestFixture::setup_ec_pool() { CephContext *cct = g_ceph_context; + int num_osds = k + m; + osdmap = std::make_shared(); - osdmap->set_max_osd(k + m); + osdmap->set_max_osd(num_osds); - for (int i = 0; i < k + m; i++) { + for (int i = 0; i < num_osds; i++) { osdmap->set_state(i, CEPH_OSD_EXISTS); osdmap->set_weight(i, CEPH_OSD_OUT); osdmap->crush->set_item_name(i, "osd." + std::to_string(i)); @@ -33,7 +41,7 @@ void PGBackendTestFixture::setup_ec_pool() OSDMap::Incremental inc(osdmap->get_epoch() + 1); inc.fsid = osdmap->get_fsid(); - for (int i = 0; i < k + m; i++) { + for (int i = 0; i < num_osds; i++) { inc.new_state[i] = CEPH_OSD_UP; inc.new_weight[i] = CEPH_OSD_IN; @@ -52,13 +60,17 @@ void PGBackendTestFixture::setup_ec_pool() // This will properly calculate up_osd_features osdmap->apply_incremental(inc); - pg_pool_t pool = OSDMapTestHelpers::create_ec_pool(k, m, stripe_unit * k, pool_flags, pool_id); + pg_pool_t pool = OSDMapTestHelpers::create_ec_pool(k, m, stripe_unit * k, pool_flags); OSDMapTestHelpers::add_pool(osdmap, pool_id, pool); pgid = pg_t(0, pool_id); spgid = spg_t(pgid, shard_id_t(0)); - OSDMapTestHelpers::setup_ec_pg(osdmap, pgid, k, m, 0); + std::vector acting; + for (int i = 0; i < num_osds; i++) { + acting.push_back(i); + } + OSDMapTestHelpers::set_pg_acting(osdmap, pgid, acting); // Finalize the CRUSH map to calculate working_size // This is required for 
crush_init_workspace() to work correctly @@ -95,7 +107,7 @@ void PGBackendTestFixture::setup_ec_pool() } ObjectStore::Transaction t; - for (int i = 0; i < k + m; i++) { + for (int i = 0; i < num_osds; i++) { spg_t shard_spgid(pgid, shard_id_t(i)); coll_t shard_coll(shard_spgid); auto shard_ch = store->create_new_collection(shard_coll); @@ -115,28 +127,14 @@ void PGBackendTestFixture::setup_ec_pool() const pg_pool_t* pool_ptr = OSDMapTestHelpers::get_pool(osdmap, pool_id); ceph_assert(pool_ptr != nullptr); - for (int i = 0; i < k + m; i++) { - std::unique_ptr shard_listener; - if (listener_factory) { - shard_listener = listener_factory( - i, - osdmap, - pool_id, - dpp.get(), - pg_shard_t(i, shard_id_t(i))); - } else { - shard_listener = std::make_unique( - osdmap, - pool_id, - dpp.get(), - pg_shard_t(i, shard_id_t(i)) - ); - } + for (int i = 0; i < num_osds; i++) { + auto shard_listener = std::make_unique( + osdmap, pool_id, dpp.get(), pg_shard_t(i, shard_id_t(i))); // Initialize the listener's own info.pgid so OSDMap queries work shard_listener->info.pgid = spg_t(pgid, shard_id_t(i)); - for (int j = 0; j < k + m; j++) { + for (int j = 0; j < num_osds; j++) { shard_listener->shardset.insert(pg_shard_t(j, shard_id_t(j))); shard_listener->acting_recovery_backfill_shard_id_set.insert(shard_id_t(j)); @@ -152,7 +150,6 @@ void PGBackendTestFixture::setup_ec_pool() shard_listener->set_store(store.get(), chs[i]); shard_listener->set_event_loop(event_loop.get()); - shard_listener->set_op_tracker(op_tracker.get()); auto shard_lru = std::make_unique(1024 * 1024 * 100); auto shard_ec_switch = std::make_unique( @@ -164,18 +161,44 @@ void PGBackendTestFixture::setup_ec_pool() backends[i] = std::move(shard_ec_switch); } - for (int i = 0; i < k + m; i++) { - message_router[i] = [this, i](OpRequestRef op) -> bool { - return backends[i]->_handle_message(op); - }; - } - - for (int i = 0; i < k + m; i++) { - listeners[i]->set_message_router(&message_router); - 
listeners[i]->set_handle_message_callback( - [this, i](OpRequestRef op) -> bool { - return backends[i]->_handle_message(op); + // Create MockMessenger and register a single handler that routes to backends + messenger = std::make_unique(event_loop.get(), cct); + + // Set up epoch getter for MockMessenger to enable epoch-based message filtering + messenger->set_epoch_getter([this](int osd) -> epoch_t { + // Get the epoch from the listener's osdmap + auto it = listeners.find(osd); + if (it != listeners.end()) { + return it->second->pgb_get_osdmap_epoch(); + } + // If listener doesn't exist yet, use the test fixture's osdmap + return osdmap->get_epoch(); + }); + + // Create an OpTracker for wrapping messages in OpRequestRef + // This is needed because PGBackend::_handle_message expects OpRequestRef + // Store as member variable so it can be properly shut down in TearDown() + op_tracker = std::make_shared(cct, true, 1); + + // Helper lambda to create a typed handler that wraps messages and routes to backends + auto make_backend_handler = [this](int msg_type) { + messenger->register_typed_handler(msg_type, + [this](int from_osd, int to_osd, MsgType* m) -> bool { + auto it = backends.find(to_osd); + ceph_assert(it != backends.end()); + OpRequestRef op = this->op_tracker->create_request(m); + return it->second->_handle_message(op); }); + }; + + // Register typed handlers for all EC message types + make_backend_handler.template operator()(MSG_OSD_EC_WRITE); + make_backend_handler.template operator()(MSG_OSD_EC_WRITE_REPLY); + make_backend_handler.template operator()(MSG_OSD_EC_READ); + make_backend_handler.template operator()(MSG_OSD_EC_READ_REPLY); + + for (int i = 0; i < num_osds; i++) { + listeners[i]->set_messenger(messenger.get()); } } @@ -233,22 +256,8 @@ void PGBackendTestFixture::setup_replicated_pool() ceph_assert(pool_ptr != nullptr); for (int i = 0; i < num_replicas; i++) { - std::unique_ptr replica_listener; - if (listener_factory) { - replica_listener = 
listener_factory( - i, - osdmap, - pool_id, - dpp.get(), - pg_shard_t(i, shard_id_t::NO_SHARD)); - } else { - replica_listener = std::make_unique( - osdmap, - pool_id, - dpp.get(), - pg_shard_t(i, shard_id_t::NO_SHARD) - ); - } + auto replica_listener = std::make_unique( + osdmap, pool_id, dpp.get(), pg_shard_t(i, shard_id_t::NO_SHARD)); // Initialize the listener's own info.pgid so OSDMap queries work replica_listener->info.pgid = spg_t(pgid, shard_id_t::NO_SHARD); @@ -269,7 +278,6 @@ void PGBackendTestFixture::setup_replicated_pool() replica_listener->set_store(store.get(), chs[i]); replica_listener->set_event_loop(event_loop.get()); - replica_listener->set_op_tracker(op_tracker.get()); auto replica_backend = std::make_unique( replica_listener.get(), colls[i], chs[i], store.get(), cct); @@ -278,18 +286,42 @@ void PGBackendTestFixture::setup_replicated_pool() backends[i] = std::move(replica_backend); } - for (int i = 0; i < num_replicas; i++) { - message_router[i] = [this, i](OpRequestRef op) -> bool { - return backends[i]->_handle_message(op); - }; - } + // Create MockMessenger and register a single handler that routes to backends + messenger = std::make_unique(event_loop.get(), cct); + + // Set up epoch getter for MockMessenger to enable epoch-based message filtering + messenger->set_epoch_getter([this](int osd) -> epoch_t { + // Get the epoch from the listener's osdmap + auto it = listeners.find(osd); + if (it != listeners.end()) { + return it->second->pgb_get_osdmap_epoch(); + } + // If listener doesn't exist yet, use the test fixture's osdmap + return osdmap->get_epoch(); + }); + + // Create an OpTracker for wrapping messages in OpRequestRef + // This is needed because PGBackend::_handle_message expects OpRequestRef + // Store as member variable so it can be properly shut down in TearDown() + op_tracker = std::make_shared(cct, true, 1); + + // Helper lambda to create a typed handler that wraps messages and routes to backends + auto make_backend_handler = 
[this](int msg_type) { + messenger->register_typed_handler(msg_type, + [this](int from_osd, int to_osd, MsgType* m) -> bool { + auto it = backends.find(to_osd); + ceph_assert(it != backends.end()); + OpRequestRef op = this->op_tracker->create_request(m); + return it->second->_handle_message(op); + }); + }; + + // Register typed handlers for replicated backend message types + make_backend_handler.template operator()(MSG_OSD_REPOP); + make_backend_handler.template operator()(MSG_OSD_REPOPREPLY); for (int i = 0; i < num_replicas; i++) { - listeners[i]->set_message_router(&message_router); - listeners[i]->set_handle_message_callback( - [this, i](OpRequestRef op) -> bool { - return backends[i]->_handle_message(op); - }); + listeners[i]->set_messenger(messenger.get()); } } @@ -298,7 +330,8 @@ int PGBackendTestFixture::do_transaction_and_complete( PGTransactionUPtr pg_t, const object_stat_sum_t& delta_stats, const eversion_t& at_version, - std::vector log_entries) + std::vector log_entries, + std::function on_write_complete) { eversion_t trim_to(0, 0); eversion_t pg_committed_to(0, 0); @@ -306,12 +339,16 @@ int PGBackendTestFixture::do_transaction_and_complete( bool completed = false; int completion_result = -1; - Context *on_complete = new LambdaContext([&completed, &completion_result](int r) { + Context *on_complete = new LambdaContext([&completed, &completion_result, on_write_complete](int r) { completed = true; completion_result = r; + // Call the write-specific completion lambda if provided + if (on_write_complete) { + on_write_complete(r); + } }); - ceph_tid_t tid = 1; + ceph_tid_t tid = next_tid++; osd_reqid_t reqid(entity_name_t::OSD(0), 0, tid); PGBackend* primary_backend = get_primary_backend(); @@ -331,10 +368,10 @@ int PGBackendTestFixture::do_transaction_and_complete( OpRequestRef() ); - event_loop->run_until_idle(10000); + event_loop->run_until_idle(); if (!completed) { - throw std::runtime_error("Transaction did not complete within timeout"); + 
completion_result = -EINPROGRESS; } return completion_result; @@ -342,16 +379,26 @@ int PGBackendTestFixture::do_transaction_and_complete( int PGBackendTestFixture::create_and_write( const std::string& obj_name, - const std::string& data, - const eversion_t& at_version) + const std::string& data) { + // Auto-generate version + eversion_t at_version = get_next_version(); + hobject_t hoid = make_test_object(obj_name); PGTransactionUPtr pg_t = std::make_unique(); pg_t->create(hoid); - ObjectContextRef obc = make_object_context(hoid, false, 0); + // Use persistent OBC so attr_cache is maintained across operations + ObjectContextRef obc = get_or_create_obc(hoid, false, 0); pg_t->obc_map[hoid] = obc; + // Note: We do NOT pre-seed attr_cache here. For a new object, attr_cache + // should be empty. ECTransaction::attr_updates() will update attr_cache + // with the new OI from PGTransaction::attr_updates during the transaction. + + // Track outstanding write + outstanding_writes[hoid]++; + bufferlist bl; bl.append(data); pg_t->write(hoid, 0, bl.length(), bl); @@ -360,6 +407,34 @@ int PGBackendTestFixture::create_and_write( delta_stats.num_objects = 1; delta_stats.num_bytes = bl.length(); + // Build the NEW OI that finish_ctx() would produce + object_info_t new_oi = obc->obs.oi; + new_oi.version = at_version; + new_oi.prior_version = obc->obs.oi.version; + new_oi.size = bl.length(); + + // Encode new OI and put into PGTransaction as an attr update. + // This matches PrimaryLogPG::finish_ctx() lines 9127-9130,9142. + { + bufferlist oi_bl; + new_oi.encode(oi_bl, + osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + pg_t->setattr(hoid, OI_ATTR, oi_bl); + } + + // snapset + if (hoid.snap == CEPH_NOSNAP) { + bufferlist bss; + encode(SnapSet(), bss); + pg_t->setattr(hoid, SS_ATTR, bss); + } + + // Update OBC obs to new state BEFORE submitting the transaction. 
+ // This matches PrimaryLogPG::finish_ctx() line 9187: ctx->obc->obs = ctx->new_obs + // At this point: obc->obs.oi has NEW state, obc->attr_cache[OI_ATTR] has OLD state. + obc->obs.oi = new_oi; + obc->obs.exists = true; + std::vector log_entries; pg_log_entry_t entry; entry.mark_unrollbackable(); @@ -369,45 +444,113 @@ int PGBackendTestFixture::create_and_write( entry.prior_version = eversion_t(0, 0); log_entries.push_back(entry); - int result = do_transaction_and_complete( - hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries)); + // Create completion lambda for write-specific cleanup + auto write_complete = [this, hoid, obc](int r) { + // Note: we do NOT update obc->obs after completion — it was already + // updated above before submit, matching PrimaryLogPG behavior. + // ECTransaction::attr_updates() will have updated attr_cache[OI_ATTR] + // to the new encoded OI during the transaction. + + // Decrement outstanding writes counter + if (outstanding_writes[hoid] > 0) { + outstanding_writes[hoid]--; + // Clean up the counter if it reaches 0, but don't clear attr_cache here. + // The attr_cache will be cleared on on_change() events. + if (outstanding_writes[hoid] == 0) { + outstanding_writes.erase(hoid); + } + } - if (result == 0) { - obc->obs.exists = true; - obc->obs.oi.size = bl.length(); - obc->obs.oi.version = at_version; - } + if (r != 0 && r != -EINPROGRESS) { + // Transaction failed — roll back OBC state. + // In production this would be handled differently, but for tests + // we just reset to a clean state. 
+ obc->obs.oi = object_info_t(hoid); + obc->obs.exists = false; + obc->attr_cache.clear(); + outstanding_writes.erase(hoid); + } + }; + + int result = do_transaction_and_complete( + hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete); return result; } +ObjectContextRef PGBackendTestFixture::get_object_context( + const hobject_t& hoid) +{ + PGBackend* primary_backend = get_primary_backend(); + ObjectContextRef obc = std::make_shared(); + obc->obs.oi = object_info_t(hoid); + obc->obs.exists = false; + obc->ssc = nullptr; + + // Try to read the ObjectInfo from the store + ghobject_t ghoid(hoid, ghobject_t::NO_GEN, primary_backend->get_parent()->whoami_shard().shard); + ceph::buffer::ptr value_ptr; + int r = store->getattr(ch, ghoid, OI_ATTR, value_ptr); + ceph_assert(r >= 0 && value_ptr.length() > 0); + + bufferlist bl; + bl.append(value_ptr); + auto p = bl.cbegin(); + obc->obs.oi.decode(p); + obc->obs.exists = true; + + return obc; +} + int PGBackendTestFixture::write( const std::string& obj_name, uint64_t offset, const std::string& data, - const eversion_t& prior_version, - const eversion_t& at_version, uint64_t object_size) { hobject_t hoid = make_test_object(obj_name); PGTransactionUPtr pg_t = std::make_unique(); - ObjectContextRef obc = make_object_context(hoid, true, object_size); - obc->obs.oi.version = prior_version; + ObjectContextRef obc = get_or_create_obc(hoid, true, object_size); pg_t->obc_map[hoid] = obc; + // Track outstanding write + outstanding_writes[hoid]++; + bufferlist bl; bl.append(data); pg_t->write(hoid, offset, bl.length(), bl); - object_stat_sum_t delta_stats; uint64_t new_size = std::max(object_size, offset + bl.length()); + + object_stat_sum_t delta_stats; if (new_size > object_size) { delta_stats.num_bytes = new_size - object_size; } else { delta_stats.num_bytes = 0; } + // Prior version comes from the object's current version + eversion_t prior_version = obc->obs.oi.version; + eversion_t at_version = 
get_next_version(); + + // Build the NEW OI + object_info_t new_oi = obc->obs.oi; + new_oi.version = at_version; + new_oi.prior_version = prior_version; + new_oi.size = new_size; + + // Encode new OI into PGTransaction + { + bufferlist oi_bl; + new_oi.encode(oi_bl, + osdmap->get_features(CEPH_ENTITY_TYPE_OSD, nullptr)); + pg_t->setattr(hoid, OI_ATTR, oi_bl); + } + + // Update OBC obs to new state BEFORE submitting + obc->obs.oi = new_oi; + std::vector log_entries; pg_log_entry_t entry; // Don't mark as unrollbackable - partial writes need rollback support @@ -417,13 +560,29 @@ int PGBackendTestFixture::write( entry.prior_version = prior_version; log_entries.push_back(entry); - int result = do_transaction_and_complete( - hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries)); + // Create completion lambda for write-specific cleanup + auto write_complete = [this, hoid, obc, prior_version, object_size](int r) { + // Decrement outstanding writes counter + if (outstanding_writes[hoid] > 0) { + outstanding_writes[hoid]--; + // Clean up the counter if it reaches 0, but don't clear attr_cache here. + // The attr_cache will be cleared on on_change() events. 
+ if (outstanding_writes[hoid] == 0) { + outstanding_writes.erase(hoid); + } + } - if (result == 0) { - obc->obs.oi.size = new_size; - obc->obs.oi.version = at_version; - } + if (r != 0 && r != -EINPROGRESS) { + // Roll back OBC on failure + obc->obs.oi.version = prior_version; + obc->obs.oi.size = object_size; + obc->attr_cache.clear(); + outstanding_writes.erase(hoid); + } + }; + + int result = do_transaction_and_complete( + hoid, std::move(pg_t), delta_stats, at_version, std::move(log_entries), write_complete); return result; } @@ -468,11 +627,9 @@ int PGBackendTestFixture::read_object( false ); - event_loop->run_until_idle(10000); + event_loop->run_until_idle(); - if (!completed) { - throw std::runtime_error("Read operation did not complete within timeout"); - } + ceph_assert(completed); return completion_result; } else { @@ -493,6 +650,62 @@ int PGBackendTestFixture::read_object( } } +void PGBackendTestFixture::verify_object( + const std::string& obj_name, + const std::string& expected_data, + size_t offset, + size_t object_size) +{ + bufferlist read_data; + int read_result = read_object(obj_name, offset, expected_data.length(), read_data, object_size); + + EXPECT_GE(read_result, 0) << "Read should complete successfully"; + EXPECT_EQ(read_data.length(), expected_data.length()) << "Read data length should match"; + + if (read_data.length() == expected_data.length()) { + std::string read_string(read_data.c_str(), read_data.length()); + EXPECT_EQ(read_string, expected_data) << "Data should match"; + } +} + +void PGBackendTestFixture::create_and_write_verify( + const std::string& obj_name, + const std::string& data) +{ + int result = create_and_write(obj_name, data); + + EXPECT_GE(result, 0) << "Write should complete successfully"; + + // Always verify - tests should only use this helper when success is expected + verify_object(obj_name, data, 0, data.length()); +} + +void PGBackendTestFixture::write_verify( + const std::string& obj_name, + size_t offset, + const 
std::string& data, + size_t object_size, + const std::string& context_msg) +{ + int result = write(obj_name, offset, data, object_size); + + std::string msg_suffix = context_msg.empty() ? "" : " (" + context_msg + ")"; + EXPECT_GE(result, 0) << "Write should complete successfully" << msg_suffix; + + // Always verify - tests should only use this helper when success is expected + bufferlist read_data; + int read_result = read_object(obj_name, offset, data.length(), read_data, + std::max(object_size, offset + data.length())); + + EXPECT_GE(read_result, 0) << "Read should complete successfully" << msg_suffix; + EXPECT_EQ(read_data.length(), data.length()) << "Read data length should match" << msg_suffix; + + if (read_data.length() == data.length()) { + std::string read_string(read_data.c_str(), read_data.length()); + EXPECT_EQ(read_string, data) << "Written data should match" << msg_suffix; + } +} + // --------------------------------------------------------------------------- // NOTE: update_osdmap() intentionally does NOT reconcile listener acting sets // @@ -523,22 +736,31 @@ void PGBackendTestFixture::update_osdmap( std::shared_ptr new_osdmap, std::optional new_primary) { - // Step 1: Call on_change() on all backends to clear in-flight operations - for (auto& [instance, be] : backends) { - if (be) { - be->on_change(); - } - } - - // Step 2: Update the osdmap reference + // Step 1: Update the osdmap reference first osdmap = new_osdmap; - // Step 3: Update the osdmap in all listeners + // Step 2: Update the osdmap in all listeners for (auto& [instance, list] : listeners) { if (list) { list->osdmap = new_osdmap; } } + + // Step 3: Clear all attr_caches before on_change() + // The cached OI attributes may be stale after a peering event + clear_all_attr_caches(); + + // Step 4: Schedule on_change() calls as event loop actions + // This allows them to be delayed and processed after the new epoch + for (auto& [instance, be] : backends) { + if (be) { + PGBackend* 
backend_ptr = be.get(); + event_loop->schedule_peering_event(instance, [backend_ptr]() { + backend_ptr->on_change(); + }); + } + } + event_loop->run_until_idle(); } void PGBackendTestFixture::cleanup_data_dir() @@ -551,3 +773,13 @@ void PGBackendTestFixture::cleanup_data_dir() } } +void PGBackendTestFixture::clear_all_attr_caches() +{ + // Clear attr_cache for all objects. update_osdmap() calls this before scheduling + // on_change(), since cached attributes might be stale after a peering event. + for (auto& [hoid, obc] : object_contexts) { + if (obc) { + obc->attr_cache.clear(); + } + } +} diff --git a/src/test/osd/PGBackendTestFixture.h b/src/test/osd/PGBackendTestFixture.h index 49c703ffaa01..cab26ba58f8f 100644 --- a/src/test/osd/PGBackendTestFixture.h +++ b/src/test/osd/PGBackendTestFixture.h @@ -25,6 +25,7 @@ #include "test/osd/MockErasureCode.h" #include "test/osd/MockPGBackendListener.h" #include "test/osd/EventLoop.h" +#include "test/osd/MockMessenger.h" #include "common/TrackedOp.h" #include "os/memstore/MemStore.h" #include "osd/ECSwitch.h" @@ -63,36 +64,28 @@ protected: coll_t coll; std::shared_ptr osdmap; - std::unique_ptr op_tracker; std::unique_ptr event_loop; - std::map> message_router; + std::unique_ptr messenger; std::map> listeners; std::map> backends; std::map colls; std::map chs; - /** - * Optional listener factory callback. - * - * If set, setup_ec_pool() and setup_replicated_pool() will call this - * factory instead of constructing MockPGBackendListener directly. - * The factory receives the instance index and the parameters needed to - * construct the listener, and must return a unique_ptr to the new - * MockPGBackendListener. The returned object is stored in listeners[i] - * as usual, so ownership stays with the base class. - * - * Derived classes (e.g. ECPeeringTestFixture) can set this in their - * constructor to gain direct access to the created listeners without - * needing to steal ownership via release_listener().
- */ - std::function( - int instance, - std::shared_ptr osdmap, - int64_t pool_id, - DoutPrefixProvider* dpp, - pg_shard_t whoami)> listener_factory; - + /// Persistent OBC storage - emulates PrimaryLogPG's object_contexts LRU. + /// Keyed by hobject_t, values are shared_ptr so the same OBC is reused + /// across sequential operations on the same object. This is critical for + /// EC attr_cache continuity. + std::map object_contexts; + + /// Track outstanding writes per object. When this reaches 0, we can safely + /// clear attr_cache (as there are no in-flight writes that might have stale + /// cached OI data). + std::map outstanding_writes; + + // OpTracker for wrapping messages in OpRequestRef + std::shared_ptr op_tracker; + ceph::ErasureCodeInterfaceRef ec_impl; std::map> lrus; int k = 4; // data chunks @@ -100,7 +93,7 @@ protected: uint64_t stripe_unit = 4096; // aka chunk_size std::string ec_plugin = "isa"; std::string ec_technique = "reed_sol_van"; - + int num_replicas = 3; int min_size = 2; @@ -108,6 +101,13 @@ protected: pg_t pgid; spg_t spgid; + // Transaction ID counter - increments with each transaction + ceph_tid_t next_tid = 1; + + // Version counter for auto-generating versions in write* functions + // The epoch comes from osdmap, this tracks the second version number + uint64_t next_version = 1; + class TestDpp : public NoDoutPrefix { public: TestDpp(CephContext *cct) : NoDoutPrefix(cct, ceph_subsys_osd) {} @@ -159,7 +159,6 @@ public: CephContext *cct = g_ceph_context; dpp = std::make_unique(cct); event_loop = std::make_unique(false); - op_tracker = std::make_unique(cct, false, 1); if (pool_type == EC) { setup_ec_pool(); @@ -169,55 +168,44 @@ public: } void TearDown() override { - // 0. Process any remaining events in the EventLoop. - // If the test passed, orphaned events indicate a bug - warn and skip draining - // so the test fails loudly. 
If the test already failed, drain silently to - // allow the rest of TearDown to complete without cascading errors. + if (event_loop) { if (event_loop->has_events()) { if (!HasFailure()) { ADD_FAILURE() << "TearDown: " << event_loop->queued_event_count() << " orphaned events remain after a passing test"; } - event_loop->run_until_idle(1000); + event_loop->run_until_idle(); } } - - // 1. Clean up all backend instances (polymorphic cleanup) - // Note: We skip calling on_change() during teardown as it may access - // invalid state. The backends will be destroyed anyway. + + if (op_tracker) { + op_tracker->on_shutdown(); + op_tracker.reset(); + } + backends.clear(); + object_contexts.clear(); + outstanding_writes.clear(); - // 2. Clean up EC-specific resources if (pool_type == EC) { lrus.clear(); ec_impl.reset(); } - - // 3. Clean up listeners + listeners.clear(); - - // 4. Reset op tracker (call on_shutdown first) - if (op_tracker) { - op_tracker->on_shutdown(); - op_tracker.reset(); - } - - // 5. Reset all collection handles chs.clear(); colls.clear(); if (ch) { ch.reset(); } - - // 6. Unmount and destroy the store + if (store) { store->umount(); store.reset(); } - - // 7. Clean up the test directory + cleanup_data_dir(); } @@ -290,17 +278,89 @@ public: return obc; } + /// Get an existing OBC or create a new one. + /// Unlike make_object_context(), this method reuses OBCs for the same + /// object across operations, which is essential for attr_cache continuity + /// in EC pools. + ObjectContextRef get_or_create_obc( + const hobject_t& hoid, + bool exists = false, + uint64_t size = 0) + { + auto it = object_contexts.find(hoid); + if (it != object_contexts.end()) { + return it->second; + } + ObjectContextRef obc = make_object_context(hoid, exists, size); + + // If the object exists and this is an EC pool, populate attr_cache with + // ALL attributes from disk. This matches production behavior where the OBC + // is loaded with all xattrs from the object store. 
+ if (exists && pool_type == EC && store && !chs.empty()) { + auto writes_it = outstanding_writes.find(hoid); + bool has_outstanding_writes = (writes_it != outstanding_writes.end() && writes_it->second > 0); + + // Only read from disk if there are no outstanding writes + if (!has_outstanding_writes) { + ObjectStore::CollectionHandle ch_primary = chs[0]; + if (ch_primary) { + ghobject_t ghoid(hoid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD); + std::map> attrs; + int r = store->getattrs(ch_primary, ghoid, attrs); + + if (r >= 0) { + // Successfully read all attributes from disk - populate the cache + for (auto& [key, value_ptr] : attrs) { + bufferlist bl; + bl.append(value_ptr); + obc->attr_cache[key] = std::move(bl); + } + } + } + } + } + + object_contexts[hoid] = obc; + return obc; + } + + /** + * Set the next version number for auto-generation. + * This can be used by tests after rollback to set the version to a specific value. + * The epoch will still come from the osdmap. + */ + void set_next_version(uint64_t version) { + next_version = version; + } + + /** + * Get the next version as an eversion_t with epoch from osdmap. + * This auto-increments the version counter. + */ + eversion_t get_next_version() { + epoch_t epoch = osdmap->get_epoch(); + return eversion_t(epoch, next_version++); + } + + /** + * Read ObjectInfo from the store for an existing object. + * Returns an ObjectContext with the decoded ObjectInfo, or a new + * ObjectContext with default values if the object doesn't exist. 
+ */ + ObjectContextRef get_object_context( + const hobject_t& hoid); + int do_transaction_and_complete( const hobject_t& hoid, PGTransactionUPtr pg_t, const object_stat_sum_t& delta_stats, const eversion_t& at_version, - std::vector log_entries); + std::vector log_entries, + std::function on_write_complete = nullptr); virtual int create_and_write( const std::string& obj_name, - const std::string& data, - const eversion_t& at_version = eversion_t(1, 1)); + const std::string& data); public: @@ -308,8 +368,6 @@ public: const std::string& obj_name, uint64_t offset, const std::string& data, - const eversion_t& prior_version, - const eversion_t& at_version, uint64_t object_size); int read_object( @@ -319,6 +377,63 @@ public: bufferlist& out_data, uint64_t object_size); + /** + * Read an object and verify that its contents match expected data. + * + * This helper function combines read_object with assertions to verify: + * 1. The read operation completes successfully (result >= 0) + * 2. The read data length matches expected length + * 3. The read data content matches expected content + * + * @param obj_name Name of the object to read + * @param expected_data Expected data content + * @param offset Offset to read from + * @param object_size Current size of the object + */ + void verify_object( + const std::string& obj_name, + const std::string& expected_data, + size_t offset, + size_t object_size); + + /** + * Create and write an object, then verify it was written correctly. + * + * This helper function combines create_and_write with verify_object to: + * 1. Create and write the object + * 2. Verify the write completed successfully (result == 0) + * 3. 
Read back and verify the data matches + * + * @param obj_name Name of the object to create and write + * @param data Data to write + * @note Declared void: the write result is checked (step 2) rather than returned + */ + void create_and_write_verify( + const std::string& obj_name, + const std::string& data); + + /** + * Write to an object (potentially with offset), then verify the write succeeded. + * + * This helper function combines write with verification to: + * 1. Write data at the specified offset + * 2. Verify the write completed successfully (result == 0) + * 3. Read back and verify the written data matches + * + * @param obj_name Name of the object to write + * @param offset Offset to write at + * @param data Data to write + * @param object_size Current size of the object + * @param context_msg Optional context message to append to assertion messages + * @note Declared void: the write result is checked (step 2) rather than returned + */ + void write_verify( + const std::string& obj_name, + size_t offset, + const std::string& data, + size_t object_size, + const std::string& context_msg = ""); + /** * Update the OSDMap and trigger backend cleanup. * @@ -336,5 +451,12 @@ public: std::shared_ptr new_osdmap, std::optional new_primary = std::nullopt); + /** + * Clear attr_cache for all objects. + * Called on on_change() to invalidate cached attributes that might be stale + * after a peering event or OSDMap change. + */ + void clear_all_attr_caches(); + }; diff --git a/src/test/osd/TestBackendBasics.cc b/src/test/osd/TestBackendBasics.cc index 8e81f7546a46..2fae5b6da5af 100644 --- a/src/test/osd/TestBackendBasics.cc +++ b/src/test/osd/TestBackendBasics.cc @@ -79,6 +79,61 @@ public: void SetUp() override { PGBackendTestFixture::SetUp(); } + + /** + * Simulate failure of multiple OSDs by marking them down in the OSDMap. + * This is similar to TestECFailover::simulate_osd_failure but handles + * multiple failures at once. 
+ */ + void simulate_multiple_osd_failures(const std::vector& failed_osds) { + auto new_osdmap = std::make_shared(); + new_osdmap->deepish_copy_from(*osdmap); + + // Build new acting set with failed OSDs replaced by CRUSH_ITEM_NONE + std::vector new_acting; + int total_osds = k + m; + + for (int i = 0; i < total_osds; i++) { + bool is_failed = std::find(failed_osds.begin(), failed_osds.end(), i) != failed_osds.end(); + new_acting.push_back(is_failed ? CRUSH_ITEM_NONE : i); + } + + // Get the pool to use pgtemp_primaryfirst transformation + const pg_pool_t* pool = new_osdmap->get_pg_pool(pgid.pool()); + ceph_assert(pool != nullptr); + + // For EC pools with optimizations, pgtemp_primaryfirst reorders the acting set + std::vector transformed_acting = new_osdmap->pgtemp_primaryfirst(*pool, new_acting); + + // Use OSDMap::Incremental to set pg_temp and mark OSDs as down + OSDMap::Incremental inc(new_osdmap->get_epoch() + 1); + inc.fsid = new_osdmap->get_fsid(); + + for (int failed_osd : failed_osds) { + inc.new_state[failed_osd] = CEPH_OSD_EXISTS; // Mark as down (exists but not UP) + } + + // Convert to mempool vector for pg_temp + mempool::osdmap::vector pg_temp_vec(transformed_acting.begin(), transformed_acting.end()); + inc.new_pg_temp[pgid] = pg_temp_vec; + + new_osdmap->apply_incremental(inc); + + // Finalize the CRUSH map + new_osdmap->crush->finalize(); + + // Update listener shardsets to remove failed shards + for (int failed_osd : failed_osds) { + pg_shard_t failed_shard(failed_osd, shard_id_t(failed_osd)); + for (auto& [instance_id, list] : listeners) { + list->shardset.erase(failed_shard); + list->acting_recovery_backfill_shard_id_set.erase(shard_id_t(failed_osd)); + } + } + + // update_osdmap will query the OSDMap to determine the primary + update_osdmap(new_osdmap); + } }; // --------------------------------------------------------------------------- @@ -100,9 +155,8 @@ TEST_P(TestBackendBasics, WriteThenRead) { std::string test_data(param.size, 
param.fill); std::string obj_name = "test_backend_" + backend_config.label + "_" + param.label; - // Execute create+write operation - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << param.label << " write should complete successfully"; + // Execute create+write operation and verify + create_and_write_verify(obj_name, test_data); // Verify messages were sent to replicas/shards auto* primary_listener = get_primary_listener(); @@ -125,26 +179,8 @@ TEST_P(TestBackendBasics, WriteThenRead) { primary_listener->sent_messages.clear(); primary_listener->sent_messages_with_dest.clear(); - // Perform the read operation - bufferlist read_data; - int read_result = read_object( - obj_name, - 0, // offset - test_data.length(), // length - read_data, - test_data.length() // object_size - ); - - EXPECT_GE(read_result, 0) << param.label << " read should complete successfully"; - - // Verify data length - ASSERT_EQ(read_data.length(), test_data.length()) - << param.label << " read data length should match written data length"; - - // Verify data content - std::string read_string(read_data.c_str(), read_data.length()); - EXPECT_EQ(read_string, test_data) - << param.label << " read data should match written data"; + // Verify object can be read back correctly + verify_object(obj_name, test_data, 0, test_data.size()); // For EC backends: verify read messages were sent to shards if (backend_config.pool_type == EC) { @@ -195,7 +231,7 @@ TEST_P(TestBackendBasics, PartialWrite) { // Create initial data filled with the parameterized fill character std::string initial_data(initial_size, param.fill); - int result = create_and_write(obj_name, initial_data, eversion_t(1, 1)); + int result = create_and_write(obj_name, initial_data); EXPECT_EQ(result, 0) << param.label << " initial write should complete successfully"; // Partial write data uses the next fill character (wraps around 'z' -> 'a') @@ -206,8 +242,6 @@ TEST_P(TestBackendBasics, PartialWrite) { obj_name, 
partial_offset, partial_data, - eversion_t(1, 1), // prior_version - eversion_t(1, 2), // at_version initial_size // object_size ); EXPECT_EQ(result, 0) << param.label << " partial write should complete successfully"; @@ -483,13 +517,8 @@ TEST_P(TestECFailover, BasicOSDMapUpdate) { const std::string obj_name = "test_failover_object"; const std::string test_data = "Initial data before OSDMap change"; - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Initial write should complete successfully"; - - bufferlist read_data; - int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length()); - EXPECT_GE(read_result, 0) << "Read should complete successfully"; - ASSERT_EQ(read_data.length(), test_data.length()); + // Write and verify initial data + create_and_write_verify(obj_name, test_data); auto new_osdmap = std::make_shared(); new_osdmap->deepish_copy_from(*osdmap); @@ -502,29 +531,16 @@ TEST_P(TestECFailover, BasicOSDMapUpdate) { ASSERT_TRUE(primary_listener != nullptr) << "Primary listener should exist"; EXPECT_EQ(primary_listener->osdmap, new_osdmap) << "Listener OSDMap should be updated"; - bufferlist read_data2; - read_result = read_object(obj_name, 0, test_data.length(), read_data2, test_data.length()); - EXPECT_GE(read_result, 0) << "Read after OSDMap update should complete successfully"; - ASSERT_EQ(read_data2.length(), test_data.length()); - - std::string read_string(read_data2.c_str(), read_data2.length()); - EXPECT_EQ(read_string, test_data) << "Data should match after OSDMap update"; + // Verify data can still be read after OSDMap update + verify_object(obj_name, test_data, 0, test_data.size()); } TEST_P(TestECFailover, PrimaryFailover) { const std::string obj_name = "test_primary_failover"; const std::string test_data = "Data written before primary failover"; - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Initial write should complete successfully"; - - bufferlist 
read_data; - int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length()); - EXPECT_GE(read_result, 0) << "Read should complete successfully"; - ASSERT_EQ(read_data.length(), test_data.length()); - - std::string read_string(read_data.c_str(), read_data.length()); - EXPECT_EQ(read_string, test_data) << "Data should match before failover"; + // Write and verify initial data + create_and_write_verify(obj_name, test_data); EXPECT_TRUE(listeners[0]->pgb_is_primary()) << "Instance 0 should be primary before failover"; @@ -553,13 +569,8 @@ TEST_P(TestECFailover, PrimaryFailover) { EXPECT_EQ(new_primary_backend, backends[expected_new_primary].get()) << "get_primary_backend() should return the new primary"; - bufferlist read_data_after; - int read_result_after = read_object(obj_name, 0, test_data.length(), read_data_after, test_data.length()); - EXPECT_GE(read_result_after, 0) << "Degraded read should complete successfully after failover"; - ASSERT_EQ(read_data_after.length(), test_data.length()); - - std::string read_string_after(read_data_after.c_str(), read_data_after.length()); - EXPECT_EQ(read_string_after, test_data) << "Data should match after failover with EC reconstruction"; + // Verify degraded read works after failover with EC reconstruction + verify_object(obj_name, test_data, 0, test_data.size()); EXPECT_TRUE(new_primary_listener != nullptr) << "Primary listener should exist after failover"; EXPECT_GT(new_primary_listener->osdmap->get_epoch(), 1) diff --git a/src/test/osd/TestECFailoverWithPeering.cc b/src/test/osd/TestECFailoverWithPeering.cc index 37fffcb2fd8b..6a6a3d347618 100644 --- a/src/test/osd/TestECFailoverWithPeering.cc +++ b/src/test/osd/TestECFailoverWithPeering.cc @@ -15,28 +15,36 @@ #include #include "test/osd/ECPeeringTestFixture.h" +#include "test/osd/TestCommon.h" using namespace std; -class TestECFailoverWithPeering : public ECPeeringTestFixture { +/** + * TestECFailoverWithPeering - parameterized EC peering 
and failover tests. + * + * This fixture is parameterized over BackendConfig to test multiple EC + * configurations (different k/m values, stripe units, plugins, and optimizations). + * Only EC configurations are tested since peering and failover are EC-specific. + */ +class TestECFailoverWithPeering : public ECPeeringTestFixture, + public ::testing::WithParamInterface { public: TestECFailoverWithPeering() : ECPeeringTestFixture() { - k = 4; - m = 2; - stripe_unit = 4096; - ec_plugin = "isa"; - ec_technique = "reed_sol_van"; + const auto& config = GetParam(); + k = config.k; + m = config.m; + stripe_unit = config.stripe_unit; + ec_plugin = config.ec_plugin; + ec_technique = config.ec_technique; + pool_flags = config.pool_flags; + } + + void SetUp() override { + ECPeeringTestFixture::SetUp(); } }; -TEST_F(TestECFailoverWithPeering, BasicPeeringCycle) { - run_peering_cycle(); - - EXPECT_TRUE(all_shards_active()) << "All shards should be active after peering"; - - // Note: In EC pools, only the primary tracks PG_STATE_CLEAN. - // Replicas are in ReplicaActive state and don't set the CLEAN flag. 
- // Get acting_primary from OSDMap +TEST_P(TestECFailoverWithPeering, BasicPeeringCycle) { pg_t pgid = get_peering_state(0)->get_info().pgid.pgid; std::vector acting_osds; int acting_primary = -1; @@ -55,116 +63,50 @@ TEST_F(TestECFailoverWithPeering, BasicPeeringCycle) { } } -TEST_F(TestECFailoverWithPeering, WriteWithPeering) { - run_peering_cycle(); - ASSERT_TRUE(all_shards_active()) << "Peering must complete before write"; - +TEST_P(TestECFailoverWithPeering, WriteWithPeering) { + const std::string obj_name = "test_write_with_peering"; const std::string test_data = "Data written with full peering support"; - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Write should complete successfully"; - - bufferlist read_data; - int read_result = read_object(obj_name, 0, test_data.length(), read_data, test_data.length()); - EXPECT_GE(read_result, 0) << "Read should complete successfully"; - ASSERT_EQ(read_data.length(), test_data.length()); - - std::string read_string(read_data.c_str(), read_data.length()); - EXPECT_EQ(read_string, test_data) << "Data should match"; - + create_and_write_verify(obj_name, test_data); + auto* primary_ps = get_peering_state(0); EXPECT_GT(primary_ps->get_pg_log().get_log().log.size(), 0) << "Primary should have log entries after write"; } -TEST_F(TestECFailoverWithPeering, OSDFailureWithPeering) { - run_peering_cycle(); +TEST_P(TestECFailoverWithPeering, OSDFailureWithPeering) { ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; const std::string obj_name = "test_osd_failure"; - // Write 16KB but read only 8KB to force reconstruction when shard 1 is down - const std::string test_data(16384, 'X'); // 16KB write - const size_t read_length = 8192; // 8KB read - - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Initial write should complete"; - - // Pre-failover read: measure baseline message count with all OSDs up - // Clear message counters first - for (auto& 
[shard, listener] : backend_listeners) { - listener->sent_messages.clear(); - } - + uint64_t object_size = k * stripe_unit; + const std::string test_data_full(object_size, 'X'); + const size_t read_length = 2 * stripe_unit; + const std::string test_data_read(read_length, 'X'); + int failed_osd = 1; // Fail shard 1 which contains part of the data + + create_and_write_verify(obj_name, test_data_full); + event_loop->reset_stats(); bufferlist pre_failover_read; - int pre_read_result = read_object(obj_name, 0, read_length, - pre_failover_read, test_data.length()); - EXPECT_GE(pre_read_result, 0) << "Pre-failover read should complete"; - - // Count messages sent during pre-failover read - size_t pre_failover_msg_count = 0; - for (auto& [shard, listener] : backend_listeners) { - pre_failover_msg_count += listener->sent_messages.size(); - } + verify_object(obj_name, test_data_read, 0, object_size); + EXPECT_EQ(4, event_loop->get_stats_by_type().at(EventLoop::EventType::OSD_MESSAGE)); - int failed_osd = 1; // Fail shard 1 which contains part of the data - // Use fixture helper to mark OSD as down mark_osd_down(failed_osd); - // Primary (OSD 0) should remain active after non-primary OSD failure - auto* primary_ps = get_peering_state(0); - std::string primary_state = get_state_name(0); - EXPECT_TRUE(primary_state.find("Peering") != std::string::npos || - primary_state.find("Active") != std::string::npos) - << "Primary should be peering or active after OSD failure, got: " << primary_state; - - EXPECT_TRUE(primary_ps->get_acting_recovery_backfill().count(pg_shard_t(failed_osd, shard_id_t(failed_osd))) == 0) - << "Failed OSD should not be in acting set"; - - // Clear message counters before post-failover read - for (auto& [shard, listener] : backend_listeners) { - listener->sent_messages.clear(); - } - - // Post-failover read: verify EC reconstruction works with one OSD down - bufferlist post_failover_read; - int post_read_result = read_object(obj_name, 0, read_length, - 
post_failover_read, test_data.length()); - EXPECT_GE(post_read_result, 0) << "Read should complete successfully after OSD failure"; - ASSERT_EQ(post_failover_read.length(), read_length) - << "Read length should match after OSD failure"; - - std::string read_string(post_failover_read.c_str(), post_failover_read.length()); - std::string expected_data(read_length, 'X'); - EXPECT_EQ(read_string, expected_data) - << "Data should be correctly reconstructed via EC after OSD failure"; - - // Count messages sent during post-failover read - size_t post_failover_msg_count = 0; - for (auto& [shard, listener] : backend_listeners) { - post_failover_msg_count += listener->sent_messages.size(); - } - - // This is an 8k read of a 16k object in a 4+2 array. This means that if shard 1 - // is missing, then this should result in 4 reads, rather than 2 to recover. - EXPECT_GT(post_failover_msg_count, pre_failover_msg_count) - << "Post-failover read should complete successfully " - << "(pre: " << pre_failover_msg_count << ", post: " << post_failover_msg_count << ")"; + // Reset EventLoop stats before post-failover read + event_loop->reset_stats(); + verify_object(obj_name, test_data_read, 0, object_size); + EXPECT_EQ(k * 2, event_loop->get_stats_by_type().at(EventLoop::EventType::OSD_MESSAGE)); } -TEST_F(TestECFailoverWithPeering, PrimaryFailoverWithPeering) { - run_peering_cycle(); +TEST_P(TestECFailoverWithPeering, PrimaryFailoverWithPeering) { ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; const std::string obj_name = "test_primary_failover"; const std::string test_data = "Data before primary failover"; - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Initial write should complete"; - - EXPECT_TRUE(get_peering_listener(0)->backend_listener->pgb_is_primary()) - << "Shard 0 should be primary initially"; + create_and_write_verify(obj_name, test_data); // Mark OSD 0 (the initial primary) as down // PeeringState will automatically 
determine the new primary @@ -200,27 +142,21 @@ TEST_F(TestECFailoverWithPeering, PrimaryFailoverWithPeering) { << "New primary should be in Active state"; // Verify reads work after primary failover (with EC reconstruction) - bufferlist read_data; - int read_result = read_object(obj_name, 0, test_data.length(), - read_data, test_data.length()); - EXPECT_GE(read_result, 0) << "Read should complete successfully after primary failover"; - ASSERT_EQ(read_data.length(), test_data.length()) - << "Read length should match after primary failover"; - - std::string read_string(read_data.c_str(), read_data.length()); - EXPECT_EQ(read_string, test_data) - << "Data should be correctly reconstructed via EC after primary failover"; + verify_object(obj_name, test_data, 0, test_data.length()); } -TEST_F(TestECFailoverWithPeering, MultipleOSDFailuresWithPeering) { - run_peering_cycle(); +TEST_P(TestECFailoverWithPeering, MultipleOSDFailuresWithPeering) { + // This test only runs for configurations with m=2 + if (m != 2) { + GTEST_SKIP() << "MultipleOSDFailuresWithPeering only runs for m=2"; + } + ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; const std::string obj_name = "test_multiple_failures"; const std::string test_data = "Data before multiple failures"; - int result = create_and_write(obj_name, test_data); - EXPECT_EQ(result, 0) << "Initial write should complete"; + create_and_write_verify(obj_name, test_data); std::vector failed_osds = {1, 2}; // Fail 2 data shards ASSERT_EQ(failed_osds.size(), static_cast(m)) @@ -243,125 +179,7 @@ TEST_F(TestECFailoverWithPeering, MultipleOSDFailuresWithPeering) { << "Primary should be operational, got: " << primary_state; } -TEST_F(TestECFailoverWithPeering, PeeringWithLogDivergence) { - run_peering_cycle(); - ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; - - const std::string pre_div_obj = "test_pre_divergence"; - const std::string pre_div_data = "Data written before divergence"; - - int result 
= create_and_write(pre_div_obj, pre_div_data, eversion_t(1, 1)); - EXPECT_EQ(result, 0) << "Pre-divergence write should complete"; - - auto* primary_ps = get_peering_state(0); - size_t initial_log_size = primary_ps->get_pg_log().get_log().log.size(); - EXPECT_GT(initial_log_size, 0) << "Primary should have log entries after pre-divergence write"; - - // Note: get_pg_log().get_log().head reflects the log entries added via append_log - eversion_t pre_div_log_head = primary_ps->get_pg_log().get_log().head; - EXPECT_GT(pre_div_log_head.version, 0u) << "PG log head should be non-zero after write"; - - const std::string post_div_obj = "test_post_divergence"; - const std::string post_div_data = "Data written after divergence point"; - - result = create_and_write(post_div_obj, post_div_data, eversion_t(1, 2)); - EXPECT_EQ(result, 0) << "Post-divergence write should complete"; - - eversion_t post_div_log_head = primary_ps->get_pg_log().get_log().head; - EXPECT_GT(post_div_log_head.version, pre_div_log_head.version) - << "PG log head should advance after post-divergence write"; - - size_t log_size_after_writes = primary_ps->get_pg_log().get_log().log.size(); - EXPECT_GE(log_size_after_writes, initial_log_size) - << "Primary log should have at least as many entries after second write"; - - // Trigger a new peering cycle by advancing the map to simulate re-peering - // after a shard had a divergent log. 
- advance_epoch(); - - std::string primary_state = get_state_name(0); - ASSERT_TRUE(all_shards_active() || - primary_state.find("Recovery") != std::string::npos || - primary_state.find("Peering") != std::string::npos) - << "Shards should be active, recovering, or peering after map advance, got: " - << primary_state; - - // --- Verify pre-divergence data is readable and correct --- - bufferlist pre_div_read; - int read_result = read_object(pre_div_obj, 0, pre_div_data.length(), - pre_div_read, pre_div_data.length()); - EXPECT_GE(read_result, 0) << "Pre-divergence object should be readable after reconciliation"; - ASSERT_EQ(pre_div_read.length(), pre_div_data.length()) - << "Pre-divergence read length should match"; - { - std::string read_str(pre_div_read.c_str(), pre_div_read.length()); - EXPECT_EQ(read_str, pre_div_data) - << "Pre-divergence data should match after log reconciliation"; - } - - // --- Verify post-divergence data is readable and correct --- - bufferlist post_div_read; - read_result = read_object(post_div_obj, 0, post_div_data.length(), - post_div_read, post_div_data.length()); - EXPECT_GE(read_result, 0) << "Post-divergence object should be readable after reconciliation"; - ASSERT_EQ(post_div_read.length(), post_div_data.length()) - << "Post-divergence read length should match"; - { - std::string read_str(post_div_read.c_str(), post_div_read.length()); - EXPECT_EQ(read_str, post_div_data) - << "Post-divergence data should match after log reconciliation"; - } - - // After peering, the primary's PG log head should reflect all writes. 
- eversion_t primary_log_head = primary_ps->get_pg_log().get_log().head; - EXPECT_EQ(primary_log_head, post_div_log_head) - << "Primary PG log head should reflect all writes after reconciliation"; - - pg_t pgid = get_peering_state(0)->get_info().pgid.pgid; - std::vector acting_osds; - int acting_primary = -1; - osdmap->pg_to_acting_osds(pgid, &acting_osds, &acting_primary); - - for (int shard : acting_osds) { - if (shard == CRUSH_ITEM_NONE) { - continue; - } - auto* shard_ps = get_peering_state(shard); - if (shard_ps->is_active()) { - eversion_t shard_info_last_update = shard_ps->get_info().last_update; - if (shard == acting_primary) { - EXPECT_EQ(shard_info_last_update, post_div_log_head) - << "Primary shard info.last_update should match post-divergence log head"; - } else { - EXPECT_LE(shard_info_last_update, post_div_log_head) - << "Shard " << shard << " info.last_update should not exceed primary's log head"; - } - } - } - - // Verify the formerly-failed shard's PG log is accessible and consistent. - // We use the last data shard (k-1) as the "formerly-failed" shard to check. 
- int reconciled_shard = k - 1; - if (reconciled_shard >= 0 && reconciled_shard < k + m) { - auto* reconciled_ps = get_peering_state(reconciled_shard); - size_t reconciled_log_size = reconciled_ps->get_pg_log().get_log().log.size(); - auto* primary_ps_check = get_peering_state(acting_primary); - size_t primary_log_size = primary_ps_check->get_pg_log().get_log().log.size(); - EXPECT_LE(reconciled_log_size, primary_log_size) - << "Reconciled shard " << reconciled_shard - << " log size should not exceed primary's log size"; - - if (reconciled_ps->is_active()) { - eversion_t reconciled_info_lu = reconciled_ps->get_info().last_update; - EXPECT_LE(reconciled_info_lu, post_div_log_head) - << "Reconciled shard " << reconciled_shard - << " info.last_update should not exceed primary's log head after log reconciliation"; - } - } -} - -TEST_F(TestECFailoverWithPeering, RecoveryWithPeering) { - run_peering_cycle(); +TEST_P(TestECFailoverWithPeering, RecoveryWithPeering) { ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; const std::string obj1_name = "test_recovery_obj1"; @@ -370,10 +188,10 @@ TEST_F(TestECFailoverWithPeering, RecoveryWithPeering) { const std::string obj2_name = "test_recovery_obj2"; const std::string obj2_data = "Second object data for recovery test"; - int result = create_and_write(obj1_name, obj1_data, eversion_t(1, 1)); + int result = create_and_write(obj1_name, obj1_data); EXPECT_EQ(result, 0) << "First pre-failure write should complete"; - result = create_and_write(obj2_name, obj2_data, eversion_t(1, 2)); + result = create_and_write(obj2_name, obj2_data); EXPECT_EQ(result, 0) << "Second pre-failure write should complete"; EXPECT_TRUE(all_shards_clean()) << "All shards should be clean before recovery test"; @@ -423,7 +241,7 @@ TEST_F(TestECFailoverWithPeering, RecoveryWithPeering) { const std::string post_recovery_obj = "test_post_recovery"; const std::string post_recovery_data = "Data written after OSD failure and recovery"; - result = 
create_and_write(post_recovery_obj, post_recovery_data, eversion_t(1, 3)); + result = create_and_write(post_recovery_obj, post_recovery_data); EXPECT_EQ(result, 0) << "Write after OSD failure should complete successfully"; bufferlist post_recovery_read; @@ -461,3 +279,97 @@ TEST_F(TestECFailoverWithPeering, RecoveryWithPeering) { << "on_activate_complete should have been called during peering"; } +// --------------------------------------------------------------------------- +// EC backend configurations for parameterized tests +// --------------------------------------------------------------------------- + +namespace { + +/** + * EC-only backend configurations for TestECFailoverWithPeering. + * These configurations test various EC parameters: + * - Different k/m ratios (2+1, 4+2, 8+3) + * - Different stripe units (4k, 8k, 16k) + * - Different plugins (isa, jerasure) + * - Optimized EC only (every entry sets FLAG_EC_OPTIMIZATIONS) + * - EC overwrites enabled throughout (every entry sets FLAG_EC_OVERWRITES) + */ +const std::vector kECPeeringConfigs = { + // ISA plugin with optimizations (modern EC) + {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 4, 2, "EC_ISA_Opt_k4m2_su4k"}, + {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 8192, 4, 2, "EC_ISA_Opt_k4m2_su8k"}, + {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 16384, 4, 2, "EC_ISA_Opt_k4m2_su16k"}, + {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 2, 1, "EC_ISA_Opt_k2m1_su4k"}, + {PGBackendTestFixture::EC, "isa", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 8, 3, "EC_ISA_Opt_k8m3_su4k"}, + + // Jerasure plugin with optimizations (modern EC) + {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", 
pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 
4096, 4, 2, "EC_Jerasure_Opt_k4m2_su4k"}, + {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 8192, 4, 2, "EC_Jerasure_Opt_k4m2_su8k"}, + {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 16384, 4, 2, "EC_Jerasure_Opt_k4m2_su16k"}, + {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 2, 1, "EC_Jerasure_Opt_k2m1_su4k"}, + {PGBackendTestFixture::EC, "jerasure", "reed_sol_van", pg_pool_t::FLAG_EC_OVERWRITES | pg_pool_t::FLAG_EC_OPTIMIZATIONS, 4096, 8, 3, "EC_Jerasure_Opt_k8m3_su4k"}, +}; + +} // namespace + +/** + * Test rollback of in-flight writes when an OSD fails during peering. + * + * This test simulates the following scenario: + * 1. Write full stripe with pattern A and verify it was written + * 2. Call suspend_primary_to_osd(1), then write patterns B and C (both return -EINPROGRESS) + * 3. Mark the last shard (OSD k + m - 1) as down (forcing peering) + * 4. Call unsuspend_primary_to_osd(1) and run the event loop until idle - all shards should be active + * 5. Read data back - should get pattern A (the in-flight writes B and C are rolled back) + * + * This verifies that the test infrastructure properly handles OSD failures + * and peering without leaving OSDs in a suspended state that would block + * teardown. + */ +TEST_P( + TestECFailoverWithPeering, + RollbackAfterOSDFailure +) { + // GTEST_SKIP(); // Temporary + int failing_shard = k + m - 1; + int blocked_shard = 1; + const std::string obj_name = "test"; + const size_t data_size = stripe_unit * k; // One full stripe. 
+ std::string pattern_a(data_size, 'A'); + std::string pattern_b(data_size, 'B'); + std::string pattern_c(data_size, 'C'); + + ASSERT_TRUE(all_shards_active()) << "Initial peering must complete"; + + create_and_write_verify(obj_name, pattern_a); + suspend_primary_to_osd(blocked_shard); + int result = write(obj_name, 0, pattern_b, data_size); + ASSERT_EQ(-EINPROGRESS, result); + result = write(obj_name, 0, pattern_c, data_size); + ASSERT_EQ(-EINPROGRESS, result); + mark_osd_down(failing_shard); + unsuspend_primary_to_osd(blocked_shard); + event_loop->run_until_idle(); + + // Ensure all shards have completed peering and applied rollback transactions + ASSERT_TRUE(all_shards_active()) << "All shards should be active after peering"; + + verify_object(obj_name, pattern_a, 0, pattern_a.length()); + + std::cout << "\n=== RollbackAfterOSDFailure Test Complete ===" << std::endl; +} + + +// --------------------------------------------------------------------------- +// Instantiate TestECFailoverWithPeering with EC configurations +// --------------------------------------------------------------------------- + +INSTANTIATE_TEST_SUITE_P( + ECConfigs, + TestECFailoverWithPeering, + ::testing::ValuesIn(kECPeeringConfigs), + [](const ::testing::TestParamInfo& info) { + return info.param.label; + } +); +