]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add locking to coroutines reporting
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 20 Nov 2015 19:58:18 +0000 (11:58 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:37 +0000 (16:13 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h

index 89f9c21a0fb04af4bee12adbee14a67f99985ef5..25633be668acff5a1b62cc8ab0b9fc7d2aa1f136 100644 (file)
@@ -341,14 +341,16 @@ void RGWCoroutinesStack::dump(Formatter *f) const {
   f->close_section();
 }
 
-void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
+void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *blocked_count)
 {
+  RWLock::WLocker wl(lock);
   --(*blocked_count);
   stack->set_io_blocked(false);
   stack->set_interval_wait(false);
   if (!stack->is_done()) {
     stacks.push_back(stack);
   } else {
+    context_stacks.erase(stack);
     stack->put();
   }
 }
@@ -370,6 +372,8 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   env.stacks = &stacks;
 
   for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end() && !going_down.read();) {
+    lock.get_write();
+
     RGWCoroutinesStack *stack = *iter;
     env.stack = stack;
 
@@ -420,9 +424,11 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       stack->schedule();
     }
 
+    lock.unlock();
+
     RGWCoroutinesStack *blocked_stack;
     while (completion_mgr.try_get_next((void **)&blocked_stack)) {
-      handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
     }
 
     /*
@@ -435,7 +441,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       }
-      handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
     }
 
     ++iter;
@@ -451,7 +457,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
        return -ECANCELED;
       }
-      handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+      handle_unblocked_stack(context_stacks, stacks, blocked_stack, &blocked_count);
       iter = stacks.begin();
     }
 
@@ -492,6 +498,8 @@ RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCo
 }
 
 void RGWCoroutinesManager::dump(Formatter *f) const {
+  RWLock::RLocker rl(lock);
+
   f->open_array_section("run_contexts");
   for (auto& i : run_contexts) {
     char buf[32];
index b4331c59fcec2ed3d0749d2d8473f2f2956f3cc5..4410d455934ede3cb252bbae9b26b5dfcb68af72 100644 (file)
@@ -408,7 +408,9 @@ class RGWCoroutinesManager {
   atomic64_t run_context_count;
   map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
 
-  void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+  RWLock lock;
+
+  void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
 protected:
   RGWCompletionManager completion_mgr;
 
@@ -416,7 +418,7 @@ protected:
 
   void put_completion_notifier(RGWAioCompletionNotifier *cn);
 public:
-  RGWCoroutinesManager(CephContext *_cct) : cct(_cct), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
+  RGWCoroutinesManager(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesManager::lock"), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
   virtual ~RGWCoroutinesManager() {}
 
   int run(list<RGWCoroutinesStack *>& ops);