From 916f5995c92acd0c5ba66fde6031fed7cb91cb95 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 15 Sep 2017 15:54:44 -0400 Subject: [PATCH] rgw: add TrimComplete to watch/notify api Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 93 ++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 660db0089e1fa..171e7c80a0b3d 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -45,6 +45,7 @@ using rgw::BucketTrimStatus; // watch/notify api for gateways to coordinate about which buckets to trim enum TrimNotifyType { NotifyTrimCounters = 0, + NotifyTrimComplete, }; WRITE_RAW_ENCODER(TrimNotifyType); @@ -93,6 +94,7 @@ struct TrimCounters { virtual ~Server() = default; virtual void get_bucket_counters(int count, Vector& counters) = 0; + virtual void reset_bucket_counters() = 0; }; /// notify handler @@ -162,6 +164,67 @@ void TrimCounters::Handler::handle(bufferlist::iterator& input, ::encode(response, output); } +/// api to notify peer gateways that trim has completed and their bucket change +/// counters can be reset +struct TrimComplete { + struct Request { + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + struct Response { + void encode(bufferlist& bl) const; + void decode(bufferlist::iterator& p); + }; + + /// server interface to reset bucket counters + using Server = TrimCounters::Server; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + Handler(Server *server) : server(server) {} + + void handle(bufferlist::iterator& input, bufferlist& output) override; + }; +}; + +void TrimComplete::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Request::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Request); + +void TrimComplete::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Response::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Response); + +void TrimComplete::Handler::handle(bufferlist::iterator& input, + bufferlist& output) +{ + Request request; + ::decode(request, input); + + server->reset_bucket_counters(); + + Response response; + ::encode(response, output); +} + /// rados watcher for bucket trim notifications class BucketTrimWatcher : public librados::WatchCtx2 { @@ -179,6 +242,7 @@ class BucketTrimWatcher : public librados::WatchCtx2 { : store(store), obj(obj) { handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); + handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters)); } ~BucketTrimWatcher() @@ -652,6 +716,7 @@ class BucketTrimCR : public RGWCoroutine { const BucketTrimConfig& config; BucketTrimObserver *const observer; const rgw_raw_obj& obj; + ceph::mono_time start_time; bufferlist notify_replies; BucketChangeCounter counter; std::vector buckets; //< buckets selected for trim @@ -676,6 +741,8 @@ const std::string BucketTrimCR::section{"bucket.instance"}; int BucketTrimCR::operate() { reenter(this) { + start_time = ceph::mono_clock::now(); + if (config.buckets_per_interval) { // query watch/notify for hot buckets ldout(cct, 10) << "fetching active bucket counters" << dendl; @@ -780,6 +847,24 @@ int BucketTrimCR::operate() } } + // notify peers that trim completed + set_status("trim completed"); + yield { + const TrimNotifyType type = NotifyTrimComplete; + TrimComplete::Request request; + bufferlist bl; + ::encode(type, bl); + ::encode(request, bl); + call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, + nullptr)); + } + if (retcode < 0) { + ldout(cct, 10) << "failed to notify peers of trim completion" << dendl; + return set_cr_error(retcode); + } + + ldout(cct, 4) << "bucket index log processing completed in " + << ceph::mono_clock::now() - start_time << dendl; return set_cr_done(); } return 0; @@ -950,6 +1035,14 @@ class BucketTrimManager::Impl : public TrimCounters::Server, ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; } + void reset_bucket_counters() override + { + ldout(store->ctx(), 20) << "bucket trim completed" << dendl; + std::lock_guard lock(mutex); + counter.clear(); + trimmed.expire_old(clock_type::now()); + } + /// BucketTrimObserver interface to remember successfully-trimmed buckets void on_bucket_trimmed(std::string&& bucket_instance) override { -- 2.39.5