#define dout_subsys ceph_subsys_rgw
-static string datalog_sync_status_oid = "datalog.sync-status";
+static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
RGWObjectCtx& _obj_ctx, const string& _source_zone,
rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
_store->get_zone_params().log_pool,
- datalog_sync_status_oid,
+ RGWDataSyncStatusManager::sync_status_oid(_source_zone),
&_status->sync_info),
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx), source_zone(_source_zone),
RGWObjectCtx& obj_ctx;
string source_zone;
+ string sync_status_oid;
+
string lock_name;
string cookie;
rgw_data_sync_info status;
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
string cookie = buf;
+
+ sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(source_zone);
}
int operate() {
reenter(this) {
yield {
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+ call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << datalog_sync_status_oid << dendl;
+ ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_state(RGWCoroutine_Error, retcode);
}
}
yield {
call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- datalog_sync_status_oid, status));
+ sync_status_oid, status));
}
yield { /* take lock again, we just recreated the object */
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+ call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
- ldout(cct, 0) << "ERROR: failed to take a lock on " << datalog_sync_status_oid << dendl;
+ ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_state(RGWCoroutine_Error, retcode);
}
}
yield {
status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- datalog_sync_status_oid, status));
+ sync_status_oid, status));
}
yield { /* unlock */
- call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, datalog_sync_status_oid,
+ call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie));
}
while (collect(&ret)) {
return run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
}
-int RGWRemoteDataLog::set_sync_info(const rgw_data_sync_info& sync_info)
-{
- return run(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- datalog_sync_status_oid, sync_info));
-}
-
static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
char buf[datalog_sync_full_sync_index_prefix.size() + 16];
}
int full_sync() {
- int ret;
-
#define OMAP_GET_MAX_ENTRIES 100
int max_entries = OMAP_GET_MAX_ENTRIES;
reenter(&full_cr) {
/* TODO */
return 0;
#endif
+ return 0;
+ }
+};
+
+class RGWDataSyncCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
+ string source_zone;
+
+ RGWObjectCtx obj_ctx;
+
+ rgw_data_sync_status sync_status;
+
+ RGWDataSyncShardMarkerTrack *marker_tracker;
+
+
+public:
+ RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ conn(_conn),
+ source_zone(_source_zone),
+ obj_ctx(store),
+ marker_tracker(NULL) {
+ }
+
+ int operate() {
+ reenter(this) {
+ int r;
+
+ yield {
+ /* read sync status */
+ r = call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
+ return set_state(RGWCoroutine_Error, r);
+ }
+ }
+
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+
+ yield {
+ /* state: init status */
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
+ ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+ r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
+ return set_state(RGWCoroutine_Error, r);
+ }
+ sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+ /* update new state */
+ yield {
+ r = call(set_sync_info_cr());
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+ return r;
+ }
+ }
+ }
+ }
+
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
+ /* state: building full sync maps */
+ yield {
+ ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
+ r = call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, sync_status.sync_info.num_shards));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWListBucketIndexesCR r=" << r << dendl;
+ return set_state(RGWCoroutine_Error, r);
+ }
+ }
+ sync_status.sync_info.state = rgw_data_sync_info::StateSync;
+ /* update new state */
+ yield {
+ r = call(set_sync_info_cr());
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+ return r;
+ }
+ }
+ }
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+#if 0
+ case rgw_data_sync_info::StateSync:
+ for (int i = 0; i < num_shards; i++) {
+ RGWCoroutine *cr = new RGWDataSyncShardCR(store, &http_manager, async_rados,
+ conn, store->get_zone_params().log_pool, source_zone,
+ i, rgw_data_sync_marker& _marker)
+ }
+#endif
+#warning FIXME
+ }
+
+ return set_state(RGWCoroutine_Done, 0);
+ }
+ return 0;
+ }
+
+ RGWCoroutine *set_sync_info_cr() {
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::sync_status_oid(source_zone),
+ sync_status.sync_info);
}
};
{
RGWObjectCtx obj_ctx(store, NULL);
- int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+ int r = run(new RGWDataSyncCR(store, &http_manager, async_rados, conn, store->get_zone_params().log_pool, source_zone));
if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
return r;
}
- switch ((rgw_data_sync_info::SyncState)sync_status.sync_info.state) {
- case rgw_data_sync_info::StateInit:
- ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
- r = run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
- /* fall through */
- case rgw_data_sync_info::StateBuildingFullSyncMaps:
- ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
- r = run(new RGWListBucketIndexesCR(store, &http_manager, async_rados, conn, source_zone, num_shards));
- sync_status.sync_info.state = rgw_data_sync_info::StateSync;
- r = set_sync_info(sync_status.sync_info);
- if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl;
- return r;
- }
- /* fall through */
- case rgw_data_sync_info::StateSync:
-#warning FIXME
- break;
- default:
- ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
- return -EIO;
- }
-
return 0;
}
return r;
}
- source_status_obj = rgw_obj(store->get_zone_params().log_pool, datalog_sync_status_oid);
+ source_status_obj = rgw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(source_zone));
r = source_log.init(source_zone, conn);
if (r < 0) {
return 0;
}
+string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
+{
+ char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
+
+ return string(buf);
+}
+
string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];