From 528aff10a3ad662a77aa11d9cd64ede0ada0bd2a Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 28 Aug 2015 19:27:06 -0700 Subject: [PATCH] rgw: switch to incremental sync when done full meta sync each shard need to collect the coroutines it spawned before continuing. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 9 ++++++ src/rgw/rgw_sync.cc | 62 +++++++++++++++++++++++++++++++++------- src/rgw/rgw_sync.h | 2 +- 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 447e17e60c832..a92a51ccbe517 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -72,6 +72,9 @@ string RGWCoroutinesStack::error_str() } int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { + if (!next_op) { + return ret; + } ops.push_back(next_op); if (pos != ops.end()) { ++pos; @@ -83,6 +86,9 @@ int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { void RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait) { + if (!op) { + return; + } op->get(); rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned); @@ -284,6 +290,9 @@ int RGWCoroutinesManager::run(list& stacks) int RGWCoroutinesManager::run(RGWCoroutine *op) { + if (!op) { + return 0; + } list stacks; RGWCoroutinesStack *stack = allocate_stack(); op->get(); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index a91f431bc0aa7..65519ed2788d3 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1197,7 +1197,7 @@ public: pending[key] = true; } - int finish(const string& key) { + RGWCoroutine *finish(const string& key) { assert(!pending.empty()); map::iterator iter = pending.begin(); @@ -1214,17 +1214,17 @@ public: if (key == first_key && (updates_since_flush >= META_SYNC_UPDATE_MARKER_WINDOW || pending.empty())) { return update_marker(high_marker); } - return 0; + return NULL; } - int update_marker(const string& new_marker) { + RGWCoroutine *update_marker(const string& new_marker) { sync_marker.marker = new_marker; updates_since_flush = 0; ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; - return cr->call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - marker_oid, sync_marker)); + return new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + marker_oid, sync_marker); } }; @@ -1274,7 +1274,7 @@ public: sync_status = retcode; yield { /* update marker */ - int ret = marker_tracker->finish(raw_key); + int ret = call(marker_tracker->finish(raw_key)); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << raw_key << ") returned ret=" << ret << dendl; return set_state(RGWCoroutine_Error, sync_status); @@ -1322,19 +1322,29 @@ public: marker_tracker(NULL) { } + ~RGWMetaSyncShardCR() { + delete marker_tracker; + } + + void set_marker_tracker(RGWMetaSyncShardMarkerTrack *mt) { + delete marker_tracker; + marker_tracker = mt; + } + int operate() { RGWRESTConn *conn = store->rest_master_conn; string section; string key; + int ret; #define OMAP_GET_MAX_ENTRIES 100 int max_entries = OMAP_GET_MAX_ENTRIES; reenter(this) { if (sync_marker.state == rgw_meta_sync_marker::FullSync) { oid = full_sync_index_shard_oid(shard_id); - marker_tracker = new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, + set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, RGWMetaSyncStatusManager::shard_obj_name(shard_id), - sync_marker); + sync_marker)); do { yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries)); if (retcode < 0) { @@ -1350,12 +1360,37 @@ public: if (retcode < 0) { return set_state(RGWCoroutine_Error, retcode); } + sync_marker.marker = iter->first; } } while ((int)entries.size() == max_entries); + + /* wait for all operations to complete */ + while (collect(&ret)) { + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + yield; + } + + yield { + /* update marker to reflect we're done with full sync */ + sync_marker.state = rgw_meta_sync_marker::IncrementalSync; + sync_marker.marker.clear(); + call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + } + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + goto incremental_sync; // update shard state - return set_state(RGWCoroutine_Done, 0); } else if (sync_marker.state == rgw_meta_sync_marker::IncrementalSync) { +incremental_sync: int r = incremental_sync(); + return set_state(RGWCoroutine_Done, 0); } else { return set_state(RGWCoroutine_Error, -EIO); } @@ -1489,6 +1524,12 @@ int RGWRemoteMetaLog::init_sync_status(int num_shards) return run(new RGWInitSyncStatusCoroutine(async_rados, store, obj_ctx, num_shards)); } +int RGWRemoteMetaLog::set_sync_info(const rgw_meta_sync_info& sync_info) +{ + return run(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + mdlog_sync_status_oid, sync_info)); +} + int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status) { RGWObjectCtx obj_ctx(store, NULL); @@ -1513,8 +1554,7 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status } sync_status.sync_info.state = rgw_meta_sync_info::StateSync; - r = run(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - mdlog_sync_status_oid, sync_status.sync_info)); + r = set_sync_info(sync_status.sync_info); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to update sync status" << dendl; return r; diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 147eb51f07d0b..698f270d7fe8c 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -148,6 +148,7 @@ public: int fetch(int num_shards, vector& clone_markers); int read_sync_status(rgw_meta_sync_status *sync_status); int init_sync_status(int num_shards); + int set_sync_info(const rgw_meta_sync_info& sync_info); int run_sync(int num_shards, rgw_meta_sync_status& sync_status); }; @@ -187,7 +188,6 @@ class RGWMetaSyncStatusManager { public: RGWMetaSyncStatusManager(RGWRados *_store) : store(_store), master_log(store, this), num_shards(0), ts_to_shard_lock("ts_to_shard_lock") {} - int init(); rgw_meta_sync_status& get_sync_status() { return sync_status; } -- 2.39.5