git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/async/notifications: use common async waiter in pubsub push 58765/head
author Yuval Lifshitz <ylifshit@ibm.com>
Tue, 23 Jul 2024 17:41:50 +0000 (17:41 +0000)
committer Yuval Lifshitz <ylifshit@ibm.com>
Thu, 25 Jul 2024 16:23:56 +0000 (16:23 +0000)
* use the "yield_waiter" and "waiter" from common/async instead of the "waiter"
  implemented inside the bucket notification code (this is so we don't
  need separate investigations for 2 implementations)
* added a unit test that simulates how a separate thread (kafka or amqp)
  resumes a coroutine which is created by either the frontend or the
  notification manager.

before using "defer" the unit test passes; however,
when executed under thread sanitizer (using the WITH_TSAN cmake flag)
the following errors are observed: https://0x0.st/Xp4P.txt
after using "defer" the unit test passes under TSAN without errors.

Fixes: https://tracker.ceph.com/issues/64184
Signed-off-by: Yuval Lifshitz <ylifshit@ibm.com>
src/rgw/driver/rados/rgw_pubsub_push.cc
src/test/common/test_async_yield_waiter.cc

index 4e73eeb770a08c5db73e99f91419ac89f1b88063..07d65fa10280f6e92745f4f8a6475456c0de733e 100644 (file)
@@ -9,6 +9,8 @@
 #include "common/Formatter.h"
 #include "common/iso_8601.h"
 #include "common/async/completion.h"
+#include "common/async/yield_waiter.h"
+#include "common/async/waiter.h"
 #include "rgw_asio_thread.h"
 #include "rgw_common.h"
 #include "rgw_data_sync.h"
@@ -131,57 +133,6 @@ public:
   }
 };
 
-namespace {
-// this allows waiting untill "finish()" is called from a different thread
-// waiting could be blocking the waiting thread or yielding, depending
-// with compilation flag support and whether the optional_yield is set
-class Waiter {
-  using Signature = void(boost::system::error_code);
-  using Completion = ceph::async::Completion<Signature>;
-  std::unique_ptr<Completion> completion = nullptr;
-  int ret;
-
-  bool done = false;
-  mutable std::mutex lock;
-  mutable std::condition_variable cond;
-
-public:
-  int wait(const DoutPrefixProvider* dpp, optional_yield y) {
-    std::unique_lock l{lock};
-    if (done) {
-      return ret;
-    }
-    if (y) {
-      boost::system::error_code ec;
-      auto yield = y.get_yield_context();
-      auto&& token = yield[ec];
-      boost::asio::async_initiate<boost::asio::yield_context, Signature>(
-          [this, &l] (auto handler, auto ex) {
-            completion = Completion::create(ex, std::move(handler));
-            l.unlock(); // unlock before suspend
-          }, token, yield.get_executor());
-      return -ec.value();
-    }
-    maybe_warn_about_blocking(dpp);
-
-    cond.wait(l, [this]{return (done==true);});
-    return ret;
-  }
-
-  void finish(int r) {
-    std::unique_lock l{lock};
-    ret = r;
-    done = true;
-    if (completion) {
-      boost::system::error_code ec(-ret, boost::system::system_category());
-      Completion::post(std::move(completion), ec);
-    } else {
-      cond.notify_all();
-    }
-  }
-};
-} // namespace
-
 #ifdef WITH_RADOSGW_AMQP_ENDPOINT
 class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
 private:
@@ -256,17 +207,29 @@ public:
       return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
-      auto w = std::make_unique<Waiter>();
-      const auto rc = amqp::publish_with_confirm(conn_id, 
-        topic,
-        json_format_pubsub_event(event),
-        [wp = w.get()](int r) { wp->finish(r);}
-      );
+      if (y) {
+        auto& yield = y.get_yield_context();
+        ceph::async::yield_waiter<int> w;
+        boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
+          const auto rc = amqp::publish_with_confirm(
+              conn_id, topic, json_format_pubsub_event(event),
+              [&w](int r) {w.complete(boost::system::error_code{}, r);});
+          if (rc < 0) {
+            // failed to publish, does not wait for reply
+            w.complete(boost::system::error_code{}, rc);
+          }
+        });
+        return w.async_wait(yield);
+      }
+      ceph::async::waiter<int> w;
+      const auto rc = amqp::publish_with_confirm(
+            conn_id, topic, json_format_pubsub_event(event),
+            [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
       }
-      return w->wait(dpp, y);
+      return w.wait();
     }
   }
 
@@ -329,15 +292,29 @@ public:
     if (ack_level == ack_level_t::None) {
       return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
     } else {
-      auto w = std::make_unique<Waiter>();
+      if (y) {
+        auto& yield = y.get_yield_context();
+        ceph::async::yield_waiter<int> w;
+        boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
+          const auto rc = kafka::publish_with_confirm(
+              conn_id, topic, json_format_pubsub_event(event),
+              [&w](int r) {w.complete(boost::system::error_code{}, r);});
+          if (rc < 0) {
+            // failed to publish, does not wait for reply
+            w.complete(boost::system::error_code{}, rc);
+          }
+        });
+        return w.async_wait(yield);
+      }
+      ceph::async::waiter<int> w;
       const auto rc = kafka::publish_with_confirm(
-          conn_id, topic, json_format_pubsub_event(event),
-          [wp = w.get()](int r) { wp->finish(r); });
+            conn_id, topic, json_format_pubsub_event(event),
+            [&w](int r) {w(r);});
       if (rc < 0) {
         // failed to publish, does not wait for reply
         return rc;
       }
-      return w->wait(dpp, y);
+      return w.wait();
     }
   }
 
index 6746825968e1c5f3da2098215a077629ad5f69e8..cd74ffc526e7edff44961e7d8b02f6ce21de493a 100644 (file)
@@ -17,6 +17,7 @@
 #include <exception>
 #include <memory>
 #include <optional>
+#include <thread>
 #include <boost/asio/io_context.hpp>
 #include <boost/asio/spawn.hpp>
 #include <gtest/gtest.h>
@@ -240,4 +241,29 @@ TEST(YieldWaiterPtr, wait_error)
   }
 }
 
+void invoke_callback(int expected_reply, std::function<void(int)> cb) {
+  auto t = std::thread([cb, expected_reply] {
+      cb(expected_reply);
+  }); 
+  t.detach();
+}
+
+TEST(YieldWaiterInt, mt_wait_complete)
+{
+  boost::asio::io_context io_context;
+  int reply;
+  const int expected_reply = 42; 
+  boost::asio::spawn(io_context,
+      [&reply](boost::asio::yield_context yield) {
+        yield_waiter<int> waiter;
+        boost::asio::defer(yield.get_executor(),[&waiter] {
+            invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);});
+          });
+        reply = waiter.async_wait(yield);
+      }, rethrow);
+  io_context.run(); 
+  EXPECT_EQ(reply, expected_reply);
+}
+
 } // namespace ceph::async
+