]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kmsgr: some formatting and style cleanup, and removed some legacy support
authorSage Weil <sage@newdream.net>
Tue, 1 Apr 2008 22:13:45 +0000 (15:13 -0700)
committerSage Weil <sage@newdream.net>
Tue, 1 Apr 2008 22:13:45 +0000 (15:13 -0700)
src/kernel/messenger.c
src/kernel/messenger.h

index 991bc158952ef50a8e10bf52ab23362186eddbe1..1dcaa09d57e1b3234ecc3e58a467a913c3d7a31f 100644 (file)
@@ -10,7 +10,7 @@
 
 int ceph_debug_msgr = 50;
 #define DOUT_VAR ceph_debug_msgr
-#define DOUT_PREFIX "msgr: " 
+#define DOUT_PREFIX "msgr: "
 #include "super.h"
 
 /* static tag bytes */
@@ -21,15 +21,9 @@ static char tag_wait = CEPH_MSGR_TAG_WAIT;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
 static void try_read(struct work_struct *);
 static void try_write(struct work_struct *);
 static void try_accept(struct work_struct *);
-#else
-static void try_read(void *);
-static void try_write(void *);
-static void try_accept(void *);
-#endif
 
 
 
@@ -37,14 +31,14 @@ static void try_accept(void *);
  * connections
  */
 
-/* 
+/*
  * create a new connection.  initial state is NEW.
  */
 static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
 {
        struct ceph_connection *con;
        con = kzalloc(sizeof(struct ceph_connection), GFP_KERNEL);
-       if (con == NULL) 
+       if (con == NULL)
                return NULL;
 
        con->msgr = msgr;
@@ -59,13 +53,8 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        INIT_WORK(&con->rwork, try_read);
        INIT_DELAYED_WORK(&con->swork, try_write);
-#else
-       INIT_WORK(&con->rwork, try_read, con);
-       INIT_WORK(&con->swork, try_write, con);
-#endif
 
        return con;
 }
@@ -77,7 +66,7 @@ static struct ceph_connection *new_connection(struct ceph_messenger *msgr)
  * table.  in the rare event that the trivial hash collides, we just
  * traverse the (short) list.
  */
-static unsigned long hash_addr(struct ceph_entity_addr *addr) 
+static unsigned long hash_addr(struct ceph_entity_addr *addr)
 {
        unsigned long key;
        key = *(__u32*)&addr->ipaddr.sin_addr.s_addr;
@@ -85,10 +74,11 @@ static unsigned long hash_addr(struct ceph_entity_addr *addr)
        return key;
 }
 
-/* 
+/*
  * get an existing connection, if any, for given addr
  */
-static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, 
+                                               struct ceph_entity_addr *addr)
 {
        struct ceph_connection *con = NULL;
        struct list_head *head, *p;
@@ -96,7 +86,7 @@ static struct ceph_connection *__get_connection(struct ceph_messenger *msgr, str
 
        /* existing? */
        head = radix_tree_lookup(&msgr->con_open, key);
-       if (head == NULL) 
+       if (head == NULL)
                goto out;
        con = list_entry(head, struct ceph_connection, list_bucket);
        if (memcmp(&con->peer_addr, addr, sizeof(addr)) == 0) {
@@ -117,10 +107,10 @@ out:
 
 
 
-/* 
+/*
  * drop a reference
  */
-static void put_connection(struct ceph_connection *con) 
+static void put_connection(struct ceph_connection *con)
 {
        dout(20, "put_connection nref = %d\n", atomic_read(&con->nref));
        if (atomic_dec_and_test(&con->nref)) {
@@ -131,10 +121,11 @@ static void put_connection(struct ceph_connection *con)
        }
 }
 
-/* 
+/*
  * add to connections tree
  */
-static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __add_connection(struct ceph_messenger *msgr,
+                            struct ceph_connection *con)
 {
        struct list_head *head;
        unsigned long key = hash_addr(&con->peer_addr);
@@ -151,16 +142,19 @@ static void __add_connection(struct ceph_messenger *msgr, struct ceph_connection
 
        head = radix_tree_lookup(&msgr->con_open, key);
        if (head) {
-               dout(20, "add_connection %p in existing bucket %lu head %p\n", con, key, head);
+               dout(20, "add_connection %p in existing bucket %lu head %p\n",
+                    con, key, head);
                list_add(&con->list_bucket, head);
        } else {
-               dout(20, "add_connection %p in new bucket %lu head %p\n", con, key, &con->list_bucket);
+               dout(20, "add_connection %p in new bucket %lu head %p\n", con,
+                    key, &con->list_bucket);
                INIT_LIST_HEAD(&con->list_bucket); /* empty */
                radix_tree_insert(&msgr->con_open, key, &con->list_bucket);
        }
 }
 
-static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void add_connection_accepting(struct ceph_messenger *msgr, 
+                                    struct ceph_connection *con)
 {
        atomic_inc(&con->nref);
        spin_lock(&msgr->con_lock);
@@ -173,7 +167,8 @@ static void add_connection_accepting(struct ceph_messenger *msgr, struct ceph_co
  * remove connection from all list.
  * also, from con_open radix tree, if it should have been there
  */
-static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void __remove_connection(struct ceph_messenger *msgr,
+                               struct ceph_connection *con)
 {
        unsigned long key;
        void **slot, *val;
@@ -190,25 +185,20 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect
                key = hash_addr(&con->peer_addr);
                if (list_empty(&con->list_bucket)) {
                        /* last one */
-                       dout(20, "__remove_connection %p and removing bucket %lu\n", con, key);
+                       dout(20, "__remove_connection %p and bucket %lu\n", 
+                            con, key);
                        radix_tree_delete(&msgr->con_open, key);
                } else {
                        slot = radix_tree_lookup_slot(&msgr->con_open, key);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
                        val = radix_tree_deref_slot(slot);
-#else
-                       val = *slot;
-#endif
-                       dout(20, "__remove_connection %p from bucket %lu head %p\n", con, key, val);
+                       dout(20, "__remove_connection %p from bucket %lu "
+                            "head %p\n", con, key, val);
                        if (val == &con->list_bucket) {
-                               dout(20, "__remove_connection adjusting bucket ptr"
-                                    " for %lu to next item, %p\n", key, 
+                               dout(20, "__remove_connection adjusting bucket"
+                                    " for %lu to next item, %p\n", key,
                                     con->list_bucket.next);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
-                               radix_tree_replace_slot(slot, con->list_bucket.next);
-#else
-                               *slot = con->list_bucket.next;
-#endif
+                               radix_tree_replace_slot(slot, 
+                                                       con->list_bucket.next);
                        }
                        list_del(&con->list_bucket);
                }
@@ -216,7 +206,8 @@ static void __remove_connection(struct ceph_messenger *msgr, struct ceph_connect
        put_connection(con);
 }
 
-static void remove_connection(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void remove_connection(struct ceph_messenger *msgr,
+                             struct ceph_connection *con)
 {
        spin_lock(&msgr->con_lock);
        __remove_connection(msgr, con);
@@ -231,11 +222,7 @@ void ceph_queue_write(struct ceph_connection *con)
 {
        dout(40, "ceph_queue_write %p\n", con);
        atomic_inc(&con->nref);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        if (!queue_work(send_wq, &con->swork.work)) {
-#else
-       if (!queue_work(send_wq, &con->swork)) {
-#endif
                dout(40, "ceph_queue_write %p - already queued\n", con);
                put_connection(con);
        }
@@ -271,13 +258,13 @@ void ceph_queue_read(struct ceph_connection *con)
  */
 static void ceph_send_fault(struct ceph_connection *con)
 {
-       derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n", 
+       derr(1, "ceph_send_fault %p state %lu to peer %u.%u.%u.%u:%u\n",
             con, con->state,
             IPQUADPORT(con->peer_addr.ipaddr));
 
        /* PW if never get here remove */
        if (test_bit(WAIT, &con->state)) {
-                derr(30, "ceph_send_fault received socket close during WAIT state\n");
+               derr(30, "ceph_send_fault socket close during WAIT state\n");
                return;
        }
 
@@ -288,7 +275,7 @@ static void ceph_send_fault(struct ceph_connection *con)
                con->sock = NULL;
                set_bit(NEW, &con->state);
 
-               /* If there are no messages in the queue, place the connection 
+               /* If there are no messages in the queue, place the connection
                * in a STANDBY state otherwise retry with delay */
                if (list_empty(&con->out_queue)) {
                        dout(10, "setting STANDBY bit\n");
@@ -338,7 +325,7 @@ static int write_partial_kvec(struct ceph_connection *con)
        int ret;
 
        while (con->out_kvec_bytes > 0) {
-               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 
+               ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur,
                                       con->out_kvec_left, con->out_kvec_bytes);
                if (ret <= 0) goto out;
                con->out_kvec_bytes -= ret;
@@ -359,20 +346,22 @@ static int write_partial_kvec(struct ceph_connection *con)
        con->out_kvec_left = 0;
        ret = 1;
 out:
-       dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con, 
+       dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
             con->out_kvec_left, con->out_kvec_bytes, ret);
        return ret;  /* done! */
 }
 
-static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
+static int write_partial_msg_pages(struct ceph_connection *con, 
+                                  struct ceph_msg *msg)
 {
        struct kvec kv;
        int ret;
        unsigned data_len = le32_to_cpu(msg->hdr.data_len);
 
        while (con->out_msg_pos.page < con->out_msg->nr_pages) {
-               kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) + con->out_msg_pos.page_pos;
-               kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 
+               kv.iov_base = kmap(msg->pages[con->out_msg_pos.page]) +
+                       con->out_msg_pos.page_pos;
+               kv.iov_len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
                                 (int)(data_len - con->out_msg_pos.data_pos));
                ret = ceph_tcp_sendmsg(con->sock, &kv, 1, kv.iov_len);
                if (ret < 0) return ret;
@@ -382,7 +371,7 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg
                if (ret == kv.iov_len) {
                        con->out_msg_pos.page_pos = 0;
                        con->out_msg_pos.page++;
-               }               
+               }
        }
 
        /* done */
@@ -396,7 +385,8 @@ static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg
  */
 static void prepare_write_message(struct ceph_connection *con)
 {
-       struct ceph_msg *m = list_entry(con->out_queue.next, struct ceph_msg, list_head);
+       struct ceph_msg *m = list_entry(con->out_queue.next, 
+                                       struct ceph_msg, list_head);
 
        /* move to sending/sent list */
        list_del(&m->list_head);
@@ -404,7 +394,7 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg = m;  /* FIXME: do we want to take a reference here? */
 
        /* encode header */
-       dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n", 
+       dout(20, "prepare_write_message %p seq %lld type %d len %d+%d\n",
             m, le64_to_cpu(m->hdr.seq), le32_to_cpu(m->hdr.type),
             le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.data_len));
        BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
@@ -427,7 +417,7 @@ static void prepare_write_message(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
-/* 
+/*
  * prepare an ack for send
  */
 static void prepare_write_ack(struct ceph_connection *con)
@@ -445,7 +435,8 @@ static void prepare_write_ack(struct ceph_connection *con)
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void prepare_write_connect(struct ceph_messenger *msgr,
+                                 struct ceph_connection *con)
 {
        con->out_kvec[0].iov_base = &msgr->inst.addr;
        con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
@@ -458,7 +449,8 @@ static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_conne
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
+static void prepare_write_accept_announce(struct ceph_messenger *msgr, 
+                                         struct ceph_connection *con)
 {
        con->out_kvec[0].iov_base = &msgr->inst.addr;
        con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
@@ -494,22 +486,14 @@ static void prepare_write_accept_retry(struct ceph_connection *con, char *ptag)
 /*
  * worker function when socket is writeable
  */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
 static void try_write(struct work_struct *work)
-#else
-static void try_write(void *arg)
-#endif
 {
-       struct ceph_connection *con = 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
-                       container_of(work, struct ceph_connection, swork.work);
-#else
-                       arg;
-#endif
+       struct ceph_connection *con =
+               container_of(work, struct ceph_connection, swork.work);
        struct ceph_messenger *msgr = con->msgr;
        int ret = 1;
 
-       dout(30, "try_write start %p state %lu nref %d\n", con, con->state, 
+       dout(30, "try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
 
        if (test_bit(CLOSED, &con->state)) {
@@ -530,7 +514,8 @@ more:
                        con->connect_seq++;
                prepare_write_connect(msgr, con);
                set_bit(CONNECTING, &con->state);
-               dout(5, "try_write initiating connect on %p new state %lu\n", con, con->state);
+               dout(5, "try_write initiating connect on %p new state %lu\n", 
+                    con, con->state);
                ret = ceph_tcp_connect(con);
                if (ret < 0) {
                        derr(1, "try_write tcp connect error %d\n", ret);
@@ -545,7 +530,7 @@ more:
                if (ret == 0)
                        goto done;
                if (ret < 0) {
-                       dout(30, "try_write write_partial_kvec returned error %d\n", ret);
+                       dout(30, "try_write write_partial_kvec err %d\n", ret);
                        goto done;
                }
        }
@@ -553,22 +538,24 @@ more:
        /* check if connect handshake finished.. if not requeue and return.. */
 /*
        if (!test_bit(OPEN, &con->state)) {
-                dout(5, "try_write state = %lu, need to requeue and exit\n", con->state);
-                goto done;
-        }
+               dout(5, "try_write state = %lu, need to requeue and exit\n",
+               con->state);
+               goto done;
+       }
 */
-       
+
        /* msg pages? */
        if (con->out_msg) {
                ret = write_partial_msg_pages(con, con->out_msg);
-               if (ret == 0) 
+               if (ret == 0)
                        goto done;
                if (ret < 0) {
-                       dout(30, "try_write write_partial_msg_pages returned error %d\n", ret);
+                       dout(30, "try_write write_partial_msg_pages err %d\n", 
+                            ret);
                        goto done;
                }
        }
-       
+
        /* anything else pending? */
        spin_lock(&con->out_queue_lock);
        if (con->in_seq > con->in_seq_acked) {
@@ -592,7 +579,7 @@ done:
        return;
 }
 
-/* 
+/*
  * prepare to read a message
  */
 static int prepare_read_message(struct ceph_connection *con)
@@ -603,9 +590,9 @@ static int prepare_read_message(struct ceph_connection *con)
        con->in_base_pos = 0;
        con->in_msg = ceph_msg_new(0, 0, 0, 0, 0);
        if (IS_ERR(con->in_msg)) {
-               /* TBD: we don't check for error in caller, handle error here? */
+               /* TBD: we don't check for error in caller, handle it here? */
                err = PTR_ERR(con->in_msg);
-               con->in_msg = 0;                
+               con->in_msg = 0;
                derr(1, "kmalloc failure on incoming message %d\n", err);
                return err;
        }
@@ -622,13 +609,14 @@ static int read_message_partial(struct ceph_connection *con)
        int ret;
        int want, left;
        unsigned front_len, data_len, data_off;
-       
+
        dout(20, "read_message_partial con %p msg %p\n", con, m);
 
        /* header */
        while (con->in_base_pos < sizeof(struct ceph_msg_header)) {
                left = sizeof(struct ceph_msg_header) - con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos, left);
+               ret = ceph_tcp_recvmsg(con->sock, &m->hdr + con->in_base_pos,
+                                      left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
                if (con->in_base_pos == sizeof(struct ceph_msg_header))
@@ -644,7 +632,8 @@ static int read_message_partial(struct ceph_connection *con)
                                return -ENOMEM;
                }
                left = front_len - m->front.iov_len;
-               ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base + m->front.iov_len, left);
+               ret = ceph_tcp_recvmsg(con->sock, (char*)m->front.iov_base +
+                                      m->front.iov_len, left);
                if (ret <= 0) return ret;
                m->front.iov_len += ret;
        }
@@ -652,7 +641,7 @@ static int read_message_partial(struct ceph_connection *con)
        /* (page) data */
        data_len = le32_to_cpu(m->hdr.data_len);
        data_off = le32_to_cpu(m->hdr.data_off);
-       if (data_len == 0) 
+       if (data_len == 0)
                goto done;
        if (m->nr_pages == 0) {
                con->in_msg_pos.page = 0;
@@ -664,8 +653,8 @@ static int read_message_partial(struct ceph_connection *con)
                BUG_ON(!con->msgr->prepare_pages);
                ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
                if (ret < 0) {
-                       dout(10, "prepare_pages failed, skipping+discarding message\n");
-                       con->in_base_pos = -data_len; /* ignore rest of message */
+                       dout(10, "prepare_pages failed, skipping payload\n");
+                       con->in_base_pos = -data_len; /* skip payload */
                        ceph_msg_put(con->in_msg);
                        con->in_msg = 0;
                        con->in_tag = CEPH_MSGR_TAG_READY;
@@ -674,16 +663,15 @@ static int read_message_partial(struct ceph_connection *con)
                        BUG_ON(m->nr_pages != want);
                }
                /*
-                * FIXME: we should discard the data payload if ret 
+                * FIXME: we should discard the data payload if ret
                 */
        }
        while (con->in_msg_pos.data_pos < data_len) {
                left = min((int)(data_len - con->in_msg_pos.data_pos),
                           (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
-               /*dout(10, "data_pos = %d, data_len = %d, page_pos=%d left = %d\n", 
-                 con->in_msg_pos.data_pos, m->hdr.data_len, con->in_msg_pos.page_pos, left);*/
                p = kmap(m->pages[con->in_msg_pos.page]);
-               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, left);
+               ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+                                      left);
                if (ret <= 0) return ret;
                con->in_msg_pos.data_pos += ret;
                con->in_msg_pos.page_pos += ret;
@@ -700,11 +688,12 @@ done:
        if (con->msgr->inst.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
                /*
                 * in practice, we learn our ip from the first incoming mon
-                * message, before anyone else knows we exist, so this is 
+                * message, before anyone else knows we exist, so this is
                 * safe.
                 */
                con->msgr->inst.addr.ipaddr = con->in_msg->hdr.dst.addr.ipaddr;
-               dout(10, "read_message_partial learned my addr is %u.%u.%u.%u:%u\n",
+               dout(10, "read_message_partial learned my addr is "
+                    "%u.%u.%u.%u:%u\n",
                     IPQUADPORT(con->msgr->inst.addr.ipaddr));
        }
 
@@ -712,7 +701,7 @@ done:
 }
 
 
-/* 
+/*
  * prepare to read an ack
  */
 static void prepare_read_ack(struct ceph_connection *con)
@@ -728,7 +717,9 @@ static int read_ack_partial(struct ceph_connection *con)
 {
        while (con->in_base_pos < sizeof(con->in_partial_ack)) {
                int left = sizeof(con->in_partial_ack) - con->in_base_pos;
-               int ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_partial_ack + con->in_base_pos, left);
+               int ret = ceph_tcp_recvmsg(con->sock, 
+                                          (char*)&con->in_partial_ack +
+                                          con->in_base_pos, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -742,9 +733,9 @@ static void process_ack(struct ceph_connection *con, __u32 ack)
        while (!list_empty(&con->out_sent)) {
                m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
                seq = le64_to_cpu(m->hdr.seq);
-               if (seq > ack) 
+               if (seq > ack)
                        break;
-               dout(5, "got ack for seq %llu type %d at %p\n", seq, 
+               dout(5, "got ack for seq %llu type %d at %p\n", seq,
                     le32_to_cpu(m->hdr.type), m);
                list_del(&m->list_head);
                ceph_msg_put(m);
@@ -752,20 +743,22 @@ static void process_ack(struct ceph_connection *con, __u32 ack)
 }
 
 
-/* 
+/*
  * read portion of connect-side handshake on a new connection
  */
 static int read_connect_partial(struct ceph_connection *con)
 {
        int ret, to;
-       dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos);
+       dout(20, "read_connect_partial %p at %d\n", con, con->in_base_pos);
 
        /* actual_peer_addr */
        to = sizeof(con->actual_peer_addr);
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left);
+               ret = ceph_tcp_recvmsg(con->sock, 
+                                      (char*)&con->actual_peer_addr + have, 
+                                      left);
                if (ret <= 0) goto out;
                con->in_base_pos += ret;
        }
@@ -784,16 +777,19 @@ static int read_connect_partial(struct ceph_connection *con)
                if (con->in_base_pos < to) {
                        int left = to - con->in_base_pos;
                        int have = sizeof(con->in_connect_seq) - left;
-                       ret = ceph_tcp_recvmsg(con->sock, 
-                                              (char*)&con->in_connect_seq + have, left);
+                       ret = ceph_tcp_recvmsg(con->sock,
+                                              (char*)&con->in_connect_seq +
+                                              have, left);
                        if (ret <= 0) goto out;
                        con->in_base_pos += ret;
-               }       
+               }
        }
        ret = 1;
 out:
-       dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
-       dout(20, "read_connect_partial peer in connect_seq = %u\n", le32_to_cpu(con->in_connect_seq));
+       dout(20, "read_connect_partial %p end at %d ret %d\n", con,
+            con->in_base_pos, ret);
+       dout(20, "read_connect_partial peer in connect_seq = %u\n", 
+            le32_to_cpu(con->in_connect_seq));
        return ret; /* done */
 }
 
@@ -821,7 +817,8 @@ static void process_connect(struct ceph_connection *con)
 {
        dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
        if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
-               derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, got %u.%u.%u.%u:%u/%d, wtf\n",
+               derr(1, "process_connect wrong peer, want %u.%u.%u.%u:%u/%d, "
+                    "got %u.%u.%u.%u:%u/%d, wtf\n",
                     IPQUADPORT(con->peer_addr.ipaddr),
                     con->peer_addr.nonce,
                     IPQUADPORT(con->actual_peer_addr.ipaddr),
@@ -832,18 +829,19 @@ static void process_connect(struct ceph_connection *con)
        }
        switch (con->in_tag) {
        case CEPH_MSGR_TAG_RESETSESSION:
-               dout(10, "process_connect got session RESET peers in_connect_seq %u\n", 
+               dout(10, "process_connect got RESET peer seq %u\n",
                     le32_to_cpu(con->in_connect_seq));
                reset_connection(con);
                prepare_write_connect(con->msgr, con);
                con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
                break;
        case CEPH_MSGR_TAG_RETRY:
-               dout(10, 
-                    "process_connect got session RETRY connect_seq = %u, in_connect_seq = %u\n",
-                    le32_to_cpu(con->out_connect_seq), le32_to_cpu(con->in_connect_seq));
+               dout(10,
+                    "process_connect got RETRY my seq = %u, peer_seq = %u\n",
+                    le32_to_cpu(con->out_connect_seq), 
+                    le32_to_cpu(con->in_connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect_seq);
-               prepare_write_connect(con->msgr, con); 
+               prepare_write_connect(con->msgr, con);
                break;
        case CEPH_MSGR_TAG_WAIT:
                dout(10, "process_connect peer connecting WAIT\n");
@@ -858,7 +856,7 @@ static void process_connect(struct ceph_connection *con)
                set_bit(OPEN, &con->state);
                break;
        default:
-               derr(1, "process_connect protocol error, try connecting again in a bit\n");
+               derr(1, "process_connect protocol error, will retry\n");
                con->delay = 10 * HZ;  /* maybe use default.. */
                ceph_send_fault(con);
        }
@@ -881,7 +879,8 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = con->in_base_pos;
-               ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_addr + have, left);
+               ret = ceph_tcp_recvmsg(con->sock, 
+                                      (char*)&con->peer_addr + have, left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -891,7 +890,9 @@ static int read_accept_partial(struct ceph_connection *con)
        while (con->in_base_pos < to) {
                int left = to - con->in_base_pos;
                int have = sizeof(con->peer_addr) - left;
-               ret = ceph_tcp_recvmsg(con->sock, (char*)&con->in_connect_seq + have, left);
+               ret = ceph_tcp_recvmsg(con->sock, 
+                                      (char*)&con->in_connect_seq + have,
+                                      left);
                if (ret <= 0) return ret;
                con->in_base_pos += ret;
        }
@@ -900,10 +901,12 @@ static int read_accept_partial(struct ceph_connection *con)
 
 /*
  * replace another connection
- *  (old and new should be for the _same_ peer, 
+ *  (old and new should be for the _same_ peer,
  *   and thus in the same pos in the radix tree)
  */
-static void __replace_connection(struct ceph_messenger *msgr, struct ceph_connection *old, struct ceph_connection *new)
+static void __replace_connection(struct ceph_messenger *msgr,
+                                struct ceph_connection *old, 
+                                struct ceph_connection *new)
 {
        clear_bit(OPEN, &old->state);
 
@@ -951,7 +954,7 @@ static void process_accept(struct ceph_connection *con)
                                reset_connection(existing);
                                /* replace connection */
                                __replace_connection(msgr, existing, con);
-                               con->msgr->peer_reset(con->msgr->parent, 
+                               con->msgr->peer_reset(con->msgr->parent,
                                                      &con->peer_name);
                        } else {
                                /* old attempt or peer didn't get the READY */
@@ -960,23 +963,23 @@ static void process_accept(struct ceph_connection *con)
                                prepare_write_accept_retry(con, &tag_retry);
                        }
                } else if (peer_cseq == existing->connect_seq &&
-                          (test_bit(CONNECTING, &existing->state) || 
+                          (test_bit(CONNECTING, &existing->state) ||
                            test_bit(WAIT, &existing->state))) {
                        /* connection race */
                        dout(20, "process_accept connection race state = %lu\n",
                             con->state);
-                       if (ceph_entity_addr_equal(&msgr->inst.addr, 
+                       if (ceph_entity_addr_equal(&msgr->inst.addr,
                                                   &con->peer_addr)) {
                                /* incoming connection wins.. */
                                /* replace existing with new connection */
-                               __replace_connection(msgr, existing, con);
+                               __replace_connection(msgr, existing, con);
                        } else {
                                /* our existing outgoing connection wins..
-                                  tell peer to wait for our outgoing 
+                                  tell peer to wait for our outgoing
                                   connection to go through */
                                prepare_write_accept_reply(con, &tag_wait);
                        }
-               } else if (existing->connect_seq == 0 && 
+               } else if (existing->connect_seq == 0 &&
                          (peer_cseq > existing->connect_seq)) {
                        /* we reset and already reconnecting */
                        prepare_write_accept_reply(con, &tag_reset);
@@ -986,12 +989,12 @@ static void process_accept(struct ceph_connection *con)
                }
                put_connection(existing);
        } else if (peer_cseq > 0) {
-               dout(20, "process_accept no existing connection, connection reset\n");
+               dout(20, "process_accept no existing connection, we reset\n");
                prepare_write_accept_reply(con, &tag_reset);
        } else {
-               dout(20, "process_accept no existing connection, connection now OPEN\n");
+               dout(20, "process_accept no existing connection, opening\n");
                __add_connection(msgr, con);
-                set_bit(OPEN, &con->state);
+               set_bit(OPEN, &con->state);
                con->connect_seq = peer_cseq + 1;
                prepare_write_accept_reply(con, &tag_ready);
        }
@@ -1004,21 +1007,13 @@ static void process_accept(struct ceph_connection *con)
 /*
  * worker function when data is available on the socket
  */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
 static void try_read(struct work_struct *work)
-#else
-static void try_read(void *arg)
-#endif
 {
        int ret = -1;
        struct ceph_connection *con;
        struct ceph_messenger *msgr;
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        con = container_of(work, struct ceph_connection, rwork);
-#else
-       con = arg;
-#endif
        dout(20, "try_read start on %p\n", con);
        msgr = con->msgr;
 
@@ -1052,7 +1047,7 @@ more:
        if (con->in_base_pos < 0) {
                /* skipping + discarding content */
                static char buf[1024];
-               ret = ceph_tcp_recvmsg(con->sock, buf, 
+               ret = ceph_tcp_recvmsg(con->sock, buf,
                                       min(1024, -con->in_base_pos));
                if (ret <= 0) goto done;
                con->in_base_pos += ret;
@@ -1062,7 +1057,7 @@ more:
                ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
                if (ret <= 0) goto done;
                dout(30, "try_read got tag %d\n", (int)con->in_tag);
-               if (con->in_tag == CEPH_MSGR_TAG_MSG) 
+               if (con->in_tag == CEPH_MSGR_TAG_MSG)
                        prepare_read_message(con);
                else if (con->in_tag == CEPH_MSGR_TAG_ACK)
                        prepare_read_ack(con);
@@ -1077,14 +1072,15 @@ more:
                ret = read_message_partial(con);
                if (ret <= 0) goto done;
 
-               dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", con->in_msg,
-                    ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)), 
+               dout(1, "===== %p from %s%d %d=%s len %d+%d =====\n", 
+                    con->in_msg,
+                    ceph_name_type_str(le32_to_cpu(con->in_msg->hdr.src.name.type)),
                     le32_to_cpu(con->in_msg->hdr.src.name.num),
-                    le32_to_cpu(con->in_msg->hdr.type), 
+                    le32_to_cpu(con->in_msg->hdr.type),
                     ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
-                    le32_to_cpu(con->in_msg->hdr.front_len), 
+                    le32_to_cpu(con->in_msg->hdr.front_len),
                     le32_to_cpu(con->in_msg->hdr.data_len));
-               msgr->dispatch(con->msgr->parent, con->in_msg); /* fixme: use a workqueue */
+               msgr->dispatch(con->msgr->parent, con->in_msg); 
                con->in_msg = 0;
                con->in_tag = CEPH_MSGR_TAG_READY;
                goto more;
@@ -1117,34 +1113,26 @@ out:
 /*
  *  worker function when listener receives a connect
  */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
 static void try_accept(struct work_struct *work)
-#else
-static void try_accept(void *arg)
-#endif
 {
        struct ceph_connection *new_con = NULL;
        struct ceph_messenger *msgr;
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        msgr = container_of(work, struct ceph_messenger, awork);
-#else
-       msgr = arg;
-#endif
 
-        dout(5, "Entered try_accept\n");
+       dout(5, "Entered try_accept\n");
 
        /* initialize the msgr connection */
        new_con = new_connection(msgr);
        if (new_con == NULL) {
-                       derr(1, "malloc failure\n");
+               derr(1, "malloc failure\n");
                goto done;
                }
        if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
-               derr(1, "error accepting connection\n");
+               derr(1, "error accepting connection\n");
                put_connection(new_con);
-                goto done;
-        }
+               goto done;
+       }
        dout(5, "accepted connection \n");
 
        new_con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1155,12 +1143,12 @@ static void try_accept(void *arg)
        add_connection_accepting(msgr, new_con);
 
        /*
-        * hand off to worker threads ,should be able to write, we want to 
+        * hand off to worker threads ,should be able to write, we want to
         * try to write right away, we may have missed socket state change
         */
        ceph_queue_write(new_con);
 done:
-        return;
+       return;
 }
 
 /*
@@ -1168,21 +1156,17 @@ done:
  */
 struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
 {
-        struct ceph_messenger *msgr;
+       struct ceph_messenger *msgr;
        int ret = 0;
 
-        msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
-        if (msgr == NULL) 
+       msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
+       if (msgr == NULL)
                return ERR_PTR(-ENOMEM);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        INIT_WORK(&msgr->awork, try_accept);
-#else
-       INIT_WORK(&msgr->awork, try_accept, msgr);
-#endif
        spin_lock_init(&msgr->con_lock);
        INIT_LIST_HEAD(&msgr->con_all);
        INIT_LIST_HEAD(&msgr->con_accepting);
-       INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC);  /* we insert under spinlock */
+       INIT_RADIX_TREE(&msgr->con_open, GFP_ATOMIC);
 
        /* pick listening address */
        if (myaddr) {
@@ -1193,18 +1177,18 @@ struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
                msgr->inst.addr.ipaddr.sin_port = htons(0);  /* any port */
        }
        msgr->inst.addr.ipaddr.sin_family = AF_INET;
-       
+
        /* create listening socket */
        ret = ceph_tcp_listen(msgr);
        if (ret < 0) {
                kfree(msgr);
                return ERR_PTR(ret);
        }
-       if (myaddr) 
+       if (myaddr)
                msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
 
        dout(1, "create %p listening on %x:%d\n", msgr,
-            ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr), 
+            ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
             ntohs(msgr->inst.addr.ipaddr.sin_port));
        return msgr;
 }
@@ -1219,7 +1203,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
        spin_lock(&msgr->con_lock);
        while (!list_empty(&msgr->con_all)) {
                dout(1, "con_all isn't empty\n");
-               con = list_entry(msgr->con_all.next, struct ceph_connection, list_all);
+               con = list_entry(msgr->con_all.next, struct ceph_connection,
+                                list_all);
                __remove_connection(msgr, con);
        }
        spin_unlock(&msgr->con_lock);
@@ -1233,7 +1218,8 @@ void ceph_messenger_destroy(struct ceph_messenger *msgr)
 /*
  * mark a peer down.  drop any open connection.
  */
-void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr)
+void ceph_messenger_mark_down(struct ceph_messenger *msgr, 
+                             struct ceph_entity_addr *addr)
 {
        struct ceph_connection *con;
 
@@ -1244,7 +1230,7 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad
        con = __get_connection(msgr, addr);
        if (con) {
                dout(10, "mark_down dropping %p\n", con);
-               set_bit(CLOSED, &con->state);  /* in case there is queued work */
+               set_bit(CLOSED, &con->state);  /* in case there's queued work */
                __remove_connection(msgr, con);
                put_connection(con);
        }
@@ -1257,11 +1243,12 @@ void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_ad
  *
  * will take+drop msgr, then connection locks.
  */
-int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned long timeout)
+int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, 
+                 unsigned long timeout)
 {
        struct ceph_connection *con, *newcon;
        int ret = 0;
-       
+
        /* set source */
        msg->hdr.src = msgr->inst;
 
@@ -1278,43 +1265,46 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo
                con = __get_connection(msgr, &msg->hdr.dst.addr);
                if (con) {
                        put_connection(newcon);
-                       dout(10, "ceph_msg_send (lost race and) had connection %p to peer %u.%u.%u.%u:%u\n", con,
+                       dout(10, "ceph_msg_send (lost race and) had connection "
+                            "%p to peer %u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));
                } else {
                        con = newcon;
                        con->peer_addr = msg->hdr.dst.addr;
                        con->peer_name = msg->hdr.dst.name;
                        __add_connection(msgr, con);
-                       dout(5, "ceph_msg_send new connection %p to peer %u.%u.%u.%u:%u\n", con,
+                       dout(5, "ceph_msg_send new connection %p to peer "
+                            "%u.%u.%u.%u:%u\n", con,
                             IPQUADPORT(msg->hdr.dst.addr.ipaddr));
                }
        } else {
-               dout(10, "ceph_msg_send had connection %p to peer %u.%u.%u.%u:%u\n", con,
+               dout(10, "ceph_msg_send had connection %p to peer "
+                    "%u.%u.%u.%u:%u\n", con,
                     IPQUADPORT(msg->hdr.dst.addr.ipaddr));
        }
        spin_unlock(&msgr->con_lock);
        con->delay = timeout;
-       dout(10, "ceph_msg_send delay = %lu\n", con->delay); 
+       dout(10, "ceph_msg_send delay = %lu\n", con->delay);
 
        /* queue */
        spin_lock(&con->out_queue_lock);
        msg->hdr.seq = cpu_to_le64(++con->out_seq);
        dout(1, "----- %p to %s%d %d=%s len %d+%d -----\n", msg,
-            ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), 
+            ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
             le32_to_cpu(msg->hdr.dst.name.num),
             le32_to_cpu(msg->hdr.type),
             ceph_msg_type_name(le32_to_cpu(msg->hdr.type)),
-            le32_to_cpu(msg->hdr.front_len), 
+            le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.data_len));
-       dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg, 
+       dout(2, "ceph_msg_send queuing %p seq %llu for %s%d on %p\n", msg,
             le64_to_cpu(msg->hdr.seq),
-            ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)), 
+            ceph_name_type_str(le32_to_cpu(msg->hdr.dst.name.type)),
             le32_to_cpu(msg->hdr.dst.name.num), con);
        ceph_msg_get(msg);
        list_add_tail(&msg->list_head, &con->out_queue);
        spin_unlock(&con->out_queue_lock);
-       
-       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 
+
+       if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
                ceph_queue_write(con);
 
        put_connection(con);
@@ -1323,10 +1313,11 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, unsigned lo
 }
 
 
-/* 
+/*
  * construct a new message with given type, size
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages)
+struct ceph_msg *ceph_msg_new(int type, int front_len,
+                             int page_len, int page_off, struct page **pages)
 {
        struct ceph_msg *m;
 
@@ -1369,7 +1360,7 @@ void ceph_msg_put(struct ceph_msg *m)
        if (atomic_dec_and_test(&m->nref)) {
                dout(30, "ceph_msg_put last one on %p\n", m);
                BUG_ON(!list_empty(&m->list_head));
-               if (m->front.iov_base) 
+               if (m->front.iov_base)
                        kfree(m->front.iov_base);
                kfree(m);
        }
index fc0995f63018aba3f76239050a2c8e36c00c2ddf..c3d6b98ffcd07ddb99d60fb06a2bbaa469541e9f 100644 (file)
@@ -12,7 +12,8 @@ struct ceph_msg;
 
 typedef void (*ceph_msgr_dispatch_t) (void *p, struct ceph_msg *m);
 typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_name *pn);
-typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m, int want);
+typedef int (*ceph_msgr_prepare_pages_t) (void *p, struct ceph_msg *m,
+                                         int want);
 
 static __inline__ const char *ceph_name_type_str(int t) {
        switch (t) {
@@ -37,7 +38,7 @@ struct ceph_messenger {
        spinlock_t con_lock;
        struct list_head con_all;        /* all connections */
        struct list_head con_accepting;  /*  doing handshake, or */
-       struct radix_tree_root con_open; /*  established. see get_connection() */
+       struct radix_tree_root con_open; /*  established */
 };
 
 struct ceph_msg {
@@ -55,7 +56,7 @@ struct ceph_msg_pos {
 };
 
 /* ceph connection fault delay defaults */
-#define BASE_DELAY_INTERVAL    1       
+#define BASE_DELAY_INTERVAL    1
 #define MAX_DELAY_INTERVAL     (5U * 60 * HZ)
 
 /* ceph_connection state bit flags */
@@ -69,13 +70,13 @@ struct ceph_msg_pos {
 #define WAIT           7  /* wait for peer to connect */
 #define CLOSED         8  /* we've closed the connection */
 #define SOCK_CLOSE     9  /* socket state changed to close */
-#define STANDBY                10 /* standby, when socket state close, no message queued */
+#define STANDBY                10 /* standby, when socket state close, no messages */
 
 struct ceph_connection {
        struct ceph_messenger *msgr;
        struct socket *sock;    /* connection socket */
        unsigned long state;    /* connection state */
-       
+
        atomic_t nref;
 
        struct list_head list_all;   /* msgr->con_all */
@@ -83,8 +84,8 @@ struct ceph_connection {
 
        struct ceph_entity_addr peer_addr; /* peer address */
        struct ceph_entity_name peer_name; /* peer name */
-       __u32 connect_seq;     
-       __le32 in_connect_seq, out_connect_seq;     
+       __u32 connect_seq;
+       __le32 in_connect_seq, out_connect_seq;
        __u32 out_seq;               /* last message queued for send */
        __u32 in_seq, in_seq_acked;  /* last message received, acked */
 
@@ -94,7 +95,7 @@ struct ceph_connection {
        /* out queue */
        spinlock_t out_queue_lock;   /* protects out_queue, out_sent, out_seq */
        struct list_head out_queue;
-       struct list_head out_sent;   /* sending/sent but unacked; resend if connection drops */
+       struct list_head out_sent;   /* sending/sent but unacked */
 
        __le32 out32;
        struct kvec out_kvec[4],
@@ -105,36 +106,36 @@ struct ceph_connection {
        struct ceph_msg_pos out_msg_pos;
 
        /* partially read message contents */
-       char in_tag;       /* READY (accepting, or no in-progress read) or ACK or MSG */
-       int in_base_pos;   /* for ack seq, or msg headers, or accept handshake */
-       __u32 in_partial_ack; 
+       char in_tag;
+       int in_base_pos;   /* for ack seq, or msg headers, or handshake */
+       __u32 in_partial_ack;
        struct ceph_msg *in_msg;
        struct ceph_msg_pos in_msg_pos;
 
        struct work_struct rwork;               /* receive work */
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 20)
        struct delayed_work swork;              /* send work */
-#else
-       struct work_struct      swork;          /* send work */
-#endif
-        unsigned long           delay;          /* delay interval */
-        unsigned int            retries;        /* temp track of retries */
+       unsigned long           delay;          /* delay interval */
+       unsigned int            retries;        /* temp track of retries */
 };
 
 
-extern struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr);
+extern struct ceph_messenger *
+ceph_messenger_create(struct ceph_entity_addr *myaddr);
 extern void ceph_messenger_destroy(struct ceph_messenger *);
-extern void ceph_messenger_mark_down(struct ceph_messenger *msgr, struct ceph_entity_addr *addr);
+extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
+                                    struct ceph_entity_addr *addr);
 
 extern void ceph_queue_write(struct ceph_connection *con);
 extern void ceph_queue_read(struct ceph_connection *con);
 
-extern struct ceph_msg *ceph_msg_new(int type, int front_len, int page_len, int page_off, struct page **pages);
+extern struct ceph_msg *ceph_msg_new(int type, int front_len,
+                                    int page_len, int page_off,
+                                    struct page **pages);
 static __inline__ void ceph_msg_get(struct ceph_msg *msg) {
        atomic_inc(&msg->nref);
 }
 extern void ceph_msg_put(struct ceph_msg *msg);
-extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg, 
+extern int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
                         unsigned long timeout);
 
 #endif