From: Greg Farnum
Date: Wed, 26 Mar 2014 22:04:39 +0000 (-0700)
Subject: OSD: enable ms_fast_dispatch
X-Git-Tag: v0.81~57^2~8
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fd2b57eac0acf298b1d65035f6b2f5ef23163aa7;p=ceph.git

OSD: enable ms_fast_dispatch

We've been setting it up; now this patch actually adds a fast path for
OSD ops which bypasses the osd_lock and should not block on any
long-held locks.

In addition to the actual ms_fast_dispatch, we take advantage of the
fast_notify functions to create a Session for every peer, since that is
now the data structure around which we handle incoming Messages and
waitlisting, and of fast_preprocess to track when a peer has already
sent us a new map (otherwise, if we see an op with a too-new epoch, we
have to request it from the monitor).

Signed-off-by: Samuel Just
Signed-off-by: Greg Farnum
---
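A minimal sketch of the queueing scheme this patch introduces, for
review purposes only; this is not code from the patch. Op, its
required_epoch field, and the bare unsigned map epochs are simplified
stand-ins for the real OpRequestRef/OSDMapRef machinery:

    #include <cassert>
    #include <list>
    #include <mutex>

    struct Op { unsigned required_epoch; };     // stand-in for OpRequestRef

    struct Session {                            // stand-in for OSD::Session
      std::mutex dispatch_lock;                 // cf. session_dispatch_lock
      std::list<Op> waiting_on_map;             // ops queued for a newer map
    };

    // cf. dispatch_op_fast(): false means "op needs a map we don't have yet".
    static bool dispatch_op_fast(const Op& op, unsigned map_epoch) {
      return op.required_epoch <= map_epoch;
    }

    // cf. dispatch_session_waiting(): drain the queue in order, stopping at
    // the first op that still needs a newer map, so per-session ordering of
    // incoming Messages is preserved.
    static void dispatch_session_waiting(Session& s, unsigned map_epoch) {
      std::lock_guard<std::mutex> l(s.dispatch_lock);
      for (std::list<Op>::iterator i = s.waiting_on_map.begin();
           i != s.waiting_on_map.end() && dispatch_op_fast(*i, map_epoch);
           s.waiting_on_map.erase(i++))
        ;  // the real code hands each op to handle_op()/handle_replica_op()
    }

    int main() {
      Session s;
      s.waiting_on_map = {{4}, {5}, {7}};       // epochs the queued ops require
      dispatch_session_waiting(s, 5);           // this OSD is at epoch 5
      assert(s.waiting_on_map.size() == 1);     // the epoch-7 op stays queued
      return 0;
    }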
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 86e928d5dbe..fc910ed98e0 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -3653,6 +3653,41 @@ void OSD::ms_handle_connect(Connection *con)
   }
 }
 
+void OSD::ms_handle_fast_connect(Connection *con)
+{
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+    Session *s = static_cast<Session*>(con->get_priv());
+    if (!s) {
+      s = new Session;
+      con->set_priv(s->get());
+      s->con = con;
+      dout(10) << " new session (outgoing)" << s << " con=" << s->con
+               << " addr=" << s->con->get_peer_addr() << dendl;
+      // we don't connect to clients
+      assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+      s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+    }
+  }
+}
+
+void OSD::ms_handle_fast_accept(Connection *con)
+{
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+    Session *s = static_cast<Session*>(con->get_priv());
+    if (!s) {
+      s = new Session();
+      con->set_priv(s->get());
+      s->con = con;
+      dout(10) << "new session (incoming)" << s << " con=" << con
+               << " addr=" << con->get_peer_addr()
+               << " must have raced with connect" << dendl;
+      assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+      s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+    }
+    s->put();
+  }
+}
+
 bool OSD::ms_handle_reset(Connection *con)
 {
   OSD::Session *session = (OSD::Session *)con->get_priv();
@@ -4866,6 +4901,48 @@ bool OSD::ms_dispatch(Message *m)
   return true;
 }
 
+void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+{
+  for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
+       i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
+       session->waiting_on_map.erase(i++));
+
+  if (session->waiting_on_map.empty()) {
+    clear_session_waiting_on_map(session);
+  } else {
+    register_session_waiting_on_map(session);
+  }
+}
+
+void OSD::ms_fast_dispatch(Message *m)
+{
+  OpRequestRef op = op_tracker.create_request(m);
+  OSDMapRef nextmap = service.get_nextmap_reserved();
+  Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+  assert(session);
+  {
+    Mutex::Locker l(session->session_dispatch_lock);
+    session->waiting_on_map.push_back(op);
+    dispatch_session_waiting(session, nextmap);
+  }
+  session->put();
+  service.release_map(nextmap);
+}
+
+void OSD::ms_fast_preprocess(Message *m)
+{
+  if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
+    if (m->get_type() == CEPH_MSG_OSD_MAP) {
+      MOSDMap *mm = static_cast<MOSDMap*>(m);
+      Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+      s->received_map_lock.Lock();
+      s->received_map_epoch = mm->get_last();
+      s->received_map_lock.Unlock();
+      s->put();
+    }
+  }
+}
+
 bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
                             bool force_new)
 {
   dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
@@ -5052,56 +5129,72 @@ void OSD::dispatch_op(OpRequestRef op)
   case MSG_OSD_RECOVERY_RESERVE:
     handle_pg_recovery_reserve(op);
     break;
-  default:
-    epoch_t msg_epoch(op_required_epoch(op));
-    if (msg_epoch > osdmap->get_epoch())
-      return false;
+  }
+}
 
-    switch(op->get_req()->get_type()) {
-      // client ops
-    case CEPH_MSG_OSD_OP:
-      handle_op(op, osdmap);
-      break;
+bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap) {
+  if (is_stopping()) {
+    // we're shutting down, so drop the op
+    return true;
+  }
 
-      // for replication etc.
-    case MSG_OSD_SUBOP:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_SUBOPREPLY:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_PG_PUSH:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_PG_PULL:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_PG_PUSH_REPLY:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_PG_SCAN:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_PG_BACKFILL:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_EC_WRITE:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_EC_WRITE_REPLY:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_EC_READ:
-      handle_replica_op(op, osdmap);
-      break;
-    case MSG_OSD_EC_READ_REPLY:
-      handle_replica_op(op, osdmap);
-      break;
-    default:
-      assert(0);
+  epoch_t msg_epoch(op_required_epoch(op));
+  if (msg_epoch > osdmap->get_epoch()) {
+    Session *s = static_cast<Session*>(op->get_req()->
+                                       get_connection()->get_priv());
+    s->received_map_lock.Lock();
+    epoch_t received_epoch = s->received_map_epoch;
+    s->received_map_lock.Unlock();
+    if (received_epoch < msg_epoch) {
+      osdmap_subscribe(msg_epoch, false);
+    }
+    s->put();
+    return false;
+  }
+
+  switch(op->get_req()->get_type()) {
+    // client ops
+  case CEPH_MSG_OSD_OP:
+    handle_op(op, osdmap);
+    break;
+
+    // for replication etc.
+  case MSG_OSD_SUBOP:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_SUBOPREPLY:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_PG_PUSH:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_PG_PULL:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_PG_PUSH_REPLY:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_PG_SCAN:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_PG_BACKFILL:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_EC_WRITE:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_EC_WRITE_REPLY:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_EC_READ:
+    handle_replica_op(op, osdmap);
+    break;
+  case MSG_OSD_EC_READ_REPLY:
+    handle_replica_op(op, osdmap);
+    break;
+  default:
+    assert(0);
   }
+  return true;
 }
 
 void OSD::_dispatch(Message *m)
@@ -6040,6 +6133,17 @@ void OSD::consume_map()
 
   service.await_reserved_maps();
   service.publish_map(osdmap);
 
+  set<Session*> sessions_to_check;
+  get_sessions_waiting_for_map(&sessions_to_check);
+  for (set<Session*>::iterator i = sessions_to_check.begin();
+       i != sessions_to_check.end();
+       sessions_to_check.erase(i++)) {
+    (*i)->session_dispatch_lock.Lock();
+    dispatch_session_waiting(*i, osdmap);
+    (*i)->session_dispatch_lock.Unlock();
+    (*i)->put();
+  }
+
   // scan pg's
   {
     RWLock::RLocker l(pg_map_lock);
@@ -6311,8 +6415,15 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
               << " expected " << (osdmap->have_inst(from) ?
                                   osdmap->get_cluster_addr(from) : entity_addr_t()) << dendl;
       ConnectionRef con = m->get_connection();
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
       cluster_messenger->mark_down(con.get());
+      Session *s = static_cast<Session*>(con->get_priv());
+      if (s) {
+        s->session_dispatch_lock.Lock();
+        clear_session_waiting_on_map(s);
+        con->set_priv(NULL);   // break ref <-> session cycle, if any
+        s->session_dispatch_lock.Unlock();
+        s->put();
+      }
       return false;
     }
   }
@@ -7628,6 +7739,7 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
       return;
     }
   }
 
+  // calc actual pgid
   pg_t _pgid = m->get_pg();
   int64_t pool = _pgid.pool();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 65905325136..a7059fbbfed 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -866,6 +866,7 @@ protected:
   void tick();
   void _dispatch(Message *m);
   void dispatch_op(OpRequestRef op);
+  bool dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap);
 
   void check_osdmap_features(ObjectStore *store);
 
@@ -1021,14 +1022,17 @@ public:
     Mutex sent_epoch_lock;
     epoch_t last_sent_epoch;
+    Mutex received_map_lock;
+    epoch_t received_map_epoch;  // largest epoch seen in MOSDMap from here
 
     Session() :
       auid(-1), con(0),
       session_dispatch_lock("Session::session_dispatch_lock"),
-      sent_epoch_lock("Session::sent_epoch_lock"),
-      last_sent_epoch(0)
+      sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
+      received_map_lock("Session::received_map_lock"), received_map_epoch(0)
       {}
   };
+  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
   Mutex session_waiting_for_map_lock;
   set<Session*> session_waiting_for_map;
   /// Caller assumes refs for included Sessions
@@ -1980,12 +1984,36 @@ protected:
   }
 private:
+  bool ms_can_fast_dispatch_any() const { return true; }
+  bool ms_can_fast_dispatch(Message *m) const {
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_OP:
+    case MSG_OSD_SUBOP:
+    case MSG_OSD_SUBOPREPLY:
+    case MSG_OSD_PG_PUSH:
+    case MSG_OSD_PG_PULL:
+    case MSG_OSD_PG_PUSH_REPLY:
+    case MSG_OSD_PG_SCAN:
+    case MSG_OSD_PG_BACKFILL:
+    case MSG_OSD_EC_WRITE:
+    case MSG_OSD_EC_WRITE_REPLY:
+    case MSG_OSD_EC_READ:
+    case MSG_OSD_EC_READ_REPLY:
+      return true;
+    default:
+      return false;
+    }
+  }
+  void ms_fast_dispatch(Message *m);
+  void ms_fast_preprocess(Message *m);
   bool ms_dispatch(Message *m);
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
                          bool force_new);
   bool ms_verify_authorizer(Connection *con, int peer_type,
                             int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
                             bool& isvalid, CryptoKey& session_key);
   void ms_handle_connect(Connection *con);
+  void ms_handle_fast_connect(Connection *con);
+  void ms_handle_fast_accept(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}
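One more illustrative sketch, again not code from the patch (Session
here is a simplified stand-in): the point of ms_fast_preprocess
recording received_map_epoch is that the too-new-epoch branch of
dispatch_op_fast can tell whether the needed map is already in flight
from the peer, and only fall back to a monitor subscription when it is
not:

    #include <mutex>

    struct Session {                      // stand-in for OSD::Session
      std::mutex received_map_lock;
      unsigned received_map_epoch = 0;    // largest epoch seen in an MOSDMap
    };                                    // from this peer (ms_fast_preprocess)

    // cf. the msg_epoch > osdmap->get_epoch() branch of dispatch_op_fast():
    // subscribe to the monitor only if the peer has not already sent us a
    // map covering msg_epoch; otherwise that map will arrive on its own
    // through the normal dispatch path.
    bool needs_monitor_subscribe(Session& s, unsigned msg_epoch) {
      std::lock_guard<std::mutex> l(s.received_map_lock);
      return s.received_map_epoch < msg_epoch;
    }

    int main() {
      Session s;
      s.received_map_epoch = 10;          // peer already sent a map at epoch 10
      return needs_monitor_subscribe(s, 8) ? 1 : 0;  // 8 <= 10: no subscribe
    }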