From: Yehuda Sadeh Date: Fri, 20 Nov 2015 19:58:18 +0000 (-0800) Subject: rgw: add locking to coroutines reporting X-Git-Tag: v10.1.0~354^2~183 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d920dab88718183b7ef13344aa8bfed2e305289b;p=ceph.git rgw: add locking to coroutines reporting Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 89f9c21a0fb0..25633be668ac 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -341,14 +341,16 @@ void RGWCoroutinesStack::dump(Formatter *f) const { f->close_section(); } -void RGWCoroutinesManager::handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *blocked_count) +void RGWCoroutinesManager::handle_unblocked_stack(set& context_stacks, list& stacks, RGWCoroutinesStack *stack, int *blocked_count) { + RWLock::WLocker wl(lock); --(*blocked_count); stack->set_io_blocked(false); stack->set_interval_wait(false); if (!stack->is_done()) { stacks.push_back(stack); } else { + context_stacks.erase(stack); stack->put(); } } @@ -370,6 +372,8 @@ int RGWCoroutinesManager::run(list& stacks) env.stacks = &stacks; for (list::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) { + lock.get_write(); + RGWCoroutinesStack *stack = *iter; env.stack = stack; @@ -420,9 +424,11 @@ int RGWCoroutinesManager::run(list& stacks) stack->schedule(); } + lock.unlock(); + RGWCoroutinesStack *blocked_stack; while (completion_mgr.try_get_next((void **)&blocked_stack)) { - handle_unblocked_stack(stacks, blocked_stack, &blocked_count); + handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); } /* @@ -435,7 +441,7 @@ int RGWCoroutinesManager::run(list& stacks) if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } - handle_unblocked_stack(stacks, blocked_stack, &blocked_count); + handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); } ++iter; @@ -451,7 +457,7 @@ int RGWCoroutinesManager::run(list& stacks) ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl; return -ECANCELED; } - handle_unblocked_stack(stacks, blocked_stack, &blocked_count); + handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); iter = stacks.begin(); } @@ -492,6 +498,8 @@ RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCo } void RGWCoroutinesManager::dump(Formatter *f) const { + RWLock::RLocker rl(lock); + f->open_array_section("run_contexts"); for (auto& i : run_contexts) { char buf[32]; diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index b4331c59fcec..4410d455934e 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -408,7 +408,9 @@ class RGWCoroutinesManager { atomic64_t run_context_count; map > run_contexts; - void handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count); + RWLock lock; + + void handle_unblocked_stack(set& context_stacks, list& stacks, RGWCoroutinesStack *stack, int *waiting_count); protected: RGWCompletionManager completion_mgr; @@ -416,7 +418,7 @@ protected: void put_completion_notifier(RGWAioCompletionNotifier *cn); public: - RGWCoroutinesManager(CephContext *_cct) : cct(_cct), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} + RGWCoroutinesManager(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesManager::lock"), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} virtual ~RGWCoroutinesManager() {} int run(list& ops);