class Manager : public DoutPrefixProvider {
bool shutdown = false;
- const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queues_update_retry_ms;
const uint32_t queue_idle_sleep_us;
}
// ctor: start all threads
- Manager(CephContext* _cct, uint32_t _max_queue_size, uint32_t _queues_update_period_ms,
+ Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
uint32_t _worker_count, rgw::sal::RadosStore* store,
const SiteConfig& site) :
- max_queue_size(_max_queue_size),
queues_update_period_ms(_queues_update_period_ms),
queues_update_retry_ms(_queues_update_retry_ms),
queue_idle_sleep_us(_queue_idle_sleep_us),
site(site),
rados_store(*store)
{}
-
- int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
- if (topic_queue == Q_LIST_OBJECT_NAME) {
- ldpp_dout(this, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
- return -EINVAL;
- }
- librados::ObjectWriteOperation op;
- op.create(true);
- cls_2pc_queue_init(op, topic_queue, max_queue_size);
- auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
- auto ret = rgw_rados_operate(this, rados_ioctx, topic_queue, &op, y);
- if (ret == -EEXIST) {
- // queue already exists - nothing to do
- ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
- return 0;
- }
- if (ret < 0) {
- // failed to create queue
- ldpp_dout(this, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
- return ret;
- }
-
- bufferlist empty_bl;
- std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
- op.omap_set(new_topic);
- ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
- if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
- return ret;
- }
- ldpp_dout(this, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
- return 0;
- }
};
std::unique_ptr<Manager> s_manager;
return false;
}
// TODO: take conf from CephContext
- s_manager = std::make_unique<Manager>(dpp->get_cct(), MAX_QUEUE_SIZE,
+ s_manager = std::make_unique<Manager>(dpp->get_cct(),
Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
s_manager.reset();
}
-int add_persistent_topic(const std::string& topic_name, optional_yield y) {
+int add_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx,
+ const std::string& topic_queue, optional_yield y)
+{
+ if (topic_queue == Q_LIST_OBJECT_NAME) {
+ ldpp_dout(dpp, 1) << "ERROR: topic name cannot be: " << Q_LIST_OBJECT_NAME << " (conflict with queue list object name)" << dendl;
+ return -EINVAL;
+ }
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ cls_2pc_queue_init(op, topic_queue, MAX_QUEUE_SIZE);
+ auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_queue, &op, y);
+ if (ret == -EEXIST) {
+ // queue already exists - nothing to do
+ ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_queue << " already exists. nothing to do" << dendl;
+ return 0;
+ }
+ if (ret < 0) {
+ // failed to create queue
+ ldpp_dout(dpp, 1) << "ERROR: failed to create queue for topic: " << topic_queue << ". error: " << ret << dendl;
+ return ret;
+ }
+
+ bufferlist empty_bl;
+ std::map<std::string, bufferlist> new_topic{{topic_queue, empty_bl}};
+ op.omap_set(new_topic);
+ ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to add queue: " << topic_queue << " to queue list. error: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "INFO: queue: " << topic_queue << " added to queue list" << dendl;
+ return 0;
+}
+
+int add_persistent_topic(const std::string& topic_queue, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return s_manager->add_persistent_topic(topic_name, y);
+ return add_persistent_topic(s_manager.get(), s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_queue, y);
}
int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_queue, optional_yield y) {