return true;
}
+// note: currently only called by testing code
void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
{
bufferlist in;
}
static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
- const string& oid, BucketIndexAioManager *manager) {
+ const string& oid,
+ BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.create(true);
return manager->aio_operate(io_ctx, oid, &op);
}
+static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+ const string& oid,
+ BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.remove();
+ return manager->aio_operate(io_ctx, oid, &op);
+}
+
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
bufferlist in;
void CLSRGWIssueBucketIndexInit::cleanup()
{
// Do best effort removal
- for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+ for (auto citer = objs_container.begin(); citer != iter; ++citer) {
io_ctx.remove(citer->second);
}
}
+int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+{
+ return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+}
+
int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
{
return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#ifndef CEPH_CLS_RGW_CLIENT_H
#define CEPH_CLS_RGW_CLIENT_H
virtual void reset_container(map<int, string>& objs) {}
public:
- CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
- uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
- virtual ~CLSRGWConcurrentIO() {}
+
+ CLSRGWConcurrentIO(librados::IoCtx& ioc,
+ map<int, string>& _objs_container,
+ uint32_t _max_aio) :
+ io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio)
+ {}
+
+ virtual ~CLSRGWConcurrentIO()
+ {}
int operator()() {
int ret = 0;
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
};
+
+class CLSRGWIssueBucketIndexClean : public CLSRGWConcurrentIO {
+protected:
+ int issue_op(int shard_id, const string& oid) override;
+ int valid_ret_code() override {
+ return -ENOENT;
+ }
+
+public:
+ CLSRGWIssueBucketIndexClean(librados::IoCtx& ioc,
+ map<int, string>& _bucket_objs,
+ uint32_t _max_aio) :
+ CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio)
+ {}
+};
+
+
class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
uint64_t tag_timeout;
protected:
}
return ret;
}
-};
+}; // class BucketReshardShard
+
class BucketReshardManager {
RGWRados *store;
vector<BucketReshardShard *> target_shards;
public:
- BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
- num_target_shards(_num_target_shards) {
+ BucketReshardManager(RGWRados *_store,
+ const RGWBucketInfo& _target_bucket_info,
+ int _num_target_shards) :
+ store(_store), target_bucket_info(_target_bucket_info),
+ num_target_shards(_num_target_shards)
+ {
target_shards.resize(num_target_shards);
for (int i = 0; i < num_target_shards; ++i) {
target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
for (auto& shard : target_shards) {
int ret = shard->wait_all_aio();
if (ret < 0) {
- ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+ ldout(store->ctx(), 20) << __func__ <<
+ ": shard->wait_all_aio() returned ret=" << ret << dendl;
}
}
}
- /*
- * did_flush is set if not nullptr and a flush occurred; otherwise not altered
- */
int add_entry(int shard_index,
rgw_cls_bi_entry& entry, bool account, uint8_t category,
const rgw_bucket_category_stats& entry_stats) {
target_shards.clear();
return ret;
}
-};
+}; // class BucketReshardManager
RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
const RGWBucketInfo& _bucket_info,
int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
RGWBucketInfo& new_bucket_info)
{
- return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info);
+ return ::create_new_bucket_instance(store, new_num_shards,
+ bucket_info, bucket_attrs, new_bucket_info);
}
int RGWBucketReshard::cancel()
~BucketInfoReshardUpdate() {
if (in_progress) {
+ // resharding must not have ended correctly, clean up
int ret =
RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
if (ret < 0) {
" clear_index_shard_status returned " << ret << dendl;
}
bucket_info.new_bucket_instance_id.clear();
- set_status(CLS_RGW_RESHARD_NONE); // saves new_bucket_instance as well
+ set_status(CLS_RGW_RESHARD_NONE); // clears new_bucket_instance as well
}
}
int ret = 0;
if (out) {
- (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
- (*out) << "*** these will need to be removed manually ***" << std::endl;
(*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
(*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
- (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
- (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
+ (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
+ std::endl;
+ (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
+ std::endl;
}
- /* update bucket info -- in progress*/
+  /* update bucket info -- in progress */
list<rgw_cls_bi_entry> entries;
if (max_entries < 0) {
- ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
+ ldout(store->ctx(), 0) << __func__ <<
+ ": can't reshard, negative max_entries" << dendl;
return -EINVAL;
}
cout << "total entries:";
}
- int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ const int num_source_shards =
+ (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
string marker;
for (int i = 0; i < num_source_shards; ++i) {
bool is_truncated = true;
return ret;
}
- list<rgw_cls_bi_entry>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
+ for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
rgw_cls_bi_entry& entry = *iter;
if (verbose) {
formatter->open_object_section("entry");
}
return 0;
+ // NB: some error clean-up is done by ~BucketInfoReshardUpdate
} // RGWBucketReshard::do_reshard
int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
bool verbose, ostream *out, Formatter *formatter,
RGWReshard* reshard_log)
{
+ Clock::time_point now;
+
int ret = reshard_lock.lock();
if (ret < 0) {
return ret;
RGWBucketInfo new_bucket_info;
ret = create_new_bucket_instance(num_shards, new_bucket_info);
if (ret < 0) {
- reshard_lock.unlock();
- return ret;
+ // shard state is uncertain, but this will attempt to remove them anyway
+ goto error_out;
}
if (reshard_log) {
ret = reshard_log->update(bucket_info, new_bucket_info);
if (ret < 0) {
- reshard_lock.unlock();
- return ret;
+ goto error_out;
}
}
+ // set resharding status of current bucket_info & shards with
+ // information about planned resharding
ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
if (ret < 0) {
new_bucket_info,
max_op_entries,
verbose, out, formatter);
+ if (ret < 0) {
+ goto error_out;
+ }
+
+ // at this point we've done the main work; we'll make a best-effort
+ // to clean-up but will not indicate any errors encountered
+ reshard_lock.unlock();
+
+  // resharding successful, so remove old bucket index shards; use
+  // best effort and don't report an error; the lock isn't needed
+  // at this point since all we're doing is a best-effort removal of
+  // old shard objects
+ ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
if (ret < 0) {
- reshard_lock.unlock();
- return ret;
+ lderr(store->ctx()) << "Error: " << __func__ <<
+ " failed to clean up old shards; " <<
+ "RGWRados::clean_bucket_index returned " << ret << dendl;
}
- ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards,
- CLS_RGW_RESHARD_DONE);
+ ret = rgw_bucket_instance_remove_entry(store,
+ bucket_info.bucket.get_key(),
+ nullptr);
if (ret < 0) {
- reshard_lock.unlock();
- return ret;
+ lderr(store->ctx()) << "Error: " << __func__ <<
+ " failed to clean old bucket info object \"" <<
+ bucket_info.bucket.get_key() <<
+      "\" created after successful resharding with error " << ret << dendl;
}
+ return 0;
+
+error_out:
+
reshard_lock.unlock();
- return 0;
-}
+ // since the real problem is the issue that led to this error code
+ // path, we won't touch ret and instead use another variable to
+  // temporarily hold error codes
+ int ret2 = store->clean_bucket_index(new_bucket_info,
+ new_bucket_info.num_shards);
+ if (ret2 < 0) {
+ lderr(store->ctx()) << "Error: " << __func__ <<
+ " failed to clean up shards from failed incomplete resharding; " <<
+ "RGWRados::clean_bucket_index returned " << ret2 << dendl;
+ }
+
+ ret2 = rgw_bucket_instance_remove_entry(store,
+ new_bucket_info.bucket.get_key(),
+ nullptr);
+ if (ret2 < 0) {
+ lderr(store->ctx()) << "Error: " << __func__ <<
+ " failed to clean bucket info object \"" <<
+ new_bucket_info.bucket.get_key() <<
+      "\" created during incomplete resharding with error " << ret2 << dendl;
+ }
+
+ return ret;
+} // execute
RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
}
-string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
+string RGWReshard::get_logshard_key(const string& tenant,
+ const string& bucket_name)
{
return tenant + ":" + bucket_name;
}