From: Ali Maredia Date: Wed, 13 Mar 2024 15:26:40 +0000 (+0000) Subject: rgw: add labeled counters for sync deltas of shards X-Git-Tag: v20.0.0~178^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=818e678d223ae6b704f192d8f2fec94719b9016a;p=ceph.git rgw: add labeled counters for sync deltas of shards Signed-off-by: Ali Maredia --- diff --git a/src/common/perf_counters.cc b/src/common/perf_counters.cc index 2eeaa80aae8..0c39c2e2686 100644 --- a/src/common/perf_counters.cc +++ b/src/common/perf_counters.cc @@ -315,6 +315,23 @@ void PerfCounters::tset(int idx, utime_t amt) ceph_abort(); } +void PerfCounters::tset(int idx, ceph::timespan amt) +{ +#ifndef WITH_SEASTAR + if (!m_cct->_conf->perf) + return; +#endif + + ceph_assert(idx > m_lower_bound); + ceph_assert(idx < m_upper_bound); + perf_counter_data_any_d& data(m_data[idx - m_lower_bound - 1]); + if (!(data.type & PERFCOUNTER_TIME)) + return; + data.u64 = amt.count(); + if (data.type & PERFCOUNTER_LONGRUNAVG) + ceph_abort(); +} + utime_t PerfCounters::tget(int idx) const { #ifndef WITH_SEASTAR diff --git a/src/common/perf_counters.h b/src/common/perf_counters.h index 0d0fe86a092..b75968bea92 100644 --- a/src/common/perf_counters.h +++ b/src/common/perf_counters.h @@ -242,6 +242,7 @@ public: uint64_t get(int idx) const; void tset(int idx, utime_t v); + void tset(int idx, ceph::timespan v); void tinc(int idx, utime_t v); void tinc(int idx, ceph::timespan v); utime_t tget(int idx) const; diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc index c0a9059a251..b8133971784 100644 --- a/src/rgw/driver/rados/rgw_data_sync.cc +++ b/src/rgw/driver/rados/rgw_data_sync.cc @@ -6,6 +6,7 @@ #include "common/WorkQueue.h" #include "common/Throttle.h" #include "common/errno.h" +#include "common/perf_counters_key.h" #include "rgw_common.h" #include "rgw_zone.h" @@ -300,12 +301,14 @@ struct read_remote_data_log_response { string marker; bool truncated; vector 
entries; + real_time last_update; read_remote_data_log_response() : truncated(false) {} void decode_json(JSONObj *obj) { JSONDecoder::decode_json("marker", marker, obj); JSONDecoder::decode_json("truncated", truncated, obj); + JSONDecoder::decode_json("last_update", last_update, obj); JSONDecoder::decode_json("entries", entries, obj); }; }; @@ -321,6 +324,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine { string *pnext_marker; vector *entries; bool *truncated; + real_time *last_update; read_remote_data_log_response response; std::optional timer; @@ -332,10 +336,10 @@ public: RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id, const std::string& marker, string *pnext_marker, vector *_entries, - bool *_truncated) + bool *_truncated, real_time *_last_update) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker), - entries(_entries), truncated(_truncated) { + entries(_entries), truncated(_truncated), last_update(_last_update) { } int operate(const DoutPrefixProvider *dpp) override { @@ -397,6 +401,7 @@ public: entries->swap(response.entries); *pnext_marker = response.marker; *truncated = response.truncated; + *last_update = response.last_update; return set_cr_done(); } } @@ -1109,22 +1114,44 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), marker_oid(_marker_oid), sync_marker(_marker), - tn(_tn), objv(objv) {} + tn(_tn), objv(objv), + sync_delta_counters_manager(init_keys(shard_id), _sc->env->cct) {} - RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { + std::string init_keys(const uint32_t shard_id) { + std::string sz_id = sc->source_zone.id; + std::string lz_id = sc->env->svc->zone->get_zone_params().get_id(); + return ceph::perf_counters::key_create(rgw_sync_delta_counters_key, + {{"local-zone-id", lz_id}, + {"source-zone-id", sz_id}, + {"shard-id", std::to_string(shard_id)}}); + } + + 
RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override { sync_marker.marker = new_marker; sync_marker.pos = index_pos; sync_marker.timestamp = timestamp; + // Since store_marker() is called by full and incremental sync but + // last_update is only modified during incremental sync we only want to + // report deltas for incremental sync + real_time zero_time; + if (last_update != zero_time) { + auto delta = last_update - timestamp; + sync_delta_counters_manager.tset(sync_deltas::l_rgw_datalog_sync_delta, delta); + } + tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker)); return new RGWSimpleRadosWriteCR(sync_env->dpp, sync_env->driver, @@ -1814,7 +1841,7 @@ public: reenter(this) { tn->log(10, "start full sync"); oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id); - marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv); + marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id); total_entries = sync_marker.pos; entry_timestamp = sync_marker.timestamp; // time when full sync started do { @@ -1927,6 +1954,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR { string next_marker; vector log_entries; + real_time last_update; decltype(log_entries)::iterator log_iter; bool truncated = false; int cbret = 0; @@ -1968,7 +1996,7 @@ public: int operate(const DoutPrefixProvider *dpp) override { reenter(this) { tn->log(10, "start incremental sync"); - marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv); + marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id); do { if (!lease_cr->is_locked()) { lost_lock = true; @@ -2073,7 +2101,7 @@ public: yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker, &next_marker, &log_entries, - &truncated)); + &truncated, &last_update)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, SSTR("ERROR: failed to read remote data log 
info: ret=" << retcode)); @@ -2100,11 +2128,11 @@ public: tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key)); marker_tracker->try_update_high_marker(log_iter->log_id, 0, - log_iter->log_timestamp); + log_iter->log_timestamp, last_update); continue; } if (!marker_tracker->start(log_iter->log_id, 0, - log_iter->log_timestamp)) { + log_iter->log_timestamp, last_update)) { tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?")); } else { @@ -3922,6 +3950,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine { std::string next_marker; vector log_entries; bool truncated; + real_time last_update; public: RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id, @@ -3956,7 +3985,7 @@ int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp) count = 0; do{ yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker, - &next_marker, &log_entries, &truncated)); + &next_marker, &log_entries, &truncated, &last_update)); if (retcode == -ENOENT) { break; @@ -4223,7 +4252,7 @@ public: {} - RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override { + RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override { sync_status.full.position = new_marker; sync_status.full.count = index_pos; @@ -4324,7 +4353,7 @@ public: const rgw_raw_obj& get_obj() const { return obj; } - RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { + RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override { sync_marker.position = new_marker; sync_marker.timestamp = timestamp; diff --git a/src/rgw/driver/rados/rgw_rest_log.cc b/src/rgw/driver/rados/rgw_rest_log.cc index 9315dfc0afd..ef67a2b4510 100644 --- 
a/src/rgw/driver/rados/rgw_rest_log.cc +++ b/src/rgw/driver/rados/rgw_rest_log.cc @@ -691,6 +691,12 @@ void RGWOp_DATALog_List::execute(optional_yield y) { op_ret = static_cast(driver)->svc()-> datalog_rados->list_entries(this, shard_id, max_entries, entries, marker, &last_marker, &truncated, y); + + RGWDataChangesLogInfo info; + op_ret = static_cast(driver)->svc()-> + datalog_rados->get_info(this, shard_id, &info, y); + + last_update = info.last_update; } void RGWOp_DATALog_List::send_response() { @@ -703,6 +709,8 @@ void RGWOp_DATALog_List::send_response() { s->formatter->open_object_section("log_entries"); s->formatter->dump_string("marker", last_marker); + utime_t lu(last_update); + encode_json("last_update", lu, s->formatter); s->formatter->dump_bool("truncated", truncated); { s->formatter->open_array_section("entries"); diff --git a/src/rgw/driver/rados/rgw_rest_log.h b/src/rgw/driver/rados/rgw_rest_log.h index b3a8e49d1b2..7c7e3510c71 100644 --- a/src/rgw/driver/rados/rgw_rest_log.h +++ b/src/rgw/driver/rados/rgw_rest_log.h @@ -209,6 +209,7 @@ class RGWOp_DATALog_List : public RGWRESTOp { std::string last_marker; bool truncated; bool extra_info; + ceph::real_time last_update; public: RGWOp_DATALog_List() : truncated(false), extra_info(false) {} ~RGWOp_DATALog_List() override {} diff --git a/src/rgw/driver/rados/rgw_sync.cc b/src/rgw/driver/rados/rgw_sync.cc index bd730dfd6c2..aeaeb52188c 100644 --- a/src/rgw/driver/rados/rgw_sync.cc +++ b/src/rgw/driver/rados/rgw_sync.cc @@ -1256,7 +1256,7 @@ public: sync_marker(_marker), tn(_tn){} - RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { + RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override { sync_marker.marker = new_marker; if (index_pos > 0) { sync_marker.pos = index_pos; diff --git a/src/rgw/driver/rados/rgw_sync.h b/src/rgw/driver/rados/rgw_sync.h index 
f0ee28056af..942ea444efe 100644 --- a/src/rgw/driver/rados/rgw_sync.h +++ b/src/rgw/driver/rados/rgw_sync.h @@ -343,9 +343,10 @@ class RGWSyncShardMarkerTrack { struct marker_entry { uint64_t pos; real_time timestamp; + real_time last_update; marker_entry() : pos(0) {} - marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {} + marker_entry(uint64_t _p, const real_time& _ts, const real_time& _lu = {}) : pos(_p), timestamp(_ts), last_update(_lu) {} }; typename std::map pending; @@ -359,7 +360,7 @@ class RGWSyncShardMarkerTrack { protected: typename std::set need_retry_set; - virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0; + virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) = 0; virtual RGWOrderCallCR *allocate_order_control_cr() = 0; virtual void handle_finish(const T& marker) { } @@ -371,16 +372,16 @@ public: } } - bool start(const T& pos, int index_pos, const real_time& timestamp) { + bool start(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) { if (pending.find(pos) != pending.end()) { return false; } - pending[pos] = marker_entry(index_pos, timestamp); + pending[pos] = marker_entry(index_pos, timestamp, last_update); return true; } - void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) { - finish_markers[pos] = marker_entry(index_pos, timestamp); + void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) { + finish_markers[pos] = marker_entry(index_pos, timestamp, last_update); } RGWCoroutine *finish(const T& pos) { @@ -436,7 +437,7 @@ public: --i; const T& high_marker = i->first; marker_entry& high_entry = i->second; - RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp)); + RGWCoroutine *cr = order(store_marker(high_marker, 
high_entry.pos, high_entry.timestamp, high_entry.last_update)); finish_markers.erase(finish_markers.begin(), last); return cr; } diff --git a/src/rgw/driver/rados/rgw_sync_counters.cc b/src/rgw/driver/rados/rgw_sync_counters.cc index 1d23d58dcfb..b95fa8fc4fe 100644 --- a/src/rgw/driver/rados/rgw_sync_counters.cc +++ b/src/rgw/driver/rados/rgw_sync_counters.cc @@ -3,6 +3,7 @@ #include "common/ceph_context.h" #include "rgw_sync_counters.h" +#include "common/perf_counters_key.h" namespace sync_counters { @@ -26,3 +27,31 @@ PerfCountersRef build(CephContext *cct, const std::string& name) } } // namespace sync_counters + +namespace sync_deltas { + +void add_rgw_sync_delta_counters(PerfCountersBuilder *lpcb) { + lpcb->set_prio_default(PerfCountersBuilder::PRIO_USEFUL); + lpcb->add_time(l_rgw_datalog_sync_delta, "sync_delta", "Sync delta between data log shard in seconds"); +} + +SyncDeltaCountersManager::SyncDeltaCountersManager(const std::string& name, CephContext *cct) + : cct(cct) +{ + std::string_view key = ceph::perf_counters::key_name(name); + ceph_assert(rgw_sync_delta_counters_key == key); + PerfCountersBuilder pcb(cct, name, l_rgw_sync_delta_first, l_rgw_sync_delta_last); + add_rgw_sync_delta_counters(&pcb); + sync_delta_counters = std::unique_ptr(pcb.create_perf_counters()); + cct->get_perfcounters_collection()->add(sync_delta_counters.get()); +} + +void SyncDeltaCountersManager::tset(int idx, ceph::timespan v) { + sync_delta_counters->tset(idx, v); +} + +SyncDeltaCountersManager::~SyncDeltaCountersManager() { + cct->get_perfcounters_collection()->remove(sync_delta_counters.get()); +} + +} // namespace sync_deltas diff --git a/src/rgw/driver/rados/rgw_sync_counters.h b/src/rgw/driver/rados/rgw_sync_counters.h index df3acc68023..5120df3517a 100644 --- a/src/rgw/driver/rados/rgw_sync_counters.h +++ b/src/rgw/driver/rados/rgw_sync_counters.h @@ -23,3 +23,27 @@ enum { PerfCountersRef build(CephContext *cct, const std::string& name); } // namespace sync_counters + 
+const std::string rgw_sync_delta_counters_key = "rgw_sync_delta"; // key name the SyncDeltaCountersManager constructor asserts on
+
+namespace sync_deltas {
+
+enum {
+  l_rgw_sync_delta_first = 806000, // exclusive lower bound handed to PerfCountersBuilder; not a counter
+  l_rgw_datalog_sync_delta, // time counter registered as "sync_delta"
+  l_rgw_sync_delta_last, // exclusive upper bound handed to PerfCountersBuilder; not a counter
+};
+
+class SyncDeltaCountersManager { // owns one labeled PerfCounters instance and keeps it registered with cct while alive
+  std::unique_ptr<PerfCounters> sync_delta_counters; // built in the constructor, removed from the collection in the destructor
+  CephContext *cct; // non-owning; supplies the perf counters collection
+
+public:
+  SyncDeltaCountersManager(const std::string& name, CephContext *cct); // name must be a key whose key_name() equals rgw_sync_delta_counters_key
+
+  void tset(int idx, ceph::timespan v); // forwards to PerfCounters::tset(idx, v)
+
+  ~SyncDeltaCountersManager(); // unregisters the counters from cct's collection
+};
+
+} // namespace sync_deltas