return 0;
}
-int RGWReshard::remove(cls_rgw_reshard_entry& entry)
+int RGWReshard::remove(const cls_rgw_reshard_entry& entry)
{
string logshard_oid;
}
}
+int RGWReshard::process_entry(const cls_rgw_reshard_entry& entry,
+ int max_entries, const DoutPrefixProvider *dpp)
+{
+ ldout(store->ctx(), 20) << __func__ << " resharding " <<
+ entry.bucket_name << dendl;
+
+ rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+
+ int ret = store->getRados()->get_bucket_info(store->svc(),
+ entry.tenant, entry.bucket_name,
+ bucket_info, nullptr,
+ null_yield, dpp, nullptr);
+ if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": Error in get_bucket_info for bucket " << entry.bucket_name <<
+ ": " << cpp_strerror(-ret) << dendl;
+ if (ret != -ENOENT) {
+ // any error other than ENOENT will abort
+ return ret;
+ }
+ } else {
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": Bucket: " << entry.bucket_name <<
+ " already resharded by someone, skipping " << dendl;
+ }
+
+ // we've encountered a reshard queue entry for an apparently
+ // non-existent bucket; let's try to recover by cleaning up
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": removing reshard queue entry for a resharded or non-existent bucket" <<
+ entry.bucket_name << dendl;
+
+ ret = remove(entry);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": Error removing non-existent bucket " <<
+ entry.bucket_name << " from resharding queue: " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ // we cleaned up, move on to the next entry
+ return 0;
+ }
+
+ RGWBucketReshard br(store, bucket_info, nullptr);
+
+ ReshardFaultInjector f; // no fault injected
+ ret = br.execute(entry.new_num_shards, f, max_entries, dpp,
+ false, nullptr, nullptr, this);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ <<
+ ": Error during resharding bucket " << entry.bucket_name << ":" <<
+ cpp_strerror(-ret)<< dendl;
+ return ret;
+ }
+
+ ldpp_dout(dpp, 20) << __func__ <<
+ " removing reshard queue entry for bucket " << entry.bucket_name <<
+ dendl;
+
+ ret = remove(entry);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << __func__ << ": Error removing bucket " <<
+ entry.bucket_name << " from resharding queue: " <<
+ cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ return 0;
+}
+
int RGWReshard::process_single_logshard(int logshard_num, const DoutPrefixProvider *dpp)
{
string marker;
}
for(auto& entry: entries) { // logshard entries
-
- ldout(store->ctx(), 20) << __func__ << " resharding " <<
- entry.bucket_name << dendl;
-
- rgw_bucket bucket;
- RGWBucketInfo bucket_info;
-
- ret = store->getRados()->get_bucket_info(store->svc(),
- entry.tenant, entry.bucket_name,
- bucket_info, nullptr,
- null_yield, dpp, nullptr);
- if (ret < 0 || bucket_info.bucket.bucket_id != entry.bucket_id) {
- if (ret < 0) {
- ldout(cct, 0) << __func__ <<
- ": Error in get_bucket_info for bucket " << entry.bucket_name <<
- ": " << cpp_strerror(-ret) << dendl;
- if (ret != -ENOENT) {
- // any error other than ENOENT will abort
- return ret;
- }
- } else {
- ldout(cct,0) << __func__ <<
- ": Bucket: " << entry.bucket_name <<
- " already resharded by someone, skipping " << dendl;
- }
-
- // we've encountered a reshard queue entry for an apparently
- // non-existent bucket; let's try to recover by cleaning up
- ldout(cct, 0) << __func__ <<
- ": removing reshard queue entry for a resharded or non-existent bucket" <<
- entry.bucket_name << dendl;
-
- ret = remove(entry);
- if (ret < 0) {
- ldout(cct, 0) << __func__ <<
- ": Error removing non-existent bucket " <<
- entry.bucket_name << " from resharding queue: " <<
- cpp_strerror(-ret) << dendl;
- return ret;
- }
-
- // we cleaned up, move on to the next entry
- goto finished_entry;
- }
-
- {
- RGWBucketReshard br(store, bucket_info, nullptr);
-
- ReshardFaultInjector f;
- ret = br.execute(entry.new_num_shards, f, max_entries, dpp,
- false, nullptr, nullptr, this);
- if (ret < 0) {
- ldout(store->ctx(), 0) << __func__ <<
- ": Error during resharding bucket " << entry.bucket_name << ":" <<
- cpp_strerror(-ret)<< dendl;
- return ret;
- }
-
- ldout(store->ctx(), 20) << __func__ <<
- " removing reshard queue entry for bucket " << entry.bucket_name <<
- dendl;
-
- ret = remove(entry);
- if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error removing bucket " <<
- entry.bucket_name << " from resharding queue: " <<
- cpp_strerror(-ret) << dendl;
- return ret;
+ process_entry(entry, max_entries, dpp);
+ if (ret < 0) {
+ return ret;
}
- }
-
- finished_entry:
Clock::time_point now = Clock::now();
if (logshard_lock.should_renew(now)) {
- ret = logshard_lock.renew(now);
- if (ret < 0) {
- return ret;
- }
+ ret = logshard_lock.renew(now);
+ if (ret < 0) {
+ return ret;
+ }
}
entry.get_key(&marker);