}
};
-class RGWListBucketIndexesCR : public RGWCoroutine {
+class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
+ const string instance_key;
- rgw::sal::RadosStore* store;
+ rgw_bucket_index_marker_info *info;
+
+public:
+ RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
+ const rgw_bucket& bucket,
+ rgw_bucket_index_marker_info *_info)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ instance_key(bucket.get_key()), info(_info) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ yield {
+ rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
+ { "bucket-instance", instance_key.c_str() },
+ { "info" , NULL },
+ { NULL, NULL } };
+
+ string p = "/admin/log/";
+ call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+
+class RGWListBucketIndexesCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env = sc->env;
+
+ rgw::sal::RadosStore* store = sync_env->store;
rgw_data_sync_status *sync_status;
- int num_shards;
- int req_ret;
- int ret;
+ int req_ret = 0;
+ int ret = 0;
list<string>::iterator iter;
- RGWShardedOmapCRManager *entries_index;
-
- string oid_prefix;
+ unique_ptr<RGWShardedOmapCRManager> entries_index;
+ string oid_prefix =
+ datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;
- string path;
+ string path = "/admin/metadata/bucket.instance";
bucket_instance_meta_info meta_info;
string key;
- string s;
- int i;
- bool failed;
- bool truncated;
+ bool failed = false;
+ bool truncated = false;
read_metadata_list result;
public:
- RGWListBucketIndexesCR(RGWDataSyncCtx *_sc,
- rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- store(sync_env->store), sync_status(_sync_status),
- req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
- oid_prefix = datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;
- path = "/admin/metadata/bucket.instance";
- num_shards = sync_status->sync_info.num_shards;
- }
- ~RGWListBucketIndexesCR() override {
- delete entries_index;
- }
+ RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
+ rgw_data_sync_status* sync_status)
+ : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {}
+ ~RGWListBucketIndexesCR() override { }
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
- sync_env->svc->zone->get_zone_params().log_pool,
- oid_prefix);
+ entries_index = std::make_unique<RGWShardedOmapCRManager>(
+ sync_env->async_rados, store, this,
+ cct->_conf->rgw_data_log_num_shards,
+ sync_env->svc->zone->get_zone_params().log_pool,
+ oid_prefix);
yield; // yield so OmapAppendCRs can start
do {
yield {
- string entrypoint = string("/admin/metadata/bucket.instance");
+ string entrypoint = "/admin/metadata/bucket.instance"s;
rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
{"marker", result.marker.c_str()},
{NULL, NULL}};
- call(new RGWReadRESTResourceCR<read_metadata_list>(sync_env->cct, sc->conn, sync_env->http_manager,
- entrypoint, pairs, &result));
- }
- if (retcode < 0) {
- ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
+ call(new RGWReadRESTResourceCR<read_metadata_list>(
+ sync_env->cct, sc->conn, sync_env->http_manager,
+ entrypoint, pairs, &result));
+ }
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0)
+ << "ERROR: failed to fetch metadata for section bucket.instance"
+ << dendl;
return set_cr_error(retcode);
}
for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
- ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
+ ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key="
+ << *iter << dendl;
key = *iter;
yield {
rgw_http_param_pair pairs[] = {{"key", key.c_str()},
{NULL, NULL}};
- call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info));
+ call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(
+ sync_env->cct, sc->conn, sync_env->http_manager, path, pairs,
+ &meta_info));
}
-
- num_shards = meta_info.data.get_bucket_info().layout.current_index.layout.normal.num_shards;
- if (num_shards > 0) {
- for (i = 0; i < num_shards; i++) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", i);
- s = key + buf;
- yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
- }
- } else {
- yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
- }
- }
- truncated = result.truncated;
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for key: "
+ << key << dendl;
+ return set_cr_error(retcode);
+ }
+ // Now that bucket full sync is bucket-wide instead of
+ // per-shard, we only need to register a single shard of
+ // each bucket to guarantee that sync will see everything
+ // that happened before data full sync starts. This also
+ // means we don't have to care about the bucket's current
+ // shard count.
+ yield entries_index->append(
+ fmt::format("{}:{}", key, 0),
+ sync_env->svc->datalog_rados->get_log_shard_id(
+ meta_info.data.get_bucket_info().bucket, 0));
+ }
+ truncated = result.truncated;
} while (truncated);
yield {
}
}
if (!failed) {
- for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
+ for (auto iter = sync_status->sync_markers.begin();
+ iter != sync_status->sync_markers.end();
+ ++iter) {
int shard_id = (int)iter->first;
rgw_data_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
- marker),
- true);
- }
+ spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+ dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(
+ sc->source_zone, shard_id)),
+ marker),
+ true);
+ }
} else {
- yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
- EIO, string("failed to build bucket instances map")));
+ yield call(sync_env->error_logger->log_error_cr(
+ dpp, sc->conn->get_remote_id(), "data.init", "",
+ EIO, string("failed to build bucket instances map")));
}
while (collect(&ret, NULL)) {
- if (ret < 0) {
- yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
- -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
- req_ret = ret;
- }
- yield;
+ if (ret < 0) {
+ yield call(sync_env->error_logger->log_error_cr(
+ dpp, sc->conn->get_remote_id(), "data.init", "",
+ -ret, string("failed to store sync status: ") +
+ cpp_strerror(-ret)));
+ req_ret = ret;
+ }
+ yield;
}
-
drain_all();
if (req_ret < 0) {
yield return set_cr_error(req_ret);
return string(buf);
}
-class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- const string instance_key;
-
- rgw_bucket_index_marker_info *info;
-
-public:
- RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
- const rgw_bucket& bucket,
- rgw_bucket_index_marker_info *_info)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- instance_key(bucket.get_key()), info(_info) {}
-
- int operate(const DoutPrefixProvider *dpp) override {
- reenter(this) {
- yield {
- rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
- { "bucket-instance", instance_key.c_str() },
- { "info" , NULL },
- { NULL, NULL } };
-
- string p = "/admin/log/";
- call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
- }
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
-
- return set_cr_done();
- }
- return 0;
- }
-};
-
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;