]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: DataSyncShardCR uses GetOmapVals
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:22:53 +0000 (09:22 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 15:06:46 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index e11e7419d3455c0f63cfdb45bd67fa2b906719fb..f2658c9a85237603515a4a55beff474bf6a4a93a 100644 (file)
@@ -1392,9 +1392,9 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker& sync_marker;
 
-  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
-  std::set<std::string> entries;
-  std::set<std::string>::iterator iter;
+  RGWRadosGetOmapValsCR::ResultPtr omapvals;
+  std::map<std::string, bufferlist> entries;
+  std::map<std::string, bufferlist>::iterator iter;
 
   string oid;
 
@@ -1430,7 +1430,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   string error_oid;
   RGWOmapAppend *error_repo;
-  std::set<std::string> error_entries;
+  std::map<std::string, bufferlist> error_entries;
   string error_marker;
   int max_error_entries;
 
@@ -1550,31 +1550,30 @@ public:
           drain_all();
           return set_cr_error(-ECANCELED);
         }
-        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
-        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
-                                             sync_marker.marker, max_entries, omapkeys));
+        omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+        yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
+                                             sync_marker.marker, max_entries, omapvals));
         if (retcode < 0) {
-          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
           lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
-        entries = std::move(omapkeys->entries);
+        entries = std::move(omapvals->entries);
         if (entries.size() > 0) {
           tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
         }
         tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          tn->log(20, SSTR("full sync: " << *iter));
+          tn->log(20, SSTR("full sync: " << iter->first));
           total_entries++;
-          if (!marker_tracker->start(*iter, total_entries, real_time())) {
-            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
+          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
+            tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sc, *iter, *iter, marker_tracker, error_repo, false, tn), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false);
           }
-          sync_marker.marker = *iter;
+          sync_marker.marker = iter->first;
 
           while ((int)num_spawned() > spawn_window) {
             set_status() << "num_spawned() > spawn_window";
@@ -1587,8 +1586,8 @@ public:
             }
           }
         }
-      } while (omapkeys->more);
-      omapkeys.reset();
+      } while (omapvals->more);
+      omapvals.reset();
 
       drain_all_but_stack(lease_stack.get());
 
@@ -1664,18 +1663,18 @@ public:
 
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
           /* process bucket shards that previously failed */
-          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
-          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
-                                               error_marker, max_error_entries, omapkeys));
-          error_entries = std::move(omapkeys->entries);
+          omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
+          yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, error_oid),
+                                               error_marker, max_error_entries, omapvals));
+          error_entries = std::move(omapvals->entries);
           tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
-            error_marker = *iter;
+            error_marker = iter->first;
             tn->log(20, SSTR("handle error entry: " << error_marker));
             spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
           }
-          if (!omapkeys->more) {
+          if (!omapvals->more) {
             if (error_marker.empty() && error_entries.empty()) {
               /* the retry repo is empty, we back off a bit before calling it again */
               retry_backoff_secs *= 2;
@@ -1689,7 +1688,7 @@ public:
             error_marker.clear();
           }
         }
-        omapkeys.reset();
+        omapvals.reset();
 
         tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
         yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,