From: Yehuda Sadeh
Date: Wed, 25 Nov 2015 04:20:35 +0000 (-0800)
Subject: rgw: don't spawn multiple concurrent object sync for same object
X-Git-Tag: v10.1.0~354^2~167
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e08eb9c1345bf82b6354039753ebd9322c0b32da;p=ceph.git

rgw: don't spawn multiple concurrent object sync for same object

If a sync of the same object is already running, either mark the running
operation for retry, or wait for it to finish (if it is not doing the
same operation).

Signed-off-by: Yehuda Sadeh
---
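[Editor's note, not part of the patch] The heart of the change is per-key bookkeeping in the marker tracker: while a sync coroutine for a key is in flight, a second log entry for the same key is not allowed to spawn a second coroutine; it only flags the key, and the running coroutine loops once more when it finishes. Below is a minimal, self-contained sketch of that bookkeeping; Key and Marker are stand-ins for the patch's rgw_obj_key and bilog marker string, and the coroutine machinery is omitted.

// Editorial sketch, not rgw code: the key -> marker index plus the
// need_retry flag that prevent two concurrent syncs of the same key.
#include <cassert>
#include <map>
#include <set>
#include <string>

using Key = std::string;     // stand-in for rgw_obj_key
using Marker = std::string;  // stand-in for the bilog marker

class MarkerTrackSketch {
  std::map<Key, Marker> key_to_marker;  // at most one in-flight sync per key
  std::map<Marker, Key> marker_to_key;
  std::set<Key> need_retry_set;         // keys that saw new entries while in flight

public:
  // Returns false if a sync for this key is already running; the running
  // sync is then flagged so that it re-runs once it completes.
  bool index_key_to_marker(const Key& key, const Marker& marker) {
    if (key_to_marker.count(key)) {
      need_retry_set.insert(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  bool need_retry(const Key& key) const {
    return need_retry_set.count(key) > 0;
  }

  // Called when the sync identified by `marker` finishes: drop the index
  // entry and clear any retry flag left for its key.
  void handle_finish(const Marker& marker) {
    auto iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end())
      return;
    key_to_marker.erase(iter->second);
    need_retry_set.erase(iter->second);
    marker_to_key.erase(iter);
  }
};

int main() {
  MarkerTrackSketch track;
  assert(track.index_key_to_marker("obj1", "m1"));   // first sync starts
  assert(!track.index_key_to_marker("obj1", "m2"));  // duplicate: flag a retry
  assert(track.need_retry("obj1"));                  // running sync will loop
  track.handle_finish("m1");                         // finish clears the flag
  assert(track.index_key_to_marker("obj1", "m3"));   // key is free again
  return 0;
}

In the patch, RGWBucketSyncSingleEntryCR::operate() is the consumer side: it clears the flag at the top of each pass (marker_tracker->reset_need_retry(key)) and loops with do { ... } while (marker_tracker->need_retry(key)).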
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 8e41a766a043..e35c1c81d673 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -570,7 +570,7 @@ public:
 
 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
 
-class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
@@ -579,7 +579,6 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
   map<string, string> key_to_marker;
   map<string, string> marker_to_key;
 
-  set<string> need_retry_set;
 
   void handle_finish(const string& marker) {
     map<string, string>::iterator iter = marker_to_key.find(marker);
@@ -587,8 +586,8 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
       return;
     }
     key_to_marker.erase(iter->second);
+    reset_need_retry(iter->second);
     marker_to_key.erase(iter);
-    need_retry_set.erase(marker);
   }
 
 public:
@@ -618,28 +617,13 @@ public:
    */
   bool index_key_to_marker(const string& key, const string& marker) {
     if (key_to_marker.find(key) != key_to_marker.end()) {
-      need_retry_set.insert(key);
+      set_need_retry(key);
       return false;
     }
     key_to_marker[key] = marker;
     marker_to_key[marker] = key;
     return true;
   }
-
-  /*
-   * a key needs retry if it was processing when another marker that points
-   * to the same bucket shards arrives. Instead of processing it, we mark
-   * it as need_retry so that when we finish processing the original, we
-   * retry the processing on the same bucket shard, in case there are more
-   * entries to process. This closes a race that can happen.
-   */
-  bool need_retry(const string& key) {
-    return (need_retry_set.find(key) != need_retry_set.end());
-  }
-
-  void reset_need_retry(const string& key) {
-    need_retry_set.erase(key);
-  }
 };
 
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
@@ -810,6 +794,7 @@ public:
       sync_marker(_marker), marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
       total_entries(0), reset_backoff(NULL) {
+    set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
   }
 
   ~RGWDataSyncShardCR() {
     delete marker_tracker;
@@ -1714,14 +1699,13 @@ public:
 
 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
 
-class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key> {
+class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
   string marker_oid;
   rgw_bucket_shard_full_sync_marker sync_marker;
 
-
 public:
   RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
                                     const string& _marker_oid,
@@ -1744,13 +1728,25 @@ public:
   }
 };
 
-class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
   string marker_oid;
   rgw_bucket_shard_inc_sync_marker sync_marker;
 
+  map<rgw_obj_key, pair<RGWModifyOp, string> > key_to_marker;
+  map<string, rgw_obj_key> marker_to_key;
+
+  void handle_finish(const string& marker) {
+    map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
+    if (iter == marker_to_key.end()) {
+      return;
+    }
+    key_to_marker.erase(iter->second);
+    reset_need_retry(iter->second);
+    marker_to_key.erase(iter);
+  }
 
 public:
   RGWBucketIncSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
@@ -1771,9 +1767,37 @@ public:
     return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
                                           marker_oid, attrs);
   }
+
+  /*
+   * create index from key -> <op, marker>, and from marker -> key
+   * this is useful so that we can ensure that we only have one
+   * entry for any key that is used. This is needed when doing
+   * incremental sync of data, and we don't want to run multiple
+   * concurrent sync operations for the same bucket shard
+   * Also, we should make sure that we don't run concurrent operations on the same key with
+   * different ops.
+   */
+  bool index_key_to_marker(const rgw_obj_key& key, RGWModifyOp op, const string& marker) {
+    if (key_to_marker.find(key) != key_to_marker.end()) {
+      set_need_retry(key);
+      return false;
+    }
+    key_to_marker[key] = make_pair<>(op, marker);
+    marker_to_key[marker] = key;
+    return true;
+  }
+
+  bool can_do_op(const rgw_obj_key& key, RGWModifyOp op) {
+    auto i = key_to_marker.find(key);
+    if (i == key_to_marker.end()) {
+      return true;
+    }
+
+    return (i->second.first == op);
+  }
 };
 
-template <class T>
+template <class T, class K>
 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
@@ -1789,7 +1813,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWPendingState op_state;
 
   T entry_marker;
-  RGWSyncShardMarkerTrack<T> *marker_tracker;
+  RGWSyncShardMarkerTrack<T, K> *marker_tracker;
 
   int sync_status;
 
@@ -1800,7 +1824,7 @@ public:
                             const rgw_obj_key& _key, uint64_t _versioned_epoch,
                             utime_t& _timestamp,
                             RGWModifyOp _op, RGWPendingState _op_state,
-                            const T& _entry_marker, RGWSyncShardMarkerTrack<T> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
                                                       async_rados(_async_rados),
                                                       source_zone(_source_zone),
                                                       bucket_info(_bucket_info), shard_id(_shard_id),
@@ -1811,6 +1835,7 @@ public:
                                                       marker_tracker(_marker_tracker),
                                                       sync_status(0) {
     set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker;
+    set_status("init");
   }
 
   int operate() {
@@ -1819,32 +1844,36 @@ public:
       if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
-      yield {
-        if (op == CLS_RGW_OP_ADD ||
-            op == CLS_RGW_OP_LINK_OLH) {
-          if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
-            set_status("skipping entry");
-            ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
-            return set_cr_done();
+      do {
+        yield {
+          marker_tracker->reset_need_retry(key);
+          if (op == CLS_RGW_OP_ADD ||
+              op == CLS_RGW_OP_LINK_OLH) {
+            if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
+              set_status("skipping entry");
+              ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
+              goto done;
+            }
+            set_status("syncing obj");
+            ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+            call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+                                         key, versioned_epoch,
+                                         true));
+          } else if (op == CLS_RGW_OP_DEL) {
+            call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
           }
-          set_status("syncing obj");
-          ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-          call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
-                                       key, versioned_epoch,
-                                       true));
-        } else if (op == CLS_RGW_OP_DEL) {
-          call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
         }
-      }
+      } while (marker_tracker->need_retry(key));
       if (retcode < 0 && retcode != -ENOENT) {
-        set_status("failed to sync obj");
+        set_status() << "failed to sync obj; retcode=" << retcode;
         rgw_bucket& bucket = bucket_info->bucket;
         ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
         sync_status = retcode;
       }
 done:
       /* update marker */
+      set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
       yield call(marker_tracker->finish(entry_marker));
       if (sync_status == 0) {
         sync_status = retcode;
@@ -1930,7 +1959,7 @@ int RGWBucketShardFullSyncCR::operate()
       } else {
         RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
 
-        spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+        spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
                                                           entry.key, entry.versioned_epoch, entry.mtime, op, CLS_RGW_STATE_COMPLETE, entry.key, marker_tracker), false);
       }
     }
@@ -1981,8 +2010,12 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   list<rgw_bi_log_entry> list_result;
   list<rgw_bi_log_entry>::iterator entries_iter;
   rgw_bucket_shard_inc_sync_marker inc_marker;
+  rgw_obj_key key;
+  rgw_bi_log_entry *entry;
   RGWBucketIncSyncShardMarkerTrack *marker_tracker;
   int spawn_window;
+  bool updated_status;
+
 public:
   RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr,
                                   RGWAsyncRadosProcessor *_async_rados,
@@ -1996,8 +2029,11 @@ public:
       bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
       bucket_info(_bucket_info),
-      inc_marker(_inc_marker), marker_tracker(NULL),
-      spawn_window(BUCKET_SYNC_SPAWN_WINDOW) {}
+      inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
+      spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false) {
+    set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
+    set_status("init");
+  }
 
   ~RGWBucketShardIncrementalSyncCR() {
     delete marker_tracker;
   }
@@ -2014,6 +2050,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
                                                      inc_marker);
   do {
     ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+    set_status() << "listing bilog; position=" << inc_marker.position;
     yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name,
                                            bucket_id, shard_id, inc_marker.position, &list_result));
     if (retcode < 0 && retcode != -ENOENT) {
@@ -2023,23 +2060,46 @@ int RGWBucketShardIncrementalSyncCR::operate()
     }
     entries_iter = list_result.begin();
     for (; entries_iter != list_result.end(); ++entries_iter) {
+      key = rgw_obj_key(entries_iter->object, entries_iter->instance);
+      entry = &(*entries_iter);
+      set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
+      if (entry->op == CLS_RGW_OP_CANCEL) {
+        set_status() << "canceled operation, skipping";
+        ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+        continue;
+      }
+      ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+      inc_marker.position = entry->id;
+      updated_status = false;
+      while (!marker_tracker->can_do_op(key, entry->op)) {
+        if (!updated_status) {
+          set_status() << "can't do op, conflicting inflight operation";
+          updated_status = true;
+        }
complete" << dendl; + yield wait_for_child(); + + } + if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) { + set_status() << "can't do op, sync already in progress for object"; + ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl; + continue; + } yield { - rgw_obj_key key(entries_iter->object, entries_iter->instance); - ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; - rgw_bi_log_entry& entry = *entries_iter; - inc_marker.position = entry.id; - if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) { - ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl; + set_status() << "start object sync"; + if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) { + ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl; } else { uint64_t versioned_epoch = 0; - if (entry.ver.pool < 0) { - versioned_epoch = entry.ver.epoch; + if (entry->ver.pool < 0) { + versioned_epoch = entry->ver.epoch; } - spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, - key, versioned_epoch, entry.timestamp, entry.op, entry.state, entry.id, marker_tracker), false); + spawn(new RGWBucketSyncSingleEntryCR(store, async_rados, source_zone, bucket_info, shard_id, + key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false); } } while ((int)num_spawned() > spawn_window) { + set_status() << "num_spawned() > spawn_window"; yield wait_for_child(); while (collect(&ret)) { if (ret < 0) { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 7fba583df875..eb64054c7a33 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -267,6 +267,10 @@ struct rgw_bucket_shard_inc_sync_marker { void dump(Formatter *f) const { encode_json("position", position, f); } + + bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const { + return (position < m.position); + } }; WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker) diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index f1a08a1dc03a..7b1e980b205b 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -886,7 +886,7 @@ public: #define META_SYNC_UPDATE_MARKER_WINDOW 10 -class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { +class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { RGWMetaSyncEnv *sync_env; string marker_oid; diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index de0fd2b0aa4c..e8d0b1d445b1 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -185,7 +185,7 @@ public: } }; -template +template class RGWSyncShardMarkerTrack { struct marker_entry { uint64_t pos; @@ -204,6 +204,8 @@ class RGWSyncShardMarkerTrack { protected: + typename std::set need_retry_set; + virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const utime_t& timestamp) = 0; virtual void handle_finish(const T& marker) { } @@ -228,7 +230,8 @@ public: } typename std::map::iterator iter = pending.begin(); - const T& first_pos = iter->first; + + bool is_first = (pos == iter->first); typename std::map::iterator pos_iter = pending.find(pos); if (pos_iter == pending.end()) { @@ -247,7 +250,7 @@ public: updates_since_flush++; - if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) { + if (is_first && 
+    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
       return update_marker(high_marker, high_entry);
     }
     return NULL;
@@ -257,6 +260,25 @@ public:
     updates_since_flush = 0;
     return store_marker(new_marker, entry.pos, entry.timestamp);
   }
+
+  /*
+   * a key needs retry if it was processing when another marker that points
+   * to the same bucket shard arrives. Instead of processing it, we mark
+   * it as need_retry so that when we finish processing the original, we
+   * retry the processing on the same bucket shard, in case there are more
+   * entries to process. This closes a race that can happen.
+   */
+  bool need_retry(const K& key) {
+    return (need_retry_set.find(key) != need_retry_set.end());
+  }
+
+  void set_need_retry(const K& key) {
+    need_retry_set.insert(key);
+  }
+
+  void reset_need_retry(const K& key) {
+    need_retry_set.erase(key);
+  }
 };
 
 class RGWMetaSyncShardMarkerTrack;
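
[Editor's note, not part of the patch] can_do_op() covers the case index_key_to_marker() cannot: a log entry for the same key but a different operation (say, a delete arriving while a fetch is in flight) must not be folded into a retry of the running operation, so the incremental sync loop waits for a child coroutine to finish before indexing the new entry. A simplified sketch of the gate, with stand-in types and the coroutine wait omitted:

// Editorial sketch, not rgw code: the "conflicting op" gate from
// RGWBucketIncSyncShardMarkerTrack, with simplified stand-in types.
#include <cassert>
#include <map>
#include <string>
#include <utility>

enum RGWModifyOpSketch { OP_ADD, OP_DEL };  // stand-in for RGWModifyOp

struct IncMarkerTrackSketch {
  // key -> <op, marker> of the sync currently in flight for that key
  std::map<std::string, std::pair<RGWModifyOpSketch, std::string> > key_to_marker;

  // A new entry may proceed only if nothing is in flight for its key,
  // or the in-flight operation is of the same kind (that duplicate is
  // then folded into a retry by index_key_to_marker()).
  bool can_do_op(const std::string& key, RGWModifyOpSketch op) const {
    auto i = key_to_marker.find(key);
    return i == key_to_marker.end() || i->second.first == op;
  }
};

int main() {
  IncMarkerTrackSketch t;
  t.key_to_marker["obj1"] = std::make_pair(OP_ADD, "00001");  // ADD in flight
  assert(t.can_do_op("obj1", OP_ADD));   // same op: allowed (becomes a retry)
  assert(!t.can_do_op("obj1", OP_DEL));  // different op: caller must wait
  assert(t.can_do_op("obj2", OP_DEL));   // other key: no conflict
  return 0;
}

In the patch the caller spins with `yield wait_for_child()` until can_do_op() returns true; a duplicate of the in-flight op instead falls through to index_key_to_marker(), which returns false and flags the key for retry.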
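[Editor's note, not part of the patch] The is_first change in RGWSyncShardMarkerTrack looks like a bug fix in its own right: the old code captured const T& first_pos = iter->first, a reference into pending, and compared against it only at the end of the function, while the completed entry appears to be erased from pending in between (the erase is not visible in the hunks above). When pos was the first pending entry, which is exactly the case the comparison is meant to detect, the reference would point at a destroyed node. Computing bool is_first up front takes the answer by value before any erase. A minimal illustration with hypothetical stand-in types:

// Editorial sketch with hypothetical types: why the by-reference
// first_pos was fragile and the by-value is_first is safe.
#include <cassert>
#include <map>
#include <string>

int main() {
  std::map<std::string, int> pending = { {"a", 1}, {"b", 2} };
  const std::string pos = "a";  // marker of the entry that just completed

  // Removed pattern, kept commented out because it is exactly the bug:
  //   const std::string& first_pos = pending.begin()->first;
  //   pending.erase(pos);            // erases the node first_pos refers to
  //   bool ok = (pos == first_pos);  // undefined behavior when pos was first

  // Introduced pattern: decide before erasing, store a plain bool.
  bool is_first = (pos == pending.begin()->first);
  pending.erase(pos);
  assert(is_first);  // safe: is_first is a copy, not a view into the map
  return 0;
}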