git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: full sync and incremental sync changes to handle lost_lock and lost_bid
author: Shilpa Jagannath <smanjara@redhat.com>
Thu, 11 May 2023 17:36:25 +0000 (13:36 -0400)
committer: Shilpa Jagannath <smanjara@redhat.com>
Wed, 14 Jun 2023 23:59:55 +0000 (19:59 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_data_sync.h

index 9b6401530b5ea3b41f6fddd82747da30e9c13a80..135b2d958b282a4ebef3a481ba0b5f2902db6bcc 100644 (file)
@@ -1766,6 +1766,8 @@ class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
   std::map<std::string, bufferlist> entries;
   std::map<std::string, bufferlist>::iterator iter;
   string error_marker;
+  bool lost_lock = false;
+  bool lost_bid = false;
 
 public:
 
@@ -1789,10 +1791,17 @@ public:
       entry_timestamp = sync_marker.timestamp; // time when full sync started
       do {
         if (!lease_cr->is_locked()) {
-          drain_all();
           tn->log(1, "lease is lost, abort");
-          return set_cr_error(-ECANCELED);
+          lost_lock = true;
+          break;
+          }
+
+        if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+          tn->log(1, "lost bid");
+          lost_bid = true;
+          break;
         }
+
         omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
         yield call(new RGWRadosGetOmapValsCR(sc->env->driver,
                                             rgw_raw_obj(pool, oid),
@@ -1836,29 +1845,35 @@ public:
       } while (omapvals->more);
       omapvals.reset();
 
-      drain_all();
-
       tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
 
-      /* update marker to reflect we're done with full sync */
-      sync_marker.state = rgw_data_sync_marker::IncrementalSync;
-      sync_marker.marker = sync_marker.next_step_marker;
-      sync_marker.next_step_marker.clear();
-      yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
-             sc->env->dpp, sc->env->driver,
-             rgw_raw_obj(pool, status_oid), sync_marker, &objv));
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
-        return set_cr_error(retcode);
+      if (lost_bid) {
+        yield call(marker_tracker->flush());
+      } else if (!lost_lock) {
+        /* update marker to reflect we're done with full sync */
+        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+        sync_marker.marker = sync_marker.next_step_marker;
+        sync_marker.next_step_marker.clear();
+        yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+              sc->env->dpp, sc->env->driver,
+              rgw_raw_obj(pool, status_oid), sync_marker, &objv));
+        if (retcode < 0) {
+          tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
+          return set_cr_error(retcode);
+        }
+
+        // clean up full sync index, ignoring errors
+        yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));
+
+        // transition to incremental sync
+        return set_cr_done();
       }
 
-      // clean up full sync index, ignoring errors
-      yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));
+      if (lost_lock || lost_bid) {
+        return set_cr_error(-EBUSY);
+      }
 
-      // transition to incremental sync
-      return set_cr_done();
-    }
-    return 0;
+    }  return 0;
   }
 };
 
@@ -1884,6 +1899,8 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
   decltype(log_entries)::iterator log_iter;
   bool truncated = false;
   int cbret = 0;
+  bool lost_lock = false;
+  bool lost_bid = false;
 
   utime_t get_idle_interval() const {
     ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
@@ -1923,9 +1940,15 @@ public:
       marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
       do {
         if (!lease_cr->is_locked()) {
-          drain_all();
+          lost_lock = true;
           tn->log(1, "lease is lost, abort");
-          return set_cr_error(-ECANCELED);
+          break;
+        }
+
+        if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+          tn->log(1, "lost bid");
+          lost_bid = true;
+          break;
         }
        {
          current_modified.clear();
@@ -1942,13 +1965,9 @@ public:
             modified_iter != current_modified.end();
             ++modified_iter) {
          if (!lease_cr->is_locked()) {
-           drain_all();
-           yield call(marker_tracker->flush());
-           if (retcode < 0) {
-             tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
-             return set_cr_error(retcode);
-           }
-           return set_cr_error(-ECANCELED);
+          tn->log(1, "lease is lost, abort");
+          lost_lock = true;
+          break;
          }
           retcode = parse_bucket_key(modified_iter->key, source_bs);
           if (retcode < 0) {
@@ -1976,13 +1995,9 @@ public:
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
            if (!lease_cr->is_locked()) {
-             drain_all();
-             yield call(marker_tracker->flush());
-             if (retcode < 0) {
-               tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
-               return set_cr_error(retcode);
-             }
-             return set_cr_error(-ECANCELED);
+          tn->log(1, "lease is lost, abort");
+          lost_lock = true;
+          break;
            }
             error_marker = iter->first;
             entry_timestamp = rgw::error_repo::decode_value(iter->second);
@@ -2043,13 +2058,9 @@ public:
             log_iter != log_entries.end();
             ++log_iter) {
          if (!lease_cr->is_locked()) {
-           drain_all();
-           yield call(marker_tracker->flush());
-           if (retcode < 0) {
-             tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
-             return set_cr_error(retcode);
-           }
-           return set_cr_error(-ECANCELED);
+          tn->log(1, "lease is lost, abort");
+          lost_lock = true;
+          break;
          }
 
           tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
@@ -2101,6 +2112,15 @@ public:
          yield wait(get_idle_interval());
        }
       } while (true);
+
+      if (lost_bid) {
+        return set_cr_error(-EBUSY);
+      } else if (lost_lock) {
+        drain_all();
+        yield marker_tracker->flush();
+        return set_cr_error(-ECANCELED);
+      }
+
     }
     return 0;
   }
@@ -2160,6 +2180,12 @@ public:
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
+
+      if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+        tn->log(10, "not the highest bidder");
+        return set_cr_error(-EBUSY);
+      }
+
       yield init_lease_cr();
       while (!lease_cr->is_locked()) {
         if (lease_cr->is_done()) {
@@ -2173,7 +2199,7 @@ public:
       }
       *reset_backoff = true;
       tn->log(10, "took lease");
-      /* Reread data sync status to fech latest marker and objv */
+      /* Reread data sync status to fetch latest marker and objv */
       objv.clear();
       yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
                                                              rgw_raw_obj(pool, status_oid),
@@ -2387,7 +2413,7 @@ public:
          if (init_lease->is_done()) {
            tn->log(5, "ERROR: failed to take data sync status lease");
            set_status("lease lock failed, early abort");
-           drain_all();
+           drain_all_but_stack(notify_stack.get());
            return set_cr_error(init_lease->get_ret_status());
          }
          tn->log(5, "waiting on data sync status lease");
@@ -2415,7 +2441,7 @@ public:
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          init_lease->go_down();
-         drain_all();
+         drain_all_but_stack(notify_stack.get());
           return set_cr_error(retcode);
         }
         // sets state = StateBuildingFullSyncMaps
@@ -2437,7 +2463,7 @@ public:
 
         if (!init_lease->is_locked()) {
           init_lease->go_down();
-          drain_all();
+          drain_all_but_stack(notify_stack.get());
           return set_cr_error(-ECANCELED);
         }
         /* state: building full sync maps */
@@ -2450,7 +2476,7 @@ public:
 
         if (!init_lease->is_locked()) {
           init_lease->go_down();
-          drain_all();
+          drain_all_but_stack(notify_stack.get());
           return set_cr_error(-ECANCELED);
         }
         /* update new state */
@@ -2472,7 +2498,7 @@ public:
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
         if (init_lease) {
           init_lease->go_down();
-          drain_all();
+          drain_all_but_stack(notify_stack.get());
           init_lease.reset();
           lease_stack.reset();
         }
@@ -2491,6 +2517,8 @@ public:
         }
       }
 
+      notify_stack.get()->cancel();
+
       return set_cr_done();
     }
     return 0;
index 5fc25f6472801c63dc971042ee66a1cedd423e92..4e160f14cc486221c2baa55699923096f6abc42c 100644 (file)
@@ -23,6 +23,7 @@
 #include "rgw_sync_policy.h"
 
 #include "rgw_bucket_sync.h"
+#include "sync_fairness.h"
 
 // represents an obligation to sync an entry up a given time
 struct rgw_data_sync_obligation {