]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: BucketTrimManager implements BucketTrimObserver
authorCasey Bodley <cbodley@redhat.com>
Fri, 1 Sep 2017 15:06:30 +0000 (11:06 -0400)
committerCasey Bodley <cbodley@redhat.com>
Fri, 10 Nov 2017 18:23:01 +0000 (13:23 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync_log_trim.cc
src/rgw/rgw_sync_log_trim.h

index 4e155501a910f7d8fa28d4282a85e17f2b235290..a3c21329822c20425b4c7aec74b08d8ddc7569df 100644 (file)
@@ -14,6 +14,7 @@
  */
 
 #include <mutex>
+#include <boost/circular_buffer.hpp>
 #include <boost/container/flat_map.hpp>
 
 #include "common/bounded_key_counter.h"
@@ -271,17 +272,29 @@ class BucketTrimWatcher : public librados::WatchCtx2 {
 };
 
 
+/// Interface to communicate with the trim manager about completed operations
+struct BucketTrimObserver {
+  virtual ~BucketTrimObserver() = default;
+
+  virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
+  virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
+};
+
 /// trim the bilog of all of the given bucket instance's shards
 class BucketTrimInstanceCR : public RGWCoroutine {
   RGWRados *const store;
+  BucketTrimObserver *const observer;
   std::string bucket_instance;
 
  public:
-  BucketTrimInstanceCR(RGWRados *store, const std::string& bucket_instance)
+  BucketTrimInstanceCR(RGWRados *store, BucketTrimObserver *observer,
+                       const std::string& bucket_instance)
     : RGWCoroutine(store->ctx()), store(store),
+      observer(observer),
       bucket_instance(bucket_instance)
   {}
   int operate() {
+    observer->on_bucket_trimmed(std::move(bucket_instance));
     return set_cr_done();
   }
 };
@@ -289,14 +302,15 @@ class BucketTrimInstanceCR : public RGWCoroutine {
 /// trim each bucket instance while limiting the number of concurrent operations
 class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
   RGWRados *const store;
+  BucketTrimObserver *const observer;
   std::vector<std::string>::const_iterator bucket;
   std::vector<std::string>::const_iterator end;
  public:
-  BucketTrimInstanceCollectCR(RGWRados *store,
+  BucketTrimInstanceCollectCR(RGWRados *store, BucketTrimObserver *observer,
                               const std::vector<std::string>& buckets,
                               int max_concurrent)
     : RGWShardCollectCR(store->ctx(), max_concurrent),
-      store(store),
+      store(store), observer(observer),
       bucket(buckets.begin()), end(buckets.end())
   {}
   bool spawn_next() override;
@@ -307,7 +321,7 @@ bool BucketTrimInstanceCollectCR::spawn_next()
   if (bucket == end) {
     return false;
   }
-  spawn(new BucketTrimInstanceCR(store, *bucket), false);
+  spawn(new BucketTrimInstanceCR(store, observer, *bucket), false);
   ++bucket;
   return true;
 }
@@ -486,6 +500,7 @@ class MetadataListCR : public RGWSimpleCoroutine {
 class BucketTrimCR : public RGWCoroutine {
   RGWRados *const store;
   const BucketTrimConfig& config;
+  BucketTrimObserver *const observer;
   const rgw_raw_obj& obj;
   bufferlist notify_replies;
   BucketChangeCounter counter;
@@ -497,9 +512,9 @@ class BucketTrimCR : public RGWCoroutine {
   static const std::string section; //< metadata section for bucket instances
  public:
   BucketTrimCR(RGWRados *store, const BucketTrimConfig& config,
-               const rgw_raw_obj& obj)
+               BucketTrimObserver *observer, const rgw_raw_obj& obj)
     : RGWCoroutine(store->ctx()), store(store), config(config),
-      obj(obj), counter(config.counter_size)
+      observer(observer), obj(obj), counter(config.counter_size)
   {}
 
   int operate();
@@ -566,6 +581,10 @@ int BucketTrimCR::operate()
       yield {
         // list cold buckets to consider for trim
         auto cb = [this] (std::string&& bucket, std::string&& marker) {
+          // filter out keys that we trimmed recently
+          if (observer->trimmed_recently(bucket)) {
+            return true;
+          }
           // filter out active buckets that we've already selected
           auto i = std::find(buckets.begin(), buckets.end(), bucket);
           if (i != buckets.end()) {
@@ -591,7 +610,7 @@ int BucketTrimCR::operate()
     // trim bucket instances with limited concurrency
     set_status("trimming buckets");
     ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
-    yield call(new BucketTrimInstanceCollectCR(store, buckets,
+    yield call(new BucketTrimInstanceCollectCR(store, observer, buckets,
                                                config.concurrent_buckets));
     // ignore errors from individual buckets
 
@@ -618,14 +637,16 @@ int BucketTrimCR::operate()
 class BucketTrimPollCR : public RGWCoroutine {
   RGWRados *const store;
   const BucketTrimConfig& config;
+  BucketTrimObserver *const observer;
   const rgw_raw_obj& obj;
   const std::string name{"trim"}; //< lock name
   const std::string cookie;
 
  public:
   BucketTrimPollCR(RGWRados *store, const BucketTrimConfig& config,
-                   const rgw_raw_obj& obj)
-    : RGWCoroutine(store->ctx()), store(store), config(config), obj(obj),
+                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
+    : RGWCoroutine(store->ctx()), store(store), config(config),
+      observer(observer), obj(obj),
       cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
   {}
 
@@ -650,7 +671,7 @@ int BucketTrimPollCR::operate()
       }
 
       set_status("trimming");
-      yield call(new BucketTrimCR(store, config, obj));
+      yield call(new BucketTrimCR(store, config, observer, obj));
       if (retcode < 0) {
         // on errors, unlock so other gateways can try
         set_status("unlocking");
@@ -662,9 +683,62 @@ int BucketTrimPollCR::operate()
   return 0;
 }
 
+/// tracks a bounded list of events with timestamps. old events can be expired,
+/// and recent events can be searched by key. expiration depends on events being
+/// inserted in temporal order
+template <typename T, typename Clock = ceph::coarse_mono_clock>
+class RecentEventList {
+ public:
+  using clock_type = Clock;
+  using time_point = typename clock_type::time_point;
+
+  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
+    : events(max_size), max_duration(max_duration)
+  {}
+
+  /// insert an event at the given point in time. this time must be at least as
+  /// recent as the last inserted event
+  void insert(T&& value, const time_point& now)
+  {
+    // assert(events.empty() || now >= events.back().time)
+    events.push_back(Event{std::move(value), now});
+  }
+
+  /// performs a linear search for an event matching the given key, whose type
+  /// U can be any that provides operator==(U, T)
+  template <typename U>
+  bool lookup(const U& key) const
+  {
+    for (const auto& event : events) {
+      if (key == event.value) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /// remove events that are no longer recent compared to the given point in time
+  void expire_old(const time_point& now)
+  {
+    const auto expired_before = now - max_duration;
+    while (!events.empty() && events.front().time < expired_before) {
+      events.pop_front();
+    }
+  }
+
+ private:
+  struct Event {
+    T value;
+    time_point time;
+  };
+  boost::circular_buffer<Event> events;
+  const ceph::timespan max_duration;
+};
+
 namespace rgw {
 
-class BucketTrimManager::Impl : public TrimCounters::Server {
+class BucketTrimManager::Impl : public TrimCounters::Server,
+                                public BucketTrimObserver {
  public:
   RGWRados *const store;
   const BucketTrimConfig config;
@@ -674,6 +748,11 @@ class BucketTrimManager::Impl : public TrimCounters::Server {
   /// count frequency of bucket instance entries in the data changes log
   BucketChangeCounter counter;
 
+  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
+  using clock_type = RecentlyTrimmedBucketList::clock_type;
+  /// track recently trimmed buckets to focus trim activity elsewhere
+  RecentlyTrimmedBucketList trimmed;
+
   /// serve the bucket trim watch/notify api
   BucketTrimWatcher watcher;
 
@@ -684,6 +763,7 @@ class BucketTrimManager::Impl : public TrimCounters::Server {
     : store(store), config(config),
       status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
       counter(config.counter_size),
+      trimmed(config.recent_size, config.recent_duration),
       watcher(store, status_obj, this)
   {}
 
@@ -697,6 +777,20 @@ class BucketTrimManager::Impl : public TrimCounters::Server {
                         });
     ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
   }
+
+  /// BucketTrimObserver interface to remember successfully-trimmed buckets
+  void on_bucket_trimmed(std::string&& bucket_instance) override
+  {
+    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
+    std::lock_guard<std::mutex> lock(mutex);
+    trimmed.insert(std::move(bucket_instance), clock_type::now());
+  }
+
+  bool trimmed_recently(const boost::string_view& bucket_instance) override
+  {
+    std::lock_guard<std::mutex> lock(mutex);
+    return trimmed.lookup(bucket_instance);
+  }
 };
 
 BucketTrimManager::BucketTrimManager(RGWRados *store,
@@ -714,12 +808,17 @@ int BucketTrimManager::init()
 void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
 {
   std::lock_guard<std::mutex> lock(impl->mutex);
+  // filter recently trimmed bucket instances out of bucket change counter
+  if (impl->trimmed.lookup(bucket)) {
+    return;
+  }
   impl->counter.insert(bucket.to_string());
 }
 
 RGWCoroutine* BucketTrimManager::create_bucket_trim_cr()
 {
-  return new BucketTrimPollCR(impl->store, impl->config, impl->status_obj);
+  return new BucketTrimPollCR(impl->store, impl->config,
+                              impl.get(), impl->status_obj);
 }
 
 } // namespace rgw
index 18de544266ac1f8ca9321572ac04e7d3a29390eb..d8a1cacc74110b0ef2081aeeac9a653ab2b036bc 100644 (file)
@@ -19,6 +19,7 @@
 #include <memory>
 #include <boost/utility/string_view.hpp>
 #include "include/encoding.h"
+#include "common/ceph_time.h"
 
 class CephContext;
 class RGWCoroutine;
@@ -47,6 +48,12 @@ struct BucketTrimConfig {
   uint32_t concurrent_buckets{0};
   /// timeout in ms for bucket trim notify replies
   uint64_t notify_timeout_ms{0};
+  /// maximum number of recently trimmed buckets to remember (should be small
+  /// enough for a linear search)
+  size_t recent_size{0};
+  /// maximum duration to consider a trim as 'recent' (should be some multiple
+  /// of the trim interval, at least)
+  ceph::timespan recent_duration{0};
 };
 
 /// fill out the BucketTrimConfig from the ceph context