From 21347a5c7282bee1c92598c916f8462dbce13131 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 9 Nov 2015 17:02:22 -0800 Subject: [PATCH] rgw: keep timestamp stats in incremental sync position data Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 18 ++++++++++-------- src/rgw/rgw_data_sync.h | 4 ++++ src/rgw/rgw_json_enc.cc | 2 ++ src/rgw/rgw_meta_sync_status.h | 3 +++ src/rgw/rgw_sync.cc | 18 +++++++++++++----- src/rgw/rgw_sync.h | 31 +++++++++++++++++++------------ 6 files changed, 51 insertions(+), 25 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index d50eeb6dd31c5..2619d1f8a9e03 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -306,7 +306,9 @@ public: yield { for (int i = 0; i < (int)status.num_shards; i++) { rgw_data_sync_marker marker; - marker.next_step_marker = shards_info[i].marker; + RGWDataChangesLogInfo& info = shards_info[i]; + marker.next_step_marker = info.marker; + marker.timestamp = info.last_update; spawn(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true); } @@ -632,7 +634,7 @@ public: marker_oid(_marker_oid), sync_marker(_marker) {} - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) { + RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) { sync_marker.marker = new_marker; sync_marker.pos = index_pos; @@ -899,7 +901,7 @@ public: for (; iter != entries.end(); ++iter) { ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl; total_entries++; - marker_tracker->start(iter->first, total_entries); + marker_tracker->start(iter->first, total_entries, utime_t()); // fetch remote and write locally yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false); if (retcode < 0) { @@ -969,7 +971,7 @@ public: ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl; continue; } - marker_tracker->start(log_iter->log_id, 0); + marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp); yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); if (retcode < 0) { return set_cr_error(retcode); @@ -1711,7 +1713,7 @@ public: marker_oid(_marker_oid), sync_marker(_marker) {} - RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos) { + RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const utime_t& timestamp) { sync_marker.position = new_marker; sync_marker.count = index_pos; @@ -1741,7 +1743,7 @@ public: marker_oid(_marker_oid), sync_marker(_marker) {} - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) { + RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) { sync_marker.position = new_marker; map attrs; @@ -1914,7 +1916,7 @@ int RGWBucketShardFullSyncCR::operate() yield { bucket_list_entry& entry = *entries_iter; total_entries++; - marker_tracker->start(entry.key, total_entries); + marker_tracker->start(entry.key, total_entries, utime_t()); list_marker = entry.key; spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, entry.key, entry.versioned_epoch, entry.mtime, CLS_RGW_OP_ADD, entry.key, marker_tracker), false); @@ -2022,7 +2024,7 @@ int RGWBucketShardIncrementalSyncCR::operate() rgw_obj_key key(entries_iter->object, entries_iter->instance); ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; rgw_bi_log_entry& entry = *entries_iter; - marker_tracker->start(entry.id, 0); + marker_tracker->start(entry.id, 0, entries_iter->timestamp); inc_marker.position = entry.id; uint64_t versioned_epoch = 0; if (entry.ver.pool < 0) { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 0e63df3fd2ec1..90cf092dee096 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -73,6 +73,7 @@ struct rgw_data_sync_marker { string next_step_marker; uint64_t total_entries; uint64_t pos; + utime_t timestamp; rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {} @@ -83,6 +84,7 @@ struct rgw_data_sync_marker { ::encode(next_step_marker, bl); ::encode(total_entries, bl); ::encode(pos, bl); + ::encode(timestamp, bl); ENCODE_FINISH(bl); } @@ -93,6 +95,7 @@ struct rgw_data_sync_marker { ::decode(next_step_marker, bl); ::decode(total_entries, bl); ::decode(pos, bl); + ::decode(timestamp, bl); DECODE_FINISH(bl); } @@ -102,6 +105,7 @@ struct rgw_data_sync_marker { encode_json("next_step_marker", next_step_marker, f); encode_json("total_entries", total_entries, f); encode_json("pos", pos, f); + encode_json("timestamp", timestamp, f); } }; WRITE_CLASS_ENCODER(rgw_data_sync_marker) diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 0b6b2e0e4c395..8039ffdaa6873 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -1164,6 +1164,7 @@ void rgw_meta_sync_marker::decode_json(JSONObj *obj) JSONDecoder::decode_json("next_step_marker", next_step_marker, obj); JSONDecoder::decode_json("total_entries", total_entries, obj); JSONDecoder::decode_json("pos", pos, obj); + JSONDecoder::decode_json("timestamp", timestamp, obj); } void rgw_meta_sync_marker::dump(Formatter *f) const @@ -1173,6 +1174,7 @@ void rgw_meta_sync_marker::dump(Formatter *f) const encode_json("next_step_marker", next_step_marker, f); encode_json("total_entries", total_entries, f); encode_json("pos", pos, f); + encode_json("timestamp", timestamp, f); } void rgw_meta_sync_status::decode_json(JSONObj *obj) diff --git a/src/rgw/rgw_meta_sync_status.h b/src/rgw/rgw_meta_sync_status.h index 414994490f9b9..2635f9cc22c8b 100644 --- a/src/rgw/rgw_meta_sync_status.h +++ b/src/rgw/rgw_meta_sync_status.h @@ -42,6 +42,7 @@ struct rgw_meta_sync_marker { string next_step_marker; uint64_t total_entries; uint64_t pos; + utime_t timestamp; rgw_meta_sync_marker() : state(FullSync), total_entries(0), pos(0) {} @@ -52,6 +53,7 @@ struct rgw_meta_sync_marker { ::encode(next_step_marker, bl); ::encode(total_entries, bl); ::encode(pos, bl); + ::encode(timestamp, bl); ENCODE_FINISH(bl); } @@ -62,6 +64,7 @@ struct rgw_meta_sync_marker { ::decode(next_step_marker, bl); ::decode(total_entries, bl); ::decode(pos, bl); + ::decode(timestamp, bl); DECODE_FINISH(bl); } diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 49ddd98b893a2..c0efc3b0be41f 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -471,7 +471,9 @@ public: yield { for (int i = 0; i < (int)status.num_shards; i++) { rgw_meta_sync_marker marker; - marker.next_step_marker = shards_info[i].marker; + RGWMetadataLogInfo& info = shards_info[i]; + marker.next_step_marker = info.marker; + marker.timestamp = info.last_update; RGWRados *store = sync_env->store; spawn(new RGWSimpleRadosWriteCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_env->shard_obj_name(i), marker), true); @@ -836,9 +838,15 @@ public: marker_oid(_marker_oid), sync_marker(_marker) {} - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) { + RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) { sync_marker.marker = new_marker; - sync_marker.pos = index_pos; + if (index_pos > 0) { + sync_marker.pos = index_pos; + } + + if (timestamp.sec() > 0) { + sync_marker.timestamp = timestamp; + } ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl; RGWRados *store = sync_env->store; @@ -1181,7 +1189,7 @@ public: for (; iter != entries.end(); ++iter) { ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl; total_entries++; - marker_tracker->start(iter->first, total_entries); + marker_tracker->start(iter->first, total_entries, utime_t()); // fetch remote and write locally yield { @@ -1312,7 +1320,7 @@ public: yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl; - marker_tracker->start(log_iter->id, 0); + marker_tracker->start(log_iter->id, 0, log_iter->timestamp); raw_key = log_iter->section + ":" + log_iter->name; yield { RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false); diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 1bf6ddaabf235..a927185e26e0b 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -156,39 +156,46 @@ public: template class RGWSyncShardMarkerTrack { - typename std::map pending; + struct marker_entry { + uint64_t pos; + utime_t timestamp; + + marker_entry() : pos(0) {} + marker_entry(uint64_t _p, const utime_t& _ts) : pos(_p), timestamp(_ts) {} + }; + typename std::map pending; T high_marker; - uint64_t high_index; + marker_entry high_entry; int window_size; int updates_since_flush; protected: - virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos) = 0; + virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const utime_t& timestamp) = 0; virtual void handle_finish(const T& marker) { } public: - RGWSyncShardMarkerTrack(int _window_size) : high_index(0), window_size(_window_size), updates_since_flush(0) {} + RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {} virtual ~RGWSyncShardMarkerTrack() {} - void start(const T& pos, int index_pos) { - pending[pos] = index_pos; + void start(const T& pos, int index_pos, const utime_t& timestamp) { + pending[pos] = marker_entry(index_pos, timestamp); } RGWCoroutine *finish(const T& pos) { assert(!pending.empty()); - typename std::map::iterator iter = pending.begin(); + typename std::map::iterator iter = pending.begin(); const T& first_pos = iter->first; - typename std::map::iterator pos_iter = pending.find(pos); + typename std::map::iterator pos_iter = pending.find(pos); assert(pos_iter != pending.end()); if (!(pos <= high_marker)) { high_marker = pos; - high_index = pos_iter->second; + high_entry = pos_iter->second; } pending.erase(pos); @@ -198,14 +205,14 @@ public: updates_since_flush++; if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) { - return update_marker(high_marker, high_index); + return update_marker(high_marker, high_entry); } return NULL; } - RGWCoroutine *update_marker(const T& new_marker, uint64_t index_pos) { + RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) { updates_since_flush = 0; - return store_marker(new_marker, index_pos); + return store_marker(new_marker, entry.pos, entry.timestamp); } }; -- 2.39.5