#include "rgw_trim_bilog.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
#include "rgw_data_sync.h"
#include "rgw_metadata.h"
#include "rgw_sal.h"
RGWHTTPManager *const http;
BucketTrimObserver *const observer;
std::string bucket_instance;
+ rgw_bucket_get_sync_policy_params get_policy_params;
+ std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
rgw_bucket bucket;
const std::string& zone_id; //< my zone id
- RGWBucketInfo bucket_info; //< bucket instance info to locate bucket indices
+ RGWBucketInfo _bucket_info;
+ const RGWBucketInfo *pbucket_info; //< pointer to bucket instance info to locate bucket indices
int child_ret = 0;
using StatusShards = std::vector<rgw_bucket_shard_sync_info>;
: RGWCoroutine(store->ctx()), store(store),
http(http), observer(observer),
bucket_instance(bucket_instance),
- zone_id(store->svc()->zone->get_zone().id),
- peer_status(store->svc()->zone->get_zone_data_notify_to_map().size()) {
+ zone_id(store->svc()->zone->get_zone().id) {
rgw_bucket_parse_bucket_key(cct, bucket_instance, &bucket, nullptr);
+ source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
}
int operate() override;
reenter(this) {
ldout(cct, 4) << "starting trim on bucket=" << bucket_instance << dendl;
+ get_policy_params.zone = zone_id;
+ get_policy_params.bucket = bucket;
+ yield call(new RGWBucketGetSyncPolicyHandlerCR(store->svc()->rados->get_async_processor(),
+ store,
+ get_policy_params,
+ source_policy));
+ if (retcode < 0) {
+ if (retcode != -ENOENT) {
+ ldout(cct, 0) << "ERROR: failed to fetch policy handler for bucket=" << bucket << dendl;
+ }
+
+ return set_cr_error(retcode);
+ }
+
+ if (auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
+ opt_bucket_info) {
+ pbucket_info = &(*opt_bucket_info);
+ } else {
+ /* this shouldn't really happen */
+ return set_cr_error(-ENOENT);
+ }
+
// query peers for sync status
- set_status("fetching sync status from peers");
+ set_status("fetching sync status from relevant peers");
yield {
// query data sync status from each sync peer
rgw_http_param_pair params[] = {
{ nullptr, nullptr }
};
+ const auto& all_dests = source_policy->policy_handler->get_all_dests();
+
+ set<rgw_zone_id> target_zones;
+ rgw_zone_id last_zone;
+ for (const auto& entry : all_dests) {
+ if (entry.first != last_zone) {
+ last_zone = entry.first;
+ target_zones.insert(last_zone);
+ }
+ }
+
+ peer_status.resize(target_zones.size());
+
+ auto& zone_conn_map = store->svc()->zone->get_zone_conn_map();
+
auto p = peer_status.begin();
- for (auto& c : store->svc()->zone->get_zone_data_notify_to_map()) {
+ for (auto& zid : target_zones) {
+ auto ziter = zone_conn_map.find(zid);
+ if (ziter == zone_conn_map.end()) {
+ ldout(cct, 0) << "WARNING: no connection to zone " << zid << ", can't trim bucket: " << bucket << dendl;
+ return set_cr_error(-ECANCELED);
+ }
using StatusCR = RGWReadRESTResourceCR<StatusShards>;
- spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
+ spawn(new StatusCR(cct, ziter->second, http, "/admin/log/", params, &*p),
false);
++p;
}
- // in parallel, read the local bucket instance info
- spawn(new RGWGetBucketInstanceInfoCR(store->svc()->rados->get_async_processor(), store,
- bucket, &bucket_info, nullptr),
- false);
}
// wait for a response from each peer. all must respond to attempt trim
while (num_spawned()) {
// initialize each shard with the maximum marker, which is only used when
// there are no peers syncing from us
- min_markers.assign(std::max(1u, bucket_info.num_shards),
+ min_markers.assign(std::max(1u, pbucket_info->num_shards),
RGWSyncLogTrimCR::max_marker);
// determine the minimum marker for each shard
}
// trim shards with a ShardCollectCR
- ldout(cct, 10) << "trimming bilogs for bucket=" << bucket_info.bucket
+ ldout(cct, 10) << "trimming bilogs for bucket=" << pbucket_info->bucket
<< " markers=" << min_markers << ", shards=" << min_markers.size() << dendl;
set_status("trimming bilog shards");
- yield call(new BucketTrimShardCollectCR(store, bucket_info, min_markers));
+ yield call(new BucketTrimShardCollectCR(store, *pbucket_info, min_markers));
// ENODATA just means there were no keys to trim
if (retcode == -ENODATA) {
retcode = 0;