const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
struct PublishCommitCompleteArg {
+ PublishCommitCompleteArg(const std::string& _queue_name, CephContext* _cct)
+ : queue_name{_queue_name}, cct{_cct} {}
- PublishCommitCompleteArg(std::string _queue_name, const DoutPrefixProvider *_dpp)
- : queue_name{std::move(_queue_name)}, dpp{_dpp} {}
-
- std::string queue_name;
- const DoutPrefixProvider *dpp;
+ const std::string queue_name;
+ CephContext* const cct;
};
-void publish_commit_completion(rados_completion_t completion, void *arg) {
- auto *comp_obj = reinterpret_cast<librados::AioCompletionImpl *>(completion);
- std::unique_ptr<PublishCommitCompleteArg> pcc_arg(reinterpret_cast<PublishCommitCompleteArg *>(arg));
- if (comp_obj->get_return_value() < 0) {
- ldpp_dout(pcc_arg->dpp, 1) << "ERROR: failed to commit reservation to queue: "
- << pcc_arg->queue_name << ". error: " << comp_obj->get_return_value()
- << dendl;
- }
+void publish_commit_completion(rados_completion_t completion, void* arg) {
+ std::unique_ptr<PublishCommitCompleteArg> pcc_args{reinterpret_cast<PublishCommitCompleteArg*>(arg)};
+ if (const auto rc = rados_aio_get_return_value(completion); rc < 0) {
+ ldout(pcc_args->cct, 1) << "ERROR: failed to commit reservation to queue: "
+ << pcc_args->queue_name << ". error: " << rc << dendl;
+ }
};
class Manager : public DoutPrefixProvider {
std::vector<buffer::list> bl_data_vec{std::move(bl)};
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
- aio_completion_ptr completion {librados::Rados::aio_create_completion()};
- auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp);
- completion->set_complete_callback(pcc_arg.get(), publish_commit_completion);
- auto &io_ctx = res.store->getRados()->get_notif_pool_ctx();
- int ret = io_ctx.aio_operate(queue_name, completion.get(), &op);
topic.res_id = cls_2pc_reservation::NO_ID;
- if (ret < 0) {
+ auto pcc_arg = make_unique<PublishCommitCompleteArg>(queue_name, dpp->get_cct());
+ aio_completion_ptr completion{librados::Rados::aio_create_completion(pcc_arg.get(), publish_commit_completion)};
+ auto& io_ctx = res.store->getRados()->get_notif_pool_ctx();
+ if (const int ret = io_ctx.aio_operate(queue_name, completion.get(), &op); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
<< queue_name << ". error: " << ret << dendl;
return ret;
}
+ // args will be released inside the callback
pcc_arg.release();
- completion.release();
} else {
try {
// TODO add endpoint LRU cache