]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
AsyncMessenger: Release connection if stopped
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 11 Nov 2014 04:02:12 +0000 (12:02 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Tue, 11 Nov 2014 16:10:06 +0000 (00:10 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/async/AsyncConnection.cc
src/msg/async/AsyncConnection.h
src/msg/async/AsyncMessenger.cc
src/msg/async/AsyncMessenger.h
src/msg/async/Event.cc

index 5e68415341ca801106c54e1e511a36a3cb800d75..07cd056421d4e27f7973b2c67e3fb12b2e77cbc0 100644 (file)
@@ -723,7 +723,7 @@ void AsyncConnection::process()
         {
           ldout(async_msgr->cct,20) << __func__ << " got CLOSE" << dendl;
           _stop();
-          break;
+          return ;
         }
 
       case STATE_STANDBY:
@@ -1356,7 +1356,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   bufferlist reply_bl;
   uint64_t existing_seq = -1;
   bool is_reset_from_peer = false;
-  char reply_tag;
+  char reply_tag = 0;
 
   memset(&reply, 0, sizeof(reply));
   reply.protocol_version = async_msgr->get_proto_version(peer_type, false);
@@ -1816,9 +1816,10 @@ void AsyncConnection::was_session_reset()
   in_seq_acked = 0;
 }
 
-void AsyncConnection::_stop()
+void AsyncConnection::_stop(bool external)
 {
   ldout(async_msgr->cct, 10) << __func__ << dendl;
+  center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
   center->dispatch_event_external(reset_handler);
   shutdown_socket();
   discard_out_queue();
@@ -1827,6 +1828,10 @@ void AsyncConnection::_stop()
     was_session_reset();
   open_write = false;
   state = STATE_CLOSED;
+  ::close(sd);
+  sd = -1;
+  async_msgr->unregister_conn(peer_addr, external);
+  put();
 }
 
 int AsyncConnection::_send(Message *m)
index 05515fd5933863a8434da382a7a5cbea99fefe21..0e61d8b1e11c6f3e9c84f9c5bc6007d82fd79f06 100644 (file)
@@ -52,7 +52,7 @@ class AsyncConnection : public Connection {
   int read_until(uint64_t needed, bufferptr &p);
   int _process_connection();
   void _connect();
-  void _stop();
+  void _stop(bool external=false);
   int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
   int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
   void was_session_reset();
@@ -109,7 +109,7 @@ class AsyncConnection : public Connection {
 
   bool is_connected() {
     // FIXME?
-    return true;
+    return state != STATE_CLOSED;
   }
 
   // Only call when AsyncConnection first construct
@@ -124,9 +124,10 @@ class AsyncConnection : public Connection {
   int send_message(Message *m);
 
   void send_keepalive();
+  // Don't call it from AsyncConnection
   void mark_down() {
     Mutex::Locker l(lock);
-    _stop();
+    _stop(true);
   }
   void mark_disposable() {
     Mutex::Locker l(lock);
index 3a8eb04f7cefc18eb19258c6a71db07244214179..87878c35a0bd9e573b144e438b6dc4516e135618 100644 (file)
@@ -357,7 +357,9 @@ int AsyncMessenger::shutdown()
   // break ref cycles on the loopback connection
   processor.stop();
   local_connection->set_priv(NULL);
+  lock.Lock();
   stop_cond.Signal();
+  lock.Unlock();
   stopped = true;
   return 0;
 }
index 2b64080834a23e822def5ccb59b6bd3ff652cb55..fda691160daba3f36c3a0f0993da6b80c2dc617d 100644 (file)
@@ -286,6 +286,8 @@ private:
     ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
     if (p == conns.end())
       return NULL;
+
+    assert(p->second->is_connected());
     return p->second;
   }
 
@@ -377,6 +379,19 @@ public:
     _init_local_connection();
   }
 
+  /**
+   * Unregister connection from `conns`
+   * `external` is used to indicate whether need to lock AsyncMessenger::lock,
+   * it may call. If external is false, it means that AsyncConnection take the
+   * initiative to unregister
+   */
+  void unregister_conn(const entity_addr_t &addr, bool external) {
+    if (!external)
+      lock.Lock();
+    conns.erase(addr);
+    if (!external)
+      lock.Unlock();
+  }
   /**
    * @} // AsyncMessenger Internals
    */
index 23871132eac816358f18a683865dcde6b875b1e2..64d030eacbc7ea02ad564e0452886dba327b0369 100644 (file)
@@ -39,15 +39,7 @@ class C_handle_notify : public EventCallback {
   C_handle_notify() {}
   void do_request(int fd_or_id) {
     char c[100];
-    int r;
-    do {
-      r = read(fd_or_id, c, 100);
-      if (r > 0 || (r < 0 && errno == EAGAIN)) {
-        break;
-      } else {
-        assert(0);
-      }
-    } while (1);
+    assert(read(fd_or_id, c, 100));
   }
 };