return true;
}
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+ RGWDataSyncEnv *env;
+
+ uint64_t max_entries;
+ int num_shards;
+ int shard_id{0};;
+
+ string marker;
+ map<int, std::set<std::string>> &entries_map;
+
+ public:
+ RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+ map<int, std::set<std::string>>& _entries_map)
+ : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+ max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+ {}
+ bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+ if (shard_id > num_shards)
+ return false;
+
+ string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+ spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+ marker, &entries_map[shard_id], max_entries), false);
+
+ ++shard_id;
+ return true;
+}
+
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
return ret;
}
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+ // cannot run concurrently with run_sync(), so run in a separate manager
+ RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+ RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ int ret = http_manager.set_threaded();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ return ret;
+ }
+ RGWDataSyncEnv sync_env_local = sync_env;
+ sync_env_local.http_manager = &http_manager;
+ map<int, std::set<std::string>> entries_map;
+ uint64_t max_entries{1};
+ ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+ http_manager.stop();
+
+ if (ret == 0) {
+ for (const auto& entry : entries_map) {
+ if (entry.second.size() != 0) {
+ recovering_shards.insert(entry.first);
+ }
+ }
+ }
+
+ return ret;
+}
+
int RGWRemoteDataLog::init_sync_status(int num_shards)
{
rgw_data_sync_status sync_status;
int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
int read_sync_status(rgw_data_sync_status *sync_status);
+ int read_recovering_shards(const int num_shards, set<int>& recovering_shards);
int init_sync_status(int num_shards);
int run_sync(int num_shards);
int read_sync_status(rgw_data_sync_status *sync_status) {
return source_log.read_sync_status(sync_status);
}
+
+ int read_recovering_shards(const int num_shards, set<int>& recovering_shards) {
+ return source_log.read_recovering_shards(num_shards, recovering_shards);
+ }
+
int init_sync_status() { return source_log.init_sync_status(num_shards); }
int read_log_info(rgw_datalog_info *log_info) {