yield {
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_data_sync_marker marker;
- marker.next_step_marker = shards_info[i].marker;
+ RGWDataChangesLogInfo& info = shards_info[i];
+ marker.next_step_marker = info.marker;
+ marker.timestamp = info.last_update;
spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true);
}
marker_oid(_marker_oid),
sync_marker(_marker) {}
- RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+ RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
sync_marker.marker = new_marker;
sync_marker.pos = index_pos;
for (; iter != entries.end(); ++iter) {
ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
total_entries++;
- marker_tracker->start(iter->first, total_entries);
+ marker_tracker->start(iter->first, total_entries, utime_t());
// fetch remote and write locally
yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
if (retcode < 0) {
ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
continue;
}
- marker_tracker->start(log_iter->log_id, 0);
+ marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp);
yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
if (retcode < 0) {
return set_cr_error(retcode);
marker_oid(_marker_oid),
sync_marker(_marker) {}
- RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos) {
+ RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const utime_t& timestamp) {
sync_marker.position = new_marker;
sync_marker.count = index_pos;
marker_oid(_marker_oid),
sync_marker(_marker) {}
- RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+ RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
sync_marker.position = new_marker;
map<string, bufferlist> attrs;
yield {
bucket_list_entry& entry = *entries_iter;
total_entries++;
- marker_tracker->start(entry.key, total_entries);
+ marker_tracker->start(entry.key, total_entries, utime_t());
list_marker = entry.key;
spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
entry.key, entry.versioned_epoch, entry.mtime, CLS_RGW_OP_ADD, entry.key, marker_tracker), false);
rgw_obj_key key(entries_iter->object, entries_iter->instance);
ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
rgw_bi_log_entry& entry = *entries_iter;
- marker_tracker->start(entry.id, 0);
+ marker_tracker->start(entry.id, 0, entries_iter->timestamp);
inc_marker.position = entry.id;
uint64_t versioned_epoch = 0;
if (entry.ver.pool < 0) {
string next_step_marker;
uint64_t total_entries;
uint64_t pos;
+ utime_t timestamp;
rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
::encode(next_step_marker, bl);
::encode(total_entries, bl);
::encode(pos, bl);
+ ::encode(timestamp, bl);
ENCODE_FINISH(bl);
}
::decode(next_step_marker, bl);
::decode(total_entries, bl);
::decode(pos, bl);
+ ::decode(timestamp, bl);
DECODE_FINISH(bl);
}
encode_json("next_step_marker", next_step_marker, f);
encode_json("total_entries", total_entries, f);
encode_json("pos", pos, f);
+ encode_json("timestamp", timestamp, f);
}
};
WRITE_CLASS_ENCODER(rgw_data_sync_marker)
JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
JSONDecoder::decode_json("total_entries", total_entries, obj);
JSONDecoder::decode_json("pos", pos, obj);
+ JSONDecoder::decode_json("timestamp", timestamp, obj);
}
void rgw_meta_sync_marker::dump(Formatter *f) const
encode_json("next_step_marker", next_step_marker, f);
encode_json("total_entries", total_entries, f);
encode_json("pos", pos, f);
+ encode_json("timestamp", timestamp, f);
}
void rgw_meta_sync_status::decode_json(JSONObj *obj)
string next_step_marker;
uint64_t total_entries;
uint64_t pos;
+ utime_t timestamp;
rgw_meta_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
::encode(next_step_marker, bl);
::encode(total_entries, bl);
::encode(pos, bl);
+ ::encode(timestamp, bl);
ENCODE_FINISH(bl);
}
::decode(next_step_marker, bl);
::decode(total_entries, bl);
::decode(pos, bl);
+ ::decode(timestamp, bl);
DECODE_FINISH(bl);
}
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_meta_sync_marker marker;
- marker.next_step_marker = shards_info[i].marker;
+ RGWMetadataLogInfo& info = shards_info[i];
+ marker.next_step_marker = info.marker;
+ marker.timestamp = info.last_update;
RGWRados *store = sync_env->store;
spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_env->shard_obj_name(i), marker), true);
marker_oid(_marker_oid),
sync_marker(_marker) {}
- RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+ RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
sync_marker.marker = new_marker;
- sync_marker.pos = index_pos;
+ if (index_pos > 0) {
+ sync_marker.pos = index_pos;
+ }
+
+ if (timestamp.sec() > 0) {
+ sync_marker.timestamp = timestamp;
+ }
ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
RGWRados *store = sync_env->store;
for (; iter != entries.end(); ++iter) {
ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
total_entries++;
- marker_tracker->start(iter->first, total_entries);
+ marker_tracker->start(iter->first, total_entries, utime_t());
// fetch remote and write locally
yield {
yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
- marker_tracker->start(log_iter->id, 0);
+ marker_tracker->start(log_iter->id, 0, log_iter->timestamp);
raw_key = log_iter->section + ":" + log_iter->name;
yield {
RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
template <class T>
class RGWSyncShardMarkerTrack {
- typename std::map<T, uint64_t> pending;
+ struct marker_entry {
+ uint64_t pos;
+ utime_t timestamp;
+
+ marker_entry() : pos(0) {}
+ marker_entry(uint64_t _p, const utime_t& _ts) : pos(_p), timestamp(_ts) {}
+ };
+ typename std::map<T, marker_entry> pending;
T high_marker;
- uint64_t high_index;
+ marker_entry high_entry;
int window_size;
int updates_since_flush;
protected:
- virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos) = 0;
+ virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const utime_t& timestamp) = 0;
virtual void handle_finish(const T& marker) { }
public:
- RGWSyncShardMarkerTrack(int _window_size) : high_index(0), window_size(_window_size), updates_since_flush(0) {}
+ RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
virtual ~RGWSyncShardMarkerTrack() {}
- void start(const T& pos, int index_pos) {
- pending[pos] = index_pos;
+ void start(const T& pos, int index_pos, const utime_t& timestamp) {
+ pending[pos] = marker_entry(index_pos, timestamp);
}
RGWCoroutine *finish(const T& pos) {
assert(!pending.empty());
- typename std::map<T, uint64_t>::iterator iter = pending.begin();
+ typename std::map<T, marker_entry>::iterator iter = pending.begin();
const T& first_pos = iter->first;
- typename std::map<T, uint64_t>::iterator pos_iter = pending.find(pos);
+ typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
assert(pos_iter != pending.end());
if (!(pos <= high_marker)) {
high_marker = pos;
- high_index = pos_iter->second;
+ high_entry = pos_iter->second;
}
pending.erase(pos);
updates_since_flush++;
if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) {
- return update_marker(high_marker, high_index);
+ return update_marker(high_marker, high_entry);
}
return NULL;
}
- RGWCoroutine *update_marker(const T& new_marker, uint64_t index_pos) {
+ RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) {
updates_since_flush = 0;
- return store_marker(new_marker, index_pos);
+ return store_marker(new_marker, entry.pos, entry.timestamp);
}
};