rgw: read behind bucket shards of a specified data log shard
author     lvshanchun <lvshanchun@gmail.com>
           Mon, 5 Feb 2018 12:43:39 +0000 (20:43 +0800)
committer  Casey Bodley <cbodley@redhat.com>
           Mon, 14 May 2018 17:26:32 +0000 (13:26 -0400)
add read_shard_status to read pending bucket shards,
recovering bucket shards and sync marker

Signed-off-by: lvshanchun <lvshanchun@gmail.com>
(cherry picked from commit 9c5ff46ad9c849eb0c99a4d78be3f9dd2299779c)

Conflicts:
rgw_data_sync.cc (make_move_iterator fails to build with gcc<5)

src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
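
For orientation, a minimal usage sketch of the manager-level wrapper added in rgw_data_sync.h below (this is not part of the commit; the sync_mgr object, the shard_id value, and the 1000-entry cap are illustrative assumptions):

    // hypothetical caller; sync_mgr is an already-initialized sync-status manager
    std::set<std::string> pending_buckets;     // bucket shards behind the sync marker
    std::set<std::string> recovering_buckets;  // bucket shards in the error/retry log
    rgw_data_sync_marker sync_marker;

    int r = sync_mgr.read_shard_status(shard_id, pending_buckets,
                                       recovering_buckets, &sync_marker,
                                       1000 /* max_entries */);
    if (r < 0) {
      std::cerr << "ERROR: read_shard_status() on shard " << shard_id
                << " failed: " << cpp_strerror(r) << std::endl;
      return r;
    }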

src/rgw/rgw_data_sync.cc
index 8e557ef00197aa94a120c24f57f3f6beb7c8b13b..e4f37267e1c8d763517fd9e6124189bcbdc2662f 100644
@@ -1998,6 +1998,171 @@ int RGWReadBucketSyncStatusCoroutine::operate()
   }
   return 0;
 }
+
+#define OMAP_READ_MAX_ENTRIES 10
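+// Collects the bucket shards recorded in this data log shard's error-retry
+// log (the shard status object name suffixed with ".retry"); omap keys are
+// read in pages of OMAP_READ_MAX_ENTRIES until the log is exhausted or
+// max_entries keys have been gathered into recovering_buckets.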
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& recovering_buckets;
+  string marker;
+  string error_oid;
+
+  set<string> error_entries;
+  int max_omap_entries;
+  int count;
+
+public:
+  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _recovering_buckets, const int _max_entries) 
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+  {
+    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+  }
+
+  int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read recovering bucket shards
+    count = 0;
+    do {
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid), 
+            marker, &error_entries, max_omap_entries));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with " 
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (error_entries.empty()) {
+        break;
+      }
+
+      count += error_entries.size();
+      marker = *error_entries.rbegin();
+      recovering_buckets.insert(error_entries.begin(), error_entries.end());
+    } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
+  
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
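+// Reads the shard's persisted sync marker, then lists remote data log
+// entries past that marker to collect the bucket shards whose changes have
+// not yet been applied into pending_buckets.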
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+  
+  const int shard_id;
+  int max_entries;
+
+  set<string>& pending_buckets;
+  string marker;
+  string status_oid;
+
+  rgw_data_sync_marker* sync_marker;
+  int count;
+
+  list<rgw_data_change_log_entry> log_entries;
+  bool truncated;
+
+public:
+  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _pending_buckets,
+                                      rgw_data_sync_marker* _sync_marker, const int _max_entries) 
+  : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+  store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+  pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+  {
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+  }
+
+  int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+  reenter(this){
+    //read sync status marker
+    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+    yield call(new CR(sync_env->async_rados, store, 
+                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                      sync_marker));
+    if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
+        << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    //read pending bucket shards
+    marker = sync_marker->marker;
+    count = 0;
+    do {
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "failed to read remote data log info with "
+          << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (log_entries.empty()) {
+        break;
+      }
+
+      count += log_entries.size();
+      for (const auto& entry : log_entries) {
+        pending_buckets.insert(entry.entry.key);
+      }
+    } while (truncated && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
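+// Reports both views of a single data log shard: bucket shards recorded in
+// the error/retry log (recovering) and bucket shards with unapplied log
+// entries (pending), along with the shard's sync marker.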
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  list<RGWCoroutinesStack *> stacks;
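+  // run the recovering and pending readers on two parallel coroutine stacks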
+  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+  stacks.push_back(recovering_stack);
+  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+  stacks.push_back(pending_stack);
+  ret = crs.run(stacks);
+  http_manager.stop();
+  return ret;
+}
+
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
src/rgw/rgw_data_sync.h
index 4792ad9794ae08d1cc8646221d171a312693f7b9..51354a6f6be4493bb9fa131495d294d7f572c62a 100644
@@ -275,6 +275,7 @@ public:
   int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
+  int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker* sync_marker, const int max_entries);
   int init_sync_status(int num_shards);
   int run_sync(int num_shards);
 
@@ -329,6 +330,9 @@ public:
     return source_log.read_recovering_shards(num_shards, recovering_shards);
   }
 
+  int read_shard_status(int shard_id, set<string>& lagging_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries) {
+    return source_log.read_shard_status(shard_id, lagging_buckets, recovering_buckets, sync_marker, max_entries);
+  }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
   int read_log_info(rgw_datalog_info *log_info) {