]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: alloc 'middle' section of messages via callback
authorSage Weil <sage@newdream.net>
Mon, 17 Aug 2009 23:51:13 +0000 (16:51 -0700)
committerSage Weil <sage@newdream.net>
Mon, 17 Aug 2009 23:51:13 +0000 (16:51 -0700)
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/msgpool.c
src/kernel/super.c

index 98303198e10ee4e71083a7f723e44503ca68e886..e9c702e95e9f9173e6b002791121676a16a12973 100644 (file)
@@ -668,6 +668,7 @@ static void prepare_write_message(struct ceph_connection *con)
        con->out_msg->footer.flags = 0;
        con->out_msg->footer.front_crc =
                cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
+       con->out_msg->footer.middle_crc = 0;
        con->out_msg->footer.data_crc = 0;
 
        /* is there a data payload? */
@@ -967,7 +968,7 @@ static int prepare_read_message(struct ceph_connection *con)
        dout("prepare_read_message %p\n", con);
        BUG_ON(con->in_msg != NULL);
        con->in_base_pos = 0;
-       con->in_front_crc = con->in_data_crc = 0;
+       con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
        return 0;
 }
 
@@ -1389,7 +1390,7 @@ static int read_partial_message(struct ceph_connection *con)
        void *p;
        int ret;
        int to, want, left;
-       unsigned front_len, data_len, data_off;
+       unsigned front_len, middle_len, data_len, data_off;
        struct ceph_client *client = con->msgr->parent;
        int datacrc = !ceph_test_opt(client, NOCRC);
 
@@ -1419,6 +1420,9 @@ static int read_partial_message(struct ceph_connection *con)
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
+       middle_len = le32_to_cpu(con->in_hdr.middle_len);
+       if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+               return -EIO;
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
@@ -1432,7 +1436,7 @@ static int read_partial_message(struct ceph_connection *con)
                if (!con->in_msg) {
                        /* skip this message */
                        dout("alloc_msg returned NULL, skipping message\n");
-                       con->in_base_pos = -front_len - data_len -
+                       con->in_base_pos = -front_len - middle_len - data_len -
                                sizeof(m->footer);
                        con->in_tag = CEPH_MSGR_TAG_READY;
                        return 0;
@@ -1445,6 +1449,7 @@ static int read_partial_message(struct ceph_connection *con)
                }
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
+               m->middle.iov_len = 0;    /* haven't read it yet */
                memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
        }
 
@@ -1462,6 +1467,32 @@ static int read_partial_message(struct ceph_connection *con)
                                                      m->front.iov_len);
        }
 
+       /* middle */
+       while (m->middle.iov_len < middle_len) {
+               if (m->middle.iov_base == NULL) {
+                       BUG_ON(!con->msgr->alloc_middle);
+                       ret = con->msgr->alloc_middle(con->msgr->parent, m);
+                       if (ret < 0) {
+                               dout("alloc_middle failed, skipping payload\n");
+                               con->in_base_pos = -middle_len - data_len
+                                       - sizeof(m->footer);
+                               ceph_msg_put(con->in_msg);
+                               con->in_msg = NULL;
+                               con->in_tag = CEPH_MSGR_TAG_READY;
+                               return 0;
+                       }
+               }
+               left = middle_len - m->middle.iov_len;
+               ret = ceph_tcp_recvmsg(con->sock, (char *)m->middle.iov_base +
+                                      m->middle.iov_len, left);
+               if (ret <= 0)
+                       return ret;
+               m->middle.iov_len += ret;
+               if (m->middle.iov_len == middle_len)
+                       con->in_middle_crc = crc32c(0, m->middle.iov_base,
+                                                     m->middle.iov_len);
+       }
+
        /* (page) data */
        data_off = le16_to_cpu(m->hdr.data_off);
        if (data_len == 0)
@@ -1493,8 +1524,8 @@ static int read_partial_message(struct ceph_connection *con)
                if (!m->pages) {
                        dout("pages revoked during msg read\n");
                        mutex_unlock(&m->page_mutex);
-                       con->in_base_pos = con->in_msg_pos.data_pos - data_len -
-                               sizeof(m->footer);
+                       con->in_base_pos = middle_len - con->in_msg_pos.data_pos
+                               - data_len - sizeof(m->footer);
                        ceph_msg_put(m);
                        con->in_msg = NULL;
                        con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1532,14 +1563,18 @@ no_data:
                con->in_base_pos += ret;
        }
        dout("read_partial_message got msg %p\n", m);
-       dout("got footer front crc %d data crc %d\n",
-            m->footer.front_crc, m->footer.data_crc);
+       dout("got footer front crc %d middle crc %d data crc %d\n",
+            m->footer.front_crc, m->footer.middle_crc, m->footer.data_crc);
 
        /* crc ok? */
        if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
                pr_err("ceph read_partial_message %p front crc %u != exp. %u\n",
-                      con->in_msg,
-                      con->in_front_crc, m->footer.front_crc);
+                      con->in_msg, con->in_front_crc, m->footer.front_crc);
+               return -EBADMSG;
+       }
+       if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
+               pr_err("ceph read_partial_message %p middle crc %u != exp %u\n",
+                      con->in_msg, con->in_middle_crc, m->footer.middle_crc);
                return -EBADMSG;
        }
        if (datacrc &&
@@ -1582,14 +1617,14 @@ static void process_message(struct ceph_connection *con)
        con->in_seq++;
        spin_unlock(&con->out_queue_lock);
 
-       dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
+       dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
             con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
             ENTITY_NAME(con->in_msg->hdr.src.name),
             le16_to_cpu(con->in_msg->hdr.type),
             ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
             le32_to_cpu(con->in_msg->hdr.front_len),
             le32_to_cpu(con->in_msg->hdr.data_len),
-            con->in_front_crc, con->in_data_crc);
+            con->in_front_crc, con->in_middle_crc, con->in_data_crc);
        con->msgr->dispatch(con->msgr->parent, con->in_msg);
        con->in_msg = NULL;
        prepare_read_tag(con);
@@ -2274,6 +2309,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
 
        m->hdr.type = cpu_to_le16(type);
        m->hdr.front_len = cpu_to_le32(front_len);
+       m->hdr.middle_len = 0;
        m->hdr.data_len = cpu_to_le32(page_len);
        m->hdr.data_off = cpu_to_le16(page_off);
        m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
@@ -2284,6 +2320,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->hdr.mds_protocol = CEPH_MDS_PROTOCOL;
        m->hdr.mdsc_protocol = CEPH_MDSC_PROTOCOL;
        m->footer.front_crc = 0;
+       m->footer.middle_crc = 0;
        m->footer.data_crc = 0;
        m->front_max = front_len;
        m->front_is_vmalloc = false;
@@ -2309,7 +2346,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        }
        m->front.iov_len = front_len;
 
-       /* pages */
+       /* middle */
+       m->middle.iov_base = NULL;
+       m->middle.iov_len = 0;
+
+       /* data */
        m->nr_pages = calc_pages_for(page_off, page_len);
        m->pages = pages;
 
@@ -2329,11 +2370,18 @@ out:
  */
 void ceph_msg_kfree(struct ceph_msg *m)
 {
+       dout("msg_kfree %p\n", m);
        if (m->front_is_vmalloc)
                vfree(m->front.iov_base);
        else
                kfree(m->front.iov_base);
+       if (m->middle.iov_base) {
+               dout("vfree %p\n", m->middle.iov_base);
+               vfree(m->middle.iov_base);
+               dout("vfree done\n");
+       }
        kfree(m);
+       dout("msg_kfree %p done\n", m);
 }
 
 void ceph_msg_put(struct ceph_msg *m)
index 188b1da3ec6de22fea5e9134a1347e22ba0766d9..fe6ed6a0cf078af33b3650a0ad34e8ad9780cb8d 100644 (file)
@@ -38,6 +38,7 @@ typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
 
 typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
                                            struct ceph_msg_header *hdr);
+typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg);
 
 
 static inline const char *ceph_name_type_str(int t)
@@ -65,6 +66,7 @@ struct ceph_messenger {
        ceph_msgr_peer_reset_t peer_reset;
        ceph_msgr_prepare_pages_t prepare_pages;
        ceph_msgr_alloc_msg_t alloc_msg;
+       ceph_msgr_alloc_middle_t alloc_middle;
 
        struct ceph_entity_inst inst;    /* my name+address */
 
@@ -95,7 +97,7 @@ struct ceph_messenger {
 struct ceph_msg {
        struct ceph_msg_header hdr;     /* header */
        struct ceph_msg_footer footer;  /* footer */
-       struct kvec front;              /* first bit of message */
+       struct kvec front, middle;      /* unaligned blobs of message */
        struct mutex page_mutex;
        struct page **pages;            /* data payload.  NOT OWNER. */
        unsigned nr_pages;              /* size of page array */
@@ -202,8 +204,7 @@ struct ceph_connection {
        struct ceph_msg_header in_hdr;
        struct ceph_msg *in_msg;
        struct ceph_msg_pos in_msg_pos;
-       u32 in_front_crc, in_data_crc;  /* calculated crc, for comparison
-                                          message footer */
+       u32 in_front_crc, in_middle_crc, in_data_crc;  /* calculated crc */
 
        char in_tag;         /* protocol control byte */
        int in_base_pos;     /* bytes read */
index 6840bac65d54750235aeb564a47b967242563206..2b42067e8e0f177cd57e51eb29773589deacc9f5 100644 (file)
@@ -2,6 +2,7 @@
 #include <linux/err.h>
 #include <linux/sched.h>
 #include <linux/types.h>
+#include <linux/vmalloc.h>
 
 #include "ceph_debug.h"
 #include "msgpool.h"
@@ -131,6 +132,12 @@ void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg)
 {
        spin_lock(&pool->lock);
        if (pool->num < pool->min) {
+               /* drop middle, if any */
+               if (msg->middle.iov_base) {
+                       vfree(msg->middle.iov_base);
+                       msg->middle.iov_base = NULL;
+                       msg->middle.iov_len = 0;
+               }
                ceph_msg_get(msg);   /* retake a single ref */
                list_add(&msg->list_head, &pool->msgs);
                pool->num++;
index ce228deb50b84e92364e92f5260148f506cc764f..9b50b5e1458ab1987be3b03a1689ccb318326cfd 100644 (file)
@@ -29,6 +29,7 @@ static void ceph_dispatch(void *p, struct ceph_msg *msg);
 static void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr,
                            struct ceph_entity_name *peer_name);
 static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr);
+static int ceph_alloc_middle(void *p, struct ceph_msg *msg);
 
 /*
  * find filename portion of a path (/foo/bar/baz -> baz)
@@ -885,6 +886,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                client->msgr->prepare_pages = ceph_osdc_prepare_pages;
                client->msgr->peer_reset = ceph_peer_reset;
                client->msgr->alloc_msg = ceph_alloc_msg;
+               client->msgr->alloc_middle = ceph_alloc_middle;
        }
 
        /* send mount request, and wait for mon, mds, and osd maps */
@@ -1031,6 +1033,30 @@ static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr)
        return msg;
 }
 
+/*
+ * Allocate "middle" portion of a message, if it is needed and wasn't
+ * allocated by alloc_msg.  This allows us to read a small fixed-size
+ * per-type header in the front and then gracefully fail (i.e.,
+ * propagate the error to the caller based on info in the front) when
+ * the middle is too large.
+ */
+static int ceph_alloc_middle(void *p, struct ceph_msg *msg)
+{
+       int type = le32_to_cpu(msg->hdr.type);
+       int middle_len = le32_to_cpu(msg->hdr.middle_len);
+
+       dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
+            ceph_msg_type_name(type), middle_len);
+       BUG_ON(!middle_len);
+       BUG_ON(msg->middle.iov_base);
+
+       msg->middle.iov_base = __vmalloc(middle_len, GFP_NOFS, PAGE_KERNEL);
+       if (!msg->middle.iov_base)
+               return -ENOMEM;
+       return 0;
+}
+
+
 /*
  * Process an incoming message.
  *