class RGWIndexCompletionManager;
struct complete_op_data {
+ Mutex lock{"complete_op_data"}; /* guards `stopped`; obj_complete_cb also holds it around the manager call */
AioCompletion *rados_completion{nullptr};
+ int manager_shard_id{-1}; /* index into the manager's per-shard locks/completions vectors; -1 until create_completion assigns it */
RGWIndexCompletionManager *manager{nullptr};
rgw_obj obj;
RGWModifyOp op;
list<cls_rgw_obj_key> remove_objs;
bool log_op;
uint16_t bilog_op;
+ /* set by stop(); obj_complete_cb checks it under `lock` and deletes the entry when it is set */
+ bool stopped{false};
+ /* acquires `lock` itself and then flags the entry as stopped */
+ void stop() {
+ Mutex::Locker l(lock);
+ stopped = true;
+ }
};
class RGWIndexCompletionThread : public RGWRadosThread {
class RGWIndexCompletionManager {
RGWRados *store{nullptr};
- Mutex lock{"RGWIndexCompletionManager::lock"};
- set<complete_op_data *> completions;
+ vector<Mutex *> locks;
+ vector<set<complete_op_data *> > completions;
RGWIndexCompletionThread *completion_thread{nullptr};
+ int num_shards;
+
+ atomic_t cur_shard;
+
public:
- RGWIndexCompletionManager(RGWRados *_store) : store(_store) {}
+ RGWIndexCompletionManager(RGWRados *_store) : store(_store) { /* builds one named Mutex and one completion set per shard */
+ num_shards = store->ctx()->_conf->rgw_thread_pool_size; /* shard count comes from the rgw_thread_pool_size config value */
+ /* NOTE(review): next_shard() computes `% num_shards`, so a zero value here would divide by zero -- confirm the config is validated elsewhere */
+ for (int i = 0; i < num_shards; i++) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i); /* lock name carries the shard index */
+ locks.push_back(new Mutex(buf)); /* raw-owned pointer; released in the destructor */
+ }
+
+ completions.resize(num_shards);
+ }
~RGWIndexCompletionManager() {
stop();
+ /* after stop(): free the per-shard Mutex objects that the constructor allocated with new */
+ for (auto l : locks) {
+ delete l;
+ }
}
- void create_completion(AioCompletion *rados_completion,
- const rgw_obj& obj,
+ int next_shard() { /* returns the current counter modulo num_shards, then advances the counter */
+ int result = cur_shard.read() % num_shards;
+ cur_shard.inc(); /* NOTE(review): read() and inc() are separate steps, so concurrent callers may be handed the same shard -- presumably acceptable for spreading load; confirm */
+ return result;
+ }
+
+ void create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
delete completion_thread;
}
- Mutex::Locker l(lock);
- for (auto c : completions) {
- c->rados_completion->release();
- delete c;
+ /* Flag every still-registered completion as stopped, one shard at a time. */
+ for (int i = 0; i < num_shards; ++i) {
+ Mutex::Locker l(*locks[i]);
+ for (auto c : completions[i]) {
+ /* complete_op_data::stop() acquires c->lock itself; taking it here as well would lock the same Mutex twice on one thread */
+ c->stop();
+ }
+ }
completions.clear();
}
};
-void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completion,
- const rgw_obj& obj,
+/*
+ * AIO completion callback registered in create_completion(); `arg` is the
+ * complete_op_data entry that was passed to aio_create_completion().
+ *
+ * If the entry has been flagged as stopped it is deleted here and nothing
+ * else is done; otherwise the completion is handed to the owning manager
+ * while the entry lock is held.
+ */
+static void obj_complete_cb(completion_t cb, void *arg)
+{
+ complete_op_data *completion = static_cast<complete_op_data *>(arg);
+ completion->lock.Lock();
+ if (completion->stopped) {
+ completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
+ delete completion;
+ return; /* entry is freed: must not fall through and touch it again */
+ }
+ completion->manager->handle_completion(cb, completion);
+ completion->lock.Unlock();
+}
+
+
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
RGWModifyOp op, string& tag,
rgw_bucket_entry_ver& ver,
const cls_rgw_obj_key& key,
{
complete_op_data *entry = new complete_op_data;
- entry->rados_completion = rados_completion;
+ int shard_id = next_shard();
+
+ entry->manager_shard_id = shard_id;
entry->manager = this;
entry->obj = obj;
entry->op = op;
*result = entry;
- Mutex::Locker l(lock);
- completions.insert(entry);
+ entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
+
+ Mutex::Locker l(*locks[shard_id]);
+ completions[shard_id].insert(entry);
}
void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
{
+ int shard_id = arg->manager_shard_id;
{
- Mutex::Locker l(lock);
+ Mutex::Locker l(*locks[shard_id]);
- auto iter = completions.find(arg);
- if (iter == completions.end()) {
+ auto& comps = completions[shard_id];
+
+ auto iter = comps.find(arg);
+ if (iter == comps.end()) {
return;
}
- completions.erase(iter);
+ comps.erase(iter);
}
int r = rados_aio_get_return_value(cb);
return bs.index_ctx.operate(bs.bucket_obj, &o);
}
-static void obj_complete_cb(completion_t cb, void *arg)
-{
- ((complete_op_data *)arg)->manager->handle_completion(cb, (complete_op_data *)arg);
-}
-
int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category,
get_zone().log_data, bilog_flags, zones_trace);
complete_op_data *arg;
- AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb);
- index_completion_manager->create_completion(c, obj, op, tag, ver, key, dir_meta, ro,
+ index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro,
get_zone().log_data, bilog_flags, &arg);
- int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
- c->release();
+ int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
+ arg->rados_completion->release();
return ret;
}