From 6e2b0299dfc1457f3eda31cd41409652100a69bc Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 7 Feb 2024 02:05:42 +0000 Subject: [PATCH] crimson/.../client_request: convert ClientRequest::do_process to coroutine Signed-off-by: Samuel Just (cherry picked from commit 30237472bea485cb57f5c14031fb545aed97da15) --- .../osd/osd_operations/client_request.cc | 122 +++++++++--------- .../osd/osd_operations/client_request.h | 7 +- 2 files changed, 68 insertions(+), 61 deletions(-) diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 2514c08993f..22c32dd4e50 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -284,23 +284,30 @@ ClientRequest::process_op( [FNAME, this, pg, this_instance_id, obc, &ihref]() mutable { DEBUGDPP("{}.{}: in process stage, calling do_process", *pg, *this, this_instance_id); - return do_process(ihref, pg, obc, this_instance_id); - }); - } - ).handle_error_interruptible( - PG::load_obc_ertr::all_same_way( - [FNAME, this, pg=std::move(pg), this_instance_id]( - const auto &code - ) -> interruptible_future<> { - DEBUGDPP("{}.{}: saw error code {}", - *pg, *this, this_instance_id, code); - assert(code.value() > 0); - return reply_op_error(pg, -code.value()); - }) - ); + return do_process( + ihref, pg, obc, this_instance_id + ).handle_error_interruptible( + crimson::ct_error::eagain::handle( + [this, pg, this_instance_id, &ihref]() mutable { + return process_op(ihref, pg, this_instance_id); + }) + ); + } + ); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way( + [FNAME, this, pg=std::move(pg), this_instance_id]( + const auto &code + ) -> interruptible_future<> { + DEBUGDPP("{}.{}: saw error code {}", + *pg, *this, this_instance_id, code); + assert(code.value() > 0); + return reply_op_error(pg, -code.value()); + }) + ); } -ClientRequest::interruptible_future<> +ClientRequest::do_process_iertr::future<> ClientRequest::do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, @@ -308,7 +315,8 @@ ClientRequest::do_process( { LOG_PREFIX(ClientRequest::do_process); if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) { - return reply_op_error(pg, -EINVAL); + co_await reply_op_error(pg, -EINVAL); + co_return; } const pg_pool_t pool = pg->get_pgpool().info; if (pool.has_flag(pg_pool_t::FLAG_EIO)) { @@ -316,36 +324,44 @@ ClientRequest::do_process( if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) { DEBUGDPP("{}.{}: discarding op due to pool EIO flag", *pg, *this, this_instance_id); - return seastar::now(); + co_return; } else { DEBUGDPP("{}.{}: replying EIO due to pool EIO flag", *pg, *this, this_instance_id); - return reply_op_error(pg, -EIO); + co_await reply_op_error(pg, -EIO); + co_return; } } if (m->get_oid().name.size() > crimson::common::local_conf()->osd_max_object_name_len) { - return reply_op_error(pg, -ENAMETOOLONG); + co_await reply_op_error(pg, -ENAMETOOLONG); + co_return; } else if (m->get_hobj().get_key().size() > crimson::common::local_conf()->osd_max_object_name_len) { - return reply_op_error(pg, -ENAMETOOLONG); + co_await reply_op_error(pg, -ENAMETOOLONG); + co_return; } else if (m->get_hobj().nspace.size() > crimson::common::local_conf()->osd_max_object_namespace_len) { - return reply_op_error(pg, -ENAMETOOLONG); + co_await reply_op_error(pg, -ENAMETOOLONG); + co_return; } else if (m->get_hobj().oid.name.empty()) { - return reply_op_error(pg, -EINVAL); + co_await reply_op_error(pg, -EINVAL); + co_return; } else if (m->get_hobj().is_internal_pg_local()) { // clients are not allowed to write to hobject_t::INTERNAL_PG_LOCAL_NS - return reply_op_error(pg, -EINVAL); + co_await reply_op_error(pg, -EINVAL); + co_return; } else if (pg->get_osdmap()->is_blocklisted( get_foreign_connection().get_peer_addr())) { DEBUGDPP("{}.{}: {} is blocklisted", *pg, *this, this_instance_id, get_foreign_connection().get_peer_addr()); - return reply_op_error(pg, -EBLOCKLISTED); + co_await reply_op_error(pg, -EBLOCKLISTED); + co_return; } if (!obc->obs.exists && !op_info.may_write()) { - return reply_op_error(pg, -ENOENT); + co_await reply_op_error(pg, -ENOENT); + co_return; } SnapContext snapc = get_snapc(*pg,obc); @@ -357,7 +373,8 @@ ClientRequest::do_process( *pg, *this, this_instance_id, snapc.seq, obc->ssc->snapset.seq, obc->obs.oi.soid); - return reply_op_error(pg, -EOLDSNAPC); + co_await reply_op_error(pg, -EOLDSNAPC); + co_return; } if (!pg->is_primary()) { @@ -365,50 +382,35 @@ ClientRequest::do_process( if (is_misdirected(*pg)) { DEBUGDPP("{}.{}: dropping misdirected op", *pg, *this, this_instance_id); - return seastar::now(); + co_return; } else if (const hobject_t& hoid = m->get_hobj(); !pg->get_peering_state().can_serve_replica_read(hoid)) { DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary", *pg, *this, this_instance_id); - return reply_op_error(pg, -EAGAIN); + co_await reply_op_error(pg, -EAGAIN); + co_return; } else { DEBUGDPP("{}.{}: serving replica read on oid {}", *pg, *this, this_instance_id, m->get_hobj()); } } - return pg->do_osd_ops( + + auto [submitted, all_completed] = co_await pg->do_osd_ops( m, r_conn, obc, op_info, snapc - ).safe_then_unpack_interruptible( - [FNAME, this, pg, this_instance_id, &ihref]( - auto submitted, auto all_completed) mutable { - return submitted.then_interruptible( - [this, pg, &ihref] { - return ihref.enter_stage(client_pp(*pg).wait_repop, *this); - }).then_interruptible( - [FNAME, this, pg, this_instance_id, - all_completed=std::move(all_completed), &ihref]() mutable { - return all_completed.safe_then_interruptible( - [FNAME, this, pg, this_instance_id, &ihref]( - MURef reply) { - return ihref.enter_stage(client_pp(*pg).send_reply, *this - ).then_interruptible( - [FNAME, this, pg, this_instance_id, - reply=std::move(reply)]() mutable { - DEBUGDPP("{}.{}: sending response", - *pg, *this, this_instance_id); - // TODO: gate the crosscore sending - return get_foreign_connection( - ).send_with_throttling(std::move(reply)); - }); - }, crimson::ct_error::eagain::handle( - [this, pg, this_instance_id, &ihref]() mutable { - return process_op(ihref, pg, this_instance_id); - })); - }); - }, crimson::ct_error::eagain::handle( - [this, pg, this_instance_id, &ihref]() mutable { - return process_op(ihref, pg, this_instance_id); - })); + ); + co_await std::move(submitted); + + co_await ihref.enter_stage(client_pp(*pg).wait_repop, *this); + + auto reply = co_await std::move(all_completed); + + co_await ihref.enter_stage(client_pp(*pg).send_reply, *this); + DEBUGDPP("{}.{}: sending response", + *pg, *this, this_instance_id); + // TODO: gate the crosscore sending + co_await interruptor::make_interruptible( + get_foreign_connection().send_with_throttling(std::move(reply)) + ); } bool ClientRequest::is_misdirected(const PG& pg) const diff --git a/src/crimson/osd/osd_operations/client_request.h b/src/crimson/osd/osd_operations/client_request.h index 358a6bbe44e..eb27f912964 100644 --- a/src/crimson/osd/osd_operations/client_request.h +++ b/src/crimson/osd/osd_operations/client_request.h @@ -267,7 +267,12 @@ private: interruptible_future<> with_sequencer(FuncT&& func); interruptible_future<> reply_op_error(const Ref& pg, int err); - interruptible_future<> do_process( + + using do_process_iertr = + ::crimson::interruptible::interruptible_errorator< + ::crimson::osd::IOInterruptCondition, + ::crimson::errorator>; + do_process_iertr::future<> do_process( instance_handle_t &ihref, Ref pg, crimson::osd::ObjectContextRef obc, -- 2.39.5