git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: refactor read path, allow O_DIRECT
author Sage Weil <sage@newdream.net>
Fri, 13 Mar 2009 19:38:23 +0000 (12:38 -0700)
committer Sage Weil <sage@newdream.net>
Fri, 13 Mar 2009 20:43:40 +0000 (13:43 -0700)
Clean up the read path to only use a single readpages function.
Prepare page vectors in caller.  Allow O_DIRECT reads.

src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h
src/kernel/super.h

index a79316fc9f7b472f440966abc65aedfc60cd495c..94b5578f42fd2a49931a9977e2356a6fd610ff7d 100644 (file)
@@ -211,10 +211,10 @@ static int readpage_nounlock(struct file *filp, struct page *page)
 
        dout(10, "readpage inode %p file %p page %p index %lu\n",
             inode, filp, page, page->index);
-       err = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
-                                page->index << PAGE_SHIFT, PAGE_SIZE,
-                                ci->i_truncate_seq, ci->i_truncate_size,
-                                page);
+       err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
+                                 page->index << PAGE_SHIFT, PAGE_SIZE,
+                                 ci->i_truncate_seq, ci->i_truncate_size,
+                                 &page, 1);
        if (unlikely(err < 0)) {
                SetPageError(page);
                goto out;
@@ -233,7 +233,39 @@ static int ceph_readpage(struct file *filp, struct page *page)
 }
 
 /*
- * Read multiple pages.  Most of the work is done in the osd_client.
+ * Build a vector of contiguous pages from the provided page list.
+ */
+static struct page **page_vector_from_list(struct list_head *page_list,
+                                          unsigned *nr_pages)
+{
+       struct page **pages;
+       struct page *page;
+       int next_index, contig_pages = 0;
+
+       /* build page vector */
+       pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
+       if (!pages)
+               return ERR_PTR(-ENOMEM);
+
+       BUG_ON(list_empty(page_list));
+       next_index = list_entry(page_list->prev, struct page, lru)->index;
+       list_for_each_entry_reverse(page, page_list, lru) {
+               if (page->index == next_index) {
+                       dout(20, "readpages page %d %p\n", contig_pages, page);
+                       pages[contig_pages] = page;
+                       contig_pages++;
+                       next_index++;
+               } else {
+                       break;
+               }
+       }
+       *nr_pages = contig_pages;
+       return pages;
+}
+
+/*
+ * Read multiple pages.  Leave pages we don't read + unlock in page_list;
+ * the caller (VM) cleans them up.
  */
 static int ceph_readpages(struct file *file, struct address_space *mapping,
                          struct list_head *page_list, unsigned nr_pages)
@@ -242,27 +274,31 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
        int rc = 0;
-       struct page *page;
+       struct page **pages;
        struct pagevec pvec;
        loff_t offset;
 
        dout(10, "readpages %p file %p nr_pages %d\n",
             inode, file, nr_pages);
 
+       pages = page_vector_from_list(page_list, &nr_pages);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
        /* guess read extent */
-       BUG_ON(list_empty(page_list));
-       page = list_entry(page_list->prev, struct page, lru);
-       offset = page->index << PAGE_CACHE_SHIFT;
-       rc = ceph_osdc_readpages(osdc, mapping, ceph_vino(inode), &ci->i_layout,
+       offset = pages[0]->index << PAGE_CACHE_SHIFT;
+       rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
                                 offset, nr_pages << PAGE_CACHE_SHIFT,
                                 ci->i_truncate_seq, ci->i_truncate_size,
-                                page_list, nr_pages);
+                                pages, nr_pages);
        if (rc < 0)
-               return rc;
+               goto out;
 
        /* set uptodate and add to lru in pagevec-sized chunks */
        pagevec_init(&pvec, 0);
        for (; rc > 0; rc -= PAGE_CACHE_SIZE) {
+               struct page *page;
+
                BUG_ON(list_empty(page_list));
                page = list_entry(page_list->prev, struct page, lru);
                list_del(&page->lru);
@@ -290,7 +326,11 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 #else
        pagevec_lru_add(&pvec);
 #endif
-       return 0;
+       rc = 0;
+
+out:
+       kfree(pages);
+       return rc;
 }
 
 /*
@@ -687,13 +727,13 @@ get_more_pages:
                                offset = page->index << PAGE_CACHE_SHIFT;
                                len = wsize;
                                req = ceph_osdc_new_request(&client->osdc,
-                                                   &ci->i_layout,
-                                                   ceph_vino(inode),
-                                                   offset, &len,
-                                                   CEPH_OSD_OP_WRITE,
-                                                   snapc, do_sync,
-                                                   ci->i_truncate_seq,
-                                                   ci->i_truncate_size);
+                                           &ci->i_layout,
+                                           ceph_vino(inode),
+                                           offset, &len,
+                                           CEPH_OSD_OP_WRITE, 0,
+                                           snapc, do_sync,
+                                           ci->i_truncate_seq,
+                                           ci->i_truncate_size);
                                max_pages = req->r_num_pages;
 
                                rc = -ENOMEM;
index 4ceb89b62a4d415e27d0a23fbf9d6c8bb450b88e..3d439a3fd35ee69f34111ee24766e28ebbea5ef5 100644 (file)
@@ -201,31 +201,6 @@ int ceph_release(struct inode *inode, struct file *file)
        return 0;
 }
 
-/*
- * Completely synchronous read and write methods.  Direct from __user
- * buffer to osd.
- */
-static ssize_t ceph_sync_read(struct file *file, char __user *data,
-                              size_t count, loff_t *offset)
-{
-       struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_client *client = ceph_inode_to_client(inode);
-       int ret = 0;
-       off_t pos = *offset;
-
-       dout(10, "sync_read on file %p %lld~%u\n", file, *offset,
-            (unsigned)count);
-
-       ret = ceph_osdc_sync_read(&client->osdc, ceph_vino(inode),
-                                 &ci->i_layout,
-                                 pos, count, ci->i_truncate_seq,
-                                 ci->i_truncate_size, data);
-       if (ret > 0)
-               *offset = pos + ret;
-       return ret;
-}
-
 /*
  * build a vector of user pages
  */
@@ -266,36 +241,68 @@ static void release_page_vector(struct page **pages, int num_pages)
        kfree(pages);
 }
 
-/*
- * copy user data into a page vector
- */
-static struct page **copy_into_page_vector(const char __user *data,
-                                          int num_pages,
-                                          loff_t off, size_t len)
+static struct page **alloc_page_vector(int num_pages)
 {
        struct page **pages;
-       int i, po, l, left;
-       int rc;
+       int i;
 
        pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
        if (!pages)
                return ERR_PTR(-ENOMEM);
-
-       left = len;
-       po = off & ~PAGE_MASK;
        for (i = 0; i < num_pages; i++) {
-               int bad;
                pages[i] = alloc_page(GFP_NOFS);
                if (pages[i] == NULL) {
-                       rc = -ENOMEM;
-                       goto fail;
+                       release_page_vector(pages, i);
+                       return ERR_PTR(-ENOMEM);
                }
+       }
+       return pages;
+}
+
+/*
+ * copy user data into a page vector
+ */
+static int copy_user_to_page_vector(struct page **pages,
+                                   const char __user *data,
+                                   loff_t off, size_t len)
+{
+       int i = 0;
+       int po = off & ~PAGE_CACHE_MASK;
+       int left = len;
+       int l, bad;
+
+       while (left > 0) {
                l = min_t(int, PAGE_SIZE-po, left);
                bad = copy_from_user(page_address(pages[i]) + po, data, l);
-               if (bad == l) {
-                       rc = -EFAULT;
-                       goto fail;
+               if (bad == l)
+                       return -EFAULT;
+               data += l - bad;
+               left -= l - bad;
+               if (po) {
+                       po += l - bad;
+                       if (po == PAGE_CACHE_SIZE)
+                               po = 0;
                }
+       }
+       return len;
+}
+
+/*
+ * copy user data from a page vector into a user pointer
+ */
+static int copy_page_vector_to_user(struct page **pages, char __user *data,
+                                   loff_t off, size_t len)
+{
+       int i = 0;
+       int po = off & ~PAGE_CACHE_MASK;
+       int left = len;
+       int l, bad;
+
+       while (left > 0) {
+               l = min_t(int, left, PAGE_CACHE_SIZE-po);
+               bad = copy_to_user(data, page_address(pages[i]) + po, l);
+               if (bad == l)
+                       return -EFAULT;
                data += l - bad;
                left -= l - bad;
                if (po) {
@@ -303,12 +310,78 @@ static struct page **copy_into_page_vector(const char __user *data,
                        if (po == PAGE_CACHE_SIZE)
                                po = 0;
                }
+               i++;
        }
-       return pages;
+       return len;
+}
 
-fail:
-       release_page_vector(pages, i);
-       return ERR_PTR(rc);
+/*
+ * Completely synchronous read and write methods.  Direct from __user
+ * buffer to osd.
+ *
+ * If read spans object boundary, just do multiple reads.
+ *
+ * FIXME: for a correct atomic read, we should take read locks on all
+ * objects.
+ */
+static ssize_t ceph_sync_read(struct file *file, char __user *data,
+                             unsigned left, loff_t *offset)
+{
+       struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_client *client = ceph_inode_to_client(inode);
+       long long unsigned start_off = *offset;
+       long long unsigned pos = start_off;
+       struct page **pages, **page_pos;
+       int num_pages = calc_pages_for(start_off, left);
+       int pages_left;
+       int read = 0;
+       int ret;
+
+       dout(10, "sync_read on file %p %llu~%u\n", file, start_off, left);
+
+       if (file->f_flags & O_DIRECT)
+               pages = get_direct_page_vector(data, num_pages, pos, left);
+       else
+               pages = alloc_page_vector(num_pages);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       /*
+        * we may need to do multiple reads.  not atomic, unfortunately.
+        */
+       page_pos = pages;
+       pages_left = num_pages;
+
+more:
+       ret = ceph_osdc_readpages(&client->osdc, ceph_vino(inode),
+                                 &ci->i_layout,
+                                 pos, left, ci->i_truncate_seq,
+                                 ci->i_truncate_size,
+                                 page_pos, pages_left);
+       if (ret > 0) {
+               int didpages =
+                       ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+
+               pos += ret;
+               read += ret;
+               left -= ret;
+               if (left) {
+                       page_pos += didpages;
+                       pages_left -= didpages;
+                       goto more;
+               }
+
+               ret = copy_page_vector_to_user(pages, data, start_off, read);
+               if (ret == 0)
+                       *offset = start_off + read;
+       }
+
+       if (file->f_flags & O_DIRECT)
+               kfree(pages);
+       else
+               release_page_vector(pages, num_pages);
+       return ret;
 }
 
 /*
@@ -319,35 +392,42 @@ fail:
  * objects, rollback on failure, etc.
  */
 static ssize_t ceph_sync_write(struct file *file, const char __user *data,
-                              size_t count, loff_t *offset)
+                              size_t left, loff_t *offset)
 {
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_client *client = ceph_inode_to_client(inode);
-       int ret = 0;
-       off_t pos = *offset;
-       int num_pages = calc_pages_for(pos, count);
-       struct page **pages;
-       struct page **page_pos;
-       int pages_left;
-       int flags;
+       struct page **pages, **page_pos;
+       int num_pages, pages_left;
+       long long unsigned pos;
        int written = 0;
+       int flags;
+       int ret;
 
        if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP)
                return -EROFS;
 
        dout(10, "sync_write on file %p %lld~%u %s\n", file, *offset,
-            (unsigned)count, (file->f_flags & O_DIRECT) ? "O_DIRECT":"");
+            (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT":"");
 
        if (file->f_flags & O_APPEND)
                pos = i_size_read(inode);
-
-       if (file->f_flags & O_DIRECT)
-               pages = get_direct_page_vector(data, num_pages, pos, count);
        else
-               pages = copy_into_page_vector(data, num_pages, pos, count);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
+               pos = *offset;
+       num_pages = calc_pages_for(pos, left);
+
+       if (file->f_flags & O_DIRECT) {
+               pages = get_direct_page_vector(data, num_pages, pos, left);
+               if (IS_ERR(pages))
+                       return PTR_ERR(pages);
+       } else {
+               pages = alloc_page_vector(num_pages);
+               if (IS_ERR(pages))
+                       return PTR_ERR(pages);
+               ret = copy_user_to_page_vector(pages, data, pos, left);
+               if (ret < 0)
+                       goto out;
+       }
 
        flags = CEPH_OSD_OP_ORDERSNAP;
        if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
@@ -364,18 +444,23 @@ more:
        ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
                                   &ci->i_layout,
                                   ci->i_snap_realm->cached_context,
-                                  pos, count, ci->i_truncate_seq,
+                                  pos, left, ci->i_truncate_seq,
                                   ci->i_truncate_size,
                                   page_pos, pages_left,
                                   flags);
        if (ret > 0) {
+               int didpages =
+                       ((pos & ~PAGE_CACHE_MASK) + ret) >> PAGE_CACHE_SHIFT;
+
                pos += ret;
                written += ret;
-               count -= ret;
-               page_pos += (ret >> PAGE_CACHE_SHIFT);
-               pages_left -= (ret >> PAGE_CACHE_SHIFT);
-               if (pages_left)
+               left -= ret;
+               if (left) {
+                       page_pos += didpages;
+                       pages_left -= didpages;
+                       BUG_ON(!pages_left);
                        goto more;
+               }
 
                ret = written;
                *offset = pos;
@@ -383,6 +468,7 @@ more:
                        ceph_inode_set_size(inode, pos);
        }
 
+out:
        if (file->f_flags & O_DIRECT)
                kfree(pages);
        else
@@ -415,8 +501,8 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
                            &got, -1);
        if (ret < 0)
                goto out;
-       dout(10, "aio_read %llx.%llx %llu~%u got cap refs %d\n",
-            ceph_vinop(inode), pos, (unsigned)len, got);
+       dout(10, "aio_read %llx.%llx %llu~%u got cap refs on %s\n",
+            ceph_vinop(inode), pos, (unsigned)len, ceph_cap_string(got));
 
        if ((got & CEPH_CAP_FILE_RDCACHE) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
@@ -427,8 +513,8 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
                ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
 
 out:
-       dout(10, "aio_read %llx.%llx dropping cap refs on %d\n",
-            ceph_vinop(inode), got);
+       dout(10, "aio_read %llx.%llx dropping cap refs on %s\n",
+            ceph_vinop(inode), ceph_cap_string(got));
        ceph_put_cap_refs(ci, got);
        return ret;
 }
@@ -495,11 +581,12 @@ retry_snap:
        if (ret < 0)
                goto out;
 
-       dout(10, "aio_write %p %llu~%u  got %s\n",
+       dout(10, "aio_write %p %llu~%u  got cap refs on %s\n",
             inode, pos, (unsigned)iov->iov_len, ceph_cap_string(got));
 
        if ((got & CEPH_CAP_FILE_WRBUFFER) == 0 ||
-           (iocb->ki_filp->f_flags & O_DIRECT)) {
+           (iocb->ki_filp->f_flags & O_DIRECT) ||
+           (inode->i_sb->s_flags & MS_SYNCHRONOUS)) {
                ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
                        &iocb->ki_pos);
        } else {
@@ -514,8 +601,8 @@ retry_snap:
                ci->i_dirty_caps |= CEPH_CAP_FILE_WR;
 
 out:
-       dout(10, "aio_write %p %llu~%u  dropping cap refs on %d\n",
-            inode, pos, (unsigned)iov->iov_len, got);
+       dout(10, "aio_write %p %llu~%u  dropping cap refs on %s\n",
+            inode, pos, (unsigned)iov->iov_len, ceph_cap_string(got));
        ceph_put_cap_refs(ci, got);
 
        if (ret == -EOLDSNAPC) {
index f32e58e9457fe48be7035dad7d3d3996393866ee..241addb42f9e68823ea1775aa55e8ca6620ea091 100644 (file)
@@ -87,7 +87,8 @@ void ceph_osdc_put_request(struct ceph_osd_request *req)
 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
-                                              u64 off, u64 *plen, int opcode,
+                                              u64 off, u64 *plen,
+                                              int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               int do_sync,
                                               u32 truncate_seq,
@@ -123,7 +124,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        snaps = (void *)(op + num_op);
 
        head->client_inc = cpu_to_le32(1); /* always, for now. */
-       head->flags = 0;
+       head->flags = flags;
        head->num_ops = cpu_to_le16(num_op);
        op->op = cpu_to_le16(opcode);
 
@@ -766,192 +767,35 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        }
 }
 
-
-
 /*
- * synchronous read direct to user buffer.
- *
- * if read spans object boundary, just do two separate reads.
- *
- * FIXME: for a correct atomic read, we should take read locks on all
- * objects.
- */
-int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
-                       struct ceph_file_layout *layout,
-                       u64 off, u64 len,
-                       u32 truncate_seq, u64 truncate_size,
-                       char __user *data)
-{
-       struct ceph_osd_request *req;
-       int i, po, left, l;
-       int rc;
-       int finalrc = 0;
-
-       dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
-            vino.snap, off, len);
-
-more:
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0,
-                                   truncate_seq, truncate_size);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-
-       dout(10, "sync_read %llu~%llu -> %d pages\n", off, len,
-            req->r_num_pages);
-
-       /* allocate temp pages to hold data */
-       for (i = 0; i < req->r_num_pages; i++) {
-               req->r_pages[i] = alloc_page(GFP_NOFS);
-               if (req->r_pages[i] == NULL) {
-                       req->r_num_pages = i+1;
-                       ceph_osdc_put_request(req);
-                       return -ENOMEM;
-               }
-       }
-
-       rc = do_sync_request(osdc, req);
-       if (rc > 0) {
-               /* copy into user buffer */
-               po = off & ~PAGE_CACHE_MASK;
-               left = rc;
-               i = 0;
-               while (left > 0) {
-                       int bad;
-                       l = min_t(int, left, PAGE_CACHE_SIZE-po);
-                       bad = copy_to_user(data,
-                                          page_address(req->r_pages[i]) + po,
-                                          l);
-                       if (bad == l) {
-                               rc = -EFAULT;
-                               goto out;
-                       }
-                       data += l - bad;
-                       left -= l - bad;
-                       if (po) {
-                               po += l - bad;
-                               if (po == PAGE_CACHE_SIZE)
-                                       po = 0;
-                       }
-                       i++;
-               }
-       }
-out:
-       ceph_osdc_put_request(req);
-       if (rc > 0) {
-               finalrc += rc;
-               off += rc;
-               len -= rc;
-               if (len > 0)
-                       goto more;
-       } else {
-               finalrc = rc;
-       }
-       dout(10, "sync_read result %d\n", finalrc);
-       return finalrc;
-}
-
-/*
- * Read a single page.  Return number of bytes read (or zeroed).
- */
-int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
-                      struct ceph_file_layout *layout,
-                      u64 off, u64 len,
-                      u32 truncate_seq, u64 truncate_size,
-                      struct page *page)
-{
-       struct ceph_osd_request *req;
-       int rc, read = 0;
-
-       dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
-            vino.snap, off, len);
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0,
-                                   truncate_seq, truncate_size);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-       BUG_ON(len != PAGE_CACHE_SIZE);
-
-       req->r_pages[0] = page;
-       rc = do_sync_request(osdc, req);
-
-       if (rc >= 0) {
-               read = rc;
-               rc = len;
-       } else if (rc == -ENOENT) {
-               rc = len;
-       }
-
-       if (read < PAGE_CACHE_SIZE) {
-               dout(10, "readpage zeroing %p from %d\n", page, read);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
-               zero_user_segment(page, read, PAGE_CACHE_SIZE);
-#else
-               zero_user_page(page, read, PAGE_CACHE_SIZE-read, KM_USER0);
-#endif
-       }
-
-       ceph_osdc_put_request(req);
-       dout(10, "readpage result %d\n", rc);
-       return rc;
-}
-
-/*
- * Read some contiguous pages from page_list.  Return number of bytes
- * read (or zeroed).
+ * Read some contiguous pages.  Return number of bytes read (or
+ * zeroed).
  */
 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
-                       struct address_space *mapping,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        u64 off, u64 len,
                        u32 truncate_seq, u64 truncate_size,
-                       struct list_head *page_list, int num_pages)
+                       struct page **pages, int num_pages)
 {
        struct ceph_osd_request *req;
-       struct ceph_osd_request_head *reqhead;
-       struct ceph_osd_op *op;
+       int i;
        struct page *page;
-       pgoff_t next_index;
-       int contig_pages = 0;
-       int i = 0;
        int rc = 0, read = 0;
 
-       /*
-        * for now, our strategy is simple: start with the
-        * initial page, and fetch as much of that object as
-        * we can that falls within the range specified by
-        * num_pages.
-        */
        dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
             vino.snap, off, len);
-
-       /* alloc request, w/ optimistically-sized page vector */
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_READ, NULL, 0,
+                                   CEPH_OSD_OP_READ, 0, NULL, 0,
                                    truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
-       /* build vector from page_list */
-       next_index = list_entry(page_list->prev, struct page, lru)->index;
-       list_for_each_entry_reverse(page, page_list, lru) {
-               if (page->index == next_index) {
-                       dout(20, "readpages page %d %p\n", contig_pages, page);
-                       req->r_pages[contig_pages] = page;
-                       contig_pages++;
-                       next_index++;
-               } else {
-                       break;
-               }
-       }
-       BUG_ON(!contig_pages);
-       len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
-                 len);
-       req->r_num_pages = contig_pages;
-       reqhead = req->r_request->front.iov_base;
-       op = (void *)(reqhead + 1);
-       op->length = cpu_to_le64(len);
-       dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
+       /* it may be a short read due to an object boundary */
+       req->r_pages = pages;
+       num_pages = calc_pages_for(off, len);
+       req->r_num_pages = num_pages;
+
+       dout(10, "readpages final extent is %llu~%llu (%d pages)\n",
             off, len, req->r_num_pages);
        rc = do_sync_request(osdc, req);
 
@@ -963,10 +807,10 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        }
 
        /* zero trailing pages on success */
-       if (read < (contig_pages << PAGE_CACHE_SHIFT)) {
+       if (read < (num_pages << PAGE_CACHE_SHIFT)) {
                if (read & ~PAGE_CACHE_MASK) {
                        i = read >> PAGE_CACHE_SHIFT;
-                       page = req->r_pages[i];
+                       page = pages[i];
                        dout(20, "readpages zeroing %d %p from %d\n", i, page,
                             (int)(read & ~PAGE_CACHE_MASK));
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
@@ -979,7 +823,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 #endif
                        read += PAGE_CACHE_SIZE;
                }
-               for (i = read >> PAGE_CACHE_SHIFT; i < contig_pages; i++) {
+               for (i = read >> PAGE_CACHE_SHIFT; i < num_pages; i++) {
                        page = req->r_pages[i];
                        dout(20, "readpages zeroing %d %p\n", i, page);
 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
@@ -1007,32 +851,27 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         int flags)
 {
        struct ceph_msg *reqm;
-       struct ceph_osd_request_head *reqhead;
-       struct ceph_osd_op *op;
        struct ceph_osd_request *req;
        int rc = 0;
 
        BUG_ON(vino.snap != CEPH_NOSNAP);
-
        req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_WRITE, snapc, 0,
+                                   CEPH_OSD_OP_WRITE,
+                                   flags | CEPH_OSD_OP_ONDISK |
+                                   CEPH_OSD_OP_MODIFY,
+                                   snapc, 0,
                                    truncate_seq, truncate_size);
        if (IS_ERR(req))
                return PTR_ERR(req);
-       reqm = req->r_request;
-       reqhead = reqm->front.iov_base;
-       op = (void *)(reqhead + 1);
-
-       reqhead->flags = cpu_to_le32(flags | 
-                                    CEPH_OSD_OP_ONDISK |
-                                    CEPH_OSD_OP_MODIFY);
 
-       len = le64_to_cpu(op->length);
-       dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
+       /* it may be a short write due to an object boundary */
+       req->r_pages = pages;
+       req->r_num_pages = calc_pages_for(off, len);
+       dout(10, "writepages %llu~%llu (%d pages)\n", off, len,
             req->r_num_pages);
 
-       /* copy page vector */
-       req->r_pages = pages;
+       /* set up data payload */
+       reqm = req->r_request;
        reqm->pages = pages;
        reqm->nr_pages = req->r_num_pages;
        reqm->hdr.data_len = cpu_to_le32(len);
index c385f75be494b446e41d85ffd0aa71ae7f00ea90..74ed43b7fe166692db9422be564599b4bd5d25d0 100644 (file)
@@ -107,25 +107,18 @@ extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
 extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      struct ceph_file_layout *layout,
                                      struct ceph_vino vino,
-                                     u64 offset, u64 *len, int op,
+                                     u64 offset, u64 *len, int op, int flags,
                                      struct ceph_snap_context *snapc,
                                      int do_sync, u32 truncate_seq,
                                      u64 truncate_size);
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
-extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
-                             struct ceph_vino vino,
-                             struct ceph_file_layout *layout,
-                             u64 off, u64 len,
-                             u32 truncate_seq, u64 truncate_size,
-                             struct page *page);
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
-                              struct address_space *mapping,
                               struct ceph_vino vino,
                               struct ceph_file_layout *layout,
                               u64 off, u64 len,
                               u32 truncate_seq, u64 truncate_size,
-                              struct list_head *page_list, int nr_pages);
+                              struct page **pages, int nr_pages);
 
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
@@ -133,19 +126,12 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_snap_context *sc,
                                u64 off, u64 len,
                                u32 truncate_seq, u64 truncate_size,
-                               struct page **pagevec, int nr_pages,
+                               struct page **pages, int nr_pages,
                                int flags);
 extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req,
                                      u64 len,
                                      int nr_pages);
 
-extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
-                              struct ceph_vino vino,
-                              struct ceph_file_layout *layout,
-                              u64 off, u64 len,
-                              u32 truncate_seq, u64 truncate_size,
-                              char __user *data);
-
 #endif
 
index 5c19c5a5800ad89c1dd29c54c1fa8a598f5b5fa5..d643b140e1091ada029667dde3374da40eba56de 100644 (file)
@@ -485,7 +485,7 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
 {
        int w = __ceph_caps_file_wanted(ci) | __ceph_caps_used(ci);
        if (w & CEPH_CAP_FILE_WRBUFFER)
-               w |= (CEPH_CAP_FILE_EXCL);  /* we want EXCL if we have dirty data */
+               w |= (CEPH_CAP_FILE_EXCL);  /* we want EXCL if dirty data */
        return w;
 }