]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/.../replicated_recovery_backend: convert to logging macros, some formatting...
authorSamuel Just <sjust@redhat.com>
Tue, 7 Jan 2025 20:55:48 +0000 (12:55 -0800)
committerSamuel Just <sjust@redhat.com>
Wed, 29 Jan 2025 05:00:33 +0000 (05:00 +0000)
Signed-off-by: Samuel Just <sjust@redhat.com>
src/crimson/osd/replicated_recovery_backend.cc

index feb19a12675af9860ccdf206382038c7f02fabd4..154b56af336ac5f8e026cab872a6afcb07047d59 100644 (file)
@@ -6,17 +6,14 @@
 #include <seastar/core/future.hh>
 #include <seastar/core/do_with.hh>
 
+#include "crimson/common/log.h"
 #include "crimson/osd/pg.h"
 #include "crimson/osd/pg_backend.h"
 #include "osd/osd_types_fmt.h"
 #include "replicated_recovery_backend.h"
 #include "msg/Message.h"
 
-namespace {
-  seastar::logger& logger() {
-    return crimson::get_logger(ceph_subsys_osd);
-  }
-}
+SET_SUBSYS(osd);
 
 using std::less;
 using std::map;
@@ -27,34 +24,39 @@ ReplicatedRecoveryBackend::recover_object(
   const hobject_t& soid,
   eversion_t need)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::recover_object);
+  DEBUGDPP("{}, {}", pg, soid, need);
   // always add_recovering(soid) before recover_object(soid)
   assert(is_recovering(soid));
   // start tracking the recovery of soid
-  return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
-    logger().debug("recover_object: loading obc: {}", soid);
-    return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
-      [this, soid, need](auto head, auto obc) {
-      if (!obc->obs.exists) {
-        // XXX: this recovery must be triggered by backfills and the corresponding
-        //      object must have been deleted by some client request after the object
-        //      is enqueued for push but before the lock is acquired by the recovery.
-        //
-        //      Abort the recovery in this case, a "recover_delete" must have been
-        //      added for this object by the client request that deleted it.
-        return interruptor::now();
-      }
-      logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
-      auto& recovery_waiter = get_recovering(soid);
-      recovery_waiter.obc = obc;
-      return maybe_push_shards(head, soid, need);
-    }, false).handle_error_interruptible(
-      crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
-      // TODO: may need eio handling?
-      logger().error("recover_object saw error code {}, ignoring object {}",
-                     code, soid);
-      return seastar::now();
-    }));
+  return maybe_pull_missing_obj(
+    soid, need
+  ).then_interruptible([FNAME, this, soid, need] {
+    DEBUGDPP("loading obc: {}", pg, soid);
+    return pg.obc_loader.with_obc<RWState::RWREAD>(
+      soid,
+      [FNAME, this, soid, need](auto head, auto obc) {
+       if (!obc->obs.exists) {
+         // XXX: this recovery must be triggered by backfills and the corresponding
+         //      object must have been deleted by some client request after the object
+         //      is enqueued for push but before the lock is acquired by the recovery.
+         //
+         //      Abort the recovery in this case, a "recover_delete" must have been
+         //      added for this object by the client request that deleted it.
+         return interruptor::now();
+       }
+       DEBUGDPP("loaded obc: {}", pg, obc->obs.oi.soid);
+       auto& recovery_waiter = get_recovering(soid);
+       recovery_waiter.obc = obc;
+       return maybe_push_shards(head, soid, need);
+      }, false).handle_error_interruptible(
+       crimson::osd::PG::load_obc_ertr::all_same_way(
+         [FNAME, this, soid](auto& code) {
+           // TODO: may need eio handling?
+           ERRORDPP("saw error code {}, ignoring object {}",
+                    pg, code, soid);
+           return seastar::now();
+         }));
   });
 }
 
@@ -116,7 +118,8 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
   const hobject_t& soid,
   eversion_t need)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::maybe_pull_missing_obj);
+  DEBUGDPP("{}, {}", pg, soid, need);
   pg_missing_tracker_t local_missing = pg.get_local_missing();
   if (!local_missing.is_missing(soid)) {
     // object is not missing, don't pull
@@ -159,12 +162,13 @@ ReplicatedRecoveryBackend::push_delete(
   const hobject_t& soid,
   eversion_t need)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::push_delete);
+  DEBUGDPP("{}, {}", pg, soid, need);
   epoch_t min_epoch = pg.get_last_peering_reset();
 
   assert(pg.get_acting_recovery_backfill().size() > 0);
   return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
-    [this, soid, need, min_epoch](pg_shard_t shard)
+    [FNAME, this, soid, need, min_epoch](pg_shard_t shard)
     -> interruptible_future<> {
     if (shard == pg.get_pg_whoami())
       return seastar::make_ready_future<>();
@@ -172,7 +176,7 @@ ReplicatedRecoveryBackend::push_delete(
     if (iter == pg.get_shard_missing().end())
       return seastar::make_ready_future<>();
     if (iter->second.is_missing(soid)) {
-      logger().debug("push_delete: will remove {} from {}", soid, shard);
+      DEBUGDPP("will remove {} from {}", pg, soid, shard);
       pg.begin_peer_recover(shard, soid);
       spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
       auto msg = crimson::make_message<MOSDPGRecoveryDelete>(
@@ -194,7 +198,8 @@ RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_recovery_delete(
   Ref<MOSDPGRecoveryDelete> m)
 {
-  logger().debug("{}: {}", __func__, *m);
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_delete);
+  DEBUGDPP("{}", pg, *m);
 
   auto& p = m->objects.front(); //TODO: only one delete per message for now.
   return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
@@ -218,14 +223,15 @@ ReplicatedRecoveryBackend::on_local_recover_persist(
   bool is_delete,
   epoch_t epoch_frozen)
 {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(ReplicatedRecoveryBackend::on_local_recover_persist);
+  DEBUGDPP("", pg);
   return seastar::do_with(
     ceph::os::Transaction(),
-    [this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) {
+    [FNAME, this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) {
     return pg.get_recovery_handler()->on_local_recover(
       soid, _recovery_info, is_delete, t
-    ).then_interruptible([this, &t] {
-      logger().debug("ReplicatedRecoveryBackend::{}: do_transaction...", __func__);
+    ).then_interruptible([FNAME, this, &t] {
+      DEBUGDPP("submitting transaction", pg);
       return shard_services.get_store().do_transaction(coll, std::move(t));
     }).then_interruptible(
       [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
@@ -241,17 +247,18 @@ ReplicatedRecoveryBackend::local_recover_delete(
   eversion_t need,
   epoch_t epoch_to_freeze)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
-  return backend->load_metadata(soid).safe_then_interruptible([this]
+  LOG_PREFIX(ReplicatedRecoveryBackend::local_recover_delete);
+  DEBUGDPP("{}, {}", pg, soid, need);
+  return backend->load_metadata(soid).safe_then_interruptible([FNAME, this]
     (auto lomt) -> interruptible_future<> {
     if (lomt->os.exists) {
       return seastar::do_with(ceph::os::Transaction(),
-       [this, lomt = std::move(lomt)](auto& txn) mutable {
+        [FNAME, this, lomt = std::move(lomt)](auto& txn) mutable {
         return interruptor::async([this, lomt=std::move(lomt), &txn] {
           pg.remove_maybe_snapmapped_object(txn, lomt->os.oi.soid);
         }).then_interruptible(
-         [this, &txn]() mutable {
-         logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
+         [FNAME, this, &txn]() mutable {
+         DEBUGDPP("submitting transaction", pg);
          return shard_services.get_store().do_transaction(coll,
                                                           std::move(txn));
        });
@@ -285,13 +292,14 @@ RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::recover_delete(
   const hobject_t &soid, eversion_t need)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::recover_delete);
+  DEBUGDPP("{}, {}", pg, soid, need);
 
   epoch_t cur_epoch = pg.get_osdmap_epoch();
   return seastar::do_with(object_stat_sum_t(),
-    [this, soid, need, cur_epoch](auto& stat_diff) {
+    [FNAME, this, soid, need, cur_epoch](auto& stat_diff) {
     return local_recover_delete(soid, need, cur_epoch).then_interruptible(
-      [this, &stat_diff, cur_epoch, soid, need]()
+      [FNAME, this, &stat_diff, cur_epoch, soid, need]()
       -> interruptible_future<> {
       if (!pg.has_reset_since(cur_epoch)) {
        bool object_missing = false;
@@ -299,8 +307,9 @@ ReplicatedRecoveryBackend::recover_delete(
          if (shard == pg.get_pg_whoami())
            continue;
          if (pg.get_shard_missing(shard)->is_missing(soid)) {
-           logger().debug("recover_delete: soid {} needs to deleted from replca {}",
-                          soid, shard);
+           DEBUGDPP(
+             "soid {} needs to be deleted from replica {}",
+             pg, soid, shard);
            object_missing = true;
            break;
          }
@@ -330,7 +339,8 @@ ReplicatedRecoveryBackend::prep_push_to_replica(
   eversion_t need,
   pg_shard_t pg_shard)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_to_replica);
+  DEBUGDPP("{}, {}", pg, soid, need);
 
   auto& recovery_waiter = get_recovering(soid);
   auto& obc = recovery_waiter.obc;
@@ -347,8 +357,8 @@ ReplicatedRecoveryBackend::prep_push_to_replica(
     // try to base push off of clones that succeed/preceed poid
     // we need the head (and current SnapSet) locally to do that.
     if (pg.get_local_missing().is_missing(head)) {
-      logger().debug("{} missing head {}, pushing raw clone",
-                     __func__, head);
+      DEBUGDPP("missing head {}, pushing raw clone",
+              pg, head);
       if (obc->obs.oi.size) {
         subsets.data_subset.insert(0, obc->obs.oi.size);
       }
@@ -361,8 +371,7 @@ ReplicatedRecoveryBackend::prep_push_to_replica(
     auto ssc = obc->ssc;
     ceph_assert(ssc);
     push_info_ss = ssc->snapset;
-    logger().debug("push_to_replica snapset is {}",
-                   ssc->snapset);
+    DEBUGDPP("snapset is {}", pg, ssc->snapset);
 
     subsets = crimson::osd::calc_clone_subsets(
       ssc->snapset, soid,
@@ -375,8 +384,7 @@ ReplicatedRecoveryBackend::prep_push_to_replica(
     // base this on partially on replica's clones?
     auto ssc = obc->ssc;
     ceph_assert(ssc);
-    logger().debug("push_to_replica snapset is {}",
-                   ssc->snapset);
+    DEBUGDPP("snapset is {}", pg, ssc->snapset);
     subsets = crimson::osd::calc_head_subsets(
       obc->obs.oi.size,
       ssc->snapset, soid,
@@ -399,7 +407,8 @@ ReplicatedRecoveryBackend::prep_push(
   const crimson::osd::subsets_t& subsets,
   const SnapSet push_info_ss)
 {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  LOG_PREFIX(ReplicatedRecoveryBackend::prep_push);
+  DEBUGDPP("{}, {}", pg, soid, need);
   auto& recovery_waiter = get_recovering(soid);
   auto& obc = recovery_waiter.obc;
 
@@ -439,8 +448,10 @@ void ReplicatedRecoveryBackend::prepare_pull(
   PullOp& pull_op,
   pull_info_t& pull_info,
   const hobject_t& soid,
-  eversion_t need) {
-  logger().debug("{}: {}, {}", __func__, soid, need);
+  eversion_t need)
+{
+  LOG_PREFIX(ReplicatedRecoveryBackend::prepare_pull);
+  DEBUGDPP("{}, {}", pg, soid, need);
 
   pg_missing_tracker_t local_missing = pg.get_local_missing();
   const auto missing_iter = local_missing.get_items().find(soid);
@@ -471,6 +482,7 @@ ObjectRecoveryInfo ReplicatedRecoveryBackend::set_recovery_info(
   const hobject_t& soid,
   const crimson::osd::SnapSetContextRef ssc)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::set_recovery_info);
   pg_missing_tracker_t local_missing = pg.get_local_missing();
   const auto missing_iter = local_missing.get_items().find(soid);
   ObjectRecoveryInfo recovery_info;
@@ -481,7 +493,7 @@ ObjectRecoveryInfo ReplicatedRecoveryBackend::set_recovery_info(
     auto subsets = crimson::osd::calc_clone_subsets(
       ssc->snapset, soid, local_missing, pg.get_info().last_backfill);
     crimson::osd::set_subsets(subsets, recovery_info);
-    logger().debug("{}: pulling {}", __func__, recovery_info);
+    DEBUGDPP("pulling {}", pg, recovery_info);
     ceph_assert(ssc->snapset.clone_size.count(soid.snap));
     recovery_info.size = ssc->snapset.clone_size[soid.snap];
   } else {
@@ -504,40 +516,41 @@ ReplicatedRecoveryBackend::build_push_op(
     const ObjectRecoveryProgress& progress,
     object_stat_sum_t* stat)
 {
-  logger().debug("{} {} @{}",
-                __func__, recovery_info.soid, recovery_info.version);
+  LOG_PREFIX(ReplicatedRecoveryBackend::build_push_op);
+  DEBUGDPP("{} @{}", pg, recovery_info.soid, recovery_info.version);
   return seastar::do_with(ObjectRecoveryProgress(progress),
                          uint64_t(crimson::common::local_conf()
                            ->osd_recovery_max_chunk),
                          recovery_info.version,
                          PushOp(),
-    [this, &recovery_info, &progress, stat]
+    [FNAME, this, &recovery_info, &progress, stat]
     (auto& new_progress, auto& available, auto& v, auto& push_op) {
     return read_metadata_for_push_op(recovery_info.soid,
                                      progress, new_progress,
                                      v, &push_op
-    ).then_interruptible([&](eversion_t local_ver) mutable {
+    ).then_interruptible([&, FNAME](eversion_t local_ver) mutable {
       // If requestor didn't know the version, use ours
       if (v == eversion_t()) {
         v = local_ver;
       } else if (v != local_ver) {
-        logger().error("build_push_op: {} push {} v{} failed because local copy is {}",
-                       pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver);
+        ERRORDPP(
+         "push {} v{} failed because local copy is {}",
+         pg, recovery_info.soid, recovery_info.version, local_ver);
         // TODO: bail out
       }
       return read_omap_for_push_op(recovery_info.soid,
                                    progress,
                                    new_progress,
                                    available, &push_op);
-    }).then_interruptible([this, &recovery_info, &progress,
+    }).then_interruptible([FNAME, this, &recovery_info, &progress,
                            &available, &push_op]() mutable {
-      logger().debug("build_push_op: available: {}, copy_subset: {}",
-                    available, recovery_info.copy_subset);
+      DEBUGDPP("available: {}, copy_subset: {}",
+              pg, available, recovery_info.copy_subset);
       return read_object_for_push_op(recovery_info.soid,
                                     recovery_info.copy_subset,
                                     progress.data_recovered_to,
                                     available, &push_op);
-    }).then_interruptible([&recovery_info, &v, &progress,
+    }).then_interruptible([FNAME, this, &recovery_info, &v, &progress,
                            &new_progress, stat, &push_op]
             (uint64_t recovered_to) mutable {
       new_progress.data_recovered_to = recovered_to;
@@ -559,9 +572,8 @@ ReplicatedRecoveryBackend::build_push_op(
       push_op.recovery_info = recovery_info;
       push_op.after_progress = new_progress;
       push_op.before_progress = progress;
-      logger().debug("build_push_op: push_op version:"
-                     " {}, push_op data length: {}",
-                    push_op.version, push_op.data.length());
+      DEBUGDPP("push_op version: {}, push_op data length: {}",
+              pg, push_op.version, push_op.data.length());
       return seastar::make_ready_future<PushOp>(std::move(push_op));
     });
   });
@@ -575,7 +587,8 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
     eversion_t ver,
     PushOp* push_op)
 {
-  logger().debug("{}, {}", __func__, oid);
+  LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op);
+  DEBUGDPP("{}", pg, oid);
   if (!progress.first) {
     return seastar::make_ready_future<eversion_t>(ver);
   }
@@ -584,30 +597,30 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
         coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
       ).handle_error_interruptible<false>(
        crimson::os::FuturizedStore::Shard::read_errorator::all_same_way(
-         [oid] (const std::error_code& e) {
-         logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
+         [FNAME, this, oid] (const std::error_code& e) {
+         DEBUGDPP("error {} when getting omap header: {}", pg, e, oid);
          return seastar::make_ready_future<bufferlist>();
        })),
       interruptor::make_interruptible(
         store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
       ).handle_error_interruptible<false>(
        crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
-         [oid] (const std::error_code& e) {
-         logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
+         [FNAME, this, oid] (const std::error_code& e) {
+         DEBUGDPP("error {} when getting attrs: {}", pg, e, oid);
          return seastar::make_ready_future<crimson::os::FuturizedStore::Shard::attrs_t>();
        }))
-  )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
+  )).then_unpack_interruptible([FNAME, this, &new_progress, push_op](auto bl, auto attrs) {
     if (bl.length() == 0) {
-      logger().warn("read_metadata_for_push_op: fail to read omap header");
+      WARNDPP("fail to read omap header", pg);
     } else if (attrs.empty()) {
-      logger().error("read_metadata_for_push_op: fail to read attrs");
+      ERRORDPP("fail to read attrs", pg);
       return eversion_t{};
     }
     push_op->omap_header.claim_append(std::move(bl));
     for (auto&& [key, val] : attrs) {
       push_op->attrset.emplace(std::move(key), std::move(val));
     }
-    logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
+    DEBUGDPP("{}", pg, push_op->attrset[OI_ATTR]);
     object_info_t oi;
     oi.decode_no_oid(push_op->attrset[OI_ATTR]);
     new_progress.first = false;
@@ -623,6 +636,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
     uint64_t max_len,
     PushOp* push_op)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::read_object_for_push_op);
   if (max_len == 0 || copy_subset.empty()) {
     push_op->data_included.clear();
     return seastar::make_ready_future<uint64_t>(offset);
@@ -668,8 +682,8 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
       recovered_to = push_op->data_included.range_end();
     }
     return seastar::make_ready_future<uint64_t>(recovered_to);
-  }, PGBackend::read_errorator::all_same_way([](auto e) {
-    logger().debug("build_push_op: read exception");
+  }, PGBackend::read_errorator::all_same_way([FNAME, this](auto e) {
+    DEBUGDPP("read exception", pg);
     return seastar::make_exception_future<uint64_t>(e);
   }));
 }
@@ -757,16 +771,18 @@ ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const
 RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
 {
-  logger().debug("{}: {}", __func__, *m);
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull);
+  DEBUGDPP("{}", pg, *m);
   if (pg.can_discard_replica_op(*m)) {
-    logger().debug("{}: discarding {}", __func__, *m);
+    DEBUGDPP("discarding {}", pg, *m);
     return seastar::now();
   }
-  return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
-    return interruptor::parallel_for_each(pulls,
-                                      [this, from](auto& pull_op) {
+  return seastar::do_with(m->take_pulls(), [FNAME, this, from=m->from](auto& pulls) {
+    return interruptor::parallel_for_each(
+      pulls,
+      [FNAME, this, from](auto& pull_op) {
       const hobject_t& soid = pull_op.soid;
-      logger().debug("handle_pull: {}", soid);
+      DEBUGDPP("{}", pg, soid);
       return backend->stat(coll, ghobject_t(soid)).then_interruptible(
         [this, &pull_op](auto st) {
         ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
@@ -806,9 +822,10 @@ ReplicatedRecoveryBackend::_handle_pull_response(
   PullOp* response,
   ceph::os::Transaction* t)
 {
-  logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
-      push_op.recovery_info, push_op.after_progress,
-      push_op.data.length(), push_op.data_included);
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull);
+  DEBUGDPP("{} {} data.size() is {} data_included: {}",
+          pg, push_op.recovery_info, push_op.after_progress,
+          push_op.data.length(), push_op.data_included);
 
   const hobject_t &hoid = push_op.soid;
   auto& recovery_waiter = get_recovering(hoid);
@@ -828,7 +845,7 @@ ReplicatedRecoveryBackend::_handle_pull_response(
   if (pull_info.recovery_progress.first) {
     prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
       pull_info.recovery_info.soid,
-      [this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
+      [FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
         pull_info.obc = obc;
         recovery_waiter.obc = obc;
         obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR),
@@ -843,7 +860,7 @@ ReplicatedRecoveryBackend::_handle_pull_response(
             obc->ssc->snapset = SnapSet(ss_attr_iter->second);
             obc->ssc->exists = true;
           } catch (const buffer::error&) {
-            logger().warn("unable to decode SnapSet");
+            WARNDPP("unable to decode SnapSet", pg);
             throw crimson::osd::invalid_argument();
           }
           assert(!pull_info.obc->ssc->exists ||
@@ -859,11 +876,11 @@ ReplicatedRecoveryBackend::_handle_pull_response(
       }, false).handle_error_interruptible(crimson::ct_error::assert_all{});
   };
   return prepare_waiter.then_interruptible(
-    [this, &pull_info, &push_op, t, response]() mutable {
+    [FNAME, this, &pull_info, &push_op, t, response]() mutable {
     const bool first = pull_info.recovery_progress.first;
     pull_info.recovery_progress = push_op.after_progress;
-    logger().debug("new recovery_info {}, new progress {}",
-                  pull_info.recovery_info, pull_info.recovery_progress);
+    DEBUGDPP("new recovery_info {}, new progress {}",
+            pg, pull_info.recovery_info, pull_info.recovery_progress);
     interval_set<uint64_t> data_zeros;
     {
       uint64_t offset = push_op.before_progress.data_recovered_to;
@@ -923,8 +940,9 @@ RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_pull_response(
   Ref<MOSDPGPush> m)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response);
   if (pg.can_discard_replica_op(*m)) {
-    logger().debug("{}: discarding {}", __func__, *m);
+    DEBUGDPP("discarding {}", pg, *m);
     return seastar::now();
   }
   const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
@@ -938,17 +956,17 @@ ReplicatedRecoveryBackend::handle_pull_response(
            m->from, push_op.soid)));
   }
 
-  logger().debug("{}: {}", __func__, *m);
+  DEBUGDPP("{}", pg, *m);
   return seastar::do_with(PullOp(), [this, m](auto& response) {
     return seastar::do_with(ceph::os::Transaction(), m.get(),
-      [this, &response](auto& t, auto& m) {
+     [FNAME, this, &response](auto& t, auto& m) {
       pg_shard_t from = m->from;
       PushOp& push_op = m->pushes[0]; // only one push per message for now
       return _handle_pull_response(from, push_op, &response, &t
       ).then_interruptible(
-       [this, &t](bool complete) {
+       [FNAME, this, &t](bool complete) {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
-       logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
+       DEBUGDPP("submitting transaction", pg);
        return shard_services.get_store().do_transaction(coll, std::move(t))
          .then([this, epoch_frozen, complete,
          last_complete = pg.get_info().last_complete] {
@@ -982,7 +1000,8 @@ ReplicatedRecoveryBackend::_handle_push(
   PushReplyOp *response,
   ceph::os::Transaction *t)
 {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(ReplicatedRecoveryBackend::_handle_push);
+  DEBUGDPP("{}", pg);
 
   bool first = push_op.before_progress.first;
   interval_set<uint64_t> data_zeros;
@@ -1021,23 +1040,24 @@ RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_push(
   Ref<MOSDPGPush> m)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_push);
   if (pg.can_discard_replica_op(*m)) {
-    logger().debug("{}: discarding {}", __func__, *m);
+    DEBUGDPP("discarding {}", pg, *m);
     return seastar::now();
   }
   if (pg.is_primary()) {
     return handle_pull_response(m);
   }
 
-  logger().debug("{}: {}", __func__, *m);
-  return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
+  DEBUGDPP("{}", pg, *m);
+  return seastar::do_with(PushReplyOp(), [FNAME, this, m](auto& response) {
     PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now
     return seastar::do_with(ceph::os::Transaction(),
-      [this, m, &push_op, &response](auto& t) {
+      [FNAME, this, m, &push_op, &response](auto& t) {
       return _handle_push(m->from, push_op, &response, &t).then_interruptible(
-       [this, &t] {
+       [FNAME, this, &t] {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
-       logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
+       DEBUGDPP("submitting transaction", pg);
        return interruptor::make_interruptible(
            shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
          [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
@@ -1065,12 +1085,13 @@ ReplicatedRecoveryBackend::_handle_push_reply(
   pg_shard_t peer,
   const PushReplyOp &op)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_push);
   const hobject_t& soid = op.soid;
-  logger().debug("{}, soid {}, from {}", __func__, soid, peer);
+  DEBUGDPP("soid {}, from {}", pg, soid, peer);
   auto recovering_iter = recovering.find(soid);
   if (recovering_iter == recovering.end()
       || !recovering_iter->second->pushing.count(peer)) {
-    logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
+    DEBUGDPP("huh, i wasn't pushing {} to osd.{}", pg, soid, peer);
     return seastar::make_ready_future<std::optional<PushOp>>();
   } else {
     auto& push_info = recovering_iter->second->pushing[peer];
@@ -1103,7 +1124,8 @@ RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::handle_push_reply(
   Ref<MOSDPGPushReply> m)
 {
-  logger().debug("{}: {}", __func__, *m);
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_push_reply);
+  DEBUGDPP("{}", pg, *m);
   auto from = m->from;
   auto& push_reply = m->replies[0]; //TODO: only one reply per message
 
@@ -1133,7 +1155,8 @@ ReplicatedRecoveryBackend::trim_pushed_data(
   const interval_set<uint64_t> &intervals_received,
   ceph::bufferlist data_received)
 {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(ReplicatedRecoveryBackend::trim_pushed_data);
+  DEBUGDPP("", pg);
   // what i have is only a subset of what i want
   if (intervals_received.subset_of(copy_subset)) {
     return {intervals_received, data_received};
@@ -1168,6 +1191,7 @@ ReplicatedRecoveryBackend::prep_push_target(
   const map<string, bufferlist, less<>>& attrs,
   bufferlist&& omap_header)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_target);
   if (!first) {
     return seastar::make_ready_future<hobject_t>(
       get_temp_recovery_object(recovery_info.soid,
@@ -1181,8 +1205,8 @@ ReplicatedRecoveryBackend::prep_push_target(
   } else {
     target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid,
                                                      recovery_info.version));
-    logger().debug("{}: Adding oid {} in the temp collection",
-                   __func__, target_oid);
+    DEBUGDPP("Adding oid {} in the temp collection",
+            pg, target_oid);
     add_temp_obj(target_oid.hobj);
   }
   // create a new object
@@ -1214,7 +1238,7 @@ ReplicatedRecoveryBackend::prep_push_target(
   // clone overlap content in local object if using a new object
   return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
   .then_interruptible(
-    [this, &recovery_info, t, target_oid] (auto st) {
+    [FNAME, this, &recovery_info, t, target_oid] (auto st) {
     // TODO: pg num bytes counting
     uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
     interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
@@ -1224,8 +1248,8 @@ ReplicatedRecoveryBackend::prep_push_target(
       local_intervals_included.subtract(local_intervals_excluded);
     }
     for (auto [off, len] : local_intervals_included) {
-      logger().debug(" clone_range {} {}~{}",
-                     recovery_info.soid, off, len);
+      DEBUGDPP("clone_range {} {}~{}",
+              pg, recovery_info.soid, off, len);
       t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
                      target_oid, off, len, off);
     }
@@ -1246,11 +1270,12 @@ ReplicatedRecoveryBackend::submit_push_data(
   map<string, bufferlist>&& omap_entries,
   ObjectStore::Transaction *t)
 {
-  logger().debug("{}", __func__);
+  LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_data);
+  DEBUGDPP("", pg);
   return prep_push_target(recovery_info, first, complete,
                           clear_omap, t, attrs,
                           std::move(omap_header)).then_interruptible(
-    [this,
+    [FNAME, this,
      &recovery_info, t,
      first, complete,
      data_zeros=std::move(data_zeros),
@@ -1266,10 +1291,10 @@ ReplicatedRecoveryBackend::submit_push_data(
       assert(intervals_included.subset_of(data_zeros));
       data_zeros.subtract(intervals_included);
 
-      logger().debug("submit_push_data recovering object {} copy_subset: {} "
-         "intervals_included: {} data_zeros: {}",
-         recovery_info.soid, recovery_info.copy_subset,
-         intervals_included, data_zeros);
+      DEBUGDPP("recovering object {} copy_subset: {} "
+              "intervals_included: {} data_zeros: {}",
+              pg, recovery_info.soid, recovery_info.copy_subset,
+              intervals_included, data_zeros);
 
       for (auto [start, len] : data_zeros) {
         t->zero(coll->get_cid(), ghobject_t(target_oid), start, len);
@@ -1291,8 +1316,8 @@ ReplicatedRecoveryBackend::submit_push_data(
 
     if (complete) {
       if (!first) {
-       logger().debug("submit_push_data: Removing oid {} from the temp collection",
-         target_oid);
+       DEBUGDPP("Removing oid {} from the temp collection",
+                pg, target_oid);
        clear_temp_obj(target_oid);
        t->remove(coll->get_cid(), ghobject_t(recovery_info.soid));
        t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid),
@@ -1300,7 +1325,7 @@ ReplicatedRecoveryBackend::submit_push_data(
       }
       submit_push_complete(recovery_info, t);
     }
-    logger().debug("submit_push_data: done");
+    DEBUGDPP("done", pg);
     return seastar::make_ready_future<>();
   });
 }
@@ -1309,9 +1334,10 @@ void ReplicatedRecoveryBackend::submit_push_complete(
   const ObjectRecoveryInfo &recovery_info,
   ObjectStore::Transaction *t)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_complete);
   for (const auto& [oid, extents] : recovery_info.clone_subset) {
     for (const auto& [off, len] : extents) {
-      logger().debug(" clone_range {} {}~{}", oid, off, len);
+      DEBUGDPP("clone_range {} {}~{}", pg, oid, off, len);
       t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid),
                      off, len, off);
     }
@@ -1336,8 +1362,9 @@ ReplicatedRecoveryBackend::handle_recovery_op(
   Ref<MOSDFastDispatchOp> m,
   crimson::net::ConnectionXcoreRef conn)
 {
+  LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_op);
   if (pg.can_discard_replica_op(*m)) {
-    logger().debug("{}: discarding {}", __func__, *m);
+    DEBUGDPP("discarding {}", pg, *m);
     return seastar::now();
   }