]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: track all stacks, not just scheduled
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 23 Nov 2015 20:09:49 +0000 (12:09 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:37 +0000 (16:13 -0800)
And don't expand spawned stacks out of existing cr. This flattens the
admin socket output.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h

index 8cd451517f33ae65f586be5dc97f073cbf231b89..938984b93649aef3dd4bea78096628dd8545db7e 100644 (file)
@@ -182,7 +182,7 @@ RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCorout
   stack->get(); /* we'll need to collect the stack */
   stack->call(op);
 
-  env->stacks->push_back(stack);
+  env->manager->schedule(env, stack);
 
   if (wait) {
     set_blocked_by(stack);
@@ -344,14 +344,14 @@ void RGWCoroutinesStack::dump(Formatter *f) const {
   f->close_section();
 }
 
-void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
+void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count)
 {
   RWLock::WLocker wl(lock);
   --(*blocked_count);
   stack->set_io_blocked(false);
   stack->set_interval_wait(false);
   if (!stack->is_done()) {
-    stacks.push_back(stack);
+    scheduled_stacks.push_back(stack);
   } else {
     RWLock::WLocker wl(lock);
     context_stacks.erase(stack);
@@ -359,25 +359,36 @@ void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& con
   }
 }
 
+void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
+{
+  assert(lock.is_wlocked());
+  env->scheduled_stacks->push_back(stack);
+  set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
+  context_stacks.insert(stack);
+}
+
 int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 {
   int blocked_count = 0;
   int interval_wait_count = 0;
   RGWCoroutinesEnv env;
 
-  uint64_t run_num = run_context_count.inc();
+  uint64_t run_context = run_context_count.inc();
 
   lock.get_write();
-  set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_num];
+  set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
+  list<RGWCoroutinesStack *> scheduled_stacks;
   for (auto& st : stacks) {
     context_stacks.insert(st);
+    scheduled_stacks.push_back(st);
   }
   lock.unlock();
 
+  env.run_context = run_context;
   env.manager = this;
-  env.stacks = &stacks;
+  env.scheduled_stacks = &scheduled_stacks;
 
-  for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) {
+  for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down.read();) {
     lock.get_write();
 
     RGWCoroutinesStack *stack = *iter;
@@ -434,7 +445,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
 
     RGWCoroutinesStack *blocked_stack;
     while (completion_mgr.try_get_next((void **)&blocked_stack)) {
-      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
+      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
     }
 
     /*
@@ -447,14 +458,14 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
-      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
+      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
     }
 
     ++iter;
-    stacks.pop_front();
+    scheduled_stacks.pop_front();
 
 
-    while (stacks.empty() && blocked_count > 0) {
+    while (scheduled_stacks.empty() && blocked_count > 0) {
       int ret = completion_mgr.get_next((void **)&blocked_stack);
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
@@ -463,12 +474,12 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
        return -ECANCELED;
       }
-      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
-      iter = stacks.begin();
+      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
+      iter = scheduled_stacks.begin();
     }
 
-    if (iter == stacks.end()) {
-      iter = stacks.begin();
+    if (iter == scheduled_stacks.end()) {
+      iter = scheduled_stacks.begin();
     }
   }
 
@@ -520,6 +531,12 @@ void RGWCoroutinesManager::dump(Formatter *f) const {
   f->close_section();
 }
 
+RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
+  RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this);
+  stack->get();
+  return stack;
+}
+
 string RGWCoroutinesManager::get_id()
 {
   if (!id.empty()) {
@@ -660,7 +677,9 @@ void RGWCoroutine::dump(Formatter *f) const {
   if (!spawned.entries.empty()) {
     f->open_array_section("spawned");
     for (auto& i : spawned.entries) {
-      encode_json("entry", *i, f);
+      char buf[32];
+      snprintf(buf, sizeof(buf), "%p", (void *)i);
+      encode_json("stack", string(buf), f);
     }
     f->close_section();
   }
index 7a403c58db89dbcd4e9f00e8a10e5b6a1212a4bf..396b7fd21fb025bd06ed04b338cd561ed9b9e16f 100644 (file)
@@ -93,13 +93,13 @@ public:
   }
 };
 
-
 struct RGWCoroutinesEnv {
+  uint64_t run_context;
   RGWCoroutinesManager *manager;
-  list<RGWCoroutinesStack *> *stacks;
+  list<RGWCoroutinesStack *> *scheduled_stacks;
   RGWCoroutinesStack *stack;
 
-  RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
+  RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
 };
 
 enum RGWCoroutineState {
@@ -337,7 +337,7 @@ public:
 
   void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
     if (!stacks) {
-      stacks = env->stacks;
+      stacks = env->scheduled_stacks;
     }
     if (!is_scheduled) {
       stacks->push_back(this);
@@ -432,7 +432,7 @@ class RGWCoroutinesManager {
 
   RWLock lock;
 
-  void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+  void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
 protected:
   RGWCompletionManager completion_mgr;
   RGWCoroutinesManagerRegistry *cr_registry;
@@ -467,11 +467,8 @@ public:
   RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
   RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
 
-  RGWCoroutinesStack *allocate_stack() {
-    RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this);
-    stack->get();
-    return stack;
-  }
+  void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
+  RGWCoroutinesStack *allocate_stack();
 
   virtual string get_id();
   void dump(Formatter *f) const;