#include "rgw_bucket_sync.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
+#include "rgw_sync_error_repo.h"
#include "rgw_sync_module.h"
#include "rgw_sal.h"
string raw_key;
string entry_marker;
+ ceph::real_time entry_timestamp;
rgw_bucket_shard source_bs;
boost::intrusive_ptr<RGWOmapAppend> error_repo;
bool remove_from_repo;
- set<string> keys;
-
RGWSyncTraceNodeRef tn;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc,
- const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
+ const string& _raw_key, const string& _entry_marker,
+ ceph::real_time entry_timestamp, RGWDataSyncShardMarkerTrack *_marker_tracker,
RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
raw_key(_raw_key), entry_marker(_entry_marker),
- sync_status(0),
+ entry_timestamp(entry_timestamp), sync_status(0),
marker_tracker(_marker_tracker),
error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
}
}
- if (error_repo && !error_repo->append(raw_key)) {
- tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
+ if (error_repo) {
+ yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+ raw_key, entry_timestamp));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
+ }
}
} else if (error_repo && remove_from_repo) {
- keys = {raw_key};
- yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
+ yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo->get_obj(),
+ raw_key, entry_timestamp));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
<< error_repo->get_obj() << " retcode=" << retcode));
RGWOmapAppend *error_repo;
std::map<std::string, bufferlist> error_entries;
string error_marker;
+ ceph::real_time entry_timestamp;
int max_error_entries;
ceph::coarse_real_time error_retry_time;
oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
set_marker_tracker(new RGWDataSyncShardMarkerTrack(sc, status_oid, sync_marker, tn));
total_entries = sync_marker.pos;
+ entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
if (!lease_cr->is_locked()) {
stop_spawned_services();
for (; iter != entries.end(); ++iter) {
tn->log(20, SSTR("full sync: " << iter->first));
total_entries++;
- if (!marker_tracker->start(iter->first, total_entries, real_time())) {
+ if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, marker_tracker, error_repo, false, tn), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sc, iter->first, iter->first, entry_timestamp, marker_tracker, error_repo, false, tn), false);
}
sync_marker.marker = iter->first;
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
yield {
tn->log(20, SSTR("received async update notification: " << *modified_iter));
- spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), marker_tracker, nullptr, false, tn), false);
+ spawn(new RGWDataSyncSingleEntryCR(sc, *modified_iter, string(), ceph::real_time{}, marker_tracker, nullptr, false, tn), false);
}
}
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
error_marker = iter->first;
- tn->log(20, SSTR("handle error entry: " << error_marker));
- spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
+ entry_timestamp = rgw_error_repo_decode_value(iter->second);
+ tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
+ spawn(new RGWDataSyncSingleEntryCR(sc, error_marker, error_marker,
+ entry_timestamp, nullptr /* no marker tracker */,
+ error_repo, true, tn), false);
}
if (!omapvals->more) {
if (error_marker.empty() && error_entries.empty()) {
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
} else {
- spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
+ spawn(new RGWDataSyncSingleEntryCR(sc, log_iter->entry.key, log_iter->log_id,
+ log_iter->log_timestamp, marker_tracker,
+ error_repo, false, tn), false);
}
while ((int)num_spawned() > spawn_window) {
set_status() << "num_spawned() > spawn_window";