]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Replace AsyncConnection* with AsyncconnectionRef
authorHaomai Wang <haomaiwang@gmail.com>
Tue, 2 Sep 2014 03:47:31 +0000 (11:47 +0800)
committerHaomai Wang <haomaiwang@gmail.com>
Wed, 8 Oct 2014 06:03:17 +0000 (14:03 +0800)
Signed-off-by: Haomai Wang <haomaiwang@gmail.com>
src/msg/AsyncConnection.cc
src/msg/AsyncConnection.h
src/msg/AsyncMessenger.cc
src/msg/AsyncMessenger.h

index 731a87fa093b1e0572c0737915476af471eee9fb..730cf484918e56ff7274fb97db95e71b40c7ce3d 100644 (file)
@@ -26,42 +26,33 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
 }
 
 class C_handle_read : public EventCallback {
-  AsyncConnection *conn;
+  AsyncConnectionRef conn;
 
  public:
-  C_handle_read(AsyncConnection *c): conn(c) {
-    conn->get();
-  }
+  C_handle_read(AsyncConnectionRef c): conn(c) {}
   void do_request(int fd) {
     conn->process();
-    conn->put();
   }
 };
 
 class C_handle_write : public EventCallback {
-  AsyncConnection *conn;
+  AsyncConnectionRef conn;
 
  public:
-  C_handle_write(AsyncConnection *c): conn(c) {
-    conn->get();
-  }
+  C_handle_write(AsyncConnectionRef c): conn(c) {}
   void do_request(int fd) {
     conn->handle_write();
-    conn->put();
   }
 };
 
 class C_handle_reset : public EventCallback {
   AsyncMessenger *msgr;
-  AsyncConnection *conn;
+  AsyncConnectionRef conn;
 
  public:
-  C_handle_reset(AsyncMessenger *m, AsyncConnection *c): msgr(m), conn(c) {
-    conn->get();
-  }
+  C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
   void do_request(int id) {
-    msgr->ms_deliver_handle_reset(conn);
-    conn->put();
+    msgr->ms_deliver_handle_reset(conn.get());
   }
 };
 
@@ -1023,9 +1014,7 @@ int AsyncConnection::_process_connection()
           session_security.reset();
         }
 
-        get();
         async_msgr->ms_deliver_handle_connect(this);
-        get();
         async_msgr->ms_deliver_handle_fast_connect(this);
 
         // reset connect state variables
@@ -1323,7 +1312,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
 
   bool authorizer_valid;
-  get();
   if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
                                authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
     ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
@@ -1335,7 +1323,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   ldout(async_msgr->cct, 10) << __func__ << " accept:  setting up session_security." << dendl;
 
   // existing?
-  AsyncConnection *existing = async_msgr->lookup_conn(peer_addr);
+  AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
   if (existing) {
     if (connect.global_seq < existing->peer_global_seq) {
       ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
@@ -1462,11 +1450,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
   }
   if (existing->policy.lossy) {
     // disconnect from the Connection
-    existing->get();
     async_msgr->ms_deliver_handle_reset(existing);
   } else {
     // queue a reset on the new connection, which we're dumping for the old
-    get();
     async_msgr->ms_deliver_handle_reset(this);
 
     // reset the in_seq if this is a hard reset from peer,
@@ -1515,9 +1501,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a
                                session_key, get_features()));
 
   // notify
-  get();
   async_msgr->ms_deliver_handle_accept(this);
-  get();
   async_msgr->ms_deliver_handle_fast_accept(this);
 
   // ok!
@@ -1708,7 +1692,6 @@ void AsyncConnection::was_session_reset()
   discard_out_queue();
   outcoming_bl.clear();
 
-  get();
   async_msgr->ms_deliver_handle_remote_reset(this);
 
   if (randomize_out_seq()) {
@@ -1744,7 +1727,6 @@ int AsyncConnection::_send(Message *m)
   }
 
   // associate message with Connection (for benefit of encode_payload)
-  get();
   m->set_connection(this);
 
   uint64_t features = get_features();
index 929cd56242d639fe1786088b6c5afd13527d4a53..a010cb0912bc36c7d4622d2acc49d0117e96e5dd 100644 (file)
@@ -248,4 +248,6 @@ class AsyncConnection : public Connection {
   ceph::shared_ptr<AuthSessionHandler> session_security;
 }; /* AsyncConnection */
 
+typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
+
 #endif
index e1144ccd458df845ef2a5a33177cc0f0e54fb1d9..b1f69f447a304af3cd28c3d28d97b1139eeb3735 100644 (file)
@@ -383,7 +383,7 @@ void AsyncMessenger::wait()
     ldout(cct, 10) << __func__ << ": closing pipes" << dendl;
 
     while (!conns.empty()) {
-      AsyncConnection *p = conns.begin()->second;
+      AsyncConnectionRef p = conns.begin()->second;
       _stop_conn(p);
     }
   }
@@ -394,17 +394,17 @@ void AsyncMessenger::wait()
   started = false;
 }
 
-AsyncConnection *AsyncMessenger::add_accept(int sd)
+AsyncConnectionRef AsyncMessenger::add_accept(int sd)
 {
   lock.Lock();
-  AsyncConnection *conn = new AsyncConnection(cct, this);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this);
   conn->accept(sd);
   accepting_conns.insert(conn);
   lock.Unlock();
   return conn;
 }
 
-AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
 {
   assert(lock.is_locked());
   assert(addr != my_inst.addr);
@@ -413,7 +413,7 @@ AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int t
                  << ", creating connection and registering" << dendl;
 
   // create connection
-  AsyncConnection *conn = new AsyncConnection(cct, this);
+  AsyncConnectionRef conn = new AsyncConnection(cct, this);
   conn->connect(addr, type);
   assert(!conns.count(addr));
   conns[addr] = conn;
@@ -429,7 +429,7 @@ ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest)
     return local_connection;
   }
 
-  AsyncConnection *conn = _lookup_conn(dest.addr);
+  AsyncConnectionRef conn = _lookup_conn(dest.addr);
   if (conn) {
     ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
   } else {
@@ -464,12 +464,12 @@ int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
     return -EINVAL;
   }
 
-  AsyncConnection *conn = _lookup_conn(dest.addr);
+  AsyncConnectionRef conn = _lookup_conn(dest.addr);
   submit_message(m, conn, dest.addr, dest.name.type());
   return 0;
 }
 
-void AsyncMessenger::submit_message(Message *m, AsyncConnection *con,
+void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
                                     const entity_addr_t& dest_addr, int dest_type)
 {
   if (cct->_conf->ms_dump_on_send) {
@@ -547,24 +547,24 @@ void AsyncMessenger::mark_down_all()
 {
   ldout(cct,1) << __func__ << " " << dendl;
   lock.Lock();
-  for (set<AsyncConnection*>::iterator q = accepting_conns.begin();
+  for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
        q != accepting_conns.end(); ++q) {
-    AsyncConnection *p = *q;
+    AsyncConnectionRef p = *q;
     ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
     p->mark_down();
     p->get();
-    ms_deliver_handle_reset(p);
+    ms_deliver_handle_reset(p.get());
   }
   accepting_conns.clear();
 
   while (!conns.empty()) {
-    ceph::unordered_map<entity_addr_t, AsyncConnection*>::iterator it = conns.begin();
-    AsyncConnection *p = it->second;
+    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+    AsyncConnectionRef p = it->second;
     ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
     conns.erase(it);
     p->mark_down();
     p->get();
-    ms_deliver_handle_reset(p);
+    ms_deliver_handle_reset(p.get());
   }
   lock.Unlock();
 }
@@ -572,12 +572,12 @@ void AsyncMessenger::mark_down_all()
 void AsyncMessenger::mark_down(const entity_addr_t& addr)
 {
   lock.Lock();
-  AsyncConnection *p = _lookup_conn(addr);
+  AsyncConnectionRef p = _lookup_conn(addr);
   if (p) {
     ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
     _stop_conn(p);
     p->get();
-    ms_deliver_handle_reset(p);
+    ms_deliver_handle_reset(p.get());
   } else {
     ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl;
   }
index a7666871dcf2b56227e5ff5b94ec2e5e38ce3234..efe811f52c561316e907e904e046f1bc652b952b 100644 (file)
@@ -204,7 +204,7 @@ private:
    * @return a pointer to the newly-created connection. Caller does not own a
    * reference; take one if you need it.
    */
-  AsyncConnection *create_connect(const entity_addr_t& addr, int type);
+  AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
 
   /**
    * Queue up a Message for delivery to the entity specified
@@ -218,7 +218,7 @@ private:
    * @param dest_type The peer type of the address we're sending to
    * just drop silently under failure.
    */
-  void submit_message(Message *m, AsyncConnection *con,
+  void submit_message(Message *m, AsyncConnectionRef con,
                       const entity_addr_t& dest_addr, int dest_type);
 
   int _send_message(Message *m, const entity_inst_t& dest);
@@ -255,21 +255,21 @@ private:
    * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
    * invalid and can be replaced by anyone holding the msgr lock
    */
-  ceph::unordered_map<entity_addr_t, AsyncConnection*> conns;
+  ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
 
   /**
    * list of connection are in teh process of accepting
    *
    * These are not yet in the conns map.
    */
-  set<AsyncConnection*> accepting_conns;
+  set<AsyncConnectionRef> accepting_conns;
 
   /// internal cluster protocol version, if any, for talking to entities of the same type.
   int cluster_protocol;
 
-  AsyncConnection *_lookup_conn(const entity_addr_t& k) {
+  AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
     assert(lock.is_locked());
-    ceph::unordered_map<entity_addr_t, AsyncConnection*>::iterator p = conns.find(k);
+    ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
     if (p == conns.end())
       return NULL;
     if (!p->second->is_connected()) {
@@ -281,7 +281,7 @@ private:
     return p->second;
   }
 
-  void _stop_conn(AsyncConnection *c) {
+  void _stop_conn(AsyncConnectionRef c) {
     assert(lock.is_locked());
     if (c) {
       c->mark_down();
@@ -311,12 +311,12 @@ public:
   /**
    * This wraps _lookup_conn.
    */
-  AsyncConnection *lookup_conn(const entity_addr_t& k) {
+  AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
     Mutex::Locker l(lock);
     return _lookup_conn(k);
   }
 
-  void accept_conn(AsyncConnection *conn) {
+  void accept_conn(AsyncConnectionRef conn) {
     Mutex::Locker l(lock);
     if (conns.count(conn->get_peer_addr()))
       delete conns[conn->get_peer_addr()];
@@ -325,7 +325,7 @@ public:
   }
 
   void learned_addr(const entity_addr_t &peer_addr_for_me);
-  AsyncConnection *add_accept(int sd);
+  AsyncConnectionRef add_accept(int sd);
 
   /**
    * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.