From f96d9a8e22195bfb3347b5add7a4385895d36f9c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 11:26:01 -0400 Subject: [PATCH] rgw: add BucketTrimWatcher to serve watch/notify apis Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 141 ++++++++++++++++++++++++++++++++++- src/rgw/rgw_sync_log_trim.h | 2 + 2 files changed, 141 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index cd177dc5747..462b467acea 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -14,9 +14,13 @@ */ #include +#include #include "common/bounded_key_counter.h" +#include "common/errno.h" #include "rgw_sync_log_trim.h" +#include "rgw_rados.h" +#include "include/assert.h" #define dout_subsys ceph_subsys_rgw @@ -26,6 +30,127 @@ using rgw::BucketTrimConfig; using BucketChangeCounter = BoundedKeyCounter; + +// watch/notify api for gateways to coordinate about which buckets to trim +enum TrimNotifyType { +}; +WRITE_RAW_ENCODER(TrimNotifyType); + +struct TrimNotifyHandler { + virtual ~TrimNotifyHandler() = default; + + virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0; +}; + +/// rados watcher for bucket trim notifications +class BucketTrimWatcher : public librados::WatchCtx2 { + RGWRados *const store; + const rgw_raw_obj& obj; + rgw_rados_ref ref; + uint64_t handle{0}; + + using HandlerPtr = std::unique_ptr; + boost::container::flat_map handlers; + + public: + BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj) + : store(store), obj(obj) + { + } + + ~BucketTrimWatcher() + { + stop(); + } + + int start() + { + int r = store->get_raw_obj_ref(obj, &ref); + if (r < 0) { + return r; + } + + // register a watch on the realm's control object + r = ref.ioctx.watch2(ref.oid, &handle, this); + if (r == -ENOENT) { + constexpr bool exclusive = true; + r = ref.ioctx.create(ref.oid, exclusive); + if (r == -EEXIST || r == 0) { + r = ref.ioctx.watch2(ref.oid, &handle, this); + } + } + if (r < 0) { + lderr(store->ctx()) << "Failed to watch " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + return r; + } + + ldout(store->ctx(), 10) << "Watching " << ref.oid << dendl; + return 0; + } + + int restart() + { + int r = ref.ioctx.unwatch2(handle); + if (r < 0) { + lderr(store->ctx()) << "Failed to unwatch on " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + } + r = ref.ioctx.watch2(ref.oid, &handle, this); + if (r < 0) { + lderr(store->ctx()) << "Failed to restart watch on " << ref.oid + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + } + return r; + } + + void stop() + { + ref.ioctx.unwatch2(handle); + ref.ioctx.close(); + } + + /// respond to bucket trim notifications + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) override + { + if (cookie != handle) { + return; + } + bufferlist reply; + try { + auto p = bl.begin(); + TrimNotifyType type; + ::decode(type, p); + + auto handler = handlers.find(type); + if (handler != handlers.end()) { + handler->second->handle(p, reply); + } else { + lderr(store->ctx()) << "no handler for notify type " << type << dendl; + } + } catch (const buffer::error& e) { + lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; + } + ref.ioctx.notify_ack(ref.oid, notify_id, cookie, reply); + } + + /// reestablish the watch if it gets disconnected + void handle_error(uint64_t cookie, int err) override + { + if (cookie != handle) { + return; + } + if (err == -ENOTCONN) { + ldout(store->ctx(), 4) << "Disconnected watch on " << ref.oid << dendl; + restart(); + } + } +}; + + namespace rgw { class BucketTrimManager::Impl { @@ -33,15 +158,22 @@ class BucketTrimManager::Impl { RGWRados *const store; const BucketTrimConfig config; + const rgw_raw_obj status_obj; + /// count frequency of bucket instance entries in the data changes log BucketChangeCounter counter; - /// protect data shared between data sync and trim threads + /// serve the bucket trim watch/notify api + BucketTrimWatcher watcher; + + /// protect data shared between data sync, trim, and watch/notify threads std::mutex mutex; Impl(RGWRados *store, const BucketTrimConfig& config) : store(store), config(config), - counter(config.counter_size) + status_obj(store->get_zone_params().log_pool, "bilog.trim"), + counter(config.counter_size), + watcher(store, status_obj) {} }; @@ -52,6 +184,11 @@ BucketTrimManager::BucketTrimManager(RGWRados *store, } BucketTrimManager::~BucketTrimManager() = default; +int BucketTrimManager::init() +{ + return impl->watcher.start(); +} + void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) { std::lock_guard lock(impl->mutex); diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index 2819eeb0bc4..6110a7aa80e 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -51,6 +51,8 @@ class BucketTrimManager : public BucketChangeObserver { BucketTrimManager(RGWRados *store, const BucketTrimConfig& config); ~BucketTrimManager(); + int init(); + /// increment a counter for the given bucket instance void on_bucket_changed(const boost::string_view& bucket_instance) override; }; -- 2.39.5