From: Casey Bodley Date: Thu, 7 Sep 2017 20:27:53 +0000 (-0400) Subject: rgw: add BucketTrimInstanceCR X-Git-Tag: v13.0.1~210^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=35cf54e1059bd0a46e99fecc191c117f656c7b02;p=ceph.git rgw: add BucketTrimInstanceCR fetches bucket sync status from each peer, calculates the min markers for each shard, and trims the bilog shards. calls the TrimObserver on success Fixes: http://tracker.ceph.com/issues/18229 Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_sync_log_trim.cc b/src/rgw/rgw_sync_log_trim.cc index 74acfe4d3e49..660db0089e1f 100644 --- a/src/rgw/rgw_sync_log_trim.cc +++ b/src/rgw/rgw_sync_log_trim.cc @@ -21,6 +21,8 @@ #include "common/errno.h" #include "rgw_sync_log_trim.h" #include "rgw_cr_rados.h" +#include "rgw_cr_rest.h" +#include "rgw_data_sync.h" #include "rgw_metadata.h" #include "rgw_rados.h" #include "rgw_sync.h" @@ -280,12 +282,86 @@ struct BucketTrimObserver { virtual bool trimmed_recently(const boost::string_view& bucket_instance) = 0; }; +/// populate the status with the minimum stable marker of each shard +template +int take_min_status(CephContext *cct, Iter first, Iter last, + std::vector *status) +{ + status->clear(); + boost::optional num_shards; + for (auto peer = first; peer != last; ++peer) { + const size_t peer_shards = peer->size(); + if (!num_shards) { + num_shards = peer_shards; + status->resize(*num_shards); + } else if (*num_shards != peer_shards) { + // all peers must agree on the number of shards + return -EINVAL; + } + auto m = status->begin(); + for (auto& shard : *peer) { + auto& marker = *m++; + // only consider incremental sync markers + if (shard.state != rgw_bucket_shard_sync_info::StateIncrementalSync) { + continue; + } + // always take the first marker, or any later marker that's smaller + if (peer == first || marker > shard.inc_marker.position) { + marker = std::move(shard.inc_marker.position); + } + } + } + return 0; +} + +/// trim each bilog shard to the given marker, while limiting the number of +/// concurrent requests +class BucketTrimShardCollectCR : public RGWShardCollectCR { + static constexpr int MAX_CONCURRENT_SHARDS = 16; + RGWRados *const store; + const RGWBucketInfo& bucket_info; + const std::vector& markers; //< shard markers to trim + size_t i{0}; //< index of current shard marker + public: + BucketTrimShardCollectCR(RGWRados *store, const RGWBucketInfo& bucket_info, + const std::vector& markers) + : RGWShardCollectCR(store->ctx(), MAX_CONCURRENT_SHARDS), + store(store), bucket_info(bucket_info), markers(markers) + {} + bool spawn_next() override; +}; + +bool BucketTrimShardCollectCR::spawn_next() +{ + while (i < markers.size()) { + const auto& marker = markers[i]; + const auto shard_id = i++; + + // skip empty markers + if (!marker.empty()) { + ldout(cct, 10) << "trimming bilog shard " << shard_id + << " of " << bucket_info.bucket << " at marker " << marker << dendl; + spawn(new RGWRadosBILogTrimCR(store, bucket_info, shard_id, + std::string{}, marker), + false); + return true; + } + } + return false; +} + /// trim the bilog of all of the given bucket instance's shards class BucketTrimInstanceCR : public RGWCoroutine { RGWRados *const store; RGWHTTPManager *const http; BucketTrimObserver *const observer; std::string bucket_instance; + const std::string& zone_id; //< my zone id + RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices + + using StatusShards = std::vector; + std::vector peer_status; //< sync status for each peer + std::vector min_markers; //< min marker per shard public: BucketTrimInstanceCR(RGWRados *store, RGWHTTPManager *http, @@ -293,13 +369,82 @@ class BucketTrimInstanceCR : public RGWCoroutine { const std::string& bucket_instance) : RGWCoroutine(store->ctx()), store(store), http(http), observer(observer), - bucket_instance(bucket_instance) + bucket_instance(bucket_instance), + zone_id(store->get_zone().id), + peer_status(store->zone_conn_map.size()) {} - int operate() { + + int operate() override; +}; + +int BucketTrimInstanceCR::operate() +{ + reenter(this) { + ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl; + + // query peers for sync status + set_status("fetching sync status from peers"); + yield { + // query data sync status from each sync peer + rgw_http_param_pair params[] = { + { "type", "bucket-index" }, + { "status", nullptr }, + { "bucket", bucket_instance.c_str() }, + { "source-zone", zone_id.c_str() }, + { nullptr, nullptr } + }; + + auto p = peer_status.begin(); + for (auto& c : store->zone_conn_map) { + using StatusCR = RGWReadRESTResourceCR; + spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p), + false); + ++p; + } + // in parallel, read the local bucket instance info + spawn(new RGWGetBucketInstanceInfoCR(store->get_async_rados(), store, + bucket_instance, &bucket_info), + false); + } + // wait for a response from each peer. all must respond to attempt trim + while (num_spawned()) { + int child_ret; + yield wait_for_child(); + collect(&child_ret, nullptr); + if (child_ret < 0) { + drain_all(); + return set_cr_error(child_ret); + } + } + + // determine the minimum marker for each shard + retcode = take_min_status(cct, peer_status.begin(), peer_status.end(), + &min_markers); + if (retcode < 0) { + ldout(cct, 4) << "failed to correlate bucket sync status from peers" << dendl; + return set_cr_error(retcode); + } + + // trim shards with a ShardCollectCR + ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket + << " markers=" << min_markers << ", shards=" << min_markers.size() << dendl; + set_status("trimming bilog shards"); + yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers)); + // ENODATA just means there were no keys to trim + if (retcode == -ENODATA) { + retcode = 0; + } + if (retcode < 0) { + ldout(cct, 4) << "failed to trim bilog shards: " + << cpp_strerror(retcode) << dendl; + return set_cr_error(retcode); + } + observer->on_bucket_trimmed(std::move(bucket_instance)); return set_cr_done(); } -}; + return 0; +} /// trim each bucket instance while limiting the number of concurrent operations class BucketTrimInstanceCollectCR : public RGWShardCollectCR {