From: Casey Bodley
Date: Sat, 13 Apr 2019 17:04:34 +0000 (-0400)
Subject: rgw: move rgw_sync_log_trim.* to rgw_trim_bilog.*
X-Git-Tag: v15.1.0~2837^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=4184edfa78eb5522667742edbf4f3a24eb1da95a;p=ceph.git

rgw: move rgw_sync_log_trim.* to rgw_trim_bilog.*

Signed-off-by: Casey Bodley
---
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index cff1ead0ebfe..dac869f824fb 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -72,8 +72,8 @@ set(librgw_common_srcs
   rgw_sync_module_pubsub.cc
   rgw_pubsub_push.cc
   rgw_sync_module_pubsub_rest.cc
-  rgw_sync_log_trim.cc
   rgw_sync_trace.cc
+  rgw_trim_bilog.cc
   rgw_trim_datalog.cc
   rgw_trim_mdlog.cc
   rgw_period_history.cc
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index de75af45e1e4..8ef49038aa50 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -44,9 +44,9 @@ extern "C" {
 #include "rgw_usage.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_trim_bilog.h"
 #include "rgw_trim_datalog.h"
 #include "rgw_trim_mdlog.h"
-#include "rgw_sync_log_trim.h"
 #include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
 #include "rgw_realm_watcher.h"
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 33e69c965941..ea9e53d5e9c0 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -23,7 +23,6 @@
 #include "rgw_metadata.h"
 #include "rgw_sync_counters.h"
 #include "rgw_sync_module.h"
-#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 2c35c7f397b1..d713dd58eddd 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -25,7 +25,7 @@
 #include "rgw_meta_sync_status.h"
 #include "rgw_period_puller.h"
 #include "rgw_sync_module.h"
-#include "rgw_sync_log_trim.h"
+#include "rgw_trim_bilog.h"
 #include "rgw_service.h"
 
 #include "services/svc_rados.h"
diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc
deleted file mode 100644
index 53aa8e10aa90..000000000000
--- a/src/rgw/rgw_sync_log_trim.cc
+++ /dev/null
@@ -1,1095 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2017 Red Hat, Inc
- *
- * Author: Casey Bodley
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- */ - -#include -#include -#include - -#include "include/scope_guard.h" -#include "common/bounded_key_counter.h" -#include "common/errno.h" -#include "rgw_sync_log_trim.h" -#include "rgw_cr_rados.h" -#include "rgw_cr_rest.h" -#include "rgw_data_sync.h" -#include "rgw_metadata.h" -#include "rgw_rados.h" -#include "rgw_zone.h" -#include "rgw_sync.h" - -#include "services/svc_zone.h" - -#include -#include "include/ceph_assert.h" - -#define dout_subsys ceph_subsys_rgw - -#undef dout_prefix -#define dout_prefix (*_dout << "trim: ") - -using rgw::BucketTrimConfig; -using BucketChangeCounter = BoundedKeyCounter; - -const std::string rgw::BucketTrimStatus::oid = "bilog.trim"; -using rgw::BucketTrimStatus; - - -// watch/notify api for gateways to coordinate about which buckets to trim -enum TrimNotifyType { - NotifyTrimCounters = 0, - NotifyTrimComplete, -}; -WRITE_RAW_ENCODER(TrimNotifyType); - -struct TrimNotifyHandler { - virtual ~TrimNotifyHandler() = default; - - virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0; -}; - -/// api to share the bucket trim counters between gateways in the same zone. -/// each gateway will process different datalog shards, so the gateway that runs -/// the trim process needs to accumulate their counters -struct TrimCounters { - /// counter for a single bucket - struct BucketCounter { - std::string bucket; //< bucket instance metadata key - int count{0}; - - BucketCounter() = default; - BucketCounter(const std::string& bucket, int count) - : bucket(bucket), count(count) {} - - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& p); - }; - using Vector = std::vector; - - /// request bucket trim counters from peer gateways - struct Request { - uint16_t max_buckets; //< maximum number of bucket counters to return - - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& p); - }; - - /// return the current bucket trim counters - struct Response { - Vector bucket_counters; - - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& p); - }; - - /// server interface to query the hottest buckets - struct Server { - virtual ~Server() = default; - - virtual void get_bucket_counters(int count, Vector& counters) = 0; - virtual void reset_bucket_counters() = 0; - }; - - /// notify handler - class Handler : public TrimNotifyHandler { - Server *const server; - public: - explicit Handler(Server *server) : server(server) {} - - void handle(bufferlist::const_iterator& input, bufferlist& output) override; - }; -}; -std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) -{ - return out << rhs.bucket << ":" << rhs.count; -} - -void TrimCounters::BucketCounter::encode(bufferlist& bl) const -{ - using ceph::encode; - // no versioning to save space - encode(bucket, bl); - encode(count, bl); -} -void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p) -{ - using ceph::decode; - decode(bucket, p); - decode(count, p); -} -WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); - -void TrimCounters::Request::encode(bufferlist& bl) const -{ - ENCODE_START(1, 1, bl); - encode(max_buckets, bl); - ENCODE_FINISH(bl); -} -void TrimCounters::Request::decode(bufferlist::const_iterator& p) -{ - DECODE_START(1, p); - decode(max_buckets, p); - DECODE_FINISH(p); -} -WRITE_CLASS_ENCODER(TrimCounters::Request); - -void TrimCounters::Response::encode(bufferlist& bl) const -{ - ENCODE_START(1, 1, bl); - encode(bucket_counters, bl); - ENCODE_FINISH(bl); -} -void 
TrimCounters::Response::decode(bufferlist::const_iterator& p) -{ - DECODE_START(1, p); - decode(bucket_counters, p); - DECODE_FINISH(p); -} -WRITE_CLASS_ENCODER(TrimCounters::Response); - -void TrimCounters::Handler::handle(bufferlist::const_iterator& input, - bufferlist& output) -{ - Request request; - decode(request, input); - auto count = std::min(request.max_buckets, 128); - - Response response; - server->get_bucket_counters(count, response.bucket_counters); - encode(response, output); -} - -/// api to notify peer gateways that trim has completed and their bucket change -/// counters can be reset -struct TrimComplete { - struct Request { - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& p); - }; - struct Response { - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& p); - }; - - /// server interface to reset bucket counters - using Server = TrimCounters::Server; - - /// notify handler - class Handler : public TrimNotifyHandler { - Server *const server; - public: - explicit Handler(Server *server) : server(server) {} - - void handle(bufferlist::const_iterator& input, bufferlist& output) override; - }; -}; - -void TrimComplete::Request::encode(bufferlist& bl) const -{ - ENCODE_START(1, 1, bl); - ENCODE_FINISH(bl); -} -void TrimComplete::Request::decode(bufferlist::const_iterator& p) -{ - DECODE_START(1, p); - DECODE_FINISH(p); -} -WRITE_CLASS_ENCODER(TrimComplete::Request); - -void TrimComplete::Response::encode(bufferlist& bl) const -{ - ENCODE_START(1, 1, bl); - ENCODE_FINISH(bl); -} -void TrimComplete::Response::decode(bufferlist::const_iterator& p) -{ - DECODE_START(1, p); - DECODE_FINISH(p); -} -WRITE_CLASS_ENCODER(TrimComplete::Response); - -void TrimComplete::Handler::handle(bufferlist::const_iterator& input, - bufferlist& output) -{ - Request request; - decode(request, input); - - server->reset_bucket_counters(); - - Response response; - encode(response, output); -} - - -/// rados watcher for bucket trim notifications -class BucketTrimWatcher : public librados::WatchCtx2 { - RGWRados *const store; - const rgw_raw_obj& obj; - rgw_rados_ref ref; - uint64_t handle{0}; - - using HandlerPtr = std::unique_ptr; - boost::container::flat_map handlers; - - public: - BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj, - TrimCounters::Server *counters) - : store(store), obj(obj) { - handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); - handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters)); - } - - ~BucketTrimWatcher() { - stop(); - } - - int start() { - int r = store->get_raw_obj_ref(obj, &ref); - if (r < 0) { - return r; - } - - // register a watch on the realm's control object - r = ref.ioctx.watch2(ref.obj.oid, &handle, this); - if (r == -ENOENT) { - constexpr bool exclusive = true; - r = ref.ioctx.create(ref.obj.oid, exclusive); - if (r == -EEXIST || r == 0) { - r = ref.ioctx.watch2(ref.obj.oid, &handle, this); - } - } - if (r < 0) { - lderr(store->ctx()) << "Failed to watch " << ref.obj - << " with " << cpp_strerror(-r) << dendl; - ref.ioctx.close(); - return r; - } - - ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl; - return 0; - } - - int restart() { - int r = ref.ioctx.unwatch2(handle); - if (r < 0) { - lderr(store->ctx()) << "Failed to unwatch on " << ref.obj - << " with " << cpp_strerror(-r) << dendl; - } - r = ref.ioctx.watch2(ref.obj.oid, &handle, this); - if (r < 0) { - lderr(store->ctx()) << "Failed to restart watch on " << ref.obj - << " with " << 
cpp_strerror(-r) << dendl; - ref.ioctx.close(); - } - return r; - } - - void stop() { - if (handle) { - ref.ioctx.unwatch2(handle); - ref.ioctx.close(); - } - } - - /// respond to bucket trim notifications - void handle_notify(uint64_t notify_id, uint64_t cookie, - uint64_t notifier_id, bufferlist& bl) override { - if (cookie != handle) { - return; - } - bufferlist reply; - try { - auto p = bl.cbegin(); - TrimNotifyType type; - decode(type, p); - - auto handler = handlers.find(type); - if (handler != handlers.end()) { - handler->second->handle(p, reply); - } else { - lderr(store->ctx()) << "no handler for notify type " << type << dendl; - } - } catch (const buffer::error& e) { - lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; - } - ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply); - } - - /// reestablish the watch if it gets disconnected - void handle_error(uint64_t cookie, int err) override { - if (cookie != handle) { - return; - } - if (err == -ENOTCONN) { - ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl; - restart(); - } - } -}; - - -/// Interface to communicate with the trim manager about completed operations -struct BucketTrimObserver { - virtual ~BucketTrimObserver() = default; - - virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0; - virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0; -}; - -/// populate the status with the minimum stable marker of each shard -template -int take_min_status(CephContext *cct, Iter first, Iter last, - std::vector *status) -{ - status->clear(); - std::optional num_shards; - for (auto peer = first; peer != last; ++peer) { - const size_t peer_shards = peer->size(); - if (!num_shards) { - num_shards = peer_shards; - status->resize(*num_shards); - } else if (*num_shards != peer_shards) { - // all peers must agree on the number of shards - return -EINVAL; - } - auto m = status->begin(); - for (auto& shard : *peer) { - auto& marker = *m++; - // only consider incremental sync markers - if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) { - continue; - } - // always take the first marker, or any later marker that's smaller - if (peer == first || marker > shard.inc_marker.position) { - marker = std::move(shard.inc_marker.position); - } - } - } - return 0; -} - -/// trim each bilog shard to the given marker, while limiting the number of -/// concurrent requests -class BucketTrimShardCollectCR : public RGWShardCollectCR { - static constexpr int MAX_CONCURRENT_SHARDS = 16; - RGWRados *const store; - const RGWBucketInfo& bucket_info; - const std::vector& markers; //< shard markers to trim - size_t i{0}; //< index of current shard marker - public: - BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info, - const std::vector& markers) - : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS), - store(store), bucket_info(bucket_info), markers(markers) - {} - bool spawn_next() override; -}; - -bool BucketTrimShardCollectCR::spawn_next() -{ - while (i < markers.size()) { - const auto& marker = markers[i]; - const auto shard_id = i++; - - // skip empty markers - if (!marker.empty()) { - ldout(cct, 10) << "trimming bilog shard " << shard_id - << " of " << bucket_info.bucket << " at marker " << marker << dendl; - spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id, - std::string{}, marker), - false); - return true; - } - } - return false; -} - -/// trim the bilog of all of the given bucket instance's shards -class 
BucketTrimInstanceCR : public RGWCoroutine { - RGWRados *const store; - RGWHTTPManager *const http; - BucketTrimObserver *const observer; - std::string bucket_instance; - const std::string& zone_id; //< my zone id - RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices - int child_ret = 0; - - using StatusShards = std::vector; - std::vector peer_status; //< sync status for each peer - std::vector min_markers; //< min marker per shard - - public: - BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http, - BucketTrimObserver *observer, - const std::string& bucket_instance) - : RGWCoroutine(store->ctx()), store(store), - http(http), observer(observer), - bucket_instance(bucket_instance), - zone_id(store->svc.zone->get_zone().id), - peer_status(store->svc.zone->get_zone_data_notify_to_map().size()) - {} - - int operate() override; -}; - -int BucketTrimInstanceCR::operate() -{ - reenter(this) { - ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl; - - // query peers for sync status - set_status("fetching sync status from peers"); - yield { - // query data sync status from each sync peer - rgw_http_param_pair params[] = { - { "type", "bucket-index" }, - { "status", nullptr }, - { "bucket", bucket_instance.c_str() }, - { "source-zone", zone_id.c_str() }, - { nullptr, nullptr } - }; - - auto p = peer_status.begin(); - for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) { - using StatusCR = RGWReadRESTResourceCR; - spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), - false); - ++p; - } - // in parallel, read the local bucket instance info - spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store, - bucket_instance, &bucket_info), - false); - } - // wait for a response from each peer. 
all must respond to attempt trim - while (num_spawned()) { - yield wait_for_child(); - collect(&child_ret, nullptr); - if (child_ret < 0) { - drain_all(); - return set_cr_error(child_ret); - } - } - - // determine the minimum marker for each shard - retcode = take_min_status(cct, peer_status.begin(), peer_status.end(), - &min_markers); - if (retcode < 0) { - ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl; - return set_cr_error(retcode); - } - - // trim shards with a ShardCollectCR - ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket - << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl; - set_status("trimming bilog shards"); - yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers)); - // ENODATA just means there were no keys to trim - if (retcode == -ENODATA) { - retcode = 0; - } - if (retcode < 0) { - ldout(cct, 4) << "failed to trim bilog shards: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - - observer->on_bucket_trimmed(std::move(bucket_instance)); - return set_cr_done(); - } - return 0; -} - -/// trim each bucket instance while limiting the number of concurrent operations -class BucketTrimInstanceCollectCR : public RGWShardCollectCR { - RGWRados *const store; - RGWHTTPManager *const http; - BucketTrimObserver *const observer; - std::vector::const_iterator bucket; - std::vector::const_iterator end; - public: - BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http, - BucketTrimObserver *observer, - const std::vector& buckets, - int max_concurrent) - : RGWShardCollectCR(store->ctx(), max_concurrent), - store(store), http(http), observer(observer), - bucket(buckets.begin()), end(buckets.end()) - {} - bool spawn_next() override; -}; - -bool BucketTrimInstanceCollectCR::spawn_next() -{ - if (bucket == end) { - return false; - } - spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false); - ++bucket; - return true; -} - -/// correlate the replies from each peer gateway into the given counter -int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) -{ - counter.clear(); - - try { - // decode notify responses - auto p = bl.cbegin(); - std::map, bufferlist> replies; - std::set> timeouts; - decode(replies, p); - decode(timeouts, p); - - for (auto& peer : replies) { - auto q = peer.second.cbegin(); - TrimCounters::Response response; - decode(response, q); - for (const auto& b : response.bucket_counters) { - counter.insert(b.bucket, b.count); - } - } - } catch (const buffer::error& e) { - return -EIO; - } - return 0; -} - -/// metadata callback has the signature bool(string&& key, string&& marker) -using MetadataListCallback = std::function; - -/// lists metadata keys, passing each to a callback until it returns false. 
-/// on reaching the end, it will restart at the beginning and list up to the -/// initial marker -class AsyncMetadataList : public RGWAsyncRadosRequest { - CephContext *const cct; - RGWMetadataManager *const mgr; - const std::string section; - const std::string start_marker; - MetadataListCallback callback; - - int _send_request() override; - public: - AsyncMetadataList(CephContext *cct, RGWCoroutine *caller, - RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr, - const std::string& section, const std::string& start_marker, - const MetadataListCallback& callback) - : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr), - section(section), start_marker(start_marker), callback(callback) - {} -}; - -int AsyncMetadataList::_send_request() -{ - void* handle = nullptr; - std::list keys; - bool truncated{false}; - std::string marker; - - // start a listing at the given marker - int r = mgr->list_keys_init(section, start_marker, &handle); - if (r == -EINVAL) { - // restart with empty marker below - } else if (r < 0) { - ldout(cct, 10) << "failed to init metadata listing: " - << cpp_strerror(r) << dendl; - return r; - } else { - ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl; - - // release the handle when scope exits - auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); - - do { - // get the next key and marker - r = mgr->list_keys_next(handle, 1, keys, &truncated); - if (r < 0) { - ldout(cct, 10) << "failed to list metadata: " - << cpp_strerror(r) << dendl; - return r; - } - marker = mgr->get_marker(handle); - - if (!keys.empty()) { - ceph_assert(keys.size() == 1); - auto& key = keys.front(); - if (!callback(std::move(key), std::move(marker))) { - return 0; - } - } - } while (truncated); - - if (start_marker.empty()) { - // already listed all keys - return 0; - } - } - - // restart the listing from the beginning (empty marker) - handle = nullptr; - - r = mgr->list_keys_init(section, "", &handle); - if (r < 0) { - ldout(cct, 10) << "failed to restart metadata listing: " - << cpp_strerror(r) << dendl; - return r; - } - ldout(cct, 20) << "restarting metadata listing" << dendl; - - // release the handle when scope exits - auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); - do { - // get the next key and marker - r = mgr->list_keys_next(handle, 1, keys, &truncated); - if (r < 0) { - ldout(cct, 10) << "failed to list metadata: " - << cpp_strerror(r) << dendl; - return r; - } - marker = mgr->get_marker(handle); - - if (!keys.empty()) { - ceph_assert(keys.size() == 1); - auto& key = keys.front(); - // stop at original marker - if (marker >= start_marker) { - return 0; - } - if (!callback(std::move(key), std::move(marker))) { - return 0; - } - } - } while (truncated); - - return 0; -} - -/// coroutine wrapper for AsyncMetadataList -class MetadataListCR : public RGWSimpleCoroutine { - RGWAsyncRadosProcessor *const async_rados; - RGWMetadataManager *const mgr; - const std::string& section; - const std::string& start_marker; - MetadataListCallback callback; - RGWAsyncRadosRequest *req{nullptr}; - public: - MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados, - RGWMetadataManager *mgr, const std::string& section, - const std::string& start_marker, - const MetadataListCallback& callback) - : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr), - section(section), start_marker(start_marker), callback(callback) - {} - ~MetadataListCR() override { - request_cleanup(); - } - - int send_request() override { - req = new 
AsyncMetadataList(cct, this, stack->create_completion_notifier(), - mgr, section, start_marker, callback); - async_rados->queue(req); - return 0; - } - int request_complete() override { - return req->get_ret_status(); - } - void request_cleanup() override { - if (req) { - req->finish(); - req = nullptr; - } - } -}; - -class BucketTrimCR : public RGWCoroutine { - RGWRados *const store; - RGWHTTPManager *const http; - const BucketTrimConfig& config; - BucketTrimObserver *const observer; - const rgw_raw_obj& obj; - ceph::mono_time start_time; - bufferlist notify_replies; - BucketChangeCounter counter; - std::vector buckets; //< buckets selected for trim - BucketTrimStatus status; - RGWObjVersionTracker objv; //< version tracker for trim status object - std::string last_cold_marker; //< position for next trim marker - - static const std::string section; //< metadata section for bucket instances - public: - BucketTrimCR(RGWRados *store, RGWHTTPManager *http, - const BucketTrimConfig& config, BucketTrimObserver *observer, - const rgw_raw_obj& obj) - : RGWCoroutine(store->ctx()), store(store), http(http), config(config), - observer(observer), obj(obj), counter(config.counter_size) - {} - - int operate() override; -}; - -const std::string BucketTrimCR::section{"bucket.instance"}; - -int BucketTrimCR::operate() -{ - reenter(this) { - start_time = ceph::mono_clock::now(); - - if (config.buckets_per_interval) { - // query watch/notify for hot buckets - ldout(cct, 10) << "fetching active bucket counters" << dendl; - set_status("fetching active bucket counters"); - yield { - // request the top bucket counters from each peer gateway - const TrimNotifyType type = NotifyTrimCounters; - TrimCounters::Request request{32}; - bufferlist bl; - encode(type, bl); - encode(request, bl); - call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, - ¬ify_replies)); - } - if (retcode < 0) { - ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl; - return set_cr_error(retcode); - } - - // select the hottest buckets for trim - retcode = accumulate_peer_counters(notify_replies, counter); - if (retcode < 0) { - ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl; - return set_cr_error(retcode); - } - buckets.reserve(config.buckets_per_interval); - - const int max_count = config.buckets_per_interval - - config.min_cold_buckets_per_interval; - counter.get_highest(max_count, - [this] (const std::string& bucket, int count) { - buckets.push_back(bucket); - }); - } - - if (buckets.size() < config.buckets_per_interval) { - // read BucketTrimStatus for marker position - set_status("reading trim status"); - using ReadStatus = RGWSimpleRadosReadCR; - yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj, - &status, true, &objv)); - if (retcode < 0) { - ldout(cct, 10) << "failed to read bilog trim status: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - if (status.marker == "MAX") { - status.marker.clear(); // restart at the beginning - } - ldout(cct, 10) << "listing cold buckets from marker=" - << status.marker << dendl; - - set_status("listing cold buckets for trim"); - yield { - // capture a reference so 'this' remains valid in the callback - auto ref = boost::intrusive_ptr{this}; - // list cold buckets to consider for trim - auto cb = [this, ref] (std::string&& bucket, std::string&& marker) { - // filter out keys that we trimmed recently - if (observer->trimmed_recently(bucket)) { - return true; - } - // filter out active buckets that 
we've already selected - auto i = std::find(buckets.begin(), buckets.end(), bucket); - if (i != buckets.end()) { - return true; - } - buckets.emplace_back(std::move(bucket)); - // remember the last cold bucket spawned to update the status marker - last_cold_marker = std::move(marker); - // return true if there's room for more - return buckets.size() < config.buckets_per_interval; - }; - - call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr, - section, status.marker, cb)); - } - if (retcode < 0) { - ldout(cct, 4) << "failed to list bucket instance metadata: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - } - - // trim bucket instances with limited concurrency - set_status("trimming buckets"); - ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl; - yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets, - config.concurrent_buckets)); - // ignore errors from individual buckets - - // write updated trim status - if (!last_cold_marker.empty() && status.marker != last_cold_marker) { - set_status("writing updated trim status"); - status.marker = std::move(last_cold_marker); - ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl; - using WriteStatus = RGWSimpleRadosWriteCR; - yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj, - status, &objv)); - if (retcode < 0) { - ldout(cct, 4) << "failed to write updated trim status: " - << cpp_strerror(retcode) << dendl; - return set_cr_error(retcode); - } - } - - // notify peers that trim completed - set_status("trim completed"); - yield { - const TrimNotifyType type = NotifyTrimComplete; - TrimComplete::Request request; - bufferlist bl; - encode(type, bl); - encode(request, bl); - call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, - nullptr)); - } - if (retcode < 0) { - ldout(cct, 10) << "failed to notify peers of trim completion" << dendl; - return set_cr_error(retcode); - } - - ldout(cct, 4) << "bucket index log processing completed in " - << ceph::mono_clock::now() - start_time << dendl; - return set_cr_done(); - } - return 0; -} - -class BucketTrimPollCR : public RGWCoroutine { - RGWRados *const store; - RGWHTTPManager *const http; - const BucketTrimConfig& config; - BucketTrimObserver *const observer; - const rgw_raw_obj& obj; - const std::string name{"trim"}; //< lock name - const std::string cookie; - - public: - BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http, - const BucketTrimConfig& config, - BucketTrimObserver *observer, const rgw_raw_obj& obj) - : RGWCoroutine(store->ctx()), store(store), http(http), - config(config), observer(observer), obj(obj), - cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)) - {} - - int operate() override; -}; - -int BucketTrimPollCR::operate() -{ - reenter(this) { - for (;;) { - set_status("sleeping"); - wait(utime_t{static_cast(config.trim_interval_sec), 0}); - - // prevent others from trimming for our entire wait interval - set_status("acquiring trim lock"); - yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store, - obj, name, cookie, - config.trim_interval_sec)); - if (retcode < 0) { - ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl; - continue; - } - - set_status("trimming"); - yield call(new BucketTrimCR(store, http, config, observer, obj)); - if (retcode < 0) { - // on errors, unlock so other gateways can try - set_status("unlocking"); - yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store, - 
obj, name, cookie)); - } - } - } - return 0; -} - -/// tracks a bounded list of events with timestamps. old events can be expired, -/// and recent events can be searched by key. expiration depends on events being -/// inserted in temporal order -template -class RecentEventList { - public: - using clock_type = Clock; - using time_point = typename clock_type::time_point; - - RecentEventList(size_t max_size, const ceph::timespan& max_duration) - : events(max_size), max_duration(max_duration) - {} - - /// insert an event at the given point in time. this time must be at least as - /// recent as the last inserted event - void insert(T&& value, const time_point& now) { - // ceph_assert(events.empty() || now >= events.back().time) - events.push_back(Event{std::move(value), now}); - } - - /// performs a linear search for an event matching the given key, whose type - /// U can be any that provides operator==(U, T) - template - bool lookup(const U& key) const { - for (const auto& event : events) { - if (key == event.value) { - return true; - } - } - return false; - } - - /// remove events that are no longer recent compared to the given point in time - void expire_old(const time_point& now) { - const auto expired_before = now - max_duration; - while (!events.empty() && events.front().time < expired_before) { - events.pop_front(); - } - } - - private: - struct Event { - T value; - time_point time; - }; - boost::circular_buffer events; - const ceph::timespan max_duration; -}; - -namespace rgw { - -// read bucket trim configuration from ceph context -void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config) -{ - const auto& conf = cct->_conf; - - config.trim_interval_sec = - conf.get_val("rgw_sync_log_trim_interval"); - config.counter_size = 512; - config.buckets_per_interval = - conf.get_val("rgw_sync_log_trim_max_buckets"); - config.min_cold_buckets_per_interval = - conf.get_val("rgw_sync_log_trim_min_cold_buckets"); - config.concurrent_buckets = - conf.get_val("rgw_sync_log_trim_concurrent_buckets"); - config.notify_timeout_ms = 10000; - config.recent_size = 128; - config.recent_duration = std::chrono::hours(2); -} - -class BucketTrimManager::Impl : public TrimCounters::Server, - public BucketTrimObserver { - public: - RGWRados *const store; - const BucketTrimConfig config; - - const rgw_raw_obj status_obj; - - /// count frequency of bucket instance entries in the data changes log - BucketChangeCounter counter; - - using RecentlyTrimmedBucketList = RecentEventList; - using clock_type = RecentlyTrimmedBucketList::clock_type; - /// track recently trimmed buckets to focus trim activity elsewhere - RecentlyTrimmedBucketList trimmed; - - /// serve the bucket trim watch/notify api - BucketTrimWatcher watcher; - - /// protect data shared between data sync, trim, and watch/notify threads - std::mutex mutex; - - Impl(RGWRados *store, const BucketTrimConfig& config) - : store(store), config(config), - status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid), - counter(config.counter_size), - trimmed(config.recent_size, config.recent_duration), - watcher(store, status_obj, this) - {} - - /// TrimCounters::Server interface for watch/notify api - void get_bucket_counters(int count, TrimCounters::Vector& buckets) { - buckets.reserve(count); - std::lock_guard lock(mutex); - counter.get_highest(count, [&buckets] (const std::string& key, int count) { - buckets.emplace_back(key, count); - }); - ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl; - } - - void 
reset_bucket_counters() override { - ldout(store->ctx(), 20) << "bucket trim completed" << dendl; - std::lock_guard lock(mutex); - counter.clear(); - trimmed.expire_old(clock_type::now()); - } - - /// BucketTrimObserver interface to remember successfully-trimmed buckets - void on_bucket_trimmed(std::string&& bucket_instance) override { - ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl; - std::lock_guard lock(mutex); - trimmed.insert(std::move(bucket_instance), clock_type::now()); - } - - bool trimmed_recently(const boost::string_view& bucket_instance) override { - std::lock_guard lock(mutex); - return trimmed.lookup(bucket_instance); - } -}; - -BucketTrimManager::BucketTrimManager(RGWRados *store, - const BucketTrimConfig& config) - : impl(new Impl(store, config)) -{ -} -BucketTrimManager::~BucketTrimManager() = default; - -int BucketTrimManager::init() -{ - return impl->watcher.start(); -} - -void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket) -{ - std::lock_guard lock(impl->mutex); - // filter recently trimmed bucket instances out of bucket change counter - if (impl->trimmed.lookup(bucket)) { - return; - } - impl->counter.insert(bucket.to_string()); -} - -RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http) -{ - return new BucketTrimPollCR(impl->store, http, impl->config, - impl.get(), impl->status_obj); -} - -RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http) -{ - // return the trim coroutine without any polling - return new BucketTrimCR(impl->store, http, impl->config, - impl.get(), impl->status_obj); -} - -} // namespace rgw diff --git a/src/rgw/rgw_sync_log_trim.h b/src/rgw/rgw_sync_log_trim.h deleted file mode 100644 index 13d1f63a1cf4..000000000000 --- a/src/rgw/rgw_sync_log_trim.h +++ /dev/null @@ -1,110 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2017 Red Hat, Inc - * - * Author: Casey Bodley - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. 
- */
-
-#ifndef RGW_SYNC_LOG_TRIM_H
-#define RGW_SYNC_LOG_TRIM_H
-
-#include
-#include
-#include "include/encoding.h"
-#include "common/ceph_time.h"
-
-class CephContext;
-class RGWCoroutine;
-class RGWHTTPManager;
-class RGWRados;
-
-namespace rgw {
-
-/// Interface to inform the trim process about which buckets are most active
-struct BucketChangeObserver {
-  virtual ~BucketChangeObserver() = default;
-
-  virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0;
-};
-
-/// Configuration for BucketTrimManager
-struct BucketTrimConfig {
-  /// time interval in seconds between bucket trim attempts
-  uint32_t trim_interval_sec{0};
-  /// maximum number of buckets to track with BucketChangeObserver
-  size_t counter_size{0};
-  /// maximum number of buckets to process each trim interval
-  uint32_t buckets_per_interval{0};
-  /// minimum number of buckets to choose from the global bucket instance list
-  uint32_t min_cold_buckets_per_interval{0};
-  /// maximum number of buckets to process in parallel
-  uint32_t concurrent_buckets{0};
-  /// timeout in ms for bucket trim notify replies
-  uint64_t notify_timeout_ms{0};
-  /// maximum number of recently trimmed buckets to remember (should be small
-  /// enough for a linear search)
-  size_t recent_size{0};
-  /// maximum duration to consider a trim as 'recent' (should be some multiple
-  /// of the trim interval, at least)
-  ceph::timespan recent_duration{0};
-};
-
-/// fill out the BucketTrimConfig from the ceph context
-void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config);
-
-/// Determines the buckets on which to focus trim activity, using two sources of
-/// input: the frequency of entries read from the data changes log, and a global
-/// listing of the bucket.instance metadata.
This allows us to trim active -/// buckets quickly, while also ensuring that all buckets will eventually trim -class BucketTrimManager : public BucketChangeObserver { - class Impl; - std::unique_ptr impl; - public: - BucketTrimManager(RGWRados *store, const BucketTrimConfig& config); - ~BucketTrimManager(); - - int init(); - - /// increment a counter for the given bucket instance - void on_bucket_changed(const boost::string_view& bucket_instance) override; - - /// create a coroutine to run the bucket trim process every trim interval - RGWCoroutine* create_bucket_trim_cr(RGWHTTPManager *http); - - /// create a coroutine to trim buckets directly via radosgw-admin - RGWCoroutine* create_admin_bucket_trim_cr(RGWHTTPManager *http); -}; - -/// provides persistent storage for the trim manager's current position in the -/// list of bucket instance metadata -struct BucketTrimStatus { - std::string marker; //< metadata key of current bucket instance - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(marker, bl); - ENCODE_FINISH(bl); - } - void decode(bufferlist::const_iterator& p) { - DECODE_START(1, p); - decode(marker, p); - DECODE_FINISH(p); - } - - static const std::string oid; -}; - -} // namespace rgw - -WRITE_CLASS_ENCODER(rgw::BucketTrimStatus); - -#endif // RGW_SYNC_LOG_TRIM_H diff --git a/src/rgw/rgw_trim_bilog.cc b/src/rgw/rgw_trim_bilog.cc new file mode 100644 index 000000000000..fd8927868ca8 --- /dev/null +++ b/src/rgw/rgw_trim_bilog.cc @@ -0,0 +1,1095 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * Author: Casey Bodley + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include +#include +#include + +#include "include/scope_guard.h" +#include "common/bounded_key_counter.h" +#include "common/errno.h" +#include "rgw_trim_bilog.h" +#include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" +#include "rgw_data_sync.h" +#include "rgw_metadata.h" +#include "rgw_rados.h" +#include "rgw_zone.h" +#include "rgw_sync.h" + +#include "services/svc_zone.h" + +#include +#include "include/ceph_assert.h" + +#define dout_subsys ceph_subsys_rgw + +#undef dout_prefix +#define dout_prefix (*_dout << "trim: ") + +using rgw::BucketTrimConfig; +using BucketChangeCounter = BoundedKeyCounter; + +const std::string rgw::BucketTrimStatus::oid = "bilog.trim"; +using rgw::BucketTrimStatus; + + +// watch/notify api for gateways to coordinate about which buckets to trim +enum TrimNotifyType { + NotifyTrimCounters = 0, + NotifyTrimComplete, +}; +WRITE_RAW_ENCODER(TrimNotifyType); + +struct TrimNotifyHandler { + virtual ~TrimNotifyHandler() = default; + + virtual void handle(bufferlist::const_iterator& input, bufferlist& output) = 0; +}; + +/// api to share the bucket trim counters between gateways in the same zone. 
+/// each gateway will process different datalog shards, so the gateway that runs +/// the trim process needs to accumulate their counters +struct TrimCounters { + /// counter for a single bucket + struct BucketCounter { + std::string bucket; //< bucket instance metadata key + int count{0}; + + BucketCounter() = default; + BucketCounter(const std::string& bucket, int count) + : bucket(bucket), count(count) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + using Vector = std::vector; + + /// request bucket trim counters from peer gateways + struct Request { + uint16_t max_buckets; //< maximum number of bucket counters to return + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// return the current bucket trim counters + struct Response { + Vector bucket_counters; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// server interface to query the hottest buckets + struct Server { + virtual ~Server() = default; + + virtual void get_bucket_counters(int count, Vector& counters) = 0; + virtual void reset_bucket_counters() = 0; + }; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + explicit Handler(Server *server) : server(server) {} + + void handle(bufferlist::const_iterator& input, bufferlist& output) override; + }; +}; +std::ostream& operator<<(std::ostream& out, const TrimCounters::BucketCounter& rhs) +{ + return out << rhs.bucket << ":" << rhs.count; +} + +void TrimCounters::BucketCounter::encode(bufferlist& bl) const +{ + using ceph::encode; + // no versioning to save space + encode(bucket, bl); + encode(count, bl); +} +void TrimCounters::BucketCounter::decode(bufferlist::const_iterator& p) +{ + using ceph::decode; + decode(bucket, p); + decode(count, p); +} +WRITE_CLASS_ENCODER(TrimCounters::BucketCounter); + +void TrimCounters::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(max_buckets, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Request::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(max_buckets, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Request); + +void TrimCounters::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + encode(bucket_counters, bl); + ENCODE_FINISH(bl); +} +void TrimCounters::Response::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + decode(bucket_counters, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimCounters::Response); + +void TrimCounters::Handler::handle(bufferlist::const_iterator& input, + bufferlist& output) +{ + Request request; + decode(request, input); + auto count = std::min(request.max_buckets, 128); + + Response response; + server->get_bucket_counters(count, response.bucket_counters); + encode(response, output); +} + +/// api to notify peer gateways that trim has completed and their bucket change +/// counters can be reset +struct TrimComplete { + struct Request { + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + struct Response { + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& p); + }; + + /// server interface to reset bucket counters + using Server = TrimCounters::Server; + + /// notify handler + class Handler : public TrimNotifyHandler { + Server *const server; + public: + explicit Handler(Server *server) : server(server) {} + + void handle(bufferlist::const_iterator& input, 
bufferlist& output) override; + }; +}; + +void TrimComplete::Request::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Request::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Request); + +void TrimComplete::Response::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ENCODE_FINISH(bl); +} +void TrimComplete::Response::decode(bufferlist::const_iterator& p) +{ + DECODE_START(1, p); + DECODE_FINISH(p); +} +WRITE_CLASS_ENCODER(TrimComplete::Response); + +void TrimComplete::Handler::handle(bufferlist::const_iterator& input, + bufferlist& output) +{ + Request request; + decode(request, input); + + server->reset_bucket_counters(); + + Response response; + encode(response, output); +} + + +/// rados watcher for bucket trim notifications +class BucketTrimWatcher : public librados::WatchCtx2 { + RGWRados *const store; + const rgw_raw_obj& obj; + rgw_rados_ref ref; + uint64_t handle{0}; + + using HandlerPtr = std::unique_ptr; + boost::container::flat_map handlers; + + public: + BucketTrimWatcher(RGWRados *store, const rgw_raw_obj& obj, + TrimCounters::Server *counters) + : store(store), obj(obj) { + handlers.emplace(NotifyTrimCounters, new TrimCounters::Handler(counters)); + handlers.emplace(NotifyTrimComplete, new TrimComplete::Handler(counters)); + } + + ~BucketTrimWatcher() { + stop(); + } + + int start() { + int r = store->get_raw_obj_ref(obj, &ref); + if (r < 0) { + return r; + } + + // register a watch on the realm's control object + r = ref.ioctx.watch2(ref.obj.oid, &handle, this); + if (r == -ENOENT) { + constexpr bool exclusive = true; + r = ref.ioctx.create(ref.obj.oid, exclusive); + if (r == -EEXIST || r == 0) { + r = ref.ioctx.watch2(ref.obj.oid, &handle, this); + } + } + if (r < 0) { + lderr(store->ctx()) << "Failed to watch " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + return r; + } + + ldout(store->ctx(), 10) << "Watching " << ref.obj.oid << dendl; + return 0; + } + + int restart() { + int r = ref.ioctx.unwatch2(handle); + if (r < 0) { + lderr(store->ctx()) << "Failed to unwatch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + } + r = ref.ioctx.watch2(ref.obj.oid, &handle, this); + if (r < 0) { + lderr(store->ctx()) << "Failed to restart watch on " << ref.obj + << " with " << cpp_strerror(-r) << dendl; + ref.ioctx.close(); + } + return r; + } + + void stop() { + if (handle) { + ref.ioctx.unwatch2(handle); + ref.ioctx.close(); + } + } + + /// respond to bucket trim notifications + void handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, bufferlist& bl) override { + if (cookie != handle) { + return; + } + bufferlist reply; + try { + auto p = bl.cbegin(); + TrimNotifyType type; + decode(type, p); + + auto handler = handlers.find(type); + if (handler != handlers.end()) { + handler->second->handle(p, reply); + } else { + lderr(store->ctx()) << "no handler for notify type " << type << dendl; + } + } catch (const buffer::error& e) { + lderr(store->ctx()) << "Failed to decode notification: " << e.what() << dendl; + } + ref.ioctx.notify_ack(ref.obj.oid, notify_id, cookie, reply); + } + + /// reestablish the watch if it gets disconnected + void handle_error(uint64_t cookie, int err) override { + if (cookie != handle) { + return; + } + if (err == -ENOTCONN) { + ldout(store->ctx(), 4) << "Disconnected watch on " << ref.obj << dendl; + restart(); + } + } +}; + + +/// Interface to communicate with 
the trim manager about completed operations +struct BucketTrimObserver { + virtual ~BucketTrimObserver() = default; + + virtual void on_bucket_trimmed(std::string&& bucket_instance) = 0; + virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0; +}; + +/// populate the status with the minimum stable marker of each shard +template +int take_min_status(CephContext *cct, Iter first, Iter last, + std::vector *status) +{ + status->clear(); + std::optional num_shards; + for (auto peer = first; peer != last; ++peer) { + const size_t peer_shards = peer->size(); + if (!num_shards) { + num_shards = peer_shards; + status->resize(*num_shards); + } else if (*num_shards != peer_shards) { + // all peers must agree on the number of shards + return -EINVAL; + } + auto m = status->begin(); + for (auto& shard : *peer) { + auto& marker = *m++; + // only consider incremental sync markers + if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) { + continue; + } + // always take the first marker, or any later marker that's smaller + if (peer == first || marker > shard.inc_marker.position) { + marker = std::move(shard.inc_marker.position); + } + } + } + return 0; +} + +/// trim each bilog shard to the given marker, while limiting the number of +/// concurrent requests +class BucketTrimShardCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + RGWRados *const store; + const RGWBucketInfo& bucket_info; + const std::vector& markers; //< shard markers to trim + size_t i{0}; //< index of current shard marker + public: + BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info, + const std::vector& markers) + : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS), + store(store), bucket_info(bucket_info), markers(markers) + {} + bool spawn_next() override; +}; + +bool BucketTrimShardCollectCR::spawn_next() +{ + while (i < markers.size()) { + const auto& marker = markers[i]; + const auto shard_id = i++; + + // skip empty markers + if (!marker.empty()) { + ldout(cct, 10) << "trimming bilog shard " << shard_id + << " of " << bucket_info.bucket << " at marker " << marker << dendl; + spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id, + std::string{}, marker), + false); + return true; + } + } + return false; +} + +/// trim the bilog of all of the given bucket instance's shards +class BucketTrimInstanceCR : public RGWCoroutine { + RGWRados *const store; + RGWHTTPManager *const http; + BucketTrimObserver *const observer; + std::string bucket_instance; + const std::string& zone_id; //< my zone id + RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices + int child_ret = 0; + + using StatusShards = std::vector; + std::vector peer_status; //< sync status for each peer + std::vector min_markers; //< min marker per shard + + public: + BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http, + BucketTrimObserver *observer, + const std::string& bucket_instance) + : RGWCoroutine(store->ctx()), store(store), + http(http), observer(observer), + bucket_instance(bucket_instance), + zone_id(store->svc.zone->get_zone().id), + peer_status(store->svc.zone->get_zone_data_notify_to_map().size()) + {} + + int operate() override; +}; + +int BucketTrimInstanceCR::operate() +{ + reenter(this) { + ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl; + + // query peers for sync status + set_status("fetching sync status from peers"); + yield { + // query data sync status from each sync peer + 
rgw_http_param_pair params[] = { + { "type", "bucket-index" }, + { "status", nullptr }, + { "bucket", bucket_instance.c_str() }, + { "source-zone", zone_id.c_str() }, + { nullptr, nullptr } + }; + + auto p = peer_status.begin(); + for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) { + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), + false); + ++p; + } + // in parallel, read the local bucket instance info + spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store, + bucket_instance, &bucket_info), + false); + } + // wait for a response from each peer. all must respond to attempt trim + while (num_spawned()) { + yield wait_for_child(); + collect(&child_ret, nullptr); + if (child_ret < 0) { + drain_all(); + return set_cr_error(child_ret); + } + } + + // determine the minimum marker for each shard + retcode = take_min_status(cct, peer_status.begin(), peer_status.end(), + &min_markers); + if (retcode < 0) { + ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl; + return set_cr_error(retcode); + } + + // trim shards with a ShardCollectCR + ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket + << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl; + set_status("trimming bilog shards"); + yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers)); + // ENODATA just means there were no keys to trim + if (retcode == -ENODATA) { + retcode = 0; + } + if (retcode < 0) { + ldout(cct, 4) << "failed to trim bilog shards: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + + observer->on_bucket_trimmed(std::move(bucket_instance)); + return set_cr_done(); + } + return 0; +} + +/// trim each bucket instance while limiting the number of concurrent operations +class BucketTrimInstanceCollectCR : public RGWShardCollectCR { + RGWRados *const store; + RGWHTTPManager *const http; + BucketTrimObserver *const observer; + std::vector::const_iterator bucket; + std::vector::const_iterator end; + public: + BucketTrimInstanceCollectCR(RGWRados *store, RGWHTTPManager *http, + BucketTrimObserver *observer, + const std::vector& buckets, + int max_concurrent) + : RGWShardCollectCR(store->ctx(), max_concurrent), + store(store), http(http), observer(observer), + bucket(buckets.begin()), end(buckets.end()) + {} + bool spawn_next() override; +}; + +bool BucketTrimInstanceCollectCR::spawn_next() +{ + if (bucket == end) { + return false; + } + spawn(new BucketTrimInstanceCR(store, http, observer, *bucket), false); + ++bucket; + return true; +} + +/// correlate the replies from each peer gateway into the given counter +int accumulate_peer_counters(bufferlist& bl, BucketChangeCounter& counter) +{ + counter.clear(); + + try { + // decode notify responses + auto p = bl.cbegin(); + std::map, bufferlist> replies; + std::set> timeouts; + decode(replies, p); + decode(timeouts, p); + + for (auto& peer : replies) { + auto q = peer.second.cbegin(); + TrimCounters::Response response; + decode(response, q); + for (const auto& b : response.bucket_counters) { + counter.insert(b.bucket, b.count); + } + } + } catch (const buffer::error& e) { + return -EIO; + } + return 0; +} + +/// metadata callback has the signature bool(string&& key, string&& marker) +using MetadataListCallback = std::function; + +/// lists metadata keys, passing each to a callback until it returns false. 
+/// on reaching the end, it will restart at the beginning and list up to the +/// initial marker +class AsyncMetadataList : public RGWAsyncRadosRequest { + CephContext *const cct; + RGWMetadataManager *const mgr; + const std::string section; + const std::string start_marker; + MetadataListCallback callback; + + int _send_request() override; + public: + AsyncMetadataList(CephContext *cct, RGWCoroutine *caller, + RGWAioCompletionNotifier *cn, RGWMetadataManager *mgr, + const std::string& section, const std::string& start_marker, + const MetadataListCallback& callback) + : RGWAsyncRadosRequest(caller, cn), cct(cct), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} +}; + +int AsyncMetadataList::_send_request() +{ + void* handle = nullptr; + std::list keys; + bool truncated{false}; + std::string marker; + + // start a listing at the given marker + int r = mgr->list_keys_init(section, start_marker, &handle); + if (r == -EINVAL) { + // restart with empty marker below + } else if (r < 0) { + ldout(cct, 10) << "failed to init metadata listing: " + << cpp_strerror(r) << dendl; + return r; + } else { + ldout(cct, 20) << "starting metadata listing at " << start_marker << dendl; + + // release the handle when scope exits + auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); + + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldout(cct, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + ceph_assert(keys.size() == 1); + auto& key = keys.front(); + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + if (start_marker.empty()) { + // already listed all keys + return 0; + } + } + + // restart the listing from the beginning (empty marker) + handle = nullptr; + + r = mgr->list_keys_init(section, "", &handle); + if (r < 0) { + ldout(cct, 10) << "failed to restart metadata listing: " + << cpp_strerror(r) << dendl; + return r; + } + ldout(cct, 20) << "restarting metadata listing" << dendl; + + // release the handle when scope exits + auto g = make_scope_guard([=] { mgr->list_keys_complete(handle); }); + do { + // get the next key and marker + r = mgr->list_keys_next(handle, 1, keys, &truncated); + if (r < 0) { + ldout(cct, 10) << "failed to list metadata: " + << cpp_strerror(r) << dendl; + return r; + } + marker = mgr->get_marker(handle); + + if (!keys.empty()) { + ceph_assert(keys.size() == 1); + auto& key = keys.front(); + // stop at original marker + if (marker >= start_marker) { + return 0; + } + if (!callback(std::move(key), std::move(marker))) { + return 0; + } + } + } while (truncated); + + return 0; +} + +/// coroutine wrapper for AsyncMetadataList +class MetadataListCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *const async_rados; + RGWMetadataManager *const mgr; + const std::string& section; + const std::string& start_marker; + MetadataListCallback callback; + RGWAsyncRadosRequest *req{nullptr}; + public: + MetadataListCR(CephContext *cct, RGWAsyncRadosProcessor *async_rados, + RGWMetadataManager *mgr, const std::string& section, + const std::string& start_marker, + const MetadataListCallback& callback) + : RGWSimpleCoroutine(cct), async_rados(async_rados), mgr(mgr), + section(section), start_marker(start_marker), callback(callback) + {} + ~MetadataListCR() override { + request_cleanup(); + } + + int send_request() override { + req = new 
AsyncMetadataList(cct, this, stack->create_completion_notifier(), + mgr, section, start_marker, callback); + async_rados->queue(req); + return 0; + } + int request_complete() override { + return req->get_ret_status(); + } + void request_cleanup() override { + if (req) { + req->finish(); + req = nullptr; + } + } +}; + +class BucketTrimCR : public RGWCoroutine { + RGWRados *const store; + RGWHTTPManager *const http; + const BucketTrimConfig& config; + BucketTrimObserver *const observer; + const rgw_raw_obj& obj; + ceph::mono_time start_time; + bufferlist notify_replies; + BucketChangeCounter counter; + std::vector buckets; //< buckets selected for trim + BucketTrimStatus status; + RGWObjVersionTracker objv; //< version tracker for trim status object + std::string last_cold_marker; //< position for next trim marker + + static const std::string section; //< metadata section for bucket instances + public: + BucketTrimCR(RGWRados *store, RGWHTTPManager *http, + const BucketTrimConfig& config, BucketTrimObserver *observer, + const rgw_raw_obj& obj) + : RGWCoroutine(store->ctx()), store(store), http(http), config(config), + observer(observer), obj(obj), counter(config.counter_size) + {} + + int operate() override; +}; + +const std::string BucketTrimCR::section{"bucket.instance"}; + +int BucketTrimCR::operate() +{ + reenter(this) { + start_time = ceph::mono_clock::now(); + + if (config.buckets_per_interval) { + // query watch/notify for hot buckets + ldout(cct, 10) << "fetching active bucket counters" << dendl; + set_status("fetching active bucket counters"); + yield { + // request the top bucket counters from each peer gateway + const TrimNotifyType type = NotifyTrimCounters; + TrimCounters::Request request{32}; + bufferlist bl; + encode(type, bl); + encode(request, bl); + call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms, + ¬ify_replies)); + } + if (retcode < 0) { + ldout(cct, 10) << "failed to fetch peer bucket counters" << dendl; + return set_cr_error(retcode); + } + + // select the hottest buckets for trim + retcode = accumulate_peer_counters(notify_replies, counter); + if (retcode < 0) { + ldout(cct, 4) << "failed to correlate peer bucket counters" << dendl; + return set_cr_error(retcode); + } + buckets.reserve(config.buckets_per_interval); + + const int max_count = config.buckets_per_interval - + config.min_cold_buckets_per_interval; + counter.get_highest(max_count, + [this] (const std::string& bucket, int count) { + buckets.push_back(bucket); + }); + } + + if (buckets.size() < config.buckets_per_interval) { + // read BucketTrimStatus for marker position + set_status("reading trim status"); + using ReadStatus = RGWSimpleRadosReadCR; + yield call(new ReadStatus(store->get_async_rados(), store->svc.sysobj, obj, + &status, true, &objv)); + if (retcode < 0) { + ldout(cct, 10) << "failed to read bilog trim status: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + if (status.marker == "MAX") { + status.marker.clear(); // restart at the beginning + } + ldout(cct, 10) << "listing cold buckets from marker=" + << status.marker << dendl; + + set_status("listing cold buckets for trim"); + yield { + // capture a reference so 'this' remains valid in the callback + auto ref = boost::intrusive_ptr{this}; + // list cold buckets to consider for trim + auto cb = [this, ref] (std::string&& bucket, std::string&& marker) { + // filter out keys that we trimmed recently + if (observer->trimmed_recently(bucket)) { + return true; + } + // filter out active buckets that 
+          // filter out active buckets that we've already selected
+          auto i = std::find(buckets.begin(), buckets.end(), bucket);
+          if (i != buckets.end()) {
+            return true;
+          }
+          buckets.emplace_back(std::move(bucket));
+          // remember the last cold bucket spawned to update the status marker
+          last_cold_marker = std::move(marker);
+          // return true if there's room for more
+          return buckets.size() < config.buckets_per_interval;
+        };
+
+        call(new MetadataListCR(cct, store->get_async_rados(), store->meta_mgr,
+                                section, status.marker, cb));
+      }
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to list bucket instance metadata: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+    }
+
+    // trim bucket instances with limited concurrency
+    set_status("trimming buckets");
+    ldout(cct, 4) << "collected " << buckets.size() << " buckets for trim" << dendl;
+    yield call(new BucketTrimInstanceCollectCR(store, http, observer, buckets,
+                                               config.concurrent_buckets));
+    // ignore errors from individual buckets
+
+    // write updated trim status
+    if (!last_cold_marker.empty() && status.marker != last_cold_marker) {
+      set_status("writing updated trim status");
+      status.marker = std::move(last_cold_marker);
+      ldout(cct, 20) << "writing bucket trim marker=" << status.marker << dendl;
+      using WriteStatus = RGWSimpleRadosWriteCR<BucketTrimStatus>;
+      yield call(new WriteStatus(store->get_async_rados(), store->svc.sysobj, obj,
+                                 status, &objv));
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to write updated trim status: "
+            << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+    }
+
+    // notify peers that trim completed
+    set_status("trim completed");
+    yield {
+      const TrimNotifyType type = NotifyTrimComplete;
+      TrimComplete::Request request;
+      bufferlist bl;
+      encode(type, bl);
+      encode(request, bl);
+      call(new RGWRadosNotifyCR(store, obj, bl, config.notify_timeout_ms,
+                                nullptr));
+    }
+    if (retcode < 0) {
+      ldout(cct, 10) << "failed to notify peers of trim completion" << dendl;
+      return set_cr_error(retcode);
+    }
+
+    ldout(cct, 4) << "bucket index log processing completed in "
+        << ceph::mono_clock::now() - start_time << dendl;
+    return set_cr_done();
+  }
+  return 0;
+}
+
+class BucketTrimPollCR : public RGWCoroutine {
+  RGWRados *const store;
+  RGWHTTPManager *const http;
+  const BucketTrimConfig& config;
+  BucketTrimObserver *const observer;
+  const rgw_raw_obj& obj;
+  const std::string name{"trim"}; //< lock name
+  const std::string cookie;
+
+ public:
+  BucketTrimPollCR(RGWRados *store, RGWHTTPManager *http,
+                   const BucketTrimConfig& config,
+                   BucketTrimObserver *observer, const rgw_raw_obj& obj)
+    : RGWCoroutine(store->ctx()), store(store), http(http),
+      config(config), observer(observer), obj(obj),
+      cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct))
+  {}
+
+  int operate() override;
+};
+
+int BucketTrimPollCR::operate()
+{
+  reenter(this) {
+    for (;;) {
+      set_status("sleeping");
+      wait(utime_t{static_cast<time_t>(config.trim_interval_sec), 0});
+
+      // prevent others from trimming for our entire wait interval
+      set_status("acquiring trim lock");
+      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
+                                          obj, name, cookie,
+                                          config.trim_interval_sec));
+      if (retcode < 0) {
+        ldout(cct, 4) << "failed to lock: " << cpp_strerror(retcode) << dendl;
+        continue;
+      }
+
+      set_status("trimming");
+      yield call(new BucketTrimCR(store, http, config, observer, obj));
+      if (retcode < 0) {
+        // on errors, unlock so other gateways can try
+        set_status("unlocking");
+        yield call(new RGWSimpleRadosUnlockCR(store->get_async_rados(), store,
+                                              obj, name, cookie));
+      }
+    }
+  }
+  return 0;
+}
+
+/// tracks a bounded list of events with timestamps. old events can be expired,
+/// and recent events can be searched by key. expiration depends on events being
+/// inserted in temporal order
+template <typename T, typename Clock = ceph::coarse_mono_clock>
+class RecentEventList {
+ public:
+  using clock_type = Clock;
+  using time_point = typename clock_type::time_point;
+
+  RecentEventList(size_t max_size, const ceph::timespan& max_duration)
+    : events(max_size), max_duration(max_duration)
+  {}
+
+  /// insert an event at the given point in time. this time must be at least as
+  /// recent as the last inserted event
+  void insert(T&& value, const time_point& now) {
+    // ceph_assert(events.empty() || now >= events.back().time)
+    events.push_back(Event{std::move(value), now});
+  }
+
+  /// performs a linear search for an event matching the given key, whose type
+  /// U can be any that provides operator==(U, T)
+  template <typename U>
+  bool lookup(const U& key) const {
+    for (const auto& event : events) {
+      if (key == event.value) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /// remove events that are no longer recent compared to the given point in time
+  void expire_old(const time_point& now) {
+    const auto expired_before = now - max_duration;
+    while (!events.empty() && events.front().time < expired_before) {
+      events.pop_front();
+    }
+  }
+
+ private:
+  struct Event {
+    T value;
+    time_point time;
+  };
+  boost::circular_buffer<Event> events;
+  const ceph::timespan max_duration;
+};
+
+namespace rgw {
+
+// read bucket trim configuration from ceph context
+void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config)
+{
+  const auto& conf = cct->_conf;
+
+  config.trim_interval_sec =
+      conf.get_val<int64_t>("rgw_sync_log_trim_interval");
+  config.counter_size = 512;
+  config.buckets_per_interval =
+      conf.get_val<int64_t>("rgw_sync_log_trim_max_buckets");
+  config.min_cold_buckets_per_interval =
+      conf.get_val<int64_t>("rgw_sync_log_trim_min_cold_buckets");
+  config.concurrent_buckets =
+      conf.get_val<int64_t>("rgw_sync_log_trim_concurrent_buckets");
+  config.notify_timeout_ms = 10000;
+  config.recent_size = 128;
+  config.recent_duration = std::chrono::hours(2);
+}
+
+class BucketTrimManager::Impl : public TrimCounters::Server,
+                                public BucketTrimObserver {
+ public:
+  RGWRados *const store;
+  const BucketTrimConfig config;
+
+  const rgw_raw_obj status_obj;
+
+  /// count frequency of bucket instance entries in the data changes log
+  BucketChangeCounter counter;
+
+  using RecentlyTrimmedBucketList = RecentEventList<std::string>;
+  using clock_type = RecentlyTrimmedBucketList::clock_type;
+  /// track recently trimmed buckets to focus trim activity elsewhere
+  RecentlyTrimmedBucketList trimmed;
+
+  /// serve the bucket trim watch/notify api
+  BucketTrimWatcher watcher;
+
+  /// protect data shared between data sync, trim, and watch/notify threads
+  std::mutex mutex;
+
+  Impl(RGWRados *store, const BucketTrimConfig& config)
+    : store(store), config(config),
+      status_obj(store->svc.zone->get_zone_params().log_pool, BucketTrimStatus::oid),
+      counter(config.counter_size),
+      trimmed(config.recent_size, config.recent_duration),
+      watcher(store, status_obj, this)
+  {}
+
+  /// TrimCounters::Server interface for watch/notify api
+  void get_bucket_counters(int count, TrimCounters::Vector& buckets) {
+    buckets.reserve(count);
+    std::lock_guard lock(mutex);
+    counter.get_highest(count, [&buckets] (const std::string& key, int count) {
+        buckets.emplace_back(key, count);
+      });
+    ldout(store->ctx(), 20) << "get_bucket_counters: " << buckets << dendl;
+  }
+
+  void reset_bucket_counters() override {
+    ldout(store->ctx(), 20) << "bucket trim completed" << dendl;
+    std::lock_guard lock(mutex);
+    counter.clear();
+    trimmed.expire_old(clock_type::now());
+  }
+
+  /// BucketTrimObserver interface to remember successfully-trimmed buckets
+  void on_bucket_trimmed(std::string&& bucket_instance) override {
+    ldout(store->ctx(), 20) << "trimmed bucket instance " << bucket_instance << dendl;
+    std::lock_guard lock(mutex);
+    trimmed.insert(std::move(bucket_instance), clock_type::now());
+  }
+
+  bool trimmed_recently(const boost::string_view& bucket_instance) override {
+    std::lock_guard lock(mutex);
+    return trimmed.lookup(bucket_instance);
+  }
+};
+
+BucketTrimManager::BucketTrimManager(RGWRados *store,
+                                     const BucketTrimConfig& config)
+  : impl(new Impl(store, config))
+{
+}
+BucketTrimManager::~BucketTrimManager() = default;
+
+int BucketTrimManager::init()
+{
+  return impl->watcher.start();
+}
+
+void BucketTrimManager::on_bucket_changed(const boost::string_view& bucket)
+{
+  std::lock_guard lock(impl->mutex);
+  // filter recently trimmed bucket instances out of bucket change counter
+  if (impl->trimmed.lookup(bucket)) {
+    return;
+  }
+  impl->counter.insert(bucket.to_string());
+}
+
+RGWCoroutine* BucketTrimManager::create_bucket_trim_cr(RGWHTTPManager *http)
+{
+  return new BucketTrimPollCR(impl->store, http, impl->config,
+                              impl.get(), impl->status_obj);
+}
+
+RGWCoroutine* BucketTrimManager::create_admin_bucket_trim_cr(RGWHTTPManager *http)
+{
+  // return the trim coroutine without any polling
+  return new BucketTrimCR(impl->store, http, impl->config,
+                          impl.get(), impl->status_obj);
+}
+
+} // namespace rgw
diff --git a/src/rgw/rgw_trim_bilog.h b/src/rgw/rgw_trim_bilog.h
new file mode 100644
index 000000000000..13d1f63a1cf4
--- /dev/null
+++ b/src/rgw/rgw_trim_bilog.h
@@ -0,0 +1,110 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * Author: Casey Bodley
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#ifndef RGW_SYNC_LOG_TRIM_H
+#define RGW_SYNC_LOG_TRIM_H
+
+#include <memory>
+#include <boost/utility/string_view.hpp>
+#include "include/encoding.h"
+#include "common/ceph_time.h"
+
+class CephContext;
+class RGWCoroutine;
+class RGWHTTPManager;
+class RGWRados;
+
+namespace rgw {
+
+/// Interface to inform the trim process about which buckets are most active
+struct BucketChangeObserver {
+  virtual ~BucketChangeObserver() = default;
+
+  virtual void on_bucket_changed(const boost::string_view& bucket_instance) = 0;
+};
+
+/// Configuration for BucketTrimManager
+struct BucketTrimConfig {
+  /// time interval in seconds between bucket trim attempts
+  uint32_t trim_interval_sec{0};
+  /// maximum number of buckets to track with BucketChangeObserver
+  size_t counter_size{0};
+  /// maximum number of buckets to process each trim interval
+  uint32_t buckets_per_interval{0};
+  /// minimum number of buckets to choose from the global bucket instance list
+  uint32_t min_cold_buckets_per_interval{0};
+  /// maximum number of buckets to process in parallel
+  uint32_t concurrent_buckets{0};
+  /// timeout in ms for bucket trim notify replies
+  uint64_t notify_timeout_ms{0};
+  /// maximum number of recently trimmed buckets to remember (should be small
+  /// enough for a linear search)
+  size_t recent_size{0};
+  /// maximum duration to consider a trim as 'recent' (should be some multiple
+  /// of the trim interval, at least)
+  ceph::timespan recent_duration{0};
+};
+
+/// fill out the BucketTrimConfig from the ceph context
+void configure_bucket_trim(CephContext *cct, BucketTrimConfig& config);
+
+/// Determines the buckets on which to focus trim activity, using two sources of
+/// input: the frequency of entries read from the data changes log, and a global
+/// listing of the bucket.instance metadata. This allows us to trim active
+/// buckets quickly, while also ensuring that all buckets will eventually trim
+class BucketTrimManager : public BucketChangeObserver {
+  class Impl;
+  std::unique_ptr<Impl> impl;
+ public:
+  BucketTrimManager(RGWRados *store, const BucketTrimConfig& config);
+  ~BucketTrimManager();
+
+  int init();
+
+  /// increment a counter for the given bucket instance
+  void on_bucket_changed(const boost::string_view& bucket_instance) override;
+
+  /// create a coroutine to run the bucket trim process every trim interval
+  RGWCoroutine* create_bucket_trim_cr(RGWHTTPManager *http);
+
+  /// create a coroutine to trim buckets directly via radosgw-admin
+  RGWCoroutine* create_admin_bucket_trim_cr(RGWHTTPManager *http);
+};
+
+/// provides persistent storage for the trim manager's current position in the
+/// list of bucket instance metadata
+struct BucketTrimStatus {
+  std::string marker; //< metadata key of current bucket instance
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(marker, bl);
+    ENCODE_FINISH(bl);
+  }
+  void decode(bufferlist::const_iterator& p) {
+    DECODE_START(1, p);
+    decode(marker, p);
+    DECODE_FINISH(p);
+  }
+
+  static const std::string oid;
+};
+
+} // namespace rgw
+
+WRITE_CLASS_ENCODER(rgw::BucketTrimStatus);
+
+#endif // RGW_SYNC_LOG_TRIM_H
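
For readers skimming the new rgw_trim_bilog.h header above, a minimal usage sketch of the BucketTrimManager interface it declares. This is not part of the patch: the store and http_manager variables and the bucket instance key are illustrative assumptions, and the coroutine-manager wiring is only one plausible way to drive the returned coroutine.

  // sketch: assumes an initialized RGWRados *store and RGWHTTPManager http_manager
  rgw::BucketTrimConfig config;
  rgw::configure_bucket_trim(store->ctx(), config);

  rgw::BucketTrimManager trim(store, config);
  int r = trim.init();  // registers the watch/notify handler on the status object
  if (r < 0) {
    return r;
  }

  // data sync reports per-bucket activity through the BucketChangeObserver interface
  trim.on_bucket_changed("mybucket:zone.12345.1");  // hypothetical bucket instance key

  // run the periodic trim loop; radosgw-admin would instead run
  // create_admin_bucket_trim_cr() for a single one-shot pass
  RGWCoroutinesManager crs(store->ctx(), nullptr);
  r = crs.run(trim.create_bucket_trim_cr(&http_manager));

Note that create_bucket_trim_cr() returns a BucketTrimPollCR, which holds a rados lock on the status object for the whole trim interval, so only one gateway in the zone runs a trim pass at a time.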