static string mdlog_sync_status_oid = "mdlog.sync-status";
static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
+static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
void rgw_mdlog_info::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("num_objects", num_shards, obj);
};
class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
map<string, bufferlist> entries;
RGWAioCompletionNotifier *cn;
public:
- RGWRadosSetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWRadosSetOmapKeysCR(RGWRados *_store,
rgw_bucket& _pool, const string& _oid,
map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
store(_store),
entries(_entries),
pool(_pool), oid(_oid), cn(NULL) {
};
class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
string marker;
int max_entries;
int rval;
+ librados::IoCtx ioctx;
rgw_bucket pool;
string oid;
RGWAioCompletionNotifier *cn;
public:
- RGWRadosGetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWRadosGetOmapKeysCR(RGWRados *_store,
rgw_bucket& _pool, const string& _oid,
const string& _marker,
map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
store(_store),
marker(_marker),
entries(_entries), max_entries(_max_entries), rval(0),
}
int send_request() {
- librados::IoCtx ioctx;
librados::Rados *rados = store->get_rados_handle();
int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
if (r < 0) {
}
}
if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
- call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries));
+ call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
entries.clear();
}
}
reenter(this) {
entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
- store->get_zone_params().log_pool, "meta.full-sync.index");
+ store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix);
yield {
call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
"/admin/metadata", NULL, §ions));
}
};
+static string full_sync_index_shard_oid(int shard_id)
+{
+ char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
+ return string(buf);
+}
+
+class RGWMetaSyncShardCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+
+ rgw_bucket pool;
+
+ uint32_t shard_id;
+ rgw_meta_sync_marker sync_marker;
+
+ map<string, bufferlist> entries;
+ map<string, bufferlist>::iterator iter;
+
+ string oid;
+
+public:
+ RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ rgw_bucket& _pool,
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ pool(_pool),
+ shard_id(_shard_id),
+ sync_marker(_marker) {
+ }
+
+ int operate() {
+ RGWRESTConn *conn = store->rest_master_conn;
+
+#define OMAP_GET_MAX_ENTRIES 100
+ int max_entries = OMAP_GET_MAX_ENTRIES;
+ reenter(this) {
+ if (sync_marker.state == rgw_meta_sync_marker::FullSync) {
+ oid = full_sync_index_shard_oid(shard_id);
+ do {
+ yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ iter = entries.begin();
+ yield {
+ for (; iter != entries.end(); ++iter) {
+ ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+ // fetch remote
+ // write local
+ // update shard marker
+ }
+ }
+ } while (entries.size() == max_entries);
+ // update shard state
+ return set_state(RGWCoroutine_Done, 0);
+ } else if (sync_marker.state == rgw_meta_sync_marker::IncrementalSync) {
+ int r = incremental_sync();
+ } else {
+ return set_state(RGWCoroutine_Error, -EIO);
+ }
+ }
+ return 0;
+ }
+
+ int incremental_sync() {
+ /* TODO */
+ return -ENOTSUP;
+ }
+};
+
+class RGWMetaSyncCR : public RGWCoroutine {
+ RGWRados *store;
+ RGWHTTPManager *http_manager;
+ RGWAsyncRadosProcessor *async_rados;
+
+ rgw_meta_sync_status sync_status;
+
+
+public:
+ RGWMetaSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_store->ctx()), store(_store),
+ http_manager(_mgr),
+ async_rados(_async_rados),
+ sync_status(_sync_status) {
+ }
+
+ int operate() {
+ RGWRESTConn *conn = store->rest_master_conn;
+
+ reenter(this) {
+ yield {
+ map<uint32_t, rgw_meta_sync_marker>::iterator iter = sync_status.sync_markers.begin();
+ for (; iter != sync_status.sync_markers.end(); ++iter) {
+ uint32_t shard_id = iter->first;
+ rgw_meta_sync_marker marker;
+ spawn(new RGWMetaSyncShardCR(store, http_manager, async_rados, store->get_zone_params().log_pool,
+ shard_id,
+ sync_status.sync_markers[shard_id]), true);
+ }
+ }
+ yield return set_state(RGWCoroutine_Done);
+ }
+ return 0;
+ }
+};
+
class RGWCloneMetaLogCoroutine : public RGWCoroutine {
RGWRados *store;
RGWMetadataLog *mdlog;
/* fall through */
case rgw_meta_sync_info::StateSync:
ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
+ r = run(new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
+ return r;
+ }
break;
default:
ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;