git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWListBucketIndexesCR only needs zero shard
author Adam C. Emerson <aemerson@redhat.com>
Fri, 10 Sep 2021 15:38:05 +0000 (11:38 -0400)
committer Adam C. Emerson <aemerson@redhat.com>
Fri, 17 Sep 2021 17:31:09 +0000 (13:31 -0400)
We only need to check one shard, and everything has shard zero.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_data_sync.cc

index 20fed71f759da55663657d3e705930af6cacc982..e2469ecc02af90eb0556d8715b861134c46e8cc4 100644 (file)
@@ -832,94 +832,131 @@ struct bucket_instance_meta_info {
   }
 };
 
-class RGWListBucketIndexesCR : public RGWCoroutine {
+class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
+  const string instance_key;
 
-  rgw::sal::RadosStore* store;
+  rgw_bucket_index_marker_info *info;
+
+public:
+  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
+                                  const rgw_bucket& bucket,
+                                  rgw_bucket_index_marker_info *_info)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+      instance_key(bucket.get_key()), info(_info) {}
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      yield {
+        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
+                                       { "bucket-instance", instance_key.c_str() },
+                                       { "info" , NULL },
+                                       { NULL, NULL } };
+
+        string p = "/admin/log/";
+        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
+
+class RGWListBucketIndexesCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env = sc->env;
+
+  rgw::sal::RadosStore* store = sync_env->store;
 
   rgw_data_sync_status *sync_status;
-  int num_shards;
 
-  int req_ret;
-  int ret;
+  int req_ret = 0;
+  int ret = 0;
 
   list<string>::iterator iter;
 
-  RGWShardedOmapCRManager *entries_index;
-
-  string oid_prefix;
+  unique_ptr<RGWShardedOmapCRManager> entries_index;
+  string oid_prefix =
+    datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;
 
-  string path;
+  string path = "/admin/metadata/bucket.instance";
   bucket_instance_meta_info meta_info;
   string key;
-  string s;
-  int i;
 
-  bool failed;
-  bool truncated;
+  bool failed = false;
+  bool truncated = false;
   read_metadata_list result;
 
 public:
-  RGWListBucketIndexesCR(RGWDataSyncCtx *_sc,
-                         rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-                                                      store(sync_env->store), sync_status(_sync_status),
-                                                     req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
-    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id; 
-    path = "/admin/metadata/bucket.instance";
-    num_shards = sync_status->sync_info.num_shards;
-  }
-  ~RGWListBucketIndexesCR() override {
-    delete entries_index;
-  }
+  RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
+                         rgw_data_sync_status* sync_status)
+    : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {}
+  ~RGWListBucketIndexesCR() override { }
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
-      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
-                                                 sync_env->svc->zone->get_zone_params().log_pool,
-                                                  oid_prefix);
+      entries_index = std::make_unique<RGWShardedOmapCRManager>(
+       sync_env->async_rados, store, this,
+       cct->_conf->rgw_data_log_num_shards,
+       sync_env->svc->zone->get_zone_params().log_pool,
+       oid_prefix);
       yield; // yield so OmapAppendCRs can start
 
       do {
         yield {
-          string entrypoint = string("/admin/metadata/bucket.instance");
+          string entrypoint = "/admin/metadata/bucket.instance"s;
 
           rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                          {"marker", result.marker.c_str()},
                                          {NULL, NULL}};
 
-          call(new RGWReadRESTResourceCR<read_metadata_list>(sync_env->cct, sc->conn, sync_env->http_manager,
-                                                             entrypoint, pairs, &result));
-        }
-        if (retcode < 0) {
-          ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
+          call(new RGWReadRESTResourceCR<read_metadata_list>(
+                sync_env->cct, sc->conn, sync_env->http_manager,
+                entrypoint, pairs, &result));
+       }
+       if (retcode < 0) {
+         ldpp_dout(dpp, 0)
+           << "ERROR: failed to fetch metadata for section bucket.instance"
+           << dendl;
           return set_cr_error(retcode);
         }
 
         for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
-          ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
+          ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key="
+                            << *iter << dendl;
           key = *iter;
 
           yield {
             rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                            {NULL, NULL}};
 
-            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info));
+            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(
+                  sync_env->cct, sc->conn, sync_env->http_manager, path, pairs,
+                  &meta_info));
           }
-
-          num_shards = meta_info.data.get_bucket_info().layout.current_index.layout.normal.num_shards;
-          if (num_shards > 0) {
-            for (i = 0; i < num_shards; i++) {
-              char buf[16];
-              snprintf(buf, sizeof(buf), ":%d", i);
-              s = key + buf;
-              yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
-            }
-          } else {
-            yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
-          }
-        }
-        truncated = result.truncated;
+         if (retcode < 0) {
+           ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for key: "
+                             << key << dendl;
+           return set_cr_error(retcode);
+         }
+         // Now that bucket full sync is bucket-wide instead of
+         // per-shard, we only need to register a single shard of
+         // each bucket to guarantee that sync will see everything
+         // that happened before data full sync starts. This also
+         // means we don't have to care about the bucket's current
+         // shard count.
+         yield entries_index->append(
+           fmt::format("{}:{}", key, 0),
+           sync_env->svc->datalog_rados->get_log_shard_id(
+             meta_info.data.get_bucket_info().bucket, 0));
+       }
+       truncated = result.truncated;
       } while (truncated);
 
       yield {
@@ -928,28 +965,35 @@ public:
         }
       }
       if (!failed) {
-        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
+        for (auto iter = sync_status->sync_markers.begin();
+            iter != sync_status->sync_markers.end();
+            ++iter) {
           int shard_id = (int)iter->first;
           rgw_data_sync_marker& marker = iter->second;
           marker.total_entries = entries_index->get_total_entries(shard_id);
-          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
-                                                                rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
-                                                                marker),
-                true);
-        }
+          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+                 dpp, sync_env->async_rados, sync_env->svc->sysobj,
+                 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
+                             RGWDataSyncStatusManager::shard_obj_name(
+                               sc->source_zone, shard_id)),
+                 marker),
+               true);
+       }
       } else {
-        yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
-                                                        EIO, string("failed to build bucket instances map")));
+        yield call(sync_env->error_logger->log_error_cr(
+                    dpp, sc->conn->get_remote_id(), "data.init", "",
+                    EIO, string("failed to build bucket instances map")));
       }
       while (collect(&ret, NULL)) {
-        if (ret < 0) {
-          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
-                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
-          req_ret = ret;
-        }
-        yield;
+       if (ret < 0) {
+          yield call(sync_env->error_logger->log_error_cr(
+                      dpp, sc->conn->get_remote_id(), "data.init", "",
+                      -ret, string("failed to store sync status: ") +
+                      cpp_strerror(-ret)));
+         req_ret = ret;
+       }
+       yield;
       }
-
       drain_all();
       if (req_ret < 0) {
         yield return set_cr_error(req_ret);
@@ -2711,41 +2755,6 @@ string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone,
   return string(buf);
 }
 
-class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
-  RGWDataSyncCtx *sc;
-  RGWDataSyncEnv *sync_env;
-  const string instance_key;
-
-  rgw_bucket_index_marker_info *info;
-
-public:
-  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
-                                  const rgw_bucket& bucket,
-                                  rgw_bucket_index_marker_info *_info)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      instance_key(bucket.get_key()), info(_info) {}
-
-  int operate(const DoutPrefixProvider *dpp) override {
-    reenter(this) {
-      yield {
-        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
-                                       { "bucket-instance", instance_key.c_str() },
-                                       { "info" , NULL },
-                                       { NULL, NULL } };
-
-        string p = "/admin/log/";
-        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
-      }
-      if (retcode < 0) {
-        return set_cr_error(retcode);
-      }
-
-      return set_cr_done();
-    }
-    return 0;
-  }
-};
-
 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;