From cdb8899a416a239b8eb06dfc5ef9d36feccd330f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 12 May 2014 16:58:31 -0700 Subject: [PATCH] objecter: shard completion_lock Object ops responses are sharded, lock hashed by object name. This guarantees ordering on the same object. Cross object order is not guaranteed anymore. Signed-off-by: Yehuda Sadeh --- src/common/config_opts.h | 1 + src/osdc/Objecter.cc | 187 +++++++++++++++++++++++++++------------ src/osdc/Objecter.h | 38 ++++++-- 3 files changed, 158 insertions(+), 68 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index ad9ba2623a1..bc22e4ab3b5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -288,6 +288,7 @@ OPTION(objecter_tick_interval, OPT_DOUBLE, 5.0) OPTION(objecter_timeout, OPT_DOUBLE, 10.0) // before we ask for a map OPTION(objecter_inflight_op_bytes, OPT_U64, 1024*1024*100) // max in-flight data (both directions) OPTION(objecter_inflight_ops, OPT_U64, 1024) // max in-flight ios +OPTION(objecter_completion_locks_per_session, OPT_U64, 32) // num of completion locks per each session, for serializing same object responses OPTION(journaler_allow_split_entries, OPT_BOOL, true) OPTION(journaler_write_head_interval, OPT_INT, 15) OPTION(journaler_prefetch_periods, OPT_INT, 10) // * journal object size diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 5b5e38d8f43..ed364e43e87 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -136,6 +136,14 @@ static const char *config_keys[] = { NULL }; +Mutex *Objecter::OSDSession::get_lock(object_t& oid) +{ +#define HASH_PRIME 1021 + uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) % HASH_PRIME; + + return completion_locks[h % num_locks]; +} + const char** Objecter::get_tracked_conf_keys() const { return config_keys; @@ -256,6 +264,8 @@ void Objecter::init() << cpp_strerror(ret) << dendl; } + timer.init(); + rwlock.get_read(); schedule_tick(); @@ -299,6 +309,9 @@ void Objecter::shutdown() delete logger; logger = NULL; } + + timer.shutdown(); + } void Objecter::_send_linger(LingerOp *info) @@ -368,6 +381,12 @@ void Objecter::_linger_commit(LingerOp *info, int r) } void Objecter::unregister_linger(uint64_t linger_id) +{ + RWLock::WLocker wl(rwlock); + _unregister_linger(linger_id); +} + +void Objecter::_unregister_linger(uint64_t linger_id) { map::iterator iter = linger_ops.find(linger_id); if (iter != linger_ops.end()) { @@ -376,6 +395,7 @@ void Objecter::unregister_linger(uint64_t linger_id) info->session->linger_ops.erase(linger_id); info->session->lock.unlock(); linger_ops.erase(iter); + info->canceled = true; info->put(); logger->dec(l_osdc_linger_active); @@ -502,8 +522,11 @@ void Objecter::_scan_requests(OSDSession *s, { assert(rwlock.is_wlocked()); + list unregister_lingers; + RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - RWLock::WLocker wl(s->lock); + + s->lock.get_write(); // check for changed linger mappings (_before_ regular ops) map::iterator lp = s->linger_ops.begin(); @@ -511,6 +534,7 @@ void Objecter::_scan_requests(OSDSession *s, LingerOp *op = lp->second; ++lp; // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation ldout(cct, 10) << " checking linger op " << op->linger_id << dendl; + bool unregister; int r = _calc_target(&op->target); switch (r) { case RECALC_OP_TARGET_NO_ACTION: @@ -525,7 +549,11 @@ void Objecter::_scan_requests(OSDSession *s, _linger_cancel_map_check(op); break; case RECALC_OP_TARGET_POOL_DNE: - _check_linger_pool_dne(op); + _check_linger_pool_dne(op, &unregister); + if (unregister) { + ldout(cct, 10) << " need to unregister linger op " << op->linger_id << dendl; + unregister_lingers.push_back(op->linger_id); + } break; } } @@ -551,7 +579,7 @@ void Objecter::_scan_requests(OSDSession *s, _op_cancel_map_check(op); break; case RECALC_OP_TARGET_POOL_DNE: - _check_op_pool_dne(op); + _check_op_pool_dne(op, true); break; } } @@ -562,7 +590,7 @@ void Objecter::_scan_requests(OSDSession *s, CommandOp *c = cp->second; ++cp; ldout(cct, 10) << " checking command " << c->tid << dendl; - int r = _recalc_command_target(c); + int r = _calc_command_target(c); switch (r) { case RECALC_OP_TARGET_NO_ACTION: // resend if skipped map; otherwise do nothing. @@ -571,6 +599,9 @@ void Objecter::_scan_requests(OSDSession *s, // -- fall-thru -- case RECALC_OP_TARGET_NEED_RESEND: need_resend_command[c->tid] = c; + if (c->session) { + _session_command_op_remove(c); + } _command_cancel_map_check(c); break; case RECALC_OP_TARGET_POOL_DNE: @@ -580,6 +611,12 @@ void Objecter::_scan_requests(OSDSession *s, break; } } + + s->lock.unlock(); + + for (list::iterator iter = unregister_lingers.begin(); iter != unregister_lingers.end(); ++iter) { + _unregister_linger(*iter); + } } void Objecter::handle_osd_map(MOSDMap *m) @@ -752,7 +789,8 @@ void Objecter::handle_osd_map(MOSDMap *m) for (map::iterator p = need_resend_command.begin(); p != need_resend_command.end(); ++p) { CommandOp *c = p->second; - if (c->session) { + _assign_command_session(c); + if (c->session && !c->session->is_homeless()) { _send_command(c); } } @@ -809,7 +847,7 @@ void Objecter::C_Op_Map_Latest::finish(int r) if (op->map_dne_bound == 0) op->map_dne_bound = latest; - objecter->_check_op_pool_dne(op); + objecter->_check_op_pool_dne(op, false); } int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap) @@ -865,7 +903,7 @@ int Objecter::pool_snap_list(int64_t poolid, vector *snaps) return 0; } -void Objecter::_check_op_pool_dne(Op *op) +void Objecter::_check_op_pool_dne(Op *op, bool session_locked) { assert(rwlock.is_wlocked()); @@ -885,8 +923,13 @@ void Objecter::_check_op_pool_dne(Op *op) if (op->oncommit) { op->oncommit->complete(-ENOENT); } - RWLock::WLocker wl(op->session->lock); + if (!session_locked) { + op->session->lock.get_write(); + } _finish_op(op); + if (!session_locked) { + op->session->lock.unlock(); + } } } else { _send_op_map_check(op); @@ -938,13 +981,20 @@ void Objecter::C_Linger_Map_Latest::finish(int r) if (op->map_dne_bound == 0) op->map_dne_bound = latest; - objecter->_check_linger_pool_dne(op); + bool unregister; + objecter->_check_linger_pool_dne(op, &unregister); + + if (unregister) { + objecter->_unregister_linger(op->linger_id); + } } -void Objecter::_check_linger_pool_dne(LingerOp *op) +void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister) { assert(rwlock.is_wlocked()); + *need_unregister = false; + ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id << " current " << osdmap->get_epoch() << " map_dne_bound " << op->map_dne_bound @@ -957,7 +1007,7 @@ void Objecter::_check_linger_pool_dne(LingerOp *op) if (op->on_reg_commit) { op->on_reg_commit->complete(-ENOENT); } - unregister_linger(op->linger_id); + *need_unregister = true; } } else { _send_linger_map_check(op); @@ -1079,7 +1129,7 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc) if (!lc.is_wlocked()) { return -EAGAIN; } - OSDSession *s = new OSDSession(osd); + OSDSession *s = new OSDSession(cct, osd); osd_sessions[osd] = s; s->con = messenger->get_connection(osdmap->get_inst(osd)); logger->inc(l_osdc_osd_session_open); @@ -1265,8 +1315,10 @@ void Objecter::_kick_requests(OSDSession *session) // resend lingers map lresend; // resend in order for (map::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) { + LingerOp *op = j->second; + op->get(); logger->inc(l_osdc_linger_resend); - lresend[j->first] = j->second; + lresend[j->first] = op; } // resend commands @@ -1282,7 +1334,11 @@ void Objecter::_kick_requests(OSDSession *session) session->lock.unlock(); while (!lresend.empty()) { - _send_linger(lresend.begin()->second); + LingerOp *op = lresend.begin()->second; + if (!op->canceled) { + _send_linger(op); + } + op->put(); lresend.erase(lresend.begin()); } } @@ -1296,8 +1352,10 @@ void Objecter::schedule_tick() void Objecter::tick() { - if (!initialized.read()) + if (!initialized.read()) { + schedule_tick(); return; + } assert(rwlock.is_locked()); @@ -1549,7 +1607,7 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc) ldout(cct, 10) << "_op_submit oid " << op->target.base_oid << " " << op->target.base_oloc << " " << op->target.target_oloc << " " << op->ops << " tid " << op->tid - << " osd." << (!op->session->is_homeless() ? op->session->osd : -1) + << " osd." << (!s->is_homeless() ? s->osd : -1) << dendl; assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); @@ -1882,6 +1940,17 @@ void Objecter::_session_linger_op_remove(LingerOp *info) info->session = NULL; } +void Objecter::_session_command_op_remove(CommandOp *op) +{ + assert(rwlock.is_locked()); + OSDSession *s = op->session; + assert(s); + assert(s->lock.is_locked()); + + s->command_ops.erase(op->tid); + op->session = NULL; +} + int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession) { int r; @@ -1905,29 +1974,6 @@ int Objecter::_get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **p return _get_osd_session(op->target.osd, lc, psession); } -void Objecter::_session_op_validate(Op *op, RWLock::Context& lc, bool session_locked) -{ - assert(rwlock.is_locked()); - - if (op->session && op->session->osd != op->target.osd) { - OSDSession *orig_session = op->session; - _session_op_remove(op); - put_session(orig_session); - } -} - -int Objecter::_recalc_op_target(Op *op, RWLock::Context& lc, - bool src_session_locked) -{ - assert(rwlock.is_locked()); - - int r = _calc_target(&op->target); - if (r == RECALC_OP_TARGET_NEED_RESEND) { - _session_op_validate(op, lc, src_session_locked); - } - return r; -} - bool Objecter::_promote_lock_check_race(RWLock::Context& lc) { epoch_t epoch = osdmap->get_epoch(); @@ -1995,7 +2041,7 @@ void Objecter::_finish_op(Op *op) timer.cancel_event(op->ontimeout); } - op->session->put(); + put_session(op->session); inflight_ops.dec(); @@ -2138,7 +2184,7 @@ void Objecter::unregister_op(Op *op) op->session->lock.get_write(); op->session->ops.erase(op->tid); op->session->lock.unlock(); - op->session->put(); + put_session(op->session); op->session = NULL; inflight_ops.dec(); @@ -2314,6 +2360,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) op->outbl = 0; } + /* get it before we call _finish_op() */ + Mutex *completion_lock = s->get_lock(op->target.base_oid); + // done with this tid? if (!op->onack && !op->oncommit) { ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl; @@ -2323,7 +2372,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl; // serialize completions - s->completion_lock.Lock(); + + completion_lock->Lock(); s->lock.unlock(); // do callbacks @@ -2333,7 +2383,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (oncommit) { oncommit->complete(rc); } - s->completion_lock.Unlock(); + completion_lock->Unlock(); m->put(); s->put(); @@ -3374,13 +3424,16 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) { RWLock::WLocker wl(rwlock); + RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + ceph_tid_t tid = last_tid.inc(); ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl; c->tid = tid; homeless_session.command_ops[tid] = c; num_homeless_ops.inc(); c->session = &homeless_session; - (void)_recalc_command_target(c); + (void)_calc_command_target(c); + _assign_command_session(c); if (osd_timeout > 0) { c->ontimeout = new C_CancelCommandOp(c->session, tid, this); timer.add_event_after(osd_timeout, c->ontimeout); @@ -3401,14 +3454,14 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid) return 0; } -int Objecter::_recalc_command_target(CommandOp *c) +int Objecter::_calc_command_target(CommandOp *c) { assert(rwlock.is_wlocked()); RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - OSDSession *s = &homeless_session; c->map_check_error = 0; + if (c->target_osd >= 0) { if (!osdmap->exists(c->target_osd)) { c->map_check_error = -ENOENT; @@ -3420,25 +3473,45 @@ int Objecter::_recalc_command_target(CommandOp *c) c->map_check_error_str = "osd down"; return RECALC_OP_TARGET_OSD_DOWN; } - int r = _get_session(c->target_osd, &s, lc); - assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ + c->osd = c->target_osd; } else { if (!osdmap->have_pg_pool(c->target_pg.pool())) { c->map_check_error = -ENOENT; c->map_check_error_str = "pool dne"; return RECALC_OP_TARGET_POOL_DNE; } - int primary; vector acting; - osdmap->pg_to_acting_osds(c->target_pg, &acting, &primary); - if (primary != -1) { - int r = _get_session(primary, &s, lc); - assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ - } + osdmap->pg_to_acting_osds(c->target_pg, &acting, &c->osd); } + OSDSession *s; + int r = _get_session(c->osd, &s, lc); + assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ + if (c->session != s) { - ldout(cct, 10) << "_recalc_command_target " << c->tid << " now " << c->session << dendl; + put_session(s); + return RECALC_OP_TARGET_NEED_RESEND; + } + + put_session(s); + + ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl; + + return RECALC_OP_TARGET_NO_ACTION; +} + +void Objecter::_assign_command_session(CommandOp *c) +{ + assert(rwlock.is_wlocked()); + + RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); + + OSDSession *s; + int r = _get_session(c->osd, &s, lc); + assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */ + + if (c->session != s) { + ldout(cct, 10) << "_assign_command_session " << c->tid << " now " << c->session << dendl; if (c->session) { if (c->session->is_homeless()) { num_homeless_ops.dec(); @@ -3458,13 +3531,9 @@ int Objecter::_recalc_command_target(CommandOp *c) if (s->is_homeless()) num_homeless_ops.inc(); s->lock.unlock(); - return RECALC_OP_TARGET_NEED_RESEND; } else { put_session(s); } - - ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl; - return RECALC_OP_TARGET_NO_ACTION; } void Objecter::_send_command(CommandOp *c) diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index e6e6c06966d..7111d3387df 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1326,6 +1326,7 @@ public: string *prs; int target_osd; pg_t target_pg; + int osd; /* calculated osd for sending request */ epoch_t map_dne_bound; int map_check_error; // error to return if map check fails const char *map_check_error_str; @@ -1334,7 +1335,7 @@ public: CommandOp() : session(NULL), - tid(0), poutbl(NULL), prs(NULL), target_osd(-1), + tid(0), poutbl(NULL), prs(NULL), target_osd(-1), osd(-1), map_dne_bound(0), map_check_error(0), map_check_error_str(NULL), @@ -1342,7 +1343,8 @@ public: }; int submit_command(CommandOp *c, ceph_tid_t *ptid); - int _recalc_command_target(CommandOp *c); + int _calc_command_target(CommandOp *c); + void _assign_command_session(CommandOp *c); void _send_command(CommandOp *c); int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r); void _finish_command(CommandOp *c, int r, string rs); @@ -1366,6 +1368,7 @@ public: version_t *pobjver; bool registered; + bool canceled; Context *on_reg_ack, *on_reg_commit; OSDSession *session; @@ -1378,6 +1381,7 @@ public: snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), registered(false), + canceled(false), on_reg_ack(NULL), on_reg_commit(NULL), session(NULL), register_tid(0), @@ -1430,7 +1434,7 @@ public: // -- osd sessions -- struct OSDSession : public RefCountedObject { RWLock lock; - Mutex completion_lock; + Mutex **completion_locks; // pending ops map ops; @@ -1439,11 +1443,27 @@ public: int osd; int incarnation; + int num_locks; ConnectionRef con; - OSDSession(int o) : lock("OSDSession"), completion_lock("OSDSession::completion_lock"), osd(o), incarnation(0), con(NULL) {} + OSDSession(CephContext *cct, int o) : lock("OSDSession"), osd(o), incarnation(0), con(NULL) { + num_locks = cct->_conf->objecter_completion_locks_per_session; + completion_locks = new Mutex *[num_locks]; + for (int i = 0; i < num_locks; i++) { + completion_locks[i] = new Mutex("OSDSession::completion_lock"); + } + } + + ~OSDSession() { + for (int i = 0; i < num_locks; i++) { + delete completion_locks[i]; + } + delete[] completion_locks; + } bool is_homeless() { return (osd == -1); } + + Mutex *get_lock(object_t& oid); }; map osd_sessions; @@ -1494,12 +1514,11 @@ public: RWLock::Context& lc); void _session_op_remove(Op *op); void _session_linger_op_remove(LingerOp *info); + void _session_command_op_remove(CommandOp *op); void _session_op_assign(Op *op, OSDSession *s); int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession); int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked); int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession); - void _session_op_validate(Op *op, RWLock::Context& lc, bool session_locked); - int _recalc_op_target(Op *op, RWLock::Context& lc, bool session_locked = false); int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc); void _linger_submit(LingerOp *info); @@ -1507,10 +1526,10 @@ public: void _linger_ack(LingerOp *info, int r); void _linger_commit(LingerOp *info, int r); - void _check_op_pool_dne(Op *op); + void _check_op_pool_dne(Op *op, bool session_locked); void _send_op_map_check(Op *op); void _op_cancel_map_check(Op *op); - void _check_linger_pool_dne(LingerOp *op); + void _check_linger_pool_dne(LingerOp *op, bool *need_unregister); void _send_linger_map_check(LingerOp *op); void _linger_cancel_map_check(LingerOp *op); void _check_command_map_dne(CommandOp *op); @@ -1574,7 +1593,7 @@ public: logger(NULL), tick_event(NULL), m_request_state_hook(NULL), num_homeless_ops(0), - homeless_session(-1), + homeless_session(cct, -1), mon_timeout(mon_timeout), osd_timeout(osd_timeout), op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes), @@ -1771,6 +1790,7 @@ public: Context *onack, version_t *objver); void unregister_linger(uint64_t linger_id); + void _unregister_linger(uint64_t linger_id); /** * set up initial ops in the op vector, and allocate a final op slot. -- 2.47.3