From: Sage Weil Date: Fri, 13 Mar 2009 17:38:20 +0000 (-0700) Subject: kclient: refactor write path to facilitate sync or O_DIRECT writes X-Git-Tag: v0.7.1^2~25 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7990b384f25b7356aacf987486991190ab7cfec8;p=ceph.git kclient: refactor write path to facilitate sync or O_DIRECT writes Refactor osd_client writepages to always take a page vector, owned by the caller. Move user data copy for regular sync write into file.c. Alternatively, build a page vector of user pages for O_DIRECT writes. Fix ->writepages async callers to allocate and free the page vector. --- diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 96c5c37df42a..a79316fc9f7b 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -388,7 +388,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) &ci->i_layout, snapc, page_off, len, ci->i_truncate_seq, ci->i_truncate_size, - &page, 1); + &page, 1, 0); if (err < 0) { dout(20, "writepage setting page error %p\n", page); SetPageError(page); @@ -497,6 +497,7 @@ static void writepages_finish(struct ceph_osd_request *req) ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc); ceph_release_pages(req->r_pages, req->r_num_pages); + kfree(req->r_pages); ceph_osdc_put_request(req); } @@ -513,7 +514,6 @@ static int ceph_writepages_start(struct address_space *mapping, pgoff_t index, start, end; int range_whole = 0; int should_loop = 1; - struct page **pages = NULL; pgoff_t max_pages = 0, max_pages_ever = 0; struct ceph_snap_context *snapc = NULL, *last_snapc = NULL; struct pagevec *pvec; @@ -695,7 +695,12 @@ get_more_pages: ci->i_truncate_seq, ci->i_truncate_size); max_pages = req->r_num_pages; - pages = req->r_pages; + + rc = -ENOMEM; + req->r_pages = kmalloc(sizeof(*req->r_pages) * + max_pages, GFP_NOFS); + if (req->r_pages == NULL) + goto out; req->r_callback = writepages_finish; req->r_inode = inode; req->r_wbc = wbc; @@ -707,7 +712,7 @@ 
get_more_pages: dout(20, "%p will write page %p idx %lu\n", inode, page, page->index); set_page_writeback(page); - pages[locked_pages] = page; + req->r_pages[locked_pages] = page; locked_pages++; next = page->index + 1; } @@ -737,7 +742,7 @@ get_more_pages: } /* submit the write */ - offset = pages[0]->index << PAGE_CACHE_SHIFT; + offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT; len = min(i_size_read(inode) - offset, (u64)locked_pages << PAGE_CACHE_SHIFT); dout(10, "writepages got %d pages at %llu~%llu\n", diff --git a/src/kernel/file.c b/src/kernel/file.c index a5e429618647..4ceb89b62a4d 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -226,6 +226,98 @@ static ssize_t ceph_sync_read(struct file *file, char __user *data, return ret; } +/* + * build a vector of user pages + */ +static struct page **get_direct_page_vector(const char __user *data, + int num_pages, + loff_t off, size_t len) +{ + struct page **pages; + int rc; + + if ((off & ~PAGE_CACHE_MASK) || + (len & ~PAGE_CACHE_MASK)) + return ERR_PTR(-EINVAL); + + pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS); + if (!pages) + return ERR_PTR(-ENOMEM); + + down_read(&current->mm->mmap_sem); + rc = get_user_pages(current, current->mm, (unsigned long)data, + num_pages, 0, 0, pages, NULL); + up_read(&current->mm->mmap_sem); + if (rc < 0) + goto fail; + return pages; + +fail: + kfree(pages); + return ERR_PTR(rc); +} + +static void release_page_vector(struct page **pages, int num_pages) +{ + int i; + + for (i = 0; i < num_pages; i++) + __free_pages(pages[i], 0); + kfree(pages); +} + +/* + * copy user data into a page vector + */ +static struct page **copy_into_page_vector(const char __user *data, + int num_pages, + loff_t off, size_t len) +{ + struct page **pages; + int i, po, l, left; + int rc; + + pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS); + if (!pages) + return ERR_PTR(-ENOMEM); + + left = len; + po = off & ~PAGE_MASK; + for (i = 0; i < num_pages; i++) { + int bad; + pages[i] = 
alloc_page(GFP_NOFS); + if (pages[i] == NULL) { + rc = -ENOMEM; + goto fail; + } + l = min_t(int, PAGE_SIZE-po, left); + bad = copy_from_user(page_address(pages[i]) + po, data, l); + if (bad == l) { + rc = -EFAULT; + goto fail; + } + data += l - bad; + left -= l - bad; + if (po) { + po += l - bad; + if (po == PAGE_CACHE_SIZE) + po = 0; + } + } + return pages; + +fail: + release_page_vector(pages, i); + return ERR_PTR(rc); +} + +/* + * synchronous write. from userspace. + * + * FIXME: if write spans object boundary, just do two separate write. + * for a correct atomic write, we should take write locks on all + * objects, rollback on failure, etc. + */ static ssize_t ceph_sync_write(struct file *file, const char __user *data, size_t count, loff_t *offset) { @@ -234,28 +326,67 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, struct ceph_client *client = ceph_inode_to_client(inode); int ret = 0; off_t pos = *offset; + int num_pages = calc_pages_for(pos, count); + struct page **pages; + struct page **page_pos; + int pages_left; + int flags; + int written = 0; if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP) return -EROFS; - dout(10, "sync_write on file %p %lld~%u\n", file, *offset, - (unsigned)count); + dout(10, "sync_write on file %p %lld~%u %s\n", file, *offset, + (unsigned)count, (file->f_flags & O_DIRECT) ? "O_DIRECT":""); if (file->f_flags & O_APPEND) pos = i_size_read(inode); - ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode), + if (file->f_flags & O_DIRECT) + pages = get_direct_page_vector(data, num_pages, pos, count); + else + pages = copy_into_page_vector(data, num_pages, pos, count); + if (IS_ERR(pages)) + return PTR_ERR(pages); + + flags = CEPH_OSD_OP_ORDERSNAP; + if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0) + flags |= CEPH_OSD_OP_ACK; + + /* + * we may need to do multiple writes here if we span an object + * boundary. this isn't atomic, unfortunately. 
:( + */ + page_pos = pages; + pages_left = num_pages; + +more: + ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode), &ci->i_layout, ci->i_snap_realm->cached_context, pos, count, ci->i_truncate_seq, - ci->i_truncate_size, data); + ci->i_truncate_size, + page_pos, pages_left, + flags); if (ret > 0) { pos += ret; + written += ret; + count -= ret; + page_pos += (ret >> PAGE_CACHE_SHIFT); + pages_left -= (ret >> PAGE_CACHE_SHIFT); + if (pages_left) + goto more; + + ret = written; *offset = pos; if (pos > i_size_read(inode)) ceph_inode_set_size(inode, pos); } + if (file->f_flags & O_DIRECT) + kfree(pages); + else + release_page_vector(pages, num_pages); return ret; } @@ -267,7 +398,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, * Hmm, the sync reach case isn't actually async... should it be? */ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, - unsigned long nr_segs, loff_t pos) + unsigned long nr_segs, loff_t pos) { struct file *filp = iocb->ki_filp; loff_t *ppos = &iocb->ki_pos; @@ -277,20 +408,18 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov, ssize_t ret; int got = 0; - __ceph_do_pending_vmtruncate(inode); - dout(10, "aio_read %llx.%llx %llu~%u trying to get caps on %p\n", ceph_vinop(inode), pos, (unsigned)len, inode); - ret = ceph_get_caps(ci, - CEPH_CAP_FILE_RD, - CEPH_CAP_FILE_RDCACHE, - &got, -1); + __ceph_do_pending_vmtruncate(inode); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_RDCACHE, + &got, -1); if (ret < 0) goto out; dout(10, "aio_read %llx.%llx %llu~%u got cap refs %d\n", ceph_vinop(inode), pos, (unsigned)len, got); if ((got & CEPH_CAP_FILE_RDCACHE) == 0 || + (iocb->ki_filp->f_flags & O_DIRECT) || (inode->i_sb->s_flags & MS_SYNCHRONOUS)) /* hmm, this isn't really async... */ ret = ceph_sync_read(filp, iov->iov_base, len, ppos); @@ -361,17 +490,16 @@ retry_snap: check_max_size(inode, endoff); dout(10, "aio_write %p %llu~%u getting caps. 
i_size %llu\n", inode, pos, (unsigned)iov->iov_len, inode->i_size); - ret = ceph_get_caps(ci, - CEPH_CAP_FILE_WR, - CEPH_CAP_FILE_WRBUFFER, - &got, endoff); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_WRBUFFER, + &got, endoff); if (ret < 0) goto out; - dout(10, "aio_write %p %llu~%u got cap refs on %d\n", - inode, pos, (unsigned)iov->iov_len, got); + dout(10, "aio_write %p %llu~%u got %s\n", + inode, pos, (unsigned)iov->iov_len, ceph_cap_string(got)); - if ((got & CEPH_CAP_FILE_WRBUFFER) == 0) { + if ((got & CEPH_CAP_FILE_WRBUFFER) == 0 || + (iocb->ki_filp->f_flags & O_DIRECT)) { ret = ceph_sync_write(file, iov->iov_base, iov->iov_len, &iocb->ki_pos); } else { diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index 01acd4ebfe71..f32e58e9457f 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -95,7 +95,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, { struct ceph_osd_request *req; struct ceph_msg *msg; - int num_pages = calc_pages_for(off, *plen); struct ceph_osd_request_head *head; struct ceph_osd_op *op; __le64 *snaps; @@ -106,7 +105,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, u64 prevofs; /* we may overallocate here, if our write extent is shortened below */ - req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS); + req = kzalloc(sizeof(*req), GFP_NOFS); if (req == NULL) return ERR_PTR(-ENOMEM); @@ -996,113 +995,22 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, return rc; } - /* - * synchronous write. from userspace. - * - * FIXME: if write spans object boundary, just do two separate write. - * for a correct atomic write, we should take write locks on all - * objects, rollback on failure, etc. 
- */ -int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, - struct ceph_file_layout *layout, - struct ceph_snap_context *snapc, - u64 off, u64 len, - u32 truncate_seq, u64 truncate_size, - const char __user *data) -{ - struct ceph_msg *reqm; - struct ceph_osd_request_head *reqhead; - struct ceph_osd_request *req; - int i, po, l, left; - int rc; - int finalrc = 0; - - dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino, - vino.snap, off, len); - -more: - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, - CEPH_OSD_OP_WRITE, snapc, 0, - truncate_seq, truncate_size); - if (IS_ERR(req)) - return PTR_ERR(req); - reqm = req->r_request; - reqhead = reqm->front.iov_base; - reqhead->flags = - cpu_to_le32(CEPH_OSD_OP_ACK | /* ack for now, FIXME */ - CEPH_OSD_OP_ORDERSNAP | /* EOLDSNAPC if ooo */ - CEPH_OSD_OP_MODIFY); - - dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, - req->r_num_pages); - - /* copy data into a set of pages */ - left = len; - po = off & ~PAGE_MASK; - for (i = 0; i < req->r_num_pages; i++) { - int bad; - req->r_pages[i] = alloc_page(GFP_NOFS); - if (req->r_pages[i] == NULL) { - req->r_num_pages = i+1; - rc = -ENOMEM; - goto out; - } - l = min_t(int, PAGE_SIZE-po, left); - bad = copy_from_user(page_address(req->r_pages[i]) + po, data, - l); - if (bad == l) { - req->r_num_pages = i+1; - rc = -EFAULT; - goto out; - } - data += l - bad; - left -= l - bad; - if (po) { - po += l - bad; - if (po == PAGE_CACHE_SIZE) - po = 0; - } - } - reqm->pages = req->r_pages; - reqm->nr_pages = req->r_num_pages; - reqm->hdr.data_len = cpu_to_le32(len); - reqm->hdr.data_off = cpu_to_le16(off); - - rc = do_sync_request(osdc, req); -out: - for (i = 0; i < req->r_num_pages; i++) - __free_pages(req->r_pages[i], 0); - ceph_osdc_put_request(req); - if (rc == 0) { - finalrc += len; - off += len; - len -= len; - if (len > 0) - goto more; - } else { - finalrc = rc; - } - dout(10, "sync_write result %d\n", finalrc); - return finalrc; 
-} - -/* - * do a sync write for N pages + * do a sync write on N pages */ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, struct ceph_snap_context *snapc, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, - struct page **pages, int num_pages) + struct page **pages, int num_pages, + int flags) { struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_op *op; struct ceph_osd_request *req; int rc = 0; - int flags; BUG_ON(vino.snap != CEPH_NOSNAP); @@ -1115,20 +1023,17 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, reqhead = reqm->front.iov_base; op = (void *)(reqhead + 1); - flags = CEPH_OSD_OP_MODIFY; - if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) - flags |= CEPH_OSD_OP_ACK; - else - flags |= CEPH_OSD_OP_ONDISK; - reqhead->flags = cpu_to_le32(flags); + reqhead->flags = cpu_to_le32(flags | + CEPH_OSD_OP_ONDISK | + CEPH_OSD_OP_MODIFY); len = le64_to_cpu(op->length); dout(10, "writepages %llu~%llu -> %d pages\n", off, len, req->r_num_pages); /* copy page vector */ - memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *)); - reqm->pages = req->r_pages; + req->r_pages = pages; + reqm->pages = pages; reqm->nr_pages = req->r_num_pages; reqm->hdr.data_len = cpu_to_le32(len); reqm->hdr.data_off = cpu_to_le16(off); @@ -1142,7 +1047,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, } /* - * start an async multipage write + * start an async write */ int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, struct ceph_osd_request *req, diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 0f60a9bdadb5..c385f75be494 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -34,10 +34,12 @@ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); struct ceph_osd_request_attr { struct attribute attr; - ssize_t (*show)(struct ceph_osd_request *, struct 
ceph_osd_request_attr *, + ssize_t (*show)(struct ceph_osd_request *, + struct ceph_osd_request_attr *, char *); - ssize_t (*store)(struct ceph_osd_request *, struct ceph_osd_request_attr *, - const char *, size_t); + ssize_t (*store)(struct ceph_osd_request *, + struct ceph_osd_request_attr *, + const char *, size_t); }; /* an in-flight request */ @@ -66,7 +68,7 @@ struct ceph_osd_request { union ceph_pg r_pgid; /* placement group */ struct ceph_snap_context *r_snapc; /* snap context for writes */ unsigned r_num_pages; /* size of page array (follows) */ - struct page *r_pages[0]; /* pages for data payload */ + struct page **r_pages; /* pages for data payload */ }; struct ceph_osd_client { @@ -131,7 +133,8 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_snap_context *sc, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, - struct page **pagevec, int nr_pages); + struct page **pagevec, int nr_pages, + int flags); extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc, struct ceph_osd_request *req, u64 len, @@ -143,13 +146,6 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc, u64 off, u64 len, u32 truncate_seq, u64 truncate_size, char __user *data); -extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - struct ceph_snap_context *sc, - u64 off, u64 len, - u32 truncate_seq, u64 truncate_size, - const char __user *data); #endif