]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: track last timestamp written for bucket sync
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:07 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:18 +0000 (14:08 -0400)
bucket sync remembers the latest timestamp that it successfully wrote to
the bucket sync status. data sync can use this to make future decisions
without having to reread its sync status

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 4d6200a00072430845bc41ce46c7c88eb87af2b1..4b485599dcc92c8012f224cbd2e8aaf08fd38315 100644 (file)
@@ -1025,6 +1025,7 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   rgw_bucket_sync_pipe sync_pipe;
   rgw_bucket_shard_sync_info sync_status;
   RGWMetaSyncEnv meta_sync_env;
+  ceph::real_time* progress;
 
   const std::string status_oid;
 
@@ -1034,8 +1035,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair),
+  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair,
+                            const RGWSyncTraceNodeRef& _tn_parent,
+                            ceph::real_time* progress)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      sync_pair(_sync_pair), progress(progress),
       status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                          SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
@@ -1250,8 +1254,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
 
   RGWSyncTraceNodeRef tn;
-  std::vector<RGWDataSyncCtx> scs;
-  RGWDataSyncCtx *cur_sc{nullptr};
+  ceph::real_time* progress;
+  std::vector<ceph::real_time> shard_progress;
+  std::vector<ceph::real_time>::iterator cur_shard_progress;
 
   RGWRESTConn *conn{nullptr};
   rgw_zone_id last_zone;
@@ -1269,7 +1274,8 @@ public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                             std::optional<rgw_bucket_shard> _target_bs,
                             std::optional<rgw_bucket_shard> _source_bs,
-                            const RGWSyncTraceNodeRef& _tn_parent);
+                            const RGWSyncTraceNodeRef& _tn_parent,
+                            ceph::real_time* progress);
   ~RGWRunBucketSourcesSyncCR() override {
     if (lease_cr) {
       lease_cr->abort();
@@ -1288,6 +1294,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
   RGWSyncTraceNodeRef tn;
 
+  ceph::real_time progress;
   int sync_status = 0;
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
@@ -1313,7 +1320,10 @@ public:
         yield call(new RGWRunBucketSourcesSyncCR(sc,
                                                  std::nullopt, /* target_bs */
                                                  source_bs,
-                                                 tn));
+                                                 tn, &progress));
+        if (retcode == 0) {
+          tn->log(20, SSTR("RunBucketSources progress=" << progress));
+        }
       } while (marker_tracker && marker_tracker->need_retry(obligation.key));
 
       sync_status = retcode;
@@ -3384,11 +3394,47 @@ public:
   }
 };
 
+// write the incremental sync status and update 'stable_timestamp' on success
+class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  rgw_raw_obj obj;
+  rgw_bucket_shard_inc_sync_marker sync_marker;
+  ceph::real_time* stable_timestamp;
+  RGWObjVersionTracker* objv_tracker;
+  std::map<std::string, bufferlist> attrs;
+ public:
+  RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
+                                   const rgw_raw_obj& obj,
+                                   const rgw_bucket_shard_inc_sync_marker& sync_marker,
+                                   ceph::real_time* stable_timestamp,
+                                   RGWObjVersionTracker* objv_tracker)
+    : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
+      sync_marker(sync_marker), stable_timestamp(stable_timestamp),
+      objv_tracker(objv_tracker)
+  {}
+  int operate() {
+    reenter(this) {
+      sync_marker.encode_attr(attrs);
+
+      yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
+                                                obj, attrs, objv_tracker));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      if (stable_timestamp) {
+        *stable_timestamp = sync_marker.timestamp;
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
 
-  string marker_oid;
+  rgw_raw_obj obj;
   rgw_bucket_shard_inc_sync_marker sync_marker;
 
   map<rgw_obj_key, string> key_to_marker;
@@ -3401,6 +3447,7 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
   std::set<std::string> pending_olh; // object names with pending olh operations
 
   RGWSyncTraceNodeRef tn;
+  ceph::real_time* stable_timestamp;
 
   void handle_finish(const string& marker) override {
     auto iter = marker_to_op.find(marker);
@@ -3419,11 +3466,13 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
 public:
   RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                          const string& _marker_oid,
-                         const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
-                                                                sc(_sc), sync_env(_sc->env),
-                                                                marker_oid(_marker_oid),
-                                                                sync_marker(_marker) {}
-
+                         const rgw_bucket_shard_inc_sync_marker& _marker,
+                         ceph::real_time* stable_timestamp)
+    : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
+      sc(_sc), sync_env(_sc->env),
+      obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
+      sync_marker(_marker), stable_timestamp(stable_timestamp)
+  {}
   void set_tn(RGWSyncTraceNodeRef& _tn) {
     tn = _tn;
   }
@@ -3432,14 +3481,9 @@ public:
     sync_marker.position = new_marker;
     sync_marker.timestamp = timestamp;
 
-    map<string, bufferlist> attrs;
-    sync_marker.encode_attr(attrs);
-
-    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " timestamp=" << timestamp));
-    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
-                                          sync_env->svc->sysobj,
-                                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
-                                          attrs);
+    tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
+    return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
+                                                stable_timestamp, nullptr);
   }
 
   /*
@@ -3881,11 +3925,12 @@ public:
                                   const std::string& status_oid,
                                   RGWContinuousLeaseCR *lease_cr,
                                   rgw_bucket_shard_sync_info& sync_info,
-                                  RGWSyncTraceNodeRef& _tn_parent)
+                                  RGWSyncTraceNodeRef& _tn_parent,
+                                  ceph::real_time* stable_timestamp)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
       lease_cr(lease_cr), sync_info(sync_info),
-      marker_tracker(sc, status_oid, sync_info.inc_marker),
+      marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp),
       status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                          SSTR(bucket_shard_str{bs})))
@@ -4288,13 +4333,13 @@ std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs)
 RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                                                      std::optional<rgw_bucket_shard> _target_bs,
                                                      std::optional<rgw_bucket_shard> _source_bs,
-                                                     const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct),
-      sc(_sc),
-      sync_env(_sc->env),
-      target_bs(_target_bs),
-      source_bs(_source_bs),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
-                                         SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone)))
+                                                     const RGWSyncTraceNodeRef& _tn_parent,
+                                                     ceph::real_time* progress)
+  : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
+    target_bs(_target_bs), source_bs(_source_bs),
+    tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
+                                       SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
+    progress(progress)
 {
   if (target_bs) {
     target_bucket = target_bs->bucket;
@@ -4317,6 +4362,7 @@ int RGWRunBucketSourcesSyncCR::operate()
 
     if (pipes.empty()) {
       ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
+      return set_cr_done();
     }
 
     for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
@@ -4345,7 +4391,9 @@ int RGWRunBucketSourcesSyncCR::operate()
 
       ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
 
-      for (; num_shards > 0; --num_shards, ++cur_shard) {
+      shard_progress.resize(num_shards);
+      cur_shard_progress = shard_progress.begin();
+      for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) {
         /*
          * use a negatvie shard_id for backward compatibility,
          * this affects the crafted status oid
@@ -4359,7 +4407,8 @@ int RGWRunBucketSourcesSyncCR::operate()
 
         ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
 
-        yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false);
+        yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn,
+                                                  &*cur_shard_progress), false);
         while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
           set_status() << "num_spawned() > spawn_window";
           yield wait_for_child();
@@ -4388,6 +4437,9 @@ int RGWRunBucketSourcesSyncCR::operate()
         }
       }
     }
+    if (progress) {
+      *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
+    }
     return set_cr_done();
   }
 
@@ -4752,6 +4804,9 @@ int RGWRunBucketSyncCoroutine::operate()
           return set_cr_error(retcode);
         }
       }
+      if (progress) {
+        *progress = sync_status.inc_marker.timestamp;
+      }
 
       if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
         yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
@@ -4768,7 +4823,8 @@ int RGWRunBucketSyncCoroutine::operate()
       if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
         yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
                                                        status_oid, lease_cr.get(),
-                                                       sync_status, tn));
+                                                       sync_status, tn,
+                                                       progress));
         if (retcode < 0) {
           tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
           lease_cr->go_down();
@@ -4793,7 +4849,7 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
     return nullptr;
   }
 
-  return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node);
+  return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
 }
 
 int RGWBucketPipeSyncStatusManager::init()