<< queue_name << dendl;
}
}
-
}
}
const auto max_jitter = 500; // ms
std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
+ std::vector<std::string> queue_gc;
+ std::mutex queue_gc_lock;
while (true) {
Timer timer(io_context);
const auto duration = (has_error ?
continue;
}
- std::vector<std::string> queue_gc;
- std::mutex queue_gc_lock;
for (const auto& queue_name : queues) {
// try to lock the queue to check if it is owned by this rgw
// or if ownershif needs to be taken
// start the worker threads to do the actual queue processing
const std::string WORKER_THREAD_NAME = "notif-worker";
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
- workers.emplace_back([this]() { io_context.run(); });
- const auto rc = ceph_pthread_setname(workers.back().native_handle(),
- (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
+ workers.emplace_back([this]() {
+ try {
+ io_context.run();
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 10) << "Notification worker failed with error: " << err.what() << dendl;
+ throw(err);
+ }
+ });
+ const auto rc = ceph_pthread_setname(workers.back().native_handle(),
+ (WORKER_THREAD_NAME+std::to_string(worker_id)).c_str());
ceph_assert(rc == 0);
}
ldpp_dout(this, 10) << "Started notification manager with: " << worker_count << " workers" << dendl;