}
};
-void BucketIndexAioManager::do_completion(int id) {
+void BucketIndexAioManager::do_completion(const int request_id) {
std::lock_guard l{lock};
- auto iter = pendings.find(id);
+ auto iter = pendings.find(request_id);
ceph_assert(iter != pendings.end());
- completions[id] = iter->second;
+ completions[request_id] = iter->second;
pendings.erase(iter);
// If the caller needs a list of finished objects, store them
// for further processing
- auto miter = pending_objs.find(id);
+ auto miter = pending_objs.find(request_id);
if (miter != pending_objs.end()) {
- completion_objs[id] = miter->second;
+ completion_objs.emplace(request_id, miter->second);
pending_objs.erase(miter);
}
if (objs && r == 0) { /* update list of successfully completed objs */
auto liter = completion_objs.find(iter->first);
if (liter != completion_objs.end()) {
- (*objs)[liter->first] = liter->second;
+ (*objs)[liter->second.shard_id] = liter->second.oid;
}
}
if (ret_code && (r < 0 && r != valid_ret_code))
}
static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+ const int shard_id,
const string& oid,
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.create(true);
op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+ const int shard_id,
const string& oid,
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.remove();
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
- const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+ const int shard_id,
+ const string& oid,
+ uint64_t timeout,
+ BucketIndexAioManager *manager) {
bufferlist in;
rgw_cls_tag_timeout_op call;
call.tag_timeout = timeout;
encode(call, in);
ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_index_init_op(io_ctx, oid, &manager);
+ return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
}
void CLSRGWIssueBucketIndexInit::cleanup()
}
}
-int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+ return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
}
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+ return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
}
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
}
static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
- const string& oid,
+ const int shard_id,
+ const std::string& oid,
const cls_rgw_obj_key& start_obj,
- const string& filter_prefix,
- const string& delimiter,
+ const std::string& filter_prefix,
+ const std::string& delimiter,
uint32_t num_entries,
bool list_versions,
BucketIndexAioManager *manager,
cls_rgw_bucket_list_op(op,
start_obj, filter_prefix, delimiter,
num_entries, list_versions, pdata);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_list_op(io_ctx, oid,
+ return issue_bucket_list_op(io_ctx, shard_id, oid,
start_obj, filter_prefix, delimiter,
num_entries, list_versions, &manager,
&result[shard_id]);
return 0;
}
-int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
const cls_rgw_obj_key& key, bufferlist& olh_tag,
bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
}
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
BucketIndexShardsManager& marker_mgr, uint32_t max,
BucketIndexAioManager *manager,
cls_rgw_bi_log_list_ret *pdata)
{
librados::ObjectReadOperation op;
cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
{
return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
}
op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
}
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
BucketIndexShardsManager& start_marker_mgr,
BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
cls_rgw_bi_log_trim_op call;
librados::ObjectWriteOperation op;
cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
end_marker_mgr.get(shard_id, ""));
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
{
return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
}
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
rgw_cls_check_index_ret *pdata) {
bufferlist in;
librados::ObjectReadOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
pdata, NULL));
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
{
- return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+ return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
}
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
BucketIndexAioManager *manager) {
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
{
- return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+ return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
}
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
}
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
{
cls_rgw_obj_key empty_key;
string empty_prefix;
string empty_delimiter;
- return issue_bucket_list_op(io_ctx, oid,
+ return issue_bucket_list_op(io_ctx, shard_id, oid,
empty_key, empty_prefix, empty_delimiter,
0, false, &manager, &result[shard_id]);
}
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
{
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
{
- return issue_resync_bi_log(io_ctx, oid, &manager);
+ return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
}
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
{
bufferlist in;
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
{
- return issue_bi_log_stop(io_ctx, oid, &manager);
+ return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
}
class GetDirHeaderCompletion : public ObjectOperationCompletion {
op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
}
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+ const int shard_id, const string& oid,
const cls_rgw_bucket_instance_entry& entry,
BucketIndexAioManager *manager) {
bufferlist in;
encode(call, in);
librados::ObjectWriteOperation op;
op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
- return manager->aio_operate(io_ctx, oid, &op);
+ return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
-int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
{
- return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+ return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
}
};
/*
- * This class manages AIO completions. This class is not completely thread-safe,
- * methods like *get_next* is not thread-safe and is expected to be called from
- * within one thread.
+ * This class manages AIO completions. This class is not completely
+ * thread-safe, methods like *get_next_request_id* is not thread-safe
+ * and is expected to be called from within one thread.
*/
class BucketIndexAioManager {
+public:
+
+ // allows us to reaccess the shard id and shard's oid during and
+ // after the asynchronous call is made
+ struct RequestObj {
+ int shard_id;
+ std::string oid;
+
+ RequestObj(int _shard_id, const std::string& _oid) :
+ shard_id(_shard_id), oid(_oid)
+ {/* empty */}
+ };
+
+
private:
+ // NB: the following 4 maps use the request_id as the key; this
+ // is not the same as the shard_id!
std::map<int, librados::AioCompletion*> pendings;
std::map<int, librados::AioCompletion*> completions;
- std::map<int, std::string> pending_objs;
- std::map<int, std::string> completion_objs;
+ std::map<int, const RequestObj> pending_objs;
+ std::map<int, const RequestObj> completion_objs;
+
int next = 0;
ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock");
ceph::condition_variable cond;
*
* Return next request ID.
*/
- int get_next() { return next++; }
-
+ int get_next_request_id() { return next++; }
+
/*
* Add a new pending AIO completion instance.
*
* @param oid - the object id associated with the object, if it is NULL, we don't
* track the object id per callback.
*/
- void add_pending(int id, librados::AioCompletion* completion, const std::string& oid) {
- pendings[id] = completion;
- pending_objs[id] = oid;
+ void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) {
+ pendings[request_id] = completion;
+ pending_objs.emplace(request_id, RequestObj(shard_id, oid));
}
+
public:
/*
* Create a new instance.
/*
* Do completion for the given AIO request.
*/
- void do_completion(int id);
+ void do_completion(int request_id);
/*
* Wait for AIO completions.
/**
* Do aio read operation.
*/
- bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectReadOperation *op) {
+ bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) {
std::lock_guard l{lock};
- BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+ const int request_id = get_next_request_id();
+ BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
if (r >= 0) {
- add_pending(arg->id, c, oid);
+ add_pending(arg->id, c, shard_id, oid);
} else {
arg->put();
c->release();
/**
* Do aio write operation.
*/
- bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectWriteOperation *op) {
+ bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) {
std::lock_guard l{lock};
- BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+ const int request_id = get_next_request_id();
+ BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
if (r >= 0) {
- add_pending(arg->id, c, oid);
+ add_pending(arg->id, c, shard_id, oid);
} else {
arg->put();
c->release();
std::string delimiter;
uint32_t num_entries;
bool list_versions;
- std::map<int, rgw_cls_list_ret>& result;
+ std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
+
protected:
int issue_op(int shard_id, const std::string& oid) override;
+
public:
CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
const cls_rgw_obj_key& _start_obj,
const std::string& _delimiter,
uint32_t _num_entries,
bool _list_versions,
- std::map<int, std::string>& oids,
+ std::map<int, std::string>& oids, // shard_id -> shard_oid
+ // shard_id -> return value
std::map<int, rgw_cls_list_ret>& list_results,
uint32_t max_aio) :
CLSRGWConcurrentIO(io_ctx, oids, max_aio),