]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add BucketTrimWatcher to serve watch/notify apis
authorCasey Bodley <cbodley@redhat.com>
Fri, 1 Sep 2017 15:26:01 +0000 (11:26 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 10 Nov 2017 18:23:00 +0000 (13:23 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync_log_trim.cc
src/rgw/rgw_sync_log_trim.h

index cd177dc5747459ea00e302cb10ad8c8119c7ea44..462b467aceabfc1c451500489207e2809b3998b4 100644 (file)
  */
 
 #include <mutex>
+#include <boost/container/flat_map.hpp>
 
 #include "common/bounded_key_counter.h"
+#include "common/errno.h"
 #include "rgw_sync_log_trim.h"
+#include "rgw_rados.h"
+#include "include/assert.h"
 
 #define dout_subsys ceph_subsys_rgw
 
 using rgw::BucketTrimConfig;
 using BucketChangeCounter = BoundedKeyCounter<std::string, int>;
 
+
+// watch/notify api for gateways to coordinate about which buckets to trim
+enum TrimNotifyType {
+};
+WRITE_RAW_ENCODER(TrimNotifyType);
+
+struct TrimNotifyHandler {
+  virtual ~TrimNotifyHandler() = default;
+
+  virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
+};
+
+/// rados watcher for bucket trim notifications
+class BucketTrimWatcher : public librados::WatchCtx2 {
+  RGWRados *const store;
+  const rgw_raw_obj& obj;
+  rgw_rados_ref ref;
+  uint64_t handle{0};
+
+  using HandlerPtr = std::unique_ptr<TrimNotifyHandler>;
+  boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
+
+ public:
+  BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj)
+    : store(store), obj(obj)
+  {
+  }
+
+  ~BucketTrimWatcher()
+  {
+    stop();
+  }
+
+  int start()
+  {
+    int r = store->get_raw_obj_ref(obj, &ref);
+    if (r < 0) {
+      return r;
+    }
+
+    // register a watch on the realm's control object
+    r = ref.ioctx.watch2(ref.oid, &handle, this);
+    if (r == -ENOENT) {
+      constexpr bool exclusive = true;
+      r = ref.ioctx.create(ref.oid, exclusive);
+      if (r == -EEXIST || r == 0) {
+        r = ref.ioctx.watch2(ref.oid, &handle, this);
+      }
+    }
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to watch " << ref.oid
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.ioctx.close();
+      return r;
+    }
+
+    ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl;
+    return 0;
+  }
+
+  int restart()
+  {
+    int r = ref.ioctx.unwatch2(handle);
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
+          << " with " << cpp_strerror(-r) << dendl;
+    }
+    r = ref.ioctx.watch2(ref.oid, &handle, this);
+    if (r < 0) {
+      lderr(store->ctx()) << "Failed to restart watch on " << ref.oid
+          << " with " << cpp_strerror(-r) << dendl;
+      ref.ioctx.close();
+    }
+    return r;
+  }
+
+  void stop()
+  {
+    ref.ioctx.unwatch2(handle);
+    ref.ioctx.close();
+  }
+
+  /// respond to bucket trim notifications
+  void handle_notify(uint64_t notify_id, uint64_t cookie,
+                     uint64_t notifier_id, bufferlist& bl) override
+  {
+    if (cookie != handle) {
+      return;
+    }
+    bufferlist reply;
+    try {
+      auto p = bl.begin();
+      TrimNotifyType type;
+      ::decode(type, p);
+
+      auto handler = handlers.find(type);
+      if (handler != handlers.end()) {
+        handler->second->handle(p, reply);
+      } else {
+        lderr(store->ctx()) << "no handler for notify type " << type << dendl;
+      }
+    } catch (const buffer::error& e) {
+      lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl;
+    }
+    ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply);
+  }
+
+  /// reestablish the watch if it gets disconnected
+  void handle_error(uint64_t cookie, int err) override
+  {
+    if (cookie != handle) {
+      return;
+    }
+    if (err == -ENOTCONN) {
+      ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl;
+      restart();
+    }
+  }
+};
+
+
 namespace rgw {
 
 class BucketTrimManager::Impl {
@@ -33,15 +158,22 @@ class BucketTrimManager::Impl {
   RGWRados *const store;
   const BucketTrimConfig config;
 
+  const rgw_raw_obj status_obj;
+
   /// count frequency of bucket instance entries in the data changes log
   BucketChangeCounter counter;
 
-  /// protect data shared between data sync and trim threads
+  /// serve the bucket trim watch/notify api
+  BucketTrimWatcher watcher;
+
+  /// protect data shared between data sync, trim, and watch/notify threads
   std::mutex mutex;
 
   Impl(RGWRados *store, const BucketTrimConfig& config)
     : store(store), config(config),
-      counter(config.counter_size)
+      status_obj(store->get_zone_params().log_pool, "bilog.trim"),
+      counter(config.counter_size),
+      watcher(store, status_obj)
   {}
 };
 
@@ -52,6 +184,11 @@ BucketTrimManager::BucketTrimManager(RGWRados *store,
 }
 BucketTrimManager::~BucketTrimManager() = default;
 
+int BucketTrimManager::init()
+{
+  return impl->watcher.start();
+}
+
 void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
 {
   std::lock_guard<std::mutex> lock(impl->mutex);
index 2819eeb0bc4f667de32bc39ad66099c3a63a62c0..6110a7aa80e31eb94a475112703f0121f6d89e44 100644 (file)
@@ -51,6 +51,8 @@ class BucketTrimManager : public BucketChangeObserver {
   BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
   ~BucketTrimManager();
 
+  int init();
+
   /// increment a counter for the given bucket instance
   void on_bucket_changed(const boost::string_view& bucket_instance) override;
 };