git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka/amqp: fix race condition in async completion handlers 54737/head
author Yuval Lifshitz <ylifshit@redhat.com>
Tue, 28 Nov 2023 17:49:03 +0000 (17:49 +0000)
committer Yuval Lifshitz <ylifshit@redhat.com>
Thu, 30 Nov 2023 15:03:34 +0000 (15:03 +0000)
Fixes: https://tracker.ceph.com/issues/63314
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
(cherry picked from commit 3e6d5273fc408efab9012883d4d16c0299e62b83)

Conflicts:
src/rgw/rgw_pubsub_push.cc

src/rgw/rgw_pubsub_push.cc

index 79fa736e9250421ff87bf85a4c5877759dbb6874..7493fcf25bf86a14016438d1d1f2326b0b6b5766 100644 (file)
@@ -170,6 +170,55 @@ public:
   }
 };
 
+namespace {
+// this allows waiting until "finish()" is called from a different thread
+// waiting could be blocking the waiting thread or yielding, depending
+// on compilation flag support and whether the optional_yield is set
+class Waiter {
+  using Signature = void(boost::system::error_code);
+  using Completion = ceph::async::Completion<Signature>;
+  using CompletionInit = boost::asio::async_completion<yield_context, Signature>;
+  std::unique_ptr<Completion> completion = nullptr;
+  int ret;
+
+  bool done = false;
+  mutable std::mutex lock;
+  mutable std::condition_variable cond;
+
+public:
+  int wait(optional_yield y) {
+    std::unique_lock l{lock};
+    if (done) {
+      return ret;
+    }
+    if (y) {
+      boost::system::error_code ec;
+      auto&& token = y.get_yield_context()[ec];
+      CompletionInit init(token);
+      completion = Completion::create(y.get_io_context().get_executor(),
+          std::move(init.completion_handler));
+      l.unlock();
+      init.result.get();
+      return -ec.value();
+    }
+    cond.wait(l, [this]{return (done==true);});
+    return ret;
+  }
+
+  void finish(int r) {
+    std::unique_lock l{lock};
+    ret = r;
+    done = true;
+    if (completion) {
+      boost::system::error_code ec(-ret, boost::system::system_category());
+      Completion::post(std::move(completion), ec);
+    } else {
+      cond.notify_all();
+    }
+  }
+};
+} // namespace
+
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
 private:
@@ -410,12 +459,12 @@ public:
       return amqp::publish(conn, topic, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
-      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
-      auto w = std::unique_ptr<Waiter>(new Waiter);
+      auto w = std::make_unique<Waiter>();
       const auto rc = amqp::publish_with_confirm(conn, 
         topic,
         json_format_pubsub_event(event),
-        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+        [wp = w.get()](int r) { wp->finish(r);}
+      );
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
@@ -645,12 +694,12 @@ public:
     if (ack_level == ack_level_t::None) {
       return kafka::publish(conn, topic, json_format_pubsub_event(event));
     } else {
-      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
-      auto w = std::unique_ptr<Waiter>(new Waiter);
+      auto w = std::make_unique<Waiter>();
       const auto rc = kafka::publish_with_confirm(conn, 
         topic,
         json_format_pubsub_event(event),
-        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+        [wp = w.get()](int r) { wp->finish(r); }
+      );
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;