reshard_lock.set_duration(lock_duration);
}
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) :
+ store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
+ reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true)
+{
+ const rgw_bucket& b = bucket_info.bucket;
+ reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+}
+
int RGWBucketReshard::lock_bucket()
{
int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
CephContext *cct = store->ctx();
int max_entries = 1000;
- int max_secs = 60;
+ int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
rados::cls::lock::Lock l(reshard_lock_name);
return -ret;
}
- RGWBucketReshard br(store, bucket_info, attrs);
+ RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time);
Formatter* formatter = new JSONFormatter(false);
auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
<< cpp_strerror(-ret) << dendl;
return ret;
}
+ ret = br.renew_lock_bucket();
+ if (ret < 0) {
+ ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
}
utime_t now = ceph_clock_now();
RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
const std::map<string, bufferlist>& _bucket_attrs);
+ RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
+ const std::map<string, bufferlist>& _bucket_attrs,
+ rados::cls::lock::Lock& reshard_lock, const utime_t& lock_start_time);
+
int execute(int num_shards, int max_op_entries,
bool verbose = false, ostream *out = nullptr,
Formatter *formatter = nullptr,