]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: force last writer wins on marker writes
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 7 Apr 2018 00:41:37 +0000 (17:41 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 12 Apr 2018 23:08:40 +0000 (16:08 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index de4515f42b1b40094ad79fa80f2fe39107e9e93e..c21c3de1fe2170ea39b119506930e42007632f33 100644 (file)
@@ -954,6 +954,10 @@ public:
     marker_to_key[marker] = key;
     return true;
   }
+
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 // ostream wrappers to print buckets without copying strings
@@ -2285,6 +2289,10 @@ public:
                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                           attrs);
   }
+
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
@@ -2357,6 +2365,10 @@ public:
   bool can_do_op(const rgw_obj_key& key) {
     return (key_to_marker.find(key) == key_to_marker.end());
   }
+
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 template <class T, class K>
index 7b9f88c466266015561fde251c58954a0b0db9ee..1dcda31d26e75a75d1e267fb523226dbd091b018 100644 (file)
@@ -1159,6 +1159,21 @@ public:
 
 #define META_SYNC_UPDATE_MARKER_WINDOW 10
 
+
+int RGWLastCallerWinsCR::operate() {
+  RGWCoroutine *call_cr;
+  reenter(this) {
+    while (cr) {
+      call_cr = cr;
+      cr = nullptr;
+      yield call(call_cr);
+      /* cr might have been modified at this point */
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
+
 class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
   RGWMetaSyncEnv *sync_env;
 
@@ -1195,6 +1210,10 @@ public:
                                                            rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                            sync_marker);
   }
+
+  RGWOrderCallCR *allocate_order_control_cr() {
+    return new RGWLastCallerWinsCR(sync_env->cct);
+  }
 };
 
 RGWMetaSyncSingleEntryCR::RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
index f93592ce612bd287d1bb32e930f98fde1fac7d26..cea8a5cb886720463e57ee6deec7ab238d020c1e 100644 (file)
@@ -289,6 +289,36 @@ public:
   }
 };
 
+class RGWOrderCallCR : public RGWCoroutine
+{
+public:
+  RGWOrderCallCR(CephContext *cct) : RGWCoroutine(cct) {}
+
+  virtual void call_cr(RGWCoroutine *_cr) = 0;
+};
+
+class RGWLastCallerWinsCR : public RGWOrderCallCR
+{
+  RGWCoroutine *cr{nullptr};
+
+public:
+  RGWLastCallerWinsCR(CephContext *cct) : RGWOrderCallCR(cct) {}
+  ~RGWLastCallerWinsCR() {
+    if (cr) {
+      cr->put();
+    }
+  }
+
+  int operate() override;
+
+  void call_cr(RGWCoroutine *_cr) {
+    if (cr) {
+      cr->put();
+    }
+    cr = _cr;
+  }
+};
+
 template <class T, class K>
 class RGWSyncShardMarkerTrack {
   struct marker_entry {
@@ -305,16 +335,22 @@ class RGWSyncShardMarkerTrack {
   int window_size;
   int updates_since_flush;
 
+  RGWOrderCallCR *order_cr{nullptr};
 
 protected:
   typename std::set<K> need_retry_set;
 
   virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
+  virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
   virtual void handle_finish(const T& marker) { }
 
 public:
   RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
-  virtual ~RGWSyncShardMarkerTrack() {}
+  virtual ~RGWSyncShardMarkerTrack() {
+    if (order_cr) {
+      order_cr->put();
+    }
+  }
 
   bool start(const T& pos, int index_pos, const real_time& timestamp) {
     if (pending.find(pos) != pending.end()) {
@@ -381,7 +417,7 @@ public:
     --i;
     const T& high_marker = i->first;
     marker_entry& high_entry = i->second;
-    RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
+    RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
     finish_markers.erase(finish_markers.begin(), last);
     return cr;
   }
@@ -404,6 +440,24 @@ public:
   void reset_need_retry(const K& key) {
     need_retry_set.erase(key);
   }
+
+  RGWCoroutine *order(RGWCoroutine *cr) {
+    /* either returns a new RGWLastWriteWinsCR, or update existing one, in which case it returns
+     * nothing and the existing one will call the cr
+     */
+    if (order_cr && order_cr->is_done()) {
+      order_cr->put();
+      order_cr = nullptr;
+    }
+    if (!order_cr) {
+      order_cr = allocate_order_control_cr();
+      order_cr->get();
+      order_cr->call_cr(cr);
+      return order_cr;
+    }
+    order_cr->call_cr(cr);
+    return nullptr; /* don't call it a second time */
+  }
 };
 
 class RGWMetaSyncShardMarkerTrack;