From: Yingxin Cheng Date: Wed, 20 Sep 2023 06:09:22 +0000 (+0800) Subject: crimson/osd/osd_operations: fix the usages of PipelineHandle::complete() and exit() X-Git-Tag: v19.0.0~141^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4c7e55d84dd386ec2419cd139ed623919c346065;p=ceph.git crimson/osd/osd_operations: fix the usages of PipelineHandle::complete() and exit() complete() should be called to leave the last phase in the normal path, and exit() to be called in finally() to release the resources under all circumstances. Signed-off-by: Yingxin Cheng --- diff --git a/src/crimson/osd/osd_operations/background_recovery.cc b/src/crimson/osd/osd_operations/background_recovery.cc index 953ec9595dae..74bd238c987b 100644 --- a/src/crimson/osd/osd_operations/background_recovery.cc +++ b/src/crimson/osd/osd_operations/background_recovery.cc @@ -196,7 +196,11 @@ BackfillRecovery::do_recovery() peering_pp(*pg).process ).then_interruptible([this] { pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt)); + return handle.complete(); + }).then_interruptible([] { return seastar::make_ready_future(false); + }).finally([this] { + handle.exit(); }); } diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 9374fbde2cc0..f01f0c491f1a 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -143,6 +143,9 @@ seastar::future<> ClientRequest::with_pg_int( } else { return process_op(ihref, pgref); } + }).then_interruptible([this, this_instance_id, &ihref] { + logger().debug("{}.{}: complete", *this, this_instance_id); + return ihref.handle.complete(); }).then_interruptible([this, this_instance_id, pgref] { logger().debug("{}.{}: after process*", *this, this_instance_id); pgref->client_request_orderer.remove_request(*this); @@ -151,11 +154,15 @@ seastar::future<> ClientRequest::with_pg_int( }, [this, this_instance_id, pgref](std::exception_ptr eptr) { // TODO: better debug output logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr); - }, pgref).finally( - [opref=std::move(opref), pgref=std::move(pgref), - instance_handle=std::move(instance_handle), &ihref] { - ihref.handle.exit(); - }); + }, + pgref + ).finally( + [opref=std::move(opref), pgref, + instance_handle=std::move(instance_handle), &ihref, + this_instance_id, this] { + logger().debug("{}.{}: exit", *this, this_instance_id); + ihref.handle.exit(); + }); } seastar::future<> ClientRequest::with_pg( diff --git a/src/crimson/osd/osd_operations/internal_client_request.cc b/src/crimson/osd/osd_operations/internal_client_request.cc index 87438d4a1468..790eb3f932d0 100644 --- a/src/crimson/osd/osd_operations/internal_client_request.cc +++ b/src/crimson/osd/osd_operations/internal_client_request.cc @@ -110,9 +110,14 @@ seastar::future<> InternalClientRequest::start() }); }); }); - }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] { - return seastar::now(); - })).then_interruptible([] { + }).si_then([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }).handle_error_interruptible( + PG::load_obc_ertr::all_same_way([] { + return seastar::now(); + }) + ).then_interruptible([] { return seastar::stop_iteration::yes; }); }, [this](std::exception_ptr eptr) { @@ -124,6 +129,9 @@ seastar::future<> InternalClientRequest::start() }, pg); }).then([this] { track_event(); + }).finally([this] { + logger().debug("{}: exit", *this); + handle.exit(); }); }); } diff --git a/src/crimson/osd/osd_operations/logmissing_request.cc b/src/crimson/osd/osd_operations/logmissing_request.cc index 739b46406500..ee83977cd8a2 100644 --- a/src/crimson/osd/osd_operations/logmissing_request.cc +++ b/src/crimson/osd/osd_operations/logmissing_request.cc @@ -72,8 +72,16 @@ seastar::future<> LogMissingRequest::with_pg( }); }).then_interruptible([this, pg](auto) { return pg->do_update_log_missing(req, conn); + }).then_interruptible([this] { + logger().debug("{}: complete", *this); + return handle.complete(); }); - }, [ref](std::exception_ptr) { return seastar::now(); }, pg); + }, [](std::exception_ptr) { + return seastar::now(); + }, pg).finally([this, ref] { + logger().debug("{}: exit", *this); + handle.exit(); + }); } } diff --git a/src/crimson/osd/osd_operations/logmissing_request_reply.cc b/src/crimson/osd/osd_operations/logmissing_request_reply.cc index b4bf2938e05b..16e61ab4a985 100644 --- a/src/crimson/osd/osd_operations/logmissing_request_reply.cc +++ b/src/crimson/osd/osd_operations/logmissing_request_reply.cc @@ -61,8 +61,17 @@ seastar::future<> LogMissingRequestReply::with_pg( IRef ref = this; return interruptor::with_interruption([this, pg] { - return pg->do_update_log_missing_reply(std::move(req)); - }, [ref](std::exception_ptr) { return seastar::now(); }, pg); + return pg->do_update_log_missing_reply(std::move(req) + ).then_interruptible([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }); + }, [](std::exception_ptr) { + return seastar::now(); + }, pg).finally([this, ref] { + logger().debug("{}: exit", *this); + handle.exit(); + }); } } diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index ea4662bd01e0..0712147ab2b7 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -85,8 +85,9 @@ seastar::future<> PeeringEvent::with_pg( return this->template enter_stage(peering_pp(*pg).process); }).then_interruptible([this, pg, &shard_services] { return pg->do_peering_event(evt, ctx - ).then_interruptible([this, pg, &shard_services] { - that()->get_handle().exit(); + ).then_interruptible([this] { + return that()->get_handle().complete(); + }).then_interruptible([this, pg, &shard_services] { return complete_rctx(shard_services, pg); }); }).then_interruptible([pg, &shard_services]() @@ -100,7 +101,10 @@ seastar::future<> PeeringEvent::with_pg( }); }, [this](std::exception_ptr ep) { logger().debug("{}: interrupted with {}", *this, ep); - }, pg); + }, pg).finally([this] { + logger().debug("{}: exit", *this); + that()->get_handle().exit(); + }); } template diff --git a/src/crimson/osd/osd_operations/pg_advance_map.cc b/src/crimson/osd/osd_operations/pg_advance_map.cc index 3706af810557..ba63212fc7fc 100644 --- a/src/crimson/osd/osd_operations/pg_advance_map.cc +++ b/src/crimson/osd/osd_operations/pg_advance_map.cc @@ -122,8 +122,12 @@ seastar::future<> PGAdvanceMap::start() return shard_services.send_pg_temp(); }); }); - }).then([this, ref=std::move(ref)] { + }).then([this] { logger().debug("{}: complete", *this); + return handle.complete(); + }).finally([this, ref=std::move(ref)] { + logger().debug("{}: exit", *this); + handle.exit(); }); } diff --git a/src/crimson/osd/osd_operations/recovery_subrequest.cc b/src/crimson/osd/osd_operations/recovery_subrequest.cc index 68655b8da517..dd310d8d7274 100644 --- a/src/crimson/osd/osd_operations/recovery_subrequest.cc +++ b/src/crimson/osd/osd_operations/recovery_subrequest.cc @@ -30,11 +30,17 @@ seastar::future<> RecoverySubRequest::with_pg( track_event(); IRef opref = this; return interruptor::with_interruption([this, pgref] { - return pgref->get_recovery_backend()->handle_recovery_op(m, conn); + return pgref->get_recovery_backend()->handle_recovery_op(m, conn + ).then_interruptible([this] { + logger().debug("{}: complete", *this); + return handle.complete(); + }); }, [](std::exception_ptr) { return seastar::now(); - }, pgref).finally([this, opref, pgref] { + }, pgref).finally([this, opref=std::move(opref), pgref] { + logger().debug("{}: exit", *this); track_event(); + handle.exit(); }); } diff --git a/src/crimson/osd/osd_operations/replicated_request.cc b/src/crimson/osd/osd_operations/replicated_request.cc index 09217575c8ff..7e16b2ebd06a 100644 --- a/src/crimson/osd/osd_operations/replicated_request.cc +++ b/src/crimson/osd/osd_operations/replicated_request.cc @@ -71,10 +71,16 @@ seastar::future<> RepRequest::with_pg( }); }).then_interruptible([this, pg] (auto) { return pg->handle_rep_op(req); + }).then_interruptible([this] { + logger().debug("{}: complete", *this); + return handle.complete(); }); - }, [ref](std::exception_ptr) { + }, [](std::exception_ptr) { return seastar::now(); - }, pg); + }, pg).finally([this, ref=std::move(ref)] { + logger().debug("{}: exit", *this); + handle.exit(); + }); } } diff --git a/src/crimson/osd/osd_operations/snaptrim_event.cc b/src/crimson/osd/osd_operations/snaptrim_event.cc index e4a1b04df142..ffd43d736ad3 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.cc +++ b/src/crimson/osd/osd_operations/snaptrim_event.cc @@ -80,27 +80,15 @@ void SnapTrimEvent::dump_detail(Formatter *f) const f->close_section(); } -SnapTrimEvent::snap_trim_ertr::future -SnapTrimEvent::start() -{ - logger().debug("{}: {}", *this, __func__); - return with_pg( - pg->get_shard_services(), pg - ).finally([ref=IRef{this}, this] { - logger().debug("{}: complete", *ref); - return handle.complete(); - }); -} - CommonPGPipeline& SnapTrimEvent::client_pp() { return pg->request_pg_pipeline; } SnapTrimEvent::snap_trim_ertr::future -SnapTrimEvent::with_pg( - ShardServices &shard_services, Ref _pg) +SnapTrimEvent::start() { + ShardServices &shard_services = pg->get_shard_services(); return interruptor::with_interruption([&shard_services, this] { return enter_stage( client_pp().wait_for_active @@ -176,7 +164,7 @@ SnapTrimEvent::with_pg( return subop_blocker.wait_completion(); }).finally([this] { pg->snaptrim_mutex.unlock(); - }).safe_then_interruptible([this] { + }).si_then([this] { if (!needs_pause) { return interruptor::now(); } @@ -193,17 +181,24 @@ SnapTrimEvent::with_pg( return seastar::sleep( std::chrono::milliseconds(std::lround(time_to_sleep * 1000))); }); - }).safe_then_interruptible([this] { + }).si_then([this] { logger().debug("{}: all completed", *this); return snap_trim_iertr::make_ready_future( seastar::stop_iteration::no); }); + }).si_then([this](auto stop) { + return handle.complete().then([stop] { + return snap_trim_iertr::make_ready_future(stop); + }); }); }); }, [this](std::exception_ptr eptr) -> snap_trim_ertr::future { logger().debug("{}: interrupted {}", *this, eptr); return crimson::ct_error::eagain::make(); - }, pg); + }, pg).finally([this] { + logger().debug("{}: exit", *this); + handle.exit(); + }); } @@ -212,18 +207,6 @@ CommonPGPipeline& SnapTrimObjSubEvent::client_pp() return pg->request_pg_pipeline; } -SnapTrimObjSubEvent::remove_or_update_iertr::future<> -SnapTrimObjSubEvent::start() -{ - logger().debug("{}: start", *this); - return with_pg( - pg->get_shard_services(), pg - ).finally([ref=IRef{this}, this] { - logger().debug("{}: complete", *ref); - return handle.complete(); - }); -} - SnapTrimObjSubEvent::remove_or_update_iertr::future<> SnapTrimObjSubEvent::remove_clone( ObjectContextRef obc, @@ -466,7 +449,7 @@ SnapTrimObjSubEvent::remove_or_update( *this, coid, old_snaps, new_snaps); ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries); } - return std::move(ret).safe_then_interruptible( + return std::move(ret).si_then( [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable { osd_op_p.at_version = pg->next_version(); @@ -484,7 +467,7 @@ SnapTrimObjSubEvent::remove_or_update( // num_objects_before_trim - delta_stats.num_objects; //add_objects_trimmed_count(num_objects_trimmed); } - }).safe_then_interruptible( + }).si_then( [&txn, log_entries=std::move(log_entries)] () mutable { return remove_or_update_iertr::make_ready_future( std::make_pair(std::move(txn), std::move(log_entries))); @@ -493,8 +476,7 @@ SnapTrimObjSubEvent::remove_or_update( } SnapTrimObjSubEvent::remove_or_update_iertr::future<> -SnapTrimObjSubEvent::with_pg( - ShardServices &shard_services, Ref _pg) +SnapTrimObjSubEvent::start() { return enter_stage( client_pp().wait_for_active @@ -544,10 +526,16 @@ SnapTrimObjSubEvent::with_pg( }); }); }); + }).si_then([this] { + logger().debug("{}: completed", *this); + return handle.complete(); }).handle_error_interruptible( remove_or_update_iertr::pass_further{}, crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"} ); + }).finally([this] { + logger().debug("{}: exit", *this); + handle.exit(); }); } diff --git a/src/crimson/osd/osd_operations/snaptrim_event.h b/src/crimson/osd/osd_operations/snaptrim_event.h index a3a970a04c7d..afb24952a045 100644 --- a/src/crimson/osd/osd_operations/snaptrim_event.h +++ b/src/crimson/osd/osd_operations/snaptrim_event.h @@ -53,8 +53,6 @@ public: void print(std::ostream &) const final; void dump_detail(ceph::Formatter* f) const final; snap_trim_ertr::future start(); - snap_trim_ertr::future with_pg( - ShardServices &shard_services, Ref pg); private: CommonPGPipeline& client_pp(); @@ -140,8 +138,6 @@ public: void print(std::ostream &) const final; void dump_detail(ceph::Formatter* f) const final; remove_or_update_iertr::future<> start(); - remove_or_update_iertr::future<> with_pg( - ShardServices &shard_services, Ref pg); CommonPGPipeline& client_pp(); diff --git a/src/crimson/osd/pg_shard_manager.h b/src/crimson/osd/pg_shard_manager.h index 71a9cf1a9c80..e080dc43e4ad 100644 --- a/src/crimson/osd/pg_shard_manager.h +++ b/src/crimson/osd/pg_shard_manager.h @@ -340,7 +340,14 @@ public: opref.get_connection_pipeline().get_pg); }).then([this, &opref] { return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid()); - }).then([this, &logger, op=std::move(op)](auto core) mutable { + }).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable { + if (unlikely(fut.failed())) { + logger.error("{}: failed before with_pg", *op); + op->get_handle().exit(); + return seastar::make_exception_future<>(fut.get_exception()); + } + + auto core = fut.get(); logger.debug("{}: can_create={}, target-core={}", *op, T::can_create(), core); return this->template with_remote_shard_state_and_op(