git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync tracks timestamps for error repo
author Casey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:22:56 +0000 (09:22 -0400)
committer Casey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 15:06:46 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index f2658c9a85237603515a4a55beff474bf6a4a93a..1045e01f2422301cbaee5e119d6b94dd2012673e 100644 (file)
@@ -24,6 +24,7 @@
 #include "rgw_bucket_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
+#include "rgw_sync_error_repo.h"
 #include "rgw_sync_module.h"
 #include "rgw_sal.h"
 
@@ -1284,6 +1285,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
 
   string raw_key;
   string entry_marker;
+  ceph::real_time entry_timestamp;
 
   rgw_bucket_shard source_bs;
 
@@ -1296,16 +1298,15 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
   bool remove_from_repo;
 
-  set<string> keys;
-
   RGWSyncTraceNodeRef tn;
 public:
   RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc,
-                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
+                          const string& _raw_key, const string& _entry_marker,
+                           ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker,
                            RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
                                                       sc(_sc), sync_env(_sc->env),
                                                      raw_key(_raw_key), entry_marker(_entry_marker),
-                                                      sync_status(0),
+                                                      entry_timestamp(entry_timestamp), sync_status(0),
                                                       marker_tracker(_marker_tracker),
                                                       error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
@@ -1352,12 +1353,16 @@ public:
             tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
           }
         }
-        if (error_repo && !error_repo->append(raw_key)) {
-          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
+        if (error_repo) {
+          yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+                                             raw_key, entry_timestamp));
+          if (retcode < 0) {
+            tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
+          }
         }
       } else if (error_repo && remove_from_repo) {
-        keys = {raw_key};
-        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
+        yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+                                            raw_key, entry_timestamp));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << " retcode=" << retcode));
@@ -1432,6 +1437,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   RGWOmapAppend *error_repo;
   std::map<std::string, bufferlist> error_entries;
   string error_marker;
+  ceph::real_time entry_timestamp;
   int max_error_entries;
 
   ceph::coarse_real_time error_retry_time;
@@ -1544,6 +1550,7 @@ public:
       oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn));
       total_entries = sync_marker.pos;
+      entry_timestamp = sync_marker.timestamp; // time when full sync started
       do {
         if (!lease_cr->is_locked()) {
           stop_spawned_services();
@@ -1567,11 +1574,11 @@ public:
         for (; iter != entries.end(); ++iter) {
           tn->log(20, SSTR("full sync: " << iter->first));
           total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
+          if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
           }
           sync_marker.marker = iter->first;
 
@@ -1657,7 +1664,7 @@ public:
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
           yield {
             tn->log(20, SSTR("received async update notification: " << *modified_iter));
-            spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), marker_tracker, nullptr, false, tn), false);
+            spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
           }
         }
 
@@ -1671,8 +1678,11 @@ public:
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
             error_marker = iter->first;
-            tn->log(20, SSTR("handle error entry: " << error_marker));
-            spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
+            entry_timestamp = rgw_error_repo_decode_value(iter->second);
+            tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
+            spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker,
+                                               entry_timestamp, nullptr /* no marker tracker */,
+                                               error_repo, true, tn), false);
           }
           if (!omapvals->more) {
             if (error_marker.empty() && error_entries.empty()) {
@@ -1714,7 +1724,9 @@ public:
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
-            spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
+            spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id,
+                                               log_iter->log_timestamp, marker_tracker,
+                                               error_repo, false, tn), false);
           }
           while ((int)num_spawned() > spawn_window) {
             set_status() << "num_spawned() > spawn_window";