con->out_msg->footer.flags = 0;
con->out_msg->footer.front_crc =
cpu_to_le32(crc32c(0, m->front.iov_base, m->front.iov_len));
+ con->out_msg->footer.middle_crc = 0;
con->out_msg->footer.data_crc = 0;
/* is there a data payload? */
dout("prepare_read_message %p\n", con);
BUG_ON(con->in_msg != NULL);
con->in_base_pos = 0;
- con->in_front_crc = con->in_data_crc = 0;
+ con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
return 0;
}
void *p;
int ret;
int to, want, left;
- unsigned front_len, data_len, data_off;
+ unsigned front_len, middle_len, data_len, data_off;
struct ceph_client *client = con->msgr->parent;
int datacrc = !ceph_test_opt(client, NOCRC);
front_len = le32_to_cpu(con->in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
+ middle_len = le32_to_cpu(con->in_hdr.middle_len);
+ if (middle_len > CEPH_MSG_MAX_DATA_LEN)
+ return -EIO;
data_len = le32_to_cpu(con->in_hdr.data_len);
if (data_len > CEPH_MSG_MAX_DATA_LEN)
return -EIO;
if (!con->in_msg) {
/* skip this message */
dout("alloc_msg returned NULL, skipping message\n");
- con->in_base_pos = -front_len - data_len -
+ con->in_base_pos = -front_len - middle_len - data_len -
sizeof(m->footer);
con->in_tag = CEPH_MSGR_TAG_READY;
return 0;
}
m = con->in_msg;
m->front.iov_len = 0; /* haven't read it yet */
+ m->middle.iov_len = 0; /* haven't read it yet */
memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
}
m->front.iov_len);
}
+ /* middle */
+ while (m->middle.iov_len < middle_len) {
+ if (m->middle.iov_base == NULL) {
+ BUG_ON(!con->msgr->alloc_middle);
+ ret = con->msgr->alloc_middle(con->msgr->parent, m);
+ if (ret < 0) {
+ dout("alloc_middle failed, skipping payload\n");
+ con->in_base_pos = -middle_len - data_len
+ - sizeof(m->footer);
+ ceph_msg_put(con->in_msg);
+ con->in_msg = NULL;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ return 0;
+ }
+ }
+ left = middle_len - m->middle.iov_len;
+ ret = ceph_tcp_recvmsg(con->sock, (char *)m->middle.iov_base +
+ m->middle.iov_len, left);
+ if (ret <= 0)
+ return ret;
+ m->middle.iov_len += ret;
+ if (m->middle.iov_len == middle_len)
+ con->in_middle_crc = crc32c(0, m->middle.iov_base,
+ m->middle.iov_len);
+ }
+
/* (page) data */
data_off = le16_to_cpu(m->hdr.data_off);
if (data_len == 0)
if (!m->pages) {
dout("pages revoked during msg read\n");
mutex_unlock(&m->page_mutex);
- con->in_base_pos = con->in_msg_pos.data_pos - data_len -
- sizeof(m->footer);
+ con->in_base_pos = middle_len - con->in_msg_pos.data_pos
+ - data_len - sizeof(m->footer);
ceph_msg_put(m);
con->in_msg = NULL;
con->in_tag = CEPH_MSGR_TAG_READY;
con->in_base_pos += ret;
}
dout("read_partial_message got msg %p\n", m);
- dout("got footer front crc %d data crc %d\n",
- m->footer.front_crc, m->footer.data_crc);
+ dout("got footer front crc %d middle crc %d data crc %d\n",
+ m->footer.front_crc, m->footer.middle_crc, m->footer.data_crc);
/* crc ok? */
if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
pr_err("ceph read_partial_message %p front crc %u != exp. %u\n",
- con->in_msg,
- con->in_front_crc, m->footer.front_crc);
+ con->in_msg, con->in_front_crc, m->footer.front_crc);
+ return -EBADMSG;
+ }
+ if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
+ pr_err("ceph read_partial_message %p middle crc %u != exp %u\n",
+ con->in_msg, con->in_middle_crc, m->footer.middle_crc);
return -EBADMSG;
}
if (datacrc &&
con->in_seq++;
spin_unlock(&con->out_queue_lock);
- dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u) =====\n",
+ dout("===== %p %llu from %s%d %d=%s len %d+%d (%u %u %u) =====\n",
con->in_msg, le64_to_cpu(con->in_msg->hdr.seq),
ENTITY_NAME(con->in_msg->hdr.src.name),
le16_to_cpu(con->in_msg->hdr.type),
ceph_msg_type_name(le16_to_cpu(con->in_msg->hdr.type)),
le32_to_cpu(con->in_msg->hdr.front_len),
le32_to_cpu(con->in_msg->hdr.data_len),
- con->in_front_crc, con->in_data_crc);
+ con->in_front_crc, con->in_middle_crc, con->in_data_crc);
con->msgr->dispatch(con->msgr->parent, con->in_msg);
con->in_msg = NULL;
prepare_read_tag(con);
m->hdr.type = cpu_to_le16(type);
m->hdr.front_len = cpu_to_le32(front_len);
+ m->hdr.middle_len = 0;
m->hdr.data_len = cpu_to_le32(page_len);
m->hdr.data_off = cpu_to_le16(page_off);
m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
m->hdr.mds_protocol = CEPH_MDS_PROTOCOL;
m->hdr.mdsc_protocol = CEPH_MDSC_PROTOCOL;
m->footer.front_crc = 0;
+ m->footer.middle_crc = 0;
m->footer.data_crc = 0;
m->front_max = front_len;
m->front_is_vmalloc = false;
}
m->front.iov_len = front_len;
- /* pages */
+ /* middle */
+ m->middle.iov_base = NULL;
+ m->middle.iov_len = 0;
+
+ /* data */
m->nr_pages = calc_pages_for(page_off, page_len);
m->pages = pages;
*/
void ceph_msg_kfree(struct ceph_msg *m)
{
+ dout("msg_kfree %p\n", m);
if (m->front_is_vmalloc)
vfree(m->front.iov_base);
else
kfree(m->front.iov_base);
+ if (m->middle.iov_base) {
+ dout("vfree %p\n", m->middle.iov_base);
+ vfree(m->middle.iov_base);
+ dout("vfree done\n");
+ }
kfree(m);
+ dout("msg_kfree %p done\n", m);
}
void ceph_msg_put(struct ceph_msg *m)
typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
struct ceph_msg_header *hdr);
+typedef int (*ceph_msgr_alloc_middle_t) (void *p, struct ceph_msg *msg);
static inline const char *ceph_name_type_str(int t)
ceph_msgr_peer_reset_t peer_reset;
ceph_msgr_prepare_pages_t prepare_pages;
ceph_msgr_alloc_msg_t alloc_msg;
+ ceph_msgr_alloc_middle_t alloc_middle;
struct ceph_entity_inst inst; /* my name+address */
struct ceph_msg {
struct ceph_msg_header hdr; /* header */
struct ceph_msg_footer footer; /* footer */
- struct kvec front; /* first bit of message */
+ struct kvec front, middle; /* unaligned blobs of message */
struct mutex page_mutex;
struct page **pages; /* data payload. NOT OWNER. */
unsigned nr_pages; /* size of page array */
struct ceph_msg_header in_hdr;
struct ceph_msg *in_msg;
struct ceph_msg_pos in_msg_pos;
- u32 in_front_crc, in_data_crc; /* calculated crc, for comparison
- message footer */
+ u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */
char in_tag; /* protocol control byte */
int in_base_pos; /* bytes read */
static void ceph_peer_reset(void *p, struct ceph_entity_addr *peer_addr,
struct ceph_entity_name *peer_name);
static struct ceph_msg *ceph_alloc_msg(void *p, struct ceph_msg_header *hdr);
+static int ceph_alloc_middle(void *p, struct ceph_msg *msg);
/*
* find filename portion of a path (/foo/bar/baz -> baz)
client->msgr->prepare_pages = ceph_osdc_prepare_pages;
client->msgr->peer_reset = ceph_peer_reset;
client->msgr->alloc_msg = ceph_alloc_msg;
+ client->msgr->alloc_middle = ceph_alloc_middle;
}
/* send mount request, and wait for mon, mds, and osd maps */
return msg;
}
+/*
+ * Allocate "middle" portion of a message, if it is needed and wasn't
+ * allocated by alloc_msg. This allows us to read a small fixed-size
+ * per-type header in the front and then gracefully fail (i.e.,
+ * propagate the error to the caller based on info in the front) when
+ * the middle is too large.
+ */
+static int ceph_alloc_middle(void *p, struct ceph_msg *msg)
+{
+ int type = le32_to_cpu(msg->hdr.type);
+ int middle_len = le32_to_cpu(msg->hdr.middle_len);
+
+ dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
+ ceph_msg_type_name(type), middle_len);
+ BUG_ON(!middle_len);
+ BUG_ON(msg->middle.iov_base);
+
+ msg->middle.iov_base = __vmalloc(middle_len, GFP_NOFS, PAGE_KERNEL);
+ if (!msg->middle.iov_base)
+ return -ENOMEM;
+ return 0;
+}
+
+
/*
* Process an incoming message.
*