git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: async writepages
author Sage Weil <sage@newdream.net>
Mon, 29 Sep 2008 18:48:56 +0000 (11:48 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 29 Sep 2008 18:49:17 +0000 (11:49 -0700)
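
Switch the kernel client's writeback path to asynchronous OSD writes: ceph_writepages becomes ceph_writepages_start, which builds an OSD request for each batch of dirty pages and returns once the write is sent, while page cleanup and redirtying move into the writepages_finish completion callback. To support this, osd_client gains ceph_osdc_new_request()/ceph_osdc_put_request(), a per-request completion callback invoked from handle_reply(), and ceph_osdc_writepages_start(); the existing synchronous read/write paths are reworked on top of the same request constructor.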
src/kernel/addr.c
src/kernel/osd_client.c
src/kernel/osd_client.h

diff --git a/src/kernel/addr.c b/src/kernel/addr.c
index 214ad3625dd71b05e89cef8713da0204bf531a07..460bda78c25cd6d35ccc0efccebaa4a7f3884ad8 100644 (file)
--- a/src/kernel/addr.c
+++ b/src/kernel/addr.c
@@ -436,21 +436,79 @@ void ceph_release_pages(struct page **pages, int num)
        pagevec_release(&pvec);
 }
 
+
 /*
- * ceph_writepages:
- *  do write jobs for several pages
+ * writeback completion handler.
  */
-static int ceph_writepages(struct address_space *mapping,
-                          struct writeback_control *wbc)
+static void writepages_finish(struct ceph_osd_request *req)
+{
+       struct inode *inode = req->r_inode;
+       struct ceph_osd_reply_head *replyhead;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       unsigned wrote;
+       loff_t offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
+       struct page *page;
+       int i;
+       struct ceph_snap_context *snapc = req->r_snapc;
+       struct address_space *mapping = inode->i_mapping;
+       struct writeback_control *wbc = req->r_wbc;
+       __s32 rc = -EIO;
+       __u64 bytes = 0;
+
+       /* parse reply */
+       if (req->r_reply) {
+               replyhead = req->r_reply->front.iov_base;
+               rc = le32_to_cpu(replyhead->result);
+               bytes = le32_to_cpu(replyhead->length);
+       }
+       if (rc >= 0)
+               wrote = (bytes + (offset & ~PAGE_CACHE_MASK) + ~PAGE_CACHE_MASK)
+                       >> PAGE_CACHE_SHIFT;
+       else
+               wrote = 0;
+       dout(10, "writepages_finish rc %d bytes %llu wrote %d (pages)\n", rc,
+            bytes, wrote);
+
+       /* clean or redirty pages */
+       for (i = 0; i < req->r_num_pages; i++) {
+               page = req->r_pages[i];
+               WARN_ON(!PageUptodate(page));
+               if (i < wrote) {
+                       dout(20, "%p cleaning %p\n", inode, page);
+                       page->private = 0;
+                       ClearPagePrivate(page);
+                       ceph_put_snap_context(snapc);
+               } else {
+                       dout(20, "%p redirtying %p\n", inode, page);
+                       ceph_redirty_page(mapping, page);
+                       wbc->pages_skipped++;
+               }
+               dout(50, "unlocking %d %p\n", i, page);
+               end_page_writeback(page);
+               unlock_page(page);
+       }
+       dout(20, "%p wrote+cleaned %d pages\n", inode, wrote);
+       ceph_put_wrbuffer_cap_refs(ci, wrote, snapc);
+
+       ceph_release_pages(req->r_pages, req->r_num_pages);
+       ceph_osdc_put_request(req);
+}
+
+/*
+ * initiate async writeback
+ */
+static int ceph_writepages_start(struct address_space *mapping,
+                                struct writeback_control *wbc)
 {
        struct inode *inode = mapping->host;
+       struct backing_dev_info *bdi = mapping->backing_dev_info;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_client *client = ceph_inode_to_client(inode);
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
-       struct page **pages;
-       pgoff_t max_pages = 0;
+       struct page **pages = 0;
+       pgoff_t max_pages = 0, max_pages_ever = 0;
        struct ceph_snap_context *snapc = 0, *last_snapc = 0;
        struct pagevec pvec;
        int done = 0;
@@ -465,19 +523,15 @@ static int ceph_writepages(struct address_space *mapping,
        dout(10, "writepages on %p, wsize %u\n", inode, wsize);
 
        /* larger page vector? */
-       max_pages = wsize >> PAGE_CACHE_SHIFT;
-       pages = kmalloc(max_pages * sizeof(*pages), GFP_NOFS);
-       if (!pages)
-               return generic_writepages(mapping, wbc);
+       max_pages_ever = wsize >> PAGE_CACHE_SHIFT;
        pagevec_init(&pvec, 0);
 
-       /* ?? from cifs. */
-       /*
+       /* ?? */
        if (wbc->nonblocking && bdi_write_congested(bdi)) {
-               wbc->encountered_congestions = 1;
+               dout(10, "writepages congested\n");
+               wbc->encountered_congestion = 1;
                return 0;
        }
-       */
 
        /* where to start/end? */
        if (wbc->range_cyclic) {
@@ -520,16 +574,19 @@ retry:
                struct page *page;
                int want;
                loff_t offset, len;
-               unsigned wrote;
+               struct ceph_osd_request *req;
 
+               req = 0;
                next = 0;
                locked_pages = 0;
+               max_pages = max_pages_ever;
 
 get_more_pages:
                first = -1;
                want = min(end - index,
                           min((pgoff_t)PAGEVEC_SIZE,
-                              max_pages - (pgoff_t)locked_pages) - 1) + 1;
+                              max_pages - (pgoff_t)locked_pages) - 1)
+                       + 1;
                pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
                                                PAGECACHE_TAG_DIRTY,
                                                want);
@@ -539,8 +596,23 @@ get_more_pages:
                for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
                        page = pvec.pages[i];
                        dout(20, "? %p idx %lu\n", page, page->index);
-                       if (locked_pages == 0)
+                       if (locked_pages == 0) {
                                lock_page(page);
+
+                               /* alloc request */
+                               offset = page->index << PAGE_CACHE_SHIFT;
+                               req = ceph_osdc_new_request(&client->osdc,
+                                                           &ci->i_layout,
+                                                           ceph_vino(inode),
+                                                           offset, wsize,
+                                                           CEPH_OSD_OP_WRITE,
+                                                           snapc);
+                               max_pages = req->r_num_pages;
+                               pages = req->r_pages;
+                               req->r_callback = writepages_finish;
+                               req->r_inode = inode;
+                               req->r_wbc = wbc;
+                       }
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,27)
                        else if (!trylock_page(page))
 #else
@@ -649,41 +721,8 @@ get_more_pages:
                                 (loff_t)locked_pages << PAGE_CACHE_SHIFT);
                dout(10, "writepages got %d pages at %llu~%llu\n",
                     locked_pages, offset, len);
-               rc = ceph_osdc_writepages(&client->osdc,
-                                         ceph_vino(inode),
-                                         &ci->i_layout,
-                                         snapc,
-                                         offset, len,
-                                         pages,
-                                         locked_pages);
-               if (rc >= 0)
-                       wrote = (rc + (offset & ~PAGE_CACHE_MASK)
-                                + ~PAGE_CACHE_MASK)
-                               >> PAGE_CACHE_SHIFT;
-               else
-                       wrote = 0;
-               dout(20, "writepages rc %d wrote %d\n", rc, wrote);
-
-               /* clean or redirty pages */
-               for (i = 0; i < locked_pages; i++) {
-                       page = pages[i];
-                       WARN_ON(!PageUptodate(page));
-                       if (i < wrote) {
-                               dout(20, "%p cleaning %p\n", inode, page);
-                               page->private = 0;
-                               ClearPagePrivate(page);
-                               ceph_put_snap_context(snapc);
-                       } else {
-                               dout(20, "%p redirtying %p\n", inode, page);
-                               ceph_redirty_page(mapping, page);
-                               wbc->pages_skipped++;
-                       }
-                       dout(50, "unlocking %d %p\n", i, page);
-                       end_page_writeback(page);
-                       unlock_page(page);
-               }
-               dout(20, "%p wrote+cleaned %d pages\n", inode, wrote);
-               ceph_put_wrbuffer_cap_refs(ci, wrote, snapc);
+               rc = ceph_osdc_writepages_start(&client->osdc, req,
+                                               len, locked_pages);
 
                /* continue? */
                index = next;
@@ -692,10 +731,6 @@ get_more_pages:
                        done = 1;
 
        release_pages:
-               /* hmm, pagevec_release also does lru_add_drain()...? */
-               dout(50, "release_pages on %d\n", locked_pages);
-               ceph_release_pages(pages, locked_pages);
-
                dout(50, "pagevec_release on %d pages (%p)\n", (int)pvec.nr,
                     pvec.nr ? pvec.pages[0] : 0);
                pagevec_release(&pvec);
@@ -716,7 +751,6 @@ get_more_pages:
                mapping->writeback_index = index;
 
 out:
-       kfree(pages);
        if (rc > 0)
                rc = 0;  /* vfs expects us to return 0 */
        ceph_put_snap_context(snapc);
@@ -865,7 +899,7 @@ const struct address_space_operations ceph_aops = {
        .readpage = ceph_readpage,
        .readpages = ceph_readpages,
        .writepage = ceph_writepage,
-       .writepages = ceph_writepages,
+       .writepages = ceph_writepages_start,
        .write_begin = ceph_write_begin,
        .write_end = ceph_write_end,
        .set_page_dirty = ceph_set_page_dirty_vfs,
@@ -875,7 +909,7 @@ const struct address_space_operations ceph_aops = {
 
 
 /*
- * vm ops 
+ * vm ops
  */
 
 static int ceph_page_mkwrite(struct vm_area_struct *vma, struct page *page)
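
The addr.c side of this change splits writeback into a submit half (ceph_writepages_start) and a completion half (writepages_finish). The hypothetical, condensed helper below restates the submit path using only the identifiers introduced above; it is an illustration of the flow, not code from the patch, and omits the page locking, congestion checks, and snap-context bookkeeping that the real loop performs:

/*
 * Hypothetical condensed submit path (not in the patch).  Pages are
 * assumed to be locked, dirty, and tagged for writeback already.
 */
static int submit_write_batch(struct inode *inode,
                              struct writeback_control *wbc,
                              struct ceph_snap_context *snapc,
                              loff_t offset, __u64 wsize,
                              struct page **pages, int nr_pages)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_client *client = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
        int i;

        /* calc_layout() inside ceph_osdc_new_request() trims the request
         * to the object boundary and sizes req->r_pages accordingly */
        req = ceph_osdc_new_request(&client->osdc, &ci->i_layout,
                                    ceph_vino(inode), offset, wsize,
                                    CEPH_OSD_OP_WRITE, snapc);
        if (IS_ERR(req))
                return PTR_ERR(req);

        for (i = 0; i < nr_pages && i < req->r_num_pages; i++)
                req->r_pages[i] = pages[i];     /* already locked and dirty */

        req->r_callback = writepages_finish;    /* cleanup happens on reply */
        req->r_inode = inode;
        req->r_wbc = wbc;

        /* returns as soon as the OSD write is registered and sent */
        return ceph_osdc_writepages_start(&client->osdc, req,
                                          (__u64)i << PAGE_CACHE_SHIFT, i);
}

The real loop gathers pages with pagevec_lookup_tag() and clamps the final length to i_size; writepages_finish() then cleans or redirties each page depending on how many bytes the OSD acknowledged, and drops the request and wrbuffer cap references.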
diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c
index 5bc00877699e0d0894ed6eb714a9792bbf22ebec..2439340673e9def634a2a1a8383beeb1beefe1cc 100644 (file)
--- a/src/kernel/osd_client.c
+++ b/src/kernel/osd_client.c
@@ -26,6 +26,45 @@ struct ceph_readdesc {
 };
 
 
+
+
+/*
+ * calculate the mapping of an extent onto an object, and fill out the
+ * request accordingly.  shorten extent as necessary if it hits an
+ * object boundary.
+ */
+static __u64 calc_layout(struct ceph_osd_client *osdc,
+                        struct ceph_vino vino, struct ceph_file_layout *layout,
+                        __u64 off, __u64 len,
+                        struct ceph_osd_request *req)
+{
+       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+       __u64 toff = off, tlen = len;
+
+       reqhead->oid.ino = vino.ino;
+       reqhead->oid.snap = vino.snap;
+
+       calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
+                                &off, &len);
+       if (tlen != 0)
+               dout(10, " skipping last %llu, writing  %llu~%llu\n",
+                    tlen, off, len);
+       reqhead->offset = cpu_to_le64(off);
+       reqhead->length = cpu_to_le64(len);
+
+       calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+                          osdc->osdmap);
+       req->r_num_pages = calc_pages_for(off, len);
+
+       dout(10, "calc_layout bno %u on %llu~%llu pgid %llx (%d pages)\n",
+            le32_to_cpu(reqhead->oid.bno), off, len,
+            le64_to_cpu(reqhead->layout.ol_pgid),
+            req->r_num_pages);
+
+       return len;
+}
+
+
 /*
  * requests
  */
@@ -35,17 +74,20 @@ static void get_request(struct ceph_osd_request *req)
        atomic_inc(&req->r_ref);
 }
 
-static void put_request(struct ceph_osd_request *req)
+void ceph_osdc_put_request(struct ceph_osd_request *req)
 {
        dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
             atomic_read(&req->r_ref)-1);
        BUG_ON(atomic_read(&req->r_ref) <= 0);
        if (atomic_dec_and_test(&req->r_ref)) {
                ceph_msg_put(req->r_request);
+               ceph_put_snap_context(req->r_snapc);
                kfree(req);
        }
 }
 
+
+
 struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
                                 struct ceph_snap_context *snapc)
 {
@@ -78,16 +120,33 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
        return req;
 }
 
-static struct ceph_osd_request *alloc_request(int num_pages,
-                                             struct ceph_msg *msg)
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+                                              struct ceph_file_layout *layout,
+                                              struct ceph_vino vino,
+                                              __u64 off, __u64 len, int op,
+                                              struct ceph_snap_context *snapc)
 {
        struct ceph_osd_request *req;
+       struct ceph_msg *msg;
+       int num_pages = len >> PAGE_CACHE_SHIFT;
+
+       BUG_ON(vino.snap != CEPH_NOSNAP);
 
        req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
+
+       msg = new_request_msg(osdc, op, snapc);
+       if (IS_ERR(msg)) {
+               kfree(req);
+               return ERR_PTR(PTR_ERR(msg));
+       }
        req->r_request = msg;
-       req->r_num_pages = num_pages;
+       req->r_snapc = ceph_get_snap_context(snapc);
+
+       /* calculate max write size */
+       calc_layout(osdc, vino, layout, off, len, req);
+
        atomic_set(&req->r_ref, 1);
        return req;
 }
@@ -96,7 +155,7 @@ static void reschedule_timeout(struct ceph_osd_client *osdc)
 {
        int timeout = osdc->client->mount_args.osd_timeout;
        dout(10, "reschedule timeout (%d seconds)\n", timeout);
-       schedule_delayed_work(&osdc->timeout_work, 
+       schedule_delayed_work(&osdc->timeout_work,
                              round_jiffies_relative(timeout*HZ));
 }
 
@@ -124,21 +183,20 @@ static int register_request(struct ceph_osd_client *osdc,
        get_request(req);
        rc = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
 
-       if (osdc->nr_requests == 0) 
+       if (osdc->nr_requests == 0)
                reschedule_timeout(osdc);
        osdc->nr_requests++;
 
        spin_unlock(&osdc->request_lock);
        radix_tree_preload_end();
-       
+
        return rc;
 }
 
-static void unregister_request(struct ceph_osd_client *osdc,
+static void __unregister_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req)
 {
-       dout(30, "unregister_request %p tid %lld\n", req, req->r_tid);
-       spin_lock(&osdc->request_lock);
+       dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
        radix_tree_delete(&osdc->request_tree, req->r_tid);
 
        osdc->nr_requests--;
@@ -146,8 +204,7 @@ static void unregister_request(struct ceph_osd_client *osdc,
        if (osdc->nr_requests)
                reschedule_timeout(osdc);
 
-       spin_unlock(&osdc->request_lock);
-       put_request(req);
+       ceph_osdc_put_request(req);
 }
 
 /*
@@ -161,12 +218,12 @@ static int pick_osd(struct ceph_osd_client *osdc,
        int pps; /* placement ps */
        int osds[10];
        int nr_osds;
-       
+
        ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
                                 req->r_pgid.pg.type, req->r_pgid.pg.size);
        if (ruleno < 0) {
                derr(0, "pick_osd no crush rule for pool %d type %d size %d\n",
-                    req->r_pgid.pg.pool, req->r_pgid.pg.type, 
+                    req->r_pgid.pg.pool, req->r_pgid.pg.type,
                     req->r_pgid.pg.size);
                return -1;
        }
@@ -217,7 +274,7 @@ static void send_request(struct ceph_osd_client *osdc,
        req->r_request->hdr.dst.name.num = cpu_to_le32(osd);
        req->r_request->hdr.dst.addr = osdc->osdmap->osd_addr[osd];
        req->r_last_osd = req->r_request->hdr.dst.addr;
-       
+
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_msg_send(osdc->client->msgr, req->r_request, 0);
 }
@@ -259,10 +316,15 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        dout(10, "handle_reply tid %llu flags %d |= %d\n", tid, req->r_flags,
             rhead->flags);
        req->r_flags |= rhead->flags;
+       __unregister_request(osdc, req);
        spin_unlock(&osdc->request_lock);
-       complete(&req->r_completion);
+
+       if (req->r_callback)
+               req->r_callback(req);
+       else
+               complete(&req->r_completion);  /* see do_sync_request */
 done:
-       put_request(req);
+       ceph_osdc_put_request(req);
        return;
 
 bad:
@@ -282,11 +344,11 @@ static int kick_requests(struct ceph_osd_client *osdc)
        int got;
        int osd;
        int ret = 0;
-       
+
 more:
        spin_lock(&osdc->request_lock);
 more_locked:
-       got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req, 
+       got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
                                     next_tid, 1);
        if (got == 0)
                goto done;
@@ -306,7 +368,7 @@ more_locked:
                        req->r_flags |= CEPH_OSD_OP_RETRY;
                        send_request(osdc, req, osd);
                }
-               put_request(req);
+               ceph_osdc_put_request(req);
                goto more;
        }
        goto more_locked;
@@ -466,29 +528,31 @@ out:
        return ret;
 }
 
+
+void start_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+{
+       register_request(osdc, req);
+       down_read(&osdc->map_sem);
+       send_request(osdc, req, -1);
+       up_read(&osdc->map_sem);
+}
+
 /*
  * synchronously do an osd request.
  */
-int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
+int do_sync_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
 {
        struct ceph_osd_reply_head *replyhead;
        __s32 rc;
        int bytes;
 
-       /* register+send request */
-       register_request(osdc, req);
-       down_read(&osdc->map_sem);
-       send_request(osdc, req, -1);
-       up_read(&osdc->map_sem);
-
+       start_request(osdc, req);       /* register+send request */
        rc = wait_for_completion_interruptible(&req->r_completion);
-
-       unregister_request(osdc, req);
        if (rc < 0) {
                struct ceph_msg *msg;
                dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
                     rc, req->r_request);
-               /* 
+               /*
                 * mark req aborted _before_ revoking pages, so that
                 * if a racing kick_request _does_ dup the page vec
                 * pointer, it will definitely then see the aborted
@@ -511,7 +575,7 @@ int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
        replyhead = req->r_reply->front.iov_base;
        rc = le32_to_cpu(replyhead->result);
        bytes = le32_to_cpu(req->r_reply->hdr.data_len);
-       dout(10, "do_request tid %llu result %d, %d bytes\n",
+       dout(10, "do_sync_request tid %llu result %d, %d bytes\n",
             req->r_tid, rc, bytes);
        if (rc < 0)
                return rc;
@@ -520,7 +584,7 @@ int do_request(struct ceph_osd_client *osdc, struct ceph_osd_request *req)
 
 void handle_timeout(struct work_struct *work)
 {
-       struct ceph_osd_client *osdc = 
+       struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        dout(10, "timeout\n");
        down_read(&osdc->map_sem);
@@ -557,44 +621,10 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 
 
 
-/*
- * calculate the mapping of an extent onto an object, and fill out the
- * request accordingly.  shorten extent as necessary if it hits an
- * object boundary.
- */
-static __u64 calc_layout(struct ceph_osd_client *osdc,
-                        struct ceph_vino vino, struct ceph_file_layout *layout,
-                        __u64 off, __u64 len,
-                        struct ceph_osd_request *req)
-{
-       struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
-       __u64 toff = off, tlen = len;
-
-       reqhead->oid.ino = vino.ino;
-       reqhead->oid.snap = vino.snap;
-
-       calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
-                                &off, &len);
-       if (tlen != 0)
-               dout(10, " skipping last %llu, writing  %llu~%llu\n",
-                    tlen, off, len);
-       reqhead->offset = cpu_to_le64(off);
-       reqhead->length = cpu_to_le64(len);
-
-       calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
-                          osdc->osdmap);
-
-       dout(10, "calc_layout bno %u on %llu~%llu pgid %llx\n",
-            le32_to_cpu(reqhead->oid.bno), off, len,
-            le64_to_cpu(reqhead->layout.ol_pgid));
-
-       return len;
-}
-
 /*
  * synchronous read direct to user buffer.
  *
- * FIXME: if read spans object boundary, just do two separate reads.
+ * if read spans object boundary, just do two separate reads.  FIXME:
  * for a correct atomic read, we should take read locks on all
  * objects.
  */
@@ -605,7 +635,7 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request *req;
-       int num_pages, i, po, left, l;
+       int i, po, left, l;
        __s32 rc;
        int finalrc = 0;
        u64 rlen;
@@ -614,35 +644,31 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
             vino.snap, off, len);
 
 more:
-       /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
-       if (IS_ERR(reqm))
-               return PTR_ERR(reqm);
-
-       num_pages = calc_pages_for(off, len);
-       req = alloc_request(num_pages, reqm);
+       req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+                                   CEPH_OSD_OP_READ, 0);
        if (IS_ERR(req))
                return PTR_ERR(req);
+       reqm = req->r_request;
 
        rlen = calc_layout(osdc, vino, layout, off, len, req);
-       num_pages = calc_pages_for(off, rlen);  /* recalc */
-       dout(10, "sync_read %llu~%llu -> %d pages\n", off, rlen, num_pages);
+       dout(10, "sync_read %llu~%llu -> %d pages\n", off, rlen,
+            req->r_num_pages);
 
        /* allocate temp pages to hold data */
-       for (i = 0; i < num_pages; i++) {
+       for (i = 0; i < req->r_num_pages; i++) {
                req->r_pages[i] = alloc_page(GFP_NOFS);
                if (req->r_pages[i] == NULL) {
                        req->r_num_pages = i+1;
-                       put_request(req);
+                       ceph_osdc_put_request(req);
                        return -ENOMEM;
                }
        }
-       reqm->nr_pages = num_pages;
+       reqm->nr_pages = req->r_num_pages;
        reqm->pages = req->r_pages;
        reqm->hdr.data_len = cpu_to_le32(rlen);
        reqm->hdr.data_off = cpu_to_le32(off);
 
-       rc = do_request(osdc, req);
+       rc = do_sync_request(osdc, req);
        if (rc > 0) {
                /* copy into user buffer */
                po = off & ~PAGE_CACHE_MASK;
@@ -671,7 +697,7 @@ more:
                }
        }
 out:
-       put_request(req);
+       ceph_osdc_put_request(req);
        if (rc > 0) {
                finalrc += rc;
                off += rc;
@@ -701,23 +727,19 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
             vino.snap, off, len);
 
        /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
-       if (IS_ERR(reqm))
-               return PTR_ERR(reqm);
-       reqhead = reqm->front.iov_base;
-
-       req = alloc_request(1, reqm);
-       if (IS_ERR(req)) {
-               ceph_msg_put(reqm);
+       req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+                                   CEPH_OSD_OP_READ, 0);
+       if (IS_ERR(req))
                return PTR_ERR(req);
-       }
+       reqm = req->r_request;
+       reqhead = reqm->front.iov_base;
        req->r_pages[0] = page;
 
        len = calc_layout(osdc, vino, layout, off, len, req);
        BUG_ON(len != PAGE_CACHE_SIZE);
 
-       rc = do_request(osdc, req);
-       put_request(req);
+       rc = do_sync_request(osdc, req);
+       ceph_osdc_put_request(req);
        dout(10, "readpage result %d\n", rc);
        if (rc == -ENOENT)
                rc = 0;         /* object page dne; zero it */
@@ -751,14 +773,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
             vino.snap, off, len);
 
        /* alloc request, w/ optimistically-sized page vector */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
-       if (IS_ERR(reqm))
-               return PTR_ERR(reqm);
-       req = alloc_request(num_pages, reqm);
-       if (req == 0) {
-               ceph_msg_put(reqm);
-               return -ENOMEM;
-       }
+       req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+                                   CEPH_OSD_OP_READ, 0);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+       reqm = req->r_request;
 
        /* find adjacent pages */
        next_index = list_entry(page_list->prev, struct page, lru)->index;
@@ -780,13 +799,12 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        /* request msg */
        len = calc_layout(osdc, vino, layout, off, len, req);
-       req->r_num_pages = calc_pages_for(off, len);
        dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
             off, len, req->r_num_pages);
-       rc = do_request(osdc, req);
+       rc = do_sync_request(osdc, req);
 
 out:
-       put_request(req);
+       ceph_osdc_put_request(req);
        dout(10, "readpages result %d\n", rc);
        if (rc < 0)
                dout(10, "hrm!\n");
@@ -809,7 +827,7 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_request *req;
-       int num_pages, i, po, l, left;
+       int i, po, l, left;
        __s32 rc;
        u64 rlen;
        int finalrc = 0;
@@ -818,30 +836,23 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
             vino.snap, off, len);
 
 more:
-       /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
-       if (IS_ERR(reqm))
-               return PTR_ERR(reqm);
+       req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+                                   CEPH_OSD_OP_WRITE, snapc);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+       reqm = req->r_request;
        reqhead = reqm->front.iov_base;
        reqhead->flags = CEPH_OSD_OP_ACK | /* just ack for now... FIXME */
                CEPH_OSD_OP_ORDERSNAP;     /* get EOLDSNAPC if out of order */
 
-       /* how many pages? */
-       num_pages = calc_pages_for(off, len);
-       req = alloc_request(num_pages, reqm);
-       if (IS_ERR(req)) {
-               ceph_msg_put(reqm);
-               return PTR_ERR(req);
-       }
-
        rlen = calc_layout(osdc, vino, layout, off, len, req);
-       num_pages = calc_pages_for(off, rlen);  /* recalc */
-       dout(10, "sync_write %llu~%llu -> %d pages\n", off, rlen, num_pages);
+       dout(10, "sync_write %llu~%llu -> %d pages\n", off, rlen,
+            req->r_num_pages);
 
        /* copy data into a set of pages */
        left = rlen;
        po = off & ~PAGE_MASK;
-       for (i = 0; i < num_pages; i++) {
+       for (i = 0; i < req->r_num_pages; i++) {
                int bad;
                req->r_pages[i] = alloc_page(GFP_NOFS);
                if (req->r_pages[i] == NULL) {
@@ -866,15 +877,15 @@ more:
                }
        }
        reqm->pages = req->r_pages;
-       reqm->nr_pages = num_pages;
+       reqm->nr_pages = req->r_num_pages;
        reqm->hdr.data_len = cpu_to_le32(rlen);
        reqm->hdr.data_off = cpu_to_le32(off);
 
-       rc = do_request(osdc, req);
+       rc = do_sync_request(osdc, req);
 out:
        for (i = 0; i < req->r_num_pages; i++)
                __free_pages(req->r_pages[i], 0);
-       put_request(req);
+       ceph_osdc_put_request(req);
        if (rc == 0) {
                finalrc += rlen;
                off += rlen;
@@ -888,7 +899,7 @@ out:
 }
 
 /*
- * do a write job for N pages
+ * do a sync write for N pages
  */
 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
@@ -904,38 +915,62 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        BUG_ON(vino.snap != CEPH_NOSNAP);
 
        /* request + msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
-       if (IS_ERR(reqm))
-               return PTR_ERR(reqm);
-       req = alloc_request(num_pages, reqm);
-       if (IS_ERR(req)) {
-               ceph_msg_put(reqm);
+       req = ceph_osdc_new_request(osdc, layout, vino, off, len,
+                                   CEPH_OSD_OP_WRITE, snapc);
+       if (IS_ERR(req))
                return PTR_ERR(req);
-       }
-
+       reqm = req->r_request;
        reqhead = reqm->front.iov_base;
        if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
                reqhead->flags = CEPH_OSD_OP_ACK;
        else
                reqhead->flags = CEPH_OSD_OP_SAFE;
 
-       len = calc_layout(osdc, vino, layout, off, len, req);
-       num_pages = calc_pages_for(off, len);
-       dout(10, "writepages %llu~%llu -> %d pages\n", off, len, num_pages);
-       
+       len = le64_to_cpu(reqhead->length);
+       dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
+            req->r_num_pages);
+
        /* copy pages */
-       memcpy(req->r_pages, pages, num_pages * sizeof(struct page *));
+       memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *));
        reqm->pages = req->r_pages;
-       reqm->nr_pages = req->r_num_pages = num_pages;
+       reqm->nr_pages = req->r_num_pages;
        reqm->hdr.data_len = len;
        reqm->hdr.data_off = off;
 
-       rc = do_request(osdc, req);
-       put_request(req);
+       rc = do_sync_request(osdc, req);
+       ceph_osdc_put_request(req);
        if (rc == 0)
                rc = len;
        dout(10, "writepages result %d\n", rc);
        return rc;
 }
 
+/*
+ * start an async multipage write
+ */
+int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req,
+                              __u64 len, int num_pages)
+{
+       struct ceph_msg *reqm = req->r_request;
+       struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
+       __u64 off = le64_to_cpu(reqhead->offset);
+
+       dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
+
+       if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
+               reqhead->flags = CEPH_OSD_OP_ACK;
+       else
+               reqhead->flags = CEPH_OSD_OP_SAFE;
+       reqhead->length = cpu_to_le64(len);
+
+       /* reference pages in message */
+       reqm->pages = req->r_pages;
+       reqm->nr_pages = req->r_num_pages = num_pages;
+       reqm->hdr.data_len = len;
+       reqm->hdr.data_off = off;
+
+       start_request(osdc, req);
+       return 0;
+}
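
In osd_client.c, request setup is now centralized: ceph_osdc_new_request() allocates the request, takes a reference on the snap context, and calls calc_layout(), which records the page count in req->r_num_pages via calc_pages_for(), so the sync read/write paths no longer recompute it themselves. Below is a rough standalone restatement of that page-count arithmetic; it is an assumption about calc_pages_for()'s behavior written for illustration, not the kernel's copy of the helper:

/* Assumed behavior of calc_pages_for(off, len): the number of
 * PAGE_CACHE_SIZE pages touched by the byte range [off, off+len). */
static int calc_pages_for_sketch(__u64 off, __u64 len)
{
        __u64 first, last;

        if (len == 0)
                return 0;
        first = off >> PAGE_CACHE_SHIFT;
        last = (off + len - 1) >> PAGE_CACHE_SHIFT;
        return (int)(last - first + 1);
}

For example, with 4 KB pages, off = 4000 and len = 200 touches pages 0 and 1, so the request needs a two-entry page vector even though len is well under one page.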
 
diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h
index 20cf1ac4dca325be222ef71196d89da4379e2b75..3af88f8be7270c57c954cb8451c624196c5b2cdf 100644 (file)
--- a/src/kernel/osd_client.h
+++ b/src/kernel/osd_client.h
@@ -22,18 +22,26 @@ enum {
        REQUEST_DONE   /* read/stat/whatever completed */
 };
 
+struct ceph_osd_request;
+
+typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
+
 struct ceph_osd_request {
        __u64             r_tid;
        int               r_aborted;
        int               r_flags;
+       struct ceph_snap_context *r_snapc;
+       struct inode *r_inode;
+       struct writeback_control *r_wbc;
        struct ceph_msg  *r_request;
        struct ceph_entity_addr r_last_osd;  /* last osd we sent request to */
        union ceph_pg     r_pgid;
        struct ceph_msg  *r_reply;
        int               r_result;
        atomic_t          r_ref;
+       ceph_osdc_callback_t r_callback;
        struct completion r_completion;      /* on ack or commit or read? */
-       unsigned          r_num_pages;        /* size of page array (follows) */
+       unsigned          r_num_pages;       /* size of page array (follows) */
        struct page      *r_pages[0];        /* pages for data payload */
 };
 
@@ -62,6 +70,13 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
                                 struct ceph_msg *msg);
 extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
 
+extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
+                                     struct ceph_file_layout *layout,
+                                     struct ceph_vino vino,
+                                     __u64 offset, __u64 len, int op,
+                                     struct ceph_snap_context *snapc);
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
+
 extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
                              struct ceph_vino vino,
                              struct ceph_file_layout *layout,
@@ -80,6 +95,10 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_snap_context *sc,
                                loff_t off, loff_t len,
                                struct page **pagevec, int nr_pages);
+extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
+                                     struct ceph_osd_request *req,
+                                     __u64 len,
+                                     int nr_pages);
 
 extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
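
The header now exposes what an async caller needs: the r_callback hook (typed as ceph_osdc_callback_t), the r_inode/r_wbc/r_snapc fields consumed by writepages_finish(), and the ceph_osdc_new_request()/ceph_osdc_put_request() pair. The hypothetical callback below, not part of the patch, shows the ownership rule implied by handle_reply(): when r_callback is set the completion is not signalled, so the callback is responsible for dropping the reference the submitter got from ceph_osdc_new_request():

/* Hypothetical completion callback matching ceph_osdc_callback_t. */
static void my_osd_write_done(struct ceph_osd_request *req)
{
        struct ceph_osd_reply_head *head = req->r_reply ?
                req->r_reply->front.iov_base : NULL;

        if (head)
                dout(10, "write result %d\n", le32_to_cpu(head->result));

        ceph_osdc_put_request(req);     /* balances ceph_osdc_new_request() */
}

A caller assigns it with req->r_callback = my_osd_write_done before handing the request to ceph_osdc_writepages_start(); synchronous callers leave r_callback unset and wait on r_completion via do_sync_request() instead.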