]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: check protocol version during connection
author Sage Weil <sage@newdream.net>
Thu, 10 Sep 2009 18:15:36 +0000 (11:15 -0700)
committer Sage Weil <sage@newdream.net>
Thu, 10 Sep 2009 18:15:36 +0000 (11:15 -0700)
src/include/msgr.h
src/kernel/messenger.c
src/kernel/messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index e80ebfd1c7a34009a722901356445ed23aa928c9..b17f0a9306536ee9f230b6c770a8be09bf61cb08 100644 (file)
@@ -21,7 +21,7 @@
  * whenever the wire protocol changes.  try to keep this string length
  * constant.
  */
-#define CEPH_BANNER "ceph v017"
+#define CEPH_BANNER "ceph v018"
 #define CEPH_BANNER_MAX_LEN 30
 
 
@@ -93,6 +93,7 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_MSG          10  /* message */
 #define CEPH_MSGR_TAG_ACK          11  /* message ack */
 #define CEPH_MSGR_TAG_KEEPALIVE    12  /* just a keepalive byte! */
+#define CEPH_MSGR_TAG_BADPROTOVER  13  /* bad protocol version */
 
 
 /*
@@ -102,6 +103,7 @@ struct ceph_msg_connect {
        __le32 host_type;  /* CEPH_ENTITY_TYPE_* */
        __le32 global_seq;
        __le32 connect_seq;
+       __le32 protocol_version;
        __u8  flags;
 } __attribute__ ((packed));
 
@@ -109,6 +111,7 @@ struct ceph_msg_connect_reply {
        __u8 tag;
        __le32 global_seq;
        __le32 connect_seq;
+       __le32 protocol_version;
        __u8 flags;
 } __attribute__ ((packed));
 
index 99c691aad278300e801be88291cc9ca29a4bd715..1cda0d9282630180c53a210b71a28a250f925e27 100644 (file)
@@ -501,12 +501,28 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
 {
        int len = strlen(CEPH_BANNER);
        unsigned global_seq = get_global_seq(con->msgr, 0);
+       int proto;
 
-       dout("prepare_write_connect %p connect_seq=%d global_seq=%d\n", con,
-            con->connect_seq, global_seq);
+       switch (con->peer_name.type) {
+       case CEPH_ENTITY_TYPE_MON:
+               proto = CEPH_MONC_PROTOCOL;
+               break;
+       case CEPH_ENTITY_TYPE_OSD:
+               proto = CEPH_OSDC_PROTOCOL;
+               break;
+       case CEPH_ENTITY_TYPE_MDS:
+               proto = CEPH_MDSC_PROTOCOL;
+               break;
+       default:
+               BUG();
+       }
+
+       dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
+            con->connect_seq, global_seq, proto);
        con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(global_seq);
+       con->out_connect.protocol_version = cpu_to_le32(proto);
        con->out_connect.flags = 0;
        if (test_bit(LOSSYTX, &con->state))
                con->out_connect.flags = CEPH_MSG_CONNECT_LOSSY;
@@ -831,6 +847,24 @@ static int process_connect(struct ceph_connection *con)
        }
 
        switch (con->in_reply.tag) {
+       case CEPH_MSGR_TAG_BADPROTOVER:
+               dout("process_connect got BADPROTOVER my %d != their %d\n",
+                    le32_to_cpu(con->out_connect.protocol_version),
+                    le32_to_cpu(con->in_reply.protocol_version));
+               pr_err("ceph %s%d %u.%u.%u.%u:%u protocol version mismatch,"
+                      " my %d != server's %d\n",
+                      ENTITY_NAME(con->peer_name),
+                      IPQUADPORT(con->peer_addr.ipaddr),
+                      le32_to_cpu(con->out_connect.protocol_version),
+                      le32_to_cpu(con->in_reply.protocol_version));
+               con->error_msg = "protocol version mismatch";
+               if (con->ops->bad_proto)
+                       con->ops->bad_proto(con);
+               reset_connection(con);
+               set_bit(CLOSED, &con->state);  /* in case there's queued work */
+               return -1;
+
+
        case CEPH_MSGR_TAG_RESETSESSION:
                /*
                 * If we connected with a large connect_seq but the peer
index 3550dbea2434464fa1750a9363a9e8f342f153e0..9d94edd304ff1a373ef5f096d41054a6814eba1f 100644 (file)
@@ -34,6 +34,9 @@ struct ceph_connection_operations {
        /* handle an incoming message. */
        void (*dispatch) (struct ceph_connection *con, struct ceph_msg *m);
 
+       /* protocol version mismatch */
+       void (*bad_proto) (struct ceph_connection *con);
+
        /* there was some error on the socket (disconnect, whatever) */
        void (*fault) (struct ceph_connection *con);
 
index 644b2bd41d2a386d1d58b74f6fdca6ec243aff5d..0cf00aae24b9492618b75502a8bfe632c1d07575 100644 (file)
@@ -369,7 +369,7 @@ void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
   rank->lock.Lock();
   {
     if (rank->rank_pipe.count(inst.addr) == 0)
-      rank->connect_rank(inst.addr, rank->get_policy(inst.name.type()));
+      rank->connect_rank(inst.addr, inst.name.type());
   }
   rank->lock.Unlock();
 }
@@ -475,6 +475,36 @@ ostream& SimpleMessenger::Pipe::_pipe_prefix() {
                << ").";
 }
 
+static int get_proto_version(int my_type, int peer_type, bool connect)  // connect: true from Pipe::connect(), false from Pipe::accept()
+{
+  // choose the protocol version this end puts in its connect request (connect=true) or connect reply (connect=false)
+  if (peer_type == my_type) {
+    // internal: both ends are the same entity type, so use that type's own *_PROTOCOL constant
+    switch (my_type) {
+    case CEPH_ENTITY_TYPE_OSD: return CEPH_OSD_PROTOCOL;
+    case CEPH_ENTITY_TYPE_MDS: return CEPH_MDS_PROTOCOL;
+    case CEPH_ENTITY_TYPE_MON: return CEPH_MON_PROTOCOL;
+    }
+  } else {
+    // public: entity types differ, so use the *C_PROTOCOL constant (presumably the client-facing protocol; confirm against the header that defines them)
+    if (connect) {  // initiating side: keyed on the peer's type
+      switch (peer_type) {
+      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+      }
+    } else {  // accepting side: keyed on our own type
+      switch (my_type) {
+      case CEPH_ENTITY_TYPE_OSD: return CEPH_OSDC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MDS: return CEPH_MDSC_PROTOCOL;
+      case CEPH_ENTITY_TYPE_MON: return CEPH_MONC_PROTOCOL;
+      }
+    }
+  }
+  return 0;  // no switch case matched the given entity types
+}
+
+
 int SimpleMessenger::Pipe::accept()
 {
   dout(10) << "accept" << dendl;
@@ -566,13 +596,24 @@ int SimpleMessenger::Pipe::accept()
     rank->lock.Lock();
 
     // note peer's type, flags
+    peer_type = connect.host_type;
     policy = rank->get_policy(connect.host_type);
     dout(10) << "accept host_type " << connect.host_type
             << ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
     lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
 
     memset(&reply, 0, sizeof(reply));
+    reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
 
+    // mismatch?
+    dout(10) << "accept my proto " << reply.protocol_version
+            << ", their proto " << connect.protocol_version << dendl;
+    if (connect.protocol_version != reply.protocol_version) {
+      reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
+      rank->lock.Unlock();
+      goto reply;
+    }
+    
     // existing?
     if (rank->rank_pipe.count(peer_addr)) {
       existing = rank->rank_pipe[peer_addr];
@@ -889,6 +930,7 @@ int SimpleMessenger::Pipe::connect()
     connect.host_type = rank->my_type;
     connect.global_seq = gseq;
     connect.connect_seq = cseq;
+    connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
     connect.flags = 0;
     if (policy.lossy_tx)
       connect.flags |= CEPH_MSG_CONNECT_LOSSY;
@@ -899,7 +941,8 @@ int SimpleMessenger::Pipe::connect()
     msg.msg_iovlen = 1;
     msglen = msgvec[0].iov_len;
 
-    dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
+    dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq
+            << " proto=" << connect.protocol_version << dendl;
     if (do_sendmsg(sd, &msg, msglen)) {
       dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
       goto fail;
@@ -914,6 +957,7 @@ int SimpleMessenger::Pipe::connect()
     dout(20) << "connect got reply tag " << (int)reply.tag
             << " connect_seq " << reply.connect_seq
             << " global_seq " << reply.global_seq
+            << " proto " << reply.protocol_version
             << " flags " << (int)reply.flags
             << dendl;
 
@@ -923,6 +967,12 @@ int SimpleMessenger::Pipe::connect()
       goto stop_locked;
     }
 
+    if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) {
+      dout(0) << "connect protocol version mismatch, my " << connect.protocol_version
+             << " != " << reply.protocol_version << dendl;
+      goto fail_locked;
+    }
+
     if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
       dout(0) << "connect got RESETSESSION" << dendl;
       was_session_reset();
@@ -1993,7 +2043,7 @@ int SimpleMessenger::start(bool nodaemon)
 /* connect_rank
  * NOTE: assumes rank.lock held.
  */
-SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p)
+SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type)
 {
   assert(lock.is_locked());
   assert(addr != rank_addr);
@@ -2002,7 +2052,8 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr,
   
   // create pipe
   Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
-  pipe->policy = p;
+  pipe->peer_type = type;
+  pipe->policy = get_policy(type);
   pipe->peer_addr = addr;
   pipe->start_writer();
   pipe->register_pipe();
@@ -2133,7 +2184,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool
        } else {
          dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
          // not connected.
-         pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type()));
+         pipe = connect_rank(dest_proc_addr, dest.name.type());
          pipe->send(m);
        }
       }
@@ -2171,7 +2222,7 @@ void SimpleMessenger::send_keepalive(const entity_inst_t& dest)
       if (!pipe) {
        dout(20) << "send_keepalive remote, " << dest_addr << ", new pipe." << dendl;
        // not connected.
-       pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type()));
+       pipe = connect_rank(dest_proc_addr, dest.name.type());
        pipe->send_keepalive();
       }
     }
index 7f3aed16b3f857f9a531fc121d9ac666667099de..b64b0a58a8e61d4a000b3f5e480c2ff827506963 100644 (file)
@@ -127,7 +127,7 @@ private:
     };
 
     int sd;
-    int new_sd;
+    int peer_type;
     entity_addr_t peer_addr;
     Policy policy;
     bool lossy_rx;
@@ -191,7 +191,7 @@ private:
   public:
     Pipe(SimpleMessenger *r, int st) : 
       rank(r),
-      sd(-1),
+      sd(-1), peer_type(-1),
       lock("SimpleMessenger::Pipe::lock"),
       state(st), 
       connection_state(new Connection),
@@ -422,7 +422,7 @@ private:
   Mutex global_seq_lock;
   __u32 global_seq;
       
-  Pipe *connect_rank(const entity_addr_t& addr, const Policy& p);
+  Pipe *connect_rank(const entity_addr_t& addr, int type);
 
   const entity_addr_t &get_rank_addr() { return rank_addr; }