rgw: backoff mechanism for data sync
author    Yehuda Sadeh <yehuda@redhat.com>
          Wed, 11 Nov 2015 20:43:31 +0000 (12:43 -0800)
committer Yehuda Sadeh <yehuda@redhat.com>
          Fri, 12 Feb 2016 00:13:31 +0000 (16:13 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
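
The patch factors the retry loop that previously lived inside
RGWDataSyncControlCR into a reusable RGWControlCR base class: the control
coroutine repeatedly re-runs a worker coroutine obtained from alloc_cr(),
treats -EBUSY/-EAGAIN as transient failures, and sleeps with an increasing
backoff between runs. The worker signals forward progress through the shared
flag exposed by backoff_ptr(), which resets the backoff window. Below is a
minimal, self-contained sketch of that control pattern using plain blocking
functions instead of RGW's coroutine framework; SyncBackoff, run_once() and
control_loop() are hypothetical stand-ins, not RGW API:

    #include <algorithm>
    #include <cerrno>
    #include <chrono>
    #include <thread>

    // Hypothetical stand-in for RGWSyncBackoff: capped, doubling delay.
    struct SyncBackoff {
      int cur_secs = 1;
      static constexpr int max_secs = 30;
      void reset() { cur_secs = 1; }
      void backoff() {
        std::this_thread::sleep_for(std::chrono::seconds(cur_secs));
        cur_secs = std::min(cur_secs * 2, max_secs);
      }
    };

    // Hypothetical worker standing in for the wrapped coroutine: fails with
    // a transient error twice, then reports progress and succeeds.
    static int run_once(bool *reset_backoff) {
      static int attempts = 0;
      if (++attempts < 3)
        return -EAGAIN;
      *reset_backoff = true;
      return 0;
    }

    // Mirrors the loop in RGWControlCR::operate(): run forever, bail out on
    // hard errors, reset the backoff when the worker made progress, then
    // wait before retrying.
    int control_loop() {
      SyncBackoff backoff;
      for (;;) {
        bool reset_backoff = false;
        int r = run_once(&reset_backoff);
        if (r < 0 && r != -EBUSY && r != -EAGAIN)
          return r;          // hard error: propagate, like set_cr_error()
        if (reset_backoff)
          backoff.reset();   // progress was made: start over with short waits
        backoff.backoff();   // transient failure or idle: wait, then retry
      }
    }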
src/rgw/rgw_data_sync.cc

index 8f6dd40d1923e367f7538b265f57508aecc6f2ac..34251b65603f08c177ee35dd571433ffd2c004ef 100644
@@ -837,11 +837,12 @@ class RGWDataSyncShardCR : public RGWCoroutine {
 
   int total_entries;
 
+  bool *reset_backoff;
 
 public:
   RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
                      RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
-                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                      async_rados(_async_rados),
                                                       conn(_conn),
@@ -850,7 +851,7 @@ public:
                                                      shard_id(_shard_id),
                                                      sync_marker(_marker),
                                                       marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
-                                                      total_entries(0) {
+                                                      total_entries(0), reset_backoff(_reset_backoff) {
   }
 
   ~RGWDataSyncShardCR() {
@@ -868,13 +869,23 @@ public:
   }
 
   int operate() {
+    int r;
     while (true) {
       switch (sync_marker.state) {
       case rgw_data_sync_marker::FullSync:
-        return full_sync();
+        r = full_sync();
+        if (r < 0) {
+          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          return set_cr_error(r);
+        }
+        return 0;
       case rgw_data_sync_marker::IncrementalSync:
-        return incremental_sync();
-        break;
+        r = incremental_sync();
+        if (r < 0) {
+          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+          return set_cr_error(r);
+        }
+        return 0;
       default:
         return set_cr_error(-EIO);
       }
@@ -883,6 +894,7 @@ public:
   }
 
   int full_sync() {
+#warning lock shard for full_sync
 #define OMAP_GET_MAX_ENTRIES 100
     int max_entries = OMAP_GET_MAX_ENTRIES;
     reenter(&full_cr) {
@@ -931,6 +943,7 @@ public:
     
 
   int incremental_sync() {
+#warning lock shard for inc_sync
     reenter(&incremental_cr) {
       set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
                                                          RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
@@ -989,6 +1002,113 @@ public:
   }
 };
 
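+/* Generic control wrapper: re-runs the coroutine returned by alloc_cr(),
+ * backing off between runs; the wrapped coroutine can ask for the backoff
+ * to be reset (e.g. after making progress) via backoff_ptr(). */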
+class RGWControlCR : public RGWCoroutine
+{
+  RGWCoroutine *cr;
+  Mutex lock;
+
+  RGWSyncBackoff backoff;
+  bool reset_backoff;
+
+protected:
+  bool *backoff_ptr() {
+    return &reset_backoff;
+  }
+
+  Mutex& cr_lock() {
+    return lock;
+  }
+
+  RGWCoroutine *get_cr() {
+    return cr;
+  }
+
+public:
+  RGWControlCR(CephContext *_cct) : RGWCoroutine(_cct), cr(NULL), lock("RGWControlCR::lock"), reset_backoff(false) {
+  }
+
+  virtual ~RGWControlCR() {
+    if (cr) {
+      cr->put();
+    }
+  }
+
+  virtual RGWCoroutine *alloc_cr() = 0;
+
+  int operate() {
+    reenter(this) {
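+      /* re-run the wrapped coroutine until it hits a hard error */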
+      while (true) {
+        yield {
+          Mutex::Locker l(lock);
+          cr = alloc_cr();
+          cr->get(); /* hold an extra ref across the yield, as the old RGWDataSyncControlCR did */
+          int r = call(cr);
+          if (r < 0) {
+            cr->put();
+            cr = NULL;
+            ldout(cct, 0) << "ERROR: call() returned " << r << dendl;
+            return set_cr_error(r);
+          }
+        }
+        {
+          Mutex::Locker l(lock);
+          cr->put();
+          cr = NULL;
+        }
+        if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
+          ldout(cct, 0) << "ERROR: coroutine wrapped by RGWControlCR returned " << retcode << dendl;
+          return set_cr_error(retcode);
+        }
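+        /* the wrapped coroutine reported progress; restart the backoff window */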
+        if (reset_backoff) {
+          backoff.reset();
+        }
+        yield backoff.backoff(this);
+      }
+    }
+    return 0;
+  }
+};
+
+class RGWDataSyncShardControlCR : public RGWControlCR {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+
+  rgw_bucket pool;
+
+  string source_zone;
+  uint32_t shard_id;
+  rgw_data_sync_marker sync_marker;
+
+public:
+  RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                     RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWControlCR(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                      conn(_conn),
+                                                     pool(_pool),
+                                                      source_zone(_source_zone),
+                                                     shard_id(_shard_id),
+                                                     sync_marker(_marker) {
+  }
+
+  RGWCoroutine *alloc_cr() {
+    return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr());
+  }
+
+  void append_modified_shards(set<string>& keys) {
+    Mutex::Locker l(cr_lock());
+
+    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
+    if (!cr) {
+      return;
+    }
+
+    cr->append_modified_shards(keys);
+  }
+};
+
 class RGWDataSyncCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -1003,18 +1123,21 @@ class RGWDataSyncCR : public RGWCoroutine {
   RGWDataSyncShardMarkerTrack *marker_tracker;
 
   Mutex shard_crs_lock;
-  map<int, RGWDataSyncShardCR *> shard_crs;
+  map<int, RGWDataSyncShardControlCR *> shard_crs;
+
+  bool *reset_backoff;
 
 public:
   RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+                RGWRESTConn *_conn, const string& _source_zone, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                      async_rados(_async_rados),
                                                       conn(_conn),
                                                       source_zone(_source_zone),
                                                       obj_ctx(store),
                                                       marker_tracker(NULL),
-                                                      shard_crs_lock("RGWDataSyncCR::shard_crs_lock") {
+                                                      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
+                                                      reset_backoff(_reset_backoff) {
   }
 
   int operate() {
@@ -1035,30 +1158,36 @@ public:
         return set_cr_error(retcode);
       }
 
-      yield {
-        /* state: init status */
-        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
-          ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+      /* state: init status */
+      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
+        ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
+        yield {
           r = call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
           if (r < 0) {
             ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadDataSyncStatusCoroutine r=" << r << dendl;
            return set_cr_error(r);
           }
-          sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
-          /* update new state */
-          yield {
-            r = call(set_sync_info_cr());
-            if (r < 0) {
-              ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
-              return r;
-            }
+        }
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+        sync_status.sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
+        /* update new state */
+        yield {
+          r = call(set_sync_info_cr());
+          if (r < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
+            return set_cr_error(r);
           }
         }
-      }
 
-      if (retcode < 0) {
-        ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
-        return set_cr_error(retcode);
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+
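+        /* init succeeded; tell the control cr to reset its backoff */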
+        *reset_backoff = true;
       }
 
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
@@ -1077,16 +1206,24 @@ public:
           r = call(set_sync_info_cr());
           if (r < 0) {
             ldout(store->ctx(), 0) << "ERROR: failed to write sync status" << dendl;
-            return r;
+            return set_cr_error(r);
           }
         }
+
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+          return set_cr_error(retcode);
+        }
+
+        *reset_backoff = true;
       }
+
       yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
             for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
                  iter != sync_status.sync_markers.end(); ++iter) {
-              RGWDataSyncShardCR *cr = new RGWDataSyncShardCR(store, http_manager, async_rados,
+              RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados,
                                                         conn, store->get_zone_params().log_pool, source_zone,
                                                         iter->first, iter->second);
               shard_crs_lock.Lock();
@@ -1110,7 +1247,7 @@ public:
 
   void wakeup(int shard_id, set<string>& keys) {
     Mutex::Locker l(shard_crs_lock);
-    map<int, RGWDataSyncShardCR *>::iterator iter = shard_crs.find(shard_id);
+    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
     if (iter == shard_crs.end()) {
       return;
     }
@@ -1119,7 +1256,7 @@ public:
   }
 };
 
-class RGWDataSyncControlCR : public RGWCoroutine
+class RGWDataSyncControlCR : public RGWControlCR
 {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -1127,67 +1264,23 @@ class RGWDataSyncControlCR : public RGWCoroutine
   RGWRESTConn *conn;
   string source_zone;
 
-  Mutex lock;
-  RGWDataSyncCR *cr;
-
-  RGWSyncBackoff backoff;
-  bool reset_backoff;
-
 public:
   RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                RGWRESTConn *_conn, const string& _source_zone) : RGWCoroutine(_store->ctx()), store(_store),
+                RGWRESTConn *_conn, const string& _source_zone) : RGWControlCR(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
                                                      async_rados(_async_rados),
                                                       conn(_conn),
-                                                      source_zone(_source_zone),
-                                                      lock("RGWDataSyncControlCR::lock"),
-                                                      cr(NULL),
-                                                      reset_backoff(false) {
+                                                      source_zone(_source_zone) {
   }
 
-  ~RGWDataSyncControlCR() {
-    Mutex::Locker l(lock);
-    if (cr) {
-      cr->put();
-      cr = NULL;
-    }
-  }
-
-  int operate() {
-    reenter(this) {
-      while (true) {
-        yield {
-          Mutex::Locker l(lock);
-          cr = new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone);
-          cr->get();
-          int r = call(new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone));
-          if (r < 0) {
-            cr->put();
-            cr = NULL;
-            ldout(store->ctx(), 0) << "ERROR: call(RGWDataSyncControlCR()) returned " << r << dendl;
-            return set_cr_error(r);
-          }
-        }
-        {
-          Mutex::Locker l(lock);
-          cr->put();
-          cr = NULL;
-        }
-        if (retcode < 0 && retcode != -EBUSY && retcode != -EAGAIN) {
-          ldout(store->ctx(), 0) << "ERROR: RGWDataSyncControlCR() returned " << retcode << dendl;
-          return set_cr_error(retcode);
-        }
-        if (reset_backoff) {
-          backoff.reset();
-        }
-        yield backoff.backoff(this);
-      }
-    }
-    return 0;
+  RGWCoroutine *alloc_cr() {
+    return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, backoff_ptr());
   }
 
   void wakeup(int shard_id, set<string>& keys) {
-    Mutex::Locker l(lock);
+    Mutex::Locker l(cr_lock());
+
+    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
     if (cr) {
       cr->wakeup(shard_id, keys);
     }