Simply use the RADOS handle and `rgw_rados_ref` directly.
Also move `async_processor` out from `RGWSI_RADOS` and into
`RGWServices_Def`. This is as good a place for it as any, for now, as
it's reachable by everything that needs it and exists for the
lifetime of the process.
Eventually it's going to go away due to coroutinization, anyway.
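
For reference, a minimal sketch of the resulting pattern, abbreviated
from the hunks below (not a complete definition):

    // RGWServices_Def now owns the processor for the process lifetime:
    struct RGWServices_Def {
      // ...
      std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
    };

    // It is constructed and started during service init:
    async_processor = std::make_unique<RGWAsyncRadosProcessor>(
      cct, cct->_conf->rgw_num_async_rados_threads);
    async_processor->start();

    // Callers switch from the RGWSI_RADOS accessor to the services struct:
    //   before: driver->svc()->rados->get_async_processor()
    //   after:  driver->svc()->async_processor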
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
}
sync_module.reset(new RGWDefaultSyncModuleInstance());
- auto async_rados = driver->svc()->rados->get_async_processor();
+ auto async_rados = driver->svc()->async_processor;
sync_env.init(this, driver->ctx(), driver,
driver->svc(), async_rados, &http_manager,
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
- env.init(dpp, driver->ctx(), driver, driver->svc(), driver->svc()->rados->get_async_processor(),
+ env.init(dpp, driver->ctx(), driver, driver->svc(), driver->svc()->async_processor,
nullptr, nullptr, nullptr, module, nullptr);
RGWDataSyncCtx sc;
{
rgw::sal::RadosStore* rados_store = static_cast<rgw::sal::RadosStore*>(driver);
// initialize a sync status manager to read the status
- RGWMetaSyncStatusManager mgr(rados_store, rados_store->svc()->rados->get_async_processor());
+ RGWMetaSyncStatusManager mgr(rados_store, rados_store->svc()->async_processor);
int r = mgr.init(dpp);
if (r < 0) {
return r;
/* Before joining any sync threads, drain outstanding requests &
* mark the async_processor as going_down() */
- if (svc.rados) {
-   svc.rados->stop_processor();
+ if (svc.async_processor) {
+   svc.async_processor->stop();
}
if (run_sync_thread) {
<< pt.second.name << " present in zonegroup" << dendl;
}
}
- auto async_processor = svc.rados->get_async_processor();
+ auto async_processor = svc.async_processor;
std::lock_guard l{meta_sync_thread_lock};
meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this->driver, async_processor);
ret = meta_sync_processor_thread->init(dpp);
std::lock_guard dl{data_sync_thread_lock};
for (auto source_zone : svc.zone->get_data_sync_source_zones()) {
ldpp_dout(dpp, 5) << "starting data sync thread for zone " << source_zone->name << dendl;
- auto *thread = new RGWDataSyncProcessorThread(this->driver, svc.rados->get_async_processor(), source_zone);
+ auto *thread = new RGWDataSyncProcessorThread(this->driver, svc.async_processor, source_zone);
ret = thread->init(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: failed to initialize data sync thread" << dendl;
#include "common/errno.h"
#include "rgw_bucket.h"
+#include "rgw_cr_rados.h"
#include "rgw_datalog.h"
#include "rgw_metadata.h"
#include "rgw_otp.h"
sysobj_core = std::make_unique<RGWSI_SysObj_Core>(cct);
user_rados = std::make_unique<RGWSI_User_RADOS>(cct);
role_rados = std::make_unique<RGWSI_Role_RADOS>(cct);
+ async_processor = std::make_unique<RGWAsyncRadosProcessor>(
+ cct, cct->_conf->rgw_num_async_rados_threads);
if (have_cache) {
sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(dpp, cct);
vector<RGWSI_MetaBackend *> meta_bes{meta_be_sobj.get(), meta_be_otp.get()};
+ async_processor->start();
finisher->init();
bi_rados->init(zone.get(), radoshandle, bilog_rados.get(), datalog_rados.get());
bilog_rados->init(bi_rados.get());
bucket_sobj.get());
cls->init(zone.get(), radoshandle);
config_key_rados->init(radoshandle);
- mdlog->init(rados.get(), zone.get(), sysobj.get(), cls.get());
+ mdlog->init(radoshandle, zone.get(), sysobj.get(), cls.get(),
+ async_processor.get());
meta->init(sysobj.get(), mdlog.get(), meta_bes);
meta_be_sobj->init(sysobj.get(), mdlog.get());
meta_be_otp->init(sysobj.get(), mdlog.get(), cls.get());
quota->shutdown();
zone_utils->shutdown();
zone->shutdown();
+ async_processor->stop();
rados->shutdown();
has_shutdown = true;
-
}
int RGWServices::do_init(CephContext *_cct, bool have_cache, bool raw,
core = _svc.sysobj_core.get();
user = _svc.user_rados.get();
role = _svc.role_rados.get();
+ async_processor = _svc.async_processor.get();
return 0;
}
class RGWSI_User_RADOS;
class RGWDataChangesLog;
class RGWSI_Role_RADOS;
+class RGWAsyncRadosProcessor;
struct RGWServices_Def
{
std::unique_ptr<RGWSI_User_RADOS> user_rados;
std::unique_ptr<RGWDataChangesLog> datalog_rados;
std::unique_ptr<RGWSI_Role_RADOS> role_rados;
+ std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
RGWServices_Def();
~RGWServices_Def();
RGWSI_SysObj_Core *core{nullptr};
RGWSI_User *user{nullptr};
RGWSI_Role_RADOS *role{nullptr};
+ RGWAsyncRadosProcessor* async_processor{nullptr};
int do_init(CephContext *cct, bool have_cache, bool raw_storage,
bool run_sync, librados::Rados* radoshandle, optional_yield y,
get_policy_params.zone = zone_id;
get_policy_params.bucket = bucket;
- yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->async_processor,
store,
get_policy_params,
source_policy,
}
while (clean_info && retries < MAX_RETRIES) {
yield call(new RGWPutBucketInstanceInfoCR(
- store->svc()->rados->get_async_processor(),
+ store->svc()->async_processor,
store, clean_info->first, false, {},
no_change_attrs(), dpp));
// Raced, try again.
if (retcode == -ECANCELED) {
yield call(new RGWGetBucketInstanceInfoCR(
- store->svc()->rados->get_async_processor(),
+ store->svc()->async_processor,
store, clean_info->first.bucket,
&(clean_info->first), nullptr, dpp));
if (retcode < 0) {
return buckets.size() < config.buckets_per_interval;
};
- call(new MetadataListCR(cct, store->svc()->rados->get_async_processor(),
+ call(new MetadataListCR(cct, store->svc()->async_processor,
store->ctl()->meta.mgr,
section, status.marker, cb));
}
// prevent others from trimming for our entire wait interval
set_status("acquiring trim lock");
- yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
+ yield call(new RGWSimpleRadosLockCR(store->svc()->async_processor, store,
obj, name, cookie,
config.trim_interval_sec));
if (retcode < 0) {
if (retcode < 0) {
// on errors, unlock so other gateways can try
set_status("unlocking");
- yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
+ yield call(new RGWSimpleRadosUnlockCR(store->svc()->async_processor, store,
obj, name, cookie));
}
}
// prevent other gateways from attempting to trim for the duration
set_status("acquiring trim lock");
- yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
+ yield call(new RGWSimpleRadosLockCR(store->svc()->async_processor, store,
rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, lock_oid),
"data_trim", lock_cookie,
// interval is a small number and unlikely to overflow
env(env), mdlog(mdlog), period_id(env.current.get_period().get_id())
{
meta_env.init(env.dpp, cct, env.store, env.store->svc()->zone->get_master_conn(),
- env.store->svc()->rados->get_async_processor(), env.http, nullptr,
+ env.store->svc()->async_processor, env.http, nullptr,
env.store->getRados()->get_sync_tracer());
}
// prevent others from trimming for our entire wait interval
set_status("acquiring trim lock");
- yield call(new RGWSimpleRadosLockCR(store->svc()->rados->get_async_processor(), store,
+ yield call(new RGWSimpleRadosLockCR(store->svc()->async_processor, store,
obj, name, cookie,
// interval is a small number and unlikely to overflow
// coverity[store_truncates_time_t:SUPPRESS]
if (retcode < 0) {
// on errors, unlock so other gateways can try
set_status("unlocking");
- yield call(new RGWSimpleRadosUnlockCR(store->svc()->rados->get_async_processor(), store,
+ yield call(new RGWSimpleRadosUnlockCR(store->svc()->async_processor, store,
obj, name, cookie));
}
}
static void get_md_sync_status(list<string>& status)
{
- RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor());
+ RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor);
int ret = sync.init(dpp());
if (ret < 0) {
flush_ss(ss, status);
return;
}
- RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor(), source_zone, nullptr);
+ RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor, source_zone, nullptr);
int ret = sync.init(dpp());
if (ret < 0) {
}
if (opt_cmd == OPT::METADATA_SYNC_STATUS) {
- RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor());
+ RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor);
int ret = sync.init(dpp());
if (ret < 0) {
}
if (opt_cmd == OPT::METADATA_SYNC_INIT) {
- RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor());
+ RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor);
int ret = sync.init(dpp());
if (ret < 0) {
if (opt_cmd == OPT::METADATA_SYNC_RUN) {
- RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor());
+ RGWMetaSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor);
int ret = sync.init(dpp());
if (ret < 0) {
cerr << "ERROR: source zone not specified" << std::endl;
return EINVAL;
}
- RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor(), source_zone, nullptr);
+ RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor, source_zone, nullptr);
int ret = sync.init(dpp());
if (ret < 0) {
return EINVAL;
}
- RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor(), source_zone, nullptr);
+ RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor, source_zone, nullptr);
int ret = sync.init(dpp());
if (ret < 0) {
return ret;
}
- RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor(), source_zone, nullptr, sync_module);
+ RGWDataSyncStatusManager sync(static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->async_processor, source_zone, nullptr, sync_module);
ret = sync.init(dpp());
if (ret < 0) {
RGWSI_MDLog::~RGWSI_MDLog() {
}
-int RGWSI_MDLog::init(RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc, RGWSI_Cls *_cls_svc)
+int RGWSI_MDLog::init(librados::Rados* rados_, RGWSI_Zone *_zone_svc,
+ RGWSI_SysObj *_sysobj_svc, RGWSI_Cls *_cls_svc,
+ RGWAsyncRadosProcessor* async_processor_)
{
svc.zone = _zone_svc;
svc.sysobj = _sysobj_svc;
svc.mdlog = this;
- svc.rados = _rados_svc;
+ rados = rados_;
svc.cls = _cls_svc;
+ async_processor = async_processor_;
return 0;
}
ReadHistoryCR(const DoutPrefixProvider *dpp,
const Svc& svc,
Cursor *cursor,
- RGWObjVersionTracker *objv_tracker)
+ RGWObjVersionTracker *objv_tracker,
+ RGWAsyncRadosProcessor* async_processor)
: RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
cursor(cursor),
objv_tracker(objv_tracker),
- async_processor(svc.rados->get_async_processor())
+ async_processor(async_processor)
{}
int operate(const DoutPrefixProvider *dpp) {
WriteHistoryCR(const DoutPrefixProvider *dpp,
Svc& svc,
const Cursor& cursor,
- RGWObjVersionTracker *objv)
+ RGWObjVersionTracker *objv,
+ RGWAsyncRadosProcessor* async_processor)
: RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
cursor(cursor), objv(objv),
- async_processor(svc.rados->get_async_processor())
+ async_processor(async_processor)
{}
int operate(const DoutPrefixProvider *dpp) {
RGWObjVersionTracker *objv; //< to prevent racing updates
Cursor next; //< target cursor for oldest log period
Cursor existing; //< existing cursor read from disk
+ RGWAsyncRadosProcessor* async_processor;
public:
- TrimHistoryCR(const DoutPrefixProvider *dpp, const Svc& svc, Cursor cursor, RGWObjVersionTracker *objv)
+ TrimHistoryCR(const DoutPrefixProvider *dpp, const Svc& svc, Cursor cursor,
+ RGWObjVersionTracker *objv,
+ RGWAsyncRadosProcessor* async_processor)
: RGWCoroutine(svc.zone->ctx()), dpp(dpp), svc(svc),
- cursor(cursor), objv(objv), next(cursor) {
+ cursor(cursor), objv(objv), next(cursor),
+ async_processor(async_processor) {
next.next(); // advance past cursor
}
int operate(const DoutPrefixProvider *dpp) {
reenter(this) {
// read an existing history, and write the new history if it's newer
- yield call(new ReadHistoryCR(dpp, svc, &existing, objv));
+ yield call(new ReadHistoryCR(dpp, svc, &existing, objv, async_processor));
if (retcode < 0) {
return set_cr_error(retcode);
}
return set_cr_error(-ECANCELED);
}
// overwrite with updated history
- yield call(new WriteHistoryCR(dpp, svc, next, objv));
+ yield call(new WriteHistoryCR(dpp, svc, next, objv, async_processor));
if (retcode < 0) {
return set_cr_error(retcode);
}
RGWCoroutine* RGWSI_MDLog::read_oldest_log_period_cr(const DoutPrefixProvider *dpp,
Cursor *period, RGWObjVersionTracker *objv) const
{
- return new mdlog::ReadHistoryCR(dpp, svc, period, objv);
+ return new mdlog::ReadHistoryCR(dpp, svc, period, objv, async_processor);
}
RGWCoroutine* RGWSI_MDLog::trim_log_period_cr(const DoutPrefixProvider *dpp,
Cursor period, RGWObjVersionTracker *objv) const
{
- return new mdlog::TrimHistoryCR(dpp, svc, period, objv);
+ return new mdlog::TrimHistoryCR(dpp, svc, period, objv, async_processor);
}
RGWMetadataLog* RGWSI_MDLog::get_log(const std::string& period)
class RGWSI_Zone;
class RGWSI_SysObj;
-class RGWSI_RADOS;
namespace mdlog {
class ReadHistoryCR;
RGWSI_MDLog(CephContext *cct, bool run_sync);
virtual ~RGWSI_MDLog();
+ librados::Rados* rados{nullptr};
+ RGWAsyncRadosProcessor* async_processor{nullptr};
+
struct Svc {
- RGWSI_RADOS *rados{nullptr};
RGWSI_Zone *zone{nullptr};
RGWSI_SysObj *sysobj{nullptr};
RGWSI_MDLog *mdlog{nullptr};
RGWSI_Cls *cls{nullptr};
} svc;
- int init(RGWSI_RADOS *_rados_svc,
+ int init(librados::Rados* rados_,
RGWSI_Zone *_zone_svc,
RGWSI_SysObj *_sysobj_svc,
- RGWSI_Cls *_cls_svc);
+ RGWSI_Cls *_cls_svc,
+ RGWAsyncRadosProcessor* async_processor_);
int do_start(optional_yield y, const DoutPrefixProvider *dpp) override;
return ret;
}
- async_processor.reset(new RGWAsyncRadosProcessor(cct, cct->_conf->rgw_num_async_rados_threads));
- async_processor->start();
-
return 0;
}
void RGWSI_RADOS::shutdown()
{
- if (async_processor) {
- async_processor->stop();
- }
rados.shutdown();
}
-void RGWSI_RADOS::stop_processor()
-{
- if (async_processor) {
- async_processor->stop();
- }
-}
-
librados::Rados* RGWSI_RADOS::get_rados_handle()
{
return &rados;
class RGWSI_RADOS : public RGWServiceInstance
{
librados::Rados rados;
- std::unique_ptr<RGWAsyncRadosProcessor> async_processor;
int do_start(optional_yield, const DoutPrefixProvider *dpp) override;
void init() {}
void shutdown() override;
- void stop_processor();
std::string cluster_fsid();
uint64_t instance_id();
bool check_secure_mon_conn(const DoutPrefixProvider *dpp) const;
- RGWAsyncRadosProcessor *get_async_processor() {
- return async_processor.get();
- }
-
int clog_warn(const std::string& msg);
class Handle;