kclient: add msgpools
author Sage Weil <sage@newdream.net>
Mon, 17 Aug 2009 19:36:47 +0000 (12:36 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 17 Aug 2009 19:36:47 +0000 (12:36 -0700)
src/Makefile.am
src/kernel/Makefile
src/kernel/import_patch_set_into_linux_git.sh
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/msgpool.c [new file with mode: 0644]
src/kernel/msgpool.h [new file with mode: 0644]

diff --git a/src/Makefile.am b/src/Makefile.am
index 658a62992aa27fb925cd03cba456783f4830056a..d446d59ba9035cd5566bf7dd7ee5a2c88f62d752 100644 (file)
@@ -476,6 +476,8 @@ noinst_HEADERS = \
        kernel/messenger.h\
        kernel/mon_client.c\
        kernel/mon_client.h\
+       kernel/msgpool.c\
+       kernel/msgpool.h\
        kernel/msgr.h\
        kernel/osd_client.c\
        kernel/osd_client.h\
diff --git a/src/kernel/Makefile b/src/kernel/Makefile
index ba1e6a59f651a4b5528d2c9cf99ab94f29a42ea6..031b85f96f9f54df59747c9384ff8b7b3fcbe6f7 100644 (file)
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
        export.o caps.o snap.o \
-       messenger.o \
+       messenger.o msgpool.o \
        mds_client.o mdsmap.o \
        mon_client.o \
        osd_client.o osdmap.o crush/crush.o crush/mapper.o \
diff --git a/src/kernel/import_patch_set_into_linux_git.sh b/src/kernel/import_patch_set_into_linux_git.sh
index a134f3dc4b0ffc939d47df15fc1c36fca4342ac4..5797e138e7c656f13e8ff4edba2e71617eaa2d6a 100755 (executable)
@@ -293,6 +293,18 @@ This implementation is based on TCP.
 
 EOF
 
+git add $target/ceph/msgpool.h
+git add $target/ceph/msgpool.c
+git commit -s -F - <<EOF
+ceph: message pools
+
+The msgpool is a basic mempool_t-like structure to preallocate
+messages we expect to receive over the wire.  This ensures we have the
+necessary memory preallocated to process replies to requests, or to
+process unsolicited messages from various servers.
+
+EOF
+
 git add $target/ceph/export.c
 git commit -s -F - <<EOF
 ceph: nfs re-export support
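
The commit message added above describes the msgpool as a basic mempool_t-like
structure: messages are preallocated so that replies and unsolicited messages
can always be received.  A minimal caller-side sketch of that get/put cycle,
using the API introduced later in this patch (hypothetical code, not part of
the patch; the pool and function names are invented for illustration):

#include "msgpool.h"

/* hypothetical, already-initialized pool of preallocated reply messages */
extern struct ceph_msg_pool example_reply_pool;

static void example_handle_reply(void)
{
        struct ceph_msg *reply = ceph_msgpool_get(&example_reply_pool);

        /* ... fill in and process reply->front.iov_base ... */

        /* dropping the last ref sends the message back to the pool
         * (or frees it if the pool already holds its target count) */
        ceph_msg_put(reply);
}
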
diff --git a/src/kernel/messenger.c b/src/kernel/messenger.c
index c51220d8b8c7acead6a51433eaab7a883fd0c1e2..98303198e10ee4e71083a7f723e44503ca68e886 100644 (file)
@@ -1419,6 +1419,9 @@ static int read_partial_message(struct ceph_connection *con)
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
+       data_len = le32_to_cpu(con->in_hdr.data_len);
+       if (data_len > CEPH_MSG_MAX_DATA_LEN)
+               return -EIO;
 
        /* allocate message? */
        if (!con->in_msg) {
@@ -1426,6 +1429,14 @@ static int read_partial_message(struct ceph_connection *con)
                     con->in_hdr.front_len, con->in_hdr.data_len);
                con->in_msg = con->msgr->alloc_msg(con->msgr->parent,
                                                   &con->in_hdr);
+               if (!con->in_msg) {
+                       /* skip this message */
+                       dout("alloc_msg returned NULL, skipping message\n");
+                       con->in_base_pos = -front_len - data_len -
+                               sizeof(m->footer);
+                       con->in_tag = CEPH_MSGR_TAG_READY;
+                       return 0;
+               }
                if (IS_ERR(con->in_msg)) {
                        ret = PTR_ERR(con->in_msg);
                        con->in_msg = NULL;
@@ -1452,10 +1463,6 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        /* (page) data */
-       data_len = le32_to_cpu(m->hdr.data_len);
-       if (data_len > CEPH_MSG_MAX_DATA_LEN)
-               return -EIO;
-
        data_off = le16_to_cpu(m->hdr.data_off);
        if (data_len == 0)
                goto no_data;
@@ -2249,7 +2256,6 @@ int ceph_msg_send(struct ceph_messenger *msgr, struct ceph_msg *msg,
        return ret;
 }
 
-
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
@@ -2279,8 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->hdr.mdsc_protocol = CEPH_MDSC_PROTOCOL;
        m->footer.front_crc = 0;
        m->footer.data_crc = 0;
+       m->front_max = front_len;
        m->front_is_vmalloc = false;
        m->more_to_follow = false;
+       m->pool = NULL;
 
        /* front */
        if (front_len) {
@@ -2316,6 +2324,18 @@ out:
        return ERR_PTR(-ENOMEM);
 }
 
+/*
+ * Free a generically kmalloc'd message.
+ */
+void ceph_msg_kfree(struct ceph_msg *m)
+{
+       if (m->front_is_vmalloc)
+               vfree(m->front.iov_base);
+       else
+               kfree(m->front.iov_base);
+       kfree(m);
+}
+
 void ceph_msg_put(struct ceph_msg *m)
 {
        dout("ceph_msg_put %p %d -> %d\n", m, atomic_read(&m->nref),
@@ -2334,11 +2354,10 @@ void ceph_msg_put(struct ceph_msg *m)
        if (atomic_dec_and_test(&m->nref)) {
                dout("ceph_msg_put last one on %p\n", m);
                WARN_ON(!list_empty(&m->list_head));
-               if (m->front_is_vmalloc)
-                       vfree(m->front.iov_base);
+               if (m->pool)
+                       ceph_msgpool_put(m->pool, m);
                else
-                       kfree(m->front.iov_base);
-               kfree(m);
+                       ceph_msg_kfree(m);
        }
 }
 
@@ -2358,3 +2377,4 @@ void ceph_ping(struct ceph_messenger *msgr, struct ceph_entity_name name,
        m->hdr.dst.addr = *addr;
        ceph_msg_send(msgr, m, BASE_DELAY_INTERVAL);
 }
+
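
The hunk above lets the alloc_msg callback decline a message by returning
NULL, in which case the messenger skips the incoming bytes instead of reading
them into memory.  A rough sketch of such a callback backed by a msgpool
(hypothetical; the assumption that the callback's parent pointer is a pool is
purely for illustration and is not established by this patch):

#include "msgpool.h"

static struct ceph_msg *example_alloc_msg(void *parent,
                                          struct ceph_msg_header *hdr)
{
        struct ceph_msg_pool *pool = parent;     /* assumed mapping */
        int front_len = le32_to_cpu(hdr->front_len);

        /* larger than what we preallocated: have the messenger skip it */
        if (front_len > pool->front_len)
                return NULL;

        return ceph_msgpool_get(pool);
}
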
diff --git a/src/kernel/messenger.h b/src/kernel/messenger.h
index 974dcaff2b815e70de2ad10c989f6c8d9a6d89fe..188b1da3ec6de22fea5e9134a1347e22ba0766d9 100644 (file)
@@ -39,6 +39,7 @@ typedef void (*ceph_msgr_peer_reset_t) (void *p, struct ceph_entity_addr *addr,
 typedef struct ceph_msg * (*ceph_msgr_alloc_msg_t) (void *p,
                                            struct ceph_msg_header *hdr);
 
+
 static inline const char *ceph_name_type_str(int t)
 {
        switch (t) {
@@ -102,6 +103,9 @@ struct ceph_msg {
        atomic_t nref;
        bool front_is_vmalloc;
        bool more_to_follow;
+       int front_max;
+
+       struct ceph_msg_pool *pool;
 };
 
 struct ceph_msg_pos {
@@ -221,6 +225,7 @@ extern void ceph_messenger_mark_down(struct ceph_messenger *msgr,
 extern struct ceph_msg *ceph_msg_new(int type, int front_len,
                                     int page_len, int page_off,
                                     struct page **pages);
+extern void ceph_msg_kfree(struct ceph_msg *m);
 
 static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
 {
diff --git a/src/kernel/msgpool.c b/src/kernel/msgpool.c
new file mode 100644 (file)
index 0000000..80095a7
--- /dev/null
@@ -0,0 +1,146 @@
+
+#include <linux/err.h>
+#include <linux/sched.h>
+#include <linux/types.h>
+
+#include "ceph_debug.h"
+#include "msgpool.h"
+
+/*
+ * We use msg pools to preallocate memory for messages we expect to
+ * receive over the wire, to avoid getting ourselves into OOM
+ * conditions at unexpected times.  We use a few different
+ * strategies:
+ *
+ *  - for request/response type interactions, we preallocate the
+ * memory needed for the response when we generate the request.
+ *
+ *  - for messages we can receive at any time from the MDS, we preallocate
+ * a pool of messages we can re-use.
+ *
+ *  - for writeback, we preallocate some number of messages to use for
+ * requests and their replies, so that we always make forward
+ * progress.
+ *
+ * The msgpool behaves like a mempool_t, but keeps preallocated
+ * ceph_msgs strung together on a list_head instead of using a pointer
+ * vector.  This avoids vector reallocation when we adjust the number
+ * of preallocated items (which happens frequently).
+ */
+
+
+/*
+ * Allocate or release as necessary to meet our target pool size.
+ */
+static int __fill_msgpool(struct ceph_msg_pool *pool)
+{
+       struct ceph_msg *msg;
+
+       while (pool->num < pool->min) {
+               dout("fill_msgpool %p %d/%d allocating\n", pool, pool->num,
+                    pool->min);
+               spin_unlock(&pool->lock);
+               msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+               spin_lock(&pool->lock);
+               if (IS_ERR(msg))
+                       return PTR_ERR(msg);
+               msg->pool = pool;
+               list_add(&msg->list_head, &pool->msgs);
+               pool->num++;
+       }
+       while (pool->num > pool->min) {
+               msg = list_first_entry(&pool->msgs, struct ceph_msg, list_head);
+               dout("fill_msgpool %p %d/%d releasing %p\n", pool, pool->num,
+                    pool->min, msg);
+               list_del_init(&msg->list_head);
+               pool->num--;
+               ceph_msg_kfree(msg);
+       }
+       return 0;
+}
+
+int ceph_msgpool_init(struct ceph_msg_pool *pool,
+                     int front_len, int min)
+{
+       int ret;
+
+       dout("msgpool_init %p front_len %d min %d\n", pool, front_len, min);
+       spin_lock_init(&pool->lock);
+       pool->front_len = front_len;
+       INIT_LIST_HEAD(&pool->msgs);
+       pool->num = 0;
+       pool->min = min;
+       init_waitqueue_head(&pool->wait);
+
+       spin_lock(&pool->lock);
+       ret = __fill_msgpool(pool);
+       spin_unlock(&pool->lock);
+       return ret;
+}
+
+void ceph_msgpool_destroy(struct ceph_msg_pool *pool)
+{
+       spin_lock(&pool->lock);
+       pool->min = 0;
+       __fill_msgpool(pool);
+       spin_unlock(&pool->lock);
+}
+
+int ceph_msgpool_resv(struct ceph_msg_pool *pool, int delta)
+{
+       int ret;
+
+       spin_lock(&pool->lock);
+       dout("msgpool_resv %p delta %d\n", pool, delta);
+       pool->min += delta;
+       ret = __fill_msgpool(pool);
+       spin_unlock(&pool->lock);
+       return ret;
+}
+
+struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *pool)
+{
+       wait_queue_t wait;
+
+       while (1) {
+               spin_lock(&pool->lock);
+               if (likely(pool->num)) {
+                       struct ceph_msg *msg =
+                               list_entry(pool->msgs.next, struct ceph_msg,
+                                          list_head);
+                       list_del_init(&msg->list_head);
+                       pool->num--;
+                       dout("msgpool_get %p got %p, now %d/%d\n", pool, msg,
+                            pool->num, pool->min);
+                       spin_unlock(&pool->lock);
+                       return msg;
+               }
+               spin_unlock(&pool->lock);
+
+               dout("msgpool_get %p now %d/%d, waiting\n", pool,
+                    pool->num, pool->min);
+               init_wait(&wait);
+               prepare_to_wait(&pool->wait, &wait, TASK_UNINTERRUPTIBLE);
+               schedule();
+               finish_wait(&pool->wait, &wait);
+       }
+}
+
+void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg)
+{
+       spin_lock(&pool->lock);
+       if (pool->num < pool->min) {
+               ceph_msg_get(msg);   /* retake a single ref */
+               list_add(&msg->list_head, &pool->msgs);
+               pool->num++;
+               dout("msgpool_put %p reclaim %p, now %d/%d\n", pool, msg,
+                    pool->num, pool->min);
+               spin_unlock(&pool->lock);
+               wake_up(&pool->wait);
+       } else {
+               dout("msgpool_put %p drop %p, at %d/%d\n", pool, msg,
+                    pool->num, pool->min);
+               spin_unlock(&pool->lock);
+               ceph_msg_kfree(msg);
+       }
+}
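
The comment block at the top of msgpool.c lays out the request/response
strategy: reserve the memory for a reply at the time the request is
generated.  A minimal sketch of that reservation pattern built on
ceph_msgpool_resv() (hypothetical caller code; the function names are
invented):

#include "msgpool.h"

static int example_prepare_request(struct ceph_msg_pool *pool)
{
        /* grow the target by one so the reply can always be received */
        return ceph_msgpool_resv(pool, 1);
}

static void example_request_done(struct ceph_msg_pool *pool)
{
        /* give the reservation back once the reply has been consumed */
        ceph_msgpool_resv(pool, -1);
}
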
diff --git a/src/kernel/msgpool.h b/src/kernel/msgpool.h
new file mode 100644 (file)
index 0000000..cfe97c2
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef _FS_CEPH_MSGPOOL
+#define _FS_CEPH_MSGPOOL
+
+#include "messenger.h"
+
+/*
+ * we use memory pools for preallocating messages we may receive, to
+ * avoid unexpected OOM conditions.
+ */
+struct ceph_msg_pool {
+       spinlock_t lock;
+       int front_len;          /* preallocated payload size */
+       struct list_head msgs;  /* msgs in the pool; each has 1 ref */
+       int num, min;           /* cur, min # msgs in the pool */
+       wait_queue_head_t wait;
+};
+
+extern int ceph_msgpool_init(struct ceph_msg_pool *pool,
+                            int front_len, int size);
+extern void ceph_msgpool_destroy(struct ceph_msg_pool *pool);
+extern int ceph_msgpool_resv(struct ceph_msg_pool *, int delta);
+extern struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *);
+extern void ceph_msgpool_put(struct ceph_msg_pool *, struct ceph_msg *);
+
+#endif
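
A sketch of the pool lifecycle implied by this header: a pool is initialized
once with a front size and a minimum count, and torn down when the client
goes away (hypothetical code; the 4096/8 sizing and function names are
illustrative only):

#include "msgpool.h"

static struct ceph_msg_pool example_pool;

static int example_client_init(void)
{
        /* preallocate 8 messages with 4096-byte fronts */
        return ceph_msgpool_init(&example_pool, 4096, 8);
}

static void example_client_shutdown(void)
{
        /* releases every message still sitting in the pool */
        ceph_msgpool_destroy(&example_pool);
}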