From: Yehuda Sadeh Date: Mon, 21 Mar 2016 23:50:32 +0000 (-0700) Subject: rgw: data sync, update and flush high marker X-Git-Tag: v10.1.1~128^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d615eafbe22a932608f857663faad03f36c35d86;p=ceph.git rgw: data sync, update and flush high marker We need to update the high marker even if we skip entries, and eventually flush it. This is needed so that our position in the bucket index log that we follow is reflected correctly. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index f53596f21e6b..24756de8fc62 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -2202,6 +2202,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { string instance; string ns; + string cur_id; + public: @@ -2270,32 +2272,44 @@ int RGWBucketShardIncrementalSyncCR::operate() entries_iter = list_result.begin(); for (; entries_iter != list_result.end(); ++entries_iter) { entry = &(*entries_iter); - inc_marker.position = entry->id; + { + ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */ + if (p < 0) { + cur_id = entry->id; + } else { + cur_id = entry->id.substr(p + 1); + } + } + inc_marker.position = cur_id; if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) { set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry"; ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entries_iter->timestamp); continue; } - ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; + ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl; if (!ns.empty()) { - set_status() << "skipping entry in namespace: " << entries_iter->object; - ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl; + set_status() << "skipping entry in namespace: " << entry->object; + ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } - key = rgw_obj_key(name, entries_iter->instance); - set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op; + key = rgw_obj_key(name, entry->instance); + set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op; if (entry->op == CLS_RGW_OP_CANCEL) { set_status() << "canceled operation, skipping"; ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } if (entry->state != CLS_RGW_STATE_COMPLETE) { set_status() << "non-complete operation, skipping"; ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; @@ -2309,16 +2323,16 @@ int RGWBucketShardIncrementalSyncCR::operate() yield wait_for_child(); } - if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) { + if (!marker_tracker->index_key_to_marker(key, entry->op, cur_id)) { set_status() << "can't do op, sync already in progress for object"; - ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; - marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp); + ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl; + marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } // yield { set_status() << "start object sync"; - if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) { - ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; + if (!marker_tracker->start(cur_id, 0, entry->timestamp)) { + ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl; } else { uint64_t versioned_epoch = 0; bucket_entry_owner owner(entry->owner, entry->owner_display_name); @@ -2328,7 +2342,7 @@ int RGWBucketShardIncrementalSyncCR::operate() ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl; spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, - entry->state, entry->id, marker_tracker), false); + entry->state, cur_id, marker_tracker), false); } // } while ((int)num_spawned() > spawn_window) { @@ -2345,9 +2359,20 @@ int RGWBucketShardIncrementalSyncCR::operate() } } while (!list_result.empty()); + yield { + call(marker_tracker->flush()); + } + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl; + lease_cr->go_down(); + drain_all(); + return set_cr_error(retcode); + } + lease_cr->go_down(); /* wait for all operations to complete */ drain_all(); + return set_cr_done(); } return 0; diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 66c639c42f30..679f91c6dbb2 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -294,6 +294,7 @@ class RGWSyncShardMarkerTrack { typename std::map pending; T high_marker; + T last_stored_marker; marker_entry high_entry; int window_size; @@ -355,14 +356,19 @@ public: updates_since_flush++; if (is_first && (updates_since_flush >= window_size || pending.empty())) { - return update_marker(high_marker, high_entry); + return flush(); } return NULL; } - RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) { + RGWCoroutine *flush() { + if (last_stored_marker == high_marker) { + return NULL; + } + updates_since_flush = 0; - return store_marker(new_marker, entry.pos, entry.timestamp); + last_stored_marker = high_marker; + return store_marker(high_marker, high_entry.pos, high_entry.timestamp); } /*