]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add TrimComplete to watch/notify api
authorCasey Bodley <cbodley@redhat.com>
Fri, 15 Sep 2017 19:54:44 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 10 Nov 2017 18:23:02 +0000 (13:23 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync_log_trim.cc

index 660db0089e1fab11f7bc6ba9fb31ac24381ede1c..171e7c80a0b3d39b0687aba00c4f0464142de71b 100644 (file)
@@ -45,6 +45,7 @@ using rgw::BucketTrimStatus;
 // watch/notify api for gateways to coordinate about which buckets to trim
 enum TrimNotifyType {
   NotifyTrimCounters = 0,
+  NotifyTrimComplete,
 };
 WRITE_RAW_ENCODER(TrimNotifyType);
 
@@ -93,6 +94,7 @@ struct TrimCounters {
     virtual ~Server() = default;
 
     virtual void get_bucket_counters(int count, Vector& counters) = 0;
+    virtual void reset_bucket_counters() = 0;
   };
 
   /// notify handler
@@ -162,6 +164,67 @@ void TrimCounters::Handler::handle(bufferlist::iterator& input,
   ::encode(response, output);
 }
 
+/// api to notify peer gateways that trim has completed and their bucket change
+/// counters can be reset
+struct TrimComplete {
+  struct Request {
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+  struct Response {
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+
+  /// server interface to reset bucket counters
+  using Server = TrimCounters::Server;
+
+  /// notify handler
+  class Handler : public TrimNotifyHandler {
+    Server *const server;
+   public:
+    Handler(Server *server) : server(server) {}
+
+    void handle(bufferlist::iterator& input, bufferlist& output) override;
+  };
+};
+
+void TrimComplete::Request::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimComplete::Request::decode(bufferlist::iterator& p)
+{
+  DECODE_START(1, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Request);
+
+void TrimComplete::Response::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  ENCODE_FINISH(bl);
+}
+void TrimComplete::Response::decode(bufferlist::iterator& p)
+{
+  DECODE_START(1, p);
+  DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Response);
+
+void TrimComplete::Handler::handle(bufferlist::iterator& input,
+                                   bufferlist& output)
+{
+  Request request;
+  ::decode(request, input);
+
+  server->reset_bucket_counters();
+
+  Response response;
+  ::encode(response, output);
+}
+
 
 /// rados watcher for bucket trim notifications
 class BucketTrimWatcher : public librados::WatchCtx2 {
@@ -179,6 +242,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
     : store(store), obj(obj)
   {
     handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
+    handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
   }
 
   ~BucketTrimWatcher()
@@ -652,6 +716,7 @@ class BucketTrimCR : public RGWCoroutine {
   const BucketTrimConfig& config;
   BucketTrimObserver *const observer;
   const rgw_raw_obj& obj;
+  ceph::mono_time start_time;
   bufferlist notify_replies;
   BucketChangeCounter counter;
   std::vector<std::string> buckets; //< buckets selected for trim
@@ -676,6 +741,8 @@ const std::string BucketTrimCR::section{"bucket.instance"};
 int BucketTrimCR::operate()
 {
   reenter(this) {
+    start_time = ceph::mono_clock::now();
+
     if (config.buckets_per_interval) {
       // query watch/notify for hot buckets
       ldout(cct, 10) << "fetching active bucket counters" << dendl;
@@ -780,6 +847,24 @@ int BucketTrimCR::operate()
       }
     }
 
+    // notify peers that trim completed
+    set_status("trim completed");
+    yield {
+      const TrimNotifyType type = NotifyTrimComplete;
+      TrimComplete::Request request;
+      bufferlist bl;
+      ::encode(type, bl);
+      ::encode(request, bl);
+      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
+                                nullptr));
+    }
+    if (retcode < 0) {
+      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
+      return set_cr_error(retcode);
+    }
+
+    ldout(cct, 4) << "bucket index log processing completed in "
+        << ceph::mono_clock::now() - start_time << dendl;
     return set_cr_done();
   }
   return 0;
@@ -950,6 +1035,14 @@ class BucketTrimManager::Impl : public TrimCounters::Server,
     ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
   }
 
+  void reset_bucket_counters() override
+  {
+    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
+    std::lock_guard<std::mutex> lock(mutex);
+    counter.clear();
+    trimmed.expire_old(clock_type::now());
+  }
+
   /// BucketTrimObserver interface to remember successfully-trimmed buckets
   void on_bucket_trimmed(std::string&& bucket_instance) override
   {