]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add RGWReadDataSyncRecoveringShardsCR to read recovering shards
authorlvshanchun <lvshanchun@gmail.com>
Thu, 28 Dec 2017 06:39:39 +0000 (14:39 +0800)
committerCasey Bodley <cbodley@redhat.com>
Wed, 9 May 2018 17:04:54 +0000 (13:04 -0400)
Signed-off-by: lvshanchun <lvshanchun@gmail.com>
(cherry picked from commit f4de5b58e27f320e8eed6a72684ad7621a7c1110)

src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index e24bd40cd9a57557f893c6bb5a48d528428d0f70..8e557ef00197aa94a120c24f57f3f6beb7c8b13b 100644 (file)
@@ -152,6 +152,40 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
   return true;
 }
 
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  RGWDataSyncEnv *env;
+
+  uint64_t max_entries;
+  int num_shards;
+  int shard_id{0};;
+
+  string marker;
+  map<int, std::set<std::string>> &entries_map;
+
+ public:
+  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+      map<int, std::set<std::string>>& _entries_map)
+    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+    max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+  {}
+  bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+  if (shard_id > num_shards)
+    return false;
+  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+                                  marker, &entries_map[shard_id], max_entries), false);
+
+  ++shard_id;
+  return true;
+}
+
 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
@@ -654,6 +688,34 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
   return ret;
 }
 
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  map<int, std::set<std::string>> entries_map;
+  uint64_t max_entries{1};
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  http_manager.stop();
+
+  if (ret == 0) {
+    for (const auto& entry : entries_map) {
+      if (entry.second.size() != 0) {
+        recovering_shards.insert(entry.first);
+      }
+    }
+  }
+
+  return ret;
+}
+
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
   rgw_data_sync_status sync_status;
index 8e876b9c97d77a0c66f026414fb686b7ecf79fd1..4792ad9794ae08d1cc8646221d171a312693f7b9 100644 (file)
@@ -274,6 +274,7 @@ public:
   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
   int read_sync_status(rgw_data_sync_status *sync_status);
+  int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
   int init_sync_status(int num_shards);
   int run_sync(int num_shards);
 
@@ -323,6 +324,11 @@ public:
   int read_sync_status(rgw_data_sync_status *sync_status) {
     return source_log.read_sync_status(sync_status);
   }
+
+  int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
+    return source_log.read_recovering_shards(num_shards, recovering_shards);
+  }
+
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
   int read_log_info(rgw_datalog_info *log_info) {