]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: async ops can generate new stacks for parallel execution
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 6 Aug 2015 00:19:11 +0000 (17:19 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:59:41 +0000 (12:59 -0800)
Can now have an op generating another op that will be called at
a different stack. The current op will halt execution until new
op finishes. We can call this multiple times so that multiple ops
will be executed concurrently. This can be useful in the case we
want to generate a single op that will read from multiple objects,
after it had read some meta information. So the root op will read
the meta information and will generate multiple request that will
execute concurrently (albeit on a single thread).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index ebf81325803262bf1117cd67b58141f8105475d9..7c64d61891a0d78d429710310fe6e9e4d1465031 100644 (file)
@@ -424,6 +424,24 @@ int RGWMetaSyncStatusManager::set_state(RGWMetaSyncGlobalStatus::SyncState state
   return 0;
 }
 
+void RGWAsyncOp::call(RGWAsyncOp *op)
+{
+  int r = env->stack->call(op, 0);
+  assert(r == 0);
+}
+
+void RGWAsyncOp::call_concurrent(RGWAsyncOp *op)
+{
+  RGWAsyncOpsStack *stack = env->manager->allocate_stack();
+
+  int r = stack->call(op, 0);
+  assert(r == 0);
+
+  env->stacks->push_back(stack);
+
+  env->stack->set_blocked_by(stack);
+}
+
 class RGWSimpleAsyncOp : public RGWAsyncOp {
   enum State {
     Init                      = 0,
@@ -448,11 +466,6 @@ class RGWSimpleAsyncOp : public RGWAsyncOp {
 protected:
   CephContext *cct;
 
-  void call(RGWAsyncOp *op) {
-    int r = env->stack->call(op, 0);
-    assert(r == 0);
-  }
-
 public:
   RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {}
 
@@ -603,6 +616,7 @@ class RGWReadSyncStatusOp : public RGWSimpleRadosAsyncOp<RGWMetaSyncGlobalStatus
   RGWObjectCtx& obj_ctx;
 
   RGWMetaSyncGlobalStatus *global_status;
+  rgw_sync_marker sync_marker;
 
 public:
   RGWReadSyncStatusOp(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
@@ -619,6 +633,10 @@ public:
 
 int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data)
 {
+  call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+                                "mdlog.state.0", &sync_marker));
+  call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+                                "mdlog.state.1", &sync_marker));
   return 0;
 }
 
@@ -778,13 +796,14 @@ int RGWAsyncOpsStack::operate(RGWAsyncOpsEnv *env)
     ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
   }
 
-  done_flag = op->is_done();
   error_flag = op->is_error();
   blocked_flag = op->is_blocked();
 
-  if (done_flag) {
+  if (op->is_done()) {
     op->put();
-    return unwind(r);
+    r = unwind(r);
+    done_flag = (pos == ops.end());
+    return r;
   }
 
   /* should r ever be negative at this point? */
@@ -843,6 +862,20 @@ RGWCompletionManager *RGWAsyncOpsStack::get_completion_mgr()
   return ops_mgr->get_completion_mgr();
 }
 
+bool RGWAsyncOpsStack::unblock_stack(RGWAsyncOpsStack **s)
+{
+  if (blocking_stacks.empty()) {
+    return false;
+  }
+
+  set<RGWAsyncOpsStack *>::iterator iter = blocking_stacks.begin();
+  *s = *iter;
+  blocking_stacks.erase(iter);
+  (*s)->blocked_by_stack.erase(this);
+
+  return true;
+}
+
 void RGWAsyncOpsManager::report_error(RGWAsyncOpsStack *op)
 {
 #warning need to have error logging infrastructure that logs on backend
@@ -880,9 +913,21 @@ int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
       report_error(stack);
     }
 
-    if (stack->is_blocked()) {
+    if (stack->is_blocked_by_stack()) {
+      /* do nothing, we'll re-add the stack when the blocking stack is done */
+    } else if (stack->is_blocked()) {
       waiting_count++;
     } else if (stack->is_done()) {
+      RGWAsyncOpsStack *s;
+      while (stack->unblock_stack(&s)) {
+       if (!s->is_blocked_by_stack() && !s->is_done()) {
+         if (s->is_blocked()) {
+           waiting_count++;
+         } else {
+           stacks.push_back(s);
+         }
+       }
+      }
       delete stack;
     } else {
       stacks.push_back(stack);
@@ -919,7 +964,7 @@ int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
 int RGWAsyncOpsManager::run(RGWAsyncOp *op)
 {
   list<RGWAsyncOpsStack *> stacks;
-  RGWAsyncOpsStack *stack = new RGWAsyncOpsStack(cct, this);
+  RGWAsyncOpsStack *stack = allocate_stack();
   int r = stack->call(op);
   if (r < 0) {
     ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
index 2ba575e44a0cef48a1a220a8e39b84516990568c..5aa23c54ba298f9b4eb7751f6b58c498f459bf02 100644 (file)
@@ -55,6 +55,9 @@ protected:
     return operate();
   }
 
+  void call(RGWAsyncOp *op);
+  void call_concurrent(RGWAsyncOp *op);
+
 public:
   RGWAsyncOp() : env(NULL), blocked(false), retcode(0) {}
   virtual ~RGWAsyncOp() {}
@@ -84,6 +87,10 @@ class RGWAsyncOpsStack {
   list<RGWAsyncOp *> ops;
   list<RGWAsyncOp *>::iterator pos;
 
+  set<RGWAsyncOpsStack *> blocked_by_stack;
+  set<RGWAsyncOpsStack *> blocking_stacks;
+
+
   bool done_flag;
   bool error_flag;
   bool blocked_flag;
@@ -99,8 +106,11 @@ public:
   bool is_error() {
     return error_flag;
   }
+  bool is_blocked_by_stack() {
+    return !blocked_by_stack.empty();
+  }
   bool is_blocked() {
-    return blocked_flag;
+    return blocked_flag || is_blocked_by_stack();
   }
 
   void set_blocked(bool flag);
@@ -112,6 +122,13 @@ public:
 
   AioCompletionNotifier *create_completion_notifier();
   RGWCompletionManager *get_completion_mgr();
+
+  void set_blocked_by(RGWAsyncOpsStack *s) {
+    blocked_by_stack.insert(s);
+    s->blocking_stacks.insert(this);
+  }
+
+  bool unblock_stack(RGWAsyncOpsStack **s);
 };
 
 class RGWAsyncOpsManager {
@@ -135,6 +152,10 @@ public:
 
   AioCompletionNotifier *create_completion_notifier(RGWAsyncOpsStack *stack);
   RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+
+  RGWAsyncOpsStack *allocate_stack() {
+    return new RGWAsyncOpsStack(cct, this);
+  }
 };
 
 struct RGWMetaSyncGlobalStatus {