]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: refactor readpages, striping
authorSage Weil <sage@newdream.net>
Tue, 11 Aug 2009 22:20:51 +0000 (15:20 -0700)
committerSage Weil <sage@newdream.net>
Tue, 11 Aug 2009 22:20:51 +0000 (15:20 -0700)
Make zero filling conditional on whether we did a short read or
the caller wants zeroed pages (e.g. O_DIRECT).  Simplify the lowest-level
osdc readpages to do a short read if it hits a stripe boundary;
that is sufficient for the ->readpages() aops.

src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index f6658ee991a73f7ed3e15abcc07bf10d6134d3b5..229a5d2b5dcaaa0fe14b91c2a0d6819b11f594cd 100644 (file)
@@ -200,13 +200,14 @@ static int readpage_nounlock(struct file *filp, struct page *page)
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
        int err = 0;
+       u64 len = PAGE_CACHE_SIZE;
 
        dout("readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
        err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-                                 page->index << PAGE_SHIFT, PAGE_SIZE,
+                                 page->index << PAGE_CACHE_SHIFT, &len,
                                  ci->i_truncate_seq, ci->i_truncate_size,
-                                 &page, 1, 1);
+                                 &page, 1);
        if (unlikely(err < 0)) {
                SetPageError(page);
                goto out;
@@ -269,6 +270,7 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        struct page **pages;
        struct pagevec pvec;
        loff_t offset;
+       u64 len;
 
        dout("readpages %p file %p nr_pages %d\n",
             inode, file, nr_pages);
@@ -279,10 +281,11 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 
        /* guess read extent */
        offset = pages[0]->index << PAGE_CACHE_SHIFT;
+       len = nr_pages << PAGE_CACHE_SHIFT;
        rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-                                offset, nr_pages << PAGE_CACHE_SHIFT,
+                                offset, &len,
                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                pages, nr_pages, 1);
+                                pages, nr_pages);
        if (rc < 0)
                goto out;
 
index 1ffa67afd1c70c3a631fa0100bc74c4973057a41..156cb4735451cb47439fa1a520674c3ba959ff3c 100644 (file)
@@ -343,7 +343,7 @@ static int copy_user_to_page_vector(struct page **pages,
        int l, bad;
 
        while (left > 0) {
-               l = min_t(int, PAGE_SIZE-po, left);
+               l = min_t(int, PAGE_CACHE_SIZE-po, left);
                bad = copy_from_user(page_address(pages[i]) + po, data, l);
                if (bad == l)
                        return -EFAULT;
@@ -386,6 +386,129 @@ static int copy_page_vector_to_user(struct page **pages, char __user *data,
        return len;
 }
 
+/*
+ * Zero an extent within a page vector.  Offset is relative to the
+ * start of the first page.
+ */
+static void zero_page_vector_range(int off, int len, struct page **pages)
+{
+       int i = off >> PAGE_CACHE_SHIFT;
+
+       dout("zero_page_vector_page %u~%u\n", off, len);
+       BUG_ON(len < PAGE_CACHE_SIZE);
+
+       /* leading partial page? */
+       if (off & ~PAGE_CACHE_MASK) {
+               dout("zeroing %d %p head from %d\n", i, pages[i],
+                    (int)(off & ~PAGE_CACHE_MASK));
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+               zero_user_segment(pages[i], off & ~PAGE_CACHE_MASK,
+                                 PAGE_CACHE_SIZE);
+#else
+               zero_user_page(pages[i], off & ~PAGE_CACHE_MASK,
+                              PAGE_CACHE_SIZE - (off & ~PAGE_CACHE_MASK),
+                              KM_USER0);
+#endif
+               off += PAGE_CACHE_SIZE;
+               off &= PAGE_CACHE_MASK;
+               i++;
+       }
+       while (len >= PAGE_CACHE_SIZE) {
+               dout("zeroing %d %p\n", i, pages[i]);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+               zero_user_segment(pages[i], 0, PAGE_CACHE_SIZE);
+#else
+               zero_user_page(pages[i], 0, PAGE_CACHE_SIZE, KM_USER0);
+#endif
+               off += PAGE_CACHE_SIZE;
+               len -= PAGE_CACHE_SIZE;
+               i++;
+       }
+       /* trailing partial page? */
+       if (len) {
+               dout("zeroing %d %p tail to %d\n", i, pages[i], (int)len);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+               zero_user_segment(pages[i], 0, len);
+#else
+               zero_user_page(pages[i], 0, PAGE_CACHE_SIZE - len, KM_USER0);
+#endif
+       }
+}
+
+
+/*
+ * Read a range of bytes striped over one or more objects.  Iterate over
+ * the objects we stripe over, issuing one ceph_osdc_readpages() call per
+ * piece.  (That's not atomic, but good enough for now.)
+ *
+ * @inode:     inode being read
+ * @off:       file offset of the first byte to read
+ * @len:       number of bytes requested
+ * @pages:     page vector receiving the data; @off's in-page offset applies
+ *             to the first page
+ * @num_pages: number of pages in @pages
+ * @fill:      if true, zero the part of the requested range that lies
+ *             beyond the bytes actually read once reading stops
+ *
+ * A read is retried at the new position only while it came back shorter
+ * than requested because the lower layer shortened the extent (was_short)
+ * and bytes remain.  -ENOENT from the lower layer is treated as zero bytes
+ * read.
+ *
+ * Returns the total number of bytes read, or a negative error code from
+ * ceph_osdc_readpages() (in which case earlier progress is not reported).
+ */
+static int striped_read(struct inode *inode,
+                       u64 off, u64 len,
+                       struct page **pages, int num_pages,
+                       bool fill)
+{
+       struct ceph_client *client = ceph_inode_to_client(inode);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       u64 pos, this_len;
+       /* first byte's offset in page: mask with ~PAGE_CACHE_MASK, not SIZE */
+       int page_off = off & ~PAGE_CACHE_MASK;
+       int left, pages_left;
+       int read;
+       struct page **page_pos;
+       int ret;
+       bool was_short;
+
+       /*
+        * we may need to do multiple reads.  not atomic, unfortunately.
+        */
+       pos = off;
+       left = len;
+       page_pos = pages;
+       pages_left = num_pages;
+       read = 0;
+
+more:
+       /* readpages may shrink this_len if the extent is cut short */
+       this_len = left;
+       ret = ceph_osdc_readpages(&client->osdc, ceph_vino(inode),
+                                 &ci->i_layout, pos, &this_len,
+                                 ci->i_truncate_seq,
+                                 ci->i_truncate_size,
+                                 page_pos, pages_left);
+       was_short = this_len < left;
+       if (ret == -ENOENT)
+               ret = 0;
+       dout("striped_read %llu~%u (read %u) got %d%s\n", pos, left, read, ret,
+            was_short ? " SHORT" : "");
+
+       if (ret > 0) {
+               /* whole pages consumed by this read, counted from pos's page */
+               int didpages =
+                       ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+
+               if (read < pos - off) {
+                       dout(" zero gap %llu to %llu\n", off + read, pos);
+                       zero_page_vector_range(page_off + read,
+                                              pos - off - read, pages);
+               }
+               pos += ret;
+               read = pos - off;
+               left -= ret;
+               if (left && was_short) {
+                       page_pos += didpages;
+                       pages_left -= didpages;
+                       goto more;
+               }
+               /* only zero when some of the requested range is unread */
+               if (fill && read < len) {
+                       dout("zero tail\n");
+                       zero_page_vector_range(page_off + read,
+                                              len - read, pages);
+               }
+       }
+       if (ret >= 0)
+               ret = read;
+       dout("striped_read returns %d\n", ret);
+       return ret;
+}
+
 /*
  * Completely synchronous read and write methods.  Direct from __user
  * buffer to osd, or directly to user pages (if O_DIRECT).
@@ -393,24 +516,19 @@ static int copy_page_vector_to_user(struct page **pages, char __user *data,
  * If the read spans object boundary, just do multiple reads.
  */
 static ssize_t ceph_sync_read(struct file *file, char __user *data,
-                             unsigned left, loff_t *offset)
+                             unsigned len, loff_t *poff)
 {
        struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_client *client = ceph_inode_to_client(inode);
-       long long unsigned start_off = *offset;
-       long long unsigned pos = start_off;
-       struct page **pages, **page_pos;
-       int num_pages = calc_pages_for(start_off, left);
-       int pages_left;
-       int read = 0;
+       struct page **pages;
+       u64 off = *poff;
+       int num_pages = calc_pages_for(off, len);
        int ret;
 
-       dout("sync_read on file %p %llu~%u %s\n", file, start_off, left,
+       dout("sync_read on file %p %llu~%u %s\n", file, off, len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
 
        if (file->f_flags & O_DIRECT) {
-               pages = get_direct_page_vector(data, num_pages, pos, left);
+               pages = get_direct_page_vector(data, num_pages, off, len);
 
                /*
                 * flush any page cache pages in this range.  this
@@ -425,45 +543,19 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
        if (IS_ERR(pages))
                return PTR_ERR(pages);
 
-       /*
-        * we may need to do multiple reads.  not atomic, unfortunately.
-        */
-       page_pos = pages;
-       pages_left = num_pages;
+       ret = striped_read(inode, off, len, pages, num_pages,
+                          file->f_flags & O_DIRECT);
 
-more:
-       ret = ceph_osdc_readpages(&client->osdc, ceph_vino(inode),
-                                 &ci->i_layout,
-                                 pos, left, ci->i_truncate_seq,
-                                 ci->i_truncate_size,
-                                 page_pos, pages_left,
-                                 file->f_flags & O_DIRECT);
-       if (ret > 0) {
-               int didpages =
-                       ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
-
-               pos += ret;
-               read += ret;
-               left -= ret;
-               if (left) {
-                       page_pos += didpages;
-                       pages_left -= didpages;
-                       goto more;
-               }
-       }
-       if (ret >= 0) {
-               ret = copy_page_vector_to_user(pages, data, start_off, read);
-               if (ret >= 0) {
-                       *offset = start_off + read;
-                       ret = read;
-               }
-       }
+       if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
+               ret = copy_page_vector_to_user(pages, data, off, ret);
+       if (ret >= 0)
+               *poff = off + ret;
 
        if (file->f_flags & O_DIRECT)
                put_page_vector(pages, num_pages);
        else
                ceph_release_page_vector(pages, num_pages);
-       dout("sync_read read %d result %d\n", read, ret);
+       dout("sync_read result %d\n", ret);
        return ret;
 }
 
index 82827aad80f026d7160c5233ee57d82599d6989f..3f2dc4a938a61c6ff90b75e3d2abb2c18819a83c 100644 (file)
@@ -953,26 +953,21 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
 }
 
 /*
- * Read some contiguous pages.  If @fill, return number of bytes read/zeroed
- * (i.e., the range we tried to read).  If not @fill, return the number of
- * bytes actually read.  (We do this because mapping readpages and O_DIRECT
- * reads should zero out the full extent, but regular sync reads don't care.)
+ * Read some contiguous pages.  If we cross a stripe boundary, shorten
+ * *plen.  Return number of bytes read, or error.
  */
 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
-                       u64 off, u64 len,
+                       u64 off, u64 *plen,
                        u32 truncate_seq, u64 truncate_size,
-                       struct page **pages, int num_pages,
-                       int fill)
+                       struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
-       int i;
-       struct page *page;
-       int rc = 0, read = 0;
+       int rc = 0;
 
        dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
-            vino.snap, off, len);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+            vino.snap, off, *plen);
+       req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
                                    false);
@@ -981,53 +976,16 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
-       num_pages = calc_pages_for(off, len);
+       num_pages = calc_pages_for(off, *plen);
        req->r_num_pages = num_pages;
 
-       dout("readpages final extent is %llu~%llu (%d pages)\n",
-            off, len, req->r_num_pages);
+       dout("readpages  final extent is %llu~%llu (%d pages)\n",
+            off, *plen, req->r_num_pages);
 
        rc = ceph_osdc_start_request(osdc, req, false);
        if (!rc)
                rc = ceph_osdc_wait_request(osdc, req);
 
-       if (rc >= 0) {
-               read = rc;
-               if (fill)
-                       rc = len;
-       } else if (rc == -ENOENT) {
-               if (fill)
-                       rc = len;
-       }
-
-       /* zero trailing pages on success? */
-       if (fill && read < (num_pages << PAGE_CACHE_SHIFT)) {
-               if (read & ~PAGE_CACHE_MASK) {
-                       i = read >> PAGE_CACHE_SHIFT;
-                       page = pages[i];
-                       dout("readpages zeroing %d %p from %d\n", i, page,
-                            (int)(read & ~PAGE_CACHE_MASK));
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
-                       zero_user_segment(page, read & ~PAGE_CACHE_MASK,
-                                         PAGE_CACHE_SIZE);
-#else
-                       zero_user_page(page, read & ~PAGE_CACHE_MASK,
-                              PAGE_CACHE_SIZE - (read & ~PAGE_CACHE_MASK),
-                              KM_USER0);
-#endif
-                       read += PAGE_CACHE_SIZE;
-               }
-               for (i = read >> PAGE_CACHE_SHIFT; i < num_pages; i++) {
-                       page = req->r_pages[i];
-                       dout("readpages zeroing %d %p\n", i, page);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
-                       zero_user_segment(page, 0, PAGE_CACHE_SIZE);
-#else
-                       zero_user_page(page, 0, PAGE_CACHE_SIZE, KM_USER0);
-#endif
-               }
-       }
-
        ceph_osdc_put_request(req);
        dout("readpages result %d\n", rc);
        return rc;
index b7270ecdda7de7c2656e894349c782decbc342ea..eaffbfcb61b4845ae0be98b1de7a8e1f148cde3f 100644 (file)
@@ -118,9 +118,9 @@ extern void ceph_osdc_sync(struct ceph_osd_client *osdc);
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
-                              u64 off, u64 len,
+                              u64 off, u64 *plen,
                               u32 truncate_seq, u64 truncate_size,
-                              struct page **pages, int nr_pages, int fill);
+                              struct page **pages, int nr_pages);
 
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,