From: lvshanchun Date: Thu, 28 Dec 2017 06:39:39 +0000 (+0800) Subject: rgw: add RGWReadDataSyncRecoveringShardsCR to read recovering shards X-Git-Tag: v12.2.6~131^2~9 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3460f4c9122f5d9ddf52a03eb345d75f5511b7d7;p=ceph.git rgw: add RGWReadDataSyncRecoveringShardsCR to read recovering shards Signed-off-by: lvshanchun (cherry picked from commit f4de5b58e27f320e8eed6a72684ad7621a7c1110) --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e24bd40cd9a5..8e557ef00197 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -152,6 +152,40 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next() return true; } +class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + + RGWDataSyncEnv *env; + + uint64_t max_entries; + int num_shards; + int shard_id{0};; + + string marker; + map> &entries_map; + + public: + RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards, + map>& _entries_map) + : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env), + max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map) + {} + bool spawn_next() override; +}; + +bool RGWReadDataSyncRecoveringShardsCR::spawn_next() +{ + if (shard_id > num_shards) + return false; + + string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry"; + spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid), + marker, &entries_map[shard_id], max_entries), false); + + ++shard_id; + return true; +} + class RGWReadDataSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_data_sync_status *sync_status; @@ -654,6 +688,34 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status) return ret; } +int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set& recovering_shards) +{ + // cannot run concurrently with run_sync(), so run in a separate manager + RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); + RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr()); + int ret = http_manager.set_threaded(); + if (ret < 0) { + ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; + return ret; + } + RGWDataSyncEnv sync_env_local = sync_env; + sync_env_local.http_manager = &http_manager; + map> entries_map; + uint64_t max_entries{1}; + ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map)); + http_manager.stop(); + + if (ret == 0) { + for (const auto& entry : entries_map) { + if (entry.second.size() != 0) { + recovering_shards.insert(entry.first); + } + } + } + + return ret; +} + int RGWRemoteDataLog::init_sync_status(int num_shards) { rgw_data_sync_status sync_status; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 8e876b9c97d7..4792ad9794ae 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -274,6 +274,7 @@ public: int read_source_log_shards_info(map *shards_info); int read_source_log_shards_next(map shard_markers, map *result); int read_sync_status(rgw_data_sync_status *sync_status); + int read_recovering_shards(const int num_shards, set& recovering_shards); int init_sync_status(int num_shards); int run_sync(int num_shards); @@ -323,6 +324,11 @@ public: int read_sync_status(rgw_data_sync_status *sync_status) { return source_log.read_sync_status(sync_status); } + + int read_recovering_shards(const int num_shards, set& recovering_shards) { + return source_log.read_recovering_shards(num_shards, recovering_shards); + } + int init_sync_status() { return source_log.init_sync_status(num_shards); } int read_log_info(rgw_datalog_info *log_info) {