]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use msgpools for osd requests, replies
authorSage Weil <sage@newdream.net>
Fri, 18 Sep 2009 21:54:04 +0000 (14:54 -0700)
committerSage Weil <sage@newdream.net>
Fri, 18 Sep 2009 21:54:04 +0000 (14:54 -0700)
src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index 5c337faa967df577cf34294952d723968154c090..3f8c4b30f0d51dba0f5a6fc04939a0b00c610103 100644 (file)
@@ -784,7 +784,7 @@ get_more_pages:
                                            snapc, do_sync,
                                            ci->i_truncate_seq,
                                            ci->i_truncate_size,
-                                           &inode->i_mtime, true);
+                                           &inode->i_mtime, true, 1);
                                max_pages = req->r_num_pages;
 
                                alloc_page_vec(client, req);
index bc92c84b27ee94a18a4a75a7cad7e296fe60f264..aa4f12c35fadf7e7e343f3ee76dbed598cced526 100644 (file)
@@ -650,7 +650,7 @@ more:
                                    ci->i_snap_realm->cached_context,
                                    do_sync,
                                    ci->i_truncate_seq, ci->i_truncate_size,
-                                   &mtime, false);
+                                   &mtime, false, 2);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
index 013646d80e10730ab13051b3c5dc9368466b2b04..b19b5216b89bb128db2e7f60e74b6536639ed2f1 100644 (file)
@@ -117,7 +117,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               struct timespec *mtime,
-                                              bool use_mempool)
+                                              bool use_mempool, int num_reply)
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
@@ -127,7 +127,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        int do_trunc = truncate_seq && (off + *plen > truncate_size);
        int num_op = 1 + do_sync + do_trunc;
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-       int i;
+       int err, i;
        u64 prevofs;
 
        if (use_mempool) {
@@ -139,6 +139,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
+       err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
+       if (err) {
+               ceph_osdc_put_request(req);
+               return ERR_PTR(-ENOMEM);
+       }
+
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        atomic_set(&req->r_ref, 1);
@@ -153,11 +159,16 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        msg_size += 40;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
-       msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
+       if (use_mempool)
+               msg = ceph_msgpool_get(&osdc->msgpool_op);
+       else
+               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
        if (IS_ERR(msg)) {
-               kfree(req);
+               ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
+               ceph_osdc_put_request(req);
                return ERR_PTR(PTR_ERR(msg));
        }
+       msg->hdr.type = cpu_to_le32(CEPH_MSG_OSD_OP);
        memset(msg->front.iov_base, 0, msg->front.iov_len);
        head = msg->front.iov_base;
        op = (void *)(head + 1);
@@ -1087,6 +1098,8 @@ void ceph_osdc_sync(struct ceph_osd_client *osdc)
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
 {
+       int err;
+
        dout("init\n");
        osdc->client = client;
        osdc->osdmap = NULL;
@@ -1105,6 +1118,14 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                                        sizeof(struct ceph_osd_request));
        if (!osdc->req_mempool)
                return -ENOMEM;
+
+       err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+       if (err < 0)
+               return -ENOMEM;
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
+       if (err < 0)
+               return -ENOMEM;
+
        return 0;
 }
 
@@ -1116,6 +1137,8 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
                osdc->osdmap = NULL;
        }
        mempool_destroy(osdc->req_mempool);
+       ceph_msgpool_destroy(&osdc->msgpool_op);
+       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
 /*
@@ -1136,7 +1159,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
-                                   false);
+                                   false, 1);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1179,7 +1202,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                            CEPH_OSD_FLAG_WRITE,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
-                                   nofail);
+                                   nofail, 1);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -1227,6 +1250,20 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
+static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+                                 struct ceph_msg_header *hdr)
+{
+       struct ceph_osd *osd = con->private;
+       struct ceph_osd_client *osdc = osd->o_osdc;
+       int type = le32_to_cpu(hdr->type);
+
+       switch (type) {
+       case CEPH_MSG_OSD_OPREPLY:
+               return ceph_msgpool_get(&osdc->msgpool_op_reply);
+       }
+       return ceph_alloc_msg(con, hdr);
+}
+
 /*
  * Wrappers to refcount containing ceph_osd struct
  */
@@ -1248,8 +1285,8 @@ const static struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
+       .alloc_msg = alloc_msg,
        .peer_reset = osd_reset,
-       .alloc_msg = ceph_alloc_msg,
        .alloc_middle = ceph_alloc_middle,
        .prepare_pages = prepare_pages,
 };
index 89b5cf151a94499fa7767e5ebdecc84f885dd6ab..84f392c891eec4c87b1de11f76219875b8e2137f 100644 (file)
@@ -86,6 +86,9 @@ struct ceph_osd_client {
        struct dentry          *debugfs_file;
 
        mempool_t              *req_mempool;
+
+       struct ceph_msg_pool   msgpool_op;
+       struct ceph_msg_pool   msgpool_op_reply;
 };
 
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,
@@ -105,7 +108,7 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      int do_sync, u32 truncate_seq,
                                      u64 truncate_size,
                                      struct timespec *mtime,
-                                     bool use_mempool);
+                                     bool use_mempool, int num_reply);
 
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {