]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: add ms_handle_connect callback
authorSage Weil <sage@newdream.net>
Tue, 13 Oct 2009 18:51:32 +0000 (11:51 -0700)
committerSage Weil <sage@newdream.net>
Tue, 13 Oct 2009 18:51:32 +0000 (11:51 -0700)
Called when an outgoing connection succeeds.

src/msg/Dispatcher.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 24c5c5ed283c917658bb5f50d00bf1b7434fa180..93d7c597b9f9fe0e5f2c6f420b162485e7d7116a 100644 (file)
@@ -29,6 +29,9 @@ public:
   // how i receive messages
   virtual bool ms_dispatch(Message *m) = 0;
 
+  // after a connection connects
+  virtual void ms_handle_connect(Connection *con) { };
+
   /*
    * on any connection reset.
    * this indicates that the ordered+reliable delivery semantics have 
index 743a1f8d0bdafe16357990613a34792d75601017..c14a1c44b76dc687dbe6fce45ca059bdc6f8084e 100644 (file)
@@ -104,6 +104,12 @@ protected:
                    << dendl;
     assert(0);
   }
+  void ms_deliver_handle_connect(Connection *con) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+        p != dispatchers.end();
+        p++)
+      (*p)->ms_handle_connect(con);
+  }
   void ms_deliver_handle_reset(Connection *con) {
     for (list<Dispatcher*>::iterator p = dispatchers.begin();
         p != dispatchers.end();
index 7c638d7f934f1e798a1ff4426ab93b94ccc0890b..38d7cc758f9e31a75d913ec1b74b9ca5b3a55037 100644 (file)
@@ -287,14 +287,21 @@ void SimpleMessenger::Endpoint::dispatch_entry()
          }
           Message *m = ls.front();
           ls.pop_front();
-         if ((long)m == BAD_REMOTE_RESET) {
+         if ((long)m == D_BAD_REMOTE_RESET) {
            lock.Lock();
            Connection *con = remote_reset_q.front();
            remote_reset_q.pop_front();
            lock.Unlock();
            ms_deliver_handle_remote_reset(con);
            con->put();
-         } else if ((long)m == BAD_RESET) {
+         } else if ((long)m == D_CONNECT) {
+           lock.Lock();
+           Connection *con = connect_q.front();
+           connect_q.pop_front();
+           lock.Unlock();
+           ms_deliver_handle_connect(con);
+           con->put();
+         } else if ((long)m == D_BAD_RESET) {
            lock.Lock();
            Connection *con = reset_q.front();
            reset_q.pop_front();
@@ -1002,6 +1009,10 @@ int SimpleMessenger::Pipe::connect()
       backoff = utime_t();
       dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
 
+      for (unsigned i=0; i<rank->local.size(); i++) 
+       if (rank->local[i])
+         rank->local[i]->queue_connect(connection_state->get());
+
       if (!reader_running) {
        dout(20) << "connect starting reader" << dendl;
        start_reader();
index b338f720b8f5204499b5569b5bd9102a3edaffae..7b4dc4eaf66ff970aaf5977e2b7231486c1e506c 100644 (file)
@@ -296,21 +296,29 @@ private:
       lock.Unlock();
     }
 
-    enum { BAD_REMOTE_RESET, BAD_RESET };
+    enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
+    list<Connection*> connect_q;
     list<Connection*> remote_reset_q;
     list<Connection*> reset_q;
 
+    void queue_connect(Connection *con) {
+      lock.Lock();
+      connect_q.push_back(con);
+      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT);
+      cond.Signal();
+      lock.Unlock();
+    }
     void queue_remote_reset(Connection *con) {
       lock.Lock();
       remote_reset_q.push_back(con);
-      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
+      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET);
       cond.Signal();
       lock.Unlock();
     }
     void queue_reset(Connection *con) {
       lock.Lock();
       reset_q.push_back(con);
-      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
+      dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET);
       cond.Signal();
       lock.Unlock();
     }