}
if (opt_cmd == OPT_MDLOG_FETCH) {
- RGWMetaSyncStatusManager sync(store);
+ RGWMetaSyncStatusManager sync(store, store->get_async_rados());
int ret = sync.init();
if (ret < 0) {
}
if (opt_cmd == OPT_METADATA_SYNC_STATUS) {
- RGWMetaSyncStatusManager sync(store);
+ RGWMetaSyncStatusManager sync(store, store->get_async_rados());
int ret = sync.init();
if (ret < 0) {
}
if (opt_cmd == OPT_METADATA_SYNC_INIT) {
- RGWMetaSyncStatusManager sync(store);
+ RGWMetaSyncStatusManager sync(store, store->get_async_rados());
int ret = sync.init();
if (ret < 0) {
if (opt_cmd == OPT_METADATA_SYNC_RUN) {
- RGWMetaSyncStatusManager sync(store);
+ RGWMetaSyncStatusManager sync(store, store->get_async_rados());
int ret = sync.init();
if (ret < 0) {
cerr << "ERROR: source zone not specified" << std::endl;
return EINVAL;
}
- RGWDataSyncStatusManager sync(store, source_zone);
+ RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
int ret = sync.init();
if (ret < 0) {
cerr << "ERROR: source zone not specified" << std::endl;
return EINVAL;
}
- RGWDataSyncStatusManager sync(store, source_zone);
+ RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
int ret = sync.init();
if (ret < 0) {
cerr << "ERROR: source zone not specified" << std::endl;
return EINVAL;
}
- RGWDataSyncStatusManager sync(store, source_zone);
+ RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
int ret = sync.init();
if (ret < 0) {
}
source_zone = _source_zone;
-
- CephContext *cct = store->ctx();
- async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
- async_rados->start();
-
conn = _conn;
int ret = http_manager.set_threaded();
if (ret < 0) {
- async_rados->stop();
- delete async_rados;
- async_rados = NULL;
ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
return ret;
}
void RGWRemoteDataLog::finish()
{
stop();
- if (async_rados) {
- async_rados->stop();
- }
- delete async_rados;
}
int RGWRemoteDataLog::get_shard_info(int shard_id)
bool initialized;
public:
- RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
- conn(NULL),
- http_manager(store->ctx(), &completion_mgr),
- status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
- initialized(false) {}
+ RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ RGWDataSyncStatusManager *_sm)
+ : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
+ store(_store), conn(NULL), async_rados(async_rados),
+ http_manager(store->ctx(), &completion_mgr),
+ status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
+ initialized(false) {}
int init(const string& _source_zone, RGWRESTConn *_conn);
void finish();
int num_shards;
public:
- RGWDataSyncStatusManager(RGWRados *_store, const string& _source_zone) : store(_store), source_zone(_source_zone), conn(NULL),
- source_log(store, this), num_shards(0) {}
+ RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ const string& _source_zone)
+ : store(_store), source_zone(_source_zone), conn(NULL),
+ source_log(store, async_rados, this), num_shards(0) {}
int init();
rgw_data_sync_status& get_sync_status() { return sync_status; }
#include "rgw_metadata.h"
#include "rgw_bucket.h"
#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "cls/rgw/cls_rgw_ops.h"
sync.stop();
}
public:
- RGWMetaSyncProcessorThread(RGWRados *_store) : RGWSyncProcessorThread(_store), sync(_store) {}
+ RGWMetaSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+ : RGWSyncProcessorThread(_store), sync(_store, async_rados) {}
void wakeup_sync_shards(set<int>& shard_ids) {
for (set<int>::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
sync.stop();
}
public:
- RGWDataSyncProcessorThread(RGWRados *_store, const string& _source_zone) : RGWSyncProcessorThread(_store),
- sync(_store, _source_zone),
- initialized(false) {}
+ RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ const string& _source_zone)
+ : RGWSyncProcessorThread(_store), sync(_store, async_rados, _source_zone),
+ initialized(false) {}
void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
for (map<int, set<string> >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
}
delete meta_mgr;
delete data_log;
+ if (async_rados) {
+ async_rados->stop();
+ delete async_rados;
+ }
if (use_gc_thread) {
gc->stop_processor();
obj_expirer->stop_processor();
run_sync_thread = false;
}
+ async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
+ async_rados->start();
+
if (run_sync_thread) {
Mutex::Locker l(meta_sync_thread_lock);
- meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this);
+ meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
ret = meta_sync_processor_thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
Mutex::Locker dl(data_sync_thread_lock);
for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
ldout(cct, 5) << "starting data sync thread for zone " << iter->first << dendl;
- RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, iter->first);
+ RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
};
class Finisher;
+class RGWAsyncRadosProcessor;
class RGWRados
{
bool quota_threads;
bool run_sync_thread;
+ RGWAsyncRadosProcessor* async_rados;
+
RGWMetaNotifier *meta_notifier;
RGWDataNotifier *data_notifier;
RGWMetaSyncProcessorThread *meta_sync_processor_thread;
public:
RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
- run_sync_thread(false), meta_notifier(NULL), data_notifier(NULL),
- meta_sync_processor_thread(NULL),
+ run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
+ data_notifier(NULL), meta_sync_processor_thread(NULL),
meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
num_watchers(0), watchers(NULL),
watch_initialized(false),
// maintains a connected history of periods
std::unique_ptr<RGWPeriodHistory> period_history;
+ RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; };
+
RGWMetadataManager *meta_mgr;
RGWDataChangesLog *data_log;
int RGWRemoteMetaLog::init()
{
- CephContext *cct = store->ctx();
- async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
- async_rados->start();
-
conn = store->rest_master_conn;
int ret = http_manager.set_threaded();
{
going_down.set(1);
stop();
- if (async_rados) {
- async_rados->stop();
- }
- delete async_rados;
}
int RGWRemoteMetaLog::list_shards(int num_shards)
atomic_t going_down;
public:
- RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
- conn(NULL), async_rados(nullptr),
- http_manager(store->ctx(), &completion_mgr),
- status_manager(_sm), meta_sync_cr(NULL) {}
+ RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+ RGWMetaSyncStatusManager *_sm)
+ : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
+ store(_store), conn(NULL), async_rados(async_rados),
+ http_manager(store->ctx(), &completion_mgr),
+ status_manager(_sm), meta_sync_cr(NULL) {}
int init();
void finish();
vector<string> clone_markers;
public:
- RGWMetaSyncStatusManager(RGWRados *_store) : store(_store), master_log(store, this), num_shards(0),
- ts_to_shard_lock("ts_to_shard_lock") {}
+ RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+ : store(_store), master_log(store, async_rados, this),
+ num_shards(0), ts_to_shard_lock("ts_to_shard_lock") {}
int init();
void finish();