From 085ee7326e3f75f40b13f47fb05982fa6eef231e Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 23 Nov 2015 12:09:49 -0800 Subject: [PATCH] rgw: track all stacks, not just scheduled And don't expand spawned stacks out of existing cr. This flattens the admin socket output. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 51 +++++++++++++++++++++++++++------------- src/rgw/rgw_coroutine.h | 17 ++++++-------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 8cd451517f33a..938984b93649a 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -182,7 +182,7 @@ RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCorout stack->get(); /* we'll need to collect the stack */ stack->call(op); - env->stacks->push_back(stack); + env->manager->schedule(env, stack); if (wait) { set_blocked_by(stack); @@ -344,14 +344,14 @@ void RGWCoroutinesStack::dump(Formatter *f) const { f->close_section(); } -void RGWCoroutinesManager::handle_unblocked_stack(set& context_stacks, list& stacks, RGWCoroutinesStack *stack, int *blocked_count) +void RGWCoroutinesManager::handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count) { RWLock::WLocker wl(lock); --(*blocked_count); stack->set_io_blocked(false); stack->set_interval_wait(false); if (!stack->is_done()) { - stacks.push_back(stack); + scheduled_stacks.push_back(stack); } else { RWLock::WLocker wl(lock); context_stacks.erase(stack); @@ -359,25 +359,36 @@ void RGWCoroutinesManager::handle_unblocked_stack(set& con } } +void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack) +{ + assert(lock.is_wlocked()); + env->scheduled_stacks->push_back(stack); + set& context_stacks = run_contexts[env->run_context]; + context_stacks.insert(stack); +} + int RGWCoroutinesManager::run(list& stacks) { int blocked_count = 0; int interval_wait_count = 0; RGWCoroutinesEnv env; - uint64_t run_num = run_context_count.inc(); + uint64_t run_context = run_context_count.inc(); lock.get_write(); - set& context_stacks = run_contexts[run_num]; + set& context_stacks = run_contexts[run_context]; + list scheduled_stacks; for (auto& st : stacks) { context_stacks.insert(st); + scheduled_stacks.push_back(st); } lock.unlock(); + env.run_context = run_context; env.manager = this; - env.stacks = &stacks; + env.scheduled_stacks = &scheduled_stacks; - for (list::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) { + for (list::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) { lock.get_write(); RGWCoroutinesStack *stack = *iter; @@ -434,7 +445,7 @@ int RGWCoroutinesManager::run(list& stacks) RGWCoroutinesStack *blocked_stack; while (completion_mgr.try_get_next((void **)&blocked_stack)) { - handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); + handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count); } /* @@ -447,14 +458,14 @@ int RGWCoroutinesManager::run(list& stacks) if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } - handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); + handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count); } ++iter; - stacks.pop_front(); + scheduled_stacks.pop_front(); - while (stacks.empty() && blocked_count > 0) { + while (scheduled_stacks.empty() && blocked_count > 0) { int ret = completion_mgr.get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; @@ -463,12 +474,12 @@ int RGWCoroutinesManager::run(list& stacks) ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl; return -ECANCELED; } - handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count); - iter = stacks.begin(); + handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count); + iter = scheduled_stacks.begin(); } - if (iter == stacks.end()) { - iter = stacks.begin(); + if (iter == scheduled_stacks.end()) { + iter = scheduled_stacks.begin(); } } @@ -520,6 +531,12 @@ void RGWCoroutinesManager::dump(Formatter *f) const { f->close_section(); } +RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() { + RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this); + stack->get(); + return stack; +} + string RGWCoroutinesManager::get_id() { if (!id.empty()) { @@ -660,7 +677,9 @@ void RGWCoroutine::dump(Formatter *f) const { if (!spawned.entries.empty()) { f->open_array_section("spawned"); for (auto& i : spawned.entries) { - encode_json("entry", *i, f); + char buf[32]; + snprintf(buf, sizeof(buf), "%p", (void *)i); + encode_json("stack", string(buf), f); } f->close_section(); } diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 7a403c58db89d..396b7fd21fb02 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -93,13 +93,13 @@ public: } }; - struct RGWCoroutinesEnv { + uint64_t run_context; RGWCoroutinesManager *manager; - list *stacks; + list *scheduled_stacks; RGWCoroutinesStack *stack; - RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {} + RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {} }; enum RGWCoroutineState { @@ -337,7 +337,7 @@ public: void schedule(list *stacks = NULL) { if (!stacks) { - stacks = env->stacks; + stacks = env->scheduled_stacks; } if (!is_scheduled) { stacks->push_back(this); @@ -432,7 +432,7 @@ class RGWCoroutinesManager { RWLock lock; - void handle_unblocked_stack(set& context_stacks, list& stacks, RGWCoroutinesStack *stack, int *waiting_count); + void handle_unblocked_stack(set& context_stacks, list& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count); protected: RGWCompletionManager completion_mgr; RGWCoroutinesManagerRegistry *cr_registry; @@ -467,11 +467,8 @@ public: RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); RGWCompletionManager *get_completion_mgr() { return &completion_mgr; } - RGWCoroutinesStack *allocate_stack() { - RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this); - stack->get(); - return stack; - } + void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); + RGWCoroutinesStack *allocate_stack(); virtual string get_id(); void dump(Formatter *f) const; -- 2.39.5