]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/s3-notifications: reservations are now committed asynchronously 55199/head
authorigomon <igomon@bloomberg.net>
Tue, 16 Jan 2024 19:28:01 +0000 (14:28 -0500)
committerigomon <igomon@bloomberg.net>
Wed, 24 Jan 2024 15:25:15 +0000 (10:25 -0500)
Signed-off-by: Igor Gomon <igomon@bloomberg.net>
src/rgw/driver/rados/rgw_notify.cc

index 6711f310c2681a1e3af93e5a68d12585cd884ffd..9baed182ed9466beae6a8f52e7821afd7d6eedff 100644 (file)
@@ -84,6 +84,25 @@ auto make_stack_allocator() {
 
 const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
 
+struct PublishCommitCompleteArg {
+
+    PublishCommitCompleteArg(std::string _queue_name, const DoutPrefixProvider *_dpp)
+            : queue_name{std::move(_queue_name)}, dpp{_dpp} {}
+
+    std::string queue_name;
+    const DoutPrefixProvider *dpp;
+};
+
+void publish_commit_completion(rados_completion_t completion, void *arg) {
+    auto *comp_obj = reinterpret_cast<librados::AioCompletionImpl *>(completion);
+    std::unique_ptr<PublishCommitCompleteArg> pcc_arg(reinterpret_cast<PublishCommitCompleteArg *>(arg));
+    if (comp_obj->get_return_value() < 0) {
+        ldpp_dout(pcc_arg->dpp, 1) << "ERROR: failed to commit reservation to queue: "
+                                   << pcc_arg->queue_name << ". error: " << comp_obj->get_return_value()
+                                   << dendl;
+    }
+};
+
 class Manager : public DoutPrefixProvider {
   const size_t max_queue_size;
   const uint32_t queues_update_period_ms;
@@ -1087,16 +1106,19 @@ int publish_commit(rgw::sal::Object* obj,
       std::vector<buffer::list> bl_data_vec{std::move(bl)};
       librados::ObjectWriteOperation op;
       cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
-      const auto ret = rgw_rados_operate(
-       dpp, res.store->getRados()->get_notif_pool_ctx(),
-       queue_name, &op, res.yield);
+      aio_completion_ptr completion {librados::Rados::aio_create_completion()};
+      auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp);
+      completion->set_complete_callback(pcc_arg.get(), publish_commit_completion);
+      auto &io_ctx = res.store->getRados()->get_notif_pool_ctx();
+      int ret = io_ctx.aio_operate(queue_name, completion.get(), &op);
       topic.res_id = cls_2pc_reservation::NO_ID;
       if (ret < 0) {
         ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
-                         << queue_name << ". error: " << ret
-                         << dendl;
+                          << queue_name << ". error: " << ret << dendl;
         return ret;
       }
+      pcc_arg.release();
+      completion.release();
     } else {
       try {
         // TODO add endpoint LRU cache