close_session(p->second);
}
+ while(!check_latest_map_lingers.empty()) {
+ map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
+ i->second->put();
+ check_latest_map_lingers.erase(i->first);
+ }
+
+ while(!check_latest_map_ops.empty()) {
+ map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
+ i->second->put();
+ check_latest_map_ops.erase(i->first);
+ }
+
+ while(!check_latest_map_commands.empty()) {
+ map<ceph_tid_t, CommandOp*>::iterator i = check_latest_map_commands.begin();
+ i->second->put();
+ check_latest_map_commands.erase(i->first);
+ }
+
+ while(!poolstat_ops.empty()) {
+ map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
+ delete i->second;
+ poolstat_ops.erase(i->first);
+ }
+
+ while(!statfs_ops.empty()) {
+ map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
+ delete i->second;
+ statfs_ops.erase(i->first);
+ }
+
+ while(!pool_ops.empty()) {
+ map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
+ delete i->second;
+ pool_ops.erase(i->first);
+ }
+
+ ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
+ while(!homeless_session->linger_ops.empty()) {
+ std::map<uint64_t, LingerOp*>::iterator i = homeless_session->linger_ops.begin();
+ ldout(cct, 10) << " linger_op " << i->first << dendl;
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_linger_op_remove(homeless_session, i->second);
+ }
+ linger_ops.erase(i->second->linger_id);
+ i->second->put();
+ }
+
+ while(!homeless_session->ops.empty()) {
+ std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
+ ldout(cct, 10) << " op " << i->first << dendl;
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_op_remove(homeless_session, i->second);
+ }
+ i->second->put();
+ }
+
+ while(!homeless_session->command_ops.empty()) {
+ std::map<ceph_tid_t, CommandOp*>::iterator i = homeless_session->command_ops.begin();
+ ldout(cct, 10) << " command_op " << i->first << dendl;
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_command_op_remove(homeless_session, i->second);
+ }
+ i->second->put();
+ }
+
if (tick_event) {
timer.cancel_event(tick_event);
tick_event = NULL;
void Objecter::_unregister_linger(uint64_t linger_id)
{
assert(rwlock.is_wlocked());
+ ldout(cct, 20) << __func__ << " linger_id=" << linger_id << dendl;
map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
if (iter != linger_ops.end()) {
LingerOp *info = iter->second;
- info->session->lock.get_write();
- info->session->linger_ops.erase(linger_id);
- info->session->lock.unlock();
+ OSDSession *s = info->session;
+ s->lock.get_write();
+ _session_linger_op_remove(s, info);
+ s->lock.unlock();
+
linger_ops.erase(iter);
info->canceled = true;
info->put();
assert(rwlock.is_wlocked());
RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+ // Acquire linger ID
info->linger_id = ++max_linger_id;
ldout(cct, 10) << __func__ << " info " << info
<< " linger_id " << info->linger_id << dendl;
linger_ops[info->linger_id] = info;
+ // Populate Op::target
OSDSession *s = NULL;
_calc_target(&info->target);
+
+ // Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, lc);
assert(r == 0);
-
- info->session = s;
-
s->lock.get_write();
- s->linger_ops[info->linger_id] = info;
+ _session_linger_op_assign(s, info);
s->lock.unlock();
+ put_session(s);
+
_send_linger(info);
}
-
-
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
return true;
+ case CEPH_MSG_STATFS_REPLY:
+ handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
+ return true;
+
// these we give others a chance to inspect
// MDS, OSD
case CEPH_MSG_OSD_MAP:
- m->get();
handle_osd_map(static_cast<MOSDMap*>(m));
return false;
-
- // Client
- case CEPH_MSG_STATFS_REPLY:
- m->get();
- handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
- return false;
}
return false;
}
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
bool unregister;
- int r = _calc_target(&op->target);
+ int r = _recalc_linger_op_target(op, lc);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend && !force_resend_writes)
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
- if (op->session) {
- _session_linger_op_remove(op);
- }
need_resend_linger.push_back(op);
_linger_cancel_map_check(op);
break;
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
if (op->session) {
- _session_op_remove(op);
+ _session_op_remove(op->session, op);
}
need_resend[op->tid] = op;
_op_cancel_map_check(op);
case RECALC_OP_TARGET_NEED_RESEND:
need_resend_command[c->tid] = c;
if (c->session) {
- _session_command_op_remove(c);
+ _session_command_op_remove(c->session, c);
}
_command_cancel_map_check(c);
break;
if (m->fsid != monc->get_fsid()) {
ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
<< " != " << monc->get_fsid() << dendl;
- m->put();
return;
}
}
s->lock.get_write();
if (mapped_session) {
- _session_op_assign(op, s);
+ _session_op_assign(s, op);
+ put_session(s);
}
if (op->should_resend) {
if (!op->session->is_homeless() && !op->target.paused) {
LingerOp *op = *p;
if (!op->session) {
_calc_target(&op->target);
- int r = _get_session(op->target.osd, &op->session, lc);
+ OSDSession *s = NULL;
+ int const r = _get_session(op->target.osd, &s, lc);
assert(r == 0);
+ assert(s != NULL);
+ op->session = s;
+ put_session(s);
}
if (!op->session->is_homeless()) {
logger->inc(l_osdc_linger_resend);
waiting_for_map.erase(p++);
}
- m->put();
-
monc->sub_got("osdmap", osdmap->get_epoch());
if (!waiting_for_map.empty()) {
if (op->oncommit) {
op->oncommit->complete(-ENOENT);
}
+
+ OSDSession *s = op->session;
+ assert(s != NULL);
+
if (!session_locked) {
- op->session->lock.get_write();
+ s->lock.get_write();
}
_finish_op(op);
if (!session_locked) {
- op->session->lock.unlock();
+ s->lock.unlock();
}
}
} else {
LingerOp *op = iter->second;
objecter->check_latest_map_lingers.erase(iter);
+ assert(op->get_nref() > 1); // something other than check_latest_map_lingers should
+ // have a ref to this guy too
op->put();
if (op->map_dne_bound == 0)
}
-
+/**
+ * Look up OSDSession by OSD id.
+ *
+ * @returns 0 on success, or -EAGAIN if the lock context requires promotion to write.
+ */
int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
{
assert(rwlock.is_locked());
if (osd < 0) {
*session = homeless_session;
+ ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless" << dendl;
return 0;
}
OSDSession *s = p->second;
s->get();
*session = s;
+ ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl;
return 0;
}
if (!lc.is_wlocked()) {
logger->inc(l_osdc_osd_sessions, osd_sessions.size());
s->get();
*session = s;
+ ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl;
return 0;
}
void Objecter::put_session(Objecter::OSDSession *s)
{
if (s && !s->is_homeless()) {
+ ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl;
s->put();
}
}
+void Objecter::get_session(Objecter::OSDSession *s)
+{
+ assert(s != NULL);
+
+ if (!s->is_homeless()) {
+ ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl;
+ s->get();
+ }
+}
+
void Objecter::_reopen_session(OSDSession *s)
{
assert(s->lock.is_locked());
logger->inc(l_osdc_osd_session_close);
}
s->lock.get_write();
- s->ops.clear();
- s->linger_ops.clear();
- s->command_ops.clear();
+
+ while(!s->linger_ops.empty()) {
+ std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
+ ldout(cct, 10) << " linger_op " << i->first << dendl;
+ _session_linger_op_remove(s, i->second);
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_linger_op_assign(homeless_session, i->second);
+ }
+ }
+
+ while(!s->ops.empty()) {
+ std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
+ ldout(cct, 10) << " op " << i->first << dendl;
+ _session_op_remove(s, i->second);
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_op_assign(homeless_session, i->second);
+ }
+ }
+
+ while(!s->command_ops.empty()) {
+ std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
+ ldout(cct, 10) << " command_op " << i->first << dendl;
+ _session_command_op_remove(s, i->second);
+ {
+ RWLock::WLocker wl(homeless_session->lock);
+ _session_command_op_assign(homeless_session, i->second);
+ }
+ }
+
osd_sessions.erase(s->osd);
s->lock.unlock();
- s->put();
+ assert(s->get_nref() == 1); // We reassigned any/all ops, so should be last ref
+ put_session(s);
logger->set(l_osdc_osd_sessions, osd_sessions.size());
}
LingerOp *op = j->second;
op->get();
logger->inc(l_osdc_linger_resend);
+ assert(lresend.count(j->first) == 0);
lresend[j->first] = op;
}
ldout(cct, 10) << __func__ << " op " << op << dendl;
// pick target
- int r;
assert(op->session == NULL);
OSDSession *s = NULL;
- bool check_for_latest_map;
+ bool const check_for_latest_map = _calc_target(&op->target) == RECALC_OP_TARGET_POOL_DNE;
- while (true) {
- r = _calc_target(&op->target);
- check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
- if (_get_session(op->target.osd, &s, lc) == -EAGAIN ||
- (check_for_latest_map && lc.is_rlocked())) {
- lc.promote();
- continue;
- }
- break;
+ // Try to get a session, including a retry if we need to take write lock
+ int r = _get_session(op->target.osd, &s, lc);
+ if (r == -EAGAIN) {
+ assert(s == NULL);
+ lc.promote();
+ r = _get_session(op->target.osd, &s, lc);
}
+ assert(r == 0);
assert(s); // may be homeless
+ // We may need to take wlock if we will need to _set_op_map_check later.
+ if (check_for_latest_map && !lc.is_wlocked()) {
+ lc.promote();
+ }
+
inflight_ops.inc();
// add to gather set(s)
_maybe_request_map();
}
- MOSDOp *m = _prepare_osd_op(op);
+ MOSDOp *m = NULL;
+ if (need_send) {
+ m = _prepare_osd_op(op);
+ }
s->lock.get_write();
if (op->tid == 0)
op->tid = last_tid.inc();
- _session_op_assign(op, s);
+ _session_op_assign(s, op);
if (need_send) {
_send_op(op, m);
}
- s->lock.unlock();
+ // Last chance to touch Op here, after giving up session lock it can be
+ // freed at any time by response handler.
+ ceph_tid_t tid = op->tid;
if (check_for_latest_map) {
_send_op_map_check(op);
}
+ op = NULL;
+
+ s->lock.unlock();
+ put_session(s);
ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
- return op->tid;
+ return tid;
}
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
s->lock.unlock();
}
- if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
- ret = op_cancel(homeless_session, tid, r);
+ // Handle case where the op is in homeless session
+ {
+ RWLock::RLocker hs_lc(homeless_session->lock);
+ if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
+ ret = op_cancel(homeless_session, tid, r);
+ }
}
+
rwlock.unlock();
return ret;
return _get_session(target->osd, s, lc);
}
-void Objecter::_session_op_assign(Op *op, OSDSession *to)
+void Objecter::_session_op_assign(OSDSession *to, Op *op)
{
- assert(rwlock.is_locked());
assert(to->lock.is_locked());
- assert(!op->session);
+ assert(op->session == NULL);
assert(op->tid);
+ get_session(to);
op->session = to;
to->ops[op->tid] = op;
if (to->is_homeless()) {
num_homeless_ops.inc();
}
+
+ ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}
-void Objecter::_session_op_remove(Op *op)
+void Objecter::_session_op_remove(OSDSession *from, Op *op)
{
- assert(rwlock.is_locked());
- OSDSession *s = op->session;
- assert(s);
- assert(s->lock.is_locked());
+ assert(op->session == from);
+ assert(from->lock.is_locked());
- if (s->is_homeless()) {
+ if (from->is_homeless()) {
num_homeless_ops.dec();
}
- s->ops.erase(op->tid);
+
+ from->ops.erase(op->tid);
+ put_session(from);
op->session = NULL;
+
+ ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
}
-void Objecter::_session_linger_op_remove(LingerOp *info)
+void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
{
- assert(rwlock.is_locked());
- OSDSession *s = info->session;
- assert(s);
- assert(s->lock.is_locked());
+ assert(to->lock.is_wlocked());
+ assert(op->session == NULL);
+
+ if (to->is_homeless()) {
+ num_homeless_ops.inc();
+ }
- s->linger_ops.erase(info->linger_id);
- info->session = NULL;
+ get_session(to);
+ op->session = to;
+ to->linger_ops[op->linger_id] = op;
+
+ ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id << dendl;
}
-void Objecter::_session_command_op_remove(CommandOp *op)
+void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
{
- assert(rwlock.is_locked());
- OSDSession *s = op->session;
- assert(s);
- assert(s->lock.is_locked());
+ assert(from == op->session);
+ assert(from->lock.is_locked());
- s->command_ops.erase(op->tid);
+ if (from->is_homeless()) {
+ num_homeless_ops.dec();
+ }
+
+ from->linger_ops.erase(op->linger_id);
+ put_session(from);
op->session = NULL;
+
+ ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id << dendl;
+}
+
+void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
+{
+ assert(from == op->session);
+ assert(from->lock.is_locked());
+
+ if (from->is_homeless()) {
+ num_homeless_ops.dec();
+ }
+
+ from->command_ops.erase(op->tid);
+ put_session(from);
+ op->session = NULL;
+
+ ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
+}
+
+void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
+{
+ assert(to->lock.is_locked());
+ assert(op->session == NULL);
+ assert(op->tid);
+
+ if (to->is_homeless()) {
+ num_homeless_ops.inc();
+ }
+
+ get_session(to);
+ op->session = to;
+ to->command_ops[op->tid] = op;
+
+ ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
}
int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc)
{
- assert(rwlock.is_locked());
+ assert(rwlock.is_wlocked());
int r = _calc_target(&linger_op->target);
if (r == RECALC_OP_TARGET_NEED_RESEND) {
OSDSession *s;
r = _get_osd_session(linger_op->target.osd, lc, &s);
if (r < 0) {
+ // We have no session for the new destination for the op, so leave it
+ // in this session to be handled again next time we scan requests
return r;
}
- s->lock.get_write();
-
if (linger_op->session != s) {
- linger_op->session = s;
- s->linger_ops[linger_op->register_tid] = linger_op;
+ // NB locking two sessions (s and linger_op->session) at the same time here
+ // is only safe because we are the only one that takes two, and we are
+ // holding rwlock for write.
+ s->lock.get_write();
+ _session_linger_op_remove(linger_op->session, linger_op);
+ _session_linger_op_assign(s, linger_op);
+ s->lock.unlock();
}
- s->lock.unlock();
put_session(s);
return RECALC_OP_TARGET_NEED_RESEND;
}
if (op->budgeted)
put_op_budget(op);
- op->session->ops.erase(op->tid);
+ _session_op_remove(op->session, op);
logger->dec(l_osdc_op_active);
timer.cancel_event(op->ontimeout);
}
- put_session(op->session);
-
inflight_ops.dec();
op->put();
}
OSDSession *s = siter->second;
- s->get();
+ get_session(s);
s->lock.get_write();
<< (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
<< " ... stray" << dendl;
s->lock.unlock();
- s->put();
+ put_session(s);
m->put();
return;
}
<< op->session->con->get_peer_addr() << dendl;
m->put();
s->lock.unlock();
- s->put();
+ put_session(s);
return;
}
} else {
num_unacked.dec();
if (op->oncommit)
num_uncommitted.dec();
- _session_op_remove(op);
+ _session_op_remove(s, op);
s->lock.unlock();
- s->put();
+ put_session(s);
// FIXME: two redirects could race and reorder
_send_op(op);
s->lock.unlock();
+ put_session(s);
m->put();
return;
}
}
m->put();
- s->put();
+ put_session(s);
}
ldout(cct, 10) << "unknown request " << tid << dendl;
}
ldout(cct, 10) << "done" << dendl;
- m->put();
}
int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
ceph_tid_t tid = last_tid.inc();
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
- homeless_session->command_ops[tid] = c;
- num_homeless_ops.inc();
- c->session = homeless_session;
+
+ {
+ RWLock::WLocker hs_wl(homeless_session->lock);
+ _session_command_op_assign(homeless_session, c);
+ }
+
(void)_calc_command_target(c);
_assign_command_session(c);
if (osd_timeout > 0) {
assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
if (c->session != s) {
- ldout(cct, 10) << "_assign_command_session " << c->tid << " now " << c->session << dendl;
if (c->session) {
- if (c->session->is_homeless()) {
- num_homeless_ops.dec();
- }
- if (c->tid) {
- c->session->lock.get_write();
- c->session->command_ops.erase(c->tid);
- c->session->lock.unlock();
- }
- put_session(c->session);
+ OSDSession *cs = c->session;
+ cs->lock.get_write();
+ _session_command_op_remove(c->session, c);
+ cs->lock.unlock();
}
- c->session = s;
s->lock.get_write();
- if (c->tid) {
- s->command_ops[c->tid] = c;
- }
- if (s->is_homeless())
- num_homeless_ops.inc();
+ _session_command_op_assign(s, c);
s->lock.unlock();
- } else {
- put_session(s);
}
+
+ put_session(s);
}
void Objecter::_send_command(CommandOp *c)
*c->prs = rs;
if (c->onfinish)
c->onfinish->complete(r);
- c->session->lock.get_write();
- c->session->command_ops.erase(c->tid);
- c->session->lock.unlock();
+
+ OSDSession *s = c->session;
+ s->lock.get_write();
+ _session_command_op_remove(c->session, c);
+ s->lock.unlock();
+
if (c->ontimeout) {
timer.cancel_event(c->ontimeout);
}
logger->dec(l_osdc_command_active);
}
+
+Objecter::OSDSession::~OSDSession()
+{
+ // Caller is responsible for re-assigning or
+ // destroying any ops that were assigned to us
+ assert(ops.empty());
+ assert(linger_ops.empty());
+ assert(command_ops.empty());
+
+ for (int i = 0; i < num_locks; i++) {
+ delete completion_locks[i];
+ }
+ delete[] completion_locks;
+}
+
+Objecter::~Objecter()
+{
+ delete osdmap;
+
+
+
+ assert(homeless_session->get_nref() == 1);
+ assert(num_homeless_ops.read() == 0);
+ homeless_session->put();
+
+ assert(osd_sessions.empty());
+ assert(poolstat_ops.empty());
+ assert(statfs_ops.empty());
+ assert(pool_ops.empty());
+ assert(waiting_for_map.empty());
+ assert(linger_ops.empty());
+ assert(check_latest_map_lingers.empty());
+ assert(check_latest_map_ops.empty());
+ assert(check_latest_map_commands.empty());
+
+ assert(!tick_event);
+ assert(!m_request_state_hook);
+ assert(!logger);
+}
+