From 9766f5b92dcda46e895d64c2fdf5960d12eec24e Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 10 May 2017 16:58:41 -0700 Subject: [PATCH] rgw_admin: add rgw-admin reshard status command and initial blocking logic A command that returns the reshard status field on the bucket index objects (all shards). Also, initial implementation of identifying and blocking while reshard in progress. Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw_client.h | 4 +-- src/cls/rgw/cls_rgw_types.h | 6 +++++ src/rgw/rgw_admin.cc | 30 +++++++++++++++++++++ src/rgw/rgw_rados.cc | 52 ++++++++++++++++++++++++++++++------ src/rgw/rgw_rados.h | 18 +++++++++++-- src/rgw/rgw_reshard.cc | 50 +++++++++++++++++++++++++++++----- src/rgw/rgw_reshard.h | 9 +++---- 7 files changed, 144 insertions(+), 25 deletions(-) diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index 8db31c4135f..c1abcf0cccd 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -462,8 +462,8 @@ protected: int issue_op(int shard_id, const string& oid) override; public: CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map& _bucket_objs, - const cls_rgw_bucket_instance_entry& entry, - uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {} + const cls_rgw_bucket_instance_entry& _entry, + uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {} }; int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx); diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 8244a48dff4..081d3ba95cd 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -646,6 +646,9 @@ struct cls_rgw_bucket_instance_entry { bool resharding() const { return reshard_status != CLS_RGW_RESHARD_NONE; } + bool resharding_in_progress() const { + return reshard_status == CLS_RGW_RESHARD_IN_PROGRESS; + } }; WRITE_CLASS_ENCODER(cls_rgw_bucket_instance_entry) @@ -699,6 +702,9 @@ struct 
rgw_bucket_dir_header { bool resharding() const { return new_instance.resharding(); } + bool resharding_in_progress() const { + return new_instance.resharding_in_progress(); + } }; WRITE_CLASS_ENCODER(rgw_bucket_dir_header) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index a7abc56142a..3a49fb637a9 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -482,6 +482,7 @@ enum { OPT_ROLE_POLICY_DELETE, OPT_RESHARD_ADD, OPT_RESHARD_LIST, + OPT_RESHARD_STATUS, OPT_RESHARD_EXECUTE, OPT_RESHARD_CANCEL, }; @@ -913,6 +914,8 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_RESHARD_ADD; if (strcmp(cmd, "list") == 0) return OPT_RESHARD_LIST; + if (strcmp(cmd, "status") == 0) + return OPT_RESHARD_STATUS; if (strcmp(cmd, "execute") == 0) return OPT_RESHARD_EXECUTE; if (strcmp(cmd, "cancel") == 0) @@ -5625,6 +5628,33 @@ next: return 0; } + if (opt_cmd == OPT_RESHARD_STATUS) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + + rgw_bucket bucket; + RGWBucketInfo bucket_info; + map attrs; + ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket, &attrs); + if (ret < 0) { + cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; + return -ret; + } + + RGWBucketReshard br(store, bucket_info, attrs); + list status; + int r = br.get_status(&status); + if (r < 0) { + cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl; + return -r; + } + + encode_json("status", status, formatter); + formatter->flush(cout); + } + #if 0 if (opt_cmd == OPT_RESHARD_EXECUTE) { RGWReshard reshard(store); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 0f88e99558c..364da290efa 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8892,6 +8892,22 @@ int RGWRados::Object::get_manifest(RGWObjManifest **pmanifest) return 0; } +int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id) +{ 
+ rgw_bucket& bucket = bucket_info.bucket; + bucket.bucket_id = new_bucket_id; + + int ret = get_bucket_instance_info(ctx, bucket, bucket_info, nullptr, nullptr); + if (ret < 0) { + return ret; + } + obj.bucket = bucket; + bucket_shard.bucket = bucket; + bs_initialized = false; + + return 0; +} + int RGWRados::Object::Read::get_attr(const char *name, bufferlist& dest) { RGWObjState *state; @@ -9521,13 +9537,6 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t return 0; } RGWRados *store = target->get_store(); - BucketShard *bs; - - int ret = get_bucket_shard(&bs); - if (ret < 0) { - ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; - return ret; - } if (write_tag && write_tag->length()) { optag = string(write_tag->c_str(), write_tag->length()); @@ -9537,7 +9546,34 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t } } - int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace); + int r; + +#define NUM_RESHARD_RETRIES 3 + + for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) { + BucketShard *bs; + int ret = get_bucket_shard(&bs); + if (ret < 0) { + ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl; + return ret; + } + r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace); + if (r != -ERR_BUSY_RESHARDING) { + break; + } + ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl; + RGWReshard reshard(store); + string new_bucket_id; + ret = reshard.block_while_resharding(bs, &new_bucket_id); /* keep r == -ERR_BUSY_RESHARDING so running out of retries is still reported as a failure */ + if (ret >= 0) { + ret = target->update_bucket_id(new_bucket_id); /* wait succeeded: point the target at the new bucket instance */ + } + if (ret < 0 && ret != -EAGAIN) { + return ret; + } + invalidate_bs(); /* drop the cached shard so the next attempt re-inits it and re-issues the prepare op */ + } + if (r < 0) { return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index b79cc209cbc..dedc439f4c6 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2675,6 +2675,8 @@ public: RGWBucketInfo& get_bucket_info() { return bucket_info;
} int get_manifest(RGWObjManifest **pmanifest); + int update_bucket_id(const string& new_bucket_id); /* returns 0 or a negative error code, as the definition and its caller expect */ + int get_bucket_shard(BucketShard **pbs) { if (!bs_initialized) { int r = bs.init(bucket_info.bucket, obj); @@ -2866,6 +2868,19 @@ public: bool blind; bool prepared{false}; rgw_zone_set *zones_trace{nullptr}; + + int init_bs() { + int r = bs.init(target->get_bucket(), obj); + if (r < 0) { + return r; + } + bs_initialized = true; + return 0; + } + + void invalidate_bs() { + bs_initialized = false; + } public: UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj), @@ -2875,11 +2890,10 @@ public: int get_bucket_shard(BucketShard **pbs) { if (!bs_initialized) { - int r = bs.init(target->get_bucket(), obj); + int r = init_bs(); if (r < 0) { return r; } - bs_initialized = true; } *pbs = &bs; return 0; diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 385fdd0b6d7..d656dae9633 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -220,7 +220,7 @@ void RGWBucketReshard::unlock_bucket() } } -int RGWBucketReshard::init_resharding(const string& new_instance_id) +int RGWBucketReshard::set_resharding_status(const string& new_instance_id, cls_rgw_reshard_status status) { if (new_instance_id.empty()) { ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl; @@ -228,7 +228,7 @@ int RGWBucketReshard::init_resharding(const string& new_instance_id) } cls_rgw_bucket_instance_entry instance_entry; - instance_entry.set_status(new_instance_id, CLS_RGW_RESHARD_IN_PROGRESS); + instance_entry.set_status(new_instance_id, status); int ret = store->bucket_set_reshard(bucket_info, instance_entry); if (ret < 0) { @@ -403,6 +403,31 @@ int RGWBucketReshard::do_reshard( return 0; } +int RGWBucketReshard::get_status(list *status) +{ + librados::IoCtx index_ctx; + map bucket_objs; + + int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs); + if (r < 0) { + return r; + } + + for (auto& i :
bucket_objs) { + cls_rgw_bucket_instance_entry entry; + + int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry); + if (ret < 0 && ret != -ENOENT) { + lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl; + return ret; + } + + status->push_back(entry); + } + + return 0; +} + int RGWBucketReshard::execute(int num_shards, int max_op_entries, bool verbose, ostream *out, Formatter *formatter) @@ -420,12 +445,15 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, return ret; } - ret = init_resharding(new_bucket_info.bucket.bucket_id); + ret = set_resharding_status(new_bucket_info.bucket.bucket_id, CLS_RGW_RESHARD_IN_PROGRESS); /* record the NEW instance id: blocked writers read it back to switch buckets */ if (ret < 0) { unlock_bucket(); return ret; } +#warning remove me +sleep(10); + ret = do_reshard(num_shards, new_bucket_info, max_op_entries, @@ -436,6 +464,12 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, return ret; } + ret = set_resharding_status(new_bucket_info.bucket.bucket_id, CLS_RGW_RESHARD_DONE); /* same id as above, now flagged as done */ + if (ret < 0) { + unlock_bucket(); + return ret; + } + unlock_bucket(); return 0; @@ -569,23 +603,25 @@ int RGWReshard::unlock_bucket_index(const string& oid) } const int num_retries = 10; -const int default_reshard_sleep_duration = 30; +const int default_reshard_sleep_duration = 5; -int RGWReshard::block_while_resharding(const string& bucket_instance_oid) +int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id) { int ret = 0; cls_rgw_bucket_instance_entry entry; for (int i=0; i< num_retries;i++) { - ret = cls_rgw_get_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, &entry); + ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry); if (ret < 0) { ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" << cpp_strerror(-ret)<< dendl; return ret; } - if (!entry.resharding()) { + if (!entry.resharding_in_progress()) { + *new_bucket_id
= entry.new_bucket_instance_id; return 0; } + ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl; /* needed to unlock as clear resharding uses the same lock */ #warning replace sleep with interruptible condition sleep(default_reshard_sleep_duration); diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index d20412f59ba..98be4ad2131 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -44,7 +44,7 @@ class RGWBucketReshard { int lock_bucket(); void unlock_bucket(); - int init_resharding(const string& new_instance_id); + int set_resharding_status(const string& new_instance_id, cls_rgw_reshard_status status); int clear_resharding(); int create_new_bucket_instance(int new_num_shards, @@ -63,6 +63,7 @@ public: bool verbose = false, ostream *out = nullptr, Formatter *formatter = nullptr); int abort(); + int get_status(std::list *status); }; class RGWReshard { @@ -81,11 +82,7 @@ class RGWReshard { int remove(cls_rgw_reshard_entry& entry); int list(string& marker, uint32_t max, list& entries, bool& is_truncated); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); - /* - if succefull, keeps the bucket index locked. It will be unlocked - in the guard dtor. - */ - int block_while_resharding(const string& bucket_instance_oid); + int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id); }; #endif -- 2.39.5