]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: start resharding theard
authorOrit Wasserman <owasserm@redhat.com>
Sun, 21 May 2017 10:26:37 +0000 (13:26 +0300)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:18:00 +0000 (13:18 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/librgw.cc
src/rgw/rgw_admin.cc
src/rgw/rgw_main.cc
src/rgw/rgw_object_expirer.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_realm_reloader.cc
src/test/test_rgw_admin_opstate.cc

index f7bb502fa0060b70184dcf49b6ab9ab2fea51afc..17e2dff1ed08d1b806b4a661e096f4fcbd3d5633 100644 (file)
@@ -481,7 +481,8 @@ namespace rgw {
                                         g_conf->rgw_enable_gc_threads,
                                         g_conf->rgw_enable_lc_threads,
                                         g_conf->rgw_enable_quota_threads,
-                                        g_conf->rgw_run_sync_thread);
+                                        g_conf->rgw_run_sync_thread,
+                                        g_conf->rgw_dynamic_resharding);
 
     if (!store) {
       mutex.Lock();
index aa3ce9e1b23c0c0238b29261e7a39a3368c0ac84..23ae6b4a5c824aea1988ba137d717de3a0a92f05 100644 (file)
@@ -2915,7 +2915,7 @@ int main(int argc, const char **argv)
   if (raw_storage_op) {
     store = RGWStoreManager::get_raw_storage(g_ceph_context);
   } else {
-    store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+    store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
   }
   if (!store) {
     cerr << "couldn't init storage provider" << std::endl;
index 5c3dbb0582b6beb3e9fddd694d1166763ea7250c..f4e57be713a9268fd22fc28072775ff46955d0d8 100644 (file)
@@ -339,7 +339,7 @@ int main(int argc, const char **argv)
 
   RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
       g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads,
-      g_conf->rgw_run_sync_thread);
+      g_conf->rgw_run_sync_thread, g_conf->rgw_dynamic_resharding);
   if (!store) {
     mutex.Lock();
     init_timer.cancel_all_events();
index 1d44a8797ccf2f2fa9b9e7915d974b7458bcaadb..21b8f9f299948cc7116c511b66f70fecf9c3dd33 100644 (file)
@@ -79,7 +79,7 @@ int main(const int argc, const char **argv)
 
   common_init_finish(g_ceph_context);
 
-  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
   if (!store) {
     std::cerr << "couldn't init storage provider" << std::endl;
     return EIO;
index 7594840540aefa8b084b272abd02b23be7acd4b0..38bf77a89f42f0a34ececaf54e0f33e7994b2cc6 100644 (file)
@@ -3723,6 +3723,10 @@ void RGWRados::finalize()
     reshard_wait->stop();
     reshard_wait.reset();
   }
+
+  if (run_reshard_thread) {
+    reshard->stop_processor();
+  }
   delete reshard;
   delete index_completion_manager;
 }
@@ -4512,10 +4516,10 @@ int RGWRados::init_complete()
 
   lc = new RGWLC();
   lc->initialize(cct, this);
-  
+
   if (use_lc_thread)
     lc->start_processor();
-  
+
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
@@ -4539,6 +4543,10 @@ int RGWRados::init_complete()
   reshard_wait = std::make_shared<RGWReshardWait>(this);
 
   reshard = new RGWReshard(this);
+  if (run_reshard_thread) {
+    reshard->start_processor();
+  }
+
   index_completion_manager = new RGWIndexCompletionManager(this);
   ret = index_completion_manager->start();
 
@@ -13534,7 +13542,7 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
@@ -13544,7 +13552,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t
     store = new RGWCache<RGWRados>; 
   }
 
-  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) {
+  if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
     delete store;
     return NULL;
   }
index fe6633b9da3a936013cd37cd6f12d0990756ba54..9d1a1c7de38a00e84a8c94a392ad16a3f8c3f60f 100644 (file)
@@ -2226,6 +2226,7 @@ class RGWRados
   bool use_lc_thread;
   bool quota_threads;
   bool run_sync_thread;
+  bool run_reshard_thread;
 
   RGWAsyncRadosProcessor* async_rados;
 
@@ -2300,7 +2301,7 @@ protected:
   RGWQuotaHandler *quota_handler;
 
   Finisher *finisher;
-  
+
   RGWCoroutinesManagerRegistry *cr_registry;
 
   RGWSyncModulesManager *sync_modules_manager{nullptr};
@@ -2318,7 +2319,7 @@ protected:
 public:
   RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
-               run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
+               run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
                data_notifier(NULL), meta_sync_processor_thread(NULL),
                meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
                num_watchers(0), watchers(NULL),
@@ -2497,12 +2498,13 @@ public:
 
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread) {
+  int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread, bool _run_reshard_thread) {
     set_context(_cct);
     use_gc_thread = _use_gc_thread;
     use_lc_thread = _use_lc_thread;
     quota_threads = _quota_threads;
     run_sync_thread = _run_sync_thread;
+    run_reshard_thread = _run_reshard_thread;
     return initialize();
   }
   /** Initialize the RADOS instance and prepare to do other ops */
@@ -3619,15 +3621,16 @@ public:
 class RGWStoreManager {
 public:
   RGWStoreManager() {}
-  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) {
-    RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread);
+  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) {
+    RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread,
+                                           run_reshard_thread);
     return store;
   }
   static RGWRados *get_raw_storage(CephContext *cct) {
     RGWRados *store = init_raw_storage_provider(cct);
     return store;
   }
-  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread);
   static RGWRados *init_raw_storage_provider(CephContext *cct);
   static void close_storage(RGWRados *store);
 
index 8bd65b45d9fb0d319cddcfd14b071c1e665bb9a4..8df554a5f3b31415947178be4ddf3af538f27e67 100644 (file)
@@ -100,7 +100,8 @@ void RGWRealmReloader::reload()
                                          cct->_conf->rgw_enable_gc_threads,
                                          cct->_conf->rgw_enable_lc_threads,
                                          cct->_conf->rgw_enable_quota_threads,
-                                         cct->_conf->rgw_run_sync_thread);
+                                         cct->_conf->rgw_run_sync_thread,
+                                         cct->_conf->rgw_dynamic_resharding);
 
     ldout(cct, 1) << "Creating new store" << dendl;
 
index 00dde0cb81571a59197ae9dfa486cdcc755fb559..1a41df30f1aa08a3bb03292cdd6a8a9890016824 100644 (file)
@@ -808,7 +808,7 @@ int main(int argc, char *argv[]){
   auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
                         CODE_ENVIRONMENT_UTILITY, 0);
   common_init_finish(g_ceph_context);
-  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+  store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
   g_test = new admin_log::test_helper();
   finisher = new Finisher(g_ceph_context);
 #ifdef GTEST