struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
int rc = 0;
struct page *page;
- int contig_pages = 0;
- int next_index;
struct ceph_osd_request *oreq;
struct pagevec pvec;
loff_t offset;
dout(10, "readpages inode %p file %p nr_pages %d\n",
inode, file, nr_pages);
- /* alloc request, w/ page vector */
- oreq = ceph_osdc_alloc_request(nr_pages);
- if (oreq == 0)
- return -ENOMEM;
-
- /* find adjacent pages */
+ /* guess read extent */
BUG_ON(list_empty(page_list));
page = list_entry(page_list->prev, struct page, lru);
offset = page->index << PAGE_CACHE_SHIFT;
- next_index = page->index;
- contig_pages = 0;
- list_for_each_entry_reverse(page, page_list, lru) {
- if (page->index == next_index) {
- oreq->r_pages[contig_pages] = page;
- contig_pages++;
- next_index++;
- } else
- break;
- }
- dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
- if (contig_pages == 0)
- return 0;
rc = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout,
- offset, contig_pages << PAGE_CACHE_SHIFT,
- oreq);
+ offset, nr_pages << PAGE_CACHE_SHIFT,
+ page_list, nr_pages);
if (rc < 0)
- goto out;
+ return rc;
/* set uptodate and add to lru in pagevec-sized chunks */
pagevec_init(&pvec, 0);
+ if (rc > 0)
+ rc += offset & ~PAGE_CACHE_MASK;
for (; rc > 0; rc -= PAGE_CACHE_SIZE) {
- if (list_empty(&page_list))
+ if (list_empty(page_list))
break; /* WTF */
page = list_entry(page_list->prev, struct page, lru);
list_del(&page->lru);
pagevec_lru_add(&pvec);
}
pagevec_lru_add(&pvec);
-
-out:
- ceph_osdc_put_request(oreq);
- if (rc < 0)
- return rc;
return 0;
}
atomic_inc(&req->r_ref);
}
-void ceph_osdc_put_request(struct ceph_osd_request *req)
+static void put_request(struct ceph_osd_request *req)
{
BUG_ON(atomic_read(&req->r_ref) <= 0);
if (atomic_dec_and_test(&req->r_ref)) {
return req;
}
-struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages)
+static struct ceph_osd_request *alloc_request(int nr_pages)
{
struct ceph_osd_request *req;
{
dout(30, "unregister_request %p tid %lld\n", req, req->r_tid);
radix_tree_delete(&osdc->request_tree, req->r_tid);
- ceph_osdc_put_request(req);
+ put_request(req);
}
/*
req->r_flags |= rhead->flags;
spin_unlock(&osdc->lock);
complete(&req->r_completion);
- ceph_osdc_put_request(req);
+ put_request(req);
}
dout(10, "prepare_pages unknown tid %llu\n", tid);
goto out;
}
- if (likely(req->r_nr_pages == want)) {
- dout(10, "prepare_pages tid %llu using existing page vec\n", tid);
+ dout(10, "prepare_pages tid %llu have %d pages, want %d\n",
+ tid, req->r_nr_pages, want);
+ if (likely(req->r_nr_pages >= want)) {
m->pages = req->r_pages;
m->nr_pages = req->r_nr_pages;
ret = 0; /* success */
- } else {
- dout(10, "prepare_pages tid %llu have %d pages, reply wants %d\n",
- tid, req->r_nr_pages, want);
}
out:
spin_unlock(&osdc->lock);
struct address_space *mapping,
ceph_ino_t ino, struct ceph_file_layout *layout,
__u64 off, __u64 len,
- struct ceph_osd_request *req)
+ struct list_head *page_list, int nr_pages)
{
struct ceph_msg *reqm, *reply;
+ struct ceph_osd_request *req;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_reply_head *replyhead;
+ struct page *page;
+ pgoff_t next_index;
+ int contig_pages;
/*
* for now, our strategy is simple: start with the
dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len);
+ /* alloc request, w/ page vector */
+ req = alloc_request(nr_pages);
+ if (req == 0)
+ return -ENOMEM;
+
+ /* find adjacent pages */
+ next_index = list_entry(page_list->prev, struct page, lru)->index;
+ contig_pages = 0;
+ list_for_each_entry_reverse(page, page_list, lru) {
+ if (page->index == next_index) {
+ req->r_pages[contig_pages] = page;
+ contig_pages++;
+ next_index++;
+ } else
+ break;
+ }
+ dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
+ if (contig_pages == 0)
+ return 0;
+ len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
+ len);
+ dout(10, "readpages final extent is %llu~%llu\n", off, len);
+
/* request msg */
reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
- if (IS_ERR(reqm))
+ if (IS_ERR(reqm)) {
+ put_request(req);
return PTR_ERR(reqm);
+ }
reqhead = reqm->front.iov_base;
reqhead->oid.ino = ino;
reqhead->oid.rev = 0;
nrp = calc_pages_for(len, off);
dout(10, "%d~%d -> %d pages\n", (int)off, (int)len, nrp);
- req = ceph_osdc_alloc_request(nrp);
+ req = alloc_request(nrp);
if (IS_ERR(req))
return PTR_ERR(req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
dout(10, "silly_write result %d, returning %d\n", replyhead->result, (int)len);
- ceph_osdc_put_request(req);
+ put_request(req);
return (int)len;
}
reqhead = reqm->front.iov_base;
- req = ceph_osdc_alloc_request(1);
+ req = alloc_request(1);
if (IS_ERR(req))
return PTR_ERR(req);
reply = req->r_reply;
replyhead = reply->front.iov_base;
dout(10, "readpage result %d\n", replyhead->result);
- ceph_osdc_put_request(req);
+ put_request(req);
return 0;
}
reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- req = ceph_osdc_alloc_request(nr_pages);
+ req = alloc_request(nr_pages);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
req = register_request(osdc, reqm, nr_pages, req);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
- ceph_osdc_put_request(req);
+ put_request(req);
spin_unlock(&osdc->lock);
return PTR_ERR(req);
}
replyhead = reply->front.iov_base;
ret = le32_to_cpu(replyhead->result);
dout(10, "writepages result %d\n", ret);
- ceph_osdc_put_request(req);
+ put_request(req);
/* return error, or number of bytes written */
if (ret < 0)