#include "common/bounded_key_counter.h"
#include "common/errno.h"
-#include "rgw_sync_log_trim.h"
#include "rgw_rados.h"
+#include "rgw_sync_log_trim.h"
#include "include/assert.h"
#define dout_subsys ceph_subsys_rgw
// watch/notify api for gateways to coordinate about which buckets to trim
enum TrimNotifyType { // key type of the watcher's handler map: picks the handler for a notification
+ NotifyTrimCounters = 0, // handled by TrimCounters::Handler, which replies with bucket trim counters
};
WRITE_RAW_ENCODER(TrimNotifyType); // NOTE(review): presumably (de)serializes the raw enum value, so existing values should stay stable — confirm
virtual void handle(bufferlist::iterator& input, bufferlist& output) = 0;
};
+/// api to share the bucket trim counters between gateways in the same zone.
+/// each gateway will process different datalog shards, so the gateway that runs
+/// the trim process needs to accumulate their counters
+struct TrimCounters {
+  /// counter for a single bucket
+  struct BucketCounter {
+    std::string bucket; ///< bucket the count belongs to
+    int count{0}; ///< counter value; 0 until set
+
+    BucketCounter() = default;
+    BucketCounter(const std::string& bucket, int count)
+      : bucket(bucket), count(count) {}
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+  /// list of per-bucket counters, as carried in a Response
+  using Vector = std::vector<BucketCounter>;
+
+  /// request bucket trim counters from peer gateways
+  struct Request {
+    /// maximum number of bucket counters to return. not initialized on
+    /// construction: assign it before encode(), or fill it in with decode()
+    uint16_t max_buckets;
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+
+  /// return the current bucket trim counters
+  struct Response {
+    Vector bucket_counters; ///< counters supplied by the responding gateway
+
+    void encode(bufferlist& bl) const;
+    void decode(bufferlist::iterator& p);
+  };
+
+  /// server interface to query the hottest buckets
+  struct Server {
+    virtual ~Server() = default;
+
+    /// add bucket counters to @p counters; @p count is the number requested
+    virtual void get_bucket_counters(int count, Vector& counters) = 0;
+  };
+
+  /// notify handler: decodes a Request and replies with the Server's counters
+  class Handler : public TrimNotifyHandler {
+    Server *const server; ///< not owned; dereferenced by handle()
+   public:
+    /// explicit, so a Server* never converts to a Handler implicitly
+    explicit Handler(Server *server) : server(server) {}
+
+    void handle(bufferlist::iterator& input, bufferlist& output) override;
+  };
+};
+/// print a bucket counter as "<bucket>:<count>"
+std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs)
+{
+  out << rhs.bucket;
+  out << ":";
+  out << rhs.count;
+  return out;
+}
+
+void TrimCounters::BucketCounter::encode(bufferlist& bl) const
+{ // appends this counter to bl as two consecutive fields
+ // no versioning to save space: there is no ENCODE_START/ENCODE_FINISH envelope here, so decode() must read the same fields in the same order
+ ::encode(bucket, bl); // bucket string first
+ ::encode(count, bl); // then its count
+}
+void TrimCounters::BucketCounter::decode(bufferlist::iterator& p)
+{ // reads the fields from p in the order encode() wrote them; no version header is read
+ ::decode(bucket, p); // bucket string
+ ::decode(count, p); // count
+}
+WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); // NOTE(review): presumably supplies the free ::encode/::decode overloads that forward to the members above — confirm
+
+void TrimCounters::Request::encode(bufferlist& bl) const
+{ // appends the request to bl inside an ENCODE_START/ENCODE_FINISH section
+ ENCODE_START(1, 1, bl); // both version arguments are 1
+ ::encode(max_buckets, bl); // the only payload field
+ ENCODE_FINISH(bl);
+}
+void TrimCounters::Request::decode(bufferlist::iterator& p)
+{ // counterpart of encode(): reads max_buckets from a DECODE_START/DECODE_FINISH section
+ DECODE_START(1, p); // version argument matches the 1 passed to ENCODE_START
+ ::decode(max_buckets, p);
+ DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Request); // NOTE(review): presumably supplies the free ::encode/::decode overloads used by Handler::handle — confirm
+
+void TrimCounters::Response::encode(bufferlist& bl) const
+{ // appends the response to bl inside an ENCODE_START/ENCODE_FINISH section
+ ENCODE_START(1, 1, bl); // both version arguments are 1
+ ::encode(bucket_counters, bl); // the whole vector of BucketCounter entries
+ ENCODE_FINISH(bl);
+}
+void TrimCounters::Response::decode(bufferlist::iterator& p)
+{ // counterpart of encode(): reads bucket_counters from a DECODE_START/DECODE_FINISH section
+ DECODE_START(1, p); // version argument matches the 1 passed to ENCODE_START
+ ::decode(bucket_counters, p);
+ DECODE_FINISH(p);
+}
+WRITE_CLASS_ENCODER(TrimCounters::Response); // NOTE(review): presumably supplies the free ::encode/::decode overloads used by Handler::handle — confirm
+
+/// answer a counters notification: decode the Request from input, ask the
+/// server for its bucket counters, and encode the Response into output
+void TrimCounters::Handler::handle(bufferlist::iterator& input,
+                                   bufferlist& output)
+{
+  // cap on the number of counters requested from the server, regardless of
+  // what the peer asked for
+  const uint16_t reply_limit = 128;
+
+  Request req;
+  ::decode(req, input);
+
+  Response reply;
+  server->get_bucket_counters(std::min(req.max_buckets, reply_limit),
+                              reply.bucket_counters);
+  ::encode(reply, output);
+}
+
+
/// rados watcher for bucket trim notifications
class BucketTrimWatcher : public librados::WatchCtx2 {
RGWRados *const store;
boost::container::flat_map<TrimNotifyType, HandlerPtr> handlers;
public:
- BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj)
+ BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj,
+ TrimCounters::Server *counters)
: store(store), obj(obj)
{
+ handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters));
}
~BucketTrimWatcher()
namespace rgw {
-class BucketTrimManager::Impl {
+class BucketTrimManager::Impl : public TrimCounters::Server {
public:
RGWRados *const store;
const BucketTrimConfig config;
: store(store), config(config),
status_obj(store->get_zone_params().log_pool, "bilog.trim"),
counter(config.counter_size),
- watcher(store, status_obj)
+ watcher(store, status_obj, this)
{}
+
+ /// TrimCounters::Server interface for watch/notify api
+ ///
+ /// appends to buckets the entries reported by counter.get_highest() for the
+ /// requested count, then logs the result at debug level 20
+ void get_bucket_counters(int count, TrimCounters::Vector& buckets) override
+ {
+   // allocate before taking the lock to keep the critical section short
+   buckets.reserve(count);
+   {
+     // the mutex is held only while reading the shared counter
+     std::lock_guard<std::mutex> lock(mutex);
+     counter.get_highest(count, [&buckets] (const std::string& key, int hits) {
+         buckets.emplace_back(key, hits);
+       });
+   }
+   // buckets belongs to the caller, so it can be logged after unlocking
+   ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
+ }
};
BucketTrimManager::BucketTrimManager(RGWRados *store,