]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: keep timestamp stats in incremental sync position data
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Nov 2015 01:02:22 +0000 (17:02 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:30 +0000 (16:13 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_meta_sync_status.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index d50eeb6dd31c509453686693826d0f87f0ea80ba..2619d1f8a9e0319d191717ec8b40607ed1ed494e 100644 (file)
@@ -306,7 +306,9 @@ public:
       yield {
         for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_data_sync_marker marker;
-         marker.next_step_marker = shards_info[i].marker;
+          RGWDataChangesLogInfo& info = shards_info[i];
+         marker.next_step_marker = info.marker;
+         marker.timestamp = info.last_update;
           spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
                                                          RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true);
         }
@@ -632,7 +634,7 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
     sync_marker.marker = new_marker;
     sync_marker.pos = index_pos;
 
@@ -899,7 +901,7 @@ public:
         for (; iter != entries.end(); ++iter) {
           ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
           total_entries++;
-          marker_tracker->start(iter->first, total_entries);
+          marker_tracker->start(iter->first, total_entries, utime_t());
             // fetch remote and write locally
           yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
           if (retcode < 0) {
@@ -969,7 +971,7 @@ public:
               ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
               continue;
             }
-            marker_tracker->start(log_iter->log_id, 0);
+            marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp);
             yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
             if (retcode < 0) {
               return set_cr_error(retcode);
@@ -1711,7 +1713,7 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
-  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos) {
+  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const utime_t& timestamp) {
     sync_marker.position = new_marker;
     sync_marker.count = index_pos;
 
@@ -1741,7 +1743,7 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
     sync_marker.position = new_marker;
 
     map<string, bufferlist> attrs;
@@ -1914,7 +1916,7 @@ int RGWBucketShardFullSyncCR::operate()
         yield {
           bucket_list_entry& entry = *entries_iter;
           total_entries++;
-          marker_tracker->start(entry.key, total_entries);
+          marker_tracker->start(entry.key, total_entries, utime_t());
           list_marker = entry.key;
           spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
                                                entry.key, entry.versioned_epoch, entry.mtime, CLS_RGW_OP_ADD, entry.key, marker_tracker), false);
@@ -2022,7 +2024,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           rgw_obj_key key(entries_iter->object, entries_iter->instance);
           ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
           rgw_bi_log_entry& entry = *entries_iter;
-          marker_tracker->start(entry.id, 0);
+          marker_tracker->start(entry.id, 0, entries_iter->timestamp);
           inc_marker.position = entry.id;
           uint64_t versioned_epoch = 0;
           if (entry.ver.pool < 0) {
index 0e63df3fd2ec1cba5cf13fba8b99c7b9d1b7d5b0..90cf092dee09616386047adbb06f802f7c7be253 100644 (file)
@@ -73,6 +73,7 @@ struct rgw_data_sync_marker {
   string next_step_marker;
   uint64_t total_entries;
   uint64_t pos;
+  utime_t timestamp;
 
   rgw_data_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
 
@@ -83,6 +84,7 @@ struct rgw_data_sync_marker {
     ::encode(next_step_marker, bl);
     ::encode(total_entries, bl);
     ::encode(pos, bl);
+    ::encode(timestamp, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -93,6 +95,7 @@ struct rgw_data_sync_marker {
     ::decode(next_step_marker, bl);
     ::decode(total_entries, bl);
     ::decode(pos, bl);
+    ::decode(timestamp, bl);
      DECODE_FINISH(bl);
   }
 
@@ -102,6 +105,7 @@ struct rgw_data_sync_marker {
     encode_json("next_step_marker", next_step_marker, f);
     encode_json("total_entries", total_entries, f);
     encode_json("pos", pos, f);
+    encode_json("timestamp", timestamp, f);
   }
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_marker)
index 0b6b2e0e4c3956cbb745fef35278e428921b5122..8039ffdaa68738e647b3822b93174870dcb090c7 100644 (file)
@@ -1164,6 +1164,7 @@ void rgw_meta_sync_marker::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("next_step_marker", next_step_marker, obj);
   JSONDecoder::decode_json("total_entries", total_entries, obj);
   JSONDecoder::decode_json("pos", pos, obj);
+  JSONDecoder::decode_json("timestamp", timestamp, obj);
 }
 
 void rgw_meta_sync_marker::dump(Formatter *f) const
@@ -1173,6 +1174,7 @@ void rgw_meta_sync_marker::dump(Formatter *f) const
   encode_json("next_step_marker", next_step_marker, f);
   encode_json("total_entries", total_entries, f);
   encode_json("pos", pos, f);
+  encode_json("timestamp", timestamp, f);
 }
 
 void rgw_meta_sync_status::decode_json(JSONObj *obj)
index 414994490f9b9a3fc05a49bf023ef469cf0e402c..2635f9cc22c8b598f1cf5f25a0c7b4a5d7258a95 100644 (file)
@@ -42,6 +42,7 @@ struct rgw_meta_sync_marker {
   string next_step_marker;
   uint64_t total_entries;
   uint64_t pos;
+  utime_t timestamp;
 
   rgw_meta_sync_marker() : state(FullSync), total_entries(0), pos(0) {}
 
@@ -52,6 +53,7 @@ struct rgw_meta_sync_marker {
     ::encode(next_step_marker, bl);
     ::encode(total_entries, bl);
     ::encode(pos, bl);
+    ::encode(timestamp, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -62,6 +64,7 @@ struct rgw_meta_sync_marker {
     ::decode(next_step_marker, bl);
     ::decode(total_entries, bl);
     ::decode(pos, bl);
+    ::decode(timestamp, bl);
      DECODE_FINISH(bl);
   }
 
index 49ddd98b893a2d34b0cb32d8ac2ac21a56d14420..c0efc3b0be41ffb02913b3354650a3f5857ebc19 100644 (file)
@@ -471,7 +471,9 @@ public:
       yield {
         for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
-         marker.next_step_marker = shards_info[i].marker;
+          RGWMetadataLogInfo& info = shards_info[i];
+         marker.next_step_marker = info.marker;
+         marker.timestamp = info.last_update;
           RGWRados *store = sync_env->store;
           spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
                                                          sync_env->shard_obj_name(i), marker), true);
@@ -836,9 +838,15 @@ public:
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker) {}
 
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos) {
+  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const utime_t& timestamp) {
     sync_marker.marker = new_marker;
-    sync_marker.pos = index_pos;
+    if (index_pos > 0) {
+      sync_marker.pos = index_pos;
+    }
+
+    if (timestamp.sec() > 0) {
+      sync_marker.timestamp = timestamp;
+    }
 
     ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
     RGWRados *store = sync_env->store;
@@ -1181,7 +1189,7 @@ public:
         for (; iter != entries.end(); ++iter) {
           ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
           total_entries++;
-          marker_tracker->start(iter->first, total_entries);
+          marker_tracker->start(iter->first, total_entries, utime_t());
 
             // fetch remote and write locally
           yield {
@@ -1312,7 +1320,7 @@ public:
           yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
-            marker_tracker->start(log_iter->id, 0);
+            marker_tracker->start(log_iter->id, 0, log_iter->timestamp);
             raw_key = log_iter->section + ":" + log_iter->name;
             yield {
               RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
index 1bf6ddaabf2358a77ec5438ec02ba03a67ae82d1..a927185e26e0b5a1d544e54c01d9461833618d3d 100644 (file)
@@ -156,39 +156,46 @@ public:
 
 template <class T>
 class RGWSyncShardMarkerTrack {
-  typename std::map<T, uint64_t> pending;
+  struct marker_entry {
+    uint64_t pos;
+    utime_t timestamp;
+
+    marker_entry() : pos(0) {}
+    marker_entry(uint64_t _p, const utime_t& _ts) : pos(_p), timestamp(_ts) {}
+  };
+  typename std::map<T, marker_entry> pending;
 
   T high_marker;
-  uint64_t high_index;
+  marker_entry high_entry;
 
   int window_size;
   int updates_since_flush;
 
 
 protected:
-  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos) = 0;
+  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const utime_t& timestamp) = 0;
   virtual void handle_finish(const T& marker) { }
 
 public:
-  RGWSyncShardMarkerTrack(int _window_size) : high_index(0), window_size(_window_size), updates_since_flush(0) {}
+  RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
   virtual ~RGWSyncShardMarkerTrack() {}
 
-  void start(const T& pos, int index_pos) {
-    pending[pos] = index_pos;
+  void start(const T& pos, int index_pos, const utime_t& timestamp) {
+    pending[pos] = marker_entry(index_pos, timestamp);
   }
 
   RGWCoroutine *finish(const T& pos) {
     assert(!pending.empty());
 
-    typename std::map<T, uint64_t>::iterator iter = pending.begin();
+    typename std::map<T, marker_entry>::iterator iter = pending.begin();
     const T& first_pos = iter->first;
 
-    typename std::map<T, uint64_t>::iterator pos_iter = pending.find(pos);
+    typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
     assert(pos_iter != pending.end());
 
     if (!(pos <= high_marker)) {
       high_marker = pos;
-      high_index = pos_iter->second;
+      high_entry = pos_iter->second;
     }
 
     pending.erase(pos);
@@ -198,14 +205,14 @@ public:
     updates_since_flush++;
 
     if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) {
-      return update_marker(high_marker, high_index);
+      return update_marker(high_marker, high_entry);
     }
     return NULL;
   }
 
-  RGWCoroutine *update_marker(const T& new_marker, uint64_t index_pos) {
+  RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) {
     updates_since_flush = 0;
-    return store_marker(new_marker, index_pos);
+    return store_marker(new_marker, entry.pos, entry.timestamp);
   }
 };