From 7be4eab8a339e9e083352a44ad09272da717c73e Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Fri, 1 Sep 2017 11:06:30 -0400 Subject: [PATCH] rgw: BucketTrimManager implements BucketTrimObserver Signed-off-by: Casey Bodley --- src/rgw/rgw_sync_log_trim.cc | 123 +++++++++++++++++++++++++++++++---- src/rgw/rgw_sync_log_trim.h | 7 ++ 2 files changed, 118 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 4e155501a910f..a3c21329822c2 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -14,6 +14,7 @@ */ #include +#include #include #include "common/bounded_key_counter.h" @@ -271,17 +272,29 @@ class BucketTrimWatcher : public librados::WatchCtx2 { }; +/// Interface to communicate with the trim manager about completed operations +struct BucketTrimObserver { + virtual ~BucketTrimObserver() = default; + + virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0; + virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0; +}; + /// trim the bilog of all of the given bucket instance's shards class BucketTrimInstanceCR : public RGWCoroutine { RGWRados *const store; + BucketTrimObserver *const observer; std::string bucket_instance; public: - BucketTrimInstanceCR(RGWRados *store, const std::string& bucket_instance) + BucketTrimInstanceCR(RGWRados *store, BucketTrimObserver *observer, + const std::string& bucket_instance) : RGWCoroutine(store->ctx()), store(store), + observer(observer), bucket_instance(bucket_instance) {} int operate() { + observer->on_bucket_trimmed(std::move(bucket_instance)); return set_cr_done(); } }; @@ -289,14 +302,15 @@ class BucketTrimInstanceCR : public RGWCoroutine { /// trim each bucket instance while limiting the number of concurrent operations class BucketTrimInstanceCollectCR : public RGWShardCollectCR { RGWRados *const store; + BucketTrimObserver *const observer; std::vector::const_iterator bucket; std::vector::const_iterator end; public: - BucketTrimInstanceCollectCR(RGWRados *store, + BucketTrimInstanceCollectCR(RGWRados *store, BucketTrimObserver *observer, const std::vector& buckets, int max_concurrent) : RGWShardCollectCR(store->ctx(), max_concurrent), - store(store), + store(store), observer(observer), bucket(buckets.begin()), end(buckets.end()) {} bool spawn_next() override; @@ -307,7 +321,7 @@ bool BucketTrimInstanceCollectCR::spawn_next() if (bucket == end) { return false; } - spawn(new BucketTrimInstanceCR(store, *bucket), false); + spawn(new BucketTrimInstanceCR(store, observer, *bucket), false); ++bucket; return true; } @@ -486,6 +500,7 @@ class MetadataListCR : public RGWSimpleCoroutine { class BucketTrimCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; + BucketTrimObserver *const observer; const rgw_raw_obj& obj; bufferlist notify_replies; BucketChangeCounter counter; @@ -497,9 +512,9 @@ class BucketTrimCR : public RGWCoroutine { static const std::string section; //< metadata section for bucket instances public: BucketTrimCR(RGWRados *store, const BucketTrimConfig& config, - const rgw_raw_obj& obj) + BucketTrimObserver *observer, const rgw_raw_obj& obj) : RGWCoroutine(store->ctx()), store(store), config(config), - obj(obj), counter(config.counter_size) + observer(observer), obj(obj), counter(config.counter_size) {} int operate(); @@ -566,6 +581,10 @@ int BucketTrimCR::operate() yield { // list cold buckets to consider for trim auto cb = [this] (std::string&& bucket, std::string&& marker) { + // filter out keys that we trimmed recently + if (observer->trimmed_recently(bucket)) { + return true; + } // filter out active buckets that we've already selected auto i = std::find(buckets.begin(), buckets.end(), bucket); if (i != buckets.end()) { @@ -591,7 +610,7 @@ int BucketTrimCR::operate() // trim bucket instances with limited concurrency set_status("trimming buckets"); ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; - yield call(new BucketTrimInstanceCollectCR(store, buckets, + yield call(new BucketTrimInstanceCollectCR(store, observer, buckets, config.concurrent_buckets)); // ignore errors from individual buckets @@ -618,14 +637,16 @@ int BucketTrimCR::operate() class BucketTrimPollCR : public RGWCoroutine { RGWRados *const store; const BucketTrimConfig& config; + BucketTrimObserver *const observer; const rgw_raw_obj& obj; const std::string name{"trim"}; //< lock name const std::string cookie; public: BucketTrimPollCR(RGWRados *store, const BucketTrimConfig& config, - const rgw_raw_obj& obj) - : RGWCoroutine(store->ctx()), store(store), config(config), obj(obj), + BucketTrimObserver *observer, const rgw_raw_obj& obj) + : RGWCoroutine(store->ctx()), store(store), config(config), + observer(observer), obj(obj), cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) {} @@ -650,7 +671,7 @@ int BucketTrimPollCR::operate() } set_status("trimming"); - yield call(new BucketTrimCR(store, config, obj)); + yield call(new BucketTrimCR(store, config, observer, obj)); if (retcode < 0) { // on errors, unlock so other gateways can try set_status("unlocking"); @@ -662,9 +683,62 @@ int BucketTrimPollCR::operate() return 0; } +/// tracks a bounded list of events with timestamps. old events can be expired, +/// and recent events can be searched by key. expiration depends on events being +/// inserted in temporal order +template +class RecentEventList { + public: + using clock_type = Clock; + using time_point = typename clock_type::time_point; + + RecentEventList(size_t max_size, const ceph::timespan& max_duration) + : events(max_size), max_duration(max_duration) + {} + + /// insert an event at the given point in time. this time must be at least as + /// recent as the last inserted event + void insert(T&& value, const time_point& now) + { + // assert(events.empty() || now >= events.back().time) + events.push_back(Event{std::move(value), now}); + } + + /// performs a linear search for an event matching the given key, whose type + /// U can be any that provides operator==(U, T) + template + bool lookup(const U& key) const + { + for (const auto& event : events) { + if (key == event.value) { + return true; + } + } + return false; + } + + /// remove events that are no longer recent compared to the given point in time + void expire_old(const time_point& now) + { + const auto expired_before = now - max_duration; + while (!events.empty() && events.front().time < expired_before) { + events.pop_front(); + } + } + + private: + struct Event { + T value; + time_point time; + }; + boost::circular_buffer events; + const ceph::timespan max_duration; +}; + namespace rgw { -class BucketTrimManager::Impl : public TrimCounters::Server { +class BucketTrimManager::Impl : public TrimCounters::Server, + public BucketTrimObserver { public: RGWRados *const store; const BucketTrimConfig config; @@ -674,6 +748,11 @@ class BucketTrimManager::Impl : public TrimCounters::Server { /// count frequency of bucket instance entries in the data changes log BucketChangeCounter counter; + using RecentlyTrimmedBucketList = RecentEventList; + using clock_type = RecentlyTrimmedBucketList::clock_type; + /// track recently trimmed buckets to focus trim activity elsewhere + RecentlyTrimmedBucketList trimmed; + /// serve the bucket trim watch/notify api BucketTrimWatcher watcher; @@ -684,6 +763,7 @@ class BucketTrimManager::Impl : public TrimCounters::Server { : store(store), config(config), status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid), counter(config.counter_size), + trimmed(config.recent_size, config.recent_duration), watcher(store, status_obj, this) {} @@ -697,6 +777,20 @@ class BucketTrimManager::Impl : public TrimCounters::Server { }); ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; } + + /// BucketTrimObserver interface to remember successfully-trimmed buckets + void on_bucket_trimmed(std::string&& bucket_instance) override + { + ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl; + std::lock_guard lock(mutex); + trimmed.insert(std::move(bucket_instance), clock_type::now()); + } + + bool trimmed_recently(const boost::string_view& bucket_instance) override + { + std::lock_guard lock(mutex); + return trimmed.lookup(bucket_instance); + } }; BucketTrimManager::BucketTrimManager(RGWRados *store, @@ -714,12 +808,17 @@ int BucketTrimManager::init() void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) { std::lock_guard lock(impl->mutex); + // filter recently trimmed bucket instances out of bucket change counter + if (impl->trimmed.lookup(bucket)) { + return; + } impl->counter.insert(bucket.to_string()); } RGWCoroutine* BucketTrimManager::create_bucket_trim_cr() { - return new BucketTrimPollCR(impl->store, impl->config, impl->status_obj); + return new BucketTrimPollCR(impl->store, impl->config, + impl.get(), impl->status_obj); } } // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h index 18de544266ac1..d8a1cacc74110 100644 --- a/src/rgw/rgw_sync_log_trim.h +++ b/src/rgw/rgw_sync_log_trim.h @@ -19,6 +19,7 @@ #include #include #include "include/encoding.h" +#include "common/ceph_time.h" class CephContext; class RGWCoroutine; @@ -47,6 +48,12 @@ struct BucketTrimConfig { uint32_t concurrent_buckets{0}; /// timeout in ms for bucket trim notify replies uint64_t notify_timeout_ms{0}; + /// maximum number of recently trimmed buckets to remember (should be small + /// enough for a linear search) + size_t recent_size{0}; + /// maximum duration to consider a trim as 'recent' (should be some multiple + /// of the trim interval, at least) + ceph::timespan recent_duration{0}; }; /// fill out the BucketTrimConfig from the ceph context -- 2.39.5