]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add BucketTrimInstanceCR
authorCasey Bodley <cbodley@redhat.com>
Thu, 7 Sep 2017 20:27:53 +0000 (16:27 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 10 Nov 2017 18:23:02 +0000 (13:23 -0500)
fetches bucket sync status from each peer, calculates the min markers
for each shard, and trims the bilog shards. calls the TrimObserver on
success

Fixes: http://tracker.ceph.com/issues/18229
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync_log_trim.cc

index 74acfe4d3e4903ffa2a64a49da8ced3fab9bf019..660db0089e1fab11f7bc6ba9fb31ac24381ede1c 100644 (file)
@@ -21,6 +21,8 @@
 #include "common/errno.h"
 #include "rgw_sync_log_trim.h"
 #include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_data_sync.h"
 #include "rgw_metadata.h"
 #include "rgw_rados.h"
 #include "rgw_sync.h"
@@ -280,12 +282,86 @@ struct BucketTrimObserver {
   virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
 };
 
+/// populate the status with the minimum stable marker of each shard
+template <typename Iter>
+int take_min_status(CephContext *cct, Iter first, Iter last,
+                    std::vector<std::string> *status)
+{
+  status->clear();
+  boost::optional<size_t> num_shards;
+  for (auto peer = first; peer != last; ++peer) {
+    const size_t peer_shards = peer->size();
+    if (!num_shards) {
+      num_shards = peer_shards;
+      status->resize(*num_shards);
+    } else if (*num_shards != peer_shards) {
+      // all peers must agree on the number of shards
+      return -EINVAL;
+    }
+    auto m = status->begin();
+    for (auto& shard : *peer) {
+      auto& marker = *m++;
+      // only consider incremental sync markers
+      if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+        continue;
+      }
+      // always take the first marker, or any later marker that's smaller
+      if (peer == first || marker > shard.inc_marker.position) {
+        marker = std::move(shard.inc_marker.position);
+      }
+    }
+  }
+  return 0;
+}
+
+/// trim each bilog shard to the given marker, while limiting the number of
+/// concurrent requests
+class BucketTrimShardCollectCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+  RGWRados *const store;
+  const RGWBucketInfo& bucket_info;
+  const std::vector<std::string>& markers; //< shard markers to trim
+  size_t i{0}; //< index of current shard marker
+ public:
+  BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
+                           const std::vector<std::string>& markers)
+    : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
+      store(store), bucket_info(bucket_info), markers(markers)
+  {}
+  bool spawn_next() override;
+};
+
+bool BucketTrimShardCollectCR::spawn_next()
+{
+  while (i < markers.size()) {
+    const auto& marker = markers[i];
+    const auto shard_id = i++;
+
+    // skip empty markers
+    if (!marker.empty()) {
+      ldout(cct, 10) << "trimming bilog shard " << shard_id
+          << " of " << bucket_info.bucket << " at marker " << marker << dendl;
+      spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
+                                    std::string{}, marker),
+            false);
+      return true;
+    }
+  }
+  return false;
+}
+
 /// trim the bilog of all of the given bucket instance's shards
 class BucketTrimInstanceCR : public RGWCoroutine {
   RGWRados *const store;
   RGWHTTPManager *const http;
   BucketTrimObserver *const observer;
   std::string bucket_instance;
+  const std::string& zone_id; //< my zone id
+  RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+
+  using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+  std::vector<StatusShards> peer_status; //< sync status for each peer
+  std::vector<std::string> min_markers; //< min marker per shard
 
  public:
   BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
@@ -293,13 +369,82 @@ class BucketTrimInstanceCR : public RGWCoroutine {
                        const std::string& bucket_instance)
     : RGWCoroutine(store->ctx()), store(store),
       http(http), observer(observer),
-      bucket_instance(bucket_instance)
+      bucket_instance(bucket_instance),
+      zone_id(store->get_zone().id),
+      peer_status(store->zone_conn_map.size())
   {}
-  int operate() {
+
+  int operate() override;
+};
+
+int BucketTrimInstanceCR::operate()
+{
+  reenter(this) {
+    ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
+
+    // query peers for sync status
+    set_status("fetching sync status from peers");
+    yield {
+      // query data sync status from each sync peer
+      rgw_http_param_pair params[] = {
+        { "type", "bucket-index" },
+        { "status", nullptr },
+        { "bucket", bucket_instance.c_str() },
+        { "source-zone", zone_id.c_str() },
+        { nullptr, nullptr }
+      };
+
+      auto p = peer_status.begin();
+      for (auto& c : store->zone_conn_map) {
+        using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+              false);
+        ++p;
+      }
+      // in parallel, read the local bucket instance info
+      spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
+                                           bucket_instance, &bucket_info),
+            false);
+    }
+    // wait for a response from each peer. all must respond to attempt trim
+    while (num_spawned()) {
+      int child_ret;
+      yield wait_for_child();
+      collect(&child_ret, nullptr);
+      if (child_ret < 0) {
+        drain_all();
+        return set_cr_error(child_ret);
+      }
+    }
+
+    // determine the minimum marker for each shard
+    retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
+                              &min_markers);
+    if (retcode < 0) {
+      ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
+      return set_cr_error(retcode);
+    }
+
+    // trim shards with a ShardCollectCR
+    ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+       << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
+    set_status("trimming bilog shards");
+    yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+    // ENODATA just means there were no keys to trim
+    if (retcode == -ENODATA) {
+      retcode = 0;
+    }
+    if (retcode < 0) {
+      ldout(cct, 4) << "failed to trim bilog shards: "
+          << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
     observer->on_bucket_trimmed(std::move(bucket_instance));
     return set_cr_done();
   }
-};
+  return 0;
+}
 
 /// trim each bucket instance while limiting the number of concurrent operations
 class BucketTrimInstanceCollectCR : public RGWShardCollectCR {