#include <boost/asio/awaitable.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/co_spawn.hpp>
+#include <boost/asio/experimental/awaitable_operators.hpp>
+
#include <boost/container/flat_set.hpp>
#include <boost/container/flat_map.hpp>
+
#include <boost/system/system_error.hpp>
#include "common/async/parallel_for_each.h"
bool watch,
bool renew)
{
- if (!log_data) {
- co_return;
- }
+ down_flag = false;
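+  // All cancellation signals are emitted on this strand (see shutdown()),
+  // serializing emission with completion of the coroutines spawned below.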
+ cancel_strand = asio::make_strand(rados->get_executor());
+
auto defbacking = to_log_type(
cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
// Should be guaranteed by `set_enum_allowed`
throw;
}
+ if (!log_data) {
+ co_return;
+ }
+
if (renew) {
- asio::co_spawn(co_await asio::this_coro::executor,
- renew_run(renew_signal),
- asio::bind_cancellation_slot(renew_signal->slot(),
- asio::detached));
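+    // Bind completion to *cancel_strand so shutdown() can emit cancellation
+    // on the same strand without racing the detached completion handler.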
+ asio::co_spawn(
+ co_await asio::this_coro::executor,
+ renew_run(renew_signal),
+ asio::bind_cancellation_slot(renew_signal->slot(),
+ asio::bind_executor(*cancel_strand,
+ asio::detached)));
}
if (watch) {
// Establish watch here so we won't be 'started up' until we're watching.
throw sys::system_error{ENOTCONN, sys::generic_category(),
"Unable to establish recovery watch!"};
}
- asio::co_spawn(co_await asio::this_coro::executor,
- watch_loop(watch_signal),
- asio::bind_cancellation_slot(watch_signal->slot(),
- asio::detached));
+ asio::co_spawn(
+ co_await asio::this_coro::executor,
+ watch_loop(watch_signal),
+ asio::bind_cancellation_slot(watch_signal->slot(),
+ asio::bind_executor(*cancel_strand,
+ asio::detached)));
}
if (recovery) {
// Recovery can run concurrent with normal operation, so we don't
// have to block startup while we do all that I/O.
- asio::co_spawn(co_await asio::this_coro::executor,
- recover(dpp, recovery_signal),
- asio::bind_cancellation_slot(recovery_signal->slot(),
- asio::detached));
+ asio::co_spawn(
+ co_await asio::this_coro::executor,
+ recover(dpp, recovery_signal),
+ asio::bind_cancellation_slot(recovery_signal->slot(),
+ asio::bind_executor(*cancel_strand,
+ asio::detached)));
}
co_return;
}
<< "recovery won't decrement semaphores." << dendl;
continue;
}
- if (going_down()) {
+      if (going_down() || e.code() == asio::error::operation_aborted) {
+        need_rewatch = false;
break;
} else {
need_rewatch = true;
return down_flag;
}
-RGWDataChangesLog::~RGWDataChangesLog() {
- shutdown();
-}
-
-void RGWDataChangesLog::shutdown() {
+asio::awaitable<void> RGWDataChangesLog::shutdown() {
+ DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"};
if (down_flag) {
- return;
+ co_return;
}
down_flag = true;
renew_stop();
// Revisit this later
if (renew_signal)
- renew_signal->emit(asio::cancellation_type::terminal);
+ asio::dispatch(*cancel_strand,
+ [this]() {
+ renew_signal->emit(asio::cancellation_type::terminal);
+ });
if (recovery_signal)
+ asio::dispatch(*cancel_strand,
+ [this]() {
+ recovery_signal->emit(asio::cancellation_type::terminal);
+ });
+ if (watch_signal)
+ asio::dispatch(*cancel_strand,
+ [this]() {
+ watch_signal->emit(asio::cancellation_type::terminal);
+ });
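+  // Tear down the established watch, if it is still valid, before the final
+  // renew_entries() pass below.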
+ if (watchcookie && rados->check_watch(watchcookie)) {
+ auto wc = watchcookie;
+ watchcookie = 0;
+ co_await rados->unwatch(wc, loc, asio::use_awaitable);
+ }
+ co_await renew_entries(&dp);
+}
+
+asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
+ using namespace asio::experimental::awaitable_operators;
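+  // Race an orderly shutdown() against a three-second timer, then emit
+  // terminal cancellation on any remaining signals as a backstop.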
+ asio::steady_timer t(co_await asio::this_coro::executor, 3s);
+ co_await (shutdown() || t.async_wait(asio::use_awaitable));
+ if (renew_signal) {
+ renew_signal->emit(asio::cancellation_type::terminal);
+ }
+ if (recovery_signal) {
recovery_signal->emit(asio::cancellation_type::terminal);
+ }
+ if (watch_signal) {
+ watch_signal->emit(asio::cancellation_type::terminal);
+ }
+}
+
+RGWDataChangesLog::~RGWDataChangesLog() {
+ if (log_data && !down_flag) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+               << ": RGWDataChangesLog destructed without shutdown." << dendl;
+ }
+}
+
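+// Synchronous wrapper for callers that cannot co_await: run
+// shutdown_or_timeout() on the RADOS io_context, block until it completes,
+// and log (rather than propagate) any failure.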
+void RGWDataChangesLog::blocking_shutdown() {
+ if (!down_flag) {
+ try {
+ auto eptr = asio::co_spawn(rados->get_io_context(),
+ shutdown_or_timeout(),
+ async::use_blocked);
+ if (eptr) {
+ std::rethrow_exception(eptr);
+ }
+ } catch (const sys::system_error& e) {
+ lderr(cct) << __PRETTY_FUNCTION__
+                 << ": Failed to shut down: " << e.what()
+ << dendl;
+ } catch (const std::exception& e) {
+ lderr(cct) << __PRETTY_FUNCTION__
+                 << ": Failed to shut down: " << e.what()
+ << dendl;
+ }
+ }
}
asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
}
class RGWDataChangesLog {
- friend class DataLogTest;
+ friend class DataLogTestBase;
friend DataLogBackends;
CephContext *cct;
neorados::RADOS* rados;
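+  // Strand on which cancellation signals are emitted during shutdown.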
+ std::optional<asio::strand<asio::io_context::executor_type>> cancel_strand;
neorados::IOContext loc;
rgw::BucketChangeObserver *observer = nullptr;
bool log_data = false;
std::shared_mutex modified_lock;
bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
- std::atomic<bool> down_flag = { false };
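+  // Starts true ("down"); cleared on startup and set again by shutdown().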
+ std::atomic<bool> down_flag = { true };
struct ChangeStatus {
std::shared_ptr<const rgw_sync_policy_info> sync_policy;
int trim_generations(const DoutPrefixProvider *dpp,
std::optional<uint64_t>& through,
optional_yield y);
- void shutdown();
asio::awaitable<void> read_all_sems(int index,
bc::flat_map<std::string, uint64_t>* out);
asio::awaitable<bool>
asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
decltype(recovery_signal));
+ asio::awaitable<void> shutdown();
+ asio::awaitable<void> shutdown_or_timeout();
+ void blocking_shutdown();
};
class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {