From 909ff4c102b090f20cf279e497b2780a8b371275 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 28 Nov 2023 17:49:03 +0000 Subject: [PATCH] rgw/kafka/amqp: fix race conditionn in async completion handlers Fixes: https://tracker.ceph.com/issues/63314 Signed-off-by: Yuval Lifshitz (cherry picked from commit 3e6d5273fc408efab9012883d4d16c0299e62b83) Conflicts: src/rgw/rgw_pubsub_push.cc --- src/rgw/rgw_pubsub_push.cc | 61 ++++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/src/rgw/rgw_pubsub_push.cc b/src/rgw/rgw_pubsub_push.cc index 79fa736e92504..7493fcf25bf86 100644 --- a/src/rgw/rgw_pubsub_push.cc +++ b/src/rgw/rgw_pubsub_push.cc @@ -170,6 +170,55 @@ public: } }; +namespace { +// this allows waiting untill "finish()" is called from a different thread +// waiting could be blocking the waiting thread or yielding, depending +// with compilation flag support and whether the optional_yield is set +class Waiter { + using Signature = void(boost::system::error_code); + using Completion = ceph::async::Completion; + using CompletionInit = boost::asio::async_completion; + std::unique_ptr completion = nullptr; + int ret; + + bool done = false; + mutable std::mutex lock; + mutable std::condition_variable cond; + +public: + int wait(optional_yield y) { + std::unique_lock l{lock}; + if (done) { + return ret; + } + if (y) { + boost::system::error_code ec; + auto&& token = y.get_yield_context()[ec]; + CompletionInit init(token); + completion = Completion::create(y.get_io_context().get_executor(), + std::move(init.completion_handler)); + l.unlock(); + init.result.get(); + return -ec.value(); + } + cond.wait(l, [this]{return (done==true);}); + return ret; + } + + void finish(int r) { + std::unique_lock l{lock}; + ret = r; + done = true; + if (completion) { + boost::system::error_code ec(-ret, boost::system::system_category()); + Completion::post(std::move(completion), ec); + } else { + cond.notify_all(); + } + } +}; +} // namespace + #ifdef WITH_RADOSGW_AMQP_ENDPOINT class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { private: @@ -410,12 +459,12 @@ public: return amqp::publish(conn, topic, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism - // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine - auto w = std::unique_ptr(new Waiter); + auto w = std::make_unique(); const auto rc = amqp::publish_with_confirm(conn, topic, json_format_pubsub_event(event), - std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); + [wp = w.get()](int r) { wp->finish(r);} + ); if (rc < 0) { // failed to publish, does not wait for reply return rc; @@ -645,12 +694,12 @@ public: if (ack_level == ack_level_t::None) { return kafka::publish(conn, topic, json_format_pubsub_event(event)); } else { - // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine - auto w = std::unique_ptr(new Waiter); + auto w = std::make_unique(); const auto rc = kafka::publish_with_confirm(conn, topic, json_format_pubsub_event(event), - std::bind(&Waiter::finish, w.get(), std::placeholders::_1)); + [wp = w.get()](int r) { wp->finish(r); } + ); if (rc < 0) { // failed to publish, does not wait for reply return rc; -- 2.39.5