]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: Add unittests for PeeringState 67304/head
author Bill Scales <bill_scales@uk.ibm.com>
Thu, 22 Jan 2026 10:53:54 +0000 (10:53 +0000)
committer Bill Scales <bill_scales@uk.ibm.com>
Thu, 19 Feb 2026 13:37:59 +0000 (13:37 +0000)
Add test harness for testing PeeringState. Add mock classes
for PeeringListener, PGBackendListener and PGBackend to allow
instances of PeeringState to be created. Test harness to pass
peering messages between OSD instances, configure the OSDMap
with a pool and one PG. Some basic initialization unittests
plus tests that run the whole peering process to active+clean
including testing rollback of incomplete writes in an EC pool,
recovery, async recovery and backfill.

Includes tests for issues 71493, 73891 and 74218

AI: AI assisted in writing the mock classes providing stubs
for all the virtual functions and writing the simpler unittests.
Signed-off-by: Bill Scales <bill_scales@uk.ibm.com>
src/test/osd/CMakeLists.txt
src/test/osd/TestPeeringState.cc [new file with mode: 0644]

index cbe5113d3829fb9fedf86a55777e33186dd865a0..f4b59c8c7b5bf73f09ddf6b6fc4d9f22ade12b6b 100644 (file)
@@ -110,7 +110,12 @@ add_executable(unittest_pglog
   )
 add_ceph_unittest(unittest_pglog)
 target_link_libraries(unittest_pglog osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES})
-
+# unittest_peeringstate
+add_executable(unittest_peeringstate
+  TestPeeringState.cc
+  )
+add_ceph_unittest(unittest_peeringstate)
+target_link_libraries(unittest_peeringstate osd os global ${CMAKE_DL_LIBS} ${BLKID_LIBRARIES})
 # unittest_hitset
 add_executable(unittest_hitset
   hitset.cc
diff --git a/src/test/osd/TestPeeringState.cc b/src/test/osd/TestPeeringState.cc
new file mode 100644 (file)
index 0000000..52f2a18
--- /dev/null
@@ -0,0 +1,3428 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2025 IBM
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Library Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Library Public License for more details.
+ *
+ */
+
+ /* This is a test harness for testing PeeringState (with PGLog
+  * and MissingLoc) and the peering process. Because the main
+  * purpose of peering is to reconcile the state of a PG across
+  * the cluster the test harness simulates multiple OSDs each with
+  * their own instance of PeeringState and emulates messenger and
+  * the OSD scheduler passing peering messages between them. This
+  * allows both unit tests of individual functions and tests of
+  * the whole peering cycle. PeeringState coordinates recovery
+  * and backfill, the test harness emulates enough of PG and
+  * PrimaryLogPG to allow this to be tested.
+  */
+
+#include <memory>
+#include <gtest/gtest.h>
+#include "osd/PeeringState.h"
+#include "osd/PGBackend.h"
+#include "osd/ReplicatedBackend.h"
+#include "osd/ECBackend.h"
+#include "osd/OSDMap.h"
+#include "osd/osd_types.h"
+#include "osd/osd_perf_counters.h"
+#include "common/ceph_context.h"
+#include "common/ostream_temp.h"
+#include "common/perf_counters_collection.h"
+#include "common/WorkQueue.h"
+#include "common/intrusive_timer.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include "messages/MOSDPeeringOp.h"
+#include "msg/Connection.h"
+#include "os/ObjectStore.h"
+
+// dout using global context and OSD subsystem
+// main sets OSD subsystem debug level
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_osd
+
+using namespace std;
+
+//MockConnection - simple stub. Required because PeeringState needs
+//to know the features of the peer OSD which sent a peering message
+class MockConnection : public Connection {
+ public:
+  // No real messenger exists in the harness: construct against the
+  // global context with a null messenger and advertise every feature
+  // bit so peering never degrades for an "old" peer.
+  MockConnection() : Connection(g_ceph_context, nullptr) {
+    set_features(CEPH_FEATURES_ALL);
+  }
+
+  // Always report connected; the harness never tears connections down.
+  bool is_connected() override {
+    return true;
+  }
+
+  // Messages sent on this connection are discarded; just drop the
+  // reference the caller handed us.
+  int send_message(Message *m) override {
+    m->put();
+    return 0;
+  }
+
+  void send_keepalive() override {
+  }
+
+  void mark_down() override {
+  }
+
+  void mark_disposable() override {
+  }
+
+  // Peer address is never inspected by the tests; return a blank one.
+  entity_addr_t get_peer_socket_addr() const override {
+    return entity_addr_t();
+  }
+};
+
+//MockLog - simple stub
+class MockLog : public LoggerSinkSet {
+ public:
+  void debug(std::stringstream& s) final
+  {
+    std::cout << "\n<<debug>> " << s.str() << std::endl;
+  }
+
+  void info(std::stringstream& s) final
+  {
+    std::cout << "\n<<info>> " << s.str() << std::endl;
+  }
+
+  void sec(std::stringstream& s) final
+  {
+    std::cout << "\n<<sec>> " << s.str() << std::endl;
+  }
+
+  void warn(std::stringstream& s) final
+  {
+    std::cout << "\n<<warn>> " << s.str() << std::endl;
+  }
+
+  void error(std::stringstream& s) final
+  {
+    err_count++;
+    std::cout << "\n<<error>> " << s.str() << std::endl;
+  }
+
+  OstreamTemp info() final { return OstreamTemp(CLOG_INFO, this); }
+  OstreamTemp warn() final { return OstreamTemp(CLOG_WARN, this); }
+  OstreamTemp error() final { return OstreamTemp(CLOG_ERROR, this); }
+  OstreamTemp sec() final { return OstreamTemp(CLOG_ERROR, this); }
+  OstreamTemp debug() final { return OstreamTemp(CLOG_DEBUG, this); }
+
+  void do_log(clog_type prio, std::stringstream& ss) final
+  {
+    switch (prio) {
+      case CLOG_DEBUG:
+        debug(ss);
+        break;
+      case CLOG_INFO:
+        info(ss);
+        break;
+      case CLOG_SEC:
+        sec(ss);
+        break;
+      case CLOG_WARN:
+        warn(ss);
+        break;
+      case CLOG_ERROR:
+      default:
+        error(ss);
+        break;
+    }
+  }
+
+  void do_log(clog_type prio, const std::string& ss) final
+  {
+    switch (prio) {
+      case CLOG_DEBUG:
+        debug() << ss;
+        break;
+      case CLOG_INFO:
+        info() << ss;
+        break;
+      case CLOG_SEC:
+        sec() << ss;
+        break;
+      case CLOG_WARN:
+        warn() << ss;
+        break;
+      case CLOG_ERROR:
+      default:
+        error() << ss;
+        break;
+    }
+  }
+
+  virtual ~MockLog() {}
+
+  int err_count{0};
+  int expected_err_count{0};
+  void set_expected_err_count(int c) { expected_err_count = c; }
+};
+
+// MockECRecPred - simple stub for IsPGRecoverablePredicate
+// Warning - this always returns true. This means we cannot test scenarios
+// where there are too many OSDs down and the PG should be incomplete
+class MockECRecPred : public IsPGRecoverablePredicate {
+ public:
+  MockECRecPred() {}
+
+  // Claim the PG is recoverable regardless of which shards we have.
+  bool operator()(const std::set<pg_shard_t> &_have) const override {
+    return true;
+  }
+};
+
+// Factory used by the harness; caller takes ownership of the
+// returned predicate (matches the PGBackend factory convention).
+IsPGRecoverablePredicate *get_is_recoverable_predicate() {
+  return new MockECRecPred();
+}
+
+// MockECReadPred - simple stub for IsPGReadablePredicate
+// Warning - this always returns true. This means we cannot test scenarios
+// where there are too many OSDs down and the PG should be incomplete
+class MockECReadPred : public IsPGReadablePredicate {
+ public:
+  MockECReadPred() {}
+  // Claim the PG is readable regardless of which shards we have.
+  bool operator()(const std::set<pg_shard_t> &_have) const override {
+    return true;
+  }
+};
+
+// Factory used by the harness; caller takes ownership of the
+// returned predicate (matches the PGBackend factory convention).
+IsPGReadablePredicate *get_is_readable_predicate() {
+  return new MockECReadPred();
+}
+
+// MockPGBackendListener - simple stub for PGBackend::Listener
+// Provides just enough state (info, log, missing sets, shard maps)
+// for a PGBackend instance to be constructed and driven by the
+// peering test harness; almost all callbacks are no-ops.
+class MockPGBackendListener : public PGBackend::Listener {
+public:
+  pg_info_t info;
+  OSDMapRef osdmap;
+  const pg_pool_t pool;
+  PGLog log;
+  DoutPrefixProvider *dpp;
+  pg_shard_t pg_whoami;
+  std::set<pg_shard_t> shardset;
+  std::map<pg_shard_t, pg_info_t> shard_info;
+  std::map<pg_shard_t, pg_missing_t> shard_missing;
+  std::map<hobject_t, std::set<pg_shard_t>> missing_loc_shards;
+  pg_missing_tracker_t local_missing;
+
+  MockPGBackendListener(OSDMapRef osdmap, const pg_pool_t pi, DoutPrefixProvider *dpp, pg_shard_t pg_whoami) :
+    osdmap(osdmap), pool(pi), log(g_ceph_context), dpp(dpp), pg_whoami(pg_whoami) {}
+
+  // Debugging
+  DoutPrefixProvider *get_dpp() override {
+    return dpp;
+  }
+
+  // Recovery callbacks - all no-ops; the harness tracks recovery
+  // progress through PeeringState itself, not through the backend.
+  void on_local_recover(
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info,
+    ObjectContextRef obc,
+    bool is_delete,
+    ObjectStore::Transaction *t) override {
+  }
+
+  void on_global_recover(
+    const hobject_t &oid,
+    const object_stat_sum_t &stat_diff,
+    bool is_delete) override {
+  }
+
+  void on_peer_recover(
+    pg_shard_t peer,
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info) override {
+  }
+
+  void begin_peer_recover(
+    pg_shard_t peer,
+    const hobject_t oid) override {
+  }
+
+  void apply_stats(
+    const hobject_t &soid,
+    const object_stat_sum_t &delta_stats) override {
+  }
+
+  void on_failed_pull(
+    const std::set<pg_shard_t> &from,
+    const hobject_t &soid,
+    const eversion_t &v) override {
+  }
+
+  void cancel_pull(const hobject_t &soid) override {
+  }
+
+  void remove_missing_object(
+    const hobject_t &oid,
+    eversion_t v,
+    Context *on_complete) override {
+  }
+
+  // Locking - single-threaded harness, so these are no-ops
+  void pg_lock() override {}
+  void pg_unlock() override {}
+  void pg_add_ref() override {}
+  void pg_dec_ref() override {}
+
+  // Context wrapping - return contexts unwrapped (no PG lock to take)
+  Context *bless_context(Context *c) override {
+    return c;
+  }
+
+  GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) override {
+    return c;
+  }
+
+  GenContext<ThreadPool::TPHandle&> *bless_unlocked_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) override {
+    return c;
+  }
+
+  // Messaging - dropped; peering messages flow via MockPeeringListener
+  void send_message(int to_osd, Message *m) override {
+  }
+
+  void queue_transaction(
+    ObjectStore::Transaction&& t,
+    OpRequestRef op = OpRequestRef()) override {
+  }
+
+  void queue_transactions(
+    std::vector<ObjectStore::Transaction>& tls,
+    OpRequestRef op = OpRequestRef()) override {
+  }
+
+  // Fixed epochs: the harness treats epoch 1 as the interval start
+  epoch_t get_interval_start_epoch() const override {
+    return 1;
+  }
+
+  epoch_t get_last_peering_reset_epoch() const override {
+    return 1;
+  }
+
+  // Shard information - all three getters share the same (initially
+  // empty) shardset; tests populate it as needed
+  const std::set<pg_shard_t> &get_acting_recovery_backfill_shards() const override {
+    return shardset;
+  }
+
+  const std::set<pg_shard_t> &get_acting_shards() const override {
+    return shardset;
+  }
+
+  const std::set<pg_shard_t> &get_backfill_shards() const override {
+    return shardset;
+  }
+
+  std::ostream& gen_dbg_prefix(std::ostream& out) const override {
+    return out << "MockPGBackend ";
+  }
+
+  const std::map<hobject_t, std::set<pg_shard_t>> &get_missing_loc_shards() const override {
+    return missing_loc_shards;
+  }
+
+  const pg_missing_tracker_t &get_local_missing() const override {
+    return local_missing;
+  }
+
+  void add_local_next_event(const pg_log_entry_t& e) override {
+  }
+
+  const std::map<pg_shard_t, pg_missing_t> &get_shard_missing() const override {
+    return shard_missing;
+  }
+
+  // NOTE: ignores 'peer' and always returns the local missing set -
+  // adequate for current tests but not per-peer accurate
+  const pg_missing_const_i &get_shard_missing(pg_shard_t peer) const override {
+    return local_missing;
+  }
+
+  const std::map<pg_shard_t, pg_info_t> &get_shard_info() const override {
+    return shard_info;
+  }
+
+  const PGLog &get_log() const override {
+    return log;
+  }
+
+  // Always claims to be the primary shard
+  bool pgb_is_primary() const override {
+    return true;
+  }
+
+  const OSDMapRef& pgb_get_osdmap() const override {
+    return osdmap;
+  }
+
+  epoch_t pgb_get_osdmap_epoch() const override {
+    return osdmap->get_epoch();
+  }
+
+  const pg_info_t &get_info() const override {
+    return info;
+  }
+
+  const pg_pool_t &get_pool() const override {
+    return pool;
+  }
+
+  eversion_t get_pg_committed_to() const override {
+    return eversion_t();
+  }
+
+  ObjectContextRef get_obc(
+    const hobject_t &hoid,
+    const std::map<std::string, ceph::buffer::list, std::less<>> &attrs) override {
+    return ObjectContextRef();
+  }
+
+  bool try_lock_for_read(
+    const hobject_t &hoid,
+    ObcLockManager &manager) override {
+    return true;
+  }
+
+  void release_locks(ObcLockManager &manager) override {
+  }
+
+  void op_applied(const eversion_t &applied_version) override {
+  }
+
+  bool should_send_op(pg_shard_t peer, const hobject_t &hoid) override {
+    return true;
+  }
+
+  bool pg_is_undersized() const override {
+    return false;
+  }
+
+  bool pg_is_repair() const override {
+    return false;
+  }
+
+#if POOL_MIGRATION
+  void update_migration_watermark(const hobject_t &watermark) override {
+  }
+#endif
+
+#if POOL_MIGRATION
+  std::optional<hobject_t> consider_updating_migration_watermark(
+    std::set<hobject_t> &deleted) override {
+    return std::nullopt;
+  }
+#endif
+
+  void log_operation(
+    std::vector<pg_log_entry_t>&& logv,
+    const std::optional<pg_hit_set_history_t> &hset_history,
+    const eversion_t &trim_to,
+    const eversion_t &roll_forward_to,
+    const eversion_t &pg_committed_to,
+    bool transaction_applied,
+    ObjectStore::Transaction &t,
+    bool async = false) override {
+  }
+
+  void pgb_set_object_snap_mapping(
+    const hobject_t &soid,
+    const std::set<snapid_t> &snaps,
+    ObjectStore::Transaction *t) override {
+  }
+
+  void pgb_clear_object_snap_mapping(
+    const hobject_t &soid,
+    ObjectStore::Transaction *t) override {
+  }
+
+  void update_peer_last_complete_ondisk(
+    pg_shard_t fromosd,
+    eversion_t lcod) override {
+  }
+
+  void update_last_complete_ondisk(eversion_t lcod) override {
+  }
+
+  void update_pct(eversion_t pct) override {
+  }
+
+  void update_stats(const pg_stat_t &stat) override {
+  }
+
+  void schedule_recovery_work(
+    GenContext<ThreadPool::TPHandle&> *c,
+    uint64_t cost) override {
+  }
+
+  // No timer in the harness; any test path reaching this is a bug
+  common::intrusive_timer &get_pg_timer() override {
+    ceph_abort("Not supported");
+  }
+
+  pg_shard_t whoami_shard() const override {
+    return pg_whoami;
+  }
+
+  spg_t primary_spg_t() const override {
+    return spg_t();
+  }
+
+  pg_shard_t primary_shard() const override {
+    return pg_shard_t();
+  }
+
+  // Feature negotiation: pretend every peer supports everything
+  uint64_t min_peer_features() const override {
+    return CEPH_FEATURES_ALL;
+  }
+
+  uint64_t min_upacting_features() const override {
+    return CEPH_FEATURES_ALL;
+  }
+
+  pg_feature_vec_t get_pg_acting_features() const override {
+    return pg_feature_vec_t();
+  }
+
+  hobject_t get_temp_recovery_object(
+    const hobject_t& target,
+    eversion_t version) override {
+    return hobject_t();
+  }
+
+  void send_message_osd_cluster(
+    int peer, Message *m, epoch_t from_epoch) override {
+  }
+
+  void send_message_osd_cluster(
+    std::vector<std::pair<int, Message*>>& messages, epoch_t from_epoch) override {
+  }
+
+  void send_message_osd_cluster(MessageRef, Connection *con) override {
+  }
+
+  void send_message_osd_cluster(Message *m, const ConnectionRef& con) override {
+  }
+
+  void start_mon_command(
+    std::vector<std::string>&& cmd, bufferlist&& inbl,
+    bufferlist *outbl, std::string *outs,
+    Context *onfinish) override {
+  }
+
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) override {
+    return nullptr;
+  }
+
+  entity_name_t get_cluster_msgr_name() override {
+    return entity_name_t();
+  }
+
+  PerfCounters *get_logger() override {
+    return nullptr;
+  }
+
+  ceph_tid_t get_tid() override {
+    return 0;
+  }
+
+  // clog streams constructed with a null sink - output is discarded
+  OstreamTemp clog_error() override {
+    return OstreamTemp(CLOG_ERROR, nullptr);
+  }
+
+  OstreamTemp clog_warn() override {
+    return OstreamTemp(CLOG_WARN, nullptr);
+  }
+
+  bool check_failsafe_full() override {
+    return false;
+  }
+
+  void inc_osd_stat_repaired() override {
+  }
+
+  bool pg_is_remote_backfilling() override {
+    return false;
+  }
+
+  void pg_add_local_num_bytes(int64_t num_bytes) override {
+  }
+
+  void pg_sub_local_num_bytes(int64_t num_bytes) override {
+  }
+
+  void pg_add_num_bytes(int64_t num_bytes) override {
+  }
+
+  void pg_sub_num_bytes(int64_t num_bytes) override {
+  }
+
+  bool maybe_preempt_replica_scrub(const hobject_t& oid) override {
+    return false;
+  }
+
+  struct ECListener *get_eclistener() override {
+    return nullptr;
+  }
+};
+
+// MockPGBackend - simple stub for PGBackend
+// Inherits the concrete PGBackend helpers (remove/try_stash/rollback/
+// rollforward/trim/partial_write used by MockPGLogEntryHandler) and
+// stubs out the pure-virtual recovery, message and EC interfaces.
+class MockPGBackend : public PGBackend {
+public:
+  MockPGBackend(CephContext* cct, Listener *l, ObjectStore *store,
+                const coll_t &coll, ObjectStore::CollectionHandle &ch)
+    : PGBackend(cct, l, store, coll, ch) {}
+
+  // Recovery operations - no real recovery is performed
+  RecoveryHandle *open_recovery_op() override {
+    return nullptr;
+  }
+
+  void run_recovery_op(RecoveryHandle *h, int priority) override {
+  }
+
+  int recover_object(
+    const hobject_t &hoid,
+    eversion_t v,
+    ObjectContextRef head,
+    ObjectContextRef obc,
+    RecoveryHandle *h) override {
+    return 0;
+  }
+
+  // Message handling - nothing is handled here
+  bool can_handle_while_inactive(OpRequestRef op) override {
+    return false;
+  }
+
+  bool _handle_message(OpRequestRef op) override {
+    return false;
+  }
+
+  void check_recovery_sources(const OSDMapRef& osdmap) override {
+  }
+
+  // State management
+  void on_change() override {
+  }
+
+  void clear_recovery_state() override {
+  }
+
+  // Predicates - note these return nullptr; the harness supplies the
+  // Mock*Pred predicates via the free factory functions instead
+  IsPGRecoverablePredicate *get_is_recoverable_predicate() const override {
+    return nullptr;
+  }
+
+  IsPGReadablePredicate *get_is_readable_predicate() const override {
+    return nullptr;
+  }
+
+  bool get_ec_supports_crc_encode_decode() const override {
+    return false;
+  }
+
+  void dump_recovery_info(ceph::Formatter *f) const override {
+  }
+
+  // EC interfaces - decode/encode are not exercised by these tests
+  bool ec_can_decode(const shard_id_set &available_shards) const override {
+    return false;
+  }
+
+  shard_id_map<bufferlist> ec_encode_acting_set(
+    const bufferlist &in_bl) const override {
+    return {0};
+  }
+
+  shard_id_map<bufferlist> ec_decode_acting_set(
+    const shard_id_map<bufferlist> &shard_map, int chunk_size) const override {
+    return {0};
+  }
+
+  ECUtil::stripe_info_t ec_get_sinfo() const override {
+    return {0, 0, 0};
+  }
+
+  // Transaction submission - dropped; no ObjectStore is attached
+  void submit_transaction(
+    const hobject_t &hoid,
+    const object_stat_sum_t &delta_stats,
+    const eversion_t &at_version,
+    PGTransactionUPtr &&t,
+    const eversion_t &trim_to,
+    const eversion_t &pg_committed_to,
+    std::vector<pg_log_entry_t>&& log_entries,
+    std::optional<pg_hit_set_history_t> &hset_history,
+    Context *on_all_commit,
+    ceph_tid_t tid,
+    osd_reqid_t reqid,
+    OpRequestRef op) override {
+  }
+
+  // Run the callback inline - there is no write queue to order against
+  void call_write_ordered(std::function<void(void)> &&cb) override {
+    cb();
+  }
+
+  // Object operations - reads return success without touching data
+  int objects_read_sync(
+    const hobject_t &hoid,
+    uint64_t off,
+    uint64_t len,
+    uint32_t op_flags,
+    ceph::buffer::list *bl) override {
+    return 0;
+  }
+
+  void objects_read_async(
+    const hobject_t &hoid,
+    uint64_t object_size,
+    const std::list<std::pair<ec_align_t,
+      std::pair<ceph::buffer::list*, Context*>>> &to_read,
+    Context *on_complete, bool fast_read = false) override {
+  }
+
+  bool auto_repair_supported() const override {
+    return false;
+  }
+
+  // Treat on-disk size as equal to logical size (replicated-style)
+  uint64_t be_get_ondisk_size(uint64_t logical_size,
+                              shard_id_t shard_id,
+                              bool object_is_legacy_ec) const override {
+    return logical_size;
+  }
+
+  int be_deep_scrub(
+    const Scrub::ScrubCounterSet& io_counters,
+    const hobject_t &oid,
+    ScrubMap &map,
+    ScrubMapBuilder &pos,
+    ScrubMap::object &o) override {
+    return 0;
+  }
+};
+
+// MockPGLogEntryHandler
+//
+// This is a fully functional implementation of the PGLog::LogEntryHandler
+// interface. It calls code in PGBackend to perform the requested operations
+// although some of the stubs in MockPGBackend return questionable information
+// about the object size so the generated ObjectStore::Transaction is probably
+// not correct. The main purpose is to use partial_write to update the PWLC
+// information when appending entries to the log
+class MockPGLogEntryHandler : public PGLog::LogEntryHandler {
+ public:
+  MockPGBackend *backend;       // backend that executes each operation
+  ObjectStore::Transaction *t;  // transaction the operations append to
+  MockPGLogEntryHandler(MockPGBackend *backend, ObjectStore::Transaction *t) : backend(backend), t(t) {}
+
+  // LogEntryHandler - each method logs at level 0 then forwards to
+  // the backend, accumulating into the supplied transaction
+  void remove(const hobject_t &hoid) override {
+    dout(0) << "MockPGLogEntryHandler::remove " << hoid << dendl;
+    backend->remove(hoid, t);
+  }
+  void try_stash(const hobject_t &hoid, version_t v) override {
+    dout(0) << "MockPGLogEntryHandler::try_stash " << hoid << " " << v << dendl;
+    backend->try_stash(hoid, v, t);
+  }
+  void rollback(const pg_log_entry_t &entry) override {
+    dout(0) << "MockPGLogEntryHandler::rollback " << entry << dendl;
+    // Callers must only ask to roll back entries that still can be
+    ceph_assert(entry.can_rollback());
+    backend->rollback(entry, t);
+  }
+  void rollforward(const pg_log_entry_t &entry) override {
+    dout(0) << "MockPGLogEntryHandler::rollforward " << entry << dendl;
+    backend->rollforward(entry, t);
+  }
+  void trim(const pg_log_entry_t &entry) override {
+    dout(0) << "MockPGLogEntryHandler::trim " << entry << dendl;
+    backend->trim(entry, t);
+  }
+  // Updates the partial-write last-complete (PWLC) info when a log
+  // entry is appended - the key behaviour this mock exists to test
+  void partial_write(pg_info_t *info, eversion_t previous_version,
+                      const pg_log_entry_t &entry
+    ) override {
+    dout(0) << "MockPGLogEntryHandler::partial_write " << entry << dendl;
+    backend->partial_write(info, previous_version, entry);
+  }
+};
+
+// Mock PeeringListener - stub of PeeringState::PeeringListener
+// to help with testing of PeeringState. Keep track of calls
+// from PeeringState and emulate some of PrimaryLogPG/PG
+// functionality for testing purposes.
+//
+// There are some inject_* variables that can be used to help
+// tests create race hazards or test failure paths
+class MockPeeringListener : public PeeringState::PeeringListener {
+ public:
+  pg_shard_t pg_whoami;
+  MockLog logger;
+  PeeringState *ps;
+  unique_ptr<MockPGBackendListener> backend_listener;
+  coll_t coll;
+  ObjectStore::CollectionHandle ch;
+  unique_ptr<MockPGBackend> backend;
+  PerfCounters* recoverystate_perf;
+  PerfCounters* logger_perf;
+  std::vector<int> next_acting;
+
+#ifdef WITH_CRIMSON
+  // Per OSD state
+  std::map<int,std::list<MessageURef>> messages;
+#else
+  // Per OSD state
+  std::map<int,std::list<MessageRef>> messages;
+#endif
+  std::vector<HeartbeatStampsRef> hb_stamps;
+  std::list<PGPeeringEventRef> events;
+  std::list<PGPeeringEventRef> stalled_events;
+
+  // By default MockPeeringListener will add events to the event queue immediately
+  // simulating the responses that PrimaryLogPG normally generates. These inject
+  // booleans can change the behavior to test other code paths
+
+  // If inject_event_stall is true then events are added to the stalled_events list
+  // and the test case must manually dispatch the event
+  bool inject_event_stall = false;
+
+  // If inject_keep_preempt is true then the preempt event for a local/remote
+  // reservation is added to the stalled_events list so the test case can later
+  // dispatch this event to test a preempted reservation
+  bool inject_keep_preempt = false;
+
+  // If inject_fail_reserve_recovery_space is true then reject backfill/pool
+  // migration requests with too full
+  bool inject_fail_reserve_recovery_space = false;
+
+  MockPeeringListener(OSDMapRef osdmap,
+                      const pg_pool_t pi,
+                      DoutPrefixProvider *dpp,
+                      pg_shard_t pg_whoami) : pg_whoami(pg_whoami) {
+    backend_listener = make_unique<MockPGBackendListener>(osdmap, pi, dpp, pg_whoami);
+    backend = make_unique<MockPGBackend>(g_ceph_context, backend_listener.get(), nullptr, coll, ch);
+    recoverystate_perf = build_recoverystate_perf(g_ceph_context);
+    g_ceph_context->get_perfcounters_collection()->add(recoverystate_perf);
+    logger_perf = build_osd_logger(g_ceph_context);
+    g_ceph_context->get_perfcounters_collection()->add(logger_perf);
+  }
+
+  // EpochSource interface
+  epoch_t get_osdmap_epoch() const override {
+    return current_epoch;
+  }
+
+  // PeeringListener interface
+  void prepare_write(
+    pg_info_t &info,
+    pg_info_t &last_written_info,
+    PastIntervals &past_intervals,
+    PGLog &pglog,
+    bool dirty_info,
+    bool dirty_big_info,
+    bool need_write_epoch,
+    ObjectStore::Transaction &t) override {
+    prepare_write_called = true;
+  }
+
+  void scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type) override {
+    scrub_requested_called = true;
+  }
+
+  uint64_t get_snap_trimq_size() const override {
+    return snap_trimq_size;
+  }
+
+#ifdef WITH_CRIMSON
+  void send_cluster_message(
+    int osd, MessageURef m, epoch_t epoch, bool share_map_update=false) override {
+    dout(0) << "send_cluster_message to " << osd << " " << m << dendl;
+    messages[osd].push_back(m);
+    messages_sent++;
+  }
+#else
+  void send_cluster_message(
+    int osd, MessageRef m, epoch_t epoch, bool share_map_update=false) override {
+    dout(0) << "send_cluster_message to " << osd << " " << m << dendl;
+    messages[osd].push_back(m);
+    messages_sent++;
+  }
+#endif
+
+  void send_pg_created(pg_t pgid) override {
+    pg_created_sent = true;
+  }
+  ceph::signedspan get_mnow() const override {
+    return ceph::signedspan::zero();
+  }
+
+  HeartbeatStampsRef get_hb_stamps(int peer) override {
+    if (peer >= (int)hb_stamps.size()) {
+      hb_stamps.resize(peer + 1);
+    }
+    if (!hb_stamps[peer]) {
+      hb_stamps[peer] = ceph::make_ref<HeartbeatStamps>(peer);
+    }
+    return hb_stamps[peer];
+  }
+
+  void schedule_renew_lease(epoch_t plr, ceph::timespan delay) override {
+    renew_lease_scheduled = true;
+  }
+
+  void queue_check_readable(epoch_t lpr, ceph::timespan delay) override {
+    check_readable_queued = true;
+  }
+
+  void recheck_readable() override {
+    readable_rechecked = true;
+  }
+
+  unsigned get_target_pg_log_entries() const override {
+    return target_pg_log_entries;
+  }
+
+
+  bool try_flush_or_schedule_async() override {
+    return true;
+  }
+
+  void start_flush_on_transaction(ObjectStore::Transaction &t) override {
+    flush_started = true;
+  }
+
+  void on_flushed() override {
+    flushed = true;
+  }
+
+  void schedule_event_after(
+    PGPeeringEventRef event,
+    float delay) override {
+    stalled_events.push_back(std::move(event));
+    events_scheduled++;
+  }
+
+  void request_local_background_io_reservation(
+    unsigned priority,
+    PGPeeringEventURef on_grant,
+    PGPeeringEventURef on_preempt) override {
+    if (inject_event_stall) {
+      stalled_events.push_back(std::move(on_grant));
+    } else {
+      events.push_back(std::move(on_grant));
+    }
+    if (inject_keep_preempt) {
+      stalled_events.push_back(std::move(on_preempt));
+    }
+    io_reservations_requested++;
+  }
+
+  void update_local_background_io_priority(
+    unsigned priority) override {
+    io_priority_updated = true;
+  }
+
+  void cancel_local_background_io_reservation() override {
+    io_reservation_cancelled = true;
+  }
+
+  void request_remote_recovery_reservation(
+    unsigned priority,
+    PGPeeringEventURef on_grant,
+    PGPeeringEventURef on_preempt) override {
+    if (inject_event_stall) {
+      stalled_events.push_back(std::move(on_grant));
+    } else {
+      events.push_back(std::move(on_grant));
+    }
+    if (inject_keep_preempt) {
+      stalled_events.push_back(std::move(on_preempt));
+    }
+    remote_recovery_reservations_requested++;
+  }
+
+  void cancel_remote_recovery_reservation() override {
+    remote_recovery_reservation_cancelled = true;
+  }
+
+  void schedule_event_on_commit(
+    ObjectStore::Transaction &t,
+    PGPeeringEventRef on_commit) override {
+    if (inject_event_stall) {
+      stalled_events.push_back(std::move(on_commit));
+    } else {
+      events.push_back(std::move(on_commit));
+    }
+    events_on_commit_scheduled++;
+  }
+
+  void update_heartbeat_peers(std::set<int> peers) override {
+    heartbeat_peers_updated = true;
+  }
+
+  void set_probe_targets(const std::set<pg_shard_t> &probe_set) override {
+    probe_targets_set = true;
+  }
+
+  void clear_probe_targets() override {
+    probe_targets_cleared = true;
+  }
+
+  void queue_want_pg_temp(const std::vector<int> &wanted) override {
+    pg_temp_wanted = true;
+    next_acting = wanted;
+  }
+
+  void clear_want_pg_temp() override {
+    pg_temp_cleared = true;
+  }
+
+#if POOL_MIGRATION
+  void send_pg_migrated_pool() override {
+    pg_migrated_pool_sent = true;
+  }
+#endif
+
+  void publish_stats_to_osd() override {
+    stats_published = true;
+  }
+
+  void clear_publish_stats() override {
+    stats_cleared = true;
+  }
+
+  void check_recovery_sources(const OSDMapRef& newmap) override {
+    recovery_sources_checked = true;
+  }
+
+  void check_blocklisted_watchers() override {
+    blocklisted_watchers_checked = true;
+  }
+
+  void clear_primary_state() override {
+    primary_state_cleared = true;
+  }
+
+  void on_active_exit() override {
+    active_exited = true;
+  }
+
+  void on_active_actmap() override {
+    active_actmap_called = true;
+  }
+
+  void on_active_advmap(const OSDMapRef &osdmap) override {
+    active_advmap_called = true;
+  }
+
+  void on_backfill_reserved() override {
+    backfill_reserved = true;
+  }
+
+  void on_recovery_reserved() override {
+    recovery_reserved = true;
+  }
+
+  Context *on_clean() override {
+    clean_called = true;
+    return nullptr;
+  }
+
+  void on_activate(interval_set<snapid_t> snaps) override {
+    activate_called = true;
+  }
+
+  void on_change(ObjectStore::Transaction &t) override {
+    first_write_in_interval = true;
+    change_called = true;
+  }
+
+  std::pair<ghobject_t, bool> do_delete_work(
+    ObjectStore::Transaction &t, ghobject_t _next) override {
+    delete_work_done = true;
+    return std::make_pair(ghobject_t(), true);
+  }
+
+  void clear_ready_to_merge() override {
+    ready_to_merge_cleared = true;
+  }
+
+  void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override {
+    not_ready_to_merge_target_set = true;
+  }
+
+  void set_not_ready_to_merge_source(pg_t pgid) override {
+    not_ready_to_merge_source_set = true;
+  }
+
+  void set_ready_to_merge_target(eversion_t lu, epoch_t les, epoch_t lec) override {
+    ready_to_merge_target_set = true;
+  }
+
+  void set_ready_to_merge_source(eversion_t lu) override {
+    ready_to_merge_source_set = true;
+  }
+
+  epoch_t cluster_osdmap_trim_lower_bound() override {
+    return 1;
+  }
+
+  void on_backfill_suspended() override {
+    backfill_suspended = true;
+  }
+
+  void on_recovery_cancelled() override {
+    recovery_cancelled = true;
+  }
+
+#if POOL_MIGRATION
+  void on_pool_migration_reserved() override {
+    pool_migration_reserved = true;
+  }
+#endif
+
+#if POOL_MIGRATION
+  void on_pool_migration_suspended() override {
+    pool_migration_suspended = true;
+  }
+#endif
+
+  // Stub: record the reservation attempt; optionally fail it when the test
+  // has set inject_fail_reserve_recovery_space (note the flag is set even
+  // when the reservation is refused, so tests can see the attempt happened).
+  bool try_reserve_recovery_space(
+    int64_t primary_num_bytes,
+    int64_t local_num_bytes) override {
+    recovery_space_reserved = true;
+    if (inject_fail_reserve_recovery_space) {
+      return false;
+    }
+    return true;
+  }
+
+  void unreserve_recovery_space() override {
+    recovery_space_unreserved = true;
+  }
+
+  // Hand out a log-entry handler backed by the mock PG backend so that
+  // PGLog processing in PeeringState has somewhere to send its callbacks.
+  PGLog::LogEntryHandlerRef get_log_handler(
+    ObjectStore::Transaction &t) override {
+    return std::make_unique<MockPGLogEntryHandler>(backend.get(), &t);
+  }
+
+  void rebuild_missing_set_with_deletes(PGLog &pglog) override {
+    missing_set_rebuilt = true;
+  }
+
+  // Perf counters: real counters are supplied so PeeringState's perf calls
+  // do not crash; tests do not assert on their values.
+  PerfCounters &get_peering_perf() override {
+    return *recoverystate_perf;
+  }
+
+  PerfCounters &get_perf_logger() override {
+    return *logger_perf;
+  }
+
+  // State-machine transition hooks: remember the most recent state name so
+  // tests can verify which peering states were entered/exited.
+  void log_state_enter(const char *state) override {
+    last_state_entered = string(state);
+    state_entered = true;
+  }
+
+  void log_state_exit(
+    const char *state_name, utime_t enter_time,
+    uint64_t events, utime_t event_dur) override {
+    last_state_exited = string(state_name);
+    state_exited = true;
+  }
+
+  void dump_recovery_info(ceph::Formatter *f) const override {
+    recovery_info_dumped = true;
+  }
+
+  // Cluster-log streams are routed to the test logger.
+  OstreamTemp get_clog_info() override {
+    return logger.info();
+  }
+
+  OstreamTemp get_clog_error() override {
+    return logger.error();
+  }
+
+  OstreamTemp get_clog_debug() override {
+    return logger.debug();
+  }
+
+  // Emulates PG's on_activate_complete: once activation finishes, queue the
+  // next peering event (DoRecovery / RequestBackfill / pool migration /
+  // AllReplicasRecovered) depending on what the PeeringState says is still
+  // outstanding.  When inject_event_stall is set the event goes onto the
+  // stalled_events queue instead, letting tests delay its delivery.
+  void on_activate_complete() override {
+    dout(0) << __func__ << dendl;
+    std::list<PGPeeringEventRef> *event_queue;
+    if (inject_event_stall) {
+      event_queue = &stalled_events;
+    } else {
+      event_queue = &events;
+    }
+
+    if (ps->needs_recovery()) {
+      dout(10) << "activate not all replicas are up-to-date, queueing recovery" << dendl;
+      event_queue->push_back(
+          std::make_shared<PGPeeringEvent>(
+            get_osdmap_epoch(),
+            get_osdmap_epoch(),
+            PeeringState::DoRecovery()));
+    } else if (ps->needs_backfill()) {
+      dout(10) << "activate queueing backfill" << dendl;
+      event_queue->push_back(
+          std::make_shared<PGPeeringEvent>(
+            get_osdmap_epoch(),
+            get_osdmap_epoch(),
+            PeeringState::RequestBackfill()));
+#if POOL_MIGRATION
+    } else if (ps->needs_pool_migration()) {
+      dout(10) << "activate queueing pool migration" << dendl;
+      event_queue->push_back(
+          std::make_shared<PGPeeringEvent>(
+            get_osdmap_epoch(),
+            get_osdmap_epoch(),
+            PeeringState::DoPoolMigration()));
+#endif
+    } else {
+      dout(10) << "activate all replicas clean, no recovery" << dendl;
+      event_queue->push_back(
+          std::make_shared<PGPeeringEvent>(
+            get_osdmap_epoch(),
+            get_osdmap_epoch(),
+            PeeringState::AllReplicasRecovered()));
+    }
+    activate_complete_called = true;
+  }
+
+  // Remaining lifecycle notifications: each stub just records that the
+  // callback fired so tests can assert on the flags below.
+  void on_activate_committed() override {
+    activate_committed_called = true;
+  }
+
+  void on_new_interval() override {
+    new_interval_called = true;
+  }
+
+  void on_pool_change() override {
+    pool_changed = true;
+  }
+
+  void on_role_change() override {
+    role_changed = true;
+  }
+
+  void on_removal(ObjectStore::Transaction &t) override {
+    removal_called = true;
+  }
+
+  // Test state tracking
+  // Flags set by the overrides above; tests reset/inspect them to verify
+  // which PeeringListener callbacks were driven by PeeringState.
+  unsigned target_pg_log_entries = 100;
+  bool renew_lease_scheduled = false;
+  bool check_readable_queued = false;
+  bool readable_rechecked = false;
+  bool heartbeat_peers_updated = false;
+  bool probe_targets_set = false;
+  bool probe_targets_cleared = false;
+  bool pg_temp_wanted = false;
+  bool pg_temp_cleared = false;
+  bool pg_migrated_pool_sent = false;
+  bool stats_published = false;
+  bool stats_cleared = false;
+  bool recovery_sources_checked = false;
+  bool blocklisted_watchers_checked = false;
+  bool primary_state_cleared = false;
+  bool delete_work_done = false;
+  // Merge-related callback flags
+  bool ready_to_merge_cleared = false;
+  bool not_ready_to_merge_target_set = false;
+  bool not_ready_to_merge_source_set = false;
+  bool ready_to_merge_target_set = false;
+  bool ready_to_merge_source_set = false;
+  // Recovery / backfill / migration flags
+  bool backfill_suspended = false;
+  bool recovery_cancelled = false;
+  bool pool_migration_reserved = false;
+  bool pool_migration_suspended = false;
+  bool recovery_space_reserved = false;
+  bool recovery_space_unreserved = false;
+  bool missing_set_rebuilt = false;
+  // State-machine trace (most recent state entered/exited)
+  string last_state_entered;
+  bool state_entered = false;
+  string last_state_exited;
+  bool state_exited = false;
+  mutable bool recovery_info_dumped = false;
+  // Epoch reported to PeeringState; kept in sync by the test fixture
+  epoch_t current_epoch = 1;
+  uint64_t snap_trimq_size = 0;
+  bool prepare_write_called = false;
+  bool scrub_requested_called = false;
+  bool pg_created_sent = false;
+  bool flush_started = false;
+  bool flushed = false;
+  bool io_priority_updated = false;
+  bool io_reservation_cancelled = false;
+  bool remote_recovery_reservation_cancelled = false;
+  bool active_exited = false;
+  bool active_actmap_called = false;
+  bool active_advmap_called = false;
+  bool backfill_reserved = false;
+  bool backfill_cancelled = false;
+  bool recovery_reserved = false;
+  bool clean_called = false;
+  bool activate_called = false;
+  bool activate_complete_called = false;
+  bool change_called = false;
+  bool activate_committed_called = false;
+  bool new_interval_called = false;
+  bool primary_status_changed = false;
+  bool pool_changed = false;
+  bool role_changed = false;
+  bool removal_called = false;
+  bool shutdown_called = false;
+  // Counters for outbound work generated by PeeringState
+  int messages_sent = 0;
+  int events_scheduled = 0;
+  int io_reservations_requested = 0;
+  int remote_recovery_reservations_requested = 0;
+  int events_on_commit_scheduled = 0;
+  bool first_write_in_interval = false;
+};
+
+// Test fixture for PeeringState tests
+class PeeringStateTest : public ::testing::Test {
+protected:
+  // Shared OSDMap for all simulated OSDs, plus the up/acting sets that the
+  // harness maintains itself (see the note before setup_up_acting).
+  std::shared_ptr<OSDMap> osdmap;
+  std::vector<int> up;
+  std::vector<int> acting;
+  std::vector<int> up_acting;        // union of OSDs participating in tests
+  int up_primary;
+  int acting_primary;
+  uint64_t pool_id;
+  int pool_size;
+  osd_reqid_t reqid;                 // tid is bumped per appended log entry
+
+  // Per OSD state
+  std::map<int,unique_ptr<PeeringState>> osd_peeringstate;
+  std::map<int,unique_ptr<PeeringCtx>> osd_peeringctx;
+  std::map<int,unique_ptr<MockPeeringListener>> listeners;
+
+  // Dpp helper
+  // Generate log output that includes the OSD and shard (because tests
+  // simulate multiple OSDs) and the PeeringState
+  class DppHelper : public NoDoutPrefix {
+    public:
+    PeeringStateTest *t;
+    int osd;
+    int shard;
+    DppHelper(CephContext *cct, unsigned subsys, PeeringStateTest *t, int osd, int shard)
+    : NoDoutPrefix(cct, subsys), t(t), osd(osd), shard(shard) {}
+
+    // Prefix every log line with "osd N(S):" and, when a PeeringState
+    // exists for that OSD, its standard PG state summary.
+    std::ostream& gen_prefix(std::ostream& out) const override
+    {
+      out << "osd " << osd << "(" << shard << "): ";
+      if (t->osd_peeringstate.contains(osd)) {
+        PeeringState *ps = t->osd_peeringstate[osd].get();
+        out << *ps << " ";
+      }
+      return out;
+    }
+  };
+  std::map<int,unique_ptr<DppHelper>> dpp;
+
+  // Per-OSD accessors.  NOTE(review): these use map::operator[], which
+  // default-inserts a null entry when called for an OSD that has not been
+  // created yet (returning nullptr) - presumably intentional so callers can
+  // null-check, but confirm that the spurious map entries are harmless.
+  DoutPrefixProvider *get_dpp(int osd)
+  {
+    return dpp[osd].get();
+  }
+
+  PeeringState *get_ps(int osd)
+  {
+    return osd_peeringstate[osd].get();
+  }
+
+  bool has_ps(int osd)
+  {
+    return osd_peeringstate.contains(osd);
+  }
+
+  PeeringCtx *get_ctx(int osd)
+  {
+    return osd_peeringctx[osd].get();
+  }
+
+  MockPeeringListener *get_listener(int osd)
+  {
+    return listeners[osd].get();
+  }
+
+  // ============================================================================
+  // Helpers for OSDMap, Pools, Epochs and Acting Set
+  // ============================================================================
+
+  // Helper to create OSDMap
+  // Builds a simple map with num_osds OSDs and applies one incremental that
+  // marks every OSD up+in with full feature bits, since PeeringState checks
+  // peer features via osd_xinfo.
+  std::shared_ptr<OSDMap> setup_osdmap(int num_osds)
+  {
+    dout(0) << "setup_osdmap" << dendl;
+    auto osdmap = std::make_shared<OSDMap>();
+    uuid_d fsid;
+    osdmap->build_simple(g_ceph_context, 0, fsid, num_osds);
+    OSDMap::Incremental pending_inc(osdmap->get_epoch() + 1);
+    pending_inc.fsid = osdmap->get_fsid();
+    entity_addrvec_t sample_addrs;
+    sample_addrs.v.push_back(entity_addr_t());
+    uuid_d sample_uuid;
+    for (int i = 0; i < num_osds; ++i) {
+      sample_uuid.generate_random();
+      // Distinct nonce per OSD so the addresses are unique
+      sample_addrs.v[0].nonce = i;
+      pending_inc.new_state[i] = CEPH_OSD_EXISTS | CEPH_OSD_NEW;
+      pending_inc.new_up_client[i] = sample_addrs;
+      pending_inc.new_up_cluster[i] = sample_addrs;
+      pending_inc.new_hb_back_up[i] = sample_addrs;
+      pending_inc.new_hb_front_up[i] = sample_addrs;
+      pending_inc.new_weight[i] = CEPH_OSD_IN;
+      pending_inc.new_uuid[i] = sample_uuid;
+      // Setup osd_xinfo as PeeringState checks features
+      osd_xinfo_t xi;
+      xi.features = CEPH_FEATURES_ALL;
+      pending_inc.new_xinfo[i] = xi;
+    }
+    osdmap->apply_incremental(pending_inc);
+    return osdmap;
+  }
+
+  // Apply an incremental to the shared OSDMap, then propagate the new
+  // epoch to every registered mock listener so they all report it.
+  void apply_incremental(OSDMap::Incremental inc)
+  {
+    osdmap->apply_incremental(inc);
+    const epoch_t e = osdmap->get_epoch();
+    for (auto &entry : listeners) {
+      entry.second->current_epoch = e;
+    }
+  }
+
+  // Helper to create new OSDMap epoch and process up_thru and pg_temp updates
+  // if_required - when false (the default) a new epoch is only generated if
+  //   an up_thru or pg_temp change is required; when true an epoch is always
+  //   generated even if nothing changed.
+  //   NOTE(review): the parameter name suggests the opposite polarity to the
+  //   actual behaviour - consider renaming it (e.g. to 'force').
+  // returns true if a new epoch was created
+  bool new_epoch(bool if_required = false)
+  {
+    bool did_work = false;
+    epoch_t e = osdmap->get_epoch();
+    OSDMap::Incremental pending_inc(e + 1);
+    pending_inc.fsid = osdmap->get_fsid();
+    for (auto osd: up_acting) {
+      // Publish up_thru for any OSD whose PeeringState is waiting on it
+      if (has_ps(osd)) {
+        if (get_ps(osd)->get_need_up_thru()) {
+          dout(0) << "new_epoch updating up_thru for osd " << osd << dendl;
+          pending_inc.new_up_thru[osd] = e;
+          did_work = true;
+        }
+      }
+      // Honour a pg_temp request from the primary by switching the
+      // harness-maintained acting set (empty next_acting reverts to up)
+      if (osd == acting_primary && get_listener(osd)) {
+        MockPeeringListener *listener = get_listener(osd);
+        if (listener->pg_temp_wanted) {
+          dout(0) << "new_epoch updating acting set to " << listener->next_acting << dendl;
+          acting = listener->next_acting;
+          if (acting.empty()) {
+            acting = up;
+          }
+          listener->pg_temp_wanted = false;
+          did_work = true;
+        }
+      }
+    }
+    if (!did_work && !if_required) {
+      // No need for a new epoch
+      return false;
+    }
+    apply_incremental(pending_inc);
+    return true;
+  }
+
+  // Note: Currently the acting and up set are being maintained separately from
+  // the OSDMap which gives the test harness a little more control over choice
+  // of OSDs and shards than using CRUSH. It would probably be better to use
+  // pg_upmap and pg_temp in the OSDMap to control this
+  // Unlike OSDMap, the test harness requires both up and acting to be set
+  // even if they are the same.
+
+  // Configure the trivial mapping used by most tests: the up set and the
+  // acting set are both {0, 1, ..., pool_size-1} with osd.0 as primary.
+  void setup_up_acting()
+  {
+    up_primary = 0;
+    acting_primary = 0;
+    up.assign(pool_size, 0);
+    acting.assign(pool_size, 0);
+    up_acting.assign(pool_size, 0);
+    for (int osd = 0; osd < pool_size; ++osd) {
+      up[osd] = osd;
+      acting[osd] = osd;
+      up_acting[osd] = osd;
+    }
+  }
+
+  // Helper to create an EC Pool
+  // k/m select the EC geometry; fast_ec additionally enables the optimized
+  // ("fast EC") code paths via FLAG_EC_OPTIMIZATIONS and nonprimary_shards.
+  void create_ec_pool(int k = 2, int m = 2, bool fast_ec = true)
+  {
+    pool_size = k + m;
+    OSDMap::Incremental new_pool_inc(osdmap->get_epoch() + 1);
+    new_pool_inc.new_pool_max = osdmap->get_pool_max();
+    new_pool_inc.fsid = osdmap->get_fsid();
+    pool_id = ++new_pool_inc.new_pool_max;
+    pg_pool_t empty;
+    auto p = new_pool_inc.get_new_pool(pool_id, &empty);
+    p->size = pool_size;
+    p->min_size = k; // lower than normal to allow more error-injects
+    p->set_pg_num(1);
+    p->set_pgp_num(1);
+    p->type = pg_pool_t::TYPE_ERASURE;
+    int r = osdmap->crush->add_simple_rule(
+        "erasure", "default", "osd", "",
+        "indep", pg_pool_t::TYPE_ERASURE,
+        &cerr);
+    p->crush_rule = r;
+    p->set_flag(pg_pool_t::FLAG_HASHPSPOOL);
+    // Warning - name not unique
+    new_pool_inc.new_pool_names[pool_id] = "pool";
+    std::map<std::string, std::string> erasure_code_profile =
+      {{"plugin", "isa"},
+       {"technique", "reed_sol_van"},
+       {"k", fmt::format("{}", k)},
+       {"m", fmt::format("{}", m)},
+       {"stripe_unit", "16384"}};
+    // Warning - profile not unique
+    osdmap->set_erasure_code_profile(
+        "default",
+        erasure_code_profile);
+    p->erasure_code_profile =
+        "default";
+    p->set_flag(pg_pool_t::FLAG_EC_OVERWRITES);
+    if (fast_ec) {
+      // Shards 1..k-1 are marked non-primary; shard 0 (and, it appears,
+      // the parity shards) are left out of the set - TODO confirm this
+      // matches the OSD's own fast-EC pool setup
+      p->nonprimary_shards.clear();
+      for (int i = 1; i < k; i++) {
+        p->nonprimary_shards.insert(shard_id_t(i));
+      }
+      p->set_flag(pg_pool_t::FLAG_EC_OPTIMIZATIONS);
+    }
+    apply_incremental(new_pool_inc);
+    setup_up_acting();
+  }
+
+  // Helper to create a Replica Pool
+  // n-way replication, single PG, default CRUSH rule 0.
+  void create_rep_pool(int n = 3)
+  {
+    // Create a replica pool
+    pool_size = n;
+    OSDMap::Incremental new_pool_inc(osdmap->get_epoch() + 1);
+    new_pool_inc.new_pool_max = osdmap->get_pool_max();
+    new_pool_inc.fsid = osdmap->get_fsid();
+    pool_id = ++new_pool_inc.new_pool_max;
+    pg_pool_t empty;
+    auto p = new_pool_inc.get_new_pool(pool_id, &empty);
+    p->size = pool_size;
+    p->min_size = 1; // lower than normal to allow more error-injects
+    p->set_pg_num(1);
+    p->set_pgp_num(1);
+    p->type = pg_pool_t::TYPE_REPLICATED;
+    p->crush_rule = 0;
+    p->set_flag(pg_pool_t::FLAG_HASHPSPOOL);
+    // Warning - name not unique
+    new_pool_inc.new_pool_names[pool_id] = "pool";
+    apply_incremental(new_pool_inc);
+    setup_up_acting();
+  }
+
+#if POOL_MIGRATION
+  // Create a 2nd pool as an EC pool and set up migration from the old pool
+  // On return pool_id/pool_size refer to the SOURCE pool again so tests keep
+  // driving the pool being migrated away from.
+  void migrate_to_ec_pool(int k = 2, int m = 2, bool fast_ec = true)
+  {
+    uint64_t old_pool_id = pool_id;
+    int old_pool_size = pool_size;
+    create_ec_pool(k, m, fast_ec);
+    OSDMap::Incremental pending_inc(osdmap->get_epoch() + 1);
+    const pg_pool_t *sp = osdmap->get_pg_pool(old_pool_id);
+    pg_pool_t *spi = pending_inc.get_new_pool(old_pool_id, sp);
+    const pg_pool_t *tp = osdmap->get_pg_pool(pool_id);
+    pg_pool_t *tpi = pending_inc.get_new_pool(pool_id, tp);
+    // Cross-link source and target pools for migration
+    spi->migration_src.reset();
+    spi->migration_target = pool_id;
+    tpi->migration_src = old_pool_id;
+    tpi->migration_target.reset();
+    spi->migrating_pgs.emplace(pg_t(0, old_pool_id));
+    spi->lowest_migrated_pg = 1;
+    apply_incremental(pending_inc);
+    // Reset to the source pool
+    pool_id = old_pool_id;
+    pool_size = old_pool_size;
+    setup_up_acting();
+  }
+#endif
+
+  // Helper to swap an OSD in the up and acting set
+  // NOTE(review): the OSD previously at 'offset' is not removed from
+  // up_acting, so it keeps receiving dispatched work - presumably intended
+  // (the old OSD still exists and must process its interval change), but
+  // worth confirming.
+  void modify_up_acting(int offset, int osd)
+  {
+    up[offset] = osd;
+    acting[offset] = osd;
+    up_acting.push_back(osd);
+    new_epoch();
+  }
+
+  // Helper - take OSD down by removing it from up/acting set
+  void osd_down(int offset, int osd) {
+    dout(0) << "= osd." << osd << "(" << offset << ") set down+out =" << dendl;
+    ceph_assert(up[offset] == osd);
+    up[offset] = pg_pool_t::pg_CRUSH_ITEM_NONE;
+    ceph_assert(acting[offset] == osd);
+    acting[offset] = pg_pool_t::pg_CRUSH_ITEM_NONE;
+    up_acting.erase(remove(up_acting.begin(), up_acting.end(), osd), up_acting.end());
+    // Mark the OSD down+out in the OSDMap
+    OSDMap::Incremental pending_inc(osdmap->get_epoch() + 1);
+    pending_inc.pending_osd_state_set(osd, CEPH_OSD_UP); // XORed
+    pending_inc.new_weight[osd] = CEPH_OSD_OUT;
+    apply_incremental(pending_inc);
+  }
+
+  // Helper - bring OSD up by adding it to up/acting set
+  // Inverse of osd_down; the slot at 'offset' must currently be NONE.
+  void osd_up(int offset, int osd) {
+    dout(0) << "= osd." << osd << "(" << offset << ") set up+in =" << dendl;
+    ceph_assert(up[offset] == pg_pool_t::pg_CRUSH_ITEM_NONE);
+    up[offset] = osd;
+    ceph_assert(acting[offset] == pg_pool_t::pg_CRUSH_ITEM_NONE);
+    acting[offset] = osd;
+    up_acting.push_back(osd);
+    // Mark the OSD up+in in the OSD Map
+    OSDMap::Incremental pending_inc(osdmap->get_epoch() + 1);
+    pending_inc.pending_osd_state_set(osd, CEPH_OSD_UP);
+    pending_inc.new_weight[osd] = CEPH_OSD_IN;
+    apply_incremental(pending_inc);
+  }
+
+  // ============================================================================
+  // Dispatchers - Send work (peering messages) to other OSDs and process queued
+  // peering events. These functions emulate Messenger and the OSD Scheduler.
+  // Normally dispatch_all() is called to process all the queues until they are
+  // all empty, but there are more granular versions which can be used to test
+  // state or inject changes after each step
+  // ============================================================================
+
+  // Dispatcher - send peering messages to other OSDs from a given OSD. Peering
+  // messages are queued in message_map in PeeringCtx
+  // toosd - if specified only send messages to the specified OSD
+  // num_messages - if specified only send up to the specified number of messages
+  // Returns true if at least one message was delivered.
+  bool dispatch_peering_messages( int fromosd, int toosd = -1, int num_messages = -1 )
+  {
+    PeeringCtx *ctx = get_ctx(fromosd);
+    bool did_work = false;
+    for (auto& [osd, ls] : ctx->message_map) {
+      // Emulate the messenger: messages to down OSDs are dropped
+      if (!osdmap->is_up(osd)) {
+        dout(0) << __func__ << " skipping down osd." << osd << dendl;
+        continue;
+      }
+      if ( toosd >= 0 && osd != toosd) {
+        continue;
+      }
+      for (auto it = ls.begin(); it != ls.end();) {
+        auto m = *it;
+        it = ls.erase(it);
+        // NOTE(review): detach() takes a reference that is never put() after
+        // the event has been extracted - the message appears to be leaked.
+        // Harmless for a short-lived unittest but worth confirming.
+        MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m.detach());
+        dout(0) << __func__ << " sending from osd." << fromosd << " to osd." << osd << " " << *pm << dendl;
+        // Stamp the sender so the receiving PeeringState sees who it is from
+        ceph_msg_header h = pm->get_header();
+        h.src.num = fromosd;
+        pm->set_header(h);
+#if WITH_CRIMSON
+        pm->set_features(CEPH_FEATURES_ALL);
+#else
+        ConnectionRef c = new MockConnection();
+        pm->set_connection(c);
+#endif
+        get_ps(osd)->handle_event(PGPeeringEventRef(pm->get_event()), get_ctx(osd));
+        did_work = true;
+        if (num_messages > 0 && --num_messages == 0) {
+          return did_work;
+        }
+      }
+    }
+    return did_work;
+  }
+
+  // Pump the peering-message queue of every participating OSD until a full
+  // pass makes no progress; returns whether anything was delivered at all.
+  bool dispatch_all_peering_messages()
+  {
+    bool delivered = false;
+    while (true) {
+      bool progress = false;
+      for (auto osd : up_acting) {
+        if (dispatch_peering_messages(osd)) {
+          progress = true;
+        }
+      }
+      if (!progress) {
+        return delivered;
+      }
+      delivered = true;
+    }
+  }
+
+  // Dispatcher - send clustering messages to other OSDs from a given OSD. Cluster
+  // messages are sent by send_cluster_message and are queued by MockPeeringListener
+  // toosd - if specified only send messages to the specified OSD
+  // num_messages - if specified only send up to the specified number of messages
+  // Returns true if at least one message was delivered.
+  bool dispatch_cluster_messages( int fromosd, int toosd = -1, int num_messages = -1 )
+  {
+    bool did_work = false;
+    for (auto& [osd, ls] : get_listener(fromosd)->messages) {
+      // Emulate the messenger: messages to down OSDs are dropped
+      if (!osdmap->is_up(osd)) {
+        dout(0) << __func__ << " skipping down osd." << osd << dendl;
+        continue;
+      }
+      if ( toosd >= 0 && osd != toosd) {
+        continue;
+      }
+      for (auto it = ls.begin(); it != ls.end();) {
+        auto mr = *it;
+        // NOTE(review): detach() on the copied ref leaves one reference
+        // outstanding that is never put() - the message appears to be
+        // leaked.  Harmless for a short-lived unittest but worth confirming.
+        auto m = mr.detach();
+        it = ls.erase(it);
+        // TODO : Should handle messages other than MOSDPeeringOp events, however
+        // for now this seems to be sufficient
+        dout(0) << __func__ << " message type = " << m->get_type() << dendl;
+        MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
+        dout(0) << __func__ << " sending from osd." << fromosd << " to osd." << osd << " " << *pm << dendl;
+        // Stamp the sender so the receiving PeeringState sees who it is from
+        ceph_msg_header h = pm->get_header();
+        h.src.num = fromosd;
+        pm->set_header(h);
+#if WITH_CRIMSON
+        pm->set_features(CEPH_FEATURES_ALL);
+#else
+        ConnectionRef c = new MockConnection();
+        pm->set_connection(c);
+#endif
+        get_ps(osd)->handle_event(PGPeeringEventRef(pm->get_event()), get_ctx(osd));
+        did_work = true;
+        if (num_messages > 0 && --num_messages == 0) {
+          return did_work;
+        }
+      }
+    }
+    return did_work;
+  }
+
+  // Pump the cluster-message queue of every participating OSD until a full
+  // pass makes no progress; returns whether anything was delivered at all.
+  bool dispatch_all_cluster_messages()
+  {
+    bool delivered = false;
+    while (true) {
+      bool progress = false;
+      for (auto osd : up_acting) {
+        if (dispatch_cluster_messages(osd)) {
+          progress = true;
+        }
+      }
+      if (!progress) {
+        return delivered;
+      }
+      delivered = true;
+    }
+  }
+
+  // Dispatcher - dispatch events. Events are generated by MockPeeringListener in
+  // response to some of the calls from PeeringState, this emulates the behavior
+  // of PrimaryLogPg and PG
+  // stalled - true if dispatching stalled events (see inject_event_stall)
+  // num_events - if specified only send up to the specified number of events
+  // Returns true if at least one event was handled.
+  bool dispatch_events( int fromosd, bool stalled = false, int num_events = -1 )
+  {
+    bool did_work = false;
+    auto &ls = stalled ? get_listener(fromosd)->stalled_events :
+                         get_listener(fromosd)->events;
+    for (auto it = ls.begin(); it != ls.end();) {
+      // Copy the ref then erase, so handle_event can safely queue new events
+      auto evt = *it;
+      it = ls.erase(it);
+      dout(0) << __func__ << " event to osd." << fromosd << " " << evt->get_desc() << dendl;
+      get_ps(fromosd)->handle_event(evt, get_ctx(fromosd));
+      did_work = true;
+      if (num_events > 0 && --num_events == 0) {
+        return did_work;
+      }
+    }
+    return did_work;
+  }
+
+  // Drain the (optionally stalled - see inject_event_stall) event queue of
+  // every participating OSD until a full pass makes no progress; returns
+  // whether any event was handled at all.
+  bool dispatch_all_events(bool stalled = false)
+  {
+    bool handled = false;
+    while (true) {
+      bool progress = false;
+      for (auto osd : up_acting) {
+        if (dispatch_events(osd, stalled)) {
+          progress = true;
+        }
+      }
+      if (!progress) {
+        return handled;
+      }
+      handled = true;
+    }
+  }
+
+  // Keep delivering peering messages, cluster messages and queued events
+  // until every queue is drained; returns whether any work was done at all.
+  bool dispatch_all()
+  {
+    bool any = false;
+    while (true) {
+      bool progress = dispatch_all_peering_messages();
+      if (dispatch_all_cluster_messages()) {
+        progress = true;
+      }
+      if (dispatch_all_events()) {
+        progress = true;
+      }
+      if (!progress) {
+        return any;
+      }
+      any = true;
+    }
+  }
+
+  // ============================================================================
+  // Factory to create PeeringState instance for a given osd and shard
+  // ============================================================================
+
+  // Helper to create a single PeeringState instance
+  // Also creates the per-OSD listener, log-prefix helper and PeeringCtx, and
+  // wires the backend readable/recoverable predicates into the new state.
+  PeeringState *create_peering_state(int osd, int shard)
+  {
+    dout(0) << "= create_peering_state osd." << osd << "(" << shard <<") =" << dendl;
+    const pg_pool_t pi = *osdmap->get_pg_pool(pool_id);
+    // Replicated pools have no shard ids; EC pools use the supplied shard
+    pg_shard_t pg_whoami(osd, pi.is_erasure() ? shard_id_t(shard) : shard_id_t::NO_SHARD);
+    PGPool pool(osdmap, pool_id, pi, osdmap->get_pool_name(pool_id));
+    dpp[osd] = make_unique<DppHelper>(g_ceph_context, dout_subsys, this, osd, shard);
+    spg_t spgid = spg_t(pg_t(0, pool_id), pg_whoami.shard);
+    listeners[osd] = make_unique<MockPeeringListener>(osdmap, pi, get_dpp(osd), pg_whoami);
+    get_listener(osd)->current_epoch = osdmap->get_epoch();
+    unique_ptr<PeeringState> ps = make_unique<PeeringState>(
+      g_ceph_context,
+      pg_whoami,
+      spgid,
+      pool,
+      osdmap,
+      PG_FEATURE_CLASSIC_ALL,
+      get_dpp(osd),
+      get_listener(osd));
+    // Give the listener a back-pointer so its callbacks can query the state
+    listeners[osd]->ps = ps.get();
+    ps->set_backend_predicates(
+      get_is_readable_predicate(),
+      get_is_recoverable_predicate());
+    osd_peeringstate[osd] = std::move(ps);
+    osd_peeringctx[osd] = make_unique<PeeringCtx>();
+    return get_ps(osd);
+  }
+
+  // ============================================================================
+  // Helpers to test PeeringState interfaces
+  // ============================================================================
+
+  // Helper - create peering state for all osds
+  void test_create_peering_state(int toosd = -1, int shard = -1)
+  {
+    dout(0) << "= test_create_peering_state =" << dendl;
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      create_peering_state(osd, (shard == -1) ? osd : shard);
+    }
+  }
+
+  // Helper - init for all osds
+  // dne - when true, history.epoch_created is left at its default,
+  //   presumably to simulate a PG the OSD has no record of ("does not
+  //   exist") - TODO confirm
+  void test_init(int toosd = -1, bool dne = false)
+  {
+    dout(0) << "= test_init =" << dendl;
+    pg_history_t history;
+    history.same_interval_since = osdmap->get_epoch();
+    history.epoch_pool_created = osdmap->get_epoch();
+    history.last_epoch_clean = osdmap->get_epoch();
+    if (!dne) {
+      history.epoch_created = osdmap->get_epoch();
+    }
+    PastIntervals past_intervals;
+    ObjectStore::Transaction t;
+
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      get_ps(osd)->init(
+        (osd == acting_primary) ? 0 /* role: primary */ : 1 /* role: replica */,
+        up,
+        up_primary,
+        acting,
+        acting_primary,
+        history,
+        past_intervals,
+        t);
+    }
+  }
+
+  // Helper - init from disk for all osds
+  // Uses an empty pg_info/past_intervals and a no-op log reader, emulating
+  // an OSD restart where nothing has been persisted yet.
+  void test_init_from_disk(int toosd = -1)
+  {
+    dout(0) << "= test_init_from_disk =" << dendl;
+
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      pg_info_t info;
+      spg_t spgid = spg_t(pg_t(0, pool_id), shard_id_t(0));
+      info.pgid = spgid;
+      PastIntervals past_intervals;
+
+      get_ps(osd)->init_from_disk_state(
+        std::move(info),
+        std::move(past_intervals),
+        [](PGLog &log) { return 0; });
+    }
+  }
+
+  // Helper - initialize event for all osds
+  void test_event_initialize(int toosd = -1)
+  {
+    dout(0) << "= test_event_initialize =" << dendl;
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      auto evt = std::make_shared<PGPeeringEvent>(
+        osdmap->get_epoch(),
+        osdmap->get_epoch(),
+        PeeringState::Initialize());
+
+      get_ps(osd)->handle_event(evt, get_ctx(osd));
+    }
+  }
+
+  // Helper - advance map for all osds
+  // Pushes the current shared OSDMap (with the harness-maintained up/acting
+  // sets) into each PeeringState, as OSD::advance_pg would.
+  void test_event_advance_map(int toosd = -1)
+  {
+    dout(0) << "= test_event_advance_map =" << dendl;
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      get_ps(osd)->advance_map(osdmap, osdmap, up, up_primary, acting, acting_primary, *(get_ctx(osd)));
+    }
+  }
+
+  // Helper - activate map event for all osds
+  // Must follow advance_map; lets each PeeringState act on the new map.
+  void test_event_activate_map(int toosd = -1)
+  {
+    dout(0) << "= test_event_activate_map =" << dendl;
+    for (auto osd : up_acting ) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      get_ps(osd)->activate_map(*(get_ctx(osd)));
+    }
+  }
+
+  // Helper - construct a shard_id_set from a vector of integers
+  // Takes the vector by const reference to avoid copying the caller's data.
+  shard_id_set ss(const vector<int> &v) {
+    shard_id_set s;
+    for (auto e : v) {
+      s.insert(shard_id_t(e));
+    }
+    return s;
+  }
+  // Empty set; presumably used to mean "all shards" (test_append_log_entry
+  // treats an empty 'written'/'osds' set as a full write to everyone).
+  const shard_id_set ss_all;
+
+  // Helper - construct a pg_log_entry and call append_log for specified osds
+  // By default create a full write and append the log entry to all the OSDs
+  // written - if provided then create a partial write to the specified OSDs
+  // osds - if provided then only append the log entry to the specified OSDs
+  // update_only - if true then only update the log, do not complete it
+  // do_trim - if true then trim earlier log entries (useful for testing backfill)
+  // returns eversion of the new log entry
+  // NOTE(review): shard ids are used to index get_ps(), which relies on the
+  // test convention that shard i lives on osd i - confirm before reusing
+  // with any other mapping.
+  eversion_t test_append_log_entry(
+    shard_id_set written = shard_id_set(),
+    shard_id_set osds = shard_id_set(),
+    bool update_only = false,
+    bool do_trim = false)
+  {
+    // Empty 'osds' means every populated slot in the acting set; the slot
+    // index doubles as the shard id
+    if (osds.empty()) {
+      int shard = 0;
+      for (auto osd : acting) {
+        if (osd != pg_pool_t::pg_CRUSH_ITEM_NONE) {
+          osds.insert(shard_id_t(shard));
+        }
+        shard++;
+      }
+    }
+    dout(0) << "= test_append_log_entry written=" << written << " osds=" << osds << " =" << dendl;
+    if (get_listener(acting_primary)->first_write_in_interval) {
+      // Fix for issue 73891
+      if (!written.empty()) {
+        dout(0) << "First write in new interval is promoted to a full write" << dendl;
+        written.clear();
+        get_listener(acting_primary)->first_write_in_interval = false;
+      }
+    }
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    // Each call is a new write: version = tid, prior version = tid - 1
+    ++reqid.tid;
+    eversion_t v, pv;
+    version_t uv;
+    v.epoch = osdmap->get_epoch();
+    v.version = reqid.tid;
+    if (reqid.tid > 1) {
+      pv.epoch = v.epoch;
+      pv.version = reqid.tid - 1;
+    } else {
+      pv.epoch = 0;
+      pv.version = 0;
+    }
+    uv = v.version;
+    pg_log_entry_t entry(pg_log_entry_t::MODIFY, soid, v, pv, uv, reqid, utime_t(), 0);
+    entry.written_shards = written;
+
+    for (auto osd : osds) {
+      // Skips OSDs that are not written
+      if (!written.empty() && !written.contains(osd)) {
+        continue;
+      }
+      std::vector<pg_log_entry_t> entries;
+      ObjectStore::Transaction t;
+      entries.push_back(entry);
+
+      eversion_t trim;
+      if (do_trim) {
+        trim = pv;
+      }
+      // First append: add the entry without advancing roll_forward/committed
+      get_ps(int(osd))->append_log(
+        std::move(entries),
+        trim, // trim_to
+        trim, // roll_forward_to
+        trim, // pg_committed_to
+        t,
+        true,
+        false);
+      if (!update_only) {
+        // Second (empty) append: roll forward and commit the new version,
+        // completing the write
+        std::vector<pg_log_entry_t> empty;
+        get_ps(int(osd))->append_log(
+          std::move(empty),
+          trim, // trim_to
+          v, // roll_forward_to
+          v, // pg_committed_to
+          t,
+          true,
+          false);
+      }
+    }
+    return v;
+  }
+
+  // Helper - call begin_peer_recover
+  // Tells the primary's PeeringState that recovery of the standard test
+  // object ("foo") has started on the given osd/shard.
+  void test_begin_peer_recover(int osd, int shard)
+  {
+    dout(0) << "= test_begin_peer_recover " << osd << "(" << shard << ") =" << dendl;
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    get_ps(acting_primary)->begin_peer_recover(pg_shard_t(osd, shard_id_t(shard)), soid);
+  }
+
+  // Helper - call on_peer_recover
+  void test_on_peer_recover(int osd, int shard, eversion_t v)
+  {
+    dout(0) << "= test_on_peer_recover " << osd << "(" << shard << ")" << v << " =" << dendl;
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    ObjectStore::Transaction t;
+    get_ps(acting_primary)->on_peer_recover(pg_shard_t(osd, shard_id_t(shard)), soid, v);
+  }
+
+  // Helper - call recover got to indicate an object has been recovered
+  // Applies to the given OSD's own PeeringState (local recovery progress).
+  void test_recover_got(int osd, eversion_t v)
+  {
+    dout(0) << "= test_recover_got " << osd << " " << v << " =" << dendl;
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    ObjectStore::Transaction t;
+    get_ps(osd)->recover_got(soid, v, false, t);
+  }
+
+  // Helper - call object_recovered
+  // Tells the primary that the test object is fully recovered everywhere
+  // (empty stat delta).
+  void test_object_recovered()
+  {
+    dout(0) << "= test_object_recovered =" << dendl;
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    object_stat_sum_t stat_diff;
+    get_ps(acting_primary)->object_recovered(soid, stat_diff);
+  }
+
+  // Mark the test object as needing backfill on the given osd/shard at
+  // the given version (driven on the primary's PeeringState).
+  void test_prepare_backfill_for_missing(int osd, int shard, eversion_t version)
+  {
+    dout(0) << "= test_prepare_backfill_for_missing " << osd << " " << shard << " " << version << " =" << dendl;
+    object_t oid("foo");
+    hobject_t soid(oid, oid.name, 0, 1234, pool_id, "");
+    get_ps(acting_primary)->prepare_backfill_for_missing(soid, version, {pg_shard_t(osd,shard_id_t(shard))});
+  }
+
+  // Record on the primary how far backfill has progressed on a peer.
+  void test_update_peer_last_backfill(int osd, int shard, hobject_t last_backfill)
+  {
+    dout(0) << "= test_update_peer_last_backfill =" << dendl;
+    get_ps(acting_primary)->update_peer_last_backfill(pg_shard_t(osd, shard_id_t(shard)), last_backfill);
+  }
+
+  // Record on a backfill-target OSD how far its own backfill has progressed
+  // (empty stats, not yet complete).
+  void test_update_backfill_progress(int osd, hobject_t last_backfill)
+  {
+    dout(0) << "= test_update_backfill_progress =" << dendl;
+    pg_stat_t stats;
+    ObjectStore::Transaction t;
+    get_ps(osd)->update_backfill_progress(last_backfill, stats, false, t);
+  }
+
+  // Helper - deliver an AllReplicasRecovered peering event to the
+  // acting primary at the current map epoch
+  void test_event_all_replicas_recovered()
+  {
+    dout(0) << "= test_event_all_replicas_recovered =" << dendl;
+    const epoch_t e = osdmap->get_epoch();
+    auto event = std::make_shared<PGPeeringEvent>(
+      e, e, PeeringState::AllReplicasRecovered());
+    get_ps(acting_primary)->handle_event(event, get_ctx(acting_primary));
+  }
+
+  // Helper - deliver a RecoveryDone peering event to the given OSD at
+  // the current map epoch
+  void test_event_recovery_done(int osd)
+  {
+    dout(0) << "= test_event_recovery_done =" << dendl;
+    const epoch_t e = osdmap->get_epoch();
+    auto event = std::make_shared<PGPeeringEvent>(e, e, RecoveryDone());
+    get_ps(osd)->handle_event(event, get_ctx(osd));
+  }
+
+  // Helper - deliver a Backfilled peering event to the acting primary
+  // at the current map epoch
+  void test_event_backfilled()
+  {
+    dout(0) << "= test_event_backfilled =" << dendl;
+    const epoch_t e = osdmap->get_epoch();
+    auto event = std::make_shared<PGPeeringEvent>(
+      e, e, PeeringState::Backfilled());
+    get_ps(acting_primary)->handle_event(event, get_ctx(acting_primary));
+  }
+
+#if POOL_MIGRATION
+  // Helper - deliver a PoolMigrationDone peering event to the acting
+  // primary (only built when pool migration support is compiled in)
+  void test_event_migration_done()
+  {
+    dout(0) << "= test_event_migration_done =" << dendl;
+    auto evt = std::make_shared<PGPeeringEvent>(
+      osdmap->get_epoch(),
+      osdmap->get_epoch(),
+      PeeringState::PoolMigrationDone());
+
+    get_ps(acting_primary)->handle_event(evt, get_ctx(acting_primary));
+  }
+#endif
+
+  // Helper - run complete peering cycles, publishing a new epoch when
+  // required for upthru or pgtemp, until no further map change is needed
+  void test_peering()
+  {
+    bool map_changed;
+    do {
+      // One full peering cycle across all OSDs
+      test_event_advance_map();
+      test_event_activate_map();
+      dispatch_all();
+      // New epoch if required for upthru or pgtemp; loop again if so
+      map_changed = new_epoch(false);
+    } while (map_changed);
+  }
+
+  // ============================================================================
+  // Verification Helper Functions
+  // ============================================================================
+
+  // Helper - verify primary OSD is in active+clean state
+  void verify_primary_active_clean(int osd)
+  {
+    dout(0) << "Verifying primary osd " << osd << " is active+clean" << dendl;
+    auto state = get_ps(osd);
+    auto listener = get_listener(osd);
+    // Listener callbacks must have fired and no recovery activity remains
+    EXPECT_TRUE(listener->clean_called);
+    EXPECT_TRUE(listener->activate_complete_called);
+    EXPECT_TRUE(state->is_clean());
+    EXPECT_FALSE(state->is_recovering());
+    EXPECT_FALSE(state->is_backfilling());
+#if POOL_MIGRATION
+    EXPECT_FALSE(state->is_migrating());
+#endif
+  }
+
+  // Helper - verify primary OSD is in active+recovering state
+  void verify_primary_active_recovering(int osd)
+  {
+    dout(0) << "Verifying primary osd " << osd << " is active+recovering" << dendl;
+    auto state = get_ps(osd);
+    EXPECT_TRUE(get_listener(osd)->activate_complete_called);
+    // Recovering: not clean, recovery in progress, no backfill/migration
+    EXPECT_TRUE(state->is_recovering());
+    EXPECT_FALSE(state->is_clean());
+    EXPECT_FALSE(state->is_backfilling());
+#if POOL_MIGRATION
+    EXPECT_FALSE(state->is_migrating());
+#endif
+  }
+
+  // Helper - verify primary OSD is in active+backfilling state
+  void verify_primary_active_backfilling(int osd)
+  {
+    dout(0) << "Verifying primary osd " << osd << " is active+backfilling" << dendl;
+    auto state = get_ps(osd);
+    EXPECT_TRUE(get_listener(osd)->activate_complete_called);
+    // Backfilling: not clean, backfill in progress, no recovery/migration
+    EXPECT_TRUE(state->is_backfilling());
+    EXPECT_FALSE(state->is_clean());
+    EXPECT_FALSE(state->is_recovering());
+#if POOL_MIGRATION
+    EXPECT_FALSE(state->is_migrating());
+#endif
+  }
+
+  // Helper - verify primary OSD is in active+migrating state
+  // (migration check itself only compiled with POOL_MIGRATION)
+  void verify_primary_active_migrating(int osd)
+  {
+    dout(0) << "Verifying primary osd " << osd << " is active+migrating" << dendl;
+    auto state = get_ps(osd);
+    EXPECT_TRUE(get_listener(osd)->activate_complete_called);
+    EXPECT_FALSE(state->is_clean());
+    EXPECT_FALSE(state->is_recovering());
+    EXPECT_FALSE(state->is_backfilling());
+#if POOL_MIGRATION
+    EXPECT_TRUE(state->is_migrating());
+#endif
+  }
+
+  // Helper - verify replica OSD has committed its activation
+  void verify_replica_activated(int osd)
+  {
+    dout(0) << "Verifying replica osd " << osd << " is activated" << dendl;
+    auto listener = get_listener(osd);
+    EXPECT_TRUE(listener->activate_committed_called);
+  }
+
+  // Helper - verify an OSD's PG reports both active and peered
+  void verify_active_and_peered(int osd)
+  {
+    auto state = get_ps(osd);
+    EXPECT_TRUE(state->is_active());
+    EXPECT_TRUE(state->is_peered());
+  }
+
+  // Helper - verify log state for an OSD
+  void verify_log_state(int osd, const eversion_t& expected_update,
+                        const eversion_t& expected_complete,
+                        const eversion_t& expected_head,
+                        const eversion_t& expected_tail)
+  {
+    auto ps = get_ps(osd);
+    EXPECT_EQ(ps->get_info().last_update, expected_update);
+    EXPECT_EQ(ps->get_info().last_complete, expected_complete);
+    EXPECT_EQ(ps->get_pg_log().get_head(), expected_head);
+    EXPECT_EQ(ps->get_pg_log().get_tail(), expected_tail);
+  }
+
+  // Helper - verify logs
+  // identical - if identical is true, all logs should be identical
+  //            if identical is false, all logs should be equivalent
+  //            with differences only because of partial writes
+  // NOTE(review): 'identical' is currently unused - the comparison below
+  // always tolerates partial-write differences; confirm intent
+  void verify_logs(bool identical = false)
+  {
+    // Compare every non-primary acting shard's log against the primary's
+    auto logp = get_ps(acting_primary)->get_pg_log();
+    int shard = 0;
+    for (auto osd: acting) {
+      if (osd == acting_primary || osd == pg_pool_t::pg_CRUSH_ITEM_NONE) {
+        ++shard;
+        continue;
+      }
+      dout(0) << "Verifying log for osd " << osd << dendl;
+      auto log = get_ps(osd)->get_pg_log();
+      // Head and tail should match
+      EXPECT_EQ(log.get_head(), logp.get_head());
+      EXPECT_EQ(log.get_tail(), logp.get_tail());
+      // Walk both logs in parallel, skipping primary-only entries for
+      // partial writes that did not touch this shard
+      auto pi = logp.get_log().log.begin();
+      auto pe = logp.get_log().log.end();
+      auto i = log.get_log().log.begin();
+      auto e = log.get_log().log.end();
+      while (pi != pe && i != e) {
+        if (pi->version < i->version && !pi->is_written_shard(shard_id_t(shard))) {
+          // Primary may have partial write log entries that this
+          // shard does not have because it was not written to
+          ++pi;
+          continue;
+        }
+        // Log entries should match (not a perfect check, but good enough for peering)
+        EXPECT_EQ(pi->version, i->version);
+        EXPECT_EQ(pi->soid, i->soid);
+        EXPECT_EQ(pi->op, i->op);
+        ++pi;
+        ++i;
+      }
+      // Drain any trailing primary-only partial-write entries
+      while (pi != pe && !pi->is_written_shard(shard_id_t(shard))) {
+        // Primary may have partial write log entries that this
+        // shard does not have because it was not written to
+        ++pi;
+      }
+      // No extra entries in either log
+      EXPECT_EQ(pi, pe);
+      EXPECT_EQ(i, e);
+      ++shard;
+    }
+  }
+
+  // Helper - verify that there are no missing or unfound objects
+  void verify_no_missing_or_unfound(int osd, int shard, bool check_missing_loc = true)
+  {
+    auto ps = get_ps(osd);
+    EXPECT_FALSE(ps->have_missing());
+    EXPECT_FALSE(ps->have_unfound());
+    if (check_missing_loc) {
+      EXPECT_EQ(ps->get_missing_loc().get_missing_locs().size(), 0);
+    }
+    // Primary shard peer missing state should agree with the shard
+    if (osd != up_primary) {
+      pg_shard_t pg_whoami = get_listener(osd)->pg_whoami;
+      ps = get_ps(up_primary);
+      EXPECT_EQ(ps->get_peer_missing(pg_whoami).num_missing(), 0);
+    }
+  }
+
+  // Helper - verify that there are missing objects but no unfound objects
+  // osd   - OSD to check
+  // shard - shard of the OSD (currently unused; kept for signature
+  //         symmetry with verify_no_missing_or_unfound)
+  void verify_missing(int osd, [[maybe_unused]] int shard)
+  {
+    auto ps = get_ps(osd);
+    EXPECT_TRUE(ps->have_missing());
+    EXPECT_FALSE(ps->have_unfound());
+    // Primary shard peer missing state should agree with the shard
+    if (osd != up_primary) {
+      pg_shard_t pg_whoami = get_listener(osd)->pg_whoami;
+      auto pps = get_ps(up_primary);
+      EXPECT_EQ(pps->get_peer_missing(pg_whoami).num_missing(),
+                ps->get_num_missing());
+    }
+  }
+
+  // Helper - verify all OSDs in active+clean state with log checks
+  void verify_all_active_clean(const eversion_t& expected_update = eversion_t(),
+                                const eversion_t& expected_tail = eversion_t())
+  {
+    int shard = 0;
+    for (auto osd: up) {
+      dout(0) << "Verifying state of osd " << osd << dendl;
+      if (osd == up_primary) {
+        verify_primary_active_clean(osd);
+      } else {
+        verify_replica_activated(osd);
+      }
+      verify_no_missing_or_unfound(osd, shard);
+      verify_active_and_peered(osd);
+      verify_log_state(osd, expected_update, expected_update, expected_update, expected_tail);
+      ++shard;
+    }
+    verify_logs();
+  }
+
+  // Helper - verify all OSDs in active+recovering state with log checks
+  void verify_all_active_recovering(const eversion_t& expected_update,
+                                     const eversion_t& expected_complete,
+                                     const eversion_t& expected_complete_recovering,
+                                     int recovering_osd,
+                                     const eversion_t& expected_tail = eversion_t()) {
+    int shard = 0;
+    for (auto osd: up) {
+      dout(0) << "Verifying state of osd " << osd << dendl;
+      if (osd == up_primary) {
+        verify_primary_active_recovering(osd);
+      } else {
+        verify_replica_activated(osd);
+      }
+      if (osd != recovering_osd) {
+        verify_no_missing_or_unfound(osd, shard, osd != up_primary);
+      } else {
+        verify_missing(osd, shard);
+      }
+      verify_active_and_peered(osd);
+      verify_log_state(
+        osd,
+        expected_update,
+        (osd == recovering_osd) ? expected_complete_recovering : expected_complete,
+        expected_update,
+        expected_tail);
+      ++shard;
+    }
+    verify_logs();
+  }
+
+  // Helper - verify all OSDs in active+backfilling state with log checks
+  void verify_all_active_backfilling(const eversion_t& expected_update,
+                                      const eversion_t& expected_tail = eversion_t())
+  {
+    int shard = 0;
+    for (auto osd: up) {
+      dout(0) << "Verifying state of osd " << osd << dendl;
+      if (osd == up_primary) {
+        verify_primary_active_backfilling(osd);
+      } else {
+        verify_replica_activated(osd);
+      }
+      verify_no_missing_or_unfound(osd, shard);
+      verify_active_and_peered(osd);
+      verify_log_state(osd, expected_update, expected_update, expected_update, expected_tail);
+      ++shard;
+    }
+    verify_logs();
+  }
+
+#if POOL_MIGRATION
+  // Helper - verify all OSDs in active+migrating state with log checks
+  // (only built when pool migration support is compiled in)
+  void verify_all_active_migrating(const eversion_t& expected_update = eversion_t(),
+                                const eversion_t& expected_tail = eversion_t())
+  {
+    int shard = 0;
+    for (auto osd: up) {
+      dout(0) << "Verifying state of osd " << osd << dendl;
+      if (osd == up_primary) {
+        verify_primary_active_migrating(osd);
+      } else {
+        verify_replica_activated(osd);
+      }
+      verify_no_missing_or_unfound(osd, shard);
+      verify_active_and_peered(osd);
+      // Migrating PGs still have fully caught-up logs on every shard
+      verify_log_state(osd, expected_update, expected_update, expected_update, expected_tail);
+      ++shard;
+    }
+    verify_logs();
+  }
+#endif
+
+  // Helper - verify PWLC state
+  void verify_pwlc(epoch_t e,  std::map<shard_id_t,std::pair<eversion_t, eversion_t>> pwlc, int toosd = -1)
+  {
+    for (auto osd: acting) {
+      if (toosd != -1 && toosd != osd) {
+        continue;
+      }
+      dout(0) << "Verifying PWLC for osd " << osd << dendl;
+      auto ps = get_ps(osd);
+      auto info = ps->get_info();
+      EXPECT_EQ(info.partial_writes_last_complete_epoch, e);
+      EXPECT_EQ(info.partial_writes_last_complete, pwlc);
+    }
+  }
+
+  // ============================================================================
+  // GTest - Setup and Teardown
+  // ============================================================================
+
+  // GTest SetUp function - called before each test
+  void SetUp() override
+  {
+    // Limit the in-memory queue of new log entries to zero
+    g_ceph_context->_log->set_max_new(0);
+    dout(0) << "SetUp" << dendl;
+
+    // Create a basic OSDMap
+    osdmap = setup_osdmap(10);
+    // By default tests use a fast EC 2+2 pool
+    create_ec_pool();
+  }
+
+  // GTest TearDown function - called after each test
+  void TearDown() override
+  {
+    // Release all per-OSD test state so each test starts fresh
+    osd_peeringstate.clear();
+    osd_peeringctx.clear();
+    listeners.clear();
+    dpp.clear();
+  }
+};
+
+// ============================================================================
+// Test Stubs - Basic Initialization
+// ============================================================================
+
+TEST_F(PeeringStateTest, Construction) {
+  dout(0) << "== Construction ==" << dendl;
+  // Test basic construction of PeeringState
+  test_create_peering_state(acting[0]);
+  // Verify a PeeringState instance was created for the OSD
+  ASSERT_NE(get_ps(acting[0]), nullptr);
+}
+
+TEST_F(PeeringStateTest, InitFresh) {
+  dout(0) << "== InitFresh ==" << dendl;
+  // Test initialization of a fresh PG
+  test_create_peering_state(acting[0]);
+  test_init(acting[0]);
+  // Verify the first acting OSD becomes primary (role 0) and that the
+  // listener was notified of the new interval
+  EXPECT_EQ(get_ps(acting[0])->get_role(), 0);
+  EXPECT_TRUE(get_ps(acting[0])->is_primary());
+  EXPECT_TRUE(get_listener(acting[0])->new_interval_called);
+}
+
+TEST_F(PeeringStateTest, InitFromDisk) {
+  dout(0) << "== InitFromDisk ==" << dendl;
+  // Test initialization from disk state
+  test_create_peering_state(acting[0]);
+  test_init_from_disk(acting[0]);
+  // Verify the loaded info carries the expected pgid (pg 0, shard 0)
+  EXPECT_EQ(get_ps(acting[0])->get_info().pgid,
+            spg_t(pg_t(0, pool_id), shard_id_t(0)));
+}
+
+// ============================================================================
+// Test Stubs - State Machine Events
+// ============================================================================
+
+TEST_F(PeeringStateTest, HandleInitialize) {
+  dout(0) << "== HandleInitialize ==" << dendl;
+  // Test handling Initialize event
+  test_create_peering_state(acting[0]);
+  test_init(acting[0]);
+  test_event_initialize(acting[0]);
+  // Verify state transitions occurred
+  EXPECT_TRUE(get_listener(acting[0])->new_interval_called);
+}
+
+
+TEST_F(PeeringStateTest, HandleAdvMap) {
+  dout(0) << "== HandleAdvMap ==" << dendl;
+  // Test handling AdvMap event
+  test_create_peering_state(acting[0]);
+  test_init(acting[0]);
+  test_event_initialize(acting[0]);
+  test_event_advance_map(acting[0]);
+}
+
+TEST_F(PeeringStateTest, HandleActMap) {
+  dout(0) << "== HandleActMap ==" << dendl;
+  // Test handling ActMap event (requires AdvMap first)
+  test_create_peering_state(acting[0]);
+  test_init(acting[0]);
+  test_event_initialize(acting[0]);
+  test_event_advance_map(acting[0]);
+  test_event_activate_map(acting[0]);
+}
+
+// ============================================================================
+// Test Stubs - Peering Operations
+// ============================================================================
+
+TEST_F(PeeringStateTest, ChooseActing) {
+  // Test choosing acting set
+  test_create_peering_state(acting[0]);
+  test_init(acting[0]);
+
+  // Test that acting set is properly configured after init
+  EXPECT_EQ(get_ps(acting[0])->get_acting().size(), acting.size());
+  EXPECT_EQ(get_ps(acting[0])->get_acting_primary(), acting_primary);
+}
+
+// ============================================================================
+// Test Stubs - Log Operations
+// ============================================================================
+
+TEST_F(PeeringStateTest, PGLogAccess) {
+  // Test accessing the PG log of a freshly-created PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  const PGLog& pglog = ps->get_pg_log();
+  // A fresh PG log has a zero head version
+  EXPECT_EQ(pglog.get_head(), eversion_t());
+}
+
+TEST_F(PeeringStateTest, AppendLog) {
+  // Test appending to PG log
+  auto ps = create_peering_state(0, 0);
+
+  std::vector<pg_log_entry_t> entries;
+  ObjectStore::Transaction t;
+
+  // Create a test log entry at version 1'1
+  pg_log_entry_t entry;
+  entry.version = eversion_t(1, 1);
+  entry.op = pg_log_entry_t::MODIFY;
+  entries.push_back(entry);
+
+  // Append with zero trim/roll-forward/min-last-complete versions;
+  // just checks the call succeeds on a fresh PG
+  ps->append_log(
+    std::move(entries),
+    eversion_t(),
+    eversion_t(),
+    eversion_t(),
+    t,
+    false,
+    false);
+}
+
+TEST_F(PeeringStateTest, TrimLog) {
+  // Test log trimming on a freshly-created PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  ps->update_trim_to();
+  const eversion_t trim_to = ps->get_pg_trim_to();
+
+  // Fresh PG should have no trim boundary
+  EXPECT_EQ(trim_to, eversion_t());
+}
+
+// ============================================================================
+// Test Stubs - Missing Objects
+// ============================================================================
+
+TEST_F(PeeringStateTest, MissingLocTracking) {
+  // Test missing location tracking on a fresh PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  const MissingLoc& loc = ps->get_missing_loc();
+  // A fresh PG has no unfound objects
+  EXPECT_FALSE(loc.have_unfound());
+}
+
+TEST_F(PeeringStateTest, RecoverGot) {
+  // Test marking an object as recovered on a fresh PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  hobject_t hoid;
+  hoid.pool = 1;
+  hoid.oid = "test_object";
+  ObjectStore::Transaction txn;
+  // Just checks the call succeeds; nothing is actually missing
+  ps->recover_got(hoid, eversion_t(1, 1), false, txn);
+}
+
+// ============================================================================
+// Test Stubs - State Queries
+// ============================================================================
+
+TEST_F(PeeringStateTest, StateQueries) {
+  // Test the state query methods on a fresh (un-peered) PeeringState:
+  // nothing should be set yet
+  auto ps = create_peering_state(0, 0);
+
+  EXPECT_FALSE(ps->is_peered());
+  EXPECT_FALSE(ps->is_active());
+  EXPECT_FALSE(ps->is_clean());
+  EXPECT_FALSE(ps->is_recovering());
+  EXPECT_FALSE(ps->is_backfilling());
+}
+
+TEST_F(PeeringStateTest, RoleQueries) {
+  // Test role query methods after initializing with OSD 0 as primary
+  auto ps = create_peering_state(0, 0);
+
+  // Local names chosen to avoid shadowing confusion with fixture members
+  std::vector<int> up_osds = {0, 1, 2};
+  std::vector<int> acting_osds = {0, 1, 2};
+  pg_history_t history;
+  PastIntervals past_intervals;
+  ObjectStore::Transaction txn;
+
+  ps->init(0, up_osds, 0, acting_osds, 0, history, past_intervals, txn);
+
+  // OSD 0 is first in the acting set, so it is the primary with role 0
+  EXPECT_TRUE(ps->is_primary());
+  EXPECT_FALSE(ps->is_nonprimary());
+  EXPECT_EQ(ps->get_role(), 0);
+}
+
+TEST_F(PeeringStateTest, ActingSetQueries) {
+  // Test acting-set queries after initializing with three OSDs
+  auto ps = create_peering_state(0, 0);
+
+  // Local names chosen to avoid shadowing confusion with fixture members
+  std::vector<int> up_osds = {0, 1, 2};
+  std::vector<int> acting_osds = {0, 1, 2};
+  pg_history_t history;
+  PastIntervals past_intervals;
+  ObjectStore::Transaction txn;
+
+  ps->init(0, up_osds, 0, acting_osds, 0, history, past_intervals, txn);
+
+  EXPECT_EQ(ps->get_acting().size(), 3u);
+  EXPECT_EQ(ps->get_up().size(), 3u);
+  // OSD 0 / shard 0 is a member of the acting set
+  EXPECT_TRUE(ps->is_acting(pg_shard_t(0, shard_id_t(0))));
+}
+
+// ============================================================================
+// Test Stubs - OSDMap Operations
+// ============================================================================
+
+TEST_F(PeeringStateTest, OSDMapAccess) {
+  // Test OSDMap access from a fresh PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  const OSDMapRef& osdmap_ref = ps->get_osdmap();
+  // A valid map at a non-zero epoch must be attached
+  EXPECT_NE(osdmap_ref, nullptr);
+  EXPECT_GT(osdmap_ref->get_epoch(), 0u);
+}
+
+TEST_F(PeeringStateTest, PoolInfo) {
+  // Test pool information access from a fresh PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  const PGPool& pg_pool = ps->get_pgpool();
+  // The fixture's test pool has id 1
+  EXPECT_EQ(pg_pool.id, 1);
+}
+
+// ============================================================================
+// Test Stubs - Feature Tracking
+// ============================================================================
+
+TEST_F(PeeringStateTest, FeatureTracking) {
+  // Test feature vector tracking: both peer and acting feature sets
+  // should be non-empty on a fresh PeeringState
+  auto ps = create_peering_state(0, 0);
+
+  const uint64_t peer_features = ps->get_min_peer_features();
+  const uint64_t acting_features = ps->get_min_acting_features();
+
+  EXPECT_GT(peer_features, 0u);
+  EXPECT_GT(acting_features, 0u);
+}
+
+// ============================================================================
+// Test Stubs - Statistics
+// ============================================================================
+
+TEST_F(PeeringStateTest, Statistics) {
+  // Test statistics access: a fresh PG has no objects
+  auto ps = create_peering_state(0, 0);
+
+  const pg_info_t& pg_info = ps->get_info();
+  EXPECT_EQ(pg_info.stats.stats.sum.num_objects, 0u);
+}
+
+TEST_F(PeeringStateTest, UpdateStats) {
+  // Test statistics updates via the update_stats callback; returning
+  // true asks for the stats to be published
+  auto ps = create_peering_state(0, 0);
+
+  ps->update_stats(
+    [](pg_history_t &hist, pg_stat_t &pg_stats) {
+      pg_stats.stats.sum.num_objects = 10;
+      return true;
+    });
+
+  // The mutation made inside the callback must be visible in the info
+  EXPECT_EQ(ps->get_info().stats.stats.sum.num_objects, 10u);
+}
+
+// ============================================================================
+// Test Stubs - Deletion
+// ============================================================================
+
+TEST_F(PeeringStateTest, DeletionState) {
+  // Test deletion state: a fresh PG is neither deleting nor deleted
+  auto ps = create_peering_state(0, 0);
+
+  EXPECT_FALSE(ps->is_deleted());
+  EXPECT_FALSE(ps->is_deleting());
+}
+
+// ============================================================================
+// Test Stubs - Flush Operations
+// ============================================================================
+
+TEST_F(PeeringStateTest, FlushOperations) {
+  // Test flush tracking: a fresh PG needs no flush, and completing a
+  // flush on it should be a safe no-op
+  auto ps = create_peering_state(0, 0);
+
+  EXPECT_FALSE(ps->needs_flush());
+
+  // complete_flush takes no transaction; the previously-declared unused
+  // ObjectStore::Transaction local has been removed
+  ps->complete_flush();
+}
+
+// ============================================================================
+// Test Stubs - History and Past Intervals
+// ============================================================================
+
+TEST_F(PeeringStateTest, HistoryUpdate) {
+  // Test history updates
+  test_create_peering_state(acting[0]);
+
+  // Merge in a minimal history claiming creation and last-clean at epoch 1
+  pg_history_t new_history;
+  new_history.epoch_created = 1;
+  new_history.last_epoch_clean = 1;
+
+  get_ps(acting[0])->update_history(new_history);
+}
+
+TEST_F(PeeringStateTest, PastIntervals) {
+  // Test past intervals access: a fresh PG has no past intervals
+  auto ps = create_peering_state(0, 0);
+
+  const PastIntervals& intervals = ps->get_past_intervals();
+  EXPECT_EQ(intervals.size(), 0u);
+}
+
+// ============================================================================
+// Multi OSD Peering Tests
+// ============================================================================
+
+// Multi-OSD test of peering for a new (empty PG) all the way to active+clean
+TEST_F(PeeringStateTest, SimplePeeringToActiveClean) {
+  dout(0) << "== SimplePeeringToActiveClean ==" << dendl;
+  // Full peering cycle
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  // Second peering cycle at the new epoch completes activation
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+clean
+  verify_all_active_clean();
+}
+
+// Multi-OSD test of peering for PG with 1 full write log entry all the way to active+clean
+TEST_F(PeeringStateTest, FullWritePeeringToActiveClean) {
+  dout(0) << "== FullWritePeeringToActiveClean ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 1 log entry to all shards
+  eversion_t expected = test_append_log_entry();
+  // Full peering cycle
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  // Second peering cycle at the new epoch completes activation
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+clean and that log entry was kept
+  verify_all_active_clean(expected, eversion_t());
+}
+
+// Multi-OSD test of peering for PG with 2 log entries that are incomplete all the way to active+clean
+TEST_F(PeeringStateTest, IncompleteWritePeeringToActiveClean) {
+  dout(0) << "== IncompleteWritePeeringToActiveClean ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 2 log entries one to just shard 0,1 and one to just shard 0
+  test_append_log_entry(ss_all, ss({0, 1}));
+  test_append_log_entry(ss_all, ss({0}));
+  // Full peering cycle
+  test_event_advance_map();
+  dispatch_all();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  // Second peering cycle at the new epoch completes activation
+  test_event_advance_map();
+  dispatch_all();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+clean and that log entries got rolled backwards
+  // (EC pools roll back incomplete writes, so last_update returns to 0'0)
+  verify_all_active_clean(eversion_t(), eversion_t());
+}
+
+// Multi-OSD test of a replica pool peering for PG with 2 log entries that are incomplete
+// all the way to active+recovering. Unlike EC pools the log entries get rolled forward
+TEST_F(PeeringStateTest, RepIncompleteWritePeeringToActiveRecovering) {
+  dout(0) << "== RepIncompleteWritePeeringToActiveRecovering ==" << dendl;
+  // Init - replace the default EC pool with a replicated pool
+  create_rep_pool();
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 2 log entries to shard 0,1
+  eversion_t previous = test_append_log_entry(ss_all, ss({0, 1}));
+  eversion_t expected = test_append_log_entry(ss_all, ss({0 ,1}));
+  // Full peering cycle
+  test_event_advance_map();
+  dispatch_all();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  // Second peering cycle at the new epoch completes activation
+  test_event_advance_map();
+  dispatch_all();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+recovering: OSD 2 missed both writes and
+  // must recover, with its last_complete still at the earlier version
+  verify_all_active_recovering(expected, expected, previous, 2, eversion_t());
+}
+
+// Multi-OSD test of peering for PG with 1 partial write log entry all the way to active+clean
+TEST_F(PeeringStateTest, PartialWritePeeringToActiveClean) {
+  dout(0) << "== PartialWritePeeringToActiveClean ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 1 log entry to a partial set of shards
+  eversion_t expected = test_append_log_entry(ss({0, 2, 3}), ss({0, 2, 3}));
+  // Verify that shard 1 has no PWLC and shards 0,2,3 have PWLC saying shard 1 missed writes 0'0-2'1
+  {
+    std::map<shard_id_t,std::pair<eversion_t, eversion_t>> expected_pwlc;
+    verify_pwlc(epoch_t(), expected_pwlc, acting[1]);
+    expected_pwlc[shard_id_t(1)] = std::pair(eversion_t(), expected);
+    epoch_t expected_pwlc_epoch = osdmap->get_epoch();
+    verify_pwlc(expected_pwlc_epoch, expected_pwlc, acting[0]);
+    verify_pwlc(expected_pwlc_epoch, expected_pwlc, acting[2]);
+    verify_pwlc(expected_pwlc_epoch, expected_pwlc, acting[3]);
+  }
+  // Dump each acting OSD's PWLC for debugging
+  for (int osd : acting) {
+    dout(0) << osd << " PWLC=" << get_ps(osd)->get_info().partial_writes_last_complete << dendl;
+  }
+  // Full peering cycle
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+clean and that log entry was kept
+  verify_all_active_clean(expected, eversion_t());
+  // Verify all shards have PWLC saying shard 1 missed writes 0'0-2'1
+  {
+    std::map<shard_id_t,std::pair<eversion_t, eversion_t>> expected_pwlc;
+    expected_pwlc[shard_id_t(1)] = std::pair(eversion_t(), expected);
+    // With issue 74218 unfixed the PWLC epoch stays at 2 rather than
+    // advancing to the current map epoch
+#ifndef ISSUE_74218
+    epoch_t expected_pwlc_epoch = 2;
+#else
+    epoch_t expected_pwlc_epoch = osdmap->get_epoch();
+#endif
+    verify_pwlc(expected_pwlc_epoch, expected_pwlc);
+  }
+}
+
+// Multi-OSD test of peering for PG with 1 partial write (not complete) log entry all the way to active+clean
+TEST_F(PeeringStateTest, PartialWriteNotCompletePeeringToActiveClean) {
+  dout(0) << "== PartialWritePeeringToActiveClean ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 1 log entry to a partial set of shards but do not advance last_complete
+  eversion_t expected = test_append_log_entry(ss({0, 2, 3}), ss({0, 2, 3}), true);
+  // Verify no shards have PWLC as the write has not been completed
+  {
+    std::map<shard_id_t,std::pair<eversion_t, eversion_t>> expected_pwlc;
+    verify_pwlc(epoch_t(0), expected_pwlc);
+  }
+  // Full peering cycle - the definitive shard will be OSD 1 that didn't see the partial write
+  // but proc_master_log will determine that all the other shards saw the partial write and
+  // will roll it forward
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+clean and that log entry was kept
+  verify_all_active_clean(expected, eversion_t());
+  // Verify all shards have PWLC saying shard 1 missed writes 0'0-2'1
+  {
+    std::map<shard_id_t,std::pair<eversion_t, eversion_t>> expected_pwlc;
+    expected_pwlc[shard_id_t(1)] = std::pair(eversion_t(), expected);
+#ifndef ISSUE_74218
+    epoch_t expected_pwlc_epoch = 2;
+#else
+    epoch_t expected_pwlc_epoch = osdmap->get_epoch();
+#endif
+    verify_pwlc(expected_pwlc_epoch, expected_pwlc);
+  }
+}
+
+// Multi-OSD test of peering for PG with 3 full write log entries all the way to active+clean
+// then change acting set and run async recovery, recover the object and all the way to active+clean
+TEST_F(PeeringStateTest, AsyncRecovery) {
+  dout(0) << "== AsyncRecovery ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 3 log entries to all shards - enough to force async recovery
+  // later on because main reduced osd_async_recovery_min_cost to 2
+  test_append_log_entry();
+  eversion_t previous = test_append_log_entry();
+  eversion_t expected = test_append_log_entry();
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, eversion_t());
+  // Swap out OSD 1 for OSD 9
+  modify_up_acting(1, 9);
+  test_create_peering_state(9, 1);
+  test_init(9);
+  test_event_initialize(9);
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+recovering - OSD 9 is missing the last
+  // write so its last_complete lags at the previous version
+  verify_all_active_recovering(expected, expected, previous, 9, eversion_t());
+  // Now recover the object
+  test_begin_peer_recover(9, 1);
+  test_on_peer_recover(9, 1, expected);
+  test_recover_got(9, expected);
+  test_object_recovered();
+  test_event_all_replicas_recovered();
+  // Async recovery requires a further epoch/peering cycle to go clean
+  EXPECT_TRUE(new_epoch(true));
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, eversion_t());
+}
+
+// Multi-OSD test of peering for PG with 1 full write log entry all the way to active+clean
+// then change acting set and run sync recovery, recover the object and all the way to active+clean
+TEST_F(PeeringStateTest, SyncRecovery) {
+  dout(0) << "== SyncRecovery ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 1 log entries to all shards
+  eversion_t previous;
+  eversion_t expected = test_append_log_entry();
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+clean and that log entry was kept
+  verify_all_active_clean(expected, eversion_t());
+  // Swap out OSD 1 for OSD 9
+  modify_up_acting(1, 9);
+  test_create_peering_state(9, 1);
+  test_init(9);
+  test_event_initialize(9);
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+recovering - OSD 9 has an empty log so
+  // its last_complete is 0'0 (default-constructed 'previous')
+  verify_all_active_recovering(expected, expected, previous, 9, eversion_t());
+  // Now recover the object
+  test_begin_peer_recover(9, 1);
+  test_on_peer_recover(9, 1, expected);
+  test_recover_got(9, expected);
+  test_object_recovered();
+  test_event_all_replicas_recovered();
+  // Unlike async recovery, no extra epoch is needed before going clean
+  // Verify that we got to active+clean and that log entry was kept
+  verify_all_active_clean(expected, eversion_t());
+}
+
+// Multi-OSD test of peering for PG with 3 full write log entries all the way to active+clean
+// then change acting set and run backfill, recover the object and all the way to active+clean
+TEST_F(PeeringStateTest, Backfill) {
+  dout(0) << "== Backfill ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 3 log entries to all shards - trim the log so that
+  // backfill occurs later on
+  test_append_log_entry();
+  eversion_t expected_tail = test_append_log_entry();
+  eversion_t expected = test_append_log_entry(shard_id_set(), shard_id_set(), false, true);
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, expected_tail);
+  // Swap out OSD 1 for OSD 9; its log cannot be caught up past the
+  // trimmed tail, forcing backfill instead of log-based recovery
+  modify_up_acting(1, 9);
+  test_create_peering_state(9, 1);
+  test_init(9);
+  test_event_initialize(9);
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+backfill
+  verify_all_active_backfilling(expected, expected_tail);
+  // Backfill the object
+  test_prepare_backfill_for_missing(9, 1, expected);
+  test_begin_peer_recover(9, 1);
+  test_on_peer_recover(9, 1, expected);
+  test_recover_got(9, expected);
+  test_object_recovered();
+  test_update_peer_last_backfill(9, 1, hobject_t::get_max());
+  test_update_backfill_progress(9, hobject_t::get_max()); // MOSDPGBackfill::OP_BACKFILL_PROGRESS
+  // Signal backfill has completed
+  test_event_recovery_done(9); // MOSDPGBackfill::OP_BACKFILL_FINISH
+  test_event_backfilled();
+  dispatch_all();
+  // Backfill completion requires a further epoch/peering cycle to go clean
+  EXPECT_TRUE(new_epoch(true));
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, expected_tail);
+}
+
+#if POOL_MIGRATION
+// Multi-OSD test of peering with pool migration all the way to active+clean.
+// Configures migration to an EC pool, runs peering until all OSDs report the
+// migrating state, then signals completion and checks active+clean is reached.
+TEST_F(PeeringStateTest, PoolMigration) {
+  dout(0) << "== PoolMigration ==" << dendl;
+  // Configure pool migration to an EC pool
+  migrate_to_ec_pool();
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Full peering cycle
+  test_peering();
+  // Verify pool migration started
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  // Signal migration complete
+  test_event_migration_done();
+  // Verify that we got to active+clean
+  verify_all_active_clean(eversion_t(), eversion_t());
+  // Primary's listener should record that the pool-migrated notification was sent
+  EXPECT_TRUE(get_listener(acting[0])->pg_migrated_pool_sent);
+}
+#endif
+
+#if POOL_MIGRATION
+// Multi-OSD test of peering with pool migration too full
+TEST_F(PeeringStateTest, PoolMigrationTooFull) {
+  dout(0) << "== PoolMigration ==" << dendl;
+  // Configure pool migration to an EC pool
+  migrate_to_ec_pool();
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Fail reservation on OSD 1
+  get_listener(acting[1])->inject_fail_reserve_recovery_space = true;
+  // Full peering cycle
+  test_peering();
+  // Verify that pool migration is stalled with too full
+  EXPECT_EQ(get_listener(acting[0])->events_scheduled, 1);
+  EXPECT_TRUE(get_ps(acting[0])->state_test(PG_STATE_MIGRATION_TOOFULL));
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/NotMigrating");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+
+  // Fail reservation on OSD 2 as well
+  get_listener(acting[2])->inject_fail_reserve_recovery_space = true;
+  dispatch_all_events(true); // run stalled schedule_after event to retry migration
+  dispatch_all();
+  // Verify that pool migration is stalled with too full
+  EXPECT_EQ(get_listener(acting[0])->events_scheduled, 2);
+  EXPECT_TRUE(get_ps(acting[0])->state_test(PG_STATE_MIGRATION_TOOFULL));
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/NotMigrating");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+
+  // Clear injects
+  get_listener(acting[1])->inject_fail_reserve_recovery_space = false;
+  get_listener(acting[2])->inject_fail_reserve_recovery_space = false;
+  dispatch_all_events(true);  // run stalled schedule_after event to retry migration
+  dispatch_all();
+  // Verify pool migration started
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  // Signal migration complete
+  test_event_migration_done();
+  // Verify that we got to active+clean
+  verify_all_active_clean(eversion_t(), eversion_t());
+  EXPECT_TRUE(get_listener(acting[0])->pg_migrated_pool_sent);
+}
+#endif
+
+#if POOL_MIGRATION
+// Multi-OSD test of peering with pool migration reservtion preempt
+TEST_F(PeeringStateTest, PoolMigrationPrempt) {
+  dout(0) << "== PoolMigration ==" << dendl;
+  // Configure pool migration to an EC pool
+  migrate_to_ec_pool();
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Keep preempt reservation event on OSD 1
+  get_listener(acting[1])->inject_keep_preempt = true;
+  // Full peering cycle
+  test_peering();
+  // Verify pool migration started
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  EXPECT_EQ(get_listener(acting[0])->io_reservations_requested, 1);
+  EXPECT_EQ(get_listener(acting[1])->remote_recovery_reservations_requested, 1);
+  EXPECT_EQ(get_listener(acting[2])->remote_recovery_reservations_requested, 1);
+  EXPECT_EQ(get_listener(acting[3])->remote_recovery_reservations_requested, 1);
+  EXPECT_EQ(get_listener(acting[0])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[1])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[2])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[3])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  dout(0) << "= PoolMigration prempt reservation osd 1 =" << dendl;
+  dispatch_all_events(true); // preempt reservation on OSD 1
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  // Keep preempt reservation event on OSD 2 as well
+  get_listener(acting[2])->inject_keep_preempt = true;
+  dispatch_all();
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  EXPECT_EQ(get_listener(acting[0])->io_reservations_requested, 2);
+  EXPECT_EQ(get_listener(acting[1])->remote_recovery_reservations_requested, 2);
+  EXPECT_EQ(get_listener(acting[2])->remote_recovery_reservations_requested, 2);
+  EXPECT_EQ(get_listener(acting[3])->remote_recovery_reservations_requested, 2);
+  EXPECT_EQ(get_listener(acting[0])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[1])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[2])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[3])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  dout(0) << "= PoolMigration prempt reservation osd 1 and 2 =" << dendl;
+  dispatch_all_events(true); // preempt reservation on OSD 1 and 2
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  // Keep preempt reservation event on OSD 0 (primary) as well
+  get_listener(acting[0])->inject_keep_preempt = true;
+  dispatch_all();
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  EXPECT_EQ(get_listener(acting[0])->io_reservations_requested, 3);
+  EXPECT_EQ(get_listener(acting[1])->remote_recovery_reservations_requested, 3);
+  EXPECT_EQ(get_listener(acting[2])->remote_recovery_reservations_requested, 3);
+  EXPECT_EQ(get_listener(acting[3])->remote_recovery_reservations_requested, 3);
+  EXPECT_EQ(get_listener(acting[0])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[1])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[2])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[3])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  dout(0) << "= PoolMigration prempt reservation osd 0, 1 and 2 =" << dendl;
+  // 0 Sends RELEASE first, then 1,2 send REVOKE
+  dispatch_events(acting[0], true , 1);
+  dispatch_events(acting[1], true , 1);
+  dispatch_events(acting[2], true , 1);
+  dispatch_all();
+  dispatch_events(acting[0], true , 1);
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/WaitLocalPoolMigrationReserved");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepNotRecovering");
+  dispatch_all();
+  // Verify pool migration started
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  EXPECT_EQ(get_listener(acting[0])->io_reservations_requested, 4);
+  EXPECT_EQ(get_listener(acting[1])->remote_recovery_reservations_requested, 4);
+  EXPECT_EQ(get_listener(acting[2])->remote_recovery_reservations_requested, 4);
+  EXPECT_EQ(get_listener(acting[3])->remote_recovery_reservations_requested, 4);
+  EXPECT_EQ(get_listener(acting[0])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[1])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[2])->stalled_events.size(), 1); // captured preempt event
+  EXPECT_EQ(get_listener(acting[3])->stalled_events.size(), 0);
+  EXPECT_EQ(get_listener(acting[0])->last_state_entered, "Started/Primary/Active/MigratingSource");
+  EXPECT_EQ(get_listener(acting[1])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[2])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  EXPECT_EQ(get_listener(acting[3])->last_state_entered, "Started/ReplicaActive/RepRecovering");
+  dout(0) << "= PoolMigration prempt reservation osd 0, 1 and 2 race hazard =" << dendl;
+  // 1,2 send REVOKE first, then 0 sends RELEASE
+  get_listener(acting[0])->inject_keep_preempt = false;
+  get_listener(acting[1])->inject_keep_preempt = false;
+  get_listener(acting[2])->inject_keep_preempt = false;
+  // 1 and 2 prempt reservation and send messages to osd 0
+  dispatch_events(acting[1], true);
+  dispatch_events(acting[2], true);
+  dispatch_cluster_messages(1);
+  dispatch_cluster_messages(2);
+  // 0 prempt reservation is slow
+  dispatch_events(acting[0], true);
+  dispatch_all();
+  // Verify pool migration started
+  verify_all_active_migrating(eversion_t(), eversion_t());
+  // Signal migration complete
+  test_event_migration_done();
+  // Verify that we got to active+clean
+  verify_all_active_clean(eversion_t(), eversion_t());
+  EXPECT_TRUE(get_listener(acting[0])->pg_migrated_pool_sent);
+}
+#endif
+
+// ============================================================================
+// Tests for bug fixes
+// ============================================================================
+
+// Multi-OSD test of peering for fix for https://tracker.ceph.com/issues/71493
+TEST_F(PeeringStateTest, Issue71493) {
+  // Scenario: Backfill gets stopped by a remote reservation being preempted
+  // but this races with MOSDPGBackfill::OP_BACKFILL_FINISH event. The
+  // problematic sequence is:
+  //
+  // Primary calls update_peer_last_backfill to set last_backfill to hobject_t::MAX
+  // Primary sends a MOSDPGBackfill::OP_BACKFILL_FINISH message to the backfilling shard
+  // Backfilling shard preempts the remote reservation and sends Revoked message
+  // Backfilling shard processes the MOSDPGBackfill::OP_BACKFILL_FINISH message
+  // Primary processes RemoteReservation revoked message from the backfilling OSD
+  //   Sets BACKFILL_WAIT and suspends backfill (fix only does this if needs_backfill() returns true)
+  //   Discards the event because needs_backfill() returns false
+  // Primary sends and processes Backfilled event
+  //   Leaves BACKFILL_WAIT set
+  dout(0) << "== Issue71493 ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Append 3 log entries to all shards - trim the log so that
+  // backfill occurs later on
+  test_append_log_entry();
+  eversion_t expected_tail = test_append_log_entry();
+  eversion_t expected = test_append_log_entry(shard_id_set(), shard_id_set(), false, true);
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, expected_tail);
+  // Swap out OSD 1 for OSD 9
+  modify_up_acting(1, 9);
+  test_create_peering_state(9, 1);
+  test_init(9);
+  test_event_initialize(9);
+  // Keep preempt reservation event on OSD 9 for later
+  get_listener(acting[1])->inject_keep_preempt = true;
+  // Full peering cycle
+  test_peering();
+  // Verify that we got to active+backfill
+  verify_all_active_backfilling(expected, expected_tail);
+  // Backfill the object
+  test_prepare_backfill_for_missing(9, 1, expected);
+  test_begin_peer_recover(9, 1);
+  test_on_peer_recover(9, 1, expected);
+  test_recover_got(9, expected);
+  test_object_recovered();
+  test_update_peer_last_backfill(9, 1, hobject_t::get_max());
+  test_update_backfill_progress(9, hobject_t::get_max()); // MOSDPGBackfill::OP_BACKFILL_PROGRESS
+  dout(0) << "= revoking remote reservation =" << dendl;
+  // Preempt remote reservation ahead of MOSDPGBackfill::OP_BACKFILL_FINISH
+  dispatch_all_events(true);
+  dispatch_all();
+  // Signal backfill has completed
+  test_event_recovery_done(9); // MOSDPGBackfill::OP_BACKFILL_FINISH
+  dispatch_all();
+  test_event_backfilled();
+  dispatch_all();
+  // Without the fix BACKFILL_WAIT is still set on the primary
+#ifdef ISSUE_71493
+  EXPECT_FALSE(get_ps(acting[0])->state_test(PG_STATE_BACKFILL_WAIT));
+#endif
+  // Even without the fix the new interval caused by the acting set changing
+  // after the backfill has completed will clean up the PG state
+  EXPECT_TRUE(new_epoch(true));
+  test_peering();
+  // Verify that we got to active+clean and that log entries were kept
+  verify_all_active_clean(expected, expected_tail);
+}
+
+// Multi-OSD test of peering for fix for https://tracker.ceph.com/issues/73891
+TEST_F(PeeringStateTest, Issue73891) {
+  // Scenario: 3+2 EC pool [0,1,2,3,4]
+  // Partial write A to OSDs 0,3,4 completes
+  // Partial write B to OSDs 0,1,3,4 only updates 0, 1 before interruption
+  // OSD 1 down, peering runs and rolls-backward the partial write B
+  // Partial write C to OSDs 0,3,4 that is completed
+  // OSD 1 up, peering runs
+  //   Without fix OSD 3 doesn't roll back write B
+  dout(0) << "== Issue73891 ==" << dendl;
+  // Init
+  create_ec_pool(3,2);
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Partial Write A to OSDs 0,3,4
+  test_append_log_entry(ss({0, 3, 4}), ss({0, 3, 4}));
+  // Partial Write B to OSDs 0,1,3,4 only updates 0, 1
+  test_append_log_entry(ss({0, 1, 3, 4}), ss({0, 1}), true);
+  // OSD 1 down, run peering
+  osd_down(1, 1);
+  test_peering();
+  // Partial Write C to 0,3,4
+  test_append_log_entry(ss({0, 3, 4}), ss({0, 3, 4}));
+  // OSD 1 up, run peering
+  osd_up(1, 1);
+  test_peering();
+  verify_logs(); // Without fix will fail
+}
+
+// Multi-OSD test of peering for fix for https://tracker.ceph.com/issues/74218
+TEST_F(PeeringStateTest, Issue74218) {
+  // Scenario:
+  // Partial write to OSDs 0,2,3 that has not been completed
+  // OSD 2 down, peering runs and rolls-forward the partial write
+  // OSD 1 down, peering runs
+  // Partial write to OSDs 0,[2],3 that is completed
+  // OSD 2 up, peering runs
+  //   OSD 2 rolls forward the 1st write, without the fix this overwrites pwlc with a stale version
+  // OSD 1 up, peering runs - make sure OSD 2 replies to GetInfo last
+  //   OSD 2 provides its stale copy of pwlc
+  //   primary gives stale pwlc to osd 1 which asserts
+  dout(0) << "== Issue74218 ==" << dendl;
+  // Init
+  test_create_peering_state();
+  test_init();
+  test_event_initialize();
+  // Partial write to OSDs 0,2,3 but do not advance last_complete
+  eversion_t expected1 = test_append_log_entry(ss({0, 2, 3}), ss({0, 2, 3}), true);
+  // Verify no shards have PWLC as the write has not been completed
+  {
+    std::map<shard_id_t,std::pair<eversion_t, eversion_t>> expected_pwlc;
+    verify_pwlc(epoch_t(0), expected_pwlc);
+  }
+  // OSD 2 down, run peering
+  osd_down(2, 2);
+  test_peering();
+  // OSD 1 down, run peering
+  osd_down(1, 1);
+  test_peering();
+  // Partial write to OSDs 0,[2],3
+  eversion_t expected2 = test_append_log_entry(ss({0, 2, 3}), ss({0, 3}));
+  // OSD 2 up, run peering
+  osd_up(2, 2);
+  test_peering();
+  // OSD 1 up, run peering
+  osd_up(1, 1);
+  // Full peering cycle - but delay OSD 2's response to GetInfo
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_peering_messages(0); // Send query info to OSDs 1,2,3
+  dispatch_peering_messages(1); // Send reply from OSD 1
+  dispatch_peering_messages(3); // Send reply from OSD 3
+  dispatch_peering_messages(2); // Send reply from OSD 2 - must be last
+  // Run rest of peering
+  dispatch_all();
+  new_epoch(); // for wait_upthru
+  test_event_advance_map();
+  test_event_activate_map();
+  dispatch_all();
+  // Verify that we got to active+recovering
+  // Prior to fix for issue 73891 the last write would not modify OSD 1 so it would
+  // not need recovery. With this fix the write becomes a full-write and OSD 1
+  // ends up with a missing object
+  // verify_all_active_recovering(expected2, expected2, expected1, 2, eversion_t());
+  verify_primary_active_recovering(acting[0]);
+  verify_no_missing_or_unfound(acting[0], 0, false);
+  verify_active_and_peered(acting[0]);
+  verify_log_state(acting[0], expected2, expected2, expected2, eversion_t());
+  verify_replica_activated(acting[1]);
+  verify_missing(acting[1], 1);
+  verify_log_state(acting[1], expected2, expected1, expected2, eversion_t());
+  verify_replica_activated(acting[2]);
+  verify_missing(acting[2], 2);
+  verify_log_state(acting[2], expected2, expected1, expected2, eversion_t());
+  verify_replica_activated(acting[3]);
+  verify_no_missing_or_unfound(acting[3], 3, true);
+  verify_log_state(acting[3], expected2, expected2, expected2, eversion_t());
+  verify_logs();
+}
+
+// ============================================================================
+// Main
+// ============================================================================
+
+int main(int argc, char **argv)
+{
+  std::map<std::string, std::string> defaults = {
+    // our map is flat, so just try and split across OSDs, not hosts or whatever
+    {"osd_crush_chooseleaf_type", "0"},
+    // debug level 20 for OSD so we get full logs
+    {"debug_osd", "20"},
+    // reduce async recovery cost to 2 to make it easier to test async recovery
+    {"osd_async_recovery_min_cost","2"},
+  };
+  std::vector<const char*> args(argv, argv + argc);
+  auto cct = global_init(&defaults,
+                         args,
+                         CEPH_ENTITY_TYPE_CLIENT,
+                         CODE_ENVIRONMENT_UTILITY,
+                         CINIT_FLAG_NO_MON_CONFIG);
+  common_init_finish(g_ceph_context);
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}