From 7e5d162eddf0c03af6275c4c8058e0421c865bdc Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 13 Oct 2009 11:51:32 -0700 Subject: [PATCH] msgr: add ms_handle_connect callback Called when an outgoing connection succeeds. --- src/msg/Dispatcher.h | 3 +++ src/msg/Messenger.h | 6 ++++++ src/msg/SimpleMessenger.cc | 15 +++++++++++++-- src/msg/SimpleMessenger.h | 14 +++++++++++--- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index 24c5c5ed283c9..93d7c597b9f9f 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -29,6 +29,9 @@ public: // how i receive messages virtual bool ms_dispatch(Message *m) = 0; + // after a connection connects + virtual void ms_handle_connect(Connection *con) { }; + /* * on any connection reset. * this indicates that the ordered+reliable delivery semantics have diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 743a1f8d0bdaf..c14a1c44b76dc 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -104,6 +104,12 @@ protected: << dendl; assert(0); } + void ms_deliver_handle_connect(Connection *con) { + for (list::iterator p = dispatchers.begin(); + p != dispatchers.end(); + p++) + (*p)->ms_handle_connect(con); + } void ms_deliver_handle_reset(Connection *con) { for (list::iterator p = dispatchers.begin(); p != dispatchers.end(); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 7c638d7f934f1..38d7cc758f9e3 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -287,14 +287,21 @@ void SimpleMessenger::Endpoint::dispatch_entry() } Message *m = ls.front(); ls.pop_front(); - if ((long)m == BAD_REMOTE_RESET) { + if ((long)m == D_BAD_REMOTE_RESET) { lock.Lock(); Connection *con = remote_reset_q.front(); remote_reset_q.pop_front(); lock.Unlock(); ms_deliver_handle_remote_reset(con); con->put(); - } else if ((long)m == BAD_RESET) { + } else if ((long)m == D_CONNECT) { + lock.Lock(); + Connection *con = connect_q.front(); + connect_q.pop_front(); + lock.Unlock(); + ms_deliver_handle_connect(con); + con->put(); + } else if ((long)m == D_BAD_RESET) { lock.Lock(); Connection *con = reset_q.front(); reset_q.pop_front(); @@ -1002,6 +1009,10 @@ int SimpleMessenger::Pipe::connect() backoff = utime_t(); dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl; + for (unsigned i=0; ilocal.size(); i++) + if (rank->local[i]) + rank->local[i]->queue_connect(connection_state->get()); + if (!reader_running) { dout(20) << "connect starting reader" << dendl; start_reader(); diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index b338f720b8f52..7b4dc4eaf66ff 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -296,21 +296,29 @@ private: lock.Unlock(); } - enum { BAD_REMOTE_RESET, BAD_RESET }; + enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET }; + list connect_q; list remote_reset_q; list reset_q; + void queue_connect(Connection *con) { + lock.Lock(); + connect_q.push_back(con); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT); + cond.Signal(); + lock.Unlock(); + } void queue_remote_reset(Connection *con) { lock.Lock(); remote_reset_q.push_back(con); - dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET); cond.Signal(); lock.Unlock(); } void queue_reset(Connection *con) { lock.Lock(); reset_q.push_back(con); - dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET); + dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET); cond.Signal(); lock.Unlock(); } -- 2.39.5