]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cr: add coroutine stack id
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 25 Jun 2020 20:47:51 +0000 (13:47 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 25 Jun 2020 20:47:51 +0000 (13:47 -0700)
The stack id is unique per cr stack manager, and can be used to identify
collected stacks.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_data_sync.cc

index 3279392507efab13b910bb88cf427960406cc1d7..512ead62d331f86d60cec3065f08fd30823435a3 100644 (file)
@@ -196,12 +196,17 @@ int64_t RGWCoroutinesManager::get_next_io_id()
   return (int64_t)++max_io_id;
 }
 
+uint64_t RGWCoroutinesManager::get_next_stack_id() {
+  return (uint64_t)++max_stack_id;
+}
+
 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
                                                                                                          done_flag(false), error_flag(false), blocked_flag(false),
                                                                                                          sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
                                                                                                         retcode(0), run_count(0),
                                                                                                         env(NULL), parent(NULL)
 {
+  id = ops_mgr->get_next_stack_id();
   if (start) {
     ops.push_back(start);
   }
@@ -360,7 +365,7 @@ void RGWCoroutinesStack::cancel()
   put();
 }
 
-bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
 {
   bool need_retry = false;
   rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
@@ -378,6 +383,9 @@ bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack
       }
       continue;
     }
+    if (stack_id) {
+      *stack_id = stack->get_id();
+    }
     int r = stack->get_ret_status();
     stack->put();
     if (r < 0) {
@@ -426,9 +434,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS
   return false;
 }
 
-bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t  *stack_id) /* returns true if needs to be called again */
 {
-  return collect(NULL, ret, skip_stack);
+  return collect(NULL, ret, skip_stack, stack_id);
 }
 
 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
@@ -884,9 +892,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
   return stack->spawn(this, op, wait);
 }
 
-bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
 {
-  return stack->collect(this, ret, skip_stack);
+  return stack->collect(this, ret, skip_stack, stack_id);
 }
 
 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
@@ -926,7 +934,7 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
 
 bool RGWCoroutine::drain_children(int num_cr_left,
                                   RGWCoroutinesStack *skip_stack,
-                                  std::optional<std::function<void(int ret)> > cb)
+                                  std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
 {
   bool done = false;
   ceph_assert(num_cr_left >= 0);
@@ -937,14 +945,15 @@ bool RGWCoroutine::drain_children(int num_cr_left,
     while (num_spawned() > (size_t)num_cr_left) {
       yield wait_for_child();
       int ret;
-      while (collect(&ret, skip_stack)) {
+      uint64_t stack_id;
+      while (collect(&ret, skip_stack, &stack_id)) {
         if (ret < 0) {
             ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
             /* we should have reported this error */
             log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
         }
         if (cb) {
-          (*cb)(ret);
+          (*cb)(stack_id, ret);
         }
       }
     }
@@ -954,7 +963,7 @@ bool RGWCoroutine::drain_children(int num_cr_left,
 }
 
 bool RGWCoroutine::drain_children(int num_cr_left,
-                                  std::optional<std::function<int(int ret)> > cb)
+                                  std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
 {
   bool done = false;
   ceph_assert(num_cr_left >= 0);
@@ -963,14 +972,15 @@ bool RGWCoroutine::drain_children(int num_cr_left,
     while (num_spawned() > (size_t)num_cr_left) {
       yield wait_for_child();
       int ret;
-      while (collect(&ret, nullptr)) {
+      uint64_t stack_id;
+      while (collect(&ret, nullptr, &stack_id)) {
         if (ret < 0) {
           ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
           /* we should have reported this error */
           log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
         }
         if (cb && !drain_status.should_exit) {
-          int r = (*cb)(ret);
+          int r = (*cb)(stack_id, ret);
           if (r < 0) {
             drain_status.ret = r;
             num_cr_left = 0; /* need to drain all */
index 048dc198493860932f67e44f24c1ad80a0d36249..f589b4eac5d0870907598fff9578c2c422acae68 100644 (file)
@@ -299,20 +299,20 @@ public:
 
   void call(RGWCoroutine *op); /* call at the same stack we're in */
   RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
-  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
 
   int wait(const utime_t& interval);
   bool drain_children(int num_cr_left,
                       RGWCoroutinesStack *skip_stack = nullptr,
-                      std::optional<std::function<void(int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
-                                                                                          cb will be called on completion of every
-                                                                                          completion. */
+                      std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
+                                                                                                             cb will be called on completion of every
+                                                                                                             completion. */
   bool drain_children(int num_cr_left,
-                      std::optional<std::function<int(int ret)> > cb); /* returns true if needed to be called again,
-                                                                          cb will be called on every completion, can filter errors.
-                                                                          A negative return value from cb means that current cr
-                                                                          will need to exit */
+                      std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
+                                                                                             cb will be called on every completion, can filter errors.
+                                                                                             A negative return value from cb means that current cr
+                                                                                             will need to exit */
   void wakeup();
   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
 
@@ -425,6 +425,8 @@ class RGWCoroutinesStack : public RefCountedObject {
 
   CephContext *cct;
 
+  int64_t id{-1};
+
   RGWCoroutinesManager *ops_mgr;
 
   list<RGWCoroutine *> ops;
@@ -457,12 +459,16 @@ protected:
   RGWCoroutinesStack *parent;
 
   RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
-  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
   bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
 public:
   RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
   ~RGWCoroutinesStack() override;
 
+  int64_t get_id() const {
+    return id;
+  }
+
   int operate(RGWCoroutinesEnv *env);
 
   bool is_done() {
@@ -534,7 +540,7 @@ public:
   }
   void io_complete(const rgw_io_id& io_id);
 
-  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
 
   void cancel();
 
@@ -617,6 +623,7 @@ class RGWCoroutinesManager {
   map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
 
   std::atomic<int64_t> max_io_id = { 0 };
+  std::atomic<uint64_t> max_stack_id = { 0 };
 
   mutable ceph::shared_mutex lock =
     ceph::make_shared_mutex("RGWCoroutinesManager::lock");
@@ -671,6 +678,7 @@ public:
   RGWCoroutinesStack *allocate_stack();
 
   int64_t get_next_io_id();
+  uint64_t get_next_stack_id();
 
   void set_sleeping(RGWCoroutine *cr, bool flag);
   void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
index 9b1d4e7c56ae0dcde2bd4867f15639739a3c80a1..082916d97ac87acada1d47d49a6121ea66d8cf02 100644 (file)
@@ -1576,7 +1576,7 @@ public:
           sync_marker.marker = iter->first;
 
           drain_all_but_stack_cb(lease_stack.get(),
-                                 [&](int ret) {
+                                 [&](uint64_t stack_id, int ret) {
                                    if (ret < 0) {
                                      tn->log(10, "a sync operation returned error");
                                    }
@@ -1724,7 +1724,7 @@ public:
           }
 
           drain_all_but_stack_cb(lease_stack.get(),
-                                 [&](int ret) {
+                                 [&](uint64_t stack_id, int ret) {
                                    if (ret < 0) {
                                      tn->log(10, "a sync operation returned error");
                                    }
@@ -3792,7 +3792,7 @@ int RGWBucketShardFullSyncCR::operate()
                       false);
         }
         drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
-                      [&](int ret) {
+                      [&](uint64_t stack_id, int ret) {
                 if (ret < 0) {
                   tn->log(10, "a sync operation returned error");
                   sync_status = ret;
@@ -3804,7 +3804,7 @@ int RGWBucketShardFullSyncCR::operate()
     set_status("done iterating over all objects");
     /* wait for all operations to complete */
 
-    drain_all_cb([&](int ret) {
+    drain_all_cb([&](uint64_t stack_id, int ret) {
       if (ret < 0) {
         tn->log(10, "a sync operation returned error");
         sync_status = ret;
@@ -4089,7 +4089,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           }
         // }
         drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
-                      [&](int ret) {
+                      [&](uint64_t stack_id, int ret) {
                 if (ret < 0) {
                   tn->log(10, "a sync operation returned error");
                   sync_status = ret;
@@ -4099,7 +4099,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
     } while (!list_result.empty() && sync_status == 0 && !syncstopped);
 
-    drain_all_cb([&](int ret) {
+    drain_all_cb([&](uint64_t stack_id, int ret) {
       if (ret < 0) {
         tn->log(10, "a sync operation returned error");
         sync_status = ret;
@@ -4351,7 +4351,7 @@ int RGWRunBucketSourcesSyncCR::operate()
         yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
                                                          &*cur_shard_progress),
                            BUCKET_SYNC_SPAWN_WINDOW,
-                           [&](int ret) {
+                           [&](uint64_t stack_id, int ret) {
                              if (ret < 0) {
                                tn->log(10, "a sync operation returned error");
                              }
@@ -4359,7 +4359,7 @@ int RGWRunBucketSourcesSyncCR::operate()
                            });
       }
     }
-    drain_all_cb([&](int ret) {
+    drain_all_cb([&](uint64_t stack_id, int ret) {
                    if (ret < 0) {
                      tn->log(10, "a sync operation returned error");
                    }