]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD: enable ms_fast_dispatch
authorGreg Farnum <greg@inktank.com>
Wed, 26 Mar 2014 22:04:39 +0000 (15:04 -0700)
committerGreg Farnum <greg@inktank.com>
Mon, 5 May 2014 22:29:20 +0000 (15:29 -0700)
We've been setting it up, now this patch actually adds a fast path for osd ops
which bypasses the osd_lock and should not block on any longly held locks. In
addition to the actual ms_fast_dispatch; we take advantage of the fast_notify
functions in order to create a Session for every peer, since that is now the
data structure around which we handle incoming Messages and waitlisting; and
fast_preprocess in order to track when a peer has already sent us a new map
(otherwise, if we see an op with a too-new epoch, we have to request it from
the monitor).

Signed-off-by: Samuel Just <sam.just@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h

index 86e928d5dbe8f90341798e6be5b094e96ed8b62f..fc910ed98e00e7b2e6475e67999d7eb8409ea26e 100644 (file)
@@ -3653,6 +3653,41 @@ void OSD::ms_handle_connect(Connection *con)
   }
 }
 
+void OSD::ms_handle_fast_connect(Connection *con)
+{
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+    Session *s = static_cast<Session*>(con->get_priv());
+    if (!s) {
+      s = new Session;
+      con->set_priv(s->get());
+      s->con = con;
+      dout(10) << " new session (outgoing)" << s << " con=" << s->con
+          << " addr=" << s->con->get_peer_addr() << dendl;
+      // we don't connect to clients
+      assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+      s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+    }
+  }
+}
+
+void OSD::ms_handle_fast_accept(Connection *con)
+{
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+    Session *s = static_cast<Session*>(con->get_priv());
+    if (!s) {
+      s = new Session();
+      con->set_priv(s->get());
+      s->con = con;
+      dout(10) << "new session (incoming)" << s << " con=" << con
+          << " addr=" << con->get_peer_addr()
+          << " must have raced with connect" << dendl;
+      assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+      s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+    }
+    s->put();
+  }
+}
+
 bool OSD::ms_handle_reset(Connection *con)
 {
   OSD::Session *session = (OSD::Session *)con->get_priv();
@@ -4866,6 +4901,48 @@ bool OSD::ms_dispatch(Message *m)
   return true;
 }
 
+void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+{
+  for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
+       i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
+       session->waiting_on_map.erase(i++));
+
+  if (session->waiting_on_map.empty()) {
+    clear_session_waiting_on_map(session);
+  } else {
+    register_session_waiting_on_map(session);
+  }
+}
+
+void OSD::ms_fast_dispatch(Message *m)
+{
+  OpRequestRef op = op_tracker.create_request<OpRequest>(m);
+  OSDMapRef nextmap = service.get_nextmap_reserved();
+  Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+  assert(session);
+  {
+    Mutex::Locker l(session->session_dispatch_lock);
+    session->waiting_on_map.push_back(op);
+    dispatch_session_waiting(session, nextmap);
+  }
+  session->put();
+  service.release_map(nextmap);
+}
+
+void OSD::ms_fast_preprocess(Message *m)
+{
+  if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_OSD) {
+    if (m->get_type() == CEPH_MSG_OSD_MAP) {
+      MOSDMap *mm = static_cast<MOSDMap*>(m);
+      Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+      s->received_map_lock.Lock();
+      s->received_map_epoch = mm->get_last();
+      s->received_map_lock.Unlock();
+      s->put();
+    }
+  }
+}
+
 bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
 {
   dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
@@ -5052,56 +5129,72 @@ void OSD::dispatch_op(OpRequestRef op)
   case MSG_OSD_RECOVERY_RESERVE:
     handle_pg_recovery_reserve(op);
     break;
-  default:
-    epoch_t msg_epoch(op_required_epoch(op));
-    if (msg_epoch > osdmap->get_epoch())
-      return false;
+  }
+}
 
-    switch(op->get_req()->get_type()) {
-    // client ops
-    case CEPH_MSG_OSD_OP:
-      handle_op(op, osdmap);
-      break;
+bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap) {
+  if (is_stopping()) {
+    // we're shutting down, so drop the op
+    return true;
+  }
 
-      // for replication etc.
-    case MSG_OSD_SUBOP:
-      handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
-      break;
-    case MSG_OSD_SUBOPREPLY:
-      handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
-      break;
-    case MSG_OSD_PG_PUSH:
-      handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
-      break;
-    case MSG_OSD_PG_PULL:
-      handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
-      break;
-    case MSG_OSD_PG_PUSH_REPLY:
-      handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
-      break;
-    case MSG_OSD_PG_SCAN:
-      handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
-      break;
-    case MSG_OSD_PG_BACKFILL:
-      handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
-      break;
-    case MSG_OSD_EC_WRITE:
-      handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
-      break;
-    case MSG_OSD_EC_WRITE_REPLY:
-      handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
-      break;
-    case MSG_OSD_EC_READ:
-      handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
-      break;
-    case MSG_OSD_EC_READ_REPLY:
-      handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
-      break;
-    default:
-      assert(0);
+  epoch_t msg_epoch(op_required_epoch(op));
+  if (msg_epoch > osdmap->get_epoch()) {
+    Session *s = static_cast<Session*>(op->get_req()->
+                                      get_connection()->get_priv());
+    s->received_map_lock.Lock();
+    epoch_t received_epoch = s->received_map_epoch;
+    s->received_map_lock.Unlock();
+    if (received_epoch < msg_epoch) {
+      osdmap_subscribe(msg_epoch, false);
     }
+    s->put();
+    return false;
+  }
+
+  switch(op->get_req()->get_type()) {
+  // client ops
+  case CEPH_MSG_OSD_OP:
+    handle_op(op, osdmap);
+    break;
+    // for replication etc.
+  case MSG_OSD_SUBOP:
+    handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
     break;
+  case MSG_OSD_SUBOPREPLY:
+    handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
+    break;
+  case MSG_OSD_PG_PUSH:
+    handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
+    break;
+  case MSG_OSD_PG_PULL:
+    handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
+    break;
+  case MSG_OSD_PG_PUSH_REPLY:
+    handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
+    break;
+  case MSG_OSD_PG_SCAN:
+    handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
+    break;
+  case MSG_OSD_PG_BACKFILL:
+    handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
+    break;
+  case MSG_OSD_EC_WRITE:
+    handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
+    break;
+  case MSG_OSD_EC_WRITE_REPLY:
+    handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
+    break;
+  case MSG_OSD_EC_READ:
+    handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
+    break;
+  case MSG_OSD_EC_READ_REPLY:
+    handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+    break;
+  default:
+    assert(0);
   }
+  return true;
 }
 
 void OSD::_dispatch(Message *m)
@@ -6040,6 +6133,17 @@ void OSD::consume_map()
   service.await_reserved_maps();
   service.publish_map(osdmap);
 
+  set<Session*> sessions_to_check;
+  get_sessions_waiting_for_map(&sessions_to_check);
+  for (set<Session*>::iterator i = sessions_to_check.begin();
+       i != sessions_to_check.end();
+       sessions_to_check.erase(i++)) {
+    (*i)->session_dispatch_lock.Lock();
+    dispatch_session_waiting(*i, osdmap);
+    (*i)->session_dispatch_lock.Unlock();
+    (*i)->put();
+  }
+
   // scan pg's
   {
     RWLock::RLocker l(pg_map_lock);
@@ -6311,8 +6415,15 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
              << " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t())
              << dendl;
       ConnectionRef con = m->get_connection();
-      con->set_priv(NULL);   // break ref <-> session cycle, if any
       cluster_messenger->mark_down(con.get());
+      Session *s = static_cast<Session*>(con->get_priv());
+      if (s) {
+       s->session_dispatch_lock.Lock();
+       clear_session_waiting_on_map(s);
+       con->set_priv(NULL);   // break ref <-> session cycle, if any
+       s->session_dispatch_lock.Unlock();
+       s->put();
+      }
       return false;
     }
   }
@@ -7628,6 +7739,7 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
       return;
     }
   }
+
   // calc actual pgid
   pg_t _pgid = m->get_pg();
   int64_t pool = _pgid.pool();
index 659053251367ef77d71e91900ea685bcb48c2544..a7059fbbfed5ac59dd301dd5b128cbd56d299c39 100644 (file)
@@ -866,6 +866,7 @@ protected:
   void tick();
   void _dispatch(Message *m);
   void dispatch_op(OpRequestRef op);
+  bool dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap);
 
   void check_osdmap_features(ObjectStore *store);
 
@@ -1021,14 +1022,17 @@ public:
 
     Mutex sent_epoch_lock;
     epoch_t last_sent_epoch;
+    Mutex received_map_lock;
+    epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
 
     Session() :
       auid(-1), con(0),
       session_dispatch_lock("Session::session_dispatch_lock"),
-      sent_epoch_lock("Session::sent_epoch_lock"),
-      last_sent_epoch(0)
+      sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
+      received_map_lock("Session::received_map_lock"), received_map_epoch(0)
     {}
   };
+  void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
   Mutex session_waiting_for_map_lock;
   set<Session*> session_waiting_for_map;
   /// Caller assumes refs for included Sessions
@@ -1980,12 +1984,36 @@ protected:
   }
 
  private:
+  bool ms_can_fast_dispatch_any() const { return true; }
+  bool ms_can_fast_dispatch(Message *m) const {
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_OP:
+    case MSG_OSD_SUBOP:
+    case MSG_OSD_SUBOPREPLY:
+    case MSG_OSD_PG_PUSH:
+    case MSG_OSD_PG_PULL:
+    case MSG_OSD_PG_PUSH_REPLY:
+    case MSG_OSD_PG_SCAN:
+    case MSG_OSD_PG_BACKFILL:
+    case MSG_OSD_EC_WRITE:
+    case MSG_OSD_EC_WRITE_REPLY:
+    case MSG_OSD_EC_READ:
+    case MSG_OSD_EC_READ_REPLY:
+      return true;
+    default:
+      return false;
+    }
+  }
+  void ms_fast_dispatch(Message *m);
+  void ms_fast_preprocess(Message *m);
   bool ms_dispatch(Message *m);
   bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
   bool ms_verify_authorizer(Connection *con, int peer_type,
                            int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
                            bool& isvalid, CryptoKey& session_key);
   void ms_handle_connect(Connection *con);
+  void ms_handle_fast_connect(Connection *con);
+  void ms_handle_fast_accept(Connection *con);
   bool ms_handle_reset(Connection *con);
   void ms_handle_remote_reset(Connection *con) {}