executor_type get_executor() const;
boost::asio::io_context& get_io_context();
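+ // True when this handle currently has a backing implementation (impl is non-null).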
+ operator bool() {
+ return bool(impl);
+ }
+
private:
template<typename CompletionToken>
auto consign(CompletionToken&& token) {
#include <boost/system/system_error.hpp>
-#include "common/async/parallel_for_each.h"
#include "include/fs_types.h"
#include "include/neorados/RADOS.hpp"
}
};
-RGWDataChangesLog::RGWDataChangesLog(CephContext* cct)
- : cct(cct),
+RGWDataChangesLog::RGWDataChangesLog(rgw::sal::RadosStore* driver)
+ : cct(driver->ctx()), rados(driver->get_neorados()),
+ executor(driver->get_io_context().get_executor()),
num_shards(cct->_conf->rgw_data_log_num_shards),
prefix(get_prefix()),
changes(cct->_conf->rgw_data_log_changes_size) {}
RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
- neorados::RADOS *rados,
+ neorados::RADOS rados,
std::optional<int> num_shards,
std::optional<uint64_t> sem_max_keys)
- : cct(cct), rados(rados), log_data(log_data),
+ : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()),
num_shards(num_shards ? *num_shards :
cct->_conf->rgw_data_log_num_shards),
prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
const RGWZone* zone,
const RGWZoneParams& zoneparams,
- rgw::sal::RadosStore* store,
bool background_tasks)
{
log_data = zone->log_data;
- rados = &store->get_neorados();
try {
// Blocking in startup code, not ideal, but won't hurt anything.
std::exception_ptr eptr
- = asio::co_spawn(store->get_io_context(),
+ = asio::co_spawn(executor,
start(dpp, zoneparams.log_pool,
background_tasks, background_tasks,
background_tasks),
bool renew)
{
down_flag = false;
- cancel_strand = asio::make_strand(rados->get_executor());
ran_background = (recovery || watch || renew);
auto defbacking = to_log_type(
if (renew) {
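+ // Each background task (renew, watch, recovery) is spawned on its own
+ // strand with its own cancellation signal; passing shared_from_this()
+ // keeps this object alive for as long as the coroutine runs.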
asio::co_spawn(
- co_await asio::this_coro::executor,
- renew_run(renew_signal),
- asio::bind_cancellation_slot(renew_signal->slot(),
- asio::bind_executor(*cancel_strand,
+ renew_strand,
+ renew_run(shared_from_this()),
+ asio::bind_cancellation_slot(renew_signal.slot(),
+ asio::bind_executor(renew_strand,
asio::detached)));
}
if (watch) {
"Unable to establish recovery watch!"};
}
asio::co_spawn(
- co_await asio::this_coro::executor,
- watch_loop(watch_signal),
- asio::bind_cancellation_slot(watch_signal->slot(),
- asio::bind_executor(*cancel_strand,
+ watch_strand,
+ watch_loop(shared_from_this()),
+ asio::bind_cancellation_slot(watch_signal.slot(),
+ asio::bind_executor(watch_strand,
asio::detached)));
}
if (recovery) {
// Recovery can run concurrent with normal operation, so we don't
// have to block startup while we do all that I/O.
asio::co_spawn(
- co_await asio::this_coro::executor,
- recover(dpp, recovery_signal),
- asio::bind_cancellation_slot(recovery_signal->slot(),
- asio::bind_executor(*cancel_strand,
+ recovery_strand,
+ recover(dpp, shared_from_this()),
+ asio::bind_cancellation_slot(recovery_signal.slot(),
+ asio::bind_executor(recovery_strand,
asio::detached)));
}
co_return;
}
}
-asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
+asio::awaitable<void>
+RGWDataChangesLog::watch_loop(std::shared_ptr<RGWDataChangesLog>)
+{
const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
const auto oid = get_sem_set_oid(0);
bool need_rewatch = false;
}
renew_stop();
// Revisit this later
- if (renew_signal)
- asio::dispatch(*cancel_strand,
- [this]() {
- renew_signal->emit(asio::cancellation_type::terminal);
- });
- if (recovery_signal)
- asio::dispatch(*cancel_strand,
- [this]() {
- recovery_signal->emit(asio::cancellation_type::terminal);
- });
- if (watch_signal)
- asio::dispatch(*cancel_strand,
- [this]() {
- watch_signal->emit(asio::cancellation_type::terminal);
- });
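+ // Emit each cancellation signal on the strand its task runs on, so the
+ // emit is serialized with the coroutine it cancels.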
+ asio::dispatch(renew_strand,
+ [this]() {
+ renew_signal.emit(asio::cancellation_type::terminal);
+ });
+ asio::dispatch(recovery_strand,
+ [this]() {
+ recovery_signal.emit(asio::cancellation_type::terminal);
+ });
+ asio::dispatch(watch_strand,
+ [this]() {
+ watch_signal.emit(asio::cancellation_type::terminal);
+ });
if (watchcookie && rados->check_watch(watchcookie)) {
auto wc = watchcookie;
watchcookie = 0;
using namespace asio::experimental::awaitable_operators;
asio::steady_timer t(co_await asio::this_coro::executor, 3s);
co_await (shutdown() || t.async_wait(asio::use_awaitable));
- if (renew_signal) {
- renew_signal->emit(asio::cancellation_type::terminal);
- }
- if (recovery_signal) {
- recovery_signal->emit(asio::cancellation_type::terminal);
- }
- if (watch_signal) {
- watch_signal->emit(asio::cancellation_type::terminal);
- }
}
RGWDataChangesLog::~RGWDataChangesLog() {
}
}
-asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
+asio::awaitable<void> RGWDataChangesLog::renew_run(
+ std::shared_ptr<RGWDataChangesLog>) {
static constexpr auto runs_per_prune = 150;
auto run = 0;
renew_timer.emplace(co_await asio::this_coro::executor);
co_return;
}
-asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
- decltype(recovery_signal))
+asio::awaitable<void> RGWDataChangesLog::recover(
+ const DoutPrefixProvider* dpp,
+ std::shared_ptr<RGWDataChangesLog>)
{
- auto strand = asio::make_strand(co_await asio::this_coro::executor);
co_await asio::co_spawn(
- strand,
- [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, decltype(strand)> {
- auto ex = co_await boost::asio::this_coro::executor;
+ recovery_strand,
+ [this](const DoutPrefixProvider* dpp)-> asio::awaitable<void, strand_t> {
+ auto ex = recovery_strand;
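+ // Spawn one recovery coroutine per shard and gather them in a group so
+ // they can all be awaited before recovery is considered complete.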
auto group = async::spawn_group{ex, static_cast<size_t>(num_shards)};
for (auto i = 0; i < num_shards; ++i) {
boost::asio::co_spawn(ex, recover_shard(dpp, i), group);
};
}
-class RGWDataChangesLog {
+class RGWDataChangesLog
+ : public std::enable_shared_from_this<RGWDataChangesLog> {
friend class DataLogTestBase;
friend DataLogBackends;
CephContext *cct;
- neorados::RADOS* rados;
- std::optional<asio::strand<asio::io_context::executor_type>> cancel_strand;
+ std::optional<neorados::RADOS> rados;
neorados::IOContext loc;
rgw::BucketChangeObserver *observer = nullptr;
bool log_data = false;
std::unique_ptr<DataLogBackends> bes;
- std::shared_ptr<asio::cancellation_signal> renew_signal =
- std::make_shared<asio::cancellation_signal>();
- std::shared_ptr<asio::cancellation_signal> watch_signal =
- std::make_shared<asio::cancellation_signal>();
- std::shared_ptr<asio::cancellation_signal> recovery_signal =
- std::make_shared<asio::cancellation_signal>();
+ using executor_t = asio::io_context::executor_type;
+ executor_t executor;
+ using strand_t = asio::strand<executor_t>;
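+ // One strand and one cancellation signal per background task; cancellation
+ // is emitted on the matching strand (see the shutdown paths) so it does not
+ // race the task it targets.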
+ strand_t renew_strand{executor};
+ asio::cancellation_signal renew_signal;
+ strand_t watch_strand{executor};
+ asio::cancellation_signal watch_signal;
+ strand_t recovery_strand{executor};
+ asio::cancellation_signal recovery_signal;
+
ceph::mono_time last_recovery = ceph::mono_clock::zero();
const int num_shards;
ceph::real_time expiration);
std::optional<asio::steady_timer> renew_timer;
- asio::awaitable<void> renew_run(decltype(renew_signal) renew_signal);
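+ // The shared_ptr parameter is not used by name; it keeps this
+ // RGWDataChangesLog alive while the coroutine runs (start() passes
+ // shared_from_this()). The same applies to watch_loop() and recover().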
+ asio::awaitable<void> renew_run(std::shared_ptr<RGWDataChangesLog>);
void renew_stop();
std::function<bool(const rgw_bucket& bucket, optional_yield y,
public:
- RGWDataChangesLog(CephContext* cct);
+ RGWDataChangesLog(rgw::sal::RadosStore* driver);
// For testing.
RGWDataChangesLog(CephContext* cct, bool log_data,
- neorados::RADOS* rados,
+ neorados::RADOS rados,
std::optional<int> num_shards = std::nullopt,
std::optional<uint64_t> sem_max_keys = std::nullopt);
~RGWDataChangesLog();
bool recovery, bool watch, bool renew);
int start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
- const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store,
- bool background_tasks);
+ const RGWZoneParams& zoneparams, bool background_tasks);
asio::awaitable<bool> establish_watch(const DoutPrefixProvider* dpp,
std::string_view oid);
asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
std::string_view oid);
- asio::awaitable<void> watch_loop(decltype(watch_signal));
+ asio::awaitable<void> watch_loop(std::shared_ptr<RGWDataChangesLog>);
int choose_oid(const rgw_bucket_shard& bs);
asio::awaitable<void> add_entry(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
bc::flat_map<std::string, uint64_t>&& semcount);
asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
- decltype(recovery_signal));
+ std::shared_ptr<RGWDataChangesLog>);
asio::awaitable<void> shutdown();
asio::awaitable<void> shutdown_or_timeout();
void blocking_shutdown();
const rgw::SiteConfig& site, rgw::sal::ConfigStore* cfgstore)
{
set_context(_cct);
- int ret = driver->init_neorados(dpp);
- if (ret < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
- return ret;
- }
- ret = init_rados();
+ auto ret = init_rados();
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to initialize librados (ret=" << cpp_strerror(-ret) << ")" << dendl;
return ret;
extern "C" {
-void* newRadosStore(void* io_context)
+void* newRadosStore(void* io_context, const void* dpp_)
{
+ auto dpp = static_cast<const DoutPrefixProvider*>(dpp_);
rgw::sal::RadosStore* store = new rgw::sal::RadosStore(
*static_cast<boost::asio::io_context*>(io_context));
if (store) {
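+ // Initialize neorados as part of store creation so the handle is
+ // available before later services (e.g. the data changes log) are
+ // constructed from the driver.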
+ int ret = store->init_neorados(dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize neorados (ret=" << cpp_strerror(-ret) << ")" << dendl;
+ delete store;
+ return nullptr;
+ }
+
RGWRados* rados = new RGWRados();
if (!rados) {
bilog_rados = std::make_unique<RGWSI_BILog_RADOS>(cct);
cls = std::make_unique<RGWSI_Cls>(cct);
config_key_rados = std::make_unique<RGWSI_ConfigKey_RADOS>(cct);
- datalog_rados = std::make_unique<RGWDataChangesLog>(cct);
+ datalog_rados = std::make_shared<RGWDataChangesLog>(driver);
mdlog = std::make_unique<RGWSI_MDLog>(cct, run_sync, cfgstore);
notify = std::make_unique<RGWSI_Notify>(cct);
zone = std::make_unique<RGWSI_Zone>(cct, cfgstore, site);
r = datalog_rados->start(dpp, &zone->get_zone(),
zone->get_zone_params(),
- driver, background_tasks);
+ background_tasks);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
return r;
std::unique_ptr<RGWSI_SysObj_Core> sysobj_core;
std::unique_ptr<RGWSI_SysObj_Cache> sysobj_cache;
std::unique_ptr<RGWSI_User_RADOS> user_rados;
- std::unique_ptr<RGWDataChangesLog> datalog_rados;
+ std::shared_ptr<RGWDataChangesLog> datalog_rados;
std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
RGWServices_Def();
extern "C" {
#ifdef WITH_RADOSGW_RADOS
-extern rgw::sal::Driver* newRadosStore(boost::asio::io_context* io_context);
+extern rgw::sal::Driver* newRadosStore(void* io_context, const void* dpp);
#endif
#ifdef WITH_RADOSGW_DBSTORE
extern rgw::sal::Driver* newDBStore(CephContext *cct);
if (cfg.store_name.compare("rados") == 0) {
#ifdef WITH_RADOSGW_RADOS
- driver = newRadosStore(&io_context);
+ driver = newRadosStore(&io_context, dpp);
RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
if ((*rados).set_use_cache(use_cache)
rgw::sal::Driver* driver = nullptr;
if (cfg.store_name.compare("rados") == 0) {
#ifdef WITH_RADOSGW_RADOS
- driver = newRadosStore(&io_context);
+ driver = newRadosStore(&io_context, dpp);
RGWRados* rados = static_cast<rgw::sal::RadosStore* >(driver)->getRados();
rados->set_context(cct);
boost::asio::use_awaitable);
}
- virtual asio::awaitable<std::unique_ptr<RGWDataChangesLog>>
+ virtual asio::awaitable<std::shared_ptr<RGWDataChangesLog>>
create_datalog() = 0;
protected:
- std::unique_ptr<RGWDataChangesLog> datalog;
+ std::shared_ptr<RGWDataChangesLog> datalog;
neorados::RADOS& rados() noexcept { return *rados_; }
const std::string& pool_name() const noexcept { return pool_name_; }
}
auto recover(const DoutPrefixProvider* dpp) {
- return datalog->recover(dpp, nullptr);
+ return datalog->recover(dpp, datalog->shared_from_this());
}
void add_to_cur_cycle(const BucketGen& bg) {
class DataLogTest : public DataLogTestBase {
private:
- asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
- auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
- &rados());
+ asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+ auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+ rados());
co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
co_return std::move(datalog);
}
class DataLogWatchless : public DataLogTestBase {
private:
- asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
- auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
- &rados());
+ asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
+ auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+ rados());
co_await datalog->start(dpp(), rgw_pool(pool_name()), false, false, false);
co_return std::move(datalog);
}
class DataLogBulky : public DataLogTestBase {
private:
- asio::awaitable<std::unique_ptr<RGWDataChangesLog>> create_datalog() override {
+ asio::awaitable<std::shared_ptr<RGWDataChangesLog>> create_datalog() override {
// Decrease max push/list and force everything into one shard so we
// can test iterated increment/decrement/list code.
- auto datalog = std::make_unique<RGWDataChangesLog>(rados().cct(), true,
- &rados(), 1, 7);
+ auto datalog = std::make_shared<RGWDataChangesLog>(rados().cct(), true,
+ rados(), 1, 7);
co_await datalog->start(dpp(), rgw_pool(pool_name()), false, true, false);
co_return std::move(datalog);
}