kclient: refactor write path to facilitate sync or O_DIRECT writes
author    Sage Weil <sage@newdream.net>
          Fri, 13 Mar 2009 17:38:20 +0000 (10:38 -0700)
committer Sage Weil <sage@newdream.net>
          Fri, 13 Mar 2009 20:43:40 +0000 (13:43 -0700)
Refactor osd_client writepages to always take a page vector owned by
the caller.  Move the user-data copy for a regular sync write into
file.c; for O_DIRECT writes, build a page vector of the user's pages
instead.  Fix the async ->writepages callers to allocate and free the
page vector themselves.

src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h

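The net effect on callers, sketched below before the per-file hunks
(illustrative only, not part of the commit; names like "snapc" and
"flags" stand in for the values visible in ceph_sync_write further
down): the caller now allocates the page vector, hands it to the OSD
client, and frees it afterwards.

static int example_write(struct ceph_client *client, struct inode *inode,
                         struct ceph_inode_info *ci,
                         struct ceph_snap_context *snapc,
                         loff_t pos, size_t count, int flags)
{
        int num_pages = calc_pages_for(pos, count);
        struct page **pages;
        int rc;

        pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
        if (!pages)
                return -ENOMEM;

        /* ... fill pages[] with copied data or pinned user pages ... */

        rc = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
                                  &ci->i_layout, snapc, pos, count,
                                  ci->i_truncate_seq, ci->i_truncate_size,
                                  pages, num_pages, flags);

        kfree(pages);   /* the vector is the caller's to free */
        return rc;
}
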
diff --git a/src/kernel/addr.c b/src/kernel/addr.c
index 96c5c37df42a35f60d7ca72045371d98c5aab87f..a79316fc9f7b472f440966abc65aedfc60cd495c 100644
@@ -388,7 +388,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
                                   &ci->i_layout, snapc,
                                   page_off, len,
                                   ci->i_truncate_seq, ci->i_truncate_size,
-                                  &page, 1);
+                                  &page, 1, 0);
        if (err < 0) {
                dout(20, "writepage setting page error %p\n", page);
                SetPageError(page);
@@ -497,6 +497,7 @@ static void writepages_finish(struct ceph_osd_request *req)
        ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);
 
        ceph_release_pages(req->r_pages, req->r_num_pages);
+       kfree(req->r_pages);
        ceph_osdc_put_request(req);
 }
 
@@ -513,7 +514,6 @@ static int ceph_writepages_start(struct address_space *mapping,
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
-       struct page **pages = NULL;
        pgoff_t max_pages = 0, max_pages_ever = 0;
        struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
        struct pagevec *pvec;
@@ -695,7 +695,12 @@ get_more_pages:
                                                    ci->i_truncate_seq,
                                                    ci->i_truncate_size);
                                max_pages = req->r_num_pages;
-                               pages = req->r_pages;
+
+                               rc = -ENOMEM;
+                               req->r_pages = kmalloc(sizeof(*req->r_pages) *
+                                                      max_pages, GFP_NOFS);
+                               if (req->r_pages == NULL)
+                                       goto out;
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
                                req->r_wbc = wbc;
@@ -707,7 +712,7 @@ get_more_pages:
                        dout(20, "%p will write page %p idx %lu\n",
                             inode, page, page->index);
                        set_page_writeback(page);
-                       pages[locked_pages] = page;
+                       req->r_pages[locked_pages] = page;
                        locked_pages++;
                        next = page->index + 1;
                }
@@ -737,7 +742,7 @@ get_more_pages:
                }
 
                /* submit the write */
-               offset = pages[0]->index << PAGE_CACHE_SHIFT;
+               offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
                len = min(i_size_read(inode) - offset,
                          (u64)locked_pages << PAGE_CACHE_SHIFT);
                dout(10, "writepages got %d pages at %llu~%llu\n",
diff --git a/src/kernel/file.c b/src/kernel/file.c
index a5e429618647095d5d5fcfed930caf6b4db2a686..4ceb89b62a4d415e27d0a23fbf9d6c8bb450b88e 100644
@@ -226,6 +226,98 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data,
        return ret;
 }
 
+/*
+ * build a vector of user pages
+ */
+static struct page **get_direct_page_vector(const char __user *data,
+                                           int num_pages,
+                                           loff_t off, size_t len)
+{
+       struct page **pages;
+       int rc;
+
+       if ((off & ~PAGE_CACHE_MASK) ||
+           (len & ~PAGE_CACHE_MASK))
+               return ERR_PTR(-EINVAL);
+
+       pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+       if (!pages)
+               return ERR_PTR(-ENOMEM);
+
+       down_read(&current->mm->mmap_sem);
+       rc = get_user_pages(current, current->mm, (unsigned long)data,
+                           num_pages, 0, 0, pages, NULL);
+       up_read(&current->mm->mmap_sem);
+       if (rc < 0)
+               goto fail;
+       return pages;
+
+fail:
+       kfree(pages);
+       return ERR_PTR(rc);
+}
+
+static void release_page_vector(struct page **pages, int num_pages)
+{
+       int i;
+
+       for (i = 0; i < num_pages; i++)
+               __free_pages(pages[i], 0);
+       kfree(pages);
+}
+
+/*
+ * copy user data into a page vector
+ */
+static struct page **copy_into_page_vector(const char __user *data,
+                                          int num_pages,
+                                          loff_t off, size_t len)
+{
+       struct page **pages;
+       int i, po, l, left;
+       int rc;
+
+       pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+       if (!pages)
+               return ERR_PTR(-ENOMEM);
+
+       left = len;
+       po = off & ~PAGE_MASK;
+       for (i = 0; i < num_pages; i++) {
+               int bad;
+               pages[i] = alloc_page(GFP_NOFS);
+               if (pages[i] == NULL) {
+                       rc = -ENOMEM;
+                       goto fail;
+               }
+               l = min_t(int, PAGE_SIZE-po, left);
+               bad = copy_from_user(page_address(pages[i]) + po, data, l);
+               if (bad == l) {
+                       rc = -EFAULT;
+                       goto fail;
+               }
+               data += l - bad;
+               left -= l - bad;
+               if (po) {
+                       po += l - bad;
+                       if (po == PAGE_CACHE_SIZE)
+                               po = 0;
+               }
+       }
+       return pages;
+
+fail:
+       release_page_vector(pages, i);
+       return ERR_PTR(rc);
+}
+
+/*
+ * synchronous write.  from userspace.
+ *
+ * FIXME: if a write spans an object boundary, just do two separate writes.
+ * for a correct atomic write, we should take write locks on all
+ * objects, rollback on failure, etc.
+ */
 static ssize_t ceph_sync_write(struct file *file, const char __user *data,
                               size_t count, loff_t *offset)
 {
@@ -234,28 +326,67 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
        struct ceph_client *client = ceph_inode_to_client(inode);
        int ret = 0;
        off_t pos = *offset;
+       int num_pages = calc_pages_for(pos, count);
+       struct page **pages;
+       struct page **page_pos;
+       int pages_left;
+       int flags;
+       int written = 0;
 
        if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP)
                return -EROFS;
 
-       dout(10, "sync_write on file %p %lld~%u\n", file, *offset,
-            (unsigned)count);
+       dout(10, "sync_write on file %p %lld~%u %s\n", file, *offset,
+            (unsigned)count, (file->f_flags & O_DIRECT) ? "O_DIRECT":"");
 
        if (file->f_flags & O_APPEND)
                pos = i_size_read(inode);
 
-       ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
+       if (file->f_flags & O_DIRECT)
+               pages = get_direct_page_vector(data, num_pages, pos, count);
+       else
+               pages = copy_into_page_vector(data, num_pages, pos, count);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       flags = CEPH_OSD_OP_ORDERSNAP;
+       if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
+               flags |= CEPH_OSD_OP_ACK;
+
+       /*
+        * we may need to do multiple writes here if we span an object
+        * boundary.  this isn't atomic, unfortunately.  :(
+        */
+       page_pos = pages;
+       pages_left = num_pages;
+
+more:
+       ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
                                   &ci->i_layout,
                                   ci->i_snap_realm->cached_context,
                                   pos, count, ci->i_truncate_seq,
-                                  ci->i_truncate_size, data);
+                                  ci->i_truncate_size,
+                                  page_pos, pages_left,
+                                  flags);
        if (ret > 0) {
                pos += ret;
+               written += ret;
+               count -= ret;
+               page_pos += (ret >> PAGE_CACHE_SHIFT);
+               pages_left -= (ret >> PAGE_CACHE_SHIFT);
+               if (pages_left)
+                       goto more;
+
+               ret = written;
                *offset = pos;
                if (pos > i_size_read(inode))
                        ceph_inode_set_size(inode, pos);
        }
 
+       if (file->f_flags & O_DIRECT)
+               kfree(pages);
+       else
+               release_page_vector(pages, num_pages);
        return ret;
 }
 
@@ -267,7 +398,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
  * Hmm, the sync read case isn't actually async... should it be?
  */
 static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
-                     unsigned long nr_segs, loff_t pos)
+                            unsigned long nr_segs, loff_t pos)
 {
        struct file *filp = iocb->ki_filp;
        loff_t *ppos = &iocb->ki_pos;
@@ -277,20 +408,18 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
        ssize_t ret;
        int got = 0;
 
-       __ceph_do_pending_vmtruncate(inode);
-
        dout(10, "aio_read %llx.%llx %llu~%u trying to get caps on %p\n",
             ceph_vinop(inode), pos, (unsigned)len, inode);
-       ret = ceph_get_caps(ci,
-                                CEPH_CAP_FILE_RD,
-                                CEPH_CAP_FILE_RDCACHE,
-                                &got, -1);
+       __ceph_do_pending_vmtruncate(inode);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_RDCACHE,
+                           &got, -1);
        if (ret < 0)
                goto out;
        dout(10, "aio_read %llx.%llx %llu~%u got cap refs %d\n",
             ceph_vinop(inode), pos, (unsigned)len, got);
 
        if ((got & CEPH_CAP_FILE_RDCACHE) == 0 ||
+           (iocb->ki_filp->f_flags & O_DIRECT) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS))
                /* hmm, this isn't really async... */
                ret = ceph_sync_read(filp, iov->iov_base, len, ppos);
@@ -361,17 +490,16 @@ retry_snap:
        check_max_size(inode, endoff);
        dout(10, "aio_write %p %llu~%u getting caps. i_size %llu\n",
             inode, pos, (unsigned)iov->iov_len, inode->i_size);
-       ret = ceph_get_caps(ci,
-                                CEPH_CAP_FILE_WR,
-                                CEPH_CAP_FILE_WRBUFFER,
-                                &got, endoff);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_WRBUFFER,
+                           &got, endoff);
        if (ret < 0)
                goto out;
 
-       dout(10, "aio_write %p %llu~%u  got cap refs on %d\n",
-            inode, pos, (unsigned)iov->iov_len, got);
+       dout(10, "aio_write %p %llu~%u  got %s\n",
+            inode, pos, (unsigned)iov->iov_len, ceph_cap_string(got));
 
-       if ((got & CEPH_CAP_FILE_WRBUFFER) == 0) {
+       if ((got & CEPH_CAP_FILE_WRBUFFER) == 0 ||
+           (iocb->ki_filp->f_flags & O_DIRECT)) {
                ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
                        &iocb->ki_pos);
        } else {
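
ceph_sync_write() sizes its vector with calc_pages_for(pos, count).
That helper is not part of this diff; assuming the usual definition,
it counts the pages a byte range touches, including partial head and
tail pages:

/* assumed definition (lives in a header, not in this diff) */
static inline int calc_pages_for(u64 off, u64 len)
{
        return ((off + len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT) -
                (off >> PAGE_CACHE_SHIFT);
}

/*
 * e.g. with 4k pages, off=4094 and len=4 touches two pages:
 *   ((4094 + 4 + 4095) >> 12) - (4094 >> 12) = 2 - 0 = 2
 */
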
diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c
index 01acd4ebfe71f2f8502e1f4d66bba277928d689a..f32e58e9457fe48be7035dad7d3d3996393866ee 100644
@@ -95,7 +95,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req;
        struct ceph_msg *msg;
-       int num_pages = calc_pages_for(off, *plen);
        struct ceph_osd_request_head *head;
        struct ceph_osd_op *op;
        __le64 *snaps;
@@ -106,7 +105,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        u64 prevofs;
 
        /* we may overallocate here, if our write extent is shortened below */
-       req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
+       req = kzalloc(sizeof(*req), GFP_NOFS);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
@@ -996,113 +995,22 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        return rc;
 }
 
-
 /*
- * synchronous write.  from userspace.
- *
- * FIXME: if write spans object boundary, just do two separate write.
- * for a correct atomic write, we should take write locks on all
- * objects, rollback on failure, etc.
- */
-int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
-                        struct ceph_file_layout *layout,
-                        struct ceph_snap_context *snapc,
-                        u64 off, u64 len,
-                        u32 truncate_seq, u64 truncate_size,
-                        const char __user *data)
-{
-       struct ceph_msg *reqm;
-       struct ceph_osd_request_head *reqhead;
-       struct ceph_osd_request *req;
-       int i, po, l, left;
-       int rc;
-       int finalrc = 0;
-
-       dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
-            vino.snap, off, len);
-
-more:
-       req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
-                                   CEPH_OSD_OP_WRITE, snapc, 0,
-                                   truncate_seq, truncate_size);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-       reqm = req->r_request;
-       reqhead = reqm->front.iov_base;
-       reqhead->flags =
-               cpu_to_le32(CEPH_OSD_OP_ACK |           /* ack for now, FIXME */
-                           CEPH_OSD_OP_ORDERSNAP |     /* EOLDSNAPC if ooo */
-                           CEPH_OSD_OP_MODIFY);
-
-       dout(10, "sync_write %llu~%llu -> %d pages\n", off, len,
-            req->r_num_pages);
-
-       /* copy data into a set of pages */
-       left = len;
-       po = off & ~PAGE_MASK;
-       for (i = 0; i < req->r_num_pages; i++) {
-               int bad;
-               req->r_pages[i] = alloc_page(GFP_NOFS);
-               if (req->r_pages[i] == NULL) {
-                       req->r_num_pages = i+1;
-                       rc = -ENOMEM;
-                       goto out;
-               }
-               l = min_t(int, PAGE_SIZE-po, left);
-               bad = copy_from_user(page_address(req->r_pages[i]) + po, data,
-                                    l);
-               if (bad == l) {
-                       req->r_num_pages = i+1;
-                       rc = -EFAULT;
-                       goto out;
-               }
-               data += l - bad;
-               left -= l - bad;
-               if (po) {
-                       po += l - bad;
-                       if (po == PAGE_CACHE_SIZE)
-                               po = 0;
-               }
-       }
-       reqm->pages = req->r_pages;
-       reqm->nr_pages = req->r_num_pages;
-       reqm->hdr.data_len = cpu_to_le32(len);
-       reqm->hdr.data_off = cpu_to_le16(off);
-
-       rc = do_sync_request(osdc, req);
-out:
-       for (i = 0; i < req->r_num_pages; i++)
-               __free_pages(req->r_pages[i], 0);
-       ceph_osdc_put_request(req);
-       if (rc == 0) {
-               finalrc += len;
-               off += len;
-               len -= len;
-               if (len > 0)
-                       goto more;
-       } else {
-               finalrc = rc;
-       }
-       dout(10, "sync_write result %d\n", finalrc);
-       return finalrc;
-}
-
-/*
- * do a sync write for N pages
+ * do a sync write on N pages
  */
 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
                         struct ceph_snap_context *snapc,
                         u64 off, u64 len,
                         u32 truncate_seq, u64 truncate_size,
-                        struct page **pages, int num_pages)
+                        struct page **pages, int num_pages,
+                        int flags)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_op *op;
        struct ceph_osd_request *req;
        int rc = 0;
-       int flags;
 
        BUG_ON(vino.snap != CEPH_NOSNAP);
 
@@ -1115,20 +1023,17 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        reqhead = reqm->front.iov_base;
        op = (void *)(reqhead + 1);
 
-       flags = CEPH_OSD_OP_MODIFY;
-       if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
-               flags |= CEPH_OSD_OP_ACK;
-       else
-               flags |= CEPH_OSD_OP_ONDISK;
-       reqhead->flags = cpu_to_le32(flags);
+       reqhead->flags = cpu_to_le32(flags | 
+                                    CEPH_OSD_OP_ONDISK |
+                                    CEPH_OSD_OP_MODIFY);
 
        len = le64_to_cpu(op->length);
        dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
             req->r_num_pages);
 
        /* copy page vector */
-       memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *));
-       reqm->pages = req->r_pages;
+       req->r_pages = pages;
+       reqm->pages = pages;
        reqm->nr_pages = req->r_num_pages;
        reqm->hdr.data_len = cpu_to_le32(len);
        reqm->hdr.data_off = cpu_to_le16(off);
@@ -1142,7 +1047,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 }
 
 /*
- * start an async multipage write
+ * start an async write
  */
 int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req,
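
With the sync-write helper gone, ceph_osdc_writepages() carries the
write for both paths; the caller supplies the ack policy via the new
flags argument while writepages unconditionally adds ONDISK and
MODIFY.  A sketch of the split, mirroring ceph_sync_write() above:

/* caller side: decide whether an early in-memory ack is acceptable */
int flags = CEPH_OSD_OP_ORDERSNAP;      /* EOLDSNAPC if out of order */
if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
        flags |= CEPH_OSD_OP_ACK;       /* buffered sync write: ack is enough */

/* osd_client side: always a durable modify */
reqhead->flags = cpu_to_le32(flags |
                             CEPH_OSD_OP_ONDISK |
                             CEPH_OSD_OP_MODIFY);
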
diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h
index 0f60a9bdadb5a3f32daa0964cce5e291bfa551bd..c385f75be494b446e41d85ffd0aa71ae7f00ea90 100644
@@ -34,10 +34,12 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
 
 struct ceph_osd_request_attr {
        struct attribute attr;
-       ssize_t (*show)(struct ceph_osd_request *, struct ceph_osd_request_attr *,
+       ssize_t (*show)(struct ceph_osd_request *,
+                       struct ceph_osd_request_attr *,
                        char *);
-       ssize_t (*store)(struct ceph_osd_request *, struct ceph_osd_request_attr *,
-                       const char *, size_t);
+       ssize_t (*store)(struct ceph_osd_request *,
+                        struct ceph_osd_request_attr *,
+                        const char *, size_t);
 };
 
 /* an in-flight request */
@@ -66,7 +68,7 @@ struct ceph_osd_request {
        union ceph_pg     r_pgid;             /* placement group */
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
        unsigned          r_num_pages;        /* size of page array (follows) */
-       struct page      *r_pages[0];         /* pages for data payload */
+       struct page     **r_pages;            /* pages for data payload */
 };
 
 struct ceph_osd_client {
@@ -131,7 +133,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_snap_context *sc,
                                u64 off, u64 len,
                                u32 truncate_seq, u64 truncate_size,
-                               struct page **pagevec, int nr_pages);
+                               struct page **pagevec, int nr_pages,
+                               int flags);
 extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                                      struct ceph_osd_request *req,
                                      u64 len,
@@ -143,13 +146,6 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
                               u64 off, u64 len,
                               u32 truncate_seq, u64 truncate_size,
                               char __user *data);
-extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
-                               struct ceph_vino vino,
-                               struct ceph_file_layout *layout,
-                               struct ceph_snap_context *sc,
-                               u64 off, u64 len,
-                               u32 truncate_seq, u64 truncate_size,
-                               const char __user *data);
 
 #endif
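
One loose end worth flagging in the O_DIRECT path: get_direct_page_vector()
pins user pages via get_user_pages(), and the usual counterpart is to drop
each reference with put_page() before freeing the vector, whereas
ceph_sync_write() currently just kfree()s it.  A hypothetical helper (not
in this commit) would look like:

/*
 * Hypothetical, not part of this commit: release pages pinned by
 * get_user_pages() and free the vector itself.
 */
static void put_page_vector(struct page **pages, int num_pages)
{
        int i;

        for (i = 0; i < num_pages; i++)
                put_page(pages[i]);     /* undo the get_user_pages() ref */
        kfree(pages);
}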