From: Greg Farnum Date: Thu, 14 Oct 2010 18:05:51 +0000 (-0700) Subject: messenger: introduce a "halt_delivery" flag, checked by queue_delivery. X-Git-Tag: v0.22.1~29^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=69be0df61d29a093dbeadf6dbcd4e18b429d0a22;p=ceph.git messenger: introduce a "halt_delivery" flag, checked by queue_delivery. Defaults to false, is set to true by destroy_queue. --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 26053d13a605..c6a40e874d86 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1223,7 +1223,7 @@ void SimpleMessenger::Pipe::discard_queue() { dout(10) << "discard_queue" << dendl; DispatchQueue& q = messenger->dispatch_queue; - + halt_delivery = true; pipe_lock.Unlock(); xlist* list_on; q.lock.Lock();//to remove from round-robin diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index f629c9ba9502..bd5bb2ab7265 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -143,6 +143,7 @@ private: list sent; Cond cond; bool keepalive; + bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it __u32 connect_seq, peer_global_seq; uint64_t out_seq; @@ -190,7 +191,7 @@ private: state(st), connection_state(new Connection), reader_running(false), reader_joining(false), writer_running(false), - in_qlen(0), keepalive(false), + in_qlen(0), keepalive(false), halt_delivery(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { @@ -259,9 +260,16 @@ private: //Don't call while holding pipe_lock! void queue_received(Message *m, int priority) { list& queue = in_q[priority]; - + bool was_empty; pipe_lock.Lock(); - bool was_empty = queue.empty(); + if (halt_delivery) { + if (m>(void *)5) // don't want to put local-delivery signals + // this magic number should be larger than + // the size of the D_CONNECT et al enum + m->put(); + goto unlock_return; + } + was_empty = queue.empty(); queue.push_back(m); if (was_empty) //this pipe isn't on the endpoint queue enqueue_me(priority); @@ -272,6 +280,7 @@ private: ++messenger->dispatch_queue.qlen; messenger->dispatch_queue.qlen_lock.unlock(); +unlock_return: pipe_lock.Unlock(); }