ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
export.o caps.o snap.o \
- messenger.o \
+ messenger.o msgpool.o \
mds_client.o mdsmap.o \
mon_client.o \
osd_client.o osdmap.o crush/crush.o crush/mapper.o \
A generic message passing library is used to communicate with all
other components in the Ceph file system. The messenger library
provides ordered, reliable delivery of messages between two nodes in
-the system.
+the system, or notifies the higher layer when it is unable to do so.
This implementation is based on TCP.
EOF
+git add $target/ceph/msgpool.h
+git add $target/ceph/msgpool.c
+git commit -s -F - <<EOF
+ceph: message pools
+
+The msgpool is a basic mempool_t-like structure to preallocate
+messages we expect to receive over the wire. This ensures we have the
+necessary memory preallocated to process replies to requests, or to
+process unsolicited messages from various servers.
+
+EOF
+
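msgpool.h itself is not quoted in this series; a minimal sketch of the interface implied by the hunks below (only the struct ceph_msg_pool name and the ceph_msgpool_* function names appear in the patches, every field here is an assumption):

/*
 * Sketch only: interface implied by the callers shown below.
 */
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

struct ceph_msg;

struct ceph_msg_pool {
	spinlock_t lock;
	struct list_head msgs;		/* preallocated, currently unused messages */
	int front_len;			/* payload size each message is sized for */
	int num;			/* messages sitting in the pool */
	int min;			/* outstanding reservations */
	bool blocking;			/* should get() wait when the pool is empty? */
	wait_queue_head_t wait;
};

int ceph_msgpool_init(struct ceph_msg_pool *pool, int front_len,
		      int size, bool blocking);
void ceph_msgpool_destroy(struct ceph_msg_pool *pool);
int ceph_msgpool_resv(struct ceph_msg_pool *pool, int delta);
struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *pool);
void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg);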
git add $target/ceph/export.c
git commit -s -F - <<EOF
ceph: nfs re-export support
m->front_max = front_len;
m->front_is_vmalloc = false;
m->more_to_follow = false;
+ m->pool = NULL;
/* front */
if (front_len) {
m->nr_pages = 0;
m->pages = NULL;
- ceph_msg_kfree(m);
+ if (m->pool)
+ ceph_msgpool_put(m->pool, m);
+ else
+ ceph_msg_kfree(m);
}
}
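With the pool pointer in place, dropping the last reference either recycles the message or frees it as before. ceph_msgpool_put() is not quoted in this series; a plausible sketch, assuming the fields from the interface sketch above and a list_head member in struct ceph_msg:

/*
 * Sketch only: return a message to its pool, or really free it.
 */
void ceph_msgpool_put(struct ceph_msg_pool *pool, struct ceph_msg *msg)
{
	spin_lock(&pool->lock);
	if (pool->num < pool->min) {
		/* a reservation is still outstanding: keep the message
		 * around (the real code would also reset its refcount
		 * and front length before reuse) */
		list_add(&msg->list_head, &pool->msgs);
		pool->num++;
		spin_unlock(&pool->lock);
		wake_up(&pool->wait);
	} else {
		/* nothing reserved needs it: free it for real */
		spin_unlock(&pool->lock);
		ceph_msg_kfree(msg);
	}
}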
bool front_is_vmalloc;
bool more_to_follow;
int front_max;
+
+ struct ceph_msg_pool *pool;
};
struct ceph_msg_pos {
req.buf = buf;
init_completion(&req.completion);
+ /* allocate memory for reply */
+ err = ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, 1);
+ if (err)
+ return err;
+
/* register request */
mutex_lock(&monc->mutex);
req.tid = ++monc->last_tid;
mutex_lock(&monc->mutex);
radix_tree_delete(&monc->statfs_request_tree, req.tid);
monc->num_statfs_requests--;
+ ceph_msgpool_resv(&monc->client->msgpool_statfs_reply, -1);
mutex_unlock(&monc->mutex);
if (!err)
ceph_msg_put(msg);
}
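The reply message is reserved before the request is registered and released again once the caller is done with it, so a burst of statfs calls cannot outrun the preallocated replies. ceph_msgpool_resv() itself is not quoted in this series; a plausible sketch, matching the +1/-1 usage above and the assumed fields from the earlier interface sketch:

/*
 * Sketch only: grow or shrink the reserved message count by delta.
 * A positive delta preallocates that many extra messages (and can fail
 * with -ENOMEM before the request is ever sent); a negative delta only
 * lowers the reservation, letting ceph_msgpool_put() free the surplus
 * later.  ceph_msg_new()'s signature and ERR_PTR return are
 * assumptions, not quoted from this series.
 */
int ceph_msgpool_resv(struct ceph_msg_pool *pool, int delta)
{
	struct ceph_msg *msg;
	int i;

	spin_lock(&pool->lock);
	pool->min += delta;
	spin_unlock(&pool->lock);

	for (i = 0; i < delta; i++) {
		msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
		if (IS_ERR(msg))
			return PTR_ERR(msg);
		ceph_msgpool_put(pool, msg);	/* lands on pool->msgs */
	}
	return 0;
}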
+/*
+ * Allocate memory for incoming message
+ */
+static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
+ struct ceph_msg_header *hdr)
+{
+ struct ceph_mon_client *monc = con->private;
+ int type = le32_to_cpu(hdr->type);
+
+ switch (type) {
+ case CEPH_MSG_STATFS_REPLY:
+ return ceph_msgpool_get(&monc->client->msgpool_statfs_reply);
+ }
+ return ceph_alloc_msg(con, hdr);
+}
+
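mon_alloc_msg() pulls statfs replies straight out of the pool and leaves every other message type to the generic ceph_alloc_msg() fallback. The pool's get side is not quoted here either; a minimal sketch under the same assumed layout:

/*
 * Sketch only: hand out a preallocated message, if any.  A non-blocking
 * pool like msgpool_statfs_reply should never be empty here, since each
 * statfs request reserved its reply up front.
 */
struct ceph_msg *ceph_msgpool_get(struct ceph_msg_pool *pool)
{
	struct ceph_msg *msg = NULL;

	spin_lock(&pool->lock);
	if (!list_empty(&pool->msgs)) {
		msg = list_first_entry(&pool->msgs, struct ceph_msg,
				       list_head);
		list_del_init(&msg->list_head);
		pool->num--;
	}
	spin_unlock(&pool->lock);

	/* a blocking pool would sleep on pool->wait here rather than
	 * return NULL when nothing is preallocated */
	return msg;
}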
/*
* If the monitor connection resets, pick a new monitor and resubmit
* any pending requests.
.put = ceph_con_put,
.dispatch = dispatch,
.fault = mon_fault,
- .alloc_msg = ceph_alloc_msg,
+ .alloc_msg = mon_alloc_msg,
.alloc_middle = ceph_alloc_middle,
};
if (client->trunc_wq == NULL)
goto fail;
+ /* msg pools */
+ /* preallocated at request time: */
+ err = ceph_msgpool_init(&client->msgpool_statfs_reply,
+ sizeof(struct ceph_mon_statfs_reply), 0, false);
+ if (err < 0)
+ goto fail;
+
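The statfs reply pool is created empty (size 0) and non-blocking, relying entirely on ceph_msgpool_resv() at request time to fill it. A sketch of init and destroy consistent with that usage, again under the assumed field layout:

/*
 * Sketch only: set up an (optionally prefilled) message pool and tear
 * it down again.
 */
int ceph_msgpool_init(struct ceph_msg_pool *pool, int front_len,
		      int size, bool blocking)
{
	spin_lock_init(&pool->lock);
	INIT_LIST_HEAD(&pool->msgs);
	init_waitqueue_head(&pool->wait);
	pool->front_len = front_len;
	pool->num = 0;
	pool->min = 0;
	pool->blocking = blocking;
	return ceph_msgpool_resv(pool, size);	/* prefill; no-op for 0 */
}

void ceph_msgpool_destroy(struct ceph_msg_pool *pool)
{
	struct ceph_msg *msg;

	while (!list_empty(&pool->msgs)) {
		msg = list_first_entry(&pool->msgs, struct ceph_msg,
				       list_head);
		list_del_init(&msg->list_head);
		pool->num--;
		ceph_msg_kfree(msg);
	}
}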
/* subsystems */
err = ceph_monc_init(&client->monc, client);
if (err < 0)
if (client->wb_pagevec_pool)
mempool_destroy(client->wb_pagevec_pool);
+ /* msg pools */
+ ceph_msgpool_destroy(&client->msgpool_statfs_reply);
+
release_mount_args(&client->mount_args);
kfree(client);
#include "types.h"
#include "ceph_debug.h"
#include "messenger.h"
+#include "msgpool.h"
#include "mon_client.h"
#include "mds_client.h"
#include "osd_client.h"
struct ceph_mds_client mdsc;
struct ceph_osd_client osdc;
+ /* msg pools */
+ struct ceph_msg_pool msgpool_statfs_reply;
+
/* writeback */
mempool_t *wb_pagevec_pool;
struct workqueue_struct *wb_wq;