git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: reshard complete handling, use multiple locks
author: Yehuda Sadeh <yehuda@redhat.com>
Wed, 17 May 2017 01:10:15 +0000 (18:10 -0700)
committer: Yehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:47 +0000 (13:17 -0700)
to reduce contention. Also, fix completions cleanup.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc

index da88c39407053a2826df845332655588c7bc876c..0b05353bd1fe7d1ab1818c8501486fab8c2c40d8 100644 (file)
@@ -3383,7 +3383,9 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob
 class RGWIndexCompletionManager;
 
 struct complete_op_data {
+  Mutex lock{"complete_op_data"};
   AioCompletion *rados_completion{nullptr};
+  int manager_shard_id{-1};
   RGWIndexCompletionManager *manager{nullptr};
   rgw_obj obj;
   RGWModifyOp op;
@@ -3394,6 +3396,13 @@ struct complete_op_data {
   list<cls_rgw_obj_key> remove_objs;
   bool log_op;
   uint16_t bilog_op;
+
+  bool stopped{false};
+
+  void stop() { /* flag this entry as stopped; obj_complete_cb deletes a stopped entry instead of handing it to the manager */
+    Mutex::Locker l(lock); /* NOTE(review): RGWIndexCompletionManager::stop() already holds c->lock when it calls c->stop(); presumably Mutex is non-recursive -- confirm this cannot self-deadlock */
+    stopped = true;
+  }
 };
 
 class RGWIndexCompletionThread : public RGWRadosThread {
@@ -3472,20 +3481,43 @@ int RGWIndexCompletionThread::process()
 
 class RGWIndexCompletionManager {
   RGWRados *store{nullptr};
-  Mutex lock{"RGWIndexCompletionManager::lock"};
-  set<complete_op_data *> completions;
+  vector<Mutex *> locks;
+  vector<set<complete_op_data *> > completions;
 
   RGWIndexCompletionThread *completion_thread{nullptr};
 
+  int num_shards;
+
+  atomic_t cur_shard;
+
 
 public:
-  RGWIndexCompletionManager(RGWRados *_store) : store(_store) {}
+  RGWIndexCompletionManager(RGWRados *_store) : store(_store) { /* builds one lock and one completion set per shard */
+    num_shards = store->ctx()->_conf->rgw_thread_pool_size; /* shard count comes from the rgw_thread_pool_size config value */
+
+    for (int i = 0; i < num_shards; i++) {
+      char buf[64];
+      snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i); /* lock name carries the shard index */
+      locks.push_back(new Mutex(buf)); /* heap-allocated; deleted again in the destructor */
+    }
+
+    completions.resize(num_shards); /* completions[i] is guarded by *locks[i] */
+  }
   ~RGWIndexCompletionManager() {
     stop();
+
+    for (auto l : locks) { /* after stop() above, free the Mutex objects allocated in the constructor */
+      delete l;
+    }
   }
 
-  void create_completion(AioCompletion *rados_completion,
-                         const rgw_obj& obj,
+  int next_shard() { /* pick the shard for a new completion: current counter value modulo num_shards, then advance the counter */
+    int result = cur_shard.read() % num_shards; /* NOTE(review): assumes num_shards > 0 -- confirm rgw_thread_pool_size can never be 0 */
+    cur_shard.inc(); /* read() and inc() are separate steps, so two concurrent callers may be handed the same shard */
+    return result;
+  }
+
+  void create_completion(const rgw_obj& obj,
                          RGWModifyOp op, string& tag,
                          rgw_bucket_entry_ver& ver,
                          const cls_rgw_obj_key& key,
@@ -3510,17 +3542,31 @@ public:
       delete completion_thread;
     }
 
-    Mutex::Locker l(lock);
-    for (auto c : completions) {
-      c->rados_completion->release();
-      delete c;
+    for (int i = 0; i < num_shards; ++i) {
+      Mutex::Locker l(*locks[i]);
+      for (auto c : completions[i]) {
+        Mutex::Locker cl(c->lock);
+        c->stop();
+      }
     }
     completions.clear();
   }
 };
 
-void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completion,
-                                                  const rgw_obj& obj,
+/*
+ * AIO completion callback for a bucket index operation.
+ *
+ * cb:  the completion handle, forwarded to the manager.
+ * arg: the complete_op_data that was passed to aio_create_completion().
+ *
+ * A stopped entry is freed here and nothing else is done; otherwise the
+ * entry is handed to its manager while the entry lock is held.
+ */
+static void obj_complete_cb(completion_t cb, void *arg)
+{
+  complete_op_data *completion = static_cast<complete_op_data *>(arg);
+  completion->lock.Lock();
+  if (completion->stopped) {
+    completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
+    delete completion;
+    return; /* entry is gone: falling through would touch freed memory and unlock a destroyed mutex */
+  }
+  completion->manager->handle_completion(cb, completion);
+  completion->lock.Unlock();
+}
+
+
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
                                                   RGWModifyOp op, string& tag,
                                                   rgw_bucket_entry_ver& ver,
                                                   const cls_rgw_obj_key& key,
@@ -3531,7 +3577,9 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio
 {
   complete_op_data *entry = new complete_op_data;
 
-  entry->rados_completion = rados_completion;
+  int shard_id = next_shard();
+
+  entry->manager_shard_id = shard_id;
   entry->manager = this;
   entry->obj = obj;
   entry->op = op;
@@ -3545,21 +3593,26 @@ void RGWIndexCompletionManager::create_completion(AioCompletion *rados_completio
 
   *result = entry;
 
-  Mutex::Locker l(lock);
-  completions.insert(entry);
+  entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
+
+  Mutex::Locker l(*locks[shard_id]);
+  completions[shard_id].insert(entry);
 }
 
 void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
 {
+  int shard_id = arg->manager_shard_id;
   {
-    Mutex::Locker l(lock);
+    Mutex::Locker l(*locks[shard_id]);
 
-    auto iter = completions.find(arg);
-    if (iter == completions.end()) {
+    auto& comps = completions[shard_id];
+
+    auto iter = comps.find(arg);
+    if (iter == comps.end()) {
       return;
     }
 
-    completions.erase(iter);
+    comps.erase(iter);
   }
 
   int r = rados_aio_get_return_value(cb);
@@ -12420,11 +12473,6 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
   return bs.index_ctx.operate(bs.bucket_obj, &o);
 }
 
-static void obj_complete_cb(completion_t cb, void *arg)
-{
-  ((complete_op_data *)arg)->manager->handle_completion(cb, (complete_op_data *)arg);
-}
-
 int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
                                   int64_t pool, uint64_t epoch,
                                   rgw_bucket_dir_entry& ent, RGWObjCategory category,
@@ -12462,11 +12510,10 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModify
                              get_zone().log_data, bilog_flags, zones_trace);
 
   complete_op_data *arg;
-  AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb);
-  index_completion_manager->create_completion(c, obj, op, tag, ver, key, dir_meta, ro,
+  index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro,
                                               get_zone().log_data, bilog_flags, &arg);
-  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
-  c->release();
+  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
+  arg->rados_completion->release();
   return ret;
 }