From 727db6860b97169f3154011db1656a92ccef925d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 21 Sep 2015 16:34:25 -0700 Subject: [PATCH] rgw: window of full sync operations per bucket also store marker periodically, reuse marker logic from md sync. Marker still needs some work. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cr_rados.cc | 4 +- src/rgw/rgw_cr_rados.h | 16 ++-- src/rgw/rgw_data_sync.cc | 170 ++++++++++++++++++++++++++++++++++----- src/rgw/rgw_data_sync.h | 20 ++--- src/rgw/rgw_json_enc.cc | 6 ++ src/rgw/rgw_sync.cc | 51 +++--------- src/rgw/rgw_sync.h | 47 +++++++++++ 7 files changed, 230 insertions(+), 84 deletions(-) diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index c508001033e78..542bbefc75623 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -371,8 +371,8 @@ int RGWAsyncFetchRemoteObj::_send_request() string op_id = store->unique_id(store->get_new_req_id()); map attrs; - rgw_obj src_obj(bucket_info.bucket, obj_name); - src_obj.set_instance(obj_version_id); + rgw_obj src_obj(bucket_info.bucket, key.name); + src_obj.set_instance(key.instance); rgw_obj dest_obj(src_obj); diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 5efdd7d238611..b068daab658e1 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -489,8 +489,7 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { RGWBucketInfo bucket_info; - string obj_name; - string obj_version_id; + rgw_obj_key key; uint64_t versioned_epoch; time_t src_mtime; @@ -503,12 +502,12 @@ public: RGWAsyncFetchRemoteObj(RGWAioCompletionNotifier *cn, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, - const string& _obj_name, const string& _version_id, + const rgw_obj_key& _key, uint64_t _versioned_epoch, bool _if_newer) : RGWAsyncRadosRequest(cn), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), - obj_name(_obj_name), obj_version_id(_version_id), + key(_key), versioned_epoch(_versioned_epoch), copy_if_newer(_if_newer) {} }; @@ -521,8 +520,7 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { RGWBucketInfo bucket_info; - string obj_name; - string obj_version_id; + rgw_obj_key key; uint64_t versioned_epoch; time_t src_mtime; @@ -535,13 +533,13 @@ public: RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, - const string& _obj_name, const string& _version_id, + const rgw_obj_key& _key, uint64_t _versioned_epoch, bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), - obj_name(_obj_name), obj_version_id(_version_id), + key(_key), versioned_epoch(_versioned_epoch), copy_if_newer(_if_newer), req(NULL) {} @@ -552,7 +550,7 @@ public: int send_request() { req = new RGWAsyncFetchRemoteObj(stack->create_completion_notifier(), store, source_zone, bucket_info, - obj_name, obj_version_id, versioned_epoch, copy_if_newer); + key, versioned_epoch, copy_if_newer); async_rados->queue(req); return 0; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 249a0e8650664..3d5ee388c880f 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -6,6 +6,7 @@ #include "rgw_common.h" #include "rgw_rados.h" +#include "rgw_sync.h" #include "rgw_data_sync.h" #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" @@ -698,7 +699,7 @@ public: } yield { status.state = rgw_bucket_shard_sync_info::StateFullSync; - status.marker.incremental = info.max_marker; + status.marker.incremental_marker = info.max_marker; call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, status)); } @@ -736,6 +737,17 @@ RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(RGWObjectCtx& obj_ctx, rgw bucket_name, bucket_id, shard_id, sync_status); } +class RGWWriteBucketSyncStatusCoroutine : public RGWSimpleRadosWriteCR { + +public: + RGWWriteBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *async_rados, RGWRados *store, + const string& source_zone, const string& bucket_name, const string bucket_id, int shard_id, + rgw_bucket_shard_sync_info& status) : RGWSimpleRadosWriteCR(async_rados, store, + store->get_zone_params().log_pool, + RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), + status) {} +}; + RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { delete iter->second; @@ -755,8 +767,7 @@ struct bucket_entry_owner { struct bucket_list_entry { bool delete_marker; - string key; - string version_id; + rgw_obj_key key; bool is_latest; utime_t mtime; string etag; @@ -770,8 +781,8 @@ struct bucket_list_entry { void decode_json(JSONObj *obj) { JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj); - JSONDecoder::decode_json("Key", key, obj); - JSONDecoder::decode_json("VersionId", version_id, obj); + JSONDecoder::decode_json("Key", key.name, obj); + JSONDecoder::decode_json("VersionId", key.instance, obj); JSONDecoder::decode_json("IsLatest", is_latest, obj); string mtime_str; JSONDecoder::decode_json("LastModified", mtime_str, obj); @@ -857,8 +868,8 @@ public: { "versions" , NULL }, { "format" , "json" }, { "objs-container" , "true" }, - { "key-marker" , marker.key.c_str() }, - { "version-id-marker" , marker.ver.c_str() }, + { "key-marker" , marker.full_marker.name.c_str() }, + { "version-id-marker" , marker.full_marker.instance.c_str() }, { NULL, NULL } }; string p = string("/") + bucket_name; @@ -876,6 +887,103 @@ public: } }; +#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10 + +class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { + RGWRados *store; + RGWAsyncRadosProcessor *async_rados; + + string marker_oid; + rgw_bucket_shard_sync_marker sync_marker; + + +public: + RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados, + const string& _marker_oid, + const rgw_bucket_shard_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), + store(_store), + async_rados(_async_rados), + marker_oid(_marker_oid), + sync_marker(_marker) {} + + RGWCoroutine *store_marker(const rgw_obj_key& new_marker) { + sync_marker.full_marker = new_marker; + + ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; + return new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + marker_oid, sync_marker); + } +}; + +class RGWBucketSyncSingleEntryCR : public RGWCoroutine { + RGWRados *store; + RGWAsyncRadosProcessor *async_rados; + + string source_zone; + RGWBucketInfo *bucket_info; + int shard_id; + + rgw_obj_key key; + uint64_t versioned_epoch; + + rgw_obj_key entry_marker; + RGWBucketFullSyncShardMarkerTrack *marker_tracker; + + int sync_status; + + +public: + RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados, + const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id, + const rgw_obj_key& _key, uint64_t _versioned_epoch, + const rgw_obj_key& _entry_marker, RGWBucketFullSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store), + async_rados(_async_rados), + source_zone(_source_zone), + bucket_info(_bucket_info), shard_id(_shard_id), + key(_key), + entry_marker(_entry_marker), + marker_tracker(_marker_tracker) { + + } + + int operate() { + reenter(this) { + yield { + int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info, + key, versioned_epoch, + true)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl; + return r; + } + } + if (retcode < 0 && retcode != -ENOENT) { + rgw_bucket& bucket = bucket_info->bucket; + ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl; + sync_status = retcode; + } + yield { + /* update marker */ + int ret = call(marker_tracker->finish(entry_marker)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl; + return set_state(RGWCoroutine_Error, sync_status); + } + } + if (sync_status == 0) { + sync_status = retcode; + } + if (sync_status < 0) { + return set_state(RGWCoroutine_Error, sync_status); + } + return set_state(RGWCoroutine_Done, 0); + } + return 0; + } +}; + +#define BUCKET_SYNC_SPAWN_WINDOW 20 + class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWHTTPManager *http_manager; RGWAsyncRadosProcessor *async_rados; @@ -890,6 +998,9 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { bucket_list_result list_result; list::iterator entries_iter; rgw_bucket_shard_sync_info sync_status; + RGWBucketFullSyncShardMarkerTrack *marker_tracker; + int spawn_window; + int pending; public: RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, @@ -900,8 +1011,13 @@ public: store(_store), obj_ctx(_obj_ctx), source_zone(_source_zone), bucket_name(_bucket_name), - bucket_id(_bucket_id), shard_id(_shard_id) {} + bucket_id(_bucket_id), shard_id(_shard_id), + marker_tracker(NULL), + spawn_window(BUCKET_SYNC_SPAWN_WINDOW), pending(0) {} + ~RGWRunBucketSyncCoroutine() { + delete marker_tracker; + } int operate(); }; @@ -909,6 +1025,7 @@ public: int RGWRunBucketSyncCoroutine::operate() { reenter(this) { + int ret; yield { int r = call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, bucket_name, bucket_id, shard_id, &sync_status)); if (r < 0) { @@ -949,25 +1066,40 @@ int RGWRunBucketSyncCoroutine::operate() if (retcode < 0 && retcode != -ENOENT) { return set_state(RGWCoroutine_Error, retcode); } + marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, + RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), + sync_status.marker); entries_iter = list_result.entries.begin(); for (; entries_iter != list_result.entries.end(); ++entries_iter) { - ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl; + ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; yield { bucket_list_entry& entry = *entries_iter; - int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, bucket_info, - entry.key, entry.version_id, entry.versioned_epoch, - true)); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl; - return r; - } + ++pending; + marker_tracker->start(entry.key); + spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, &bucket_info, shard_id, + entry.key, entry.versioned_epoch, entry.key, marker_tracker), false); } - if (retcode < 0 && retcode != -ENOENT) { - ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl; - return set_state(RGWCoroutine_Error, retcode); + while (pending > spawn_window && + collect(&ret)) { + --pending; + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + yield; } } } while (list_result.is_truncated); + /* wait for all operations to complete */ + while (collect(&ret)) { + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ +#warning deal with error + } + yield; + } } } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 69ff0103747e6..d95e2548d0b80 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -203,32 +203,28 @@ class RGWBucketSyncStatusManager; class RGWBucketSyncCR; struct rgw_bucket_shard_sync_marker { - string key; - string ver; - string incremental; + rgw_obj_key full_marker; + string incremental_marker; rgw_bucket_shard_sync_marker() {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - ::encode(key, bl); - ::encode(ver, bl); - ::encode(incremental, bl); + ::encode(full_marker, bl); + ::encode(incremental_marker, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { DECODE_START(1, bl); - ::decode(key, bl); - ::decode(ver, bl); - ::decode(incremental, bl); + ::decode(full_marker, bl); + ::decode(incremental_marker, bl); DECODE_FINISH(bl); } void dump(Formatter *f) const { - encode_json("key", key, f); - encode_json("ver", ver, f); - encode_json("incremental", incremental, f); + encode_json("full_marker", full_marker, f); + encode_json("incremental_marker", incremental_marker, f); } }; WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_marker) diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 4bc2b444d892a..9731393b93b10 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -662,6 +662,12 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { } } +void rgw_obj_key::dump(Formatter *f) const +{ + encode_json("name", name, f); + encode_json("instance", instance, f); +} + void RGWObjEnt::dump(Formatter *f) const { encode_json("name", key.name, f); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 4e1fcaf3a8c68..3c49e68d867a8 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -734,59 +734,26 @@ public: #define META_SYNC_UPDATE_MARKER_WINDOW 10 -class RGWMetaSyncShardMarkerTrack { +class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { RGWRados *store; - RGWHTTPManager *http_manager; RGWAsyncRadosProcessor *async_rados; - RGWCoroutine *cr; - - map pending; - - string high_marker; string marker_oid; rgw_meta_sync_marker sync_marker; - int updates_since_flush; public: RGWMetaSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, - RGWCoroutine *_cr, const string& _marker_oid, - const rgw_meta_sync_marker& _marker) : store(_store), http_manager(_mgr), - async_rados(_async_rados), cr(_cr), + const rgw_meta_sync_marker& _marker) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW), + store(_store), + async_rados(_async_rados), marker_oid(_marker_oid), - sync_marker(_marker), updates_since_flush(0) {} - - void start(const string& pos) { - pending[pos] = true; - } - - RGWCoroutine *finish(const string& pos) { - assert(!pending.empty()); + sync_marker(_marker) {} - map::iterator iter = pending.begin(); - const string& first_pos = iter->first; - - if (pos > high_marker) { - high_marker = pos; - } - - pending.erase(pos); - - updates_since_flush++; - - if (pos == first_pos && (updates_since_flush >= META_SYNC_UPDATE_MARKER_WINDOW || pending.empty())) { - return update_marker(high_marker); - } - return NULL; - } - - RGWCoroutine *update_marker(const string& new_marker) { + RGWCoroutine *store_marker(const string& new_marker) { sync_marker.marker = new_marker; - updates_since_flush = 0; - ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; return new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, marker_oid, sync_marker); @@ -827,7 +794,7 @@ public: pos = raw_key.find(':'); section = raw_key.substr(0, pos); key = raw_key.substr(pos + 1); - call(new RGWReadRemoteMetadataCR(store, http_manager, async_rados, section, key, &md_bl)); + sync_status = call(new RGWReadRemoteMetadataCR(store, http_manager, async_rados, section, key, &md_bl)); } if (sync_status < 0) { @@ -978,7 +945,7 @@ public: int max_entries = OMAP_GET_MAX_ENTRIES; reenter(&full_cr) { oid = full_sync_index_shard_oid(shard_id); - set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, + set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); do { @@ -1030,7 +997,7 @@ public: int incremental_sync() { reenter(&incremental_cr) { mdlog_marker = sync_marker.marker; - set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, + set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); do { diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 16586f337005f..d3c7b497eec5a 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -217,4 +217,51 @@ public: } }; +template +class RGWSyncShardMarkerTrack { + typename std::map pending; + + T high_marker; + + int window_size; + int updates_since_flush; + + +protected: + virtual RGWCoroutine *store_marker(const T& new_marker) = 0; + +public: + RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {} + virtual ~RGWSyncShardMarkerTrack() {} + + void start(const T& pos) { + pending[pos] = true; + } + + RGWCoroutine *finish(const T& pos) { + assert(!pending.empty()); + + typename std::map::iterator iter = pending.begin(); + const T& first_pos = iter->first; + + if (!(pos <= high_marker)) { + high_marker = pos; + } + + pending.erase(pos); + + updates_since_flush++; + + if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) { + return update_marker(high_marker); + } + return NULL; + } + + RGWCoroutine *update_marker(const T& new_marker) { + updates_since_flush = 0; + return store_marker(new_marker); + } +}; + #endif -- 2.39.5