]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: collect skips a specific coroutine stack 10845/head
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 13 Jul 2016 03:36:35 +0000 (20:36 -0700)
committerCasey Bodley <cbodley@redhat.com>
Wed, 24 Aug 2016 20:48:29 +0000 (16:48 -0400)
Fixes: http://tracker.ceph.com/issues/16665
Instead of drain_all_but() that specifies number of stacks to leave behind,
added drain_all_but_stack() that has a specific stack specified. This is needed
so that we don't call wakeup() through lease_cr->go_down() on a cr stack that
was already collected.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 5a2e8f0526db92a290c711f82627fc5042c290ea)

src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.cc

index a09afc85532cb044db8ad02417711b301913c402..2e478f23850605d43e35d8f5d464bbc76174d6b2 100644 (file)
@@ -292,7 +292,7 @@ int RGWCoroutinesStack::unwind(int retcode)
 }
 
 
-bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
   rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
   *ret = 0;
@@ -300,7 +300,7 @@ bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret) /* returns true if
 
   for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
     RGWCoroutinesStack *stack = *iter;
-    if (!stack->is_done()) {
+    if (stack == skip_stack || !stack->is_done()) {
       new_list.push_back(stack);
       ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
       continue;
@@ -349,9 +349,9 @@ bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesS
   return false;
 }
 
-bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
-  return collect(NULL, ret);
+  return collect(NULL, ret, skip_stack);
 }
 
 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
@@ -712,9 +712,9 @@ RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
   return stack->spawn(this, op, wait);
 }
 
-bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again */
+bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
 {
-  return stack->collect(this, ret);
+  return stack->collect(this, ret, skip_stack);
 }
 
 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
@@ -752,14 +752,18 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
   return out;
 }
 
-bool RGWCoroutine::drain_children(int num_cr_left)
+bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
 {
   bool done = false;
+  assert(num_cr_left >= 0);
+  if (num_cr_left == 0 && skip_stack) {
+    num_cr_left = 1;
+  }
   reenter(&drain_cr) {
     while (num_spawned() > (size_t)num_cr_left) {
       yield wait_for_child();
       int ret;
-      while (collect(&ret)) {
+      while (collect(&ret, skip_stack)) {
         if (ret < 0) {
           ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
           /* we should have reported this error */
index 11addf68b2300dbe29939cd1f6e0cf5793be26f2..6b17fa0a69b7b838b976e9fe431706e64f09c17e 100644 (file)
@@ -265,11 +265,11 @@ public:
 
   void call(RGWCoroutine *op); /* call at the same stack we're in */
   RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
-  bool collect(int *ret); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
   bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
 
   int wait(const utime_t& interval);
-  bool drain_children(int num_cr_left); /* returns true if needed to be called again */
+  bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
   void wakeup();
   void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
 
@@ -306,6 +306,10 @@ do {                            \
   drain_cr = boost::asio::coroutine(); \
   yield_until_true(drain_children(n))
 
+#define drain_all_but_stack(stack) \
+  drain_cr = boost::asio::coroutine(); \
+  yield_until_true(drain_children(1, stack))
+
 template <class T>
 class RGWConsumerCR : public RGWCoroutine {
   list<T> product;
@@ -371,7 +375,7 @@ protected:
   RGWCoroutinesStack *parent;
 
   RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
-  bool collect(RGWCoroutine *op, int *ret); /* returns true if needs to be called again */
+  bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
   bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
 public:
   RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
@@ -442,7 +446,7 @@ public:
   int wait(const utime_t& interval);
   void wakeup();
 
-  bool collect(int *ret); /* returns true if needs to be called again */
+  bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
 
   RGWAioCompletionNotifier *create_completion_notifier();
   RGWCompletionManager *get_completion_mgr();
index 0ef55230ef929f164efbdbd9bd62803c7f9e53d5..95d5b8de1bebe999cd3a5354d5a746fdca3b1e10 100644 (file)
@@ -471,7 +471,7 @@ public:
           spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_state(RGWCoroutine_Error);
        }
@@ -496,7 +496,7 @@ public:
        call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
                                     lock_name, cookie));
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_state(RGWCoroutine_Error);
        }
@@ -723,7 +723,7 @@ public:
           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                           EIO, string("failed to build bucket instances map")));
       }
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
           yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                           -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
@@ -985,6 +985,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   set<string> spawned_keys;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
   string status_oid;
 
 
@@ -1012,7 +1013,7 @@ public:
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
-                                                      lease_cr(NULL), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
+                                                      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
                                                       retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
     set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
     status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
@@ -1079,7 +1080,7 @@ public:
     lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                         lock_name, lock_duration, this);
     lease_cr->get();
-    spawn(lease_cr, false);
+    lease_stack = spawn(lease_cr, false);
   }
 
   int full_sync() {
@@ -1251,7 +1252,7 @@ public:
             set_status() << "num_spawned() > spawn_window";
             yield wait_for_child();
             int ret;
-            while (collect(&ret)) {
+            while (collect(&ret, lease_stack)) {
               if (ret < 0) {
                 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                 /* we have reported this error */
@@ -2191,6 +2192,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   int total_entries;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
 
   string status_oid;
 
@@ -2204,7 +2206,7 @@ public:
                                                                             full_marker(_full_marker), marker_tracker(NULL),
                                                                             spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
                                                                             op(CLS_RGW_OP_ADD),
-                                                                            total_entries(0), lease_cr(NULL) {
+                                                                            total_entries(0), lease_cr(nullptr), lease_stack(nullptr) {
     status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
     logger.init(sync_env, "BucketFull", bs.get_key());
   }
@@ -2231,7 +2233,7 @@ int RGWBucketShardFullSyncCR::operate()
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                           lock_name, lock_duration, this);
       lease_cr->get();
-      spawn(lease_cr, false);
+      lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -2280,7 +2282,7 @@ int RGWBucketShardFullSyncCR::operate()
         }
         while ((int)num_spawned() > spawn_window) {
           yield wait_for_child();
-          while (collect(&ret)) {
+          while (collect(&ret, lease_stack)) {
             if (ret < 0) {
               ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
               /* we have reported this error */
@@ -2291,7 +2293,7 @@ int RGWBucketShardFullSyncCR::operate()
     } while (list_result.is_truncated);
     set_status("done iterating over all objects");
     /* wait for all operations to complete */
-    drain_all_but(1); /* still need to hold lease cr */
+    drain_all_but_stack(lease_stack); /* still need to hold lease cr */
     /* update sync state to incremental */
     yield {
       rgw_bucket_shard_sync_info sync_status;
@@ -2329,6 +2331,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW};
   bool updated_status{false};
   RGWContinuousLeaseCR *lease_cr{nullptr};
+  RGWCoroutinesStack *lease_stack{nullptr};
   const string status_oid;
 
   string name;
@@ -2375,7 +2378,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
                                           lock_name, lock_duration, this);
       lease_cr->get();
-      spawn(lease_cr, false);
+      lease_stack = spawn(lease_cr, false);
     }
     while (!lease_cr->is_locked()) {
       if (lease_cr->is_done()) {
@@ -2396,7 +2399,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
                                              &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
         /* wait for all operations to complete */
-        drain_all_but(1);
+        drain_all_but_stack(lease_stack);
         lease_cr->go_down();
         drain_all();
         return set_cr_error(retcode);
@@ -2476,7 +2479,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           }
           ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
           yield wait_for_child();
-          while (collect(&ret)) {
+          while (collect(&ret, lease_stack)) {
             if (ret < 0) {
               ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
               /* we have reported this error */
@@ -2508,7 +2511,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
         while ((int)num_spawned() > spawn_window) {
           set_status() << "num_spawned() > spawn_window";
           yield wait_for_child();
-          while (collect(&ret)) {
+          while (collect(&ret, lease_stack)) {
             if (ret < 0) {
               ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
               /* we have reported this error */
@@ -2524,13 +2527,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
     }
     if (retcode < 0) {
       ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
-      drain_all_but(1);
+      drain_all_but_stack(lease_stack);
       lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
 
-    drain_all_but(1);
+    drain_all_but_stack(lease_stack);
     lease_cr->go_down();
     /* wait for all operations to complete */
     drain_all();
index de71dffffd17620382c9fed72317907c4e9863eb..4afa879a10a65010a9603a1526fb2cb188d113db 100644 (file)
@@ -591,12 +591,13 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
   rgw_meta_sync_info status;
   vector<RGWMetadataLogInfo> shards_info;
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
 public:
   RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                              const rgw_meta_sync_info &status)
     : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
       status(status), shards_info(status.num_shards),
-      lease_cr(NULL) {}
+      lease_cr(nullptr), lease_stack(nullptr) {}
 
   ~RGWInitSyncStatusCoroutine() {
     if (lease_cr) {
@@ -616,7 +617,7 @@ public:
        lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_env->status_oid(),
                                             lock_name, lock_duration, this);
         lease_cr->get();
-        spawn(lease_cr, false);
+        lease_stack = spawn(lease_cr, false);
       }
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
@@ -649,7 +650,7 @@ public:
        }
       }
 
-      drain_all_but(1); /* the lease cr still needs to run */
+      drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
 
       yield {
         set_status("updating sync status");
@@ -672,7 +673,7 @@ public:
       }
       set_status("drop lock lease");
       yield lease_cr->go_down();
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
@@ -735,6 +736,7 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
   RGWShardedOmapCRManager *entries_index;
 
   RGWContinuousLeaseCR *lease_cr;
+  RGWCoroutinesStack *lease_stack;
   bool lost_lock;
   bool failed;
 
@@ -744,7 +746,8 @@ public:
   RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
                     map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
                                                      num_shards(_num_shards),
-                                                     ret_status(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), failed(false), markers(_markers) {
+                                                     ret_status(0), entries_index(NULL), lease_cr(nullptr), lease_stack(nullptr),
+                                                      lost_lock(false), failed(false), markers(_markers) {
   }
 
   ~RGWFetchAllMetaCR() {
@@ -789,7 +792,7 @@ public:
        lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, sync_env->status_oid(),
                                             lock_name, lock_duration, this);
         lease_cr->get();
-        spawn(lease_cr, false);
+        lease_stack = spawn(lease_cr, false);
       }
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
@@ -868,12 +871,12 @@ public:
         }
       }
 
-      drain_all_but(1); /* the lease cr still needs to run */
+      drain_all_but_stack(lease_stack); /* the lease cr still needs to run */
 
       yield lease_cr->go_down();
 
       int ret;
-      while (collect(&ret)) {
+      while (collect(&ret, NULL)) {
        if (ret < 0) {
          return set_cr_error(ret);
        }
@@ -1250,6 +1253,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   boost::asio::coroutine full_cr;
 
   RGWContinuousLeaseCR *lease_cr = nullptr;
+  RGWCoroutinesStack *lease_stack = nullptr;
   bool lost_lock = false;
 
   bool *reset_backoff;
@@ -1380,7 +1384,7 @@ public:
                                             sync_env->shard_obj_name(shard_id),
                                             lock_name, lock_duration, this);
         lease_cr->get();
-        spawn(lease_cr, false);
+        lease_stack = spawn(lease_cr, false);
         lost_lock = false;
       }
       while (!lease_cr->is_locked()) {
@@ -1502,7 +1506,7 @@ public:
                                               sync_env->shard_obj_name(shard_id),
                                               lock_name, lock_duration, this);
           lease_cr->get();
-          spawn(lease_cr, false);
+          lease_stack = spawn(lease_cr, false);
           lost_lock = false;
         }
         while (!lease_cr->is_locked()) {
@@ -1725,7 +1729,7 @@ public:
           }
         }
         // wait for each shard to complete
-        collect(&ret);
+        collect(&ret, NULL);
         drain_all();
         {
           // drop shard cr refs under lock