]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Fix async caller into Datalog
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 5 Jun 2020 23:28:29 +0000 (19:28 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 9 Sep 2020 02:09:40 +0000 (22:09 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_cr_rados.h
src/rgw/rgw_trim_datalog.cc
src/rgw/rgw_trim_datalog.h

index 32f299efbc164f4db22ff4703352de942e6054e2..100c2b94e69fa1f99b22bd2d1acae2a6d94ffab9 100644 (file)
@@ -2367,7 +2367,7 @@ int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
                               {}, std::string(marker), c, null_yield);
 }
 
-bool RGWDataChangesLog::going_down()
+bool RGWDataChangesLog::going_down() const
 {
   return down_flag;
 }
@@ -2426,6 +2426,10 @@ void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
   modified_shards.clear();
 }
 
+std::string_view RGWDataChangesLog::max_marker() const {
+  return "99999999"sv;
+}
+
 void RGWBucketCompleteInfo::dump(Formatter *f) const {
   encode_json("bucket_info", info, f);
   encode_json("attrs", attrs, f);
index 127643187cecce4ddfec9a76f6b22bdd5b313781..597707e46677e2fef3a0e1e3254bee51ed1a0f01 100644 (file)
@@ -536,6 +536,10 @@ class RGWDataChangesLog {
   std::thread renew_thread;
 
   std::function<bool(const rgw_bucket& bucket, optional_yield y)> bucket_filter;
+  int choose_oid(const rgw_bucket_shard& bs);
+  bool going_down() const;
+  bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
+  int renew_entries();
 
 public:
 
@@ -545,11 +549,8 @@ public:
   void init(RGWSI_Cls *cls_svc);
   int start(const RGWZone* _zone);
 
-  int choose_oid(const rgw_bucket_shard& bs);
-  std::string get_oid(int shard_id) const;
   int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
-  int renew_entries();
   int list_entries(int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
                   std::optional<std::string_view> marker,
@@ -572,13 +573,12 @@ public:
     this->observer = observer;
   }
 
-  bool going_down();
-
   void set_bucket_filter(decltype(bucket_filter)&& f) {
     bucket_filter = std::move(f);
   }
-
-  bool filter_bucket(const rgw_bucket& bucket, optional_yield y) const;
+  // a marker that compares greater than any other
+  std::string_view max_marker() const;
+  std::string get_oid(int shard_id) const;
 };
 
 struct rgw_ep_info {
index 0e8a94154f0e564998bd9aec26ded724ab14748a..0f837d2007fb33af7662f12d2ef62488af1932ac 100644 (file)
@@ -1342,7 +1342,6 @@ class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR {
   CephContext *cct;
   std::string *last_trim_marker;
  public:
-  // a marker that compares greater than any timestamp-based index
   static constexpr const char* max_marker = "99999999";
 
   RGWSyncLogTrimCR(rgw::sal::RGWRadosStore *store, const std::string& oid,
index f5ab685e8147a059a93634df27699d5262a49968..26d2cb258964d9da6d8b9960cde9a920b771a742 100644 (file)
 
 namespace {
 
+class DatalogTrimImplCR : public RGWSimpleCoroutine {
+  rgw::sal::RGWRadosStore *store;
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+  int shard;
+  std::string marker;
+  std::string* last_trim_marker;
+
+ public:
+  DatalogTrimImplCR(rgw::sal::RGWRadosStore* store, int shard,
+                   const std::string& marker, std::string* last_trim_marker)
+  : RGWSimpleCoroutine(store->ctx()), store(store), shard(shard),
+    marker(marker), last_trim_marker(last_trim_marker) {
+    set_description() << "Datalog trim shard=" << shard
+                     << " marker=" << marker;
+  }
+
+  int send_request() override {
+    set_status() << "sending request";
+    cn = stack->create_completion_notifier();
+    return store->svc()->datalog_rados->trim_entries(shard, marker,
+                                                    cn->completion());
+  }
+  int request_complete() override {
+    int r = cn->completion()->get_return_value();
+    set_status() << "request complete; ret=" << r;
+    if (r != -ENODATA) {
+      return r;
+    }
+    // nothing left to trim, update last_trim_marker
+    if (*last_trim_marker < marker &&
+       marker != store->svc()->datalog_rados->max_marker()) {
+      *last_trim_marker = marker;
+    }
+    return 0;
+  }
+};
+
 /// return the marker that it's safe to trim up to
 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
 {
@@ -51,7 +88,7 @@ void take_min_markers(IterIn first, IterIn last, IterOut dest)
 } // anonymous namespace
 
 class DataLogTrimCR : public RGWCoroutine {
-  using TrimCR = RGWSyncLogTrimCR;
+  using TrimCR = DatalogTrimImplCR;
   rgw::sal::RGWRadosStore *store;
   RGWHTTPManager *http;
   const int num_shards;
@@ -68,7 +105,8 @@ class DataLogTrimCR : public RGWCoroutine {
       num_shards(num_shards),
       zone_id(store->svc()->zone->get_zone().id),
       peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()),
-      min_shard_markers(num_shards, TrimCR::max_marker),
+      min_shard_markers(num_shards,
+                       std::string(store->svc()->datalog_rados->max_marker())),
       last_trim(last_trim)
   {}
 
@@ -127,8 +165,7 @@ int DataLogTrimCR::operate()
         ldout(cct, 10) << "trimming log shard " << i
             << " at marker=" << m
             << " last_trim=" << last_trim[i] << dendl;
-        spawn(new TrimCR(store, store->svc()->datalog_rados->get_oid(i),
-                         m, &last_trim[i]),
+        spawn(new TrimCR(store, i, m, &last_trim[i]),
               true);
       }
     }
index cf64a4c6c9d496ca4d375a8fbdec594d3467fb88..ffdd2b38450c9068c96280997f200f4b11b09b4d 100644 (file)
@@ -3,6 +3,9 @@
 
 #pragma once
 
+#include <string>
+#include <vector>
+
 class RGWCoroutine;
 class RGWRados;
 class RGWHTTPManager;