#pragma once
+#include <chrono>
+#include <functional>
#include <ostream>
#include <map>
#include <vector>
const scheduler_id_t &id) const;
} client_registry;
+ class crimson_mclock_cleaning_job_t {
+ struct job_control_t {
+ std::chrono::milliseconds period;
+ std::function<void()> body;
+
+ bool stopping = false;
+ seastar::condition_variable cv;
+
+
+ template <typename D, typename F>
+ job_control_t(D _period, F &&_body) :
+ period(std::chrono::duration_cast<decltype(period)>(_period)),
+ body(std::forward<F>(_body)) {
+ }
+ };
+ seastar::lw_shared_ptr<job_control_t> control;
+
+ static seastar::future<> run(
+ seastar::lw_shared_ptr<job_control_t> control) {
+ while (!control->stopping) {
+ std::invoke(control->body);
+ co_await control->cv.wait(control->period);
+ }
+ }
+ public:
+ template<typename... Args>
+ crimson_mclock_cleaning_job_t(Args&&... args) :
+ control(seastar::make_lw_shared<job_control_t>(
+ std::forward<Args>(args)...))
+ {
+ std::ignore = run(control);
+ }
+
+ void try_update(milliseconds _period) {
+ control->period = _period;
+ control->cv.signal();
+ }
+
+ ~crimson_mclock_cleaning_job_t() {
+ control->stopping = true;
+ control->cv.signal();
+ }
+ };
using mclock_queue_t = crimson::dmclock::PullPriorityQueue<
scheduler_id_t,
item_t,
true,
true,
- 2>;
+ 2,
+ crimson_mclock_cleaning_job_t>;
mclock_queue_t scheduler;
std::list<item_t> immediate;