]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: metadata and data sync share RGWAsyncRadosProcessor
authorCasey Bodley <cbodley@redhat.com>
Tue, 5 Jan 2016 18:30:48 +0000 (13:30 -0500)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:55 +0000 (16:13 -0800)
each RGWAsyncRadosProcessor creates 32 worker threads by default, so we
should only create one instance and share it between callers

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index a3af3ea779d3e40cc30ac1078918cd8fd81c167d..ce9711c016a2077f13ba9114d457562dfa842724 100644 (file)
@@ -4465,7 +4465,7 @@ next:
   }
   
   if (opt_cmd == OPT_MDLOG_FETCH) {
-    RGWMetaSyncStatusManager sync(store);
+    RGWMetaSyncStatusManager sync(store, store->get_async_rados());
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4482,7 +4482,7 @@ next:
   }
 
   if (opt_cmd == OPT_METADATA_SYNC_STATUS) {
-    RGWMetaSyncStatusManager sync(store);
+    RGWMetaSyncStatusManager sync(store, store->get_async_rados());
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4524,7 +4524,7 @@ next:
   }
 
   if (opt_cmd == OPT_METADATA_SYNC_INIT) {
-    RGWMetaSyncStatusManager sync(store);
+    RGWMetaSyncStatusManager sync(store, store->get_async_rados());
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4540,7 +4540,7 @@ next:
 
 
   if (opt_cmd == OPT_METADATA_SYNC_RUN) {
-    RGWMetaSyncStatusManager sync(store);
+    RGWMetaSyncStatusManager sync(store, store->get_async_rados());
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4560,7 +4560,7 @@ next:
       cerr << "ERROR: source zone not specified" << std::endl;
       return EINVAL;
     }
-    RGWDataSyncStatusManager sync(store, source_zone);
+    RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4605,7 +4605,7 @@ next:
       cerr << "ERROR: source zone not specified" << std::endl;
       return EINVAL;
     }
-    RGWDataSyncStatusManager sync(store, source_zone);
+    RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
 
     int ret = sync.init();
     if (ret < 0) {
@@ -4625,7 +4625,7 @@ next:
       cerr << "ERROR: source zone not specified" << std::endl;
       return EINVAL;
     }
-    RGWDataSyncStatusManager sync(store, source_zone);
+    RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
 
     int ret = sync.init();
     if (ret < 0) {
index 986aca6d6eda1f4683e38884e031f50213d71997..2c468c5c698b3227396875e126a7837f9eb24621 100644 (file)
@@ -371,18 +371,10 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn)
   }
 
   source_zone = _source_zone;
-
-  CephContext *cct = store->ctx();
-  async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
-  async_rados->start();
-
   conn = _conn;
 
   int ret = http_manager.set_threaded();
   if (ret < 0) {
-    async_rados->stop();
-    delete async_rados;
-    async_rados = NULL;
     ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
     return ret;
   }
@@ -395,10 +387,6 @@ int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn)
 void RGWRemoteDataLog::finish()
 {
   stop();
-  if (async_rados) {
-    async_rados->stop();
-  }
-  delete async_rados;
 }
 
 int RGWRemoteDataLog::get_shard_info(int shard_id)
index eb64054c7a33a5bad4bd3ac476d65df2d7cfef9f..e5a59dbf365d3831f17fa4f7a812034bba54d620 100644 (file)
@@ -156,11 +156,13 @@ class RGWRemoteDataLog : public RGWCoroutinesManager {
   bool initialized;
 
 public:
-  RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
-                                       conn(NULL),
-                                       http_manager(store->ctx(), &completion_mgr),
-                                       status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
-                                       initialized(false) {}
+  RGWRemoteDataLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                   RGWDataSyncStatusManager *_sm)
+    : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
+      store(_store), conn(NULL), async_rados(async_rados),
+      http_manager(store->ctx(), &completion_mgr),
+      status_manager(_sm), lock("RGWRemoteDataLog::lock"), data_sync_cr(NULL),
+      initialized(false) {}
 
   int init(const string& _source_zone, RGWRESTConn *_conn);
   void finish();
@@ -193,8 +195,10 @@ class RGWDataSyncStatusManager {
   int num_shards;
 
 public:
-  RGWDataSyncStatusManager(RGWRados *_store, const string& _source_zone) : store(_store), source_zone(_source_zone), conn(NULL),
-                                                                           source_log(store, this), num_shards(0) {}
+  RGWDataSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                           const string& _source_zone)
+    : store(_store), source_zone(_source_zone), conn(NULL),
+      source_log(store, async_rados, this), num_shards(0) {}
   int init();
 
   rgw_data_sync_status& get_sync_status() { return sync_status; }
index 02aca1eed5466632c05415c0141c0417169b4b51..bdfdd470bea5cee546f7bc398b3960f46c9f5f08 100644 (file)
@@ -21,6 +21,7 @@
 #include "rgw_metadata.h"
 #include "rgw_bucket.h"
 #include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
 #include "rgw_cr_rest.h"
 
 #include "cls/rgw/cls_rgw_ops.h"
@@ -2899,7 +2900,8 @@ class RGWMetaSyncProcessorThread : public RGWSyncProcessorThread
     sync.stop();
   }
 public:
-  RGWMetaSyncProcessorThread(RGWRados *_store) : RGWSyncProcessorThread(_store), sync(_store) {}
+  RGWMetaSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+    : RGWSyncProcessorThread(_store), sync(_store, async_rados) {}
 
   void wakeup_sync_shards(set<int>& shard_ids) {
     for (set<int>::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
@@ -2939,9 +2941,10 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
     sync.stop();
   }
 public:
-  RGWDataSyncProcessorThread(RGWRados *_store, const string& _source_zone) :  RGWSyncProcessorThread(_store),
-                                                                              sync(_store, _source_zone),
-                                                                              initialized(false) {}
+  RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                             const string& _source_zone)
+    : RGWSyncProcessorThread(_store), sync(_store, async_rados, _source_zone),
+      initialized(false) {}
 
   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
     for (map<int, set<string> >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
@@ -3090,6 +3093,10 @@ void RGWRados::finalize()
   }
   delete meta_mgr;
   delete data_log;
+  if (async_rados) {
+    async_rados->stop();
+    delete async_rados;
+  }
   if (use_gc_thread) {
     gc->stop_processor();
     obj_expirer->stop_processor();
@@ -3664,9 +3671,12 @@ int RGWRados::init_complete()
     run_sync_thread = false;
   }
 
+  async_rados = new RGWAsyncRadosProcessor(this, cct->_conf->rgw_num_async_rados_threads);
+  async_rados->start();
+
   if (run_sync_thread) {
     Mutex::Locker l(meta_sync_thread_lock);
-    meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this);
+    meta_sync_processor_thread = new RGWMetaSyncProcessorThread(this, async_rados);
     ret = meta_sync_processor_thread->init();
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
@@ -3677,7 +3687,7 @@ int RGWRados::init_complete()
     Mutex::Locker dl(data_sync_thread_lock);
     for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter->first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, iter->first);
+      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
index 03271071dbd2fb682558ee797e01293cf089799e..fe88933da688056e28c38cb33dee3ba35f63a77a 100644 (file)
@@ -1668,6 +1668,7 @@ struct RGWObjectCtx {
 };
 
 class Finisher;
+class RGWAsyncRadosProcessor;
 
 class RGWRados
 {
@@ -1725,6 +1726,8 @@ class RGWRados
   bool quota_threads;
   bool run_sync_thread;
 
+  RGWAsyncRadosProcessor* async_rados;
+
   RGWMetaNotifier *meta_notifier;
   RGWDataNotifier *data_notifier;
   RGWMetaSyncProcessorThread *meta_sync_processor_thread;
@@ -1798,8 +1801,8 @@ protected:
 public:
   RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
-               run_sync_thread(false), meta_notifier(NULL), data_notifier(NULL),
-               meta_sync_processor_thread(NULL),
+               run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
+               data_notifier(NULL), meta_sync_processor_thread(NULL),
                meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
                num_watchers(0), watchers(NULL),
                watch_initialized(false),
@@ -1910,6 +1913,8 @@ public:
   // maintains a connected history of periods
   std::unique_ptr<RGWPeriodHistory> period_history;
 
+  RGWAsyncRadosProcessor* get_async_rados() const { return async_rados; };
+
   RGWMetadataManager *meta_mgr;
 
   RGWDataChangesLog *data_log;
index 45015ab6e3fffbae0fdc57b01d5522310cd1ba10..27184567fb6e37f61e79d8594d639e8be4d71cb8 100644 (file)
@@ -153,10 +153,6 @@ int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
 
 int RGWRemoteMetaLog::init()
 {
-  CephContext *cct = store->ctx();
-  async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
-  async_rados->start();
-
   conn = store->rest_master_conn;
 
   int ret = http_manager.set_threaded();
@@ -174,10 +170,6 @@ void RGWRemoteMetaLog::finish()
 {
   going_down.set(1);
   stop();
-  if (async_rados) {
-    async_rados->stop();
-  }
-  delete async_rados;
 }
 
 int RGWRemoteMetaLog::list_shards(int num_shards)
index 787217dbcc9dbdfffbb0791f6dd789d6a2ea5b4b..1c413456fbfd74c7c41546236ebb526660839e75 100644 (file)
@@ -111,10 +111,12 @@ class RGWRemoteMetaLog : public RGWCoroutinesManager {
   atomic_t going_down;
 
 public:
-  RGWRemoteMetaLog(RGWRados *_store, RGWMetaSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store),
-                                       conn(NULL), async_rados(nullptr),
-                                       http_manager(store->ctx(), &completion_mgr),
-                                       status_manager(_sm), meta_sync_cr(NULL) {}
+  RGWRemoteMetaLog(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
+                   RGWMetaSyncStatusManager *_sm)
+    : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()),
+      store(_store), conn(NULL), async_rados(async_rados),
+      http_manager(store->ctx(), &completion_mgr),
+      status_manager(_sm), meta_sync_cr(NULL) {}
 
   int init();
   void finish();
@@ -167,8 +169,9 @@ class RGWMetaSyncStatusManager {
   vector<string> clone_markers;
 
 public:
-  RGWMetaSyncStatusManager(RGWRados *_store) : store(_store), master_log(store, this), num_shards(0),
-                                               ts_to_shard_lock("ts_to_shard_lock") {}
+  RGWMetaSyncStatusManager(RGWRados *_store, RGWAsyncRadosProcessor *async_rados)
+    : store(_store), master_log(store, async_rados, this),
+      num_shards(0), ts_to_shard_lock("ts_to_shard_lock") {}
   int init();
   void finish();