A command that returns the reshard status field on the bucket index objects (all
shards).
Also, initial implementation of identifying and blocking while reshard in
progress.
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
int issue_op(int shard_id, const string& oid) override;
public:
CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
- const cls_rgw_bucket_instance_entry& entry,
- uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+ const cls_rgw_bucket_instance_entry& _entry,
+ uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), entry(_entry) {}
};
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
bool resharding() const {
return reshard_status != CLS_RGW_RESHARD_NONE;
}
+ bool resharding_in_progress() const {
+ return reshard_status == CLS_RGW_RESHARD_IN_PROGRESS;
+ }
};
WRITE_CLASS_ENCODER(cls_rgw_bucket_instance_entry)
bool resharding() const {
return new_instance.resharding();
}
+ bool resharding_in_progress() const {
+ return new_instance.resharding_in_progress();
+ }
};
WRITE_CLASS_ENCODER(rgw_bucket_dir_header)
OPT_ROLE_POLICY_DELETE,
OPT_RESHARD_ADD,
OPT_RESHARD_LIST,
+ OPT_RESHARD_STATUS,
OPT_RESHARD_EXECUTE,
OPT_RESHARD_CANCEL,
};
return OPT_RESHARD_ADD;
if (strcmp(cmd, "list") == 0)
return OPT_RESHARD_LIST;
+ if (strcmp(cmd, "status") == 0)
+ return OPT_RESHARD_STATUS;
if (strcmp(cmd, "execute") == 0)
return OPT_RESHARD_EXECUTE;
if (strcmp(cmd, "cancel") == 0)
return 0;
}
+ if (opt_cmd == OPT_RESHARD_STATUS) {
+ if (bucket_name.empty()) {
+ cerr << "ERROR: bucket not specified" << std::endl;
+ return EINVAL;
+ }
+
+ rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
+ ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket, &attrs);
+ if (ret < 0) {
+ cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
+ return -ret;
+ }
+
+ RGWBucketReshard br(store, bucket_info, attrs);
+ list<cls_rgw_bucket_instance_entry> status;
+ int r = br.get_status(&status);
+ if (r < 0) {
+ cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl;
+ return -r;
+ }
+
+ encode_json("status", status, formatter);
+ formatter->flush(cout);
+ }
+
#if 0
if (opt_cmd == OPT_RESHARD_EXECUTE) {
RGWReshard reshard(store);
return 0;
}
+int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id)
+{
+ rgw_bucket& bucket = bucket_info.bucket;
+ bucket.bucket_id = new_bucket_id;
+
+ int ret = get_bucket_instance_info(ctx, bucket, bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+ obj.bucket = bucket;
+ bucket_shard.bucket = bucket;
+ bs_initialized = false
+
+ return 0;
+}
+
int RGWRados::Object::Read::get_attr(const char *name, bufferlist& dest)
{
RGWObjState *state;
return 0;
}
RGWRados *store = target->get_store();
- BucketShard *bs;
-
- int ret = get_bucket_shard(&bs);
- if (ret < 0) {
- ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- return ret;
- }
if (write_tag && write_tag->length()) {
optag = string(write_tag->c_str(), write_tag->length());
}
}
- int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
+ int r;
+
+#define NUM_RESHARD_RETRIES 3
+
+ for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+ BucketShard *bs;
+ int ret = get_bucket_shard(&bs);
+ if (ret < 0) {
+ ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
+ return ret;
+ }
+ r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
+ if (r != -ERR_BUSY_RESHARDING) {
+ break;
+ }
+ ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+ RGWReshard reshard(store);
+ string new_bucket_id;
+ r = reshard.block_while_resharding(bs, &new_bucket_id);
+ if (r != -EAGAIN) {
+ if (r < 0) {
+ return r;
+ }
+ r = target->update_bucket_id(new_bucket_id);
+ break;
+ }
+ }
+
if (r < 0) {
return r;
}
RGWBucketInfo& get_bucket_info() { return bucket_info; }
int get_manifest(RGWObjManifest **pmanifest);
+ void update_bucket_id(const string& new_bucket_id);
+
int get_bucket_shard(BucketShard **pbs) {
if (!bs_initialized) {
int r = bs.init(bucket_info.bucket, obj);
bool blind;
bool prepared{false};
rgw_zone_set *zones_trace{nullptr};
+
+ int init_bs() {
+ int r = bs.init(target->get_bucket(), obj);
+ if (r < 0) {
+ return r;
+ }
+ bs_initialized = true;
+ return 0;
+ }
+
+ void invalidate_bs() {
+ bs_initialized = false;
+ }
public:
UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj),
int get_bucket_shard(BucketShard **pbs) {
if (!bs_initialized) {
- int r = bs.init(target->get_bucket(), obj);
+ int r = init_bs();
if (r < 0) {
return r;
}
- bs_initialized = true;
}
*pbs = &bs;
return 0;
}
}
-int RGWBucketReshard::init_resharding(const string& new_instance_id)
+int RGWBucketReshard::set_resharding_status(const string& new_instance_id, cls_rgw_reshard_status status)
{
if (new_instance_id.empty()) {
ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
}
cls_rgw_bucket_instance_entry instance_entry;
- instance_entry.set_status(new_instance_id, CLS_RGW_RESHARD_IN_PROGRESS);
+ instance_entry.set_status(new_instance_id, status);
int ret = store->bucket_set_reshard(bucket_info, instance_entry);
if (ret < 0) {
return 0;
}
+int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
+{
+ librados::IoCtx index_ctx;
+ map<int, string> bucket_objs;
+
+ int r = store->open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto i : bucket_objs) {
+ cls_rgw_bucket_instance_entry entry;
+
+ int ret = cls_rgw_get_bucket_resharding(index_ctx, i.second, &entry);
+ if (ret < 0 && ret != -ENOENT) {
+ lderr(store->ctx()) << "ERROR: " << __func__ << ": cls_rgw_get_bucket_resharding() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ status->push_back(entry);
+ }
+
+ return 0;
+}
+
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
bool verbose, ostream *out, Formatter *formatter)
return ret;
}
- ret = init_resharding(new_bucket_info.bucket.bucket_id);
+ ret = set_resharding_status(bucket_info.bucket.bucket_id, CLS_RGW_RESHARD_IN_PROGRESS);
if (ret < 0) {
unlock_bucket();
return ret;
}
+#warning remove me
+sleep(10);
+
ret = do_reshard(num_shards,
new_bucket_info,
max_op_entries,
return ret;
}
+ ret = set_resharding_status(bucket_info.bucket.bucket_id, CLS_RGW_RESHARD_DONE);
+ if (ret < 0) {
+ unlock_bucket();
+ return ret;
+ }
+
unlock_bucket();
return 0;
}
const int num_retries = 10;
-const int default_reshard_sleep_duration = 30;
+const int default_reshard_sleep_duration = 5;
-int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
+int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
{
int ret = 0;
cls_rgw_bucket_instance_entry entry;
for (int i=0; i< num_retries;i++) {
- ret = cls_rgw_get_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, &entry);
+ ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
if (ret < 0) {
ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
- if (!entry.resharding()) {
+ if (!entry.resharding_in_progress()) {
+ *new_bucket_id = entry.new_bucket_instance_id;
return 0;
}
+ ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
/* needed to unlock as clear resharding uses the same lock */
#warning replace sleep with interruptible condition
sleep(default_reshard_sleep_duration);
int lock_bucket();
void unlock_bucket();
- int init_resharding(const string& new_instance_id);
+ int set_resharding_status(const string& new_instance_id, cls_rgw_reshard_status status);
int clear_resharding();
int create_new_bucket_instance(int new_num_shards,
bool verbose = false, ostream *out = nullptr,
Formatter *formatter = nullptr);
int abort();
+ int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
};
class RGWReshard {
int remove(cls_rgw_reshard_entry& entry);
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
- /*
- if succefull, keeps the bucket index locked. It will be unlocked
- in the guard dtor.
- */
- int block_while_resharding(const string& bucket_instance_oid);
+ int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
};
#endif