#include <seastar/core/future.hh>
#include <seastar/core/do_with.hh>
+#include "crimson/common/log.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_backend.h"
#include "osd/osd_types_fmt.h"
#include "replicated_recovery_backend.h"
#include "msg/Message.h"
-namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_osd);
- }
-}
+SET_SUBSYS(osd);
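+// With SET_SUBSYS(osd), the *DPP macros from crimson/common/log.h emit to the
+// osd subsystem. Each function declares LOG_PREFIX(<qualified name>) to define
+// the constexpr FNAME tag that DEBUGDPP/WARNDPP/ERRORDPP include in every
+// message, with `pg` supplying the per-PG prefix, e.g.:
+//   LOG_PREFIX(ReplicatedRecoveryBackend::recover_object);
+//   DEBUGDPP("{}, {}", pg, soid, need);
+// Note that lambdas which log must capture FNAME explicitly alongside `this`.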
using std::less;
using std::map;
const hobject_t& soid,
eversion_t need)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::recover_object);
+ DEBUGDPP("{}, {}", pg, soid, need);
// always add_recovering(soid) before recover_object(soid)
assert(is_recovering(soid));
// start tracking the recovery of soid
- return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
- logger().debug("recover_object: loading obc: {}", soid);
- return pg.obc_loader.with_obc<RWState::RWREAD>(soid,
- [this, soid, need](auto head, auto obc) {
- if (!obc->obs.exists) {
- // XXX: this recovery must be triggered by backfills and the corresponding
- // object must have been deleted by some client request after the object
- // is enqueued for push but before the lock is acquired by the recovery.
- //
- // Abort the recovery in this case, a "recover_delete" must have been
- // added for this object by the client request that deleted it.
- return interruptor::now();
- }
- logger().debug("recover_object: loaded obc: {}", obc->obs.oi.soid);
- auto& recovery_waiter = get_recovering(soid);
- recovery_waiter.obc = obc;
- return maybe_push_shards(head, soid, need);
- }, false).handle_error_interruptible(
- crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
- // TODO: may need eio handling?
- logger().error("recover_object saw error code {}, ignoring object {}",
- code, soid);
- return seastar::now();
- }));
+ return maybe_pull_missing_obj(
+ soid, need
+ ).then_interruptible([FNAME, this, soid, need] {
+ DEBUGDPP("loading obc: {}", pg, soid);
+ return pg.obc_loader.with_obc<RWState::RWREAD>(
+ soid,
+ [FNAME, this, soid, need](auto head, auto obc) {
+ if (!obc->obs.exists) {
+ // XXX: this recovery must be triggered by backfills and the corresponding
+ // object must have been deleted by some client request after the object
+ // is enqueued for push but before the lock is acquired by the recovery.
+ //
+ // Abort the recovery in this case, a "recover_delete" must have been
+ // added for this object by the client request that deleted it.
+ return interruptor::now();
+ }
+ DEBUGDPP("loaded obc: {}", pg, obc->obs.oi.soid);
+ auto& recovery_waiter = get_recovering(soid);
+ recovery_waiter.obc = obc;
+ return maybe_push_shards(head, soid, need);
+ }, false).handle_error_interruptible(
+ crimson::osd::PG::load_obc_ertr::all_same_way(
+ [FNAME, this, soid](auto& code) {
+ // TODO: may need eio handling?
+ ERRORDPP("saw error code {}, ignoring object {}",
+ pg, code, soid);
+ return seastar::now();
+ }));
});
}
const hobject_t& soid,
eversion_t need)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::maybe_pull_missing_obj);
+ DEBUGDPP("{}, {}", pg, soid, need);
pg_missing_tracker_t local_missing = pg.get_local_missing();
if (!local_missing.is_missing(soid)) {
// object is not missing, don't pull
const hobject_t& soid,
eversion_t need)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::push_delete);
+ DEBUGDPP("{}, {}", pg, soid, need);
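+ // min_epoch: the oldest OSDMap epoch at which the replica may process these deletes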
epoch_t min_epoch = pg.get_last_peering_reset();
assert(pg.get_acting_recovery_backfill().size() > 0);
return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
- [this, soid, need, min_epoch](pg_shard_t shard)
+ [FNAME, this, soid, need, min_epoch](pg_shard_t shard)
-> interruptible_future<> {
if (shard == pg.get_pg_whoami())
return seastar::make_ready_future<>();
if (iter == pg.get_shard_missing().end())
return seastar::make_ready_future<>();
if (iter->second.is_missing(soid)) {
- logger().debug("push_delete: will remove {} from {}", soid, shard);
+ DEBUGDPP("will remove {} from {}", pg, soid, shard);
pg.begin_peer_recover(shard, soid);
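+ // address the delete to the replica shard's spg_t for this PG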
spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
auto msg = crimson::make_message<MOSDPGRecoveryDelete>(
ReplicatedRecoveryBackend::handle_recovery_delete(
Ref<MOSDPGRecoveryDelete> m)
{
- logger().debug("{}: {}", __func__, *m);
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_delete);
+ DEBUGDPP("{}", pg, *m);
auto& p = m->objects.front(); //TODO: only one delete per message for now.
return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
bool is_delete,
epoch_t epoch_frozen)
{
- logger().debug("{}", __func__);
+ LOG_PREFIX(ReplicatedRecoveryBackend::on_local_recover_persist);
+ DEBUGDPP("", pg);
return seastar::do_with(
ceph::os::Transaction(),
- [this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) {
+ [FNAME, this, soid, &_recovery_info, is_delete, epoch_frozen](auto &t) {
return pg.get_recovery_handler()->on_local_recover(
soid, _recovery_info, is_delete, t
- ).then_interruptible([this, &t] {
- logger().debug("ReplicatedRecoveryBackend::{}: do_transaction...", __func__);
+ ).then_interruptible([FNAME, this, &t] {
+ DEBUGDPP("submitting transaction", pg);
return shard_services.get_store().do_transaction(coll, std::move(t));
}).then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
eversion_t need,
epoch_t epoch_to_freeze)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
- return backend->load_metadata(soid).safe_then_interruptible([this]
+ LOG_PREFIX(ReplicatedRecoveryBackend::local_recover_delete);
+ DEBUGDPP("{}, {}", pg, soid, need);
+ return backend->load_metadata(soid).safe_then_interruptible([FNAME, this]
(auto lomt) -> interruptible_future<> {
if (lomt->os.exists) {
return seastar::do_with(ceph::os::Transaction(),
- [this, lomt = std::move(lomt)](auto& txn) mutable {
+ [FNAME, this, lomt = std::move(lomt)](auto& txn) mutable {
return interruptor::async([this, lomt=std::move(lomt), &txn] {
pg.remove_maybe_snapmapped_object(txn, lomt->os.oi.soid);
}).then_interruptible(
- [this, &txn]() mutable {
- logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
+ [FNAME, this, &txn]() mutable {
+ DEBUGDPP("submitting transaction", pg);
return shard_services.get_store().do_transaction(coll,
std::move(txn));
});
ReplicatedRecoveryBackend::recover_delete(
const hobject_t &soid, eversion_t need)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::recover_delete);
+ DEBUGDPP("{}, {}", pg, soid, need);
epoch_t cur_epoch = pg.get_osdmap_epoch();
return seastar::do_with(object_stat_sum_t(),
- [this, soid, need, cur_epoch](auto& stat_diff) {
+ [FNAME, this, soid, need, cur_epoch](auto& stat_diff) {
return local_recover_delete(soid, need, cur_epoch).then_interruptible(
- [this, &stat_diff, cur_epoch, soid, need]()
+ [FNAME, this, &stat_diff, cur_epoch, soid, need]()
-> interruptible_future<> {
if (!pg.has_reset_since(cur_epoch)) {
bool object_missing = false;
if (shard == pg.get_pg_whoami())
continue;
if (pg.get_shard_missing(shard)->is_missing(soid)) {
- logger().debug("recover_delete: soid {} needs to deleted from replca {}",
- soid, shard);
+ DEBUGDPP(
+ "soid {} needs to be deleted from replica {}",
+ pg, soid, shard);
object_missing = true;
break;
}
eversion_t need,
pg_shard_t pg_shard)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_to_replica);
+ DEBUGDPP("{}, {}", pg, soid, need);
auto& recovery_waiter = get_recovering(soid);
auto& obc = recovery_waiter.obc;
// try to base the push off of clones that succeed/precede soid
// we need the head (and current SnapSet) locally to do that.
if (pg.get_local_missing().is_missing(head)) {
- logger().debug("{} missing head {}, pushing raw clone",
- __func__, head);
+ DEBUGDPP("missing head {}, pushing raw clone",
+ pg, head);
if (obc->obs.oi.size) {
subsets.data_subset.insert(0, obc->obs.oi.size);
}
auto ssc = obc->ssc;
ceph_assert(ssc);
push_info_ss = ssc->snapset;
- logger().debug("push_to_replica snapset is {}",
- ssc->snapset);
+ DEBUGDPP("snapset is {}", pg, ssc->snapset);
subsets = crimson::osd::calc_clone_subsets(
ssc->snapset, soid,
// base this partially on the replica's clones?
auto ssc = obc->ssc;
ceph_assert(ssc);
- logger().debug("push_to_replica snapset is {}",
- ssc->snapset);
+ DEBUGDPP("snapset is {}", pg, ssc->snapset);
subsets = crimson::osd::calc_head_subsets(
obc->obs.oi.size,
ssc->snapset, soid,
const crimson::osd::subsets_t& subsets,
const SnapSet push_info_ss)
{
- logger().debug("{}: {}, {}", __func__, soid, need);
+ LOG_PREFIX(ReplicatedRecoveryBackend::prep_push);
+ DEBUGDPP("{}, {}", pg, soid, need);
auto& recovery_waiter = get_recovering(soid);
auto& obc = recovery_waiter.obc;
PullOp& pull_op,
pull_info_t& pull_info,
const hobject_t& soid,
- eversion_t need) {
- logger().debug("{}: {}, {}", __func__, soid, need);
+ eversion_t need)
+{
+ LOG_PREFIX(ReplicatedRecoveryBackend::prepare_pull);
+ DEBUGDPP("{}, {}", pg, soid, need);
pg_missing_tracker_t local_missing = pg.get_local_missing();
const auto missing_iter = local_missing.get_items().find(soid);
const hobject_t& soid,
const crimson::osd::SnapSetContextRef ssc)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::set_recovery_info);
pg_missing_tracker_t local_missing = pg.get_local_missing();
const auto missing_iter = local_missing.get_items().find(soid);
ObjectRecoveryInfo recovery_info;
auto subsets = crimson::osd::calc_clone_subsets(
ssc->snapset, soid, local_missing, pg.get_info().last_backfill);
crimson::osd::set_subsets(subsets, recovery_info);
- logger().debug("{}: pulling {}", __func__, recovery_info);
+ DEBUGDPP("pulling {}", pg, recovery_info);
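+ // a clone's size is recorded in the SnapSet, not in its own object_info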
ceph_assert(ssc->snapset.clone_size.count(soid.snap));
recovery_info.size = ssc->snapset.clone_size[soid.snap];
} else {
const ObjectRecoveryProgress& progress,
object_stat_sum_t* stat)
{
- logger().debug("{} {} @{}",
- __func__, recovery_info.soid, recovery_info.version);
+ LOG_PREFIX(ReplicatedRecoveryBackend::build_push_op);
+ DEBUGDPP("{} @{}", pg, recovery_info.soid, recovery_info.version);
return seastar::do_with(ObjectRecoveryProgress(progress),
uint64_t(crimson::common::local_conf()
->osd_recovery_max_chunk),
recovery_info.version,
PushOp(),
- [this, &recovery_info, &progress, stat]
+ [FNAME, this, &recovery_info, &progress, stat]
(auto& new_progress, auto& available, auto& v, auto& push_op) {
return read_metadata_for_push_op(recovery_info.soid,
progress, new_progress,
v, &push_op
- ).then_interruptible([&](eversion_t local_ver) mutable {
+ ).then_interruptible([&, FNAME](eversion_t local_ver) mutable {
// If requestor didn't know the version, use ours
if (v == eversion_t()) {
v = local_ver;
} else if (v != local_ver) {
- logger().error("build_push_op: {} push {} v{} failed because local copy is {}",
- pg.get_pgid(), recovery_info.soid, recovery_info.version, local_ver);
+ ERRORDPP(
+ "push {} v{} failed because local copy is {}",
+ pg, recovery_info.soid, recovery_info.version, local_ver);
// TODO: bail out
}
return read_omap_for_push_op(recovery_info.soid,
progress,
new_progress,
available, &push_op);
- }).then_interruptible([this, &recovery_info, &progress,
+ }).then_interruptible([FNAME, this, &recovery_info, &progress,
&available, &push_op]() mutable {
- logger().debug("build_push_op: available: {}, copy_subset: {}",
- available, recovery_info.copy_subset);
+ DEBUGDPP("available: {}, copy_subset: {}",
+ pg, available, recovery_info.copy_subset);
return read_object_for_push_op(recovery_info.soid,
recovery_info.copy_subset,
progress.data_recovered_to,
available, &push_op);
- }).then_interruptible([&recovery_info, &v, &progress,
+ }).then_interruptible([FNAME, this, &recovery_info, &v, &progress,
&new_progress, stat, &push_op]
(uint64_t recovered_to) mutable {
new_progress.data_recovered_to = recovered_to;
push_op.recovery_info = recovery_info;
push_op.after_progress = new_progress;
push_op.before_progress = progress;
- logger().debug("build_push_op: push_op version:"
- " {}, push_op data length: {}",
- push_op.version, push_op.data.length());
+ DEBUGDPP("push_op version: {}, push_op data length: {}",
+ pg, push_op.version, push_op.data.length());
return seastar::make_ready_future<PushOp>(std::move(push_op));
});
});
eversion_t ver,
PushOp* push_op)
{
- logger().debug("{}, {}", __func__, oid);
+ LOG_PREFIX(ReplicatedRecoveryBackend::read_metadata_for_push_op);
+ DEBUGDPP("{}", pg, oid);
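+ // attrs and the omap header are shipped only with the first push op of an object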
if (!progress.first) {
return seastar::make_ready_future<eversion_t>(ver);
}
coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
).handle_error_interruptible<false>(
crimson::os::FuturizedStore::Shard::read_errorator::all_same_way(
- [oid] (const std::error_code& e) {
- logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
+ [FNAME, this, oid] (const std::error_code& e) {
+ DEBUGDPP("error {} when getting omap header: {}", pg, e, oid);
return seastar::make_ready_future<bufferlist>();
})),
interruptor::make_interruptible(
store->get_attrs(coll, ghobject_t(oid), CEPH_OSD_OP_FLAG_FADVISE_DONTNEED)
).handle_error_interruptible<false>(
crimson::os::FuturizedStore::Shard::get_attrs_ertr::all_same_way(
- [oid] (const std::error_code& e) {
- logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
+ [FNAME, this, oid] (const std::error_code& e) {
+ DEBUGDPP("error {} when getting attrs: {}", pg, e, oid);
return seastar::make_ready_future<crimson::os::FuturizedStore::Shard::attrs_t>();
}))
- )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
+ )).then_unpack_interruptible([FNAME, this, &new_progress, push_op](auto bl, auto attrs) {
if (bl.length() == 0) {
- logger().warn("read_metadata_for_push_op: fail to read omap header");
+ WARNDPP("fail to read omap header", pg);
} else if (attrs.empty()) {
- logger().error("read_metadata_for_push_op: fail to read attrs");
+ ERRORDPP("fail to read attrs", pg);
return eversion_t{};
}
push_op->omap_header.claim_append(std::move(bl));
for (auto&& [key, val] : attrs) {
push_op->attrset.emplace(std::move(key), std::move(val));
}
- logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
+ DEBUGDPP("{}", pg, push_op->attrset[OI_ATTR]);
object_info_t oi;
oi.decode_no_oid(push_op->attrset[OI_ATTR]);
new_progress.first = false;
uint64_t max_len,
PushOp* push_op)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::read_object_for_push_op);
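+ // nothing further to read for this chunk: report the offset unchanged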
if (max_len == 0 || copy_subset.empty()) {
push_op->data_included.clear();
return seastar::make_ready_future<uint64_t>(offset);
recovered_to = push_op->data_included.range_end();
}
return seastar::make_ready_future<uint64_t>(recovered_to);
- }, PGBackend::read_errorator::all_same_way([](auto e) {
- logger().debug("build_push_op: read exception");
+ }, PGBackend::read_errorator::all_same_way([FNAME, this](auto e) {
+ DEBUGDPP("read exception", pg);
return seastar::make_exception_future<uint64_t>(e);
}));
}
RecoveryBackend::interruptible_future<>
ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
{
- logger().debug("{}: {}", __func__, *m);
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull);
+ DEBUGDPP("{}", pg, *m);
if (pg.can_discard_replica_op(*m)) {
- logger().debug("{}: discarding {}", __func__, *m);
+ DEBUGDPP("discarding {}", pg, *m);
return seastar::now();
}
- return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
- return interruptor::parallel_for_each(pulls,
- [this, from](auto& pull_op) {
+ return seastar::do_with(m->take_pulls(), [FNAME, this, from=m->from](auto& pulls) {
+ return interruptor::parallel_for_each(
+ pulls,
+ [FNAME, this, from](auto& pull_op) {
const hobject_t& soid = pull_op.soid;
- logger().debug("handle_pull: {}", soid);
+ DEBUGDPP("{}", pg, soid);
return backend->stat(coll, ghobject_t(soid)).then_interruptible(
[this, &pull_op](auto st) {
ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
PullOp* response,
ceph::os::Transaction* t)
{
- logger().debug("handle_pull_response {} {} data.size() is {} data_included: {}",
- push_op.recovery_info, push_op.after_progress,
- push_op.data.length(), push_op.data_included);
+ LOG_PREFIX(ReplicatedRecoveryBackend::_handle_pull_response);
+ DEBUGDPP("{} {} data.size() is {} data_included: {}",
+ pg, push_op.recovery_info, push_op.after_progress,
+ push_op.data.length(), push_op.data_included);
const hobject_t &hoid = push_op.soid;
auto& recovery_waiter = get_recovering(hoid);
if (pull_info.recovery_progress.first) {
prepare_waiter = pg.obc_loader.with_obc<RWState::RWNONE>(
pull_info.recovery_info.soid,
- [this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
+ [FNAME, this, &pull_info, &recovery_waiter, &push_op](auto, auto obc) {
pull_info.obc = obc;
recovery_waiter.obc = obc;
obc->obs.oi.decode_no_oid(push_op.attrset.at(OI_ATTR),
obc->ssc->snapset = SnapSet(ss_attr_iter->second);
obc->ssc->exists = true;
} catch (const buffer::error&) {
- logger().warn("unable to decode SnapSet");
+ WARNDPP("unable to decode SnapSet", pg);
throw crimson::osd::invalid_argument();
}
assert(!pull_info.obc->ssc->exists ||
}, false).handle_error_interruptible(crimson::ct_error::assert_all{});
};
return prepare_waiter.then_interruptible(
- [this, &pull_info, &push_op, t, response]() mutable {
+ [FNAME, this, &pull_info, &push_op, t, response]() mutable {
const bool first = pull_info.recovery_progress.first;
pull_info.recovery_progress = push_op.after_progress;
- logger().debug("new recovery_info {}, new progress {}",
- pull_info.recovery_info, pull_info.recovery_progress);
+ DEBUGDPP("new recovery_info {}, new progress {}",
+ pg, pull_info.recovery_info, pull_info.recovery_progress);
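+ // data_zeros: extents in this push window that carried no payload and will be zeroed on apply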
interval_set<uint64_t> data_zeros;
{
uint64_t offset = push_op.before_progress.data_recovered_to;
ReplicatedRecoveryBackend::handle_pull_response(
Ref<MOSDPGPush> m)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_pull_response);
if (pg.can_discard_replica_op(*m)) {
- logger().debug("{}: discarding {}", __func__, *m);
+ DEBUGDPP("discarding {}", pg, *m);
return seastar::now();
}
const PushOp& push_op = m->pushes[0]; //TODO: only one push per message for now.
m->from, push_op.soid)));
}
- logger().debug("{}: {}", __func__, *m);
+ DEBUGDPP("{}", pg, *m);
- return seastar::do_with(PullOp(), [this, m](auto& response) {
+ return seastar::do_with(PullOp(), [FNAME, this, m](auto& response) {
return seastar::do_with(ceph::os::Transaction(), m.get(),
- [this, &response](auto& t, auto& m) {
+ [FNAME, this, &response](auto& t, auto& m) {
pg_shard_t from = m->from;
PushOp& push_op = m->pushes[0]; // only one push per message for now
return _handle_pull_response(from, push_op, &response, &t
).then_interruptible(
- [this, &t](bool complete) {
+ [FNAME, this, &t](bool complete) {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
- logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
+ DEBUGDPP("submitting transaction", pg);
return shard_services.get_store().do_transaction(coll, std::move(t))
.then([this, epoch_frozen, complete,
last_complete = pg.get_info().last_complete] {
PushReplyOp *response,
ceph::os::Transaction *t)
{
- logger().debug("{}", __func__);
+ LOG_PREFIX(ReplicatedRecoveryBackend::_handle_push);
+ DEBUGDPP("{}", pg);
bool first = push_op.before_progress.first;
interval_set<uint64_t> data_zeros;
ReplicatedRecoveryBackend::handle_push(
Ref<MOSDPGPush> m)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_push);
if (pg.can_discard_replica_op(*m)) {
- logger().debug("{}: discarding {}", __func__, *m);
+ DEBUGDPP("discarding {}", pg, *m);
return seastar::now();
}
if (pg.is_primary()) {
return handle_pull_response(m);
}
- logger().debug("{}: {}", __func__, *m);
- return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
+ DEBUGDPP("{}", pg, *m);
+ return seastar::do_with(PushReplyOp(), [FNAME, this, m](auto& response) {
PushOp& push_op = m->pushes[0]; // TODO: only one push per message for now
return seastar::do_with(ceph::os::Transaction(),
- [this, m, &push_op, &response](auto& t) {
+ [FNAME, this, m, &push_op, &response](auto& t) {
return _handle_push(m->from, push_op, &response, &t).then_interruptible(
- [this, &t] {
+ [FNAME, this, &t] {
epoch_t epoch_frozen = pg.get_osdmap_epoch();
- logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
+ DEBUGDPP("submitting transaction", pg);
return interruptor::make_interruptible(
shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
[this, epoch_frozen, last_complete = pg.get_info().last_complete] {
pg_shard_t peer,
const PushReplyOp &op)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::_handle_push_reply);
const hobject_t& soid = op.soid;
- logger().debug("{}, soid {}, from {}", __func__, soid, peer);
+ DEBUGDPP("soid {}, from {}", pg, soid, peer);
auto recovering_iter = recovering.find(soid);
if (recovering_iter == recovering.end()
|| !recovering_iter->second->pushing.count(peer)) {
- logger().debug("huh, i wasn't pushing {} to osd.{}", soid, peer);
+ DEBUGDPP("huh, i wasn't pushing {} to osd.{}", pg, soid, peer);
return seastar::make_ready_future<std::optional<PushOp>>();
} else {
auto& push_info = recovering_iter->second->pushing[peer];
ReplicatedRecoveryBackend::handle_push_reply(
Ref<MOSDPGPushReply> m)
{
- logger().debug("{}: {}", __func__, *m);
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_push_reply);
+ DEBUGDPP("{}", pg, *m);
auto from = m->from;
auto& push_reply = m->replies[0]; //TODO: only one reply per message
const interval_set<uint64_t> &intervals_received,
ceph::bufferlist data_received)
{
- logger().debug("{}", __func__);
+ LOG_PREFIX(ReplicatedRecoveryBackend::trim_pushed_data);
+ DEBUGDPP("", pg);
// what i have is only a subset of what i want
if (intervals_received.subset_of(copy_subset)) {
return {intervals_received, data_received};
const map<string, bufferlist, less<>>& attrs,
bufferlist&& omap_header)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::prep_push_target);
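+ // after the first push op the temp object already exists; keep writing to it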
if (!first) {
return seastar::make_ready_future<hobject_t>(
get_temp_recovery_object(recovery_info.soid,
} else {
target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid,
recovery_info.version));
- logger().debug("{}: Adding oid {} in the temp collection",
- __func__, target_oid);
+ DEBUGDPP("Adding oid {} in the temp collection",
+ pg, target_oid);
add_temp_obj(target_oid.hobj);
}
// create a new object
// clone overlap content in local object if using a new object
return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
.then_interruptible(
- [this, &recovery_info, t, target_oid] (auto st) {
+ [FNAME, this, &recovery_info, t, target_oid] (auto st) {
// TODO: pg num bytes counting
uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
local_intervals_included.subtract(local_intervals_excluded);
}
for (auto [off, len] : local_intervals_included) {
- logger().debug(" clone_range {} {}~{}",
- recovery_info.soid, off, len);
+ DEBUGDPP("clone_range {} {}~{}",
+ pg, recovery_info.soid, off, len);
t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
target_oid, off, len, off);
}
map<string, bufferlist>&& omap_entries,
ObjectStore::Transaction *t)
{
- logger().debug("{}", __func__);
+ LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_data);
+ DEBUGDPP("", pg);
return prep_push_target(recovery_info, first, complete,
clear_omap, t, attrs,
std::move(omap_header)).then_interruptible(
- [this,
+ [FNAME, this,
&recovery_info, t,
first, complete,
data_zeros=std::move(data_zeros),
assert(intervals_included.subset_of(data_zeros));
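+ // drop the received extents from the zero set so real data is never overwritten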
data_zeros.subtract(intervals_included);
- logger().debug("submit_push_data recovering object {} copy_subset: {} "
- "intervals_included: {} data_zeros: {}",
- recovery_info.soid, recovery_info.copy_subset,
- intervals_included, data_zeros);
+ DEBUGDPP("recovering object {} copy_subset: {} "
+ "intervals_included: {} data_zeros: {}",
+ pg, recovery_info.soid, recovery_info.copy_subset,
+ intervals_included, data_zeros);
for (auto [start, len] : data_zeros) {
t->zero(coll->get_cid(), ghobject_t(target_oid), start, len);
if (complete) {
if (!first) {
- logger().debug("submit_push_data: Removing oid {} from the temp collection",
- target_oid);
+ DEBUGDPP("Removing oid {} from the temp collection",
+ pg, target_oid);
clear_temp_obj(target_oid);
t->remove(coll->get_cid(), ghobject_t(recovery_info.soid));
t->collection_move_rename(coll->get_cid(), ghobject_t(target_oid),
}
submit_push_complete(recovery_info, t);
}
- logger().debug("submit_push_data: done");
+ DEBUGDPP("done", pg);
return seastar::make_ready_future<>();
});
}
const ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::submit_push_complete);
for (const auto& [oid, extents] : recovery_info.clone_subset) {
for (const auto& [off, len] : extents) {
- logger().debug(" clone_range {} {}~{}", oid, off, len);
+ DEBUGDPP("clone_range {} {}~{}", pg, oid, off, len);
t->clone_range(coll->get_cid(), ghobject_t(oid), ghobject_t(recovery_info.soid),
off, len, off);
}
Ref<MOSDFastDispatchOp> m,
crimson::net::ConnectionXcoreRef conn)
{
+ LOG_PREFIX(ReplicatedRecoveryBackend::handle_recovery_op);
if (pg.can_discard_replica_op(*m)) {
- logger().debug("{}: discarding {}", __func__, *m);
+ DEBUGDPP("discarding {}", pg, *m);
return seastar::now();
}