public:
BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
TrimCounters::Server *counters)
- : store(store), obj(obj)
- {
+ : store(store), obj(obj) {
handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
}
- ~BucketTrimWatcher()
- {
+ ~BucketTrimWatcher() {
stop();
}
- int start()
- {
+ int start() {
int r = store->get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
return 0;
}
- int restart()
- {
+ int restart() {
int r = ref.ioctx.unwatch2(handle);
if (r < 0) {
lderr(store->ctx()) << "Failed to unwatch on " << ref.oid
return r;
}
- void stop()
- {
+ void stop() {
if (handle) {
ref.ioctx.unwatch2(handle);
ref.ioctx.close();
/// respond to bucket trim notifications
void handle_notify(uint64_t notify_id, uint64_t cookie,
- uint64_t notifier_id, bufferlist& bl) override
- {
+ uint64_t notifier_id, bufferlist& bl) override {
if (cookie != handle) {
return;
}
}
/// reestablish the watch if it gets disconnected
- void handle_error(uint64_t cookie, int err) override
- {
+ void handle_error(uint64_t cookie, int err) override {
if (cookie != handle) {
return;
}
/// insert an event at the given point in time. this time must be at least as
/// recent as the last inserted event
- void insert(T&& value, const time_point& now)
- {
+ void insert(T&& value, const time_point& now) {
// assert(events.empty() || now >= events.back().time)
events.push_back(Event{std::move(value), now});
}
/// performs a linear search for an event matching the given key, whose type
/// U can be any that provides operator==(U, T)
template <typename U>
- bool lookup(const U& key) const
- {
+ bool lookup(const U& key) const {
for (const auto& event : events) {
if (key == event.value) {
return true;
}
/// remove events that are no longer recent compared to the given point in time
- void expire_old(const time_point& now)
- {
+ void expire_old(const time_point& now) {
const auto expired_before = now - max_duration;
while (!events.empty() && events.front().time < expired_before) {
events.pop_front();
{}
/// TrimCounters::Server interface for watch/notify api
- void get_bucket_counters(int count, TrimCounters::Vector& buckets)
- {
+ void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
buckets.reserve(count);
std::lock_guard<std::mutex> lock(mutex);
counter.get_highest(count, [&buckets] (const std::string& key, int count) {
ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
}
- void reset_bucket_counters() override
- {
+ void reset_bucket_counters() override {
ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
std::lock_guard<std::mutex> lock(mutex);
counter.clear();
}
/// BucketTrimObserver interface to remember successfully-trimmed buckets
- void on_bucket_trimmed(std::string&& bucket_instance) override
- {
+ void on_bucket_trimmed(std::string&& bucket_instance) override {
ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
std::lock_guard<std::mutex> lock(mutex);
trimmed.insert(std::move(bucket_instance), clock_type::now());
}
- bool trimmed_recently(const boost::string_view& bucket_instance) override
- {
+ bool trimmed_recently(const boost::string_view& bucket_instance) override {
std::lock_guard<std::mutex> lock(mutex);
return trimmed.lookup(bucket_instance);
}