]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync, don't yield when spawning
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 1 Dec 2015 21:27:20 +0000 (13:27 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:40 +0000 (16:13 -0800)
this enables us to filter out duplicate keys without worrying about races, since
spawned crs will only be executed once we yield.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index f741c014c6640614d2469c6eb368d31823adb05b..c46cbce9880ca1cc1cd9be9f8c5b3763b5ab495a 100644 (file)
@@ -739,6 +739,8 @@ public:
   }
 };
 
+#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
+
 class RGWDataSyncShardCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -779,8 +781,12 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   int total_entries;
 
+  int spawn_window;
+
   bool *reset_backoff;
 
+  set<string> spawned_keys;
+
 public:
   RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
                      RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
@@ -793,7 +799,7 @@ public:
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
-                                                      total_entries(0), reset_backoff(NULL) {
+                                                      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL) {
     set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
   }
 
@@ -920,6 +926,7 @@ public:
 #define INCREMENTAL_MAX_ENTRIES 100
        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
+          spawned_keys.clear();
           yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
             ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
@@ -930,13 +937,32 @@ public:
             if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
               ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
             } else {
-              yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
-              if (retcode < 0) {
-                drain_all();
-                return set_cr_error(retcode);
+              /*
+               * don't spawn the same key more than once. We can do that as long as we don't yield
+               */
+              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
+                spawned_keys.insert(log_iter->entry.key);
+                spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+                if (retcode < 0) {
+                  drain_all();
+                  return set_cr_error(retcode);
+                }
               }
             }
          }
+          while ((int)num_spawned() > spawn_window) {
+            set_status() << "num_spawned() > spawn_window";
+            yield wait_for_child();
+            int ret;
+            while (collect(&ret)) {
+              if (ret < 0) {
+                ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+                /* we should have reported this error */
+#warning deal with error
+              }
+              /* not waiting for child here */
+            }
+          }
        }
        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker) {
@@ -2089,7 +2115,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
           ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
           continue;
         }
-        yield {
+        // yield {
           set_status() << "start object sync";
           if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
             ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
@@ -2101,7 +2127,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
             spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
                                                          key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false);
           }
-        }
+        // }
         while ((int)num_spawned() > spawn_window) {
           set_status() << "num_spawned() > spawn_window";
           yield wait_for_child();