]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cr: tool for spawn window management
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Jun 2020 22:37:29 +0000 (15:37 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 25 Jun 2020 19:15:12 +0000 (12:15 -0700)
Will make it possible to avoid duplicating lots of boilerplate code

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h

index 4c658ca30ac7c079f87a4daf5e686b47522701d5..ebd9df9f7fe6f8a01d59533c73bc728351c0f83a 100644 (file)
@@ -924,22 +924,58 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
   return out;
 }
 
-bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
+bool RGWCoroutine::drain_children(int num_cr_left,
+                                  RGWCoroutinesStack *skip_stack,
+                                  std::optional<std::function<void(int ret)> > err_cb)
 {
   bool done = false;
   ceph_assert(num_cr_left >= 0);
   if (num_cr_left == 0 && skip_stack) {
     num_cr_left = 1;
   }
-  reenter(&drain_cr) {
+  reenter(&drain_status.cr) {
     while (num_spawned() > (size_t)num_cr_left) {
       yield wait_for_child();
       int ret;
       while (collect(&ret, skip_stack)) {
+        if (ret < 0) {
+          if (!err_cb) {
+            ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
+            /* we should have reported this error */
+            log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+          } else {
+            (*err_cb)(ret);
+          }
+        }
+      }
+    }
+    done = true;
+  }
+  return done;
+}
+
+bool RGWCoroutine::drain_children(int num_cr_left,
+                                  std::optional<std::function<int(int ret)> > err_cb)
+{
+  bool done = false;
+  ceph_assert(num_cr_left >= 0);
+
+  reenter(&drain_status.cr) {
+    while (num_spawned() > (size_t)num_cr_left) {
+      yield wait_for_child();
+      int ret;
+      while (collect(&ret, nullptr)) {
         if (ret < 0) {
           ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
           /* we should have reported this error */
           log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
+          if (err_cb && !drain_status.should_exit) {
+            int r = (*err_cb)(ret);
+            if (r < 0) {
+              drain_status.ret = r;
+              num_cr_left = 0; /* need to drain all */
+            }
+          }
         }
       }
     }
index a38421f4fc582a73e2cc212d182cb191c0de63e6..2bb41ffbbae46420abc8c34f08e85c30365cd002 100644 (file)
@@ -219,7 +219,18 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
 
 protected:
   bool _yield_ret;
-  boost::asio::coroutine drain_cr;
+
+  struct {
+    boost::asio::coroutine cr;
+    bool should_exit{false};
+    int ret{0};
+
+    void init() {
+      cr = boost::asio::coroutine();
+      should_exit = false;
+      ret = 0;
+    }
+  } drain_status;
 
   CephContext *cct;
 
@@ -292,7 +303,14 @@ public:
   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
 
   int wait(const utime_t& interval);
-  bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
+  bool drain_children(int num_cr_left,
+                      RGWCoroutinesStack *skip_stack = nullptr,
+                      std::optional<std::function<void(int ret)> > err_cb = std::nullopt); /* returns true if needed to be called again,
+                                                                                              err_cb is just for reporting error */
+  bool drain_children(int num_cr_left,
+                      std::optional<std::function<int(int ret)> > err_cb); /* returns true if needed to be called again,
+                                                                               err_cb is for filtering error. A negative return
+                                                                               value means that we need to exit current cr */
   void wakeup();
   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
 
@@ -336,17 +354,39 @@ do {                            \
 } while (0)
 
 #define drain_all() \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(0))
 
 #define drain_all_but(n) \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(n))
 
 #define drain_all_but_stack(stack) \
-  drain_cr = boost::asio::coroutine(); \
+  drain_status.init(); \
   yield_until_true(drain_children(1, stack))
 
+#define drain_all_but_stack_cb(stack, cb) \
+  drain_status.init(); \
+  yield_until_true(drain_children(1, stack, cb))
+
+#define drain_with_cb(n, err_cb) \
+  drain_status.init(); \
+  yield_until_true(drain_children(n, err_cb)); \
+  if (drain_status.should_exit) { \
+    return set_cr_error(drain_status.ret); \
+  }
+
+#define drain_all_cb(cb) \
+  drain_with_cb(0, cb)
+
+#define yield_spawn_window(cr, n, err_cb) \
+  do { \
+    spawn(cr, false); \
+    drain_with_cb(n, err_cb); /* this is guaranteed to yield */ \
+  } while (0)
+
+
+
 template <class T>
 class RGWConsumerCR : public RGWCoroutine {
   list<T> product;