#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"
+#include "common/perf_counters_key.h"
#include "rgw_common.h"
#include "rgw_zone.h"
string marker;
bool truncated;
vector<rgw_data_change_log_entry> entries;
+ real_time last_update;
read_remote_data_log_response() : truncated(false) {}
void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("marker", marker, obj);
JSONDecoder::decode_json("truncated", truncated, obj);
+ JSONDecoder::decode_json("last_update", last_update, obj);
JSONDecoder::decode_json("entries", entries, obj);
};
};
string *pnext_marker;
vector<rgw_data_change_log_entry> *entries;
bool *truncated;
+ real_time *last_update;
read_remote_data_log_response response;
std::optional<TOPNSPC::common::PerfGuard> timer;
RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
const std::string& marker, string *pnext_marker,
vector<rgw_data_change_log_entry> *_entries,
- bool *_truncated)
+ bool *_truncated, real_time *_last_update)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
- entries(_entries), truncated(_truncated) {
+ entries(_entries), truncated(_truncated), last_update(_last_update) {
}
int operate(const DoutPrefixProvider *dpp) override {
entries->swap(response.entries);
*pnext_marker = response.marker;
*truncated = response.truncated;
+ *last_update = response.last_update;
return set_cr_done();
}
}
rgw_data_sync_marker sync_marker;
RGWSyncTraceNodeRef tn;
RGWObjVersionTracker& objv;
+ sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
public:
RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
const rgw_data_sync_marker& _marker,
- RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+ RGWSyncTraceNodeRef& _tn,
+ RGWObjVersionTracker& objv,
+ const uint32_t shard_id) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
sc(_sc), sync_env(_sc->env),
marker_oid(_marker_oid),
sync_marker(_marker),
- tn(_tn), objv(objv) {}
+ tn(_tn), objv(objv),
+ sync_delta_counters_manager(init_keys(shard_id), _sc->env->cct) {}
- RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+ std::string init_keys(const uint32_t shard_id) {
+ std::string sz_id = sc->source_zone.id;
+ std::string lz_id = sc->env->svc->zone->get_zone_params().get_id();
+ return ceph::perf_counters::key_create(rgw_sync_delta_counters_key,
+ {{"local-zone-id", lz_id},
+ {"source-zone-id", sz_id},
+ {"shard-id", std::to_string(shard_id)}});
+ }
+
+ RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
sync_marker.marker = new_marker;
sync_marker.pos = index_pos;
sync_marker.timestamp = timestamp;
+ // Since store_marker() is called by full and incremental sync but
+ // last_update is only modified during incremental sync we only want to
+ // report deltas for incremental sync
+ real_time zero_time;
+ if (last_update != zero_time) {
+ auto delta = last_update - timestamp;
+ sync_delta_counters_manager.tset(sync_deltas::l_rgw_datalog_sync_delta, delta);
+ }
+
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
reenter(this) {
tn->log(10, "start full sync");
oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
- marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id);
total_entries = sync_marker.pos;
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
string next_marker;
vector<rgw_data_change_log_entry> log_entries;
+ real_time last_update;
decltype(log_entries)::iterator log_iter;
bool truncated = false;
int cbret = 0;
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
tn->log(10, "start incremental sync");
- marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv, shard_id);
do {
if (!lease_cr->is_locked()) {
lost_lock = true;
yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
sync_marker.marker,
&next_marker, &log_entries,
- &truncated));
+ &truncated, &last_update));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
<< retcode));
tn->log(1, SSTR("failed to parse bucket shard: "
<< log_iter->entry.key));
marker_tracker->try_update_high_marker(log_iter->log_id, 0,
- log_iter->log_timestamp);
+ log_iter->log_timestamp, last_update);
continue;
}
if (!marker_tracker->start(log_iter->log_id, 0,
- log_iter->log_timestamp)) {
+ log_iter->log_timestamp, last_update)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
<< ". Duplicate entry?"));
} else {
std::string next_marker;
vector<rgw_data_change_log_entry> log_entries;
bool truncated;
+ real_time last_update;
public:
RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
count = 0;
do{
yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
- &next_marker, &log_entries, &truncated));
+ &next_marker, &log_entries, &truncated, &last_update));
if (retcode == -ENOENT) {
break;
{}
- RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+ RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
sync_status.full.position = new_marker;
sync_status.full.count = index_pos;
const rgw_raw_obj& get_obj() const { return obj; }
- RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
+ RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
sync_marker.position = new_marker;
sync_marker.timestamp = timestamp;
struct marker_entry {
uint64_t pos;
real_time timestamp;
+ real_time last_update;
marker_entry() : pos(0) {}
- marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
+ marker_entry(uint64_t _p, const real_time& _ts, const real_time& _lu = {}) : pos(_p), timestamp(_ts), last_update(_lu) {}
};
typename std::map<T, marker_entry> pending;
protected:
typename std::set<K> need_retry_set;
- virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
+ virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) = 0;
virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
virtual void handle_finish(const T& marker) { }
}
}
- bool start(const T& pos, int index_pos, const real_time& timestamp) {
+ bool start(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
if (pending.find(pos) != pending.end()) {
return false;
}
- pending[pos] = marker_entry(index_pos, timestamp);
+ pending[pos] = marker_entry(index_pos, timestamp, last_update);
return true;
}
- void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
- finish_markers[pos] = marker_entry(index_pos, timestamp);
+ void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
+ finish_markers[pos] = marker_entry(index_pos, timestamp, last_update);
}
RGWCoroutine *finish(const T& pos) {
--i;
const T& high_marker = i->first;
marker_entry& high_entry = i->second;
- RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
+ RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp, high_entry.last_update));
finish_markers.erase(finish_markers.begin(), last);
return cr;
}