}
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
+ static constexpr uint32_t lock_duration = 30;
RGWDataSyncEnv *sync_env;
-
RGWRados *store;
+ const rgw_pool& pool;
+ const uint32_t num_shards;
string sync_status_oid;
string lock_name;
string cookie;
- rgw_data_sync_info status;
+ rgw_data_sync_status *status;
map<int, RGWDataChangesLogInfo> shards_info;
public:
- RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env), store(sync_env->store) {
+ RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
+ rgw_data_sync_status *status)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
+ pool(store->get_zone_params().log_pool),
+ num_shards(num_shards), status(status) {
lock_name = "sync_lock";
- status.num_shards = _num_shards;
#define COOKIE_LEN 16
char buf[COOKIE_LEN + 1];
int operate() override {
int ret;
reenter(this) {
- yield {
- uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
- lock_name, cookie, lock_duration));
- if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
- return set_cr_error(retcode);
- }
+ using LockCR = RGWSimpleRadosLockCR;
+ yield call(new LockCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, sync_status_oid},
+ lock_name, cookie, lock_duration));
+ if (retcode < 0) {
+ ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+ return set_cr_error(retcode);
}
- yield {
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados,
- store,
- rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
- status));
- }
- yield { /* take lock again, we just recreated the object */
- uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(sync_env->async_rados,
- store,
- rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
- lock_name, cookie, lock_duration));
- if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
- return set_cr_error(retcode);
- }
+ using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
+ yield call(new WriteInfoCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, sync_status_oid},
+ status->sync_info));
+ if (retcode < 0) {
+ ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ /* take lock again, we just recreated the object */
+ yield call(new LockCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, sync_status_oid},
+ lock_name, cookie, lock_duration));
+ if (retcode < 0) {
+ ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+ return set_cr_error(retcode);
}
+
/* fetch current position in logs */
yield {
RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
return set_cr_error(-EIO);
}
- for (int i = 0; i < (int)status.num_shards; i++) {
+ for (uint32_t i = 0; i < num_shards; i++) {
spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
- }
+ }
}
while (collect(&ret, NULL)) {
- if (ret < 0) {
- return set_state(RGWCoroutine_Error);
- }
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
+ return set_state(RGWCoroutine_Error);
+ }
yield;
}
yield {
- for (int i = 0; i < (int)status.num_shards; i++) {
- rgw_data_sync_marker marker;
+ for (uint32_t i = 0; i < num_shards; i++) {
RGWDataChangesLogInfo& info = shards_info[i];
- marker.next_step_marker = info.marker;
- marker.timestamp = info.last_update;
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i)),
- marker), true);
+ auto& marker = status->sync_markers[i];
+ marker.next_step_marker = info.marker;
+ marker.timestamp = info.last_update;
+ const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
+ using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
+ spawn(new WriteMarkerCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, oid}, marker), true);
}
}
- yield {
- status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
- rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
- status));
- }
- yield { /* unlock */
- call(new RGWSimpleRadosUnlockCR(sync_env->async_rados,
- store,
- rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
- lock_name, cookie));
- }
while (collect(&ret, NULL)) {
- if (ret < 0) {
- return set_state(RGWCoroutine_Error);
- }
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
+ return set_state(RGWCoroutine_Error);
+ }
yield;
}
- drain_all();
+
+ status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+ yield call(new WriteInfoCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, sync_status_oid},
+ status->sync_info));
+ if (retcode < 0) {
+ ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
+ rgw_raw_obj{pool, sync_status_oid},
+ lock_name, cookie));
return set_cr_done();
}
return 0;
int RGWRemoteDataLog::init_sync_status(int num_shards)
{
+ rgw_data_sync_status sync_status;
RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
int ret = http_manager.set_threaded();
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
- ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards));
+ ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, &sync_status));
http_manager.stop();
return ret;
}
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
- yield call(new RGWInitDataSyncStatusCoroutine(sync_env, sync_status.sync_info.num_shards));
+ yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, &sync_status));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
- sync_status.sync_info.num_shards = num_shards;
- sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- /* update new state */
- yield call(set_sync_info_cr());
-
- if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
- return set_cr_error(retcode);
- }
+ // sets state = StateBuildingFullSyncMaps
*reset_backoff = true;
}