const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+struct PublishCommitCompleteArg {
+
+ PublishCommitCompleteArg(std::string _queue_name, const DoutPrefixProvider *_dpp)
+ : queue_name{std::move(_queue_name)}, dpp{_dpp} {}
+
+ std::string queue_name;
+ const DoutPrefixProvider *dpp;
+};
+
+void publish_commit_completion(rados_completion_t completion, void *arg) {
+ auto *comp_obj = reinterpret_cast<librados::AioCompletionImpl *>(completion);
+ std::unique_ptr<PublishCommitCompleteArg> pcc_arg(reinterpret_cast<PublishCommitCompleteArg *>(arg));
+ if (comp_obj->get_return_value() < 0) {
+ ldpp_dout(pcc_arg->dpp, 1) << "ERROR: failed to commit reservation to queue: "
+ << pcc_arg->queue_name << ". error: " << comp_obj->get_return_value()
+ << dendl;
+ }
+};
+
class Manager : public DoutPrefixProvider {
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
std::vector<buffer::list> bl_data_vec{std::move(bl)};
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
- const auto ret = rgw_rados_operate(
- dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op, res.yield);
+ aio_completion_ptr completion {librados::Rados::aio_create_completion()};
+ auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp);
+ completion->set_complete_callback(pcc_arg.get(), publish_commit_completion);
+ auto &io_ctx = res.store->getRados()->get_notif_pool_ctx();
+ int ret = io_ctx.aio_operate(queue_name, completion.get(), &op);
topic.res_id = cls_2pc_reservation::NO_ID;
if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
- << queue_name << ". error: " << ret
- << dendl;
+ << queue_name << ". error: " << ret << dendl;
return ret;
}
+ pcc_arg.release();
+ completion.release();
} else {
try {
// TODO add endpoint LRU cache