return boost::context::protected_fixedsize_stack{128*1024};
}
+const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+
class Manager : public DoutPrefixProvider {
const size_t max_queue_size;
const uint32_t queues_update_period_ms;
const uint32_t queue_idle_sleep_us;
const utime_t failover_time;
CephContext* const cct;
- librados::IoCtx& rados_ioctx;
static constexpr auto COOKIE_LEN = 16;
const std::string lock_cookie;
boost::asio::io_context io_context;
std::vector<std::thread> workers;
const uint32_t stale_reservations_period_s;
const uint32_t reservations_cleanup_period_s;
-
- const std::string Q_LIST_OBJECT_NAME = "queues_list_object";
+public:
+ librados::IoCtx& rados_ioctx;
+private:
CephContext *get_cct() const override { return cct; }
unsigned get_subsys() const override { return dout_subsys; }
queue_idle_sleep_us(_queue_idle_sleep_us),
failover_time(std::chrono::milliseconds(failover_time_ms)),
cct(_cct),
- rados_ioctx(store->getRados()->get_notif_pool_ctx()),
lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
work_guard(boost::asio::make_work_guard(io_context)),
worker_count(_worker_count),
stale_reservations_period_s(_stale_reservations_period_s),
- reservations_cleanup_period_s(_reservations_cleanup_period_s)
+ reservations_cleanup_period_s(_reservations_cleanup_period_s),
+ rados_ioctx(store->getRados()->get_notif_pool_ctx())
{
spawn::spawn(io_context, [this] (yield_context yield) {
process_queues(yield);
ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " added to queue list" << dendl;
return 0;
}
-
- int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
- librados::ObjectWriteOperation op;
- op.remove();
- auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
- if (ret == -ENOENT) {
- // queue already removed - nothing to do
- ldpp_dout(this, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
- return 0;
- }
- if (ret < 0) {
- // failed to remove queue
- ldpp_dout(this, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
- return ret;
- }
-
- std::set<std::string> topic_to_remove{{topic_name}};
- op.omap_rm_keys(topic_to_remove);
- ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
- if (ret < 0) {
- ldpp_dout(this, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
- return ret;
- }
- ldpp_dout(this, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
- return 0;
- }
};
// singleton manager
return s_manager->add_persistent_topic(topic_name, y);
}
+int remove_persistent_topic(const DoutPrefixProvider* dpp, librados::IoCtx& rados_ioctx, const std::string& topic_name, optional_yield y) {
+ librados::ObjectWriteOperation op;
+ op.remove();
+ auto ret = rgw_rados_operate(dpp, rados_ioctx, topic_name, &op, y);
+ if (ret == -ENOENT) {
+ // queue already removed - nothing to do
+ ldpp_dout(dpp, 20) << "INFO: queue for topic: " << topic_name << " already removed. nothing to do" << dendl;
+ return 0;
+ }
+ if (ret < 0) {
+ // failed to remove queue
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue for topic: " << topic_name << ". error: " << ret << dendl;
+ return ret;
+ }
+
+ std::set<std::string> topic_to_remove{{topic_name}};
+ op.omap_rm_keys(topic_to_remove);
+ ret = rgw_rados_operate(dpp, rados_ioctx, Q_LIST_OBJECT_NAME, &op, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove queue: " << topic_name << " from queue list. error: " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 20) << "INFO: queue: " << topic_name << " removed from queue list" << dendl;
+ return 0;
+}
+
int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
if (!s_manager) {
return -EAGAIN;
}
- return s_manager->remove_persistent_topic(topic_name, y);
+ return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
}
rgw::sal::Object* get_object_with_atttributes(