git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-client.git/commitdiff
ceph: support multiple class method calls in one ceph_msg historic/wip-osd-op-cls-chain
author Douglas Fuller <dfuller@redhat.com>
Fri, 17 Apr 2015 20:17:25 +0000 (13:17 -0700)
committer Douglas Fuller <douglas.fuller@gmail.com>
Thu, 23 Apr 2015 18:14:55 +0000 (11:14 -0700)
Messages are received from the wire directly into destination buffers.
A short read could result in corruption of the following destination
buffers. Allocate a single message buffer for all class method calls
and split them at the osd_client level. This only applies to ceph_msgs
containing multiple call ops and may break support for ceph_msgs
containing a mix of class method calls that return data and other ops.

Signed-off-by: Douglas Fuller <dfuller@redhat.com>
include/linux/ceph/osd_client.h
net/ceph/messenger.c
net/ceph/osd_client.c

index 61b19c46bdb33d5fc2f4752df0c345350fa739e8..65fcf8015f0bee30dba0d97648be1c10388bd105 100644 (file)
@@ -99,6 +99,7 @@ struct ceph_osd_req_op {
                        struct ceph_osd_data request_info;
                        struct ceph_osd_data request_data;
                        struct ceph_osd_data response_data;
+                       struct ceph_osd_data chain_data;
                        __u8 class_len;
                        __u8 method_len;
                        __u8 argc;
index 967080a9f0436e8575663909572f063ac60440fe..ec04be442beafb684f7022f787db58842e4127ad 100644 (file)
@@ -907,6 +907,10 @@ static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
        BUG_ON(!data->pages);
        BUG_ON(!data->length);
 
+       /*
+        * bug here if a short read occurs and length < data->length; see
+        * http://tracker.ceph.com/issues/11424
+        */
        cursor->resid = min(length, data->length);
        page_count = calc_pages_for(data->alignment, (u64)data->length);
        cursor->page_offset = data->alignment & ~PAGE_MASK;
index 41a4abc7e98eebfd36487d6d381f680732d4cd68..3999dfabaa1f012294dbd66a32c6424d39c82c3d 100644 (file)
@@ -301,6 +301,73 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        }
 }
 
+static int __build_op_cls_chain(struct ceph_osd_request *osd_req) /* coalesce CALL-op response buffers into one chain buffer on op 0 */
+{
+       u64 chain_length = 0; /* total bytes expected across chained responses */
+       u32 chain_pagecount = 0;
+       struct ceph_osd_req_op *op = NULL;
+       struct ceph_osd_data *osd_data;
+       struct ceph_osd_data *chain_data;
+       struct page **pages;
+       int i;
+
+       chain_data = osd_req_op_data(osd_req, 0, cls, chain_data); /* the shared chain lives on the first op */
+
+       for (i=0; i<osd_req->r_num_ops; i++)
+       {
+               op = &osd_req->r_ops[i];
+               if (op->op != CEPH_OSD_OP_CALL)
+                       break; /* only a leading run of CALL ops is chained; mixed requests stop here */
+
+               osd_data = osd_req_op_data(osd_req, i, cls, chain_data);
+               osd_data->length = 0; /* per-op chain_data unused; only op 0's carries the chain */
+
+               osd_data = osd_req_op_data(osd_req, i, cls, response_data);
+               chain_length += osd_data->length; /* accumulate each op's expected response size */
+       }
+
+       chain_data->length = chain_length;
+       chain_pagecount = (u32)calc_pages_for(0, chain_data->length);
+       pages = ceph_alloc_page_vector(chain_pagecount, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages); /* NOTE(review): chain_data->length stays nonzero on failure — confirm callers tolerate that */
+       ceph_osd_data_pages_init(chain_data, pages, chain_length, 0, false, false); /* own_pages=false here; released explicitly in __split_cls_op_chain */
+
+       return 0;
+}
+
+static int __split_cls_op_chain(struct ceph_osd_request *osd_req) /* scatter the received chain buffer back into per-op response_data */
+{
+       int i;
+       void * data;
+       void * p;
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, 0, cls, chain_data);
+
+       if (osd_data->length == 0)
+               return 0; /* no chain was built for this request; nothing to split */
+
+       data = kzalloc(osd_data->length, GFP_KERNEL); /* contiguous bounce buffer for the whole chain */
+       if (!data)
+               return -ENOMEM;
+
+       ceph_copy_from_page_vector(osd_data->pages, data, 0, osd_data->length); /* drain chain pages into the bounce buffer */
+       ceph_osd_data_release(osd_data); /* chain pages no longer needed once copied out */
+
+       p = data;
+       for (i = 0; i < osd_req->r_num_ops; i++) /* NOTE(review): walks all ops, not just CALL ops — verify non-CALL response_data is safe to write */
+       {
+               osd_data = osd_req_op_data(osd_req, i, cls, response_data);
+               ceph_copy_to_page_vector(osd_data->pages, p,
+                       0, osd_req->r_reply_op_len[i]); /* each op receives its actual reply length, not the requested length */
+               p += osd_req->r_reply_op_len[i]; /* assumes sum of r_reply_op_len <= chain length — TODO confirm */
+       }
+
+       kfree(data);
+       return 0;
+}
+
 /*
  * requests
  */
@@ -694,8 +761,20 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
                        src->payload_len += data_length;
                        request_data_len += data_length;
                }
-               osd_data = &src->cls.response_data;
-               ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               if (which == 0)
+               {
+                       int err;
+
+                       err = __build_op_cls_chain(req);
+                       if (err == -ENOMEM)
+                       {
+                               pr_err("error allocating memory for op chain\n");
+                               return 0;
+                       }
+                       osd_data = &src->cls.chain_data;
+                       if (osd_data->length)
+                               ceph_osdc_msg_data_add(req->r_reply, osd_data);
+               }
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
@@ -1825,6 +1904,15 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
+       if (req->r_ops[0].op == CEPH_OSD_OP_CALL &&
+                       req->r_ops[0].cls.chain_data.length)
+       {
+               int err;
+               err = __split_cls_op_chain(req);
+               if (err == -ENOMEM)
+                       goto bad_put;
+       }
+
        if (le16_to_cpu(msg->hdr.version) >= 6) {
                p += 8 + 4; /* skip replay_version */
                p += 8; /* skip user_version */