const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
-RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store)
+RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
+ instance_lock(bucket_instance_lock_name)
{
max_jobs = cct->_conf->rgw_reshard_max_jobs;
}
}
+int RGWReshard::lock_bucket_index_shared(const string& oid)
+{
+ int ret = get_io_ctx(io_ctx);
+ if (ret < 0)
+ return ret;
+
+ ret = instance_lock.lock_shared(&io_ctx, oid);
+ if (ret == -EBUSY) {
+ dout(0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
+ return 0;
+ }
+
+ return ret;
+}
+
+int RGWReshard::unlock_bucket_index(const string& oid)
+{
+ int ret = get_io_ctx(io_ctx);
+ if (ret < 0)
+ return ret;
+ instance_lock.unlock(&io_ctx, oid);
+ return 0;
+}
+
+
+const int num_retries = 10;
+const int default_reshard_sleep_duration = 30;
+
+int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
+{
+ int ret = 0;
+ cls_rgw_bucket_instance_entry entry;
+ bool resharding = false;
+
+ for (int i=0; i< num_retries;i++) {
+ ret = lock_bucket_index_shared(bucket_instance_oid);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = cls_rgw_get_bucket_resharding(io_ctx, bucket_instance_oid,
+ entry, resharding);
+ if (ret < 0) {
+ ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: failed to get bucket resharding :" <<
+ cpp_strerror(-ret)<< dendl;
+ return ret;
+ }
+
+ if (resharding) {
+ ret = unlock_bucket_index(bucket_instance_oid);
+ if (ret < 0) {
+ return ret;
+ }
+ sleep(default_reshard_sleep_duration);
+ } else {
+ return 0;
+ }
+ }
+ ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
+ ret = unlock_bucket_index(bucket_instance_oid);
+ if (ret < 0) {
+ return ret;
+ }
+ return -EAGAIN;
+}
#include <vector>
#include "include/rados/librados.hpp"
#include "cls/rgw/cls_rgw_types.h"
+#include "cls/lock/cls_lock_client.h"
class CephContext;
class RGWRados;
RGWRados *store;
string lock_name;
int max_jobs;
+ rados::cls::lock::Lock instance_lock;
+ librados::IoCtx io_ctx;
int get_io_ctx(librados::IoCtx& io_ctx);
+
public:
RGWReshard(CephContext* cct, RGWRados* _store);
int add(cls_rgw_reshard_entry& entry);
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
- int get_bucket_instance_info(const rgw_bucket& bucket, RGWBucketInfo& bucket_info);
};
#endif