From: Kefu Chai Date: Tue, 29 May 2018 07:51:07 +0000 (+0800) Subject: mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv X-Git-Tag: v12.2.13~128^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b9d5bc3bdd1eab70ae5284758fd31c4c7cb05b9e;p=ceph.git mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv See-also: http://tracker.ceph.com/issues/20924 Signed-off-by: Kefu Chai Signed-off-by: Samuel Just (cherry picked from commit 72883956c26fdc4345324e9b27b45c3dfac17fa2) Conflicts: src/osd/OSD.cc src/osdc/Objecter.cc --- diff --git a/src/common/RefCountedObj.h b/src/common/RefCountedObj.h index 9c0dad1d302f..325304c1441c 100644 --- a/src/common/RefCountedObj.h +++ b/src/common/RefCountedObj.h @@ -20,6 +20,8 @@ #include "common/ceph_context.h" #include "common/valgrind.h" +#include + // re-include our assert to clobber the system one; fix dout: #include "include/assert.h" @@ -164,4 +166,6 @@ struct RefCountedWaitObject { void intrusive_ptr_add_ref(const RefCountedObject *p); void intrusive_ptr_release(const RefCountedObject *p); +using RefCountedPtr = boost::intrusive_ptr; + #endif diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc index e312bf7cc227..a16c8caf86db 100644 --- a/src/mds/MDSDaemon.cc +++ b/src/mds/MDSDaemon.cc @@ -590,7 +590,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, int r, bufferlist outbl, boost::string_view outs) { - Session *session = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); assert(session != NULL); // If someone is using a closed session for sending commands (e.g. // the ceph CLI) then we should feel free to clean up this connection @@ -606,7 +607,7 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, assert(session->is_closed()); session->connection->mark_disposable(); } - session->put(); + priv.reset(); MCommandReply *reply = new MCommandReply(r, outs); reply->set_tid(m->get_tid()); @@ -617,7 +618,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank, /* This function DOES put the passed message before returning*/ void MDSDaemon::handle_command(MCommand *m) { - Session *session = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); assert(session != NULL); int r = 0; @@ -645,7 +647,7 @@ void MDSDaemon::handle_command(MCommand *m) } else { r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply); } - session->put(); + priv.reset(); if (need_reply) { send_command_reply(m, mds_rank, r, outbl, outs); @@ -1254,14 +1256,13 @@ bool MDSDaemon::ms_handle_reset(Connection *con) if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) return false; - Session *session = static_cast(con->get_priv()); - if (session) { + auto priv = con->get_priv(); + if (auto session = static_cast(priv.get()); session) { if (session->is_closed()) { dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl; con->mark_down(); - con->set_priv(NULL); + con->set_priv(nullptr); } - session->put(); } else { con->mark_down(); } @@ -1283,14 +1284,13 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con) if (beacon.get_want_state() == CEPH_MDS_STATE_DNE) return; - Session *session = static_cast(con->get_priv()); - if (session) { + auto priv = con->get_priv(); + if (auto session = static_cast(priv.get()); session) { if (session->is_closed()) { dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl; con->mark_down(); - con->set_priv(NULL); + con->set_priv(nullptr); } - session->put(); } } @@ -1364,7 +1364,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type, s->info.inst.addr = con->get_peer_addr(); s->info.inst.name = n; dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl; - con->set_priv(s); + con->set_priv(RefCountedPtr{s, false}); s->connection = con; if (mds_rank) { mds_rank->kick_waiters_for_any_client_connection(); @@ -1372,7 +1372,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type, } else { dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection << ", new/authorizing con " << con << dendl; - con->set_priv(s->get()); + con->set_priv(RefCountedPtr{s, false}); @@ -1428,7 +1428,8 @@ void MDSDaemon::ms_handle_accept(Connection *con) return; } - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl; if (s) { if (s->connection != con) { @@ -1441,7 +1442,6 @@ void MDSDaemon::ms_handle_accept(Connection *con) s->preopen_out_queue.pop_front(); } } - s->put(); } } diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index dbc5a7ad8610..d2b508b56806 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -964,7 +964,7 @@ void MDSRank::ProgressThread::shutdown() bool MDSRankDispatcher::ms_dispatch(Message *m) { if (m->get_source().is_client()) { - Session *session = static_cast(m->get_connection()->get_priv()); + Session *session = static_cast(m->get_connection()->get_priv().get()); if (session) session->last_seen = Session::clock::now(); } @@ -1294,9 +1294,9 @@ bool MDSRank::is_stale_message(Message *m) const Session *MDSRank::get_session(Message *m) { - Session *session = static_cast(m->get_connection()->get_priv()); + // do not carry ref + auto session = static_cast(m->get_connection()->get_priv().get()); if (session) { - session->put(); // do not carry ref dout(20) << "get_session have " << session << " " << session->info.inst << " state " << session->get_state_name() << dendl; // Check if we've imported an open session since (new sessions start closed) @@ -1412,9 +1412,9 @@ void MDSRank::send_message_client_counted(Message *m, client_t client) void MDSRank::send_message_client_counted(Message *m, Connection *connection) { - Session *session = static_cast(connection->get_priv()); + // do not carry ref + auto session = static_cast(m->get_connection()->get_priv().get()); if (session) { - session->put(); // do not carry ref send_message_client_counted(m, session); } else { dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl; diff --git a/src/messages/PaxosServiceMessage.h b/src/messages/PaxosServiceMessage.h index a85ac9e6c4a6..2ac491b7a5c1 100644 --- a/src/messages/PaxosServiceMessage.h +++ b/src/messages/PaxosServiceMessage.h @@ -58,10 +58,8 @@ class PaxosServiceMessage : public Message { * normally. */ MonSession *get_session() { - MonSession *session = (MonSession *)get_connection()->get_priv(); - if (session) - session->put(); - return session; + auto priv = get_connection()->get_priv(); + return static_cast(priv.get()); } const char *get_type_name() const override { return "PaxosServiceMessage"; } diff --git a/src/mgr/DaemonServer.cc b/src/mgr/DaemonServer.cc index 256148e4e192..7641788d4518 100644 --- a/src/mgr/DaemonServer.cc +++ b/src/mgr/DaemonServer.cc @@ -246,11 +246,11 @@ bool DaemonServer::ms_get_authorizer(int dest_type, bool DaemonServer::ms_handle_reset(Connection *con) { if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { - MgrSessionRef session(static_cast(con->get_priv())); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return false; } - session->put(); // SessionRef takes a ref Mutex::Locker l(lock); dout(10) << "unregistering osd." << session->osd_id << " session " << session << " con " << con << dendl; @@ -465,12 +465,12 @@ bool DaemonServer::handle_report(MMgrReport *m) { Mutex::Locker l(lock); // kill session - MgrSessionRef session(static_cast(m->get_connection()->get_priv())); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return false; } m->get_connection()->mark_down(); - session->put(); dout(10) << "unregistering osd." << session->osd_id << " session " << session << " con " << m->get_connection() << dendl; @@ -666,11 +666,11 @@ bool DaemonServer::handle_command(MCommand *m) std::shared_ptr cmdctx = std::make_shared(m); - MgrSessionRef session(static_cast(m->get_connection()->get_priv())); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { return true; } - session->put(); // SessionRef takes a ref if (session->inst.name == entity_name_t()) session->inst.name = m->get_source(); @@ -730,11 +730,11 @@ bool DaemonServer::handle_command(MCommand *m) bool is_allowed; if (!mgr_cmd) { MonCommand py_command = {"", "", "py", "rw", "cli"}; - is_allowed = _allowed_command(session.get(), py_command.module, + is_allowed = _allowed_command(session, py_command.module, prefix, cmdctx->cmdmap, param_str_map, &py_command); } else { // validate user's permissions for requested command - is_allowed = _allowed_command(session.get(), mgr_cmd->module, + is_allowed = _allowed_command(session, mgr_cmd->module, prefix, cmdctx->cmdmap, param_str_map, mgr_cmd); } if (!is_allowed) { diff --git a/src/mgr/DaemonState.cc b/src/mgr/DaemonState.cc index 9ba7cf926d89..cc7f16b2175c 100644 --- a/src/mgr/DaemonState.cc +++ b/src/mgr/DaemonState.cc @@ -126,8 +126,8 @@ void DaemonPerfCounters::update(MMgrReport *report) << report->packed.length() << " bytes of data" << dendl; // Retrieve session state - MgrSessionRef session(static_cast( - report->get_connection()->get_priv())); + auto priv = report->get_connection()->get_priv(); + auto session = static_cast(priv.get()); // Load any newly declared types for (const auto &t : report->declare_types) { diff --git a/src/mon/MonOpRequest.h b/src/mon/MonOpRequest.h index 7a43bff2f602..ac39ea99ddba 100644 --- a/src/mon/MonOpRequest.h +++ b/src/mon/MonOpRequest.h @@ -80,7 +80,7 @@ struct MonOpRequest : public TrackedOp { private: Message *request; utime_t dequeued_time; - MonSession *session; + RefCountedPtr session; ConnectionRef con; bool forwarded_to_leader; op_type_t op_type; @@ -90,7 +90,6 @@ private: req->get_recv_stamp().is_zero() ? ceph_clock_now() : req->get_recv_stamp()), request(req), - session(NULL), con(NULL), forwarded_to_leader(false), op_type(OP_TYPE_NONE) @@ -103,7 +102,7 @@ private: if (req) { con = req->get_connection(); if (con) { - session = static_cast(con->get_priv()); + session = con->get_priv(); } } } @@ -133,15 +132,10 @@ protected: public: ~MonOpRequest() override { request->put(); - // certain ops may not have a session (e.g., AUTH or PING) - if (session) - session->put(); } MonSession *get_session() const { - if (!session) - return NULL; - return session; + return static_cast(session.get()); } template @@ -158,16 +152,7 @@ public: ConnectionRef get_connection() { return con; } void set_session(MonSession *s) { - if (session) { - // we will be rewriting the existing session; drop the ref. - session->put(); - } - - if (s == NULL) { - session = NULL; - } else { - session = static_cast(s->get()); - } + session.reset(s); } bool is_src_mon() const { diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 76c3c77081b8..c0228380782a 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -3000,15 +3000,12 @@ void Monitor::handle_command(MonOpRequestRef op) return; } - MonSession *session = static_cast( - m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { dout(5) << __func__ << " dropping stray message " << *m << dendl; return; } - BOOST_SCOPE_EXIT_ALL(=) { - session->put(); - }; if (m->cmd.empty()) { string rs = "No command supplied"; @@ -3834,7 +3831,7 @@ void Monitor::handle_forward(MonOpRequestRef op) ConnectionRef c(new AnonConnection(cct)); MonSession *s = new MonSession(req->get_source_inst(), static_cast(c.get())); - c->set_priv(s->get()); + c->set_priv(RefCountedPtr{s, false}); c->set_peer_addr(m->client.addr); c->set_peer_type(m->client.name.type()); c->set_features(m->con_features); @@ -3871,7 +3868,6 @@ void Monitor::handle_forward(MonOpRequestRef op) dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl; _ms_dispatch(req); - s->put(); } } @@ -4049,7 +4045,7 @@ void Monitor::remove_session(MonSession *s) routed_requests.erase(*p); } s->routed_request_tids.clear(); - s->con->set_priv(NULL); + s->con->set_priv(nullptr); session_map.remove_session(s); logger->set(l_mon_num_sessions, session_map.get_size()); logger->inc(l_mon_session_rm); @@ -4173,7 +4169,7 @@ void Monitor::_ms_dispatch(Message *m) s = session_map.new_session(m->get_source_inst(), con.get()); } assert(s); - con->set_priv(s->get()); + con->set_priv(RefCountedPtr{s, false}); dout(10) << __func__ << " new session " << s << " " << *s << " features 0x" << std::hex << s->con_features << std::dec << dendl; @@ -4188,7 +4184,6 @@ void Monitor::_ms_dispatch(Message *m) if (!s->caps.is_allow_all()) // but no need to repeatedly copy s->caps = *mon_caps; } - s->put(); } else { dout(20) << __func__ << " existing session " << s << " for " << s->inst << dendl; @@ -5084,12 +5079,13 @@ bool Monitor::ms_handle_reset(Connection *con) if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; - MonSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) return false; // break any con <-> session ref cycle - s->con->set_priv(NULL); + s->con->set_priv(nullptr); if (is_shutdown()) return false; @@ -5101,7 +5097,6 @@ bool Monitor::ms_handle_reset(Connection *con) Mutex::Locker l(session_map_lock); remove_session(s); } - s->put(); return true; } diff --git a/src/msg/Connection.h b/src/msg/Connection.h index 94e934c55f13..99fa51944493 100644 --- a/src/msg/Connection.h +++ b/src/msg/Connection.h @@ -41,7 +41,7 @@ class Messenger; struct Connection : public RefCountedObject { mutable Mutex lock; Messenger *msgr; - RefCountedObject *priv; + RefCountedPtr priv; int peer_type; entity_addr_t peer_addr; utime_t last_keepalive, last_keepalive_ack; @@ -63,7 +63,6 @@ public: : RefCountedObject(cct, 0), lock("Connection::lock"), msgr(m), - priv(NULL), peer_type(-1), features(0), failed(false), @@ -72,24 +71,16 @@ public: ~Connection() override { //generic_dout(0) << "~Connection " << this << dendl; - if (priv) { - //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl; - priv->put(); - } } - void set_priv(RefCountedObject *o) { + void set_priv(const RefCountedPtr& o) { Mutex::Locker l(lock); - if (priv) - priv->put(); priv = o; } - RefCountedObject *get_priv() { + RefCountedPtr get_priv() { Mutex::Locker l(lock); - if (priv) - return priv->get(); - return NULL; + return priv; } /** diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 3a3ea22ebbc6..ea8ed5d30e85 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4814,12 +4814,12 @@ void OSD::_add_heartbeat_peer(int p) return; hi = &heartbeat_peers[p]; hi->peer = p; - HeartbeatSession *s = new HeartbeatSession(p); + RefCountedPtr s{new HeartbeatSession{p}, false}; hi->con_back = cons.first.get(); - hi->con_back->set_priv(s->get()); + hi->con_back->set_priv(s); if (cons.second) { hi->con_front = cons.second.get(); - hi->con_front->set_priv(s->get()); + hi->con_front->set_priv(s); dout(10) << "_add_heartbeat_peer: new peer osd." << p << " " << hi->con_back->get_peer_addr() << " " << hi->con_front->get_peer_addr() @@ -4830,7 +4830,6 @@ void OSD::_add_heartbeat_peer(int p) << " " << hi->con_back->get_peer_addr() << dendl; } - s->put(); } else { hi = &i->second; } @@ -5274,13 +5273,13 @@ void OSD::heartbeat() bool OSD::heartbeat_reset(Connection *con) { Mutex::Locker l(heartbeat_lock); - HeartbeatSession *s = static_cast(con->get_priv()); + auto s = con->get_priv(); if (s) { if (is_stopping()) { - s->put(); return true; } - map::iterator p = heartbeat_peers.find(s->peer); + auto heartbeat_session = static_cast(s.get()); + auto p = heartbeat_peers.find(heartbeat_session->peer); if (p != heartbeat_peers.end() && (p->second.con_back == con || p->second.con_front == con)) { @@ -5297,10 +5296,10 @@ bool OSD::heartbeat_reset(Connection *con) pair newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch); if (newcon.first) { p->second.con_back = newcon.first.get(); - p->second.con_back->set_priv(s->get()); + p->second.con_back->set_priv(s); if (newcon.second) { p->second.con_front = newcon.second.get(); - p->second.con_front->set_priv(s->get()); + p->second.con_front->set_priv(s); } } else { dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer @@ -5310,7 +5309,6 @@ bool OSD::heartbeat_reset(Connection *con) } else { dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl; } - s->put(); } return true; } @@ -5879,10 +5877,11 @@ void OSD::ms_handle_fast_connect(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; dout(10) << " new session (outgoing) " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl; @@ -5890,7 +5889,6 @@ void OSD::ms_handle_fast_connect(Connection *con) assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD); s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD); } - s->put(); } } @@ -5898,10 +5896,11 @@ void OSD::ms_handle_fast_accept(Connection *con) { if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON && con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; dout(10) << "new session (incoming)" << s << " con=" << con << " addr=" << con->get_peer_addr() @@ -5909,23 +5908,23 @@ void OSD::ms_handle_fast_accept(Connection *con) assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD); s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD); } - s->put(); } } bool OSD::ms_handle_reset(Connection *con) { - Session *session = static_cast(con->get_priv()); + auto s = con->get_priv(); + auto session = static_cast(s.get()); dout(2) << "ms_handle_reset con " << con << " session " << session << dendl; if (!session) return false; session->wstate.reset(con); - session->con.reset(NULL); // break con <-> session ref cycle + session->con->set_priv(nullptr); + session->con.reset(); // break con <-> session ref cycle // note that we break session->con *before* the session_handle_reset // cleanup below. this avoids a race between us and // PG::add_backoff, Session::check_backoff, etc. - session_handle_reset(session); - session->put(); + session_handle_reset(SessionRef{session}); return true; } @@ -5934,7 +5933,8 @@ bool OSD::ms_handle_refused(Connection *con) if (!cct->_conf->osd_fast_fail_on_connection_refused) return false; - Session *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); dout(2) << "ms_handle_refused con " << con << " session " << session << dendl; if (!session) return false; @@ -5956,7 +5956,6 @@ bool OSD::ms_handle_refused(Connection *con) } } } - session->put(); return true; } @@ -6115,11 +6114,9 @@ void OSD::_send_boot() cluster_messenger->set_addr_unknowns(cluster_addr); dout(10) << " assuming cluster_addr ip matches client_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { cluster_messenger->ms_deliver_handle_fast_connect(local_connection); + } } entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr(); @@ -6131,11 +6128,9 @@ void OSD::_send_boot() hb_back_server_messenger->set_addr_unknowns(hb_back_addr); dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection); + } } entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr(); @@ -6147,11 +6142,9 @@ void OSD::_send_boot() hb_front_server_messenger->set_addr_unknowns(hb_front_addr); dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl; } else { - Session *s = static_cast(local_connection->get_priv()); - if (s) - s->put(); - else + if (auto session = local_connection->get_priv(); !session) { hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection); + } } MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(), @@ -6512,7 +6505,8 @@ void OSD::handle_command(MMonCommand *m) void OSD::handle_command(MCommand *m) { ConnectionRef con = m->get_connection(); - Session *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (!session) { con->send_message(new MCommandReply(m, -EPERM)); m->put(); @@ -6520,7 +6514,7 @@ void OSD::handle_command(MCommand *m) } OSDCap& caps = session->caps; - session->put(); + priv.reset(); if (!caps.allow_all() || m->get_source().is_mon()) { con->send_message(new MCommandReply(m, -EPERM)); @@ -7186,7 +7180,7 @@ void OSD::maybe_share_map( op->check_send_map = false; } -void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) +void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap) { assert(session->session_dispatch_lock.is_locked()); @@ -7259,17 +7253,14 @@ void OSD::ms_fast_dispatch(Message *m) // legacy client, and this is an MOSDOp (the *only* fast dispatch // message that didn't have an explicit spg_t); we need to map // them to an spg_t while preserving delivery order. - Session *session = static_cast(m->get_connection()->get_priv()); - if (session) { - { - Mutex::Locker l(session->session_dispatch_lock); - op->get(); - session->waiting_on_map.push_back(*op); - OSDMapRef nextmap = service.get_nextmap_reserved(); - dispatch_session_waiting(session, nextmap); - service.release_map(nextmap); - } - session->put(); + auto priv = m->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { + Mutex::Locker l{session->session_dispatch_lock}; + op->get(); + session->waiting_on_map.push_back(*op); + OSDMapRef nextmap = service.get_nextmap_reserved(); + dispatch_session_waiting(session, nextmap); + service.release_map(nextmap); } } OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); @@ -7280,12 +7271,11 @@ void OSD::ms_fast_preprocess(Message *m) if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { if (m->get_type() == CEPH_MSG_OSD_MAP) { MOSDMap *mm = static_cast(m); - Session *s = static_cast(m->get_connection()->get_priv()); - if (s) { + auto priv = m->get_connection()->get_priv(); + if (auto s = static_cast(priv.get()); s) { s->received_map_lock.lock(); s->received_map_epoch = mm->get_last(); s->received_map_lock.unlock(); - s->put(); } } } @@ -7360,12 +7350,14 @@ bool OSD::ms_verify_authorizer( } if (isvalid) { - Session *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s) { - s = new Session(cct); - con->set_priv(s->get()); + s = new Session{cct}; + con->set_priv(RefCountedPtr{s, false}); s->con = con; - dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl; + dout(10) << " new session " << s << " con=" << s->con + << " addr=" << con->get_peer_addr() << dendl; } s->entity_name = name; @@ -7387,8 +7379,6 @@ bool OSD::ms_verify_authorizer( else dout(10) << " session " << s << " " << s->entity_name << " failed to parse caps '" << str << "'" << dendl; } - - s->put(); } return true; } @@ -7900,18 +7890,16 @@ void OSD::handle_osd_map(MOSDMap *m) return; } - Session *session = static_cast(m->get_connection()->get_priv()); - if (session && !(session->entity_name.is_mon() || + auto priv = m->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); + session && !(session->entity_name.is_mon() || session->entity_name.is_osd())) { //not enough perms! dout(10) << "got osd map from Session " << session << " which we can't take maps from (not a mon or osd)" << dendl; m->put(); - session->put(); return; } - if (session) - session->put(); // share with the objecter if (!is_preboot()) @@ -8691,15 +8679,15 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map, << dendl; ConnectionRef con = m->get_connection(); con->mark_down(); - Session *s = static_cast(con->get_priv()); - if (s) { + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { if (!is_fast_dispatch) s->session_dispatch_lock.Lock(); clear_session_waiting_on_map(s); - con->set_priv(NULL); // break ref <-> session cycle, if any + con->set_priv(nullptr); // break ref <-> session cycle, if any + s->con.reset(); if (!is_fast_dispatch) s->session_dispatch_lock.Unlock(); - s->put(); } return false; } @@ -9871,11 +9859,9 @@ void OSD::dequeue_op( logger->tinc(l_osd_op_before_dequeue_op_lat, latency); - Session *session = static_cast( - op->get_req()->get_connection()->get_priv()); - if (session) { + auto priv = op->get_req()->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { maybe_share_map(session, op, pg->get_osdmap()); - session->put(); } if (pg->deleting) @@ -10619,11 +10605,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) << " dropping " << *qi << dendl; // share map with client? if (boost::optional _op = qi->maybe_get_op()) { - Session *session = static_cast( - (*_op)->get_req()->get_connection()->get_priv()); - if (session) { - osd->maybe_share_map(session, *_op, sdata->waiting_for_pg_osdmap); - session->put(); + auto priv = (*_op)->get_req()->get_connection()->get_priv(); + if (auto session = static_cast(priv.get()); session) { + osd->maybe_share_map(session, *_op, sdata->shard_osdmap); } } unsigned pushes_to_free = qi->get_reserved_pushes(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 24b4200d1ecc..f2a71eb9a300 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1400,44 +1400,37 @@ private: // -- sessions -- private: - void dispatch_session_waiting(Session *session, OSDMapRef osdmap); + void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap); void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap); Mutex session_waiting_lock; - set session_waiting_for_map; + set session_waiting_for_map; /// Caller assumes refs for included Sessions - void get_sessions_waiting_for_map(set *out) { + void get_sessions_waiting_for_map(set *out) { Mutex::Locker l(session_waiting_lock); out->swap(session_waiting_for_map); } - void register_session_waiting_on_map(Session *session) { + void register_session_waiting_on_map(SessionRef session) { Mutex::Locker l(session_waiting_lock); - if (session_waiting_for_map.insert(session).second) { - session->get(); - } + session_waiting_for_map.insert(session); } - void clear_session_waiting_on_map(Session *session) { + void clear_session_waiting_on_map(SessionRef session) { Mutex::Locker l(session_waiting_lock); - set::iterator i = session_waiting_for_map.find(session); - if (i != session_waiting_for_map.end()) { - (*i)->put(); - session_waiting_for_map.erase(i); - } + session_waiting_for_map.erase(session); } void dispatch_sessions_waiting_on_map() { - set sessions_to_check; + set sessions_to_check; get_sessions_waiting_for_map(&sessions_to_check); - for (set::iterator i = sessions_to_check.begin(); + for (auto i = sessions_to_check.begin(); i != sessions_to_check.end(); sessions_to_check.erase(i++)) { - (*i)->session_dispatch_lock.Lock(); - dispatch_session_waiting(*i, osdmap); - (*i)->session_dispatch_lock.Unlock(); - (*i)->put(); + Mutex::Locker l{(*i)->session_dispatch_lock}; + SessionRef session = *i; + dispatch_session_waiting(session, osdmap); } } - void session_handle_reset(Session *session) { + void session_handle_reset(SessionRef session) { Mutex::Locker l(session->session_dispatch_lock); clear_session_waiting_on_map(session); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 26ccdeee574b..51b2d2c49734 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1939,13 +1939,14 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op) const MOSDOp *req = static_cast(op->get_req()); - Session *session = static_cast(req->get_connection()->get_priv()); + auto priv = req->get_connection()->get_priv(); + auto session = static_cast(priv.get()); if (!session) { dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl; return false; } OSDCap& caps = session->caps; - session->put(); + priv.reset(); const string &key = req->get_hobj().get_key().empty() ? req->get_oid().name : diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index d1b5953e22e9..f5b7091edbdb 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1691,10 +1691,9 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo void PrimaryLogPG::handle_backoff(OpRequestRef& op) { const MOSDBackoff *m = static_cast(op->get_req()); - SessionRef session = static_cast(m->get_connection()->get_priv()); + SessionRef session{static_cast(m->get_connection()->get_priv().get())}; if (!session) return; // drop it. - session->put(); // get_priv takes a ref, and so does the SessionRef hobject_t begin = info.pgid.pgid.get_hobj_start(); hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num()); if (begin < m->begin) { @@ -1742,10 +1741,9 @@ void PrimaryLogPG::do_request( // pg-wide backoffs const Message *m = op->get_req(); if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) { - SessionRef session = static_cast(m->get_connection()->get_priv()); + SessionRef session{static_cast(m->get_connection()->get_priv().get())}; if (!session) return; // drop it. - session->put(); // get_priv takes a ref, and so does the SessionRef if (op->get_req()->get_type() == CEPH_MSG_OSD_OP) { if (session->check_backoff(cct, info.pgid, @@ -1942,12 +1940,11 @@ void PrimaryLogPG::do_op(OpRequestRef& op) m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF); SessionRef session; if (can_backoff) { - session = static_cast(m->get_connection()->get_priv()); + session = static_cast(m->get_connection()->get_priv().get()); if (!session.get()) { dout(10) << __func__ << " no session" << dendl; return; } - session->put(); // get_priv() takes a ref, and so does the intrusive_ptr if (session->check_backoff(cct, info.pgid, head, m)) { return; @@ -7466,10 +7463,9 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn) assert(conn); - boost::intrusive_ptr session((Session *)conn->get_priv()); - if (!session.get()) + auto session = conn->get_priv(); + if (!session) return; - session->put(); // get_priv() takes a ref, and so does the intrusive_ptr for (list >::iterator i = ctx->watch_connects.begin(); i != ctx->watch_connects.end(); diff --git a/src/osd/Watch.cc b/src/osd/Watch.cc index 7ff9f99b2bfa..800b3c07b24d 100644 --- a/src/osd/Watch.cc +++ b/src/osd/Watch.cc @@ -369,10 +369,11 @@ void Watch::connect(ConnectionRef con, bool _will_ping) dout(10) << __func__ << " con " << con << dendl; conn = con; will_ping = _will_ping; - Session* sessionref(static_cast(con->get_priv())); - if (sessionref) { + auto priv = con->get_priv(); + if (priv) { + auto sessionref = static_cast(priv.get()); sessionref->wstate.addWatch(self.lock()); - sessionref->put(); + priv.reset(); for (map::iterator i = in_progress_notifies.begin(); i != in_progress_notifies.end(); ++i) { @@ -415,10 +416,9 @@ void Watch::discard_state() unregister_cb(); discarded = true; if (conn) { - Session* sessionref(static_cast(conn->get_priv())); - if (sessionref) { - sessionref->wstate.removeWatch(self.lock()); - sessionref->put(); + if (auto priv = conn->get_priv(); priv) { + auto session = static_cast(priv.get()); + session->wstate.removeWatch(self.lock()); } conn = ConnectionRef(); } diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index a16e6542187c..5149cdc918d1 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1793,7 +1793,7 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul) OSDSession *s = new OSDSession(cct, osd); osd_sessions[osd] = s; s->con = messenger->get_connection(osdmap->get_inst(osd)); - s->con->set_priv(s->get()); + s->con->set_priv(RefCountedPtr{s}); logger->inc(l_osdc_osd_session_open); logger->set(l_osdc_osd_sessions, osd_sessions.size()); s->get(); @@ -1837,7 +1837,7 @@ void Objecter::_reopen_session(OSDSession *s) logger->inc(l_osdc_osd_session_close); } s->con = messenger->get_connection(inst); - s->con->set_priv(s->get()); + s->con->set_priv(RefCountedPtr{s}); s->incarnation++; logger->inc(l_osdc_osd_session_open); } @@ -3358,12 +3358,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; - if (s) { - s->put(); - } m->put(); return; } @@ -3377,7 +3375,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) " onnvram" : " ack")) << " ... stray" << dendl; sl.unlock(); - s->put(); m->put(); return; } @@ -3400,7 +3397,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } _session_op_remove(s, op); sl.unlock(); - s->put(); _op_submit(op, sul, NULL); m->put(); @@ -3416,7 +3412,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << op->session->con->get_peer_addr() << dendl; m->put(); sl.unlock(); - s->put(); return; } } else { @@ -3435,7 +3430,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) num_in_flight--; _session_op_remove(s, op); sl.unlock(); - s->put(); // FIXME: two redirects could race and reorder @@ -3456,7 +3450,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) num_in_flight--; _session_op_remove(s, op); sl.unlock(); - s->put(); op->tid = 0; op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS | @@ -3550,7 +3543,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } m->put(); - s->put(); } void Objecter::handle_osd_backoff(MOSDBackoff *m) @@ -3563,17 +3555,15 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; - if (s) - s->put(); m->put(); return; } get_session(s); - s->put(); // from get_priv() above OSDSession::unique_lock sl(s->lock); @@ -4423,7 +4413,8 @@ bool Objecter::ms_handle_reset(Connection *con) if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) { unique_lock wl(rwlock); - OSDSession *session = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto session = static_cast(priv.get()); if (session) { ldout(cct, 1) << "ms_handle_reset " << con << " session " << session << " osd." << session->osd << dendl; @@ -4442,7 +4433,6 @@ bool Objecter::ms_handle_reset(Connection *con) _linger_ops_resend(lresend, wl); wl.unlock(); maybe_request_map(); - session->put(); } return true; } @@ -4757,12 +4747,11 @@ void Objecter::handle_command_reply(MCommandReply *m) } ConnectionRef con = m->get_connection(); - OSDSession *s = static_cast(con->get_priv()); + auto priv = con->get_priv(); + auto s = static_cast(priv.get()); if (!s || s->con != con) { ldout(cct, 7) << __func__ << " no session on con " << con << dendl; m->put(); - if (s) - s->put(); return; } @@ -4773,7 +4762,6 @@ void Objecter::handle_command_reply(MCommandReply *m) << " not found" << dendl; m->put(); sl.unlock(); - s->put(); return; } @@ -4786,7 +4774,6 @@ void Objecter::handle_command_reply(MCommandReply *m) << dendl; m->put(); sl.unlock(); - s->put(); return; } if (c->poutbl) { @@ -4800,7 +4787,6 @@ void Objecter::handle_command_reply(MCommandReply *m) sul.unlock(); m->put(); - s->put(); } void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) diff --git a/src/test/mon/test_mon_workloadgen.cc b/src/test/mon/test_mon_workloadgen.cc index 5984a26aa936..e8f95c28c0c0 100644 --- a/src/test/mon/test_mon_workloadgen.cc +++ b/src/test/mon/test_mon_workloadgen.cc @@ -897,11 +897,7 @@ class OSDStub : public TestStub bool ms_handle_reset(Connection *con) override { dout(1) << __func__ << dendl; - Session *session = (Session *)con->get_priv(); - if (!session) - return false; - session->put(); - return true; + return con->get_priv().get(); } bool ms_handle_refused(Connection *con) override { diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index a3c83e3ab72d..c40feb1bb5ae 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -115,32 +115,30 @@ class FakeDispatcher : public Dispatcher { void ms_handle_fast_connect(Connection *con) override { lock.Lock(); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); + auto s = con->get_priv(); if (!s) { - s = new Session(con); - con->set_priv(s->get()); - lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl; + auto session = new Session(con); + con->set_priv(RefCountedPtr{session, false}); + lderr(g_ceph_context) << __func__ << " con: " << con + << " count: " << session->count << dendl; } - s->put(); got_connect = true; cond.Signal(); lock.Unlock(); } void ms_handle_fast_accept(Connection *con) override { - Session *s = static_cast(con->get_priv()); - if (!s) { - s = new Session(con); - con->set_priv(s->get()); + if (!con->get_priv()) { + con->set_priv(RefCountedPtr{new Session(con), false}); } - s->put(); } bool ms_dispatch(Message *m) override { - Session *s = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto s = static_cast(priv.get()); if (!s) { s = new Session(m->get_connection()); - m->get_connection()->set_priv(s->get()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); } - s->put(); s->count++; lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; if (is_server) { @@ -155,22 +153,20 @@ class FakeDispatcher : public Dispatcher { bool ms_handle_reset(Connection *con) override { Mutex::Locker l(lock); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); - if (s) { - s->con.reset(NULL); // break con <-> session ref cycle - con->set_priv(NULL); // break ref <-> session cycle, if any - s->put(); + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any } return true; } void ms_handle_remote_reset(Connection *con) override { Mutex::Locker l(lock); lderr(g_ceph_context) << __func__ << " " << con << dendl; - Session *s = static_cast(con->get_priv()); - if (s) { - s->con.reset(NULL); // break con <-> session ref cycle - con->set_priv(NULL); // break ref <-> session cycle, if any - s->put(); + auto priv = con->get_priv(); + if (auto s = static_cast(priv.get()); s) { + s->con.reset(); // break con <-> session ref cycle + con->set_priv(nullptr); // break ref <-> session cycle, if any } got_remote_reset = true; cond.Signal(); @@ -179,12 +175,13 @@ class FakeDispatcher : public Dispatcher { return false; } void ms_fast_dispatch(Message *m) override { - Session *s = static_cast(m->get_connection()->get_priv()); + auto priv = m->get_connection()->get_priv(); + auto s = static_cast(priv.get()); if (!s) { s = new Session(m->get_connection()); - m->get_connection()->set_priv(s->get()); + priv.reset(s, false); + m->get_connection()->set_priv(priv); } - s->put(); s->count++; lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl; if (is_server) { @@ -239,7 +236,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. test rebind port @@ -258,7 +255,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 3. test markdown connection conn->mark_down(); @@ -285,7 +282,7 @@ TEST_P(MessengerTest, SimpleTest) { cli_dispatcher.got_new = false; } srv_dispatcher.loopback = false; - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); client_msgr->shutdown(); client_msgr->wait(); server_msgr->shutdown(); @@ -312,12 +309,12 @@ TEST_P(MessengerTest, NameAddrTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // Make should server_conn is the one we already accepted from client, // so it means client_msgr has the same addr when server connection has - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); server_msgr->wait(); @@ -373,7 +370,7 @@ TEST_P(MessengerTest, FeatureTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -404,7 +401,7 @@ TEST_P(MessengerTest, TimeoutTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ASSERT_TRUE(conn->peer_is_osd()); // 2. wait for idle @@ -445,12 +442,12 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // don't lose state - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); @@ -462,7 +459,7 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); { Mutex::Locker l(srv_dispatcher.lock); @@ -503,9 +500,9 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.got_new = false; } // resetcheck happen - ASSERT_EQ(1U, static_cast(conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_EQ(1U, static_cast(server_conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); @@ -540,7 +537,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); conn->mark_down(); ASSERT_FALSE(conn->is_connected()); @@ -554,7 +551,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // server lose state { @@ -562,7 +559,7 @@ TEST_P(MessengerTest, StatelessTest) { while (!srv_dispatcher.got_new) srv_dispatcher.cond.Wait(srv_dispatcher.lock); } - ASSERT_EQ(1U, static_cast(server_conn->get_priv())->get_count()); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); // 2. test for client lossy server_conn->mark_down(); @@ -579,7 +576,7 @@ TEST_P(MessengerTest, StatelessTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -613,7 +610,7 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; @@ -644,9 +641,9 @@ TEST_P(MessengerTest, ClientStandbyTest) { cli_dispatcher.cond.Wait(cli_dispatcher.lock); cli_dispatcher.got_new = false; } - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + ASSERT_EQ(1U, static_cast(server_conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown(); @@ -679,7 +676,7 @@ TEST_P(MessengerTest, AuthTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); // 2. mix auth g_ceph_context->_conf->set_val("auth_cluster_required", "none"); @@ -697,7 +694,7 @@ TEST_P(MessengerTest, AuthTest) { cli_dispatcher.got_new = false; } ASSERT_TRUE(conn->is_connected()); - ASSERT_TRUE((static_cast(conn->get_priv()))->get_count() == 1); + ASSERT_EQ(1U, static_cast(conn->get_priv().get())->get_count()); server_msgr->shutdown(); client_msgr->shutdown();