From d615eafbe22a932608f857663faad03f36c35d86 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 21 Mar 2016 16:50:32 -0700 Subject: [PATCH] rgw: data sync, update and flush high marker We need to update the high marker even if we skip entries, and eventually flush it. This is needed so that our position in the bucket index log that we follow is reflected correctly. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 49 ++++++++++++++++++++++++++++++---------- src/rgw/rgw_sync.h | 12 +++++++--- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f53596f21e6b4..24756de8fc629 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2202,6 +2202,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { string instance; string ns; + string cur_id; + public: @@ -2270,32 +2272,44 @@ int RGWBucketShardIncrementalSyncCR::operate() entries_iter = list_result.begin(); for (; entries_iter != list_result.end(); ++entries_iter) { entry = &(*entries_iter); - inc_marker.position = entry->id; + { + ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */ + if (p < 0) { + cur_id = entry->id; + } else { + cur_id = entry->id.substr(p + 1); + } + } + inc_marker.position = cur_id; if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) { set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry"; ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entries_iter->timestamp); continue; } - ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; + ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; if (!ns.empty()) { - set_status() << "skipping entry in namespace: " << entries_iter->object; - ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl; + set_status() << "skipping entry in namespace: " << entry->object; + ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } - key = rgw_obj_key(name, entries_iter->instance); - set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op; + key = rgw_obj_key(name, entry->instance); + set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op; if (entry->op == CLS_RGW_OP_CANCEL) { set_status() << "canceled operation, skipping"; ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } if (entry->state != CLS_RGW_STATE_COMPLETE) { set_status() << "non-complete operation, skipping"; ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; @@ -2309,16 +2323,16 @@ int RGWBucketShardIncrementalSyncCR::operate() yield wait_for_child(); } - if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) { + if (!marker_tracker->index_key_to_marker(key, entry->op, cur_id)) { set_status() << "can't do op, sync already in progress for object"; - ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; - marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp); + ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } // yield { set_status() << "start object sync"; - if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; + if (!marker_tracker->start(cur_id, 0, entry->timestamp)) { + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl; } else { uint64_t versioned_epoch = 0; bucket_entry_owner owner(entry->owner, entry->owner_display_name); @@ -2328,7 +2342,7 @@ int RGWBucketShardIncrementalSyncCR::operate() ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl; spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, - entry->state, entry->id, marker_tracker), false); + entry->state, cur_id, marker_tracker), false); } // } while ((int)num_spawned() > spawn_window) { @@ -2345,9 +2359,20 @@ int RGWBucketShardIncrementalSyncCR::operate() } } while (!list_result.empty()); + yield { + call(marker_tracker->flush()); + } + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl; + lease_cr->go_down(); + drain_all(); + return set_cr_error(retcode); + } + lease_cr->go_down(); /* wait for all operations to complete */ drain_all(); + return set_cr_done(); } return 0; diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 66c639c42f30e..679f91c6dbb27 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -294,6 +294,7 @@ class RGWSyncShardMarkerTrack { typename std::map pending; T high_marker; + T last_stored_marker; marker_entry high_entry; int window_size; @@ -355,14 +356,19 @@ public: updates_since_flush++; if (is_first && (updates_since_flush >= window_size || pending.empty())) { - return update_marker(high_marker, high_entry); + return flush(); } return NULL; } - RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) { + RGWCoroutine *flush() { + if (last_stored_marker == high_marker) { + return NULL; + } + updates_since_flush = 0; - return store_marker(new_marker, entry.pos, entry.timestamp); + last_stored_marker = high_marker; + return store_marker(high_marker, high_entry.pos, high_entry.timestamp); } /* -- 2.39.5