#include "messages/MOSDOpReply.h"
#include "crimson/common/exception.h"
+#include "crimson/common/log.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osd.h"
#include "common/Formatter.h"
#include "crimson/osd/osd_connection_priv.h"
#include "osd/object_state_fmt.h"
-namespace {
- seastar::logger& logger() {
- return crimson::get_logger(ceph_subsys_osd);
- }
-}
-
SET_SUBSYS(osd);
namespace crimson::osd {
{
LOG_PREFIX(ClientRequest::Orderer::requeue);
for (auto &req: list) {
- DEBUGI("{}: {} requeueing {}", __func__, *pg, req);
+ DEBUGDPP("requeueing {}", *pg, req);
req.reset_instance_handle();
std::ignore = req.with_pg_int(shard_services, pg);
}
}
-void ClientRequest::Orderer::clear_and_cancel()
+void ClientRequest::Orderer::clear_and_cancel(PG &pg)
{
LOG_PREFIX(ClientRequest::Orderer::clear_and_cancel);
for (auto i = list.begin(); i != list.end(); ) {
- DEBUGI(
- "ClientRequest::Orderer::clear_and_cancel: {}",
- *i);
+ DEBUGDPP("{}", pg, *i);
i->complete_request();
remove_request(*(i++));
}
ClientRequest::~ClientRequest()
{
- LOG_PREFIX(ClientRequest::~ClientRequest);
- DEBUGI("{}: destroying", *this);
+ LOG_PREFIX(~ClientRequest);
+ DEBUG("{}: destroying", *this);
}
void ClientRequest::print(std::ostream &lhs) const
void ClientRequest::dump_detail(Formatter *f) const
{
LOG_PREFIX(ClientRequest::dump_detail);
- DEBUGI("{}: dumping", *this);
+ TRACE("{}: dumping", *this);
std::apply([f] (auto... event) {
(..., event.dump(f));
}, tracking_events);
{
LOG_PREFIX(ClientRequest::with_pg_int);
epoch_t same_interval_since = pgref->get_interval_start_epoch();
- DEBUGI("{} same_interval_since: {}", *this, same_interval_since);
+ DEBUGDPP("{}: same_interval_since: {}", *pgref, *this, same_interval_since);
if (m->finish_decode()) {
m->clear_payload();
}
auto instance_handle = get_instance_handle();
auto &ihref = *instance_handle;
return interruptor::with_interruption(
- [this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{} start", *this);
+ [FNAME, this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
+ DEBUGDPP("{} start", *pgref, *this);
PG &pg = *pgref;
if (pg.can_discard_op(*m)) {
return shard_services.send_incremental_map(
std::ref(*conn), m->get_map_epoch()
- ).then([this, this_instance_id, pgref] {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: discarding", *this, this_instance_id);
+ ).then([FNAME, this, this_instance_id, pgref] {
+ DEBUGDPP("{}: discarding {}", *pgref, *this, this_instance_id);
pgref->client_request_orderer.remove_request(*this);
complete_request();
return interruptor::now();
});
}
+ DEBUGDPP("{}.{}: entering await_map stage",
+ *pgref, *this, this_instance_id);
return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
- ).then_interruptible([this, this_instance_id, &pg, &ihref] {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: after await_map stage", *this, this_instance_id);
+ ).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref] {
+ DEBUGDPP("{}.{}: entered await_map stage, waiting for map",
+ pg, *this, this_instance_id);
return ihref.enter_blocker(
*this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
m->get_min_epoch(), nullptr);
- }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: after wait_for_map", *this, this_instance_id);
+ }).then_interruptible(
+ [FNAME, this, this_instance_id, &pg, &ihref](auto map_epoch) {
+ DEBUGDPP("{}.{}: map epoch got {}, entering wait_for_active",
+ pg, *this, this_instance_id, map_epoch);
return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
- }).then_interruptible([this, this_instance_id, &pg, &ihref]() {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: after wait_for_active stage", *this, this_instance_id);
+ }).then_interruptible([FNAME, this, this_instance_id, &pg, &ihref]() {
+ DEBUGDPP("{}.{}: entered wait_for_active stage, waiting for active",
+ pg, *this, this_instance_id);
return ihref.enter_blocker(
*this,
pg.wait_for_active_blocker,
&decltype(pg.wait_for_active_blocker)::wait);
- }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable
- -> interruptible_future<> {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: after wait_for_active", *this, this_instance_id);
+ }).then_interruptible(
+ [FNAME, this, pgref, this_instance_id, &ihref]() mutable
+ -> interruptible_future<> {
+ DEBUGDPP("{}.{}: pg active, entering process[_pg]_op",
+ *pgref, *this, this_instance_id);
if (is_pg_op()) {
return process_pg_op(pgref);
} else {
- return process_op(ihref, pgref);
+ return process_op(ihref, pgref, this_instance_id);
}
- }).then_interruptible([this, this_instance_id, &ihref] {
- logger().debug("{}.{}: complete", *this, this_instance_id);
+ }).then_interruptible([FNAME, this, this_instance_id, pgref, &ihref] {
+ DEBUGDPP("{}.{}: process[_pg]_op complete, completing handle",
+ *pgref, *this, this_instance_id);
return ihref.handle.complete();
- }).then_interruptible([this, this_instance_id, pgref] {
- LOG_PREFIX(ClientRequest::with_pg_int);
- DEBUGI("{}.{}: after process*", *this, this_instance_id);
+ }).then_interruptible([FNAME, this, this_instance_id, pgref] {
+ DEBUGDPP("{}.{}: process[_pg]_op complete,"
+ "removing request from orderer",
+ *pgref, *this, this_instance_id);
pgref->client_request_orderer.remove_request(*this);
complete_request();
});
- }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
- LOG_PREFIX(ClientRequest::with_pg_int);
- // TODO: better debug output
- DEBUGI("{}.{}: interrupted {}", *this, this_instance_id, eptr);
- },
- pgref
- ).finally(
- [opref=std::move(opref), pgref,
- instance_handle=std::move(instance_handle), &ihref,
- this_instance_id, this] {
- logger().debug("{}.{}: exit", *this, this_instance_id);
- ihref.handle.exit();
- });
+ }, [FNAME, this, this_instance_id, pgref](std::exception_ptr eptr) {
+ DEBUGDPP("{}.{}: interrupted due to {}",
+ *pgref, *this, this_instance_id, eptr);
+ }, pgref).finally(
+ [this, FNAME, opref=std::move(opref), pgref=std::move(pgref),
+ this_instance_id, instance_handle=std::move(instance_handle), &ihref] {
+ DEBUGDPP("{}.{}: exit", *pgref, *this, this_instance_id);
+ ihref.handle.exit();
+ });
}
seastar::future<> ClientRequest::with_pg(
auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
{
LOG_PREFIX(ClientRequest::reply_op_error);
- DEBUGI("{}: replying with error {}", *this, err);
+ DEBUGDPP("{}: replying with error {}", *pg, *this, err);
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), err, pg->get_osdmap_epoch(),
m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
}
ClientRequest::interruptible_future<>
-ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
+ClientRequest::process_op(
+ instance_handle_t &ihref, Ref<PG> &pg, unsigned this_instance_id)
{
+ LOG_PREFIX(ClientRequest::process_op);
return ihref.enter_stage<interruptor>(
client_pp(*pg).recover_missing, *this
).then_interruptible([pg, this]() mutable {
return recover_missings(pg, m->get_hobj(), snaps_need_to_recover());
- }).then_interruptible([this, pg, &ihref]() mutable {
+ }).then_interruptible([FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ DEBUGDPP("{}.{}: checking already_complete",
+ *pg, *this, this_instance_id);
return pg->already_complete(m->get_reqid()).then_interruptible(
- [this, pg, &ihref](auto completed) mutable
+ [FNAME, this, pg, this_instance_id, &ihref](auto completed) mutable
-> PG::load_obc_iertr::future<> {
if (completed) {
+ DEBUGDPP("{}.{}: already completed, sending reply",
+ *pg, *this, this_instance_id);
auto reply = crimson::make_message<MOSDOpReply>(
m.get(), completed->err, pg->get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
// TODO: gate the crosscore sending
return conn->send_with_throttling(std::move(reply));
} else {
+ DEBUGDPP("{}.{}: not completed, entering get_obc stage",
+ *pg, *this, this_instance_id);
return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
).then_interruptible(
- [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
- LOG_PREFIX(ClientRequest::process_op);
- DEBUGI("{}: in get_obc stage", *this);
+ [FNAME, this, pg, this_instance_id, &ihref]() mutable
+ -> PG::load_obc_iertr::future<> {
+ DEBUGDPP("{}.{}: entered get_obc stage, about to wait_scrub",
+ *pg, *this, this_instance_id);
op_info.set_from_op(&*m, *pg->get_osdmap());
- return pg->with_locked_obc(
- m->get_hobj(), op_info,
- [this, pg, &ihref](auto head, auto obc) mutable {
- LOG_PREFIX(ClientRequest::process_op);
- DEBUGI("{}: got obc {}", *this, obc->obs);
- return ihref.enter_stage<interruptor>(
- client_pp(*pg).process, *this
- ).then_interruptible([this, pg, obc, &ihref]() mutable {
- return do_process(ihref, pg, obc);
- });
- });
+ return pg->with_locked_obc(
+ m->get_hobj(), op_info,
+ [FNAME, this, pg, this_instance_id, &ihref](
+ auto head, auto obc) mutable {
+ DEBUGDPP("{}.{}: got obc {}, entering process stage",
+ *pg, *this, this_instance_id, obc->obs);
+ return ihref.enter_stage<interruptor>(
+ client_pp(*pg).process, *this
+ ).then_interruptible(
+ [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
+ DEBUGDPP("{}.{}: in process stage, calling do_process",
+ *pg, *this, this_instance_id);
+ return do_process(ihref, pg, obc, this_instance_id);
+ });
+ });
});
}
});
}).handle_error_interruptible(
- PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) {
- LOG_PREFIX(ClientRequest::process_op);
- ERRORI("ClientRequest saw error code {}", code);
+ PG::load_obc_ertr::all_same_way(
+ [FNAME, this, pg=std::move(pg), this_instance_id](const auto &code) {
+ DEBUGDPP("{}.{}: saw error code {}",
+ *pg, *this, this_instance_id, code);
assert(code.value() > 0);
return reply_op_error(pg, -code.value());
}));
ClientRequest::interruptible_future<>
ClientRequest::do_process(
instance_handle_t &ihref,
- Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
+ Ref<PG>& pg, crimson::osd::ObjectContextRef obc,
+ unsigned this_instance_id)
{
LOG_PREFIX(ClientRequest::do_process);
if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
if (pool.has_flag(pg_pool_t::FLAG_EIO)) {
// drop op on the floor; the client will handle returning EIO
if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
- DEBUGI("discarding op due to pool EIO flag");
+ DEBUGDPP("{}.{}: discarding op due to pool EIO flag",
+ *pg, *this, this_instance_id);
return seastar::now();
} else {
- DEBUGI("replying EIO due to pool EIO flag");
+ DEBUGDPP("{}.{}: replying EIO due to pool EIO flag",
+ *pg, *this, this_instance_id);
return reply_op_error(pg, -EIO);
}
}
} else if (m->get_hobj().oid.name.empty()) {
return reply_op_error(pg, -EINVAL);
} else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) {
- logger().info("{} is blocklisted", conn->get_peer_addr());
+ DEBUGDPP("{}.{}: {} is blocklisted",
+ *pg, *this, this_instance_id, conn->get_peer_addr());
return reply_op_error(pg, -EBLOCKLISTED);
}
SnapContext snapc = get_snapc(pg,obc);
- if (m->has_flag(CEPH_OSD_FLAG_ORDERSNAP) &&
- snapc.seq < obc->ssc->snapset.seq) {
- DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
- " < snapset seq {} on {}",
- __func__, snapc.seq, obc->ssc->snapset.seq,
- obc->obs.oi.soid);
+ if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
+ snapc.seq < obc->ssc->snapset.seq) {
+ DEBUGDPP("{}.{}: ORDERSNAP flag set "
+ "and snapc seq {} < snapset seq {} on {}",
+ *pg, *this, this_instance_id,
+ snapc.seq, obc->ssc->snapset.seq,
+ obc->obs.oi.soid);
return reply_op_error(pg, -EOLDSNAPC);
}
if (!pg->is_primary()) {
// primary can handle both normal ops and balanced reads
if (is_misdirected(*pg)) {
- TRACEI("do_process: dropping misdirected op");
+ DEBUGDPP("{}.{}: dropping misdirected op",
+ *pg, *this, this_instance_id);
return seastar::now();
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
- DEBUGI("{}: unstable write on replica, "
- "bouncing to primary",
- __func__);
+ DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary",
+ *pg, *this, this_instance_id);
return reply_op_error(pg, -EAGAIN);
} else {
- DEBUGI("{}: serving replica read on oid {}",
- __func__, m->get_hobj());
+ DEBUGDPP("{}.{}: serving replica read on oid {}",
+ *pg, *this, this_instance_id, m->get_hobj());
}
}
return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
- [this, pg, &ihref](auto submitted, auto all_completed) mutable {
- logger().debug("do_process::{} in submitted", *this);
- return submitted.then_interruptible([this, pg, &ihref] {
- logger().debug("do_process::{} in enter_stage wait_repop", *this);
+ [FNAME, this, pg, this_instance_id, &ihref](
+ auto submitted, auto all_completed) mutable {
+ return submitted.then_interruptible(
+ [FNAME, this, pg, this_instance_id, &ihref] {
return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
}).then_interruptible(
- [this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
- logger().debug("do_process::{} in all_completed", *this);
+ [FNAME, this, pg, this_instance_id,
+ all_completed=std::move(all_completed), &ihref]() mutable {
return all_completed.safe_then_interruptible(
- [this, pg, &ihref](MURef<MOSDOpReply> reply) {
+ [FNAME, this, pg, this_instance_id, &ihref](
+ MURef<MOSDOpReply> reply) {
return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
).then_interruptible(
- [this, reply=std::move(reply)]() mutable {
- LOG_PREFIX(ClientRequest::do_process);
- DEBUGI("{}: sending response", *this);
- // TODO: gate the crosscore sending
- return conn->send_with_throttling(std::move(reply));
- }
- );
- }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
- return process_op(ihref, pg);
+ [FNAME, this, pg, this_instance_id,
+ reply=std::move(reply)]() mutable {
+ DEBUGDPP("{}.{}: sending response",
+ *pg, *this, this_instance_id);
+ return conn->send(std::move(reply));
+ });
+ }, crimson::ct_error::eagain::handle(
+ [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ return process_op(ihref, pg, this_instance_id);
}));
});
- }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
- return process_op(ihref, pg);
- }));
+ }, crimson::ct_error::eagain::handle(
+ [FNAME, this, pg, this_instance_id, &ihref]() mutable {
+ return process_op(ihref, pg, this_instance_id);
+ }));
}
bool ClientRequest::is_misdirected(const PG& pg) const
if (pg->get_pgpool().info.is_pool_snaps_mode()) {
// use pool's snapc
snapc = pg->get_pgpool().snapc;
- DEBUGI("{} using pool's snapc snaps={}",
- __func__, snapc.snaps);
-
+ DEBUGDPP("{} using pool's snapc snaps={}",
+ *pg, *this, snapc.snaps);
} else {
// client specified snapc
snapc.seq = m->get_snap_seq();
snapc.snaps = m->get_snaps();
- DEBUGI("{} client specified snapc seq={} snaps={}",
- __func__, snapc.seq, snapc.snaps);
+ DEBUGDPP("{}: client specified snapc seq={} snaps={}",
+ *pg, *this, snapc.seq, snapc.snaps);
}
}
return snapc;