delete obj_tombstone_cache;
delete sync_modules_manager;
+ if (reshard_wait.get()) {
+ reshard_wait->stop();
+ reshard_wait.reset();
+ }
delete reshard;
delete index_completion_manager;
}
obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
}
+ reshard_wait = std::make_shared<RGWReshardWait>(this);
+
reshard = new RGWReshard(this);
index_completion_manager = new RGWIndexCompletionManager(this);
ret = index_completion_manager->start();
break;
}
ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
- RGWReshard reshard(store);
string new_bucket_id;
- r = reshard.block_while_resharding(bs, &new_bucket_id);
+ r = store->block_while_resharding(bs, &new_bucket_id);
if (r == -ERR_BUSY_RESHARDING) {
continue;
}
break;
}
ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
- RGWReshard reshard(this);
string new_bucket_id;
- r = reshard.block_while_resharding(bs, &new_bucket_id);
+ r = block_while_resharding(bs, &new_bucket_id);
if (r == -ERR_BUSY_RESHARDING) {
continue;
}
return 0;
}
+int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+{
+ std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
+
+ return waiter->block_while_resharding(bs, new_bucket_id);
+}
+
int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
bool delete_marker,
const string& op_tag,
struct RGWZoneGroup;
struct RGWZoneParams;
class RGWReshard;
+class RGWReshardWait;
/* flags for put_obj_meta() */
#define PUT_OBJ_CREATE 0x01
RGWDataChangesLog *data_log;
RGWReshard *reshard;
+ std::shared_ptr<RGWReshardWait> reshard_wait;
virtual ~RGWRados() = default;
int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);
+ int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
const int num_retries = 10;
const int default_reshard_sleep_duration = 5;
-int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+int RGWReshardWait::do_wait()
+{
+ Mutex::Locker l(lock);
+
+ cond.WaitInterval(lock, utime_t(default_reshard_sleep_duration, 0));
+
+ if (going_down) {
+ return -ECANCELED;
+ }
+
+ return 0;
+}
+
+int RGWReshardWait::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
{
int ret = 0;
cls_rgw_bucket_instance_entry entry;
- for (int i=0; i< num_retries;i++) {
+ for (int i=0; i < num_retries;i++) {
ret = cls_rgw_get_bucket_resharding(bs->index_ctx, bs->bucket_obj, &entry);
if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" <<
+ ldout(store->ctx(), 0) << __func__ << " ERROR: failed to get bucket resharding :" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
}
ldout(store->ctx(), 20) << "NOTICE: reshard still in progress; " << (i < num_retries - 1 ? "retrying" : "too many retries") << dendl;
/* needed to unlock as clear resharding uses the same lock */
-#warning replace sleep with interruptible condition
- sleep(default_reshard_sleep_duration);
+
+ if (i == num_retries - 1) {
+ break;
+ }
+
+ ret = do_wait();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+ return ret;
+ }
}
- ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+ ldout(store->ctx(), 0) << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
return -ERR_BUSY_RESHARDING;
}
int remove(cls_rgw_reshard_entry& entry);
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
- int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
int reshard_bucket(Formatter *formatter,
int num_shards,
void stop_processor();
};
+
+class RGWReshardWait {
+ RGWRados *store;
+ Mutex lock{"RGWReshardWait::lock"};
+ Cond cond;
+
+ bool going_down{false};
+
+ int do_wait();
+public:
+ RGWReshardWait(RGWRados *_store) : store(_store) {}
+ ~RGWReshardWait() {
+ assert(going_down);
+ }
+ int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+
+ void stop() {
+ Mutex::Locker l(lock);
+ going_down = true;
+ cond.SignalAll();
+ }
+};
+
#endif