front_len = le32_to_cpu(con->in_hdr.front_len);
if (front_len > CEPH_MSG_MAX_FRONT_LEN)
return -EIO;
+ data_len = le32_to_cpu(con->in_hdr.data_len);
+ if (data_len > CEPH_MSG_MAX_DATA_LEN)
+ return -EIO;
/* allocate message? */
if (!con->in_msg) {
con->in_hdr.front_len, con->in_hdr.data_len);
con->in_msg = con->msgr->alloc_msg(con->msgr->parent,
&con->in_hdr);
+ if (!con->in_msg) {
+ /* skip this message */
+ dout("alloc_msg returned NULL, skipping message\n");
+ con->in_base_pos = -front_len - data_len -
+ sizeof(m->footer);
+ con->in_tag = CEPH_MSGR_TAG_READY;
+ return 0;
+ }
if (IS_ERR(con->in_msg)) {
ret = PTR_ERR(con->in_msg);
con->in_msg = NULL;
}
/* (page) data */
- data_len = le32_to_cpu(m->hdr.data_len);
- if (data_len > CEPH_MSG_MAX_DATA_LEN)
- return -EIO;
-
data_off = le16_to_cpu(m->hdr.data_off);
if (data_len == 0)
goto no_data;
return ret;
}
-
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.
m->hdr.mdsc_protocol = CEPH_MDSC_PROTOCOL;
m->footer.front_crc = 0;
m->footer.data_crc = 0;
+ m->front_max = front_len;
m->front_is_vmalloc = false;
m->more_to_follow = false;
+ m->pool = NULL;
/* front */
if (front_len) {
return ERR_PTR(-ENOMEM);
}
+/*
+ * Free a generically kmalloc'd message.
+ */
+void ceph_msg_kfree(struct ceph_msg *m)
+{
+ if (m->front_is_vmalloc)
+ vfree(m->front.iov_base);
+ else
+ kfree(m->front.iov_base);
+ kfree(m);
+}
+
void ceph_msg_put(struct ceph_msg *m)
{
dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
if (atomic_dec_and_test(&m->nref)) {
dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head));
- if (m->front_is_vmalloc)
- vfree(m->front.iov_base);
+ if (m->pool)
+ ceph_msgpool_put(m->pool, m);
else
- kfree(m->front.iov_base);
- kfree(m);
+ ceph_msg_kfree(m);
}
}
m->hdr.dst.addr = *addr;
ceph_msg_send(msgr, m, BASE_DELAY_INTERVAL);
}
+
--- /dev/null
+
+#include <linux/err.h>
+#include <linux/sched.h>
+#include <linux/types.h>
+
+#include "ceph_debug.h"
+#include "msgpool.h"
+
+/*
+ * We use msg pools to preallocate memory for messages we expect to
+ * receive over the wire, to avoid getting ourselves into OOM
+ * conditions at unexpected times. We take use a few different
+ * strategies:
+ *
+ * - for request/response type interactions, we preallocate the
+ * memory needed for the response when we generate the request.
+ *
+ * - for messages we can receive at any time from the MDS, we preallocate
+ * a pool of messages we can re-use.
+ *
+ * - for writeback, we preallocate some number of messages to use for
+ * requests and their replies, so that we always make forward
+ * progress.
+ *
+ * The msgpool behaves like a mempool_t, but keeps preallocated
+ * ceph_msgs strung together on a list_head instead of using a pointer
+ * vector. This avoids vector reallocation when we adjust the number
+ * of preallocated items (which happens frequently).
+ */
+
+
+/*
+ * Allocate or release as necessary to meet our target pool size.
+ */
+static int __fill_msgpool(struct ceph_msg_pool *pool)
+{
+ struct ceph_msg *msg;
+
+ while (pool->num < pool->min) {
+ dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
+ pool->min);
+ spin_unlock(&pool->lock);
+ msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+ spin_lock(&pool->lock);
+ if (IS_ERR(msg))
+ return PTR_ERR(msg);
+ msg->pool = pool;
+ list_add(&msg->list_head, &pool->msgs);
+ pool->num++;
+ }
+ while (pool->num > pool->min) {
+ msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
+ dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
+ pool->min, msg);
+ list_del_init(&msg->list_head);
+ pool->num--;
+ ceph_msg_kfree(msg);
+ }
+ return 0;
+}
+
+int ceph_msgpool_init(struct ceph_msg_pool *pool,
+ int front_len, int min)
+{
+ int ret;
+
+ dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
+ spin_lock_init(&pool->lock);
+ pool->front_len = front_len;
+ INIT_LIST_HEAD(&pool->msgs);
+ pool->num = 0;
+ pool->min = min;
+ init_waitqueue_head(&pool->wait);
+
+ spin_lock(&pool->lock);
+ ret = __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+ return ret;
+}
+
+void ceph_msgpool_destroy(struct ceph_msg_pool *pool)
+{
+ spin_lock(&pool->lock);
+ pool->min = 0;
+ __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+}
+
+int ceph_msgpool_resv(struct ceph_msg_pool *pool, int delta)
+{
+ int ret;
+
+ spin_lock(&pool->lock);
+ dout("msgpool_resv %p delta %d\n", pool, delta);
+ pool->min += delta;
+ ret = __fill_msgpool(pool);
+ spin_unlock(&pool->lock);
+ return ret;
+}
+
+struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *pool)
+{
+ wait_queue_t wait;
+
+ while (1) {
+ spin_lock(&pool->lock);
+ if (likely(pool->num)) {
+ struct ceph_msg *msg =
+ list_entry(pool->msgs.next, struct ceph_msg,
+ list_head);
+ list_del_init(&msg->list_head);
+ pool->num--;
+ dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ return msg;
+ }
+ spin_unlock(&pool->lock);
+
+ dout("msgpool_get %p now %d/%d, waiting\n", pool,
+ pool->num, pool->min);
+ init_wait(&wait);
+ prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
+ schedule();
+ finish_wait(&pool->wait, &wait);
+ }
+}
+
+void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg)
+{
+ spin_lock(&pool->lock);
+ if (pool->num < pool->min) {
+ ceph_msg_get(msg); /* retake a single ref */
+ list_add(&msg->list_head, &pool->msgs);
+ pool->num++;
+ dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ wake_up(&pool->wait);
+ } else {
+ dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
+ pool->num, pool->min);
+ spin_unlock(&pool->lock);
+ ceph_msg_kfree(msg);
+ }
+}
--- /dev/null
+#ifndef _FS_CEPH_MSGPOOL
+#define _FS_CEPH_MSGPOOL
+
+#include "messenger.h"
+
+/*
+ * we use memory pools for preallocating messages we may receive, to
+ * avoid unexpected OOM conditions.
+ */
+struct ceph_msg_pool {
+ spinlock_t lock;
+ int front_len; /* preallocated payload size */
+ struct list_head msgs; /* msgs in the pool; each has 1 ref */
+ int num, min; /* cur, min # msgs in the pool */
+ wait_queue_head_t wait;
+};
+
+extern int ceph_msgpool_init(struct ceph_msg_pool *pool,
+ int front_len, int size);
+extern void ceph_msgpool_destroy(struct ceph_msg_pool *pool);
+extern int ceph_msgpool_resv(struct ceph_msg_pool *, int delta);
+extern struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *);
+extern void ceph_msgpool_put(struct ceph_msg_pool *, struct ceph_msg *);
+
+#endif