From 624a3fa05c0421486f8f83884f6e756715994d24 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 31 Jul 2015 13:20:50 -0700 Subject: [PATCH] rgw: initial work on sync status marker Also, rename radosgw-admin mdlog sync to radosgw-admin mdlog-fetch. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 18 +++++ src/rgw/rgw_sync.cc | 188 +++++++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_sync.h | 3 +- 3 files changed, 208 insertions(+), 1 deletion(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index f506d0f904f5c..9ed9e7027274a 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -268,6 +268,7 @@ enum { OPT_METADATA_LIST, OPT_MDLOG_LIST, OPT_MDLOG_TRIM, + OPT_MDLOG_FETCH, OPT_MDLOG_SYNC, OPT_BILOG_LIST, OPT_BILOG_TRIM, @@ -2890,6 +2891,23 @@ next: } } + if (opt_cmd == OPT_MDLOG_FETCH) { + RGWMetadataSync sync(store); + + int ret = sync.init(); + if (ret < 0) { + cerr << "ERROR: sync.init() returned ret=" << ret << std::endl; + return -ret; + } + + ret = sync.fetch(); + if (ret < 0) { + cerr << "ERROR: sync.clone_shards() returned ret=" << ret << std::endl; + return -ret; + } + + } + if (opt_cmd == OPT_MDLOG_SYNC) { RGWMetadataSync sync(store); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 1d82a9daaa08c..0cbb402c6e996 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -11,6 +11,8 @@ #define dout_subsys ceph_subsys_rgw +static string mdlog_sync_status_oid = "mdlog.sync-status"; + void rgw_mdlog_info::decode_json(JSONObj *obj) { JSONDecoder::decode_json("num_objects", num_shards, obj); } @@ -180,6 +182,181 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) #define CLONE_MAX_ENTRIES 100 #define CLONE_OPS_WINDOW 16 + +struct RGWMetaSyncStatus { + uint32_t num_shards; + + enum StateOptions { + StateInit = 0, + StateBuildingFullSyncMaps = 1, + StateFullSync = 2, + StateIncrementalSync = 3, + }; + + struct sync_marker { + int state; + string marker; + + sync_marker() : state((int)StateInit) {} + }; + map markers; + +}; + +class RGWSyncStatusStore { + RGWRados *store; + librados::IoCtx ioctx; + + RGWMetaSyncStatus status; + int read_status(); +public: + RGWSyncStatusStore(RGWRados *_store) : store(_store) {} + + int init(); +}; + +#if 0 +int RGWSyncStatusStore::read_status() +{ + string marker; + +#define MAX_OMAP_ENTRIES 100 + do { + map vals; + int r = ioctx.omap_get_vals(mdlog_sync_status_oid, marker, MAX_OMAP_ENTRIES, &vals); + + if (r < 0) { + return r; + } + if (vals.size() != MAX_OMAP_ENTRIES) { + break; + } + + for (map::iterator miter = vals.begin(); miter != vals.end(); ++miter) { + const string& k = miter->first; + bufferlist& bl = miter->second; + + bufferlist::iterator iter = bl.begin(); + try { + ::decode(s, iter); + status.set_entry(shard_id, s); + } catch (buffer::error& err) { + ldout(store->ctx(), 0) << "ERROR: failed to decode entry for k=" << k << dendl; + } + } + } while (true); + + return 0; +} +#endif + +int RGWSyncStatusStore::init() +{ + const char *log_pool = store->get_zone_params().log_pool.name.c_str(); + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(log_pool, ioctx); + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool.name << " ret=" << r << dendl; + return r; + } + +#if 0 + r = read_status(); + if (r < 0) { + return r; + } +#endif + + return 0; +} + +class RGWMetaSyncOp : public RGWAsyncOp { + RGWRados *store; + RGWMetadataLog *mdlog; + RGWHTTPManager *http_manager; + RGWSyncStatusStore sync_store; + + int shard_id; + string marker; + + int max_entries; + + RGWRESTReadResource *http_op; + + enum State { + Init = 0, + ReadSyncStatus = 1, + ReadSyncStatusComplete = 2, + Done = 100, + Error = 200, + } state; + + int set_state(State s, int ret = 0) { + state = s; + return ret; + } +public: + RGWMetaSyncOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncOpsManager *_ops_mgr, + int _id) : RGWAsyncOp(_ops_mgr), store(_store), + mdlog(store->meta_mgr->get_log()), + http_manager(_mgr), sync_store(_store), + shard_id(_id), + max_entries(CLONE_MAX_ENTRIES), + http_op(NULL), + state(RGWMetaSyncOp::Init) {} + + int operate(); + + int state_init(); + int state_read_sync_status(); + int state_read_sync_status_complete(); + + bool is_done() { return (state == Done || state == Error); } + bool is_error() { return (state == Error); } +}; + +int RGWMetaSyncOp::operate() +{ + switch (state) { + case Init: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl; + return state_init(); + case ReadSyncStatus: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl; + return state_read_sync_status(); + case ReadSyncStatusComplete: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl; + return state_read_sync_status_complete(); + case Done: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl; + break; + case Error: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": error" << dendl; + break; + } + + return 0; +} + +int RGWMetaSyncOp::state_init() +{ + int ret = sync_store.init(); + if (ret < 0) { + return set_state(Error, ret); + } + return set_state(ReadSyncStatus); +} + +int RGWMetaSyncOp::state_read_sync_status() +{ + return 0; +} + +int RGWMetaSyncOp::state_read_sync_status_complete() +{ + return 0; +} + class RGWCloneMetaLogOp : public RGWAsyncOp { RGWRados *store; RGWMetadataLog *mdlog; @@ -314,6 +491,17 @@ int RGWRemoteMetaLog::clone_shards() return run(ops); } +int RGWRemoteMetaLog::fetch() +{ + list ops; + for (int i = 0; i < (int)log_info.num_shards; i++) { + RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, this, i, clone_markers[i]); + ops.push_back(op); + } + + return run(ops); +} + int RGWCloneMetaLogOp::operate() { switch (state) { diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 428f5718a5443..d4f6e7536c8df 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -113,8 +113,8 @@ public: int list_shard(int shard_id); int list_shards(); int get_shard_info(int shard_id); - int clone_shard(int shard_id, const string& marker, string *new_marker, bool *truncated); int clone_shards(); + int fetch(); }; class RGWMetadataSync { @@ -126,6 +126,7 @@ public: int init(); + int fetch() { return master_log.fetch(); } int clone_shards() { return master_log.clone_shards(); } }; -- 2.39.5