git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: Store generation and its corresponding num shards in a struct.
author Shilpa Jagannath <smanjara@redhat.com>
Tue, 14 Jun 2022 03:27:24 +0000 (23:27 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Mon, 8 Aug 2022 19:44:15 +0000 (15:44 -0400)
Call remove_cr() to remove full sync obligations after writing all
shard entries into the error repo. Replace call() with spawn() and yield_spawn_window().

Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h

index 179f59fb284c64bc292cfdb21d63534b10b2a011..8d897d175c4b1a3c9ef9d53850ddcb5b720cecd0 100644 (file)
@@ -1417,10 +1417,16 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_shard source_bs;
+  ceph::real_time timestamp;
+  int *error_result;
   RGWSyncTraceNodeRef tn;
   rgw_bucket_index_marker_info remote_info;
   rgw_pool pool;
   RGWDataChangesLog *datalog_changes{nullptr};
+  rgw_raw_obj error_repo;
+  uint32_t sid;
+  rgw_bucket_shard bs;
+  std::vector<store_gen_shards>::const_iterator each;
 
   rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
     int datalog_shard = datalog_changes->choose_oid(bs);
@@ -1429,30 +1435,48 @@ class RGWFullSyncErrorRepoCR: public RGWCoroutine {
   }
 
 public:
-  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn)
+  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
+                         ceph::real_time& _timestamp, int *_error_result, RGWSyncTraceNodeRef& _tn)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      source_bs(_source_bs), tn(_tn) {
+      source_bs(_source_bs), timestamp(_timestamp), error_result(_error_result), tn(_tn) {
         tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
       }
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
-      yield {
-        call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
-        if (retcode < 0) {
-          return set_cr_error(retcode);
-        }
-        for (const auto& each : remote_info.gen_numshards) {
-          for (uint32_t sid = 0; sid < each.second; sid++) {
-            rgw_bucket_shard bs(source_bs.bucket, sid);
-            rgw_raw_obj error_repo = datalog_oid_for_error_repo(bs);
-            tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry"));
-            call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                rgw::error_repo::encode_key(bs, each.first),
-                                                ceph::real_time{}));
-          }
+      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      each = remote_info.generations.cbegin();
+      for (; each != remote_info.generations.cend(); each++) {
+        for (sid = 0; sid < each->num_shards; sid++) {
+          bs.bucket = source_bs.bucket;
+          bs.shard_id = sid;
+          error_repo = datalog_oid_for_error_repo(bs);
+          tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
+          yield_spawn_window(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                            rgw::error_repo::encode_key(bs, each->gen),
+                            timestamp), cct->_conf->rgw_data_sync_spawn_window,
+                            [&](uint64_t stack_id, int ret) {
+                              if (ret < 0) {
+                                tn->log(10, SSTR("writing to error repo returned error: " << ret));
+                                *error_result = ret;
+                              }
+                              return 0;
+                              });
         }
       }
+      drain_all_cb([&](uint64_t stack_id, int ret) {
+                   if (ret < 0) {
+                     tn->log(10, SSTR("writing to error repo returned error: " << ret));
+                     *error_result = ret;
+                   }
+                   return 0;
+                 });
+
+      return set_cr_done();
     }
     return 0;
   }
@@ -1514,6 +1538,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   rgw_bucket_shard source_bs;
   std::optional<uint64_t> gen;
+  int error_result{0};
 
   // target number of entries to cache before recycling idle ones
   static constexpr size_t target_cache_size = 256;
@@ -1765,13 +1790,17 @@ public:
             }
             if (!gen) {
               // write all full sync obligations for the bucket to error repo
-              yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn));
-              if (retcode < 0) {
-                tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
+              spawn(new RGWFullSyncErrorRepoCR(sc, source_bs, entry_timestamp, &error_result, tn), false);
+              if (error_result == 0) {
+                spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                                error_marker, entry_timestamp), false);
+              } else {
+                tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << error_result));
               }
+            } else {
+              tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
+              spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
             }
-            tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
-            spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
           }
           if (!omapvals->more) {
             error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
index 040b65d5d0b5dfa40fc1a7c4231f929f315314eb..b35744206cf61dea6ade22b3aee326e2e81cd967 100644 (file)
@@ -667,6 +667,21 @@ struct bilog_status_v2 {
   void decode_json(JSONObj *obj);
 };
 
+struct store_gen_shards {
+  uint64_t gen = 0;
+  uint32_t num_shards = 0;
+
+  void dump(Formatter *f) const {
+    encode_json("gen", gen, f);
+    encode_json("num_shards", num_shards, f);
+  }
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("gen", gen, obj);
+    JSONDecoder::decode_json("num_shards", num_shards, obj);
+  }
+};
+
 struct rgw_bucket_index_marker_info {
   std::string bucket_ver;
   std::string master_ver;
@@ -674,7 +689,7 @@ struct rgw_bucket_index_marker_info {
   bool syncstopped{false};
   uint64_t oldest_gen = 0;
   uint64_t latest_gen = 0;
-  std::vector<std::pair<uint64_t, uint32_t>> gen_numshards;
+  std::vector<store_gen_shards> generations;
 
   void decode_json(JSONObj *obj) {
     JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
@@ -683,6 +698,7 @@ struct rgw_bucket_index_marker_info {
     JSONDecoder::decode_json("syncstopped", syncstopped, obj);
     JSONDecoder::decode_json("oldest_gen", oldest_gen, obj);
     JSONDecoder::decode_json("latest_gen", latest_gen, obj);
+    JSONDecoder::decode_json("generations", generations, obj);
   }
 };
 
index b3940e58de3fd859be1a6d0d254b541b87a0def6..81622cab5b41c41c76eb1f13ac667566e4040342 100644 (file)
@@ -568,7 +568,7 @@ void RGWOp_BILog_Info::execute(optional_yield y) {
 
   for (auto& log : logs) {
       uint32_t num_shards = log.layout.in_index.layout.num_shards;
-      gen_numshards.push_back(std::make_pair(log.gen, num_shards));
+      generations.push_back({log.gen, num_shards});
   }
 }
 
@@ -587,7 +587,7 @@ void RGWOp_BILog_Info::send_response() {
   encode_json("syncstopped", syncstopped, s->formatter);
   encode_json("oldest_gen", oldest_gen, s->formatter);
   encode_json("latest_gen", latest_gen, s->formatter);
-  //encode_json("gen_numshards", gen_numshards, s->formatter); TODO: add supporting encode/decode for std::pair
+  encode_json("generations", generations, s->formatter);
   s->formatter->close_section();
 
   flusher.flush();
index 9eed2526b76d4761123f735090dc7805884f2850..36936f1eb4f7b5b3a8e651c894ec3f6b84ec725d 100644 (file)
@@ -20,6 +20,7 @@
 #include "rgw_rest_s3.h"
 #include "rgw_metadata.h"
 #include "rgw_mdlog.h"
+#include "rgw_data_sync.h"
 
 class RGWOp_BILog_List : public RGWRESTOp {
   bool sent_header;
@@ -53,7 +54,7 @@ class RGWOp_BILog_Info : public RGWRESTOp {
   bool syncstopped;
   uint64_t oldest_gen = 0;
   uint64_t latest_gen = 0;
-  std::vector<std::pair<uint64_t, uint32_t>> gen_numshards;
+  std::vector<store_gen_shards> generations;
 
 public:
   RGWOp_BILog_Info() : bucket_ver(), master_ver(), syncstopped(false) {}