]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move coroutines code
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 7 Aug 2015 21:17:33 +0000 (14:17 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 21:40:53 +0000 (13:40 -0800)
move the generic code to rgw_coroutine.{h,cc}

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/Makefile.am
src/rgw/rgw_coroutine.cc [new file with mode: 0644]
src/rgw/rgw_coroutine.h [new file with mode: 0644]
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index a848d2b2865402cdbbe31e506c030153979b4c0c..1423a1088456466dea60dff3d82f2c8a8e30d69f 100644 (file)
@@ -18,6 +18,7 @@ librgw_la_SOURCES =  \
        rgw/rgw_acl_s3.cc \
        rgw/rgw_acl_swift.cc \
        rgw/rgw_client_io.cc \
+       rgw/rgw_coroutine.cc \
        rgw/rgw_fcgi.cc \
        rgw/rgw_xml.cc \
        rgw/rgw_usage.cc \
@@ -132,6 +133,7 @@ noinst_HEADERS += \
        rgw/rgw_acl_s3.h \
        rgw/rgw_acl_swift.h \
        rgw/rgw_client_io.h \
+       rgw/rgw_coroutine.h \
        rgw/rgw_fcgi.h \
        rgw/rgw_xml.h \
        rgw/rgw_basic_types.h \
@@ -176,6 +178,7 @@ noinst_HEADERS += \
        rgw/rgw_rest_opstate.h \
        rgw/rgw_rest_replica_log.h \
        rgw/rgw_rest_config.h \
+       rgw/rgw_sync.h \
        rgw/rgw_usage.h \
        rgw/rgw_user.h \
        rgw/rgw_bucket.h \
diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc
new file mode 100644 (file)
index 0000000..c94501c
--- /dev/null
@@ -0,0 +1,310 @@
+
+
+#include "rgw_coroutine.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+
+
+RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
+                                                                                                         done_flag(false), error_flag(false), blocked_flag(false) {
+  if (start) {
+    ops.push_back(start);
+  }
+  pos = ops.begin();
+}
+
+int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env)
+{
+  RGWCoroutine *op = *pos;
+  int r = op->do_operate(env);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
+  }
+
+  error_flag = op->is_error();
+  blocked_flag = op->is_blocked();
+
+  if (op->is_done()) {
+    op->put();
+    r = unwind(r);
+    done_flag = (pos == ops.end());
+    return r;
+  }
+
+  /* should r ever be negative at this point? */
+  assert(r >= 0);
+
+  return 0;
+}
+
+string RGWCoroutinesStack::error_str()
+{
+  if (pos != ops.end()) {
+    return (*pos)->error_str();
+  }
+  return string();
+}
+
+int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) {
+  ops.push_back(next_op);
+  if (pos != ops.end()) {
+    ++pos;
+  } else {
+    pos = ops.begin();
+  }
+  return ret;
+}
+
+int RGWCoroutinesStack::unwind(int retcode)
+{
+  if (pos == ops.begin()) {
+    pos = ops.end();
+    return retcode;
+  }
+
+  --pos;
+  ops.pop_back();
+  RGWCoroutine *op = *pos;
+  op->set_retcode(retcode);
+  return 0;
+}
+
+void RGWCoroutinesStack::set_blocked(bool flag)
+{
+  blocked_flag = flag;
+  if (pos != ops.end()) {
+    (*pos)->set_blocked(flag);
+  }
+}
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
+{
+  ((RGWAioCompletionNotifier *)arg)->cb();
+}
+
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+  c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
+{
+  return ops_mgr->create_completion_notifier(this);
+}
+
+RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
+{
+  return ops_mgr->get_completion_mgr();
+}
+
+bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
+{
+  if (blocking_stacks.empty()) {
+    return false;
+  }
+
+  set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
+  *s = *iter;
+  blocking_stacks.erase(iter);
+  (*s)->blocked_by_stack.erase(this);
+
+  return true;
+}
+
+void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
+{
+#warning need to have error logging infrastructure that logs on backend
+  lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
+}
+
+void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count)
+{
+  --(*waiting_count);
+  stack->set_blocked(false);
+  if (!stack->is_done()) {
+    stacks.push_back(stack);
+  } else {
+    delete stack;
+  }
+}
+
+int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
+{
+  int waiting_count = 0;
+  RGWCoroutinesEnv env;
+
+  env.manager = this;
+  env.stacks = &stacks;
+
+  for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
+    RGWCoroutinesStack *stack = *iter;
+    env.stack = stack;
+    int ret = stack->operate(&env);
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl;
+    }
+
+    if (stack->is_error()) {
+      report_error(stack);
+    }
+
+    if (stack->is_blocked_by_stack()) {
+      /* do nothing, we'll re-add the stack when the blocking stack is done */
+    } else if (stack->is_blocked()) {
+      waiting_count++;
+    } else if (stack->is_done()) {
+      RGWCoroutinesStack *s;
+      while (stack->unblock_stack(&s)) {
+       if (!s->is_blocked_by_stack() && !s->is_done()) {
+         if (s->is_blocked()) {
+           waiting_count++;
+         } else {
+           stacks.push_back(s);
+         }
+       }
+      }
+      delete stack;
+    } else {
+      stacks.push_back(stack);
+    }
+
+    RGWCoroutinesStack *blocked_stack;
+    while (completion_mgr.try_get_next((void **)&blocked_stack)) {
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+    }
+
+    if (waiting_count >= ops_window) {
+      int ret = completion_mgr.get_next((void **)&blocked_stack);
+      if (ret < 0) {
+       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+      }
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+    }
+
+    ++iter;
+    stacks.pop_front();
+    while (iter == stacks.end() && waiting_count > 0) {
+      int ret = completion_mgr.get_next((void **)&blocked_stack);
+      if (ret < 0) {
+       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+      }
+      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
+      iter = stacks.begin();
+    }
+  }
+
+  return 0;
+}
+
+int RGWCoroutinesManager::run(RGWCoroutine *op)
+{
+  list<RGWCoroutinesStack *> stacks;
+  RGWCoroutinesStack *stack = allocate_stack();
+  op->get();
+  int r = stack->call(op);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
+    return r;
+  }
+
+  stacks.push_back(stack);
+
+  r = run(stacks);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
+  }
+
+  r = op->get_ret_status();
+  op->put();
+
+  return r;
+}
+
+RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
+{
+  return new RGWAioCompletionNotifier(&completion_mgr, (void *)stack);
+}
+
+void RGWCoroutine::call(RGWCoroutine *op)
+{
+  int r = env->stack->call(op, 0);
+  assert(r == 0);
+}
+
+void RGWCoroutine::spawn(RGWCoroutine *op)
+{
+  RGWCoroutinesStack *stack = env->manager->allocate_stack();
+
+  int r = stack->call(op, 0);
+  assert(r == 0);
+
+  env->stacks->push_back(stack);
+
+  env->stack->set_blocked_by(stack);
+}
+
+int RGWSimpleCoroutine::operate()
+{
+  switch (state) {
+    case Init:
+      ldout(cct, 20) << __func__ << ": init request" << dendl;
+      return state_init();
+    case SendRequest:
+      ldout(cct, 20) << __func__ << ": send request" << dendl;
+      return state_send_request();
+    case RequestComplete:
+      ldout(cct, 20) << __func__ << ": request complete" << dendl;
+      return state_request_complete();
+    case AllComplete:
+      ldout(cct, 20) << __func__ << ": all complete" << dendl;
+      return state_all_complete();
+    case Done:
+      ldout(cct, 20) << __func__ << ": done" << dendl;
+      break;
+    case Error:
+      ldout(cct, 20) << __func__ << ": error" << dendl;
+      break;
+  }
+
+  return 0;
+}
+
+int RGWSimpleCoroutine::state_init()
+{
+  int ret = init();
+  if (ret < 0) {
+    return set_state(Error, ret);
+  }
+  return set_state(SendRequest);
+}
+
+int RGWSimpleCoroutine::state_send_request()
+{
+  int ret = send_request();
+  if (ret < 0) {
+    return set_state(Error, ret);
+  }
+  return yield(set_state(RequestComplete));
+}
+
+int RGWSimpleCoroutine::state_request_complete()
+{
+  int ret = request_complete();
+  if (ret < 0) {
+    return set_state(Error, ret);
+  }
+  return set_state(AllComplete);
+}
+
+int RGWSimpleCoroutine::state_all_complete()
+{
+  int ret = finish();
+  if (ret < 0) {
+    return set_state(Error, ret);
+  }
+  return set_state(Done);
+}
+
+
diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h
new file mode 100644 (file)
index 0000000..ab5d316
--- /dev/null
@@ -0,0 +1,209 @@
+#ifndef CEPH_RGW_COROUTINE_H
+#define CEPH_RGW_COROUTINE_H
+
+#include "rgw_http_client.h"
+
+#include "common/RefCountedObj.h"
+
+
+
+#define RGW_ASYNC_OPS_MGR_WINDOW 16
+
+class RGWCoroutinesStack;
+class RGWCoroutinesManager;
+
+/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
+class RGWAioCompletionNotifier : public RefCountedObject {
+  librados::AioCompletion *c;
+  RGWCompletionManager *completion_mgr;
+  void *user_data;
+
+public:
+  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
+  ~RGWAioCompletionNotifier() {
+    c->release();
+  }
+
+  librados::AioCompletion *completion() {
+    return c;
+  }
+
+  void cb() {
+    completion_mgr->complete(user_data);
+    put();
+  }
+};
+
+
+struct RGWCoroutinesEnv {
+  RGWCoroutinesManager *manager;
+  list<RGWCoroutinesStack *> *stacks;
+  RGWCoroutinesStack *stack;
+
+  RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
+};
+
+class RGWCoroutine : public RefCountedObject {
+  friend class RGWCoroutinesStack;
+protected:
+  RGWCoroutinesEnv *env;
+  bool blocked;
+  int retcode;
+
+  stringstream error_stream;
+
+  void set_blocked(bool flag) { blocked = flag; }
+  int yield(int ret) {
+    set_blocked(true);
+    return ret;
+  }
+
+  int do_operate(RGWCoroutinesEnv *_env) {
+    env = _env;
+    return operate();
+  }
+
+  void call(RGWCoroutine *op);
+  void spawn(RGWCoroutine *op);
+
+public:
+  RGWCoroutine() : env(NULL), blocked(false), retcode(0) {}
+  virtual ~RGWCoroutine() {}
+
+  virtual int operate() = 0;
+
+  virtual bool is_done() = 0;
+  virtual bool is_error() = 0;
+
+  stringstream& log_error() { return error_stream; }
+  string error_str() {
+    return error_stream.str();
+  }
+
+  bool is_blocked() { return blocked; }
+
+  void set_retcode(int r) {
+    retcode = r;
+  }
+
+  int get_ret_status() {
+    return retcode;
+  }
+};
+
+class RGWCoroutinesStack {
+  CephContext *cct;
+
+  RGWCoroutinesManager *ops_mgr;
+
+  list<RGWCoroutine *> ops;
+  list<RGWCoroutine *>::iterator pos;
+
+  set<RGWCoroutinesStack *> blocked_by_stack;
+  set<RGWCoroutinesStack *> blocking_stacks;
+
+
+  bool done_flag;
+  bool error_flag;
+  bool blocked_flag;
+
+public:
+  RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
+
+  int operate(RGWCoroutinesEnv *env);
+
+  bool is_done() {
+    return done_flag;
+  }
+  bool is_error() {
+    return error_flag;
+  }
+  bool is_blocked_by_stack() {
+    return !blocked_by_stack.empty();
+  }
+  bool is_blocked() {
+    return blocked_flag || is_blocked_by_stack();
+  }
+
+  void set_blocked(bool flag);
+
+  string error_str();
+
+  int call(RGWCoroutine *next_op, int ret = 0);
+  int unwind(int retcode);
+
+  RGWAioCompletionNotifier *create_completion_notifier();
+  RGWCompletionManager *get_completion_mgr();
+
+  void set_blocked_by(RGWCoroutinesStack *s) {
+    blocked_by_stack.insert(s);
+    s->blocking_stacks.insert(this);
+  }
+
+  bool unblock_stack(RGWCoroutinesStack **s);
+};
+
+class RGWCoroutinesManager {
+  CephContext *cct;
+
+  void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
+protected:
+  RGWCompletionManager completion_mgr;
+
+  int ops_window;
+
+  void put_completion_notifier(RGWAioCompletionNotifier *cn);
+public:
+  RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
+  virtual ~RGWCoroutinesManager() {}
+
+  int run(list<RGWCoroutinesStack *>& ops);
+  int run(RGWCoroutine *op);
+
+  virtual void report_error(RGWCoroutinesStack *op);
+
+  RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
+  RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
+
+  RGWCoroutinesStack *allocate_stack() {
+    return new RGWCoroutinesStack(cct, this);
+  }
+};
+
+class RGWSimpleCoroutine : public RGWCoroutine {
+  enum State {
+    Init                      = 0,
+    SendRequest               = 1,
+    RequestComplete           = 2,
+    AllComplete               = 3,
+    Done                      = 100,
+    Error                     = 200,
+  } state;
+
+  int set_state(State s, int ret = 0) {
+    state = s;
+    return ret;
+  }
+  int operate();
+
+  int state_init();
+  int state_send_request();
+  int state_request_complete();
+  int state_all_complete();
+
+protected:
+  CephContext *cct;
+
+public:
+  RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {}
+
+  virtual int init() { return 0; }
+  virtual int send_request() = 0;
+  virtual int request_complete() = 0;
+  virtual int finish() { return 0; }
+
+  bool is_done() { return (state == Done || state == Error); }
+  bool is_error() { return (state == Error); }
+};
+
+#endif
index 6eef00f2bb1a75edbb7662741baae318c89120c6..dd8ee47fb07b097f0facfe9c7dee1d932a6d68ed 100644 (file)
@@ -53,40 +53,8 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
-
-/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
-class AioCompletionNotifier : public RefCountedObject {
-  librados::AioCompletion *c;
-  RGWCompletionManager *completion_mgr;
-  void *user_data;
-
-public:
-  AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
-    c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
-  }
-
-  ~AioCompletionNotifier() {
-    c->release();
-  }
-
-  librados::AioCompletion *completion() {
-    return c;
-  }
-
-  void cb() {
-    completion_mgr->complete(user_data);
-    put();
-  }
-};
-
-static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
-{
-  ((AioCompletionNotifier *)arg)->cb();
-}
-
 class RGWAsyncRadosRequest {
-  AioCompletionNotifier *notifier;
+  RGWAioCompletionNotifier *notifier;
 
   void *user_info;
   int retcode;
@@ -94,7 +62,7 @@ class RGWAsyncRadosRequest {
 protected:
   virtual int _send_request() = 0;
 public:
-  RGWAsyncRadosRequest(AioCompletionNotifier *_cn) : notifier(_cn) {}
+  RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
   virtual ~RGWAsyncRadosRequest() {}
 
   void send_request() {
@@ -119,7 +87,7 @@ protected:
     return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
   }
 public:
-  RGWAsyncGetSystemObj(AioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+  RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
                        RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
                        bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
                                                                    objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
@@ -127,6 +95,29 @@ public:
   }
 };
 
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_obj obj;
+  bool exclusive;
+  bufferlist bl;
+  map<string, bufferlist> attrs;
+  time_t mtime;
+
+protected:
+  int _send_request() {
+    return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
+                                 NULL, attrs, objv_tracker, mtime);
+  }
+public:
+  RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+                       bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store),
+                                                                   objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
+                                                                  bl(_bl), mtime(_mtime) {}
+};
+
+
 
 
 class RGWAsyncRadosProcessor {
@@ -424,124 +415,8 @@ int RGWMetaSyncStatusManager::set_state(RGWMetaSyncGlobalStatus::SyncState state
   return 0;
 }
 
-void RGWCoroutine::call(RGWCoroutine *op)
-{
-  int r = env->stack->call(op, 0);
-  assert(r == 0);
-}
-
-void RGWCoroutine::spawn(RGWCoroutine *op)
-{
-  RGWCoroutinesStack *stack = env->manager->allocate_stack();
-
-  int r = stack->call(op, 0);
-  assert(r == 0);
-
-  env->stacks->push_back(stack);
-
-  env->stack->set_blocked_by(stack);
-}
-
-class RGWSimpleCoroutine : public RGWCoroutine {
-  enum State {
-    Init                      = 0,
-    SendRequest               = 1,
-    RequestComplete           = 2,
-    AllComplete               = 3,
-    Done                      = 100,
-    Error                     = 200,
-  } state;
-
-  int set_state(State s, int ret = 0) {
-    state = s;
-    return ret;
-  }
-  int operate();
-
-  int state_init();
-  int state_send_request();
-  int state_request_complete();
-  int state_all_complete();
-
-protected:
-  CephContext *cct;
-
-public:
-  RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {}
-
-  virtual int init() { return 0; }
-  virtual int send_request() = 0;
-  virtual int request_complete() = 0;
-  virtual int finish() { return 0; }
-
-  bool is_done() { return (state == Done || state == Error); }
-  bool is_error() { return (state == Error); }
-};
-
-int RGWSimpleCoroutine::operate()
-{
-  switch (state) {
-    case Init:
-      ldout(cct, 20) << __func__ << ": init request" << dendl;
-      return state_init();
-    case SendRequest:
-      ldout(cct, 20) << __func__ << ": send request" << dendl;
-      return state_send_request();
-    case RequestComplete:
-      ldout(cct, 20) << __func__ << ": request complete" << dendl;
-      return state_request_complete();
-    case AllComplete:
-      ldout(cct, 20) << __func__ << ": all complete" << dendl;
-      return state_all_complete();
-    case Done:
-      ldout(cct, 20) << __func__ << ": done" << dendl;
-      break;
-    case Error:
-      ldout(cct, 20) << __func__ << ": error" << dendl;
-      break;
-  }
-
-  return 0;
-}
-
-int RGWSimpleCoroutine::state_init()
-{
-  int ret = init();
-  if (ret < 0) {
-    return set_state(Error, ret);
-  }
-  return set_state(SendRequest);
-}
-
-int RGWSimpleCoroutine::state_send_request()
-{
-  int ret = send_request();
-  if (ret < 0) {
-    return set_state(Error, ret);
-  }
-  return yield(set_state(RequestComplete));
-}
-
-int RGWSimpleCoroutine::state_request_complete()
-{
-  int ret = request_complete();
-  if (ret < 0) {
-    return set_state(Error, ret);
-  }
-  return set_state(AllComplete);
-}
-
-int RGWSimpleCoroutine::state_all_complete()
-{
-  int ret = finish();
-  if (ret < 0) {
-    return set_state(Error, ret);
-  }
-  return set_state(Done);
-}
-
 template <class T>
-class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine {
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
   RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
   RGWObjectCtx& obj_ctx;
@@ -555,7 +430,7 @@ class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine {
   RGWAsyncGetSystemObj *req;
 
 public:
-  RGWSimpleRadosCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+  RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
                      RGWObjectCtx& _obj_ctx,
                      rgw_bucket& _pool, const string& _oid,
                      T *_result) : RGWSimpleCoroutine(_store->ctx()),
@@ -565,7 +440,7 @@ public:
                                                result(_result),
                                                 req(NULL) { }
                                                          
-  ~RGWSimpleRadosCoroutine() {
+  ~RGWSimpleRadosReadCR() {
     delete req;
   }
 
@@ -578,7 +453,7 @@ public:
 };
 
 template <class T>
-int RGWSimpleRadosCoroutine<T>::send_request()
+int RGWSimpleRadosReadCR<T>::send_request()
 {
   rgw_obj obj = rgw_obj(pool, oid);
   req = new RGWAsyncGetSystemObj(env->stack->create_completion_notifier(),
@@ -590,7 +465,7 @@ int RGWSimpleRadosCoroutine<T>::send_request()
 }
 
 template <class T>
-int RGWSimpleRadosCoroutine<T>::request_complete()
+int RGWSimpleRadosReadCR<T>::request_complete()
 {
   int ret = req->get_ret_status();
   retcode = ret;
@@ -611,7 +486,7 @@ int RGWSimpleRadosCoroutine<T>::request_complete()
   return handle_data(*result);
 }
 
-class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutine<RGWMetaSyncGlobalStatus> {
+class RGWReadSyncStatusCoroutine : public RGWSimpleRadosReadCR<RGWMetaSyncGlobalStatus> {
   RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
   RGWObjectCtx& obj_ctx;
@@ -622,7 +497,7 @@ class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutine<RGWMetaSyncGlo
 public:
   RGWReadSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
                      RGWObjectCtx& _obj_ctx,
-                     RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosCoroutine(_async_rados, _store, _obj_ctx,
+                     RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
                                                                            _store->get_zone_params().log_pool,
                                                                            "mdlog.state.global",
                                                                            _gs),
@@ -637,9 +512,9 @@ int RGWReadSyncStatusCoroutine::handle_data(RGWMetaSyncGlobalStatus& data)
   if (retcode == -ENOENT) {
     return retcode;
   }
-  spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+  spawn(new RGWSimpleRadosReadCR<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
                                 "mdlog.state.0", &sync_marker));
-  spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+  spawn(new RGWSimpleRadosReadCR<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
                                 "mdlog.state.1", &sync_marker));
   return 0;
 }
@@ -738,7 +613,7 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine {
 
   RGWRESTReadResource *http_op;
 
-  AioCompletionNotifier *md_op_notifier;
+  RGWAioCompletionNotifier *md_op_notifier;
 
   int req_ret;
   RGWMetadataLogInfo shard_info;
@@ -784,216 +659,6 @@ public:
   bool is_error() { return (state == Error); }
 };
 
-RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
-                                                                                                         done_flag(false), error_flag(false), blocked_flag(false) {
-  if (start) {
-    ops.push_back(start);
-  }
-  pos = ops.begin();
-}
-
-int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env)
-{
-  RGWCoroutine *op = *pos;
-  int r = op->do_operate(env);
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
-  }
-
-  error_flag = op->is_error();
-  blocked_flag = op->is_blocked();
-
-  if (op->is_done()) {
-    op->put();
-    r = unwind(r);
-    done_flag = (pos == ops.end());
-    return r;
-  }
-
-  /* should r ever be negative at this point? */
-  assert(r >= 0);
-
-  return 0;
-}
-
-string RGWCoroutinesStack::error_str()
-{
-  if (pos != ops.end()) {
-    return (*pos)->error_str();
-  }
-  return string();
-}
-
-int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) {
-  ops.push_back(next_op);
-  if (pos != ops.end()) {
-    ++pos;
-  } else {
-    pos = ops.begin();
-  }
-  return ret;
-}
-
-int RGWCoroutinesStack::unwind(int retcode)
-{
-  if (pos == ops.begin()) {
-    pos = ops.end();
-    return retcode;
-  }
-
-  --pos;
-  ops.pop_back();
-  RGWCoroutine *op = *pos;
-  op->set_retcode(retcode);
-  return 0;
-}
-
-void RGWCoroutinesStack::set_blocked(bool flag)
-{
-  blocked_flag = flag;
-  if (pos != ops.end()) {
-    (*pos)->set_blocked(flag);
-  }
-}
-
-AioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
-{
-  return ops_mgr->create_completion_notifier(this);
-}
-
-RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
-{
-  return ops_mgr->get_completion_mgr();
-}
-
-bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
-{
-  if (blocking_stacks.empty()) {
-    return false;
-  }
-
-  set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
-  *s = *iter;
-  blocking_stacks.erase(iter);
-  (*s)->blocked_by_stack.erase(this);
-
-  return true;
-}
-
-void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
-{
-#warning need to have error logging infrastructure that logs on backend
-  lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
-}
-
-void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count)
-{
-  --(*waiting_count);
-  stack->set_blocked(false);
-  if (!stack->is_done()) {
-    stacks.push_back(stack);
-  } else {
-    delete stack;
-  }
-}
-
-int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
-{
-  int waiting_count = 0;
-  RGWCoroutinesEnv env;
-
-  env.manager = this;
-  env.stacks = &stacks;
-
-  for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
-    RGWCoroutinesStack *stack = *iter;
-    env.stack = stack;
-    int ret = stack->operate(&env);
-    if (ret < 0) {
-      ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl;
-    }
-
-    if (stack->is_error()) {
-      report_error(stack);
-    }
-
-    if (stack->is_blocked_by_stack()) {
-      /* do nothing, we'll re-add the stack when the blocking stack is done */
-    } else if (stack->is_blocked()) {
-      waiting_count++;
-    } else if (stack->is_done()) {
-      RGWCoroutinesStack *s;
-      while (stack->unblock_stack(&s)) {
-       if (!s->is_blocked_by_stack() && !s->is_done()) {
-         if (s->is_blocked()) {
-           waiting_count++;
-         } else {
-           stacks.push_back(s);
-         }
-       }
-      }
-      delete stack;
-    } else {
-      stacks.push_back(stack);
-    }
-
-    RGWCoroutinesStack *blocked_stack;
-    while (completion_mgr.try_get_next((void **)&blocked_stack)) {
-      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
-    }
-
-    if (waiting_count >= ops_window) {
-      int ret = completion_mgr.get_next((void **)&blocked_stack);
-      if (ret < 0) {
-       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
-      }
-      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
-    }
-
-    ++iter;
-    stacks.pop_front();
-    while (iter == stacks.end() && waiting_count > 0) {
-      int ret = completion_mgr.get_next((void **)&blocked_stack);
-      if (ret < 0) {
-       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
-      }
-      handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
-      iter = stacks.begin();
-    }
-  }
-
-  return 0;
-}
-
-int RGWCoroutinesManager::run(RGWCoroutine *op)
-{
-  list<RGWCoroutinesStack *> stacks;
-  RGWCoroutinesStack *stack = allocate_stack();
-  op->get();
-  int r = stack->call(op);
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
-    return r;
-  }
-
-  stacks.push_back(stack);
-
-  r = run(stacks);
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: run(stacks) returned r=" << r << dendl;
-  }
-
-  r = op->get_ret_status();
-  op->put();
-
-  return r;
-}
-
-AioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
-{
-  return new AioCompletionNotifier(&completion_mgr, (void *)stack);
-}
-
 int RGWRemoteMetaLog::clone_shards()
 {
   list<RGWCoroutinesStack *> stacks;
@@ -1187,7 +852,7 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
     marker = entry.id;
   }
 
-  AioCompletionNotifier *cn = env->stack->create_completion_notifier();
+  RGWAioCompletionNotifier *cn = env->stack->create_completion_notifier();
 
   int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
   if (ret < 0) {
index e0922bb28a8532174dd58d7df909d9587135d9ec..e242ef850ce04b9d0cb69b3629b23fd0c983769e 100644 (file)
@@ -1,16 +1,9 @@
 #ifndef CEPH_RGW_SYNC_H
 #define CEPH_RGW_SYNC_H
 
-#include "rgw_common.h"
-#include "rgw_rados.h"
-#include "rgw_metadata.h"
-#include "rgw_http_client.h"
+#include "rgw_coroutine.h"
 
 #include "common/RWLock.h"
-#include "common/RefCountedObj.h"
-
-
-#define dout_subsys ceph_subsys_rgw
 
 
 struct rgw_mdlog_info {
@@ -21,147 +14,6 @@ struct rgw_mdlog_info {
   void decode_json(JSONObj *obj);
 };
 
-#define RGW_ASYNC_OPS_MGR_WINDOW 16
-
-class RGWCoroutinesStack;
-class RGWCoroutinesManager;
-class AioCompletionNotifier;
-
-struct RGWCoroutinesEnv {
-  RGWCoroutinesManager *manager;
-  list<RGWCoroutinesStack *> *stacks;
-  RGWCoroutinesStack *stack;
-
-  RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
-};
-
-class RGWCoroutine : public RefCountedObject {
-  friend class RGWCoroutinesStack;
-protected:
-  RGWCoroutinesEnv *env;
-  bool blocked;
-  int retcode;
-
-  stringstream error_stream;
-
-  void set_blocked(bool flag) { blocked = flag; }
-  int yield(int ret) {
-    set_blocked(true);
-    return ret;
-  }
-
-  int do_operate(RGWCoroutinesEnv *_env) {
-    env = _env;
-    return operate();
-  }
-
-  void call(RGWCoroutine *op);
-  void spawn(RGWCoroutine *op);
-
-public:
-  RGWCoroutine() : env(NULL), blocked(false), retcode(0) {}
-  virtual ~RGWCoroutine() {}
-
-  virtual int operate() = 0;
-
-  virtual bool is_done() = 0;
-  virtual bool is_error() = 0;
-
-  stringstream& log_error() { return error_stream; }
-  string error_str() {
-    return error_stream.str();
-  }
-
-  bool is_blocked() { return blocked; }
-
-  void set_retcode(int r) {
-    retcode = r;
-  }
-
-  int get_ret_status() {
-    return retcode;
-  }
-};
-
-class RGWCoroutinesStack {
-  CephContext *cct;
-
-  RGWCoroutinesManager *ops_mgr;
-
-  list<RGWCoroutine *> ops;
-  list<RGWCoroutine *>::iterator pos;
-
-  set<RGWCoroutinesStack *> blocked_by_stack;
-  set<RGWCoroutinesStack *> blocking_stacks;
-
-
-  bool done_flag;
-  bool error_flag;
-  bool blocked_flag;
-
-public:
-  RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
-
-  int operate(RGWCoroutinesEnv *env);
-
-  bool is_done() {
-    return done_flag;
-  }
-  bool is_error() {
-    return error_flag;
-  }
-  bool is_blocked_by_stack() {
-    return !blocked_by_stack.empty();
-  }
-  bool is_blocked() {
-    return blocked_flag || is_blocked_by_stack();
-  }
-
-  void set_blocked(bool flag);
-
-  string error_str();
-
-  int call(RGWCoroutine *next_op, int ret = 0);
-  int unwind(int retcode);
-
-  AioCompletionNotifier *create_completion_notifier();
-  RGWCompletionManager *get_completion_mgr();
-
-  void set_blocked_by(RGWCoroutinesStack *s) {
-    blocked_by_stack.insert(s);
-    s->blocking_stacks.insert(this);
-  }
-
-  bool unblock_stack(RGWCoroutinesStack **s);
-};
-
-class RGWCoroutinesManager {
-  CephContext *cct;
-
-  void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
-protected:
-  RGWCompletionManager completion_mgr;
-
-  int ops_window;
-
-  void put_completion_notifier(AioCompletionNotifier *cn);
-public:
-  RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
-  virtual ~RGWCoroutinesManager() {}
-
-  int run(list<RGWCoroutinesStack *>& ops);
-  int run(RGWCoroutine *op);
-
-  virtual void report_error(RGWCoroutinesStack *op);
-
-  AioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
-  RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
-
-  RGWCoroutinesStack *allocate_stack() {
-    return new RGWCoroutinesStack(cct, this);
-  }
-};
-
 struct RGWMetaSyncGlobalStatus {
   enum SyncState {
     StateInit = 0,