]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/kafka/amqp: fix race conditionn in async completion handlers 54697/head
authorYuval Lifshitz <ylifshit@redhat.com>
Tue, 28 Nov 2023 17:49:03 +0000 (17:49 +0000)
committerYuval Lifshitz <ylifshit@redhat.com>
Wed, 29 Nov 2023 09:34:37 +0000 (09:34 +0000)
Fixes: https://tracker.ceph.com/issues/63314
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/driver/rados/rgw_pubsub_push.cc

index bdb24ce9ad10326d2e4438d20e14c1ede6d9d6a2..05dc9e65d0ea52c4c8d38ecd1d9b08bbf9a50528 100644 (file)
@@ -115,6 +115,55 @@ public:
   }
 };
 
+namespace {
+// this allows waiting untill "finish()" is called from a different thread
+// waiting could be blocking the waiting thread or yielding, depending
+// with compilation flag support and whether the optional_yield is set
+class Waiter {
+  using Signature = void(boost::system::error_code);
+  using Completion = ceph::async::Completion<Signature>;
+  using CompletionInit = boost::asio::async_completion<yield_context, Signature>;
+  std::unique_ptr<Completion> completion = nullptr;
+  int ret;
+
+  bool done = false;
+  mutable std::mutex lock;
+  mutable std::condition_variable cond;
+
+public:
+  int wait(optional_yield y) {
+    std::unique_lock l{lock};
+    if (done) {
+      return ret;
+    }
+    if (y) {
+      boost::system::error_code ec;
+      auto&& token = y.get_yield_context()[ec];
+      CompletionInit init(token);
+      completion = Completion::create(y.get_io_context().get_executor(),
+          std::move(init.completion_handler));
+      l.unlock();
+      init.result.get();
+      return -ec.value();
+    }
+    cond.wait(l, [this]{return (done==true);});
+    return ret;
+  }
+
+  void finish(int r) {
+    std::unique_lock l{lock};
+    ret = r;
+    done = true;
+    if (completion) {
+      boost::system::error_code ec(-ret, boost::system::system_category());
+      Completion::post(std::move(completion), ec);
+    } else {
+      cond.notify_all();
+    }
+  }
+};
+} // namespace
+
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
 private:
@@ -187,71 +236,17 @@ public:
     }
   }
 
-  // this allows waiting untill "finish()" is called from a different thread
-  // waiting could be blocking the waiting thread or yielding, depending
-  // with compilation flag support and whether the optional_yield is set
-  class Waiter {
-    using Signature = void(boost::system::error_code);
-    using Completion = ceph::async::Completion<Signature>;
-    std::unique_ptr<Completion> completion = nullptr;
-    int ret;
-
-    mutable std::atomic<bool> done = false;
-    mutable std::mutex lock;
-    mutable std::condition_variable cond;
-
-    template <typename ExecutionContext, typename CompletionToken>
-    auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
-      boost::asio::async_completion<CompletionToken, Signature> init(token);
-      auto& handler = init.completion_handler;
-      {
-        std::unique_lock l{lock};
-        completion = Completion::create(ctx.get_executor(), std::move(handler));
-      }
-      return init.result.get();
-    }
-
-  public:
-    int wait(optional_yield y) {
-      if (done) {
-        return ret;
-      }
-      if (y) {
-       auto& io_ctx = y.get_io_context();
-        auto& yield_ctx = y.get_yield_context();
-        boost::system::error_code ec;
-        async_wait(io_ctx, yield_ctx[ec]);
-        return -ec.value();
-      }
-      std::unique_lock l(lock);
-      cond.wait(l, [this]{return (done==true);});
-      return ret;
-    }
-
-    void finish(int r) {
-      std::unique_lock l{lock};
-      ret = r;
-      done = true;
-      if (completion) {
-        boost::system::error_code ec(-ret, boost::system::system_category());
-        Completion::post(std::move(completion), ec);
-      } else {
-        cond.notify_all();
-      }
-    }
-  };
-
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
       return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
-      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
-      auto w = std::unique_ptr<Waiter>(new Waiter);
+      auto w = std::make_unique<Waiter>();
       const auto rc = amqp::publish_with_confirm(conn_id, 
         topic,
         json_format_pubsub_event(event),
-        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+        [wp = w.get()](int r) { wp->finish(r);}
+      );
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
@@ -314,70 +309,16 @@ public:
     }
   }
 
-  // this allows waiting untill "finish()" is called from a different thread
-  // waiting could be blocking the waiting thread or yielding, depending
-  // with compilation flag support and whether the optional_yield is set
-  class Waiter {
-    using Signature = void(boost::system::error_code);
-    using Completion = ceph::async::Completion<Signature>;
-    std::unique_ptr<Completion> completion = nullptr;
-    int ret;
-
-    mutable std::atomic<bool> done = false;
-    mutable std::mutex lock;
-    mutable std::condition_variable cond;
-
-    template <typename ExecutionContext, typename CompletionToken>
-    auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
-      boost::asio::async_completion<CompletionToken, Signature> init(token);
-      auto& handler = init.completion_handler;
-      {
-        std::unique_lock l{lock};
-        completion = Completion::create(ctx.get_executor(), std::move(handler));
-      }
-      return init.result.get();
-    }
-
-  public:
-    int wait(optional_yield y) {
-      if (done) {
-        return ret;
-      }
-      if (y) {
-        auto& io_ctx = y.get_io_context();
-        auto& yield_ctx = y.get_yield_context();
-        boost::system::error_code ec;
-        async_wait(io_ctx, yield_ctx[ec]);
-        return -ec.value();
-      }
-      std::unique_lock l(lock);
-      cond.wait(l, [this]{return (done==true);});
-      return ret;
-    }
-
-    void finish(int r) {
-      std::unique_lock l{lock};
-      ret = r;
-      done = true;
-      if (completion) {
-        boost::system::error_code ec(-ret, boost::system::system_category());
-        Completion::post(std::move(completion), ec);
-      } else {
-        cond.notify_all();
-      }
-    }
-  };
-
   int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     if (ack_level == ack_level_t::None) {
       return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
     } else {
-      // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
-      auto w = std::unique_ptr<Waiter>(new Waiter);
+      auto w = std::make_unique<Waiter>();
       const auto rc = kafka::publish_with_confirm(conn_name, 
         topic,
         json_format_pubsub_event(event),
-        std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+        [wp = w.get()](int r) { wp->finish(r); }
+      );
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;