From fe38269bea5941c85d0435f31c8308e2badbf8ac Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 2 Sep 2015 16:22:23 -0700 Subject: [PATCH] rgw: track mdlog marker sync entries if the sync marker is < mdlog marker, otherwise try to fetch more entries. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync.cc | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 342f709d7c17e..18937cbb283bf 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1531,6 +1531,7 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine { int shard_id; string marker; bool truncated; + string *new_marker; int max_entries; @@ -1544,12 +1545,17 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine { public: RGWCloneMetaLogCoroutine(RGWRados *_store, RGWHTTPManager *_mgr, - int _id, const string& _marker) : RGWCoroutine(_store->ctx()), store(_store), + int _id, const string& _marker, string *_new_marker) : RGWCoroutine(_store->ctx()), store(_store), mdlog(store->meta_mgr->get_log()), http_manager(_mgr), shard_id(_id), - marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES), + marker(_marker), truncated(false), new_marker(_new_marker), + max_entries(CLONE_MAX_ENTRIES), http_op(NULL), md_op_notifier(NULL), - req_ret(0) {} + req_ret(0) { + if (new_marker) { + *new_marker = marker; + } + } int operate(); @@ -1582,6 +1588,8 @@ class RGWMetaSyncShardCR : public RGWCoroutine { list log_entries; bool truncated; + string mdlog_marker; + Mutex inc_lock; Cond inc_cond; @@ -1683,18 +1691,27 @@ public: int incremental_sync() { reenter(&incremental_cr) { + mdlog_marker = sync_marker.marker; do { #define INCREMENTAL_MAX_ENTRIES 100 - yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); - if (log_entries.size() < INCREMENTAL_MAX_ENTRIES) { - ldout(store->ctx(), 20) << __func__ << ": syncing mdlog for shard_id=" << shard_id << dendl; - yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, sync_marker.marker)); + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (mdlog_marker <= sync_marker.marker) { + /* we're at the tip, try to bring more entries */ + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl; + yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker)); } - for (list::iterator iter = log_entries.begin(); iter != log_entries.end(); ++iter) { - ldout(store->ctx(), 20) << __func__ << ": log_entry: " << iter->id << ":" << iter->section << ":" << iter->name << ":" << iter->timestamp << dendl; + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (mdlog_marker > sync_marker.marker) { + yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); + for (list::iterator iter = log_entries.begin(); iter != log_entries.end(); ++iter) { + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << iter->id << ":" << iter->section << ":" << iter->name << ":" << iter->timestamp << dendl; + } } + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (mdlog_marker == sync_marker.marker) { #define INCREMENTAL_INTERVAL 20 - yield call(new RGWWaitCR(async_rados, store->ctx(), &inc_lock, &inc_cond, INCREMENTAL_INTERVAL)); + yield call(new RGWWaitCR(async_rados, store->ctx(), &inc_lock, &inc_cond, INCREMENTAL_INTERVAL)); + } } while (true); } /* TODO */ @@ -1740,7 +1757,7 @@ int RGWRemoteMetaLog::clone_shards(int num_shards, vector& clone_markers list stacks; for (int i = 0; i < (int)num_shards; i++) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this); - int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i])); + int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL)); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl; return r; @@ -1757,7 +1774,7 @@ int RGWRemoteMetaLog::fetch(int num_shards, vector& clone_markers) list stacks; for (int i = 0; i < (int)num_shards; i++) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this); - int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i])); + int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL)); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl; return r; @@ -1950,6 +1967,10 @@ int RGWCloneMetaLogCoroutine::state_receive_rest_response() return set_state(RGWCoroutine_Done); } + if (new_marker) { + *new_marker = data.entries.back().id; + } + return 0; } -- 2.39.5