inode, filp, page, page->index);
err = ceph_osdc_readpage(osdc, ceph_ino(inode), &ci->i_layout,
page->index << PAGE_SHIFT, PAGE_SIZE, page);
- if (err)
- goto out_unlock;
-
+ if (err < 0)
+ goto out;
+
+ if (err < PAGE_CACHE_SIZE) {
+ void *kaddr;
+ dout(10, "readpage zeroing tail %d bytes of page %p\n",
+ (int)PAGE_CACHE_SIZE - err, page);
+ kaddr = kmap(page);
+ memset(kaddr + err, 0, PAGE_CACHE_SIZE - err);
+ kunmap(page);
+ }
SetPageUptodate(page);
/* TODO: update info in ci? */
-out_unlock:
+out:
return err;
}
struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
int rc = 0;
struct page *page;
- struct ceph_osd_request *oreq;
struct pagevec pvec;
loff_t offset;
}
*/
- //return generic_writepages(mapping, wbc);
-
/* where to start? */
pagevec_init(&pvec, 0);
if (wbc->range_cyclic) {
dout(20, "writepages rc %d\n", rc);
/* unmap+unlock pages */
+ if (rc >= 0)
+ rc += offset & ~PAGE_CACHE_MASK;
for (i = 0; i < locked_pages; i++) {
page = pvec.pages[first + i];
if (rc > (i << PAGE_CACHE_SHIFT))
if (should_loop && !done) {
/* more to do; loop back to beginning of file */
- dout(10, "looping back to beginning of file\n");
+ dout(10, "writepages looping back to beginning of file\n");
should_loop = 0;
index = 0;
goto retry;
return rc;
}
-
-/*
- * ceph_prepare_write:
- * allocate and initialize buffer heads for each page
- */
-static int ceph_prepare_write(struct file *filp, struct page *page,
- unsigned from, unsigned to)
-{
-/* struct inode *inode = filp->f_dentry->d_inode;*/
- struct inode *inode = page->mapping->host;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
- int err = 0;
- loff_t offset, i_size;
-
- dout(10, "prepare_write file %p inode %p page %p %d~%d\n", filp,
- inode, page, from, (to-from));
-
- /*
- err = ceph_wait_for_cap(inode, CEPH_CAP_WR);
- if (err)
- return err;
- */
-
- /*
- * 1. check if page is up to date
- * 2. If not, read a page to be up to date
- */
-
- if (PageUptodate(page))
- return 0;
-
- /* The given page is already up to date if it's a full page */
- if ((to == PAGE_SIZE) && (from == 0)) {
- SetPageUptodate(page);
- return 0;
- }
-
- offset = (loff_t)page->index << PAGE_SHIFT;
- i_size = i_size_read(inode);
-
- if ((offset >= i_size) ||
- ((from == 0) && (offset + to) >= i_size)) {
- /* data beyond the file end doesn't need to be read */
- simple_prepare_write(filp, page, from, to);
- SetPageUptodate(page);
- return 0;
- }
-
- /* Now it's clear that the page is not up to date */
-
- err = ceph_osdc_prepare_write(osdc, ceph_ino(inode), &ci->i_layout,
- page->index << PAGE_SHIFT, PAGE_SIZE,
- page);
- if (err)
- goto out_unlock;
-
- /* TODO: update info in ci? */
-
-out_unlock:
- return err;
-}
-
-/*
- * ceph_commit_write:
- * mark the page as dirty, so that it is written to the disk later
- */
-static int ceph_commit_write(struct file *filp, struct page *page,
- unsigned from, unsigned to)
-{
-/* struct inode *inode = filp->f_dentry->d_inode;*/
- struct inode *inode = page->mapping->host;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
- loff_t position = ((loff_t)page->index << PAGE_SHIFT) + to;
- int err = 0;
- char *page_data;
-
- dout(10, "commit_write file %p inode %p page %p %d~%d\n", filp,
- inode, page, from, (to-from));
-
- spin_lock(&inode->i_lock);
- if (position > inode->i_size)
- i_size_write(inode, position);
- spin_unlock(&inode->i_lock);
-
- /*
- * 1. check if page is up to date
- * 2. If not, make the page up to date by writing a page
- * 3. If yes, just set the page as dirty
- */
-
- if (!PageUptodate(page)) {
- position = ((loff_t)page->index << PAGE_SHIFT) + from;
-
- page_data = kmap(page);
- err = ceph_osdc_commit_write(osdc, ceph_ino(inode), &ci->i_layout,
- page->index << PAGE_SHIFT,
- PAGE_SIZE,
- page);
- if (err)
- err = 0; /* FIXME: more sophisticated error handling */
- kunmap(page);
-
- /* TODO: update info in ci? */
- } else {
- /* set the page as up-to-date and mark it as dirty */
- SetPageUptodate(page);
- set_page_dirty(page);
- ci->i_nr_dirty_pages++;
- }
-
-/*out_unlock:*/
- return err;
-}
-
-
/*
* newer write interface
*/
const struct address_space_operations ceph_aops = {
- .write_begin = ceph_write_begin,
- .write_end = ceph_write_end,
- //.prepare_write = ceph_prepare_write,
- //.commit_write = ceph_commit_write,
.readpage = ceph_readpage,
.readpages = ceph_readpages,
.writepage = ceph_writepage,
.writepages = ceph_writepages,
+ .write_begin = ceph_write_begin,
+ .write_end = ceph_write_end,
// .set_page_dirty = ceph_set_page_dirty,
.releasepage = ceph_releasepage,
};
loff_t len;
};
-static int ceph_readpage_async(struct ceph_osd_client *osdc,
- struct ceph_msg *reqm,
- struct page *page);
-
void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
dout(5, "init\n");
loff_t off, loff_t len,
struct page *page)
{
- struct ceph_msg *reqm;
- struct ceph_osd_request_head *reqhead;
- int ret;
+ struct ceph_msg *reqm, *reply;
+ struct ceph_osd_request_head *reqhead;
+ struct ceph_osd_request *req;
+ struct ceph_osd_reply_head *replyhead;
+ __s32 rc;
dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len);
reqhead = reqm->front.iov_base;
reqhead->oid.ino = ino;
reqhead->oid.rev = 0;
+
+ /* calc mapping */
calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
&reqhead->offset, &reqhead->length);
BUG_ON(len != 0);
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
- dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
+ calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+ osdc->osdmap);
+ dout(10, "readpage object block %u on %llu~%llu\n",
+ reqhead->oid.bno, reqhead->offset, reqhead->length);
- ret = ceph_readpage_async(osdc, reqm, page);
- if (ret < 0)
- return ret;
- return 0;
-}
-
-/*
- * read multiple pages (readahead)
- */
+ req = alloc_request(1);
+ if (IS_ERR(req)) {
+ ceph_msg_put(reqm);
+ return PTR_ERR(req);
+ }
+ req->r_pages[0] = page;
-int ceph_osdc_readpages_filler(void *data, struct page *page)
-{
- struct ceph_readdesc *pdesc = (struct ceph_readdesc *)data;
- struct ceph_osd_client *osdc = pdesc->osdc;
- struct ceph_file_layout *layout = pdesc->layout;
- loff_t off = pdesc->off;
- loff_t len = pdesc->len;
+ /* register+send request */
+ spin_lock(&osdc->lock);
+ req = register_request(osdc, reqm, 1, req);
+ if (IS_ERR(req)) {
+ spin_unlock(&osdc->lock);
+ ceph_msg_put(reqm);
+ return PTR_ERR(req);
+ }
- struct ceph_osd_request_head *reqhead;
- struct ceph_msg *reqm;
- struct inode *inode = page->mapping->host;
- ceph_ino_t ino = ceph_ino(inode);
- int ret;
+ reqhead->osdmap_epoch = osdc->osdmap->epoch;
- /* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
- reqhead = reqm->front.iov_base;
- reqhead->oid.ino = ino;
- reqhead->oid.rev = 0;
- calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
- &reqhead->offset, &reqhead->length);
- BUG_ON(len != 0);
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
- dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
+ send_request(osdc, req);
+ spin_unlock(&osdc->lock);
- ret = ceph_readpage_async(osdc, reqm, page);
- if (ret < 0)
- return ret;
+ /* wait */
+ dout(10, "readpage tid %llu waiting on %p\n", req->r_tid, req);
+ wait_for_completion(&req->r_completion);
+ dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req);
+
+ spin_lock(&osdc->lock);
+ unregister_request(osdc, req);
+ spin_unlock(&osdc->lock);
- return 0;
+ reply = req->r_reply;
+ replyhead = reply->front.iov_base;
+ rc = le32_to_cpu(replyhead->result);
+ dout(10, "readpage result %d, read %d bytes\n", rc,
+ le32_to_cpu(reply->hdr.data_len));
+ if (rc < 0)
+ return rc;
+ return le32_to_cpu(reply->hdr.data_len);
}
-/* FIXME: Since this macro list_to_page is already defined in mm/readahead.c,
- * it's not good to define it again at this point.
+/*
+ * read some contiguous pages from page_list.
+ * - we stop if pages aren't contiguous, or when we hit an object boundary
*/
-#define list_to_page(head) (list_entry((head)->prev, struct page, lru))
-
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct address_space *mapping,
ceph_ino_t ino, struct ceph_file_layout *layout,
struct page *page;
pgoff_t next_index;
int contig_pages;
+ __s32 rc;
/*
* for now, our strategy is simple: start with the
reply = req->r_reply;
replyhead = reply->front.iov_base;
- dout(10, "readpages result %d, read %d bytes\n", replyhead->result,
- reply->hdr.data_len);
- if (replyhead->result == 0)
- return reply->hdr.data_len;
- return replyhead->result;
+ rc = le32_to_cpu(replyhead->result);
+ dout(10, "readpages result %d, read %d bytes\n", rc,
+ le32_to_cpu(reply->hdr.data_len));
+ if (rc < 0)
+ return rc;
+ return le32_to_cpu(reply->hdr.data_len);
}
/*
- * silly hack.
+ * synchronous write: copy the data in from a userspace buffer and wait for the OSD reply.
*/
-int ceph_osdc_silly_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
- struct ceph_file_layout *layout,
- __u64 len, __u64 off, const char __user *data)
+int ceph_osdc_sync_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
+ struct ceph_file_layout *layout,
+ __u64 off, __u64 len, const char __user *data)
{
struct ceph_msg *reqm, *reply;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
struct ceph_osd_reply_head *replyhead;
__u64 toff = off, tlen = len;
- int nrp, i, po, l, left;
+ int nr_pages, i, po, l, left;
+ __s32 rc;
- dout(10, "silly_write on ino %llx at %llu~%llu\n", ino, off, len);
+ dout(10, "sync_write on ino %llx at %llu~%llu\n", ino, off, len);
/* request msg */
reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
reqhead = reqm->front.iov_base;
reqhead->oid.ino = ino;
reqhead->oid.rev = 0;
- reqhead->flags = CEPH_OSD_OP_ACK|CEPH_OSD_OP_SAFE; /* want them both */
+ reqhead->flags = CEPH_OSD_OP_ACK; /* just ack.. FIXME */
calc_file_object_mapping(layout, &toff, &tlen, &reqhead->oid,
&reqhead->offset, &reqhead->length);
if (tlen != 0) {
- dout(10, "not writing complete bit.. skipping last %llu, doing %llu~%llu\n", tlen, off, len);
+ dout(10, " skipping last %llu, writing %llu~%llu\n",
+ tlen, off, len);
len -= tlen;
}
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
- dout(10, "silly_write object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
+ calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+ osdc->osdmap);
+ dout(10, "sync_write object block %u on %llu~%llu\n",
+ reqhead->oid.bno, reqhead->offset, reqhead->length);
/* how many pages? */
- nrp = calc_pages_for(len, off);
- dout(10, "%d~%d -> %d pages\n", (int)off, (int)len, nrp);
+ nr_pages = calc_pages_for(len, off);
+ dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages);
- req = alloc_request(nrp);
+ req = alloc_request(nr_pages);
if (IS_ERR(req))
return PTR_ERR(req);
- /* register+send request */
- spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, nrp, req);
- if (IS_ERR(req)) {
- ceph_msg_put(reqm);
- spin_unlock(&osdc->lock);
- return PTR_ERR(req);
- }
-
- /* data into a set of pages */
- for (i=0; i<nrp; i++)
+ /* copy data into a set of pages */
+ for (i=0; i<nr_pages; i++)
req->r_pages[i] = alloc_page(GFP_KERNEL);
left = len;
po = off & ~PAGE_MASK;
- for (i=0; i<nrp; i++) {
+ for (i=0; i<nr_pages; i++) {
l = min_t(int, PAGE_SIZE-po, left);
copy_from_user(page_address(req->r_pages[i]) + po, data, l);
data += l;
left -= l;
po = 0;
}
- req->r_request->pages = req->r_pages;
- req->r_request->nr_pages = req->r_nr_pages;
- req->r_request->hdr.data_len = cpu_to_le32(len);
- req->r_request->hdr.data_off = cpu_to_le32(off);
-
- reqhead->osdmap_epoch = osdc->osdmap->epoch;
- send_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- /* wait */
- dout(10, "silly_write tid %llu waiting for reply on %p\n", req->r_tid, req);
- wait_for_completion(&req->r_completion);
- dout(10, "silly_write tid %llu got reply on %p\n", req->r_tid, req);
-
- spin_lock(&osdc->lock);
- unregister_request(osdc, req);
- spin_unlock(&osdc->lock);
-
- reply = req->r_reply;
- replyhead = reply->front.iov_base;
- dout(10, "silly_write result %d, returning %d\n", replyhead->result, (int)len);
- put_request(req);
- return (int)len;
-}
-
-
-
-/*
- * Make a page up to date to be written later to remote OSDs
- */
-int ceph_osdc_prepare_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page)
-{
- int rc;
- char *read_data;
- struct ceph_msg *reqm;
- struct ceph_osd_request_head *reqhead;
-
- dout(10, "osdc_prepare_write on ino %llx at %lld~%lld\n", ino, off, len);
-
- get_page(page);
- read_data = kmap(page);
-
- /*
- * 1. Read a page by calling ceph_osdc_readpage()
- * 2. Fill the rest bytes by 0, to align by PAGE_SIZE
- */
-
- /* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
- if (IS_ERR(reqm))
- return PTR_ERR(reqm);
- reqhead = reqm->front.iov_base;
- reqhead->oid.ino = ino;
- reqhead->oid.rev = 0;
- calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
- &reqhead->offset, &reqhead->length);
- BUG_ON(len != 0);
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
- dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
-
- rc = ceph_readpage_async(osdc, reqm, page);
- if (rc < 0)
- goto io_error;
-
- if (rc < PAGE_SIZE)
- memset(read_data + rc, 0, PAGE_SIZE - rc);
-
- SetPageUptodate(page);
-
- rc = 0;
-
-io_error:
- kunmap(page);
- put_page(page);
- return rc;
-}
-
-/*
- * Mark a page as dirty to be written into remote OSDs
- */
-int ceph_osdc_commit_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page)
-{
- char *write_data;
- int ret = 0;
-
- dout(10, "osdc_commit_write on ino %llx at %lld~%lld\n", ino, off, len);
-
- /*
- * if it is not up-to-date, then force it to be so.
- */
-
- write_data = kmap(page);
- write_data += off;
-
- if (len > PAGE_SIZE) {
- kunmap(page);
- return -EIO;
- }
-
- //ret = ceph_writepage_async(osdc, ino, layout, off, len, write_data);
- ret = len;
- kunmap(page);
- return ret;
-}
-
-
-/*
- * do a read job for one page
- */
-static int ceph_readpage_async(struct ceph_osd_client *osdc,
- struct ceph_msg *reqm,
- struct page *page)
-{
- struct ceph_msg *reply;
- struct ceph_osd_request_head *reqhead;
- struct ceph_osd_request *req;
- struct ceph_osd_reply_head *replyhead;
-
- reqhead = reqm->front.iov_base;
-
- req = alloc_request(1);
- if (IS_ERR(req))
- return PTR_ERR(req);
/* register+send request */
spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, 1, req);
+ req = register_request(osdc, reqm, nr_pages, req);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
spin_unlock(&osdc->lock);
return PTR_ERR(req);
}
- req->r_pages[0] = page;
+ req->r_request->pages = req->r_pages;
+ req->r_request->nr_pages = nr_pages;
+ req->r_request->hdr.data_len = cpu_to_le32(len);
+ req->r_request->hdr.data_off = cpu_to_le32(off);
reqhead->osdmap_epoch = osdc->osdmap->epoch;
send_request(osdc, req);
spin_unlock(&osdc->lock);
/* wait */
- dout(10, "readpage tid %llu waiting for reply on %p\n", req->r_tid, req);
+ dout(10, "sync_write tid %llu waiting on %p\n", req->r_tid, req);
wait_for_completion(&req->r_completion);
- dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req);
+ dout(10, "sync_write tid %llu got reply on %p\n", req->r_tid, req);
spin_lock(&osdc->lock);
unregister_request(osdc, req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
- dout(10, "readpage result %d\n", replyhead->result);
+ rc = le32_to_cpu(replyhead->result);
+ dout(10, "sync_write result %d\n", rc);
put_request(req);
-
- return 0;
+ if (rc < 0)
+ return rc;
+ return len;
}
/*
struct ceph_osd_request *req;
struct ceph_osd_reply_head *replyhead;
__u64 toff = off, tlen = len;
- int ret = 0;
+ __s32 ret = 0;
/* request + msg */
reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);