}
return 0;
}
+
+#define OMAP_READ_MAX_ENTRIES 10
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRados *store;
+
+ const int shard_id;
+ int max_entries;
+
+ set<string>& recovering_buckets;
+ string marker;
+ string error_oid;
+
+ set<string> error_entries;
+ int max_omap_entries;
+ int count;
+
+public:
+ RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+ set<string>& _recovering_buckets, const int _max_entries)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+ recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+ {
+ error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+ }
+
+ int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+ reenter(this){
+ //read recovering bucket shards
+ count = 0;
+ do {
+ yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
+ marker, &error_entries, max_omap_entries));
+
+ if (retcode == -ENOENT) {
+ break;
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (error_entries.empty()) {
+ break;
+ }
+
+ count += error_entries.size();
+ marker = *error_entries.rbegin();
+ recovering_buckets.insert(error_entries.begin(), error_entries.end());
+ }while((int)error_entries.size() == max_omap_entries && count < max_entries);
+
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRados *store;
+
+ const int shard_id;
+ int max_entries;
+
+ set<string>& pending_buckets;
+ string marker;
+ string status_oid;
+
+ rgw_data_sync_marker* sync_marker;
+ int count;
+
+ list<rgw_data_change_log_entry> log_entries;
+ bool truncated;
+
+public:
+ RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+ set<string>& _pending_buckets,
+ rgw_data_sync_marker* _sync_marker, const int _max_entries)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+ pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+ {
+ status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+ }
+
+ int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+ reenter(this){
+ //read sync status marker
+ using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+ yield call(new CR(sync_env->async_rados, store,
+ rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+ sync_marker));
+ if (retcode < 0) {
+ ldout(sync_env->cct,0) << "failed to read sync status marker with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ //read pending bucket shards
+ marker = sync_marker->marker;
+ count = 0;
+ do{
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+ if (retcode == -ENOENT) {
+ break;
+ }
+
+ if (retcode < 0) {
+ ldout(sync_env->cct,0) << "failed to read remote data log info with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (log_entries.empty()) {
+ break;
+ }
+
+ count += log_entries.size();
+ for (const auto& entry : log_entries) {
+ pending_buckets.insert(entry.entry.key);
+ }
+ }while(truncated && count < max_entries);
+
+ return set_cr_done();
+ }
+
+ return 0;
+}
+
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+ // cannot run concurrently with run_sync(), so run in a separate manager
+ RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+ RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ int ret = http_manager.set_threaded();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
+ return ret;
+ }
+ RGWDataSyncEnv sync_env_local = sync_env;
+ sync_env_local.http_manager = &http_manager;
+ list<RGWCoroutinesStack *> stacks;
+ RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+ recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+ stacks.push_back(recovering_stack);
+ RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+ pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+ stacks.push_back(pending_stack);
+ ret = crs.run(stacks);
+ http_manager.stop();
+ return ret;
+}
+
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
int read_sync_status(rgw_data_sync_status *sync_status);
int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
+ int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
int init_sync_status(int num_shards);
int run_sync(int num_shards);
return source_log.read_recovering_shards(num_shards, recovering_shards);
}
+ int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
+ return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
+ }
int init_sync_status() { return source_log.init_sync_status(num_shards); }
int read_log_info(rgw_datalog_info *log_info) {