]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: don't spawn multiple concurrent object sync for same object
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 25 Nov 2015 04:20:35 +0000 (20:20 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:39 +0000 (16:13 -0800)
Either mark the current running operation to retry, or wait for it to finish
(if it's not doing the same operation).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index 8e41a766a043fc3aba90b340b8963714181c7f5d..e35c1c81d6737c4552c20f99a4a1270cd7825da2 100644 (file)
@@ -570,7 +570,7 @@ public:
 
 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
 
-class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
@@ -579,7 +579,6 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
 
   map<string, string> key_to_marker;
   map<string, string> marker_to_key;
-  set<string> need_retry_set;
 
   void handle_finish(const string& marker) {
     map<string, string>::iterator iter = marker_to_key.find(marker);
@@ -587,8 +586,8 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
       return;
     }
     key_to_marker.erase(iter->second);
+    reset_need_retry(iter->second);
     marker_to_key.erase(iter);
-    need_retry_set.erase(marker);
   }
 
 public:
@@ -618,28 +617,13 @@ public:
    */
   bool index_key_to_marker(const string& key, const string& marker) {
     if (key_to_marker.find(key) != key_to_marker.end()) {
-      need_retry_set.insert(key);
+      set_need_retry(key);
       return false;
     }
     key_to_marker[key] = marker;
     marker_to_key[marker] = key;
     return true;
   }
-
-  /*
-   * a key needs retry if it was processing when another marker that points
-   * to the same bucket shards arrives. Instead of processing it, we mark
-   * it as need_retry so that when we finish processing the original, we
-   * retry the processing on the same bucket shard, in case there are more
-   * entries to process. This closes a race that can happen.
-   */
-  bool need_retry(const string& key) {
-    return (need_retry_set.find(key) != need_retry_set.end());
-  }
-
-  void reset_need_retry(const string& key) {
-    need_retry_set.erase(key);
-  }
 };
 
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
@@ -810,6 +794,7 @@ public:
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
                                                       total_entries(0), reset_backoff(NULL) {
+    set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
   }
 
   ~RGWDataSyncShardCR() {
@@ -1714,14 +1699,13 @@ public:
 
 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
 
-class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key> {
+class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
   string marker_oid;
   rgw_bucket_shard_full_sync_marker sync_marker;
 
-
 public:
   RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
                          const string& _marker_oid,
@@ -1744,13 +1728,25 @@ public:
   }
 };
 
-class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
 
   string marker_oid;
   rgw_bucket_shard_inc_sync_marker sync_marker;
 
+  map<rgw_obj_key, pair<RGWModifyOp, string> > key_to_marker;
+  map<string, rgw_obj_key> marker_to_key;
+
+  void handle_finish(const string& marker) {
+    map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
+    if (iter == marker_to_key.end()) {
+      return;
+    }
+    key_to_marker.erase(iter->second);
+    reset_need_retry(iter->second);
+    marker_to_key.erase(iter);
+  }
 
 public:
   RGWBucketIncSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
@@ -1771,9 +1767,37 @@ public:
     return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
                                 marker_oid, attrs);
   }
+
+  /*
+   * create index from key -> <op, marker>, and from marker -> key
+   * this is useful so that we can insure that we only have one
+   * entry for any key that is used. This is needed when doing
+   * incremenatl sync of data, and we don't want to run multiple
+   * concurrent sync operations for the same bucket shard 
+   * Also, we should make sure that we don't run concurrent operations on the same key with
+   * different ops.
+   */
+  bool index_key_to_marker(const rgw_obj_key& key, RGWModifyOp op, const string& marker) {
+    if (key_to_marker.find(key) != key_to_marker.end()) {
+      set_need_retry(key);
+      return false;
+    }
+    key_to_marker[key] = make_pair<>(op, marker);
+    marker_to_key[marker] = key;
+    return true;
+  }
+
+  bool can_do_op(const rgw_obj_key& key, RGWModifyOp op) {
+    auto i = key_to_marker.find(key);
+    if (i == key_to_marker.end()) {
+      return true;
+    }
+
+    return (i->second.first == op);
+  }
 };
 
-template <class T>
+template <class T, class K>
 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWRados *store;
   RGWAsyncRadosProcessor *async_rados;
@@ -1789,7 +1813,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
   RGWPendingState op_state;
 
   T entry_marker;
-  RGWSyncShardMarkerTrack<T> *marker_tracker;
+  RGWSyncShardMarkerTrack<T, K> *marker_tracker;
 
   int sync_status;
 
@@ -1800,7 +1824,7 @@ public:
                              const rgw_obj_key& _key, uint64_t _versioned_epoch,
                              utime_t& _timestamp,
                              RGWModifyOp _op, RGWPendingState _op_state,
-                            const T& _entry_marker, RGWSyncShardMarkerTrack<T> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+                            const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
                                                      async_rados(_async_rados),
                                                      source_zone(_source_zone),
                                                       bucket_info(_bucket_info), shard_id(_shard_id),
@@ -1811,6 +1835,7 @@ public:
                                                       marker_tracker(_marker_tracker),
                                                       sync_status(0) {
     set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker;
+    set_status("init");
   }
 
   int operate() {
@@ -1819,32 +1844,36 @@ public:
       if (op_state != CLS_RGW_STATE_COMPLETE) {
         goto done;
       }
-      yield {
-        if (op == CLS_RGW_OP_ADD ||
-            op == CLS_RGW_OP_LINK_OLH) {
-          if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
-            set_status("skipping entry");
-            ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
-            return set_cr_done();
+      do {
+        yield {
+          marker_tracker->reset_need_retry(key);
+          if (op == CLS_RGW_OP_ADD ||
+              op == CLS_RGW_OP_LINK_OLH) {
+            if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
+              set_status("skipping entry");
+              ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
+              goto done;
 
+            }
+            set_status("syncing obj");
+            ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+            call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+                                         key, versioned_epoch,
+                                         true));
+          } else if (op == CLS_RGW_OP_DEL) {
+            call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
           }
-          set_status("syncing obj");
-          ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
-          call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
-                                           key, versioned_epoch,
-                                           true));
-        } else if (op == CLS_RGW_OP_DEL) {
-          call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
         }
-      }
+      } while (marker_tracker->need_retry(key));
       if (retcode < 0 && retcode != -ENOENT) {
-        set_status("failed to sync obj");
+        set_status() << "failed to sync obj; retcode=" << retcode;
         rgw_bucket& bucket = bucket_info->bucket;
         ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
         sync_status = retcode;
       }
 done:
       /* update marker */
+      set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
       yield call(marker_tracker->finish(entry_marker));
       if (sync_status == 0) {
         sync_status = retcode;
@@ -1930,7 +1959,7 @@ int RGWBucketShardFullSyncCR::operate()
           } else {
             RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
 
-            spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+            spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
                                                               entry.key, entry.versioned_epoch, entry.mtime, op, CLS_RGW_STATE_COMPLETE, entry.key, marker_tracker), false);
           }
         }
@@ -1981,8 +2010,12 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   list<rgw_bi_log_entry> list_result;
   list<rgw_bi_log_entry>::iterator entries_iter;
   rgw_bucket_shard_inc_sync_marker inc_marker;
+  rgw_obj_key key;
+  rgw_bi_log_entry *entry;
   RGWBucketIncSyncShardMarkerTrack *marker_tracker;
   int spawn_window;
+  bool updated_status;
+
 
 public:
   RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
@@ -1996,8 +2029,11 @@ public:
                                                                             bucket_name(_bucket_name),
                                                                            bucket_id(_bucket_id), shard_id(_shard_id),
                                                                             bucket_info(_bucket_info),
-                                                                            inc_marker(_inc_marker), marker_tracker(NULL),
-                                                                            spawn_window(BUCKET_SYNC_SPAWN_WINDOW) {}
+                                                                            inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
+                                                                            spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false) {
+    set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
+    set_status("init");
+  }
 
   ~RGWBucketShardIncrementalSyncCR() {
     delete marker_tracker;
@@ -2014,6 +2050,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
                                                           inc_marker);
     do {
       ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+      set_status() << "listing bilog; position=" << inc_marker.position;
       yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
                                          inc_marker.position, &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
@@ -2023,23 +2060,46 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
       entries_iter = list_result.begin();
       for (; entries_iter != list_result.end(); ++entries_iter) {
+        key = rgw_obj_key(entries_iter->object, entries_iter->instance);
+        entry = &(*entries_iter);
+        set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
+        if (entry->op == CLS_RGW_OP_CANCEL) {
+          set_status() << "canceled operation, skipping";
+          ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+          continue;
+        }
+        ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+        inc_marker.position = entry->id;
+        updated_status = false;
+        while (!marker_tracker->can_do_op(key, entry->op)) {
+          if (!updated_status) {
+            set_status() << "can't do op, conflicting inflight operation";
+            updated_status = true;
+          }
+          ldout(store->ctx(), 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+          yield wait_for_child();
+          
+        }
+        if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
+          set_status() << "can't do op, sync already in progress for object";
+          ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
+          continue;
+        }
         yield {
-          rgw_obj_key key(entries_iter->object, entries_iter->instance);
-          ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
-          rgw_bi_log_entry& entry = *entries_iter;
-          inc_marker.position = entry.id;
-          if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) {
-            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl;
+          set_status() << "start object sync";
+          if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
+            ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
           } else {
             uint64_t versioned_epoch = 0;
-            if (entry.ver.pool < 0) {
-              versioned_epoch = entry.ver.epoch;
+            if (entry->ver.pool < 0) {
+              versioned_epoch = entry->ver.epoch;
             }
-            spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
-                                                         key, versioned_epoch, entry.timestamp, entry.op, entry.state, entry.id, marker_tracker), false);
+            spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+                                                         key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false);
           }
         }
         while ((int)num_spawned() > spawn_window) {
+          set_status() << "num_spawned() > spawn_window";
           yield wait_for_child();
           while (collect(&ret)) {
             if (ret < 0) {
index 7fba583df8752f5f60c139685aef15069181cfba..eb64054c7a33a5bad4bd3ac476d65df2d7cfef9f 100644 (file)
@@ -267,6 +267,10 @@ struct rgw_bucket_shard_inc_sync_marker {
   void dump(Formatter *f) const {
     encode_json("position", position, f);
   }
+
+  bool operator<(const rgw_bucket_shard_inc_sync_marker& m) const {
+    return (position < m.position);
+  }
 };
 WRITE_CLASS_ENCODER(rgw_bucket_shard_inc_sync_marker)
 
index f1a08a1dc03a0cbaa3196f32c008877d7147a1b0..7b1e980b205b5f974a4c305db29b753d146f5bc5 100644 (file)
@@ -886,7 +886,7 @@ public:
 
 #define META_SYNC_UPDATE_MARKER_WINDOW 10
 
-class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
   RGWMetaSyncEnv *sync_env;
 
   string marker_oid;
index de0fd2b0aa4cf3376bd7353b5b35a5d8bf51e40b..e8d0b1d445b133d654abefab3e2cbc44b3b037eb 100644 (file)
@@ -185,7 +185,7 @@ public:
   }
 };
 
-template <class T>
+template <class T, class K>
 class RGWSyncShardMarkerTrack {
   struct marker_entry {
     uint64_t pos;
@@ -204,6 +204,8 @@ class RGWSyncShardMarkerTrack {
 
 
 protected:
+  typename std::set<K> need_retry_set;
+
   virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const utime_t& timestamp) = 0;
   virtual void handle_finish(const T& marker) { }
 
@@ -228,7 +230,8 @@ public:
     }
 
     typename std::map<T, marker_entry>::iterator iter = pending.begin();
-    const T& first_pos = iter->first;
+
+    bool is_first = (pos == iter->first);
 
     typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
     if (pos_iter == pending.end()) {
@@ -247,7 +250,7 @@ public:
 
     updates_since_flush++;
 
-    if (pos == first_pos && (updates_since_flush >= window_size || pending.empty())) {
+    if (is_first && (updates_since_flush >= window_size || pending.empty())) {
       return update_marker(high_marker, high_entry);
     }
     return NULL;
@@ -257,6 +260,25 @@ public:
     updates_since_flush = 0;
     return store_marker(new_marker, entry.pos, entry.timestamp);
   }
+
+  /*
+   * a key needs retry if it was processing when another marker that points
+   * to the same bucket shards arrives. Instead of processing it, we mark
+   * it as need_retry so that when we finish processing the original, we
+   * retry the processing on the same bucket shard, in case there are more
+   * entries to process. This closes a race that can happen.
+   */
+  bool need_retry(const K& key) {
+    return (need_retry_set.find(key) != need_retry_set.end());
+  }
+
+  void set_need_retry(const K& key) {
+    need_retry_set.insert(key);
+  }
+
+  void reset_need_retry(const K& key) {
+    need_retry_set.erase(key);
+  }
 };
 
 class RGWMetaSyncShardMarkerTrack;