]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: messenger.c cleanup, reorganization, comments
authorSage Weil <sage@newdream.net>
Sat, 18 Oct 2008 03:47:07 +0000 (20:47 -0700)
committerSage Weil <sage@newdream.net>
Sat, 18 Oct 2008 03:47:07 +0000 (20:47 -0700)
src/TODO
src/kernel/decode.h
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/super.h

index c31a0b2841f3cb5f177ab0737477b39d0aa23d1d..f7d5ed36cb5171232c861ac63d99a06779281596 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -47,7 +47,6 @@ snaps on osd
 - efficient recovery of clones using the clone diff info
 
 kernel client
-- fix osd client timeout
 - make osd retry writes if failure after ack..
 - clean up cap flush on session close
 - ACLs
index ef21b69cce38e53fd371b8aa971190e089f7ce1c..26c2cd4b0a2c8a3e3cf351a11cd4ee9c7e756040 100644 (file)
@@ -37,6 +37,7 @@
                (*p)++;                                 \
        } while (0)
 
+/* decode into an __le## */
 #define ceph_decode_64_le(p, v)                                \
        do {                                            \
                v = *(__le64*)*(p);                     \
@@ -59,6 +60,7 @@
                *(p) += n;                              \
        } while (0)
 
+/* bounds check too */
 #define ceph_decode_64_safe(p, end, v, bad)                    \
        do {                                                    \
                ceph_decode_need(p, end, sizeof(__u64), bad);   \
 /*
  * encoders
  */
-
 #define ceph_encode_64(p, v)                     \
        do {                                      \
                *(__le64*)*(p) = cpu_to_le64((v)); \
 /*
  * filepath, string encoders
  */
-
 static __inline__ void ceph_encode_filepath(void **p, void *end,
                                            __u64 ino, const char *path)
 {
index 9fab1017f305eeac4aec75675d53772e51998caf..21e97dd8626e7eae032e7a3b6e0dd4c4db0b8e7a 100644 (file)
@@ -27,12 +27,14 @@ static char tag_wait = CEPH_MSGR_TAG_WAIT;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 
+
+static void ceph_queue_con(struct ceph_connection *con);
 static void con_work(struct work_struct *);
-static void try_accept(struct work_struct *);
+static void ceph_fault(struct ceph_connection *con);
 
 
 /*
- *  workqueue
+ * work queue for all reading and writing to/from the socket.
  */
 struct workqueue_struct *ceph_msgr_wq;
 
@@ -58,7 +60,7 @@ void ceph_msgr_exit(void)
  * socket callback functions
  */
 
-/* listen socket received a connect */
+/* listen socket received a connection */
 static void ceph_accept_ready(struct sock *sk, int count_unused)
 {
        struct ceph_messenger *msgr = (struct ceph_messenger *)sk->sk_user_data;
@@ -69,37 +71,36 @@ static void ceph_accept_ready(struct sock *sk, int count_unused)
                queue_work(ceph_msgr_wq, &msgr->awork);
 }
 
-/* Data available on socket or listen socket received a connect */
+/* data available on socket, or listen socket received a connect */
 static void ceph_data_ready(struct sock *sk, int count_unused)
 {
        struct ceph_connection *con =
                (struct ceph_connection *)sk->sk_user_data;
        if (sk->sk_state != TCP_CLOSE_WAIT) {
-               dout(30, "ceph_data_ready on %p state = %lu, queuing rwork\n",
+               dout(30, "ceph_data_ready on %p state = %lu, queueing work\n",
                     con, con->state);
                ceph_queue_con(con);
        }
 }
 
-/* socket has bufferspace for writing */
+/* socket has buffer space for writing */
 static void ceph_write_space(struct sock *sk)
 {
        struct ceph_connection *con =
                (struct ceph_connection *)sk->sk_user_data;
 
-       dout(30, "ceph_write_space %p state = %lu\n", con, con->state);
-
-       /* only queue to workqueue if a WRITE is pending */
+       /* only queue to workqueue if there is data we want to write. */
        if (test_bit(WRITE_PENDING, &con->state)) {
-               dout(30, "ceph_write_space %p queuing write work\n", con);
+               dout(30, "ceph_write_space %p queueing write work\n", con);
                ceph_queue_con(con);
-       }
+       } else
+               dout(30, "ceph_write_space %p nothing to write\n", con);
 
-       /* Since we have our own write_space, Clear the SOCK_NOSPACE flag */
+       /* since we have our own write_space, clear the SOCK_NOSPACE flag */
        clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 }
 
-/* sockets state has change */
+/* socket's state has changed */
 static void ceph_state_change(struct sock *sk)
 {
        struct ceph_connection *con =
@@ -125,12 +126,14 @@ static void ceph_state_change(struct sock *sk)
                break;
        case TCP_ESTABLISHED:
                dout(30, "ceph_state_change TCP_ESTABLISHED\n");
-               ceph_write_space(sk);
+               ceph_queue_con(con);
                break;
        }
 }
 
-/* make a listening socket active by setting up the data ready call back */
+/*
+ * set up socket callbacks
+ */
 static void listen_sock_callbacks(struct socket *sock,
                                  struct ceph_messenger *msgr)
 {
@@ -139,7 +142,6 @@ static void listen_sock_callbacks(struct socket *sock,
        sk->sk_data_ready = ceph_accept_ready;
 }
 
-/* make a socket active by setting up the call back functions */
 static void set_sock_callbacks(struct socket *sock,
                               struct ceph_connection *con)
 {
@@ -160,35 +162,36 @@ static void set_sock_callbacks(struct socket *sock,
  */
 static struct socket *ceph_tcp_connect(struct ceph_connection *con)
 {
-       int ret;
        struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
        struct socket *sock;
+       int ret;
 
+       BUG_ON(con->sock);
        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (ret)
                return ERR_PTR(ret);
-
        con->sock = sock;
        sock->sk->sk_allocation = GFP_NOFS;
 
        set_sock_callbacks(sock, con);
 
-       dout (20, "connect %u.%u.%u.%u:%u\n",
-                    IPQUADPORT(*(struct sockaddr_in *)paddr));
+       dout(20, "connect %u.%u.%u.%u:%u\n",
+            IPQUADPORT(*(struct sockaddr_in *)paddr));
 
        ret = sock->ops->connect(sock, paddr,
                                 sizeof(struct sockaddr_in), O_NONBLOCK);
        if (ret == -EINPROGRESS) {
-               dout(20, "connect EINPROGRESS sk_state = = %u\n",
+               dout(20, "connect %u.%u.%u.%u:%u EINPROGRESS sk_state = %u\n",
+                    IPQUADPORT(*(struct sockaddr_in *)paddr),
                     sock->sk->sk_state);
                ret = 0;
        }
        if (ret < 0) {
-               /* TBD check for fatal errors, retry if not fatal.. */
-               derr(1, "connect %u.%u.%u.%u:%u error: %d\n",
+               derr(1, "connect %u.%u.%u.%u:%u error %d\n",
                     IPQUADPORT(*(struct sockaddr_in *)paddr), ret);
                sock_release(sock);
                con->sock = NULL;
+               con->error_msg = "connect error";
        }
 
        if (ret < 0)
@@ -197,7 +200,7 @@ static struct socket *ceph_tcp_connect(struct ceph_connection *con)
 }
 
 /*
- * setup listening socket
+ * set up listening socket
  */
 static int ceph_tcp_listen(struct ceph_messenger *msgr)
 {
@@ -210,13 +213,11 @@ static int ceph_tcp_listen(struct ceph_messenger *msgr)
        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (ret)
                return ret;
-
        sock->sk->sk_allocation = GFP_NOFS;
-
        ret = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                                (char *)&optval, sizeof(optval));
        if (ret < 0) {
-               derr(0, "Failed to set SO_REUSEADDR: %d\n", ret);
+               derr(0, "failed to set SO_REUSEADDR: %d\n", ret);
                goto err;
        }
 
@@ -235,26 +236,15 @@ static int ceph_tcp_listen(struct ceph_messenger *msgr)
                derr(0, "failed to getsockname: %d\n", ret);
                goto err;
        }
-       dout(10, "listen on port %d\n", ntohs(myaddr->sin_port));
-
-       ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
-                               (char *)&optval, sizeof(optval));
-       if (ret < 0) {
-               derr(0, "Failed to set SO_KEEPALIVE: %d\n", ret);
-               goto err;
-       }
+       dout(0, "listening on %u.%u.%u.%u:%u\n", IPQUADPORT(*myaddr));
 
-       /* TBD: probaby want to tune the backlog queue .. */
-       ret = sock->ops->listen(sock, CEPH_MSGR_BACKUP);
-       if (ret < 0) {
-               derr(0, "kernel_listen error: %d\n", ret);
-               goto err;
-       }
+       /* we don't care too much if this works or not */
+       sock->ops->listen(sock, CEPH_MSGR_BACKUP);
 
        /* ok! */
        msgr->listen_sock = sock;
        listen_sock_callbacks(sock, msgr);
-       return ret;
+       return 0;
 
 err:
        sock_release(sock);
@@ -262,20 +252,17 @@ err:
 }
 
 /*
- *  accept a connection
+ * accept a connection
  */
 static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
 {
-       int ret;
-       struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
-       int len;
        struct socket *sock;
+       int ret;
 
        ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
        if (ret)
                return ret;
        con->sock = sock;
-
        sock->sk->sk_allocation = GFP_NOFS;
 
        ret = lsock->ops->accept(lsock, sock, O_NONBLOCK);
@@ -286,15 +273,7 @@ static int ceph_tcp_accept(struct socket *lsock, struct ceph_connection *con)
 
        sock->ops = lsock->ops;
        sock->type = lsock->type;
-       ret = sock->ops->getname(sock, paddr, &len, 2);
-       if (ret < 0) {
-               derr(0, "getname error: %d\n", ret);
-               goto err;
-       }
-
-       /* setup callbacks */
        set_sock_callbacks(sock, con);
-
        return ret;
 
 err:
@@ -303,85 +282,74 @@ err:
        return ret;
 }
 
-/*
- * receive a message this may return after partial send
- */
 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
 {
        struct kvec iov = {buf, len};
-       struct msghdr msg = {.msg_flags = 0};
-       int rlen = 0;           /* length read */
+       struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 
-       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-       rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
-       return(rlen);
+       return kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
 }
 
-
 /*
- * Send a message this may return after partial send
+ * write something.  @more is true if caller will be sending more data
+ * shortly.
  */
 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
                     size_t kvlen, size_t len, int more)
 {
-       struct msghdr msg = {.msg_flags = 0};
-       int rlen = 0;
+       struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
 
-       msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
        if (more)
                msg.msg_flags |= MSG_MORE;
        else
                msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
 
-       rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
-
-       return(rlen);
+       return kernel_sendmsg(sock, &msg, iov, kvlen, len);
 }
 
 
-
 /*
  * create a new connection.
  */
 static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
 {
        struct ceph_connection *con;
+
        con = kzalloc(sizeof(struct ceph_connection), GFP_NOFS);
        if (con == NULL)
                return NULL;
-
        con->msgr = msgr;
        atomic_set(&con->nref, 1);
-
        INIT_LIST_HEAD(&con->list_all);
        INIT_LIST_HEAD(&con->list_bucket);
-
        spin_lock_init(&con->out_queue_lock);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
-
        INIT_DELAYED_WORK(&con->work, con_work);
-
        return con;
 }
 
 /*
- * the radix_tree has an unsigned long key and void * value.  since
- * ceph_entity_addr is bigger than that, we use a trivial hash key, and
- * point to a list_head in ceph_connection, as you would with a hash
- * table.  in the rare event that the trivial hash collides, we just
- * traverse the (short) list.
+ * The con_tree radix_tree has an unsigned long key and void * value.
+ * Since ceph_entity_addr is bigger than that, we use a trivial hash
+ * key, and point to a list_head in ceph_connection, as you would with
+ * a hash table.  If the trivial hash collides, we just traverse the
+ * (hopefully short) list until we find what we want.
  */
 static unsigned long hash_addr(struct ceph_entity_addr *addr)
 {
        unsigned long key;
+
        key = *(__u32 *)&addr->ipaddr.sin_addr.s_addr;
        key ^= *(__u16 *)&addr->ipaddr.sin_port;
        return key;
 }
 
 /*
- * get an existing connection, if any, for given addr
+ * Get an existing connection, if any, for given addr.  Note that we
+ * may need to traverse the list_bucket list, which has to "head."
+ *
+ * called under con_lock.
  */
 static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
                                                struct ceph_entity_addr *addr)
@@ -390,7 +358,6 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
        struct list_head *head, *p;
        unsigned long key = hash_addr(addr);
 
-       /* existing? */
        head = radix_tree_lookup(&msgr->con_tree, key);
        if (head == NULL)
                return NULL;
@@ -402,7 +369,6 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr,
                if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0)
                        goto yes;
        }
-
        return NULL;
 
 yes:
@@ -414,18 +380,18 @@ yes:
 
 
 /*
- * close connection socket
+ * Shutdown/close the socket for the given connection.
  */
 static int con_close_socket(struct ceph_connection *con)
 {
        int rc;
+
        dout(10, "con_close_socket on %p sock %p\n", con, con->sock);
        if (!con->sock)
                return 0;
        rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
        sock_release(con->sock);
        con->sock = NULL;
-
        return rc;
 }
 
@@ -447,20 +413,23 @@ static void put_connection(struct ceph_connection *con)
 }
 
 /*
- * add to connections tree
+ * add a connection to the con_tree.
+ *
+ * called under con_lock.
  */
 static int __register_connection(struct ceph_messenger *msgr,
-                                 struct ceph_connection *con)
+                                struct ceph_connection *con)
 {
        struct list_head *head;
        unsigned long key = hash_addr(&con->peer_addr);
        int rc = 0;
 
-       /* inc ref count */
+       dout(20, "register_connection %p %d -> %d\n", con,
+            atomic_read(&con->nref), atomic_read(&con->nref) + 1);
        atomic_inc(&con->nref);
-       dout(20, "add_connection %p %d -> %d\n", con,
-            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
 
+       /* if were just ACCEPTING this connection, it is already on the 
+        * con_all and con_accepting lists. */
        if (test_and_clear_bit(ACCEPTING, &con->state)) {
                list_del_init(&con->list_bucket);
                put_connection(con);
@@ -469,25 +438,27 @@ static int __register_connection(struct ceph_messenger *msgr,
 
        head = radix_tree_lookup(&msgr->con_tree, key);
        if (head) {
-               dout(20, "add_connection %p in existing bucket %lu head %p\n",
+               dout(20, "register_connection %p in old bucket %lu head %p\n",
                     con, key, head);
                list_add(&con->list_bucket, head);
        } else {
-               dout(20, "add_connection %p in new bucket %lu head %p\n", con,
-                    key, &con->list_bucket);
-               INIT_LIST_HEAD(&con->list_bucket); /* empty */
+               dout(20, "register_connection %p in new bucket %lu head %p\n",
+                    con, key, &con->list_bucket);
+               INIT_LIST_HEAD(&con->list_bucket);   /* empty */
                rc = radix_tree_insert(&msgr->con_tree, key, &con->list_bucket);
-
                if (rc < 0) {
                        list_del(&con->list_all);
+                       put_connection(con);
                        return rc;
                }
        }
        set_bit(REGISTERED, &con->state);
-
        return 0;
 }
 
+/*
+ * called under con_lock.
+ */
 static void add_connection_accepting(struct ceph_messenger *msgr,
                                     struct ceph_connection *con)
 {
@@ -500,8 +471,10 @@ static void add_connection_accepting(struct ceph_messenger *msgr,
 }
 
 /*
- * remove connection from all list.
- * also, from con_tree radix tree, if it should have been there
+ * Remove connection from all list.  Also, from con_tree, if it should
+ * have been there.
+ *
+ * called under con_lock.
  */
 static void __remove_connection(struct ceph_messenger *msgr,
                                struct ceph_connection *con)
@@ -516,14 +489,15 @@ static void __remove_connection(struct ceph_messenger *msgr,
        }
        list_del_init(&con->list_all);
        if (test_bit(REGISTERED, &con->state)) {
-               /* remove from con_tree too */
                key = hash_addr(&con->peer_addr);
                if (list_empty(&con->list_bucket)) {
-                       /* last one */
+                       /* last one in this bucket */
                        dout(20, "__remove_connection %p and bucket %lu\n",
                             con, key);
                        radix_tree_delete(&msgr->con_tree, key);
                } else {
+                       /* if we share this bucket, and the radix tree points
+                        * to us, adjust it to point to the next guy. */
                        slot = radix_tree_lookup_slot(&msgr->con_tree, key);
                        val = radix_tree_deref_slot(slot);
                        dout(20, "__remove_connection %p from bucket %lu "
@@ -538,7 +512,7 @@ static void __remove_connection(struct ceph_messenger *msgr,
                        list_del_init(&con->list_bucket);
                }
        }
-       if (test_bit(ACCEPTING, &con->state))
+       if (test_and_clear_bit(ACCEPTING, &con->state))
                list_del_init(&con->list_bucket);
        put_connection(con);
 }
@@ -551,248 +525,111 @@ static void remove_connection(struct ceph_messenger *msgr,
        spin_unlock(&msgr->con_lock);
 }
 
-
-/*
- * atomically queue work on a connection.  bump reference to avoid
- * races with connection teardown.
- */
-void ceph_queue_con(struct ceph_connection *con)
-{
-       if (test_bit(WAIT, &con->state) ||
-           test_bit(CLOSED, &con->state) ||
-           test_bit(BACKOFF, &con->state)) {
-               dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n",
-                    con);
-               return;
-       }
-
-       atomic_inc(&con->nref);
-       dout(40, "ceph_queue_con %p %d -> %d\n", con,
-            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
-
-       set_bit(QUEUED, &con->state);
-       if (test_bit(BUSY, &con->state) ||
-           !queue_work(ceph_msgr_wq, &con->work.work)) {
-               dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
-               put_connection(con);
-       }
-}
-
-
 /*
- * failure case
- * A retry mechanism is used with exponential backoff
+ * replace another connection
+ *  (old and new should be for the _same_ peer,
+ *   and thus in the same bucket in the radix tree)
  */
-static void ceph_fault(struct ceph_connection *con)
+static void __replace_connection(struct ceph_messenger *msgr,
+                                struct ceph_connection *old,
+                                struct ceph_connection *new)
 {
-       derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name),
-            IPQUADPORT(con->peer_addr.ipaddr), con->error_msg);
-       dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
-            con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
+       /* replace in con_tree */
+       if (list_empty(&old->list_bucket)) {
+               /* oh, just replace old with new in bucket list */
+               list_add(&new->list_bucket, &old->list_bucket);
+               list_del_init(&old->list_bucket);
+       } else {
+               unsigned long key = hash_addr(&new->peer_addr);
+               void **slot;
 
-       if (test_bit(LOSSYTX, &con->state)) {
-               dout(30, "fault on LOSSYTX channel\n");
-               remove_connection(con->msgr, con);
-               return;
+               slot = radix_tree_lookup_slot(&msgr->con_tree, key);
+               BUG_ON(radix_tree_deref_slot(slot) != &old->list_bucket);
+               radix_tree_replace_slot(slot, &new->list_bucket);
        }
 
-       con_close_socket(con);
-
-       /* hmm? */
-       BUG_ON(test_bit(WAIT, &con->state));
+       /* take old connections message queue */
+       spin_lock(&old->out_queue_lock);
+       if (!list_empty(&old->out_queue))
+               list_splice_init(&new->out_queue, &old->out_queue);
+       spin_unlock(&old->out_queue_lock);
 
-       /*
-        * If there are no messages in the queue, place the
-        * connection in a STANDBY state.  otherwise, retry with
-        * delay
-        */
-       spin_lock(&con->out_queue_lock);
-       if (list_empty(&con->out_queue)) {
-               dout(10, "fault setting STANDBY\n");
-               set_bit(STANDBY, &con->state);
-               spin_unlock(&con->out_queue_lock);
-               return;
-       }
+       new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
+       new->out_seq = old->out_seq;
 
-       dout(10, "fault setting BACKOFF\n");
-       set_bit(BACKOFF, &con->state);
+       set_bit(CLOSED, &old->state);
+       put_connection(old); /* dec reference count */
 
-       if (con->delay == 0)
-               con->delay = BASE_DELAY_INTERVAL;
-       else if (con->delay < MAX_DELAY_INTERVAL)
-               con->delay *= 2;
+       clear_bit(ACCEPTING, &new->state);
+}
 
-       atomic_inc(&con->nref);
-       dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
-            atomic_read(&con->nref) - 1, atomic_read(&con->nref),
-            con->delay);
-       queue_delayed_work(ceph_msgr_wq, &con->work,
-                          round_jiffies_relative(con->delay));
 
-       list_splice_init(&con->out_sent, &con->out_queue);
-       spin_unlock(&con->out_queue_lock);
-}
 
-/*
- * non-blocking versions
- *
- * these should be called while holding con->con_lock
- */
 
 /*
- * write as much of con->out_partial to the socket as we can.
- *  1 -> done
- *  0 -> socket full, but more to do
- * <0 -> error
+ * We maintain a global counter to order connection attempts.  Get
+ * a unique seq greater than @gt.
  */
-static int write_partial_kvec(struct ceph_connection *con)
+static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
 {
-       int ret;
+       u32 ret;
 
-       dout(10, "write_partial_kvec have %d left\n", con->out_kvec_bytes);
-       while (con->out_kvec_bytes > 0) {
-               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
-                                      con->out_kvec_left, con->out_kvec_bytes,
-                                      con->out_more);
-               if (ret <= 0)
-                       goto out;
-               con->out_kvec_bytes -= ret;
-               if (con->out_kvec_bytes == 0)
-                       break;            /* done */
-               while (ret > 0) {
-                       if (ret >= con->out_kvec_cur->iov_len) {
-                               ret -= con->out_kvec_cur->iov_len;
-                               con->out_kvec_cur++;
-                               con->out_kvec_left--;
-                       } else {
-                               con->out_kvec_cur->iov_len -= ret;
-                               con->out_kvec_cur->iov_base += ret;
-                               ret = 0;
-                               break;
-                       }
-               }
-       }
-       con->out_kvec_left = 0;
-       ret = 1;
-out:
-       dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
-            con->out_kvec_left, con->out_kvec_bytes, ret);
-       return ret;  /* done! */
+       spin_lock(&msgr->global_seq_lock);
+       if (msgr->global_seq < gt)
+               msgr->global_seq = gt;
+       ret = ++msgr->global_seq;
+       spin_unlock(&msgr->global_seq_lock);
+       return ret;
 }
 
-static int write_partial_msg_pages(struct ceph_connection *con,
-                                  struct ceph_msg *msg)
-{
-       int ret;
-       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
-       struct ceph_client *client = con->msgr->parent;
-       int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
-       size_t len;
-
-       dout(30, "write_partial_msg_pages con %p msg %p on %d/%d offset %d\n",
-            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
-            con->out_msg_pos.page_pos);
-
-       while (con->out_msg_pos.page < con->out_msg->nr_pages) {
-               struct page *page = NULL;
-               void *kaddr = NULL;
-
-               mutex_lock(&msg->page_mutex);
-               if (msg->pages) {
-                       page = msg->pages[con->out_msg_pos.page];
-                       if (crc)
-                               kaddr = kmap(page);
-               } else {
-                       /*dout(60, "using zero page\n");*/
-                       if (crc)
-                               kaddr = page_address(con->msgr->zero_page);
-               }
-               len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
-                         (int)(data_len - con->out_msg_pos.data_pos));
-               if (crc && !con->out_msg_pos.did_page_crc) {
-                       void *base = kaddr + con->out_msg_pos.page_pos;
-
-                       con->out_msg->footer.data_crc =
-                               cpu_to_le32(crc32c_le(le32_to_cpu(con->out_msg->footer.data_crc),
-                                         base, len));
-                       con->out_msg_pos.did_page_crc = 1;
-               }
-
-               if (msg->pages)
-                       ret = kernel_sendpage(con->sock, page,
-                                             con->out_msg_pos.page_pos, len,
-                                             MSG_DONTWAIT | MSG_NOSIGNAL |
-                                             MSG_MORE);
-               else
-                       ret = kernel_sendpage(con->sock, con->msgr->zero_page,
-                                             con->out_msg_pos.page_pos, len,
-                                             MSG_DONTWAIT | MSG_NOSIGNAL |
-                                             MSG_MORE);
-
-               if (crc && msg->pages)
-                       kunmap(page);
-
-               mutex_unlock(&msg->page_mutex);
-               if (ret <= 0)
-                       goto out;
-               con->out_msg_pos.data_pos += ret;
-               con->out_msg_pos.page_pos += ret;
 
-               if (ret == len) {
-                       con->out_msg_pos.page_pos = 0;
-                       con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = 0;
-               }
-       }
 
-       /* done with data pages */
-       dout(30, "write_partial_msg_pages wrote all pages on %p\n", con);
-
-       /* queue up footer, too */
-       if (!crc)
-               con->out_msg->footer.flags |= cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
-       con->out_kvec[0].iov_base = &con->out_msg->footer;
-       con->out_kvec_bytes = con->out_kvec[0].iov_len =
-               sizeof(con->out_msg->footer);
-       con->out_kvec_left = 1;
-       con->out_kvec_cur = con->out_kvec;
-       con->out_msg = NULL;
-       con->out_more = 0;  /* end of message */
 
-       ret = 1;
-out:
-       return ret;
+/*
+ * Prepare footer for currently outgoing message, and finish things
+ * off.  Assumes out_kvec* are already valid.. we just add on to the end.
+ */
+static void prepare_write_message_footer(struct ceph_connection *con, int v)
+{
+       struct ceph_msg *m = con->out_msg;
+
+       con->out_kvec[v].iov_base = &m->footer;
+       con->out_kvec[v].iov_len = sizeof(m->footer);
+       con->out_kvec_bytes += sizeof(m->footer);
+       con->out_kvec_left++;
+       con->out_msg = NULL;   /* we're done with this one */
+       con->out_more = 0;     /* end of message */
 }
 
-
 /*
- * build out_partial based on the next outgoing message in the queue.
+ * Prepare headers for the next outgoing message.
  */
 static void prepare_write_message(struct ceph_connection *con)
 {
        struct ceph_msg *m;
        int v = 0;
+
        con->out_kvec_bytes = 0;
 
-       /* ack? */
+       /* Sneak an ack in there first?  If we can get it into the same
+        * TCP packet that's a good thing. */
        if (con->in_seq > con->in_seq_acked) {
                con->in_seq_acked = con->in_seq;
                con->out_kvec[v].iov_base = &tag_ack;
                con->out_kvec[v++].iov_len = 1;
-               con->out32 = cpu_to_le32(con->in_seq_acked);
-               con->out_kvec[v].iov_base = &con->out32;
+               con->out_temp_ack = cpu_to_le32(con->in_seq_acked);
+               con->out_kvec[v].iov_base = &con->out_temp_ack;
                con->out_kvec[v++].iov_len = 4;
                con->out_kvec_bytes = 1 + 4;
        }
 
-       /* move to sending/sent list */
+       /* move message to sending/sent list */
        m = list_entry(con->out_queue.next,
                       struct ceph_msg, list_head);
        list_del_init(&m->list_head);
        list_add_tail(&m->list_head, &con->out_sent);
-       con->out_msg = m;  /* we dont bother taking a reference here. */
+       con->out_msg = m;   /* we don't bother taking a reference here. */
 
-       /* encode header */
        dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
             m, le64_to_cpu(m->hdr.seq), le16_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len),
@@ -808,27 +645,35 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_kvec_left = v;
        con->out_kvec_bytes += 1 + sizeof(m->hdr) + m->front.iov_len;
        con->out_kvec_cur = con->out_kvec;
-       con->out_more = 1;  /* data? */
-
-       /* pages */
-       con->out_msg_pos.page = 0;
-       con->out_msg_pos.page_pos = le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
-       con->out_msg_pos.data_pos = 0;
-       con->out_msg_pos.did_page_crc = 0;
 
        /* fill in crc (except data pages), footer */
-       con->out_msg->hdr.crc = cpu_to_le32(crc32c_le(0, (void *)&m->hdr,
-                                         sizeof(m->hdr) - sizeof(m->hdr.crc)));
+       con->out_msg->hdr.crc =
+               cpu_to_le32(crc32c_le(0, (void *)&m->hdr,
+                                     sizeof(m->hdr) - sizeof(m->hdr.crc)));
        con->out_msg->footer.flags = 0;
-       con->out_msg->footer.front_crc = cpu_to_le32(crc32c_le(0, m->front.iov_base,
-                                                  m->front.iov_len));
+       con->out_msg->footer.front_crc =
+               cpu_to_le32(crc32c_le(0, m->front.iov_base, m->front.iov_len));
        con->out_msg->footer.data_crc = 0;
 
+       /* is there a data payload? */
+       if (le32_to_cpu(m->hdr.data_len) == 0) {
+               /* initialize page iterator */
+               con->out_msg_pos.page = 0;
+               con->out_msg_pos.page_pos =
+                       le32_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+               con->out_msg_pos.data_pos = 0;
+               con->out_msg_pos.did_page_crc = 0;
+               con->out_more = 1;  /* data + footer will follow */
+       } else {
+               /* no, queue up footer too and be done */
+               prepare_write_message_footer(con, v);   
+       }
+
        set_bit(WRITE_PENDING, &con->state);
 }
 
 /*
- * prepare an ack for send
+ * Prepare an ack.
  */
 static void prepare_write_ack(struct ceph_connection *con)
 {
@@ -838,8 +683,8 @@ static void prepare_write_ack(struct ceph_connection *con)
 
        con->out_kvec[0].iov_base = &tag_ack;
        con->out_kvec[0].iov_len = 1;
-       con->out32 = cpu_to_le32(con->in_seq_acked);
-       con->out_kvec[1].iov_base = &con->out32;
+       con->out_temp_ack = cpu_to_le32(con->in_seq_acked);
+       con->out_kvec[1].iov_base = &con->out_temp_ack;
        con->out_kvec[1].iov_len = 4;
        con->out_kvec_left = 2;
        con->out_kvec_bytes = 1 + 4;
@@ -848,25 +693,18 @@ static void prepare_write_ack(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_read_connect(struct ceph_connection *con)
-{
-       con->in_base_pos = 0;
-}
-
-static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
-{
-       u32 ret;
-       spin_lock(&msgr->global_seq_lock);
-       if (msgr->global_seq < gt)
-               msgr->global_seq = gt;
-       ret = ++msgr->global_seq;
-       spin_unlock(&msgr->global_seq_lock);
-       return ret;
-}
+/*
+ * Connection negotiation.
+ */
 
+/*
+ * We connected to a peer and are saying hello.
+ */
 static void prepare_write_connect(struct ceph_messenger *msgr,
                                  struct ceph_connection *con)
 {
+       int len = strlen(CEPH_BANNER);
+
        con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
        con->out_connect.global_seq = cpu_to_le32(con->global_seq);
        con->out_connect.flags = 0;
@@ -874,14 +712,13 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
                con->out_connect.flags = CEPH_MSG_CONNECT_LOSSYTX;
 
        con->out_kvec[0].iov_base = CEPH_BANNER;
-       con->out_kvec[0].iov_len = strlen(CEPH_BANNER);
+       con->out_kvec[0].iov_len = len;
        con->out_kvec[1].iov_base = &msgr->inst.addr;
        con->out_kvec[1].iov_len = sizeof(msgr->inst.addr);
        con->out_kvec[2].iov_base = &con->out_connect;
        con->out_kvec[2].iov_len = sizeof(con->out_connect);
        con->out_kvec_left = 3;
-       con->out_kvec_bytes = strlen(CEPH_BANNER) +
-               sizeof(msgr->inst.addr) +
+       con->out_kvec_bytes = len + sizeof(msgr->inst.addr) +
                sizeof(con->out_connect);
        con->out_kvec_cur = con->out_kvec;
        con->out_more = 0;
@@ -903,6 +740,9 @@ static void prepare_write_connect_retry(struct ceph_messenger *msgr,
        set_bit(WRITE_PENDING, &con->state);
 }
 
+/*
+ * We accepted a connection and are saying hello.
+ */
 static void prepare_write_accept_hello(struct ceph_messenger *msgr,
                                       struct ceph_connection *con)
 {
@@ -919,7 +759,10 @@ static void prepare_write_accept_hello(struct ceph_messenger *msgr,
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
+/*
+ * Write a single protocol control message (byte).
+ */
+static void prepare_write_tag(struct ceph_connection *con, char *ptag)
 {
        con->out_kvec[0].iov_base = ptag;
        con->out_kvec[0].iov_len = 1;
@@ -930,6 +773,9 @@ static void prepare_write_accept_reply(struct ceph_connection *con, char *ptag)
        set_bit(WRITE_PENDING, &con->state);
 }
 
+/*
+ * Negotiation succeeded on an incoming connection, tell the peer.
+ */
 static void prepare_write_accept_ready(struct ceph_connection *con)
 {
        con->out_connect.flags = 0;
@@ -947,6 +793,10 @@ static void prepare_write_accept_ready(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
+/*
+ * The connecting peer needs to try again with a larger connect_seq or
+ * global_seq (as indicated by *ptag).
+ */
 static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
                                       __le32 *pseq)
 {
@@ -960,384 +810,221 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag,
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 
-       /* we'll re-read the connect request, but not the hello */
+       /* we'll re-read the connect request, sans the hello + addr */
        con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->msgr->inst.addr);
 }
 
+
+
+
 /*
- * worker function when socket is writeable
+ * write as much of pending kvecs to the socket as we can.
+ *  1 -> done
+ *  0 -> socket full, but more to do
+ * <0 -> error
  */
-static int try_write(struct ceph_connection *con)
+static int write_partial_kvec(struct ceph_connection *con)
 {
-       struct ceph_messenger *msgr = con->msgr;
-       int ret = 1;
-
-       dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
-            atomic_read(&con->nref));
-
-more:
-       dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
+       int ret;
 
-       /* initiate connect? */
-       if (con->sock == NULL) {
-               if (test_and_clear_bit(STANDBY, &con->state))
-                       con->connect_seq++;
-               con->global_seq = get_global_seq(msgr, 0);
-               prepare_write_connect(msgr, con);
-               prepare_read_connect(con);
-               set_bit(CONNECTING, &con->state);
-               con->in_tag = CEPH_MSGR_TAG_READY;
-               dout(5, "try_write initiating connect on %p new state %lu\n",
-                    con, con->state);
-               BUG_ON(con->sock);
-               con->sock = ceph_tcp_connect(con);
-               dout(10, "tcp_connect returned %p\n", con->sock);
-               if (IS_ERR(con->sock)) {
-                       con->sock = NULL;
-                       con->error_msg = "connect error";
-                       ret = -1;
+       dout(10, "write_partial_kvec %p %d left\n", con, con->out_kvec_bytes);
+       while (con->out_kvec_bytes > 0) {
+               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
+                                      con->out_kvec_left, con->out_kvec_bytes,
+                                      con->out_more);
+               if (ret <= 0)
                        goto out;
+               con->out_kvec_bytes -= ret;
+               if (con->out_kvec_bytes == 0)
+                       break;            /* done */
+               while (ret > 0) {
+                       if (ret >= con->out_kvec_cur->iov_len) {
+                               ret -= con->out_kvec_cur->iov_len;
+                               con->out_kvec_cur++;
+                               con->out_kvec_left--;
+                       } else {
+                               con->out_kvec_cur->iov_len -= ret;
+                               con->out_kvec_cur->iov_base += ret;
+                               ret = 0;
+                               break;
+                       }
                }
        }
+       con->out_kvec_left = 0;
+       ret = 1;
+out:
+       dout(30, "write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
+            con->out_kvec_bytes, con->out_kvec_left, ret);
+       return ret;  /* done! */
+}
 
-       /* kvec data queued? */
-more_kvec:
-       if (con->out_kvec_left) {
-               ret = write_partial_kvec(con);
-               if (ret == 0)
-                       goto done;
-               if (ret < 0) {
-                       dout(30, "try_write write_partial_kvec err %d\n", ret);
-                       goto done;
+/*
+ * Write as much message data payload as we can.  If we finish, queue
+ * up the footer.
+ *  1 -> done, footer is now queued in out_kvec[].
+ *  0 -> socket full, but more to do
+ * <0 -> error
+ */
+static int write_partial_msg_pages(struct ceph_connection *con)
+{
+       struct ceph_client *client = con->msgr->parent;
+       struct ceph_msg *msg = con->out_msg;
+       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       size_t len;
+       int crc = !(client->mount_args.flags & CEPH_MOUNT_NOCRC);
+       int ret;
+
+       dout(30, "write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
+            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+            con->out_msg_pos.page_pos);
+
+       while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+               struct page *page = NULL;
+               void *kaddr = NULL;
+
+               /*
+                * if we are calculating the data crc (the default), we need
+                * to map the page.  if our pages[] has been revoked, use the
+                * zero page.
+                */
+               mutex_lock(&msg->page_mutex);
+               if (msg->pages) {
+                       page = msg->pages[con->out_msg_pos.page];
+                       if (crc)
+                               kaddr = kmap(page);
+               } else {
+                       page = con->msgr->zero_page;
+                       if (crc)
+                               kaddr = page_address(con->msgr->zero_page);
                }
-       }
+               len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
+                         (int)(data_len - con->out_msg_pos.data_pos));
+               if (crc && !con->out_msg_pos.did_page_crc) {
+                       void *base = kaddr + con->out_msg_pos.page_pos;
+                       u32 crc = le32_to_cpu(con->out_msg->footer.data_crc);
 
-       /* msg pages? */
-       if (con->out_msg) {
-               ret = write_partial_msg_pages(con, con->out_msg);
-               if (ret == 1)
-                       goto more_kvec;
-               if (ret == 0)
-                       goto done;
-               if (ret < 0) {
-                       dout(30, "try_write write_partial_msg_pages err %d\n",
-                            ret);
-                       goto done;
+                       con->out_msg->footer.data_crc =
+                               cpu_to_le32(crc32c_le(crc, base, len));
+                       con->out_msg_pos.did_page_crc = 1;
                }
-       }
-       con->out_msg = NULL; /* done with this message. */
 
-       /* anything else pending? */
-       spin_lock(&con->out_queue_lock);
-       if (!list_empty(&con->out_queue)) {
-               prepare_write_message(con);
-       } else if (con->in_seq > con->in_seq_acked) {
-               prepare_write_ack(con);
-       } else {
-               clear_bit(WRITE_PENDING, &con->state);
-               /* hmm, nothing to do! No more writes pending? */
-               dout(30, "try_write nothing else to write.\n");
-               spin_unlock(&con->out_queue_lock);
-               goto done;
+               ret = kernel_sendpage(con->sock, page,
+                                     con->out_msg_pos.page_pos, len,
+                                     MSG_DONTWAIT | MSG_NOSIGNAL |
+                                     MSG_MORE);
+               
+               if (crc && msg->pages)
+                       kunmap(page);
+
+               mutex_unlock(&msg->page_mutex);
+               if (ret <= 0)
+                       goto out;
+
+               con->out_msg_pos.data_pos += ret;
+               con->out_msg_pos.page_pos += ret;
+               if (ret == len) {
+                       con->out_msg_pos.page_pos = 0;
+                       con->out_msg_pos.page++;
+                       con->out_msg_pos.did_page_crc = 0;
+               }
        }
-       spin_unlock(&con->out_queue_lock);
-       goto more;
 
-done:
-       ret = 0;
+       dout(30, "write_partial_msg_pages %p msg %p done\n", con, msg);
+
+       /* prepare and queue up footer, too */
+       if (!crc)
+               con->out_msg->footer.flags |=
+                       cpu_to_le32(CEPH_MSG_FOOTER_NOCRC);
+       con->out_kvec_bytes = 0;
+       con->out_kvec_left = 0; 
+       con->out_kvec_cur = con->out_kvec;
+       prepare_write_message_footer(con, 0);
+       ret = 1;
 out:
-       dout(30, "try_write done on %p\n", con);
        return ret;
 }
 
+
+
+/*
+ * Prepare to read connection handshake, or an ack.
+ */
+static void prepare_read_connect(struct ceph_connection *con)
+{
+       con->in_base_pos = 0;
+}
+
+static void prepare_read_ack(struct ceph_connection *con)
+{
+       con->in_base_pos = 0;
+}
+
 /*
- * prepare to read a message
+ * Prepare to read a message.
  */
 static int prepare_read_message(struct ceph_connection *con)
 {
        int err;
+
        BUG_ON(con->in_msg != NULL);
        con->in_base_pos = 0;
        con->in_msg = ceph_msg_new(0, 0, 0, 0, NULL);
        if (IS_ERR(con->in_msg)) {
-               /* TBD: we don't check for error in caller, handle it here? */
                err = PTR_ERR(con->in_msg);
                con->in_msg = NULL;
-               derr(1, "kmalloc failure on incoming message %d\n", err);
+               con->error_msg = "out of memory for incoming message";
                return err;
        }
        con->in_front_crc = con->in_data_crc = 0;
        return 0;
 }
 
+
+
 /*
- * read (part of) a message
+ * Read all or part of the connect-side handshake on a new connection
  */
-static int read_message_partial(struct ceph_connection *con)
+static int read_partial_connect(struct ceph_connection *con)
 {
-       struct ceph_msg *m = con->in_msg;
-       void *p;
-       int ret;
-       int to, want, left;
-       unsigned front_len, data_len, data_off;
+       int ret, to;
+       dout(20, "read_partial_connect %p at %d\n", con, con->in_base_pos);
 
-       dout(20, "read_message_partial con %p msg %p\n", con, m);
+       /* peer's banner */
+       to = strlen(CEPH_BANNER);
+       while (con->in_base_pos < to) {
+               int left = to - con->in_base_pos;
+               int have = con->in_base_pos;
+               ret = ceph_tcp_recvmsg(con->sock,
+                                      (char *)&con->in_banner + have,
+                                      left);
+               if (ret <= 0)
+                       goto out;
+               con->in_base_pos += ret;
+       }
 
-       /* header */
-       while (con->in_base_pos < sizeof(m->hdr)) {
-               left = sizeof(m->hdr) - con->in_base_pos;
+       /* peer's addr */
+       to += sizeof(con->actual_peer_addr);
+       while (con->in_base_pos < to) {
+               int left = to - con->in_base_pos;
+               int have = sizeof(con->actual_peer_addr) - left;
                ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&m->hdr + con->in_base_pos,
+                                      (char *)&con->actual_peer_addr + have,
                                       left);
                if (ret <= 0)
-                       return ret;
+                       goto out;
                con->in_base_pos += ret;
-               if (con->in_base_pos == sizeof(m->hdr)) {
-                       u32 crc = crc32c_le(0, (void *)&m->hdr,
-                                   sizeof(m->hdr) - sizeof(m->hdr.crc));
-                       if (crc != le32_to_cpu(m->hdr.crc)) {
-                               derr(0, "read_message_partial %p bad hdr crc"
-                                    " %u != expected %u\n",
-                                    m, crc, m->hdr.crc);
-                               return -EIO;
-                       }
-               }
        }
 
-       /* front */
-       front_len = le32_to_cpu(m->hdr.front_len);
-       while (m->front.iov_len < front_len) {
-               if (m->front.iov_base == NULL) {
-                       m->front.iov_base = kmalloc(front_len, GFP_NOFS);
-                       if (m->front.iov_base == NULL)
-                               return -ENOMEM;
-               }
-               left = front_len - m->front.iov_len;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
-                                      m->front.iov_len, left);
+       /* in_tag */
+       to += 1;
+       if (con->in_base_pos < to) {
+               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
                if (ret <= 0)
-                       return ret;
-               m->front.iov_len += ret;
-               if (m->front.iov_len == front_len)
-                       con->in_front_crc = crc32c_le(0, m->front.iov_base,
-                                                     m->front.iov_len);
-       }
-
-       /* (page) data */
-       data_len = le32_to_cpu(m->hdr.data_len);
-       data_off = le32_to_cpu(m->hdr.data_off);
-       if (data_len == 0)
-               goto no_data;
-       if (m->nr_pages == 0) {
-               con->in_msg_pos.page = 0;
-               con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
-               con->in_msg_pos.data_pos = 0;
-               /* find pages for data payload */
-               want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
-               ret = 0;
-               BUG_ON(!con->msgr->prepare_pages);
-               ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
-               if (ret < 0) {
-                       dout(10, "prepare_pages failed, skipping payload\n");
-                       con->in_base_pos = -data_len - sizeof(m->footer);
-                       ceph_msg_put(con->in_msg);
-                       con->in_msg = NULL;
-                       con->in_tag = CEPH_MSGR_TAG_READY;
-                       return 0;
-               } else {
-                       BUG_ON(m->nr_pages < want);
-               }
-       }
-       while (con->in_msg_pos.data_pos < data_len) {
-               left = min((int)(data_len - con->in_msg_pos.data_pos),
-                          (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-               mutex_lock(&m->page_mutex);
-               if (!m->pages) {
-                       dout(10, "pages revoked during msg read\n");
-                       mutex_unlock(&m->page_mutex);
-                       con->in_base_pos = con->in_msg_pos.data_pos - data_len -
-                               sizeof(m->footer);
-                       ceph_msg_put(m);
-                       con->in_msg = NULL;
-                       con->in_tag = CEPH_MSGR_TAG_READY;
-                       return 0;
-               }
-               p = kmap(m->pages[con->in_msg_pos.page]);
-               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
-                                      left);
-               if (ret > 0)
-                       con->in_data_crc =
-                               crc32c_le(con->in_data_crc,
-                                         p + con->in_msg_pos.page_pos, ret);
-               kunmap(m->pages[con->in_msg_pos.page]);
-               mutex_unlock(&m->page_mutex);
-               if (ret <= 0)
-                       return ret;
-               con->in_msg_pos.data_pos += ret;
-               con->in_msg_pos.page_pos += ret;
-               if (con->in_msg_pos.page_pos == PAGE_SIZE) {
-                       con->in_msg_pos.page_pos = 0;
-                       con->in_msg_pos.page++;
-               }
-       }
-
-no_data:
-       /* footer */
-       to = sizeof(m->hdr) + sizeof(m->footer);
-       while (con->in_base_pos < to) {
-               left = to - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
-                                      (con->in_base_pos - sizeof(m->hdr)),
-                                      left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
-       dout(20, "read_message_partial got msg %p\n", m);
-
-       /* crc ok? */
-       if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
-               derr(0, "read_message_partial %p front crc %u != expected %u\n",
-                    con->in_msg,
-                    con->in_front_crc, m->footer.front_crc);
-               return -EIO;
-       }
-       if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
-               derr(0, "read_message_partial %p data crc %u != expected %u\n",
-                    con->in_msg,
-                    con->in_data_crc, m->footer.data_crc);
-               return -EIO;
-       }
-
-       /* did i learn my ip? */
-       if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
-               /*
-                * in practice, we learn our ip from the first incoming mon
-                * message, before anyone else knows we exist, so this is
-                * safe.
-                */
-               con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
-               dout(10, "read_message_partial learned my addr is "
-                    "%u.%u.%u.%u:%u\n",
-                    IPQUADPORT(con->msgr->inst.addr.ipaddr));
-       }
-
-       return 1; /* done! */
-}
-
-static void process_message(struct ceph_connection *con)
-{
-       /* if first message, set peer_name */
-       if (con->peer_name.type == 0)
-               con->peer_name = con->in_msg->hdr.src.name;
-
-       spin_lock(&con->out_queue_lock);
-       con->in_seq++;
-       spin_unlock(&con->out_queue_lock);
-
-       dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
-            con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
-            ENTITY_NAME(con->in_msg->hdr.src.name),
-            le16_to_cpu(con->in_msg->hdr.type),
-            ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
-            le32_to_cpu(con->in_msg->hdr.front_len),
-            le32_to_cpu(con->in_msg->hdr.data_len),
-            con->in_front_crc, con->in_data_crc);
-       con->msgr->dispatch(con->msgr->parent, con->in_msg);
-       con->in_msg = NULL;
-       con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-/*
- * prepare to read an ack
- */
-static void prepare_read_ack(struct ceph_connection *con)
-{
-       con->in_base_pos = 0;
-}
-
-/*
- * read (part of) an ack
- */
-static int read_ack_partial(struct ceph_connection *con)
-{
-       while (con->in_base_pos < sizeof(con->in_partial_ack)) {
-               int left = sizeof(con->in_partial_ack) - con->in_base_pos;
-               int ret = ceph_tcp_recvmsg(con->sock,
-                                          (char *)&con->in_partial_ack +
-                                          con->in_base_pos, left);
-               if (ret <= 0)
-                       return ret;
-               con->in_base_pos += ret;
-       }
-       return 1; /* done */
-}
-
-static void process_ack(struct ceph_connection *con)
-{
-       struct ceph_msg *m;
-       u32 ack = le32_to_cpu(con->in_partial_ack);
-       u64 seq;
-
-       spin_lock(&con->out_queue_lock);
-       while (!list_empty(&con->out_sent)) {
-               m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
-               seq = le64_to_cpu(m->hdr.seq);
-               if (seq > ack)
-                       break;
-               dout(5, "got ack for seq %llu type %d at %p\n", seq,
-                    le16_to_cpu(m->hdr.type), m);
-               list_del_init(&m->list_head);
-               ceph_msg_put(m);
-       }
-       spin_unlock(&con->out_queue_lock);
-       con->in_tag = CEPH_MSGR_TAG_READY;
-}
-
-
-/*
- * read portion of connect-side handshake on a new connection
- */
-static int read_connect_partial(struct ceph_connection *con)
-{
-       int ret, to;
-       dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
-
-       /* peer's banner */
-       to = strlen(CEPH_BANNER);
-       while (con->in_base_pos < to) {
-               int left = to - con->in_base_pos;
-               int have = con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&con->in_banner + have,
-                                      left);
-               if (ret <= 0)
-                       goto out;
-               con->in_base_pos += ret;
-       }
-
-       /* peer's addr */
-       to += sizeof(con->actual_peer_addr);
-       while (con->in_base_pos < to) {
-               int left = to - con->in_base_pos;
-               int have = sizeof(con->actual_peer_addr) - left;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)&con->actual_peer_addr + have,
-                                      left);
-               if (ret <= 0)
-                       goto out;
-               con->in_base_pos += ret;
-       }
-
-       /* in_tag */
-       to += 1;
-       if (con->in_base_pos < to) {
-               ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
-               if (ret <= 0)
-                       goto out;
-               con->in_base_pos += ret;
+                       goto out;
+               con->in_base_pos += ret;
        }
 
+       /* TAG_READY is followed by a u8 flags */
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
                to++;
                if (con->in_base_pos < to) {
@@ -1349,6 +1036,7 @@ static int read_connect_partial(struct ceph_connection *con)
                }
        }
 
+       /* TAG_RETRY_SESSION is followed by a __le32 connect_seq */
        if (con->in_tag == CEPH_MSGR_TAG_RETRY_SESSION) {
                /* peer's connect_seq */
                to += sizeof(con->in_connect.connect_seq);
@@ -1363,6 +1051,8 @@ static int read_connect_partial(struct ceph_connection *con)
                        con->in_base_pos += ret;
                }
        }
+
+       /* TAG_RETRY_SESSION is followed by a __le32 global_seq */
        if (con->in_tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
                /* peer's global_seq */
                to += sizeof(con->in_connect.global_seq);
@@ -1377,18 +1067,31 @@ static int read_connect_partial(struct ceph_connection *con)
                        con->in_base_pos += ret;
                }
        }
-       ret = 1;
-out:
-       dout(20, "read_connect_partial %p end at %d ret %d\n", con,
-            con->in_base_pos, ret);
-       dout(20, "read_connect_partial connect_seq = %u, global_seq = %u\n",
-            le32_to_cpu(con->in_connect.connect_seq),
+       ret = 1;  /* done */
+       dout(20, "read_partial_connect %p connect_seq = %u, global_seq = %u\n",
+            con, le32_to_cpu(con->in_connect.connect_seq),
             le32_to_cpu(con->in_connect.global_seq));
-       return ret; /* done */
+out:
+       return ret;
+}
+
+/*
+ * Verify the hello banner looks okay.
+ */
+static int verify_hello(struct ceph_connection *con)
+{
+       if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+               derr(10, "connection to/from %u.%u.%u.%u:%u has bad banner\n",
+                    IPQUADPORT(con->peer_addr.ipaddr));
+               con->error_msg = "protocol error, bad banner";
+               return -1;
+       }
+       return 0;
 }
 
 /*
- * Reset a connection
+ * Reset a connection.  Discard all incoming and outgoing messages
+ * and clear *_seq state.
  */
 static void reset_connection(struct ceph_connection *con)
 {
@@ -1410,16 +1113,6 @@ static void reset_connection(struct ceph_connection *con)
        spin_unlock(&con->out_queue_lock);
 }
 
-static int verify_hello(struct ceph_connection *con)
-{
-       if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
-               derr(10, "connection from %u.%u.%u.%u:%u with bad banner\n",
-                    IPQUADPORT(con->peer_addr.ipaddr));
-               con->error_msg = "protocol error, bad banner";
-               return -1;
-       }
-       return 0;
-}
 
 static int process_connect(struct ceph_connection *con)
 {
@@ -1428,7 +1121,11 @@ static int process_connect(struct ceph_connection *con)
        if (verify_hello(con) < 0)
                return -1;
 
-       /* verify peer addr */
+       /*
+        * Make sure the other end is who we wanted.  note that the other
+        * end may not yet know their ip address, so if it's 0.0.0.0, give
+        * them the benefit of the doubt.
+        */
        if (!ceph_entity_addr_is_local(&con->peer_addr,
                                       &con->actual_peer_addr) &&
            con->actual_peer_addr.ipaddr.sin_addr.s_addr != 0) {
@@ -1444,15 +1141,27 @@ static int process_connect(struct ceph_connection *con)
 
        switch (con->in_tag) {
        case CEPH_MSGR_TAG_RESETSESSION:
+               /*
+                * If we connected with a large connect_seq but the peer
+                * has no record of a session with us (no connection, or
+                * connect_seq == 0), they will send RESETSESION to indicate
+                * that they must have reset their session, and may have
+                * dropped messages.
+                */
                dout(10, "process_connect got RESET peer seq %u\n",
                     le32_to_cpu(con->in_connect.connect_seq));
                reset_connection(con);
                prepare_write_connect_retry(con->msgr, con);
                prepare_read_connect(con);
+               /* Tell ceph about it. */
                con->msgr->peer_reset(con->msgr->parent, &con->peer_addr,
                                      &con->peer_name);
                break;
        case CEPH_MSGR_TAG_RETRY_SESSION:
+               /*
+                * If we sent a smaller connect_seq than the peer has, try
+                * again with a larger value.
+                */
                dout(10,
                     "process_connect got RETRY my seq = %u, peer_seq = %u\n",
                     le32_to_cpu(con->out_connect.connect_seq),
@@ -1462,6 +1171,10 @@ static int process_connect(struct ceph_connection *con)
                prepare_read_connect(con);
                break;
        case CEPH_MSGR_TAG_RETRY_GLOBAL:
+               /*
+                * If we sent a smaller global_seq than the peer has, try
+                * again with a larger value.
+                */
                dout(10,
                     "process_connect got RETRY_GLOBAL my %u, peer_gseq = %u\n",
                     con->global_seq, le32_to_cpu(con->in_connect.global_seq));
@@ -1472,6 +1185,12 @@ static int process_connect(struct ceph_connection *con)
                prepare_read_connect(con);
                break;
        case CEPH_MSGR_TAG_WAIT:
+               /*
+                * If there is a connection race (we are opening connections to
+                * each other), one of us may just have to WAIT.  We will keep
+                * our queued messages, in expectation of being replaced by an
+                * incoming connection.
+                */
                dout(10, "process_connect peer connecting WAIT\n");
                set_bit(WAIT, &con->state);
                con_close_socket(con);
@@ -1479,8 +1198,9 @@ static int process_connect(struct ceph_connection *con)
        case CEPH_MSGR_TAG_READY:
                dout(10, "process_connect got READY, now open\n");
                clear_bit(CONNECTING, &con->state);
-               con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
-               con->delay = 0;  /* reset backoffmemory */
+               if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX)
+                       set_bit(LOSSYRX, &con->state);
+               con->delay = 0;  /* reset backoff memory */
                break;
        default:
                derr(1, "process_connect protocol error, will retry\n");
@@ -1492,9 +1212,10 @@ static int process_connect(struct ceph_connection *con)
 
 
 /*
- * read portion of accept-side handshake on a newly accepted connection
+ * Read all or part of the accept-side handshake on a newly accepted
+ * connection.
  */
-static int read_accept_partial(struct ceph_connection *con)
+static int read_partial_accept(struct ceph_connection *con)
 {
        int ret;
        int to;
@@ -1536,145 +1257,445 @@ static int read_accept_partial(struct ceph_connection *con)
                con->in_base_pos += ret;
        }
 
+       return 1; /* done */
+}
+
+/*
+ * Call after a new connection's handshake has been read.
+ */
+static int process_accept(struct ceph_connection *con)
+{
+       struct ceph_connection *existing;
+       struct ceph_messenger *msgr = con->msgr;
+       u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
+       u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
+
+       if (verify_hello(con) < 0)
+               return -1;
+
+       /* note flags */
+       if (con->in_flags & CEPH_MSG_CONNECT_LOSSYTX)
+               set_bit(LOSSYRX, &con->state);
+
+       /* do we have an existing connection for this peer? */
+       if (radix_tree_preload(GFP_NOFS) < 0) {
+               derr(10, "ENOMEM in process_accept\n");
+               con->error_msg = "out of memory";
+               return -1;
+       }
+       spin_lock(&msgr->con_lock);
+       existing = __get_connection(msgr, &con->peer_addr);
+       if (existing) {
+               if (peer_gseq < existing->global_seq) {
+                       /* retry_global */
+                       con->global_seq = existing->global_seq;
+                       con->out_connect.global_seq =
+                               cpu_to_le32(con->global_seq);
+                       prepare_write_accept_retry(con,
+                                          &tag_retry_global,
+                                          &con->out_connect.global_seq);
+               } else if (test_bit(LOSSYTX, &existing->state)) {
+                       dout(20, "process_accept %p replacing LOSSYTX %p\n",
+                            con, existing);
+                       reset_connection(existing);
+                       __replace_connection(msgr, existing, con);
+                       prepare_write_accept_ready(con);
+               } else if (peer_cseq < existing->connect_seq) {
+                       if (peer_cseq == 0) {
+                               reset_connection(existing);
+                               __replace_connection(msgr, existing, con);
+                               prepare_write_accept_ready(con);
+                               con->msgr->peer_reset(con->msgr->parent,
+                                                     &con->peer_addr,
+                                                     &con->peer_name);
+                       } else {
+                               /* old attempt or peer didn't get the READY */
+                               /* send retry with peers connect seq */
+                               con->connect_seq = existing->connect_seq;
+                               con->out_connect.connect_seq =
+                                       cpu_to_le32(con->connect_seq);
+                               prepare_write_accept_retry(con,
+                                       &tag_retry_session,
+                                       &con->out_connect.connect_seq);
+                       }
+               } else if (peer_cseq == existing->connect_seq &&
+                          (test_bit(CONNECTING, &existing->state) ||
+                           test_bit(STANDBY, &existing->state) ||
+                           test_bit(WAIT, &existing->state))) {
+                       /* connection race */
+                       dout(20, "process_accept connection race state = %lu\n",
+                            con->state);
+                       if (ceph_entity_addr_equal(&msgr->inst.addr,
+                                                  &con->peer_addr)) {
+                               /* incoming connection wins.. */
+                               /* replace existing with new connection */
+                               __replace_connection(msgr, existing, con);
+                               prepare_write_accept_ready(con);
+                       } else {
+                               /* our existing outgoing connection wins..
+                                  tell peer to wait for our outgoing
+                                  connection to go through */
+                               prepare_write_tag(con, &tag_wait);
+                       }
+               } else if (existing->connect_seq == 0 &&
+                          peer_cseq > existing->connect_seq) {
+                       /* we reset and already reconnecting */
+                       prepare_write_tag(con, &tag_reset);
+               } else {
+                       /* reconnect case, replace connection */
+                       __replace_connection(msgr, existing, con);
+                       prepare_write_accept_ready(con);
+               }
+               put_connection(existing);
+       } else if (peer_cseq > 0) {
+               dout(20, "process_accept no existing connection, we reset\n");
+               prepare_write_tag(con, &tag_reset);
+       } else {
+               dout(20, "process_accept no existing connection, opening\n");
+               __register_connection(msgr, con);
+               con->global_seq = peer_gseq;
+               con->connect_seq = peer_cseq + 1;
+               prepare_write_accept_ready(con);
+       }
+       spin_unlock(&msgr->con_lock);
+       radix_tree_preload_end();
+
+       ceph_queue_con(con);
+       put_connection(con);
+       return 0;
+}
+
+/*
+ * read (part of) an ack
+ */
+static int read_partial_ack(struct ceph_connection *con)
+{
+       while (con->in_base_pos < sizeof(con->in_temp_ack)) {
+               int left = sizeof(con->in_temp_ack) - con->in_base_pos;
+               int ret = ceph_tcp_recvmsg(con->sock,
+                                          (char *)&con->in_temp_ack +
+                                          con->in_base_pos, left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+       }
+       return 1; /* done */
+}
+
+/*
+ * We can finally discard anything that's been acked.
+ */
+static void process_ack(struct ceph_connection *con)
+{
+       struct ceph_msg *m;
+       u32 ack = le32_to_cpu(con->in_temp_ack);
+       u64 seq;
+
+       spin_lock(&con->out_queue_lock);
+       while (!list_empty(&con->out_sent)) {
+               m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
+               seq = le64_to_cpu(m->hdr.seq);
+               if (seq > ack)
+                       break;
+               dout(5, "got ack for seq %llu type %d at %p\n", seq,
+                    le16_to_cpu(m->hdr.type), m);
+               list_del_init(&m->list_head);
+               ceph_msg_put(m);
+       }
+       spin_unlock(&con->out_queue_lock);
+       con->in_tag = CEPH_MSGR_TAG_READY;
+}
+
+
+
+
+
+
+/*
+ * read (part of) a message.
+ */
+static int read_partial_message(struct ceph_connection *con)
+{
+       struct ceph_msg *m = con->in_msg;
+       void *p;
+       int ret;
+       int to, want, left;
+       unsigned front_len, data_len, data_off;
+
+       dout(20, "read_partial_message con %p msg %p\n", con, m);
+
+       /* header */
+       while (con->in_base_pos < sizeof(m->hdr)) {
+               left = sizeof(m->hdr) - con->in_base_pos;
+               ret = ceph_tcp_recvmsg(con->sock,
+                                      (char *)&m->hdr + con->in_base_pos,
+                                      left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+               if (con->in_base_pos == sizeof(m->hdr)) {
+                       u32 crc = crc32c_le(0, (void *)&m->hdr,
+                                   sizeof(m->hdr) - sizeof(m->hdr.crc));
+                       if (crc != le32_to_cpu(m->hdr.crc)) {
+                               derr(0, "read_partial_message %p bad hdr crc"
+                                    " %u != expected %u\n",
+                                    m, crc, m->hdr.crc);
+                               return -EIO;
+                       }
+               }
+       }
+
+       /* front */
+       front_len = le32_to_cpu(m->hdr.front_len);
+       while (m->front.iov_len < front_len) {
+               if (m->front.iov_base == NULL) {
+                       m->front.iov_base = kmalloc(front_len, GFP_NOFS);
+                       if (m->front.iov_base == NULL)
+                               return -ENOMEM;
+               }
+               left = front_len - m->front.iov_len;
+               ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
+                                      m->front.iov_len, left);
+               if (ret <= 0)
+                       return ret;
+               m->front.iov_len += ret;
+               if (m->front.iov_len == front_len)
+                       con->in_front_crc = crc32c_le(0, m->front.iov_base,
+                                                     m->front.iov_len);
+       }
+
+       /* (page) data */
+       data_len = le32_to_cpu(m->hdr.data_len);
+       data_off = le32_to_cpu(m->hdr.data_off);
+       if (data_len == 0)
+               goto no_data;
+       if (m->nr_pages == 0) {
+               con->in_msg_pos.page = 0;
+               con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+               con->in_msg_pos.data_pos = 0;
+               /* find pages for data payload */
+               want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+               ret = 0;
+               BUG_ON(!con->msgr->prepare_pages);
+               ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
+               if (ret < 0) {
+                       dout(10, "prepare_pages failed, skipping payload\n");
+                       con->in_base_pos = -data_len - sizeof(m->footer);
+                       ceph_msg_put(con->in_msg);
+                       con->in_msg = NULL;
+                       con->in_tag = CEPH_MSGR_TAG_READY;
+                       return 0;
+               }
+               BUG_ON(m->nr_pages < want);
+       }
+       while (con->in_msg_pos.data_pos < data_len) {
+               left = min((int)(data_len - con->in_msg_pos.data_pos),
+                          (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+               mutex_lock(&m->page_mutex);
+               if (!m->pages) {
+                       dout(10, "pages revoked during msg read\n");
+                       mutex_unlock(&m->page_mutex);
+                       con->in_base_pos = con->in_msg_pos.data_pos - data_len -
+                               sizeof(m->footer);
+                       ceph_msg_put(m);
+                       con->in_msg = NULL;
+                       con->in_tag = CEPH_MSGR_TAG_READY;
+                       return 0;
+               }
+               p = kmap(m->pages[con->in_msg_pos.page]);
+               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+                                      left);
+               if (ret > 0)
+                       con->in_data_crc =
+                               crc32c_le(con->in_data_crc,
+                                         p + con->in_msg_pos.page_pos, ret);
+               kunmap(m->pages[con->in_msg_pos.page]);
+               mutex_unlock(&m->page_mutex);
+               if (ret <= 0)
+                       return ret;
+               con->in_msg_pos.data_pos += ret;
+               con->in_msg_pos.page_pos += ret;
+               if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+                       con->in_msg_pos.page_pos = 0;
+                       con->in_msg_pos.page++;
+               }
+       }
+
+no_data:
+       /* footer */
+       to = sizeof(m->hdr) + sizeof(m->footer);
+       while (con->in_base_pos < to) {
+               left = to - con->in_base_pos;
+               ret = ceph_tcp_recvmsg(con->sock, (char *)&m->footer +
+                                      (con->in_base_pos - sizeof(m->hdr)),
+                                      left);
+               if (ret <= 0)
+                       return ret;
+               con->in_base_pos += ret;
+       }
+       dout(20, "read_partial_message got msg %p\n", m);
+
+       /* crc ok? */
+       if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
+               derr(0, "read_partial_message %p front crc %u != expected %u\n",
+                    con->in_msg,
+                    con->in_front_crc, m->footer.front_crc);
+               return -EIO;
+       }
+       if (con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
+               derr(0, "read_partial_message %p data crc %u != expected %u\n",
+                    con->in_msg,
+                    con->in_data_crc, m->footer.data_crc);
+               return -EIO;
+       }
+
+       /* did i learn my ip? */
+       if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+               /*
+                * in practice, we learn our ip from the first incoming mon
+                * message, before anyone else knows we exist, so this is
+                * safe.
+                */
+               con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
+               dout(10, "read_partial_message learned my addr is "
+                    "%u.%u.%u.%u:%u\n",
+                    IPQUADPORT(con->msgr->inst.addr.ipaddr));
+       }
+
+       return 1; /* done! */
+}
+
+/*
+ * Process message.  This happens in the worker thread.  The callback should
+ * be careful not to do anything that waits on other incoming messages or it
+ * may deadlock.
+ */
+static void process_message(struct ceph_connection *con)
+{
+       /* if first message, set peer_name */
+       if (con->peer_name.type == 0)
+               con->peer_name = con->in_msg->hdr.src.name;
+
+       spin_lock(&con->out_queue_lock);
+       con->in_seq++;
+       spin_unlock(&con->out_queue_lock);
+
+       dout(1, "===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
+            con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
+            ENTITY_NAME(con->in_msg->hdr.src.name),
+            le16_to_cpu(con->in_msg->hdr.type),
+            ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
+            le32_to_cpu(con->in_msg->hdr.front_len),
+            le32_to_cpu(con->in_msg->hdr.data_len),
+            con->in_front_crc, con->in_data_crc);
+       con->msgr->dispatch(con->msgr->parent, con->in_msg);
+       con->in_msg = NULL;
+       con->in_tag = CEPH_MSGR_TAG_READY;
+}
+
 
-       return 1; /* done */
-}
 
-/*
- * replace another connection
- *  (old and new should be for the _same_ peer,
- *   and thus in the same pos in the radix tree)
- */
-static void __replace_connection(struct ceph_messenger *msgr,
-                                struct ceph_connection *old,
-                                struct ceph_connection *new)
-{
-       /* take old connections message queue */
-       spin_lock(&old->out_queue_lock);
-       if (!list_empty(&old->out_queue))
-               list_splice_init(&new->out_queue, &old->out_queue);
-       spin_unlock(&old->out_queue_lock);
 
-       new->connect_seq = le32_to_cpu(new->in_connect.connect_seq);
-       new->out_seq = old->out_seq;
 
-       /* replace list entry */
-       list_add(&new->list_bucket, &old->list_bucket);
-       list_del_init(&old->list_bucket);
 
-       set_bit(CLOSED, &old->state);
-       put_connection(old); /* dec reference count */
 
-       clear_bit(ACCEPTING, &new->state);
-       prepare_write_accept_ready(new);
-}
 
 /*
- * call after a new connection's handshake has completed
+ * Write something to the socket.  Called in a worker thread when the
+ * socket appears to be writeable and we have something ready to send.
  */
-static int process_accept(struct ceph_connection *con)
+static int try_write(struct ceph_connection *con)
 {
-       struct ceph_connection *existing;
        struct ceph_messenger *msgr = con->msgr;
-       u32 peer_gseq = le32_to_cpu(con->in_connect.global_seq);
-       u32 peer_cseq = le32_to_cpu(con->in_connect.connect_seq);
+       int ret = 1;
 
-       if (verify_hello(con) < 0)
-               return -1;
+       dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
+            atomic_read(&con->nref));
 
-       /* note flags */
-       con->lossy_rx = con->in_flags & CEPH_MSG_CONNECT_LOSSYTX;
+more:
+       dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
-       /* connect */
-       /* do we have an existing connection for this peer? */
-       if (radix_tree_preload(GFP_NOFS) < 0) {
-               derr(10, "ENOMEM in process_accept\n");
-               con->error_msg = "out of memory";
-               return -1;
+       /* open the socket first? */
+       if (con->sock == NULL) {
+               /*
+                * if we were STANDBY and are reconnecting _this_
+                * connection, bump connect_seq now.  Always bump
+                * global_seq.
+                */
+               if (test_and_clear_bit(STANDBY, &con->state))
+                       con->connect_seq++;
+               con->global_seq = get_global_seq(msgr, 0);
+
+               prepare_write_connect(msgr, con);
+               prepare_read_connect(con);
+               set_bit(CONNECTING, &con->state);
+
+               con->in_tag = CEPH_MSGR_TAG_READY;
+               dout(5, "try_write initiating connect on %p new state %lu\n",
+                    con, con->state);
+               con->sock = ceph_tcp_connect(con);
+               if (IS_ERR(con->sock)) {
+                       con->sock = NULL;
+                       con->error_msg = "connect error";
+                       ret = -1;
+                       goto out;
+               }
        }
-       spin_lock(&msgr->con_lock);
-       existing = __get_connection(msgr, &con->peer_addr);
-       if (existing) {
-               if (peer_gseq < existing->global_seq) {
-                       /* retry_global */
-                       con->global_seq = existing->global_seq;
-                       con->out_connect.global_seq =
-                               cpu_to_le32(con->global_seq);
-                       prepare_write_accept_retry(con,
-                                          &tag_retry_global,
-                                          &con->out_connect.global_seq);
-               } else if (test_bit(LOSSYTX, &existing->state)) {
-                       dout(20, "process_accept replacing existing LOSSYTX %p\n",
-                            existing);
-                       reset_connection(existing);
-                       __replace_connection(msgr, existing, con);
-               } else if (peer_cseq < existing->connect_seq) {
-                       if (peer_cseq == 0) {
-                               /* reset existing connection */
-                               reset_connection(existing);
-                               /* replace connection */
-                               __replace_connection(msgr, existing, con);
-                               con->msgr->peer_reset(con->msgr->parent,
-                                                     &con->peer_addr,
-                                                     &con->peer_name);
-                       } else {
-                               /* old attempt or peer didn't get the READY */
-                               /* send retry with peers connect seq */
-                               con->connect_seq = existing->connect_seq;
-                               con->out_connect.connect_seq =
-                                       cpu_to_le32(con->connect_seq);
-                               prepare_write_accept_retry(con,
-                                       &tag_retry_session,
-                                       &con->out_connect.connect_seq);
-                       }
-               } else if (peer_cseq == existing->connect_seq &&
-                          (test_bit(CONNECTING, &existing->state) ||
-                           test_bit(STANDBY, &existing->state) ||
-                           test_bit(WAIT, &existing->state))) {
-                       /* connection race */
-                       dout(20, "process_accept connection race state = %lu\n",
-                            con->state);
-                       if (ceph_entity_addr_equal(&msgr->inst.addr,
-                                                  &con->peer_addr)) {
-                               /* incoming connection wins.. */
-                               /* replace existing with new connection */
-                               __replace_connection(msgr, existing, con);
-                       } else {
-                               /* our existing outgoing connection wins..
-                                  tell peer to wait for our outgoing
-                                  connection to go through */
-                               prepare_write_accept_reply(con, &tag_wait);
-                       }
-               } else if (existing->connect_seq == 0 &&
-                          peer_cseq > existing->connect_seq) {
-                       /* we reset and already reconnecting */
-                       prepare_write_accept_reply(con, &tag_reset);
-               } else {
-                       /* reconnect case, replace connection */
-                       __replace_connection(msgr, existing, con);
+
+more_kvec:
+       /* kvec data queued? */
+       if (con->out_kvec_left) {
+               ret = write_partial_kvec(con);
+               if (ret <= 0)
+                       goto done;
+               if (ret < 0) {
+                       dout(30, "try_write write_partial_kvec err %d\n", ret);
+                       goto done;
                }
-               put_connection(existing);
-       } else if (peer_cseq > 0) {
-               dout(20, "process_accept no existing connection, we reset\n");
-               prepare_write_accept_reply(con, &tag_reset);
-       } else {
-               dout(20, "process_accept no existing connection, opening\n");
-               __register_connection(msgr, con);
-               con->global_seq = peer_gseq;
-               con->connect_seq = peer_cseq + 1;
-               prepare_write_accept_ready(con);
        }
-       spin_unlock(&msgr->con_lock);
-       radix_tree_preload_end();
 
-       ceph_queue_con(con);
-       put_connection(con);
-       return 0;
+       /* msg pages? */
+       if (con->out_msg) {
+               ret = write_partial_msg_pages(con);
+               if (ret == 1)
+                       goto more_kvec;  /* we need to send the footer, too! */
+               if (ret == 0)
+                       goto done;
+               if (ret < 0) {
+                       dout(30, "try_write write_partial_msg_pages err %d\n",
+                            ret);
+                       goto done;
+               }
+       }
+
+       /* is anything else pending? */
+       spin_lock(&con->out_queue_lock);
+       if (!list_empty(&con->out_queue)) {
+               prepare_write_message(con);
+               spin_unlock(&con->out_queue_lock);
+               goto more;
+       }
+       if (con->in_seq > con->in_seq_acked) {
+               prepare_write_ack(con);
+               spin_unlock(&con->out_queue_lock);
+               goto more;
+       }
+
+       /* Nothing to do! */
+       clear_bit(WRITE_PENDING, &con->state);
+       dout(30, "try_write nothing else to write.\n");
+       spin_unlock(&con->out_queue_lock);
+done:
+       ret = 0;
+out:
+       dout(30, "try_write done on %p\n", con);
+       return ret;
 }
 
 
+
 /*
- * worker function when data is available on the socket
+ * Read what we can from the socket.
  */
 static int try_read(struct ceph_connection *con)
 {
@@ -1690,7 +1711,7 @@ static int try_read(struct ceph_connection *con)
 more:
        if (test_bit(ACCEPTING, &con->state)) {
                dout(20, "try_read accepting\n");
-               ret = read_accept_partial(con);
+               ret = read_partial_accept(con);
                if (ret <= 0)
                        goto done;
                if (process_accept(con) < 0) {
@@ -1701,7 +1722,7 @@ more:
        }
        if (test_bit(CONNECTING, &con->state)) {
                dout(20, "try_read connecting\n");
-               ret = read_connect_partial(con);
+               ret = read_partial_connect(con);
                if (ret <= 0)
                        goto done;
                if (process_connect(con) < 0) {
@@ -1712,7 +1733,11 @@ more:
        }
 
        if (con->in_base_pos < 0) {
-               /* skipping + discarding content */
+               /*
+                * skipping + discarding content.
+                *
+                * FIXME: there must be a better way to do this!
+                */
                static char buf[1024];
                int skip = min(1024, -con->in_base_pos);
                dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos);
@@ -1724,6 +1749,9 @@ more:
                        goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_READY) {
+               /*
+                * what's next?
+                */
                ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
                if (ret <= 0)
                        goto done;
@@ -1743,7 +1771,7 @@ more:
                }
        }
        if (con->in_tag == CEPH_MSGR_TAG_MSG) {
-               ret = read_message_partial(con);
+               ret = read_partial_message(con);
                if (ret == -EIO) {
                        con->error_msg = "bad crc";
                        goto out;
@@ -1756,7 +1784,7 @@ more:
                goto more;
        }
        if (con->in_tag == CEPH_MSGR_TAG_ACK) {
-               ret = read_ack_partial(con);
+               ret = read_partial_ack(con);
                if (ret <= 0)
                        goto done;
                process_ack(con);
@@ -1777,6 +1805,39 @@ bad_tag:
 }
 
 
+/*
+ * Atomically queue work on a connection.  Bump @con reference to
+ * avoid races with connection teardown.
+ *
+ * There is some trickery going on with QUEUED, BUSY, and BACKOFF:
+ *
+ * 
+ */
+static void ceph_queue_con(struct ceph_connection *con)
+{
+       if (test_bit(WAIT, &con->state) ||
+           test_bit(CLOSED, &con->state) ||
+           test_bit(BACKOFF, &con->state)) {
+               dout(40, "ceph_queue_con %p ignoring: WAIT|CLOSED|BACKOFF\n",
+                    con);
+               return;
+       }
+
+       atomic_inc(&con->nref);
+       dout(40, "ceph_queue_con %p %d -> %d\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref));
+
+       set_bit(QUEUED, &con->state);
+       if (test_bit(BUSY, &con->state) ||
+           !queue_work(ceph_msgr_wq, &con->work.work)) {
+               dout(40, "ceph_queue_con %p - already BUSY or queued\n", con);
+               put_connection(con);
+       }
+}
+
+/*
+ * Do some work on a connection.  Drop a connection ref when we're done.
+ */
 static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
@@ -1823,6 +1884,60 @@ out:
 }
 
 
+/*
+ * failure case
+ * A retry mechanism is used with exponential backoff
+ */
+static void ceph_fault(struct ceph_connection *con)
+{
+       derr(1, "%s%d %u.%u.%u.%u:%u %s\n", ENTITY_NAME(con->peer_name),
+            IPQUADPORT(con->peer_addr.ipaddr), con->error_msg);
+       dout(10, "fault %p state %lu to peer %u.%u.%u.%u:%u\n",
+            con, con->state, IPQUADPORT(con->peer_addr.ipaddr));
+
+       if (test_bit(LOSSYTX, &con->state)) {
+               dout(30, "fault on LOSSYTX channel\n");
+               remove_connection(con->msgr, con);
+               return;
+       }
+
+       con_close_socket(con);
+
+       /* hmm? */
+       BUG_ON(test_bit(WAIT, &con->state));
+
+       /*
+        * If there are no messages in the queue, place the
+        * connection in a STANDBY state.  otherwise, retry with
+        * delay
+        */
+       spin_lock(&con->out_queue_lock);
+       if (list_empty(&con->out_queue)) {
+               dout(10, "fault setting STANDBY\n");
+               set_bit(STANDBY, &con->state);
+               spin_unlock(&con->out_queue_lock);
+               return;
+       }
+
+       dout(10, "fault setting BACKOFF\n");
+       set_bit(BACKOFF, &con->state);
+
+       if (con->delay == 0)
+               con->delay = BASE_DELAY_INTERVAL;
+       else if (con->delay < MAX_DELAY_INTERVAL)
+               con->delay *= 2;
+
+       atomic_inc(&con->nref);
+       dout(40, "fault queueing %p %d -> %d delay %lu\n", con,
+            atomic_read(&con->nref) - 1, atomic_read(&con->nref),
+            con->delay);
+       queue_delayed_work(ceph_msgr_wq, &con->work,
+                          round_jiffies_relative(con->delay));
+
+       list_splice_init(&con->out_sent, &con->out_queue);
+       spin_unlock(&con->out_queue_lock);
+}
+
 
 /*
  *  worker function when listener receives a connect
index 0faeb491656197adfa8a76fc851533a1f804e45c..3899137781f4c0835f9fba249d494cfe81c2b8e1 100644 (file)
 
 #include "ceph_fs.h"
 
+/*
+ * Ceph uses the messenger to exchange ceph_msg messages with
+ * other hosts in the system.  The messenger provides ordered and 
+ * reliable delivery.  It tolerates TCP disconnects by reconnecting
+ * (with exponential backoff) in the case of a fault (disconnection,
+ * bad crc, protocol error).  Acks allow sent messages to be discarded
+ * by the sender.
+ *
+ * The network topology is flat: there is no "client" or "server," and
+ * any node can initiate a connection (i.e., send messages) to any other
+ * node.  There is a fair bit of complexity to handle the "connection
+ * race" case where two nodes are simultaneously connecting to each other
+ * so that the end result is a single session.
+ *
+ * The messenger can also send messages in "lossy" mode, where there is
+ * no error recovery or connect retry... the message is just dropped if
+ * something goes wrong.
+ */
+
 struct ceph_msg;
 
+#define IPQUADPORT(n)                                                  \
+       (unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF,                        \
+               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr)) >> 16) & 0xFF,        \
+               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))>>8) & 0xFF,   \
+               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))) & 0xFF,      \
+               (unsigned int)(ntohs((n).sin_port))
+
+
 extern struct workqueue_struct *ceph_msgr_wq;       /* receive work queue */
 
+/*
+ * Ceph defines these callbacks for handling events:
+ */
+/* handle an incoming message. */
 typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
-typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
-                                       struct ceph_entity_name *pn);
+/* an incoming message has a data payload; tell me what pages I
+ * should read the data into. */
 typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
                                          int want);
+/* a remote host as terminated a message exchange session, and messages
+ * we sent (or they tried to send us) may be lost. */
+typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
+                                       struct ceph_entity_name *pn);
 
-static __inline__ const char *ceph_name_type_str(int t) {
+static inline const char *ceph_name_type_str(int t) {
        switch (t) {
        case CEPH_ENTITY_TYPE_MON: return "mon";
        case CEPH_ENTITY_TYPE_MDS: return "mds";
@@ -40,22 +75,37 @@ static __inline__ const char *ceph_name_type_str(int t) {
                le32_to_cpu((n).num)
 
 struct ceph_messenger {
-       void *parent;
+       void *parent;                    /* normally struct ceph_client * */
        ceph_msgr_dispatch_t dispatch;
        ceph_msgr_peer_reset_t peer_reset;
        ceph_msgr_prepare_pages_t prepare_pages;
+
        struct ceph_entity_inst inst;    /* my name+address */
+
        struct socket *listen_sock;      /* listening socket */
        struct work_struct awork;        /* accept work */
+
        spinlock_t con_lock;
-       struct list_head con_all;        /* all connections */
-       struct list_head con_accepting;  /* accepting */
+       struct list_head con_all;        /* all open connections */
+       struct list_head con_accepting;  /*  accepting */
        struct radix_tree_root con_tree; /*  established */
-       struct page *zero_page;
+
+       struct page *zero_page;          /* used in certain error cases */
+
+       /*
+        * the global_seq counts connections i (attempt to) initiate
+        * in order to disambiguate certain connect race conditions.
+        */
        u32 global_seq;
        spinlock_t global_seq_lock;
 };
 
+/*
+ * a single message.  it contains a header (src, dest, message type, etc.),
+ * footer (crc values, mainly), a "front" message body, and possibly a
+ * data payload (stored in some number of pages).  The page_mutex protects
+ * access to the page vector.
+ */
 struct ceph_msg {
        struct ceph_msg_header hdr;     /* header */
        struct ceph_msg_footer footer;  /* footer */
@@ -68,81 +118,100 @@ struct ceph_msg {
 };
 
 struct ceph_msg_pos {
-       int page, page_pos;        /* which page; -3=tag, -2=hdr, -1=front */
-       int data_pos;
-       int did_page_crc;
+       int page, page_pos;  /* which page; offset in page */
+       int data_pos;        /* offset in data payload */
+       int did_page_crc;    /* true if we've calculated crc for current page */
 };
 
 /* ceph connection fault delay defaults */
 #define BASE_DELAY_INTERVAL    (HZ/2)
 #define MAX_DELAY_INTERVAL     (5 * 60 * HZ)
 
-/* ceph_connection state bit flags */
-#define LOSSYTX         0 /* close channel on errors */
-#define LOSSYRX         1 /* close channel on errors */
+/*
+ * ceph_connection state bit flags
+ *
+ * QUEUED, BUSY, and BACKOFF are used together to ensure that only a
+ * single thread is currently opening, reading or writing data to the
+ * socket.
+ */
+#define LOSSYTX         0  /* we can close channel or drop messages on errors */
+#define LOSSYRX         1  /* peer may reset/drop messages */
 #define CONNECTING     2
 #define ACCEPTING      3
-#define WRITE_PENDING  4  /* we have data to send */
-#define QUEUED          5  /* there is work to be done */
+#define WRITE_PENDING  4  /* we have data ready to send */
+#define QUEUED          5  /* there is work queued on this connection */
 #define BUSY            6  /* work is being done */
 #define BACKOFF         7  /* backing off; will retry */
-#define STANDBY                8  /* standby, when socket state close, no messages */
-#define WAIT           9  /* wait for peer to connect */
-#define CLOSED         10  /* we've closed the connection */
+#define STANDBY                8  /* no outgoing messages, socket closed.  we keep
+                           * the ceph_connection around to maintain shared
+                           * state with the peer. */
+#define WAIT           9  /* waiting for peer to connect to us (during a
+                           * connection race) */
+#define CLOSED         10 /* we've closed the connection */
 #define SOCK_CLOSED    11 /* socket state changed to closed */
-#define REGISTERED      12
-
-
+#define REGISTERED      12 /* connection appears in con_tree */
+
+/*
+ * A single connection with another host.
+ *
+ * We maintain a queue of outgoing messages, and some session state to
+ * ensure that we can preserve the lossless, ordered delivery of
+ * messages in the case of a TCP disconnect.
+ */
 struct ceph_connection {
        struct ceph_messenger *msgr;
-       struct socket *sock;    /* connection socket */
-       unsigned long state;    /* connection state */
-       const char *error_msg;
+       struct socket *sock;
+       unsigned long state;    /* connection state (see flags above) */
+       const char *error_msg;  /* error message, if any */
 
        atomic_t nref;
 
-       struct list_head list_all;   /* msgr->con_all */
+       struct list_head list_all;     /* msgr->con_all */
        struct list_head list_bucket;  /* msgr->con_tree or con_accepting */
 
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */
-       __u32 connect_seq, global_seq;
-       bool lossy_rx;                     /* true if sender is lossy */
-
+       __u32 connect_seq, global_seq; /* identify the most recent connection
+                                         attempt for this connection, client */
+       
        /* out queue */
        spinlock_t out_queue_lock;   /* protects out_queue, out_sent, out_seq */
        struct list_head out_queue;
        struct list_head out_sent;   /* sending/sent but unacked */
-
        __u32 out_seq;               /* last message queued for send */
+
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
 
-       /* negotiation temps */
+       /* connection negotiation temps */
        char in_banner[CEPH_BANNER_MAX_LEN];
        struct ceph_msg_connect out_connect, in_connect;
        struct ceph_entity_addr actual_peer_addr;
+       u8 in_flags;       /* follows TAG_READY */
 
-       /* out */
-       struct ceph_msg *out_msg;
+       /* message out temps */
+       struct ceph_msg *out_msg;        /* sending message (== tail of
+                                           out_sent) */
        struct ceph_msg_pos out_msg_pos;
-       __le32 out32;
-       struct kvec out_kvec[6],
+
+       struct kvec out_kvec[6],         /* sending header/footer data */
                *out_kvec_cur;
-       int out_kvec_left;   /* kvec's left */
-       int out_kvec_bytes;  /* bytes left */
-       int out_more;        /* there is more data after this kvec */
-
-       /* partially read message contents */
-       char in_tag;
-       u8 in_flags;
-       int in_base_pos;   /* for ack seq, or msg headers, or handshake */
-       __le32 in_partial_ack;
+       int out_kvec_left;   /* kvec's left in out_kvec */
+       int out_kvec_bytes;  /* total bytes left */
+       int out_more;        /* there is more data after the kvecs */
+       __le32 out_temp_ack; /* for writing an ack */
+
+       /* message in temps */
        struct ceph_msg *in_msg;
        struct ceph_msg_pos in_msg_pos;
-       u32 in_front_crc, in_data_crc;
+       u32 in_front_crc, in_data_crc;  /* calculated crc, for comparison
+                                          message footer */
+
+       char in_tag;         /* protocol control byte */
+       int in_base_pos;     /* bytes read */
+       __le32 in_temp_ack;  /* for reading an ack */
 
        struct delayed_work work;           /* send|recv work */
-       unsigned long       delay;          /* delay interval */
+       unsigned long       delay;          /* current delay interval */
 };
 
 extern int ceph_msgr_init(void);
@@ -154,13 +223,11 @@ extern void ceph_messenger_destroy(struct ceph_messenger *);
 extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
                                     struct ceph_entity_addr *addr);
 
-extern void ceph_queue_con(struct ceph_connection *con);
-
 extern struct ceph_msg *ceph_msg_new(int type, int front_len,
                                     int page_len, int page_off,
                                     struct page **pages);
 
-static __inline__ struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) {
+static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg) {
        /*printk("ceph_msg_get %p %d -> %d\n", msg, atomic_read(&msg->nref),
          atomic_read(&msg->nref)+1);*/
        atomic_inc(&msg->nref);
index 3585ca9feb245a8f44ca582dcd14b28aa48e0245..66feaff50f0f29f22467be7c51983cf58ce5f156 100644 (file)
@@ -54,13 +54,6 @@ extern int ceph_debug_mask;
 #define CEPH_BLOCK_SHIFT 20    /* 1 MB blocks for purposes of statfs*/
 #define CEPH_BLOCK  (1 << CEPH_BLOCK_SHIFT)
 
-#define IPQUADPORT(n)                                                  \
-       (unsigned int)((be32_to_cpu((n).sin_addr.s_addr) >> 24)) & 0xFF,                        \
-               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr)) >> 16) & 0xFF,        \
-               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))>>8) & 0xFF,   \
-               (unsigned int)((be32_to_cpu((n).sin_addr.s_addr))) & 0xFF,      \
-               (unsigned int)(ntohs((n).sin_port))
-
 
 #if 0
 # define dput(dentry)                                 \