RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
const RGWBucketInfo& _bucket_info,
const map<string, bufferlist>& _bucket_attrs,
- RenewLocksCallback _renew_locks_callback) :
+ RGWBucketReshardLock* _outer_reshard_lock) :
store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
- reshard_lock(reshard_lock_name),
- renew_locks_callback(_renew_locks_callback)
-{
- const rgw_bucket& b = bucket_info.bucket;
- reshard_oid = b.get_key(':');
-
- const int lock_dur_secs =
- store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
- lock_duration = std::chrono::seconds(lock_dur_secs);
-
-#define COOKIE_LEN 16
- char cookie_buf[COOKIE_LEN + 1];
- gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
- cookie_buf[COOKIE_LEN] = '\0';
-
- reshard_lock.set_cookie(cookie_buf);
- reshard_lock.set_duration(lock_duration);
-}
-
-int RGWBucketReshard::lock_bucket()
-{
- reshard_lock.set_must_renew(false);
- int ret = reshard_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
- reshard_oid);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " <<
- reshard_oid << " ret=" << ret << dendl;
- return ret;
- }
- lock_start_time = Clock::now();
- lock_renew_thresh = lock_start_time + lock_duration / 2;
-
- return 0;
-}
-
-void RGWBucketReshard::unlock_bucket()
-{
- int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
- }
-}
-
-int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now)
-{
- // assume outer locks have timespans at least the size of ours, so
- // can call inside conditional
- if (renew_locks_callback) {
- int ret = renew_locks_callback(now);
- if (ret < 0) {
- return ret;
- }
- }
-
- reshard_lock.set_must_renew(true);
- int ret = reshard_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
- reshard_oid);
- if (ret < 0) { /* expired or already locked by another processor */
- ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
- reshard_oid << " with " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- reshard_lock.set_must_renew(false);
- lock_start_time = now;
- lock_renew_thresh = lock_start_time + lock_duration / 2;
- ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
- reshard_oid << dendl;
-
- return 0;
-}
+ reshard_lock(store, bucket_info, true),
+ outer_reshard_lock(_outer_reshard_lock)
+{ }
int RGWBucketReshard::set_resharding_status(RGWRados* store,
RGWBucketInfo& bucket_info,
int RGWBucketReshard::cancel()
{
- int ret = lock_bucket();
+ int ret = reshard_lock.lock();
if (ret < 0) {
return ret;
}
ret = clear_resharding();
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
}
};
+
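+// used for both the per-bucket reshard lock and the logshard lock; the
+// lock duration comes from rgw_reshard_bucket_lock_duration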
+RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store,
+ const std::string& reshard_lock_oid,
+ bool _ephemeral) :
+ store(_store),
+ lock_oid(reshard_lock_oid),
+ ephemeral(_ephemeral),
+ internal_lock(reshard_lock_name)
+{
+ const int lock_dur_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+ duration = std::chrono::seconds(lock_dur_secs);
+
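+ // generate a random cookie so this lock instance can be distinguished
+ // from other lockers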
+#define COOKIE_LEN 16
+ char cookie_buf[COOKIE_LEN + 1];
+ gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+ cookie_buf[COOKIE_LEN] = '\0';
+
+ internal_lock.set_cookie(cookie_buf);
+ internal_lock.set_duration(duration);
+}
+
+int RGWBucketReshardLock::lock() {
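+ // an initial acquisition, not a renewal of a lock we already hold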
+ internal_lock.set_must_renew(false);
+ int ret;
+ if (ephemeral) {
+ ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
+ lock_oid);
+ } else {
+ ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
+ }
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
+ " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
+ return ret;
+ }
+ reset_time(Clock::now());
+
+ return 0;
+}
+
+void RGWBucketReshardLock::unlock() {
+ int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
+ " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
+ }
+}
+
+int RGWBucketReshardLock::renew(const Clock::time_point& now) {
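+ // with must_renew set, the calls below succeed only if we still hold
+ // the lock; an expired lock or one taken by another process is an error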
+ internal_lock.set_must_renew(true);
+ int ret;
+ if (ephemeral) {
+ ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
+ lock_oid);
+ } else {
+ ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
+ }
+ if (ret < 0) { /* expired or already locked by another processor */
+ ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
+ lock_oid << " with " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ internal_lock.set_must_renew(false);
+
+ reset_time(now);
+ ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
+ lock_oid << dendl;
+
+ return 0;
+}
+
+
int RGWBucketReshard::do_reshard(int num_shards,
RGWBucketInfo& new_bucket_info,
int max_entries,
}
Clock::time_point now = Clock::now();
- if (now >= lock_renew_thresh) {
- ret = renew_lock_bucket(now);
+ if (reshard_lock.should_renew(now)) {
+ // assume outer locks have timespans at least as long as ours, so
+ // they only need to be renewed when ours does
+ if (outer_reshard_lock) {
+ ret = outer_reshard_lock->renew(now);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+ ret = reshard_lock.renew(now);
if (ret < 0) {
lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
return ret;
bool verbose, ostream *out, Formatter *formatter,
RGWReshard* reshard_log)
{
- int ret = lock_bucket();
+ int ret = reshard_lock.lock();
if (ret < 0) {
return ret;
}
RGWBucketInfo new_bucket_info;
ret = create_new_bucket_instance(num_shards, new_bucket_info);
if (ret < 0) {
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
if (reshard_log) {
ret = reshard_log->update(bucket_info, new_bucket_info);
if (ret < 0) {
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
}
- ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
+ ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
+ num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
if (ret < 0) {
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
verbose, out, formatter);
if (ret < 0) {
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
- ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
+ ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards,
+ CLS_RGW_RESHARD_DONE);
if (ret < 0) {
- unlock_bucket();
+ reshard_lock.unlock();
return ret;
}
- unlock_bucket();
+ reshard_lock.unlock();
return 0;
}
CephContext *cct = store->ctx();
constexpr uint32_t max_entries = 1000;
- int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
- std::chrono::seconds lock_duration(max_secs);
-
- rados::cls::lock::Lock l(reshard_lock_name);
- l.set_duration(lock_duration);
-
- char cookie_buf[COOKIE_LEN + 1];
- gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
- cookie_buf[COOKIE_LEN] = '\0';
- l.set_cookie(cookie_buf);
string logshard_oid;
get_logshard_oid(logshard_num, &logshard_oid);
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
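+ // lock the logshard object itself; unlike the per-bucket reshard lock,
+ // this lock is not ephemeral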
+ RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
+
+ int ret = logshard_lock.lock();
if (ret == -EBUSY) { /* already locked by another processor */
- ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
+ ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
+ logshard_oid << dendl;
return ret;
}
- Clock::time_point lock_start_time = Clock::now();
- const char* const log_func_name = __func__;
-
- auto renew_locks_callback =
- [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int {
- l.set_must_renew(true);
- int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid);
- if (ret < 0) { /* expired or already locked by another processor */
- ldout(this->store->ctx(), 5) << log_func_name <<
- "[lambda](): failed to renew lock on " << logshard_oid <<
- " with " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- lock_start_time = now;
- ldout(this->store->ctx(), 20) << log_func_name <<
- "[lambda](): successfully renewed lock on " << logshard_oid << dendl;
- return 0;
- };
do {
std::list<cls_rgw_reshard_entry> entries;
ret = list(logshard_num, marker, max_entries, entries, &truncated);
if (ret < 0) {
- ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
+ ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
+ logshard_oid << dendl;
continue;
}
ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
bucket_info, nullptr, &attrs);
if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+ ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " <<
+ cpp_strerror(-ret) << dendl;
return -ret;
}
- RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback);
+ RGWBucketReshard br(store, bucket_info, attrs, nullptr);
Formatter* formatter = new JSONFormatter(false);
auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
formatter, this);
if (ret < 0) {
- ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+ ldout(store->ctx(), 0) << __func__ <<
+ ": ERROR in reshard_bucket " << entry.bucket_name << ":" <<
cpp_strerror(-ret)<< dendl;
return ret;
}
- ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl;
+ ldout(store->ctx(), 20) << " removing entry " << entry.bucket_name <<
+ dendl;
ret = remove(entry);
if (ret < 0) {
- ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
- << cpp_strerror(-ret) << dendl;
+ ldout(cct, 0) << __func__ << ": Error removing bucket " <<
+ entry.bucket_name << " from resharding queue: " <<
+ cpp_strerror(-ret) << dendl;
return ret;
}
}
Clock::time_point now = Clock::now();
- if (now >= lock_start_time + lock_duration / 2) {
- ret = renew_locks_callback(now);
+ if (logshard_lock.should_renew(now)) {
+ ret = logshard_lock.renew(now);
if (ret < 0) {
return ret;
}
}
} while (truncated);
- l.unlock(&store->reshard_pool_ctx, logshard_oid);
+ logshard_lock.unlock();
return 0;
}
class CephContext;
class RGWRados;
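+// wraps an exclusive rados cls lock used during resharding and tracks
+// when the lock was taken so callers know when it should be renewed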
+class RGWBucketReshardLock {
+ using Clock = ceph::coarse_mono_clock;
+
+ RGWRados* store;
+ const std::string lock_oid;
+ const bool ephemeral;
+ rados::cls::lock::Lock internal_lock;
+ std::chrono::seconds duration;
+
+ Clock::time_point start_time;
+ Clock::time_point renew_thresh;
+
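+ // record when the lock was (re)acquired; plan to renew it halfway
+ // through its duration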
+ void reset_time(const Clock::time_point& now) {
+ start_time = now;
+ renew_thresh = start_time + duration / 2;
+ }
+
+public:
+ RGWBucketReshardLock(RGWRados* _store,
+ const std::string& reshard_lock_oid,
+ bool _ephemeral);
+ RGWBucketReshardLock(RGWRados* _store,
+ const RGWBucketInfo& bucket_info,
+ bool _ephemeral) :
+ RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
+ {}
+
+ int lock();
+ void unlock();
+ int renew(const Clock::time_point&);
+
+ bool should_renew(const Clock::time_point& now) const {
+ return now >= renew_thresh;
+ }
+}; // class RGWBucketReshardLock
+
class RGWBucketReshard {
public:
using Clock = ceph::coarse_mono_clock;
- // returns 0 for success or a negative error code
- using RenewLocksCallback = std::function<int(const Clock::time_point&)>;
-
private:
RGWRados *store;
RGWBucketInfo bucket_info;
std::map<string, bufferlist> bucket_attrs;
- string reshard_oid;
- rados::cls::lock::Lock reshard_lock;
- Clock::time_point lock_start_time;
- std::chrono::seconds lock_duration;
- Clock::time_point lock_renew_thresh;
-
- RenewLocksCallback renew_locks_callback;
+ RGWBucketReshardLock reshard_lock;
+ RGWBucketReshardLock* outer_reshard_lock;
- int lock_bucket();
- void unlock_bucket();
- int renew_lock_bucket(const Clock::time_point&);
int clear_resharding();
- int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
+ int create_new_bucket_instance(int new_num_shards,
+ RGWBucketInfo& new_bucket_info);
int do_reshard(int num_shards,
RGWBucketInfo& new_bucket_info,
int max_entries,
Formatter *formatter);
public:
- // pass nullptr for the final parameter if no callback is used
+ // pass nullptr for the final parameter if no outer reshard lock to
+ // manage
RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
const std::map<string, bufferlist>& _bucket_attrs,
- RenewLocksCallback _renew_locks_callback);
+ RGWBucketReshardLock* _outer_reshard_lock);
int execute(int num_shards, int max_op_entries,
bool verbose = false, ostream *out = nullptr,
Formatter *formatter = nullptr,