return pipe._pipe_prefix(out);
}
+/**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+class Pipe::DelayedDelivery: public Thread {
+ Pipe *pipe;
+ std::deque< pair<utime_t,Message*> > delay_queue;
+ Mutex delay_lock;
+ Cond delay_cond;
+ int flush_count;
+ bool active_flush;
+ bool stop_delayed_delivery;
+ bool delay_dispatching; // we are in fast dispatch now
+ bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+
+public:
+ explicit DelayedDelivery(Pipe *p)
+ : pipe(p),
+ delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
+ active_flush(false),
+ stop_delayed_delivery(false),
+ delay_dispatching(false),
+ stop_fast_dispatching_flag(false) { }
+ ~DelayedDelivery() {
+ discard();
+ }
+ void *entry();
+ void queue(utime_t release, Message *m) {
+ Mutex::Locker l(delay_lock);
+ delay_queue.push_back(make_pair(release, m));
+ delay_cond.Signal();
+ }
+ void discard();
+ void flush();
+ bool is_flushing() {
+ Mutex::Locker l(delay_lock);
+ return flush_count > 0 || active_flush;
+ }
+ void wait_for_flush() {
+ Mutex::Locker l(delay_lock);
+ while (flush_count > 0 || active_flush)
+ delay_cond.Wait(delay_lock);
+ }
+ void stop() {
+ delay_lock.Lock();
+ stop_delayed_delivery = true;
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ }
+ void steal_for_pipe(Pipe *new_owner) {
+ Mutex::Locker l(delay_lock);
+ pipe = new_owner;
+ }
+ /**
+ * We need to stop fast dispatching before we need to stop putting
+ * normal messages into the DispatchQueue.
+ */
+ void stop_fast_dispatching();
+};
+
/**************************************
* Pipe
*/
void *entry() { pipe->writer(); return 0; }
} writer_thread;
- /**
- * The DelayedDelivery is for injecting delays into Message delivery off
- * the socket. It is only enabled if delays are requested, and if they
- * are then it pulls Messages off the DelayQueue and puts them into the
- * in_q (SimpleMessenger::dispatch_queue).
- * Please note that this probably has issues with Pipe shutdown and
- * replacement semantics. I've tried, but no guarantees.
- */
- class DelayedDelivery: public Thread {
- Pipe *pipe;
- std::deque< pair<utime_t,Message*> > delay_queue;
- Mutex delay_lock;
- Cond delay_cond;
- int flush_count;
- bool active_flush;
- bool stop_delayed_delivery;
- bool delay_dispatching; // we are in fast dispatch now
- bool stop_fast_dispatching_flag; // we need to stop fast dispatching
-
- public:
- explicit DelayedDelivery(Pipe *p)
- : pipe(p),
- delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
- active_flush(false),
- stop_delayed_delivery(false),
- delay_dispatching(false),
- stop_fast_dispatching_flag(false) { }
- ~DelayedDelivery() {
- discard();
- }
- void *entry();
- void queue(utime_t release, Message *m) {
- Mutex::Locker l(delay_lock);
- delay_queue.push_back(make_pair(release, m));
- delay_cond.Signal();
- }
- void discard();
- void flush();
- bool is_flushing() {
- Mutex::Locker l(delay_lock);
- return flush_count > 0 || active_flush;
- }
- void wait_for_flush() {
- Mutex::Locker l(delay_lock);
- while (flush_count > 0 || active_flush)
- delay_cond.Wait(delay_lock);
- }
- void stop() {
- delay_lock.Lock();
- stop_delayed_delivery = true;
- delay_cond.Signal();
- delay_lock.Unlock();
- }
- void steal_for_pipe(Pipe *new_owner) {
- Mutex::Locker l(delay_lock);
- pipe = new_owner;
- }
- /**
- * We need to stop fast dispatching before we need to stop putting
- * normal messages into the DispatchQueue.
- */
- void stop_fast_dispatching();
- } *delay_thread;
-
+ class DelayedDelivery;
+ DelayedDelivery *delay_thread;
public:
Pipe(SimpleMessenger *r, int st, PipeConnection *con);
~Pipe();