NULL
};
+/**
+ * Select the completion mutex used for a given object on this session.
+ *
+ * The object name is hashed with ceph_str_hash_linux(), reduced modulo a
+ * fixed prime (1021) and then modulo num_locks, so the same object name
+ * always yields the same entry of completion_locks.
+ *
+ * @param oid object whose name selects the lock
+ * @return pointer to one of this session's completion mutexes; the session
+ *         keeps ownership
+ */
+Mutex *Objecter::OSDSession::get_lock(object_t& oid)
+{
+  // Scoped constant rather than a #define, so the name does not leak into
+  // the rest of the translation unit.
+  static const uint32_t HASH_PRIME = 1021;
+
+  // An empty lock pool would make the modulo below divide by zero.
+  assert(num_locks > 0);
+
+  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) % HASH_PRIME;
+
+  return completion_locks[h % num_locks];
+}
+
const char** Objecter::get_tracked_conf_keys() const
{
return config_keys;
<< cpp_strerror(ret) << dendl;
}
+ timer.init();
+
rwlock.get_read();
schedule_tick();
delete logger;
logger = NULL;
}
+
+ timer.shutdown();
+
}
void Objecter::_send_linger(LingerOp *info)
}
void Objecter::unregister_linger(uint64_t linger_id)
+{ // Locking entry point: acquires rwlock for write, then delegates to _unregister_linger().
+ RWLock::WLocker wl(rwlock); // scoped write lock on rwlock for the duration of this call
+ _unregister_linger(linger_id); // code that already holds rwlock for write calls this variant directly
+}
+
+void Objecter::_unregister_linger(uint64_t linger_id)
{
map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
if (iter != linger_ops.end()) {
info->session->linger_ops.erase(linger_id);
info->session->lock.unlock();
linger_ops.erase(iter);
+ info->canceled = true;
info->put();
logger->dec(l_osdc_linger_active);
{
assert(rwlock.is_wlocked());
+ list<uint64_t> unregister_lingers;
+
RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
- RWLock::WLocker wl(s->lock);
+
+ s->lock.get_write();
// check for changed linger mappings (_before_ regular ops)
map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
LingerOp *op = lp->second;
++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
+ bool unregister;
int r = _calc_target(&op->target);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
_linger_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
- _check_linger_pool_dne(op);
+ _check_linger_pool_dne(op, &unregister);
+ if (unregister) {
+ ldout(cct, 10) << " need to unregister linger op " << op->linger_id << dendl;
+ unregister_lingers.push_back(op->linger_id);
+ }
break;
}
}
_op_cancel_map_check(op);
break;
case RECALC_OP_TARGET_POOL_DNE:
- _check_op_pool_dne(op);
+ _check_op_pool_dne(op, true);
break;
}
}
CommandOp *c = cp->second;
++cp;
ldout(cct, 10) << " checking command " << c->tid << dendl;
- int r = _recalc_command_target(c);
+ int r = _calc_command_target(c);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
// resend if skipped map; otherwise do nothing.
// -- fall-thru --
case RECALC_OP_TARGET_NEED_RESEND:
need_resend_command[c->tid] = c;
+ if (c->session) {
+ _session_command_op_remove(c);
+ }
_command_cancel_map_check(c);
break;
case RECALC_OP_TARGET_POOL_DNE:
break;
}
}
+
+ s->lock.unlock();
+
+ for (list<uint64_t>::iterator iter = unregister_lingers.begin(); iter != unregister_lingers.end(); ++iter) {
+ _unregister_linger(*iter);
+ }
}
void Objecter::handle_osd_map(MOSDMap *m)
for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
p != need_resend_command.end(); ++p) {
CommandOp *c = p->second;
- if (c->session) {
+ _assign_command_session(c);
+ if (c->session && !c->session->is_homeless()) {
_send_command(c);
}
}
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- objecter->_check_op_pool_dne(op);
+ objecter->_check_op_pool_dne(op, false);
}
int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap)
return 0;
}
-void Objecter::_check_op_pool_dne(Op *op)
+void Objecter::_check_op_pool_dne(Op *op, bool session_locked)
{
assert(rwlock.is_wlocked());
if (op->oncommit) {
op->oncommit->complete(-ENOENT);
}
- RWLock::WLocker wl(op->session->lock);
+ if (!session_locked) {
+ op->session->lock.get_write();
+ }
_finish_op(op);
+ if (!session_locked) {
+ op->session->lock.unlock();
+ }
}
} else {
_send_op_map_check(op);
if (op->map_dne_bound == 0)
op->map_dne_bound = latest;
- objecter->_check_linger_pool_dne(op);
+ bool unregister;
+ objecter->_check_linger_pool_dne(op, &unregister);
+
+ if (unregister) {
+ objecter->_unregister_linger(op->linger_id);
+ }
}
-void Objecter::_check_linger_pool_dne(LingerOp *op)
+void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
{
assert(rwlock.is_wlocked());
+ *need_unregister = false;
+
ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
<< " current " << osdmap->get_epoch()
<< " map_dne_bound " << op->map_dne_bound
if (op->on_reg_commit) {
op->on_reg_commit->complete(-ENOENT);
}
- unregister_linger(op->linger_id);
+ *need_unregister = true;
}
} else {
_send_linger_map_check(op);
if (!lc.is_wlocked()) {
return -EAGAIN;
}
- OSDSession *s = new OSDSession(osd);
+ OSDSession *s = new OSDSession(cct, osd);
osd_sessions[osd] = s;
s->con = messenger->get_connection(osdmap->get_inst(osd));
logger->inc(l_osdc_osd_session_open);
// resend lingers
map<uint64_t, LingerOp*> lresend; // resend in order
for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) {
+ LingerOp *op = j->second;
+ op->get();
logger->inc(l_osdc_linger_resend);
- lresend[j->first] = j->second;
+ lresend[j->first] = op;
}
// resend commands
session->lock.unlock();
while (!lresend.empty()) {
- _send_linger(lresend.begin()->second);
+ LingerOp *op = lresend.begin()->second;
+ if (!op->canceled) {
+ _send_linger(op);
+ }
+ op->put();
lresend.erase(lresend.begin());
}
}
void Objecter::tick()
{
- if (!initialized.read())
+ if (!initialized.read()) {
+ schedule_tick();
return;
+ }
assert(rwlock.is_locked());
ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
<< " " << op->target.base_oloc << " " << op->target.target_oloc
<< " " << op->ops << " tid " << op->tid
- << " osd." << (!op->session->is_homeless() ? op->session->osd : -1)
+ << " osd." << (!s->is_homeless() ? s->osd : -1)
<< dendl;
assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
info->session = NULL;
}
+/**
+ * Detach a command op from the session it is currently attached to.
+ *
+ * Expects rwlock to be held and op->session to be non-NULL with its lock
+ * held (all three are asserted). Erases op->tid from that session's
+ * command_ops map and clears op->session.
+ */
+void Objecter::_session_command_op_remove(CommandOp *op)
+{
+  assert(rwlock.is_locked());
+  assert(op->session);
+
+  OSDSession *owner = op->session;
+  assert(owner->lock.is_locked());
+
+  // Forget the op on the session side first, then on the op side.
+  owner->command_ops.erase(op->tid);
+  op->session = NULL;
+}
+
int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
{
int r;
return _get_osd_session(op->target.osd, lc, psession);
}
-void Objecter::_session_op_validate(Op *op, RWLock::Context& lc, bool session_locked)
-{
- assert(rwlock.is_locked());
-
- if (op->session && op->session->osd != op->target.osd) {
- OSDSession *orig_session = op->session;
- _session_op_remove(op);
- put_session(orig_session);
- }
-}
-
-int Objecter::_recalc_op_target(Op *op, RWLock::Context& lc,
- bool src_session_locked)
-{
- assert(rwlock.is_locked());
-
- int r = _calc_target(&op->target);
- if (r == RECALC_OP_TARGET_NEED_RESEND) {
- _session_op_validate(op, lc, src_session_locked);
- }
- return r;
-}
-
bool Objecter::_promote_lock_check_race(RWLock::Context& lc)
{
epoch_t epoch = osdmap->get_epoch();
timer.cancel_event(op->ontimeout);
}
- op->session->put();
+ put_session(op->session);
inflight_ops.dec();
op->session->lock.get_write();
op->session->ops.erase(op->tid);
op->session->lock.unlock();
- op->session->put();
+ put_session(op->session);
op->session = NULL;
inflight_ops.dec();
op->outbl = 0;
}
+ /* get it before we call _finish_op() */
+ Mutex *completion_lock = s->get_lock(op->target.base_oid);
+
// done with this tid?
if (!op->onack && !op->oncommit) {
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
// serialize completions
- s->completion_lock.Lock();
+
+ completion_lock->Lock();
s->lock.unlock();
// do callbacks
if (oncommit) {
oncommit->complete(rc);
}
- s->completion_lock.Unlock();
+ completion_lock->Unlock();
m->put();
s->put();
{
RWLock::WLocker wl(rwlock);
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
ceph_tid_t tid = last_tid.inc();
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
homeless_session.command_ops[tid] = c;
num_homeless_ops.inc();
c->session = &homeless_session;
- (void)_recalc_command_target(c);
+ (void)_calc_command_target(c);
+ _assign_command_session(c);
if (osd_timeout > 0) {
c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
timer.add_event_after(osd_timeout, c->ontimeout);
return 0;
}
-int Objecter::_recalc_command_target(CommandOp *c)
+/**
+ * Work out which OSD a command should be sent to, without re-homing it.
+ *
+ * Resets c->map_check_error, then fills in c->osd: from c->target_osd when
+ * that is >= 0 (after checking the OSD exists in the current osdmap),
+ * otherwise from the value pg_to_acting_osds() reports for c->target_pg.
+ * c->session is only compared against, never modified here; attaching the
+ * command to its session is done by _assign_command_session().
+ *
+ * Must be called with rwlock held for write (asserted).
+ *
+ * @return RECALC_OP_TARGET_OSD_DOWN  target OSD not in the map
+ *                                    (map_check_error = -ENOENT, "osd down")
+ *         RECALC_OP_TARGET_POOL_DNE  target pool not in the map
+ *                                    (map_check_error = -ENOENT, "pool dne")
+ *         RECALC_OP_TARGET_NEED_RESEND  session for c->osd differs from
+ *                                       c->session
+ *         RECALC_OP_TARGET_NO_ACTION    session unchanged
+ */
+int Objecter::_calc_command_target(CommandOp *c)
{
assert(rwlock.is_wlocked());
RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
- OSDSession *s = &homeless_session;
c->map_check_error = 0;
+
if (c->target_osd >= 0) {
if (!osdmap->exists(c->target_osd)) {
c->map_check_error = -ENOENT;
c->map_check_error_str = "osd down";
return RECALC_OP_TARGET_OSD_DOWN;
}
- int r = _get_session(c->target_osd, &s, lc);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+ c->osd = c->target_osd;
} else {
if (!osdmap->have_pg_pool(c->target_pg.pool())) {
c->map_check_error = -ENOENT;
c->map_check_error_str = "pool dne";
return RECALC_OP_TARGET_POOL_DNE;
}
- int primary;
vector<int> acting;
- osdmap->pg_to_acting_osds(c->target_pg, &acting, &primary);
- if (primary != -1) {
- int r = _get_session(primary, &s, lc);
- assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
- }
+ osdmap->pg_to_acting_osds(c->target_pg, &acting, &c->osd);
}
+ // The session is looked up only to compare it with c->session; it is
+ // released with put_session() on both paths below.
+ OSDSession *s;
+ int r = _get_session(c->osd, &s, lc);
+ assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+
if (c->session != s) {
- ldout(cct, 10) << "_recalc_command_target " << c->tid << " now " << c->session << dendl;
+ put_session(s);
+ return RECALC_OP_TARGET_NEED_RESEND;
+ }
+
+ put_session(s);
+
+ // Log under the current function name so log searches find this site.
+ ldout(cct, 20) << "_calc_command_target " << c->tid << " no change, " << c->session << dendl;
+
+ return RECALC_OP_TARGET_NO_ACTION;
+}
+
+void Objecter::_assign_command_session(CommandOp *c)
+{ // Re-homes command c onto the session that _get_session() returns for c->osd.
+ assert(rwlock.is_wlocked()); // caller must already hold rwlock for write
+
+ RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+ OSDSession *s;
+ int r = _get_session(c->osd, &s, lc);
+ assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+
+ if (c->session != s) {
+ ldout(cct, 10) << "_assign_command_session " << c->tid << " now " << c->session << dendl; // NOTE(review): c->session has not been reassigned in the lines visible above, so this appears to print the previous session; `s` may be intended -- confirm
if (c->session) {
if (c->session->is_homeless()) {
num_homeless_ops.dec();
if (s->is_homeless())
num_homeless_ops.inc();
s->lock.unlock();
- return RECALC_OP_TARGET_NEED_RESEND;
} else {
put_session(s); // presumably the unchanged-session path: releases what _get_session() handed back -- confirm against the full function
}
-
- ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl;
- return RECALC_OP_TARGET_NO_ACTION;
}
void Objecter::_send_command(CommandOp *c)
string *prs;
int target_osd;
pg_t target_pg;
+ int osd; /* calculated osd for sending request */
epoch_t map_dne_bound;
int map_check_error; // error to return if map check fails
const char *map_check_error_str;
CommandOp()
: session(NULL),
- tid(0), poutbl(NULL), prs(NULL), target_osd(-1),
+ tid(0), poutbl(NULL), prs(NULL), target_osd(-1), osd(-1),
map_dne_bound(0),
map_check_error(0),
map_check_error_str(NULL),
};
int submit_command(CommandOp *c, ceph_tid_t *ptid);
- int _recalc_command_target(CommandOp *c);
+ int _calc_command_target(CommandOp *c);
+ void _assign_command_session(CommandOp *c);
void _send_command(CommandOp *c);
int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
void _finish_command(CommandOp *c, int r, string rs);
version_t *pobjver;
bool registered;
+ bool canceled;
Context *on_reg_ack, *on_reg_commit;
OSDSession *session;
snap(CEPH_NOSNAP),
poutbl(NULL), pobjver(NULL),
registered(false),
+ canceled(false),
on_reg_ack(NULL), on_reg_commit(NULL),
session(NULL),
register_tid(0),
// -- osd sessions --
struct OSDSession : public RefCountedObject {
RWLock lock;
- Mutex completion_lock;
+ // Array of num_locks heap-allocated mutexes owned by this session;
+ // get_lock() picks one entry per object.
+ Mutex **completion_locks;
// pending ops
map<ceph_tid_t,Op*> ops;
int osd;
int incarnation;
+ int num_locks; // number of entries in completion_locks; always >= 1
ConnectionRef con;
- OSDSession(int o) : lock("OSDSession"), completion_lock("OSDSession::completion_lock"), osd(o), incarnation(0), con(NULL) {}
+ // Creates the session for osd `o` and allocates its completion-lock pool.
+ // The pool size comes from objecter_completion_locks_per_session in
+ // cct's configuration.
+ OSDSession(CephContext *cct, int o) : lock("OSDSession"), osd(o), incarnation(0), con(NULL) {
+   num_locks = cct->_conf->objecter_completion_locks_per_session;
+   // A configured value of 0 or less would make the array size below
+   // invalid and make get_lock() take a modulo by zero; fall back to a
+   // single lock.
+   if (num_locks < 1) {
+     num_locks = 1;
+   }
+   completion_locks = new Mutex *[num_locks];
+   for (int i = 0; i < num_locks; i++) {
+     completion_locks[i] = new Mutex("OSDSession::completion_lock");
+   }
+ }
+
+ // Releases every mutex in the pool, then the pool array itself.
+ ~OSDSession() {
+   for (int i = 0; i < num_locks; i++) {
+     delete completion_locks[i];
+   }
+   delete[] completion_locks;
+ }
bool is_homeless() { return (osd == -1); }
+
+ // Returns the completion mutex selected by oid's name (session keeps ownership).
+ Mutex *get_lock(object_t& oid);
};
map<int,OSDSession*> osd_sessions;
RWLock::Context& lc);
void _session_op_remove(Op *op);
void _session_linger_op_remove(LingerOp *info);
+ void _session_command_op_remove(CommandOp *op);
void _session_op_assign(Op *op, OSDSession *s);
int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession);
int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked);
int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession);
- void _session_op_validate(Op *op, RWLock::Context& lc, bool session_locked);
- int _recalc_op_target(Op *op, RWLock::Context& lc, bool session_locked = false);
int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc);
void _linger_submit(LingerOp *info);
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
- void _check_op_pool_dne(Op *op);
+ void _check_op_pool_dne(Op *op, bool session_locked);
void _send_op_map_check(Op *op);
void _op_cancel_map_check(Op *op);
- void _check_linger_pool_dne(LingerOp *op);
+ void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
void _send_linger_map_check(LingerOp *op);
void _linger_cancel_map_check(LingerOp *op);
void _check_command_map_dne(CommandOp *op);
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),
- homeless_session(-1),
+ homeless_session(cct, -1),
mon_timeout(mon_timeout),
osd_timeout(osd_timeout),
op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
Context *onack,
version_t *objver);
void unregister_linger(uint64_t linger_id);
+ void _unregister_linger(uint64_t linger_id);
/**
* set up initial ops in the op vector, and allocate a final op slot.