From: lvshanchun <lvshanchun@gmail.com>
Date: Mon, 5 Feb 2018 12:43:39 +0000 (+0800)
Subject: rgw: read behind bucket shards of a specified data log shard
X-Git-Tag: v12.2.6~131^2~7
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=91899871f4681864971246f33011d0cf480cf10f;p=ceph.git

rgw: read behind bucket shards of a specified data log shard

add read_shard_status to read pending bucket shards, recovering bucket
shards and sync marker

Signed-off-by: lvshanchun <lvshanchun@gmail.com>
(cherry picked from commit 9c5ff46ad9c849eb0c99a4d78be3f9dd2299779c)

Conflicts:
	rgw_data_sync.cc (make_move_iterator fails to build with gcc<5)
---

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 8e557ef00197..e4f37267e1c8 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -1998,6 +1998,171 @@ int RGWReadBucketSyncStatusCoroutine::operate()
   }
   return 0;
 }
+
+#define OMAP_READ_MAX_ENTRIES 10
+// Reads the ".retry" omap object of one data log shard: every key names a
+// bucket shard whose sync previously failed and is pending retry (recovery).
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+
+  const int shard_id;
+  int max_entries;
+
+  set<string>& recovering_buckets;
+  string marker;
+  string error_oid;
+
+  set<string> error_entries;
+  int max_omap_entries;
+  int count;
+
+public:
+  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _recovering_buckets, const int _max_entries)
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+  {
+    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+  }
+
+  int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read recovering bucket shards
+    count = 0;
+    do {
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
+            marker, &error_entries, max_omap_entries));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (error_entries.empty()) {
+        break;
+      }
+
+      count += error_entries.size();
+      marker = *error_entries.rbegin();
+      recovering_buckets.insert(error_entries.begin(), error_entries.end());
+    }while((int)error_entries.size() == max_omap_entries && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+// Reads the shard's persisted sync marker, then lists the remote data log
+// from that marker on: every listed entry names a bucket shard with changes
+// not yet consumed by sync (i.e. pending / lagging).
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+
+  const int shard_id;
+  int max_entries;
+
+  set<string>& pending_buckets;
+  string marker;
+  string status_oid;
+
+  rgw_data_sync_marker* sync_marker;
+  int count;
+
+  list<rgw_data_change_log_entry> log_entries;
+  bool truncated;
+
+public:
+  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _pending_buckets,
+                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+  {
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+  }
+
+  int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read sync status marker
+    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+    yield call(new CR(sync_env->async_rados, store,
+                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                      sync_marker));
+    if (retcode < 0) {
+      ldout(sync_env->cct,0) << "failed to read sync status marker with "
+        << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    //read pending bucket shards
+    marker = sync_marker->marker;
+    count = 0;
+    do{
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct,0) << "failed to read remote data log info with "
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (log_entries.empty()) {
+        break;
+      }
+
+      count += log_entries.size();
+      for (const auto& entry : log_entries) {
+        pending_buckets.insert(entry.entry.key);
+      }
+    }while(truncated && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  list<RGWCoroutinesStack *> stacks;
+  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+  stacks.push_back(recovering_stack);
+  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+  stacks.push_back(pending_stack);
+  ret = crs.run(stacks);
+  http_manager.stop();
+  return ret;
+}
+
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index 4792ad9794ae..51354a6f6be4 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -275,6 +275,7 @@ public:
   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
+  int read_shard_status(int shard_id, set<string>& lagging_buckets,set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
   int init_sync_status(int num_shards);
   int run_sync(int num_shards);
 
@@ -329,6 +330,9 @@ public:
     return source_log.read_recovering_shards(num_shards, recovering_shards);
   }
 
+  int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
+    return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets,sync_marker, max_entries);
+  }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
   int read_log_info(rgw_datalog_info *log_info) {