]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds,osd,mon,msg: use intrusive_ptr for holding Connection::priv 22292/head
authorKefu Chai <kchai@redhat.com>
Tue, 29 May 2018 07:51:07 +0000 (15:51 +0800)
committerKefu Chai <kchai@redhat.com>
Wed, 30 May 2018 17:32:56 +0000 (01:32 +0800)
See-also: http://tracker.ceph.com/issues/20924
Signed-off-by: Kefu Chai <kchai@redhat.com>
17 files changed:
src/common/RefCountedObj.h
src/mds/MDSDaemon.cc
src/mds/MDSRank.cc
src/messages/PaxosServiceMessage.h
src/mgr/DaemonServer.cc
src/mgr/DaemonState.cc
src/mon/MonOpRequest.h
src/mon/Monitor.cc
src/msg/Connection.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PrimaryLogPG.cc
src/osd/Watch.cc
src/osdc/Objecter.cc
src/test/mon/test_mon_workloadgen.cc
src/test/msgr/test_msgr.cc

index 9c0dad1d302f8574e1ec017622ce616b547f503a..325304c1441c30a1f1f7b69f80106c99c3d518c4 100644 (file)
@@ -20,6 +20,8 @@
 #include "common/ceph_context.h"
 #include "common/valgrind.h"
 
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
 // re-include our assert to clobber the system one; fix dout:
 #include "include/assert.h"
 
@@ -164,4 +166,6 @@ struct RefCountedWaitObject {
 void intrusive_ptr_add_ref(const RefCountedObject *p);
 void intrusive_ptr_release(const RefCountedObject *p);
 
+using RefCountedPtr = boost::intrusive_ptr<RefCountedObject>;
+
 #endif
index 847a9ea716fbf43051345fccd33f75b3a1b9a308..183f18e5f6ea77ec894327310606d6df8c0f675a 100644 (file)
@@ -578,7 +578,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
                                   int r, bufferlist outbl,
                                   std::string_view outs)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
   // If someone is using a closed session for sending commands (e.g.
   // the ceph CLI) then we should feel free to clean up this connection
@@ -594,7 +595,7 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
     assert(session->is_closed());
     session->connection->mark_disposable();
   }
-  session->put();
+  priv.reset();
 
   MCommandReply *reply = new MCommandReply(r, outs);
   reply->set_tid(m->get_tid());
@@ -605,7 +606,8 @@ void MDSDaemon::send_command_reply(MCommand *m, MDSRank *mds_rank,
 /* This function DOES put the passed message before returning*/
 void MDSDaemon::handle_command(MCommand *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
 
   int r = 0;
@@ -633,7 +635,7 @@ void MDSDaemon::handle_command(MCommand *m)
   } else {
     r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
   }
-  session->put();
+  priv.reset();
 
   if (need_reply) {
     send_command_reply(m, mds_rank, r, outbl, outs);
@@ -1247,14 +1249,13 @@ bool MDSDaemon::ms_handle_reset(Connection *con)
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return false;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   } else {
     con->mark_down();
   }
@@ -1276,14 +1277,13 @@ void MDSDaemon::ms_handle_remote_reset(Connection *con)
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   }
 }
 
@@ -1356,7 +1356,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
       s->info.inst.addr = con->get_peer_addr();
       s->info.inst.name = n;
       dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
-      con->set_priv(s);
+      con->set_priv(RefCountedPtr{s, false});
       s->connection = con;
       if (mds_rank) {
         mds_rank->kick_waiters_for_any_client_connection();
@@ -1364,7 +1364,7 @@ bool MDSDaemon::ms_verify_authorizer(Connection *con, int peer_type,
     } else {
       dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
               << ", new/authorizing con " << con << dendl;
-      con->set_priv(s->get());
+      con->set_priv(RefCountedPtr{s, false});
 
 
 
@@ -1420,7 +1420,8 @@ void MDSDaemon::ms_handle_accept(Connection *con)
     return;
   }
 
-  Session *s = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<Session *>(priv.get());
   dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
   if (s) {
     if (s->connection != con) {
@@ -1433,7 +1434,6 @@ void MDSDaemon::ms_handle_accept(Connection *con)
        s->preopen_out_queue.pop_front();
       }
     }
-    s->put();
   }
 }
 
index 93a0f4839f29af3e398682bfdc7330eb6fdfa28c..30f2122e3126074aa176063322deb972a70d9009 100644 (file)
@@ -899,9 +899,9 @@ bool MDSRank::is_stale_message(Message *m) const
 
 Session *MDSRank::get_session(Message *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put(); // do not carry ref
     dout(20) << "get_session have " << session << " " << session->info.inst
             << " state " << session->get_state_name() << dendl;
     // Check if we've imported an open session since (new sessions start closed)
@@ -1016,9 +1016,9 @@ void MDSRank::send_message_client_counted(Message *m, client_t client)
 
 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
 {
-  Session *session = static_cast<Session *>(connection->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put();  // do not carry ref
     send_message_client_counted(m, session);
   } else {
     dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
index daaf74745f9c656ed49ae40f2fd27af00ba5c5c5..a13dc938e8a3ea830da170015a3e11bba296bdbf 100644 (file)
@@ -59,10 +59,8 @@ class PaxosServiceMessage : public Message {
    * normally.
    */
   MonSession *get_session() {
-    MonSession *session = (MonSession *)get_connection()->get_priv();
-    if (session)
-      session->put();
-    return session;
+    auto priv = get_connection()->get_priv();
+    return static_cast<MonSession*>(priv.get());
   }
   
   const char *get_type_name() const override { return "PaxosServiceMessage"; }
index d976761542bc0e257f26d5a385aef1d4f530f0c0..36341b463b68e9bb6f28a373543520f2ecd9fa5e 100644 (file)
@@ -246,11 +246,11 @@ bool DaemonServer::ms_get_authorizer(int dest_type,
 bool DaemonServer::ms_handle_reset(Connection *con)
 {
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+    auto priv = con->get_priv();
+    auto session = static_cast<MgrSession*>(priv.get());
     if (!session) {
       return false;
     }
-    session->put(); // SessionRef takes a ref
     Mutex::Locker l(lock);
     dout(10) << "unregistering osd." << session->osd_id
             << "  session " << session << " con " << con << dendl;
@@ -510,12 +510,12 @@ bool DaemonServer::handle_report(MMgrReport *m)
     {
       Mutex::Locker l(lock);
       // kill session
-      MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+      auto priv = m->get_connection()->get_priv();
+      auto session = static_cast<MgrSession*>(priv.get());
       if (!session) {
        return false;
       }
       m->get_connection()->mark_down();
-      session->put();
 
       dout(10) << "unregistering osd." << session->osd_id
               << "  session " << session << " con " << m->get_connection() << dendl;
@@ -721,11 +721,11 @@ bool DaemonServer::handle_command(MCommand *m)
 
   std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
 
-  MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
   if (!session) {
     return true;
   }
-  session->put(); // SessionRef takes a ref
   if (session->inst.name == entity_name_t())
     session->inst.name = m->get_source();
 
@@ -785,11 +785,11 @@ bool DaemonServer::handle_command(MCommand *m)
   bool is_allowed;
   if (!mgr_cmd) {
     MonCommand py_command = {"", "", "py", "rw", "cli"};
-    is_allowed = _allowed_command(session.get(), py_command.module,
+    is_allowed = _allowed_command(session, py_command.module,
       prefix, cmdctx->cmdmap, param_str_map, &py_command);
   } else {
     // validate user's permissions for requested command
-    is_allowed = _allowed_command(session.get(), mgr_cmd->module,
+    is_allowed = _allowed_command(session, mgr_cmd->module,
       prefix, cmdctx->cmdmap,  param_str_map, mgr_cmd);
   }
   if (!is_allowed) {
index e2f0922079f23d4d9915ab83a4dc45fd252f41df..0c4034072c33f66e68c680d79ebbef7012b20e99 100644 (file)
@@ -134,8 +134,8 @@ void DaemonPerfCounters::update(MMgrReport *report)
            << report->packed.length() << " bytes of data" << dendl;
 
   // Retrieve session state
-  MgrSessionRef session(static_cast<MgrSession*>(
-        report->get_connection()->get_priv()));
+  auto priv = report->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
 
   // Load any newly declared types
   for (const auto &t : report->declare_types) {
index b31ee02a46e9cfffa559b23fceb50a2e4ca62fa9..364232b1f9d88e57126e93d217a00bae2597c5b6 100644 (file)
@@ -80,7 +80,7 @@ struct MonOpRequest : public TrackedOp {
 private:
   Message *request;
   utime_t dequeued_time;
-  MonSession *session;
+  RefCountedPtr session;
   ConnectionRef con;
   bool forwarded_to_leader;
   op_type_t op_type;
@@ -90,7 +90,6 @@ private:
       req->get_recv_stamp().is_zero() ?
       ceph_clock_now() : req->get_recv_stamp()),
     request(req),
-    session(NULL),
     con(NULL),
     forwarded_to_leader(false),
     op_type(OP_TYPE_NONE)
@@ -98,7 +97,7 @@ private:
     if (req) {
       con = req->get_connection();
       if (con) {
-        session = static_cast<MonSession*>(con->get_priv());
+        session = con->get_priv();
       }
     }
   }
@@ -128,15 +127,10 @@ protected:
 public:
   ~MonOpRequest() override {
     request->put();
-    // certain ops may not have a session (e.g., AUTH or PING)
-    if (session)
-      session->put();
   }
 
   MonSession *get_session() const {
-    if (!session)
-      return NULL;
-    return session;
+    return static_cast<MonSession*>(session.get());
   }
 
   template<class T>
@@ -153,16 +147,7 @@ public:
   ConnectionRef get_connection() { return con; }
 
   void set_session(MonSession *s) {
-    if (session) {
-      // we will be rewriting the existing session; drop the ref.
-      session->put();
-    }
-
-    if (s == NULL) {
-      session = NULL;
-    } else {
-      session = static_cast<MonSession*>(s->get());
-    }
+    session.reset(s);
   }
 
   bool is_src_mon() const {
index e77441f94249182bc1470e54fbe7ca55b7e2957a..4441e8c73ee2df22fda7279d52503bf66967e255 100644 (file)
@@ -2934,15 +2934,12 @@ void Monitor::handle_command(MonOpRequestRef op)
     return;
   }
 
-  MonSession *session = static_cast<MonSession *>(
-    m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MonSession *>(priv.get());
   if (!session) {
     dout(5) << __func__ << " dropping stray message " << *m << dendl;
     return;
   }
-  BOOST_SCOPE_EXIT_ALL(=) {
-    session->put();
-  };
 
   if (m->cmd.empty()) {
     string rs = "No command supplied";
@@ -3733,7 +3730,7 @@ void Monitor::handle_forward(MonOpRequestRef op)
     ConnectionRef c(new AnonConnection(cct));
     MonSession *s = new MonSession(req->get_source_inst(),
                                   static_cast<Connection*>(c.get()));
-    c->set_priv(s->get());
+    c->set_priv(RefCountedPtr{s, false});
     c->set_peer_addr(m->client.addr);
     c->set_peer_type(m->client.name.type());
     c->set_features(m->con_features);
@@ -3771,7 +3768,6 @@ void Monitor::handle_forward(MonOpRequestRef op)
     dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
 
     _ms_dispatch(req);
-    s->put();
   }
 }
 
@@ -3949,7 +3945,7 @@ void Monitor::remove_session(MonSession *s)
     routed_requests.erase(*p);
   }
   s->routed_request_tids.clear();
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
   session_map.remove_session(s);
   logger->set(l_mon_num_sessions, session_map.get_size());
   logger->inc(l_mon_session_rm);
@@ -4077,7 +4073,7 @@ void Monitor::_ms_dispatch(Message *m)
       s = session_map.new_session(m->get_source_inst(), con.get());
     }
     assert(s);
-    con->set_priv(s->get());
+    con->set_priv(RefCountedPtr{s, false});
     dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
@@ -4093,7 +4089,6 @@ void Monitor::_ms_dispatch(Message *m)
         s->caps = *mon_caps;
       s->authenticated = true;
     }
-    s->put();
   } else {
     dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
@@ -4955,12 +4950,13 @@ bool Monitor::ms_handle_reset(Connection *con)
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
     return false;
 
-  MonSession *s = static_cast<MonSession *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<MonSession*>(priv.get());
   if (!s)
     return false;
 
   // break any con <-> session ref cycle
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
 
   if (is_shutdown())
     return false;
@@ -4972,7 +4968,6 @@ bool Monitor::ms_handle_reset(Connection *con)
     Mutex::Locker l(session_map_lock);
     remove_session(s);
   }
-  s->put();
   return true;
 }
 
index b8ffbbf9b30818175830b524f28d3099cd0d73c8..a19092b93eb6f9c734256f67d230467e367d1dea 100644 (file)
@@ -41,7 +41,7 @@ class Messenger;
 struct Connection : public RefCountedObject {
   mutable Mutex lock;
   Messenger *msgr;
-  RefCountedObject *priv;
+  RefCountedPtr priv;
   int peer_type;
   entity_addr_t peer_addr;
   utime_t last_keepalive, last_keepalive_ack;
@@ -63,7 +63,6 @@ public:
     : RefCountedObject(cct, 0),
       lock("Connection::lock"),
       msgr(m),
-      priv(NULL),
       peer_type(-1),
       features(0),
       failed(false),
@@ -72,24 +71,16 @@ public:
 
   ~Connection() override {
     //generic_dout(0) << "~Connection " << this << dendl;
-    if (priv) {
-      //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
-      priv->put();
-    }
   }
 
-  void set_priv(RefCountedObject *o) {
+  void set_priv(const RefCountedPtr& o) {
     Mutex::Locker l(lock);
-    if (priv)
-      priv->put();
     priv = o;
   }
 
-  RefCountedObject *get_priv() {
+  RefCountedPtr get_priv() {
     Mutex::Locker l(lock);
-    if (priv)
-      return priv->get();
-    return NULL;
+    return priv;
   }
 
   /**
index 68cfa7d5bc0a99b531179b48b92d225339ccc350..8a90d9d0370afd738fd1129a98c399fd047edf9c 100644 (file)
@@ -4278,12 +4278,12 @@ void OSD::_add_heartbeat_peer(int p)
       return;
     hi = &heartbeat_peers[p];
     hi->peer = p;
-    HeartbeatSession *s = new HeartbeatSession(p);
+    RefCountedPtr s{new HeartbeatSession{p}, false};
     hi->con_back = cons.first.get();
-    hi->con_back->set_priv(s->get());
+    hi->con_back->set_priv(s);
     if (cons.second) {
       hi->con_front = cons.second.get();
-      hi->con_front->set_priv(s->get());
+      hi->con_front->set_priv(s);
       dout(10) << "_add_heartbeat_peer: new peer osd." << p
               << " " << hi->con_back->get_peer_addr()
               << " " << hi->con_front->get_peer_addr()
@@ -4294,7 +4294,6 @@ void OSD::_add_heartbeat_peer(int p)
               << " " << hi->con_back->get_peer_addr()
               << dendl;
     }
-    s->put();
   } else {
     hi = &i->second;
   }
@@ -4729,15 +4728,15 @@ void OSD::heartbeat()
 
 bool OSD::heartbeat_reset(Connection *con)
 {
-  HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
+  auto s = con->get_priv();
   if (s) {
     heartbeat_lock.Lock();
     if (is_stopping()) {
       heartbeat_lock.Unlock();
-      s->put();
       return true;
     }
-    map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
+    auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
+    auto p = heartbeat_peers.find(heartbeat_session->peer);
     if (p != heartbeat_peers.end() &&
        (p->second.con_back == con ||
         p->second.con_front == con)) {
@@ -4754,10 +4753,10 @@ bool OSD::heartbeat_reset(Connection *con)
       pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
       if (newcon.first) {
        p->second.con_back = newcon.first.get();
-       p->second.con_back->set_priv(s->get());
+       p->second.con_back->set_priv(s);
        if (newcon.second) {
          p->second.con_front = newcon.second.get();
-         p->second.con_front->set_priv(s->get());
+         p->second.con_front->set_priv(s);
        }
       } else {
        dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
@@ -4768,7 +4767,6 @@ bool OSD::heartbeat_reset(Connection *con)
       dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
     }
     heartbeat_lock.Unlock();
-    s->put();
   }
   return true;
 }
@@ -5135,10 +5133,11 @@ void OSD::ms_handle_fast_connect(Connection *con)
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << " new session (outgoing) " << s << " con=" << s->con
           << " addr=" << s->con->get_peer_addr() << dendl;
@@ -5146,7 +5145,6 @@ void OSD::ms_handle_fast_connect(Connection *con)
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
@@ -5154,10 +5152,11 @@ void OSD::ms_handle_fast_accept(Connection *con)
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << "new session (incoming)" << s << " con=" << con
           << " addr=" << con->get_peer_addr()
@@ -5165,23 +5164,23 @@ void OSD::ms_handle_fast_accept(Connection *con)
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
 bool OSD::ms_handle_reset(Connection *con)
 {
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto s = con->get_priv();
+  auto session = static_cast<Session*>(s.get());
   dout(2) << "ms_handle_reset con " << con << " session " << session << dendl;
   if (!session)
     return false;
   session->wstate.reset(con);
-  session->con.reset(NULL);  // break con <-> session ref cycle
+  session->con->set_priv(nullptr);
+  session->con.reset();  // break con <-> session ref cycle
   // note that we break session->con *before* the session_handle_reset
   // cleanup below.  this avoids a race between us and
   // PG::add_backoff, Session::check_backoff, etc.
-  session_handle_reset(session);
-  session->put();
+  session_handle_reset(SessionRef{session});
   return true;
 }
 
@@ -5190,7 +5189,8 @@ bool OSD::ms_handle_refused(Connection *con)
   if (!cct->_conf->osd_fast_fail_on_connection_refused)
     return false;
 
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   dout(2) << "ms_handle_refused con " << con << " session " << session << dendl;
   if (!session)
     return false;
@@ -5212,7 +5212,6 @@ bool OSD::ms_handle_refused(Connection *con)
       }
     }
   }
-  session->put();
   return true;
 }
 
@@ -5365,11 +5364,9 @@ void OSD::_send_boot()
     cluster_messenger->set_addr_unknowns(cluster_addr);
     dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       cluster_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
@@ -5381,11 +5378,9 @@ void OSD::_send_boot()
     hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
     dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
@@ -5397,11 +5392,9 @@ void OSD::_send_boot()
     hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
     dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(),
@@ -5614,7 +5607,8 @@ void OSD::handle_command(MMonCommand *m)
 void OSD::handle_command(MCommand *m)
 {
   ConnectionRef con = m->get_connection();
-  Session *session = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   if (!session) {
     con->send_message(new MCommandReply(m, -EPERM));
     m->put();
@@ -5622,7 +5616,7 @@ void OSD::handle_command(MCommand *m)
   }
 
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   if (!caps.allow_all() || m->get_source().is_mon()) {
     con->send_message(new MCommandReply(m, -EPERM));
@@ -6345,7 +6339,7 @@ void OSD::maybe_share_map(
   op->check_send_map = false;
 }
 
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
 {
   assert(session->session_dispatch_lock.is_locked());
 
@@ -6463,17 +6457,14 @@ void OSD::ms_fast_dispatch(Message *m)
     // legacy client, and this is an MOSDOp (the *only* fast dispatch
     // message that didn't have an explicit spg_t); we need to map
     // them to an spg_t while preserving delivery order.
-    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
-    if (session) {
-      {
-       Mutex::Locker l(session->session_dispatch_lock);
-       op->get();
-       session->waiting_on_map.push_back(*op);
-       OSDMapRef nextmap = service.get_nextmap_reserved();
-       dispatch_session_waiting(session, nextmap);
-       service.release_map(nextmap);
-      }
-      session->put();
+    auto priv = m->get_connection()->get_priv();
+    if (auto session = static_cast<Session*>(priv.get()); session) {
+      Mutex::Locker l{session->session_dispatch_lock};
+      op->get();
+      session->waiting_on_map.push_back(*op);
+      OSDMapRef nextmap = service.get_nextmap_reserved();
+      dispatch_session_waiting(session, nextmap);
+      service.release_map(nextmap);
     }
   }
   OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
@@ -6484,12 +6475,11 @@ void OSD::ms_fast_preprocess(Message *m)
   if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     if (m->get_type() == CEPH_MSG_OSD_MAP) {
       MOSDMap *mm = static_cast<MOSDMap*>(m);
-      Session *s = static_cast<Session*>(m->get_connection()->get_priv());
-      if (s) {
+      auto priv = m->get_connection()->get_priv();
+      if (auto s = static_cast<Session*>(priv.get()); s) {
        s->received_map_lock.lock();
        s->received_map_epoch = mm->get_last();
        s->received_map_lock.unlock();
-       s->put();
       }
     }
   }
@@ -6562,12 +6552,14 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
   }
 
   if (isvalid) {
-    Session *s = static_cast<Session *>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
-      dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;
+      dout(10) << " new session " << s << " con=" << s->con
+              << " addr=" << con->get_peer_addr() << dendl;
     }
 
     s->entity_name = name;
@@ -6594,8 +6586,6 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
         isvalid = false;
       }
     }
-
-    s->put();
   }
   return true;
 }
@@ -7131,18 +7121,16 @@ void OSD::handle_osd_map(MOSDMap *m)
     return;
   }
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session && !(session->entity_name.is_mon() ||
+  auto priv = m->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get());
+      session && !(session->entity_name.is_mon() ||
                   session->entity_name.is_osd())) {
     //not enough perms!
     dout(10) << "got osd map from Session " << session
              << " which we can't take maps from (not a mon or osd)" << dendl;
     m->put();
-    session->put();
     return;
   }
-  if (session)
-    session->put();
 
   // share with the objecter
   if (!is_preboot())
@@ -7981,15 +7969,15 @@ bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map,
            << dendl;
     ConnectionRef con = m->get_connection();
     con->mark_down();
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Lock();
       clear_session_waiting_on_map(s);
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
+      s->con.reset();
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Unlock();
-      s->put();
     }
     return false;
   }
@@ -8851,11 +8839,9 @@ void OSD::dequeue_op(
 
   logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
 
-  Session *session = static_cast<Session *>(
-    op->get_req()->get_connection()->get_priv());
-  if (session) {
+  auto priv = op->get_req()->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     maybe_share_map(session, op, pg->get_osdmap());
-    session->put();
   }
 
   if (pg->is_deleting())
@@ -9807,11 +9793,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
               << ", dropping " << qi << dendl;
       // share map with client?
       if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
-       Session *session = static_cast<Session *>(
-         (*_op)->get_req()->get_connection()->get_priv());
-       if (session) {
+       auto priv = (*_op)->get_req()->get_connection()->get_priv();
+       if (auto session = static_cast<Session *>(priv.get()); session) {
          osd->maybe_share_map(session, *_op, sdata->shard_osdmap);
-         session->put();
        }
       }
       unsigned pushes_to_free = qi.get_reserved_pushes();
index 7f35605e80cc5215ed52933e8ba50759101f0d1e..bcc6a7ab19bd2634a2e32fa9ccef1f083e9c3405 100644 (file)
@@ -1435,44 +1435,37 @@ private:
 
   // -- sessions --
 private:
-  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
   void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
 
   Mutex session_waiting_lock;
-  set<Session*> session_waiting_for_map;
+  set<SessionRef> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
-  void get_sessions_waiting_for_map(set<Session*> *out) {
+  void get_sessions_waiting_for_map(set<SessionRef> *out) {
     Mutex::Locker l(session_waiting_lock);
     out->swap(session_waiting_for_map);
   }
-  void register_session_waiting_on_map(Session *session) {
+  void register_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    if (session_waiting_for_map.insert(session).second) {
-      session->get();
-    }
+    session_waiting_for_map.insert(session);
   }
-  void clear_session_waiting_on_map(Session *session) {
+  void clear_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    set<Session*>::iterator i = session_waiting_for_map.find(session);
-    if (i != session_waiting_for_map.end()) {
-      (*i)->put();
-      session_waiting_for_map.erase(i);
-    }
+    session_waiting_for_map.erase(session);
   }
   void dispatch_sessions_waiting_on_map() {
-    set<Session*> sessions_to_check;
+    set<SessionRef> sessions_to_check;
     get_sessions_waiting_for_map(&sessions_to_check);
-    for (set<Session*>::iterator i = sessions_to_check.begin();
+    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
-      (*i)->session_dispatch_lock.Lock();
-      dispatch_session_waiting(*i, osdmap);
-      (*i)->session_dispatch_lock.Unlock();
-      (*i)->put();
+      Mutex::Locker l{(*i)->session_dispatch_lock};
+      SessionRef session = *i;
+      dispatch_session_waiting(session, osdmap);
     }
   }
-  void session_handle_reset(Session *session) {
+  void session_handle_reset(SessionRef session) {
     Mutex::Locker l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
index c2d6092fb6fcfa4b4b57ec15a60f2ffa6b6d866d..346d25d81de38c6a54077e4663a02e77c0037138 100644 (file)
@@ -2091,13 +2091,14 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
 
   const MOSDOp *req = static_cast<const MOSDOp*>(op->get_req());
 
-  Session *session = static_cast<Session*>(req->get_connection()->get_priv());
+  auto priv = req->get_connection()->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   if (!session) {
     dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
     return false;
   }
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   const string &key = req->get_hobj().get_key().empty() ?
     req->get_oid().name :
index fcb0cdf22a0f09502e80b92b4fc9e897372d4d46..e9eed66be5ea11945a619cc4d1472b6c1043757b 100644 (file)
@@ -1682,10 +1682,9 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
 void PrimaryLogPG::handle_backoff(OpRequestRef& op)
 {
   const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-  SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+  SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
   if (!session)
     return;  // drop it.
-  session->put();  // get_priv takes a ref, and so does the SessionRef
   hobject_t begin = info.pgid.pgid.get_hobj_start();
   hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
   if (begin < m->begin) {
@@ -1734,10 +1733,9 @@ void PrimaryLogPG::do_request(
   const Message *m = op->get_req();
   int msg_type = m->get_type();
   if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
-    SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+    SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
     if (!session)
       return;  // drop it.
-    session->put();  // get_priv takes a ref, and so does the SessionRef
 
     if (msg_type == CEPH_MSG_OSD_OP) {
       if (session->check_backoff(cct, info.pgid,
@@ -1926,12 +1924,11 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
   SessionRef session;
   if (can_backoff) {
-    session = static_cast<Session*>(m->get_connection()->get_priv());
+    session = static_cast<Session*>(m->get_connection()->get_priv().get());
     if (!session.get()) {
       dout(10) << __func__ << " no session" << dendl;
       return;
     }
-    session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
     if (session->check_backoff(cct, info.pgid, head, m)) {
       return;
@@ -8106,10 +8103,9 @@ void PrimaryLogPG::do_osd_op_effects(OpContext *ctx, const ConnectionRef& conn)
 
   assert(conn);
 
-  boost::intrusive_ptr<Session> session((Session *)conn->get_priv());
-  if (!session.get())
+  auto session = conn->get_priv();
+  if (!session)
     return;
-  session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
   for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
        i != ctx->watch_connects.end();
index ef91e9e427bb0f7ca20ba9d790c946327730e4d1..d07af3faf0c2202312fc2838c9f60b14514a10dd 100644 (file)
@@ -367,10 +367,11 @@ void Watch::connect(ConnectionRef con, bool _will_ping)
   dout(10) << __func__ << " con " << con << dendl;
   conn = con;
   will_ping = _will_ping;
-  Session* sessionref(static_cast<Session*>(con->get_priv()));
-  if (sessionref) {
+  auto priv = con->get_priv();
+  if (priv) {
+    auto sessionref = static_cast<Session*>(priv.get());
     sessionref->wstate.addWatch(self.lock());
-    sessionref->put();
+    priv.reset();
     for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
         i != in_progress_notifies.end();
         ++i) {
@@ -413,10 +414,9 @@ void Watch::discard_state()
   unregister_cb();
   discarded = true;
   if (conn) {
-    Session* sessionref(static_cast<Session*>(conn->get_priv()));
-    if (sessionref) {
-      sessionref->wstate.removeWatch(self.lock());
-      sessionref->put();
+    if (auto priv = conn->get_priv(); priv) {
+      auto session = static_cast<Session*>(priv.get());
+      session->wstate.removeWatch(self.lock());
     }
     conn = ConnectionRef();
   }
index 5b06b8269a520f022953a323c6db375fc1f13904..62accf4ed4a079908f665daa5cc20d3242647c75 100644 (file)
@@ -1802,7 +1802,7 @@ int Objecter::_get_session(int osd, OSDSession **session, shunique_lock& sul)
   OSDSession *s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
   s->con = messenger->get_connection(osdmap->get_inst(osd));
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   logger->inc(l_osdc_osd_session_open);
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
@@ -1845,7 +1845,7 @@ void Objecter::_reopen_session(OSDSession *s)
     logger->inc(l_osdc_osd_session_close);
   }
   s->con = messenger->get_connection(inst);
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   s->incarnation++;
   logger->inc(l_osdc_osd_session_open);
 }
@@ -3364,12 +3364,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s) {
-      s->put();
-    }
     m->put();
     return;
   }
@@ -3383,7 +3381,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
     sl.unlock();
-    s->put();
     m->put();
     return;
   }
@@ -3406,7 +3403,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     }
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     _op_submit(op, sul, NULL);
     m->put();
@@ -3422,7 +3418,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       sl.unlock();
-      s->put();
       return;
     }
   } else {
@@ -3441,7 +3436,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     // FIXME: two redirects could race and reorder
 
@@ -3460,7 +3454,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     op->tid = 0;
     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
@@ -3554,7 +3547,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   m->put();
-  s->put();
 }
 
 void Objecter::handle_osd_backoff(MOSDBackoff *m)
@@ -3567,17 +3559,15 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s)
-      s->put();
     m->put();
     return;
   }
 
   get_session(s);
-  s->put();  // from get_priv() above
 
   OSDSession::unique_lock sl(s->lock);
 
@@ -4429,7 +4419,8 @@ bool Objecter::ms_handle_reset(Connection *con)
   if (!initialized)
     return false;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto session = static_cast<OSDSession*>(priv.get());
     if (session) {
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
@@ -4446,7 +4437,6 @@ bool Objecter::ms_handle_reset(Connection *con)
       _linger_ops_resend(lresend, wl);
       wl.unlock();
       maybe_request_map();
-      session->put();
     }
     return true;
   }
@@ -4763,12 +4753,11 @@ void Objecter::handle_command_reply(MCommandReply *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
     m->put();
-    if (s)
-      s->put();
     return;
   }
 
@@ -4779,7 +4768,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << " not found" << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
 
@@ -4792,7 +4780,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
                   << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
   if (c->poutbl) {
@@ -4806,7 +4793,6 @@ void Objecter::handle_command_reply(MCommandReply *m)
   sul.unlock();
 
   m->put();
-  s->put();
 }
 
 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
index ed7012339f7d2af43a4177a8084ce672a7a8f242..7306ba334698e9a062a02bfba070f7d12ea5ebed 100644 (file)
@@ -898,11 +898,7 @@ class OSDStub : public TestStub
 
   bool ms_handle_reset(Connection *con) override {
     dout(1) << __func__ << dendl;
-    Session *session = (Session *)con->get_priv();
-    if (!session)
-      return false;
-    session->put();
-    return true;
+    return con->get_priv().get();
   }
 
   bool ms_handle_refused(Connection *con) override {
index a027aa6fcc9a17e2a0f5159af4a62a11affa1130..772c5a0200cae0fd97117c5df7d4ba2c8e76a4f8 100644 (file)
@@ -115,32 +115,30 @@ class FakeDispatcher : public Dispatcher {
   void ms_handle_fast_connect(Connection *con) override {
     lock.Lock();
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto s = con->get_priv();
     if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
-      lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
+      auto session = new Session(con);
+      con->set_priv(RefCountedPtr{session, false});
+      lderr(g_ceph_context) << __func__ << " con: " << con
+                           << " count: " << session->count << dendl;
     }
-    s->put();
     got_connect = true;
     cond.Signal();
     lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
+    if (!con->get_priv()) {
+      con->set_priv(RefCountedPtr{new Session(con), false});
     }
-    s->put();
   }
   bool ms_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
@@ -155,22 +153,20 @@ class FakeDispatcher : public Dispatcher {
   bool ms_handle_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     got_remote_reset = true;
     cond.Signal();
@@ -179,12 +175,13 @@ class FakeDispatcher : public Dispatcher {
     return false;
   }
   void ms_fast_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
@@ -238,7 +235,7 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. test rebind port
@@ -257,7 +254,7 @@ TEST_P(MessengerTest, SimpleTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 3. test markdown connection
   conn->mark_down();
@@ -284,7 +281,7 @@ TEST_P(MessengerTest, SimpleTest) {
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   client_msgr->shutdown();
   client_msgr->wait();
   server_msgr->shutdown();
@@ -311,12 +308,12 @@ TEST_P(MessengerTest, NameAddrTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // Make should server_conn is the one we already accepted from client,
   // so it means client_msgr has the same addr when server connection has
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
@@ -372,7 +369,7 @@ TEST_P(MessengerTest, FeatureTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -403,7 +400,7 @@ TEST_P(MessengerTest, TimeoutTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. wait for idle
@@ -444,12 +441,12 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // don't lose state
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   srv_dispatcher.got_new = false;
   conn = client_msgr->get_connection(server_msgr->get_myinst());
@@ -461,7 +458,7 @@ TEST_P(MessengerTest, StatefulTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   {
     Mutex::Locker l(srv_dispatcher.lock);
@@ -502,9 +499,9 @@ TEST_P(MessengerTest, StatefulTest) {
     cli_dispatcher.got_new = false;
   }
   // resetcheck happen
-  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   cli_dispatcher.got_remote_reset = false;
 
   server_msgr->shutdown();
@@ -539,7 +536,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
 
@@ -553,7 +550,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // server lose state
   {
@@ -561,7 +558,7 @@ TEST_P(MessengerTest, StatelessTest) {
     while (!srv_dispatcher.got_new)
       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
   }
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   // 2. test for client lossy
   server_conn->mark_down();
@@ -578,7 +575,7 @@ TEST_P(MessengerTest, StatelessTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -612,7 +609,7 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   cli_dispatcher.got_connect = false;
@@ -643,9 +640,9 @@ TEST_P(MessengerTest, ClientStandbyTest) {
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
@@ -678,7 +675,7 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 2. mix auth
   g_ceph_context->_conf->set_val("auth_cluster_required", "none");
@@ -696,7 +693,7 @@ TEST_P(MessengerTest, AuthTest) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();