From f7ac12228241e9c29172ad562337d4f22d3eb59b Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh <yehuda@redhat.com>
Date: Tue, 8 Sep 2015 15:24:16 -0700
Subject: [PATCH] rgw: run and stop sync thread

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
---
 src/common/config_opts.h |  1 +
 src/rgw/rgw_admin.cc     |  2 +-
 src/rgw/rgw_coroutine.cc |  6 ++++-
 src/rgw/rgw_coroutine.h  |  5 ++++
 src/rgw/rgw_main.cc      |  3 ++-
 src/rgw/rgw_rados.cc     | 55 ++++++++++++++++++++++++++++++++++++++--
 src/rgw/rgw_rados.h      | 15 +++++++----
 src/rgw/rgw_sync.h       |  4 +++
 8 files changed, 81 insertions(+), 10 deletions(-)

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index f897fcc2c7a74..efa439731eb89 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1291,6 +1291,7 @@ OPTION(rgw_enable_static_website, OPT_BOOL, false) // enable static website feat
 
 OPTION(rgw_num_async_rados_threads, OPT_INT, 32) // num of threads to use for async rados operations
 OPTION(rgw_md_notify_interval_msec, OPT_INT, 200) // metadata changes notification interval to followers
+OPTION(rgw_run_sync_thread, OPT_BOOL, true) // whether radosgw (not radosgw-admin) spawns the sync thread
 
 OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
 OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 76a925db726b0..e9ce55ec8914a 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1724,7 +1724,7 @@ int main(int argc, char **argv)
   if (raw_storage_op) {
     store = RGWStoreManager::get_raw_storage(g_ceph_context);
   } else {
-    store = RGWStoreManager::get_storage(g_ceph_context, false, false);
+    store = RGWStoreManager::get_storage(g_ceph_context, false, false, false);
   }
   if (!store) {
     cerr << "couldn't init storage provider" << std::endl;
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
index 7678ff78e12e7..3fd23a4b1b102 100644
--- a/src/rgw/rgw_coroutine.cc
+++ b/src/rgw/rgw_coroutine.cc
@@ -220,7 +220,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   env.manager = this;
   env.stacks = &stacks;
 
-  for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
+  for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) {
     RGWCoroutinesStack *stack = *iter;
     env.stack = stack;
 
@@ -273,6 +273,10 @@
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
+      if (going_down.read() > 0) {
+        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
+        return 0;
+      }
       handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
       iter = stacks.begin();
     }
diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h
index 223bf6c580236..0c5731f8a99a6 100644
--- a/src/rgw/rgw_coroutine.h
+++ b/src/rgw/rgw_coroutine.h
@@ -266,6 +266,7 @@ void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
 
 class RGWCoroutinesManager {
   CephContext *cct;
+  atomic_t going_down;
 
   void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
 protected:
@@ -280,6 +281,10 @@ public:
 
   int run(list<RGWCoroutinesStack *>& ops);
   int run(RGWCoroutine *op);
+  void stop() {
+    going_down.set(1);
+    completion_mgr.go_down();
+  }
 
   virtual void report_error(RGWCoroutinesStack *op);
 
diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc
index df3af8a8c257d..090ccbd299b81 100644
--- a/src/rgw/rgw_main.cc
+++ b/src/rgw/rgw_main.cc
@@ -1115,7 +1115,8 @@ int main(int argc, const char **argv)
 
   int r = 0;
   RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
-      g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads);
+      g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_quota_threads,
+      g_conf->rgw_run_sync_thread);
   if (!store) {
     mutex.Lock();
     init_timer.cancel_all_events();
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 8aa936a33f830..55bf52f694c08 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -54,6 +54,7 @@ using namespace librados;
 
 #include "rgw_gc.h"
 #include "rgw_object_expirer_core.h"
+#include "rgw_sync.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -2066,6 +2067,7 @@ public:
     stop();
   }
 
+  virtual int init() { return 0; }
   virtual int process() = 0;
   bool going_down() { return down_flag.read() != 0; }
 
@@ -2165,6 +2167,40 @@ int RGWMetaNotifier::process()
   return 0;
 }
 
+class RGWSyncProcessorThread : public RGWRadosThread {
+  CephContext *cct;
+  RGWMetaSyncStatusManager sync;
+
+  uint64_t interval_msec() {
+    return 0; /* no interval associated, it'll run once until stopped */
+  }
+  void stop_processor() {
+    sync.stop();
+  }
+public:
+  RGWSyncProcessorThread(RGWRados *_store) : RGWRadosThread(_store), cct(_store->ctx()), sync(_store) {}
+
+  int init();
+  int process();
+};
+
+int RGWSyncProcessorThread::init()
+{
+  int ret = sync.init();
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: sync.init() returned " << ret << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+int RGWSyncProcessorThread::process()
+{
+  sync.run();
+  return 0;
+}
+
 int RGWRados::get_required_alignment(rgw_bucket& bucket, uint64_t *alignment)
 {
   IoCtx ioctx;
@@ -2251,6 +2287,11 @@ void RGWRados::finalize()
   obj_expirer = NULL;
 
   meta_notifier->stop();
+  if (run_sync_thread) {
+    sync_processor_thread->stop();
+    delete sync_processor_thread;
+    sync_processor_thread = NULL;
+  }
   delete rest_master_conn;
 
   map<string, RGWRESTConn *>::iterator iter;
@@ -2544,6 +2585,16 @@ int RGWRados::init_complete()
     meta_notifier->start();
   }
 
+  if (run_sync_thread) {
+    sync_processor_thread = new RGWSyncProcessorThread(this);
+    ret = sync_processor_thread->init();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: failed to initialize" << dendl;
+      return ret;
+    }
+    sync_processor_thread->start();
+  }
+
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
   bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ?
                              cct->_conf->rgw_override_bucket_index_max_shards :
@@ -10322,7 +10373,7 @@ uint64_t RGWRados::next_bucket_id()
   return ++max_bucket_id;
 }
 
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread)
 {
   int use_cache = cct->_conf->rgw_cache_enabled;
   RGWRados *store = NULL;
@@ -10332,7 +10383,7 @@ RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_t
     store = new RGWCache<RGWRados>;
   }
 
-  if (store->initialize(cct, use_gc_thread, quota_threads) < 0) {
+  if (store->initialize(cct, use_gc_thread, quota_threads, run_sync_thread) < 0) {
     delete store;
     return NULL;
   }
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 0c791a1799701..006f0a99d3aa2 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -23,6 +23,7 @@ class ACLOwner;
 class RGWGC;
 class RGWMetaNotifier;
 class RGWObjectExpirer;
+class RGWSyncProcessorThread;
 class RGWRESTConn;
 
 /* flags for put_obj_meta() */
@@ -1523,6 +1524,7 @@ class RGWRados
   friend class RGWGC;
   friend class RGWMetaNotifier;
   friend class RGWObjectExpirer;
+  friend class RGWSyncProcessorThread;
   friend class RGWStateLog;
   friend class RGWReplicaLogger;
 
@@ -1569,8 +1571,10 @@
   RGWObjectExpirer *obj_expirer;
   bool use_gc_thread;
   bool quota_threads;
+  bool run_sync_thread;
 
   RGWMetaNotifier *meta_notifier;
+  RGWSyncProcessorThread *sync_processor_thread;
 
   int num_watchers;
   RGWWatcher **watchers;
@@ -1628,7 +1632,7 @@ protected:
 public:
   RGWRados() : max_req_id(0), lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), obj_expirer(NULL), use_gc_thread(false), quota_threads(false),
-               meta_notifier(NULL),
+               run_sync_thread(false), meta_notifier(NULL),
                num_watchers(0), watchers(NULL),
                watch_initialized(false),
                bucket_id_lock("rados_bucket_id"),
@@ -1709,10 +1713,11 @@ public:
 
   CephContext *ctx() { return cct; }
   /** do all necessary setup of the storage device */
-  int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads) {
+  int initialize(CephContext *_cct, bool _use_gc_thread, bool _quota_threads, bool _run_sync_thread) {
    set_context(_cct);
     use_gc_thread = _use_gc_thread;
     quota_threads = _quota_threads;
+    run_sync_thread = _run_sync_thread;
     return initialize();
   }
   /** Initialize the RADOS instance and prepare to do other ops */
@@ -2667,15 +2672,15 @@ public:
 
 class RGWStoreManager {
 public:
   RGWStoreManager() {}
-  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads) {
-    RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads);
+  static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread) {
+    RGWRados *store = init_storage_provider(cct, use_gc_thread, quota_threads, run_sync_thread);
     return store;
   }
   static RGWRados *get_raw_storage(CephContext *cct) {
     RGWRados *store = init_raw_storage_provider(cct);
     return store;
   }
-  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads);
+  static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool quota_threads, bool run_sync_thread);
   static RGWRados *init_raw_storage_provider(CephContext *cct);
   static void close_storage(RGWRados *store);
diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h
index 8d06bf67d44a0..8db33c9fab4fb 100644
--- a/src/rgw/rgw_sync.h
+++ b/src/rgw/rgw_sync.h
@@ -204,6 +204,10 @@ public:
   int clone_shards() { return master_log.clone_shards(num_shards, clone_markers); }
 
   int run() { return master_log.run_sync(num_shards, sync_status); }
+
+  void stop() {
+    master_log.stop();
+  }
 };
 
 #endif
-- 
2.39.5
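
Note on the shutdown path wired up above: RGWRados::finalize() calls
RGWSyncProcessorThread::stop(), which reaches RGWMetaSyncStatusManager::stop()
and, through master_log.stop() (the master_log type is not shown in this
patch), the new RGWCoroutinesManager::stop(). That raises the going_down flag
and calls completion_mgr.go_down(), so a manager blocked in get_next() wakes
up and run() exits at its next !going_down.read() check. The standalone
sketch below models the same cooperative-shutdown pattern, with std::atomic
and std::thread standing in for Ceph's atomic_t and RGWRadosThread; every
name in it is illustrative, not Ceph API.

// shutdown_sketch.cc -- cooperative shutdown, modeled on this patch.
// Build: g++ -std=c++11 -pthread shutdown_sketch.cc
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

class SyncProcessor {
  std::atomic<int> going_down{0};

public:
  // Analogous to RGWCoroutinesManager::run(): keep iterating until
  // stop() raises the flag, then return cleanly instead of being killed.
  void run() {
    while (!going_down.load()) {
      // ... process one unit of sync work per iteration ...
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    std::cout << "was stopped, exiting\n";  // cf. the ldout(cct, 5) line
  }

  // Analogous to RGWCoroutinesManager::stop(); the patch additionally
  // calls completion_mgr.go_down() so a thread blocked waiting on a
  // completion notices the flag instead of sleeping forever.
  void stop() { going_down.store(1); }
};

int main() {
  SyncProcessor sync;
  std::thread worker([&] { sync.run(); });  // cf. sync_processor_thread->start()
  std::this_thread::sleep_for(std::chrono::milliseconds(200));
  sync.stop();   // cf. RGWRados::finalize(): stop, then delete the thread
  worker.join();
  return 0;
}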
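
Operator note: rgw_run_sync_thread defaults to true, so a radosgw daemon
spawns the sync thread, while radosgw-admin always passes false (see the
rgw_admin.cc hunk) so one-shot admin commands never do. A gateway can also
opt out explicitly in ceph.conf; a possible snippet, with a made-up daemon
name:

[client.rgw.gateway1]
rgw_run_sync_thread = false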