From: Kefu Chai Date: Tue, 29 May 2018 07:51:07 +0000 (+0800) Subject: mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv X-Git-Tag: v14.0.1~1233^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=refs%2Fpull%2F22292%2Fhead;p=ceph.git mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv See-also: http://tracker.ceph.com/issues/20924 Signed-off-by: Kefu Chai --- diff --git a/src/common/RefCountedObj.h b/src/common/RefCountedObj.h index 9c0dad1d302f..325304c1441c 100644 --- a/src/common/RefCountedObj.h +++ b/src/common/RefCountedObj.h @@ -20,6 +20,8 @@ #include "common/ceph_context.h" #include "common/valgrind.h" +#include + // re-include our assert to clobber the system one; fix dout: #include "include/assert.h" @@ -164,4 +166,6 @@ struct RefCountedWaitObject { void intrusive_ptr_add_ref(const RefCountedObject *p); void intrusive_ptr_release(const RefCountedObject *p); +using RefCountedPtr = boost::intrusive_ptr; + #endif diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index 847a9ea716fb..183f18e5f6ea 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -578,7 +578,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, int r, bufferlist outbl, std::string_view outs) { - Session *session = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); assert(session != NULL); // If someone is using a closed session for sending commands (e.g. // the ceph CLI) then we should feel free to clean up this connection @@ -594,7 +595,7 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, assert(session->is_closed()); session->connection->mark_disposable(); } - session->put(); + priv.reset(); MCommandReply *reply = new MCommandReply(r, outs); reply->set_tid(m->get_tid()); @@ -605,7 +606,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, /* This function DOES put the passed message before returning*/ void MDSDaemon::handle_command(MCommand *m) { - Session *session = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); assert(session != NULL); int r = 0; @@ -633,7 +635,7 @@ void MDSDaemon::handle_command(MCommand *m) } else { r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply); } - session->put(); + priv.reset(); if (need_reply) { send_command_reply(m, mds_rank, r, outbl, outs); @@ -1247,14 +1249,13 @@ bool MDSDaemon::ms_handle_reset(Connection *con) if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) return false; - Session *session = static_cast(con->get_priv()); - if (session) { + auto priv = con->get_priv(); + if (auto session = static_cast(priv.get()); session) { if (session->is_closed()) { dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl; con->mark_down(); - con->set_priv(NULL); + con->set_priv(nullptr); } - session->put(); } else { con->mark_down(); } @@ -1276,14 +1277,13 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con) if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) return; - Session *session = static_cast(con->get_priv()); - if (session) { + auto priv = con->get_priv(); + if (auto session = static_cast(priv.get()); session) { if (session->is_closed()) { dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl; con->mark_down(); - con->set_priv(NULL); + con->set_priv(nullptr); } - session->put(); } } @@ -1356,7 +1356,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type, s->info.inst.addr = con->get_peer_addr(); s->info.inst.name = n; dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl; - con->set_priv(s); + con->set_priv(RefCountedPtr{s, false}); s->connection = con; if (mds_rank) { mds_rank->kick_waiters_for_any_client_connection(); @@ -1364,7 +1364,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type, } else { dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection << ", new/authorizing con " << con << dendl; - con->set_priv(s->get()); + con->set_priv(RefCountedPtr{s, false}); @@ -1420,7 +1420,8 @@ void MDSDaemon::ms_handle_accept(Connection *con) return; } - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl; if (s) { if (s->connection != con) { @@ -1433,7 +1434,6 @@ void MDSDaemon::ms_handle_accept(Connection *con) s->preopen_out_queue.pop_front(); } } - s->put(); } } diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 93a0f4839f29..30f2122e3126 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -899,9 +899,9 @@ bool MDSRank::is_stale_message(Message *m) const Session *MDSRank::get_session(Message *m) { - Session *session = static_cast(m->get_connection()->get_priv()); + // do not carry ref + auto session = static_cast(m->get_connection()->get_priv().get()); if (session) { - session->put(); // do not carry ref dout(20) << "get_session have " << session << " " << session->info.inst << " state " << session->get_state_name() << dendl; // Check if we've imported an open session since (new sessions start closed) @@ -1016,9 +1016,9 @@ void MDSRank::send_message_client_counted(Message *m, client_t client) void MDSRank::send_message_client_counted(Message *m, Connection *connection) { - Session *session = static_cast(connection->get_priv()); + // do not carry ref + auto session = static_cast(m->get_connection()->get_priv().get()); if (session) { - session->put(); // do not carry ref send_message_client_counted(m, session); } else { dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl; diff --git a/src/messages/PaxosServiceMessage.h b/src/messages/PaxosServiceMessage.h index daaf74745f9c..a13dc938e8a3 100644 --- a/src/messages/PaxosServiceMessage.h +++ b/src/messages/PaxosServiceMessage.h @@ -59,10 +59,8 @@ class PaxosServiceMessage : public Message { * normally. */ MonSession *get_session() { - MonSession *session = (MonSession *)get_connection()->get_priv(); - if (session) - session->put(); - return session; + auto priv = get_connection()->get_priv(); + return static_cast(priv.get()); } const char *get_type_name() const override { return "PaxosServiceMessage"; } diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index d976761542bc..36341b463b68 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -246,11 +246,11 @@ bool DaemonServer::ms_get_authorizer(int dest_type, bool DaemonServer::ms_handle_reset(Connection *con) { if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { - MgrSessionRef session(static_cast(con->get_priv())); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return false; } - session->put(); // SessionRef takes a ref Mutex::Locker l(lock); dout(10) << "unregistering osd." << session->osd_id << " session " << session << " con " << con << dendl; @@ -510,12 +510,12 @@ bool DaemonServer::handle_report(MMgrReport *m) { Mutex::Locker l(lock); // kill session - MgrSessionRef session(static_cast(m->get_connection()->get_priv())); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return false; } m->get_connection()->mark_down(); - session->put(); dout(10) << "unregistering osd." << session->osd_id << " session " << session << " con " << m->get_connection() << dendl; @@ -721,11 +721,11 @@ bool DaemonServer::handle_command(MCommand *m) std::shared_ptr cmdctx = std::make_shared(m); - MgrSessionRef session(static_cast(m->get_connection()->get_priv())); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return true; } - session->put(); // SessionRef takes a ref if (session->inst.name == entity_name_t()) session->inst.name = m->get_source(); @@ -785,11 +785,11 @@ bool DaemonServer::handle_command(MCommand *m) bool is_allowed; if (!mgr_cmd) { MonCommand py_command = {"", "", "py", "rw", "cli"}; - is_allowed = _allowed_command(session.get(), py_command.module, + is_allowed = _allowed_command(session, py_command.module, prefix, cmdctx->cmdmap, param_str_map, &py_command); } else { // validate user's permissions for requested command - is_allowed = _allowed_command(session.get(), mgr_cmd->module, + is_allowed = _allowed_command(session, mgr_cmd->module, prefix, cmdctx->cmdmap, param_str_map, mgr_cmd); } if (!is_allowed) { diff --git a/src/mgr/DaemonState.cc b/src/mgr/DaemonState.cc index e2f0922079f2..0c4034072c33 100644 --- a/src/mgr/DaemonState.cc +++ b/src/mgr/DaemonState.cc @@ -134,8 +134,8 @@ void DaemonPerfCounters::update(MMgrReport *report) << report->packed.length() << " bytes of data" << dendl; // Retrieve session state - MgrSessionRef session(static_cast( - report->get_connection()->get_priv())); + auto priv = report->get_connection()->get_priv(); + auto session = static_cast(priv.get()); // Load any newly declared types for (const auto &t : report->declare_types) { diff --git a/src/mon/MonOpRequest.h b/src/mon/MonOpRequest.h index b31ee02a46e9..364232b1f9d8 100644 --- a/src/mon/MonOpRequest.h +++ b/src/mon/MonOpRequest.h @@ -80,7 +80,7 @@ struct MonOpRequest : public TrackedOp { private: Message *request; utime_t dequeued_time; - MonSession *session; + RefCountedPtr session; ConnectionRef con; bool forwarded_to_leader; op_type_t op_type; @@ -90,7 +90,6 @@ private: req->get_recv_stamp().is_zero() ? ceph_clock_now() : req->get_recv_stamp()), request(req), - session(NULL), con(NULL), forwarded_to_leader(false), op_type(OP_TYPE_NONE) @@ -98,7 +97,7 @@ private: if (req) { con = req->get_connection(); if (con) { - session = static_cast(con->get_priv()); + session = con->get_priv(); } } } @@ -128,15 +127,10 @@ protected: public: ~MonOpRequest() override { request->put(); - // certain ops may not have a session (e.g., AUTH or PING) - if (session) - session->put(); } MonSession *get_session() const { - if (!session) - return NULL; - return session; + return static_cast(session.get()); } template @@ -153,16 +147,7 @@ public: ConnectionRef get_connection() { return con; } void set_session(MonSession *s) { - if (session) { - // we will be rewriting the existing session; drop the ref. - session->put(); - } - - if (s == NULL) { - session = NULL; - } else { - session = static_cast(s->get()); - } + session.reset(s); } bool is_src_mon() const { diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index e77441f94249..4441e8c73ee2 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2934,15 +2934,12 @@ void Monitor::handle_command(MonOpRequestRef op) return; } - MonSession *session = static_cast( - m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { dout(5) << __func__ << " dropping stray message " << *m << dendl; return; } - BOOST_SCOPE_EXIT_ALL(=) { - session->put(); - }; if (m->cmd.empty()) { string rs = "No command supplied"; @@ -3733,7 +3730,7 @@ void Monitor::handle_forward(MonOpRequestRef op) ConnectionRef c(new AnonConnection(cct)); MonSession *s = new MonSession(req->get_source_inst(), static_cast(c.get())); - c->set_priv(s->get()); + c->set_priv(RefCountedPtr{s, false}); c->set_peer_addr(m->client.addr); c->set_peer_type(m->client.name.type()); c->set_features(m->con_features); @@ -3771,7 +3768,6 @@ void Monitor::handle_forward(MonOpRequestRef op) dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; _ms_dispatch(req); - s->put(); } } @@ -3949,7 +3945,7 @@ void Monitor::remove_session(MonSession *s) routed_requests.erase(*p); } s->routed_request_tids.clear(); - s->con->set_priv(NULL); + s->con->set_priv(nullptr); session_map.remove_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_rm); @@ -4077,7 +4073,7 @@ void Monitor::_ms_dispatch(Message *m) s = session_map.new_session(m->get_source_inst(), con.get()); } assert(s); - con->set_priv(s->get()); + con->set_priv(RefCountedPtr{s, false}); dout(10) << __func__ << " new session " << s << " " << *s << " features 0x" << std::hex << s->con_features << std::dec << dendl; @@ -4093,7 +4089,6 @@ void Monitor::_ms_dispatch(Message *m) s->caps = *mon_caps; s->authenticated = true; } - s->put(); } else { dout(20) << __func__ << " existing session " << s << " for " << s->inst << dendl; @@ -4955,12 +4950,13 @@ bool Monitor::ms_handle_reset(Connection *con) if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; - MonSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) return false; // break any con <-> session ref cycle - s->con->set_priv(NULL); + s->con->set_priv(nullptr); if (is_shutdown()) return false; @@ -4972,7 +4968,6 @@ bool Monitor::ms_handle_reset(Connection *con) Mutex::Locker l(session_map_lock); remove_session(s); } - s->put(); return true; } diff --git a/src/msg/Connection.h b/src/msg/Connection.h index b8ffbbf9b308..a19092b93eb6 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -41,7 +41,7 @@ class Messenger; struct Connection : public RefCountedObject { mutable Mutex lock; Messenger *msgr; - RefCountedObject *priv; + RefCountedPtr priv; int peer_type; entity_addr_t peer_addr; utime_t last_keepalive, last_keepalive_ack; @@ -63,7 +63,6 @@ public: : RefCountedObject(cct, 0), lock("Connection::lock"), msgr(m), - priv(NULL), peer_type(-1), features(0), failed(false), @@ -72,24 +71,16 @@ public: ~Connection() override { //generic_dout(0) << "~Connection " << this << dendl; - if (priv) { - //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl; - priv->put(); - } } - void set_priv(RefCountedObject *o) { + void set_priv(const RefCountedPtr& o) { Mutex::Locker l(lock); - if (priv) - priv->put(); priv = o; } - RefCountedObject *get_priv() { + RefCountedPtr get_priv() { Mutex::Locker l(lock); - if (priv) - return priv->get(); - return NULL; + return priv; } /** diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 68cfa7d5bc0a..8a90d9d0370a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4278,12 +4278,12 @@ void OSD::_add_heartbeat_peer(int p) return; hi = &heartbeat_peers[p]; hi->peer = p; - HeartbeatSession *s = new HeartbeatSession(p); + RefCountedPtr s{new HeartbeatSession{p}, false}; hi->con_back = cons.first.get(); - hi->con_back->set_priv(s->get()); + hi->con_back->set_priv(s); if (cons.second) { hi->con_front = cons.second.get(); - hi->con_front->set_priv(s->get()); + hi->con_front->set_priv(s); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con_back->get_peer_addr() << " " << hi->con_front->get_peer_addr() @@ -4294,7 +4294,6 @@ void OSD::_add_heartbeat_peer(int p) << " " << hi->con_back->get_peer_addr() << dendl; } - s->put(); } else { hi = &i->second; } @@ -4729,15 +4728,15 @@ void OSD::heartbeat() bool OSD::heartbeat_reset(Connection *con) { - HeartbeatSession *s = static_cast(con->get_priv()); + auto s = con->get_priv(); if (s) { heartbeat_lock.Lock(); if (is_stopping()) { heartbeat_lock.Unlock(); - s->put(); return true; } - map::iterator p = heartbeat_peers.find(s->peer); + auto heartbeat_session = static_cast(s.get()); + auto p = heartbeat_peers.find(heartbeat_session->peer); if (p != heartbeat_peers.end() && (p->second.con_back == con || p->second.con_front == con)) { @@ -4754,10 +4753,10 @@ bool OSD::heartbeat_reset(Connection *con) pair newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); if (newcon.first) { p->second.con_back = newcon.first.get(); - p->second.con_back->set_priv(s->get()); + p->second.con_back->set_priv(s); if (newcon.second) { p->second.con_front = newcon.second.get(); - p->second.con_front->set_priv(s->get()); + p->second.con_front->set_priv(s); } } else { dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer @@ -4768,7 +4767,6 @@ bool OSD::heartbeat_reset(Connection *con) dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl; } heartbeat_lock.Unlock(); - s->put(); } return true; } @@ -5135,10 +5133,11 @@ void OSD::ms_handle_fast_connect(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; dout(10) << " new session (outgoing) " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl; @@ -5146,7 +5145,6 @@ void OSD::ms_handle_fast_connect(Connection *con) assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD); s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD); } - s->put(); } } @@ -5154,10 +5152,11 @@ void OSD::ms_handle_fast_accept(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; dout(10) << "new session (incoming)" << s << " con=" << con << " addr=" << con->get_peer_addr() @@ -5165,23 +5164,23 @@ void OSD::ms_handle_fast_accept(Connection *con) assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD); s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD); } - s->put(); } } bool OSD::ms_handle_reset(Connection *con) { - Session *session = static_cast(con->get_priv()); + auto s = con->get_priv(); + auto session = static_cast(s.get()); dout(2) << "ms_handle_reset con " << con << " session " << session << dendl; if (!session) return false; session->wstate.reset(con); - session->con.reset(NULL); // break con <-> session ref cycle + session->con->set_priv(nullptr); + session->con.reset(); // break con <-> session ref cycle // note that we break session->con *before* the session_handle_reset // cleanup below. this avoids a race between us and // PG::add_backoff, Session::check_backoff, etc. - session_handle_reset(session); - session->put(); + session_handle_reset(SessionRef{session}); return true; } @@ -5190,7 +5189,8 @@ bool OSD::ms_handle_refused(Connection *con) if (!cct->_conf->osd_fast_fail_on_connection_refused) return false; - Session *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); dout(2) << "ms_handle_refused con " << con << " session " << session << dendl; if (!session) return false; @@ -5212,7 +5212,6 @@ bool OSD::ms_handle_refused(Connection *con) } } } - session->put(); return true; } @@ -5365,11 +5364,9 @@ void OSD::_send_boot() cluster_messenger->set_addr_unknowns(cluster_addr); dout(10) << " assuming cluster_addr ip matches client_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { cluster_messenger->ms_deliver_handle_fast_connect(local_connection); + } } entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr(); @@ -5381,11 +5378,9 @@ void OSD::_send_boot() hb_back_server_messenger->set_addr_unknowns(hb_back_addr); dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection); + } } entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr(); @@ -5397,11 +5392,9 @@ void OSD::_send_boot() hb_front_server_messenger->set_addr_unknowns(hb_front_addr); dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection); + } } MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(), @@ -5614,7 +5607,8 @@ void OSD::handle_command(MMonCommand *m) void OSD::handle_command(MCommand *m) { ConnectionRef con = m->get_connection(); - Session *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (!session) { con->send_message(new MCommandReply(m, -EPERM)); m->put(); @@ -5622,7 +5616,7 @@ void OSD::handle_command(MCommand *m) } OSDCap& caps = session->caps; - session->put(); + priv.reset(); if (!caps.allow_all() || m->get_source().is_mon()) { con->send_message(new MCommandReply(m, -EPERM)); @@ -6345,7 +6339,7 @@ void OSD::maybe_share_map( op->check_send_map = false; } -void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) +void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap) { assert(session->session_dispatch_lock.is_locked()); @@ -6463,17 +6457,14 @@ void OSD::ms_fast_dispatch(Message *m) // legacy client, and this is an MOSDOp (the *only* fast dispatch // message that didn't have an explicit spg_t); we need to map // them to an spg_t while preserving delivery order. - Session *session = static_cast(m->get_connection()->get_priv()); - if (session) { - { - Mutex::Locker l(session->session_dispatch_lock); - op->get(); - session->waiting_on_map.push_back(*op); - OSDMapRef nextmap = service.get_nextmap_reserved(); - dispatch_session_waiting(session, nextmap); - service.release_map(nextmap); - } - session->put(); + auto priv = m->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { + Mutex::Locker l{session->session_dispatch_lock}; + op->get(); + session->waiting_on_map.push_back(*op); + OSDMapRef nextmap = service.get_nextmap_reserved(); + dispatch_session_waiting(session, nextmap); + service.release_map(nextmap); } } OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); @@ -6484,12 +6475,11 @@ void OSD::ms_fast_preprocess(Message *m) if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { if (m->get_type() == CEPH_MSG_OSD_MAP) { MOSDMap *mm = static_cast(m); - Session *s = static_cast(m->get_connection()->get_priv()); - if (s) { + auto priv = m->get_connection()->get_priv(); + if (auto s = static_cast(priv.get()); s) { s->received_map_lock.lock(); s->received_map_epoch = mm->get_last(); s->received_map_lock.unlock(); - s->put(); } } } @@ -6562,12 +6552,14 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type, } if (isvalid) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; - dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl; + dout(10) << " new session " << s << " con=" << s->con + << " addr=" << con->get_peer_addr() << dendl; } s->entity_name = name; @@ -6594,8 +6586,6 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type, isvalid = false; } } - - s->put(); } return true; } @@ -7131,18 +7121,16 @@ void OSD::handle_osd_map(MOSDMap *m) return; } - Session *session = static_cast(m->get_connection()->get_priv()); - if (session && !(session->entity_name.is_mon() || + auto priv = m->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); + session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) { //not enough perms! dout(10) << "got osd map from Session " << session << " which we can't take maps from (not a mon or osd)" << dendl; m->put(); - session->put(); return; } - if (session) - session->put(); // share with the objecter if (!is_preboot()) @@ -7981,15 +7969,15 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map, << dendl; ConnectionRef con = m->get_connection(); con->mark_down(); - Session *s = static_cast(con->get_priv()); - if (s) { + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { if (!is_fast_dispatch) s->session_dispatch_lock.Lock(); clear_session_waiting_on_map(s); - con->set_priv(NULL); // break ref <-> session cycle, if any + con->set_priv(nullptr); // break ref <-> session cycle, if any + s->con.reset(); if (!is_fast_dispatch) s->session_dispatch_lock.Unlock(); - s->put(); } return false; } @@ -8851,11 +8839,9 @@ void OSD::dequeue_op( logger->tinc(l_osd_op_before_dequeue_op_lat, latency); - Session *session = static_cast( - op->get_req()->get_connection()->get_priv()); - if (session) { + auto priv = op->get_req()->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { maybe_share_map(session, op, pg->get_osdmap()); - session->put(); } if (pg->is_deleting()) @@ -9807,11 +9793,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) << ", dropping " << qi << dendl; // share map with client? if (boost::optional _op = qi.maybe_get_op()) { - Session *session = static_cast( - (*_op)->get_req()->get_connection()->get_priv()); - if (session) { + auto priv = (*_op)->get_req()->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { osd->maybe_share_map(session, *_op, sdata->shard_osdmap); - session->put(); } } unsigned pushes_to_free = qi.get_reserved_pushes(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 7f35605e80cc..bcc6a7ab19bd 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1435,44 +1435,37 @@ private: // -- sessions -- private: - void dispatch_session_waiting(Session *session, OSDMapRef osdmap); + void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap); void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap); Mutex session_waiting_lock; - set session_waiting_for_map; + set session_waiting_for_map; /// Caller assumes refs for included Sessions - void get_sessions_waiting_for_map(set *out) { + void get_sessions_waiting_for_map(set *out) { Mutex::Locker l(session_waiting_lock); out->swap(session_waiting_for_map); } - void register_session_waiting_on_map(Session *session) { + void register_session_waiting_on_map(SessionRef session) { Mutex::Locker l(session_waiting_lock); - if (session_waiting_for_map.insert(session).second) { - session->get(); - } + session_waiting_for_map.insert(session); } - void clear_session_waiting_on_map(Session *session) { + void clear_session_waiting_on_map(SessionRef session) { Mutex::Locker l(session_waiting_lock); - set::iterator i = session_waiting_for_map.find(session); - if (i != session_waiting_for_map.end()) { - (*i)->put(); - session_waiting_for_map.erase(i); - } + session_waiting_for_map.erase(session); } void dispatch_sessions_waiting_on_map() { - set sessions_to_check; + set sessions_to_check; get_sessions_waiting_for_map(&sessions_to_check); - for (set::iterator i = sessions_to_check.begin(); + for (auto i = sessions_to_check.begin(); i != sessions_to_check.end(); sessions_to_check.erase(i++)) { - (*i)->session_dispatch_lock.Lock(); - dispatch_session_waiting(*i, osdmap); - (*i)->session_dispatch_lock.Unlock(); - (*i)->put(); + Mutex::Locker l{(*i)->session_dispatch_lock}; + SessionRef session = *i; + dispatch_session_waiting(session, osdmap); } } - void session_handle_reset(Session *session) { + void session_handle_reset(SessionRef session) { Mutex::Locker l(session->session_dispatch_lock); clear_session_waiting_on_map(session); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index c2d6092fb6fc..346d25d81de3 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2091,13 +2091,14 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op) const MOSDOp *req = static_cast(op->get_req()); - Session *session = static_cast(req->get_connection()->get_priv()); + auto priv = req->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl; return false; } OSDCap& caps = session->caps; - session->put(); + priv.reset(); const string &key = req->get_hobj().get_key().empty() ? req->get_oid().name : diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index fcb0cdf22a0f..e9eed66be5ea 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1682,10 +1682,9 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo void PrimaryLogPG::handle_backoff(OpRequestRef& op) { const MOSDBackoff *m = static_cast(op->get_req()); - SessionRef session = static_cast(m->get_connection()->get_priv()); + SessionRef session{static_cast(m->get_connection()->get_priv().get())}; if (!session) return; // drop it. - session->put(); // get_priv takes a ref, and so does the SessionRef hobject_t begin = info.pgid.pgid.get_hobj_start(); hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); if (begin < m->begin) { @@ -1734,10 +1733,9 @@ void PrimaryLogPG::do_request( const Message *m = op->get_req(); int msg_type = m->get_type(); if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) { - SessionRef session = static_cast(m->get_connection()->get_priv()); + SessionRef session{static_cast(m->get_connection()->get_priv().get())}; if (!session) return; // drop it. - session->put(); // get_priv takes a ref, and so does the SessionRef if (msg_type == CEPH_MSG_OSD_OP) { if (session->check_backoff(cct, info.pgid, @@ -1926,12 +1924,11 @@ void PrimaryLogPG::do_op(OpRequestRef& op) m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF); SessionRef session; if (can_backoff) { - session = static_cast(m->get_connection()->get_priv()); + session = static_cast(m->get_connection()->get_priv().get()); if (!session.get()) { dout(10) << __func__ << " no session" << dendl; return; } - session->put(); // get_priv() takes a ref, and so does the intrusive_ptr if (session->check_backoff(cct, info.pgid, head, m)) { return; @@ -8106,10 +8103,9 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn) assert(conn); - boost::intrusive_ptr session((Session *)conn->get_priv()); - if (!session.get()) + auto session = conn->get_priv(); + if (!session) return; - session->put(); // get_priv() takes a ref, and so does the intrusive_ptr for (list >::iterator i = ctx->watch_connects.begin(); i != ctx->watch_connects.end(); diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index ef91e9e427bb..d07af3faf0c2 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -367,10 +367,11 @@ void Watch::connect(ConnectionRef con, bool _will_ping) dout(10) << __func__ << " con " << con << dendl; conn = con; will_ping = _will_ping; - Session* sessionref(static_cast(con->get_priv())); - if (sessionref) { + auto priv = con->get_priv(); + if (priv) { + auto sessionref = static_cast(priv.get()); sessionref->wstate.addWatch(self.lock()); - sessionref->put(); + priv.reset(); for (map::iterator i = in_progress_notifies.begin(); i != in_progress_notifies.end(); ++i) { @@ -413,10 +414,9 @@ void Watch::discard_state() unregister_cb(); discarded = true; if (conn) { - Session* sessionref(static_cast(conn->get_priv())); - if (sessionref) { - sessionref->wstate.removeWatch(self.lock()); - sessionref->put(); + if (auto priv = conn->get_priv(); priv) { + auto session = static_cast(priv.get()); + session->wstate.removeWatch(self.lock()); } conn = ConnectionRef(); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 5b06b8269a52..62accf4ed4a0 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1802,7 +1802,7 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul) OSDSession *s = new OSDSession(cct, osd); osd_sessions[osd] = s; s->con = messenger->get_connection(osdmap->get_inst(osd)); - s->con->set_priv(s->get()); + s->con->set_priv(RefCountedPtr{s}); logger->inc(l_osdc_osd_session_open); logger->set(l_osdc_osd_sessions, osd_sessions.size()); s->get(); @@ -1845,7 +1845,7 @@ void Objecter::_reopen_session(OSDSession *s) logger->inc(l_osdc_osd_session_close); } s->con = messenger->get_connection(inst); - s->con->set_priv(s->get()); + s->con->set_priv(RefCountedPtr{s}); s->incarnation++; logger->inc(l_osdc_osd_session_open); } @@ -3364,12 +3364,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; - if (s) { - s->put(); - } m->put(); return; } @@ -3383,7 +3381,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) " onnvram" : " ack")) << " ... stray" << dendl; sl.unlock(); - s->put(); m->put(); return; } @@ -3406,7 +3403,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } _session_op_remove(s, op); sl.unlock(); - s->put(); _op_submit(op, sul, NULL); m->put(); @@ -3422,7 +3418,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << op->session->con->get_peer_addr() << dendl; m->put(); sl.unlock(); - s->put(); return; } } else { @@ -3441,7 +3436,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) num_in_flight--; _session_op_remove(s, op); sl.unlock(); - s->put(); // FIXME: two redirects could race and reorder @@ -3460,7 +3454,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) num_in_flight--; _session_op_remove(s, op); sl.unlock(); - s->put(); op->tid = 0; op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS | @@ -3554,7 +3547,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } m->put(); - s->put(); } void Objecter::handle_osd_backoff(MOSDBackoff *m) @@ -3567,17 +3559,15 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; - if (s) - s->put(); m->put(); return; } get_session(s); - s->put(); // from get_priv() above OSDSession::unique_lock sl(s->lock); @@ -4429,7 +4419,8 @@ bool Objecter::ms_handle_reset(Connection *con) if (!initialized) return false; if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { - OSDSession *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (session) { ldout(cct, 1) << "ms_handle_reset " << con << " session " << session << " osd." << session->osd << dendl; @@ -4446,7 +4437,6 @@ bool Objecter::ms_handle_reset(Connection *con) _linger_ops_resend(lresend, wl); wl.unlock(); maybe_request_map(); - session->put(); } return true; } @@ -4763,12 +4753,11 @@ void Objecter::handle_command_reply(MCommandReply *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; m->put(); - if (s) - s->put(); return; } @@ -4779,7 +4768,6 @@ void Objecter::handle_command_reply(MCommandReply *m) << " not found" << dendl; m->put(); sl.unlock(); - s->put(); return; } @@ -4792,7 +4780,6 @@ void Objecter::handle_command_reply(MCommandReply *m) << dendl; m->put(); sl.unlock(); - s->put(); return; } if (c->poutbl) { @@ -4806,7 +4793,6 @@ void Objecter::handle_command_reply(MCommandReply *m) sul.unlock(); m->put(); - s->put(); } void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index ed7012339f7d..7306ba334698 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -898,11 +898,7 @@ class OSDStub : public TestStub bool ms_handle_reset(Connection *con) override { dout(1) << __func__ << dendl; - Session *session = (Session *)con->get_priv(); - if (!session) - return false; - session->put(); - return true; + return con->get_priv().get(); } bool ms_handle_refused(Connection *con) override { diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index a027aa6fcc9a..772c5a0200ca 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -115,32 +115,30 @@ class FakeDispatcher : public Dispatcher { void ms_handle_fast_connect(Connection *con) override { lock.Lock(); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); + auto s = con->get_priv(); if (!s) { - s = new Session(con); - con->set_priv(s->get()); - lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl; + auto session = new Session(con); + con->set_priv(RefCountedPtr{session, false}); + lderr(g_ceph_context) << __func__ << " con: " << con + << " count: " << session->count << dendl; } - s->put(); got_connect = true; cond.Signal(); lock.Unlock(); } void ms_handle_fast_accept(Connection *con) override { - Session *s = static_cast(con->get_priv()); - if (!s) { - s = new Session(con); - con->set_priv(s->get()); + if (!con->get_priv()) { + con->set_priv(RefCountedPtr{new Session(con), false}); } - s->put(); } bool ms_dispatch(Message *m) override { - Session *s = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto s = static_cast(priv.get()); if (!s) { s = new Session(m->get_connection()); - m->get_connection()->set_priv(s->get()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); } - s->put(); s->count++; lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; if (is_server) { @@ -155,22 +153,20 @@ class FakeDispatcher : public Dispatcher { bool ms_handle_reset(Connection *con) override { Mutex::Locker l(lock); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); - if (s) { - s->con.reset(NULL); // break con <-> session ref cycle - con->set_priv(NULL); // break ref <-> session cycle, if any - s->put(); + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any } return true; } void ms_handle_remote_reset(Connection *con) override { Mutex::Locker l(lock); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); - if (s) { - s->con.reset(NULL); // break con <-> session ref cycle - con->set_priv(NULL); // break ref <-> session cycle, if any - s->put(); + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any } got_remote_reset = true; cond.Signal(); @@ -179,12 +175,13 @@ class FakeDispatcher : public Dispatcher { return false; } void ms_fast_dispatch(Message *m) override { - Session *s = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto s = static_cast(priv.get()); if (!s) { s = new Session(m->get_connection()); - m->get_connection()->set_priv(s->get()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); } - s->put(); s->count++; lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; if (is_server) { @@ -238,7 +235,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. test rebind port @@ -257,7 +254,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 3. test markdown connection conn->mark_down(); @@ -284,7 +281,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); @@ -311,12 +308,12 @@ TEST_P(MessengerTest, NameAddrTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // Make should server_conn is the one we already accepted from client, // so it means client_msgr has the same addr when server connection has - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); @@ -372,7 +369,7 @@ TEST_P(MessengerTest, FeatureTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -403,7 +400,7 @@ TEST_P(MessengerTest, TimeoutTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. wait for idle @@ -444,12 +441,12 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // don't lose state - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); @@ -461,7 +458,7 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); { Mutex::Locker l(srv_dispatcher.lock); @@ -502,9 +499,9 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.got_new = false; } // resetcheck happen - ASSERT_EQ(1U, static_cast(conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_EQ(1U, static_cast(server_conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); @@ -539,7 +536,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); @@ -553,7 +550,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // server lose state { @@ -561,7 +558,7 @@ TEST_P(MessengerTest, StatelessTest) { while (!srv_dispatcher.got_new) srv_dispatcher.cond.Wait(srv_dispatcher.lock); } - ASSERT_EQ(1U, static_cast(server_conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); // 2. test for client lossy server_conn->mark_down(); @@ -578,7 +575,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -612,7 +609,7 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; @@ -643,9 +640,9 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -678,7 +675,7 @@ TEST_P(MessengerTest, AuthTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 2. mix auth g_ceph_context->_conf->set_val("auth_cluster_required", "none"); @@ -696,7 +693,7 @@ TEST_P(MessengerTest, AuthTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown();