&ci->i_layout, snapc,
page_off, len,
ci->i_truncate_seq, ci->i_truncate_size,
- &page, 1);
+ &page, 1, 0);
if (err < 0) {
dout(20, "writepage setting page error %p\n", page);
SetPageError(page);
ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);
ceph_release_pages(req->r_pages, req->r_num_pages);
+ kfree(req->r_pages);
ceph_osdc_put_request(req);
}
pgoff_t index, start, end;
int range_whole = 0;
int should_loop = 1;
- struct page **pages = NULL;
pgoff_t max_pages = 0, max_pages_ever = 0;
struct ceph_snap_context *snapc = NULL, *last_snapc = NULL;
struct pagevec *pvec;
ci->i_truncate_seq,
ci->i_truncate_size);
max_pages = req->r_num_pages;
- pages = req->r_pages;
+
+ rc = -ENOMEM;
+ req->r_pages = kmalloc(sizeof(*req->r_pages) *
+ max_pages, GFP_NOFS);
+ if (req->r_pages == NULL)
+ goto out;
req->r_callback = writepages_finish;
req->r_inode = inode;
req->r_wbc = wbc;
dout(20, "%p will write page %p idx %lu\n",
inode, page, page->index);
set_page_writeback(page);
- pages[locked_pages] = page;
+ req->r_pages[locked_pages] = page;
locked_pages++;
next = page->index + 1;
}
}
/* submit the write */
- offset = pages[0]->index << PAGE_CACHE_SHIFT;
+ offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
len = min(i_size_read(inode) - offset,
(u64)locked_pages << PAGE_CACHE_SHIFT);
dout(10, "writepages got %d pages at %llu~%llu\n",
return ret;
}
+/*
+ * Build a page vector pointing at the user's own buffer, for O_DIRECT
+ * I/O.  Both off and len must be page-aligned.
+ *
+ * On success the pages are pinned via get_user_pages(); the caller
+ * owns the pins and the array, and must drop the page refs and then
+ * kfree() the returned array.  Returns an ERR_PTR on failure.
+ */
+static struct page **get_direct_page_vector(const char __user *data,
+					    int num_pages,
+					    loff_t off, size_t len)
+{
+	struct page **pages;
+	int rc;
+
+	if ((off & ~PAGE_CACHE_MASK) ||
+	    (len & ~PAGE_CACHE_MASK))
+		return ERR_PTR(-EINVAL);
+
+	pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+	if (!pages)
+		return ERR_PTR(-ENOMEM);
+
+	down_read(&current->mm->mmap_sem);
+	rc = get_user_pages(current, current->mm, (unsigned long)data,
+			    num_pages, 0, 0, pages, NULL);
+	up_read(&current->mm->mmap_sem);
+	if (rc < 0)
+		goto fail;
+	if (rc < num_pages) {
+		/* only part of the buffer could be pinned; unpin what
+		 * we got rather than return a vector whose tail
+		 * entries are invalid (and would be leaked by the
+		 * caller's kfree-only cleanup) */
+		while (rc > 0)
+			put_page(pages[--rc]);
+		rc = -EFAULT;
+		goto fail;
+	}
+	return pages;
+
+fail:
+	kfree(pages);
+	return ERR_PTR(rc);
+}
+
+/*
+ * Free a page vector that was filled by copy_into_page_vector():
+ * drop each page with __free_pages() and then free the array itself.
+ *
+ * Not for get_direct_page_vector() results — those hold references
+ * to user pages, which must not be freed here.
+ */
+static void release_page_vector(struct page **pages, int num_pages)
+{
+	int i;
+
+	for (i = 0; i < num_pages; i++)
+		__free_pages(pages[i], 0);
+	kfree(pages);
+}
+
+/*
+ * Copy user data into a freshly allocated page vector.
+ *
+ * Allocates num_pages pages and copies len bytes from the user buffer
+ * data into them, starting at the in-page offset of off.  Returns the
+ * page array (free it with release_page_vector()) or an ERR_PTR on
+ * allocation or copy failure.
+ */
+static struct page **copy_into_page_vector(const char __user *data,
+					   int num_pages,
+					   loff_t off, size_t len)
+{
+	struct page **pages;
+	int i, po, l, left;
+	int rc;
+
+	pages = kmalloc(sizeof(*pages) * num_pages, GFP_NOFS);
+	if (!pages)
+		return ERR_PTR(-ENOMEM);
+
+	left = len;
+	po = off & ~PAGE_MASK;
+	for (i = 0; i < num_pages; i++) {
+		int bad;
+		pages[i] = alloc_page(GFP_NOFS);
+		if (pages[i] == NULL) {
+			/* pages[i] was not allocated; free only the
+			 * first i pages */
+			rc = -ENOMEM;
+			goto fail;
+		}
+		l = min_t(int, PAGE_SIZE-po, left);
+		bad = copy_from_user(page_address(pages[i]) + po, data, l);
+		if (bad == l) {
+			/* nothing copied at all; pages[i] WAS
+			 * allocated, so include it in the cleanup
+			 * (the old code freed only i pages and leaked
+			 * pages[i]) */
+			rc = -EFAULT;
+			i++;
+			goto fail;
+		}
+		/* NOTE(review): a partial fault (0 < bad < l) advances
+		 * to the next page with adjusted offsets instead of
+		 * retrying this page — verify callers tolerate the
+		 * resulting short/shifted data */
+		data += l - bad;
+		left -= l - bad;
+		if (po) {
+			po += l - bad;
+			if (po == PAGE_CACHE_SIZE)
+				po = 0;
+		}
+	}
+	return pages;
+
+fail:
+	release_page_vector(pages, i);
+	return ERR_PTR(rc);
+}
+
+/*
+ * Synchronous write, from userspace.
+ *
+ * FIXME: if the write spans an object boundary, we just do two
+ * separate writes.  For a correct atomic write, we should take write
+ * locks on all objects, roll back on failure, etc.
+ */
static ssize_t ceph_sync_write(struct file *file, const char __user *data,
size_t count, loff_t *offset)
{
struct ceph_client *client = ceph_inode_to_client(inode);
int ret = 0;
off_t pos = *offset;
+ int num_pages = calc_pages_for(pos, count);
+ struct page **pages;
+ struct page **page_pos;
+ int pages_left;
+ int flags;
+ int written = 0;
if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP)
return -EROFS;
- dout(10, "sync_write on file %p %lld~%u\n", file, *offset,
- (unsigned)count);
+ dout(10, "sync_write on file %p %lld~%u %s\n", file, *offset,
+ (unsigned)count, (file->f_flags & O_DIRECT) ? "O_DIRECT":"");
if (file->f_flags & O_APPEND)
pos = i_size_read(inode);
- ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
+ if (file->f_flags & O_DIRECT)
+ pages = get_direct_page_vector(data, num_pages, pos, count);
+ else
+ pages = copy_into_page_vector(data, num_pages, pos, count);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ flags = CEPH_OSD_OP_ORDERSNAP;
+ if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
+ flags |= CEPH_OSD_OP_ACK;
+
+ /*
+ * we may need to do multiple writes here if we span an object
+ * boundary. this isn't atomic, unfortunately. :(
+ */
+ page_pos = pages;
+ pages_left = num_pages;
+
+more:
+ ret = ceph_osdc_writepages(&client->osdc, ceph_vino(inode),
&ci->i_layout,
ci->i_snap_realm->cached_context,
pos, count, ci->i_truncate_seq,
- ci->i_truncate_size, data);
+ ci->i_truncate_size,
+ page_pos, pages_left,
+ flags);
if (ret > 0) {
pos += ret;
+ written += ret;
+ count -= ret;
+ page_pos += (ret >> PAGE_CACHE_SHIFT);
+ pages_left -= (ret >> PAGE_CACHE_SHIFT);
+ if (pages_left)
+ goto more;
+
+ ret = written;
*offset = pos;
if (pos > i_size_read(inode))
ceph_inode_set_size(inode, pos);
}
+ if (file->f_flags & O_DIRECT)
+ kfree(pages);
+ else
+ release_page_vector(pages, num_pages);
return ret;
}
* Hmm, the sync reach case isn't actually async... should it be?
*/
static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
- unsigned long nr_segs, loff_t pos)
+ unsigned long nr_segs, loff_t pos)
{
struct file *filp = iocb->ki_filp;
loff_t *ppos = &iocb->ki_pos;
ssize_t ret;
int got = 0;
- __ceph_do_pending_vmtruncate(inode);
-
dout(10, "aio_read %llx.%llx %llu~%u trying to get caps on %p\n",
ceph_vinop(inode), pos, (unsigned)len, inode);
- ret = ceph_get_caps(ci,
- CEPH_CAP_FILE_RD,
- CEPH_CAP_FILE_RDCACHE,
- &got, -1);
+ __ceph_do_pending_vmtruncate(inode);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_RDCACHE,
+ &got, -1);
if (ret < 0)
goto out;
dout(10, "aio_read %llx.%llx %llu~%u got cap refs %d\n",
ceph_vinop(inode), pos, (unsigned)len, got);
if ((got & CEPH_CAP_FILE_RDCACHE) == 0 ||
+ (iocb->ki_filp->f_flags & O_DIRECT) ||
(inode->i_sb->s_flags & MS_SYNCHRONOUS))
/* hmm, this isn't really async... */
ret = ceph_sync_read(filp, iov->iov_base, len, ppos);
check_max_size(inode, endoff);
dout(10, "aio_write %p %llu~%u getting caps. i_size %llu\n",
inode, pos, (unsigned)iov->iov_len, inode->i_size);
- ret = ceph_get_caps(ci,
- CEPH_CAP_FILE_WR,
- CEPH_CAP_FILE_WRBUFFER,
- &got, endoff);
+ ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_WRBUFFER,
+ &got, endoff);
if (ret < 0)
goto out;
- dout(10, "aio_write %p %llu~%u got cap refs on %d\n",
- inode, pos, (unsigned)iov->iov_len, got);
+ dout(10, "aio_write %p %llu~%u got %s\n",
+ inode, pos, (unsigned)iov->iov_len, ceph_cap_string(got));
- if ((got & CEPH_CAP_FILE_WRBUFFER) == 0) {
+ if ((got & CEPH_CAP_FILE_WRBUFFER) == 0 ||
+ (iocb->ki_filp->f_flags & O_DIRECT)) {
ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
&iocb->ki_pos);
} else {
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
- int num_pages = calc_pages_for(off, *plen);
struct ceph_osd_request_head *head;
struct ceph_osd_op *op;
__le64 *snaps;
u64 prevofs;
/* we may overallocate here, if our write extent is shortened below */
- req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
+ req = kzalloc(sizeof(*req), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
return rc;
}
-
/*
- * synchronous write. from userspace.
- *
- * FIXME: if write spans object boundary, just do two separate write.
- * for a correct atomic write, we should take write locks on all
- * objects, rollback on failure, etc.
- */
-int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
- struct ceph_file_layout *layout,
- struct ceph_snap_context *snapc,
- u64 off, u64 len,
- u32 truncate_seq, u64 truncate_size,
- const char __user *data)
-{
- struct ceph_msg *reqm;
- struct ceph_osd_request_head *reqhead;
- struct ceph_osd_request *req;
- int i, po, l, left;
- int rc;
- int finalrc = 0;
-
- dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
- vino.snap, off, len);
-
-more:
- req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
- CEPH_OSD_OP_WRITE, snapc, 0,
- truncate_seq, truncate_size);
- if (IS_ERR(req))
- return PTR_ERR(req);
- reqm = req->r_request;
- reqhead = reqm->front.iov_base;
- reqhead->flags =
- cpu_to_le32(CEPH_OSD_OP_ACK | /* ack for now, FIXME */
- CEPH_OSD_OP_ORDERSNAP | /* EOLDSNAPC if ooo */
- CEPH_OSD_OP_MODIFY);
-
- dout(10, "sync_write %llu~%llu -> %d pages\n", off, len,
- req->r_num_pages);
-
- /* copy data into a set of pages */
- left = len;
- po = off & ~PAGE_MASK;
- for (i = 0; i < req->r_num_pages; i++) {
- int bad;
- req->r_pages[i] = alloc_page(GFP_NOFS);
- if (req->r_pages[i] == NULL) {
- req->r_num_pages = i+1;
- rc = -ENOMEM;
- goto out;
- }
- l = min_t(int, PAGE_SIZE-po, left);
- bad = copy_from_user(page_address(req->r_pages[i]) + po, data,
- l);
- if (bad == l) {
- req->r_num_pages = i+1;
- rc = -EFAULT;
- goto out;
- }
- data += l - bad;
- left -= l - bad;
- if (po) {
- po += l - bad;
- if (po == PAGE_CACHE_SIZE)
- po = 0;
- }
- }
- reqm->pages = req->r_pages;
- reqm->nr_pages = req->r_num_pages;
- reqm->hdr.data_len = cpu_to_le32(len);
- reqm->hdr.data_off = cpu_to_le16(off);
-
- rc = do_sync_request(osdc, req);
-out:
- for (i = 0; i < req->r_num_pages; i++)
- __free_pages(req->r_pages[i], 0);
- ceph_osdc_put_request(req);
- if (rc == 0) {
- finalrc += len;
- off += len;
- len -= len;
- if (len > 0)
- goto more;
- } else {
- finalrc = rc;
- }
- dout(10, "sync_write result %d\n", finalrc);
- return finalrc;
-}
-
-/*
- * do a sync write for N pages
+ * do a sync write on N pages
*/
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
struct ceph_snap_context *snapc,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
- struct page **pages, int num_pages)
+ struct page **pages, int num_pages,
+ int flags)
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_op *op;
struct ceph_osd_request *req;
int rc = 0;
- int flags;
BUG_ON(vino.snap != CEPH_NOSNAP);
reqhead = reqm->front.iov_base;
op = (void *)(reqhead + 1);
- flags = CEPH_OSD_OP_MODIFY;
- if (osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK)
- flags |= CEPH_OSD_OP_ACK;
- else
- flags |= CEPH_OSD_OP_ONDISK;
- reqhead->flags = cpu_to_le32(flags);
+ reqhead->flags = cpu_to_le32(flags |
+ CEPH_OSD_OP_ONDISK |
+ CEPH_OSD_OP_MODIFY);
len = le64_to_cpu(op->length);
dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
req->r_num_pages);
/* copy page vector */
- memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *));
- reqm->pages = req->r_pages;
+ req->r_pages = pages;
+ reqm->pages = pages;
reqm->nr_pages = req->r_num_pages;
reqm->hdr.data_len = cpu_to_le32(len);
reqm->hdr.data_off = cpu_to_le16(off);
}
/*
- * start an async multipage write
+ * start an async write
*/
int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
struct ceph_osd_request_attr {
struct attribute attr;
- ssize_t (*show)(struct ceph_osd_request *, struct ceph_osd_request_attr *,
+ ssize_t (*show)(struct ceph_osd_request *,
+ struct ceph_osd_request_attr *,
char *);
- ssize_t (*store)(struct ceph_osd_request *, struct ceph_osd_request_attr *,
- const char *, size_t);
+ ssize_t (*store)(struct ceph_osd_request *,
+ struct ceph_osd_request_attr *,
+ const char *, size_t);
};
/* an in-flight request */
union ceph_pg r_pgid; /* placement group */
struct ceph_snap_context *r_snapc; /* snap context for writes */
unsigned r_num_pages; /* size of page array (follows) */
- struct page *r_pages[0]; /* pages for data payload */
+ struct page **r_pages; /* pages for data payload */
};
struct ceph_osd_client {
struct ceph_snap_context *sc,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
- struct page **pagevec, int nr_pages);
+ struct page **pagevec, int nr_pages,
+ int flags);
extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
struct ceph_osd_request *req,
u64 len,
u64 off, u64 len,
u32 truncate_seq, u64 truncate_size,
char __user *data);
-extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- struct ceph_snap_context *sc,
- u64 off, u64 len,
- u32 truncate_seq, u64 truncate_size,
- const char __user *data);
#endif