}
};
+namespace {
+// this allows waiting untill "finish()" is called from a different thread
+// waiting could be blocking the waiting thread or yielding, depending
+// with compilation flag support and whether the optional_yield is set
+class Waiter {
+ using Signature = void(boost::system::error_code);
+ using Completion = ceph::async::Completion<Signature>;
+ using CompletionInit = boost::asio::async_completion<yield_context, Signature>;
+ std::unique_ptr<Completion> completion = nullptr;
+ int ret;
+
+ bool done = false;
+ mutable std::mutex lock;
+ mutable std::condition_variable cond;
+
+public:
+ int wait(optional_yield y) {
+ std::unique_lock l{lock};
+ if (done) {
+ return ret;
+ }
+ if (y) {
+ boost::system::error_code ec;
+ auto&& token = y.get_yield_context()[ec];
+ CompletionInit init(token);
+ completion = Completion::create(y.get_io_context().get_executor(),
+ std::move(init.completion_handler));
+ l.unlock();
+ init.result.get();
+ return -ec.value();
+ }
+ cond.wait(l, [this]{return (done==true);});
+ return ret;
+ }
+
+ void finish(int r) {
+ std::unique_lock l{lock};
+ ret = r;
+ done = true;
+ if (completion) {
+ boost::system::error_code ec(-ret, boost::system::system_category());
+ Completion::post(std::move(completion), ec);
+ } else {
+ cond.notify_all();
+ }
+ }
+};
+} // namespace
+
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
private:
return amqp::publish(conn, topic, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
- // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
- auto w = std::unique_ptr<Waiter>(new Waiter);
+ auto w = std::make_unique<Waiter>();
const auto rc = amqp::publish_with_confirm(conn,
topic,
json_format_pubsub_event(event),
- std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ [wp = w.get()](int r) { wp->finish(r);}
+ );
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;
if (ack_level == ack_level_t::None) {
return kafka::publish(conn, topic, json_format_pubsub_event(event));
} else {
- // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
- auto w = std::unique_ptr<Waiter>(new Waiter);
+ auto w = std::make_unique<Waiter>();
const auto rc = kafka::publish_with_confirm(conn,
topic,
json_format_pubsub_event(event),
- std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
+ [wp = w.get()](int r) { wp->finish(r); }
+ );
if (rc < 0) {
// failed to publish, does not wait for reply
return rc;