#define COOKIE_LEN 16
char cookie_buf[COOKIE_LEN + 1];
gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+ cookie_buf[COOKIE_LEN] = '\0';
reshard_lock.set_cookie(cookie_buf);
reshard_lock.set_duration(lock_duration);
utime_t time(max_secs, 0);
l.set_duration(time);
+ char cookie_buf[COOKIE_LEN + 1];
+ gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+ cookie_buf[COOKIE_LEN] = '\0';
+
+ l.set_cookie(cookie_buf);
+
string logshard_oid;
get_logshard_oid(logshard_num, &logshard_oid);
return ret;
}
+ utime_t lock_start_time = ceph_clock_now();
+
do {
std::list<cls_rgw_reshard_entry> entries;
ret = list(logshard_num, marker, max_entries, entries, &truncated);
for(auto& entry: entries) {
/* resharding has not started */
- if(entry.new_instance_id.empty()) {
- RGWObjectCtx obj_ctx(store);
- rgw_bucket bucket;
- RGWBucketInfo bucket_info;
- map<string, bufferlist> attrs;
+ RGWObjectCtx obj_ctx(store);
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
- ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
- &attrs);
- if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
- return -ret;
- }
- bucket = bucket_info.bucket;
+ ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+ &attrs);
+ if (ret < 0) {
+ ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+ return -ret;
+ }
+ rgw_bucket bucket = bucket_info.bucket;
+ RGWBucketInfo new_bucket_info(bucket_info);
- RGWBucketInfo new_bucket_info(bucket_info);
+ if(entry.new_instance_id.empty()) {
ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
new_bucket_info);
if (ret < 0) {
entry.new_instance_id = new_bucket_info.bucket.bucket_id;
+ ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name
+ << " new_instance_id=" << entry.new_instance_id << dendl;
+
ret = add(entry);
if (ret < 0) {
ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
cpp_strerror(-ret) << dendl;
return ret;
}
+ }
- RGWBucketAdminOpState bucket_op;
- RGWBucketReshard reshard_op(store, bucket_info, attrs);
- ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
- max_entries, false, nullptr, nullptr);
- if (ret < 0) {
- return ret;
- }
+ RGWBucketAdminOpState bucket_op;
+ RGWBucketReshard reshard_op(store, bucket_info, attrs);
+ ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
+ max_entries, false, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
- ret = remove(entry);
- if (ret < 0) {
- ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
- << cpp_strerror(-ret) << dendl;
- return ret;
- }
+ ret = remove(entry);
+ if (ret < 0) {
+ ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ utime_t now = ceph_clock_now();
+
+ if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
+ l.set_renew(true);
+ ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
+ if (ret == -EBUSY) { /* already locked by another processor */
+ ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
+ return ret;
+ }
+ lock_start_time = now;
}
+ entry.get_key(&marker);
}
-#warning update marker?
-#warning do work here, renew lock
} while (truncated);
l.unlock(&store->reshard_pool_ctx, logshard_oid);