]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/multisite: build a coroutine class to handle error repo logic
authorShilpa Jagannath <smanjara@redhat.com>
Tue, 7 Jun 2022 20:58:45 +0000 (16:58 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 8 Aug 2022 19:44:15 +0000 (15:44 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_datalog.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h

index 49efc9c96028a53e005cebafd2e8e114ccb2026f..179f59fb284c64bc292cfdb21d63534b10b2a011 100644 (file)
@@ -1413,6 +1413,51 @@ public:
   }
 };
 
+class RGWFullSyncErrorRepoCR: public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_shard source_bs;
+  RGWSyncTraceNodeRef tn;
+  rgw_bucket_index_marker_info remote_info;
+  rgw_pool pool;
+  RGWDataChangesLog *datalog_changes{nullptr};
+
+  rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
+    int datalog_shard = datalog_changes->choose_oid(bs);
+    string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
+    return rgw_raw_obj(pool, oid + ".retry");
+  }
+
+public:
+  RGWFullSyncErrorRepoCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs, RGWSyncTraceNodeRef& _tn)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      source_bs(_source_bs), tn(_tn) {
+        tn = sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs)));
+      }
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield {
+        call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+        for (const auto& each : remote_info.gen_numshards) {
+          for (uint32_t sid = 0; sid < each.second; sid++) {
+            rgw_bucket_shard bs(source_bs.bucket, sid);
+            rgw_raw_obj error_repo = datalog_oid_for_error_repo(bs);
+            tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry"));
+            call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                                rgw::error_repo::encode_key(bs, each.first),
+                                                ceph::real_time{}));
+          }
+        }
+      }
+    }
+    return 0;
+  }
+};
+
 #define DATA_SYNC_MAX_ERR_ENTRIES 10
 
 class RGWDataSyncShardCR : public RGWCoroutine {
@@ -1468,6 +1513,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
   rgw_bucket_shard source_bs;
+  std::optional<uint64_t> gen;
 
   // target number of entries to cache before recycling idle ones
   static constexpr size_t target_cache_size = 256;
@@ -1478,14 +1524,6 @@ class RGWDataSyncShardCR : public RGWCoroutine {
                                        &bs.bucket, &bs.shard_id);
   }
 
-  rgw_raw_obj datalog_oid_for_error_repo(rgw_bucket_shard& bs) {
-    auto shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
-    auto datalog_shard = (ceph_str_hash_linux(bs.bucket.name.data(), bs.bucket.name.size()) +
-        shard_shift) % cct->_conf->rgw_data_log_num_shards;
-    string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
-    return rgw_raw_obj(pool, oid + ".retry");
-  }
-
   RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
                                   std::optional<uint64_t> gen,
                                   const std::string& marker,
@@ -1714,7 +1752,6 @@ public:
           for (; iter != error_entries.end(); ++iter) {
             error_marker = iter->first;
             entry_timestamp = rgw::error_repo::decode_value(iter->second);
-            std::optional<uint64_t> gen;
             retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
             if (retcode == -EINVAL) {
               // backward compatibility for string keys that don't encode a gen
@@ -1726,6 +1763,13 @@ public:
                                                error_marker, entry_timestamp), false);
               continue;
             }
+            if (!gen) {
+              // write all full sync obligations for the bucket to error repo
+              yield call(new RGWFullSyncErrorRepoCR(sc, source_bs, tn));
+              if (retcode < 0) {
+                tn->log(0, SSTR("ERROR: failed to write to error repo: retcode=" << retcode));
+              }
+            }
             tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
             spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
           }
@@ -1758,32 +1802,6 @@ public:
             marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
             continue;
           }
-          if (!log_iter->entry.gen) {
-            yield {
-              rgw_bucket_index_marker_info remote_info;
-              BucketIndexShardsManager remote_markers;
-              retcode = rgw_read_remote_bilog_info(sync_env->dpp, sc->conn, source_bs.bucket,
-                            remote_info, remote_markers, null_yield);
-
-              if (retcode < 0) {
-                tn->log(1, SSTR(" rgw_read_remote_bilog_info failed with retcode=" << retcode));
-                return retcode;
-              }
-              for (const auto& each : remote_info.gen_numshards) {
-                for (int sid = 0; sid < each.second; sid++) {
-                  rgw_bucket_shard bs(source_bs.bucket, sid);
-                  error_repo = datalog_oid_for_error_repo(bs);
-                  tn->log(10, SSTR("writing shard_id " << sid << "of gen" << each.first << " to error repo for retry"));
-                  call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
-                                                      rgw::error_repo::encode_key(bs, each.first),
-                                                      ceph::real_time{}));
-                  if (retcode < 0) {
-                    tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
-                  }
-                }
-              }
-            }
-          }
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
index 07814a7ec8736b62a1cc309f3e38dba7d5fd0c2e..040b65d5d0b5dfa40fc1a7c4231f929f315314eb 100644 (file)
@@ -674,7 +674,7 @@ struct rgw_bucket_index_marker_info {
   bool syncstopped{false};
   uint64_t oldest_gen = 0;
   uint64_t latest_gen = 0;
-  std::vector<std::pair<int, int>> gen_numshards;
+  std::vector<std::pair<uint64_t, uint32_t>> gen_numshards;
 
   void decode_json(JSONObj *obj) {
     JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
index d54f76a3249a22055c072d862fadc97b76b903c9..0bc4837c9c1bfb43920a94e38e0629f3932a56f6 100644 (file)
@@ -284,7 +284,6 @@ class RGWDataChangesLog {
   std::thread renew_thread;
 
   std::function<bool(const rgw_bucket& bucket, optional_yield y, const DoutPrefixProvider *dpp)> bucket_filter;
-  int choose_oid(const rgw_bucket_shard& bs);
   bool going_down() const;
   bool filter_bucket(const DoutPrefixProvider *dpp, const rgw_bucket& bucket, optional_yield y) const;
   int renew_entries(const DoutPrefixProvider *dpp);
@@ -296,7 +295,7 @@ public:
 
   int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams,
            librados::Rados* lr);
-
+  int choose_oid(const rgw_bucket_shard& bs);
   int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
                const rgw::bucket_log_layout_generation& gen, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
index 500c85bac6f95883e55266c23883dcb0d10f9eb4..b3940e58de3fd859be1a6d0d254b541b87a0def6 100644 (file)
@@ -566,15 +566,9 @@ void RGWOp_BILog_Info::execute(optional_yield y) {
   oldest_gen = logs.front().gen;
   latest_gen = logs.back().gen;
 
-  std::vector<std::pair<int, int>> gen_numshards;
-  for (auto gen = logs.front().gen; gen <= logs.back().gen; gen++) {
-    auto log = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(gen));
-    if (log == logs.end()) {
-      ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << gen << dendl;
-      op_ret = -ENOENT;
-    }
-    const auto& num_shards = log->layout.in_index.layout.num_shards;
-    gen_numshards.push_back(std::make_pair(gen, num_shards));
+  for (auto& log : logs) {
+      uint32_t num_shards = log.layout.in_index.layout.num_shards;
+      gen_numshards.push_back(std::make_pair(log.gen, num_shards));
   }
 }
 
@@ -593,6 +587,7 @@ void RGWOp_BILog_Info::send_response() {
   encode_json("syncstopped", syncstopped, s->formatter);
   encode_json("oldest_gen", oldest_gen, s->formatter);
   encode_json("latest_gen", latest_gen, s->formatter);
+  //encode_json("gen_numshards", gen_numshards, s->formatter); TODO: add supporting encode/decode for std::pair
   s->formatter->close_section();
 
   flusher.flush();
index 3232f03ca6854da29da7b19d7d0fb487e460fa16..9eed2526b76d4761123f735090dc7805884f2850 100644 (file)
@@ -53,6 +53,7 @@ class RGWOp_BILog_Info : public RGWRESTOp {
   bool syncstopped;
   uint64_t oldest_gen = 0;
   uint64_t latest_gen = 0;
+  std::vector<std::pair<uint64_t, uint32_t>> gen_numshards;
 
 public:
   RGWOp_BILog_Info() : bucket_ver(), master_ver(), syncstopped(false) {}