#include <fmt/format.h>
#include "librados/AioCompletionImpl.h"
#include "common/async/yield_waiter.h"
+#include <future>
#include <unordered_map>
};
class Manager : public DoutPrefixProvider {
+ using Executor = boost::asio::io_context::executor_type;
bool shutdown = false;
- const uint32_t queues_update_period_ms;
- const uint32_t queues_update_retry_ms;
- const uint32_t queue_idle_sleep_us;
- const utime_t failover_time;
+ static constexpr auto queues_update_period = std::chrono::milliseconds(30000); // 30s
+ static constexpr auto queues_update_retry = std::chrono::milliseconds(1000); // 1s
+ static constexpr auto queue_idle_sleep = std::chrono::milliseconds(100); // 100ms
+ const utime_t failover_time = utime_t(queues_update_period*3); // 90s
CephContext* const cct;
static constexpr auto COOKIE_LEN = 16;
const std::string lock_cookie;
boost::asio::io_context io_context;
- boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard;
- const uint32_t worker_count;
- std::vector<std::thread> workers;
- const uint32_t stale_reservations_period_s;
- const uint32_t reservations_cleanup_period_s;
+ boost::asio::executor_work_guard<Executor> work_guard;
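+ // single worker thread running the io_context event loop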
+ std::thread worker;
+ static constexpr auto stale_reservations_period = std::chrono::seconds(120); // 120s
+ static constexpr auto reservations_cleanup_period = std::chrono::seconds(30); // 30s
queues_persistency_tracker topics_persistency_tracker;
const SiteConfig& site;
-public:
- rgw::sal::RadosStore& rados_store;
+ rgw::sal::RadosStore* const rados_store;
private:
librados::ObjectReadOperation op;
queues_t queues_chunk;
op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
- const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
+ const auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, std::move(op), nullptr, y);
if (ret == -ENOENT) {
// queue list object was not created - nothing to do
return 0;
return EntryProcessingResult::Successful;
}
- using Clock = ceph::coarse_mono_clock;
- using Executor = boost::asio::io_context::executor_type;
- using Timer = boost::asio::basic_waitable_timer<Clock,
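+ // non-blocking sleep: suspends the calling coroutine on a coarse monotonic timer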
+ void async_sleep(boost::asio::yield_context yield, const std::chrono::milliseconds& duration) {
+ using Clock = ceph::coarse_mono_clock;
+ using Timer = boost::asio::basic_waitable_timer<Clock,
boost::asio::wait_traits<Clock>, Executor>;
+ Timer timer(io_context);
+ timer.expires_after(duration);
+ boost::system::error_code ec;
+ timer.async_wait(yield[ec]);
+ if (ec) {
+ ldpp_dout(this, 1) << "ERROR: async_sleep failed with error: " << ec.message() << dendl;
+ }
+ }
// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
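+ // start from the clock's epoch so the first iteration performs cleanup immediately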
+ auto next_check_time = ceph::coarse_real_clock::zero();
while (!shutdown) {
- ldpp_dout(this, 20) << "INFO: trying to perform stale reservation cleanup for queue: " << queue_name << dendl;
- const auto now = ceph::coarse_real_time::clock::now();
- const auto stale_time = now - std::chrono::seconds(stale_reservations_period_s);
+ // check if it is time for queue cleanup
+ if (ceph::coarse_real_clock::now() > next_check_time) {
+ next_check_time = ceph::coarse_real_clock::now() + reservations_cleanup_period;
+ } else {
+ // short sleep duration to prevent busy wait when doing queue cleanup
+ async_sleep(yield, queues_update_retry);
+ continue;
+ }
+ const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+ ldpp_dout(this, 20) << "INFO: performing stale reservation cleanup for queue: " << queue_name <<
+ ". next cleanup will happen at: " << std::ctime(&tp) << dendl;
+ const auto stale_time = ceph::coarse_real_time::clock::now() - stale_reservations_period;
librados::ObjectWriteOperation op;
op.assert_exists();
- rados::cls::lock::assert_locked(&op, queue_name+"_lock",
+ rados::cls::lock::assert_locked(&op, queue_name+"_lock",
ClsLockType::EXCLUSIVE,
- lock_cookie,
+ lock_cookie,
"" /*no tag*/);
cls_2pc_queue_expire_reservations(op, stale_time);
// check ownership and do reservation cleanup in one batch
- auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
+ auto ret = rgw_rados_operate(this, rados_store->getRados()->get_notif_pool_ctx(), queue_name, std::move(op), yield);
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
ldpp_dout(this, 5) << "WARNING: failed to cleanup stale reservation from queue and/or lock queue: " << queue_name
<< ". error: " << ret << dendl;
}
- Timer timer(io_context);
- timer.expires_after(std::chrono::seconds(reservations_cleanup_period_s));
- boost::system::error_code ec;
- timer.async_wait(yield[ec]);
}
ldpp_dout(this, 5) << "INFO: manager stopped. done cleanup for queue: " << queue_name << dendl;
}
librados::ObjectWriteOperation op;
op.assert_exists();
rados::cls::lock::unlock(&op, queue_name+"_lock", lock_cookie);
- auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, std::move(op), yield);
if (ret == -ENOENT) {
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
std::string queue_topic_name;
parse_topic_metadata_key(queue_name, queue_topic_tenant, queue_topic_name);
rgw_pubsub_topic topic_info;
- RGWPubSub ps(&rados_store, queue_topic_tenant, site);
+ RGWPubSub ps(rados_store, queue_topic_tenant, site);
int ret = ps.get_topic(this, queue_topic_name, topic_info, yield, nullptr);
if (ret < 0) {
ldpp_dout(this, 1) << "WARNING: failed to fetch topic: "
while (!shutdown) {
// if queue was empty the last time, sleep for idle timeout
if (is_idle) {
- Timer timer(io_context);
- timer.expires_after(std::chrono::microseconds(queue_idle_sleep_us));
- boost::system::error_code ec;
- timer.async_wait(yield[ec]);
+ async_sleep(yield, queue_idle_sleep);
}
// get list of entries in the queue
- auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
is_idle = true;
bool truncated = false;
std::string end_marker;
std::string tenant_name;
// TODO: extract tenant name from queue_name once it is fixed
uint64_t size_to_migrate = 0;
- RGWPubSub ps(&rados_store, tenant_name, site);
+ RGWPubSub ps(rados_store, tenant_name, site);
rgw_pubsub_topic topic;
auto ret_of_get_topic = ps.get_topic(this, queue_name, topic,
owned_queues_t owned_queues;
size_t processed_queue_count = 0;
- // add randomness to the duration between queue checking
- // to make sure that different daemons are not synced
- std::random_device seed;
- std::mt19937 rnd_gen(seed());
- const auto min_jitter = 100; // ms
- const auto max_jitter = 500; // ms
- std::uniform_int_distribution<> duration_jitter(min_jitter, max_jitter);
-
std::vector<std::string> queue_gc;
std::mutex queue_gc_lock;
- auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
+ auto& rados_ioctx = rados_store->getRados()->get_notif_pool_ctx();
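+ // epoch-zero start time forces an immediate queue list refresh on the first iteration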
+ auto next_check_time = ceph::coarse_real_clock::zero();
while (!shutdown) {
- Timer timer(io_context);
- const auto duration = (has_error ?
- std::chrono::milliseconds(queues_update_retry_ms) : std::chrono::milliseconds(queues_update_period_ms)) +
- std::chrono::milliseconds(duration_jitter(rnd_gen));
- timer.expires_after(duration);
- const auto tp = ceph::coarse_real_time::clock::to_time_t(ceph::coarse_real_time::clock::now() + duration);
- ldpp_dout(this, 20) << "INFO: next queues processing will happen at: " << std::ctime(&tp) << dendl;
- boost::system::error_code ec;
- timer.async_wait(yield[ec]);
+ // check if queue list needs to be refreshed
+ if (ceph::coarse_real_clock::now() > next_check_time) {
+ next_check_time = ceph::coarse_real_clock::now() + queues_update_period;
+ const auto tp = ceph::coarse_real_time::clock::to_time_t(next_check_time);
+ ldpp_dout(this, 20) << "INFO: processing queue list. next queues processing will happen at: " << std::ctime(&tp) << dendl;
+ } else {
+ // short sleep duration to prevent busy wait when refreshing queue list
+ // or retrying after error
+ async_sleep(yield, queues_update_retry);
+ if (!has_error) {
+ continue;
+ }
+ // an error occurred on the previous iteration: fall through and retry now
+ }
queues_t queues;
auto ret = read_queue_list(queues, yield);
// or if ownership needs to be taken
librados::ObjectWriteOperation op;
op.assert_exists();
- rados::cls::lock::lock(&op, queue_name+"_lock",
+ rados::cls::lock::lock(&op, queue_name+"_lock",
ClsLockType::EXCLUSIVE,
- lock_cookie,
+ lock_cookie,
"" /*no tag*/,
"" /*no description*/,
failover_time,
queue_gc.clear();
}
}
- Timer timer(io_context);
while (processed_queue_count > 0) {
- ldpp_dout(this, 5) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
- timer.expires_after(std::chrono::milliseconds(queues_update_retry_ms));
- boost::system::error_code ec;
- timer.async_wait(yield[ec]);
+ ldpp_dout(this, 20) << "INFO: manager stopped. " << processed_queue_count << " queues are still being processed" << dendl;
+ async_sleep(yield, queues_update_retry);
}
ldpp_dout(this, 5) << "INFO: manager stopped. done processing all queues" << dendl;
}
public:
- ~Manager() {
- }
+ ~Manager() = default;
void stop() {
+ ldpp_dout(this, 5) << "INFO: manager received stop signal. shutting down..." << dendl;
shutdown = true;
work_guard.reset();
- std::for_each(workers.begin(), workers.end(), [] (auto& worker) { worker.join(); });
+ if (worker.joinable()) {
+ // try graceful shutdown first
+ auto future = std::async(std::launch::async, [this]() {worker.join();});
+ if (future.wait_for(queues_update_retry*2) == std::future_status::timeout) {
+ // force stop if graceful shutdown takes too long
+ if (!io_context.stopped()) {
+ ldpp_dout(this, 5) << "INFO: force shutdown of manager" << dendl;
+ io_context.stop();
+ }
+ // the helper task is still blocked in worker.join(); wait for it to finish
+ // rather than joining the same thread from two places
+ future.wait();
+ }
+ }
+ ldpp_dout(this, 5) << "INFO: manager shutdown ended" << dendl;
}
void init() {
});
// start the worker threads to do the actual queue processing
- for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) {
- workers.emplace_back([this,worker_id]() {
- const auto thread_name = fmt::format("notif-worker-{}", worker_id);
- ceph_pthread_setname(thread_name.c_str());
- try {
- io_context.run();
- } catch (const std::exception& err) {
- ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
- throw err;
- }
- });
- }
- ldpp_dout(this, 10) << "INfO: started notification manager with: " << worker_count << " workers" << dendl;
+ worker = std::thread([this]() {
+ ceph_pthread_setname("notif-worker");
+ try {
+ ldpp_dout(this, 10) << "INFO: notification worker started" << dendl;
+ io_context.run();
+ ldpp_dout(this, 10) << "INFO: notification worker ended" << dendl;
+ } catch (const std::exception& err) {
+ ldpp_dout(this, 1) << "ERROR: notification worker failed with error: " << err.what() << dendl;
+ throw; // rethrow the original exception instead of a sliced copy
+ }
+ });
+ ldpp_dout(this, 10) << "INFO: started notification manager" << dendl;
}
- // ctor: start all threads
- Manager(CephContext* _cct, uint32_t _queues_update_period_ms,
- uint32_t _queues_update_retry_ms, uint32_t _queue_idle_sleep_us, u_int32_t failover_time_ms,
- uint32_t _stale_reservations_period_s, uint32_t _reservations_cleanup_period_s,
- uint32_t _worker_count, rgw::sal::RadosStore* store,
- const SiteConfig& site) :
- queues_update_period_ms(_queues_update_period_ms),
- queues_update_retry_ms(_queues_update_retry_ms),
- queue_idle_sleep_us(_queue_idle_sleep_us),
- failover_time(std::chrono::milliseconds(failover_time_ms)),
+ Manager(CephContext* _cct, rgw::sal::RadosStore* store, const SiteConfig& site) :
cct(_cct),
lock_cookie(gen_rand_alphanumeric(cct, COOKIE_LEN)),
work_guard(boost::asio::make_work_guard(io_context)),
- worker_count(_worker_count),
- stale_reservations_period_s(_stale_reservations_period_s),
- reservations_cleanup_period_s(_reservations_cleanup_period_s),
site(site),
- rados_store(*store)
+ rados_store(store)
{}
};
std::unique_ptr<Manager> s_manager;
constexpr size_t MAX_QUEUE_SIZE = 128*1000*1000; // 128MB
-constexpr uint32_t Q_LIST_UPDATE_MSEC = 1000*30; // check queue list every 30seconds
-constexpr uint32_t Q_LIST_RETRY_MSEC = 1000; // retry every second if queue list update failed
-constexpr uint32_t IDLE_TIMEOUT_USEC = 100*1000; // idle sleep 100ms
-constexpr uint32_t FAILOVER_TIME_MSEC = 3*Q_LIST_UPDATE_MSEC; // FAILOVER TIME 3x renew time
-constexpr uint32_t WORKER_COUNT = 1; // 1 worker thread
-constexpr uint32_t STALE_RESERVATIONS_PERIOD_S = 120; // cleanup reservations that are more than 2 minutes old
-constexpr uint32_t RESERVATIONS_CLEANUP_PERIOD_S = 30; // reservation cleanup every 30 seconds
bool init(const DoutPrefixProvider* dpp, rgw::sal::RadosStore* store,
const SiteConfig& site) {
return false;
}
// TODO: take conf from CephContext
- s_manager = std::make_unique<Manager>(dpp->get_cct(),
- Q_LIST_UPDATE_MSEC, Q_LIST_RETRY_MSEC,
- IDLE_TIMEOUT_USEC, FAILOVER_TIME_MSEC,
- STALE_RESERVATIONS_PERIOD_S, RESERVATIONS_CLEANUP_PERIOD_S,
- WORKER_COUNT,
- store, site);
+ s_manager = std::make_unique<Manager>(dpp->get_cct(), store, site);
s_manager->init();
return true;
}