From 105f8a0721cb4fc0fb605cdd30e2bc45fd3f050d Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 30 Oct 2015 16:59:44 -0700 Subject: [PATCH] rgw: rework incremental md sync error handling similar to what we do with the full md sync. Identify transient errors, and if so return -EAGAIN so that caller would retry. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync.cc | 72 +++++++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 19 deletions(-) diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 67f08e0d8a709..c3d29db4b8fe1 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -890,6 +890,7 @@ public: } if (sync_status < 0) { +#warning need to store entry for non-transient errors ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl; log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl; return set_cr_error(sync_status); @@ -902,15 +903,15 @@ public: sync_status = retcode; - yield { - /* update marker */ - int ret = call(marker_tracker->finish(entry_marker)); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl; - return set_cr_error(sync_status); - } - } if (sync_status == 0) { + yield { + /* update marker */ + int ret = call(marker_tracker->finish(entry_marker)); + if (ret < 0) { + ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl; + return set_cr_error(sync_status); + } + } sync_status = retcode; } if (sync_status < 0) { @@ -974,6 +975,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine { uint32_t shard_id; rgw_meta_sync_marker sync_marker; string marker; + string max_marker; map entries; map::iterator iter; @@ -1227,6 +1229,7 @@ public: int incremental_sync() { reenter(&incremental_cr) { + 
can_adjust_marker = true; /* grab lock */ if (!lease_cr) { /* could have had a lease_cr lock from previous state */ yield { @@ -1255,6 +1258,14 @@ public: set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env, sync_env->shard_obj_name(shard_id), sync_marker)); + + /* + * mdlog_marker: the remote sync marker positiion + * sync_marker: the local sync marker position + * max_marker: the max mdlog position that we fetched + * marker: the current position we try to sync + */ + marker = max_marker = sync_marker.marker; /* inc sync */ do { if (!lease_cr->is_locked()) { @@ -1263,30 +1274,49 @@ public: } #define INCREMENTAL_MAX_ENTRIES 100 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker <= sync_marker.marker) { + if (mdlog_marker <= max_marker) { /* we're at the tip, try to bring more entries */ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl; yield call(new RGWCloneMetaLogCoroutine(sync_env, shard_id, mdlog_marker, &mdlog_marker)); } + if (retcode < 0) { + ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl; + return retcode; + } ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker > sync_marker.marker) { - yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); + if (mdlog_marker > max_marker) { + marker = max_marker; + yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { ldout(sync_env->cct, 20) << __func__ << 
":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl; marker_tracker->start(log_iter->id); raw_key = log_iter->section + ":" + log_iter->name; - yield spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false); - if (retcode < 0) { - return retcode; + yield { + RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false); + if (retcode < 0) { + return retcode; + } + assert(stack); + stack->get(); + + stack_to_pos[stack] = log_iter->id; + pos_to_prev[log_iter->id] = marker; + marker = log_iter->id; + } } - } - } - ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker == sync_marker.marker) { + } + collect_children(); + ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (mdlog_marker == max_marker && can_adjust_marker) { #define INCREMENTAL_INTERVAL 20 yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); } - } while (true); + } while (can_adjust_marker); + + while (num_spawned() > 1) { + yield wait_for_child(); + collect_children(); + } yield lease_cr->go_down(); @@ -1295,6 +1325,10 @@ public: if (lost_lock) { return -EBUSY; } + + if (!can_adjust_marker) { + return -EAGAIN; + } } /* TODO */ return 0; -- 2.39.5