JSONDecoder::decode_json("entries", entries, obj);
};
-class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_data_sync_info> {
- RGWDataSyncEnv *sync_env;
+class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+ RGWDataSyncEnv *env;
+ const int num_shards;
+ int shard_id{0};
+ map<uint32_t, rgw_data_sync_marker>& markers;
+
+ public:
+ RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
+ map<uint32_t, rgw_data_sync_marker>& markers)
+ : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
+ env(env), num_shards(num_shards), markers(markers)
+ {}
+ bool spawn_next() override;
+};
+
+bool RGWReadDataSyncStatusMarkersCR::spawn_next()
+{
+ if (shard_id >= num_shards) {
+ return false;
+ }
+ using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+ spawn(new CR(env->async_rados, env->store, env->store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id),
+ &markers[shard_id]),
+ false);
+ shard_id++;
+ return true;
+}
+class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
public:
RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store,
- _sync_env->store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone),
- &_status->sync_info),
- sync_env(_sync_env),
- sync_status(_status) {}
-
- int handle_data(rgw_data_sync_info& data);
+ rgw_data_sync_status *_status)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
+ {}
+ int operate() override;
};
-int RGWReadDataSyncStatusCoroutine::handle_data(rgw_data_sync_info& data)
+int RGWReadDataSyncStatusCoroutine::operate()
{
- if (retcode == -ENOENT) {
- return retcode;
- }
-
- map<uint32_t, rgw_data_sync_marker>& markers = sync_status->sync_markers;
- RGWRados *store = sync_env->store;
- for (int i = 0; i < (int)data.num_shards; i++) {
- spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true);
+ reenter(this) {
+ // read sync info
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
+ yield {
+ bool empty_on_enoent = false; // fail on ENOENT
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+ sync_env->store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone),
+ &sync_status->sync_info, empty_on_enoent));
+ }
+ if (retcode < 0) {
+ ldout(sync_env->cct, 4) << "failed to read sync status info with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ // read shard markers
+ using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
+ yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
+ sync_status->sync_markers));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 4) << "failed to read sync status markers with "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
}
return 0;
}