From a3a7c037bfd044b55195d906eb299d96229cc48d Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Mon, 14 Sep 2015 12:19:23 -0400 Subject: [PATCH] timer: Another timer using new time/lock standards Signed-off-by: Adam C. Emerson --- src/common/Makefile.am | 3 +- src/common/ceph_timer.h | 309 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 src/common/ceph_timer.h diff --git a/src/common/Makefile.am b/src/common/Makefile.am index 64ce61443bd..4dfe988a57f 100644 --- a/src/common/Makefile.am +++ b/src/common/Makefile.am @@ -265,7 +265,8 @@ noinst_HEADERS += \ common/TracepointProvider.h \ common/event_socket.h \ common/PluginRegistry.h \ - common/ceph_time.h + common/ceph_time.h \ + common/ceph_timer.h if ENABLE_XIO noinst_HEADERS += \ diff --git a/src/common/ceph_timer.h b/src/common/ceph_timer.h new file mode 100644 index 00000000000..ae95eb250dd --- /dev/null +++ b/src/common/ceph_timer.h @@ -0,0 +1,309 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef COMMON_CEPH_TIMER_H +#define COMMON_CEPH_TIMER_H + +#include +#include +#include +#include + +#include + +#include "ceph_time.h" + +namespace ceph { + + /// Newly constructed timer should be suspended at point of + /// construction. + + struct construct_suspended_t { }; + constexpr construct_suspended_t construct_suspended { }; + + namespace timer_detail { + using boost::intrusive::member_hook; + using boost::intrusive::set_member_hook; + using boost::intrusive::link_mode; + using boost::intrusive::normal_link; + using boost::intrusive::set; + using boost::intrusive::constant_time_size; + using boost::intrusive::compare; + + // Compared to the SafeTimer this does fewer allocations (you + // don't have to allocate a new Context every time you + // want to cue the next tick.) + // + // It also does not share a lock with the caller. If you call + // cancel event, it either cancels the event (and returns true) or + // you missed it. If this does not work for you, you can set up a + // flag and mutex of your own. + // + // You get to pick your clock. I like mono_clock, since I usually + // want to wait FOR a given duration. real_clock is worthwhile if + // you want to wait UNTIL a specific moment of wallclock time. If + // you want you can set up a timer that executes a function after + // you use up ten seconds of CPU time. + + template + class timer { + typedef set_member_hook > sh; + + struct event { + typename TC::time_point t; + uint64_t id; + std::function f; + + sh schedule_link; + sh event_link; + + event() : t(TC::time_point::min()), id(0) {} + event(uint64_t _id) : t(TC::time_point::min()), id(_id) {} + event(typename TC::time_point _t, uint64_t _id, + std::function&& _f) : t(_t), id(_id), f(_f) {} + event(typename TC::time_point _t, uint64_t _id, + const std::function& _f) : t(_t), id(_id), f(_f) {} + bool operator <(const event& e) { + return t == e.t ? id < e.id : t < e.t; + } + }; + struct SchedCompare { + bool operator()(const event& e1, const event& e2) const { + return e1.t == e2.t ? e1.id < e2.id : e1.t < e2.t; + } + }; + struct EventCompare { + bool operator()(const event& e1, const event& e2) const { + return e1.id < e2.id; + } + }; + + set, + constant_time_size, + compare > schedule; + + set, + constant_time_size, + compare > events; + + std::mutex lock; + using lock_guard = std::lock_guard; + using unique_lock = std::unique_lock; + std::condition_variable cond; + + event* running{ nullptr }; + uint64_t next_id{ 0 }; + + bool suspended; + std::thread thread; + + void timer_thread() { + unique_lock l(lock); + while (!suspended) { + typename TC::time_point now = TC::now(); + + while (!schedule.empty()) { + auto p = schedule.begin(); + // Should we wait for the future? + if (p->t > now) + break; + + event& e = *p; + schedule.erase(e); + events.erase(e); + + // Since we have only one thread it is impossible to have more + // than one running event + running = &e; + + l.unlock(); + e.f(); + l.lock(); + + if (running) { + running = nullptr; + delete &e; + } // Otherwise the event requeued itself + } + + if (schedule.empty()) + cond.wait(l); + else + cond.wait_until(l, schedule.begin()->t); + } + } + + public: + timer() { + lock_guard l(lock); + suspended = false; + thread = std::thread(&timer::timer_thread, this); + } + + // Create a suspended timer, jobs will be executed in order when + // it is resumed. + timer(construct_suspended_t) { + lock_guard l(lock); + suspended = true; + } + + timer(const timer &) = delete; + timer& operator=(const timer &) = delete; + + ~timer() { + suspend(); + cancel_all_events(); + } + + // Suspend operation of the timer (and let its thread die). + void suspend() { + unique_lock l(lock); + if (suspended) + return; + + suspended = true; + cond.notify_one(); + l.unlock(); + thread.join(); + } + + + // Resume operation of the timer. (Must have been previously + // suspended.) + void resume() { + unique_lock l(lock); + if (!suspended) + return; + + suspended = false; + assert(!thread.joinable()); + thread = std::thread(&timer::timer_thread, this); + } + + // Schedule an event in the relative future + template + uint64_t add_event(typename TC::duration duration, + Callable&& f, Args&&... args) { + typename TC::time_point when = TC::now(); + when += duration; + return add_event(when, + std::forward(f), + std::forward(args)...); + } + + // Schedule an event in the absolute future + template + uint64_t add_event(typename TC::time_point when, + Callable&& f, Args&&... args) { + std::lock_guard l(lock); + event& e = *(new event( + when, ++next_id, + std::forward >( + std::bind(std::forward(f), + std::forward(args)...)))); + auto i = schedule.insert(e); + events.insert(e); + + /* If the event we have just inserted comes before everything + * else, we need to adjust our timeout. */ + if (i.first == schedule.begin()) + cond.notify_one(); + + // Previously each event was a context, identified by a + // pointer, and each context to be called only once. Since you + // can queue the same function pointer, member function, + // lambda, or functor up multiple times, identifying things by + // function for the purposes of cancellation is no longer + // suitable. Thus: + return e.id; + } + + // Cancel an event. If the event has already come and gone (or you + // never submitted it) you will receive false. Otherwise you will + // receive true and it is guaranteed the event will not execute. + bool cancel_event(const uint64_t id) { + std::lock_guard l(lock); + event dummy(id); + auto p = events.find(dummy); + if (p == events.end()) { + return false; + } + + event& e = *p; + events.erase(e); + schedule.erase(e); + delete &e; + + return true; + } + + // Reschedules a currently running event in the relative + // future. Must be called only from an event executed by this + // timer. If you have a function that can be called either from + // this timer or some other way, it is your responsibility to make + // sure it can tell the difference only does not call + // reschedule_me in the non-timer case. + // + // Returns an event id. If you had an event_id from the first + // scheduling, replace it with this return value. + uint64_t reschedule_me(typename TC::duration duration) { + return reschedule_me(TC::now() + duration); + } + + // Reschedules a currently running event in the absolute + // future. Must be called only from an event executed by this + // timer. if you have a function that can be called either from + // this timer or some other way, it is your responsibility to make + // sure it can tell the difference only does not call + // reschedule_me in the non-timer case. + // + // Returns an event id. If you had an event_id from the first + // scheduling, replace it with this return value. + uint64_t reschedule_me(typename TC::time_point when) { + if (std::this_thread::get_id() != thread.get_id()) + throw std::make_error_condition(std::errc::operation_not_permitted); + std::lock_guard l(lock); + running->t = when; + uint64_t id = ++next_id; + running->id = id; + schedule.insert(*running); + events.insert(*running); + + // Hacky, but keeps us from being deleted + running = nullptr; + + // Same function, but you get a new ID. + return id; + } + + // Remove all events from the queue. + void cancel_all_events() { + std::lock_guard l(lock); + while (!events.empty()) { + auto p = events.begin(); + event& e = *p; + schedule.erase(e); + events.erase(e); + delete &e; + } + } + }; // timer + }; // timer_detail + + using timer_detail::timer; +}; // ceph + +#endif -- 2.47.3