From: Casey Bodley Date: Fri, 1 Sep 2017 15:29:55 +0000 (-0400) Subject: rgw: add TrimCounters api to BucketTrimWatcher X-Git-Tag: v12.2.3~125^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1d8dbaebca536cbf0341d4d0284dcbad191e64ad;p=ceph.git rgw: add TrimCounters api to BucketTrimWatcher Signed-off-by: Casey Bodley (cherry picked from commit 5bcf109eac30780cfa9ae5d524d2bde638651f40) --- diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 462b467acea..09017290dfd 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -18,8 +18,8 @@ #include "common/bounded_key_counter.h" #include "common/errno.h" -#include "rgw_sync_log_trim.h" #include "rgw_rados.h" +#include "rgw_sync_log_trim.h" #include "include/assert.h" #define dout_subsys ceph_subsys_rgw @@ -33,6 +33,7 @@ using BucketChangeCounter = BoundedKeyCounter; // watch/notify api for gateways to coordinate about which buckets to trim enum TrimNotifyType { + NotifyTrimCounters = 0, }; WRITE_RAW_ENCODER(TrimNotifyType); @@ -42,6 +43,115 @@ struct TrimNotifyHandler { virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0; }; +/// api to share the bucket trim counters between gateways in the same zone. +/// each gateway will process different datalog shards, so the gateway that runs +/// the trim process needs to accumulate their counters +struct TrimCounters { + /// counter for a single bucket + struct BucketCounter { + std::string bucket; + int count{0}; + + BucketCounter() = default; + BucketCounter(const std::string& bucket, int count) + : bucket(bucket), count(count) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + using Vector = std::vector; + + /// request bucket trim counters from peer gateways + struct Request { + uint16_t max_buckets; //< maximum number of bucket counters to return + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + + /// return the current bucket trim counters + struct Response { + Vector bucket_counters; + + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + + /// server interface to query the hottest buckets + struct Server { + virtual ~Server() = default; + + virtual void get_bucket_counters(int count, Vector& counters) = 0; + }; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + Handler(Server *server) : server(server) {} + + void handle(bufferlist::iterator& input, bufferlist& output) override; + }; +}; +std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) +{ + return out << rhs.bucket << ":" << rhs.count; +} + +void TrimCounters::BucketCounter::encode(bufferlist& bl) const +{ + // no versioning to save space + ::encode(bucket, bl); + ::encode(count, bl); +} +void TrimCounters::BucketCounter::decode(bufferlist::iterator& p) +{ + ::decode(bucket, p); + ::decode(count, p); +} +WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); + +void TrimCounters::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(max_buckets, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Request::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + ::decode(max_buckets, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Request); + +void TrimCounters::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(bucket_counters, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Response::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + ::decode(bucket_counters, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Response); + +void TrimCounters::Handler::handle(bufferlist::iterator& input, + bufferlist& output) +{ + Request request; + ::decode(request, input); + auto count = std::min(request.max_buckets, 128); + + Response response; + server->get_bucket_counters(count, response.bucket_counters); + ::encode(response, output); +} + + /// rados watcher for bucket trim notifications class BucketTrimWatcher : public librados::WatchCtx2 { RGWRados *const store; @@ -53,9 +163,11 @@ class BucketTrimWatcher : public librados::WatchCtx2 { boost::container::flat_map handlers; public: - BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj) + BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj, + TrimCounters::Server *counters) : store(store), obj(obj) { + handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); } ~BucketTrimWatcher() @@ -153,7 +265,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 { namespace rgw { -class BucketTrimManager::Impl { +class BucketTrimManager::Impl : public TrimCounters::Server { public: RGWRados *const store; const BucketTrimConfig config; @@ -173,8 +285,19 @@ class BucketTrimManager::Impl { : store(store), config(config), status_obj(store->get_zone_params().log_pool, "bilog.trim"), counter(config.counter_size), - watcher(store, status_obj) + watcher(store, status_obj, this) {} + + /// TrimCounters::Server interface for watch/notify api + void get_bucket_counters(int count, TrimCounters::Vector& buckets) + { + buckets.reserve(count); + std::lock_guard lock(mutex); + counter.get_highest(count, [&buckets] (const std::string& key, int count) { + buckets.emplace_back(key, count); + }); + ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; + } }; BucketTrimManager::BucketTrimManager(RGWRados *store,