}
static int ceph_readpages(struct file *file, struct address_space *mapping,
- struct list_head *pages, unsigned nr_pages)
+ struct list_head *page_list, unsigned nr_pages)
{
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
- int err = 0;
+ int rc = 0;
+ struct page *page;
+ int contig_pages = 0;
+ int next_index;
+ struct ceph_osd_request *oreq;
+ struct pagevec pvec;
+ loff_t offset;
- dout(10, "ceph_readpages inode %p file %p nr_pages %d\n",
+ dout(10, "readpages inode %p file %p nr_pages %d\n",
inode, file, nr_pages);
- err = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout,
- pages, nr_pages);
- if (err < 0)
- goto out_unlock;
-
-out_unlock:
- return err;
+ /* alloc request, w/ page vector */
+ oreq = ceph_osdc_alloc_request(nr_pages);
+ if (oreq == 0)
+ return -ENOMEM;
+ /* find adjacent pages */
+ BUG_ON(list_empty(page_list));
+ page = list_entry(page_list->prev, struct page, lru);
+ offset = page->index << PAGE_CACHE_SHIFT;
+ next_index = page->index;
+ contig_pages = 0;
+ list_for_each_entry_reverse(page, page_list, lru) {
+ if (page->index == next_index) {
+ oreq->r_pages[contig_pages] = page;
+ contig_pages++;
+ next_index++;
+ } else
+ break;
+ }
+ dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
+ if (contig_pages == 0)
+ return 0;
+ rc = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout,
+ offset, contig_pages << PAGE_CACHE_SHIFT,
+ oreq);
+ if (rc < 0)
+ goto out;
+
+ /* set uptodate and add to lru in pagevec-sized chunks */
+ pagevec_init(&pvec, 0);
+ for (; rc > 0; rc -= PAGE_CACHE_SIZE) {
+ if (list_empty(&page_list))
+ break; /* WTF */
+ page = list_entry(page_list->prev, struct page, lru);
+ list_del(&page->lru);
+
+ if (add_to_page_cache(page, mapping, page->index,
+ GFP_KERNEL)) {
+ page_cache_release(page);
+ dout(20, "readpages add_to_page_cache failed on %p\n",
+ page);
+ continue;
+ }
+ dout(10, "readpages adding page %p\n", page);
+ flush_dcache_page(page);
+ SetPageUptodate(page);
+ unlock_page(page);
+ if (pagevec_add(&pvec, page) == 0)
+ pagevec_lru_add(&pvec);
+ }
+ pagevec_lru_add(&pvec);
+out:
+ ceph_osdc_put_request(oreq);
+ if (rc < 0)
+ return rc;
+ return 0;
}
//.prepare_write = ceph_prepare_write,
//.commit_write = ceph_commit_write,
.readpage = ceph_readpage,
- //.readpages = ceph_readpages,
+ .readpages = ceph_readpages,
.writepage = ceph_writepage,
.writepages = ceph_writepages,
// .set_page_dirty = ceph_set_page_dirty,
atomic_inc(&req->r_ref);
}
-static void put_request(struct ceph_osd_request *req)
+void ceph_osdc_put_request(struct ceph_osd_request *req)
{
BUG_ON(atomic_read(&req->r_ref) <= 0);
if (atomic_dec_and_test(&req->r_ref)) {
return req;
}
-struct ceph_osd_request *alloc_request(int nr_pages)
+struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages)
{
struct ceph_osd_request *req;
{
dout(30, "unregister_request %p tid %lld\n", req, req->r_tid);
radix_tree_delete(&osdc->request_tree, req->r_tid);
- put_request(req);
+ ceph_osdc_put_request(req);
}
/*
req->r_flags |= rhead->flags;
spin_unlock(&osdc->lock);
complete(&req->r_completion);
- put_request(req);
+ ceph_osdc_put_request(req);
}
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
struct address_space *mapping,
- ceph_ino_t ino,
- struct ceph_file_layout *layout,
- struct list_head *pagels, int nr_pages)
+ ceph_ino_t ino, struct ceph_file_layout *layout,
+ __u64 off, __u64 len,
+ struct ceph_osd_request *req)
{
struct ceph_msg *reqm, *reply;
struct ceph_osd_request_head *reqhead;
- struct ceph_osd_request *req;
struct ceph_osd_reply_head *replyhead;
- __u64 off, len;
- struct ceph_readdesc desc = {
- .osdc = osdc,
- .layout = layout
- };
/*
* for now, our strategy is simple: start with the
* nr_pages.
*/
- dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len);
+ dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len);
/* request msg */
reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
reqhead = reqm->front.iov_base;
reqhead->oid.ino = ino;
reqhead->oid.rev = 0;
- off = list_to_page(pagels)->index << PAGE_SHIFT;
- len = nr_pages << PAGE_SHIFT;
+
calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
&reqhead->offset, &reqhead->length);
- BUG_ON(len != 0);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 19)
- nr_pages = DIV_ROUND_UP(reqhead->length, PAGE_SIZE);
-#else
- nr_pages = (reqhead->length + PAGE_SIZE - 1) / PAGE_SIZE;
-#endif
- calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
- dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
+
+ req->r_nr_pages = ((reqhead->offset+reqhead->length+PAGE_CACHE_SIZE-1)
+ >> PAGE_CACHE_SHIFT) - (reqhead->offset >> PAGE_CACHE_SHIFT);
+ calc_object_layout(&reqhead->layout, &reqhead->oid, layout,
+ osdc->osdmap);
+ dout(10, "readpages object block %u of %llu~%llu\n", reqhead->oid.bno,
+ reqhead->offset, reqhead->length);
/* register request */
- req = alloc_request(nr_pages);
- if (IS_ERR(req))
- return PTR_ERR(req);
-
spin_lock(&osdc->lock);
- req = register_request(osdc, reqm, nr_pages, req);
+ req = register_request(osdc, reqm, req->r_nr_pages, req);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
spin_unlock(&osdc->lock);
return PTR_ERR(req);
}
- /* prepage pages (add to page cache, request vector) */
- desc.off = off;
- desc.len = len;
- read_cache_pages(mapping, pagels, ceph_osdc_readpages_filler, &desc);
-
/* send request */
reqhead->osdmap_epoch = osdc->osdmap->epoch;
send_request(osdc, req);
spin_unlock(&osdc->lock);
/* wait */
- dout(10, "readpage tid %llu waiting for reply on %p\n", req->r_tid, req);
+ dout(10, "readpages tid %llu waiting on %p\n", req->r_tid, req);
wait_for_completion(&req->r_completion);
- dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req);
+ dout(10, "readpages tid %llu got reply on %p\n", req->r_tid, req);
spin_lock(&osdc->lock);
unregister_request(osdc, req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
- dout(10, "readpage result %d\n", replyhead->result);
- put_request(req);
- return 0;
+ dout(10, "readpages result %d, read %d bytes\n", replyhead->result,
+ reply->hdr.data_len);
+ if (replyhead->result == 0)
+ return reply->hdr.data_len;
+ return replyhead->result;
}
nrp = calc_pages_for(len, off);
dout(10, "%d~%d -> %d pages\n", (int)off, (int)len, nrp);
- req = alloc_request(nrp);
+ req = ceph_osdc_alloc_request(nrp);
if (IS_ERR(req))
return PTR_ERR(req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
dout(10, "silly_write result %d, returning %d\n", replyhead->result, (int)len);
- put_request(req);
+ ceph_osdc_put_request(req);
return (int)len;
}
reqhead = reqm->front.iov_base;
- req = alloc_request(1);
+ req = ceph_osdc_alloc_request(1);
if (IS_ERR(req))
return PTR_ERR(req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
dout(10, "readpage result %d\n", replyhead->result);
- put_request(req);
+ ceph_osdc_put_request(req);
return 0;
}
reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- req = alloc_request(nr_pages);
+ req = ceph_osdc_alloc_request(nr_pages);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
req = register_request(osdc, reqm, nr_pages, req);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
- put_request(req);
+ ceph_osdc_put_request(req);
spin_unlock(&osdc->lock);
return PTR_ERR(req);
}
replyhead = reply->front.iov_base;
ret = le32_to_cpu(replyhead->result);
dout(10, "writepages result %d\n", ret);
- put_request(req);
+ ceph_osdc_put_request(req);
/* return error, or number of bytes written */
if (ret < 0)