From c000a15fedf1a45ab920d65c4f9c2427d375347e Mon Sep 17 00:00:00 2001 From: Orit Wasserman Date: Tue, 18 Apr 2017 12:30:07 +0300 Subject: [PATCH] rgw: Add lock/unlock bucket instance and block_while_resharding methods Signed-off-by: Orit Wasserman --- src/rgw/rgw_reshard.cc | 68 +++++++++++++++++++++++++++++++++++++++++- src/rgw/rgw_reshard.h | 5 +++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index bae28de06ba20..ab7bc8c9c038b 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -14,7 +14,8 @@ const string reshard_oid = "reshard"; const string reshard_lock_name = "reshard_process"; const string bucket_instance_lock_name = "bucket_instance_lock"; -RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store) +RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store), + instance_lock(bucket_instance_lock_name) { max_jobs = cct->_conf->rgw_reshard_max_jobs; } @@ -268,3 +269,68 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r } +int RGWReshard::lock_bucket_index_shared(const string& oid) +{ + int ret = get_io_ctx(io_ctx); + if (ret < 0) + return ret; + + ret = instance_lock.lock_shared(&io_ctx, oid); + if (ret == -EBUSY) { + dout(0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl; + return 0; + } + + return ret; +} + +int RGWReshard::unlock_bucket_index(const string& oid) +{ + int ret = get_io_ctx(io_ctx); + if (ret < 0) + return ret; + instance_lock.unlock(&io_ctx, oid); + return 0; +} + + +const int num_retries = 10; +const int default_reshard_sleep_duration = 30; + +int RGWReshard::block_while_resharding(const string& bucket_instance_oid) +{ + int ret = 0; + cls_rgw_bucket_instance_entry entry; + bool resharding = false; + + for (int i=0; i< num_retries;i++) { + ret = lock_bucket_index_shared(bucket_instance_oid); + if (ret < 0) { + return ret; + } + + ret = cls_rgw_get_bucket_resharding(io_ctx, bucket_instance_oid, + entry, resharding); + if (ret < 0) { + ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" << + cpp_strerror(-ret)<< dendl; + return ret; + } + + if (resharding) { + ret = unlock_bucket_index(bucket_instance_oid); + if (ret < 0) { + return ret; + } + sleep(default_reshard_sleep_duration); + } else { + return 0; + } + } + ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; + ret = unlock_bucket_index(bucket_instance_oid); + if (ret < 0) { + return ret; + } + return -EAGAIN; +} diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 9a71e30788769..ddde84635ec46 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -7,6 +7,7 @@ #include #include "include/rados/librados.hpp" #include "cls/rgw/cls_rgw_types.h" +#include "cls/lock/cls_lock_client.h" class CephContext; class RGWRados; @@ -16,8 +17,11 @@ class RGWReshard { RGWRados *store; string lock_name; int max_jobs; + rados::cls::lock::Lock instance_lock; + librados::IoCtx io_ctx; int get_io_ctx(librados::IoCtx& io_ctx); + public: RGWReshard(CephContext* cct, RGWRados* _store); int add(cls_rgw_reshard_entry& entry); @@ -27,7 +31,6 @@ class RGWReshard { int list(string& marker, uint32_t max, list& entries, bool& is_truncated); int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); - int get_bucket_instance_info(const rgw_bucket& bucket, RGWBucketInfo& bucket_info); }; #endif -- 2.39.5