::encode(call, in);
op.exec("rgw", "guard_bucket_resharding", in);
}
+
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+ const cls_rgw_bucket_instance_entry& entry,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ struct cls_rgw_set_bucket_resharding_op call;
+ call.entry = entry;
+ ::encode(call, in);
+ librados::ObjectWriteOperation op;
+ op.exec("rgw", "set_bucket_resharding", in);
+ return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+{
+ return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+}
+
CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
};
+class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
+ cls_rgw_bucket_instance_entry entry;
+protected:
+ int issue_op(int shard_id, const string& oid) override;
+public:
+ CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+ const cls_rgw_bucket_instance_entry& entry,
+ uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
+int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+{
+ librados::IoCtx index_ctx;
+ map<int, string> bucket_objs;
+
+ int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ if (r < 0) {
+ return r;
+ }
+
+ return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
{
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
friend class RGWStateLog;
friend class RGWReplicaLogger;
friend class RGWReshard;
+ friend class RGWBucketReshard;
friend class BucketIndexLockGuard;
/** Open the pool used as root for this gateway */
map<RGWObjCategory, RGWStorageStats> *existing_stats,
map<RGWObjCategory, RGWStorageStats> *calculated_stats);
int bucket_rebuild_index(RGWBucketInfo& bucket_info);
+ int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
int remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
int move_rados_obj(librados::IoCtx& src_ioctx,
const string& src_oid, const string& src_locator,
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) :
+ store(_store), bucket_info(_bucket_info),
+ reshard_lock(reshard_lock_name) {
+ const rgw_bucket& b = bucket_info.bucket;
+ reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+}
+
+int RGWBucketReshard::lock_bucket()
+{
+#warning set timeout for guard lock
+
+ int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+}
+
+void RGWBucketReshard::unlock_bucket()
+{
+ int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
+ }
+}
+
+int RGWBucketReshard::init_resharding(const cls_rgw_reshard_entry& entry)
+{
+ if (entry.new_instance_id.empty()) {
+ ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
+ return -EINVAL;
+ }
+
+ cls_rgw_bucket_instance_entry instance_entry;
+ instance_entry.new_bucket_instance_id = entry.new_instance_id;
+
+ int ret = store->bucket_set_reshard(bucket_info, instance_entry);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ return 0;
+}
+
+int RGWBucketReshard::clear_resharding()
+{
+ cls_rgw_bucket_instance_entry instance_entry;
+
+ int ret = store->bucket_set_reshard(bucket_info, instance_entry);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ return 0;
+}
+
RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
instance_lock(bucket_instance_lock_name)
{
return bucket_instance_lock_name + "." + bucket_instance_id;
}
-int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
-{
- rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
-
- if (entry.new_instance_id.empty()) {
- ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl;
- return -EEXIST;
- }
-
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
- if (ret == -EBUSY) {
- ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
- return ret;
-
- cls_rgw_bucket_instance_entry instance_entry;
- instance_entry.new_bucket_instance_id = entry.new_instance_id;
-
- ret = cls_rgw_set_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, instance_entry);
- if (ret < 0) {
- ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: cls_rgw_set_bucket_resharding: "
- << cpp_strerror(-ret) << dendl;
- goto done;
- }
-
-done:
- l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
- return ret;
-}
-
int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
entry.new_instance_id.clear();
- ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
+ ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
return ret;
int unlock();
};
+
+class RGWBucketReshard {
+ RGWRados *store;
+ RGWBucketInfo bucket_info;
+
+ string reshard_oid;
+ rados::cls::lock::Lock reshard_lock;
+
+ int lock_bucket();
+ void unlock_bucket();
+ int init_resharding(const cls_rgw_reshard_entry& entry);
+ int clear_resharding();
+public:
+ RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info);
+
+ int reshard();
+ int abort_reshard();
+};
+
class RGWReshard {
CephContext *cct;
RGWRados *store;
int get(cls_rgw_reshard_entry& entry);
int remove(cls_rgw_reshard_entry& entry);
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
- int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
/*
if succefull, keeps the bucket index locked. It will be unlocked