OSDSession *s = new OSDSession(cct, osd);
osd_sessions[osd] = s;
s->con = messenger->get_connection(osdmap->get_inst(osd));
+ s->con->set_priv(s->get());
logger->inc(l_osdc_osd_session_open);
logger->inc(l_osdc_osd_sessions, osd_sessions.size());
s->get();
ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now "
<< inst << dendl;
if (s->con) {
+ s->con->set_priv(NULL);
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
s->con = messenger->get_connection(inst);
+ s->con->set_priv(s->get());
s->incarnation++;
logger->inc(l_osdc_osd_session_open);
}
ldout(cct, 10) << "close_session for osd." << s->osd << dendl;
if (s->con) {
+ s->con->set_priv(NULL);
s->con->mark_down();
logger->inc(l_osdc_osd_session_close);
}
// get pio
ceph_tid_t tid = m->get_tid();
- int osd_num = (int)m->get_source().num();
-
shunique_lock sul(rwlock, ceph::acquire_shared);
if (!initialized.read()) {
m->put();
return;
}
- map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
- if (siter == osd_sessions.end()) {
- ldout(cct, 7) << "handle_osd_op_reply " << tid
- << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ?
- " onnvram":" ack"))
- << " ... unknown osd" << dendl;
+ ConnectionRef con = m->get_connection();
+ OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ if (!s || s->con != con) {
+ ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
+ if (s) {
+ s->put();
+ }
m->put();
return;
}
- OSDSession *s = siter->second;
- get_session(s);
-
OSDSession::unique_lock sl(s->lock);
map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
if (!initialized.read())
return false;
if (con->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
- int osd = osdmap->identify_osd(con->get_peer_addr());
- if (osd >= 0) {
- ldout(cct, 1) << "ms_handle_reset on osd." << osd << dendl;
+ OSDSession *session = static_cast<OSDSession*>(con->get_priv());
+ if (session) {
+ ldout(cct, 1) << "ms_handle_reset " << con << " session " << session
+ << " osd." << session->osd << dendl;
unique_lock wl(rwlock);
if (!initialized.read()) {
wl.unlock();
return false;
}
- map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
- if (p != osd_sessions.end()) {
- OSDSession *session = p->second;
- map<uint64_t, LingerOp *> lresend;
- OSDSession::unique_lock sl(session->lock);
- _reopen_session(session);
- _kick_requests(session, lresend);
- sl.unlock();
- _linger_ops_resend(lresend, wl);
- wl.unlock();
- maybe_request_map();
- } else {
- wl.unlock();
- }
- } else {
- ldout(cct, 10) << "ms_handle_reset on unknown osd addr "
- << con->get_peer_addr() << dendl;
+ map<uint64_t, LingerOp *> lresend;
+ OSDSession::unique_lock sl(session->lock);
+ _reopen_session(session);
+ _kick_requests(session, lresend);
+ sl.unlock();
+ _linger_ops_resend(lresend, wl);
+ wl.unlock();
+ maybe_request_map();
+ session->put();
}
return true;
}
void Objecter::handle_command_reply(MCommandReply *m)
{
- int osd_num = (int)m->get_source().num();
-
unique_lock wl(rwlock);
if (!initialized.read()) {
m->put();
return;
}
- map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
- if (siter == osd_sessions.end()) {
- ldout(cct, 10) << "handle_command_reply tid " << m->get_tid()
- << " osd not found" << dendl;
+ ConnectionRef con = m->get_connection();
+ OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+ if (!s || s->con != con) {
+ ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
m->put();
+ if (s)
+ s->put();
return;
}
- OSDSession *s = siter->second;
-
OSDSession::shared_lock sl(s->lock);
map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
if (p == s->command_ops.end()) {
<< " not found" << dendl;
m->put();
sl.unlock();
+ if (s)
+ s->put();
return;
}
<< dendl;
m->put();
sl.unlock();
+ if (s)
+ s->put();
return;
}
- if (c->poutbl)
+ if (c->poutbl) {
c->poutbl->claim(m->get_data());
+ }
sl.unlock();
_finish_command(c, m->r, m->rs);
m->put();
+ if (s)
+ s->put();
}
int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)