From 8f3616c62d9dc1a2a412686898d580046f4773f6 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 2 Feb 2021 12:51:14 -0500 Subject: [PATCH] rgw: refactor per-entry reshard logic into separate function this cuts down on nesting and avoids the need for goto Signed-off-by: Casey Bodley --- src/rgw/rgw_reshard.cc | 157 +++++++++++++++++++++-------------------- src/rgw/rgw_reshard.h | 4 +- 2 files changed, 84 insertions(+), 77 deletions(-) diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 6b313fa3ddda5..9281f6d5054c3 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -888,7 +888,7 @@ int RGWReshard::get(cls_rgw_reshard_entry& entry) return 0; } -int RGWReshard::remove(cls_rgw_reshard_entry& entry) +int RGWReshard::remove(const cls_rgw_reshard_entry& entry) { string logshard_oid; @@ -963,6 +963,79 @@ void RGWReshardWait::stop() } } +int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry, + int max_entries, const DoutPrefixProvider *dpp) +{ + ldout(store->ctx(), 20) << __func__ << " resharding " << + entry.bucket_name << dendl; + + rgw_bucket bucket; + RGWBucketInfo bucket_info; + + int ret = store->getRados()->get_bucket_info(store->svc(), + entry.tenant, entry.bucket_name, + bucket_info, nullptr, + null_yield, dpp, nullptr); + if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) { + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + ": Error in get_bucket_info for bucket " << entry.bucket_name << + ": " << cpp_strerror(-ret) << dendl; + if (ret != -ENOENT) { + // any error other than ENOENT will abort + return ret; + } + } else { + ldpp_dout(dpp, 0) << __func__ << + ": Bucket: " << entry.bucket_name << + " already resharded by someone, skipping " << dendl; + } + + // we've encountered a reshard queue entry for an apparently + // non-existent bucket; let's try to recover by cleaning up + ldpp_dout(dpp, 0) << __func__ << + ": removing reshard queue entry for a resharded or non-existent bucket" << + entry.bucket_name << dendl; + + ret = remove(entry); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + ": Error removing non-existent bucket " << + entry.bucket_name << " from resharding queue: " << + cpp_strerror(-ret) << dendl; + return ret; + } + + // we cleaned up, move on to the next entry + return 0; + } + + RGWBucketReshard br(store, bucket_info, nullptr); + + ReshardFaultInjector f; // no fault injected + ret = br.execute(entry.new_num_shards, f, max_entries, dpp, + false, nullptr, nullptr, this); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << + ": Error during resharding bucket " << entry.bucket_name << ":" << + cpp_strerror(-ret)<< dendl; + return ret; + } + + ldpp_dout(dpp, 20) << __func__ << + " removing reshard queue entry for bucket " << entry.bucket_name << + dendl; + + ret = remove(entry); + if (ret < 0) { + ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " << + entry.bucket_name << " from resharding queue: " << + cpp_strerror(-ret) << dendl; + return ret; + } + return 0; +} + int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp) { string marker; @@ -993,85 +1066,17 @@ int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvid } for(auto& entry: entries) { // logshard entries - - ldout(store->ctx(), 20) << __func__ << " resharding " << - entry.bucket_name << dendl; - - rgw_bucket bucket; - RGWBucketInfo bucket_info; - - ret = store->getRados()->get_bucket_info(store->svc(), - entry.tenant, entry.bucket_name, - bucket_info, nullptr, - null_yield, dpp, nullptr); - if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) { - if (ret < 0) { - ldout(cct, 0) << __func__ << - ": Error in get_bucket_info for bucket " << entry.bucket_name << - ": " << cpp_strerror(-ret) << dendl; - if (ret != -ENOENT) { - // any error other than ENOENT will abort - return ret; - } - } else { - ldout(cct,0) << __func__ << - ": Bucket: " << entry.bucket_name << - " already resharded by someone, skipping " << dendl; - } - - // we've encountered a reshard queue entry for an apparently - // non-existent bucket; let's try to recover by cleaning up - ldout(cct, 0) << __func__ << - ": removing reshard queue entry for a resharded or non-existent bucket" << - entry.bucket_name << dendl; - - ret = remove(entry); - if (ret < 0) { - ldout(cct, 0) << __func__ << - ": Error removing non-existent bucket " << - entry.bucket_name << " from resharding queue: " << - cpp_strerror(-ret) << dendl; - return ret; - } - - // we cleaned up, move on to the next entry - goto finished_entry; - } - - { - RGWBucketReshard br(store, bucket_info, nullptr); - - ReshardFaultInjector f; - ret = br.execute(entry.new_num_shards, f, max_entries, dpp, - false, nullptr, nullptr, this); - if (ret < 0) { - ldout(store->ctx(), 0) << __func__ << - ": Error during resharding bucket " << entry.bucket_name << ":" << - cpp_strerror(-ret)<< dendl; - return ret; - } - - ldout(store->ctx(), 20) << __func__ << - " removing reshard queue entry for bucket " << entry.bucket_name << - dendl; - - ret = remove(entry); - if (ret < 0) { - ldout(cct, 0) << __func__ << ": Error removing bucket " << - entry.bucket_name << " from resharding queue: " << - cpp_strerror(-ret) << dendl; - return ret; + process_entry(entry, max_entries, dpp); + if (ret < 0) { + return ret; } - } - - finished_entry: Clock::time_point now = Clock::now(); if (logshard_lock.should_renew(now)) { - ret = logshard_lock.renew(now); - if (ret < 0) { - return ret; - } + ret = logshard_lock.renew(now); + if (ret < 0) { + return ret; + } } entry.get_key(&marker); diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index cc47046390654..2d8a8b6acc676 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -213,11 +213,13 @@ public: int add(cls_rgw_reshard_entry& entry); int update(const RGWBucketInfo& bucket_info); int get(cls_rgw_reshard_entry& entry); - int remove(cls_rgw_reshard_entry& entry); + int remove(const cls_rgw_reshard_entry& entry); int list(int logshard_num, string& marker, uint32_t max, std::list& entries, bool *is_truncated); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); /* reshard thread */ + int process_entry(const cls_rgw_reshard_entry& entry, int max_entries, + const DoutPrefixProvider *dpp); int process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp); int process_all_logshards(const DoutPrefixProvider *dpp); bool going_down(); -- 2.39.5