]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move datalog trimming into rgw_trim_datalog.cc
authorCasey Bodley <cbodley@redhat.com>
Sat, 13 Apr 2019 17:21:44 +0000 (13:21 -0400)
committerCasey Bodley <cbodley@redhat.com>
Thu, 18 Apr 2019 17:16:17 +0000 (13:16 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_trim_datalog.cc [new file with mode: 0644]
src/rgw/rgw_trim_datalog.h [new file with mode: 0644]

index b93cac4aa1e1b79eb18d22c60a4d3fb9ec061398..cff1ead0ebfef322737cd4175bf42930e6a1b2f8 100644 (file)
@@ -74,6 +74,7 @@ set(librgw_common_srcs
   rgw_sync_module_pubsub_rest.cc
   rgw_sync_log_trim.cc
   rgw_sync_trace.cc
+  rgw_trim_datalog.cc
   rgw_trim_mdlog.cc
   rgw_period_history.cc
   rgw_period_puller.cc
index e76d2954024f59efb0d6ed731d84d464f50a8ebe..de75af45e1e40ea5c49780985a1b3d42e96ee1fd 100644 (file)
@@ -44,6 +44,7 @@ extern "C" {
 #include "rgw_usage.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_trim_datalog.h"
 #include "rgw_trim_mdlog.h"
 #include "rgw_sync_log_trim.h"
 #include "rgw_data_sync.h"
index 6c8d4236cd1653160d46fc41ac1021262cfe54d3..33e69c965941001e33f241dd3ba6ece687a88bb1 100644 (file)
@@ -3503,208 +3503,3 @@ int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const
   return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
                                                   bucket_info.bucket, status));
 }
-
-
-// TODO: move into rgw_data_sync_trim.cc
-#undef dout_prefix
-#define dout_prefix (*_dout << "data trim: ")
-
-namespace {
-
-/// return the marker that it's safe to trim up to
-const std::string& get_stable_marker(const rgw_data_sync_marker& m)
-{
-  return m.state == m.FullSync ? m.next_step_marker : m.marker;
-}
-
-/// comparison operator for take_min_markers()
-bool operator<(const rgw_data_sync_marker& lhs,
-               const rgw_data_sync_marker& rhs)
-{
-  // sort by stable marker
-  return get_stable_marker(lhs) < get_stable_marker(rhs);
-}
-
-/// populate the container starting with 'dest' with the minimum stable marker
-/// of each shard for all of the peers in [first, last)
-template <typename IterIn, typename IterOut>
-void take_min_markers(IterIn first, IterIn last, IterOut dest)
-{
-  if (first == last) {
-    return;
-  }
-  // initialize markers with the first peer's
-  auto m = dest;
-  for (auto &shard : first->sync_markers) {
-    *m = std::move(shard.second);
-    ++m;
-  }
-  // for remaining peers, replace with smaller markers
-  for (auto p = first + 1; p != last; ++p) {
-    m = dest;
-    for (auto &shard : p->sync_markers) {
-      if (shard.second < *m) {
-        *m = std::move(shard.second);
-      }
-      ++m;
-    }
-  }
-}
-
-} // anonymous namespace
-
-class DataLogTrimCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http;
-  const int num_shards;
-  const std::string& zone_id; //< my zone id
-  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
-  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
-  std::vector<std::string>& last_trim; //< last trimmed marker per shard
-  int ret{0};
-
- public:
-  DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
-                   int num_shards, std::vector<std::string>& last_trim)
-    : RGWCoroutine(store->ctx()), store(store), http(http),
-      num_shards(num_shards),
-      zone_id(store->svc.zone->get_zone().id),
-      peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
-      min_shard_markers(num_shards),
-      last_trim(last_trim)
-  {}
-
-  int operate() override;
-};
-
-int DataLogTrimCR::operate()
-{
-  reenter(this) {
-    ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
-    set_status("fetching sync status");
-    yield {
-      // query data sync status from each sync peer
-      rgw_http_param_pair params[] = {
-        { "type", "data" },
-        { "status", nullptr },
-        { "source-zone", zone_id.c_str() },
-        { nullptr, nullptr }
-      };
-
-      auto p = peer_status.begin();
-      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
-        ldout(cct, 20) << "query sync status from " << c.first << dendl;
-        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
-        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
-              false);
-        ++p;
-      }
-    }
-
-    // must get a successful reply from all peers to consider trimming
-    ret = 0;
-    while (ret == 0 && num_spawned() > 0) {
-      yield wait_for_child();
-      collect_next(&ret);
-    }
-    drain_all();
-
-    if (ret < 0) {
-      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
-      return set_cr_error(ret);
-    }
-
-    ldout(cct, 10) << "trimming log shards" << dendl;
-    set_status("trimming log shards");
-    yield {
-      // determine the minimum marker for each shard
-      take_min_markers(peer_status.begin(), peer_status.end(),
-                       min_shard_markers.begin());
-
-      for (int i = 0; i < num_shards; i++) {
-        const auto& m = min_shard_markers[i];
-        auto& stable = get_stable_marker(m);
-        if (stable <= last_trim[i]) {
-          continue;
-        }
-        ldout(cct, 10) << "trimming log shard " << i
-            << " at marker=" << stable
-            << " last_trim=" << last_trim[i] << dendl;
-        using TrimCR = RGWSyncLogTrimCR;
-        spawn(new TrimCR(store, store->data_log->get_oid(i),
-                         stable, &last_trim[i]),
-              true);
-      }
-    }
-    return set_cr_done();
-  }
-  return 0;
-}
-
-RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
-                                            RGWHTTPManager *http,
-                                            int num_shards,
-                                            std::vector<std::string>& markers)
-{
-  return new DataLogTrimCR(store, http, num_shards, markers);
-}
-
-class DataLogTrimPollCR : public RGWCoroutine {
-  RGWRados *store;
-  RGWHTTPManager *http;
-  const int num_shards;
-  const utime_t interval; //< polling interval
-  const std::string lock_oid; //< use first data log shard for lock
-  const std::string lock_cookie;
-  std::vector<std::string> last_trim; //< last trimmed marker per shard
-
- public:
-  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
-                    int num_shards, utime_t interval)
-    : RGWCoroutine(store->ctx()), store(store), http(http),
-      num_shards(num_shards), interval(interval),
-      lock_oid(store->data_log->get_oid(0)),
-      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
-      last_trim(num_shards)
-  {}
-
-  int operate() override;
-};
-
-int DataLogTrimPollCR::operate()
-{
-  reenter(this) {
-    for (;;) {
-      set_status("sleeping");
-      wait(interval);
-
-      // request a 'data_trim' lock that covers the entire wait interval to
-      // prevent other gateways from attempting to trim for the duration
-      set_status("acquiring trim lock");
-      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
-                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
-                                          "data_trim", lock_cookie,
-                                          interval.sec()));
-      if (retcode < 0) {
-        // if the lock is already held, go back to sleep and try again later
-        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
-            << interval.sec() << "s" << dendl;
-        continue;
-      }
-
-      set_status("trimming");
-      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
-
-      // note that the lock is not released. this is intentional, as it avoids
-      // duplicating this work in other gateways
-    }
-  }
-  return 0;
-}
-
-RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
-                                      RGWHTTPManager *http,
-                                      int num_shards, utime_t interval)
-{
-  return new DataLogTrimPollCR(store, http, num_shards, interval);
-}
index 55a71d720daaa06d912055d23714149f2a32b4d7..440ef153cb282bed67d3c88e15b4feca248d8ca8 100644 (file)
@@ -611,15 +611,4 @@ public:
   int create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) override;
 };
 
-// DataLogTrimCR factory function
-extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
-                                             RGWHTTPManager *http,
-                                             int num_shards, utime_t interval);
-
-// factory function for datalog trim via radosgw-admin
-RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
-                                            RGWHTTPManager *http,
-                                            int num_shards,
-                                            std::vector<std::string>& markers);
-
 #endif
index 4a18def8d258c7ebe8bf6337d5d071c37e90d3f8..f6a68d44d05876ebf62afb8d08dd3e0f496c158b 100644 (file)
@@ -68,6 +68,7 @@ using namespace librados;
 #include "rgw_sync.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_trace.h"
+#include "rgw_trim_datalog.h"
 #include "rgw_trim_mdlog.h"
 #include "rgw_data_sync.h"
 #include "rgw_realm_watcher.h"
diff --git a/src/rgw/rgw_trim_datalog.cc b/src/rgw/rgw_trim_datalog.cc
new file mode 100644 (file)
index 0000000..272dfab
--- /dev/null
@@ -0,0 +1,218 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <vector>
+#include <string>
+
+#include "rgw_trim_datalog.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_data_sync.h"
+#include "rgw_zone.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "data trim: ")
+
+namespace {
+
+/// return the marker that it's safe to trim up to
+const std::string& get_stable_marker(const rgw_data_sync_marker& m)
+{
+  return m.state == m.FullSync ? m.next_step_marker : m.marker;
+}
+
+/// comparison operator for take_min_markers()
+bool operator<(const rgw_data_sync_marker& lhs,
+               const rgw_data_sync_marker& rhs)
+{
+  // sort by stable marker
+  return get_stable_marker(lhs) < get_stable_marker(rhs);
+}
+
+/// populate the container starting with 'dest' with the minimum stable marker
+/// of each shard for all of the peers in [first, last)
+template <typename IterIn, typename IterOut>
+void take_min_markers(IterIn first, IterIn last, IterOut dest)
+{
+  if (first == last) {
+    return;
+  }
+  // initialize markers with the first peer's
+  auto m = dest;
+  for (auto &shard : first->sync_markers) {
+    *m = std::move(shard.second);
+    ++m;
+  }
+  // for remaining peers, replace with smaller markers
+  for (auto p = first + 1; p != last; ++p) {
+    m = dest;
+    for (auto &shard : p->sync_markers) {
+      if (shard.second < *m) {
+        *m = std::move(shard.second);
+      }
+      ++m;
+    }
+  }
+}
+
+} // anonymous namespace
+
+class DataLogTrimCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const std::string& zone_id; //< my zone id
+  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
+  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
+  std::vector<std::string>& last_trim; //< last trimmed marker per shard
+  int ret{0};
+
+ public:
+  DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+                   int num_shards, std::vector<std::string>& last_trim)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      num_shards(num_shards),
+      zone_id(store->svc.zone->get_zone().id),
+      peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
+      min_shard_markers(num_shards),
+      last_trim(last_trim)
+  {}
+
+  int operate() override;
+};
+
+int DataLogTrimCR::operate()
+{
+  reenter(this) {
+    ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
+    set_status("fetching sync status");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "data" },
+        { "status", nullptr },
+        { "source-zone", zone_id.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
+        ldout(cct, 20) << "query sync status from " << c.first << dendl;
+        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+    }
+
+    // must get a successful reply from all peers to consider trimming
+    ret = 0;
+    while (ret == 0 && num_spawned() > 0) {
+      yield wait_for_child();
+      collect_next(&ret);
+    }
+    drain_all();
+
+    if (ret < 0) {
+      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+      return set_cr_error(ret);
+    }
+
+    ldout(cct, 10) << "trimming log shards" << dendl;
+    set_status("trimming log shards");
+    yield {
+      // determine the minimum marker for each shard
+      take_min_markers(peer_status.begin(), peer_status.end(),
+                       min_shard_markers.begin());
+
+      for (int i = 0; i < num_shards; i++) {
+        const auto& m = min_shard_markers[i];
+        auto& stable = get_stable_marker(m);
+        if (stable <= last_trim[i]) {
+          continue;
+        }
+        ldout(cct, 10) << "trimming log shard " << i
+            << " at marker=" << stable
+            << " last_trim=" << last_trim[i] << dendl;
+        using TrimCR = RGWSyncLogTrimCR;
+        spawn(new TrimCR(store, store->data_log->get_oid(i),
+                         stable, &last_trim[i]),
+              true);
+      }
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
+
+RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards,
+                                            std::vector<std::string>& markers)
+{
+  return new DataLogTrimCR(store, http, num_shards, markers);
+}
+
+class DataLogTrimPollCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string lock_oid; //< use first data log shard for lock
+  const std::string lock_cookie;
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+
+ public:
+  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+                    int num_shards, utime_t interval)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      num_shards(num_shards), interval(interval),
+      lock_oid(store->data_log->get_oid(0)),
+      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
+      last_trim(num_shards)
+  {}
+
+  int operate() override;
+};
+
+int DataLogTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(interval);
+
+      // request a 'data_trim' lock that covers the entire wait interval to
+      // prevent other gateways from attempting to trim for the duration
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
+                                          "data_trim", lock_cookie,
+                                          interval.sec()));
+      if (retcode < 0) {
+        // if the lock is already held, go back to sleep and try again later
+        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
+            << interval.sec() << "s" << dendl;
+        continue;
+      }
+
+      set_status("trimming");
+      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
+
+      // note that the lock is not released. this is intentional, as it avoids
+      // duplicating this work in other gateways
+    }
+  }
+  return 0;
+}
+
+RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
+                                      RGWHTTPManager *http,
+                                      int num_shards, utime_t interval)
+{
+  return new DataLogTrimPollCR(store, http, num_shards, interval);
+}
diff --git a/src/rgw/rgw_trim_datalog.h b/src/rgw/rgw_trim_datalog.h
new file mode 100644 (file)
index 0000000..6b640da
--- /dev/null
@@ -0,0 +1,20 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+class RGWCoroutine;
+class RGWRados;
+class RGWHTTPManager;
+class utime_t;
+
+// DataLogTrimCR factory function
+extern RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
+                                             RGWHTTPManager *http,
+                                             int num_shards, utime_t interval);
+
+// factory function for datalog trim via radosgw-admin
+RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
+                                            RGWHTTPManager *http,
+                                            int num_shards,
+                                            std::vector<std::string>& markers);