]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
messenger: introduce a "halt_delivery" flag, checked by queue_delivery.
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 14 Oct 2010 18:05:51 +0000 (11:05 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 14 Oct 2010 18:05:51 +0000 (11:05 -0700)
Defaults to false, is set to true by destroy_queue.

src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 26053d13a6051a65f36f9bd8c2b30605f463e239..c6a40e874d866eae4d12939b79e973cd14558b92 100644 (file)
@@ -1223,7 +1223,7 @@ void SimpleMessenger::Pipe::discard_queue()
 {
   dout(10) << "discard_queue" << dendl;
   DispatchQueue& q = messenger->dispatch_queue;
-
+  halt_delivery = true;
   pipe_lock.Unlock();
   xlist<Pipe *>* list_on;
   q.lock.Lock();//to remove from round-robin
index f629c9ba9502f74a80af306f597dc777d6377615..bd5bb2ab726560d5c00680bf27ce748f9657f250 100644 (file)
@@ -143,6 +143,7 @@ private:
     list<Message*> sent;
     Cond cond;
     bool keepalive;
+    bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
     
     __u32 connect_seq, peer_global_seq;
     uint64_t out_seq;
@@ -190,7 +191,7 @@ private:
       state(st), 
       connection_state(new Connection),
       reader_running(false), reader_joining(false), writer_running(false),
-      in_qlen(0), keepalive(false),
+      in_qlen(0), keepalive(false), halt_delivery(false),
       connect_seq(0), peer_global_seq(0),
       out_seq(0), in_seq(0), in_seq_acked(0),
       reader_thread(this), writer_thread(this) {
@@ -259,9 +260,16 @@ private:
     //Don't call while holding pipe_lock!
     void queue_received(Message *m, int priority) {
       list<Message *>& queue = in_q[priority];
-
+      bool was_empty;
       pipe_lock.Lock();
-      bool was_empty = queue.empty();
+      if (halt_delivery) {
+        if (m>(void *)5) // don't want to put local-delivery signals
+                         // this magic number should be larger than
+                         // the size of the D_CONNECT et al enum
+          m->put();
+        goto unlock_return;
+      }
+      was_empty = queue.empty();
       queue.push_back(m);
       if (was_empty) //this pipe isn't on the endpoint queue
        enqueue_me(priority);
@@ -272,6 +280,7 @@ private:
       ++messenger->dispatch_queue.qlen;
       messenger->dispatch_queue.qlen_lock.unlock();
 
+unlock_return:
       pipe_lock.Unlock();
     }