From e730737e8b84924829343e92460acc0c647dd0d3 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Mon, 8 Feb 2021 18:19:25 +0800 Subject: [PATCH] crimson/osd: extract methods out of ClientRequest::process_op() * do_recover_missing() * do_process() for better readability Signed-off-by: Kefu Chai --- .../osd/osd_operations/client_request.cc | 120 ++++++++++-------- .../osd/osd_operations/client_request.h | 12 +- 2 files changed, 73 insertions(+), 59 deletions(-) diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 3143dfc13ffab..6729ad735e108 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -116,63 +116,21 @@ seastar::future<> ClientRequest::process_pg_op( }); } -seastar::future<> ClientRequest::process_op( - Ref &pgref) +seastar::future<> ClientRequest::process_op(Ref &pg) { - PG& pg = *pgref; - return with_blocking_future( - handle.enter(pp(pg).recover_missing) - ).then([this, &pg, pgref] { - eversion_t ver; - const hobject_t& soid = m->get_hobj(); - logger().debug("{} check for recovery, {}", *this, soid); - if (pg.is_unreadable_object(soid, &ver) || - pg.is_degraded_or_backfilling_object(soid)) { - logger().debug("{} need to wait for recovery, {}", *this, soid); - if (pg.get_recovery_backend()->is_recovering(soid)) { - return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered(); - } else { - auto [op, fut] = osd.get_shard_services().start_operation( - soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch()); - return std::move(fut); - } - } - return seastar::now(); - }).then([this, &pg] { - return with_blocking_future(handle.enter(pp(pg).get_obc)); - }).then([this, &pg, &pgref]() -> PG::load_obc_ertr::future<> { - op_info.set_from_op(&*m, *pg.get_osdmap()); - return pg.with_locked_obc(m, op_info, this, [this, &pg, &pgref](auto obc) { - return with_blocking_future( - handle.enter(pp(pg).process) - ).then([this, &pg, obc]() - -> crimson::errorator::future> { - if (!pg.is_primary()) { - // primary can handle both normal ops and balanced reads - if (is_misdirected(pg)) { - logger().trace("process_op: dropping misdirected op"); - return seastar::make_ready_future>(); - } else if (const hobject_t& hoid = m->get_hobj(); - !pg.get_peering_state().can_serve_replica_read(hoid)) { - auto reply = make_message( - m.get(), -EAGAIN, pg.get_osdmap_epoch(), - m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK), - !m->has_flag(CEPH_OSD_FLAG_RETURNVEC)); - return seastar::make_ready_future>(std::move(reply)); - } - } - return pg.do_osd_ops(m, obc, op_info); - }).safe_then([this](Ref reply) { - if (reply) { - return conn->send(std::move(reply)); - } else { - return seastar::now(); - } - }, crimson::ct_error::eagain::handle([this, &pgref] { - return process_op(pgref); - })); + return with_blocking_future(handle.enter(pp(*pg).recover_missing)).then([&] { + return do_recover_missing(pg); + }).then([&] { + return with_blocking_future(handle.enter(pp(*pg).get_obc)); + }).then([this, &pg]() -> PG::load_obc_ertr::future<> { + op_info.set_from_op(&*m, *pg->get_osdmap()); + return pg->with_locked_obc(m, op_info, this, [this, &pg](auto obc) { + return with_blocking_future(handle.enter(pp(*pg).process)).then( + [this, &pg, obc] { + return do_process(pg, obc); + }); }); - }).safe_then([pgref=std::move(pgref)] { + }).safe_then([pg=std::move(pg)] { return seastar::now(); }, PG::load_obc_ertr::all_same_way([](auto &code) { logger().error("ClientRequest saw error code {}", code); @@ -180,6 +138,58 @@ seastar::future<> ClientRequest::process_op( })); } +seastar::future<> ClientRequest::do_recover_missing(Ref& pg) +{ + eversion_t ver; + const hobject_t& soid = m->get_hobj(); + logger().debug("{} check for recovery, {}", *this, soid); + if (!pg->is_unreadable_object(soid, &ver) && + !pg->is_degraded_or_backfilling_object(soid)) { + return seastar::now(); + } + logger().debug("{} need to wait for recovery, {}", *this, soid); + if (pg->get_recovery_backend()->is_recovering(soid)) { + return pg->get_recovery_backend()->get_recovering(soid).wait_for_recovered(); + } else { + auto [op, fut] = + osd.get_shard_services().start_operation( + soid, ver, pg, osd.get_shard_services(), pg->get_osdmap_epoch()); + return std::move(fut); + } +} + +seastar::future<> +ClientRequest::do_process(Ref& pg, crimson::osd::ObjectContextRef obc) +{ + using do_ops_return_t = + crimson::errorator::future>; + return [&pg, obc]() -> do_ops_return_t { + if (!pg->is_primary()) { + // primary can handle both normal ops and balanced reads + if (is_misdirected(*pg)) { + logger().trace("process_op: dropping misdirected op"); + return seastar::make_ready_future>(); + } else if (const hobject_t& hoid = m->get_hobj(); + !pg->get_peering_state().can_serve_replica_read(hoid)) { + auto reply = make_message( + m.get(), -EAGAIN, pg->get_osdmap_epoch(), + m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK), + !m->has_flag(CEPH_OSD_FLAG_RETURNVEC)); + return seastar::make_ready_future>(std::move(reply)); + } + } + return pg->do_osd_ops(m, obc, op_info); + }().safe_then([this](Ref reply) { + if (reply) { + return conn->send(std::move(reply)); + } else { + return seastar::now(); + } + }, crimson::ct_error::eagain::handle([this, &pg] { + return process_op(pg); + })); +} + bool ClientRequest::is_misdirected(const PG& pg) const { // otherwise take a closer look diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 099a8067ee662..b721958508346 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -5,6 +5,7 @@ #include "osd/osd_op_util.h" #include "crimson/net/Connection.h" +#include "crimson/osd/object_context.h" #include "crimson/osd/osd_operation.h" #include "crimson/common/type_helpers.h" #include "messages/MOSDOp.h" @@ -60,10 +61,13 @@ public: seastar::future<> start(); private: - seastar::future<> process_pg_op( - Ref &pg); - seastar::future<> process_op( - Ref &pg); + seastar::future<> process_pg_op(Ref& pg); + seastar::future<> process_op(Ref& pg); + seastar::future<> do_recover_missing(Ref& pgref); + seastar::future<> do_process( + Ref& pg, + crimson::osd::ObjectContextRef obc); + bool is_pg_op() const; ConnectionPipeline &cp(); -- 2.39.5