From: Yehuda Sadeh Date: Fri, 13 Nov 2015 04:50:12 +0000 (-0800) Subject: rgw: start marker tracker sync entry can fail X-Git-Tag: v10.1.0~354^2~214 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8c11019bf491cf68ed3e9f9f7978298e19eb69b1;p=ceph.git rgw: start marker tracker sync entry can fail We might hit duplicate entry, just skip it, don't assert. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 774213f6b42a..ac8b1362d4b7 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -918,11 +918,14 @@ public: for (; iter != entries.end(); ++iter) { ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl; total_entries++; - marker_tracker->start(iter->first, total_entries, utime_t()); + if (!marker_tracker->start(iter->first, total_entries, utime_t())) { + ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; + } else { // fetch remote and write locally - yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false); - if (retcode < 0) { - return set_cr_error(retcode); + yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false); + if (retcode < 0) { + return set_cr_error(retcode); + } } sync_marker.marker = iter->first; } @@ -989,10 +992,13 @@ public: ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl; continue; } - marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp); - yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); - if (retcode < 0) { - return set_cr_error(retcode); + if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) { + ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl; + } else { + yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); + if (retcode < 0) { + return set_cr_error(retcode); + } } } } @@ -2033,13 +2039,15 @@ int RGWBucketShardFullSyncCR::operate() yield { bucket_list_entry& entry = *entries_iter; total_entries++; - marker_tracker->start(entry.key, total_entries, utime_t()); list_marker = entry.key; + if (!marker_tracker->start(entry.key, total_entries, utime_t())) { + ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.key << ". Duplicate entry?" << dendl; + } else { + RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH); - RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH); - - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, - entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false); + spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false); + } } while ((int)num_spawned() > spawn_window) { yield wait_for_child(); @@ -2144,14 +2152,17 @@ int RGWBucketShardIncrementalSyncCR::operate() rgw_obj_key key(entries_iter->object, entries_iter->instance); ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; rgw_bi_log_entry& entry = *entries_iter; - marker_tracker->start(entry.id, 0, entries_iter->timestamp); inc_marker.position = entry.id; - uint64_t versioned_epoch = 0; - if (entry.ver.pool < 0) { - versioned_epoch = entry.ver.epoch; + if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) { + ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl; + } else { + uint64_t versioned_epoch = 0; + if (entry.ver.pool < 0) { + versioned_epoch = entry.ver.epoch; + } + spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false); } - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, - key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false); } while ((int)num_spawned() > spawn_window) { yield wait_for_child(); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 3ececd2af337..8b9c73016a7f 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1209,19 +1209,21 @@ public: for (; iter != entries.end(); ++iter) { ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl; total_entries++; - marker_tracker->start(iter->first, total_entries, utime_t()); - + if (!marker_tracker->start(iter->first, total_entries, utime_t())) { + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl; + } else { // fetch remote and write locally - yield { - RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false); - if (retcode < 0) { - return retcode; - } - assert(stack); - stack->get(); + yield { + RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false); + if (retcode < 0) { + return retcode; + } + assert(stack); + stack->get(); - stack_to_pos[stack] = iter->first; - pos_to_prev[iter->first] = marker; + stack_to_pos[stack] = iter->first; + pos_to_prev[iter->first] = marker; + } } marker = iter->first; } @@ -1340,17 +1342,20 @@ public: yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl; - marker_tracker->start(log_iter->id, 0, log_iter->timestamp); - raw_key = log_iter->section + ":" + log_iter->name; - yield { - RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false); - assert(stack); - stack->get(); - - stack_to_pos[stack] = log_iter->id; - pos_to_prev[log_iter->id] = marker; - marker = log_iter->id; + if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp)) { + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl; + } else { + raw_key = log_iter->section + ":" + log_iter->name; + yield { + RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false); + assert(stack); + stack->get(); + + stack_to_pos[stack] = log_iter->id; + pos_to_prev[log_iter->id] = marker; + } } + marker = log_iter->id; } } collect_children(); diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 51f3e4ef0419..dfd1a57f73fe 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -226,8 +226,12 @@ public: RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {} virtual ~RGWSyncShardMarkerTrack() {} - void start(const T& pos, int index_pos, const utime_t& timestamp) { + bool start(const T& pos, int index_pos, const utime_t& timestamp) { + if (pending.find(pos) != pending.end()) { + return false; + } pending[pos] = marker_entry(index_pos, timestamp); + return true; } RGWCoroutine *finish(const T& pos) {