git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add TrimCounters api to BucketTrimWatcher
author: Casey Bodley <cbodley@redhat.com>
Fri, 1 Sep 2017 15:29:55 +0000 (11:29 -0400)
committer: Casey Bodley <cbodley@redhat.com>
Mon, 22 Jan 2018 22:02:06 +0000 (17:02 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
(cherry picked from commit 5bcf109eac30780cfa9ae5d524d2bde638651f40)

src/rgw/rgw_sync_log_trim.cc

index 462b467aceabfc1c451500489207e2809b3998b4..09017290dfd4ade3746acae7f7c488f03ce434e0 100644 (file)
@@ -18,8 +18,8 @@
 
 #include "common/bounded_key_counter.h"
 #include "common/errno.h"
-#include "rgw_sync_log_trim.h"
 #include "rgw_rados.h"
+#include "rgw_sync_log_trim.h"
 #include "include/assert.h"
 
 #define dout_subsys ceph_subsys_rgw
@@ -33,6 +33,7 @@ using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
 
 // watch/notify api for gateways to coordinate about which buckets to trim
 enum TrimNotifyType {
+  NotifyTrimCounters = 0, ///< served by TrimCounters::Handler
 };
 WRITE_RAW_ENCODER(TrimNotifyType);
 
@@ -42,6 +43,115 @@ struct TrimNotifyHandler {
   virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
 };
 
+/// api to share the bucket trim counters between gateways in the same zone.
+/// each gateway will process different datalog shards, so the gateway that runs
+/// the trim process needs to accumulate their counters
+struct TrimCounters {
+  /// counter for a single bucket (encoded without a version header)
+  struct BucketCounter {
+    std::string bucket; ///< key the counter value belongs to
+    int count{0}; ///< counter value for that bucket
+
+    BucketCounter() = default;
+    BucketCounter(const std::string& bucket, int count)
+      : bucket(bucket), count(count) {}
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+  using Vector = std::vector<BucketCounter>;
+
+  /// request bucket trim counters from peer gateways
+  struct Request {
+    uint16_t max_buckets; ///< maximum number of bucket counters to return (Handler caps it at 128)
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+
+  /// reply to a Request: the responding gateway's current bucket trim counters
+  struct Response {
+    Vector bucket_counters;
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+
+  /// server interface to query the hottest buckets; Handler calls it per request
+  struct Server {
+    virtual ~Server() = default;
+
+    virtual void get_bucket_counters(int count, Vector& counters) = 0;
+  };
+
+  /// notify handler: decodes a Request and encodes the Server's counters as a Response
+  class Handler : public TrimNotifyHandler {
+    Server *const server;
+   public:
+    Handler(Server *server) : server(server) {}
+
+    void handle(bufferlist::iterator& input, bufferlist& output) override;
+  };
+};
+std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
+{
+  // printed as "<bucket>:<count>"
+  out << rhs.bucket;
+  out << ":" << rhs.count;
+  return out;
+}
+
+void TrimCounters::BucketCounter::encode(bufferlist& bl) const
+{
+  // no versioning to save space; decode() must read the fields in this order
+  ::encode(bucket, bl);
+  ::encode(count, bl);
+}
+void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
+{
+  ::decode(bucket, p); // same field order as encode(): bucket, then count
+  ::decode(count, p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::BucketCounter);
+
+void TrimCounters::Request::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl); // presumably (version, compat) = (1, 1) - confirm in encoding.h
+  ::encode(max_buckets, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimCounters::Request::decode(bufferlist::iterator& p)
+{
+  DECODE_START(1, p); // counterpart of ENCODE_START(1, 1, ...) in encode()
+  ::decode(max_buckets, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Request);
+
+void TrimCounters::Response::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl); // presumably (version, compat) = (1, 1) - confirm in encoding.h
+  ::encode(bucket_counters, bl); // the whole vector of BucketCounter entries
+  ENCODE_FINISH(bl);
+}
+void TrimCounters::Response::decode(bufferlist::iterator& p)
+{
+  DECODE_START(1, p); // counterpart of ENCODE_START(1, 1, ...) in encode()
+  ::decode(bucket_counters, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Response);
+
+void TrimCounters::Handler::handle(bufferlist::iterator& input,
+                                   bufferlist& output)
+{
+  // read the peer's request from the notify payload
+  Request req;
+  ::decode(req, input);
+
+  // never hand back more than 128 counters, whatever the peer asked for
+  const uint16_t limit = std::min<uint16_t>(req.max_buckets, 128);
+
+  // collect the server's counters and encode them as the reply
+  Response reply;
+  server->get_bucket_counters(limit, reply.bucket_counters);
+  ::encode(reply, output);
+}
+
+
 /// rados watcher for bucket trim notifications
 class BucketTrimWatcher : public librados::WatchCtx2 {
   RGWRados *const store;
@@ -53,9 +163,11 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
   boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
 
  public:
-  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj)
+  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
+                    TrimCounters::Server *counters)
     : store(store), obj(obj)
   {
+    handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
   }
 
   ~BucketTrimWatcher()
@@ -153,7 +265,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
 
 namespace rgw {
 
-class BucketTrimManager::Impl {
+class BucketTrimManager::Impl : public TrimCounters::Server {
  public:
   RGWRados *const store;
   const BucketTrimConfig config;
@@ -173,8 +285,19 @@ class BucketTrimManager::Impl {
     : store(store), config(config),
       status_obj(store->get_zone_params().log_pool, "bilog.trim"),
       counter(config.counter_size),
-      watcher(store, status_obj)
+      watcher(store, status_obj, this)
   {}
+
+  /// TrimCounters::Server interface for watch/notify api
+  void get_bucket_counters(int count, TrimCounters::Vector& buckets)
+  {
+    buckets.reserve(count);
+    std::lock_guard<std::mutex> lock(mutex);
+    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
+                          buckets.emplace_back(key, count);
+                        });
+    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
+  }
 };
 
 BucketTrimManager::BucketTrimManager(RGWRados *store,