int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
const RGWZone* zone,
const RGWZoneParams& zoneparams,
- rgw::sal::RadosStore* store)
+ rgw::sal::RadosStore* store,
+ bool background_tasks)
{
log_data = zone->log_data;
rados = &store->get_neorados();
-
try {
// Blocking in startup code, not ideal, but won't hurt anything.
std::exception_ptr eptr
= asio::co_spawn(store->get_io_context(),
- start(dpp, zoneparams.log_pool),
+ start(dpp, zoneparams.log_pool,
+ background_tasks, background_tasks,
+ background_tasks),
async::use_blocked);
if (eptr) {
std::rethrow_exception(eptr);
{
down_flag = false;
cancel_strand = asio::make_strand(rados->get_executor());
+ ran_background = (recovery || watch || renew);
auto defbacking = to_log_type(
cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
std::unique_lock l(m);
const auto head_gen = (end() - 1)->second->gen_id;
const auto tail_gen = begin()->first;
- if (target_gen < tail_gen)
+ if (target_gen < tail_gen)
+
co_return;
auto r = 0;
for (auto be = lower_bound(0)->second;
asio::awaitable<void> RGWDataChangesLog::shutdown() {
DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"};
- if (down_flag) {
+ if (down_flag || !ran_background) {
co_return;
}
down_flag = true;
watchcookie = 0;
co_await rados->unwatch(wc, loc, asio::use_awaitable);
}
- co_await renew_entries(&dp);
+ co_return;
}
asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
RGWDataChangesLog::~RGWDataChangesLog() {
if (log_data && !down_flag) {
lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
- << ": RGWDataChangesLog destructed without dhutdown." << dendl;
+ << ": RGWDataChangesLog destructed without shutdown." << dendl;
}
}
bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
std::atomic<bool> down_flag = { true };
+ bool ran_background = false;
struct ChangeStatus {
std::shared_ptr<const rgw_sync_policy_info> sync_policy;
asio::awaitable<void> start(const DoutPrefixProvider* dpp,
const rgw_pool& log_pool,
- // For testing
- bool recovery = true,
- bool watch = true,
- bool renew = true);
+ // Broken out for testing; in actual use
+ // they're either all on (radosgw) or
+ // all off (radosgw-admin)
+ bool recovery, bool watch, bool renew);
int start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
- const RGWZoneParams& zoneparams,
- rgw::sal::RadosStore* store);
+ const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store,
+ bool background_tasks);
asio::awaitable<bool> establish_watch(const DoutPrefixProvider* dpp,
std::string_view oid);
asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
}
int RGWRados::init_svc(bool raw, const DoutPrefixProvider *dpp,
+ bool background_tasks, // Ignored when `raw`
const rgw::SiteConfig& site)
{
if (raw) {
return svc.init_raw(cct, driver, use_cache, null_yield, dpp, site);
}
- return svc.init(cct, driver, use_cache, run_sync_thread, null_yield, dpp, site);
+ return svc.init(cct, driver, use_cache, run_sync_thread, background_tasks, null_yield, dpp, site);
}
/**
* Returns 0 on success, -ERR# on failure.
*/
int RGWRados::init_begin(CephContext* _cct, const DoutPrefixProvider *dpp,
+ bool background_tasks,
const rgw::SiteConfig& site)
{
set_context(_cct);
return ret;
}
- ret = init_svc(false, dpp, site);
+ ret = init_svc(false, dpp, background_tasks, site);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
return ret;
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
int init_begin(CephContext *_cct, const DoutPrefixProvider *dpp,
- const rgw::SiteConfig& site);
+ bool background_tasks, const rgw::SiteConfig& site);
/** Initialize the RADOS instance and prepare to do other ops */
- int init_svc(bool raw, const DoutPrefixProvider *dpp, const rgw::SiteConfig& site);
+ int init_svc(bool raw, const DoutPrefixProvider *dpp, bool background_tasks, const rgw::SiteConfig& site);
virtual int init_rados();
int init_complete(const DoutPrefixProvider *dpp, optional_yield y);
void finalize();
bool have_cache,
bool raw,
bool run_sync,
+ bool background_tasks,
optional_yield y,
const DoutPrefixProvider *dpp)
{
r = datalog_rados->start(dpp, &zone->get_zone(),
zone->get_zone_params(),
- driver);
+ driver, background_tasks);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to start datalog_rados service (" << cpp_strerror(-r) << dendl;
return r;
has_shutdown = true;
}
-int RGWServices::do_init(CephContext *_cct, rgw::sal::RadosStore* driver, bool have_cache, bool raw, bool run_sync, optional_yield y, const DoutPrefixProvider *dpp, const rgw::SiteConfig& _site)
+int RGWServices::do_init(CephContext *_cct, rgw::sal::RadosStore* driver, bool have_cache, bool raw, bool run_sync, bool background_tasks, optional_yield y, const DoutPrefixProvider *dpp, const rgw::SiteConfig& _site)
{
cct = _cct;
site = &_site;
- int r = _svc.init(cct, driver, have_cache, raw, run_sync, y, dpp);
+ int r = _svc.init(cct, driver, have_cache, raw, run_sync, background_tasks, y, dpp);
if (r < 0) {
return r;
}
~RGWServices_Def();
int init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache,
- bool raw_storage, bool run_sync, optional_yield y,
- const DoutPrefixProvider *dpp);
+ bool raw_storage, bool run_sync, bool background_tasks,
+ optional_yield y, const DoutPrefixProvider *dpp);
void shutdown();
};
RGWAsyncRadosProcessor* async_processor;
int do_init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache,
- bool raw_storage, bool run_sync, optional_yield y,
+ bool raw_storage, bool run_sync, bool background_tasks, optional_yield y,
const DoutPrefixProvider *dpp, const rgw::SiteConfig& site);
int init(CephContext *cct, rgw::sal::RadosStore* store, bool have_cache,
- bool run_sync, optional_yield y, const DoutPrefixProvider *dpp,
+ bool run_sync, bool background_tasks, optional_yield y, const DoutPrefixProvider *dpp,
const rgw::SiteConfig& site) {
- return do_init(cct, store, have_cache, false, run_sync, y, dpp, site);
+ return do_init(cct, store, have_cache, false, run_sync, background_tasks, y, dpp, site);
}
int init_raw(CephContext *cct, rgw::sal::RadosStore* store,
bool have_cache, optional_yield y,
const DoutPrefixProvider *dpp,
const rgw::SiteConfig& site) {
- return do_init(cct, store, have_cache, true, false, y, dpp, site);
+ return do_init(cct, store, have_cache, true, false, false, y, dpp, site);
}
void shutdown() {
_svc.shutdown();
false,
false,
false,
+ false, // No background tasks!
null_yield,
need_cache && g_conf()->rgw_cache_enabled,
need_gc);
run_quota,
run_sync,
g_conf().get_val<bool>("rgw_dynamic_resharding"),
- true, null_yield, // run notification thread
+ true, true, null_yield, // run notification thread, run background tasks
g_conf()->rgw_cache_enabled);
if (!env.driver) {
return -EIO;
exit(1);
}
- driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, null_yield);
+ driver = DriverManager::get_storage(&dp, g_ceph_context, cfg, context_pool, site, false, false, false, false, false, false, true, null_yield);
if (!driver) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
cct->_conf->rgw_enable_quota_threads,
cct->_conf->rgw_run_sync_thread,
cct->_conf.get_val<bool>("rgw_dynamic_resharding"),
- true, null_yield, // run notification thread
+ true, true, null_yield, // run notification thread, run background tasks
cct->_conf->rgw_cache_enabled);
}
bool run_reshard_thread,
bool run_notification_thread,
bool use_cache,
- bool use_gc, optional_yield y)
+ bool use_gc,
+ bool background_tasks,
+ optional_yield y)
{
rgw::sal::Driver* driver{nullptr};
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
.set_run_notification_thread(run_notification_thread)
- .init_begin(cct, dpp, site_config) < 0) {
+ .init_begin(cct, dpp, background_tasks, site_config) < 0) {
delete driver;
return nullptr;
}
.set_run_sync_thread(run_sync_thread)
.set_run_reshard_thread(run_reshard_thread)
.set_run_notification_thread(run_notification_thread)
- .init_begin(cct, dpp, site_config) < 0) {
+ .init_begin(cct, dpp, background_tasks, site_config) < 0) {
delete driver;
return nullptr;
}
return nullptr;
}
- int ret = rados->init_svc(true, dpp, site_config);
+ int ret = rados->init_svc(true, dpp, false, site_config);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
delete driver;
}
}
-} // namespace rgw::sal
\ No newline at end of file
+} // namespace rgw::sal
bool quota_threads,
bool run_sync_thread,
bool run_reshard_thread,
- bool run_notification_thread, optional_yield y,
+ bool run_notification_thread,
+ bool background_tasks,
+ optional_yield y,
bool use_cache = true,
bool use_gc = true) {
rgw::sal::Driver* driver = init_storage_provider(dpp, cct, cfg, io_context,
run_sync_thread,
run_reshard_thread,
run_notification_thread,
- use_cache, use_gc, y);
+ use_cache, use_gc,
+ background_tasks, y);
return driver;
}
/** Get a stripped down driver by service name */
bool run_reshard_thread,
bool run_notification_thread,
bool use_metadata_cache,
- bool use_gc, optional_yield y);
+ bool use_gc, bool background_tasks,
+ optional_yield y);
/** Initialize a new raw Driver */
static rgw::sal::Driver* init_raw_storage_provider(const DoutPrefixProvider* dpp,
CephContext* cct,
false,
false,
false,
- true, null_yield,
+ true, true, null_yield,
false));
if (!store) {
std::cerr << "couldn't init storage provider" << std::endl;