From 246c78ebfc8f3d1aa2d11123d15752833e2e2345 Mon Sep 17 00:00:00 2001 From: Orit Wasserman Date: Tue, 23 May 2017 13:37:08 +0300 Subject: [PATCH] rgw: update entry in resharding log when resharding has started Signed-off-by: Orit Wasserman --- src/rgw/rgw_reshard.cc | 100 ++++++++++++++++++++++++----------------- src/rgw/rgw_reshard.h | 7 +-- 2 files changed, 63 insertions(+), 44 deletions(-) diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index a80291d09d1..7ef28f0fcd8 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -9,6 +9,8 @@ #include "common/errno.h" #include "common/ceph_json.h" +#include "common/dout.h" + #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw @@ -190,7 +192,7 @@ public: RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs) : store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), reshard_lock(reshard_lock_name) { - const rgw_bucket& b = bucket_info.bucket; + const rgw_bucket& b = bucket_info.bucket; reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0); @@ -510,7 +512,7 @@ int RGWBucketReshard::get_status(list *status) } int RGWBucketReshard::execute(int num_shards, int max_op_entries, - bool verbose, ostream *out, Formatter *formatter) + bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log) { int ret = lock_bucket(); @@ -519,12 +521,19 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, } RGWBucketInfo new_bucket_info; - ret = create_new_bucket_instance(num_shards, new_bucket_info); if (ret < 0) { unlock_bucket(); return ret; } + + if (reshard_log) { + ret = reshard_log->update(bucket_info, new_bucket_info); + if (ret < 0) { + return ret; + } + } + ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS); if (ret < 0) { unlock_bucket(); @@ -599,6 +608,28 @@ int RGWReshard::add(cls_rgw_reshard_entry& entry) return 0; } +int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info) +{ + cls_rgw_reshard_entry entry; + entry.bucket_name = bucket_info.bucket.name; + entry.bucket_id = bucket_info.bucket.bucket_id; + + int ret = get(entry); + if (ret < 0) { + return ret; + } + + entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id; + + ret = add(entry); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << + cpp_strerror(-ret) << dendl; + } + + return ret; +} + int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list& entries, bool *is_truncated) { @@ -713,7 +744,7 @@ int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *ne int RGWReshard::process_single_logshard(int logshard_num) { string marker; - bool truncated = false; + bool truncated = true; CephContext *cct = store->ctx(); int max_entries = 1000; @@ -750,54 +781,41 @@ int RGWReshard::process_single_logshard(int logshard_num) } for(auto& entry: entries) { - /* resharding has not started */ - RGWObjectCtx obj_ctx(store); - RGWBucketInfo bucket_info; - map attrs; + if(entry.new_instance_id.empty()) { - ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, - &attrs); - if (ret < 0) { - ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; - return -ret; - } - rgw_bucket bucket = bucket_info.bucket; - RGWBucketInfo new_bucket_info(bucket_info); + ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl; - if(entry.new_instance_id.empty()) { - ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs, - new_bucket_info); + RGWObjectCtx obj_ctx(store); + rgw_bucket bucket; + RGWBucketInfo bucket_info; + map attrs; + + ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, + &attrs); if (ret < 0) { - ldout(cct, 0) << __func__ << " ERROR: could not create new bucket info: " << cpp_strerror(-ret) << dendl; - return ret; + ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; + return -ret; } - entry.new_instance_id = new_bucket_info.bucket.bucket_id; - - ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name - << " new_instance_id=" << entry.new_instance_id << dendl; + RGWBucketReshard br(store, bucket_info, attrs); - ret = add(entry); + Formatter* formatter = new JSONFormatter(false); + auto formatter_ptr = std::unique_ptr(formatter); + ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this); if (ret < 0) { - ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " << - cpp_strerror(-ret) << dendl; + ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" << + cpp_strerror(-ret)<< dendl; return ret; } - } - RGWBucketAdminOpState bucket_op; - RGWBucketReshard reshard_op(store, bucket_info, attrs); - ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info, - max_entries, verbose, out, formatter); - if (ret < 0) { - return ret; - } + ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl; - ret = remove(entry); - if (ret < 0) { - ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: " - << cpp_strerror(-ret) << dendl; - return ret; + ret = remove(entry); + if (ret < 0) { + ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: " + << cpp_strerror(-ret) << dendl; + return ret; + } } utime_t now = ceph_clock_now(); diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 4237d2dc4fc..9873707f97d 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -29,8 +29,7 @@ class RGWBucketReshard { int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status); int clear_resharding(); - int create_new_bucket_instance(int new_num_shards, - RGWBucketInfo& new_bucket_info); + int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info); int do_reshard(int num_shards, const RGWBucketInfo& new_bucket_info, int max_entries, @@ -43,7 +42,8 @@ public: int execute(int num_shards, int max_op_entries, bool verbose = false, ostream *out = nullptr, - Formatter *formatter = nullptr); + Formatter *formatter = nullptr, + RGWReshard *reshard_log = nullptr); int abort(); int get_status(std::list *status); }; @@ -87,6 +87,7 @@ protected: public: RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr); int add(cls_rgw_reshard_entry& entry); + int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info); int get(cls_rgw_reshard_entry& entry); int remove(cls_rgw_reshard_entry& entry); int list(int logshard_num, string& marker, uint32_t max, std::list& entries, bool *is_truncated); -- 2.39.5