From cac047a88b1d4d74ba8e266151b95008385a970b Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 18 May 2017 11:51:21 -0700 Subject: [PATCH] rgw: replace reshard blocking sleep with interruptible condition wait Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_rados.cc | 19 +++++++++++++++---- src/rgw/rgw_rados.h | 3 +++ src/rgw/rgw_reshard.cc | 33 +++++++++++++++++++++++++++------ src/rgw/rgw_reshard.h | 24 +++++++++++++++++++++++- 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 51a21a89350c0..fb3abc2cc283d 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -3719,6 +3719,10 @@ void RGWRados::finalize() delete obj_tombstone_cache; delete sync_modules_manager; + if (reshard_wait.get()) { + reshard_wait->stop(); + reshard_wait.reset(); + } delete reshard; delete index_completion_manager; } @@ -4532,6 +4536,8 @@ int RGWRados::init_complete() obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size); } + reshard_wait = std::make_shared(this); + reshard = new RGWReshard(this); index_completion_manager = new RGWIndexCompletionManager(this); ret = index_completion_manager->start(); @@ -9804,9 +9810,8 @@ int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::functio break; } ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl; - RGWReshard reshard(store); string new_bucket_id; - r = reshard.block_while_resharding(bs, &new_bucket_id); + r = store->block_while_resharding(bs, &new_bucket_id); if (r == -ERR_BUSY_RESHARDING) { continue; } @@ -10772,9 +10777,8 @@ int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::f break; } ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl; - RGWReshard reshard(this); string new_bucket_id; - r = reshard.block_while_resharding(bs, &new_bucket_id); + r = block_while_resharding(bs, &new_bucket_id); if (r == -ERR_BUSY_RESHARDING) { continue; } @@ -10796,6 +10800,13 @@ int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::f return 0; } +int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id) +{ + std::shared_ptr waiter = reshard_wait; + + return waiter->block_while_resharding(bs, new_bucket_id); +} + int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance, bool delete_marker, const string& op_tag, diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 88594eba0266e..fe6633b9da3a9 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -39,6 +39,7 @@ class RGWRESTConn; struct RGWZoneGroup; struct RGWZoneParams; class RGWReshard; +class RGWReshardWait; /* flags for put_obj_meta() */ #define PUT_OBJ_CREATE 0x01 @@ -2456,6 +2457,7 @@ public: RGWDataChangesLog *data_log; RGWReshard *reshard; + std::shared_ptr reshard_wait; virtual ~RGWRados() = default; @@ -3252,6 +3254,7 @@ public: int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op); int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function call); + int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id); void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op); int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag); diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 851be230e76e1..10b7f87df3812 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -604,15 +604,28 @@ int RGWReshard::unlock_bucket_index(const string& oid) const int num_retries = 10; const int default_reshard_sleep_duration = 5; -int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id) +int RGWReshardWait::do_wait() +{ + Mutex::Locker l(lock); + + cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0)); + + if (going_down) { + return -ECANCELED; + } + + return 0; +} + +int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id) { int ret = 0; cls_rgw_bucket_instance_entry entry; - for (int i=0; i< num_retries;i++) { + for (int i=0; i < num_retries;i++) { ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry); if (ret < 0) { - ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" << + ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" << cpp_strerror(-ret)<< dendl; return ret; } @@ -622,10 +635,18 @@ int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bu } ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl; /* needed to unlock as clear resharding uses the same lock */ -#warning replace sleep with interruptible condition - sleep(default_reshard_sleep_duration); + + if (i == num_retries - 1) { + break; + } + + ret = do_wait(); + if (ret < 0) { + ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; + return ret; + } } - ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; + ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl; return -ERR_BUSY_RESHARDING; } diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index bb13753fc2e27..1d4366203674c 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -106,7 +106,6 @@ protected: int remove(cls_rgw_reshard_entry& entry); int list(string& marker, uint32_t max, list& entries, bool& is_truncated); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); - int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id); int reshard_bucket(Formatter *formatter, int num_shards, @@ -125,4 +124,27 @@ protected: void stop_processor(); }; + +class RGWReshardWait { + RGWRados *store; + Mutex lock{"RGWReshardWait::lock"}; + Cond cond; + + bool going_down{false}; + + int do_wait(); +public: + RGWReshardWait(RGWRados *_store) : store(_store) {} + ~RGWReshardWait() { + assert(going_down); + } + int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id); + + void stop() { + Mutex::Locker l(lock); + going_down = true; + cond.SignalAll(); + } +}; + #endif -- 2.39.5