From 3cc48278bf0ee5c9535d04b60a661f988c50063b Mon Sep 17 00:00:00 2001
From: Sage Weil <sage@redhat.com>
Date: Mon, 20 Feb 2017 11:49:08 -0500
Subject: [PATCH] osd: restructure op_shardedwq

This is difficult to break into pieces, so one big fat commit it is.

A few trivial bits

- include epoch in PGQueueable.
- PGQueueable operator<<
- remove op_wq ref from OSDService; use simple set of queue methods
  instead

The big stuff:

- Fast dispatch now passes messages directly to the queue based on an
  spg_t.  The exception is MOSDOps from legacy clients.  We add a
  waiting_for_map mechanism on the front-side that is similar to but
  simpler than the previous one so that we can map those legacy
  requests to an accurate spg_t.

- The dequeue path now has a waiting_for_pg mechanism.  It also uses a
  much simpler set of data structures that should make it much faster
  than the previous incarnation.

- Shutdown works a bit differently; we drain the queue instead of
  trying to remove work for individual PGs.  This lets us remove the
  dequeue_pg machinery.

Signed-off-by: Sage Weil <sage@redhat.com>
---
 src/common/config_opts.h |    2 +
 src/osd/OSD.cc           | 1238 ++++++++++++++++----------------------
 src/osd/OSD.h            |  445 ++++++--------
 src/osd/OSDMap.h         |   22 +-
 src/osd/OpRequest.cc     |    1 -
 src/osd/OpRequest.h      |   14 +-
 src/osd/PG.cc            |   86 ++-
 src/osd/PG.h             |   68 ++-
 src/osd/PrimaryLogPG.cc  |   34 +-
 src/osd/Session.h        |    8 -
 10 files changed, 838 insertions(+), 1080 deletions(-)

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 71d6dd6f6d7..c631a54e57d 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -853,6 +853,8 @@ OPTION(osd_backoff_on_degraded, OPT_BOOL, false) // [mainly for debug?] object u
 OPTION(osd_backoff_on_down, OPT_BOOL, true) // pg in down/incomplete state
 OPTION(osd_backoff_on_peering, OPT_BOOL, false) // [debug] pg peering
 OPTION(osd_debug_crash_on_ignored_backoff, OPT_BOOL, false) // crash osd if client ignores a backoff; useful for debugging
+OPTION(osd_debug_inject_dispatch_delay_probability, OPT_DOUBLE, 0)
+OPTION(osd_debug_inject_dispatch_delay_duration, OPT_DOUBLE, .1)
 OPTION(osd_debug_drop_ping_probability, OPT_DOUBLE, 0)
 OPTION(osd_debug_drop_ping_duration, OPT_INT, 0)
 OPTION(osd_debug_op_order, OPT_BOOL, false)
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index eba3a132824..537f96f6242 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -223,7 +223,6 @@ OSDService::OSDService(OSD *osd) :
   logger(osd->logger),
   recoverystate_perf(osd->recoverystate_perf),
   monc(osd->monc),
-  op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
                   &osd->disk_tp),
@@ -1042,7 +1041,6 @@ void OSDService::share_map(
   }
 }
 
-
 void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
 {
   if (!map)
@@ -1459,14 +1457,14 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
   }
 }
 
+void OSDService::enqueue_back(spg_t pgid, PGQueueable qi)
+{
+  osd->op_shardedwq.queue(make_pair(pgid, qi));
+}
 
-void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+void OSDService::enqueue_front(spg_t pgid, PGQueueable qi)
 {
-  FUNCTRACE();
-  if (dequeued)
-    osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
-  else
-    osd->op_shardedwq.dequeue(pg);
+  osd->op_shardedwq.queue_front(make_pair(pgid, qi));
 }
 
 void OSDService::queue_for_peering(PG *pg)
@@ -1474,17 +1472,19 @@ void OSDService::queue_for_peering(PG *pg)
   peering_wq.queue(pg);
 }
 
-void OSDService::queue_for_snap_trim(PG *pg) {
+void OSDService::queue_for_snap_trim(PG *pg)
+{
   dout(10) << "queueing " << *pg << " 
for snaptrim" << dendl; - op_wq.queue( + osd->op_shardedwq.queue( make_pair( - pg, + pg->info.pgid, PGQueueable( PGSnapTrim(pg->get_osdmap()->get_epoch()), cct->_conf->osd_snap_trim_cost, cct->_conf->osd_snap_trim_priority, ceph_clock_now(), - entity_inst_t()))); + entity_inst_t(), + pg->get_osdmap()->get_epoch()))); } @@ -2766,7 +2766,9 @@ int OSD::shutdown() service.start_shutdown(); - clear_waiting_sessions(); + // stop sending work to pgs. this just prevents any new work in _process + // from racing with on_shutdown and potentially entering the pg after. + op_shardedwq.drain(); // Shutdown PGs { @@ -2783,12 +2785,14 @@ int OSD::shutdown() } clear_pg_stat_queue(); - // finish ops - op_shardedwq.drain(); // should already be empty except for laggard PGs + // drain op queue again (in case PGs requeued something) + op_shardedwq.drain(); { finished.clear(); // zap waiters (bleh, this is messy) } + op_shardedwq.clear_pg_slots(); + // unregister commands cct->get_admin_socket()->unregister_command("status"); cct->get_admin_socket()->unregister_command("flush_journal"); @@ -3118,7 +3122,7 @@ void OSD::recursive_remove_collection(CephContext* cct, vector objects; store->collection_list(tmp, ghobject_t(), ghobject_t::get_max(), INT_MAX, &objects, 0); - + generic_dout(10) << __func__ << " " << objects << dendl; // delete them. int removed = 0; for (vector::iterator p = objects.begin(); @@ -3318,34 +3322,6 @@ PG *OSD::_create_lock_pg( return pg; } -PGRef OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op, - Session *session) -{ - if (!session) { - return PGRef(); - } - // get_pg_or_queue_for_pg is only called from the fast_dispatch path where - // the session_dispatch_lock must already be held. - assert(session->session_dispatch_lock.is_locked()); - RWLock::RLocker l(pg_map_lock); - - ceph::unordered_map::iterator i = pg_map.find(pgid); - if (i == pg_map.end()) - session->waiting_for_pg[pgid]; - - auto wlistiter = session->waiting_for_pg.find(pgid); - - PG *out = NULL; - if (wlistiter == session->waiting_for_pg.end()) { - out = i->second; - } else { - op->get(); - wlistiter->second.push_back(*op); - register_session_waiting_on_pg(session, pgid); - } - return PGRef(out); -} - PG *OSD::_lookup_lock_pg(spg_t pgid) { RWLock::RLocker l(pg_map_lock); @@ -3750,12 +3726,12 @@ void OSD::handle_pg_peering_evt( dout(10) << *pg << " is new" << dendl; pg->queue_peering_event(evt); + wake_pg_waiters(pg); pg->unlock(); - wake_pg_waiters(pgid); return; } case RES_SELF: { - old_pg_state->lock(); + old_pg_state->lock(); OSDMapRef old_osd_map = old_pg_state->get_osdmap(); int old_role = old_pg_state->role; vector old_up = old_pg_state->up; @@ -3785,8 +3761,8 @@ void OSD::handle_pg_peering_evt( dout(10) << *pg << " is new (resurrected)" << dendl; pg->queue_peering_event(evt); + wake_pg_waiters(pg); pg->unlock(); - wake_pg_waiters(resurrected); return; } case RES_PARENT: { @@ -3826,8 +3802,8 @@ void OSD::handle_pg_peering_evt( //parent->queue_peering_event(evt); parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch()); + wake_pg_waiters(parent); parent->unlock(); - wake_pg_waiters(resurrected); return; } } @@ -4790,6 +4766,7 @@ bool remove_dir( store->get_ideal_list_max(), &olist, &next); + generic_dout(10) << __func__ << " " << olist << dendl; // default cont to true, this is safe because caller(OSD::RemoveWQ::_process()) // will recheck the answer before it really goes on. 
bool cont = true; @@ -6014,9 +5991,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe } } - - - bool OSD::heartbeat_dispatch(Message *m) { dout(30) << "heartbeat_dispatch " << m << dendl; @@ -6065,110 +6039,71 @@ bool OSD::ms_dispatch(Message *m) return true; } -void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) +void OSD::maybe_share_map( + Session *session, + OpRequestRef op, + OSDMapRef osdmap) { - assert(session->session_dispatch_lock.is_locked()); - assert(session->osdmap == osdmap); - - auto i = session->waiting_on_map.begin(); - while (i != session->waiting_on_map.end()) { - OpRequest *op = &(*i); - session->waiting_on_map.erase(i++); - if (!dispatch_op_fast(op, osdmap)) { - session->waiting_on_map.push_front(*op); - break; - } - op->put(); + if (!op->check_send_map) { + return; } + epoch_t last_sent_epoch = 0; - if (session->waiting_on_map.empty()) { - clear_session_waiting_on_map(session); - } else { - register_session_waiting_on_map(session); - } - session->maybe_reset_osdmap(); -} + session->sent_epoch_lock.lock(); + last_sent_epoch = session->last_sent_epoch; + session->sent_epoch_lock.unlock(); + const Message *m = op->get_req(); + service.share_map( + m->get_source(), + m->get_connection().get(), + op->sent_epoch, + osdmap, + session ? &last_sent_epoch : NULL); -void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap) -{ - assert(session->session_dispatch_lock.is_locked()); - if (!session->osdmap) { - session->osdmap = newmap; - return; + session->sent_epoch_lock.lock(); + if (session->last_sent_epoch < last_sent_epoch) { + session->last_sent_epoch = last_sent_epoch; } + session->sent_epoch_lock.unlock(); - if (newmap->get_epoch() == session->osdmap->get_epoch()) - return; + op->check_send_map = false; +} - assert(newmap->get_epoch() > session->osdmap->get_epoch()); +void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) +{ + assert(session->session_dispatch_lock.is_locked()); - map > from; - from.swap(session->waiting_for_pg); + auto i = session->waiting_on_map.begin(); + while (i != session->waiting_on_map.end()) { + OpRequestRef op = &(*i); + assert(ms_can_fast_dispatch(op->get_req())); + const MOSDFastDispatchOp *m = static_cast( + op->get_req()); + if (m->get_map_epoch() > osdmap->get_epoch()) { + break; + } + session->waiting_on_map.erase(i++); + op->put(); - for (auto i = from.begin(); i != from.end(); from.erase(i++)) { - set children; - if (!newmap->have_pg_pool(i->first.pool())) { - // drop this wait list on the ground - i->second.clear_and_dispose(TrackedOp::Putter()); - } else { - assert(session->osdmap->have_pg_pool(i->first.pool())); - if (i->first.is_split( - session->osdmap->get_pg_num(i->first.pool()), - newmap->get_pg_num(i->first.pool()), - &children)) { - for (set::iterator child = children.begin(); - child != children.end(); - ++child) { - unsigned split_bits = child->get_split_bits( - newmap->get_pg_num(child->pool())); - boost::intrusive::list child_ops; - OSD::split_list(&i->second, &child_ops, child->ps(), split_bits); - if (!child_ops.empty()) { - session->waiting_for_pg[*child].swap(child_ops); - register_session_waiting_on_pg(session, *child); - } - } + spg_t pgid; + if (m->get_type() == CEPH_MSG_OSD_OP) { + pg_t actual_pgid = osdmap->raw_pg_to_pg( + static_cast(m)->get_pg()); + if (!osdmap->get_primary_shard(actual_pgid, &pgid)) { + continue; } - } - if (i->second.empty()) { - clear_session_waiting_on_pg(session, i->first); } else { - 
session->waiting_for_pg[i->first].swap(i->second); + pgid = m->get_spg(); } + enqueue_op(pgid, op, m->get_map_epoch()); } - session->osdmap = newmap; -} - -void OSD::session_notify_pg_create( - Session *session, OSDMapRef osdmap, spg_t pgid) -{ - assert(session->session_dispatch_lock.is_locked()); - update_waiting_for_pg(session, osdmap); - auto i = session->waiting_for_pg.find(pgid); - if (i != session->waiting_for_pg.end()) { - session->waiting_on_map.splice( - session->waiting_on_map.begin(), - i->second); - assert(i->second.empty()); - session->waiting_for_pg.erase(i); - } - clear_session_waiting_on_pg(session, pgid); -} - -void OSD::session_notify_pg_cleared( - Session *session, OSDMapRef osdmap, spg_t pgid) -{ - assert(session->session_dispatch_lock.is_locked()); - update_waiting_for_pg(session, osdmap); - auto i = session->waiting_for_pg.find(pgid); - if (i != session->waiting_for_pg.end()) { - i->second.clear_and_dispose(TrackedOp::Putter()); - session->waiting_for_pg.erase(i); + if (session->waiting_on_map.empty()) { + clear_session_waiting_on_map(session); + } else { + register_session_waiting_on_map(session); } - session->maybe_reset_osdmap(); - clear_session_waiting_on_pg(session, pgid); } void OSD::ms_fast_dispatch(Message *m) @@ -6186,19 +6121,36 @@ void OSD::ms_fast_dispatch(Message *m) tracepoint(osd, ms_fast_dispatch, reqid.name._type, reqid.name._num, reqid.tid, reqid.inc); } - OSDMapRef nextmap = service.get_nextmap_reserved(); - Session *session = static_cast(m->get_connection()->get_priv()); - if (session) { - { - Mutex::Locker l(session->session_dispatch_lock); - update_waiting_for_pg(session, nextmap); - op->get(); - session->waiting_on_map.push_back(*op); - dispatch_session_waiting(session, nextmap); + + // note sender epoch + op->sent_epoch = static_cast(m)->get_map_epoch(); + + service.maybe_inject_dispatch_delay(); + + if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) || + m->get_type() != CEPH_MSG_OSD_OP) { + // queue it directly + enqueue_op( + static_cast(m)->get_spg(), + op, + static_cast(m)->get_map_epoch()); + } else { + // legacy client, and this is an MOSDOp (the *only* fast dispatch + // message that didn't have an explicit spg_t); we need to map + // them to an spg_t while preserving delivery order. 
+ Session *session = static_cast(m->get_connection()->get_priv()); + if (session) { + { + Mutex::Locker l(session->session_dispatch_lock); + op->get(); + session->waiting_on_map.push_back(*op); + OSDMapRef nextmap = service.get_nextmap_reserved(); + dispatch_session_waiting(session, nextmap); + service.release_map(nextmap); + } + session->put(); } - session->put(); } - service.release_map(nextmap); OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); } @@ -6320,73 +6272,6 @@ void OSD::do_waiters() dout(10) << "do_waiters -- finish" << dendl; } -template -epoch_t replica_op_required_epoch(OpRequestRef op) -{ - const T *m = static_cast(op->get_req()); - assert(m->get_type() == MSGTYPE); - return m->map_epoch; -} - -epoch_t op_required_epoch(OpRequestRef op) -{ - switch (op->get_req()->get_type()) { - case CEPH_MSG_OSD_OP: { - const MOSDOp *m = static_cast(op->get_req()); - return m->get_map_epoch(); - } - case CEPH_MSG_OSD_BACKOFF: { - const MOSDBackoff *m = static_cast(op->get_req()); - return m->map_epoch; - } - case MSG_OSD_SUBOP: - return replica_op_required_epoch(op); - case MSG_OSD_REPOP: - return replica_op_required_epoch(op); - case MSG_OSD_SUBOPREPLY: - return replica_op_required_epoch( - op); - case MSG_OSD_REPOPREPLY: - return replica_op_required_epoch( - op); - case MSG_OSD_PG_PUSH: - return replica_op_required_epoch( - op); - case MSG_OSD_PG_PULL: - return replica_op_required_epoch( - op); - case MSG_OSD_PG_PUSH_REPLY: - return replica_op_required_epoch( - op); - case MSG_OSD_PG_SCAN: - return replica_op_required_epoch(op); - case MSG_OSD_PG_BACKFILL: - return replica_op_required_epoch( - op); - case MSG_OSD_EC_WRITE: - return replica_op_required_epoch(op); - case MSG_OSD_EC_WRITE_REPLY: - return replica_op_required_epoch(op); - case MSG_OSD_EC_READ: - return replica_op_required_epoch(op); - case MSG_OSD_EC_READ_REPLY: - return replica_op_required_epoch(op); - case MSG_OSD_REP_SCRUB: - return replica_op_required_epoch(op); - case MSG_OSD_PG_UPDATE_LOG_MISSING: - return replica_op_required_epoch< - MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>( - op); - case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: - return replica_op_required_epoch< - MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>( - op); - default: - ceph_abort(); - return 0; - } -} - void OSD::dispatch_op(OpRequestRef op) { switch (op->get_req()->get_type()) { @@ -6421,95 +6306,6 @@ void OSD::dispatch_op(OpRequestRef op) } } -bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap) -{ - if (is_stopping()) { - // we're shutting down, so drop the op - return true; - } - - epoch_t msg_epoch(op_required_epoch(op)); - if (msg_epoch > osdmap->get_epoch()) { - Session *s = static_cast(op->get_req()-> - get_connection()->get_priv()); - if (s) { - s->received_map_lock.lock(); - epoch_t received_epoch = s->received_map_epoch; - s->received_map_lock.unlock(); - if (received_epoch < msg_epoch) { - osdmap_subscribe(msg_epoch, false); - } - s->put(); - } - return false; - } - - switch(op->get_req()->get_type()) { - // client ops - case CEPH_MSG_OSD_OP: - handle_op(op, osdmap); - break; - case CEPH_MSG_OSD_BACKOFF: - handle_backoff(op, osdmap); - break; - // for replication etc. 
- case MSG_OSD_SUBOP: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_REPOP: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_SUBOPREPLY: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_REPOPREPLY: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_PUSH: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_PULL: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_PUSH_REPLY: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_SCAN: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_BACKFILL: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_EC_WRITE: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_EC_WRITE_REPLY: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_EC_READ: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_EC_READ_REPLY: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_REP_SCRUB: - handle_replica_op(op, osdmap); - break; - case MSG_OSD_PG_UPDATE_LOG_MISSING: - handle_replica_op( - op, osdmap); - break; - case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: - handle_replica_op( - op, osdmap); - break; - default: - ceph_abort(); - } - return true; -} - void OSD::_dispatch(Message *m) { assert(osd_lock.is_locked()); @@ -7557,28 +7353,17 @@ void OSD::consume_map() service.await_reserved_maps(); service.publish_map(osdmap); + service.maybe_inject_dispatch_delay(); + dispatch_sessions_waiting_on_map(); + service.maybe_inject_dispatch_delay(); + // remove any PGs which we no longer host from the session waiting_for_pg lists - set pgs_to_check; - get_pgs_with_waiting_sessions(&pgs_to_check); - for (set::iterator p = pgs_to_check.begin(); - p != pgs_to_check.end(); - ++p) { - if (!(osdmap->is_acting_osd_shard(spg_t(p->pgid, p->shard), whoami))) { - set concerned_sessions; - get_sessions_possibly_interested_in_pg(*p, &concerned_sessions); - for (set::iterator i = concerned_sessions.begin(); - i != concerned_sessions.end(); - ++i) { - { - Mutex::Locker l((*i)->session_dispatch_lock); - session_notify_pg_cleared(*i, osdmap, *p); - } - (*i)->put(); - } - } - } + dout(20) << __func__ << " checking waiting_for_pg" << dendl; + op_shardedwq.prune_pg_waiters(osdmap, whoami); + + service.maybe_inject_dispatch_delay(); // scan pg's { @@ -8550,6 +8335,9 @@ void OSD::_remove_pg(PG *pg) service.pg_remove_epoch(pg->info.pgid); + // dereference from op_wq + op_shardedwq.clear_pg_pointer(pg->info.pgid); + // remove from map pg_map.erase(pg->info.pgid); pg->put("PGMap"); // since we've taken it out of map @@ -8718,232 +8506,6 @@ bool OSDService::is_recovery_active() // ========================================================= // OPS -class C_SendMap : public GenContext { - OSD *osd; - entity_name_t name; - ConnectionRef con; - OSDMapRef osdmap; - epoch_t map_epoch; - -public: - C_SendMap(OSD *osd, entity_name_t n, const ConnectionRef& con, - OSDMapRef& osdmap, epoch_t map_epoch) : - osd(osd), name(n), con(con), osdmap(osdmap), map_epoch(map_epoch) { - } - - void finish(ThreadPool::TPHandle& tp) override { - Session *session = static_cast( - con->get_priv()); - epoch_t last_sent_epoch; - if (session) { - session->sent_epoch_lock.lock(); - last_sent_epoch = session->last_sent_epoch; - session->sent_epoch_lock.unlock(); - } - osd->service.share_map( - name, - con.get(), - map_epoch, - osdmap, - session ? 
&last_sent_epoch : NULL); - if (session) { - session->sent_epoch_lock.lock(); - if (session->last_sent_epoch < last_sent_epoch) { - session->last_sent_epoch = last_sent_epoch; - } - session->sent_epoch_lock.unlock(); - session->put(); - } - } -}; - -struct send_map_on_destruct { - OSD *osd; - entity_name_t name; - ConnectionRef con; - OSDMapRef osdmap; - epoch_t map_epoch; - bool should_send; - send_map_on_destruct(OSD *osd, const Message *m, - OSDMapRef& osdmap, epoch_t map_epoch) - : osd(osd), name(m->get_source()), con(m->get_connection()), - osdmap(osdmap), map_epoch(map_epoch), - should_send(true) { } - ~send_map_on_destruct() { - if (!should_send) - return; - osd->service.op_gen_wq.queue(new C_SendMap(osd, name, con, - osdmap, map_epoch)); - } -}; - -void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) -{ - const MOSDOp *m = static_cast(op->get_req()); - assert(m->get_type() == CEPH_MSG_OSD_OP); - if (op_is_discardable(m)) { - dout(10) << " discardable " << *m << dendl; - return; - } - - // set up a map send if the Op gets blocked for some reason - send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch()); - Session *client_session = - static_cast(m->get_connection()->get_priv()); - epoch_t last_sent_epoch; - if (client_session) { - client_session->sent_epoch_lock.lock(); - last_sent_epoch = client_session->last_sent_epoch; - client_session->sent_epoch_lock.unlock(); - } - share_map.should_send = service.should_share_map( - m->get_source(), m->get_connection().get(), m->get_map_epoch(), - osdmap, client_session ? &last_sent_epoch : NULL); - if (client_session) { - client_session->put(); - } - - // calc actual pgid - pg_t _pgid = m->get_raw_pg(); - int64_t pool = _pgid.pool(); - - if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 && - osdmap->have_pg_pool(pool)) - _pgid = osdmap->raw_pg_to_pg(_pgid); - - spg_t pgid; - if (!osdmap->get_primary_shard(_pgid, &pgid)) { - // missing pool or acting set empty -- drop - return; - } - - PGRef pg = get_pg_or_queue_for_pg(pgid, op, client_session); - if (pg) { - op->send_map_update = share_map.should_send; - op->sent_epoch = m->get_map_epoch(); - enqueue_op(pg, op); - share_map.should_send = false; - return; - } - - // ok, we didn't have the PG. - if (!cct->_conf->osd_debug_misdirected_ops) { - return; - } - // let's see if it's our fault or the client's. note that this might - // involve loading an old OSDmap off disk, so it can be slow. - - OSDMapRef send_map = service.try_get_map(m->get_map_epoch()); - if (!send_map) { - dout(7) << "don't have sender's osdmap; assuming it was valid and that" - << " client will resend" << dendl; - return; - } - if (!send_map->have_pg_pool(pgid.pool())) { - dout(7) << "dropping request; pool did not exist" << dendl; - clog->warn() << m->get_source_inst() << " invalid " << m->get_reqid() - << " pg " << m->get_raw_pg() - << " to osd." << whoami - << " in e" << osdmap->get_epoch() - << ", client e" << m->get_map_epoch() - << " when pool " << m->get_pg().pool() << " did not exist" - << "\n"; - return; - } - if (!send_map->osd_is_valid_op_target(pgid.pgid, whoami)) { - dout(7) << "we are invalid target" << dendl; - clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid() - << " pg " << m->get_raw_pg() - << " to osd." 
<< whoami - << " in e" << osdmap->get_epoch() - << ", client e" << m->get_map_epoch() - << " pg " << pgid - << " features " << m->get_connection()->get_features() - << "\n"; - if (g_conf->osd_enxio_on_misdirected_op) { - service.reply_op_error(op, -ENXIO); - } - return; - } - - // check against current map too - if (!osdmap->have_pg_pool(pgid.pool()) || - !osdmap->osd_is_valid_op_target(pgid.pgid, whoami)) { - dout(7) << "dropping; no longer have PG (or pool); client will retarget" - << dendl; - return; - } -} - -void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap) -{ - const MOSDBackoff *m = static_cast(op->get_req()); - Session *s = static_cast(m->get_connection()->get_priv()); - dout(10) << __func__ << " " << *m << " session " << s << dendl; - assert(s); - s->put(); - - if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) { - dout(10) << __func__ << " unrecognized op, ignoring" << dendl; - return; - } - - // map hobject range to PG(s) - PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s); - if (pg) { - enqueue_op(pg, op); - } -} - -template -void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap) -{ - const T *m = static_cast(op->get_req()); - assert(m->get_type() == MSGTYPE); - - dout(10) << __func__ << " " << *m << " epoch " << m->map_epoch << dendl; - if (!require_self_aliveness(op->get_req(), m->map_epoch)) - return; - if (!require_osd_peer(op->get_req())) - return; - if (osdmap->get_epoch() >= m->map_epoch && - !require_same_peer_instance(op->get_req(), osdmap, true)) - return; - - // must be a rep op. - assert(m->get_source().is_osd()); - - // share our map with sender, if they're old - bool should_share_map = false; - Session *peer_session = - static_cast(m->get_connection()->get_priv()); - epoch_t last_sent_epoch; - if (peer_session) { - peer_session->sent_epoch_lock.lock(); - last_sent_epoch = peer_session->last_sent_epoch; - peer_session->sent_epoch_lock.unlock(); - } - should_share_map = service.should_share_map( - m->get_source(), m->get_connection().get(), m->map_epoch, - osdmap, - peer_session ? 
&last_sent_epoch : NULL); - if (peer_session) { - peer_session->put(); - } - - PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, peer_session); - if (pg) { - op->send_map_update = should_share_map; - op->sent_epoch = m->map_epoch; - enqueue_op(pg, op); - } else if (should_share_map && m->get_connection()->is_connected()) { - C_SendMap *send_map = new C_SendMap(this, m->get_source(), - m->get_connection(), - osdmap, m->map_epoch); - service.op_gen_wq.queue(send_map); - } -} - bool OSD::op_is_discardable(const MOSDOp *op) { // drop client request if they are not connected and can't get the @@ -8954,196 +8516,44 @@ bool OSD::op_is_discardable(const MOSDOp *op) return false; } -void OSD::enqueue_op(PGRef pg, OpRequestRef& op) +void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch) { utime_t latency = ceph_clock_now() - op->get_req()->get_recv_stamp(); dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority() << " cost " << op->get_req()->get_cost() << " latency " << latency + << " epoch " << epoch << " " << *(op->get_req()) << dendl; - pg->queue_op(op); + op->mark_queued_for_pg(); + op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch))); } -void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) { - uint32_t shard_index = thread_index % num_shards; - ShardData* sdata = shard_list[shard_index]; - assert(NULL != sdata); - sdata->sdata_op_ordering_lock.Lock(); - if (sdata->pqueue->empty()) { - sdata->sdata_op_ordering_lock.Unlock(); - osd->cct->get_heartbeat_map()->reset_timeout(hb, - osd->cct->_conf->threadpool_default_timeout, 0); - sdata->sdata_lock.Lock(); - sdata->sdata_cond.WaitInterval(sdata->sdata_lock, - utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0)); - sdata->sdata_lock.Unlock(); - sdata->sdata_op_ordering_lock.Lock(); - if(sdata->pqueue->empty()) { - sdata->sdata_op_ordering_lock.Unlock(); - return; - } - } - pair item = sdata->pqueue->dequeue(); - sdata->pg_for_processing[&*(item.first)].push_back(item.second); - sdata->sdata_op_ordering_lock.Unlock(); - ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, - suicide_interval); +/* + * NOTE: dequeue called in worker thread, with pg lock + */ +void OSD::dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle) +{ + FUNCTRACE(); + OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false); - (item.first)->lock_suspend_timeout(tp_handle); + utime_t now = ceph_clock_now(); + op->set_dequeued_time(now); + utime_t latency = now - op->get_req()->get_recv_stamp(); + dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority() + << " cost " << op->get_req()->get_cost() + << " latency " << latency + << " " << *(op->get_req()) + << " pg " << *pg << dendl; - boost::optional op; - { - Mutex::Locker l(sdata->sdata_op_ordering_lock); - if (!sdata->pg_for_processing.count(&*(item.first))) { - (item.first)->unlock(); - return; - } - assert(sdata->pg_for_processing[&*(item.first)].size()); - op = sdata->pg_for_processing[&*(item.first)].front(); - sdata->pg_for_processing[&*(item.first)].pop_front(); - if (!(sdata->pg_for_processing[&*(item.first)].size())) - sdata->pg_for_processing.erase(&*(item.first)); - } - - // osd:opwq_process marks the point at which an operation has been dequeued - // and will begin to be handled by a worker thread. 
- { -#ifdef WITH_LTTNG - osd_reqid_t reqid; - if (boost::optional _op = op->maybe_get_op()) { - reqid = (*_op)->get_reqid(); - } -#endif - tracepoint(osd, opwq_process_start, reqid.name._type, - reqid.name._num, reqid.tid, reqid.inc); - } - - lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: "; - Formatter *f = Formatter::create("json"); - f->open_object_section("q"); - dump(f); - f->close_section(); - f->flush(*_dout); - delete f; - *_dout << dendl; - - op->run(osd, item.first, tp_handle); - - { -#ifdef WITH_LTTNG - osd_reqid_t reqid; - if (boost::optional _op = op->maybe_get_op()) { - reqid = (*_op)->get_reqid(); - } -#endif - tracepoint(osd, opwq_process_finish, reqid.name._type, - reqid.name._num, reqid.tid, reqid.inc); - } - - (item.first)->unlock(); -} - -void OSD::ShardedOpWQ::_enqueue(pair item) { - uint32_t shard_index = - (item.first)->get_pgid().hash_to_shard(shard_list.size()); - - ShardData* sdata = shard_list[shard_index]; - assert (NULL != sdata); - unsigned priority = item.second.get_priority(); - unsigned cost = item.second.get_cost(); - sdata->sdata_op_ordering_lock.Lock(); - - if (priority >= osd->op_prio_cutoff) - sdata->pqueue->enqueue_strict( - item.second.get_owner(), priority, item); - else - sdata->pqueue->enqueue( - item.second.get_owner(), - priority, cost, item); - sdata->sdata_op_ordering_lock.Unlock(); - - sdata->sdata_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_lock.Unlock(); - -} - -void OSD::ShardedOpWQ::_enqueue_front(pair item) { - - uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size()); - - ShardData* sdata = shard_list[shard_index]; - assert (NULL != sdata); - sdata->sdata_op_ordering_lock.Lock(); - if (sdata->pg_for_processing.count(&*(item.first))) { - sdata->pg_for_processing[&*(item.first)].push_front(item.second); - item.second = sdata->pg_for_processing[&*(item.first)].back(); - sdata->pg_for_processing[&*(item.first)].pop_back(); - } - unsigned priority = item.second.get_priority(); - unsigned cost = item.second.get_cost(); - if (priority >= osd->op_prio_cutoff) - sdata->pqueue->enqueue_strict_front( - item.second.get_owner(), - priority, item); - else - sdata->pqueue->enqueue_front( - item.second.get_owner(), - priority, cost, item); - - sdata->sdata_op_ordering_lock.Unlock(); - sdata->sdata_lock.Lock(); - sdata->sdata_cond.SignalOne(); - sdata->sdata_lock.Unlock(); - -} - - -/* - * NOTE: dequeue called in worker thread, with pg lock - */ -void OSD::dequeue_op( - PGRef pg, OpRequestRef op, - ThreadPool::TPHandle &handle) -{ - FUNCTRACE(); - OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false); - - utime_t now = ceph_clock_now(); - op->set_dequeued_time(now); - utime_t latency = now - op->get_req()->get_recv_stamp(); - dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority() - << " cost " << op->get_req()->get_cost() - << " latency " << latency - << " " << *(op->get_req()) - << " pg " << *pg << dendl; - - // share our map with sender, if they're old - if (op->send_map_update) { - const Message *m = op->get_req(); - Session *session = static_cast(m->get_connection()->get_priv()); - epoch_t last_sent_epoch; - if (session) { - session->sent_epoch_lock.lock(); - last_sent_epoch = session->last_sent_epoch; - session->sent_epoch_lock.unlock(); - } - service.share_map( - m->get_source(), - m->get_connection().get(), - op->sent_epoch, - osdmap, - session ? 
&last_sent_epoch : NULL); - if (session) { - session->sent_epoch_lock.lock(); - if (session->last_sent_epoch < last_sent_epoch) { - session->last_sent_epoch = last_sent_epoch; - } - session->sent_epoch_lock.unlock(); - session->put(); - } + Session *session = static_cast( + op->get_req()->get_connection()->get_priv()); + if (session) { + maybe_share_map(session, op, pg->get_osdmap()); + session->put(); } if (pg->deleting) @@ -9182,8 +8592,8 @@ struct C_CompleteSplits : public Context { } osd->pg_map_lock.put_write(); osd->dispatch_context_transaction(rctx, &**i); + osd->wake_pg_waiters(*i); (*i)->unlock(); - osd->wake_pg_waiters((*i)->info.pgid); } osd->dispatch_context(rctx, 0, osd->service.get_osdmap()); @@ -9621,3 +9031,371 @@ void OSD::PeeringWQ::_dequeue(list *out) { } in_use.insert(out->begin(), out->end()); } + + + +// ============================================================= + +#undef dout_context +#define dout_context osd->cct +#undef dout_prefix +#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq " + +void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid) +{ + uint32_t shard_index = pgid.hash_to_shard(shard_list.size()); + auto sdata = shard_list[shard_index]; + bool queued = false; + unsigned pushes_to_free = 0; + { + Mutex::Locker l(sdata->sdata_op_ordering_lock); + auto p = sdata->pg_slots.find(pgid); + if (p != sdata->pg_slots.end()) { + dout(20) << __func__ << " " << pgid + << " to_process " << p->second.to_process + << " waiting_for_pg=" << (int)p->second.waiting_for_pg << dendl; + for (auto i = p->second.to_process.rbegin(); + i != p->second.to_process.rend(); + ++i) { + sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff); + } + for (auto& q : p->second.to_process) { + pushes_to_free += q.get_reserved_pushes(); + } + p->second.to_process.clear(); + p->second.waiting_for_pg = false; + ++p->second.requeue_seq; + queued = true; + } + } + if (pushes_to_free > 0) { + osd->service.release_reserved_pushes(pushes_to_free); + } + if (queued) { + sdata->sdata_lock.Lock(); + sdata->sdata_cond.SignalOne(); + sdata->sdata_lock.Unlock(); + } +} + +void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami) +{ + unsigned pushes_to_free = 0; + for (auto sdata : shard_list) { + Mutex::Locker l(sdata->sdata_op_ordering_lock); + sdata->waiting_for_pg_osdmap = osdmap; + auto p = sdata->pg_slots.begin(); + while (p != sdata->pg_slots.end()) { + ShardData::pg_slot& slot = p->second; + if (!slot.to_process.empty() && slot.num_running == 0) { + if (osdmap->is_up_acting_osd_shard(p->first, whoami)) { + dout(20) << __func__ << " " << p->first << " maps to us, keeping" + << dendl; + ++p; + continue; + } + while (!slot.to_process.empty() && + slot.to_process.front().get_map_epoch() <= osdmap->get_epoch()) { + auto& qi = slot.to_process.front(); + dout(20) << __func__ << " " << p->first + << " item " << qi + << " epoch " << qi.get_map_epoch() + << " <= " << osdmap->get_epoch() + << ", stale, dropping" << dendl; + pushes_to_free += qi.get_reserved_pushes(); + slot.to_process.pop_front(); + } + } + if (slot.to_process.empty() && + slot.num_running == 0 && + !slot.pg) { + dout(20) << __func__ << " " << p->first << " empty, pruning" << dendl; + p = sdata->pg_slots.erase(p); + } else { + ++p; + } + } + } + if (pushes_to_free > 0) { + osd->service.release_reserved_pushes(pushes_to_free); + } +} + +void OSD::ShardedOpWQ::clear_pg_pointer(spg_t pgid) +{ + uint32_t shard_index = pgid.hash_to_shard(shard_list.size()); + auto sdata = shard_list[shard_index]; + Mutex::Locker 
l(sdata->sdata_op_ordering_lock); + auto p = sdata->pg_slots.find(pgid); + if (p != sdata->pg_slots.end()) { + auto& slot = p->second; + dout(20) << __func__ << " " << pgid << " pg " << slot.pg << dendl; + assert(!slot.pg || slot.pg->deleting); + slot.pg = nullptr; + } +} + +void OSD::ShardedOpWQ::clear_pg_slots() +{ + for (auto sdata : shard_list) { + Mutex::Locker l(sdata->sdata_op_ordering_lock); + sdata->pg_slots.clear(); + sdata->waiting_for_pg_osdmap.reset(); + // don't bother with reserved pushes; we are shutting down + } +} + +#undef dout_prefix +#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") " + +void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) +{ + uint32_t shard_index = thread_index % num_shards; + ShardData *sdata = shard_list[shard_index]; + assert(NULL != sdata); + + // peek at spg_t + sdata->sdata_op_ordering_lock.Lock(); + if (sdata->pqueue->empty()) { + dout(20) << __func__ << " empty q, waiting" << dendl; + // optimistically sleep a moment; maybe another work item will come along. + sdata->sdata_op_ordering_lock.Unlock(); + osd->cct->get_heartbeat_map()->reset_timeout(hb, + osd->cct->_conf->threadpool_default_timeout, 0); + sdata->sdata_lock.Lock(); + sdata->sdata_cond.WaitInterval(sdata->sdata_lock, + utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0)); + sdata->sdata_lock.Unlock(); + sdata->sdata_op_ordering_lock.Lock(); + if (sdata->pqueue->empty()) { + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + } + pair item = sdata->pqueue->dequeue(); + if (osd->is_stopping()) { + sdata->sdata_op_ordering_lock.Unlock(); + return; // OSD shutdown, discard. + } + PGRef pg; + uint64_t requeue_seq; + { + auto& slot = sdata->pg_slots[item.first]; + dout(30) << __func__ << " " << item.first + << " to_process " << slot.to_process + << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl; + slot.to_process.push_back(item.second); + // note the requeue seq now... 
+ requeue_seq = slot.requeue_seq; + if (slot.waiting_for_pg) { + // save ourselves a bit of effort + dout(20) << __func__ << " " << item.first << " item " << item.second + << " queued, waiting_for_pg" << dendl; + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + pg = slot.pg; + dout(20) << __func__ << " " << item.first << " item " << item.second + << " queued" << dendl; + ++slot.num_running; + } + sdata->sdata_op_ordering_lock.Unlock(); + + osd->service.maybe_inject_dispatch_delay(); + + // [lookup +] lock pg (if we have it) + if (!pg) { + pg = osd->_lookup_lock_pg(item.first); + } else { + pg->lock(); + } + + osd->service.maybe_inject_dispatch_delay(); + + boost::optional qi; + + // we don't use a Mutex::Locker here because of the + // osd->service.release_reserved_pushes() call below + sdata->sdata_op_ordering_lock.Lock(); + + auto q = sdata->pg_slots.find(item.first); + assert(q != sdata->pg_slots.end()); + auto& slot = q->second; + --slot.num_running; + + if (slot.to_process.empty()) { + // raced with wake_pg_waiters or prune_pg_waiters + dout(20) << __func__ << " " << item.first << " nothing queued" << dendl; + if (pg) { + pg->unlock(); + } + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + if (requeue_seq != slot.requeue_seq) { + dout(20) << __func__ << " " << item.first + << " requeue_seq " << slot.requeue_seq << " > our " + << requeue_seq << ", we raced with wake_pg_waiters" + << dendl; + if (pg) { + pg->unlock(); + } + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + if (pg && !slot.pg && !pg->deleting) { + dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl; + slot.pg = pg; + } + dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process + << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl; + + // make sure we're not already waiting for this pg + if (slot.waiting_for_pg) { + dout(20) << __func__ << " " << item.first << " item " << item.second + << " slot is waiting_for_pg" << dendl; + if (pg) { + pg->unlock(); + } + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + + // take next item + qi = slot.to_process.front(); + slot.to_process.pop_front(); + dout(20) << __func__ << " " << item.first << " item " << *qi + << " pg " << pg << dendl; + + if (!pg) { + // should this pg shard exist on this osd in this (or a later) epoch? + OSDMapRef osdmap = sdata->waiting_for_pg_osdmap; + if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) { + dout(20) << __func__ << " " << item.first + << " no pg, should exist, will wait" << " on " << *qi << dendl; + slot.to_process.push_front(*qi); + slot.waiting_for_pg = true; + } else if (qi->get_map_epoch() > osdmap->get_epoch()) { + dout(20) << __func__ << " " << item.first << " no pg, item epoch is " + << qi->get_map_epoch() << " > " << osdmap->get_epoch() + << ", will wait on " << *qi << dendl; + slot.to_process.push_front(*qi); + slot.waiting_for_pg = true; + } else { + dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist," + << " dropping " << *qi << dendl; + // share map with client? 
+ if (boost::optional _op = qi->maybe_get_op()) { + Session *session = static_cast( + (*_op)->get_req()->get_connection()->get_priv()); + if (session) { + osd->maybe_share_map(session, *_op, sdata->waiting_for_pg_osdmap); + session->put(); + } + } + unsigned pushes_to_free = qi->get_reserved_pushes(); + if (pushes_to_free > 0) { + sdata->sdata_op_ordering_lock.Unlock(); + osd->service.release_reserved_pushes(pushes_to_free); + return; + } + } + sdata->sdata_op_ordering_lock.Unlock(); + return; + } + sdata->sdata_op_ordering_lock.Unlock(); + + + // osd_opwq_process marks the point at which an operation has been dequeued + // and will begin to be handled by a worker thread. + { +#ifdef WITH_LTTNG + osd_reqid_t reqid; + if (boost::optional _op = qi->maybe_get_op()) { + reqid = (*_op)->get_reqid(); + } +#endif + tracepoint(osd, opwq_process_start, reqid.name._type, + reqid.name._num, reqid.tid, reqid.inc); + } + + lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: "; + Formatter *f = Formatter::create("json"); + f->open_object_section("q"); + dump(f); + f->close_section(); + f->flush(*_dout); + delete f; + *_dout << dendl; + + ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, + suicide_interval); + qi->run(osd, pg, tp_handle); + + { +#ifdef WITH_LTTNG + osd_reqid_t reqid; + if (boost::optional _op = qi->maybe_get_op()) { + reqid = (*_op)->get_reqid(); + } +#endif + tracepoint(osd, opwq_process_finish, reqid.name._type, + reqid.name._num, reqid.tid, reqid.inc); + } + + pg->unlock(); +} + +void OSD::ShardedOpWQ::_enqueue(pair item) { + uint32_t shard_index = + item.first.hash_to_shard(shard_list.size()); + + ShardData* sdata = shard_list[shard_index]; + assert (NULL != sdata); + unsigned priority = item.second.get_priority(); + unsigned cost = item.second.get_cost(); + sdata->sdata_op_ordering_lock.Lock(); + + dout(20) << __func__ << " " << item.first << " " << item.second << dendl; + if (priority >= osd->op_prio_cutoff) + sdata->pqueue->enqueue_strict( + item.second.get_owner(), priority, item); + else + sdata->pqueue->enqueue( + item.second.get_owner(), + priority, cost, item); + sdata->sdata_op_ordering_lock.Unlock(); + + sdata->sdata_lock.Lock(); + sdata->sdata_cond.SignalOne(); + sdata->sdata_lock.Unlock(); + +} + +void OSD::ShardedOpWQ::_enqueue_front(pair item) +{ + uint32_t shard_index = item.first.hash_to_shard(shard_list.size()); + ShardData* sdata = shard_list[shard_index]; + assert (NULL != sdata); + sdata->sdata_op_ordering_lock.Lock(); + auto p = sdata->pg_slots.find(item.first); + if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) { + // we may be racing with _process, which has dequeued a new item + // from pqueue, put it on to_process, and is now busy taking the + // pg lock. ensure this old requeued item is ordered before any + // such newer item in to_process. 
+ p->second.to_process.push_front(item.second); + item.second = p->second.to_process.back(); + p->second.to_process.pop_back(); + dout(20) << __func__ << " " << item.first + << " " << p->second.to_process.front() + << " shuffled w/ " << item.second << dendl; + } else { + dout(20) << __func__ << " " << item.first << " " << item.second << dendl; + } + sdata->_enqueue_front(item, osd->op_prio_cutoff); + sdata->sdata_op_ordering_lock.Unlock(); + sdata->sdata_lock.Lock(); + sdata->sdata_cond.SignalOne(); + sdata->sdata_lock.Unlock(); +} diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 8b4a2085517..9114a5d2e9d 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -359,7 +359,6 @@ struct PGRecovery { } }; - class PGQueueable { typedef boost::variant< OpRequestRef, @@ -372,6 +371,8 @@ class PGQueueable { unsigned priority; utime_t start_time; entity_inst_t owner; + epoch_t map_epoch; ///< an epoch we expect the PG to exist in + struct RunVis : public boost::static_visitor<> { OSD *osd; PGRef &pg; @@ -383,29 +384,52 @@ class PGQueueable { void operator()(const PGScrub &op); void operator()(const PGRecovery &op); }; + + struct StringifyVis : public boost::static_visitor { + std::string operator()(const OpRequestRef &op) { + return stringify(op); + } + std::string operator()(const PGSnapTrim &op) { + return "PGSnapTrim"; + } + std::string operator()(const PGScrub &op) { + return "PGScrub"; + } + std::string operator()(const PGRecovery &op) { + return "PGRecovery"; + } + }; + friend ostream& operator<<(ostream& out, const PGQueueable& q) { + StringifyVis v; + return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant) + << " prio " << q.priority << " cost " << q.cost + << " e" << q.map_epoch << ")"; + } + public: // cppcheck-suppress noExplicitConstructor - PGQueueable(OpRequestRef op) + PGQueueable(OpRequestRef op, epoch_t e) : qvariant(op), cost(op->get_req()->get_cost()), priority(op->get_req()->get_priority()), start_time(op->get_req()->get_recv_stamp()), - owner(op->get_req()->get_source_inst()) + owner(op->get_req()->get_source_inst()), + map_epoch(e) {} PGQueueable( const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner) + const entity_inst_t &owner, epoch_t e) : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner) {} + owner(owner), map_epoch(e) {} PGQueueable( const PGScrub &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner) + const entity_inst_t &owner, epoch_t e) : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner) {} + owner(owner), map_epoch(e) {} PGQueueable( const PGRecovery &op, int cost, unsigned priority, utime_t start_time, - const entity_inst_t &owner) + const entity_inst_t &owner, epoch_t e) : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner) {} + owner(owner), map_epoch(e) {} const boost::optional maybe_get_op() const { const OpRequestRef *op = boost::get(&qvariant); return op ? 
OpRequestRef(*op) : boost::optional(); @@ -422,6 +446,7 @@ public: int get_cost() const { return cost; } utime_t get_start_time() const { return start_time; } entity_inst_t get_owner() const { return owner; } + epoch_t get_map_epoch() const { return map_epoch; } }; class OSDService { @@ -443,13 +468,24 @@ public: PerfCounters *&logger; PerfCounters *&recoverystate_perf; MonClient *&monc; - ShardedThreadPool::ShardedWQ < pair > &op_wq; ThreadPool::BatchWorkQueue &peering_wq; GenContextWQ recovery_gen_wq; GenContextWQ op_gen_wq; ClassHandler *&class_handler; - void dequeue_pg(PG *pg, list *dequeued); + void enqueue_back(spg_t pgid, PGQueueable qi); + void enqueue_front(spg_t pgid, PGQueueable qi); + + void maybe_inject_dispatch_delay() { + if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) { + if (rand() % 10000 < + g_conf->osd_debug_inject_dispatch_delay_probability * 10000) { + utime_t t; + t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration); + t.sleep(); + } + } + } private: // -- map epoch lower bound -- @@ -918,15 +954,15 @@ public: void queue_for_snap_trim(PG *pg); void queue_for_scrub(PG *pg) { - op_wq.queue( - make_pair( - pg, - PGQueueable( - PGScrub(pg->get_osdmap()->get_epoch()), - cct->_conf->osd_scrub_cost, - pg->get_scrub_priority(), - ceph_clock_now(), - entity_inst_t()))); + enqueue_back( + pg->info.pgid, + PGQueueable( + PGScrub(pg->get_osdmap()->get_epoch()), + cct->_conf->osd_scrub_cost, + pg->get_scrub_priority(), + ceph_clock_now(), + entity_inst_t(), + pg->get_osdmap()->get_epoch())); } private: @@ -946,15 +982,15 @@ private: void _queue_for_recovery( pair p, uint64_t reserved_pushes) { assert(recovery_lock.is_locked_by_me()); - pair to_queue = make_pair( - p.second, + enqueue_back( + p.second->info.pgid, PGQueueable( PGRecovery(p.first, reserved_pushes), cct->_conf->osd_recovery_cost, cct->_conf->osd_recovery_priority, ceph_clock_now(), - entity_inst_t())); - op_wq.queue(to_queue); + entity_inst_t(), + p.first)); } public: void start_recovery_op(PG *pg, const hobject_t& soid); @@ -1403,81 +1439,12 @@ private: void get_latest_osdmap(); // -- sessions -- -public: - - - static bool split_request(OpRequestRef op, unsigned match, unsigned bits) { - unsigned mask = ~((~0)<get_req()->get_type()) { - case CEPH_MSG_OSD_OP: - return (static_cast( - op->get_req())->get_raw_pg().m_seed & mask) == match; - } - return false; - } - - static void split_list( - boost::intrusive::list *from, - boost::intrusive::list *to, - unsigned match, - unsigned bits) { - for (auto i = from->begin(); i != from->end(); ) { - if (split_request(&(*i), match, bits)) { - OpRequest& o = *i; - i = from->erase(i); - to->push_back(o); - } else { - ++i; - } - } - } - static void split_list( - list *from, - list *to, - unsigned match, - unsigned bits) { - for (auto i = from->begin(); i != from->end(); ) { - if (split_request(*i, match, bits)) { - to->push_back(*i); - from->erase(i++); - } else { - ++i; - } - } - } - - private: - void update_waiting_for_pg(Session *session, OSDMapRef osdmap); - void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid); - void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid); void dispatch_session_waiting(Session *session, OSDMapRef osdmap); + void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap); Mutex session_waiting_lock; set session_waiting_for_map; - map > session_waiting_for_pg; - - void clear_waiting_sessions() { - Mutex::Locker l(session_waiting_lock); - for (map >::const_iterator 
i = - session_waiting_for_pg.cbegin(); - i != session_waiting_for_pg.cend(); - ++i) { - for (set::iterator j = i->second.begin(); - j != i->second.end(); - ++j) { - (*j)->put(); - } - } - session_waiting_for_pg.clear(); - - for (set::iterator i = session_waiting_for_map.begin(); - i != session_waiting_for_map.end(); - ++i) { - (*i)->put(); - } - session_waiting_for_map.clear(); - } /// Caller assumes refs for included Sessions void get_sessions_waiting_for_map(set *out) { @@ -1486,9 +1453,8 @@ private: } void register_session_waiting_on_map(Session *session) { Mutex::Locker l(session_waiting_lock); - if (session_waiting_for_map.count(session) == 0) { + if (session_waiting_for_map.insert(session).second) { session->get(); - session_waiting_for_map.insert(session); } } void clear_session_waiting_on_map(Session *session) { @@ -1506,37 +1472,15 @@ private: i != sessions_to_check.end(); sessions_to_check.erase(i++)) { (*i)->session_dispatch_lock.Lock(); - update_waiting_for_pg(*i, osdmap); dispatch_session_waiting(*i, osdmap); (*i)->session_dispatch_lock.Unlock(); (*i)->put(); } } - void clear_session_waiting_on_pg(Session *session, const spg_t &pgid) { - Mutex::Locker l(session_waiting_lock); - map >::iterator i = session_waiting_for_pg.find(pgid); - if (i == session_waiting_for_pg.end()) { - return; - } - set::iterator j = i->second.find(session); - if (j != i->second.end()) { - (*j)->put(); - i->second.erase(j); - } - if (i->second.empty()) { - session_waiting_for_pg.erase(i); - } - } void session_handle_reset(Session *session) { Mutex::Locker l(session->session_dispatch_lock); clear_session_waiting_on_map(session); - for (auto i = session->waiting_for_pg.cbegin(); - i != session->waiting_for_pg.cend(); - ++i) { - clear_session_waiting_on_pg(session, i->first); - } - session->clear_backoffs(); /* Messages have connection refs, we need to clear the @@ -1545,49 +1489,6 @@ private: * Bug #12338 */ session->waiting_on_map.clear_and_dispose(TrackedOp::Putter()); - for (auto& i : session->waiting_for_pg) { - i.second.clear_and_dispose(TrackedOp::Putter()); - } - session->waiting_for_pg.clear(); - session->osdmap.reset(); - } - void register_session_waiting_on_pg(Session *session, spg_t pgid) { - Mutex::Locker l(session_waiting_lock); - set &s = session_waiting_for_pg[pgid]; - set::const_iterator i = s.find(session); - if (i == s.cend()) { - session->get(); - s.insert(session); - } - } - void get_sessions_possibly_interested_in_pg( - spg_t pgid, set *sessions) { - Mutex::Locker l(session_waiting_lock); - while (1) { - map >::iterator i = session_waiting_for_pg.find(pgid); - if (i != session_waiting_for_pg.end()) { - sessions->insert(i->second.begin(), i->second.end()); - } - if (pgid.pgid.ps() == 0) { - break; - } else { - pgid = pgid.get_parent(); - } - } - for (set::iterator i = sessions->begin(); - i != sessions->end(); - ++i) { - (*i)->get(); - } - } - void get_pgs_with_waiting_sessions(set *pgs) { - Mutex::Locker l(session_waiting_lock); - for (map >::iterator i = - session_waiting_for_pg.begin(); - i != session_waiting_for_pg.end(); - ++i) { - pgs->insert(i->first); - } } private: @@ -1746,54 +1647,117 @@ private: // -- op queue -- enum io_queue { prioritized, - weightedpriority}; + weightedpriority + }; const io_queue op_queue; const unsigned int op_prio_cutoff; + /* + * The ordered op delivery chain is: + * + * fast dispatch -> pqueue back + * pqueue front <-> to_process back + * to_process front -> RunVis(item) + * <- queue_front() + * + * The pqueue is per-shard, and to_process is per 
pg_slot.  Items can be
+   * pushed back up into to_process and/or pqueue while order is preserved.
+   *
+   * Multiple worker threads can operate on each shard.
+   *
+   * Under normal circumstances, num_running == to_process.size().  There are
+   * two times when that is not true: (1) when waiting_for_pg == true and
+   * to_process is accumulating requests that are waiting for the pg to be
+   * instantiated; in that case they will all get requeued together by
+   * wake_pg_waiters, and (2) just after wake_pg_waiters has run: it cleared
+   * waiting_for_pg and already requeued the items.
+   */
   friend class PGQueueable;
-  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
-
+  class ShardedOpWQ
+    : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+  {
     struct ShardData {
       Mutex sdata_lock;
       Cond sdata_cond;
-      Mutex sdata_op_ordering_lock;
-      map<PG*, list<PGQueueable> > pg_for_processing;
-      std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
+
+      Mutex sdata_op_ordering_lock;   ///< protects all members below
+
+      OSDMapRef waiting_for_pg_osdmap;
+      struct pg_slot {
+        PGRef pg;                     ///< cached pg reference [optional]
+        list<PGQueueable> to_process; ///< order items for this slot
+        int num_running = 0;          ///< _process threads doing pg lookup/lock
+
+        /// true if pg does/did not exist. if so all new items go directly to
+        /// to_process.  cleared by prune_pg_waiters.
+        bool waiting_for_pg = false;
+
+        /// incremented by wake_pg_waiters; indicates racing _process threads
+        /// should bail out (their op has been requeued)
+        uint64_t requeue_seq = 0;
+      };
+
+      /// map of slots for each spg_t.  maintains ordering of items dequeued
+      /// from pqueue while _process thread drops shard lock to acquire the
+      /// pg lock.  slots are removed only by prune_pg_waiters.
+      unordered_map<spg_t,pg_slot> pg_slots;
+
+      /// priority queue
+      std::unique_ptr<OpQueue<pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
+
+      void _enqueue_front(pair<spg_t,PGQueueable> item, unsigned cutoff) {
+        unsigned priority = item.second.get_priority();
+        unsigned cost = item.second.get_cost();
+        if (priority >= cutoff)
+          pqueue->enqueue_strict_front(
+            item.second.get_owner(),
+            priority, item);
+        else
+          pqueue->enqueue_front(
+            item.second.get_owner(),
+            priority, cost, item);
+      }
+
       ShardData(
         string lock_name, string ordering_lock,
         uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
         io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
-           if (opqueue == weightedpriority) {
-             pqueue = std::unique_ptr
-               <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
-                 new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
-                   max_tok_per_prio, min_cost));
-           } else if (opqueue == prioritized) {
-             pqueue = std::unique_ptr
-               <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
-                 new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
-                   max_tok_per_prio, min_cost));
-           }
-         }
+         sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+                                false, cct) {
+        if (opqueue == weightedpriority) {
+          pqueue = std::unique_ptr
+            <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+              new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+                max_tok_per_prio, min_cost));
+        } else if (opqueue == prioritized) {
+          pqueue = std::unique_ptr
+            <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+              new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+                max_tok_per_prio, min_cost));
+        }
+      }
     };
-
+
     vector<ShardData*> shard_list;
     OSD *osd;
     uint32_t num_shards;
 
   public:
-    ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
-      ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
-      osd(o), num_shards(pnum_shards) {
-      for(uint32_t i = 0; i < num_shards; i++) {
+    ShardedOpWQ(uint32_t pnum_shards,
+                OSD *o,
+                time_t ti,
+                time_t si,
+                ShardedThreadPool* tp)
+      : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, 
tp),
+        osd(o),
+        num_shards(pnum_shards) {
+      for (uint32_t i = 0; i < num_shards; i++) {
 	char lock_name[32] = {0};
 	snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
 	char order_lock[32] = {0};
-	snprintf(
-	  order_lock, sizeof(order_lock), "%s.%d",
-	  "OSD:ShardedOpWQ:order:", i);
+	snprintf(order_lock, sizeof(order_lock), "%s.%d",
+		 "OSD:ShardedOpWQ:order:", i);
 	ShardData* one_shard = new ShardData(
 	  lock_name, order_lock,
 	  osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
@@ -1801,17 +1765,33 @@ private:
 	shard_list.push_back(one_shard);
       }
     }
-
     ~ShardedOpWQ() {
-      while(!shard_list.empty()) {
+      while (!shard_list.empty()) {
 	delete shard_list.back();
 	shard_list.pop_back();
       }
     }
 
+    /// wake any pg waiters after a PG is created/instantiated
+    void wake_pg_waiters(spg_t pgid);
+
+    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
+    void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+
+    /// clear cached PGRef on pg deletion
+    void clear_pg_pointer(spg_t pgid);
+
+    /// clear pg_slots on shutdown
+    void clear_pg_slots();
+
+    /// try to do some work
     void _process(uint32_t thread_index, heartbeat_handle_d *hb);
-    void _enqueue(pair <PGRef, PGQueueable> item);
-    void _enqueue_front(pair <PGRef, PGQueueable> item);
+
+    /// enqueue a new item
+    void _enqueue(pair <spg_t, PGQueueable> item);
+
+    /// requeue an old item (at the front of the line)
+    void _enqueue_front(pair <spg_t, PGQueueable> item);
 
     void return_waiting_threads() {
       for(uint32_t i = 0; i < num_shards; i++) {
@@ -1839,11 +1819,11 @@ private:
     /// Must be called on ops queued back to front
     struct Pred {
-      PG *pg;
+      spg_t pgid;
       list<OpRequestRef> *out_ops;
       uint64_t reserved_pushes_to_free;
-      Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
-	: pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+      Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
+	: pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
       void accumulate(const PGQueueable &op) {
 	reserved_pushes_to_free += op.get_reserved_pushes();
 	if (out_ops) {
@@ -1852,8 +1832,8 @@ private:
 	  out_ops->push_front(*mop);
 	}
       }
-      bool operator()(const pair<PGRef, PGQueueable> &op) {
-	if (op.first == pg) {
+      bool operator()(const pair<spg_t, PGQueueable> &op) {
+	if (op.first == pgid) {
 	  accumulate(op.second);
 	  return true;
 	} else {
@@ -1865,40 +1845,7 @@ private:
       }
     };
 
-    void dequeue(PG *pg) {
-      FUNCTRACE();
-      return dequeue_and_get_ops(pg, nullptr);
-    }
-
-    void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
-      ShardData* sdata = NULL;
-      assert(pg != NULL);
-      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
-      sdata = shard_list[shard_index];
-      assert(sdata != NULL);
-      sdata->sdata_op_ordering_lock.Lock();
-
-      Pred f(pg, dequeued);
-
-      // items in pqueue are behind items in pg_for_processing
-      sdata->pqueue->remove_by_filter(f);
-
-      map<PG*, list<PGQueueable> >::const_iterator iter =
-	sdata->pg_for_processing.find(pg);
-      if (iter != sdata->pg_for_processing.cend()) {
-	for (auto i = iter->second.crbegin();
-	     i != iter->second.crend();
-	     ++i) {
-	  f.accumulate(*i);
-	}
-	sdata->pg_for_processing.erase(iter);
-      }
-
-      sdata->sdata_op_ordering_lock.Unlock();
-      osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
-    }
-
-    bool is_shard_empty(uint32_t thread_index) {
+    bool is_shard_empty(uint32_t thread_index) override {
       uint32_t shard_index = thread_index % num_shards;
       ShardData* sdata = shard_list[shard_index];
       assert(NULL != sdata);
@@ -1908,7 +1855,7 @@ private:
   } op_shardedwq;
 
-  void enqueue_op(PGRef pg, OpRequestRef& op);
+  void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
   void dequeue_op(
     PGRef pg, OpRequestRef op,
     ThreadPool::TPHandle &handle);
@@ -2042,8 +1989,6 @@ protected:
 
   PGPool _get_pool(int id, 
@@ -2042,8 +1989,6 @@ protected:
   PGPool _get_pool(int id, OSDMapRef createmap);

-  PGRef get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
-                               Session *session);
   PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
   PG   *_lookup_lock_pg(spg_t pgid);
   PG   *_open_lock_pg(OSDMapRef createmap,
@@ -2056,10 +2001,6 @@ protected:
   res_result _try_resurrect_pg(
     OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);

-  /**
-   * After unlocking the pg, the user must ensure that wake_pg_waiters
-   * is called.
-   */
   PG *_create_lock_pg(
     OSDMapRef createmap,
     spg_t pgid,
@@ -2071,7 +2012,6 @@ protected:
     pg_history_t history,
     const pg_interval_map_t& pi,
     ObjectStore::Transaction& t);
-  PG *_lookup_qlock_pg(spg_t pgid);

   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
   void add_newly_split_pg(PG *pg,
@@ -2096,25 +2036,11 @@ protected:
     int lastactingprimary
     ); ///< @return false if there was a map gap between from and now

-  void wake_pg_waiters(spg_t pgid) {
-    assert(osd_lock.is_locked());
-    // Need write lock on pg_map_lock
-    set<Session*> concerned_sessions;
-    get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
-
-    for (set<Session*>::iterator i = concerned_sessions.begin();
-         i != concerned_sessions.end();
-         ++i) {
-      {
-        Mutex::Locker l((*i)->session_dispatch_lock);
-        session_notify_pg_create(*i, osdmap, pgid);
-        dispatch_session_waiting(*i, osdmap);
-      }
-      (*i)->put();
-    }
+  // this must be called with pg->lock held on any pg addition to pg_map
+  void wake_pg_waiters(PGRef pg) {
+    assert(pg->is_locked());
+    op_shardedwq.wake_pg_waiters(pg->info.pgid);
   }
-
-
   epoch_t last_pg_create_epoch;

   void handle_pg_create(OpRequestRef op);
@@ -2473,11 +2399,6 @@ private:
   void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
   void handle_scrub(struct MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
-  void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
-  void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap);
-
-  template <typename T, int MSGTYPE>
-  void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);

   int init_op_flags(OpRequestRef& op);
diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h
index e01127dbb95..64d0580f7b3 100644
--- a/src/osd/OSDMap.h
+++ b/src/osd/OSDMap.h
@@ -784,14 +784,20 @@ public:
   /*
    * check whether an spg_t maps to a particular osd
    */
-  bool is_acting_osd_shard(spg_t pg, int osd) const {
-    vector<int> acting;
-    _pg_to_up_acting_osds(pg.pgid, NULL, NULL, &acting, NULL, false);
-    if (pg.shard == shard_id_t::NO_SHARD)
-      return calc_pg_role(osd, acting, acting.size()) >= 0;
-    if (pg.shard >= (int)acting.size())
-      return false;
-    return acting[pg.shard] == osd;
+  bool is_up_acting_osd_shard(spg_t pg, int osd) const {
+    vector<int> up, acting;
+    _pg_to_up_acting_osds(pg.pgid, &up, NULL, &acting, NULL, false);
+    if (pg.shard == shard_id_t::NO_SHARD) {
+      if (calc_pg_role(osd, acting, acting.size()) >= 0 ||
+          calc_pg_role(osd, up, up.size()) >= 0)
+        return true;
+    } else {
+      if (pg.shard < (int)acting.size() && acting[pg.shard] == osd)
+        return true;
+      if (pg.shard < (int)up.size() && up[pg.shard] == osd)
+        return true;
+    }
+    return false;
   }
diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc
index 3a161b65170..a71e35c90df 100644
--- a/src/osd/OpRequest.cc
+++ b/src/osd/OpRequest.cc
@@ -27,7 +27,6 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) :
   TrackedOp(tracker, req->get_recv_stamp()),
   rmw_flags(0), request(req),
   hit_flag_points(0), latest_flag_point(0),
-  send_map_update(false), sent_epoch(0),
   hitset_inserted(false) {
   if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
     // don't warn as quickly for low priority ops
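The is_up_acting_osd_shard() change in the OSDMap.h hunk above widens the old acting-only test to the up set as well, so queued waiters are kept while this OSD is merely becoming a replica for the shard. A self-contained restatement of the new membership test, with calc_pg_role reduced to simple membership and NO_SHARD modeled as a negative shard index (illustrative, not the real OSDMap types):

    #include <vector>

    // Sketch of the widened check: pass if 'osd' appears in *either* the
    // up or the acting set for the shard.
    struct MiniMap {
      std::vector<int> up, acting;   // stand-ins for the per-pg osd sets
      bool is_up_acting_osd_shard(int shard, int osd) const {
        if (shard < 0) {             // NO_SHARD: replicated, any position
          for (int o : acting) if (o == osd) return true;
          for (int o : up)     if (o == osd) return true;
          return false;
        }
        // erasure-coded pool: the shard index must match exactly
        if (shard < (int)acting.size() && acting[shard] == osd) return true;
        if (shard < (int)up.size() && up[shard] == osd) return true;
        return false;
      }
    };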
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index dbcfe6773a3..9bd41830888 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -107,12 +107,22 @@ public:
   ~OpRequest() {
     request->put();
   }
-  bool send_map_update;
-  epoch_t sent_epoch;
+
+  bool check_send_map = true; ///< true until we check if sender needs a map
+  epoch_t sent_epoch = 0;     ///< client's map epoch
+
   bool hitset_inserted;
   const Message *get_req() const { return request; }
   Message *get_nonconst_req() { return request; }

+  entity_name_t get_source() {
+    if (request) {
+      return request->get_source();
+    } else {
+      return entity_name_t();
+    }
+  }
+
   const char *state_string() const {
     switch(latest_flag_point) {
     case flag_queued_for_pg: return "queued for pg";
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 963e75e4c80..885ce697769 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -221,7 +221,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
     p.get_split_bits(curmap->get_pg_num(_pool.id)),
     _pool.id,
     p.shard),
-  map_lock("PG::map_lock"),
   osdmap_ref(curmap), last_persisted_osdmap_ref(curmap),
   pool(_pool),
   _lock("PG::_lock"),
 #ifdef PG_DEBUG_REFS
@@ -1910,48 +1909,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }

-void PG::take_op_map_waiters()
-{
-  Mutex::Locker l(map_lock);
-  for (list<OpRequestRef>::iterator i = waiting_for_map.begin();
-       i != waiting_for_map.end();
-       ) {
-    if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), *i)) {
-      break;
-    } else {
-      (*i)->mark_queued_for_pg();
-      osd->op_wq.queue(make_pair(PGRef(this), *i));
-      waiting_for_map.erase(i++);
-    }
-  }
-}
-
-void PG::queue_op(OpRequestRef& op)
-{
-  Mutex::Locker l(map_lock);
-  if (!waiting_for_map.empty()) {
-    // preserve ordering
-    waiting_for_map.push_back(op);
-    op->mark_delayed("waiting_for_map not empty");
-    return;
-  }
-  if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), op)) {
-    waiting_for_map.push_back(op);
-    op->mark_delayed("op must wait for map");
-    return;
-  }
-  op->mark_queued_for_pg();
-  osd->op_wq.queue(make_pair(PGRef(this), op));
-  {
-    // after queue() to include any locking costs
-#ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
-#endif
-    tracepoint(pg, queue_op, reqid.name._type,
-               reqid.name._num, reqid.tid, reqid.inc, op->rmw_flags);
-  }
-}
-
 void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
 {
   lock();
@@ -3457,20 +3414,55 @@ void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)

 void PG::requeue_op(OpRequestRef op)
 {
-  osd->op_wq.queue_front(make_pair(PGRef(this), op));
+  auto p = waiting_for_map.find(op->get_source());
+  if (p != waiting_for_map.end()) {
+    dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
+             << dendl;
+    p->second.push_front(op);
+  } else {
+    dout(20) << __func__ << " " << op << dendl;
+    osd->enqueue_front(info.pgid, PGQueueable(op, get_osdmap()->get_epoch()));
+  }
 }

 void PG::requeue_ops(list<OpRequestRef> &ls)
 {
-  dout(15) << " requeue_ops " << ls << dendl;
   for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
        i != ls.rend();
        ++i) {
-    osd->op_wq.queue_front(make_pair(PGRef(this), *i));
+    auto p = waiting_for_map.find((*i)->get_source());
+    if (p != waiting_for_map.end()) {
+      dout(20) << __func__ << " " << *i << " (waiting_for_map " << p->first
+               << ")" << dendl;
+      p->second.push_front(*i);
+    } else {
+      dout(20) << __func__ << " " << *i << dendl;
+      osd->enqueue_front(info.pgid, PGQueueable(*i, get_osdmap()->get_epoch()));
+    }
   }
   ls.clear();
 }

+void PG::requeue_map_waiters()
+{
+  epoch_t epoch = get_osdmap()->get_epoch();
+  auto p = waiting_for_map.begin();
+  while (p != waiting_for_map.end()) {
+    if (op_must_wait_for_map(epoch, p->second.front())) {
+      dout(20) << __func__ << " " << p->first << " front op "
+               << p->second.front() << " must still wait, doing nothing"
+               << dendl;
+      ++p;
+    } else {
+      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
+      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
+        osd->enqueue_front(info.pgid, PGQueueable(*q, epoch));
+      }
+      p = waiting_for_map.erase(p);
+    }
+  }
+}
+
 // ==========================================================================================
 // SCRUB
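requeue_op and requeue_ops above both consult waiting_for_map before touching the shard queue: if the op's client already has ops parked there, the requeued op must go to the front of that client's list rather than back into the queue, or per-client ordering would break. A small stand-in sketch of that decision (Client and Op are illustrative types, not the real entity_name_t/OpRequestRef):

    #include <list>
    #include <string>
    #include <unordered_map>

    using Op = std::string;
    using Client = int;

    // Mirrors the PG::requeue_op logic above: an op being requeued is the
    // oldest outstanding op for its client, so if that client already has
    // ops parked on waiting_for_map it must go to the *front* of that list;
    // otherwise it can go straight back to the shard queue.
    void requeue_op(std::unordered_map<Client, std::list<Op>>& waiting_for_map,
                    std::list<Op>& shard_queue,  // stand-in for enqueue_front
                    Client from, const Op& op) {
      auto p = waiting_for_map.find(from);
      if (p != waiting_for_map.end())
        p->second.push_front(op);    // keep per-client order intact
      else
        shard_queue.push_front(op);  // osd->enqueue_front(...) in the patch
    }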
@@ -5626,7 +5618,7 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
 void PG::take_waiters()
 {
   dout(10) << "take_waiters" << dendl;
-  take_op_map_waiters();
+  requeue_map_waiters();
   for (list<CephPeeringEvtRef>::iterator i = peering_waiters.begin();
        i != peering_waiters.end();
        ++i) osd->queue_for_peering(this);
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 118a2ec7eb4..dbbac2df8ee 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -218,28 +218,17 @@ public:
     return get_pgbackend()->get_is_recoverable_predicate();
   }
 protected:
-  // Ops waiting for map, should be queued at back
-  Mutex map_lock;
-  list<OpRequestRef> waiting_for_map;
   OSDMapRef osdmap_ref;
   OSDMapRef last_persisted_osdmap_ref;
   PGPool pool;

-  void queue_op(OpRequestRef& op);
-  void take_op_map_waiters();
+  void requeue_map_waiters();

   void update_osdmap_ref(OSDMapRef newmap) {
     assert(_lock.is_locked_by_me());
-    Mutex::Locker l(map_lock);
     osdmap_ref = std::move(newmap);
   }

-  OSDMapRef get_osdmap_with_maplock() const {
-    assert(map_lock.is_locked());
-    assert(osdmap_ref);
-    return osdmap_ref;
-  }
-
 public:
   OSDMapRef get_osdmap() const {
     assert(is_locked());
@@ -813,10 +802,65 @@ public:

 protected:

+  /*
+   * blocked request wait hierarchy
+   *
+   * In order to preserve request ordering we need to be careful about the
+   * order in which blocked requests get requeued.  Generally speaking, we
+   * push the requests back up to the op_wq in reverse order (most recent
+   * request first) so that they come back out again in the original order.
+   * However, because there are multiple wait queues, we need to requeue
+   * waitlists in order.  As a rule, we requeue the wait lists that are
+   * checked first.
+   *
+   * Here are the various wait lists, in the order they are used during
+   * request processing, with notes:
+   *
+   *  - waiting_for_map
+   *    - may start or stop blocking at any time (depending on client epoch)
+   *  - waiting_for_peered
+   *    - !is_peered() or flushes_in_progress
+   *    - only starts blocking on interval change; never restarts
+   *  - waiting_for_active
+   *    - !is_active()
+   *    - only starts blocking on interval change; never restarts
+   *  - waiting_for_scrub
+   *    - starts and stops blocking for varying intervals during scrub
+   *  - waiting_for_unreadable_object
+   *    - never restarts once the object is readable (* except for EIO?)
+   *  - waiting_for_degraded_object
+   *    - never restarts once the object is writeable (* except for EIO?)
+   *  - waiting_for_blocked_object
+   *    - starts and stops based on proxied op activity
+   *  - obc rwlocks
+   *    - starts and stops based on read/write activity
+   *
+   * Notes:
+   *
+   *  1. During an interval change, we requeue *everything* in the above
+   *     order.
+   *
+   *  2. When an obc rwlock is released, we check for a scrub block and
+   *     requeue the op there if it applies.  We ignore the unreadable/
+   *     degraded/blocked queues because we assume they cannot apply at
+   *     that time (this is probably mostly true).
+   *
+   *  3. The requeue_ops helper will push ops onto the waiting_for_map list
+   *     if it is non-empty.
+   *
+   * These three behaviors are generally sufficient to maintain ordering,
+   * with the possible exception of cases where we make an object degraded
+   * or unreadable that was previously okay, e.g. when scrub or op
+   * processing encounter an unexpected error.  FIXME.
+   */

   // pg waiters
   unsigned flushes_in_progress;

+  // ops with newer maps than ours (or blocked behind them)
+  // track these by client, since inter-request ordering doesn't otherwise
+  // matter.
+  unordered_map<entity_name_t,list<OpRequestRef>> waiting_for_map;
+
   // ops waiting on peered
   list<OpRequestRef> waiting_for_peered;
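The reverse-order rule in the wait-hierarchy comment above is easy to check with plain containers: pushing a blocked list back most-recent-first onto the front of a queue restores the original dequeue order. A toy demonstration (standard containers only, nothing PG-specific):

    #include <cassert>
    #include <deque>
    #include <list>

    int main() {
      std::list<int> blocked = {1, 2, 3};  // arrival order: 1 is oldest
      std::deque<int> opq;                 // stand-in for the shard queue

      // "push the requests back up ... in reverse order (most recent
      // request first) so that they come back out in the original order"
      for (auto i = blocked.rbegin(); i != blocked.rend(); ++i)
        opq.push_front(*i);

      assert((opq == std::deque<int>{1, 2, 3}));  // oldest pops first again
      return 0;
    }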
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 579fe7c1cc9..96dbaafaf61 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -1552,7 +1552,24 @@ void PrimaryLogPG::do_request(
   OpRequestRef& op,
   ThreadPool::TPHandle &handle)
 {
-  assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
+  // make sure we have a new enough map
+  auto p = waiting_for_map.find(op->get_source());
+  if (p != waiting_for_map.end()) {
+    // preserve ordering
+    dout(20) << __func__ << " waiting_for_map "
+             << p->first << " not empty, queueing" << dendl;
+    p->second.push_back(op);
+    op->mark_delayed("waiting_for_map not empty");
+    return;
+  }
+  if (op_must_wait_for_map(get_osdmap()->get_epoch(), op)) {
+    dout(20) << __func__ << " queue on waiting_for_map "
+             << op->get_source() << dendl;
+    waiting_for_map[op->get_source()].push_back(op);
+    op->mark_delayed("op must wait for map");
+    return;
+  }
+
   if (can_discard_request(op)) {
     return;
   }
@@ -8272,10 +8289,9 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   if (scrubber.active_rep_scrub) {
     if (last_update_applied ==
         static_cast<const MOSDRepScrub*>(
           scrubber.active_rep_scrub->get_req())->scrub_to) {
-      osd->op_wq.queue(
-        make_pair(
-          this,
-          scrubber.active_rep_scrub));
+      osd->enqueue_back(
+        info.pgid,
+        PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
       scrubber.active_rep_scrub = OpRequestRef();
     }
   }
@@ -9503,10 +9519,9 @@ void PrimaryLogPG::_applied_recovered_object_replica()
   if (!deleting && active_pushes == 0 &&
       scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
         scrubber.active_rep_scrub->get_req())->chunky) {
-    osd->op_wq.queue(
-      make_pair(
-        this,
-        scrubber.active_rep_scrub));
+    osd->enqueue_back(
+      info.pgid,
+      PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
     scrubber.active_rep_scrub = OpRequestRef();
   }

@@ -9898,7 +9913,6 @@ void PrimaryLogPG::on_shutdown()

   // remove from queues
   osd->pg_stat_queue_dequeue(this);
-  osd->dequeue_pg(this, 0);
   osd->peering_wq.dequeue(this);

   // handles queue races
diff --git a/src/osd/Session.h b/src/osd/Session.h
index 63316773711..5abcb0da3d1 100644
--- a/src/osd/Session.h
+++ b/src/osd/Session.h
@@ -130,9 +130,6 @@ struct Session : public RefCountedObject {
   Mutex session_dispatch_lock;
   boost::intrusive::list<OpRequest> waiting_on_map;

-  OSDMapRef osdmap;  /// Map as of which waiting_for_pg is current
-  map<spg_t, list<OpRequestRef>> waiting_for_pg;
-
   Spinlock sent_epoch_lock;
   epoch_t last_sent_epoch;
   Spinlock received_map_lock;
@@ -153,11 +150,6 @@ struct Session : public RefCountedObject {
     last_sent_epoch(0), received_map_epoch(0),
     backoff_lock("Session::backoff_lock")
     {}
-  void maybe_reset_osdmap() {
-    if (waiting_for_pg.empty()) {
-      osdmap.reset();
-    }
-  }

   void ack_backoff(
     CephContext *cct,
-- 
2.47.3
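For reference, the shape of the do_request() gate added in the PrimaryLogPG.cc hunk above: park behind any already-waiting ops from the same client first, then check the epoch. A condensed, self-contained restatement; Client, Op, and min_epoch are illustrative stand-ins for entity_name_t, OpRequestRef, and the op_must_wait_for_map() test, not the real API:

    #include <list>
    #include <unordered_map>

    struct Op { int client; unsigned min_epoch; };

    // Returns true if the op may proceed; false if it was parked on
    // waiting_for_map (either behind earlier ops from the same client, or
    // because the sender's map is newer than ours).
    bool gate_on_map(std::unordered_map<int, std::list<Op>>& waiting_for_map,
                     unsigned my_epoch, const Op& op) {
      auto p = waiting_for_map.find(op.client);
      if (p != waiting_for_map.end()) {  // client already has parked ops
        p->second.push_back(op);         // preserve per-client ordering
        return false;                    // blocked
      }
      if (op.min_epoch > my_epoch) {     // sender's map is newer than ours
        waiting_for_map[op.client].push_back(op);
        return false;                    // blocked until we catch up
      }
      return true;                       // safe to process now
    }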