From: Haomai Wang Date: Tue, 2 Sep 2014 03:47:31 +0000 (+0800) Subject: Replace AsyncConnection* with AsyncconnectionRef X-Git-Tag: v0.88~37^2~4^2~32 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3554bed4a089d50342a19b637c7ef1797371dac5;p=ceph.git Replace AsyncConnection* with AsyncconnectionRef Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 731a87fa093..730cf484918 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -26,42 +26,33 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { } class C_handle_read : public EventCallback { - AsyncConnection *conn; + AsyncConnectionRef conn; public: - C_handle_read(AsyncConnection *c): conn(c) { - conn->get(); - } + C_handle_read(AsyncConnectionRef c): conn(c) {} void do_request(int fd) { conn->process(); - conn->put(); } }; class C_handle_write : public EventCallback { - AsyncConnection *conn; + AsyncConnectionRef conn; public: - C_handle_write(AsyncConnection *c): conn(c) { - conn->get(); - } + C_handle_write(AsyncConnectionRef c): conn(c) {} void do_request(int fd) { conn->handle_write(); - conn->put(); } }; class C_handle_reset : public EventCallback { AsyncMessenger *msgr; - AsyncConnection *conn; + AsyncConnectionRef conn; public: - C_handle_reset(AsyncMessenger *m, AsyncConnection *c): msgr(m), conn(c) { - conn->get(); - } + C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {} void do_request(int id) { - msgr->ms_deliver_handle_reset(conn); - conn->put(); + msgr->ms_deliver_handle_reset(conn.get()); } }; @@ -1023,9 +1014,7 @@ int AsyncConnection::_process_connection() session_security.reset(); } - get(); async_msgr->ms_deliver_handle_connect(this); - get(); async_msgr->ms_deliver_handle_fast_connect(this); // reset connect state variables @@ -1323,7 +1312,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } bool authorizer_valid; - get(); if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl; @@ -1335,7 +1323,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl; // existing? - AsyncConnection *existing = async_msgr->lookup_conn(peer_addr); + AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr); if (existing) { if (connect.global_seq < existing->peer_global_seq) { ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing @@ -1462,11 +1450,9 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a } if (existing->policy.lossy) { // disconnect from the Connection - existing->get(); async_msgr->ms_deliver_handle_reset(existing); } else { // queue a reset on the new connection, which we're dumping for the old - get(); async_msgr->ms_deliver_handle_reset(this); // reset the in_seq if this is a hard reset from peer, @@ -1515,9 +1501,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a session_key, get_features())); // notify - get(); async_msgr->ms_deliver_handle_accept(this); - get(); async_msgr->ms_deliver_handle_fast_accept(this); // ok! @@ -1708,7 +1692,6 @@ void AsyncConnection::was_session_reset() discard_out_queue(); outcoming_bl.clear(); - get(); async_msgr->ms_deliver_handle_remote_reset(this); if (randomize_out_seq()) { @@ -1744,7 +1727,6 @@ int AsyncConnection::_send(Message *m) } // associate message with Connection (for benefit of encode_payload) - get(); m->set_connection(this); uint64_t features = get_features(); diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index 929cd56242d..a010cb0912b 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -248,4 +248,6 @@ class AsyncConnection : public Connection { ceph::shared_ptr session_security; }; /* AsyncConnection */ +typedef boost::intrusive_ptr AsyncConnectionRef; + #endif diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index e1144ccd458..b1f69f447a3 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -383,7 +383,7 @@ void AsyncMessenger::wait() ldout(cct, 10) << __func__ << ": closing pipes" << dendl; while (!conns.empty()) { - AsyncConnection *p = conns.begin()->second; + AsyncConnectionRef p = conns.begin()->second; _stop_conn(p); } } @@ -394,17 +394,17 @@ void AsyncMessenger::wait() started = false; } -AsyncConnection *AsyncMessenger::add_accept(int sd) +AsyncConnectionRef AsyncMessenger::add_accept(int sd) { lock.Lock(); - AsyncConnection *conn = new AsyncConnection(cct, this); + AsyncConnectionRef conn = new AsyncConnection(cct, this); conn->accept(sd); accepting_conns.insert(conn); lock.Unlock(); return conn; } -AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int type) +AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type) { assert(lock.is_locked()); assert(addr != my_inst.addr); @@ -413,7 +413,7 @@ AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int t << ", creating connection and registering" << dendl; // create connection - AsyncConnection *conn = new AsyncConnection(cct, this); + AsyncConnectionRef conn = new AsyncConnection(cct, this); conn->connect(addr, type); assert(!conns.count(addr)); conns[addr] = conn; @@ -429,7 +429,7 @@ ConnectionRef AsyncMessenger::get_connection(const entity_inst_t& dest) return local_connection; } - AsyncConnection *conn = _lookup_conn(dest.addr); + AsyncConnectionRef conn = _lookup_conn(dest.addr); if (conn) { ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl; } else { @@ -464,12 +464,12 @@ int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest) return -EINVAL; } - AsyncConnection *conn = _lookup_conn(dest.addr); + AsyncConnectionRef conn = _lookup_conn(dest.addr); submit_message(m, conn, dest.addr, dest.name.type()); return 0; } -void AsyncMessenger::submit_message(Message *m, AsyncConnection *con, +void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con, const entity_addr_t& dest_addr, int dest_type) { if (cct->_conf->ms_dump_on_send) { @@ -547,24 +547,24 @@ void AsyncMessenger::mark_down_all() { ldout(cct,1) << __func__ << " " << dendl; lock.Lock(); - for (set::iterator q = accepting_conns.begin(); + for (set::iterator q = accepting_conns.begin(); q != accepting_conns.end(); ++q) { - AsyncConnection *p = *q; + AsyncConnectionRef p = *q; ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl; p->mark_down(); p->get(); - ms_deliver_handle_reset(p); + ms_deliver_handle_reset(p.get()); } accepting_conns.clear(); while (!conns.empty()) { - ceph::unordered_map::iterator it = conns.begin(); - AsyncConnection *p = it->second; + ceph::unordered_map::iterator it = conns.begin(); + AsyncConnectionRef p = it->second; ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl; conns.erase(it); p->mark_down(); p->get(); - ms_deliver_handle_reset(p); + ms_deliver_handle_reset(p.get()); } lock.Unlock(); } @@ -572,12 +572,12 @@ void AsyncMessenger::mark_down_all() void AsyncMessenger::mark_down(const entity_addr_t& addr) { lock.Lock(); - AsyncConnection *p = _lookup_conn(addr); + AsyncConnectionRef p = _lookup_conn(addr); if (p) { ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl; _stop_conn(p); p->get(); - ms_deliver_handle_reset(p); + ms_deliver_handle_reset(p.get()); } else { ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl; } diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h index a7666871dcf..efe811f52c5 100644 --- a/src/msg/AsyncMessenger.h +++ b/src/msg/AsyncMessenger.h @@ -204,7 +204,7 @@ private: * @return a pointer to the newly-created connection. Caller does not own a * reference; take one if you need it. */ - AsyncConnection *create_connect(const entity_addr_t& addr, int type); + AsyncConnectionRef create_connect(const entity_addr_t& addr, int type); /** * Queue up a Message for delivery to the entity specified @@ -218,7 +218,7 @@ private: * @param dest_type The peer type of the address we're sending to * just drop silently under failure. */ - void submit_message(Message *m, AsyncConnection *con, + void submit_message(Message *m, AsyncConnectionRef con, const entity_addr_t& dest_addr, int dest_type); int _send_message(Message *m, const entity_inst_t& dest); @@ -255,21 +255,21 @@ private: * NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered * invalid and can be replaced by anyone holding the msgr lock */ - ceph::unordered_map conns; + ceph::unordered_map conns; /** * list of connection are in teh process of accepting * * These are not yet in the conns map. */ - set accepting_conns; + set accepting_conns; /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; - AsyncConnection *_lookup_conn(const entity_addr_t& k) { + AsyncConnectionRef _lookup_conn(const entity_addr_t& k) { assert(lock.is_locked()); - ceph::unordered_map::iterator p = conns.find(k); + ceph::unordered_map::iterator p = conns.find(k); if (p == conns.end()) return NULL; if (!p->second->is_connected()) { @@ -281,7 +281,7 @@ private: return p->second; } - void _stop_conn(AsyncConnection *c) { + void _stop_conn(AsyncConnectionRef c) { assert(lock.is_locked()); if (c) { c->mark_down(); @@ -311,12 +311,12 @@ public: /** * This wraps _lookup_conn. */ - AsyncConnection *lookup_conn(const entity_addr_t& k) { + AsyncConnectionRef lookup_conn(const entity_addr_t& k) { Mutex::Locker l(lock); return _lookup_conn(k); } - void accept_conn(AsyncConnection *conn) { + void accept_conn(AsyncConnectionRef conn) { Mutex::Locker l(lock); if (conns.count(conn->get_peer_addr())) delete conns[conn->get_peer_addr()]; @@ -325,7 +325,7 @@ public: } void learned_addr(const entity_addr_t &peer_addr_for_me); - AsyncConnection *add_accept(int sd); + AsyncConnectionRef add_accept(int sd); /** * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.