git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: guard metadata full/incremental sync
author Yehuda Sadeh <yehuda@redhat.com>
Fri, 23 Oct 2015 00:42:23 +0000 (17:42 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:19 +0000 (16:13 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc

index e3f54665706076b15fa002cff8b2ef0b17ebd447..51eeed410742a900d95fc0366cbf07c033e58ad7 100644 (file)
@@ -926,6 +926,9 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   boost::asio::coroutine incremental_cr;
   boost::asio::coroutine full_cr;
 
+  RGWContinuousLeaseCR *lease_cr; /* lease coroutine created and spawned by full_sync()/incremental_sync(); NULL before the first pass and after a pass releases it */
+  bool lost_lock; /* set to true when a sync loop finds lease_cr->is_locked() false; reset to false each time a new lease is spawned */
+
 
 public:
   RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
@@ -936,11 +939,15 @@ public:
                                                      pool(_pool),
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker),
-                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock") {
+                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
+                                                      lease_cr(NULL), lost_lock(false) {
   }
 
   ~RGWMetaSyncShardCR() {
     delete marker_tracker;
+    if (lease_cr) { /* still set when a sync pass returned before reaching its own put() (e.g. the "lease cr failed" error path); balance that pass's lease_cr->get() here */
+      lease_cr->put();
+    }
   }
 
   void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) {
@@ -949,13 +956,21 @@ public:
   }
 
   int operate() {
+    int r;
     while (true) {
       switch (sync_marker.state) {
       case rgw_meta_sync_marker::FullSync:
-        return full_sync();
+         r  = full_sync();
+         if (r < 0) {
+           ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+         }
+         return 0;
       case rgw_meta_sync_marker::IncrementalSync:
-        return incremental_sync();
-        break;
+         r  = incremental_sync();
+         if (r < 0) {
+           ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+         }
+         return 0;
       default:
         return set_state(RGWCoroutine_Error, -EIO);
       }
@@ -970,10 +985,37 @@ public:
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
       oid = full_sync_index_shard_oid(shard_id);
+      /* grab lock */
+      yield {
+        uint32_t lock_duration = 30;
+        string lock_name = "sync_lock";
+        if (lease_cr) {
+          lease_cr->put();
+        }
+       lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+                                            RGWMetaSyncStatusManager::shard_obj_name(shard_id),
+                                            lock_name, lock_duration);
+        lease_cr->get();
+        spawn(lease_cr, false);
+        lost_lock = false;
+      }
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+          return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+        }
+        yield;
+      }
+      /* prepare marker tracker */
       set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
                                                          RGWMetaSyncStatusManager::shard_obj_name(shard_id),
                                                          sync_marker));
+      /* sync! */
       do {
+        if (!lease_cr->is_locked()) {
+          lost_lock = true;
+          break;
+        }
         yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
         if (retcode < 0) {
           ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
@@ -992,19 +1034,30 @@ public:
         }
       } while ((int)entries.size() == max_entries);
 
-      drain_all();
+      drain_all_but(1);
 
-      yield {
-        /* update marker to reflect we're done with full sync */
-        sync_marker.state = rgw_meta_sync_marker::IncrementalSync;
-        sync_marker.marker = sync_marker.next_step_marker;
-        sync_marker.next_step_marker.clear();
-        call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
-                                                                   RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker));
+      if (!lost_lock) {
+        yield {
+          /* update marker to reflect we're done with full sync */
+          sync_marker.state = rgw_meta_sync_marker::IncrementalSync;
+          sync_marker.marker = sync_marker.next_step_marker;
+          sync_marker.next_step_marker.clear();
+          call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+                                                               RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker));
+        }
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+          return set_state(RGWCoroutine_Error, retcode);
+        }
       }
-      if (retcode < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
-        return set_state(RGWCoroutine_Error, retcode);
+
+      yield lease_cr->go_down();
+
+      lease_cr->put();
+      lease_cr = NULL;
+
+      if (lost_lock) {
+        return -EBUSY;
       }
     }
     return 0;
@@ -1013,11 +1066,37 @@ public:
 
   int incremental_sync() {
     reenter(&incremental_cr) {
+      /* grab lock */
+      yield {
+        uint32_t lock_duration = 30;
+        string lock_name = "sync_lock";
+        if (lease_cr) {
+          lease_cr->put();
+        }
+       lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+                                            RGWMetaSyncStatusManager::shard_obj_name(shard_id),
+                                            lock_name, lock_duration);
+        lease_cr->get();
+        spawn(lease_cr, false);
+        lost_lock = false;
+      }
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          ldout(cct, 0) << "ERROR: lease cr failed, done early " << dendl;
+          return set_state(RGWCoroutine_Error, lease_cr->get_ret_status());
+        }
+        yield;
+      }
       mdlog_marker = sync_marker.marker;
       set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
                                                          RGWMetaSyncStatusManager::shard_obj_name(shard_id),
                                                          sync_marker));
+      /* inc sync */
       do {
+        if (!lease_cr->is_locked()) {
+          lost_lock = true;
+          break;
+        }
 #define INCREMENTAL_MAX_ENTRIES 100
        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (mdlog_marker <= sync_marker.marker) {
@@ -1044,6 +1123,15 @@ public:
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
       } while (true);
+
+      yield lease_cr->go_down();
+
+      lease_cr->put();
+      lease_cr = NULL;
+
+      if (lost_lock) {
+        return -EBUSY;
+      }
     }
     /* TODO */
     return 0;