From 1d98e8dab6162aafeede62c3a4be6bef7d5b926d Mon Sep 17 00:00:00 2001 From: Matan Breizman Date: Tue, 7 Nov 2023 10:24:34 +0000 Subject: [PATCH] crimson/osd/pg: move submit_error_log to do_osd_ops_execute Previously, submit_error_log was chained to the future returned by failure_func. Now submit_error_log is called from within do_osd_ops_execute. Fixes: https://tracker.ceph.com/issues/61651 Signed-off-by: Matan Breizman --- src/crimson/osd/pg.cc | 112 ++++++++++++++++++++++++------------------ src/crimson/osd/pg.h | 3 ++ 2 files changed, 68 insertions(+), 47 deletions(-) diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 076cb1b49d97f..c61fd541e9d37 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -805,6 +805,9 @@ template PG::do_osd_ops_iertr::future> PG::do_osd_ops_execute( seastar::lw_shared_ptr ox, + ObjectContextRef obc, + const OpInfo &op_info, + Ref m, std::vector& ops, SuccessFunc&& success_func, FailureFunc&& failure_func) @@ -814,6 +817,17 @@ PG::do_osd_ops_execute( return obc_loader.reload_obc(obc).handle_error_interruptible( load_obc_ertr::assert_all{"can't live with object state messed up"}); }); + auto maybe_submit_error_log = [&, op_info, m, obc] + (const std::error_code& e, const ceph_tid_t& rep_tid) { + // call submit_error_log only for non-internal clients + if constexpr (!std::is_same_v) { + if(op_info.may_write()) { + return submit_error_log(m, op_info, obc, e, rep_tid); + } + } + return seastar::now(); + }; + auto error_func_ptr = seastar::make_lw_shared(std::move(maybe_submit_error_log)); auto failure_func_ptr = seastar::make_lw_shared(std::move(failure_func)); return interruptor::do_for_each(ops, [ox](OSDOp& osd_op) { logger().debug( @@ -873,7 +887,7 @@ PG::do_osd_ops_execute( std::move(log_entries)); }); }).safe_then_unpack_interruptible( - [success_func=std::move(success_func), rollbacker, this, failure_func_ptr] + [success_func=std::move(success_func), error_func_ptr, rollbacker, this, failure_func_ptr] (auto 
submitted_fut, auto _all_completed_fut) mutable { auto all_completed_fut = _all_completed_fut.safe_then_interruptible_tuple( @@ -910,23 +924,27 @@ PG::do_osd_ops_execute( std::move(all_completed_fut) ); }, OpsExecuter::osd_op_errorator::all_same_way( - [this, rollbacker, failure_func_ptr] + [this, error_func_ptr, rollbacker, failure_func_ptr] (const std::error_code& e) mutable { - auto submitted_fut = seastar::now(); + PG::interruptible_future<> maybe_rollback_fut = seastar::now(); ceph_tid_t rep_tid = shard_services.get_tid(); - auto all_completed_fut = e.value() == ENOENT ? - (*failure_func_ptr)(e, rep_tid, true) : - rollbacker.rollback_obc_if_modified(e).then_interruptible( - [e, failure_func_ptr, rep_tid] { - return (*failure_func_ptr)(e, rep_tid, true); - }); + if (e.value() != ENOENT) { // as before this change: ENOENT skips the rollback, any other error rolls back + maybe_rollback_fut = rollbacker.rollback_obc_if_modified(e); + } - return PG::do_osd_ops_iertr::make_ready_future>( - std::move(submitted_fut), - std::move(all_completed_fut) - ); + return maybe_rollback_fut.then_interruptible( + [error_func_ptr, e, rep_tid, failure_func_ptr] { + // record error log + return (*error_func_ptr)(e, rep_tid).then( + [failure_func_ptr, e, rep_tid] { + return PG::do_osd_ops_iertr::make_ready_future>( + std::move(seastar::now()), + std::move((*failure_func_ptr)(e, rep_tid, true)) + ); + }); + }); })); } @@ -1016,6 +1034,9 @@ PG::do_osd_ops( return do_osd_ops_execute>( seastar::make_lw_shared( Ref{this}, obc, op_info, *m, conn, snapc), + obc, + op_info, + m, m->ops, // success_func [this, m, obc, may_write = op_info.may_write(), @@ -1061,43 +1082,37 @@ PG::do_osd_ops( (const std::error_code& e, const ceph_tid_t& rep_tid, bool record_error) { logger().error("do_osd_ops_execute::failure_func {} got error: {} record_error: {}", *m, e, record_error); - auto error_log_fut = seastar::now(); epoch_t epoch = get_osdmap_epoch(); auto last_complete = peering_state.get_info().last_complete; - if (op_info.may_write()) { - error_log_fut = submit_error_log(m, 
op_info, obc, e, rep_tid); - } - return error_log_fut.then([m, e, epoch, &op_info, rep_tid, last_complete, this] { - auto fut = seastar::now(); - if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { - logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}", - rep_tid); - ceph_assert(log_entry_version.contains(rep_tid)); - auto it = log_entry_update_waiting_on.find(rep_tid); - ceph_assert(it != log_entry_update_waiting_on.end()); - auto it2 = it->second.waiting_on.find(pg_whoami); - ceph_assert(it2 != it->second.waiting_on.end()); - it->second.waiting_on.erase(it2); - if (it->second.waiting_on.empty()) { - log_entry_update_waiting_on.erase(it); + auto fut = seastar::now(); + if (record_error && !peering_state.pg_has_reset_since(epoch) && op_info.may_write()) { + logger().debug("do_osd_ops_execute::failure_func finding rep_tid {}", + rep_tid); + ceph_assert(log_entry_version.contains(rep_tid)); + auto it = log_entry_update_waiting_on.find(rep_tid); + ceph_assert(it != log_entry_update_waiting_on.end()); + auto it2 = it->second.waiting_on.find(pg_whoami); + ceph_assert(it2 != it->second.waiting_on.end()); + it->second.waiting_on.erase(it2); + if (it->second.waiting_on.empty()) { + log_entry_update_waiting_on.erase(it); + peering_state.complete_write(log_entry_version[rep_tid], last_complete); + log_entry_version.erase(rep_tid); + logger().debug("do_osd_ops_execute::failure_func write complete," + " erasing rep_tid {}", rep_tid); + + } else { + fut = it->second.all_committed.get_shared_future().then( + [this, last_complete, rep_tid] { + logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid); peering_state.complete_write(log_entry_version[rep_tid], last_complete); - log_entry_version.erase(rep_tid); - logger().debug("do_osd_ops_execute::failure_func write complete," - " erasing rep_tid {}", rep_tid); - - } else { - fut = it->second.all_committed.get_shared_future().then( - [this, last_complete, rep_tid] { - 
logger().debug("do_osd_ops_execute::failure_func awaited {}", rep_tid); - peering_state.complete_write(log_entry_version[rep_tid], last_complete); - ceph_assert(!log_entry_update_waiting_on.contains(rep_tid)); - return seastar::now(); - }); - } + ceph_assert(!log_entry_update_waiting_on.contains(rep_tid)); + return seastar::now(); + }); } - return fut.then([this, m, e] { - return log_reply(m, e); - }); + } + return fut.then([this, m, e] { + return log_reply(m, e); }); }); } @@ -1147,12 +1162,15 @@ PG::do_osd_ops( return do_osd_ops_execute( seastar::make_lw_shared( Ref{this}, - std::move(obc), + obc, op_info, msg_params, msg_params.get_connection(), SnapContext{} ), + obc, + op_info, + Ref(), ops, // success_func [] { diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 93e3ae82ec6eb..711270e4c5e79 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -583,6 +583,9 @@ private: template do_osd_ops_iertr::future> do_osd_ops_execute( seastar::lw_shared_ptr ox, + ObjectContextRef obc, + const OpInfo &op_info, + Ref m, std::vector& ops, SuccessFunc&& success_func, FailureFunc&& failure_func); -- 2.39.5