]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: readpages works
authorSage Weil <sage@newdream.net>
Tue, 1 Apr 2008 20:36:43 +0000 (13:36 -0700)
committerSage Weil <sage@newdream.net>
Tue, 1 Apr 2008 20:36:43 +0000 (13:36 -0700)
src/kernel/addr.c
src/kernel/file.c
src/kernel/osd_client.c
src/kernel/osd_client.h

index 3e4bc1231bd6cf4a69ebc221d84fbf74c318eb2d..23e08d5634e6ee00ebada86f4062b64b1c955036 100644 (file)
@@ -34,25 +34,79 @@ out_unlock:
 }
 
 static int ceph_readpages(struct file *file, struct address_space *mapping,
-                         struct list_head *pages, unsigned nr_pages)
+                         struct list_head *page_list, unsigned nr_pages)
 {
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
-       int err = 0;
+       int rc = 0;
+       struct page *page;
+       int contig_pages = 0;
+       int next_index;
+       struct ceph_osd_request *oreq;
+       struct pagevec pvec;
+       loff_t offset;
 
-       dout(10, "ceph_readpages inode %p file %p nr_pages %d\n",
+       dout(10, "readpages inode %p file %p nr_pages %d\n",
             inode, file, nr_pages);
 
-       err = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout,
-                                 pages, nr_pages);
-       if (err < 0)
-               goto out_unlock;
-
-out_unlock:
-       return err;
+       /* alloc request, w/ page vector */
+       oreq = ceph_osdc_alloc_request(nr_pages);
+       if (oreq == 0)
+               return -ENOMEM;
 
+       /* find adjacent pages */
+       BUG_ON(list_empty(page_list));
+       page = list_entry(page_list->prev, struct page, lru);
+       offset = page->index << PAGE_CACHE_SHIFT;
+       next_index = page->index;
+       contig_pages = 0;
+       list_for_each_entry_reverse(page, page_list, lru) {
+               if (page->index == next_index) {
+                       oreq->r_pages[contig_pages] = page;
+                       contig_pages++;
+                       next_index++;
+               } else
+                       break;
+       }
+       dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
+       if (contig_pages == 0)
+               return 0;
+       rc = ceph_osdc_readpages(osdc, mapping, ceph_ino(inode), &ci->i_layout,
+                                offset, contig_pages << PAGE_CACHE_SHIFT,
+                                oreq);
+       if (rc < 0)
+               goto out;
+       
+       /* set uptodate and add to lru in pagevec-sized chunks */
+       pagevec_init(&pvec, 0);
+       for (; rc > 0; rc -= PAGE_CACHE_SIZE) {
+               if (list_empty(&page_list))
+                       break;  /* WTF */
+               page = list_entry(page_list->prev, struct page, lru);
+               list_del(&page->lru);
+
+               if (add_to_page_cache(page, mapping, page->index,
+                                     GFP_KERNEL)) {
+                       page_cache_release(page);
+                       dout(20, "readpages add_to_page_cache failed on %p\n",
+                            page);
+                       continue;
+               }
+               dout(10, "readpages adding page %p\n", page);
+               flush_dcache_page(page);
+               SetPageUptodate(page);
+               unlock_page(page);
+               if (pagevec_add(&pvec, page) == 0)
+                       pagevec_lru_add(&pvec);
+       }
+       pagevec_lru_add(&pvec);
 
+out:
+       ceph_osdc_put_request(oreq);
+       if (rc < 0)
+               return rc;
+       return 0;
 }
 
 
@@ -507,7 +561,7 @@ const struct address_space_operations ceph_aops = {
        //.prepare_write = ceph_prepare_write,
        //.commit_write = ceph_commit_write,
        .readpage = ceph_readpage,
-       //.readpages = ceph_readpages,
+       .readpages = ceph_readpages,
        .writepage = ceph_writepage,
        .writepages = ceph_writepages,
 //     .set_page_dirty = ceph_set_page_dirty,
index 3831a6245d965d597bb3a24d3da12fedf2127e11..9951ca7de6ad047ee63d32fd68030b6a4ba34b50 100644 (file)
@@ -183,20 +183,22 @@ ssize_t ceph_read(struct file *filp, char __user *buf, size_t len, loff_t *ppos)
        ssize_t ret;
        int got = 0;
 
-       dout(10, "read trying to get caps\n");
+       dout(10, "read %llx %llu~%u trying to get caps\n",
+            ceph_ino(inode), *ppos, (unsigned)len);
        ret = wait_event_interruptible(ci->i_cap_wq,
                                       ceph_get_cap_refs(ci, CEPH_CAP_RD, 
                                                         CEPH_CAP_RDCACHE, 
                                                         &got));
        if (ret < 0) 
                goto out;
-       dout(10, "read got cap refs on %d\n", got);
+       dout(10, "read %llx %llu~%u got cap refs on %d\n",
+            ceph_ino(inode), *ppos, (unsigned)len, got);
 
        //if (got & CEPH_CAP_RDCACHE) {
        ret = do_sync_read(filp, buf, len, ppos);
 
 out:
-       dout(10, "read dropping cap refs on %d\n", got);
+       dout(10, "read %llx dropping cap refs on %d\n", ceph_ino(inode), got);
        ceph_put_cap_refs(ci, got);
        return ret;
 }
index e4b62b85b747243934dce2ce1d2ed66129468e05..3e0cf3d414e96fbd6baf2ac3b927e3e125806d88 100644 (file)
@@ -137,7 +137,7 @@ static void get_request(struct ceph_osd_request *req)
        atomic_inc(&req->r_ref);
 }
 
-static void put_request(struct ceph_osd_request *req)
+void ceph_osdc_put_request(struct ceph_osd_request *req)
 {
        BUG_ON(atomic_read(&req->r_ref) <= 0);
        if (atomic_dec_and_test(&req->r_ref)) {
@@ -165,7 +165,7 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op)
        return req;
 }
 
-struct ceph_osd_request *alloc_request(int nr_pages)
+struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages)
 {
        struct ceph_osd_request *req;
 
@@ -243,7 +243,7 @@ static void unregister_request(struct ceph_osd_client *osdc,
 {
        dout(30, "unregister_request %p tid %lld\n", req, req->r_tid);
        radix_tree_delete(&osdc->request_tree, req->r_tid);
-       put_request(req);
+       ceph_osdc_put_request(req);
 }
 
 /*
@@ -279,7 +279,7 @@ void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        req->r_flags |= rhead->flags;
        spin_unlock(&osdc->lock);
        complete(&req->r_completion);
-       put_request(req);
+       ceph_osdc_put_request(req);
 }
 
 
@@ -400,19 +400,13 @@ int ceph_osdc_readpages_filler(void *data, struct page *page)
 
 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct address_space *mapping,
-                       ceph_ino_t ino,
-                       struct ceph_file_layout *layout, 
-                       struct list_head *pagels, int nr_pages)
+                       ceph_ino_t ino, struct ceph_file_layout *layout, 
+                       __u64 off, __u64 len,
+                       struct ceph_osd_request *req)
 {
        struct ceph_msg *reqm, *reply;
        struct ceph_osd_request_head *reqhead;
-       struct ceph_osd_request *req;
        struct ceph_osd_reply_head *replyhead;
-       __u64 off, len;
-       struct ceph_readdesc desc = {
-               .osdc = osdc,
-               .layout = layout
-       };
 
        /*
         * for now, our strategy is simple: start with the 
@@ -421,7 +415,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
         * nr_pages.
         */
 
-       dout(10, "readpage on ino %llx at %lld~%lld\n", ino, off, len);
+       dout(10, "readpages on ino %llx on %llu~%llu\n", ino, off, len);
 
        /* request msg */
        reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
@@ -430,46 +424,35 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
        reqhead = reqm->front.iov_base;
        reqhead->oid.ino = ino;
        reqhead->oid.rev = 0;
-       off = list_to_page(pagels)->index << PAGE_SHIFT;
-       len = nr_pages << PAGE_SHIFT;
+       
        calc_file_object_mapping(layout, &off, &len, &reqhead->oid,
                                 &reqhead->offset, &reqhead->length);
-       BUG_ON(len != 0);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 19)
-       nr_pages = DIV_ROUND_UP(reqhead->length, PAGE_SIZE);
-#else
-       nr_pages = (reqhead->length + PAGE_SIZE - 1) / PAGE_SIZE;
-#endif
-       calc_object_layout(&reqhead->layout, &reqhead->oid, layout, osdc->osdmap);
-       dout(10, "readpage object block %u %llu~%llu\n", reqhead->oid.bno, reqhead->offset, reqhead->length);
+
+       req->r_nr_pages = ((reqhead->offset+reqhead->length+PAGE_CACHE_SIZE-1)
+                  >> PAGE_CACHE_SHIFT) - (reqhead->offset >> PAGE_CACHE_SHIFT);
+       calc_object_layout(&reqhead->layout, &reqhead->oid, layout, 
+                          osdc->osdmap);
+       dout(10, "readpages object block %u of %llu~%llu\n", reqhead->oid.bno, 
+            reqhead->offset, reqhead->length);
        
        /* register request */
-       req = alloc_request(nr_pages);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-
        spin_lock(&osdc->lock);
-       req = register_request(osdc, reqm, nr_pages, req);
+       req = register_request(osdc, reqm, req->r_nr_pages, req);
        if (IS_ERR(req)) {
                ceph_msg_put(reqm);
                spin_unlock(&osdc->lock);
                return PTR_ERR(req);
        }
 
-       /* prepage pages (add to page cache, request vector) */
-       desc.off = off;
-       desc.len = len;
-       read_cache_pages(mapping, pagels, ceph_osdc_readpages_filler, &desc);
-
        /* send request */
        reqhead->osdmap_epoch = osdc->osdmap->epoch;
        send_request(osdc, req);
        spin_unlock(&osdc->lock);
        
        /* wait */
-       dout(10, "readpage tid %llu waiting for reply on %p\n", req->r_tid, req);
+       dout(10, "readpages tid %llu waiting on %p\n", req->r_tid, req);
        wait_for_completion(&req->r_completion);
-       dout(10, "readpage tid %llu got reply on %p\n", req->r_tid, req);
+       dout(10, "readpages tid %llu got reply on %p\n", req->r_tid, req);
 
        spin_lock(&osdc->lock);
        unregister_request(osdc, req);
@@ -477,9 +460,11 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        reply = req->r_reply;
        replyhead = reply->front.iov_base;
-       dout(10, "readpage result %d\n", replyhead->result);
-       put_request(req);
-       return 0;
+       dout(10, "readpages result %d, read %d bytes\n", replyhead->result,
+               reply->hdr.data_len);
+       if (replyhead->result == 0)
+               return reply->hdr.data_len;
+       return replyhead->result;
 }
 
 
@@ -520,7 +505,7 @@ int ceph_osdc_silly_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
        nrp = calc_pages_for(len, off);
        dout(10, "%d~%d -> %d pages\n", (int)off, (int)len, nrp);
 
-       req = alloc_request(nrp);
+       req = ceph_osdc_alloc_request(nrp);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -566,7 +551,7 @@ int ceph_osdc_silly_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
        reply = req->r_reply;
        replyhead = reply->front.iov_base;
        dout(10, "silly_write result %d, returning %d\n", replyhead->result, (int)len);
-       put_request(req);
+       ceph_osdc_put_request(req);
        return (int)len;
 }
 
@@ -671,7 +656,7 @@ static int ceph_readpage_async(struct ceph_osd_client *osdc,
 
        reqhead = reqm->front.iov_base;
 
-       req = alloc_request(1);
+       req = ceph_osdc_alloc_request(1);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
@@ -700,7 +685,7 @@ static int ceph_readpage_async(struct ceph_osd_client *osdc,
        reply = req->r_reply;
        replyhead = reply->front.iov_base;
        dout(10, "readpage result %d\n", replyhead->result);
-       put_request(req);
+       ceph_osdc_put_request(req);
 
        return 0;
 }
@@ -724,7 +709,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino,
        reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
-       req = alloc_request(nr_pages);
+       req = ceph_osdc_alloc_request(nr_pages);
        if (IS_ERR(req)) {
                ceph_msg_put(reqm);
                return PTR_ERR(req);
@@ -754,7 +739,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino,
        req = register_request(osdc, reqm, nr_pages, req);
        if (IS_ERR(req)) {
                ceph_msg_put(reqm);
-               put_request(req);
+               ceph_osdc_put_request(req);
                spin_unlock(&osdc->lock);
                return PTR_ERR(req);
        }
@@ -785,7 +770,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, ceph_ino_t ino,
        replyhead = reply->front.iov_base;
        ret = le32_to_cpu(replyhead->result);
        dout(10, "writepages result %d\n", ret);
-       put_request(req);
+       ceph_osdc_put_request(req);
        
        /* return error, or number of bytes written */
        if (ret < 0)
index 223a1347a09a80c7bf2c1d3d0a3fac7e40f6652a..0b8b6df7b97c916ca016b78c45af65fad444d803 100644 (file)
@@ -62,10 +62,10 @@ extern int ceph_osdc_readpage(struct ceph_osd_client *osdc, ceph_ino_t ino,
                              loff_t off, loff_t len,
                              struct page *page);
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
-                       struct address_space *mapping,
-                       ceph_ino_t ino,
-                       struct ceph_file_layout *layout,
-                       struct list_head *pagels, int nr_pages);
+                              struct address_space *mapping,
+                              ceph_ino_t ino, struct ceph_file_layout *layout,
+                              __u64 off, __u64 len,
+                              struct ceph_osd_request *req);
 extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc, ceph_ino_t ino,
                              struct ceph_file_layout *layout, 
                              loff_t off, loff_t len,
@@ -96,5 +96,8 @@ extern int ceph_osdc_writepage(struct ceph_osd_client *osdc, ceph_ino_t ino,
                                 loff_t off, loff_t len,
                                 struct page *page);
 
+extern struct ceph_osd_request *ceph_osdc_alloc_request(int nr_pages);
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
+
 #endif