]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv
authorKefu Chai <kchai@redhat.com>
Tue, 29 May 2018 07:51:07 +0000 (15:51 +0800)
committerSamuel Just <sjust@redhat.com>
Wed, 21 Aug 2019 22:05:39 +0000 (15:05 -0700)
See-also: http://tracker.ceph.com/issues/20924
Signed-off-by: Kefu Chai <kchai@redhat.com>
Signed-off-by: Samuel Just <sjust@redhat.com>
(cherry picked from commit 72883956c26fdc4345324e9b27b45c3dfac17fa2)

 Conflicts:
src/osd/OSD.cc
src/osdc/Objecter.cc

17 files changed:
src/common/RefCountedObj.h
src/mds/MDSDaemon.cc
src/mds/MDSRank.cc
src/messages/PaxosServiceMessage.h
src/mgr/DaemonServer.cc
src/mgr/DaemonState.cc
src/mon/MonOpRequest.h
src/mon/Monitor.cc
src/msg/Connection.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PrimaryLogPG.cc
src/osd/Watch.cc
src/osdc/Objecter.cc
src/test/mon/test_mon_workloadgen.cc
src/test/msgr/test_msgr.cc

index 9c0dad1d302f8574e1ec017622ce616b547f503a..325304c1441c30a1f1f7b69f80106c99c3d518c4 100644 (file)
@@ -20,6 +20,8 @@
 #include "common/ceph_context.h"
 #include "common/valgrind.h"
 
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
 // re-include our assert to clobber the system one; fix dout:
 #include "include/assert.h"
 
@@ -164,4 +166,6 @@ struct RefCountedWaitObject {
 void intrusive_ptr_add_ref(const RefCountedObject *p);
 void intrusive_ptr_release(const RefCountedObject *p);
 
+using RefCountedPtr = boost::intrusive_ptr<RefCountedObject>;
+
 #endif
index e312bf7cc227f4bacc71db4cc47259cd080b95a8..a16c8caf86db1614cdd96b706ae248659d2a09fb 100644 (file)
@@ -590,7 +590,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
                                   int r, bufferlist outbl,
                                   boost::string_view outs)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
   // If someone is using a closed session for sending commands (e.g.
   // the ceph CLI) then we should feel free to clean up this connection
@@ -606,7 +607,7 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
     assert(session->is_closed());
     session->connection->mark_disposable();
   }
-  session->put();
+  priv.reset();
 
   MCommandReply *reply = new MCommandReply(r, outs);
   reply->set_tid(m->get_tid());
@@ -617,7 +618,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
 /* This function DOES put the passed message before returning*/
 void MDSDaemon::handle_command(MCommand *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
 
   int r = 0;
@@ -645,7 +647,7 @@ void MDSDaemon::handle_command(MCommand *m)
   } else {
     r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
   }
-  session->put();
+  priv.reset();
 
   if (need_reply) {
     send_command_reply(m, mds_rank, r, outbl, outs);
@@ -1254,14 +1256,13 @@ bool MDSDaemon::ms_handle_reset(Connection *con)
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return false;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   } else {
     con->mark_down();
   }
@@ -1283,14 +1284,13 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   }
 }
 
@@ -1364,7 +1364,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
       s->info.inst.addr = con->get_peer_addr();
       s->info.inst.name = n;
       dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
-      con->set_priv(s);
+      con->set_priv(RefCountedPtr{s, false});
       s->connection = con;
       if (mds_rank) {
         mds_rank->kick_waiters_for_any_client_connection();
@@ -1372,7 +1372,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
     } else {
       dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
               << ", new/authorizing con " << con << dendl;
-      con->set_priv(s->get());
+      con->set_priv(RefCountedPtr{s, false});
 
 
 
@@ -1428,7 +1428,8 @@ void MDSDaemon::ms_handle_accept(Connection *con)
     return;
   }
 
-  Session *s = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<Session *>(priv.get());
   dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
   if (s) {
     if (s->connection != con) {
@@ -1441,7 +1442,6 @@ void MDSDaemon::ms_handle_accept(Connection *con)
        s->preopen_out_queue.pop_front();
       }
     }
-    s->put();
   }
 }
 
index dbc5a7ad86101f78878e3ea122a300f5cf550bce..d2b508b5680648c427323851dcf2d7cabe40dfe0 100644 (file)
@@ -964,7 +964,7 @@ void MDSRank::ProgressThread::shutdown()
 bool MDSRankDispatcher::ms_dispatch(Message *m)
 {
   if (m->get_source().is_client()) {
-    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+    Session *session = static_cast<Session*>(m->get_connection()->get_priv().get());
     if (session)
       session->last_seen = Session::clock::now();
   }
@@ -1294,9 +1294,9 @@ bool MDSRank::is_stale_message(Message *m) const
 
 Session *MDSRank::get_session(Message *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put(); // do not carry ref
     dout(20) << "get_session have " << session << " " << session->info.inst
             << " state " << session->get_state_name() << dendl;
     // Check if we've imported an open session since (new sessions start closed)
@@ -1412,9 +1412,9 @@ void MDSRank::send_message_client_counted(Message *m, client_t client)
 
 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
 {
-  Session *session = static_cast<Session *>(connection->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put();  // do not carry ref
     send_message_client_counted(m, session);
   } else {
     dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
index a85ac9e6c4a6bf4d94e6c6678f62d10c015e057f..2ac491b7a5c114656d35c91d52a1746a5685c353 100644 (file)
@@ -58,10 +58,8 @@ class PaxosServiceMessage : public Message {
    * normally.
    */
   MonSession *get_session() {
-    MonSession *session = (MonSession *)get_connection()->get_priv();
-    if (session)
-      session->put();
-    return session;
+    auto priv = get_connection()->get_priv();
+    return static_cast<MonSession*>(priv.get());
   }
   
   const char *get_type_name() const override { return "PaxosServiceMessage"; }
index 256148e4e1925821d70d520f423bd4496d81a560..7641788d451863a552f17c5fdab2b8a30f0b75bc 100644 (file)
@@ -246,11 +246,11 @@ bool DaemonServer::ms_get_authorizer(int dest_type,
 bool DaemonServer::ms_handle_reset(Connection *con)
 {
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+    auto priv = con->get_priv();
+    auto session = static_cast<MgrSession*>(priv.get());
     if (!session) {
       return false;
     }
-    session->put(); // SessionRef takes a ref
     Mutex::Locker l(lock);
     dout(10) << "unregistering osd." << session->osd_id
             << "  session " << session << " con " << con << dendl;
@@ -465,12 +465,12 @@ bool DaemonServer::handle_report(MMgrReport *m)
     {
       Mutex::Locker l(lock);
       // kill session
-      MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+      auto priv = m->get_connection()->get_priv();
+      auto session = static_cast<MgrSession*>(priv.get());
       if (!session) {
        return false;
       }
       m->get_connection()->mark_down();
-      session->put();
 
       dout(10) << "unregistering osd." << session->osd_id
               << "  session " << session << " con " << m->get_connection() << dendl;
@@ -666,11 +666,11 @@ bool DaemonServer::handle_command(MCommand *m)
 
   std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
 
-  MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
   if (!session) {
     return true;
   }
-  session->put(); // SessionRef takes a ref
   if (session->inst.name == entity_name_t())
     session->inst.name = m->get_source();
 
@@ -730,11 +730,11 @@ bool DaemonServer::handle_command(MCommand *m)
   bool is_allowed;
   if (!mgr_cmd) {
     MonCommand py_command = {"", "", "py", "rw", "cli"};
-    is_allowed = _allowed_command(session.get(), py_command.module,
+    is_allowed = _allowed_command(session, py_command.module,
       prefix, cmdctx->cmdmap, param_str_map, &py_command);
   } else {
     // validate user's permissions for requested command
-    is_allowed = _allowed_command(session.get(), mgr_cmd->module,
+    is_allowed = _allowed_command(session, mgr_cmd->module,
       prefix, cmdctx->cmdmap,  param_str_map, mgr_cmd);
   }
   if (!is_allowed) {
index 9ba7cf926d899bce1d9b4597be76f3bf7b8c3fb6..cc7f16b2175cf1c2233b81ff34102e4b88248758 100644 (file)
@@ -126,8 +126,8 @@ void DaemonPerfCounters::update(MMgrReport *report)
            << report->packed.length() << " bytes of data" << dendl;
 
   // Retrieve session state
-  MgrSessionRef session(static_cast<MgrSession*>(
-        report->get_connection()->get_priv()));
+  auto priv = report->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
 
   // Load any newly declared types
   for (const auto &t : report->declare_types) {
index 7a43bff2f60256335c77e6248c1cfcb3979e4eb7..ac39ea99ddba22b265eb0d07f91d05a34434988c 100644 (file)
@@ -80,7 +80,7 @@ struct MonOpRequest : public TrackedOp {
 private:
   Message *request;
   utime_t dequeued_time;
-  MonSession *session;
+  RefCountedPtr session;
   ConnectionRef con;
   bool forwarded_to_leader;
   op_type_t op_type;
@@ -90,7 +90,6 @@ private:
       req->get_recv_stamp().is_zero() ?
       ceph_clock_now() : req->get_recv_stamp()),
     request(req),
-    session(NULL),
     con(NULL),
     forwarded_to_leader(false),
     op_type(OP_TYPE_NONE)
@@ -103,7 +102,7 @@ private:
     if (req) {
       con = req->get_connection();
       if (con) {
-        session = static_cast<MonSession*>(con->get_priv());
+        session = con->get_priv();
       }
     }
   }
@@ -133,15 +132,10 @@ protected:
 public:
   ~MonOpRequest() override {
     request->put();
-    // certain ops may not have a session (e.g., AUTH or PING)
-    if (session)
-      session->put();
   }
 
   MonSession *get_session() const {
-    if (!session)
-      return NULL;
-    return session;
+    return static_cast<MonSession*>(session.get());
   }
 
   template<class T>
@@ -158,16 +152,7 @@ public:
   ConnectionRef get_connection() { return con; }
 
   void set_session(MonSession *s) {
-    if (session) {
-      // we will be rewriting the existing session; drop the ref.
-      session->put();
-    }
-
-    if (s == NULL) {
-      session = NULL;
-    } else {
-      session = static_cast<MonSession*>(s->get());
-    }
+    session.reset(s);
   }
 
   bool is_src_mon() const {
index 76c3c77081b8845f50856acb82d464803ff95489..c0228380782afd54d7b3e29d6b7fa6cdfb5d7a21 100644 (file)
@@ -3000,15 +3000,12 @@ void Monitor::handle_command(MonOpRequestRef op)
     return;
   }
 
-  MonSession *session = static_cast<MonSession *>(
-    m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MonSession *>(priv.get());
   if (!session) {
     dout(5) << __func__ << " dropping stray message " << *m << dendl;
     return;
   }
-  BOOST_SCOPE_EXIT_ALL(=) {
-    session->put();
-  };
 
   if (m->cmd.empty()) {
     string rs = "No command supplied";
@@ -3834,7 +3831,7 @@ void Monitor::handle_forward(MonOpRequestRef op)
     ConnectionRef c(new AnonConnection(cct));
     MonSession *s = new MonSession(req->get_source_inst(),
                                   static_cast<Connection*>(c.get()));
-    c->set_priv(s->get());
+    c->set_priv(RefCountedPtr{s, false});
     c->set_peer_addr(m->client.addr);
     c->set_peer_type(m->client.name.type());
     c->set_features(m->con_features);
@@ -3871,7 +3868,6 @@ void Monitor::handle_forward(MonOpRequestRef op)
     dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
 
     _ms_dispatch(req);
-    s->put();
   }
 }
 
@@ -4049,7 +4045,7 @@ void Monitor::remove_session(MonSession *s)
     routed_requests.erase(*p);
   }
   s->routed_request_tids.clear();
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
   session_map.remove_session(s);
   logger->set(l_mon_num_sessions, session_map.get_size());
   logger->inc(l_mon_session_rm);
@@ -4173,7 +4169,7 @@ void Monitor::_ms_dispatch(Message *m)
       s = session_map.new_session(m->get_source_inst(), con.get());
     }
     assert(s);
-    con->set_priv(s->get());
+    con->set_priv(RefCountedPtr{s, false});
     dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
@@ -4188,7 +4184,6 @@ void Monitor::_ms_dispatch(Message *m)
       if (!s->caps.is_allow_all()) // but no need to repeatedly copy
         s->caps = *mon_caps;
     }
-    s->put();
   } else {
     dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
@@ -5084,12 +5079,13 @@ bool Monitor::ms_handle_reset(Connection *con)
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
     return false;
 
-  MonSession *s = static_cast<MonSession *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<MonSession*>(priv.get());
   if (!s)
     return false;
 
   // break any con <-> session ref cycle
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
 
   if (is_shutdown())
     return false;
@@ -5101,7 +5097,6 @@ bool Monitor::ms_handle_reset(Connection *con)
     Mutex::Locker l(session_map_lock);
     remove_session(s);
   }
-  s->put();
   return true;
 }
 
index 94e934c55f1335fbec249ed8f6954d835f5b1a52..99fa51944493db6e99678a2d83d6471e3a3128b1 100644 (file)
@@ -41,7 +41,7 @@ class Messenger;
 struct Connection : public RefCountedObject {
   mutable Mutex lock;
   Messenger *msgr;
-  RefCountedObject *priv;
+  RefCountedPtr priv;
   int peer_type;
   entity_addr_t peer_addr;
   utime_t last_keepalive, last_keepalive_ack;
@@ -63,7 +63,6 @@ public:
     : RefCountedObject(cct, 0),
       lock("Connection::lock"),
       msgr(m),
-      priv(NULL),
       peer_type(-1),
       features(0),
       failed(false),
@@ -72,24 +71,16 @@ public:
 
   ~Connection() override {
     //generic_dout(0) << "~Connection " << this << dendl;
-    if (priv) {
-      //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
-      priv->put();
-    }
   }
 
-  void set_priv(RefCountedObject *o) {
+  void set_priv(const RefCountedPtr& o) {
     Mutex::Locker l(lock);
-    if (priv)
-      priv->put();
     priv = o;
   }
 
-  RefCountedObject *get_priv() {
+  RefCountedPtr get_priv() {
     Mutex::Locker l(lock);
-    if (priv)
-      return priv->get();
-    return NULL;
+    return priv;
   }
 
   /**
index 3a3ea22ebbc6f1ab11b4a6a15dfad3540054c7e5..ea8ed5d30e8583a7658f3abb3c132927586f79ca 100644 (file)
@@ -4814,12 +4814,12 @@ void OSD::_add_heartbeat_peer(int p)
       return;
     hi = &heartbeat_peers[p];
     hi->peer = p;
-    HeartbeatSession *s = new HeartbeatSession(p);
+    RefCountedPtr s{new HeartbeatSession{p}, false};
     hi->con_back = cons.first.get();
-    hi->con_back->set_priv(s->get());
+    hi->con_back->set_priv(s);
     if (cons.second) {
       hi->con_front = cons.second.get();
-      hi->con_front->set_priv(s->get());
+      hi->con_front->set_priv(s);
       dout(10) << "_add_heartbeat_peer: new peer osd." << p
               << " " << hi->con_back->get_peer_addr()
               << " " << hi->con_front->get_peer_addr()
@@ -4830,7 +4830,6 @@ void OSD::_add_heartbeat_peer(int p)
               << " " << hi->con_back->get_peer_addr()
               << dendl;
     }
-    s->put();
   } else {
     hi = &i->second;
   }
@@ -5274,13 +5273,13 @@ void OSD::heartbeat()
 bool OSD::heartbeat_reset(Connection *con)
 {
   Mutex::Locker l(heartbeat_lock);
-  HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
+  auto s = con->get_priv();
   if (s) {
     if (is_stopping()) {
-      s->put();
       return true;
     }
-    map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
+    auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
+    auto p = heartbeat_peers.find(heartbeat_session->peer);
     if (p != heartbeat_peers.end() &&
        (p->second.con_back == con ||
         p->second.con_front == con)) {
@@ -5297,10 +5296,10 @@ bool OSD::heartbeat_reset(Connection *con)
       pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
       if (newcon.first) {
        p->second.con_back = newcon.first.get();
-       p->second.con_back->set_priv(s->get());
+       p->second.con_back->set_priv(s);
        if (newcon.second) {
          p->second.con_front = newcon.second.get();
-         p->second.con_front->set_priv(s->get());
+         p->second.con_front->set_priv(s);
        }
       } else {
        dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
@@ -5310,7 +5309,6 @@ bool OSD::heartbeat_reset(Connection *con)
     } else {
       dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
     }
-    s->put();
   }
   return true;
 }
@@ -5879,10 +5877,11 @@ void OSD::ms_handle_fast_connect(Connection *con)
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << " new session (outgoing) " << s << " con=" << s->con
           << " addr=" << s->con->get_peer_addr() << dendl;
@@ -5890,7 +5889,6 @@ void OSD::ms_handle_fast_connect(Connection *con)
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
@@ -5898,10 +5896,11 @@ void OSD::ms_handle_fast_accept(Connection *con)
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << "new session (incoming)" << s << " con=" << con
           << " addr=" << con->get_peer_addr()
@@ -5909,23 +5908,23 @@ void OSD::ms_handle_fast_accept(Connection *con)
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
 bool OSD::ms_handle_reset(Connection *con)
 {
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto s = con->get_priv();
+  auto session = static_cast<Session*>(s.get());
   dout(2) << "ms_handle_reset con " << con << " session " << session << dendl;
   if (!session)
     return false;
   session->wstate.reset(con);
-  session->con.reset(NULL);  // break con <-> session ref cycle
+  session->con->set_priv(nullptr);
+  session->con.reset();  // break con <-> session ref cycle
   // note that we break session->con *before* the session_handle_reset
   // cleanup below.  this avoids a race between us and
   // PG::add_backoff, Session::check_backoff, etc.
-  session_handle_reset(session);
-  session->put();
+  session_handle_reset(SessionRef{session});
   return true;
 }
 
@@ -5934,7 +5933,8 @@ bool OSD::ms_handle_refused(Connection *con)
   if (!cct->_conf->osd_fast_fail_on_connection_refused)
     return false;
 
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   dout(2) << "ms_handle_refused con " << con << " session " << session << dendl;
   if (!session)
     return false;
@@ -5956,7 +5956,6 @@ bool OSD::ms_handle_refused(Connection *con)
       }
     }
   }
-  session->put();
   return true;
 }
 
@@ -6115,11 +6114,9 @@ void OSD::_send_boot()
     cluster_messenger->set_addr_unknowns(cluster_addr);
     dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       cluster_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
@@ -6131,11 +6128,9 @@ void OSD::_send_boot()
     hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
     dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
@@ -6147,11 +6142,9 @@ void OSD::_send_boot()
     hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
     dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(),
@@ -6512,7 +6505,8 @@ void OSD::handle_command(MMonCommand *m)
 void OSD::handle_command(MCommand *m)
 {
   ConnectionRef con = m->get_connection();
-  Session *session = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   if (!session) {
     con->send_message(new MCommandReply(m, -EPERM));
     m->put();
@@ -6520,7 +6514,7 @@ void OSD::handle_command(MCommand *m)
   }
 
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   if (!caps.allow_all() || m->get_source().is_mon()) {
     con->send_message(new MCommandReply(m, -EPERM));
@@ -7186,7 +7180,7 @@ void OSD::maybe_share_map(
   op->check_send_map = false;
 }
 
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
 {
   assert(session->session_dispatch_lock.is_locked());
 
@@ -7259,17 +7253,14 @@ void OSD::ms_fast_dispatch(Message *m)
     // legacy client, and this is an MOSDOp (the *only* fast dispatch
     // message that didn't have an explicit spg_t); we need to map
     // them to an spg_t while preserving delivery order.
-    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
-    if (session) {
-      {
-       Mutex::Locker l(session->session_dispatch_lock);
-       op->get();
-       session->waiting_on_map.push_back(*op);
-       OSDMapRef nextmap = service.get_nextmap_reserved();
-       dispatch_session_waiting(session, nextmap);
-       service.release_map(nextmap);
-      }
-      session->put();
+    auto priv = m->get_connection()->get_priv();
+    if (auto session = static_cast<Session*>(priv.get()); session) {
+      Mutex::Locker l{session->session_dispatch_lock};
+      op->get();
+      session->waiting_on_map.push_back(*op);
+      OSDMapRef nextmap = service.get_nextmap_reserved();
+      dispatch_session_waiting(session, nextmap);
+      service.release_map(nextmap);
     }
   }
   OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
@@ -7280,12 +7271,11 @@ void OSD::ms_fast_preprocess(Message *m)
   if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     if (m->get_type() == CEPH_MSG_OSD_MAP) {
       MOSDMap *mm = static_cast<MOSDMap*>(m);
-      Session *s = static_cast<Session*>(m->get_connection()->get_priv());
-      if (s) {
+      auto priv = m->get_connection()->get_priv();
+      if (auto s = static_cast<Session*>(priv.get()); s) {
        s->received_map_lock.lock();
        s->received_map_epoch = mm->get_last();
        s->received_map_lock.unlock();
-       s->put();
       }
     }
   }
@@ -7360,12 +7350,14 @@ bool OSD::ms_verify_authorizer(
   }
 
   if (isvalid) {
-    Session *s = static_cast<Session *>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
-      dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;
+      dout(10) << " new session " << s << " con=" << s->con
+              << " addr=" << con->get_peer_addr() << dendl;
     }
 
     s->entity_name = name;
@@ -7387,8 +7379,6 @@ bool OSD::ms_verify_authorizer(
       else
        dout(10) << " session " << s << " " << s->entity_name << " failed to parse caps '" << str << "'" << dendl;
     }
-
-    s->put();
   }
   return true;
 }
@@ -7900,18 +7890,16 @@ void OSD::handle_osd_map(MOSDMap *m)
     return;
   }
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session && !(session->entity_name.is_mon() ||
+  auto priv = m->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get());
+      session && !(session->entity_name.is_mon() ||
                   session->entity_name.is_osd())) {
     //not enough perms!
     dout(10) << "got osd map from Session " << session
              << " which we can't take maps from (not a mon or osd)" << dendl;
     m->put();
-    session->put();
     return;
   }
-  if (session)
-    session->put();
 
   // share with the objecter
   if (!is_preboot())
@@ -8691,15 +8679,15 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map,
            << dendl;
     ConnectionRef con = m->get_connection();
     con->mark_down();
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Lock();
       clear_session_waiting_on_map(s);
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
+      s->con.reset();
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Unlock();
-      s->put();
     }
     return false;
   }
@@ -9871,11 +9859,9 @@ void OSD::dequeue_op(
 
   logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
 
-  Session *session = static_cast<Session *>(
-    op->get_req()->get_connection()->get_priv());
-  if (session) {
+  auto priv = op->get_req()->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     maybe_share_map(session, op, pg->get_osdmap());
-    session->put();
   }
 
   if (pg->deleting)
@@ -10619,11 +10605,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
               << " dropping " << *qi << dendl;
       // share map with client?
       if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
-       Session *session = static_cast<Session *>(
-         (*_op)->get_req()->get_connection()->get_priv());
-       if (session) {
-         osd->maybe_share_map(session, *_op, sdata->waiting_for_pg_osdmap);
-         session->put();
+       auto priv = (*_op)->get_req()->get_connection()->get_priv();
+       if (auto session = static_cast<Session *>(priv.get()); session) {
+         osd->maybe_share_map(session, *_op, sdata->shard_osdmap);
        }
       }
       unsigned pushes_to_free = qi->get_reserved_pushes();
index 24b4200d1ecc8cf47ed30e786d478fda8c4fb8d2..f2a71eb9a30092fa30ad8395f646d0b325598617 100644 (file)
@@ -1400,44 +1400,37 @@ private:
 
   // -- sessions --
 private:
-  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
   void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
 
   Mutex session_waiting_lock;
-  set<Session*> session_waiting_for_map;
+  set<SessionRef> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
-  void get_sessions_waiting_for_map(set<Session*> *out) {
+  void get_sessions_waiting_for_map(set<SessionRef> *out) {
     Mutex::Locker l(session_waiting_lock);
     out->swap(session_waiting_for_map);
   }
-  void register_session_waiting_on_map(Session *session) {
+  void register_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    if (session_waiting_for_map.insert(session).second) {
-      session->get();
-    }
+    session_waiting_for_map.insert(session);
   }
-  void clear_session_waiting_on_map(Session *session) {
+  void clear_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    set<Session*>::iterator i = session_waiting_for_map.find(session);
-    if (i != session_waiting_for_map.end()) {
-      (*i)->put();
-      session_waiting_for_map.erase(i);
-    }
+    session_waiting_for_map.erase(session);
   }
   void dispatch_sessions_waiting_on_map() {
-    set<Session*> sessions_to_check;
+    set<SessionRef> sessions_to_check;
     get_sessions_waiting_for_map(&sessions_to_check);
-    for (set<Session*>::iterator i = sessions_to_check.begin();
+    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
-      (*i)->session_dispatch_lock.Lock();
-      dispatch_session_waiting(*i, osdmap);
-      (*i)->session_dispatch_lock.Unlock();
-      (*i)->put();
+      Mutex::Locker l{(*i)->session_dispatch_lock};
+      SessionRef session = *i;
+      dispatch_session_waiting(session, osdmap);
     }
   }
-  void session_handle_reset(Session *session) {
+  void session_handle_reset(SessionRef session) {
     Mutex::Locker l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
index 26ccdeee574b033a75aa9c5c7862f8d2f339a2b3..51b2d2c4973469f81206c93ce2a0ccddae5e005b 100644 (file)
@@ -1939,13 +1939,14 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
 
   const MOSDOp *req = static_cast<const MOSDOp*>(op->get_req());
 
-  Session *session = static_cast<Session*>(req->get_connection()->get_priv());
+  auto priv = req->get_connection()->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   if (!session) {
     dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
     return false;
   }
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   const string &key = req->get_hobj().get_key().empty() ?
     req->get_oid().name :
index d1b5953e22e9ed720ecc7eecdfa4fe8266cfb520..f5b7091edbdb6582de9da8d55e9e91b5eb16f829 100644 (file)
@@ -1691,10 +1691,9 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
 void PrimaryLogPG::handle_backoff(OpRequestRef& op)
 {
   const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-  SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+  SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
   if (!session)
     return;  // drop it.
-  session->put();  // get_priv takes a ref, and so does the SessionRef
   hobject_t begin = info.pgid.pgid.get_hobj_start();
   hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
   if (begin < m->begin) {
@@ -1742,10 +1741,9 @@ void PrimaryLogPG::do_request(
   // pg-wide backoffs
   const Message *m = op->get_req();
   if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
-    SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+    SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
     if (!session)
       return;  // drop it.
-    session->put();  // get_priv takes a ref, and so does the SessionRef
 
     if (op->get_req()->get_type() == CEPH_MSG_OSD_OP) {
       if (session->check_backoff(cct, info.pgid,
@@ -1942,12 +1940,11 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
   SessionRef session;
   if (can_backoff) {
-    session = static_cast<Session*>(m->get_connection()->get_priv());
+    session = static_cast<Session*>(m->get_connection()->get_priv().get());
     if (!session.get()) {
       dout(10) << __func__ << " no session" << dendl;
       return;
     }
-    session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
     if (session->check_backoff(cct, info.pgid, head, m)) {
       return;
@@ -7466,10 +7463,9 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
 
   assert(conn);
 
-  boost::intrusive_ptr<Session> session((Session *)conn->get_priv());
-  if (!session.get())
+  auto session = conn->get_priv();
+  if (!session)
     return;
-  session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
   for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
        i != ctx->watch_connects.end();
index 7ff9f99b2bfab5929a52b7409279d20cccb5d09d..800b3c07b24d61f6d9f0aa9772cab6ed382401c5 100644 (file)
@@ -369,10 +369,11 @@ void Watch::connect(ConnectionRef con, bool _will_ping)
   dout(10) << __func__ << " con " << con << dendl;
   conn = con;
   will_ping = _will_ping;
-  Session* sessionref(static_cast<Session*>(con->get_priv()));
-  if (sessionref) {
+  auto priv = con->get_priv();
+  if (priv) {
+    auto sessionref = static_cast<Session*>(priv.get());
     sessionref->wstate.addWatch(self.lock());
-    sessionref->put();
+    priv.reset();
     for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
         i != in_progress_notifies.end();
         ++i) {
@@ -415,10 +416,9 @@ void Watch::discard_state()
   unregister_cb();
   discarded = true;
   if (conn) {
-    Session* sessionref(static_cast<Session*>(conn->get_priv()));
-    if (sessionref) {
-      sessionref->wstate.removeWatch(self.lock());
-      sessionref->put();
+    if (auto priv = conn->get_priv(); priv) {
+      auto session = static_cast<Session*>(priv.get());
+      session->wstate.removeWatch(self.lock());
     }
     conn = ConnectionRef();
   }
index a16e6542187c7d5748d0250d49008e54a7f63cdf..5149cdc918d127cb41273ae7f832143a01cdfa59 100644 (file)
@@ -1793,7 +1793,7 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
   OSDSession *s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
   s->con = messenger->get_connection(osdmap->get_inst(osd));
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   logger->inc(l_osdc_osd_session_open);
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
@@ -1837,7 +1837,7 @@ void Objecter::_reopen_session(OSDSession *s)
     logger->inc(l_osdc_osd_session_close);
   }
   s->con = messenger->get_connection(inst);
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   s->incarnation++;
   logger->inc(l_osdc_osd_session_open);
 }
@@ -3358,12 +3358,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s) {
-      s->put();
-    }
     m->put();
     return;
   }
@@ -3377,7 +3375,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
     sl.unlock();
-    s->put();
     m->put();
     return;
   }
@@ -3400,7 +3397,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     }
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     _op_submit(op, sul, NULL);
     m->put();
@@ -3416,7 +3412,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       sl.unlock();
-      s->put();
       return;
     }
   } else {
@@ -3435,7 +3430,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     // FIXME: two redirects could race and reorder
 
@@ -3456,7 +3450,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     op->tid = 0;
     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
@@ -3550,7 +3543,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   m->put();
-  s->put();
 }
 
 void Objecter::handle_osd_backoff(MOSDBackoff *m)
@@ -3563,17 +3555,15 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s)
-      s->put();
     m->put();
     return;
   }
 
   get_session(s);
-  s->put();  // from get_priv() above
 
   OSDSession::unique_lock sl(s->lock);
 
@@ -4423,7 +4413,8 @@ bool Objecter::ms_handle_reset(Connection *con)
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     unique_lock wl(rwlock);
     
-    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto session = static_cast<OSDSession*>(priv.get());
     if (session) {
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
@@ -4442,7 +4433,6 @@ bool Objecter::ms_handle_reset(Connection *con)
       _linger_ops_resend(lresend, wl);
       wl.unlock();
       maybe_request_map();
-      session->put();
     }
     return true;
   }
@@ -4757,12 +4747,11 @@ void Objecter::handle_command_reply(MCommandReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
     m->put();
-    if (s)
-      s->put();
     return;
   }
 
@@ -4773,7 +4762,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << " not found" << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
 
@@ -4786,7 +4774,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
   if (c->poutbl) {
@@ -4800,7 +4787,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
   sul.unlock();
 
   m->put();
-  s->put();
 }
 
 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
index 5984a26aa936308a0a0d3278c4371b73995300ae..e8f95c28c0c09b1a98ac115ba14bc362eef93a37 100644 (file)
@@ -897,11 +897,7 @@ class OSDStub : public TestStub
 
   bool ms_handle_reset(Connection *con) override {
     dout(1) << __func__ << dendl;
-    Session *session = (Session *)con->get_priv();
-    if (!session)
-      return false;
-    session->put();
-    return true;
+    return con->get_priv().get();
   }
 
   bool ms_handle_refused(Connection *con) override {
index a3c83e3ab72d92f03ea2fe5ec974c77d6fa28690..c40feb1bb5aede14f7622428c8b230e175e40212 100644 (file)
@@ -115,32 +115,30 @@ class FakeDispatcher : public Dispatcher {
   void ms_handle_fast_connect(Connection *con) override {
     lock.Lock();
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto s = con->get_priv();
     if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
-      lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
+      auto session = new Session(con);
+      con->set_priv(RefCountedPtr{session, false});
+      lderr(g_ceph_context) << __func__ << " con: " << con
+                           << " count: " << session->count << dendl;
     }
-    s->put();
     got_connect = true;
     cond.Signal();
     lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
+    if (!con->get_priv()) {
+      con->set_priv(RefCountedPtr{new Session(con), false});
     }
-    s->put();
   }
   bool ms_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
@@ -155,22 +153,20 @@ class FakeDispatcher : public Dispatcher {
   bool ms_handle_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     got_remote_reset = true;
     cond.Signal();
@@ -179,12 +175,13 @@ class FakeDispatcher : public Dispatcher {
     return false;
   }
   void ms_fast_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
@@ -239,7 +236,7 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. test rebind port
@@ -258,7 +255,7 @@ TEST_P(MessengerTest, SimpleTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 3. test markdown connection
   conn->mark_down();
@@ -285,7 +282,7 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   client_msgr->shutdown();
   client_msgr->wait();
   server_msgr->shutdown();
@@ -312,12 +309,12 @@ TEST_P(MessengerTest, NameAddrTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // Make should server_conn is the one we already accepted from client,
   // so it means client_msgr has the same addr when server connection has
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
@@ -373,7 +370,7 @@ TEST_P(MessengerTest, FeatureTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -404,7 +401,7 @@ TEST_P(MessengerTest, TimeoutTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. wait for idle
@@ -445,12 +442,12 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // don't lose state
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   srv_dispatcher.got_new = false;
   conn = client_msgr->get_connection(server_msgr->get_myinst());
@@ -462,7 +459,7 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   {
     Mutex::Locker l(srv_dispatcher.lock);
@@ -503,9 +500,9 @@ TEST_P(MessengerTest, StatefulTest) {
     cli_dispatcher.got_new = false;
   }
   // resetcheck happen
-  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   cli_dispatcher.got_remote_reset = false;
 
   server_msgr->shutdown();
@@ -540,7 +537,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
 
@@ -554,7 +551,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // server lose state
   {
@@ -562,7 +559,7 @@ TEST_P(MessengerTest, StatelessTest) {
     while (!srv_dispatcher.got_new)
       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
   }
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   // 2. test for client lossy
   server_conn->mark_down();
@@ -579,7 +576,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -613,7 +610,7 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   cli_dispatcher.got_connect = false;
@@ -644,9 +641,9 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -679,7 +676,7 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 2. mix auth
   g_ceph_context->_conf->set_val("auth_cluster_required", "none");
@@ -697,7 +694,7 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();