#include "common/ceph_context.h"
#include "common/valgrind.h"
+#include <boost/smart_ptr/intrusive_ptr.hpp>
+
// re-include our assert to clobber the system one; fix dout:
#include "include/assert.h"
void intrusive_ptr_add_ref(const RefCountedObject *p);
void intrusive_ptr_release(const RefCountedObject *p);
+using RefCountedPtr = boost::intrusive_ptr<RefCountedObject>;
+
#endif
int r, bufferlist outbl,
std::string_view outs)
{
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<Session *>(priv.get());
assert(session != NULL);
// If someone is using a closed session for sending commands (e.g.
// the ceph CLI) then we should feel free to clean up this connection
assert(session->is_closed());
session->connection->mark_disposable();
}
- session->put();
+ priv.reset();
MCommandReply *reply = new MCommandReply(r, outs);
reply->set_tid(m->get_tid());
/* This function DOES put the passed message before returning*/
void MDSDaemon::handle_command(MCommand *m)
{
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<Session *>(priv.get());
assert(session != NULL);
int r = 0;
} else {
r = _handle_command(cmdmap, m, &outbl, &outs, &run_after, &need_reply);
}
- session->put();
+ priv.reset();
if (need_reply) {
send_command_reply(m, mds_rank, r, outbl, outs);
if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
return false;
- Session *session = static_cast<Session *>(con->get_priv());
- if (session) {
+ auto priv = con->get_priv();
+ if (auto session = static_cast<Session *>(priv.get()); session) {
if (session->is_closed()) {
dout(3) << "ms_handle_reset closing connection for session " << session->info.inst << dendl;
con->mark_down();
- con->set_priv(NULL);
+ con->set_priv(nullptr);
}
- session->put();
} else {
con->mark_down();
}
if (beacon.get_want_state() == CEPH_MDS_STATE_DNE)
return;
- Session *session = static_cast<Session *>(con->get_priv());
- if (session) {
+ auto priv = con->get_priv();
+ if (auto session = static_cast<Session *>(priv.get()); session) {
if (session->is_closed()) {
dout(3) << "ms_handle_remote_reset closing connection for session " << session->info.inst << dendl;
con->mark_down();
- con->set_priv(NULL);
+ con->set_priv(nullptr);
}
- session->put();
}
}
s->info.inst.addr = con->get_peer_addr();
s->info.inst.name = n;
dout(10) << " new session " << s << " for " << s->info.inst << " con " << con << dendl;
- con->set_priv(s);
+ con->set_priv(RefCountedPtr{s, false});
s->connection = con;
if (mds_rank) {
mds_rank->kick_waiters_for_any_client_connection();
} else {
dout(10) << " existing session " << s << " for " << s->info.inst << " existing con " << s->connection
<< ", new/authorizing con " << con << dendl;
- con->set_priv(s->get());
+ con->set_priv(RefCountedPtr{s, false});
return;
}
- Session *s = static_cast<Session *>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<Session *>(priv.get());
dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
if (s) {
if (s->connection != con) {
s->preopen_out_queue.pop_front();
}
}
- s->put();
}
}
Session *MDSRank::get_session(Message *m)
{
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ // do not carry ref
+ auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
if (session) {
- session->put(); // do not carry ref
dout(20) << "get_session have " << session << " " << session->info.inst
<< " state " << session->get_state_name() << dendl;
// Check if we've imported an open session since (new sessions start closed)
void MDSRank::send_message_client_counted(Message *m, Connection *connection)
{
- Session *session = static_cast<Session *>(connection->get_priv());
+ // do not carry ref
+ auto session = static_cast<Session *>(m->get_connection()->get_priv().get());
if (session) {
- session->put(); // do not carry ref
send_message_client_counted(m, session);
} else {
dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
* normally.
*/
MonSession *get_session() {
- MonSession *session = (MonSession *)get_connection()->get_priv();
- if (session)
- session->put();
- return session;
+ auto priv = get_connection()->get_priv();
+ return static_cast<MonSession*>(priv.get());
}
const char *get_type_name() const override { return "PaxosServiceMessage"; }
bool DaemonServer::ms_handle_reset(Connection *con)
{
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
- MgrSessionRef session(static_cast<MgrSession*>(con->get_priv()));
+ auto priv = con->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
return false;
}
- session->put(); // SessionRef takes a ref
Mutex::Locker l(lock);
dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << con << dendl;
{
Mutex::Locker l(lock);
// kill session
- MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
return false;
}
m->get_connection()->mark_down();
- session->put();
dout(10) << "unregistering osd." << session->osd_id
<< " session " << session << " con " << m->get_connection() << dendl;
std::shared_ptr<CommandContext> cmdctx = std::make_shared<CommandContext>(m);
- MgrSessionRef session(static_cast<MgrSession*>(m->get_connection()->get_priv()));
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
if (!session) {
return true;
}
- session->put(); // SessionRef takes a ref
if (session->inst.name == entity_name_t())
session->inst.name = m->get_source();
bool is_allowed;
if (!mgr_cmd) {
MonCommand py_command = {"", "", "py", "rw", "cli"};
- is_allowed = _allowed_command(session.get(), py_command.module,
+ is_allowed = _allowed_command(session, py_command.module,
prefix, cmdctx->cmdmap, param_str_map, &py_command);
} else {
// validate user's permissions for requested command
- is_allowed = _allowed_command(session.get(), mgr_cmd->module,
+ is_allowed = _allowed_command(session, mgr_cmd->module,
prefix, cmdctx->cmdmap, param_str_map, mgr_cmd);
}
if (!is_allowed) {
<< report->packed.length() << " bytes of data" << dendl;
// Retrieve session state
- MgrSessionRef session(static_cast<MgrSession*>(
- report->get_connection()->get_priv()));
+ auto priv = report->get_connection()->get_priv();
+ auto session = static_cast<MgrSession*>(priv.get());
// Load any newly declared types
for (const auto &t : report->declare_types) {
private:
Message *request;
utime_t dequeued_time;
- MonSession *session;
+ RefCountedPtr session;
ConnectionRef con;
bool forwarded_to_leader;
op_type_t op_type;
req->get_recv_stamp().is_zero() ?
ceph_clock_now() : req->get_recv_stamp()),
request(req),
- session(NULL),
con(NULL),
forwarded_to_leader(false),
op_type(OP_TYPE_NONE)
if (req) {
con = req->get_connection();
if (con) {
- session = static_cast<MonSession*>(con->get_priv());
+ session = con->get_priv();
}
}
}
public:
~MonOpRequest() override {
request->put();
- // certain ops may not have a session (e.g., AUTH or PING)
- if (session)
- session->put();
}
MonSession *get_session() const {
- if (!session)
- return NULL;
- return session;
+ return static_cast<MonSession*>(session.get());
}
template<class T>
ConnectionRef get_connection() { return con; }
void set_session(MonSession *s) {
- if (session) {
- // we will be rewriting the existing session; drop the ref.
- session->put();
- }
-
- if (s == NULL) {
- session = NULL;
- } else {
- session = static_cast<MonSession*>(s->get());
- }
+ session.reset(s);
}
bool is_src_mon() const {
return;
}
- MonSession *session = static_cast<MonSession *>(
- m->get_connection()->get_priv());
+ auto priv = m->get_connection()->get_priv();
+ auto session = static_cast<MonSession *>(priv.get());
if (!session) {
dout(5) << __func__ << " dropping stray message " << *m << dendl;
return;
}
- BOOST_SCOPE_EXIT_ALL(=) {
- session->put();
- };
if (m->cmd.empty()) {
string rs = "No command supplied";
ConnectionRef c(new AnonConnection(cct));
MonSession *s = new MonSession(req->get_source_inst(),
static_cast<Connection*>(c.get()));
- c->set_priv(s->get());
+ c->set_priv(RefCountedPtr{s, false});
c->set_peer_addr(m->client.addr);
c->set_peer_type(m->client.name.type());
c->set_features(m->con_features);
dout(10) << " mesg " << req << " from " << m->get_source_addr() << dendl;
_ms_dispatch(req);
- s->put();
}
}
routed_requests.erase(*p);
}
s->routed_request_tids.clear();
- s->con->set_priv(NULL);
+ s->con->set_priv(nullptr);
session_map.remove_session(s);
logger->set(l_mon_num_sessions, session_map.get_size());
logger->inc(l_mon_session_rm);
s = session_map.new_session(m->get_source_inst(), con.get());
}
assert(s);
- con->set_priv(s->get());
+ con->set_priv(RefCountedPtr{s, false});
dout(10) << __func__ << " new session " << s << " " << *s
<< " features 0x" << std::hex
<< s->con_features << std::dec << dendl;
s->caps = *mon_caps;
s->authenticated = true;
}
- s->put();
} else {
dout(20) << __func__ << " existing session " << s << " for " << s->inst
<< dendl;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
return false;
- MonSession *s = static_cast<MonSession *>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<MonSession*>(priv.get());
if (!s)
return false;
// break any con <-> session ref cycle
- s->con->set_priv(NULL);
+ s->con->set_priv(nullptr);
if (is_shutdown())
return false;
Mutex::Locker l(session_map_lock);
remove_session(s);
}
- s->put();
return true;
}
struct Connection : public RefCountedObject {
mutable Mutex lock;
Messenger *msgr;
- RefCountedObject *priv;
+ RefCountedPtr priv;
int peer_type;
entity_addr_t peer_addr;
utime_t last_keepalive, last_keepalive_ack;
: RefCountedObject(cct, 0),
lock("Connection::lock"),
msgr(m),
- priv(NULL),
peer_type(-1),
features(0),
failed(false),
~Connection() override {
//generic_dout(0) << "~Connection " << this << dendl;
- if (priv) {
- //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
- priv->put();
- }
}
- void set_priv(RefCountedObject *o) {
+ void set_priv(const RefCountedPtr& o) {
Mutex::Locker l(lock);
- if (priv)
- priv->put();
priv = o;
}
- RefCountedObject *get_priv() {
+ RefCountedPtr get_priv() {
Mutex::Locker l(lock);
- if (priv)
- return priv->get();
- return NULL;
+ return priv;
}
/**
return;
hi = &heartbeat_peers[p];
hi->peer = p;
- HeartbeatSession *s = new HeartbeatSession(p);
+ RefCountedPtr s{new HeartbeatSession{p}, false};
hi->con_back = cons.first.get();
- hi->con_back->set_priv(s->get());
+ hi->con_back->set_priv(s);
if (cons.second) {
hi->con_front = cons.second.get();
- hi->con_front->set_priv(s->get());
+ hi->con_front->set_priv(s);
dout(10) << "_add_heartbeat_peer: new peer osd." << p
<< " " << hi->con_back->get_peer_addr()
<< " " << hi->con_front->get_peer_addr()
<< " " << hi->con_back->get_peer_addr()
<< dendl;
}
- s->put();
} else {
hi = &i->second;
}
bool OSD::heartbeat_reset(Connection *con)
{
- HeartbeatSession *s = static_cast<HeartbeatSession*>(con->get_priv());
+ auto s = con->get_priv();
if (s) {
heartbeat_lock.Lock();
if (is_stopping()) {
heartbeat_lock.Unlock();
- s->put();
return true;
}
- map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
+ auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
+ auto p = heartbeat_peers.find(heartbeat_session->peer);
if (p != heartbeat_peers.end() &&
(p->second.con_back == con ||
p->second.con_front == con)) {
pair<ConnectionRef,ConnectionRef> newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
if (newcon.first) {
p->second.con_back = newcon.first.get();
- p->second.con_back->set_priv(s->get());
+ p->second.con_back->set_priv(s);
if (newcon.second) {
p->second.con_front = newcon.second.get();
- p->second.con_front->set_priv(s->get());
+ p->second.con_front->set_priv(s);
}
} else {
dout(10) << "heartbeat_reset failed hb con " << con << " for osd." << p->second.peer
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
}
heartbeat_lock.Unlock();
- s->put();
}
return true;
}
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
- Session *s = static_cast<Session*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<Session*>(priv.get());
if (!s) {
- s = new Session(cct);
- con->set_priv(s->get());
+ s = new Session{cct};
+ con->set_priv(RefCountedPtr{s, false});
s->con = con;
dout(10) << " new session (outgoing) " << s << " con=" << s->con
<< " addr=" << s->con->get_peer_addr() << dendl;
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
- s->put();
}
}
{
if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON &&
con->get_peer_type() != CEPH_ENTITY_TYPE_MGR) {
- Session *s = static_cast<Session*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<Session*>(priv.get());
if (!s) {
- s = new Session(cct);
- con->set_priv(s->get());
+ s = new Session{cct};
+ con->set_priv(RefCountedPtr{s, false});
s->con = con;
dout(10) << "new session (incoming)" << s << " con=" << con
<< " addr=" << con->get_peer_addr()
assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
}
- s->put();
}
}
bool OSD::ms_handle_reset(Connection *con)
{
- Session *session = static_cast<Session*>(con->get_priv());
+ auto s = con->get_priv();
+ auto session = static_cast<Session*>(s.get());
dout(2) << "ms_handle_reset con " << con << " session " << session << dendl;
if (!session)
return false;
session->wstate.reset(con);
- session->con.reset(NULL); // break con <-> session ref cycle
+ session->con->set_priv(nullptr);
+ session->con.reset(); // break con <-> session ref cycle
// note that we break session->con *before* the session_handle_reset
// cleanup below. this avoids a race between us and
// PG::add_backoff, Session::check_backoff, etc.
- session_handle_reset(session);
- session->put();
+ session_handle_reset(SessionRef{session});
return true;
}
if (!cct->_conf->osd_fast_fail_on_connection_refused)
return false;
- Session *session = static_cast<Session*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto session = static_cast<Session*>(priv.get());
dout(2) << "ms_handle_refused con " << con << " session " << session << dendl;
if (!session)
return false;
}
}
}
- session->put();
return true;
}
cluster_messenger->set_addr_unknowns(cluster_addr);
dout(10) << " assuming cluster_addr ip matches client_addr" << dendl;
} else {
- Session *s = static_cast<Session*>(local_connection->get_priv());
- if (s)
- s->put();
- else
+ if (auto session = local_connection->get_priv(); !session) {
cluster_messenger->ms_deliver_handle_fast_connect(local_connection);
+ }
}
entity_addr_t hb_back_addr = hb_back_server_messenger->get_myaddr();
hb_back_server_messenger->set_addr_unknowns(hb_back_addr);
dout(10) << " assuming hb_back_addr ip matches cluster_addr" << dendl;
} else {
- Session *s = static_cast<Session*>(local_connection->get_priv());
- if (s)
- s->put();
- else
+ if (auto session = local_connection->get_priv(); !session) {
hb_back_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+ }
}
entity_addr_t hb_front_addr = hb_front_server_messenger->get_myaddr();
hb_front_server_messenger->set_addr_unknowns(hb_front_addr);
dout(10) << " assuming hb_front_addr ip matches client_addr" << dendl;
} else {
- Session *s = static_cast<Session*>(local_connection->get_priv());
- if (s)
- s->put();
- else
+ if (auto session = local_connection->get_priv(); !session) {
hb_front_server_messenger->ms_deliver_handle_fast_connect(local_connection);
+ }
}
MOSDBoot *mboot = new MOSDBoot(superblock, get_osdmap_epoch(), service.get_boot_epoch(),
void OSD::handle_command(MCommand *m)
{
ConnectionRef con = m->get_connection();
- Session *session = static_cast<Session *>(con->get_priv());
+ auto priv = con->get_priv();
+ auto session = static_cast<Session *>(priv.get());
if (!session) {
con->send_message(new MCommandReply(m, -EPERM));
m->put();
}
OSDCap& caps = session->caps;
- session->put();
+ priv.reset();
if (!caps.allow_all() || m->get_source().is_mon()) {
con->send_message(new MCommandReply(m, -EPERM));
op->check_send_map = false;
}
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
{
assert(session->session_dispatch_lock.is_locked());
// legacy client, and this is an MOSDOp (the *only* fast dispatch
// message that didn't have an explicit spg_t); we need to map
// them to an spg_t while preserving delivery order.
- Session *session = static_cast<Session*>(m->get_connection()->get_priv());
- if (session) {
- {
- Mutex::Locker l(session->session_dispatch_lock);
- op->get();
- session->waiting_on_map.push_back(*op);
- OSDMapRef nextmap = service.get_nextmap_reserved();
- dispatch_session_waiting(session, nextmap);
- service.release_map(nextmap);
- }
- session->put();
+ auto priv = m->get_connection()->get_priv();
+ if (auto session = static_cast<Session*>(priv.get()); session) {
+ Mutex::Locker l{session->session_dispatch_lock};
+ op->get();
+ session->waiting_on_map.push_back(*op);
+ OSDMapRef nextmap = service.get_nextmap_reserved();
+ dispatch_session_waiting(session, nextmap);
+ service.release_map(nextmap);
}
}
OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
if (m->get_type() == CEPH_MSG_OSD_MAP) {
MOSDMap *mm = static_cast<MOSDMap*>(m);
- Session *s = static_cast<Session*>(m->get_connection()->get_priv());
- if (s) {
+ auto priv = m->get_connection()->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
s->received_map_lock.lock();
s->received_map_epoch = mm->get_last();
s->received_map_lock.unlock();
- s->put();
}
}
}
}
if (isvalid) {
- Session *s = static_cast<Session *>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<Session*>(priv.get());
if (!s) {
- s = new Session(cct);
- con->set_priv(s->get());
+ s = new Session{cct};
+ con->set_priv(RefCountedPtr{s, false});
s->con = con;
- dout(10) << " new session " << s << " con=" << s->con << " addr=" << s->con->get_peer_addr() << dendl;
+ dout(10) << " new session " << s << " con=" << s->con
+ << " addr=" << con->get_peer_addr() << dendl;
}
s->entity_name = name;
isvalid = false;
}
}
-
- s->put();
}
return true;
}
return;
}
- Session *session = static_cast<Session *>(m->get_connection()->get_priv());
- if (session && !(session->entity_name.is_mon() ||
+ auto priv = m->get_connection()->get_priv();
+ if (auto session = static_cast<Session *>(priv.get());
+ session && !(session->entity_name.is_mon() ||
session->entity_name.is_osd())) {
//not enough perms!
dout(10) << "got osd map from Session " << session
<< " which we can't take maps from (not a mon or osd)" << dendl;
m->put();
- session->put();
return;
}
- if (session)
- session->put();
// share with the objecter
if (!is_preboot())
<< dendl;
ConnectionRef con = m->get_connection();
con->mark_down();
- Session *s = static_cast<Session*>(con->get_priv());
- if (s) {
+ auto priv = con->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
if (!is_fast_dispatch)
s->session_dispatch_lock.Lock();
clear_session_waiting_on_map(s);
- con->set_priv(NULL); // break ref <-> session cycle, if any
+ con->set_priv(nullptr); // break ref <-> session cycle, if any
+ s->con.reset();
if (!is_fast_dispatch)
s->session_dispatch_lock.Unlock();
- s->put();
}
return false;
}
logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
- Session *session = static_cast<Session *>(
- op->get_req()->get_connection()->get_priv());
- if (session) {
+ auto priv = op->get_req()->get_connection()->get_priv();
+ if (auto session = static_cast<Session *>(priv.get()); session) {
maybe_share_map(session, op, pg->get_osdmap());
- session->put();
}
if (pg->is_deleting())
<< ", dropping " << qi << dendl;
// share map with client?
if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
- Session *session = static_cast<Session *>(
- (*_op)->get_req()->get_connection()->get_priv());
- if (session) {
+ auto priv = (*_op)->get_req()->get_connection()->get_priv();
+ if (auto session = static_cast<Session *>(priv.get()); session) {
osd->maybe_share_map(session, *_op, sdata->shard_osdmap);
- session->put();
}
}
unsigned pushes_to_free = qi.get_reserved_pushes();
// -- sessions --
private:
- void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+ void dispatch_session_waiting(SessionRef session, OSDMapRef osdmap);
void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
Mutex session_waiting_lock;
- set<Session*> session_waiting_for_map;
+ set<SessionRef> session_waiting_for_map;
/// Caller assumes refs for included Sessions
- void get_sessions_waiting_for_map(set<Session*> *out) {
+ void get_sessions_waiting_for_map(set<SessionRef> *out) {
Mutex::Locker l(session_waiting_lock);
out->swap(session_waiting_for_map);
}
- void register_session_waiting_on_map(Session *session) {
+ void register_session_waiting_on_map(SessionRef session) {
Mutex::Locker l(session_waiting_lock);
- if (session_waiting_for_map.insert(session).second) {
- session->get();
- }
+ session_waiting_for_map.insert(session);
}
- void clear_session_waiting_on_map(Session *session) {
+ void clear_session_waiting_on_map(SessionRef session) {
Mutex::Locker l(session_waiting_lock);
- set<Session*>::iterator i = session_waiting_for_map.find(session);
- if (i != session_waiting_for_map.end()) {
- (*i)->put();
- session_waiting_for_map.erase(i);
- }
+ session_waiting_for_map.erase(session);
}
void dispatch_sessions_waiting_on_map() {
- set<Session*> sessions_to_check;
+ set<SessionRef> sessions_to_check;
get_sessions_waiting_for_map(&sessions_to_check);
- for (set<Session*>::iterator i = sessions_to_check.begin();
+ for (auto i = sessions_to_check.begin();
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
- (*i)->session_dispatch_lock.Lock();
- dispatch_session_waiting(*i, osdmap);
- (*i)->session_dispatch_lock.Unlock();
- (*i)->put();
+ Mutex::Locker l{(*i)->session_dispatch_lock};
+ SessionRef session = *i;
+ dispatch_session_waiting(session, osdmap);
}
}
- void session_handle_reset(Session *session) {
+ void session_handle_reset(SessionRef session) {
Mutex::Locker l(session->session_dispatch_lock);
clear_session_waiting_on_map(session);
const MOSDOp *req = static_cast<const MOSDOp*>(op->get_req());
- Session *session = static_cast<Session*>(req->get_connection()->get_priv());
+ auto priv = req->get_connection()->get_priv();
+ auto session = static_cast<Session*>(priv.get());
if (!session) {
dout(0) << "op_has_sufficient_caps: no session for op " << *req << dendl;
return false;
}
OSDCap& caps = session->caps;
- session->put();
+ priv.reset();
const string &key = req->get_hobj().get_key().empty() ?
req->get_oid().name :
void PrimaryLogPG::handle_backoff(OpRequestRef& op)
{
const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
- SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+ SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
if (!session)
return; // drop it.
- session->put(); // get_priv takes a ref, and so does the SessionRef
hobject_t begin = info.pgid.pgid.get_hobj_start();
hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
if (begin < m->begin) {
const Message *m = op->get_req();
int msg_type = m->get_type();
if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
- SessionRef session = static_cast<Session*>(m->get_connection()->get_priv());
+ SessionRef session{static_cast<Session*>(m->get_connection()->get_priv().get())};
if (!session)
return; // drop it.
- session->put(); // get_priv takes a ref, and so does the SessionRef
if (msg_type == CEPH_MSG_OSD_OP) {
if (session->check_backoff(cct, info.pgid,
m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
SessionRef session;
if (can_backoff) {
- session = static_cast<Session*>(m->get_connection()->get_priv());
+ session = static_cast<Session*>(m->get_connection()->get_priv().get());
if (!session.get()) {
dout(10) << __func__ << " no session" << dendl;
return;
}
- session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
if (session->check_backoff(cct, info.pgid, head, m)) {
return;
assert(conn);
- boost::intrusive_ptr<Session> session((Session *)conn->get_priv());
- if (!session.get())
+ auto session = conn->get_priv();
+ if (!session)
return;
- session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
for (list<pair<watch_info_t,bool> >::iterator i = ctx->watch_connects.begin();
i != ctx->watch_connects.end();
dout(10) << __func__ << " con " << con << dendl;
conn = con;
will_ping = _will_ping;
- Session* sessionref(static_cast<Session*>(con->get_priv()));
- if (sessionref) {
+ auto priv = con->get_priv();
+ if (priv) {
+ auto sessionref = static_cast<Session*>(priv.get());
sessionref->wstate.addWatch(self.lock());
- sessionref->put();
+ priv.reset();
for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
i != in_progress_notifies.end();
++i) {
unregister_cb();
discarded = true;
if (conn) {
- Session* sessionref(static_cast<Session*>(conn->get_priv()));
- if (sessionref) {
- sessionref->wstate.removeWatch(self.lock());
- sessionref->put();
+ if (auto priv = conn->get_priv(); priv) {
+ auto session = static_cast<Session*>(priv.get());
+ session->wstate.removeWatch(self.lock());
}
conn = ConnectionRef();
}
OSDSession *s = new OSDSession(cct, osd);
osd_sessions[osd] = s;
s->con = messenger->get_connection(osdmap->get_inst(osd));
- s->con->set_priv(s->get());
+ s->con->set_priv(RefCountedPtr{s});
logger->inc(l_osdc_osd_session_open);
logger->set(l_osdc_osd_sessions, osd_sessions.size());
s->get();
logger->inc(l_osdc_osd_session_close);
}
s->con = messenger->get_connection(inst);
- s->con->set_priv(s->get());
+ s->con->set_priv(RefCountedPtr{s});
s->incarnation++;
logger->inc(l_osdc_osd_session_open);
}
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s) {
- s->put();
- }
m->put();
return;
}
" onnvram" : " ack"))
<< " ... stray" << dendl;
sl.unlock();
- s->put();
m->put();
return;
}
}
_session_op_remove(s, op);
sl.unlock();
- s->put();
_op_submit(op, sul, NULL);
m->put();
<< op->session->con->get_peer_addr() << dendl;
m->put();
sl.unlock();
- s->put();
return;
}
} else {
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- s->put();
// FIXME: two redirects could race and reorder
num_in_flight--;
_session_op_remove(s, op);
sl.unlock();
- s->put();
op->tid = 0;
op->target.flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
}
m->put();
- s->put();
}
void Objecter::handle_osd_backoff(MOSDBackoff *m)
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
- if (s)
- s->put();
m->put();
return;
}
get_session(s);
- s->put(); // from get_priv() above
OSDSession::unique_lock sl(s->lock);
if (!initialized)
return false;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
- OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto session = static_cast<OSDSession*>(priv.get());
if (session) {
ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
<< " osd." << session->osd << dendl;
_linger_ops_resend(lresend, wl);
wl.unlock();
maybe_request_map();
- session->put();
}
return true;
}
}
ConnectionRef con = m->get_connection();
- OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ auto priv = con->get_priv();
+ auto s = static_cast<OSDSession*>(priv.get());
if (!s || s->con != con) {
ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
m->put();
- if (s)
- s->put();
return;
}
<< " not found" << dendl;
m->put();
sl.unlock();
- s->put();
return;
}
<< dendl;
m->put();
sl.unlock();
- s->put();
return;
}
if (c->poutbl) {
sul.unlock();
m->put();
- s->put();
}
void Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
bool ms_handle_reset(Connection *con) override {
dout(1) << __func__ << dendl;
- Session *session = (Session *)con->get_priv();
- if (!session)
- return false;
- session->put();
- return true;
+ return con->get_priv().get();
}
bool ms_handle_refused(Connection *con) override {
void ms_handle_fast_connect(Connection *con) override {
lock.Lock();
lderr(g_ceph_context) << __func__ << " " << con << dendl;
- Session *s = static_cast<Session*>(con->get_priv());
+ auto s = con->get_priv();
if (!s) {
- s = new Session(con);
- con->set_priv(s->get());
- lderr(g_ceph_context) << __func__ << " con: " << con << " count: " << s->count << dendl;
+ auto session = new Session(con);
+ con->set_priv(RefCountedPtr{session, false});
+ lderr(g_ceph_context) << __func__ << " con: " << con
+ << " count: " << session->count << dendl;
}
- s->put();
got_connect = true;
cond.Signal();
lock.Unlock();
}
void ms_handle_fast_accept(Connection *con) override {
- Session *s = static_cast<Session*>(con->get_priv());
- if (!s) {
- s = new Session(con);
- con->set_priv(s->get());
+ if (!con->get_priv()) {
+ con->set_priv(RefCountedPtr{new Session(con), false});
}
- s->put();
}
bool ms_dispatch(Message *m) override {
- Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+ auto priv = m->get_connection()->get_priv();
+ auto s = static_cast<Session*>(priv.get());
if (!s) {
s = new Session(m->get_connection());
- m->get_connection()->set_priv(s->get());
+ priv.reset(s, false);
+ m->get_connection()->set_priv(priv);
}
- s->put();
s->count++;
lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
if (is_server) {
bool ms_handle_reset(Connection *con) override {
Mutex::Locker l(lock);
lderr(g_ceph_context) << __func__ << " " << con << dendl;
- Session *s = static_cast<Session*>(con->get_priv());
- if (s) {
- s->con.reset(NULL); // break con <-> session ref cycle
- con->set_priv(NULL); // break ref <-> session cycle, if any
- s->put();
+ auto priv = con->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
+ s->con.reset(); // break con <-> session ref cycle
+ con->set_priv(nullptr); // break ref <-> session cycle, if any
}
return true;
}
void ms_handle_remote_reset(Connection *con) override {
Mutex::Locker l(lock);
lderr(g_ceph_context) << __func__ << " " << con << dendl;
- Session *s = static_cast<Session*>(con->get_priv());
- if (s) {
- s->con.reset(NULL); // break con <-> session ref cycle
- con->set_priv(NULL); // break ref <-> session cycle, if any
- s->put();
+ auto priv = con->get_priv();
+ if (auto s = static_cast<Session*>(priv.get()); s) {
+ s->con.reset(); // break con <-> session ref cycle
+ con->set_priv(nullptr); // break ref <-> session cycle, if any
}
got_remote_reset = true;
cond.Signal();
return false;
}
void ms_fast_dispatch(Message *m) override {
- Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+ auto priv = m->get_connection()->get_priv();
+ auto s = static_cast<Session*>(priv.get());
if (!s) {
s = new Session(m->get_connection());
- m->get_connection()->set_priv(s->get());
+ priv.reset(s, false);
+ m->get_connection()->set_priv(priv);
}
- s->put();
s->count++;
lderr(g_ceph_context) << __func__ << " conn: " << m->get_connection() << " session " << s << " count: " << s->count << dendl;
if (is_server) {
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
- ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+ ASSERT_EQ(1, static_cast<Session*>(conn->get_priv().get())->get_count());
ASSERT_TRUE(conn->peer_is_osd());
// 2. test rebind port
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
// 3. test markdown connection
conn->mark_down();
cli_dispatcher.got_new = false;
}
srv_dispatcher.loopback = false;
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
client_msgr->shutdown();
client_msgr->wait();
server_msgr->shutdown();
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
ASSERT_TRUE(conn->get_peer_addr() == server_msgr->get_myaddr());
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// Make should server_conn is the one we already accepted from client,
// so it means client_msgr has the same addr when server connection has
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->shutdown();
client_msgr->shutdown();
server_msgr->wait();
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->shutdown();
client_msgr->shutdown();
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
- ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
ASSERT_TRUE(conn->peer_is_osd());
// 2. wait for idle
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
conn->mark_down();
ASSERT_FALSE(conn->is_connected());
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// don't lose state
- ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
srv_dispatcher.got_new = false;
conn = client_msgr->get_connection(server_msgr->get_myinst());
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
{
Mutex::Locker l(srv_dispatcher.lock);
cli_dispatcher.got_new = false;
}
// resetcheck happen
- ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv())->get_count());
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
- ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
cli_dispatcher.got_remote_reset = false;
server_msgr->shutdown();
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
conn->mark_down();
ASSERT_FALSE(conn->is_connected());
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
// server lose state
{
while (!srv_dispatcher.got_new)
srv_dispatcher.cond.Wait(srv_dispatcher.lock);
}
- ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv())->get_count());
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
// 2. test for client lossy
server_conn->mark_down();
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->shutdown();
client_msgr->shutdown();
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_FALSE(cli_dispatcher.got_remote_reset);
cli_dispatcher.got_connect = false;
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
- ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
server_msgr->shutdown();
client_msgr->shutdown();
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
- ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
// 2. mix auth
g_ceph_context->_conf->set_val("auth_cluster_required", "none");
cli_dispatcher.got_new = false;
}
ASSERT_TRUE(conn->is_connected());
- ASSERT_TRUE((static_cast<Session*>(conn->get_priv()))->get_count() == 1);
+ ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
server_msgr->shutdown();
client_msgr->shutdown();