From a5b5aea4d83f8495cc89015e267737652f00361f Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 13 May 2011 13:01:08 -0700 Subject: [PATCH] msgr: mark_down_on_empty and mark_disposable Mark a connection to close when messages are sent, and to close on any error. We can use this to tell people who should be dead that they should be dead, but not waste resources reconnecting to them. Signed-off-by: Sage Weil --- src/msg/Messenger.h | 2 ++ src/msg/SimpleMessenger.cc | 46 ++++++++++++++++++++++++++++++++++++++ src/msg/SimpleMessenger.h | 6 ++++- 3 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 7364aacc2c9fd..1171e1c3b647b 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -158,6 +158,8 @@ protected: virtual void mark_down(const entity_addr_t& a) = 0; virtual void mark_down(Connection *con) = 0; + virtual void mark_down_on_empty(Connection *con) = 0; + virtual void mark_disposable(Connection *con) = 0; virtual Connection *get_connection(const entity_inst_t& dest) = 0; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index a2f04436e7079..e9fcbba86319b 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -1741,6 +1741,14 @@ void SimpleMessenger::Pipe::writer() continue; } + if (close_on_empty) { + // this is slightly hacky + dout(10) << "writer queue empty, closing" << dendl; + policy.lossy = true; + fault(); + continue; + } + // wait dout(20) << "writer sleeping" << dendl; cond.Wait(pipe_lock); @@ -2682,6 +2690,44 @@ void SimpleMessenger::mark_down(Connection *con) lock.Unlock(); } +void SimpleMessenger::mark_down_on_empty(Connection *con) +{ + lock.Lock(); + Pipe *p = (Pipe *)con->get_pipe(); + if (p) { + p->pipe_lock.Lock(); + p->unregister_pipe(); + if (p->out_q.empty()) { + dout(1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl; + p->stop(); + } else { + dout(1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl; + p->close_on_empty = true; + } + p->pipe_lock.Unlock(); + p->put(); + } else { + dout(1) << "mark_down_on_empty " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + +void SimpleMessenger::mark_disposable(Connection *con) +{ + lock.Lock(); + Pipe *p = (Pipe *)con->get_pipe(); + if (p) { + dout(1) << "mark_disposable " << con << " -- " << p << dendl; + p->pipe_lock.Lock(); + p->policy.lossy = true; + p->pipe_lock.Unlock(); + p->put(); + } else { + dout(1) << "mark_disposable " << con << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { lock.Lock(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index b285ff9e5a9f5..fcf72043dfb73 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -152,6 +152,7 @@ private: Cond cond; bool keepalive; bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it + bool close_on_empty; __u32 connect_seq, peer_global_seq; uint64_t out_seq; @@ -214,7 +215,7 @@ private: state(st), connection_state(new Connection), reader_running(false), reader_joining(false), writer_running(false), - in_qlen(0), keepalive(false), halt_delivery(false), + in_qlen(0), keepalive(false), halt_delivery(false), close_on_empty(false), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { @@ -473,6 +474,9 @@ private: void mark_down(const entity_addr_t& addr); void mark_down(Connection *con); + void mark_down_on_empty(Connection *con); + void mark_disposable(Connection *con); + void mark_down_all(); // reaper -- 2.39.5