git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: parse bucket-shard before DataSyncSingleEntry
author: Casey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:22:58 +0000 (09:22 -0400)
committer: Casey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 15:06:46 +0000 (11:06 -0400)
it's easier for DataSyncShard to handle parsing failures before calling
MarkerTrack::start() and DataSyncSingleEntry

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 1045e01f2422301cbaee5e119d6b94dd2012673e..a55836528639c7ec66ff12bde4ec6cafe845b35b 100644 (file)
@@ -1283,12 +1283,11 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
 
+  rgw_bucket_shard source_bs;
   string raw_key;
   string entry_marker;
   ceph::real_time entry_timestamp;
 
-  rgw_bucket_shard source_bs;
-
   int sync_status;
 
   bufferlist md_bl;
@@ -1300,15 +1299,16 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
 
   RGWSyncTraceNodeRef tn;
 public:
-  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc,
+  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
                           const string& _raw_key, const string& _entry_marker,
                            ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker,
-                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
-                                                      sc(_sc), sync_env(_sc->env),
-                                                     raw_key(_raw_key), entry_marker(_entry_marker),
-                                                      entry_timestamp(entry_timestamp), sync_status(0),
-                                                      marker_tracker(_marker_tracker),
-                                                      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
+                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sc->cct),
+      sc(_sc), sync_env(_sc->env), source_bs(source_bs),
+      raw_key(_raw_key), entry_marker(_entry_marker),
+      entry_timestamp(entry_timestamp), sync_status(0),
+      marker_tracker(_marker_tracker),
+      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
     tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
   }
@@ -1316,22 +1316,15 @@ public:
   int operate() override {
     reenter(this) {
       do {
-        yield {
-          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
-                                                &source_bs.bucket, &source_bs.shard_id);
-          if (ret < 0) {
-            return set_cr_error(-EIO);
-          }
-          if (marker_tracker) {
-            marker_tracker->reset_need_retry(raw_key);
-          }
-          tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
-
-          call(new RGWRunBucketSourcesSyncCR(sc,
-                                             std::nullopt, /* target_bs */
-                                             source_bs,
-                                             tn));
+        if (marker_tracker) {
+          marker_tracker->reset_need_retry(raw_key);
         }
+        tn->log(0, SSTR("triggering sync of source bucket/shard " << bucket_shard_str{source_bs}));
+
+        yield call(new RGWRunBucketSourcesSyncCR(sc,
+                                                 std::nullopt, /* target_bs */
+                                                 source_bs,
+                                                 tn));
       } while (marker_tracker && marker_tracker->need_retry(raw_key));
 
       sync_status = retcode;
@@ -1448,6 +1441,13 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t retry_backoff_secs;
 
   RGWSyncTraceNodeRef tn;
+
+  rgw_bucket_shard source_bs;
+
+  int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
+    return rgw_bucket_parse_bucket_key(sync_env->cct, key,
+                                       &bs.bucket, &bs.shard_id);
+  }
 public:
   RGWDataSyncShardCR(RGWDataSyncCtx *_sc,
                      rgw_pool& _pool,
@@ -1572,13 +1572,19 @@ public:
         tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
+          retcode = parse_bucket_key(iter->first, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
+            marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
+            continue;
+          }
           tn->log(20, SSTR("full sync: " << iter->first));
           total_entries++;
           if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
           }
           sync_marker.marker = iter->first;
 
@@ -1662,10 +1668,13 @@ public:
         }
         /* process out of band updates */
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
-          yield {
-            tn->log(20, SSTR("received async update notification: " << *modified_iter));
-            spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
+          retcode = parse_bucket_key(*modified_iter, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
+            continue;
           }
+          tn->log(20, SSTR("received async update notification: " << *modified_iter));
+          spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
         }
 
         if (error_retry_time <= ceph::coarse_real_clock::now()) {
@@ -1679,8 +1688,15 @@ public:
           for (; iter != error_entries.end(); ++iter) {
             error_marker = iter->first;
             entry_timestamp = rgw_error_repo_decode_value(iter->second);
+            retcode = parse_bucket_key(error_marker, source_bs);
+            if (retcode < 0) {
+              tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
+              spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, rgw_raw_obj{pool, error_oid},
+                                             error_marker, entry_timestamp), false);
+              continue;
+            }
             tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
-            spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker,
+            spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, error_marker, error_marker,
                                                entry_timestamp, nullptr /* no marker tracker */,
                                                error_repo, true, tn), false);
           }
@@ -1716,6 +1732,12 @@ public:
 
         for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
           tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
+          retcode = parse_bucket_key(log_iter->entry.key, source_bs);
+          if (retcode < 0) {
+            tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
+            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
+            continue;
+          }
           if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
             tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
             marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
@@ -1724,7 +1746,7 @@ public:
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
-            spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id,
+            spawn(new RGWDataSyncSingleEntryCR(sc, source_bs, log_iter->entry.key, log_iter->log_id,
                                                log_iter->log_timestamp, marker_tracker,
                                                error_repo, false, tn), false);
           }