From: Sage Weil Date: Mon, 17 Aug 2009 23:51:13 +0000 (-0700) Subject: kclient: alloc 'middle' section of messages via callback X-Git-Tag: v0.13~37 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=50d72f33221fd2800d58e1a3a97b015345be868a;p=ceph.git kclient: alloc 'middle' section of messages via callback --- diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c index 98303198e10e..e9c702e95e9f 100644 --- a/src/kernel/messenger.c +++ b/src/kernel/messenger.c @@ -668,6 +668,7 @@ static void prepare_write_message(struct ceph_connection *con) con->out_msg->footer.flags = 0; con->out_msg->footer.front_crc = cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len)); + con->out_msg->footer.middle_crc = 0; con->out_msg->footer.data_crc = 0; /* is there a data payload? */ @@ -967,7 +968,7 @@ static int prepare_read_message(struct ceph_connection *con) dout("prepare_read_message %p\n", con); BUG_ON(con->in_msg != NULL); con->in_base_pos = 0; - con->in_front_crc = con->in_data_crc = 0; + con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; return 0; } @@ -1389,7 +1390,7 @@ static int read_partial_message(struct ceph_connection *con) void *p; int ret; int to, want, left; - unsigned front_len, data_len, data_off; + unsigned front_len, middle_len, data_len, data_off; struct ceph_client *client = con->msgr->parent; int datacrc = !ceph_test_opt(client, NOCRC); @@ -1419,6 +1420,9 @@ static int read_partial_message(struct ceph_connection *con) front_len = le32_to_cpu(con->in_hdr.front_len); if (front_len > CEPH_MSG_MAX_FRONT_LEN) return -EIO; + middle_len = le32_to_cpu(con->in_hdr.middle_len); + if (middle_len > CEPH_MSG_MAX_DATA_LEN) + return -EIO; data_len = le32_to_cpu(con->in_hdr.data_len); if (data_len > CEPH_MSG_MAX_DATA_LEN) return -EIO; @@ -1432,7 +1436,7 @@ static int read_partial_message(struct ceph_connection *con) if (!con->in_msg) { /* skip this message */ dout("alloc_msg returned NULL, skipping message\n"); - con->in_base_pos = -front_len - data_len - + con->in_base_pos = -front_len - middle_len - data_len - sizeof(m->footer); con->in_tag = CEPH_MSGR_TAG_READY; return 0; @@ -1445,6 +1449,7 @@ static int read_partial_message(struct ceph_connection *con) } m = con->in_msg; m->front.iov_len = 0; /* haven't read it yet */ + m->middle.iov_len = 0; /* haven't read it yet */ memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr)); } @@ -1462,6 +1467,32 @@ static int read_partial_message(struct ceph_connection *con) m->front.iov_len); } + /* middle */ + while (m->middle.iov_len < middle_len) { + if (m->middle.iov_base == NULL) { + BUG_ON(!con->msgr->alloc_middle); + ret = con->msgr->alloc_middle(con->msgr->parent, m); + if (ret < 0) { + dout("alloc_middle failed, skipping payload\n"); + con->in_base_pos = -middle_len - data_len + - sizeof(m->footer); + ceph_msg_put(con->in_msg); + con->in_msg = NULL; + con->in_tag = CEPH_MSGR_TAG_READY; + return 0; + } + } + left = middle_len - m->middle.iov_len; + ret = ceph_tcp_recvmsg(con->sock, (char *)m->middle.iov_base + + m->middle.iov_len, left); + if (ret <= 0) + return ret; + m->middle.iov_len += ret; + if (m->middle.iov_len == middle_len) + con->in_middle_crc = crc32c(0, m->middle.iov_base, + m->middle.iov_len); + } + /* (page) data */ data_off = le16_to_cpu(m->hdr.data_off); if (data_len == 0) @@ -1493,8 +1524,8 @@ static int read_partial_message(struct ceph_connection *con) if (!m->pages) { dout("pages revoked during msg read\n"); mutex_unlock(&m->page_mutex); - con->in_base_pos = con->in_msg_pos.data_pos - data_len - - sizeof(m->footer); + con->in_base_pos = middle_len - con->in_msg_pos.data_pos + - data_len - sizeof(m->footer); ceph_msg_put(m); con->in_msg = NULL; con->in_tag = CEPH_MSGR_TAG_READY; @@ -1532,14 +1563,18 @@ no_data: con->in_base_pos += ret; } dout("read_partial_message got msg %p\n", m); - dout("got footer front crc %d data crc %d\n", - m->footer.front_crc, m->footer.data_crc); + dout("got footer front crc %d middle crc %d data crc %d\n", + m->footer.front_crc, m->footer.middle_crc, m->footer.data_crc); /* crc ok? */ if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { pr_err("ceph read_partial_message %p front crc %u != exp. %u\n", - con->in_msg, - con->in_front_crc, m->footer.front_crc); + con->in_msg, con->in_front_crc, m->footer.front_crc); + return -EBADMSG; + } + if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { + pr_err("ceph read_partial_message %p middle crc %u != exp %u\n", + con->in_msg, con->in_middle_crc, m->footer.middle_crc); return -EBADMSG; } if (datacrc && @@ -1582,14 +1617,14 @@ static void process_message(struct ceph_connection *con) con->in_seq++; spin_unlock(&con->out_queue_lock); - dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n", + dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n", con->in_msg, le64_to_cpu(con->in_msg->hdr.seq), ENTITY_NAME(con->in_msg->hdr.src.name), le16_to_cpu(con->in_msg->hdr.type), ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)), le32_to_cpu(con->in_msg->hdr.front_len), le32_to_cpu(con->in_msg->hdr.data_len), - con->in_front_crc, con->in_data_crc); + con->in_front_crc, con->in_middle_crc, con->in_data_crc); con->msgr->dispatch(con->msgr->parent, con->in_msg); con->in_msg = NULL; prepare_read_tag(con); @@ -2274,6 +2309,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, m->hdr.type = cpu_to_le16(type); m->hdr.front_len = cpu_to_le32(front_len); + m->hdr.middle_len = 0; m->hdr.data_len = cpu_to_le32(page_len); m->hdr.data_off = cpu_to_le16(page_off); m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT); @@ -2284,6 +2320,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, m->hdr.mds_protocol = CEPH_MDS_PROTOCOL; m->hdr.mdsc_protocol = CEPH_MDSC_PROTOCOL; m->footer.front_crc = 0; + m->footer.middle_crc = 0; m->footer.data_crc = 0; m->front_max = front_len; m->front_is_vmalloc = false; @@ -2309,7 +2346,11 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, } m->front.iov_len = front_len; - /* pages */ + /* middle */ + m->middle.iov_base = NULL; + m->middle.iov_len = 0; + + /* data */ m->nr_pages = calc_pages_for(page_off, page_len); m->pages = pages; @@ -2329,11 +2370,18 @@ out: */ void ceph_msg_kfree(struct ceph_msg *m) { + dout("msg_kfree %p\n", m); if (m->front_is_vmalloc) vfree(m->front.iov_base); else kfree(m->front.iov_base); + if (m->middle.iov_base) { + dout("vfree %p\n", m->middle.iov_base); + vfree(m->middle.iov_base); + dout("vfree done\n"); + } kfree(m); + dout("msg_kfree %p done\n", m); } void ceph_msg_put(struct ceph_msg *m) diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h index 188b1da3ec6d..fe6ed6a0cf07 100644 --- a/src/kernel/messenger.h +++ b/src/kernel/messenger.h @@ -38,6 +38,7 @@ typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr, typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p, struct ceph_msg_header *hdr); +typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg); static inline const char *ceph_name_type_str(int t) @@ -65,6 +66,7 @@ struct ceph_messenger { ceph_msgr_peer_reset_t peer_reset; ceph_msgr_prepare_pages_t prepare_pages; ceph_msgr_alloc_msg_t alloc_msg; + ceph_msgr_alloc_middle_t alloc_middle; struct ceph_entity_inst inst; /* my name+address */ @@ -95,7 +97,7 @@ struct ceph_messenger { struct ceph_msg { struct ceph_msg_header hdr; /* header */ struct ceph_msg_footer footer; /* footer */ - struct kvec front; /* first bit of message */ + struct kvec front, middle; /* unaligned blobs of message */ struct mutex page_mutex; struct page **pages; /* data payload. NOT OWNER. */ unsigned nr_pages; /* size of page array */ @@ -202,8 +204,7 @@ struct ceph_connection { struct ceph_msg_header in_hdr; struct ceph_msg *in_msg; struct ceph_msg_pos in_msg_pos; - u32 in_front_crc, in_data_crc; /* calculated crc, for comparison - message footer */ + u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ char in_tag; /* protocol control byte */ int in_base_pos; /* bytes read */ diff --git a/src/kernel/msgpool.c b/src/kernel/msgpool.c index 6840bac65d54..2b42067e8e0f 100644 --- a/src/kernel/msgpool.c +++ b/src/kernel/msgpool.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "ceph_debug.h" #include "msgpool.h" @@ -131,6 +132,12 @@ void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg) { spin_lock(&pool->lock); if (pool->num < pool->min) { + /* drop middle, if any */ + if (msg->middle.iov_base) { + vfree(msg->middle.iov_base); + msg->middle.iov_base = NULL; + msg->middle.iov_len = 0; + } ceph_msg_get(msg); /* retake a single ref */ list_add(&msg->list_head, &pool->msgs); pool->num++; diff --git a/src/kernel/super.c b/src/kernel/super.c index ce228deb50b8..9b50b5e1458a 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -29,6 +29,7 @@ static void ceph_dispatch(void *p, struct ceph_msg *msg); static void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr, struct ceph_entity_name *peer_name); static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr); +static int ceph_alloc_middle(void *p, struct ceph_msg *msg); /* * find filename portion of a path (/foo/bar/baz -> baz) @@ -885,6 +886,7 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt, client->msgr->prepare_pages = ceph_osdc_prepare_pages; client->msgr->peer_reset = ceph_peer_reset; client->msgr->alloc_msg = ceph_alloc_msg; + client->msgr->alloc_middle = ceph_alloc_middle; } /* send mount request, and wait for mon, mds, and osd maps */ @@ -1031,6 +1033,30 @@ static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr) return msg; } +/* + * Allocate "middle" portion of a message, if it is needed and wasn't + * allocated by alloc_msg. This allows us to read a small fixed-size + * per-type header in the front and then gracefully fail (i.e., + * propagate the error to the caller based on info in the front) when + * the middle is too large. + */ +static int ceph_alloc_middle(void *p, struct ceph_msg *msg) +{ + int type = le32_to_cpu(msg->hdr.type); + int middle_len = le32_to_cpu(msg->hdr.middle_len); + + dout("alloc_middle %p type %d %s middle_len %d\n", msg, type, + ceph_msg_type_name(type), middle_len); + BUG_ON(!middle_len); + BUG_ON(msg->middle.iov_base); + + msg->middle.iov_base = __vmalloc(middle_len, GFP_NOFS, PAGE_KERNEL); + if (!msg->middle.iov_base) + return -ENOMEM; + return 0; +} + + /* * Process an incoming message. *