#include "common/ceph_context.h"
 #include "common/valgrind.h"
 
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
 // re-include our assert to clobber the system one; fix dout:
 #include "include/assert.h"
 
 void intrusive_ptr_add_ref(const RefCountedObject *p);
 void intrusive_ptr_release(const RefCountedObject *p);
 
+using RefCountedPtr = boost::intrusive_ptr<RefCountedObject>;
+
 #endif
 
                                   int r, bufferlist outbl,
                                   std::string_view outs)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
   // If someone is using a closed session for sending commands (e.g.
   // the ceph CLI) then we should feel free to clean up this connection
     assert(session->is_closed());
     session->connection->mark_disposable();
   }
-  session->put();
+  priv.reset();
 
   MCommandReply *reply = new MCommandReply(r, outs);
   reply->set_tid(m->get_tid());
 /* This function DOES put the passed message before returning*/
 void MDSDaemon::handle_command(MCommand *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   assert(session != NULL);
 
   int r = 0;
   } else {
     r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
   }
-  session->put();
+  priv.reset();
 
   if (need_reply) {
     send_command_reply(m, mds_rank, r, outbl, outs);
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return false;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   } else {
     con->mark_down();
   }
   if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
     return;
 
-  Session *session = static_cast<Session *>(con->get_priv());
-  if (session) {
+  auto priv = con->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     if (session->is_closed()) {
       dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
       con->mark_down();
-      con->set_priv(NULL);
+      con->set_priv(nullptr);
     }
-    session->put();
   }
 }
 
       s->info.inst.addr = con->get_peer_addr();
       s->info.inst.name = n;
       dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
-      con->set_priv(s);
+      con->set_priv(RefCountedPtr{s, false});
       s->connection = con;
       if (mds_rank) {
         mds_rank->kick_waiters_for_any_client_connection();
     } else {
       dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
               << ", new/authorizing con " << con << dendl;
-      con->set_priv(s->get());
+      con->set_priv(RefCountedPtr{s, false});
 
 
 
     return;
   }
 
-  Session *s = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<Session *>(priv.get());
   dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
   if (s) {
     if (s->connection != con) {
        s->preopen_out_queue.pop_front();
       }
     }
-    s->put();
   }
 }
 
 
 
 Session *MDSRank::get_session(Message *m)
 {
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put(); // do not carry ref
     dout(20) << "get_session have " << session << " " << session->info.inst
             << " state " << session->get_state_name() << dendl;
     // Check if we've imported an open session since (new sessions start closed)
 
 void MDSRank::send_message_client_counted(Message *m, Connection *connection)
 {
-  Session *session = static_cast<Session *>(connection->get_priv());
+  // do not carry ref
+  auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
   if (session) {
-    session->put();  // do not carry ref
     send_message_client_counted(m, session);
   } else {
     dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
 
    * normally.
    */
   MonSession *get_session() {
-    MonSession *session = (MonSession *)get_connection()->get_priv();
-    if (session)
-      session->put();
-    return session;
+    auto priv = get_connection()->get_priv();
+    return static_cast<MonSession*>(priv.get());
   }
   
   const char *get_type_name() const override { return "PaxosServiceMessage"; }
 
 bool DaemonServer::ms_handle_reset(Connection *con)
 {
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+    auto priv = con->get_priv();
+    auto session = static_cast<MgrSession*>(priv.get());
     if (!session) {
       return false;
     }
-    session->put(); // SessionRef takes a ref
     Mutex::Locker l(lock);
     dout(10) << "unregistering osd." << session->osd_id
             << "  session " << session << " con " << con << dendl;
     {
       Mutex::Locker l(lock);
       // kill session
-      MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+      auto priv = m->get_connection()->get_priv();
+      auto session = static_cast<MgrSession*>(priv.get());
       if (!session) {
        return false;
       }
       m->get_connection()->mark_down();
-      session->put();
 
       dout(10) << "unregistering osd." << session->osd_id
               << "  session " << session << " con " << m->get_connection() << dendl;
 
   std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
 
-  MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
   if (!session) {
     return true;
   }
-  session->put(); // SessionRef takes a ref
   if (session->inst.name == entity_name_t())
     session->inst.name = m->get_source();
 
   bool is_allowed;
   if (!mgr_cmd) {
     MonCommand py_command = {"", "", "py", "rw", "cli"};
-    is_allowed = _allowed_command(session.get(), py_command.module,
+    is_allowed = _allowed_command(session, py_command.module,
       prefix, cmdctx->cmdmap, param_str_map, &py_command);
   } else {
     // validate user's permissions for requested command
-    is_allowed = _allowed_command(session.get(), mgr_cmd->module,
+    is_allowed = _allowed_command(session, mgr_cmd->module,
       prefix, cmdctx->cmdmap,  param_str_map, mgr_cmd);
   }
   if (!is_allowed) {
 
            << report->packed.length() << " bytes of data" << dendl;
 
   // Retrieve session state
-  MgrSessionRef session(static_cast<MgrSession*>(
-        report->get_connection()->get_priv()));
+  auto priv = report->get_connection()->get_priv();
+  auto session = static_cast<MgrSession*>(priv.get());
 
   // Load any newly declared types
   for (const auto &t : report->declare_types) {
 
 private:
   Message *request;
   utime_t dequeued_time;
-  MonSession *session;
+  RefCountedPtr session;
   ConnectionRef con;
   bool forwarded_to_leader;
   op_type_t op_type;
       req->get_recv_stamp().is_zero() ?
       ceph_clock_now() : req->get_recv_stamp()),
     request(req),
-    session(NULL),
     con(NULL),
     forwarded_to_leader(false),
     op_type(OP_TYPE_NONE)
     if (req) {
       con = req->get_connection();
       if (con) {
-        session = static_cast<MonSession*>(con->get_priv());
+        session = con->get_priv();
       }
     }
   }
 public:
   ~MonOpRequest() override {
     request->put();
-    // certain ops may not have a session (e.g., AUTH or PING)
-    if (session)
-      session->put();
   }
 
   MonSession *get_session() const {
-    if (!session)
-      return NULL;
-    return session;
+    return static_cast<MonSession*>(session.get());
   }
 
   template<class T>
   ConnectionRef get_connection() { return con; }
 
   void set_session(MonSession *s) {
-    if (session) {
-      // we will be rewriting the existing session; drop the ref.
-      session->put();
-    }
-
-    if (s == NULL) {
-      session = NULL;
-    } else {
-      session = static_cast<MonSession*>(s->get());
-    }
+    session.reset(s);
   }
 
   bool is_src_mon() const {
 
     return;
   }
 
-  MonSession *session = static_cast<MonSession *>(
-    m->get_connection()->get_priv());
+  auto priv = m->get_connection()->get_priv();
+  auto session = static_cast<MonSession *>(priv.get());
   if (!session) {
     dout(5) << __func__ << " dropping stray message " << *m << dendl;
     return;
   }
-  BOOST_SCOPE_EXIT_ALL(=) {
-    session->put();
-  };
 
   if (m->cmd.empty()) {
     string rs = "No command supplied";
     ConnectionRef c(new AnonConnection(cct));
     MonSession *s = new MonSession(req->get_source_inst(),
                                   static_cast<Connection*>(c.get()));
-    c->set_priv(s->get());
+    c->set_priv(RefCountedPtr{s, false});
     c->set_peer_addr(m->client.addr);
     c->set_peer_type(m->client.name.type());
     c->set_features(m->con_features);
     dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
 
     _ms_dispatch(req);
-    s->put();
   }
 }
 
     routed_requests.erase(*p);
   }
   s->routed_request_tids.clear();
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
   session_map.remove_session(s);
   logger->set(l_mon_num_sessions, session_map.get_size());
   logger->inc(l_mon_session_rm);
       s = session_map.new_session(m->get_source_inst(), con.get());
     }
     assert(s);
-    con->set_priv(s->get());
+    con->set_priv(RefCountedPtr{s, false});
     dout(10) << __func__ << " new session " << s << " " << *s
             << " features 0x" << std::hex
             << s->con_features << std::dec << dendl;
         s->caps = *mon_caps;
       s->authenticated = true;
     }
-    s->put();
   } else {
     dout(20) << __func__ << " existing session " << s << " for " << s->inst
             << dendl;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
     return false;
 
-  MonSession *s = static_cast<MonSession *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<MonSession*>(priv.get());
   if (!s)
     return false;
 
   // break any con <-> session ref cycle
-  s->con->set_priv(NULL);
+  s->con->set_priv(nullptr);
 
   if (is_shutdown())
     return false;
     Mutex::Locker l(session_map_lock);
     remove_session(s);
   }
-  s->put();
   return true;
 }
 
 
 struct Connection : public RefCountedObject {
   mutable Mutex lock;
   Messenger *msgr;
-  RefCountedObject *priv;
+  RefCountedPtr priv;
   int peer_type;
   entity_addr_t peer_addr;
   utime_t last_keepalive, last_keepalive_ack;
     : RefCountedObject(cct, 0),
       lock("Connection::lock"),
       msgr(m),
-      priv(NULL),
       peer_type(-1),
       features(0),
       failed(false),
 
   ~Connection() override {
     //generic_dout(0) << "~Connection " << this << dendl;
-    if (priv) {
-      //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
-      priv->put();
-    }
   }
 
-  void set_priv(RefCountedObject *o) {
+  void set_priv(const RefCountedPtr& o) {
     Mutex::Locker l(lock);
-    if (priv)
-      priv->put();
     priv = o;
   }
 
-  RefCountedObject *get_priv() {
+  RefCountedPtr get_priv() {
     Mutex::Locker l(lock);
-    if (priv)
-      return priv->get();
-    return NULL;
+    return priv;
   }
 
   /**
 
       return;
     hi = &heartbeat_peers[p];
     hi->peer = p;
-    HeartbeatSession *s = new HeartbeatSession(p);
+    RefCountedPtr s{new HeartbeatSession{p}, false};
     hi->con_back = cons.first.get();
-    hi->con_back->set_priv(s->get());
+    hi->con_back->set_priv(s);
     if (cons.second) {
       hi->con_front = cons.second.get();
-      hi->con_front->set_priv(s->get());
+      hi->con_front->set_priv(s);
       dout(10) << "_add_heartbeat_peer: new peer osd." << p
               << " " << hi->con_back->get_peer_addr()
               << " " << hi->con_front->get_peer_addr()
               << " " << hi->con_back->get_peer_addr()
               << dendl;
     }
-    s->put();
   } else {
     hi = &i->second;
   }
 
 bool OSD::heartbeat_reset(Connection *con)
 {
-  HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
+  auto s = con->get_priv();
   if (s) {
     heartbeat_lock.Lock();
     if (is_stopping()) {
       heartbeat_lock.Unlock();
-      s->put();
       return true;
     }
-    map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
+    auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
+    auto p = heartbeat_peers.find(heartbeat_session->peer);
     if (p != heartbeat_peers.end() &&
        (p->second.con_back == con ||
         p->second.con_front == con)) {
       pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
       if (newcon.first) {
        p->second.con_back = newcon.first.get();
-       p->second.con_back->set_priv(s->get());
+       p->second.con_back->set_priv(s);
        if (newcon.second) {
          p->second.con_front = newcon.second.get();
-         p->second.con_front->set_priv(s->get());
+         p->second.con_front->set_priv(s);
        }
       } else {
        dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
       dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
     }
     heartbeat_lock.Unlock();
-    s->put();
   }
   return true;
 }
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << " new session (outgoing) " << s << " con=" << s->con
           << " addr=" << s->con->get_peer_addr() << dendl;
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
 {
   if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
       con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
       dout(10) << "new session (incoming)" << s << " con=" << con
           << " addr=" << con->get_peer_addr()
       assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
       s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
     }
-    s->put();
   }
 }
 
 bool OSD::ms_handle_reset(Connection *con)
 {
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto s = con->get_priv();
+  auto session = static_cast<Session*>(s.get());
   dout(2) << "ms_handle_reset con " << con << " session " << session << dendl;
   if (!session)
     return false;
   session->wstate.reset(con);
-  session->con.reset(NULL);  // break con <-> session ref cycle
+  session->con->set_priv(nullptr);
+  session->con.reset();  // break con <-> session ref cycle
   // note that we break session->con *before* the session_handle_reset
   // cleanup below.  this avoids a race between us and
   // PG::add_backoff, Session::check_backoff, etc.
-  session_handle_reset(session);
-  session->put();
+  session_handle_reset(SessionRef{session});
   return true;
 }
 
   if (!cct->_conf->osd_fast_fail_on_connection_refused)
     return false;
 
-  Session *session = static_cast<Session*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   dout(2) << "ms_handle_refused con " << con << " session " << session << dendl;
   if (!session)
     return false;
       }
     }
   }
-  session->put();
   return true;
 }
 
     cluster_messenger->set_addr_unknowns(cluster_addr);
     dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       cluster_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
     hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
     dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
     hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
     dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
   } else {
-    Session *s = static_cast<Session*>(local_connection->get_priv());
-    if (s)
-      s->put();
-    else
+    if (auto session = local_connection->get_priv(); !session) {
       hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+    }
   }
 
   MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(),
 void OSD::handle_command(MCommand *m)
 {
   ConnectionRef con = m->get_connection();
-  Session *session = static_cast<Session *>(con->get_priv());
+  auto priv = con->get_priv();
+  auto session = static_cast<Session *>(priv.get());
   if (!session) {
     con->send_message(new MCommandReply(m, -EPERM));
     m->put();
   }
 
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   if (!caps.allow_all() || m->get_source().is_mon()) {
     con->send_message(new MCommandReply(m, -EPERM));
   op->check_send_map = false;
 }
 
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
 {
   assert(session->session_dispatch_lock.is_locked());
 
     // legacy client, and this is an MOSDOp (the *only* fast dispatch
     // message that didn't have an explicit spg_t); we need to map
     // them to an spg_t while preserving delivery order.
-    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
-    if (session) {
-      {
-       Mutex::Locker l(session->session_dispatch_lock);
-       op->get();
-       session->waiting_on_map.push_back(*op);
-       OSDMapRef nextmap = service.get_nextmap_reserved();
-       dispatch_session_waiting(session, nextmap);
-       service.release_map(nextmap);
-      }
-      session->put();
+    auto priv = m->get_connection()->get_priv();
+    if (auto session = static_cast<Session*>(priv.get()); session) {
+      Mutex::Locker l{session->session_dispatch_lock};
+      op->get();
+      session->waiting_on_map.push_back(*op);
+      OSDMapRef nextmap = service.get_nextmap_reserved();
+      dispatch_session_waiting(session, nextmap);
+      service.release_map(nextmap);
     }
   }
   OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
   if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
     if (m->get_type() == CEPH_MSG_OSD_MAP) {
       MOSDMap *mm = static_cast<MOSDMap*>(m);
-      Session *s = static_cast<Session*>(m->get_connection()->get_priv());
-      if (s) {
+      auto priv = m->get_connection()->get_priv();
+      if (auto s = static_cast<Session*>(priv.get()); s) {
        s->received_map_lock.lock();
        s->received_map_epoch = mm->get_last();
        s->received_map_lock.unlock();
-       s->put();
       }
     }
   }
   }
 
   if (isvalid) {
-    Session *s = static_cast<Session *>(con->get_priv());
+    auto priv = con->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
-      s = new Session(cct);
-      con->set_priv(s->get());
+      s = new Session{cct};
+      con->set_priv(RefCountedPtr{s, false});
       s->con = con;
-      dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;
+      dout(10) << " new session " << s << " con=" << s->con
+              << " addr=" << con->get_peer_addr() << dendl;
     }
 
     s->entity_name = name;
         isvalid = false;
       }
     }
-
-    s->put();
   }
   return true;
 }
     return;
   }
 
-  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-  if (session && !(session->entity_name.is_mon() ||
+  auto priv = m->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get());
+      session && !(session->entity_name.is_mon() ||
                   session->entity_name.is_osd())) {
     //not enough perms!
     dout(10) << "got osd map from Session " << session
              << " which we can't take maps from (not a mon or osd)" << dendl;
     m->put();
-    session->put();
     return;
   }
-  if (session)
-    session->put();
 
   // share with the objecter
   if (!is_preboot())
            << dendl;
     ConnectionRef con = m->get_connection();
     con->mark_down();
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Lock();
       clear_session_waiting_on_map(s);
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
+      s->con.reset();
       if (!is_fast_dispatch)
        s->session_dispatch_lock.Unlock();
-      s->put();
     }
     return false;
   }
 
   logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
 
-  Session *session = static_cast<Session *>(
-    op->get_req()->get_connection()->get_priv());
-  if (session) {
+  auto priv = op->get_req()->get_connection()->get_priv();
+  if (auto session = static_cast<Session *>(priv.get()); session) {
     maybe_share_map(session, op, pg->get_osdmap());
-    session->put();
   }
 
   if (pg->is_deleting())
               << ", dropping " << qi << dendl;
       // share map with client?
       if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
-       Session *session = static_cast<Session *>(
-         (*_op)->get_req()->get_connection()->get_priv());
-       if (session) {
+       auto priv = (*_op)->get_req()->get_connection()->get_priv();
+       if (auto session = static_cast<Session *>(priv.get()); session) {
          osd->maybe_share_map(session, *_op, sdata->shard_osdmap);
-         session->put();
        }
       }
       unsigned pushes_to_free = qi.get_reserved_pushes();
 
 
   // -- sessions --
 private:
-  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+  void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
   void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
 
   Mutex session_waiting_lock;
-  set<Session*> session_waiting_for_map;
+  set<SessionRef> session_waiting_for_map;
 
   /// Caller assumes refs for included Sessions
-  void get_sessions_waiting_for_map(set<Session*> *out) {
+  void get_sessions_waiting_for_map(set<SessionRef> *out) {
     Mutex::Locker l(session_waiting_lock);
     out->swap(session_waiting_for_map);
   }
-  void register_session_waiting_on_map(Session *session) {
+  void register_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    if (session_waiting_for_map.insert(session).second) {
-      session->get();
-    }
+    session_waiting_for_map.insert(session);
   }
-  void clear_session_waiting_on_map(Session *session) {
+  void clear_session_waiting_on_map(SessionRef session) {
     Mutex::Locker l(session_waiting_lock);
-    set<Session*>::iterator i = session_waiting_for_map.find(session);
-    if (i != session_waiting_for_map.end()) {
-      (*i)->put();
-      session_waiting_for_map.erase(i);
-    }
+    session_waiting_for_map.erase(session);
   }
   void dispatch_sessions_waiting_on_map() {
-    set<Session*> sessions_to_check;
+    set<SessionRef> sessions_to_check;
     get_sessions_waiting_for_map(&sessions_to_check);
-    for (set<Session*>::iterator i = sessions_to_check.begin();
+    for (auto i = sessions_to_check.begin();
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
-      (*i)->session_dispatch_lock.Lock();
-      dispatch_session_waiting(*i, osdmap);
-      (*i)->session_dispatch_lock.Unlock();
-      (*i)->put();
+      Mutex::Locker l{(*i)->session_dispatch_lock};
+      SessionRef session = *i;
+      dispatch_session_waiting(session, osdmap);
     }
   }
-  void session_handle_reset(Session *session) {
+  void session_handle_reset(SessionRef session) {
     Mutex::Locker l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
 
 
   const MOSDOp *req = static_cast<const MOSDOp*>(op->get_req());
 
-  Session *session = static_cast<Session*>(req->get_connection()->get_priv());
+  auto priv = req->get_connection()->get_priv();
+  auto session = static_cast<Session*>(priv.get());
   if (!session) {
     dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
     return false;
   }
   OSDCap& caps = session->caps;
-  session->put();
+  priv.reset();
 
   const string &key = req->get_hobj().get_key().empty() ?
     req->get_oid().name :
 
 void PrimaryLogPG::handle_backoff(OpRequestRef& op)
 {
   const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-  SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+  SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
   if (!session)
     return;  // drop it.
-  session->put();  // get_priv takes a ref, and so does the SessionRef
   hobject_t begin = info.pgid.pgid.get_hobj_start();
   hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
   if (begin < m->begin) {
   const Message *m = op->get_req();
   int msg_type = m->get_type();
   if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
-    SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+    SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
     if (!session)
       return;  // drop it.
-    session->put();  // get_priv takes a ref, and so does the SessionRef
 
     if (msg_type == CEPH_MSG_OSD_OP) {
       if (session->check_backoff(cct, info.pgid,
     m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
   SessionRef session;
   if (can_backoff) {
-    session = static_cast<Session*>(m->get_connection()->get_priv());
+    session = static_cast<Session*>(m->get_connection()->get_priv().get());
     if (!session.get()) {
       dout(10) << __func__ << " no session" << dendl;
       return;
     }
-    session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
     if (session->check_backoff(cct, info.pgid, head, m)) {
       return;
 
   assert(conn);
 
-  boost::intrusive_ptr<Session> session((Session *)conn->get_priv());
-  if (!session.get())
+  auto session = conn->get_priv();
+  if (!session)
     return;
-  session->put();  // get_priv() takes a ref, and so does the intrusive_ptr
 
   for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
        i != ctx->watch_connects.end();
 
   dout(10) << __func__ << " con " << con << dendl;
   conn = con;
   will_ping = _will_ping;
-  Session* sessionref(static_cast<Session*>(con->get_priv()));
-  if (sessionref) {
+  auto priv = con->get_priv();
+  if (priv) {
+    auto sessionref = static_cast<Session*>(priv.get());
     sessionref->wstate.addWatch(self.lock());
-    sessionref->put();
+    priv.reset();
     for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
         i != in_progress_notifies.end();
         ++i) {
   unregister_cb();
   discarded = true;
   if (conn) {
-    Session* sessionref(static_cast<Session*>(conn->get_priv()));
-    if (sessionref) {
-      sessionref->wstate.removeWatch(self.lock());
-      sessionref->put();
+    if (auto priv = conn->get_priv(); priv) {
+      auto session = static_cast<Session*>(priv.get());
+      session->wstate.removeWatch(self.lock());
     }
     conn = ConnectionRef();
   }
 
   OSDSession *s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
   s->con = messenger->get_connection(osdmap->get_inst(osd));
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   logger->inc(l_osdc_osd_session_open);
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
     logger->inc(l_osdc_osd_session_close);
   }
   s->con = messenger->get_connection(inst);
-  s->con->set_priv(s->get());
+  s->con->set_priv(RefCountedPtr{s});
   s->incarnation++;
   logger->inc(l_osdc_osd_session_open);
 }
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s) {
-      s->put();
-    }
     m->put();
     return;
   }
                                                    " onnvram" : " ack"))
                  << " ... stray" << dendl;
     sl.unlock();
-    s->put();
     m->put();
     return;
   }
     }
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     _op_submit(op, sul, NULL);
     m->put();
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       sl.unlock();
-      s->put();
       return;
     }
   } else {
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     // FIXME: two redirects could race and reorder
 
       num_in_flight--;
     _session_op_remove(s, op);
     sl.unlock();
-    s->put();
 
     op->tid = 0;
     op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
   }
 
   m->put();
-  s->put();
 }
 
 void Objecter::handle_osd_backoff(MOSDBackoff *m)
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
-    if (s)
-      s->put();
     m->put();
     return;
   }
 
   get_session(s);
-  s->put();  // from get_priv() above
 
   OSDSession::unique_lock sl(s->lock);
 
   if (!initialized)
     return false;
   if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
-    OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+    auto priv = con->get_priv();
+    auto session = static_cast<OSDSession*>(priv.get());
     if (session) {
       ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
                    << " osd." << session->osd << dendl;
       _linger_ops_resend(lresend, wl);
       wl.unlock();
       maybe_request_map();
-      session->put();
     }
     return true;
   }
   }
 
   ConnectionRef con = m->get_connection();
-  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  auto priv = con->get_priv();
+  auto s = static_cast<OSDSession*>(priv.get());
   if (!s || s->con != con) {
     ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
     m->put();
-    if (s)
-      s->put();
     return;
   }
 
                   << " not found" << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
 
                   << dendl;
     m->put();
     sl.unlock();
-    s->put();
     return;
   }
   if (c->poutbl) {
   sul.unlock();
 
   m->put();
-  s->put();
 }
 
 void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 
 
   bool ms_handle_reset(Connection *con) override {
     dout(1) << __func__ << dendl;
-    Session *session = (Session *)con->get_priv();
-    if (!session)
-      return false;
-    session->put();
-    return true;
+    return con->get_priv().get();
   }
 
   bool ms_handle_refused(Connection *con) override {
 
   void ms_handle_fast_connect(Connection *con) override {
     lock.Lock();
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
+    auto s = con->get_priv();
     if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
-      lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
+      auto session = new Session(con);
+      con->set_priv(RefCountedPtr{session, false});
+      lderr(g_ceph_context) << __func__ << " con: " << con
+                           << " count: " << session->count << dendl;
     }
-    s->put();
     got_connect = true;
     cond.Signal();
     lock.Unlock();
   }
   void ms_handle_fast_accept(Connection *con) override {
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (!s) {
-      s = new Session(con);
-      con->set_priv(s->get());
+    if (!con->get_priv()) {
+      con->set_priv(RefCountedPtr{new Session(con), false});
     }
-    s->put();
   }
   bool ms_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
   bool ms_handle_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     return true;
   }
   void ms_handle_remote_reset(Connection *con) override {
     Mutex::Locker l(lock);
     lderr(g_ceph_context) << __func__ << " " << con << dendl;
-    Session *s = static_cast<Session*>(con->get_priv());
-    if (s) {
-      s->con.reset(NULL);  // break con <-> session ref cycle
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
-      s->put();
+    auto priv = con->get_priv();
+    if (auto s = static_cast<Session*>(priv.get()); s) {
+      s->con.reset();  // break con <-> session ref cycle
+      con->set_priv(nullptr);   // break ref <-> session cycle, if any
     }
     got_remote_reset = true;
     cond.Signal();
     return false;
   }
   void ms_fast_dispatch(Message *m) override {
-    Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+    auto priv = m->get_connection()->get_priv();
+    auto s = static_cast<Session*>(priv.get());
     if (!s) {
       s = new Session(m->get_connection());
-      m->get_connection()->set_priv(s->get());
+      priv.reset(s, false);
+      m->get_connection()->set_priv(priv);
     }
-    s->put();
     s->count++;
     lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
     if (is_server) {
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. test rebind port
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 3. test markdown connection
   conn->mark_down();
     cli_dispatcher.got_new = false;
   }
   srv_dispatcher.loopback = false;
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   client_msgr->shutdown();
   client_msgr->wait();
   server_msgr->shutdown();
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // Make should server_conn is the one we already accepted from client,
   // so it means client_msgr has the same addr when server connection has
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_msgr->shutdown();
   client_msgr->shutdown();
   server_msgr->wait();
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ASSERT_TRUE(conn->peer_is_osd());
 
   // 2. wait for idle
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // don't lose state
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   srv_dispatcher.got_new = false;
   conn = client_msgr->get_connection(server_msgr->get_myinst());
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   {
     Mutex::Locker l(srv_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
   // resetcheck happen
-  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
   cli_dispatcher.got_remote_reset = false;
 
   server_msgr->shutdown();
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   conn->mark_down();
   ASSERT_FALSE(conn->is_connected());
 
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   // server lose state
   {
     while (!srv_dispatcher.got_new)
       srv_dispatcher.cond.Wait(srv_dispatcher.lock);
   }
-  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   // 2. test for client lossy
   server_conn->mark_down();
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
   ASSERT_FALSE(cli_dispatcher.got_remote_reset);
   cli_dispatcher.got_connect = false;
       cli_dispatcher.cond.Wait(cli_dispatcher.lock);
     cli_dispatcher.got_new = false;
   }
-  ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
   server_conn = server_msgr->get_connection(client_msgr->get_myinst());
-  ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   // 2. mix auth
   g_ceph_context->_conf->set_val("auth_cluster_required", "none");
     cli_dispatcher.got_new = false;
   }
   ASSERT_TRUE(conn->is_connected());
-  ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+  ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
 
   server_msgr->shutdown();
   client_msgr->shutdown();