]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: add back msgpools
authorSage Weil <sage@newdream.net>
Fri, 18 Sep 2009 20:01:22 +0000 (13:01 -0700)
committerSage Weil <sage@newdream.net>
Fri, 18 Sep 2009 20:01:22 +0000 (13:01 -0700)
This reverts commit b985882aa27fea1d1ed92b09b6057be961c56c0a.

src/kernel/Makefile
src/kernel/import_patch_set_into_linux_git.sh
src/kernel/messenger.c
src/kernel/messenger.h
src/kernel/mon_client.c
src/kernel/super.c
src/kernel/super.h

index ba1e6a59f651a4b5528d2c9cf99ab94f29a42ea6..031b85f96f9f54df59747c9384ff8b7b3fcbe6f7 100644 (file)
@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
        export.o caps.o snap.o \
-       messenger.o \
+       messenger.o msgpool.o \
        mds_client.o mdsmap.o \
        mon_client.o \
        osd_client.o osdmap.o crush/crush.o crush/mapper.o \
index 615a791f1fc1667fad22a9cacf1e159e2f7ba126..a2c09332fd6d66800e228d6afe309e41ac406b37 100755 (executable)
@@ -298,12 +298,24 @@ ceph: messenger library
 A generic message passing library is used to communicate with all
 other components in the Ceph file system.  The messenger library
 provides ordered, reliable delivery of messages between two nodes in
-the system.
+the system, or notifies the higher layer when it is unable to do so.
 
 This implementation is based on TCP.
 
 EOF
 
+git add $target/ceph/msgpool.h
+git add $target/ceph/msgpool.c
+git commit -s -F - <<EOF
+ceph: message pools
+
+The msgpool is a basic mempool_t-like structure to preallocate
+messages we expect to receive over the wire.  This ensures we have the
+necessary memory preallocated to process replies to requests, or to
+process unsolicited messages from various servers.
+
+EOF
+
 git add $target/ceph/export.c
 git commit -s -F - <<EOF
 ceph: nfs re-export support
index 9d51b6eaa7cd188375f371e564cf183b7404df6c..1c4d4e726ee03ffa8d8bb838f1d2ccbece70b13d 100644 (file)
@@ -1738,6 +1738,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        m->front_max = front_len;
        m->front_is_vmalloc = false;
        m->more_to_follow = false;
+       m->pool = NULL;
 
        /* front */
        if (front_len) {
@@ -1859,6 +1860,9 @@ void ceph_msg_put(struct ceph_msg *m)
                m->nr_pages = 0;
                m->pages = NULL;
 
-               ceph_msg_kfree(m);
+               if (m->pool)
+                       ceph_msgpool_put(m->pool, m);
+               else
+                       ceph_msg_kfree(m);
        }
 }
index 574ccbbdd82e73ecf8f322a136c4c1a7fcb96cc8..1e6de3a301c3ea8b0a8b5a2c4d02d5a938eca8ac 100644 (file)
@@ -100,6 +100,8 @@ struct ceph_msg {
        bool front_is_vmalloc;
        bool more_to_follow;
        int front_max;
+
+       struct ceph_msg_pool *pool;
 };
 
 struct ceph_msg_pos {
index c461e21ac0c326f6e44e1def88aba374abc59270..2a9df9b00956a75ad5178fe8bf55e523e60d6786 100644 (file)
@@ -440,6 +440,11 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
        req.buf = buf;
        init_completion(&req.completion);
 
+       /* allocate memory for reply */
+       err = ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1);
+       if (err)
+               return err;
+
        /* register request */
        mutex_lock(&monc->mutex);
        req.tid = ++monc->last_tid;
@@ -461,6 +466,7 @@ int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
        mutex_lock(&monc->mutex);
        radix_tree_delete(&monc->statfs_request_tree, req.tid);
        monc->num_statfs_requests--;
+       ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1);
        mutex_unlock(&monc->mutex);
 
        if (!err)
@@ -602,6 +608,22 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
+/*
+ * Allocate memory for incoming message
+ */
+static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
+                                     struct ceph_msg_header *hdr)
+{
+       struct ceph_mon_client *monc = con->private;
+       int type = le32_to_cpu(hdr->type);
+
+       switch (type) {
+       case CEPH_MSG_STATFS_REPLY:
+               return ceph_msgpool_get(&monc->client->msgpool_statfs_reply);
+       }
+       return ceph_alloc_msg(con, hdr);
+}
+
 /*
  * If the monitor connection resets, pick a new monitor and resubmit
  * any pending requests.
@@ -644,6 +666,6 @@ const static struct ceph_connection_operations mon_con_ops = {
        .put = ceph_con_put,
        .dispatch = dispatch,
        .fault = mon_fault,
-       .alloc_msg = ceph_alloc_msg,
+       .alloc_msg = mon_alloc_msg,
        .alloc_middle = ceph_alloc_middle,
 };
index 8390da62cbd4b6f81fad95b49624c0263bb10b70..6a2f93f91b8a7043b7be28d15c3c20dc27140bdc 100644 (file)
@@ -612,6 +612,13 @@ static struct ceph_client *ceph_create_client(void)
        if (client->trunc_wq == NULL)
                goto fail;
 
+       /* msg pools */
+       /* preallocated at request time: */
+       err = ceph_msgpool_init(&client->msgpool_statfs_reply,
+                               sizeof(struct ceph_mon_statfs_reply), 0, false);
+       if (err < 0)
+               goto fail;
+
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
        if (err < 0)
@@ -649,6 +656,9 @@ static void ceph_destroy_client(struct ceph_client *client)
        if (client->wb_pagevec_pool)
                mempool_destroy(client->wb_pagevec_pool);
 
+       /* msg pools */
+       ceph_msgpool_destroy(&client->msgpool_statfs_reply);
+
        release_mount_args(&client->mount_args);
 
        kfree(client);
index 6265f097a15082054dfa37237e6180c0083f4283..2e0560802a0a2cde78ba554bed41aa35989a40a1 100644 (file)
@@ -13,6 +13,7 @@
 #include "types.h"
 #include "ceph_debug.h"
 #include "messenger.h"
+#include "msgpool.h"
 #include "mon_client.h"
 #include "mds_client.h"
 #include "osd_client.h"
@@ -132,6 +133,9 @@ struct ceph_client {
        struct ceph_mds_client mdsc;
        struct ceph_osd_client osdc;
 
+       /* msg pools */
+       struct ceph_msg_pool msgpool_statfs_reply;
+
        /* writeback */
        mempool_t *wb_pagevec_pool;
        struct workqueue_struct *wb_wq;