From 3b13a0ee98844ed8710b6d270b1b44c68451710e Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Tue, 23 Jul 2024 17:41:50 +0000 Subject: [PATCH] rgw/async/notifications: use common async waiter in pubsub push * use the "yield_waiter" and "waiter" from common/async instead of the "waiter" implemented inside the bucket notification code (this is so we don't need separate investigations for 2 implementations) * added a unit test that simulates how a separate thread (kafka or amqp) is resuming a coroutine which is created by either the frontend or the notification manager. before using "defer" the unit test is passing, however, when executed under thread sanitizer (using the WITH_TSAN cmake flag) the following errors are observed: https://0x0.st/Xp4P.txt after using "defer" the unit test passes under TSAN without errors. Fixes: https://tracker.ceph.com/issues/64184 Signed-off-by: Yuval Lifshitz (cherry picked from commit 2872c75f184c9e715219dfa9ad44f5b6cfe4e1fe) --- src/rgw/driver/rados/rgw_pubsub_push.cc | 99 +++++++++------------- src/test/common/test_async_yield_waiter.cc | 26 ++++++ 2 files changed, 65 insertions(+), 60 deletions(-) diff --git a/src/rgw/driver/rados/rgw_pubsub_push.cc b/src/rgw/driver/rados/rgw_pubsub_push.cc index e0d742d6f10..2bd465799a6 100644 --- a/src/rgw/driver/rados/rgw_pubsub_push.cc +++ b/src/rgw/driver/rados/rgw_pubsub_push.cc @@ -9,6 +9,8 @@ #include "common/Formatter.h" #include "common/iso_8601.h" #include "common/async/completion.h" +#include "common/async/yield_waiter.h" +#include "common/async/waiter.h" #include "rgw_common.h" #include "rgw_data_sync.h" #include "rgw_pubsub.h" @@ -129,55 +131,6 @@ public: } }; -namespace { -// this allows waiting untill "finish()" is called from a different thread -// waiting could be blocking the waiting thread or yielding, depending -// with compilation flag support and whether the optional_yield is set -class Waiter { - using Signature = void(boost::system::error_code); - using Completion = 
ceph::async::Completion; - std::unique_ptr completion = nullptr; - int ret; - - bool done = false; - mutable std::mutex lock; - mutable std::condition_variable cond; - -public: - int wait(optional_yield y) { - std::unique_lock l{lock}; - if (done) { - return ret; - } - if (y) { - boost::system::error_code ec; - auto yield = y.get_yield_context(); - auto&& token = yield[ec]; - boost::asio::async_initiate( - [this, &l] (auto handler, auto ex) { - completion = Completion::create(ex, std::move(handler)); - l.unlock(); // unlock before suspend - }, token, yield.get_executor()); - return -ec.value(); - } - cond.wait(l, [this]{return (done==true);}); - return ret; - } - - void finish(int r) { - std::unique_lock l{lock}; - ret = r; - done = true; - if (completion) { - boost::system::error_code ec(-ret, boost::system::system_category()); - Completion::post(std::move(completion), ec); - } else { - cond.notify_all(); - } - } -}; -} // namespace - #ifdef WITH_RADOSGW_AMQP_ENDPOINT class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint { private: @@ -252,17 +205,29 @@ public: return amqp::publish(conn_id, topic, json_format_pubsub_event(event)); } else { // TODO: currently broker and routable are the same - this will require different flags but the same mechanism - auto w = std::make_unique(); - const auto rc = amqp::publish_with_confirm(conn_id, - topic, - json_format_pubsub_event(event), - [wp = w.get()](int r) { wp->finish(r);} - ); + if (y) { + auto& yield = y.get_yield_context(); + ceph::async::yield_waiter w; + boost::asio::defer(yield.get_executor(),[&w, &event, this]() { + const auto rc = amqp::publish_with_confirm( + conn_id, topic, json_format_pubsub_event(event), + [&w](int r) {w.complete(boost::system::error_code{}, r);}); + if (rc < 0) { + // failed to publish, does not wait for reply + w.complete(boost::system::error_code{}, rc); + } + }); + return w.async_wait(yield); + } + ceph::async::waiter w; + const auto rc = amqp::publish_with_confirm( + conn_id, topic, 
json_format_pubsub_event(event), + [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply return rc; } - return w->wait(y); + return w.wait(); } } @@ -324,15 +289,29 @@ public: if (ack_level == ack_level_t::None) { return kafka::publish(conn_id, topic, json_format_pubsub_event(event)); } else { - auto w = std::make_unique(); + if (y) { + auto& yield = y.get_yield_context(); + ceph::async::yield_waiter w; + boost::asio::defer(yield.get_executor(),[&w, &event, this]() { + const auto rc = kafka::publish_with_confirm( + conn_id, topic, json_format_pubsub_event(event), + [&w](int r) {w.complete(boost::system::error_code{}, r);}); + if (rc < 0) { + // failed to publish, does not wait for reply + w.complete(boost::system::error_code{}, rc); + } + }); + return w.async_wait(yield); + } + ceph::async::waiter w; const auto rc = kafka::publish_with_confirm( - conn_id, topic, json_format_pubsub_event(event), - [wp = w.get()](int r) { wp->finish(r); }); + conn_id, topic, json_format_pubsub_event(event), + [&w](int r) {w(r);}); if (rc < 0) { // failed to publish, does not wait for reply return rc; } - return w->wait(y); + return w.wait(); } } diff --git a/src/test/common/test_async_yield_waiter.cc b/src/test/common/test_async_yield_waiter.cc index 6746825968e..cd74ffc526e 100644 --- a/src/test/common/test_async_yield_waiter.cc +++ b/src/test/common/test_async_yield_waiter.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -240,4 +241,29 @@ TEST(YieldWaiterPtr, wait_error) } } +void invoke_callback(int expected_reply, std::function cb) { + auto t = std::thread([cb, expected_reply] { + cb(expected_reply); + }); + t.detach(); +} + +TEST(YieldWaiterInt, mt_wait_complete) +{ + boost::asio::io_context io_context; + int reply; + const int expected_reply = 42; + boost::asio::spawn(io_context, + [&reply](boost::asio::yield_context yield) { + yield_waiter waiter; + boost::asio::defer(yield.get_executor(),[&waiter] { + 
invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);}); + }); + reply = waiter.async_wait(yield); + }, rethrow); + io_context.run(); + EXPECT_EQ(reply, expected_reply); +} + } // namespace ceph::async + -- 2.39.5