From: Casey Bodley Date: Tue, 5 Jan 2016 18:30:48 +0000 (-0500) Subject: rgw: metadata and data sync share RGWAsyncRadosProcessor X-Git-Tag: v10.1.0~354^2~48 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=79d6952f240d55b2d28ee729e8f872564c6f14a4;p=ceph.git rgw: metadata and data sync share RGWAsyncRadosProcessor each RGWAsyncRadosProcessor creates 32 worker threads by default, so we should only create one instance and share it between callers Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index a3af3ea779d3..ce9711c016a2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -4465,7 +4465,7 @@ next: } if (opt_cmd == OPT_MDLOG_FETCH) { - RGWMetaSyncStatusManager sync(store); + RGWMetaSyncStatusManager sync(store, store->get_async_rados()); int ret = sync.init(); if (ret < 0) { @@ -4482,7 +4482,7 @@ next: } if (opt_cmd == OPT_METADATA_SYNC_STATUS) { - RGWMetaSyncStatusManager sync(store); + RGWMetaSyncStatusManager sync(store, store->get_async_rados()); int ret = sync.init(); if (ret < 0) { @@ -4524,7 +4524,7 @@ next: } if (opt_cmd == OPT_METADATA_SYNC_INIT) { - RGWMetaSyncStatusManager sync(store); + RGWMetaSyncStatusManager sync(store, store->get_async_rados()); int ret = sync.init(); if (ret < 0) { @@ -4540,7 +4540,7 @@ next: if (opt_cmd == OPT_METADATA_SYNC_RUN) { - RGWMetaSyncStatusManager sync(store); + RGWMetaSyncStatusManager sync(store, store->get_async_rados()); int ret = sync.init(); if (ret < 0) { @@ -4560,7 +4560,7 @@ next: cerr << "ERROR: source zone not specified" << std::endl; return EINVAL; } - RGWDataSyncStatusManager sync(store, source_zone); + RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone); int ret = sync.init(); if (ret < 0) { @@ -4605,7 +4605,7 @@ next: cerr << "ERROR: source zone not specified" << std::endl; return EINVAL; } - RGWDataSyncStatusManager sync(store, source_zone); + RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone); int ret = sync.init(); if (ret < 0) { @@ -4625,7 +4625,7 @@ next: cerr << "ERROR: source zone not specified" << std::endl; return EINVAL; } - RGWDataSyncStatusManager sync(store, source_zone); + RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone); int ret = sync.init(); if (ret < 0) { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 986aca6d6eda..2c468c5c698b 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -371,18 +371,10 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn) } source_zone = _source_zone; - - CephContext *cct = store->ctx(); - async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads); - async_rados->start(); - conn = _conn; int ret = http_manager.set_threaded(); if (ret < 0) { - async_rados->stop(); - delete async_rados; - async_rados = NULL; ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; return ret; } @@ -395,10 +387,6 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn) void RGWRemoteDataLog::finish() { stop(); - if (async_rados) { - async_rados->stop(); - } - delete async_rados; } int RGWRemoteDataLog::get_shard_info(int shard_id) diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index eb64054c7a33..e5a59dbf365d 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -156,11 +156,13 @@ class RGWRemoteDataLog : public RGWCoroutinesManager { bool initialized; public: - RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), - conn(NULL), - http_manager(store->ctx(), &completion_mgr), - status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), - initialized(false) {} + RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + RGWDataSyncStatusManager *_sm) + : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), + store(_store), conn(NULL), async_rados(async_rados), + http_manager(store->ctx(), &completion_mgr), + status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL), + initialized(false) {} int init(const string& _source_zone, RGWRESTConn *_conn); void finish(); @@ -193,8 +195,10 @@ class RGWDataSyncStatusManager { int num_shards; public: - RGWDataSyncStatusManager(RGWRados *_store, const string& _source_zone) : store(_store), source_zone(_source_zone), conn(NULL), - source_log(store, this), num_shards(0) {} + RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + const string& _source_zone) + : store(_store), source_zone(_source_zone), conn(NULL), + source_log(store, async_rados, this), num_shards(0) {} int init(); rgw_data_sync_status& get_sync_status() { return sync_status; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 02aca1eed546..bdfdd470bea5 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -21,6 +21,7 @@ #include "rgw_metadata.h" #include "rgw_bucket.h" #include "rgw_rest_conn.h" +#include "rgw_cr_rados.h" #include "rgw_cr_rest.h" #include "cls/rgw/cls_rgw_ops.h" @@ -2899,7 +2900,8 @@ class RGWMetaSyncProcessorThread : public RGWSyncProcessorThread sync.stop(); } public: - RGWMetaSyncProcessorThread(RGWRados *_store) : RGWSyncProcessorThread(_store), sync(_store) {} + RGWMetaSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) + : RGWSyncProcessorThread(_store), sync(_store, async_rados) {} void wakeup_sync_shards(set& shard_ids) { for (set::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) { @@ -2939,9 +2941,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread sync.stop(); } public: - RGWDataSyncProcessorThread(RGWRados *_store, const string& _source_zone) : RGWSyncProcessorThread(_store), - sync(_store, _source_zone), - initialized(false) {} + RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + const string& _source_zone) + : RGWSyncProcessorThread(_store), sync(_store, async_rados, _source_zone), + initialized(false) {} void wakeup_sync_shards(map >& shard_ids) { for (map >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) { @@ -3090,6 +3093,10 @@ void RGWRados::finalize() } delete meta_mgr; delete data_log; + if (async_rados) { + async_rados->stop(); + delete async_rados; + } if (use_gc_thread) { gc->stop_processor(); obj_expirer->stop_processor(); @@ -3664,9 +3671,12 @@ int RGWRados::init_complete() run_sync_thread = false; } + async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads); + async_rados->start(); + if (run_sync_thread) { Mutex::Locker l(meta_sync_thread_lock); - meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this); + meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados); ret = meta_sync_processor_thread->init(); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to initialize" << dendl; @@ -3677,7 +3687,7 @@ int RGWRados::init_complete() Mutex::Locker dl(data_sync_thread_lock); for (map::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) { ldout(cct, 5) << "starting data sync thread for zone " << iter->first << dendl; - RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, iter->first); + RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first); ret = thread->init(); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to initialize" << dendl; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 03271071dbd2..fe88933da688 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1668,6 +1668,7 @@ struct RGWObjectCtx { }; class Finisher; +class RGWAsyncRadosProcessor; class RGWRados { @@ -1725,6 +1726,8 @@ class RGWRados bool quota_threads; bool run_sync_thread; + RGWAsyncRadosProcessor* async_rados; + RGWMetaNotifier *meta_notifier; RGWDataNotifier *data_notifier; RGWMetaSyncProcessorThread *meta_sync_processor_thread; @@ -1798,8 +1801,8 @@ protected: public: RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false), - run_sync_thread(false), meta_notifier(NULL), data_notifier(NULL), - meta_sync_processor_thread(NULL), + run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL), + data_notifier(NULL), meta_sync_processor_thread(NULL), meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"), num_watchers(0), watchers(NULL), watch_initialized(false), @@ -1910,6 +1913,8 @@ public: // maintains a connected history of periods std::unique_ptr period_history; + RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; }; + RGWMetadataManager *meta_mgr; RGWDataChangesLog *data_log; diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 45015ab6e3ff..27184567fb6e 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -153,10 +153,6 @@ int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info) int RGWRemoteMetaLog::init() { - CephContext *cct = store->ctx(); - async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads); - async_rados->start(); - conn = store->rest_master_conn; int ret = http_manager.set_threaded(); @@ -174,10 +170,6 @@ void RGWRemoteMetaLog::finish() { going_down.set(1); stop(); - if (async_rados) { - async_rados->stop(); - } - delete async_rados; } int RGWRemoteMetaLog::list_shards(int num_shards) diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 787217dbcc9d..1c413456fbfd 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -111,10 +111,12 @@ class RGWRemoteMetaLog : public RGWCoroutinesManager { atomic_t going_down; public: - RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), - conn(NULL), async_rados(nullptr), - http_manager(store->ctx(), &completion_mgr), - status_manager(_sm), meta_sync_cr(NULL) {} + RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados, + RGWMetaSyncStatusManager *_sm) + : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), + store(_store), conn(NULL), async_rados(async_rados), + http_manager(store->ctx(), &completion_mgr), + status_manager(_sm), meta_sync_cr(NULL) {} int init(); void finish(); @@ -167,8 +169,9 @@ class RGWMetaSyncStatusManager { vector clone_markers; public: - RGWMetaSyncStatusManager(RGWRados *_store) : store(_store), master_log(store, this), num_shards(0), - ts_to_shard_lock("ts_to_shard_lock") {} + RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados) + : store(_store), master_log(store, async_rados, this), + num_shards(0), ts_to_shard_lock("ts_to_shard_lock") {} int init(); void finish();