ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;
}
- int add_persistent_topic(const std::string& topic_name, optional_yield y) {
- if (topic_name == Q_LIST_OBJECT_NAME) {
+ int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
+ if (topic_queue == Q_LIST_OBJECT_NAME) {
ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
return -EINVAL;
}
librados::ObjectWriteOperation op;
op.create(true);
- cls_2pc_queue_init(op, topic_name, max_queue_size);
+ cls_2pc_queue_init(op, topic_queue, max_queue_size);
auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
- auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
+ auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y);
if (ret == -EEXIST) {
// queue already exists - nothing to do
- ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already exists. nothing to do" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to create queue
- ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_name << ". error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
return ret;
}
bufferlist empty_bl;
- std::map<std::string, bufferlist> new_topic{{topic_name, empty_bl}};
+ std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
op.omap_set(new_topic);
ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_name << " to queue list. error: " << ret << dendl;
+ ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
return ret;
}
- ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
+ ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
return 0;
}
};
return s_manager->add_persistent_topic(topic_name, y);
}
-int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
+int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) {
librados::ObjectWriteOperation op;
op.remove();
- auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
+ auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y);
if (ret == -ENOENT) {
// queue already removed - nothing to do
- ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+ ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already removed. nothing to do" << dendl;
return 0;
}
if (ret < 0) {
// failed to remove queue
- ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_queue << ". error: " << ret << dendl;
return ret;
}
- std::set<std::string> topic_to_remove{{topic_name}};
+ std::set<std::string> topic_to_remove{{topic_queue}};
op.omap_rm_keys(topic_to_remove);
ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_queue << " from queue list. error: " << ret << dendl;
return ret;
}
- ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
+ ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " removed from queue list" << dendl;
return 0;
}
-int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
+int remove_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_name, y);
+ return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}
rgw::sal::Object* get_object_with_attributes(
void shutdown();
// create persistent delivery queue for a topic (endpoint)
-// this operation also add a topic name to the common (to all RGWs) list of all topics
-int add_persistent_topic(const std::string& topic_name, optional_yield y);
+// this operation also add a topic queue to the common (to all RGWs) list of all topics
+int add_persistent_topic(const std::string& topic_queue, optional_yield y);
// remove persistent delivery queue for a topic (endpoint)
-// this operation also remove the topic name from the common (to all RGWs) list of all topics
-int remove_persistent_topic(const std::string& topic_name, optional_yield y);
+// this operation also remove the topic queue from the common (to all RGWs) list of all topics
+int remove_persistent_topic(const std::string& topic_queue, optional_yield y);
// same as the above, expect you need to provide the IoCtx, the above uses rgw::notify::Manager::rados_ioctx
-int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y);
+int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y);
// struct holding reservation information
// populated in the publish_reserve call