From 1034a68fd12687ac81e6afc4718dbc8045648034 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 6 Apr 2018 17:41:37 -0700 Subject: [PATCH] rgw: force last writer wins on marker writes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_data_sync.cc | 12 +++++++++ src/rgw/rgw_sync.cc | 19 +++++++++++++ src/rgw/rgw_sync.h | 58 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 87 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index de4515f42b1b4..c21c3de1fe217 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -954,6 +954,10 @@ public: marker_to_key[marker] = key; return true; } + + RGWOrderCallCR *allocate_order_control_cr() { + return new RGWLastCallerWinsCR(sync_env->cct); + } }; // ostream wrappers to print buckets without copying strings @@ -2285,6 +2289,10 @@ public: rgw_raw_obj(store->get_zone_params().log_pool, marker_oid), attrs); } + + RGWOrderCallCR *allocate_order_control_cr() { + return new RGWLastCallerWinsCR(sync_env->cct); + } }; class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { @@ -2357,6 +2365,10 @@ public: bool can_do_op(const rgw_obj_key& key) { return (key_to_marker.find(key) == key_to_marker.end()); } + + RGWOrderCallCR *allocate_order_control_cr() { + return new RGWLastCallerWinsCR(sync_env->cct); + } }; template diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 7b9f88c466266..1dcda31d26e75 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1159,6 +1159,21 @@ public: #define META_SYNC_UPDATE_MARKER_WINDOW 10 + +int RGWLastCallerWinsCR::operate() { + RGWCoroutine *call_cr; + reenter(this) { + while (cr) { + call_cr = cr; + cr = nullptr; + yield call(call_cr); + /* cr might have been modified at this point */ + } + return set_cr_done(); + } + return 0; +} + class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { RGWMetaSyncEnv *sync_env; @@ -1195,6 +1210,10 @@ public: rgw_raw_obj(store->get_zone_params().log_pool, marker_oid), sync_marker); } + + RGWOrderCallCR *allocate_order_control_cr() { + return new RGWLastCallerWinsCR(sync_env->cct); + } }; RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env, diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index f93592ce612bd..cea8a5cb88672 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -289,6 +289,36 @@ public: } }; +class RGWOrderCallCR : public RGWCoroutine +{ +public: + RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {} + + virtual void call_cr(RGWCoroutine *_cr) = 0; +}; + +class RGWLastCallerWinsCR : public RGWOrderCallCR +{ + RGWCoroutine *cr{nullptr}; + +public: + RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {} + ~RGWLastCallerWinsCR() { + if (cr) { + cr->put(); + } + } + + int operate() override; + + void call_cr(RGWCoroutine *_cr) { + if (cr) { + cr->put(); + } + cr = _cr; + } +}; + template class RGWSyncShardMarkerTrack { struct marker_entry { @@ -305,16 +335,22 @@ class RGWSyncShardMarkerTrack { int window_size; int updates_since_flush; + RGWOrderCallCR *order_cr{nullptr}; protected: typename std::set need_retry_set; virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0; + virtual RGWOrderCallCR *allocate_order_control_cr() = 0; virtual void handle_finish(const T& marker) { } public: RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {} - virtual ~RGWSyncShardMarkerTrack() {} + virtual ~RGWSyncShardMarkerTrack() { + if (order_cr) { + order_cr->put(); + } + } bool start(const T& pos, int index_pos, const real_time& timestamp) { if (pending.find(pos) != pending.end()) { @@ -381,7 +417,7 @@ public: --i; const T& high_marker = i->first; marker_entry& high_entry = i->second; - RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp); + RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp)); finish_markers.erase(finish_markers.begin(), last); return cr; } @@ -404,6 +440,24 @@ public: void reset_need_retry(const K& key) { need_retry_set.erase(key); } + + RGWCoroutine *order(RGWCoroutine *cr) { + /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns + * nothing and the existing one will call the cr + */ + if (order_cr && order_cr->is_done()) { + order_cr->put(); + order_cr = nullptr; + } + if (!order_cr) { + order_cr = allocate_order_control_cr(); + order_cr->get(); + order_cr->call_cr(cr); + return order_cr; + } + order_cr->call_cr(cr); + return nullptr; /* don't call it a second time */ + } }; class RGWMetaSyncShardMarkerTrack; -- 2.39.5