]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: hold lock for data log trimming
authorCasey Bodley <cbodley@redhat.com>
Sun, 31 Jul 2016 16:42:24 +0000 (12:42 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 22 Jan 2018 20:55:17 +0000 (15:55 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 0777fff4163588850a95433799d4e3ce1bd8b6e9)

src/rgw/rgw_data_sync.cc

index a3e39f078cbe36e702f73571ff6a29e5ac98da3b..70afacbe8c00664037e6566ecde92288f95da514 100644 (file)
@@ -2950,22 +2950,21 @@ class DataLogTrimCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http;
   const int num_shards;
-  const utime_t interval; //< polling interval
   const std::string& zone; //< my zone id
   std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
   std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
-  std::vector<std::string> last_trim; //< last trimmed marker per shard
+  std::vector<std::string>& last_trim; //< last trimmed marker per shard
   int ret{0};
 
  public:
   DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
-                   int num_shards, utime_t interval)
+                   int num_shards, std::vector<std::string>& last_trim)
     : RGWCoroutine(store->ctx()), store(store), http(http),
-      num_shards(num_shards), interval(interval),
+      num_shards(num_shards),
       zone(store->get_zone().id),
       peer_status(store->zone_conn_map.size()),
       min_shard_markers(num_shards),
-      last_trim(num_shards)
+      last_trim(last_trim)
   {}
 
   int operate() override;
@@ -2974,62 +2973,115 @@ class DataLogTrimCR : public RGWCoroutine {
 int DataLogTrimCR::operate()
 {
   reenter(this) {
-    for (;;) {
-      yield wait(interval);
+    ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
+    set_status("fetching sync status");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "data" },
+        { "status", nullptr },
+        { "source-zone", zone.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->zone_conn_map) {
+        ldout(cct, 20) << "query sync status from " << c.first << dendl;
+        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+    }
+
+    // must get a successful reply from all peers to consider trimming
+    ret = 0;
+    while (ret == 0 && num_spawned() > 0) {
+      yield wait_for_child();
+      collect_next(&ret);
+    }
+    drain_all();
 
-      ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
-      yield {
-        // query data sync status from each sync peer
-        rgw_http_param_pair params[] = {
-          { "type", "data" },
-          { "status", nullptr },
-          { "source-zone", zone.c_str() },
-          { nullptr, nullptr }
-        };
-
-        auto p = peer_status.begin();
-        for (auto& c : store->zone_conn_map) {
-          ldout(cct, 20) << "query sync status from " << c.first << dendl;
-          using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
-          spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
-                false);
-          ++p;
+    if (ret < 0) {
+      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+      return set_cr_error(ret);
+    }
+
+    ldout(cct, 10) << "trimming log shards" << dendl;
+    set_status("trimming log shards");
+    yield {
+      // determine the minimum marker for each shard
+      take_min_markers(peer_status.begin(), peer_status.end(),
+                       min_shard_markers.begin());
+
+      for (int i = 0; i < num_shards; i++) {
+        const auto& m = min_shard_markers[i];
+        auto& stable = get_stable_marker(m);
+        if (stable <= last_trim[i]) {
+          continue;
         }
+        ldout(cct, 10) << "trimming log shard " << i
+            << " at marker=" << stable
+            << " last_trim=" << last_trim[i] << dendl;
+        using TrimCR = LastTimelogTrimCR;
+        spawn(new TrimCR(store, store->data_log->get_oid(i),
+                         stable, &last_trim[i]),
+              true);
       }
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
 
-      // must get a successful reply from all peers to consider trimming
-      ret = 0;
-      while (ret == 0 && num_spawned()) {
-        yield wait_for_child();
-        collect_next(&ret);
-      }
-      if (ret < 0) {
-        ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
-        drain_all();
+class DataLogTrimPollCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string lock_oid; //< use first data log shard for lock
+  const std::string lock_cookie;
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+
+ public:
+  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+                    int num_shards, utime_t interval)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      num_shards(num_shards), interval(interval),
+      lock_oid(store->data_log->get_oid(0)),
+      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+      last_trim(num_shards)
+  {}
+
+  int operate();
+};
+
+int DataLogTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(interval);
+
+      // request a 'data_trim' lock that covers the entire wait interval to
+      // prevent other gateways from attempting to trim for the duration
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          store->get_zone_params().log_pool,
+                                          lock_oid, "data_trim", lock_cookie,
+                                          interval.sec()));
+      if (retcode < 0) {
+        // if the lock is already held, go back to sleep and try again later
+        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
+            << interval.sec() << "s" << dendl;
         continue;
       }
 
-      ldout(cct, 10) << "trimming log shards" << dendl;
-      yield {
-        // determine the minimum marker for each shard
-        take_min_markers(peer_status.begin(), peer_status.end(),
-                         min_shard_markers.begin());
-
-        for (int i = 0; i < num_shards; i++) {
-          const auto& m = min_shard_markers[i];
-          auto& stable = get_stable_marker(m);
-          if (stable <= last_trim[i]) {
-            continue;
-          }
-          ldout(cct, 10) << "trimming log shard " << i
-              << " at marker=" << stable
-              << " last_trim=" << last_trim[i] << dendl;
-          using TrimCR = LastTimelogTrimCR;
-          spawn(new TrimCR(store, store->data_log->get_oid(i),
-                           stable, &last_trim[i]),
-                true);
-        }
-      }
+      set_status("trimming");
+      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
+
+      // note that the lock is not released. this is intentional, as it avoids
+      // duplicating this work in other gateways
     }
   }
   return 0;
@@ -3041,5 +3093,5 @@ RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                       RGWHTTPManager *http,
                                       int num_shards, utime_t interval)
 {
-  return new DataLogTrimCR(store, http, num_shards, interval);
+  return new DataLogTrimPollCR(store, http, num_shards, interval);
 }