public:
BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
int _num_shard,
- deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
- aio_completions(_completions) {
+ deque<librados::AioCompletion *>& _completions) :
+ store(_store), bucket_info(_bucket_info), bs(store),
+ aio_completions(_completions)
+ {
num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
bs.init(bucket_info.bucket, num_shard);
}
return ret;
}
}
+
return 0;
}
+
int flush() {
if (entries.size() == 0) {
return 0;
}
}
+ /*
+ * if did_flush is non-null, it is set when a flush occurs; otherwise it
+ * is left unaltered
+ */
int add_entry(int shard_index,
rgw_cls_bi_entry& entry, bool account, uint8_t category,
const rgw_bucket_category_stats& entry_stats) {
- int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+ int ret = target_shards[shard_index]->add_entry(entry, account, category,
+ entry_stats);
if (ret < 0) {
- derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
+ derr << "ERROR: target_shards.add_entry(" << entry.idx <<
+ ") returned error: " << cpp_strerror(-ret) << dendl;
return ret;
}
+
return 0;
}
}
};
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
- store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
- reshard_lock(reshard_lock_name), locked_bucket(false) {
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
+ const RGWBucketInfo& _bucket_info,
+ const map<string, bufferlist>& _bucket_attrs,
+ RenewLocksCallback _renew_locks_callback) :
+ store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
+ reshard_lock(reshard_lock_name),
+ renew_locks_callback(_renew_locks_callback)
+{
const rgw_bucket& b = bucket_info.bucket;
- reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+ reshard_oid = b.get_key(':');
+
+ const int lock_dur_secs =
+ store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+ lock_duration = std::chrono::seconds(lock_dur_secs);
- utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
#define COOKIE_LEN 16
char cookie_buf[COOKIE_LEN + 1];
gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
reshard_lock.set_duration(lock_duration);
}
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) :
- store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
- reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true)
-{
- const rgw_bucket& b = bucket_info.bucket;
- reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
-}
-
int RGWBucketReshard::lock_bucket()
{
+ reshard_lock.set_must_renew(false);
int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
if (ret < 0) {
ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
return ret;
}
- lock_start_time = ceph_clock_now();
- locked_bucket = true;
+ lock_start_time = Clock::now();
+ lock_renew_thresh = lock_start_time + lock_duration / 2;
+
return 0;
}
if (ret < 0) {
ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
}
- locked_bucket = false;
}
-
-int RGWBucketReshard::renew_lock_bucket()
+int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now)
{
- if (!locked_bucket) {
- return 0;
- }
-
- utime_t now = ceph_clock_now();
- /* do you need to renew lock? */
- if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) {
- reshard_lock.set_must_renew(true);
- int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
- if (ret == -EBUSY) { /* already locked by another processor */
- ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl;
+ // we assume any outer locks have timespans at least as long as ours, so
+ // it is safe to invoke the renewal callback from inside this conditional
+ if (renew_locks_callback) {
+ int ret = renew_locks_callback(now);
+ if (ret < 0) {
return ret;
}
- lock_start_time = now;
}
+
+ reshard_lock.set_must_renew(true);
+ int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+ if (ret < 0) { /* expired or already locked by another processor */
+ ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
+ reshard_oid << " with " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ lock_start_time = now;
+ lock_renew_thresh = lock_start_time + lock_duration / 2;
+ ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
+ reshard_oid << dendl;
+
return 0;
}
}
public:
- BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
- map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store),
- bucket_info(_bucket_info),
- bucket_attrs(_bucket_attrs) {
+ BucketInfoReshardUpdate(RGWRados *_store,
+ RGWBucketInfo& _bucket_info,
+ map<string, bufferlist>& _bucket_attrs,
+ const string& new_bucket_id) :
+ store(_store),
+ bucket_info(_bucket_info),
+ bucket_attrs(_bucket_attrs)
+ {
bucket_info.new_bucket_instance_id = new_bucket_id;
}
+
~BucketInfoReshardUpdate() {
if (in_progress) {
bucket_info.new_bucket_instance_id.clear();
}
};
-int RGWBucketReshard::do_reshard(
- int num_shards,
- RGWBucketInfo& new_bucket_info,
- int max_entries,
- bool verbose,
- ostream *out,
- Formatter *formatter)
+int RGWBucketReshard::do_reshard(int num_shards,
+ RGWBucketInfo& new_bucket_info,
+ int max_entries,
+ bool verbose,
+ ostream *out,
+ Formatter *formatter)
{
rgw_bucket& bucket = bucket_info.bucket;
ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
if (ret < 0 && ret != -ENOENT) {
derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
- return -ret;
+ return ret;
}
list<rgw_cls_bi_entry>::iterator iter;
int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
- ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+ ret = target_shards_mgr.add_entry(shard_index, entry, account,
+ category, stats);
if (ret < 0) {
return ret;
}
+
+ Clock::time_point now = Clock::now();
+ if (now >= lock_renew_thresh) {
+ ret = renew_lock_bucket(now);
+ if (ret < 0) {
+ lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+ return ret;
+ }
+ }
+
if (verbose) {
formatter->close_section();
if (out) {
formatter->flush(*out);
- formatter->flush(*out);
}
} else if (out && !(total_entries % 1000)) {
(*out) << " " << total_entries;
}
- }
- ret = renew_lock_bucket();
- if (ret < 0) {
- lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
- return -ret;
- }
- ret = renew_lock_bucket();
- if (ret < 0) {
- lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
- return -ret;
- }
+ } // entries loop
}
}
ret = target_shards_mgr.finish();
if (ret < 0) {
lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
- return EIO;
+ return -EIO;
}
ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
if (ret < 0) {
lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
- return -ret;
+ return ret;
}
ret = bucket_info_updater.complete();
ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
/* don't error out, reshard process succeeded */
}
+
return 0;
-}
+} // RGWBucketReshard::do_reshard
int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
{
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
- bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
-
+ bool verbose, ostream *out, Formatter *formatter,
+ RGWReshard* reshard_log)
{
int ret = lock_bucket();
if (ret < 0) {
RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
- Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
- verbose(_verbose), out(_out), formatter(_formatter)
+ Formatter *_formatter) :
+ store(_store), instance_lock(bucket_instance_lock_name),
+ verbose(_verbose), out(_out), formatter(_formatter)
{
num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
}
bool truncated = true;
CephContext *cct = store->ctx();
- int max_entries = 1000;
+ constexpr uint32_t max_entries = 1000;
int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+ std::chrono::seconds lock_duration(max_secs);
rados::cls::lock::Lock l(reshard_lock_name);
-
- utime_t time(max_secs, 0);
- l.set_duration(time);
+ l.set_duration(lock_duration);
char cookie_buf[COOKIE_LEN + 1];
gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
cookie_buf[COOKIE_LEN] = '\0';
-
l.set_cookie(cookie_buf);
string logshard_oid;
ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
return ret;
}
-
- utime_t lock_start_time = ceph_clock_now();
+ Clock::time_point lock_start_time = Clock::now();
+ const char* const log_func_name = __func__;
+
+ auto renew_locks_callback =
+ [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int {
+ l.set_must_renew(true);
+ int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid);
+ if (ret < 0) { /* expired or already locked by another processor */
+ ldout(this->store->ctx(), 5) << log_func_name <<
+ "[lambda](): failed to renew lock on " << logshard_oid <<
+ " with " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+ lock_start_time = now;
+ ldout(this->store->ctx(), 20) << log_func_name <<
+ "[lambda](): successfully renewed lock on " << logshard_oid << dendl;
+ return 0;
+ };
do {
std::list<cls_rgw_reshard_entry> entries;
continue;
}
- for(auto& entry: entries) {
+ for(auto& entry: entries) { // logshard entries
if(entry.new_instance_id.empty()) {
- ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl;
+ ldout(store->ctx(), 20) << __func__ << " resharding " <<
+ entry.bucket_name << dendl;
RGWObjectCtx obj_ctx(store);
rgw_bucket bucket;
RGWBucketInfo bucket_info;
map<string, bufferlist> attrs;
- ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
- &attrs);
+ ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
+ bucket_info, nullptr, &attrs);
if (ret < 0) {
ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
return -ret;
}
- RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time);
+ RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback);
Formatter* formatter = new JSONFormatter(false);
auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
- ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
+ ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
+ formatter, this);
if (ret < 0) {
ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
cpp_strerror(-ret)<< dendl;
<< cpp_strerror(-ret) << dendl;
return ret;
}
- ret = br.renew_lock_bucket();
+ }
+
+ Clock::time_point now = Clock::now();
+ if (now >= lock_start_time + lock_duration / 2) {
+ ret = renew_locks_callback(now);
if (ret < 0) {
- ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: "
- << cpp_strerror(-ret) << dendl;
return ret;
}
}
- utime_t now = ceph_clock_now();
-
- if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
- l.set_must_renew(true);
- ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
- if (ret == -EBUSY) { /* already locked by another processor */
- ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
- return ret;
- }
- lock_start_time = now;
- }
+
entry.get_key(&marker);
}
} while (truncated);