From 9b3c91cbd932094aa1a542f5e1d454d48bd9ad40 Mon Sep 17 00:00:00 2001 From: Orit Wasserman Date: Sun, 21 May 2017 13:26:37 +0300 Subject: [PATCH] rgw: start resharding theard Signed-off-by: Orit Wasserman --- src/rgw/librgw.cc | 3 ++- src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_main.cc | 2 +- src/rgw/rgw_object_expirer.cc | 2 +- src/rgw/rgw_rados.cc | 16 ++++++++++++---- src/rgw/rgw_rados.h | 15 +++++++++------ src/rgw/rgw_realm_reloader.cc | 3 ++- src/test/test_rgw_admin_opstate.cc | 2 +- 8 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index f7bb502fa00..17e2dff1ed0 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -481,7 +481,8 @@ namespace rgw { g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, - g_conf->rgw_run_sync_thread); + g_conf->rgw_run_sync_thread, + g_conf->rgw_dynamic_resharding); if (!store) { mutex.Lock(); diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index aa3ce9e1b23..23ae6b4a5c8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2915,7 +2915,7 @@ int main(int argc, const char **argv) if (raw_storage_op) { store = RGWStoreManager::get_raw_storage(g_ceph_context); } else { - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false); } if (!store) { cerr << "couldn't init storage provider" << std::endl; diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index 5c3dbb0582b..f4e57be713a 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -339,7 +339,7 @@ int main(int argc, const char **argv) RGWRados *store = RGWStoreManager::get_storage(g_ceph_context, g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads, - g_conf->rgw_run_sync_thread); + g_conf->rgw_run_sync_thread, g_conf->rgw_dynamic_resharding); if (!store) { mutex.Lock(); init_timer.cancel_all_events(); diff --git a/src/rgw/rgw_object_expirer.cc b/src/rgw/rgw_object_expirer.cc index 1d44a8797cc..21b8f9f2999 100644 --- a/src/rgw/rgw_object_expirer.cc +++ b/src/rgw/rgw_object_expirer.cc @@ -79,7 +79,7 @@ int main(const int argc, const char **argv) common_init_finish(g_ceph_context); - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false); if (!store) { std::cerr << "couldn't init storage provider" << std::endl; return EIO; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7594840540a..38bf77a89f4 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3723,6 +3723,10 @@ void RGWRados::finalize() reshard_wait->stop(); reshard_wait.reset(); } + + if (run_reshard_thread) { + reshard->stop_processor(); + } delete reshard; delete index_completion_manager; } @@ -4512,10 +4516,10 @@ int RGWRados::init_complete() lc = new RGWLC(); lc->initialize(cct, this); - + if (use_lc_thread) lc->start_processor(); - + quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads); bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards : @@ -4539,6 +4543,10 @@ int RGWRados::init_complete() reshard_wait = std::make_shared(this); reshard = new RGWReshard(this); + if (run_reshard_thread) { + reshard->start_processor(); + } + index_completion_manager = new RGWIndexCompletionManager(this); ret = index_completion_manager->start(); @@ -13534,7 +13542,7 @@ uint64_t RGWRados::next_bucket_id() return ++max_bucket_id; } -RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) +RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) { int use_cache = cct->_conf->rgw_cache_enabled; RGWRados *store = NULL; @@ -13544,7 +13552,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t store = new RGWCache; } - if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) { + if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) { delete store; return NULL; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index fe6633b9da3..9d1a1c7de38 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2226,6 +2226,7 @@ class RGWRados bool use_lc_thread; bool quota_threads; bool run_sync_thread; + bool run_reshard_thread; RGWAsyncRadosProcessor* async_rados; @@ -2300,7 +2301,7 @@ protected: RGWQuotaHandler *quota_handler; Finisher *finisher; - + RGWCoroutinesManagerRegistry *cr_registry; RGWSyncModulesManager *sync_modules_manager{nullptr}; @@ -2318,7 +2319,7 @@ protected: public: RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false), - run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL), + run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL), data_notifier(NULL), meta_sync_processor_thread(NULL), meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"), num_watchers(0), watchers(NULL), @@ -2497,12 +2498,13 @@ public: CephContext *ctx() { return cct; } /** do all necessary setup of the storage device */ - int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread) { + int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread, bool _run_reshard_thread) { set_context(_cct); use_gc_thread = _use_gc_thread; use_lc_thread = _use_lc_thread; quota_threads = _quota_threads; run_sync_thread = _run_sync_thread; + run_reshard_thread = _run_reshard_thread; return initialize(); } /** Initialize the RADOS instance and prepare to do other ops */ @@ -3619,15 +3621,16 @@ public: class RGWStoreManager { public: RGWStoreManager() {} - static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) { - RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread); + static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) { + RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, + run_reshard_thread); return store; } static RGWRados *get_raw_storage(CephContext *cct) { RGWRados *store = init_raw_storage_provider(cct); return store; } - static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread); + static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread); static RGWRados *init_raw_storage_provider(CephContext *cct); static void close_storage(RGWRados *store); diff --git a/src/rgw/rgw_realm_reloader.cc b/src/rgw/rgw_realm_reloader.cc index 8bd65b45d9f..8df554a5f3b 100644 --- a/src/rgw/rgw_realm_reloader.cc +++ b/src/rgw/rgw_realm_reloader.cc @@ -100,7 +100,8 @@ void RGWRealmReloader::reload() cct->_conf->rgw_enable_gc_threads, cct->_conf->rgw_enable_lc_threads, cct->_conf->rgw_enable_quota_threads, - cct->_conf->rgw_run_sync_thread); + cct->_conf->rgw_run_sync_thread, + cct->_conf->rgw_dynamic_resharding); ldout(cct, 1) << "Creating new store" << dendl; diff --git a/src/test/test_rgw_admin_opstate.cc b/src/test/test_rgw_admin_opstate.cc index 00dde0cb815..1a41df30f1a 100644 --- a/src/test/test_rgw_admin_opstate.cc +++ b/src/test/test_rgw_admin_opstate.cc @@ -808,7 +808,7 @@ int main(int argc, char *argv[]){ auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); common_init_finish(g_ceph_context); - store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false); + store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false); g_test = new admin_log::test_helper(); finisher = new Finisher(g_ceph_context); #ifdef GTEST -- 2.39.5