git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: guard metadata sync initialization step
author Yehuda Sadeh <yehuda@redhat.com>
Fri, 23 Oct 2015 00:00:08 +0000 (17:00 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:19 +0000 (16:13 -0800)
Guard the fetch-meta stage

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_sync.cc

index 5194fb15c90a78ce727212b5333da02990340e1f..b2146f46c942062e55ef699049ad3e29dcf6f0c0 100644 (file)
@@ -119,8 +119,8 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
 {
   env = _env;
   RGWCoroutine *op = *pos;
-  ldout(cct, 20) << *op << ": operate()" << dendl;
   op->stack = this;
+  ldout(cct, 20) << *op << ": operate()" << dendl;
   int r = op->operate();
   if (r < 0) {
     ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
@@ -492,11 +492,11 @@ ostream& operator<<(ostream& out, const RGWCoroutine& cr)
   return out;
 }
 
-bool RGWCoroutine::drain_children()
+bool RGWCoroutine::drain_children(int num_cr_left)
 {
   bool done = false;
   reenter(&drain_cr) {
-    while (num_spawned() > 0) {
+    while (num_spawned() > num_cr_left) {
       yield wait_for_child();
       int ret;
       while (collect(&ret)) {
index a0aae9b09f02d5bb765e2475ff3b533f9bacfae5..9237b1c80a2adc4b86ae8c3c507116e5cd62f673 100644 (file)
@@ -178,7 +178,7 @@ public:
   bool collect(int *ret); /* returns true if needs to be called again */
 
   int wait(const utime_t& interval);
-  bool drain_children(); /* returns true if needed to be called again */
+  bool drain_children(int num_cr_left); /* returns true if needed to be called again */
   void wakeup();
 
   size_t num_spawned() {
@@ -205,7 +205,10 @@ do {                            \
 } while (0)
 
 #define drain_all() \
-  yield_until_true(drain_children())
+  yield_until_true(drain_children(0))
+
+#define drain_all_but(n) \
+  yield_until_true(drain_children(n))
 
 template <class T>
 class RGWConsumerCR : public RGWCoroutine {
index 1c37dd92ba538c8343e616d7688f4c74d829a42e..c6200e33af19da28899d6a7aacb4929b55de2966 100644 (file)
@@ -497,3 +497,35 @@ int RGWAsyncRemoveObj::_send_request()
   }
   return ret;
 }
+
+int RGWContinuousLeaseCR::operate() /* Coroutine body: keeps issuing RGWSimpleRadosLockCR until going_down is set, then issues RGWSimpleRadosUnlockCR. */
+{
+  reenter(this) {
+    while (!going_down.read()) { /* going_down is set by go_down() */
+      yield {
+        int r = call(new RGWSimpleRadosLockCR(async_rados, store, pool, oid, lock_name, cookie, interval)); /* same cookie and interval on every pass */
+        if (r < 0) { /* call() itself reported a failure */
+          ldout(store->ctx(), 0) << *this << ": ERROR: failed to call RGWSimpleRadosLockCR()" << dendl;
+          return set_state(RGWCoroutine_Error, r);
+        }
+      }
+      if (retcode < 0) { /* the log line below treats retcode as the outcome of the lock attempt */
+        set_locked(false);
+        ldout(store->ctx(), 20) << *this << ": couldn't lock " << pool.name << ":" << oid << ":" << lock_name << ": retcode=" << retcode << dendl;
+        return set_state(RGWCoroutine_Error, retcode); /* a failed lock attempt ends this coroutine in the error state */
+      }
+      set_locked(true); /* observable by other code through is_locked() */
+      yield wait(utime_t(interval / 2, 0)); /* wait half of interval before the next lock call; assumes interval is in seconds -- TODO confirm */
+    }
+    set_locked(false); /* moot at this point anyway */
+    yield {
+      int r = call(new RGWSimpleRadosUnlockCR(async_rados, store, pool, oid, lock_name, cookie)); /* unlock with the cookie used for locking */
+      if (r < 0) {
+        ldout(store->ctx(), 0) << *this << ": ERROR: failed to call RGWSimpleRadosUnlockCR()" << dendl;
+        return set_state(RGWCoroutine_Error, r);
+      }
+    }
+    return set_state(RGWCoroutine_Done);
+  }
+  return 0;
+}
index 327b14f5fe2e2f04b28c93e2e162ed60b87e08a9..e21727a36c57f7dd5b48f15363fb2290e0289a84 100644 (file)
@@ -729,4 +729,51 @@ public:
   }
 };
 
+class RGWContinuousLeaseCR : public RGWCoroutine { /* Coroutine that exposes a "locked" flag and a go_down() request; operate() is declared here and defined out of line. */
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+
+  rgw_bucket pool; /* copied from the constructor argument */
+  string oid;
+
+  string lock_name;
+  string cookie; /* random alphanumeric string generated in the constructor */
+
+  int interval; /* presumably the lock duration in seconds -- TODO confirm against callers */
+
+  Mutex lock; /* guards `locked` in is_locked()/set_locked() */
+  atomic_t going_down; /* set to 1 by go_down() */
+  bool locked; /* read and written only under `lock` in this class */
+
+public:
+  RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid,
+                       const string& _lock_name, int _interval) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+                                        pool(_pool), oid(_oid), lock_name(_lock_name), interval(_interval),
+                                        lock("RGWContimuousLeaseCR"), locked(false) { /* NOTE(review): mutex name literal is spelled "Contimuous" */
+#define COOKIE_LEN 16
+    char buf[COOKIE_LEN + 1];
+
+    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); /* NOTE(review): assumes the helper NUL-terminates within the size passed -- confirm */
+    cookie = buf;
+  }
+
+  int operate();
+
+  bool is_locked() { /* returns the current value of `locked`, read under the mutex */
+    Mutex::Locker l(lock);
+    return locked;
+  }
+
+  void set_locked(bool status) { /* stores `status` into `locked` under the mutex */
+    Mutex::Locker l(lock);
+    locked = status;
+  }
+
+  void go_down() { /* sets the going_down flag, then calls wakeup() */
+    going_down.set(1);
+    wakeup();
+  }
+};
+
+
 #endif
index 1ad451b4b59650b79990aa5b3beb133383b5d6a7..e3f54665706076b15fa002cff8b2ef0b17ebd447 100644 (file)
@@ -395,23 +395,21 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
   RGWHTTPManager *http_manager;
   RGWObjectCtx& obj_ctx;
 
-  string lock_name;
-  string cookie;
   rgw_meta_sync_info status;
   map<int, RGWMetadataLogInfo> shards_info;
+  RGWContinuousLeaseCR *lease_cr;
 public:
   RGWInitSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
                      RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
                                                 http_manager(_http_mgr),
-                                                obj_ctx(_obj_ctx) {
-    lock_name = "sync_lock";
+                                                obj_ctx(_obj_ctx), lease_cr(NULL) {
     status.num_shards = _num_shards;
+  }
 
-#define COOKIE_LEN 16
-    char buf[COOKIE_LEN + 1];
-
-    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
-    string cookie = buf;
+  ~RGWInitSyncStatusCoroutine() {
+    if (lease_cr) {
+      lease_cr->put();
+    }
   }
 
   int operate() {
@@ -419,12 +417,18 @@ public:
     reenter(this) {
       yield {
        uint32_t lock_duration = 30;
-       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
-                                    lock_name, cookie, lock_duration));
+        string lock_name = "sync_lock";
+       lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
+                                            lock_name, lock_duration);
+        lease_cr->get();
+        spawn(lease_cr, false);
       }
-      if (retcode < 0) {
-        ldout(cct, 0) << "ERROR: failed to take a lock on " << mdlog_sync_status_oid << dendl;
-        return set_state(RGWCoroutine_Error, retcode);
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+          return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+        }
+        yield;
       }
       yield {
         call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
@@ -437,7 +441,7 @@ public:
        }
       }
 
-      drain_all();
+      drain_all_but(1); /* the lease cr still needs to run */
 
       yield {
         for (int i = 0; i < (int)status.num_shards; i++) {
@@ -452,10 +456,7 @@ public:
         call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
                                 mdlog_sync_status_oid, status));
       }
-      yield { /* unlock */
-       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
-                                    lock_name, cookie));
-      }
+      yield lease_cr->go_down();
       while (collect(&ret)) {
        if (ret < 0) {
          return set_state(RGWCoroutine_Error);
@@ -519,12 +520,21 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
 
   RGWShardedOmapCRManager *entries_index;
 
+  RGWContinuousLeaseCR *lease_cr;
+  bool lost_lock;
+
 public:
   RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                      async_rados(_async_rados),
                                                      num_shards(_num_shards),
-                                                     req_ret(0), entries_index(NULL) {
+                                                     req_ret(0), entries_index(NULL), lease_cr(NULL), lost_lock(false) {
+  }
+
+  ~RGWFetchAllMetaCR() {
+    if (lease_cr) {
+      lease_cr->put();
+    }
   }
 
   void append_section_from_set(set<string>& all_sections, const string& name) {
@@ -557,6 +567,21 @@ public:
     RGWRESTConn *conn = store->rest_master_conn;
 
     reenter(this) {
+      yield {
+        uint32_t lock_duration = 30;
+        string lock_name = "sync_lock";
+       lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
+                                            lock_name, lock_duration);
+        lease_cr->get();
+        spawn(lease_cr, false);
+      }
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+          return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+        }
+        yield;
+      }
       entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix);
       yield {
@@ -582,6 +607,10 @@ public:
            return set_state(RGWCoroutine_Error);
          }
          for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
+            if (!lease_cr->is_locked()) {
+              lost_lock = true;
+              break;
+            }
            ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
            string s = *sections_iter + ":" + *iter;
            entries_index->append(s);
@@ -590,6 +619,11 @@ public:
        }
       }
       yield entries_index->finish();
+
+      drain_all_but(1); /* the lease cr still needs to run */
+
+      yield lease_cr->go_down();
+
       int ret;
       while (collect(&ret)) {
        if (ret < 0) {
@@ -597,6 +631,9 @@ public:
        }
         yield;
       }
+      if (lost_lock) {
+        yield return set_state(RGWCoroutine_Error, -EBUSY);
+      }
       yield return set_state(RGWCoroutine_Done);
     }
     return 0;
@@ -1144,41 +1181,52 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status
         continue;
       }
       if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to init sync status" << dendl;
+        ldout(store->ctx(), 0) << "ERROR: failed to init sync status r=" << r << dendl;
         return r;
       }
     }
   } while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
 
-  switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
-    case rgw_meta_sync_info::StateBuildingFullSyncMaps:
-      ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
-      r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards));
-      if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
-        return r;
-      }
+  do {
+    r = run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, &sync_status));
+    if (r < 0 && r != -ENOENT) {
+      ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
+      return r;
+    }
 
-      sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
-      r = set_sync_info(sync_status.sync_info);
-      if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
-        return r;
-      }
-      /* fall through */
-    case rgw_meta_sync_info::StateSync:
-      ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
-      meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status);
-      r = run(meta_sync_cr);
-      if (r < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
-        return r;
-      }
-      break;
-    default:
-      ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
-      return -EIO;
-  }
+    switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
+      case rgw_meta_sync_info::StateBuildingFullSyncMaps:
+        ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
+        r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards));
+        if (r == -EBUSY) {
+          continue;
+        }
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
+          return r;
+        }
+
+        sync_status.sync_info.state = rgw_meta_sync_info::StateSync;
+        r = set_sync_info(sync_status.sync_info);
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
+          return r;
+        }
+        /* fall through */
+      case rgw_meta_sync_info::StateSync:
+        ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
+        meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status);
+        r = run(meta_sync_cr);
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
+          return r;
+        }
+        break;
+      default:
+        ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
+        return -EIO;
+    }
+  } while (true);
 
   return 0;
 }