#include "common/Formatter.h"
#include "common/iso_8601.h"
#include "common/async/completion.h"
+#include "common/async/yield_waiter.h"
+#include "common/async/waiter.h"
#include "rgw_asio_thread.h"
#include "rgw_common.h"
#include "rgw_data_sync.h"
}
};
-namespace {
-// this allows waiting untill "finish()" is called from a different thread
-// waiting could be blocking the waiting thread or yielding, depending
-// with compilation flag support and whether the optional_yield is set
-class Waiter {
- using Signature = void(boost::system::error_code);
- using Completion = ceph::async::Completion<Signature>;
- std::unique_ptr<Completion> completion = nullptr;
- int ret;
-
- bool done = false;
- mutable std::mutex lock;
- mutable std::condition_variable cond;
-
-public:
- int wait(const DoutPrefixProvider* dpp, optional_yield y) {
- std::unique_lock l{lock};
- if (done) {
- return ret;
- }
- if (y) {
- boost::system::error_code ec;
- auto yield = y.get_yield_context();
- auto&& token = yield[ec];
- boost::asio::async_initiate<boost::asio::yield_context, Signature>(
- [this, &l] (auto handler, auto ex) {
- completion = Completion::create(ex, std::move(handler));
- l.unlock(); // unlock before suspend
- }, token, yield.get_executor());
- return -ec.value();
- }
- maybe_warn_about_blocking(dpp);
-
- cond.wait(l, [this]{return (done==true);});
- return ret;
- }
-
- void finish(int r) {
- std::unique_lock l{lock};
- ret = r;
- done = true;
- if (completion) {
- boost::system::error_code ec(-ret, boost::system::system_category());
- Completion::post(std::move(completion), ec);
- } else {
- cond.notify_all();
- }
- }
-};
-} // namespace
-
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
private:
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
- auto w = std::make_unique<Waiter>();
- const auto rc = amqp::publish_with_confirm(conn_id,
- topic,
- json_format_pubsub_event(event),
- [wp = w.get()](int r) { wp->finish(r);}
- );
+ if (y) {
+ auto& yield = y.get_yield_context();
+ ceph::async::yield_waiter<int> w;
+ boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
+ const auto rc = amqp::publish_with_confirm(
+ conn_id, topic, json_format_pubsub_event(event),
+ [&w](int r) {w.complete(boost::system::error_code{}, r);});
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ w.complete(boost::system::error_code{}, rc);
+ }
+ });
+ return w.async_wait(yield);
+ }
+ ceph::async::waiter<int> w;
+ const auto rc = amqp::publish_with_confirm(
+ conn_id, topic, json_format_pubsub_event(event),
+ [&w](int r) {w(r);});
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;
}
- return w->wait(dpp, y);
+ return w.wait();
}
}
if (ack_level == ack_level_t::None) {
return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
} else {
- auto w = std::make_unique<Waiter>();
+ if (y) {
+ auto& yield = y.get_yield_context();
+ ceph::async::yield_waiter<int> w;
+ boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
+ const auto rc = kafka::publish_with_confirm(
+ conn_id, topic, json_format_pubsub_event(event),
+ [&w](int r) {w.complete(boost::system::error_code{}, r);});
+ if (rc < 0) {
+ // failed to publish, does not wait for reply
+ w.complete(boost::system::error_code{}, rc);
+ }
+ });
+ return w.async_wait(yield);
+ }
+ ceph::async::waiter<int> w;
const auto rc = kafka::publish_with_confirm(
- conn_id, topic, json_format_pubsub_event(event),
- [wp = w.get()](int r) { wp->finish(r); });
+ conn_id, topic, json_format_pubsub_event(event),
+ [&w](int r) {w(r);});
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;
}
- return w->wait(dpp, y);
+ return w.wait();
}
}
#include <exception>
#include <memory>
#include <optional>
+#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <gtest/gtest.h>
}
}
+void invoke_callback(int expected_reply, std::function<void(int)> cb) {
+ auto t = std::thread([cb, expected_reply] {
+ cb(expected_reply);
+ });
+ t.detach();
+}
+
+TEST(YieldWaiterInt, mt_wait_complete)
+{
+ boost::asio::io_context io_context;
+ int reply;
+ const int expected_reply = 42;
+ boost::asio::spawn(io_context,
+ [&reply](boost::asio::yield_context yield) {
+ yield_waiter<int> waiter;
+ boost::asio::defer(yield.get_executor(),[&waiter] {
+ invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);});
+ });
+ reply = waiter.async_wait(yield);
+ }, rethrow);
+ io_context.run();
+ EXPECT_EQ(reply, expected_reply);
+}
+
} // namespace ceph::async
+