]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: a few bug fixes related to async operations
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 5 Aug 2015 23:22:49 +0000 (16:22 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:59:41 +0000 (12:59 -0800)
fix uninitialized state, trim stacks, etc.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index 5850cc9217e00d86f8ac188e6bd1911088db6c23..3647de8f9f0cea961ceaf2bd74707d14fcb990ef 100644 (file)
@@ -328,6 +328,17 @@ int RGWCompletionManager::get_next(void **user_info)
   return 0;
 }
 
+bool RGWCompletionManager::try_get_next(void **user_info)
+{
+  Mutex::Locker l(lock);
+  if (complete_reqs.empty()) {
+    return false;
+  }
+  *user_info = complete_reqs.front();
+  complete_reqs.pop_front();
+  return true;
+}
+
 void RGWCompletionManager::go_down()
 {
   Mutex::Locker l(lock);
index 86d9c8b81d0d7f7d95191e61988c1767932f8d8e..984a0308e2a87c99fad003bb4319840e75ae8086 100644 (file)
@@ -79,6 +79,7 @@ public:
 
   void complete(void *user_info);
   int get_next(void **user_info);
+  bool try_get_next(void **user_info);
 
   void go_down();
 };
index 13df181ef1c9b20ca98188274ab6a23e2b62fedc..ebf81325803262bf1117cd67b58141f8105475d9 100644 (file)
@@ -425,8 +425,6 @@ int RGWMetaSyncStatusManager::set_state(RGWMetaSyncGlobalStatus::SyncState state
 }
 
 class RGWSimpleAsyncOp : public RGWAsyncOp {
-  CephContext *cct;
-
   enum State {
     Init                      = 0,
     SendRequest               = 1,
@@ -448,13 +446,15 @@ class RGWSimpleAsyncOp : public RGWAsyncOp {
   int state_all_complete();
 
 protected:
+  CephContext *cct;
+
   void call(RGWAsyncOp *op) {
     int r = env->stack->call(op, 0);
     assert(r == 0);
   }
 
 public:
-  RGWSimpleAsyncOp(CephContext *_cct) : cct(_cct) {}
+  RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {}
 
   virtual int init() { return 0; }
   virtual int send_request() = 0;
@@ -478,7 +478,7 @@ int RGWSimpleAsyncOp::operate()
       ldout(cct, 20) << __func__ << ": request complete" << dendl;
       return state_request_complete();
     case AllComplete:
-      ldout(cct, 20) << __func__ << ": request complete" << dendl;
+      ldout(cct, 20) << __func__ << ": all complete" << dendl;
       return state_all_complete();
     case Done:
       ldout(cct, 20) << __func__ << ": done" << dendl;
@@ -558,6 +558,10 @@ public:
 
   int send_request();
   int request_complete();
+
+  virtual int handle_data(T& data) {
+    return 0;
+  }
 };
 
 template <class T>
@@ -590,7 +594,7 @@ int RGWSimpleRadosAsyncOp<T>::request_complete()
     *result = T();
   }
 
-  return 0;
+  return handle_data(*result);
 }
 
 class RGWReadSyncStatusOp : public RGWSimpleRadosAsyncOp<RGWMetaSyncGlobalStatus> {
@@ -610,10 +614,14 @@ public:
                                                       async_rados(_async_rados), store(_store),
                                                       obj_ctx(_obj_ctx), global_status(_gs) {}
 
-  ~RGWReadSyncStatusOp() {
-  }
+  int handle_data(RGWMetaSyncGlobalStatus& data);
 };
 
+int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data)
+{
+  return 0;
+}
+
 class RGWMetaSyncOp : public RGWAsyncOp {
   RGWRados *store;
   RGWMetadataLog *mdlog;
@@ -806,10 +814,12 @@ int RGWAsyncOpsStack::call(RGWAsyncOp *next_op, int ret) {
 int RGWAsyncOpsStack::unwind(int retcode)
 {
   if (pos == ops.begin()) {
+    pos = ops.end();
     return retcode;
   }
 
   --pos;
+  ops.pop_back();
   RGWAsyncOp *op = *pos;
   op->set_retcode(retcode);
   return 0;
@@ -839,6 +849,17 @@ void RGWAsyncOpsManager::report_error(RGWAsyncOpsStack *op)
   lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
 }
 
+void RGWAsyncOpsManager::handle_unblocked_stack(list<RGWAsyncOpsStack *>& stacks, RGWAsyncOpsStack *stack, int *waiting_count)
+{
+  --(*waiting_count);
+  stack->set_blocked(false);
+  if (!stack->is_done()) {
+    stacks.push_back(stack);
+  } else {
+    delete stack;
+  }
+}
+
 int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
 {
   int waiting_count = 0;
@@ -847,7 +868,7 @@ int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
   env.manager = this;
   env.stacks = &stacks;
 
-  for (list<RGWAsyncOpsStack *>::iterator iter = stacks.begin(); iter != stacks.end(); ++iter) {
+  for (list<RGWAsyncOpsStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
     RGWAsyncOpsStack *stack = *iter;
     env.stack = stack;
     int ret = stack->operate(&env);
@@ -867,31 +888,28 @@ int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
       stacks.push_back(stack);
     }
 
+    RGWAsyncOpsStack *blocked_stack;
+    while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+    }
+
     if (waiting_count >= ops_window) {
-      RGWAsyncOpsStack *blocked_stack;
       int ret = completion_mgr.get_next((void **)&blocked_stack);
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
-      } else {
-        waiting_count--;
-      }
-      blocked_stack->set_blocked(false);
-      if (!blocked_stack->is_done()) {
-       stacks.push_back(blocked_stack);
-      } else {
-       delete blocked_stack;
       }
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
     }
-  }
 
-  while (waiting_count > 0) {
-    RGWAsyncOpsStack *stack;
-    int ret = completion_mgr.get_next((void **)&stack);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
-      return ret;
-    } else {
-      waiting_count--;
+    ++iter;
+    stacks.pop_front();
+    while (iter == stacks.end() && waiting_count > 0) {
+      int ret = completion_mgr.get_next((void **)&blocked_stack);
+      if (ret < 0) {
+       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+      }
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+      iter = stacks.begin();
     }
   }
 
@@ -915,8 +933,6 @@ int RGWAsyncOpsManager::run(RGWAsyncOp *op)
     ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
   }
 
-  delete stack;
-
   return r;
 }
 
index 010f9bdf171c9825f0fdeed23b14e2edf4a12e7a..2ba575e44a0cef48a1a220a8e39b84516990568c 100644 (file)
@@ -117,6 +117,7 @@ public:
 class RGWAsyncOpsManager {
   CephContext *cct;
 
+  void handle_unblocked_stack(list<RGWAsyncOpsStack *>& stacks, RGWAsyncOpsStack *stack, int *waiting_count);
 protected:
   RGWCompletionManager completion_mgr;