From 1da369e1e8c6b65de3f8d73d120dbcb32a9dd9b8 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 24 May 2017 11:21:46 -0700 Subject: [PATCH] rgw: reshard: renew lease and handle marker when listing reshard repo Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 2 +- src/rgw/rgw_reshard.cc | 77 +++++++++++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index e1a9027007e..7f12c1f36be 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -5650,10 +5650,10 @@ next: for (auto iter=entries.begin(); iter != entries.end(); ++iter) { cls_rgw_reshard_entry& entry = *iter; encode_json("entry", entry, formatter); + entry.get_key(&marker); } count += entries.size(); formatter->flush(cout); -#warning marker? } while (is_truncated && count < max_entries); if (count >= max_entries) { diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 5bdf32d0afb..7bc885dcdcd 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -197,6 +197,7 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke #define COOKIE_LEN 16 char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); + cookie_buf[COOKIE_LEN] = '\0'; reshard_lock.set_cookie(cookie_buf); reshard_lock.set_duration(lock_duration); @@ -671,6 +672,12 @@ int RGWReshard::process_single_logshard(int logshard_num) utime_t time(max_secs, 0); l.set_duration(time); + char cookie_buf[COOKIE_LEN + 1]; + gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); + cookie_buf[COOKIE_LEN] = '\0'; + + l.set_cookie(cookie_buf); + string logshard_oid; get_logshard_oid(logshard_num, &logshard_oid); @@ -680,6 +687,8 @@ int RGWReshard::process_single_logshard(int logshard_num) return ret; } + utime_t lock_start_time = ceph_clock_now(); + do { std::list entries; ret = list(logshard_num, marker, max_entries, entries, &truncated); @@ -690,21 +699,20 @@ int RGWReshard::process_single_logshard(int logshard_num) for(auto& entry: entries) { /* resharding has not started */ - if(entry.new_instance_id.empty()) { - RGWObjectCtx obj_ctx(store); - rgw_bucket bucket; - RGWBucketInfo bucket_info; - map attrs; + RGWObjectCtx obj_ctx(store); + RGWBucketInfo bucket_info; + map attrs; - ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, - &attrs); - if (ret < 0) { - ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; - return -ret; - } - bucket = bucket_info.bucket; + ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, + &attrs); + if (ret < 0) { + ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; + return -ret; + } + rgw_bucket bucket = bucket_info.bucket; + RGWBucketInfo new_bucket_info(bucket_info); - RGWBucketInfo new_bucket_info(bucket_info); + if(entry.new_instance_id.empty()) { ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs, new_bucket_info); if (ret < 0) { @@ -714,31 +722,44 @@ int RGWReshard::process_single_logshard(int logshard_num) entry.new_instance_id = new_bucket_info.bucket.bucket_id; + ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name + << " new_instance_id=" << entry.new_instance_id << dendl; + ret = add(entry); if (ret < 0) { ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << cpp_strerror(-ret) << dendl; return ret; } + } - RGWBucketAdminOpState bucket_op; - RGWBucketReshard reshard_op(store, bucket_info, attrs); - ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info, - max_entries, false, nullptr, nullptr); - if (ret < 0) { - return ret; - } + RGWBucketAdminOpState bucket_op; + RGWBucketReshard reshard_op(store, bucket_info, attrs); + ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info, + max_entries, false, nullptr, nullptr); + if (ret < 0) { + return ret; + } - ret = remove(entry); - if (ret < 0) { - ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: " - << cpp_strerror(-ret) << dendl; - return ret; - } + ret = remove(entry); + if (ret < 0) { + ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: " + << cpp_strerror(-ret) << dendl; + return ret; + } + utime_t now = ceph_clock_now(); + + if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */ + l.set_renew(true); + ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid); + if (ret == -EBUSY) { /* already locked by another processor */ + ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; + return ret; + } + lock_start_time = now; } + entry.get_key(&marker); } -#warning update marker? -#warning do work here, renew lock } while (truncated); l.unlock(&store->reshard_pool_ctx, logshard_oid); -- 2.39.5