#include "common/errno.h"
#include "rgw_sync_log_trim.h"
#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0;
};
+/// populate the status with the minimum stable marker of each shard
+template <typename Iter>
+int take_min_status(CephContext *cct, Iter first, Iter last,
+ std::vector<std::string> *status)
+{
+ status->clear();
+ boost::optional<size_t> num_shards;
+ for (auto peer = first; peer != last; ++peer) {
+ const size_t peer_shards = peer->size();
+ if (!num_shards) {
+ num_shards = peer_shards;
+ status->resize(*num_shards);
+ } else if (*num_shards != peer_shards) {
+ // all peers must agree on the number of shards
+ return -EINVAL;
+ }
+ auto m = status->begin();
+ for (auto& shard : *peer) {
+ auto& marker = *m++;
+ // only consider incremental sync markers
+ if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
+ continue;
+ }
+ // always take the first marker, or any later marker that's smaller
+ if (peer == first || marker > shard.inc_marker.position) {
+ marker = std::move(shard.inc_marker.position);
+ }
+ }
+ }
+ return 0;
+}
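+
+// For example (illustrative values): with two peers whose incremental-sync
+// positions for two shards are {"00005", "00003"} and {"00004", "00006"},
+// take_min_status() fills status with {"00004", "00003"}: the per-shard
+// minimum, so trimming never discards entries a peer has yet to replay.
+// Markers are compared as strings, matching the '>' test above.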
+
+/// trim each bilog shard to the given marker, while limiting the number of
+/// concurrent requests
+class BucketTrimShardCollectCR : public RGWShardCollectCR {
+ static constexpr int MAX_CONCURRENT_SHARDS = 16;
+ RGWRados *const store;
+ const RGWBucketInfo& bucket_info;
+ const std::vector<std::string>& markers; //< shard markers to trim
+ size_t i{0}; //< index of current shard marker
+ public:
+ BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info,
+ const std::vector<std::string>& markers)
+ : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS),
+ store(store), bucket_info(bucket_info), markers(markers)
+ {}
+ bool spawn_next() override;
+};
+
+bool BucketTrimShardCollectCR::spawn_next()
+{
+ while (i < markers.size()) {
+ const auto& marker = markers[i];
+ const auto shard_id = i++;
+
+ // skip empty markers
+ if (!marker.empty()) {
+ ldout(cct, 10) << "trimming bilog shard " << shard_id
+ << " of " << bucket_info.bucket << " at marker " << marker << dendl;
+ spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id,
+ std::string{}, marker),
+ false);
+ return true;
+ }
+ }
+ return false;
+}
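+
+// Note: the RGWShardCollectCR base calls spawn_next() until it returns
+// false, keeping at most MAX_CONCURRENT_SHARDS trim requests in flight.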
+
/// trim the bilog of all of the given bucket instance's shards
class BucketTrimInstanceCR : public RGWCoroutine {
RGWRados *const store;
RGWHTTPManager *const http;
BucketTrimObserver *const observer;
std::string bucket_instance;
+ const std::string& zone_id; //< my zone id
+ RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+
+ using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
+ std::vector<StatusShards> peer_status; //< sync status for each peer
+ std::vector<std::string> min_markers; //< min marker per shard
public:
BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http,
BucketTrimObserver *observer,
const std::string& bucket_instance)
: RGWCoroutine(store->ctx()), store(store),
http(http), observer(observer),
- bucket_instance(bucket_instance)
+ bucket_instance(bucket_instance),
+ zone_id(store->get_zone().id),
+ peer_status(store->zone_conn_map.size())
{}
- int operate() {
+
+ int operate() override;
+};
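+
+// Sketch of the intended call site (BucketTrimInstanceCollectCR below spawns
+// one of these per bucket instance; 'instance' here is a placeholder key):
+//   spawn(new BucketTrimInstanceCR(store, http, observer, instance), false);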
+
+int BucketTrimInstanceCR::operate()
+{
+ reenter(this) {
+ ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
+
+ // query peers for sync status
+ set_status("fetching sync status from peers");
+ yield {
+ // query bucket sync status from each sync peer
+ rgw_http_param_pair params[] = {
+ { "type", "bucket-index" },
+ { "status", nullptr },
+ { "bucket", bucket_instance.c_str() },
+ { "source-zone", zone_id.c_str() },
+ { nullptr, nullptr }
+ };
+
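+ // each peer receives a request that looks roughly like:
+ //   GET /admin/log/?type=bucket-index&status&bucket=<instance>&source-zone=<zone-id>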
+ auto p = peer_status.begin();
+ for (auto& c : store->zone_conn_map) {
+ using StatusCR = RGWReadRESTResourceCR<StatusShards>;
+ spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+ false);
+ ++p;
+ }
+ // in parallel, read the local bucket instance info
+ spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store,
+ bucket_instance, &bucket_info),
+ false);
+ }
+ // wait for a response from each peer. all must respond to attempt trim
+ while (num_spawned()) {
+ int child_ret;
+ yield wait_for_child();
+ collect(&child_ret, nullptr);
+ if (child_ret < 0) {
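+ // a failed status fetch means we cannot trim safely; wait for the
+ // remaining children to finish before returning the error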
+ drain_all();
+ return set_cr_error(child_ret);
+ }
+ }
+
+ // determine the minimum marker for each shard
+ retcode = take_min_status(cct, peer_status.begin(), peer_status.end(),
+ &min_markers);
+ if (retcode < 0) {
+ ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ // trim shards with a ShardCollectCR
+ ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+ << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
+ set_status("trimming bilog shards");
+ yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+ // ENODATA just means there were no keys to trim
+ if (retcode == -ENODATA) {
+ retcode = 0;
+ }
+ if (retcode < 0) {
+ ldout(cct, 4) << "failed to trim bilog shards: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
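+ // tell the observer, so trimmed_recently() can skip this instance on
+ // later trim passes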
observer->on_bucket_trimmed(std::move(bucket_instance));
return set_cr_done();
}
-};
+ return 0;
+}
/// trim each bucket instance while limiting the number of concurrent operations
class BucketTrimInstanceCollectCR : public RGWShardCollectCR {