git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: changes to error handling.

Propagate full-sync errors through the completion callbacks of spawned
stacks instead of an int* out-parameter: the callbacks passed to
yield_spawn_window() and drain_all_cb() now return the stack's result,
so failures surface when the stacks drain. RGWFullSyncErrorRepoCR takes
the error marker itself and removes the error-repo entry after the
retry writes drain. RGWHandleFullSyncCR is renamed to
RGWDataFullSyncSingleEntryCR; it drops the marker tracker (the caller
finishes the marker) and, when an entry fails to sync, writes the
remaining shards and generations to the error repo for retry.
author     Shilpa Jagannath <smanjara@redhat.com>
           Tue, 5 Jul 2022 19:54:26 +0000 (15:54 -0400)
committer  Adam C. Emerson <aemerson@redhat.com>
           Mon, 8 Aug 2022 19:44:15 +0000 (15:44 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
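
The change is mostly mechanical and repeats across the hunks below:
completion callbacks handed to yield_spawn_window() and drain_all_cb()
now return each stack's result rather than storing it through an int*
out-parameter. A minimal, self-contained sketch of that shape (not RGW
code: Stack and run_drain here are invented stand-ins for a spawned
coroutine stack and the drain step):

// Sketch only: models how a drain step can let the completion callback's
// return value decide what error propagates.
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

struct Stack {     // stand-in for a spawned coroutine stack
  uint64_t id;
  int result;      // the stack's final return code
};

// Drain all pending stacks, invoking cb for each completion. The first
// negative value *returned by the callback* is what propagates, instead
// of the callback writing into an out-parameter.
int run_drain(std::vector<Stack>& stacks,
              const std::function<int(uint64_t, int)>& cb) {
  int first_error = 0;
  for (const auto& s : stacks) {
    int r = cb(s.id, s.result);
    if (r < 0 && first_error == 0) {
      first_error = r;
    }
  }
  stacks.clear();
  return first_error;
}

int main() {
  std::vector<Stack> stacks = {{1, 0}, {2, -5}, {3, 0}};
  int r = run_drain(stacks, [](uint64_t stack_id, int ret) {
    if (ret < 0) {
      std::cerr << "stack " << stack_id << " returned error: " << ret << "\n";
    }
    return ret;  // propagate through the return value, not an out-parameter
  });
  std::cout << "drain result: " << r << "\n";  // prints -5
  return 0;
}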
src/rgw/rgw_data_sync.cc

index 8c9fe5189a406583517f8ffcd81b19fc11976e29..f191f45d231f8dfa8d2b20dd9b6a5de64bb39ac7 100644 (file)
@@ -1417,8 +1417,8 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_shard source_bs;
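+  // key of the original error-repo entry; removed once the per-shard
+  // retry entries have been written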
+  std::string error_marker;
   ceph::real_time timestamp;
-  int *error_result;
   RGWSyncTraceNodeRef tn;
   rgw_bucket_index_marker_info remote_info;
   rgw_pool pool;
@@ -1435,10 +1435,10 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine {
   }
 
 public:
-  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
-                         ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn)
+  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, std::string _error_marker,
+                         ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) {
+      source_bs(_source_bs), error_marker(_error_marker), timestamp(_timestamp), tn(_tn) {
         tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
       }
 
@@ -1462,20 +1462,21 @@ public:
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
                                 tn->log(10, SSTR("writing to error repo returned error: " << ret));
-                                *error_result = ret;
                               }
-                              return 0;
-                              });
+                              return ret;
+                            });
         }
       }
       drain_all_cb([&](uint64_t stack_id, int ret) {
                    if (ret < 0) {
                      tn->log(10, SSTR("writing to error repo returned error: " << ret));
-                     *error_result = ret;
                    }
-                   return 0;
+                   return ret;
                  });
 
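+      // the retry writes have drained; drop the entry that scheduled
+      // this coroutine so it is not re-processed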
+      spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                      error_marker, timestamp), false);
+
       return set_cr_done();
     }
     return 0;
@@ -1499,7 +1500,7 @@ RGWCoroutine* sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                       lease_cr.get(), tn);
 }
 
-class RGWHandleFullSyncCR : public RGWCoroutine {
+class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_shard source_bs;
@@ -1508,22 +1509,21 @@ class RGWHandleFullSyncCR : public RGWCoroutine {
   ceph::real_time timestamp;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
-  std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
   RGWSyncTraceNodeRef tn;
   rgw_bucket_index_marker_info remote_info;
   uint32_t sid;
   std::vector<store_gen_shards>::iterator each;
 
 public:
-  RGWHandleFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+  RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
                       const std::string& _key, rgw_raw_obj& _error_repo,
                       ceph::real_time& _timestamp, boost::intrusive_ptr<RGWContinuousLeaseCR> _lease_cr,
                       boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
-                      std::optional<RGWDataSyncShardMarkerTrack> _marker_tracker,
                       RGWSyncTraceNodeRef& _tn)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs), key(_key),
       error_repo(_error_repo), timestamp(_timestamp), lease_cr(_lease_cr),
-      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {}
+      bucket_shard_cache(_bucket_shard_cache), tn(_tn) {}
 
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -1546,7 +1546,7 @@ public:
 
       source_bs.shard_id = 0;
       yield call(sync_single_entry(sc, source_bs, remote_info.oldest_gen, key, timestamp,
-                                  lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false));
+                                  lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false));
       if (retcode < 0) {
         tn->log(10, SSTR("full sync: failed to sync " << source_bs.shard_id << " of gen "
                         << remote_info.oldest_gen << ". Writing to error repo for retry"));
@@ -1562,47 +1562,59 @@ public:
       each = remote_info.generations.begin();
       for (; each != remote_info.generations.end(); each++) {
         for (sid = 0; sid < each->num_shards; sid++) {
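+          // a previous shard failed to sync; instead of syncing, record
+          // the remaining shards of this generation in the error repo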
+          if (retcode < 0) {
+            sid = source_bs.shard_id;
+            for (; sid < each->num_shards; sid++) {
+              source_bs.shard_id = sid;
+              yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                      rgw::error_repo::encode_key(source_bs, each->gen),
+                      timestamp), cct->_conf->rgw_data_sync_spawn_window,
+                      [&](uint64_t stack_id, int ret) {
+                        if (ret < 0) {
+                          tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                          << sid << " to error repo: retcode=" << ret));
+                        }
+                        return ret;
+                      });
+            }
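+            // the later generations have not been attempted; queue every
+            // shard of each of them for retry too (++each skips the
+            // generation already requeued above)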
+            for (++each; each != remote_info.generations.end(); each++) {
+              for (sid = 0; sid < each->num_shards; sid++) {
+                yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                        rgw::error_repo::encode_key(source_bs, each->gen),
+                        timestamp), cct->_conf->rgw_data_sync_spawn_window,
+                        [&](uint64_t stack_id, int ret) {
+                          if (ret < 0) {
+                            tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
+                            << sid << " to error repo: retcode=" << ret));
+                          }
+                          return ret;
+                        });
+              }
+            }
+          } else {
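+          // no failure so far on this entry: sync the shard directly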
           source_bs.shard_id = sid;
           tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
           yield_spawn_window(sync_single_entry(sc, source_bs, each->gen, key, timestamp,
-                            lease_cr, bucket_shard_cache, marker_tracker, error_repo, tn, false),
+                            lease_cr, bucket_shard_cache, std::nullopt, error_repo, tn, false),
                             cct->_conf->rgw_data_sync_spawn_window,
                             [&](uint64_t stack_id, int ret) {
-                            if (ret < 0) {
-                              sid = source_bs.shard_id;
-                              for (; sid < each->num_shards; sid++) {
-                                source_bs.shard_id = sid;
-                                spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                        rgw::error_repo::encode_key(source_bs, each->gen),
-                                        timestamp), false);
-                                if (retcode < 0) {
-                                  tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                  << sid << " to error repo: retcode=" << retcode));
-                                }
-                              }
-                              auto i = std::distance(remote_info.generations.begin(), each);
-                              for (each[i]; each != remote_info.generations.end(); each++) {
-                                for (sid = 0; sid < each->num_shards; sid++){
-                                  spawn(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                          rgw::error_repo::encode_key(source_bs, each->gen),
-                                          timestamp), false);
-                                  if (retcode < 0) {
-                                    tn->log(0, SSTR("ERROR: failed to write " << each->gen << ":"
-                                    << sid << " to error repo: retcode=" << retcode));
-                                  }
-                                }
+                              if (ret < 0) {
+                                retcode = ret;
                               }
-                            }
-                            return 0;
-                            });
-          drain_all_cb([&](uint64_t stack_id, int ret) {
-            if (ret < 0) {
-              tn->log(10, SSTR("a sync operation returned error: " << ret));
-            }
-            return ret;
-          });
+                              return retcode;
+                            });
+          }
         }
       }
+
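+      // wait for all spawned stacks, logging and propagating any
+      // failure through the callback's return value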
+      drain_all_cb([&](uint64_t stack_id, int ret) {
+        if (ret < 0) {
+          tn->log(10, SSTR("a sync operation returned error: " << ret));
+        }
+        return ret;
+      });
+
       return set_cr_done();
     }
     return 0;
@@ -1665,14 +1677,10 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   rgw_bucket_shard source_bs;
   std::optional<uint64_t> gen;
-  int error_result{0};
 
   // target number of entries to cache before recycling idle ones
   static constexpr size_t target_cache_size = 256;
   boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
-  rgw_bucket_index_marker_info remote_info;
-  uint32_t sid;
-  std::vector<store_gen_shards>::iterator each;
 
   int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
     return rgw_bucket_parse_bucket_key(sync_env->cct, key,
@@ -1799,8 +1807,10 @@ public:
           if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
-            yield call(new RGWHandleFullSyncCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
-                            lease_cr, bucket_shard_cache, marker_tracker, tn));
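+            // the spawn window bounds in-flight entries instead of a
+            // blocking call; finish the marker here because the
+            // single-entry CR no longer carries the marker tracker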
+            yield_spawn_window(new RGWDataFullSyncSingleEntryCR(sc, source_bs, iter->first, error_repo, entry_timestamp,
+                            lease_cr, bucket_shard_cache, tn), cct->_conf->rgw_data_sync_spawn_window, std::nullopt);
+
+            yield call(marker_tracker->finish(iter->first));
           }
          sync_marker.marker = iter->first;
         }
@@ -1908,12 +1918,9 @@ public:
             }
             if (!gen) {
               // write all full sync obligations for the bucket to error repo
-              spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn), false);
-              if (error_result == 0) {
-                spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
-                                                error_marker, entry_timestamp), false);
-              } else {
-                tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result));
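+              // RGWFullSyncErrorRepoCR now takes the error marker and
+              // removes the error-repo entry itself after the writes drain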
+              spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, error_marker, entry_timestamp, tn), false);
+              if (retcode < 0) {
+                tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
               }
             } else {
               tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));