// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType {
NotifyTrimCounters = 0,
+ NotifyTrimComplete,
};
WRITE_RAW_ENCODER(TrimNotifyType);
virtual ~Server() = default;
virtual void get_bucket_counters(int count, Vector& counters) = 0;
+ virtual void reset_bucket_counters() = 0;
};
/// notify handler
::encode(response, output);
}
+/// api to notify peer gateways that trim has completed and their bucket change
+/// counters can be reset
+struct TrimComplete {
+ struct Request {
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& p);
+ };
+ struct Response {
+ void encode(bufferlist& bl) const;
+ void decode(bufferlist::iterator& p);
+ };
+
+ /// server interface to reset bucket counters
+ using Server = TrimCounters::Server;
+
+ /// notify handler
+ class Handler : public TrimNotifyHandler {
+ Server *const server;
+ public:
+ Handler(Server *server) : server(server) {}
+
+ void handle(bufferlist::iterator& input, bufferlist& output) override;
+ };
+};
+
+void TrimComplete::Request::encode(bufferlist& bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ENCODE_FINISH(bl);
+}
+void TrimComplete::Request::decode(bufferlist::iterator& p)
+{
+ DECODE_START(1, p);
+ DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Request);
+
+void TrimComplete::Response::encode(bufferlist& bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ENCODE_FINISH(bl);
+}
+void TrimComplete::Response::decode(bufferlist::iterator& p)
+{
+ DECODE_START(1, p);
+ DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimComplete::Response);
+
+void TrimComplete::Handler::handle(bufferlist::iterator& input,
+ bufferlist& output)
+{
+ Request request;
+ ::decode(request, input);
+
+ server->reset_bucket_counters();
+
+ Response response;
+ ::encode(response, output);
+}
+
/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
: store(store), obj(obj)
{
handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
+ handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters));
}
~BucketTrimWatcher()
const BucketTrimConfig& config;
BucketTrimObserver *const observer;
const rgw_raw_obj& obj;
+ ceph::mono_time start_time;
bufferlist notify_replies;
BucketChangeCounter counter;
std::vector<std::string> buckets; //< buckets selected for trim
int BucketTrimCR::operate()
{
reenter(this) {
+ start_time = ceph::mono_clock::now();
+
if (config.buckets_per_interval) {
// query watch/notify for hot buckets
ldout(cct, 10) << "fetching active bucket counters" << dendl;
}
}
+ // notify peers that trim completed
+ set_status("trim completed");
+ yield {
+ const TrimNotifyType type = NotifyTrimComplete;
+ TrimComplete::Request request;
+ bufferlist bl;
+ ::encode(type, bl);
+ ::encode(request, bl);
+ call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
+ nullptr));
+ }
+ if (retcode < 0) {
+ ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ ldout(cct, 4) << "bucket index log processing completed in "
+ << ceph::mono_clock::now() - start_time << dendl;
return set_cr_done();
}
return 0;
ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
}
+ void reset_bucket_counters() override
+ {
+ ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
+ std::lock_guard<std::mutex> lock(mutex);
+ counter.clear();
+ trimmed.expire_old(clock_type::now());
+ }
+
/// BucketTrimObserver interface to remember successfully-trimmed buckets
void on_bucket_trimmed(std::string&& bucket_instance) override
{