From: Michal Jarzabek Date: Tue, 26 Jul 2016 19:02:48 +0000 (+0100) Subject: msg/Pipe: move DelayedDelivery class to cc file X-Git-Tag: v11.1.0~403^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3b2e507287703dd64fa4a607691304ce54d691f9;p=ceph.git msg/Pipe: move DelayedDelivery class to cc file Signed-off-by: Michal Jarzabek --- diff --git a/src/msg/simple/Pipe.cc b/src/msg/simple/Pipe.cc index 2c2efb097f7..6ce6824c908 100644 --- a/src/msg/simple/Pipe.cc +++ b/src/msg/simple/Pipe.cc @@ -58,6 +58,70 @@ ostream& operator<<(ostream &out, const Pipe &pipe) { return pipe._pipe_prefix(out); } +/** + * The DelayedDelivery is for injecting delays into Message delivery off + * the socket. It is only enabled if delays are requested, and if they + * are then it pulls Messages off the DelayQueue and puts them into the + * in_q (SimpleMessenger::dispatch_queue). + * Please note that this probably has issues with Pipe shutdown and + * replacement semantics. I've tried, but no guarantees. + */ +class Pipe::DelayedDelivery: public Thread { + Pipe *pipe; + std::deque< pair > delay_queue; + Mutex delay_lock; + Cond delay_cond; + int flush_count; + bool active_flush; + bool stop_delayed_delivery; + bool delay_dispatching; // we are in fast dispatch now + bool stop_fast_dispatching_flag; // we need to stop fast dispatching + +public: + explicit DelayedDelivery(Pipe *p) + : pipe(p), + delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), + active_flush(false), + stop_delayed_delivery(false), + delay_dispatching(false), + stop_fast_dispatching_flag(false) { } + ~DelayedDelivery() { + discard(); + } + void *entry(); + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard(); + void flush(); + bool is_flushing() { + Mutex::Locker l(delay_lock); + return flush_count > 0 || active_flush; + } + void wait_for_flush() { + Mutex::Locker l(delay_lock); + while (flush_count > 0 || active_flush) + delay_cond.Wait(delay_lock); + } + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + void steal_for_pipe(Pipe *new_owner) { + Mutex::Locker l(delay_lock); + pipe = new_owner; + } + /** + * We need to stop fast dispatching before we need to stop putting + * normal messages into the DispatchQueue. + */ + void stop_fast_dispatching(); +}; + /************************************** * Pipe */ diff --git a/src/msg/simple/Pipe.h b/src/msg/simple/Pipe.h index c9a166e6611..1e1d5dafc0e 100644 --- a/src/msg/simple/Pipe.h +++ b/src/msg/simple/Pipe.h @@ -63,70 +63,8 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); void *entry() { pipe->writer(); return 0; } } writer_thread; - /** - * The DelayedDelivery is for injecting delays into Message delivery off - * the socket. It is only enabled if delays are requested, and if they - * are then it pulls Messages off the DelayQueue and puts them into the - * in_q (SimpleMessenger::dispatch_queue). - * Please note that this probably has issues with Pipe shutdown and - * replacement semantics. I've tried, but no guarantees. - */ - class DelayedDelivery: public Thread { - Pipe *pipe; - std::deque< pair > delay_queue; - Mutex delay_lock; - Cond delay_cond; - int flush_count; - bool active_flush; - bool stop_delayed_delivery; - bool delay_dispatching; // we are in fast dispatch now - bool stop_fast_dispatching_flag; // we need to stop fast dispatching - - public: - explicit DelayedDelivery(Pipe *p) - : pipe(p), - delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0), - active_flush(false), - stop_delayed_delivery(false), - delay_dispatching(false), - stop_fast_dispatching_flag(false) { } - ~DelayedDelivery() { - discard(); - } - void *entry(); - void queue(utime_t release, Message *m) { - Mutex::Locker l(delay_lock); - delay_queue.push_back(make_pair(release, m)); - delay_cond.Signal(); - } - void discard(); - void flush(); - bool is_flushing() { - Mutex::Locker l(delay_lock); - return flush_count > 0 || active_flush; - } - void wait_for_flush() { - Mutex::Locker l(delay_lock); - while (flush_count > 0 || active_flush) - delay_cond.Wait(delay_lock); - } - void stop() { - delay_lock.Lock(); - stop_delayed_delivery = true; - delay_cond.Signal(); - delay_lock.Unlock(); - } - void steal_for_pipe(Pipe *new_owner) { - Mutex::Locker l(delay_lock); - pipe = new_owner; - } - /** - * We need to stop fast dispatching before we need to stop putting - * normal messages into the DispatchQueue. - */ - void stop_fast_dispatching(); - } *delay_thread; - + class DelayedDelivery; + DelayedDelivery *delay_thread; public: Pipe(SimpleMessenger *r, int st, PipeConnection *con); ~Pipe();