*/
#include <mutex>
+#include <boost/circular_buffer.hpp>
#include <boost/container/flat_map.hpp>
#include "common/bounded_key_counter.h"
};
+/// Interface to communicate with the trim manager about completed operations
+struct BucketTrimObserver {
+ virtual ~BucketTrimObserver() = default;
+
+ virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0;
+ virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
+};
+
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
RGWRados *const store;
+ BucketTrimObserver *const observer;
std::string bucket_instance;
public:
- BucketTrimInstanceCR(RGWRados *store, const std::string& bucket_instance)
+ BucketTrimInstanceCR(RGWRados *store, BucketTrimObserver *observer,
+ const std::string& bucket_instance)
: RGWCoroutine(store->ctx()), store(store),
+ observer(observer),
bucket_instance(bucket_instance)
{}
int operate() {
+ observer->on_bucket_trimmed(std::move(bucket_instance));
return set_cr_done();
}
};
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
RGWRados *const store;
+ BucketTrimObserver *const observer;
std::vector<std::string>::const_iterator bucket;
std::vector<std::string>::const_iterator end;
public:
- BucketTrimInstanceCollectCR(RGWRados *store,
+ BucketTrimInstanceCollectCR(RGWRados *store, BucketTrimObserver *observer,
const std::vector<std::string>& buckets,
int max_concurrent)
: RGWShardCollectCR(store->ctx(), max_concurrent),
- store(store),
+ store(store), observer(observer),
bucket(buckets.begin()), end(buckets.end())
{}
bool spawn_next() override;
if (bucket == end) {
return false;
}
- spawn(new BucketTrimInstanceCR(store, *bucket), false);
+ spawn(new BucketTrimInstanceCR(store, observer, *bucket), false);
++bucket;
return true;
}
class BucketTrimCR : public RGWCoroutine {
RGWRados *const store;
const BucketTrimConfig& config;
+ BucketTrimObserver *const observer;
const rgw_raw_obj& obj;
bufferlist notify_replies;
BucketChangeCounter counter;
static const std::string section; //< metadata section for bucket instances
public:
BucketTrimCR(RGWRados *store, const BucketTrimConfig& config,
- const rgw_raw_obj& obj)
+ BucketTrimObserver *observer, const rgw_raw_obj& obj)
: RGWCoroutine(store->ctx()), store(store), config(config),
- obj(obj), counter(config.counter_size)
+ observer(observer), obj(obj), counter(config.counter_size)
{}
int operate();
yield {
// list cold buckets to consider for trim
auto cb = [this] (std::string&& bucket, std::string&& marker) {
+ // filter out keys that we trimmed recently
+ if (observer->trimmed_recently(bucket)) {
+ return true;
+ }
// filter out active buckets that we've already selected
auto i = std::find(buckets.begin(), buckets.end(), bucket);
if (i != buckets.end()) {
// trim bucket instances with limited concurrency
set_status("trimming buckets");
ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
- yield call(new BucketTrimInstanceCollectCR(store, buckets,
+ yield call(new BucketTrimInstanceCollectCR(store, observer, buckets,
config.concurrent_buckets));
// ignore errors from individual buckets
class BucketTrimPollCR : public RGWCoroutine {
RGWRados *const store;
const BucketTrimConfig& config;
+ BucketTrimObserver *const observer;
const rgw_raw_obj& obj;
const std::string name{"trim"}; //< lock name
const std::string cookie;
public:
BucketTrimPollCR(RGWRados *store, const BucketTrimConfig& config,
- const rgw_raw_obj& obj)
- : RGWCoroutine(store->ctx()), store(store), config(config), obj(obj),
+ BucketTrimObserver *observer, const rgw_raw_obj& obj)
+ : RGWCoroutine(store->ctx()), store(store), config(config),
+ observer(observer), obj(obj),
cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
{}
}
set_status("trimming");
- yield call(new BucketTrimCR(store, config, obj));
+ yield call(new BucketTrimCR(store, config, observer, obj));
if (retcode < 0) {
// on errors, unlock so other gateways can try
set_status("unlocking");
return 0;
}
+/// tracks a bounded list of events with timestamps. old events can be expired,
+/// and recent events can be searched by key. expiration depends on events being
+/// inserted in temporal order
+template <typename T, typename Clock = ceph::coarse_mono_clock>
+class RecentEventList {
+ public:
+ using clock_type = Clock;
+ using time_point = typename clock_type::time_point;
+
+ RecentEventList(size_t max_size, const ceph::timespan& max_duration)
+ : events(max_size), max_duration(max_duration)
+ {}
+
+ /// insert an event at the given point in time. this time must be at least as
+ /// recent as the last inserted event
+ void insert(T&& value, const time_point& now)
+ {
+ // assert(events.empty() || now >= events.back().time)
+ events.push_back(Event{std::move(value), now});
+ }
+
+ /// performs a linear search for an event matching the given key, whose type
+ /// U can be any that provides operator==(U, T)
+ template <typename U>
+ bool lookup(const U& key) const
+ {
+ for (const auto& event : events) {
+ if (key == event.value) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /// remove events that are no longer recent compared to the given point in time
+ void expire_old(const time_point& now)
+ {
+ const auto expired_before = now - max_duration;
+ while (!events.empty() && events.front().time < expired_before) {
+ events.pop_front();
+ }
+ }
+
+ private:
+ struct Event {
+ T value;
+ time_point time;
+ };
+ boost::circular_buffer<Event> events;
+ const ceph::timespan max_duration;
+};
+
namespace rgw {
-class BucketTrimManager::Impl : public TrimCounters::Server {
+class BucketTrimManager::Impl : public TrimCounters::Server,
+ public BucketTrimObserver {
public:
RGWRados *const store;
const BucketTrimConfig config;
/// count frequency of bucket instance entries in the data changes log
BucketChangeCounter counter;
+ using RecentlyTrimmedBucketList = RecentEventList<std::string>;
+ using clock_type = RecentlyTrimmedBucketList::clock_type;
+ /// track recently trimmed buckets to focus trim activity elsewhere
+ RecentlyTrimmedBucketList trimmed;
+
/// serve the bucket trim watch/notify api
BucketTrimWatcher watcher;
: store(store), config(config),
status_obj(store->get_zone_params().log_pool, BucketTrimStatus::oid),
counter(config.counter_size),
+ trimmed(config.recent_size, config.recent_duration),
watcher(store, status_obj, this)
{}
});
ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
}
+
+ /// BucketTrimObserver interface to remember successfully-trimmed buckets
+ void on_bucket_trimmed(std::string&& bucket_instance) override
+ {
+ ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
+ std::lock_guard<std::mutex> lock(mutex);
+ trimmed.insert(std::move(bucket_instance), clock_type::now());
+ }
+
+ bool trimmed_recently(const boost::string_view& bucket_instance) override
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+ return trimmed.lookup(bucket_instance);
+ }
};
BucketTrimManager::BucketTrimManager(RGWRados *store,
void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
{
std::lock_guard<std::mutex> lock(impl->mutex);
+ // filter recently trimmed bucket instances out of bucket change counter
+ if (impl->trimmed.lookup(bucket)) {
+ return;
+ }
impl->counter.insert(bucket.to_string());
}
RGWCoroutine* BucketTrimManager::create_bucket_trim_cr()
{
- return new BucketTrimPollCR(impl->store, impl->config, impl->status_obj);
+ return new BucketTrimPollCR(impl->store, impl->config,
+ impl.get(), impl->status_obj);
}
} // namespace rgw