]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: carry CAP_WRBUFFER until writeback completes
authorSage Weil <sage@newdream.net>
Mon, 7 Apr 2008 23:13:16 +0000 (16:13 -0700)
committerSage Weil <sage@newdream.net>
Mon, 7 Apr 2008 23:24:49 +0000 (16:24 -0700)
src/kernel/addr.c
src/kernel/inode.c
src/kernel/super.c
src/kernel/super.h

index 5fd159a52230361c7b78860a3d865310e7716c3f..7f08bd7d64b9a07bc5acc270b63994bdf3f2b68c 100644 (file)
@@ -108,6 +108,7 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
        int len = PAGE_CACHE_SIZE;
        loff_t i_size;
        int err = 0;
+       int was_dirty;
        
        if (!page->mapping || !page->mapping->host)
                return -EFAULT;
@@ -124,14 +125,17 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
             inode, page, page->index, page_off, len);
        
        page_cache_get(page);
+       was_dirty = PageDirty(page);
        set_page_writeback(page);
        kmap(page);
        err = ceph_osdc_writepages(osdc, ceph_ino(inode), &ci->i_layout,
                                   page_off, len, &page, 1);
        kunmap(page);
-       if (err >= 0)
+       if (err >= 0) {
+               if (was_dirty)
+                       ceph_put_wrbuffer_cap_refs(ci, 1);
                SetPageUptodate(page);
-       else
+       else
                redirty_page_for_writepage(wbc, page);  /* is this right?? */
        unlock_page(page);
        end_page_writeback(page);
@@ -222,11 +226,14 @@ retry:
                                                want);
                for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
                        page = pvec.pages[i];
+
                        if (locked_pages == 0) 
                                lock_page(page);
                        else if (TestSetPageLocked(page))
                                break;
-                       if (unlikely(page->mapping != mapping)) {
+                       /* only dirty pages, or wrbuffer accounting breaks! */
+                       if (unlikely(!PageDirty(page)) ||
+                           unlikely(page->mapping != mapping)) {
                                unlock_page(page);
                                break;
                        }
@@ -242,6 +249,7 @@ retry:
                        }
                        if (wbc->sync_mode != WB_SYNC_NONE)
                                wait_on_page_writeback(page);
+
                        if (PageWriteback(page) ||
                            !clear_page_dirty_for_io(page)) {
                                unlock_page(page);
@@ -250,15 +258,17 @@ retry:
 
                        /* ok */
                        set_page_writeback(page);
+
                        if (page_offset(page) >= i_size_read(inode)) {
                                done = 1;
                                unlock_page(page);
                                end_page_writeback(page);
                                break;
                        }
-
+                       
                        dout(50, "writepages locked page %p index %lu\n",
                             page, page->index);
+
                        kmap(page);
                        if (pages)
                                pages[locked_pages] = page;
@@ -295,6 +305,7 @@ retry:
                        loff_t offset = pagep[0]->index << PAGE_CACHE_SHIFT;
                        loff_t len = min(i_size_read(inode) - offset,
                                 (loff_t)locked_pages << PAGE_CACHE_SHIFT);
+                       unsigned wrote;
                        dout(10, "writepages got %d pages at %llu~%llu\n",
                             locked_pages, offset, len);
                        rc = ceph_osdc_writepages(&client->osdc, 
@@ -303,22 +314,27 @@ retry:
                                                  offset, len,
                                                  pagep,
                                                  locked_pages);
-                       dout(20, "writepages rc %d\n", rc);
+                       if (rc >= 0)
+                               wrote = (rc + (offset & ~PAGE_CACHE_MASK) 
+                                        + ~PAGE_CACHE_MASK) 
+                                       >> PAGE_CACHE_SHIFT;
+                       else 
+                               wrote = 0;                              
+                       dout(20, "writepages rc %d wrote %d\n", rc, wrote);
                                                     
                        /* unmap+unlock pages */
-                       if (rc >= 0)
-                               rc += offset & ~PAGE_CACHE_MASK;
                        for (i = 0; i < locked_pages; i++) {
                                page = pagep[i];
-                               if (rc > (i << PAGE_CACHE_SHIFT))
+                               if (i < wrote)
                                        SetPageUptodate(page);
                                else if (rc < 0)
-                                       SetPageError(page);
+                                       redirty_page_for_writepage(wbc, page);
                                kunmap(page);
                                dout(50, "unlocking %d %p\n", i, page);
                                unlock_page(page);
                                end_page_writeback(page);
                        }                               
+                       ceph_put_wrbuffer_cap_refs(ci, wrote);
 
                        /* continue? */
                        index = next;
@@ -440,6 +456,10 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
        if (!PageUptodate(page))
                SetPageUptodate(page);
 
+       if (!PageDirty(page))
+               ceph_take_cap_refs(ceph_inode(inode), CEPH_CAP_WRBUFFER);
+       else
+               dout(10, "page %p already dirty\n", page);
        set_page_dirty(page);
 
        unlock_page(page);
@@ -454,17 +474,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
  * page accounting
  */
 
-static int ceph_set_page_dirty(struct page *page)
-{
-       struct ceph_inode_info *ci = ceph_inode(page->mapping->host);
-       spin_lock(&ci->vfs_inode.i_lock);
-       dout(10, "set_page_dirty %p : %d -> %d \n", page,
-            ci->i_nr_dirty_pages, ci->i_nr_dirty_pages + 1);
-       ci->i_nr_dirty_pages++;
-       spin_lock(&ci->vfs_inode.i_lock);
-       return 0;
-}
-
+/*
 static int ceph_releasepage(struct page *page, gfp_t gfpmask)
 {
        struct ceph_inode_info *ci = ceph_inode(page->mapping->host);
@@ -479,7 +489,7 @@ static int ceph_releasepage(struct page *page, gfp_t gfpmask)
                ceph_check_caps_wanted(ci, gfpmask);
        return 0;
 }
-
+*/
 
 const struct address_space_operations ceph_aops = {
        .readpage = ceph_readpage,
@@ -487,7 +497,7 @@ const struct address_space_operations ceph_aops = {
        .writepage = ceph_writepage,
        .writepages = ceph_writepages,
        .write_begin = ceph_write_begin,
-       .write_end = simple_write_end, /*ceph_write_end,*/
+       .write_end = ceph_write_end,
 //     .set_page_dirty = ceph_set_page_dirty,
-       .releasepage = ceph_releasepage,
+       //.releasepage = ceph_releasepage,
 };
index 5bf736e8a937fb473b1b024b5615fcc2efb0bff6..744081c6c95c9300e500eb70428ef7bcb77afb0d 100644 (file)
@@ -994,6 +994,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
                ci->i_wrbuffer_ref++;
 }
 
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+       dout(20, "take_cap_refs on %p taking %d\n", &ci->vfs_inode, got);
+       spin_lock(&ci->vfs_inode.i_lock);
+       __take_cap_refs(ci, got);
+       spin_unlock(&ci->vfs_inode.i_lock);
+}      
+
 int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got)
 {
        int ret = 0;
@@ -1002,7 +1010,7 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got)
             need, want);
        spin_lock(&ci->vfs_inode.i_lock);
        have = __ceph_caps_issued(ci);
-       dout(10, "get_cap_refs have %d\n", have);
+       dout(20, "get_cap_refs have %d\n", have);
        if ((have & need) == need) {
                *got = need | (have & want);
                __take_cap_refs(ci, *got);
@@ -1029,14 +1037,34 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
                if (--ci->i_wr_ref == 0)
                        last++;
        if (had & CEPH_CAP_WRBUFFER)
-               if (--ci->i_wrbuffer_ref)
+               if (--ci->i_wrbuffer_ref == 0)
                        last++;
        spin_unlock(&ci->vfs_inode.i_lock);
 
+       dout(10, "put_cap_refs on %p had %d %s\n", &ci->vfs_inode, had,
+            last ? "last":"");
+       
        if (last) 
                ceph_check_caps_wanted(ci, GFP_KERNEL);
 }
 
+void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
+{
+       int last = 0;
+
+       spin_lock(&ci->vfs_inode.i_lock);
+       ci->i_wrbuffer_ref -= nr;
+       if (ci->i_wrbuffer_ref == 0)
+               last++;
+       BUG_ON(ci->i_wrbuffer_ref < 0);
+       spin_unlock(&ci->vfs_inode.i_lock);
+       
+       dout(10, "put_wrbuffer_cap_refs on %p nr %d %s\n", &ci->vfs_inode, nr,
+            last ? "last":"");
+       
+       if (last) 
+               ceph_check_caps_wanted(ci, GFP_KERNEL);
+}
 
 
 /*
index 7c944dee01458e3713057f4ce789051de50c0171..657a0ee1c5f70eea49ef0e3493d64ae4ee669b4e 100644 (file)
@@ -146,7 +146,6 @@ static struct inode *ceph_alloc_inode(struct super_block *sb)
 
        ci->i_rd_ref = ci->i_rdcache_ref = 0;
        ci->i_wr_ref = ci->i_wrbuffer_ref = 0;
-       ci->i_nr_pages = ci->i_nr_dirty_pages = 0;
 
        ci->i_hashval = 0;
 
index 6aec8db8dd6d37b0e489e6096b62cf8552c6ca12..3cba259ada3dde9db8c1496797ed38e5aaff586c 100644 (file)
@@ -169,8 +169,6 @@ struct ceph_inode_info {
        /* held references to caps */
        int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wrbuffer_ref;
 
-       int i_nr_pages, i_nr_dirty_pages; // hrm!
-
        unsigned long i_hashval;
 
        struct inode vfs_inode; /* at end */
@@ -380,7 +378,9 @@ extern int ceph_handle_cap_trunc(struct inode *inode,
                                 struct ceph_mds_file_caps *grant,
                                 struct ceph_mds_session *session);
 extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got);
+extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
 extern void ceph_check_caps_wanted(struct ceph_inode_info *ci, gfp_t gfpmask);
 extern void ceph_get_mode(struct ceph_inode_info *ci, int mode);
 extern void ceph_put_mode(struct ceph_inode_info *ci, int mode);