]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add labeled counters for sync deltas of shards
authorAli Maredia <amaredia@redhat.com>
Wed, 13 Mar 2024 15:26:40 +0000 (15:26 +0000)
committerCasey Bodley <cbodley@redhat.com>
Wed, 29 Jan 2025 17:12:19 +0000 (12:12 -0500)
Signed-off-by: Ali Maredia <amaredia@redhat.com>
src/common/perf_counters.cc
src/common/perf_counters.h
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_rest_log.cc
src/rgw/driver/rados/rgw_rest_log.h
src/rgw/driver/rados/rgw_sync.cc
src/rgw/driver/rados/rgw_sync.h
src/rgw/driver/rados/rgw_sync_counters.cc
src/rgw/driver/rados/rgw_sync_counters.h

index 2eeaa80aae8ea6ae46c78aeed7ff2786568dafff..0c39c2e268619353ac1700f8e1c073034217fe2b 100644 (file)
@@ -315,6 +315,23 @@ void PerfCounters::tset(int idx, utime_t amt)
     ceph_abort();
 }
 
+void PerfCounters::tset(int idx, ceph::timespan amt)
+{
+#ifndef WITH_SEASTAR
+  if (!m_cct->_conf->perf)
+    return;
+#endif
+
+  ceph_assert(idx > m_lower_bound);
+  ceph_assert(idx < m_upper_bound);
+  perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]);
+  if (!(data.type & PERFCOUNTER_TIME))
+    return;
+  data.u64 = amt.count();
+  if (data.type & PERFCOUNTER_LONGRUNAVG)
+    ceph_abort();
+}
+
 utime_t PerfCounters::tget(int idx) const
 {
 #ifndef WITH_SEASTAR
index 0d0fe86a0920313ead589b10e227e979b2911bef..b75968bea92acc85a0df2e4fb43b97c98b615d43 100644 (file)
@@ -242,6 +242,7 @@ public:
   uint64_t get(int idx) const;
 
   void tset(int idx, utime_t v);
+  void tset(int idx, ceph::timespan v);
   void tinc(int idx, utime_t v);
   void tinc(int idx, ceph::timespan v);
   utime_t tget(int idx) const;
index c0a9059a25179f70b4bdd19fe530a5b0a678406d..b81339717844b0addf7b790541d66ed34ba7fe47 100644 (file)
@@ -6,6 +6,7 @@
 #include "common/WorkQueue.h"
 #include "common/Throttle.h"
 #include "common/errno.h"
+#include "common/perf_counters_key.h"
 
 #include "rgw_common.h"
 #include "rgw_zone.h"
@@ -300,12 +301,14 @@ struct read_remote_data_log_response {
   string marker;
   bool truncated;
   vector<rgw_data_change_log_entry> entries;
+  real_time last_update;
 
   read_remote_data_log_response() : truncated(false) {}
 
   void decode_json(JSONObj *obj) {
     JSONDecoder::decode_json("marker", marker, obj);
     JSONDecoder::decode_json("truncated", truncated, obj);
+    JSONDecoder::decode_json("last_update", last_update, obj);
     JSONDecoder::decode_json("entries", entries, obj);
   };
 };
@@ -321,6 +324,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
   string *pnext_marker;
   vector<rgw_data_change_log_entry> *entries;
   bool *truncated;
+  real_time *last_update;
 
   read_remote_data_log_response response;
   std::optional<TOPNSPC::common::PerfGuard> timer;
@@ -332,10 +336,10 @@ public:
   RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
                               const std::string& marker, string *pnext_marker,
                               vector<rgw_data_change_log_entry> *_entries,
-                              bool *_truncated)
+                              bool *_truncated, real_time *_last_update)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
-      entries(_entries), truncated(_truncated) {
+      entries(_entries), truncated(_truncated), last_update(_last_update) {
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -397,6 +401,7 @@ public:
         entries->swap(response.entries);
         *pnext_marker = response.marker;
         *truncated = response.truncated;
+        *last_update = response.last_update;
         return set_cr_done();
       }
     }
@@ -1109,22 +1114,44 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
   rgw_data_sync_marker sync_marker;
   RGWSyncTraceNodeRef tn;
   RGWObjVersionTracker& objv;
+  sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
 
 public:
   RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                          const string& _marker_oid,
                          const rgw_data_sync_marker& _marker,
-                         RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+                         RGWSyncTraceNodeRef& _tn, 
+                         RGWObjVersionTracker& objv,
+                         const uint32_t shard_id) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
                                                                 sc(_sc), sync_env(_sc->env),
                                                                 marker_oid(_marker_oid),
                                                                 sync_marker(_marker),
-                                                                tn(_tn), objv(objv) {}
+                                                                tn(_tn), objv(objv),
+                                                                sync_delta_counters_manager(init_keys(shard_id), _sc->env->cct) {}
 
-  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+  std::string init_keys(const uint32_t shard_id) {
+    std::string sz_id = sc->source_zone.id;
+    std::string lz_id = sc->env->svc->zone->get_zone_params().get_id();
+    return ceph::perf_counters::key_create(rgw_sync_delta_counters_key,
+        {{"local-zone-id", lz_id},
+        {"source-zone-id", sz_id},
+        {"shard-id", std::to_string(shard_id)}});
+  }
+
+  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
     sync_marker.marker = new_marker;
     sync_marker.pos = index_pos;
     sync_marker.timestamp = timestamp;
 
+    // Since store_marker() is called by full and incremental sync but
+    // last_update is only modified during incremental sync we only want to
+    // report deltas for incremental sync
+    real_time zero_time;
+    if (last_update != zero_time) {
+      auto delta = last_update - timestamp;
+      sync_delta_counters_manager.tset(sync_deltas::l_rgw_datalog_sync_delta, delta);
+    }
+
     tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
 
     return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
@@ -1814,7 +1841,7 @@ public:
     reenter(this) {
       tn->log(10, "start full sync");
       oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id);
       total_entries = sync_marker.pos;
       entry_timestamp = sync_marker.timestamp; // time when full sync started
       do {
@@ -1927,6 +1954,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
 
   string next_marker;
   vector<rgw_data_change_log_entry> log_entries;
+  real_time last_update;
   decltype(log_entries)::iterator log_iter;
   bool truncated = false;
   int cbret = 0;
@@ -1968,7 +1996,7 @@ public:
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       tn->log(10, "start incremental sync");
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id);
       do {
         if (!lease_cr->is_locked()) {
           lost_lock = true;
@@ -2073,7 +2101,7 @@ public:
         yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
                                                   sync_marker.marker,
                                                    &next_marker, &log_entries,
-                                                  &truncated));
+                                                  &truncated, &last_update));
         if (retcode < 0 && retcode != -ENOENT) {
           tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
                          << retcode));
@@ -2100,11 +2128,11 @@ public:
             tn->log(1, SSTR("failed to parse bucket shard: "
                            << log_iter->entry.key));
             marker_tracker->try_update_high_marker(log_iter->log_id, 0,
-                                                  log_iter->log_timestamp);
+                                                  log_iter->log_timestamp, last_update);
             continue;
           }
           if (!marker_tracker->start(log_iter->log_id, 0,
-                                    log_iter->log_timestamp)) {
+                                    log_iter->log_timestamp, last_update)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
                            << ". Duplicate entry?"));
           } else {
@@ -3922,6 +3950,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
   std::string next_marker;
   vector<rgw_data_change_log_entry> log_entries;
   bool truncated;
+  real_time last_update;
 
 public:
   RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
@@ -3956,7 +3985,7 @@ int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
     count = 0;
     do{
       yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
-                                                 &next_marker, &log_entries, &truncated));
+                                                 &next_marker, &log_entries, &truncated, &last_update));
 
       if (retcode == -ENOENT) {
         break;
@@ -4223,7 +4252,7 @@ public:
   {}
 
 
-  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
     sync_status.full.position = new_marker;
     sync_status.full.count = index_pos;
 
@@ -4324,7 +4353,7 @@ public:
 
   const rgw_raw_obj& get_obj() const { return obj; }
 
-  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
     sync_marker.position = new_marker;
     sync_marker.timestamp = timestamp;
 
index 9315dfc0afd8af7aab968272ac35f8ca3cb439f1..ef67a2b4510e37244e597af08b7d7921c87bf8e5 100644 (file)
@@ -691,6 +691,12 @@ void RGWOp_DATALog_List::execute(optional_yield y) {
   op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
     datalog_rados->list_entries(this, shard_id, max_entries, entries,
                                marker, &last_marker, &truncated, y);
+
+  RGWDataChangesLogInfo info;
+  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+    datalog_rados->get_info(this, shard_id, &info, y);
+
+  last_update = info.last_update;
 }
 
 void RGWOp_DATALog_List::send_response() {
@@ -703,6 +709,8 @@ void RGWOp_DATALog_List::send_response() {
 
   s->formatter->open_object_section("log_entries");
   s->formatter->dump_string("marker", last_marker);
+  utime_t lu(last_update);
+  encode_json("last_update", lu, s->formatter);
   s->formatter->dump_bool("truncated", truncated);
   {
     s->formatter->open_array_section("entries");
index b3a8e49d1b2954f37b7239d87243e0c7d97d4e3e..7c7e3510c71a9d5d693b354e56e69f248d9b60d2 100644 (file)
@@ -209,6 +209,7 @@ class RGWOp_DATALog_List : public RGWRESTOp {
   std::string last_marker;
   bool truncated;
   bool extra_info;
+  ceph::real_time last_update;
 public:
   RGWOp_DATALog_List() : truncated(false), extra_info(false) {}
   ~RGWOp_DATALog_List() override {}
index bd730dfd6c25996ddc0986969dea65b97cdd112b..aeaeb52188c1e3ab5a1500edb8677dfe3a40e7e4 100644 (file)
@@ -1256,7 +1256,7 @@ public:
                                                                 sync_marker(_marker),
                                                                 tn(_tn){}
 
-  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
     sync_marker.marker = new_marker;
     if (index_pos > 0) {
       sync_marker.pos = index_pos;
index f0ee28056afa73e902418f1bb82178a376c8e31c..942ea444efe115f9a77d61a559c9d8134ca415cc 100644 (file)
@@ -343,9 +343,10 @@ class RGWSyncShardMarkerTrack {
   struct marker_entry {
     uint64_t pos;
     real_time timestamp;
+    real_time last_update;
 
     marker_entry() : pos(0) {}
-    marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
+    marker_entry(uint64_t _p, const real_time& _ts, const real_time& _lu = {}) : pos(_p), timestamp(_ts), last_update(_lu) {}
   };
   typename std::map<T, marker_entry> pending;
 
@@ -359,7 +360,7 @@ class RGWSyncShardMarkerTrack {
 protected:
   typename std::set<K> need_retry_set;
 
-  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
+  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) = 0;
   virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
   virtual void handle_finish(const T& marker) { }
 
@@ -371,16 +372,16 @@ public:
     }
   }
 
-  bool start(const T& pos, int index_pos, const real_time& timestamp) {
+  bool start(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
     if (pending.find(pos) != pending.end()) {
       return false;
     }
-    pending[pos] = marker_entry(index_pos, timestamp);
+    pending[pos] = marker_entry(index_pos, timestamp, last_update);
     return true;
   }
 
-  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
-    finish_markers[pos] = marker_entry(index_pos, timestamp);
+  void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+    finish_markers[pos] = marker_entry(index_pos, timestamp, last_update);
   }
 
   RGWCoroutine *finish(const T& pos) {
@@ -436,7 +437,7 @@ public:
     --i;
     const T& high_marker = i->first;
     marker_entry& high_entry = i->second;
-    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
+    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp, high_entry.last_update));
     finish_markers.erase(finish_markers.begin(), last);
     return cr;
   }
index 1d23d58dcfb0376be42fa72d5ac239e1e0581bc0..b95fa8fc4fed826da8e0ef2a063a39f5a351573e 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "common/ceph_context.h"
 #include "rgw_sync_counters.h"
+#include "common/perf_counters_key.h"
 
 namespace sync_counters {
 
@@ -26,3 +27,31 @@ PerfCountersRef build(CephContext *cct, const std::string& name)
 }
 
 } // namespace sync_counters
+
+namespace sync_deltas {
+
+void add_rgw_sync_delta_counters(PerfCountersBuilder *lpcb) {
+  lpcb->set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
+  lpcb->add_time(l_rgw_datalog_sync_delta, "sync_delta", "Sync delta between data log shard in seconds");
+}
+
+SyncDeltaCountersManager::SyncDeltaCountersManager(const std::string& name, CephContext *cct)
+    : cct(cct)
+{
+  std::string_view key = ceph::perf_counters::key_name(name);
+  ceph_assert(rgw_sync_delta_counters_key == key);
+  PerfCountersBuilder pcb(cct, name, l_rgw_sync_delta_first, l_rgw_sync_delta_last);
+  add_rgw_sync_delta_counters(&pcb);
+  sync_delta_counters = std::unique_ptr<PerfCounters>(pcb.create_perf_counters());
+  cct->get_perfcounters_collection()->add(sync_delta_counters.get());
+}
+
+void SyncDeltaCountersManager::tset(int idx, ceph::timespan  v) {
+  sync_delta_counters->tset(idx, v);
+}
+
+SyncDeltaCountersManager::~SyncDeltaCountersManager() {
+  cct->get_perfcounters_collection()->remove(sync_delta_counters.get());
+}
+
+} // namespace sync_deltas
index df3acc68023dbe61d352665f7c3c48c6262d0cee..5120df3517a36c8ad1a7200eee05b06ff0a1dad2 100644 (file)
@@ -23,3 +23,27 @@ enum {
 PerfCountersRef build(CephContext *cct, const std::string& name);
 
 } // namespace sync_counters
+
+const std::string rgw_sync_delta_counters_key = "rgw_sync_delta";
+
+namespace sync_deltas {
+
+enum {
+  l_rgw_sync_delta_first = 806000,
+  l_rgw_datalog_sync_delta,
+  l_rgw_sync_delta_last,
+};
+
+class SyncDeltaCountersManager {
+  std::unique_ptr<PerfCounters> sync_delta_counters;
+  CephContext *cct;
+
+public:
+  SyncDeltaCountersManager(const std::string& name, CephContext *cct);
+
+  void tset(int idx, ceph::timespan v);
+
+  ~SyncDeltaCountersManager();
+};
+
+} // namespace sync_deltas