]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use objv_tracker to read/write bucket sync status
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:23 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:18 +0000 (14:08 -0400)
use cls_version on bucket sync status to detect racing writes - whether
from other gateways, or from radosgw-admin commands like 'bucket sync'
or 'bucket sync init'

classes that require a non-null version tracker take it by reference

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 8f1ec4f9d5f746acc6485f950484f0ada30bb8e1..918395f65c860ab5074f9f98ee5dcfb6370d0a8d 100644 (file)
@@ -994,6 +994,7 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   rgw_bucket_sync_pipe sync_pipe;
   rgw_bucket_shard_sync_info sync_status;
   RGWMetaSyncEnv meta_sync_env;
+  RGWObjVersionTracker objv_tracker;
   ceph::real_time* progress;
 
   const std::string status_oid;
@@ -2767,16 +2768,17 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   const string sync_status_oid;
 
   rgw_bucket_shard_sync_info& status;
-
+  RGWObjVersionTracker& objv_tracker;
   rgw_bucket_index_marker_info info;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                         const rgw_bucket_sync_pair_info& _sync_pair,
-                                        rgw_bucket_shard_sync_info& _status)
+                                        rgw_bucket_shard_sync_info& _status,
+                                        RGWObjVersionTracker& objv_tracker)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pair(_sync_pair),
       sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
-      status(_status)
+      status(_status), objv_tracker(objv_tracker)
   {}
 
   int operate() override {
@@ -2817,9 +2819,9 @@ public:
         if (write_status) {
           map<string, bufferlist> attrs;
           status.encode_all_attrs(attrs);
-          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs));
+          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
         } else {
-          call(new RGWRadosRemoveCR(store, obj));
+          call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
         }
       }
       if (info.syncstopped) {
@@ -2869,12 +2871,12 @@ RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
   sc.init(sync_env, conn, source_zone);
 }
 
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num)
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
 {
   if ((size_t)num >= sync_pairs.size()) {
     return nullptr;
   }
-  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status);
+  return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
 }
 
 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
@@ -2941,15 +2943,17 @@ class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   string oid;
   rgw_bucket_shard_sync_info *status;
-
+  RGWObjVersionTracker* objv_tracker;
   map<string, bufferlist> attrs;
 public:
   RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                    const rgw_bucket_sync_pair_info& sync_pair,
-                                   rgw_bucket_shard_sync_info *_status)
+                                   rgw_bucket_shard_sync_info *_status,
+                                   RGWObjVersionTracker* objv_tracker)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
-      status(_status) {}
+      status(_status), objv_tracker(objv_tracker)
+  {}
   int operate() override;
 };
 
@@ -2958,7 +2962,7 @@ int RGWReadBucketPipeSyncStatusCoroutine::operate()
   reenter(this) {
     yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
                                              rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
-                                             &attrs, true));
+                                             &attrs, true, objv_tracker));
     if (retcode == -ENOENT) {
       *status = rgw_bucket_shard_sync_info();
       return set_cr_done();
@@ -3153,7 +3157,7 @@ RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_sh
     return nullptr;
   }
 
-  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status);
+  return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr);
 }
 
 RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
@@ -3351,20 +3355,19 @@ class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj
 
   string marker_oid;
   rgw_bucket_shard_full_sync_marker sync_marker;
-
   RGWSyncTraceNodeRef tn;
+  RGWObjVersionTracker& objv_tracker;
 
 public:
   RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
-                         const string& _marker_oid,
-                         const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
-                                                                sc(_sc), sync_env(_sc->env),
-                                                                marker_oid(_marker_oid),
-                                                                sync_marker(_marker) {}
-
-  void set_tn(RGWSyncTraceNodeRef& _tn) {
-    tn = _tn;
-  }
+                                    const string& _marker_oid,
+                                    const rgw_bucket_shard_full_sync_marker& _marker,
+                                    RGWSyncTraceNodeRef tn,
+                                    RGWObjVersionTracker& objv_tracker)
+    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
+      sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
+      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
+  {}
 
   RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
@@ -3376,7 +3379,7 @@ public:
     tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
     return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
                                           rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
-                                          attrs);
+                                          attrs, &objv_tracker);
   }
 
   RGWOrderCallCR *allocate_order_control_cr() override {
@@ -3390,14 +3393,14 @@ class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
   rgw_raw_obj obj;
   rgw_bucket_shard_inc_sync_marker sync_marker;
   ceph::real_time* stable_timestamp;
-  RGWObjVersionTracker* objv_tracker;
+  RGWObjVersionTracker& objv_tracker;
   std::map<std::string, bufferlist> attrs;
  public:
   RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
                                    const rgw_raw_obj& obj,
                                    const rgw_bucket_shard_inc_sync_marker& sync_marker,
                                    ceph::real_time* stable_timestamp,
-                                   RGWObjVersionTracker* objv_tracker)
+                                   RGWObjVersionTracker& objv_tracker)
     : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
       sync_marker(sync_marker), stable_timestamp(stable_timestamp),
       objv_tracker(objv_tracker)
@@ -3407,7 +3410,7 @@ class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
       sync_marker.encode_attr(attrs);
 
       yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
-                                                obj, attrs, objv_tracker));
+                                                obj, attrs, &objv_tracker));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -3437,6 +3440,7 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
   std::set<std::string> pending_olh; // object names with pending olh operations
 
   RGWSyncTraceNodeRef tn;
+  RGWObjVersionTracker& objv_tracker;
   ceph::real_time* stable_timestamp;
 
   void handle_finish(const string& marker) override {
@@ -3457,15 +3461,15 @@ public:
   RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                          const string& _marker_oid,
                          const rgw_bucket_shard_inc_sync_marker& _marker,
+                         RGWSyncTraceNodeRef tn,
+                         RGWObjVersionTracker& objv_tracker,
                          ceph::real_time* stable_timestamp)
     : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
       sc(_sc), sync_env(_sc->env),
       obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
-      sync_marker(_marker), stable_timestamp(stable_timestamp)
+      sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
+      stable_timestamp(stable_timestamp)
   {}
-  void set_tn(RGWSyncTraceNodeRef& _tn) {
-    tn = _tn;
-  }
 
   RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
@@ -3473,7 +3477,7 @@ public:
 
     tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
     return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
-                                                stable_timestamp, nullptr);
+                                                stable_timestamp, objv_tracker);
   }
 
   /*
@@ -3683,7 +3687,6 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   bucket_list_result list_result;
   list<bucket_list_entry>::iterator entries_iter;
   rgw_bucket_shard_sync_info& sync_info;
-  RGWBucketFullSyncShardMarkerTrack marker_tracker;
   rgw_obj_key list_marker;
   bucket_list_entry *entry{nullptr};
 
@@ -3696,6 +3699,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   rgw_zone_set zones_trace;
 
   RGWSyncTraceNodeRef tn;
+  RGWBucketFullSyncShardMarkerTrack marker_tracker;
 
   struct _prefix_handler {
     RGWBucketSyncFlowManager::pipe_rules_ref rules;
@@ -3747,16 +3751,17 @@ public:
                            const std::string& status_oid,
                            RGWContinuousLeaseCR *lease_cr,
                            rgw_bucket_shard_sync_info& sync_info,
-                           RGWSyncTraceNodeRef tn_parent)
+                           RGWSyncTraceNodeRef tn_parent,
+                           RGWObjVersionTracker& objv_tracker)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
       lease_cr(lease_cr), sync_info(sync_info),
-      marker_tracker(sc, status_oid, sync_info.full_marker),
       status_oid(status_oid),
       tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
-                                         SSTR(bucket_shard_str{bs}))) {
+                                         SSTR(bucket_shard_str{bs}))),
+      marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker)
+  {
     zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key());
-    marker_tracker.set_tn(tn);
     prefix_handler.set_rules(sync_pipe.get_rules());
   }
 
@@ -3896,9 +3901,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   rgw_bucket_shard_sync_info& sync_info;
   rgw_obj_key key;
   rgw_bi_log_entry *entry{nullptr};
-  RGWBucketIncSyncShardMarkerTrack marker_tracker;
   bool updated_status{false};
-  const string& status_oid;
   rgw_zone_id zone_id;
   string target_location_key;
 
@@ -3908,6 +3911,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   bool syncstopped{false};
 
   RGWSyncTraceNodeRef tn;
+  RGWBucketIncSyncShardMarkerTrack marker_tracker;
 
 public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
@@ -3916,19 +3920,20 @@ public:
                                   RGWContinuousLeaseCR *lease_cr,
                                   rgw_bucket_shard_sync_info& sync_info,
                                   RGWSyncTraceNodeRef& _tn_parent,
+                                  RGWObjVersionTracker& objv_tracker,
                                   ceph::real_time* stable_timestamp)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
       lease_cr(lease_cr), sync_info(sync_info),
-      marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp),
-      status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
+      zone_id(sync_env->svc->zone->get_zone().id),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
-                                         SSTR(bucket_shard_str{bs})))
+                                         SSTR(bucket_shard_str{bs}))),
+      marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
+                     objv_tracker, stable_timestamp)
   {
     set_description() << "bucket shard incremental sync bucket="
         << bucket_shard_str{bs};
     set_status("init");
-    marker_tracker.set_tn(tn);
     rules = sync_pipe.get_rules();
     target_location_key = sync_pipe.info.dest_bs.bucket.get_key();
   }
@@ -4745,7 +4750,7 @@ int RGWRunBucketSyncCoroutine::operate()
     }
 
     tn->log(10, "took lease");
-    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status));
+    yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
       lease_cr->go_down();
@@ -4778,7 +4783,7 @@ int RGWRunBucketSyncCoroutine::operate()
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
           sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
-        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status));
+        yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
         if (retcode == -ENOENT) {
           tn->log(0, "bucket sync disabled");
           lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
@@ -4801,7 +4806,7 @@ int RGWRunBucketSyncCoroutine::operate()
       if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
         yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
                                                 status_oid, lease_cr.get(),
-                                                sync_status, tn));
+                                                sync_status, tn, objv_tracker));
         if (retcode < 0) {
           tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
@@ -4814,7 +4819,7 @@ int RGWRunBucketSyncCoroutine::operate()
         yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
                                                        status_oid, lease_cr.get(),
                                                        sync_status, tn,
-                                                       progress));
+                                                       objv_tracker, progress));
         if (retcode < 0) {
           tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
@@ -4899,12 +4904,15 @@ int RGWBucketPipeSyncStatusManager::init()
 int RGWBucketPipeSyncStatusManager::init_sync_status()
 {
   list<RGWCoroutinesStack *> stacks;
+  // pass an empty objv tracker to each so that the version gets incremented
+  std::list<RGWObjVersionTracker> objvs;
 
   for (auto& mgr : source_mgrs) {
     RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
 
     for (int i = 0; i < mgr->num_pipes(); ++i) {
-      stack->call(mgr->init_sync_status_cr(i));
+      objvs.emplace_back();
+      stack->call(mgr->init_sync_status_cr(i, objvs.back()));
     }
 
     stacks.push_back(stack);
@@ -5039,7 +5047,7 @@ class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
     }
     sync_pair.source_bs = source_bs;
     sync_pair.dest_bs = dest_bs;
-    spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
+    spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false);
     ++i;
     ++source_bs.shard_id;
     if (shard_to_shard_sync) {
index 7c00db3efa646848f5d0e72d70332e48c5d5c7b5..18d52b03197f4815d3e750142953757ca71a9cd9 100644 (file)
@@ -603,7 +603,7 @@ public:
             const rgw_bucket& dest_bucket);
 
   RGWCoroutine *read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status);
-  RGWCoroutine *init_sync_status_cr(int num);
+  RGWCoroutine *init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker);
   RGWCoroutine *run_sync_cr(int num);
 
   int num_pipes() {