]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/Pipe: move DelayedDelivery class to cc file 10447/head
authorMichal Jarzabek <stiopa@gmail.com>
Tue, 26 Jul 2016 19:02:48 +0000 (20:02 +0100)
committerMichal Jarzabek <stiopa@gmail.com>
Tue, 26 Jul 2016 19:02:48 +0000 (20:02 +0100)
Signed-off-by: Michal Jarzabek <stiopa@gmail.com>
src/msg/simple/Pipe.cc
src/msg/simple/Pipe.h

index 2c2efb097f7b591b73583df853a5bff76392b7c1..6ce6824c90851da419a0186f1b06563bcc69b009 100644 (file)
@@ -58,6 +58,70 @@ ostream& operator<<(ostream &out, const Pipe &pipe) {
   return pipe._pipe_prefix(out);
 }
 
+/**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+class Pipe::DelayedDelivery: public Thread {
+  Pipe *pipe;
+  std::deque< pair<utime_t,Message*> > delay_queue;
+  Mutex delay_lock;
+  Cond delay_cond;
+  int flush_count;
+  bool active_flush;
+  bool stop_delayed_delivery;
+  bool delay_dispatching; // we are in fast dispatch now
+  bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+
+public:
+  explicit DelayedDelivery(Pipe *p)
+    : pipe(p),
+      delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
+      active_flush(false),
+      stop_delayed_delivery(false),
+      delay_dispatching(false),
+      stop_fast_dispatching_flag(false) { }
+  ~DelayedDelivery() {
+    discard();
+  }
+  void *entry();
+  void queue(utime_t release, Message *m) {
+    Mutex::Locker l(delay_lock);
+    delay_queue.push_back(make_pair(release, m));
+    delay_cond.Signal();
+  }
+  void discard();
+  void flush();
+  bool is_flushing() {
+    Mutex::Locker l(delay_lock);
+    return flush_count > 0 || active_flush;
+  }
+  void wait_for_flush() {
+    Mutex::Locker l(delay_lock);
+    while (flush_count > 0 || active_flush)
+      delay_cond.Wait(delay_lock);
+  }
+  void stop() {
+    delay_lock.Lock();
+    stop_delayed_delivery = true;
+    delay_cond.Signal();
+    delay_lock.Unlock();
+  }
+  void steal_for_pipe(Pipe *new_owner) {
+    Mutex::Locker l(delay_lock);
+    pipe = new_owner;
+  }
+  /**
+   * We need to stop fast dispatching before we need to stop putting
+   * normal messages into the DispatchQueue.
+   */
+  void stop_fast_dispatching();
+};
+
 /**************************************
  * Pipe
  */
index c9a166e6611b8b924505bc4f5a74cd40da193ca1..1e1d5dafc0ea813796c83d381fb890c75463471e 100644 (file)
@@ -63,70 +63,8 @@ static const int SM_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
       void *entry() { pipe->writer(); return 0; }
     } writer_thread;
 
-    /**
-     * The DelayedDelivery is for injecting delays into Message delivery off
-     * the socket. It is only enabled if delays are requested, and if they
-     * are then it pulls Messages off the DelayQueue and puts them into the
-     * in_q (SimpleMessenger::dispatch_queue).
-     * Please note that this probably has issues with Pipe shutdown and
-     * replacement semantics. I've tried, but no guarantees.
-     */
-    class DelayedDelivery: public Thread {
-      Pipe *pipe;
-      std::deque< pair<utime_t,Message*> > delay_queue;
-      Mutex delay_lock;
-      Cond delay_cond;
-      int flush_count;
-      bool active_flush;
-      bool stop_delayed_delivery;
-      bool delay_dispatching; // we are in fast dispatch now
-      bool stop_fast_dispatching_flag; // we need to stop fast dispatching
-
-    public:
-      explicit DelayedDelivery(Pipe *p)
-       : pipe(p),
-         delay_lock("Pipe::DelayedDelivery::delay_lock"), flush_count(0),
-         active_flush(false),
-         stop_delayed_delivery(false),
-         delay_dispatching(false),
-         stop_fast_dispatching_flag(false) { }
-      ~DelayedDelivery() {
-       discard();
-      }
-      void *entry();
-      void queue(utime_t release, Message *m) {
-       Mutex::Locker l(delay_lock);
-       delay_queue.push_back(make_pair(release, m));
-       delay_cond.Signal();
-      }
-      void discard();
-      void flush();
-      bool is_flushing() {
-        Mutex::Locker l(delay_lock);
-        return flush_count > 0 || active_flush;
-      }
-      void wait_for_flush() {
-        Mutex::Locker l(delay_lock);
-        while (flush_count > 0 || active_flush)
-          delay_cond.Wait(delay_lock);
-      }
-      void stop() {
-       delay_lock.Lock();
-       stop_delayed_delivery = true;
-       delay_cond.Signal();
-       delay_lock.Unlock();
-      }
-      void steal_for_pipe(Pipe *new_owner) {
-        Mutex::Locker l(delay_lock);
-        pipe = new_owner;
-      }
-      /**
-       * We need to stop fast dispatching before we need to stop putting
-       * normal messages into the DispatchQueue.
-       */
-      void stop_fast_dispatching();
-    } *delay_thread;
-
+    class DelayedDelivery;
+    DelayedDelivery *delay_thread;
   public:
     Pipe(SimpleMessenger *r, int st, PipeConnection *con);
     ~Pipe();