From: Sage Weil Date: Tue, 1 Apr 2008 20:36:43 +0000 (-0700) Subject: kclient: readpages works X-Git-Tag: v0.2~232 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=acd63b0bc63743e7f232f1e5eea9e46944db1f16;p=ceph.git kclient: readpages works --- diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 3e4bc1231bd6..23e08d5634e6 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -34,25 +34,79 @@ out_unlock: } static int ceph_readpages(struct file *file, struct address_space *mapping, - struct list_head *pages, unsigned nr_pages) + struct list_head *page_list, unsigned nr_pages) { struct inode *inode = file->f_dentry->d_inode; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc; - int err = 0; + int rc = 0; + struct page *page; + int contig_pages = 0; + int next_index; + struct ceph_osd_request *oreq; + struct pagevec pvec; + loff_t offset; - dout(10, "ceph_readpages inode %p file %p nr_pages %d\n", + dout(10, "readpages inode %p file %p nr_pages %d\n", inode, file, nr_pages); - err = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout, - pages, nr_pages); - if (err < 0) - goto out_unlock; - -out_unlock: - return err; + /* alloc request, w/ page vector */ + oreq = ceph_osdc_alloc_request(nr_pages); + if (oreq == 0) + return -ENOMEM; + /* find adjacent pages */ + BUG_ON(list_empty(page_list)); + page = list_entry(page_list->prev, struct page, lru); + offset = page->index << PAGE_CACHE_SHIFT; + next_index = page->index; + contig_pages = 0; + list_for_each_entry_reverse(page, page_list, lru) { + if (page->index == next_index) { + oreq->r_pages[contig_pages] = page; + contig_pages++; + next_index++; + } else + break; + } + dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages); + if (contig_pages == 0) + return 0; + rc = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout, + offset, contig_pages << PAGE_CACHE_SHIFT, + oreq); + if (rc < 0) + goto out; + + /* set uptodate and add to lru in pagevec-sized chunks */ + pagevec_init(&pvec, 0); + for (; rc > 0; rc -= PAGE_CACHE_SIZE) { + if (list_empty(&page_list)) + break; /* WTF */ + page = list_entry(page_list->prev, struct page, lru); + list_del(&page->lru); + + if (add_to_page_cache(page, mapping, page->index, + GFP_KERNEL)) { + page_cache_release(page); + dout(20, "readpages add_to_page_cache failed on %p\n", + page); + continue; + } + dout(10, "readpages adding page %p\n", page); + flush_dcache_page(page); + SetPageUptodate(page); + unlock_page(page); + if (pagevec_add(&pvec, page) == 0) + pagevec_lru_add(&pvec); + } + pagevec_lru_add(&pvec); +out: + ceph_osdc_put_request(oreq); + if (rc < 0) + return rc; + return 0; } @@ -507,7 +561,7 @@ const struct address_space_operations ceph_aops = { //.prepare_write = ceph_prepare_write, //.commit_write = ceph_commit_write, .readpage = ceph_readpage, - //.readpages = ceph_readpages, + .readpages = ceph_readpages, .writepage = ceph_writepage, .writepages = ceph_writepages, // .set_page_dirty = ceph_set_page_dirty, diff --git a/src/kernel/file.c b/src/kernel/file.c index 3831a6245d96..9951ca7de6ad 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -183,20 +183,22 @@ ssize_t ceph_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos) ssize_t ret; int got = 0; - dout(10, "read trying to get caps\n"); + dout(10, "read %llx %llu~%u trying to get caps\n", + ceph_ino(inode), *ppos, (unsigned)len); ret = wait_event_interruptible(ci->i_cap_wq, ceph_get_cap_refs(ci, CEPH_CAP_RD, CEPH_CAP_RDCACHE, &got)); if (ret < 0) goto out; - dout(10, "read got cap refs on %d\n", got); + dout(10, "read %llx %llu~%u got cap refs on %d\n", + ceph_ino(inode), *ppos, (unsigned)len, got); //if (got & CEPH_CAP_RDCACHE) { ret = do_sync_read(filp, buf, len, ppos); out: - dout(10, "read dropping cap refs on %d\n", got); + dout(10, "read %llx dropping cap refs on %d\n", ceph_ino(inode), got); ceph_put_cap_refs(ci, got); return ret; } diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index e4b62b85b747..3e0cf3d414e9 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -137,7 +137,7 @@ static void get_request(struct ceph_osd_request *req) atomic_inc(&req->r_ref); } -static void put_request(struct ceph_osd_request *req) +void ceph_osdc_put_request(struct ceph_osd_request *req) { BUG_ON(atomic_read(&req->r_ref) <= 0); if (atomic_dec_and_test(&req->r_ref)) { @@ -165,7 +165,7 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op) return req; } -struct ceph_osd_request *alloc_request(int nr_pages) +struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages) { struct ceph_osd_request *req; @@ -243,7 +243,7 @@ static void unregister_request(struct ceph_osd_client *osdc, { dout(30, "unregister_request %p tid %lld\n", req, req->r_tid); radix_tree_delete(&osdc->request_tree, req->r_tid); - put_request(req); + ceph_osdc_put_request(req); } /* @@ -279,7 +279,7 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) req->r_flags |= rhead->flags; spin_unlock(&osdc->lock); complete(&req->r_completion); - put_request(req); + ceph_osdc_put_request(req); } @@ -400,19 +400,13 @@ int ceph_osdc_readpages_filler(void *data, struct page *page) int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, - ceph_ino_t ino, - struct ceph_file_layout *layout, - struct list_head *pagels, int nr_pages) + ceph_ino_t ino, struct ceph_file_layout *layout, + __u64 off, __u64 len, + struct ceph_osd_request *req) { struct ceph_msg *reqm, *reply; struct ceph_osd_request_head *reqhead; - struct ceph_osd_request *req; struct ceph_osd_reply_head *replyhead; - __u64 off, len; - struct ceph_readdesc desc = { - .osdc = osdc, - .layout = layout - }; /* * for now, our strategy is simple: start with the @@ -421,7 +415,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, * nr_pages. */ - dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len); + dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len); /* request msg */ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); @@ -430,46 +424,35 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, reqhead = reqm->front.iov_base; reqhead->oid.ino = ino; reqhead->oid.rev = 0; - off = list_to_page(pagels)->index << PAGE_SHIFT; - len = nr_pages << PAGE_SHIFT; + calc_file_object_mapping(layout, &off, &len, &reqhead->oid, &reqhead->offset, &reqhead->length); - BUG_ON(len != 0); -#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 19) - nr_pages = DIV_ROUND_UP(reqhead->length, PAGE_SIZE); -#else - nr_pages = (reqhead->length + PAGE_SIZE - 1) / PAGE_SIZE; -#endif - calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap); - dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length); + + req->r_nr_pages = ((reqhead->offset+reqhead->length+PAGE_CACHE_SIZE-1) + >> PAGE_CACHE_SHIFT) - (reqhead->offset >> PAGE_CACHE_SHIFT); + calc_object_layout(&reqhead->layout, &reqhead->oid, layout, + osdc->osdmap); + dout(10, "readpages object block %u of %llu~%llu\n", reqhead->oid.bno, + reqhead->offset, reqhead->length); /* register request */ - req = alloc_request(nr_pages); - if (IS_ERR(req)) - return PTR_ERR(req); - spin_lock(&osdc->lock); - req = register_request(osdc, reqm, nr_pages, req); + req = register_request(osdc, reqm, req->r_nr_pages, req); if (IS_ERR(req)) { ceph_msg_put(reqm); spin_unlock(&osdc->lock); return PTR_ERR(req); } - /* prepage pages (add to page cache, request vector) */ - desc.off = off; - desc.len = len; - read_cache_pages(mapping, pagels, ceph_osdc_readpages_filler, &desc); - /* send request */ reqhead->osdmap_epoch = osdc->osdmap->epoch; send_request(osdc, req); spin_unlock(&osdc->lock); /* wait */ - dout(10, "readpage tid %llu waiting for reply on %p\n", req->r_tid, req); + dout(10, "readpages tid %llu waiting on %p\n", req->r_tid, req); wait_for_completion(&req->r_completion); - dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req); + dout(10, "readpages tid %llu got reply on %p\n", req->r_tid, req); spin_lock(&osdc->lock); unregister_request(osdc, req); @@ -477,9 +460,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, reply = req->r_reply; replyhead = reply->front.iov_base; - dout(10, "readpage result %d\n", replyhead->result); - put_request(req); - return 0; + dout(10, "readpages result %d, read %d bytes\n", replyhead->result, + reply->hdr.data_len); + if (replyhead->result == 0) + return reply->hdr.data_len; + return replyhead->result; } @@ -520,7 +505,7 @@ int ceph_osdc_silly_write(struct ceph_osd_client *osdc, ceph_ino_t ino, nrp = calc_pages_for(len, off); dout(10, "%d~%d -> %d pages\n", (int)off, (int)len, nrp); - req = alloc_request(nrp); + req = ceph_osdc_alloc_request(nrp); if (IS_ERR(req)) return PTR_ERR(req); @@ -566,7 +551,7 @@ int ceph_osdc_silly_write(struct ceph_osd_client *osdc, ceph_ino_t ino, reply = req->r_reply; replyhead = reply->front.iov_base; dout(10, "silly_write result %d, returning %d\n", replyhead->result, (int)len); - put_request(req); + ceph_osdc_put_request(req); return (int)len; } @@ -671,7 +656,7 @@ static int ceph_readpage_async(struct ceph_osd_client *osdc, reqhead = reqm->front.iov_base; - req = alloc_request(1); + req = ceph_osdc_alloc_request(1); if (IS_ERR(req)) return PTR_ERR(req); @@ -700,7 +685,7 @@ static int ceph_readpage_async(struct ceph_osd_client *osdc, reply = req->r_reply; replyhead = reply->front.iov_base; dout(10, "readpage result %d\n", replyhead->result); - put_request(req); + ceph_osdc_put_request(req); return 0; } @@ -724,7 +709,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE); if (IS_ERR(reqm)) return PTR_ERR(reqm); - req = alloc_request(nr_pages); + req = ceph_osdc_alloc_request(nr_pages); if (IS_ERR(req)) { ceph_msg_put(reqm); return PTR_ERR(req); @@ -754,7 +739,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, req = register_request(osdc, reqm, nr_pages, req); if (IS_ERR(req)) { ceph_msg_put(reqm); - put_request(req); + ceph_osdc_put_request(req); spin_unlock(&osdc->lock); return PTR_ERR(req); } @@ -785,7 +770,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino, replyhead = reply->front.iov_base; ret = le32_to_cpu(replyhead->result); dout(10, "writepages result %d\n", ret); - put_request(req); + ceph_osdc_put_request(req); /* return error, or number of bytes written */ if (ret < 0) diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 223a1347a09a..0b8b6df7b97c 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -62,10 +62,10 @@ extern int ceph_osdc_readpage(struct ceph_osd_client *osdc, ceph_ino_t ino, loff_t off, loff_t len, struct page *page); extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, - struct address_space *mapping, - ceph_ino_t ino, - struct ceph_file_layout *layout, - struct list_head *pagels, int nr_pages); + struct address_space *mapping, + ceph_ino_t ino, struct ceph_file_layout *layout, + __u64 off, __u64 len, + struct ceph_osd_request *req); extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc, ceph_ino_t ino, struct ceph_file_layout *layout, loff_t off, loff_t len, @@ -96,5 +96,8 @@ extern int ceph_osdc_writepage(struct ceph_osd_client *osdc, ceph_ino_t ino, loff_t off, loff_t len, struct page *page); +extern struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages); +extern void ceph_osdc_put_request(struct ceph_osd_request *req); + #endif