RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
- reshard_lock(reshard_lock_name) {
+ reshard_lock(reshard_lock_name), locked_bucket(false) {
const rgw_bucket& b = bucket_info.bucket;
reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
return ret;
}
+ lock_start_time = ceph_clock_now();
+ locked_bucket = true;
return 0;
}
if (ret < 0) {
ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
}
+ locked_bucket = false;
+}
+
+
+int RGWBucketReshard::renew_lock_bucket()
+{
+ if (!locked_bucket) {
+ return 0;
+ }
+
+ utime_t now = ceph_clock_now();
+ /* do you need to renew lock? */
+ if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) {
+ reshard_lock.set_renew(true);
+ int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+ if (ret == -EBUSY) { /* already locked by another processor */
+ ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl;
+ return ret;
+ }
+ lock_start_time = now;
+ }
+ return 0;
}
int RGWBucketReshard::set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status)
(*out) << " " << total_entries;
}
}
+ ret = renew_lock_bucket();
+ if (ret < 0) {
+ lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+ return -ret;
+ }
+ ret = renew_lock_bucket();
+ if (ret < 0) {
+ lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+ return -ret;
+ }
}
}
+
if (verbose) {
formatter->close_section();
if (out) {