From: Adam Emerson Date: Wed, 1 May 2024 15:41:55 +0000 (-0400) Subject: rgw/multisite/datalog: Orderly shutdown X-Git-Tag: testing/wip-vshankar-testing-20250407.170244-debug~16^2~10 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=2eae9ad492e766fb8ef557f553b19edff4e415e5;p=ceph-ci.git rgw/multisite/datalog: Orderly shutdown Signed-off-by: Adam Emerson --- diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index c292d724d1f..f3a2b59fcbb 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -10,8 +10,11 @@ #include #include #include +#include + #include #include + #include #include "common/async/parallel_for_each.h" @@ -467,9 +470,9 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, bool watch, bool renew) { - if (!log_data) { - co_return; - } + down_flag = false; + cancel_strand = asio::make_strand(rados->get_executor()); + auto defbacking = to_log_type( cct->_conf.get_val("rgw_default_data_log_backing")); // Should be guaranteed by `set_enum_allowed` @@ -497,11 +500,17 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, throw; } + if (!log_data) { + co_return; + } + if (renew) { - asio::co_spawn(co_await asio::this_coro::executor, - renew_run(renew_signal), - asio::bind_cancellation_slot(renew_signal->slot(), - asio::detached)); + asio::co_spawn( + co_await asio::this_coro::executor, + renew_run(renew_signal), + asio::bind_cancellation_slot(renew_signal->slot(), + asio::bind_executor(*cancel_strand, + asio::detached))); } if (watch) { // Establish watch here so we won't be 'started up' until we're watching. @@ -511,18 +520,22 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp, throw sys::system_error{ENOTCONN, sys::generic_category(), "Unable to establish recovery watch!"}; } - asio::co_spawn(co_await asio::this_coro::executor, - watch_loop(watch_signal), - asio::bind_cancellation_slot(watch_signal->slot(), - asio::detached)); + asio::co_spawn( + co_await asio::this_coro::executor, + watch_loop(watch_signal), + asio::bind_cancellation_slot(watch_signal->slot(), + asio::bind_executor(*cancel_strand, + asio::detached))); } if (recovery) { // Recovery can run concurrent with normal operation, so we don't // have to block startup while we do all that I/O. - asio::co_spawn(co_await asio::this_coro::executor, - recover(dpp, recovery_signal), - asio::bind_cancellation_slot(recovery_signal->slot(), - asio::detached)); + asio::co_spawn( + co_await asio::this_coro::executor, + recover(dpp, recovery_signal), + asio::bind_cancellation_slot(recovery_signal->slot(), + asio::bind_executor(*cancel_strand, + asio::detached))); } co_return; } @@ -663,7 +676,8 @@ asio::awaitable RGWDataChangesLog::watch_loop(decltype(watch_signal)) { << "recovery won't decrement semaphores." << dendl; continue; } - if (going_down()) { + if (going_down() || e.code() == asio::error::operation_aborted){ + need_rewatch = false; break; } else { need_rewatch = true; @@ -1328,23 +1342,78 @@ bool RGWDataChangesLog::going_down() const return down_flag; } -RGWDataChangesLog::~RGWDataChangesLog() { - shutdown(); -} - -void RGWDataChangesLog::shutdown() { +asio::awaitable RGWDataChangesLog::shutdown() { + DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"}; if (down_flag) { - return; + co_return; } down_flag = true; renew_stop(); // Revisit this later if (renew_signal) - renew_signal->emit(asio::cancellation_type::terminal); - if (recovery_signal) - recovery_signal->emit(asio::cancellation_type::terminal); + asio::dispatch(*cancel_strand, + [this]() { + renew_signal->emit(asio::cancellation_type::terminal); + }); if (recovery_signal) + asio::dispatch(*cancel_strand, + [this]() { + recovery_signal->emit(asio::cancellation_type::terminal); + }); + if (watch_signal) + asio::dispatch(*cancel_strand, + [this]() { + watch_signal->emit(asio::cancellation_type::terminal); + }); + if (watchcookie && rados->check_watch(watchcookie)) { + auto wc = watchcookie; + watchcookie = 0; + co_await rados->unwatch(wc, loc, asio::use_awaitable); + } + co_await renew_entries(&dp); +} + +asio::awaitable RGWDataChangesLog::shutdown_or_timeout() { + using namespace asio::experimental::awaitable_operators; + asio::steady_timer t(co_await asio::this_coro::executor, 3s); + co_await (shutdown() || t.async_wait(asio::use_awaitable)); + if (renew_signal) { + renew_signal->emit(asio::cancellation_type::terminal); + } + if (recovery_signal) { recovery_signal->emit(asio::cancellation_type::terminal); + } + if (watch_signal) { + watch_signal->emit(asio::cancellation_type::terminal); + } +} + +RGWDataChangesLog::~RGWDataChangesLog() { + if (log_data && !down_flag) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": RGWDataChangesLog destructed without dhutdown." << dendl; + } +} + +void RGWDataChangesLog::blocking_shutdown() { + if (!down_flag) { + try { + auto eptr = asio::co_spawn(rados->get_io_context(), + shutdown_or_timeout(), + async::use_blocked); + if (eptr) { + std::rethrow_exception(eptr); + } + } catch (const sys::system_error& e) { + lderr(cct) << __PRETTY_FUNCTION__ + << ": Failed to shutting down: " << e.what() + << dendl; + } catch (const std::exception& e) { + lderr(cct) << __PRETTY_FUNCTION__ + << ": Failed to shutting down: " << e.what() + << dendl; + } + } } asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index f690d0010b5..588a79fb76c 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -337,10 +337,11 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) { } class RGWDataChangesLog { - friend class DataLogTest; + friend class DataLogTestBase; friend DataLogBackends; CephContext *cct; neorados::RADOS* rados; + std::optional> cancel_strand; neorados::IOContext loc; rgw::BucketChangeObserver *observer = nullptr; bool log_data = false; @@ -364,7 +365,7 @@ class RGWDataChangesLog { std::shared_mutex modified_lock; bc::flat_map> modified_shards; - std::atomic down_flag = { false }; + std::atomic down_flag = { true }; struct ChangeStatus { std::shared_ptr sync_policy; @@ -489,7 +490,6 @@ public: int trim_generations(const DoutPrefixProvider *dpp, std::optional& through, optional_yield y); - void shutdown(); asio::awaitable read_all_sems(int index, bc::flat_map* out); asio::awaitable @@ -505,6 +505,9 @@ public: asio::awaitable recover_shard(const DoutPrefixProvider* dpp, int index); asio::awaitable recover(const DoutPrefixProvider* dpp, decltype(recovery_signal)); + asio::awaitable shutdown(); + asio::awaitable shutdown_or_timeout(); + void blocking_shutdown(); }; class RGWDataChangesBE : public boost::intrusive_ref_counter { diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 5e82d5b0a23..c7097ae69bd 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -2286,7 +2286,7 @@ int RadosStore::meta_remove(const DoutPrefixProvider* dpp, std::string& metadata } void RadosStore::shutdown(void) { - svc()->datalog_rados->shutdown(); + svc()->datalog_rados->blocking_shutdown(); return; }