]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: don't keep reply ref in osd_request; just parse out the result
authorSage Weil <sage@newdream.net>
Thu, 20 Aug 2009 21:45:51 +0000 (14:45 -0700)
committerSage Weil <sage@newdream.net>
Thu, 20 Aug 2009 21:45:51 +0000 (14:45 -0700)
This requires a few new flags: r_prepared_pages, so we only do that
once, and r_got_reply, so we only parse the result once.

src/kernel/osd_client.c
src/kernel/osd_client.h

index 3f2dc4a938a61c6ff90b75e3d2abb2c18819a83c..af127cb64e5e9b540839593e6e32858f04206be9 100644 (file)
@@ -376,7 +376,7 @@ static int map_osds(struct ceph_osd_client *osdc,
 }
 
 /*
- * caller should hold map_sem (for read)
+ * caller should hold map_sem (for read) and request_mutex
  */
 static int send_request(struct ceph_osd_client *osdc,
                        struct ceph_osd_request *req)
@@ -518,19 +518,25 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                dout("handle_reply tid %llu aborted\n", tid);
                goto done;
        }
+       if (!req->r_got_reply) {
+               struct ceph_osd_reply_head *replyhead;
+               unsigned bytes;
+
+               replyhead = msg->front.iov_base;
+               req->r_result = le32_to_cpu(replyhead->result);
+               bytes = le32_to_cpu(msg->hdr.data_len);
+               dout("handle_reply result %d bytes %d\n", req->r_result,
+                    bytes);
+               if (req->r_result == 0)
+                       req->r_result = bytes;
+               req->r_got_reply = 1;
+       }
+       if (req->r_reply) {
+               ceph_msg_put(req->r_reply);
+               req->r_reply = NULL;
+       }
 
        if (req->r_reassert_version.epoch == 0) {
-               /* first ack */
-               if (req->r_reply == NULL) {
-                       /* no data payload, or r_reply would have been set by
-                          prepare_pages. */
-                       ceph_msg_get(msg);
-                       req->r_reply = msg;
-               } else {
-                       /* r_reply was set by prepare_pages */
-                       BUG_ON(req->r_reply != msg);
-               }
-
                /* in case we need to replay this op, */
                req->r_reassert_version = rhead->reassert_version;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
@@ -783,12 +789,13 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
        }
        dout("prepare_pages tid %llu has %d pages, want %d\n",
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want && req->r_reply == NULL &&
-                   !req->r_aborted)) {
+       if (likely(req->r_num_pages >= want && !req->r_prepared_pages &&
+                  !req->r_aborted)) {
                m->pages = req->r_pages;
                m->nr_pages = req->r_num_pages;
+               req->r_reply = m;  /* only for duration of read over socket */
                ceph_msg_get(m);
-               req->r_reply = m;
+               req->r_prepared_pages = 1;
                ret = 0; /* success */
        }
 out:
@@ -811,8 +818,8 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        register_request(osdc, req);
 
        down_read(&osdc->map_sem);
+       mutex_lock(&osdc->request_mutex);
        rc = send_request(osdc, req);
-       up_read(&osdc->map_sem);
        if (rc) {
                if (nofail) {
                        dout("osdc_start_request failed send, marking %lld\n",
@@ -820,11 +827,11 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                        req->r_resend = true;
                        rc = 0;
                } else {
-                       mutex_lock(&osdc->request_mutex);
                        __unregister_request(osdc, req);
-                       mutex_unlock(&osdc->request_mutex);
                }
        }
+       mutex_unlock(&osdc->request_mutex);
+       up_read(&osdc->map_sem);
        return rc;
 }
 
@@ -834,9 +841,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req)
 {
-       struct ceph_osd_reply_head *replyhead;
-       __s32 rc;
-       int bytes;
+       int rc;
 
        rc = wait_for_completion_interruptible(&req->r_completion);
        if (rc < 0) {
@@ -844,15 +849,8 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
                return rc;
        }
 
-       /* parse reply */
-       replyhead = req->r_reply->front.iov_base;
-       rc = le32_to_cpu(replyhead->result);
-       bytes = le32_to_cpu(req->r_reply->hdr.data_len);
-       dout("wait_request tid %llu result %d, %d bytes\n",
-            req->r_tid, rc, bytes);
-       if (rc < 0)
-               return rc;
-       return bytes;
+       dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
+       return req->r_result;
 }
 
 /*
index eaffbfcb61b4845ae0be98b1de7a8e1f148cde3f..fc06ecc8f4db680b853e92f0d9965adad9dcd8a3 100644 (file)
@@ -23,11 +23,11 @@ struct ceph_osd_request {
        u64             r_tid;              /* unique for this client */
        struct rb_node  r_node;
 
-       struct ceph_msg  *r_request;
-       struct ceph_msg  *r_reply;
+       struct ceph_msg  *r_request, *r_reply;
        int               r_result;
        int               r_flags;     /* any additional flags for the osd */
        int               r_aborted;   /* set if we cancel this request */
+       int r_prepared_pages, r_got_reply;
 
        struct ceph_osd_client *r_osdc;
        atomic_t          r_ref;