]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add RGWDataLogTrimCR
authorCasey Bodley <cbodley@redhat.com>
Mon, 6 Jun 2016 20:46:56 +0000 (16:46 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 22 Jul 2016 16:08:22 +0000 (12:08 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_bucket.h
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 12182d12935ab0828283dc3fcff54b12959ba3c7..ee52ad626b7eca3b9df983aeefefe76e0ab34c1a 100644 (file)
@@ -466,6 +466,7 @@ public:
   ~RGWDataChangesLog();
 
   int choose_oid(const rgw_bucket_shard& bs);
+  const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
   int add_entry(rgw_bucket& bucket, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int renew_entries();
index 5174ab6e1f29bf88e928e1a2b103317642dd8e03..a1cbfabc8dee37de2ae6ddeae5406d4ccca8c095 100644 (file)
@@ -984,13 +984,14 @@ public:
 class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine {
   RGWRados *store;
   RGWAioCompletionNotifier *cn{nullptr};
+ protected:
   std::string oid;
   real_time start_time;
   real_time end_time;
   std::string from_marker;
   std::string to_marker;
 
-public:
+ public:
   RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid,
                         const real_time& start_time, const real_time& end_time,
                         const std::string& from_marker,
index fe7155c8036a4f8376240d991b5b527c02afa135..051231129269a2d1088c18c525ae1ab66b922df3 100644 (file)
@@ -2838,3 +2838,148 @@ string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
   return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
 }
 
+
+#undef dout_prefix
+#define dout_prefix (*_dout << "data trim: ")
+
+namespace {
+
+/// return the marker that it's safe to trim up to
+const std::string& get_stable_marker(const rgw_data_sync_marker& m)
+{
+  return m.state == m.FullSync ? m.next_step_marker : m.marker;
+}
+
+/// comparison operator for take_min_markers()
+bool operator<(const rgw_data_sync_marker& lhs,
+               const rgw_data_sync_marker& rhs)
+{
+  // sort by stable marker
+  return get_stable_marker(lhs) < get_stable_marker(rhs);
+}
+
+/// populate the container starting with 'dest' with the minimum stable marker
+/// of each shard for all of the peers in [first, last)
+template <typename IterIn, typename IterOut>
+void take_min_markers(IterIn first, IterIn last, IterOut dest)
+{
+  if (first == last) {
+    return;
+  }
+  // initialize markers with the first peer's
+  auto m = dest;
+  for (auto &shard : first->sync_markers) {
+    *m = std::move(shard.second);
+    ++m;
+  }
+  // for remaining peers, replace with smaller markers
+  for (auto p = first + 1; p != last; ++p) {
+    m = dest;
+    for (auto &shard : p->sync_markers) {
+      if (shard.second < *m) {
+        *m = std::move(shard.second);
+      }
+      ++m;
+    }
+  }
+}
+
+// wrapper to update last_trim_marker on success
+class LastTimelogTrimCR : public RGWRadosTimelogTrimCR {
+  CephContext *cct;
+  std::string *last_trim_marker;
+ public:
+  LastTimelogTrimCR(RGWRados *store, const std::string& oid,
+                    const std::string& to_marker, std::string *last_trim_marker)
+    : RGWRadosTimelogTrimCR(store, oid, real_time{}, real_time{},
+                            std::string{}, to_marker),
+      cct(store->ctx()), last_trim_marker(last_trim_marker)
+  {}
+  int request_complete() override {
+    int r = RGWRadosTimelogTrimCR::request_complete();
+    if (r < 0 && r != -ENODATA) {
+      ldout(cct, 1) << "failed to trim datalog: " << cpp_strerror(r) << dendl;
+      return r;
+    }
+    ldout(cct, 10) << "datalog trimmed to marker " << to_marker << dendl;
+    *last_trim_marker = to_marker;
+    return 0;
+  }
+};
+
+} // anonymous namespace
+
+RGWDataLogTrimCR::RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+                                   int num_shards, utime_t interval)
+  : RGWCoroutine(store->ctx()), store(store), http(http),
+    num_shards(num_shards), interval(interval),
+    zone(store->get_zone().id),
+    peer_status(store->zone_conn_map.size()),
+    min_shard_markers(num_shards),
+    last_trim(num_shards)
+{
+}
+
+int RGWDataLogTrimCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      yield wait(interval);
+
+      ldout(cct, 10) << "fetching sync status for zone " << zone << dendl;
+      yield {
+        // query data sync status from each sync peer
+        rgw_http_param_pair params[] = {
+          { "type", "data" },
+          { "status", nullptr },
+          { "source-zone", zone.c_str() },
+          { nullptr, nullptr }
+        };
+
+        auto p = peer_status.begin();
+        for (auto& c : store->zone_conn_map) {
+          ldout(cct, 20) << "query sync status from " << c.first << dendl;
+          using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
+          spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+                false);
+          ++p;
+        }
+      }
+
+      // must get a successful reply from all peers to consider trimming
+      ret = 0;
+      while (ret == 0 && num_spawned()) {
+        yield wait_for_child();
+        collect_next(&ret);
+      }
+      if (ret < 0) {
+        ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
+        drain_all();
+        continue;
+      }
+
+      ldout(cct, 10) << "trimming log shards" << dendl;
+      yield {
+        // determine the minimum marker for each shard
+        take_min_markers(peer_status.begin(), peer_status.end(),
+                         min_shard_markers.begin());
+
+        for (int i = 0; i < num_shards; i++) {
+          const auto& m = min_shard_markers[i];
+          auto& stable = get_stable_marker(m);
+          if (stable <= last_trim[i]) {
+            continue;
+          }
+          ldout(cct, 10) << "trimming log shard " << i
+              << " at marker=" << stable
+              << " last_trim=" << last_trim[i] << dendl;
+          using TrimCR = LastTimelogTrimCR;
+          spawn(new TrimCR(store, store->data_log->get_oid(i),
+                           stable, &last_trim[i]),
+                true);
+        }
+      }
+    }
+  }
+  return 0;
+}
index 1c950674c2245706e4b057f9f99977c7b9dccafe..07953c5d942c5a6f004ec2005ce7c62dfb2e88d8 100644 (file)
@@ -501,4 +501,21 @@ public:
 };
 
 
+class RGWDataLogTrimCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http;
+  const int num_shards;
+  const utime_t interval; //< polling interval
+  const std::string& zone; //< my zone id
+  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
+  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
+  std::vector<std::string> last_trim; //< last trimmed marker per shard
+  int ret{0};
+
+ public:
+  RGWDataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
+                   int num_shards, utime_t interval);
+  int operate() override;
+};
+
 #endif