git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: prevent spurious/lost notifications in the index completion thread
author Yuval Lifshitz <ylifshit@redhat.com>
Wed, 23 Feb 2022 15:21:10 +0000 (17:21 +0200)
committer Cory Snyder <csnyder@1111systems.com>
Thu, 7 Dec 2023 09:38:30 +0000 (04:38 -0500)
this was happening when async completions happened during reshard.
more information about testing:
https://gist.github.com/yuvalif/d526c0a3a4c5b245b9e951a6c5a10517

we also add more logs to the completion manager.
these should allow finding unhandled completions due to reshards.

Fixes: https://tracker.ceph.com/issues/54435
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit e72b6510a402cbafa6c99475626f15d97fd00f86)

src/rgw/rgw_rados.cc

index e3d905018fa5cb4a92a39ae01a8c571ed1e587cd..ab98b5ffa21c6a41aa2a4198a501f963c724cf2d 100644 (file)
@@ -788,122 +788,65 @@ struct complete_op_data {
   }
 };
 
-class RGWIndexCompletionThread : public RGWRadosThread, public DoutPrefixProvider {
-  RGWRados *store;
-
-  uint64_t interval_msec() override {
-    return 0;
-  }
-
-  list<complete_op_data *> completions;
-
-  ceph::mutex completions_lock =
-    ceph::make_mutex("RGWIndexCompletionThread::completions_lock");
-public:
-  RGWIndexCompletionThread(RGWRados *_store)
-    : RGWRadosThread(_store, "index-complete"), store(_store) {}
-
-  int process(const DoutPrefixProvider *dpp) override;
-
-  void add_completion(complete_op_data *completion) {
-    {
-      std::lock_guard l{completions_lock};
-      completions.push_back(completion);
-    }
-
-    signal();
-  }
-
-  CephContext *get_cct() const override { return store->ctx(); }
-  unsigned get_subsys() const { return dout_subsys; }
-  std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw index completion thread: "; }
-};
-
-int RGWIndexCompletionThread::process(const DoutPrefixProvider *dpp)
-{
-  list<complete_op_data *> comps;
-
-  {
-    std::lock_guard l{completions_lock};
-    completions.swap(comps);
-  }
+class RGWIndexCompletionManager {
+  RGWRados* const store;
+  const int num_shards;
+  ceph::containers::tiny_vector<ceph::mutex> locks;
+  std::vector<set<complete_op_data*>> completions;
+  std::vector<complete_op_data*> retry_completions;
 
-  for (auto c : comps) {
-    std::unique_ptr<complete_op_data> up{c};
+  std::thread retry_thread;
+  std::condition_variable cond;
+  std::mutex retry_completions_lock;
 
-    if (going_down()) {
-      continue;
-    }
-    ldpp_dout(this, 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+  bool _stop{false};
 
-    RGWRados::BucketShard bs(store);
-    RGWBucketInfo bucket_info;
+  std::atomic<int> cur_shard {0};
 
-    int r = bs.init(c->obj.bucket, c->obj, &bucket_info, this);
-    if (r < 0) {
-      ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
-      /* not much to do */
-      continue;
+  void process();
+  
+  void add_completion(complete_op_data *completion);
+  
+  void stop() {
+    if (retry_thread.joinable()) {
+      _stop = true;
+      cond.notify_all();
+      retry_thread.join();
     }
 
-    r = store->guard_reshard(this, &bs, c->obj, bucket_info,
-                            [&](RGWRados::BucketShard *bs) -> int {
-                              librados::ObjectWriteOperation o;
-                              o.assert_exists(); // bucket index shard must exist
-                              cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
-                              cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
-                                                         c->log_op, c->bilog_op, &c->zones_trace);
-                              return bs->bucket_obj.operate(this, &o, null_yield);
-                             });
-    if (r < 0) {
-      ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
-      /* ignoring error, can't do anything about it */
-      continue;
-    }
-    r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id);
-    if (r < 0) {
-      ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl;
+    for (int i = 0; i < num_shards; ++i) {
+      std::lock_guard l{locks[i]};
+      for (auto c : completions[i]) {
+        c->stop();
+      }
     }
+    completions.clear();
+  }
+  
+  int next_shard() {
+    int result = cur_shard % num_shards;
+    cur_shard++;
+    return result;
   }
-
-  return 0;
-}
-
-class RGWIndexCompletionManager {
-  RGWRados *store{nullptr};
-  ceph::containers::tiny_vector<ceph::mutex> locks;
-  vector<set<complete_op_data *> > completions;
-
-  RGWIndexCompletionThread *completion_thread{nullptr};
-
-  int num_shards;
-
-  std::atomic<int> cur_shard {0};
-
 
 public:
   RGWIndexCompletionManager(RGWRados *_store) :
     store(_store),
+    num_shards(store->ctx()->_conf->rgw_thread_pool_size),
     locks{ceph::make_lock_container<ceph::mutex>(
-      store->ctx()->_conf->rgw_thread_pool_size,
+      num_shards,
       [](const size_t i) {
         return ceph::make_mutex("RGWIndexCompletionManager::lock::" +
                                std::to_string(i));
-      })}
-  {
-    num_shards = store->ctx()->_conf->rgw_thread_pool_size;
-    completions.resize(num_shards);
-  }
+      })},
+    completions(num_shards),
+    retry_thread(&RGWIndexCompletionManager::process, this)
+    {}
+
   ~RGWIndexCompletionManager() {
     stop();
   }
 
-  int next_shard() {
-    int result = cur_shard % num_shards;
-    cur_shard++;
-    return result;
-  }
-
   void create_completion(const rgw_obj& obj,
                          RGWModifyOp op, string& tag,
                          rgw_bucket_entry_ver& ver,
@@ -913,36 +856,17 @@ public:
                          uint16_t bilog_op,
                          rgw_zone_set *zones_trace,
                          complete_op_data **result);
-  bool handle_completion(completion_t cb, complete_op_data *arg);
 
-  int start(const DoutPrefixProvider *dpp) {
-    completion_thread = new RGWIndexCompletionThread(store);
-    int ret = completion_thread->init(dpp);
-    if (ret < 0) {
-      return ret;
-    }
-    completion_thread->start();
-    return 0;
-  }
-  void stop() {
-    if (completion_thread) {
-      completion_thread->stop();
-      delete completion_thread;
-    }
+  bool handle_completion(completion_t cb, complete_op_data *arg);
 
-    for (int i = 0; i < num_shards; ++i) {
-      std::lock_guard l{locks[i]};
-      for (auto c : completions[i]) {
-        c->stop();
-      }
-    }
-    completions.clear();
+  CephContext* ctx() {
+    return store->ctx();
   }
 };
 
 static void obj_complete_cb(completion_t cb, void *arg)
 {
-  complete_op_data *completion = (complete_op_data *)arg;
+  complete_op_data *completion = reinterpret_cast<complete_op_data*>(arg);
   completion->lock.lock();
   if (completion->stopped) {
     completion->lock.unlock(); /* can drop lock, no one else is referencing us */
@@ -956,6 +880,58 @@ static void obj_complete_cb(completion_t cb, void *arg)
   }
 }
 
+void RGWIndexCompletionManager::process()
+{
+  DoutPrefix dpp(store->ctx(), dout_subsys, "rgw index completion thread: ");
+  while(!_stop) {
+    std::vector<complete_op_data*> comps;
+
+    {
+      std::unique_lock l{retry_completions_lock};
+      cond.wait(l, [this](){return _stop || !retry_completions.empty();});
+      if (_stop) {
+        return;
+      }
+      retry_completions.swap(comps);
+    }
+
+    for (auto c : comps) {
+      std::unique_ptr<complete_op_data> up{c};
+
+      ldpp_dout(&dpp, 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+
+      RGWRados::BucketShard bs(store);
+      RGWBucketInfo bucket_info;
+
+      int r = bs.init(c->obj.bucket, c->obj, &bucket_info, &dpp);
+      if (r < 0) {
+        ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
+        /* not much to do */
+        continue;
+      }
+
+      r = store->guard_reshard(&dpp, &bs, c->obj, bucket_info,
+                            [&](RGWRados::BucketShard *bs) -> int {
+                              librados::ObjectWriteOperation o;
+                              o.assert_exists(); // bucket index shard must exist
+                              cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+                              cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+                                                         c->log_op, c->bilog_op, &c->zones_trace);
+                              return bs->bucket_obj.operate(&dpp, &o, null_yield);
+                             });
+      if (r < 0) {
+        ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
+        /* ignoring error, can't do anything about it */
+        continue;
+      }
+
+      r = store->svc.datalog_rados->add_entry(&dpp, bucket_info, bs.shard_id);
+      if (r < 0) {
+        ldpp_dout(&dpp, -1) << "ERROR: failed writing data log" << dendl;
+      }
+    }
+  }
+}
 
 void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
                                                   RGWModifyOp op, string& tag,
@@ -999,7 +975,16 @@ void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
   entry->rados_completion = librados::Rados::aio_create_completion(entry, obj_complete_cb);
 
   std::lock_guard l{locks[shard_id]};
-  completions[shard_id].insert(entry);
+  const auto ok = completions[shard_id].insert(entry).second;
+  ceph_assert(ok);
+}
+
+void RGWIndexCompletionManager::add_completion(complete_op_data *completion) {
+  {
+    std::lock_guard l{retry_completions_lock};
+    retry_completions.push_back(completion);
+  }
+  cond.notify_all();
 }
 
 bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
@@ -1012,6 +997,7 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
 
     auto iter = comps.find(arg);
     if (iter == comps.end()) {
+      ldout(arg->manager->ctx(), 0) << __func__ << "(): cannot find completion for obj=" << arg->key << dendl;
       return true;
     }
 
@@ -1020,9 +1006,13 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d
 
   int r = rados_aio_get_return_value(cb);
   if (r != -ERR_BUSY_RESHARDING) {
+    ldout(arg->manager->ctx(), 20) << __func__ << "(): completion " << 
+      (r == 0 ? "ok" : "failed with " + to_string(r)) << 
+      " for obj=" << arg->key << dendl;
     return true;
   }
-  completion_thread->add_completion(arg);
+  add_completion(arg);
+  ldout(arg->manager->ctx(), 20) << __func__ << "(): async completion added for obj=" << arg->key << dendl;
   return false;
 }
 
@@ -1340,10 +1330,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp)
   }
 
   index_completion_manager = new RGWIndexCompletionManager(this);
-  ret = index_completion_manager->start(dpp);
-  if (ret < 0) {
-    return ret;
-  }
   ret = rgw::notify::init(cct, store, dpp);
   if (ret < 0 ) {
     ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl;