}
-static int ceph_readpage(struct file *filp, struct page *page)
+static int readpage_nounlock(struct file *filp, struct page *page)
{
struct inode *inode = filp->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
page->index << PAGE_SHIFT, PAGE_SIZE, page);
if (unlikely(err < 0))
goto out;
-
if (unlikely(err < PAGE_CACHE_SIZE)) {
void *kaddr = kmap_atomic(page, KM_USER0);
dout(10, "readpage zeroing tail %d bytes of page %p\n",
SetPageUptodate(page);
out:
- unlock_page(page);
return err;
}
+static int ceph_readpage(struct file *filp, struct page *page)
+{
+ int r = readpage_nounlock(filp, page);
+ unlock_page(page);
+ return r;
+}
+
static int ceph_readpages(struct file *file, struct address_space *mapping,
struct list_head *page_list, unsigned nr_pages)
{
list_for_each(p, &ci->i_cap_snaps) {
capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
- dout(20, " cap_snap %p has %d dirty pages\n", capsnap,
- capsnap->dirty);
+ dout(20, " cap_snap %p snapc %p has %d dirty pages\n", capsnap,
+ capsnap->context, capsnap->dirty);
if (capsnap->dirty)
break;
}
snapc = ceph_get_snap_context(capsnap->context);
} else if (ci->i_snap_realm) {
snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
- dout(20, " head has %d dirty pages\n", ci->i_wrbuffer_ref_head);
+ dout(20, " head snapc %p has %d dirty pages\n",
+ snapc, ci->i_wrbuffer_ref_head);
}
-
return snapc;
}
dout(20, " no snap context with dirty data?\n");
goto out;
}
- dout(20, " snapc is %p seq %lld (%d snaps)\n",
+ dout(20, " oldest snapc is %p seq %lld (%d snaps)\n",
snapc, snapc->seq, snapc->num_snaps);
while (!done && index <= end) {
{
struct inode *inode = file->f_dentry->d_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
struct page *page;
pgoff_t index = pos >> PAGE_CACHE_SHIFT;
dout(10, "write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len);
+retry_locked:
/* writepages currently holds page lock, but if we change that later, */
wait_on_page_writeback(page);
if (page->private &&
(void *)page->private != ci->i_snap_realm->cached_context) {
snapc = get_oldest_context(inode);
- if (snapc == (void *)page->private) {
- /* yay, writeable, do it now */
- dout(10, " page %p snapc %p not current, but oldest\n",
- page, snapc);
- r = writepage_nounlock(page, 0);
- if (r < 0)
- goto fail;
- } else {
+ up_read(&mdsc->snap_rwsem);
+ if (snapc != (void *)page->private) {
dout(10, " page %p snapc %p not current or oldest\n",
page, (void *)page->private);
/* queue for writeback, and wait */
context_is_writeable(inode, snapc));
ceph_put_snap_context(snapc);
if (r < 0)
- return r; /* FIXME? */
+ goto fail_nosnap;
goto retry;
}
+
+ /* yay, writeable, do it now */
+ dout(10, " page %p snapc %p not current, but oldest\n",
+ page, snapc);
+ if (!clear_page_dirty_for_io(page))
+ goto retry_locked;
+ r = writepage_nounlock(page, 0);
+ if (r < 0)
+ goto fail_nosnap;
+ goto retry_locked;
}
if (PageUptodate(page))
}
/* we need to read it. */
- /* or, do sub-page granularity dirty accounting? */
- /* try to read the full page */
- r = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
- page_off, PAGE_SIZE, page);
+ up_read(&mdsc->snap_rwsem);
+ r = readpage_nounlock(file, page);
if (r < 0)
goto fail;
- if (r < pos_in_page) {
- void *kaddr = kmap_atomic(page, KM_USER1);
- dout(20, "write_begin zeroing pre %d~%d\n", r, pos_in_page-r);
- memset(kaddr+r, 0, pos_in_page-r);
- flush_dcache_page(page);
- kunmap_atomic(kaddr, KM_USER1);
- }
- end_in_page = pos_in_page + len;
- if (end_in_page < PAGE_SIZE && r < PAGE_SIZE) {
- void *kaddr = kmap_atomic(page, KM_USER1);
- dout(20, "write_begin zeroing post %d~%d\n", end_in_page,
- (int)PAGE_SIZE - end_in_page);
- memset(kaddr+end_in_page, 0, PAGE_SIZE-end_in_page);
- flush_dcache_page(page);
- kunmap_atomic(kaddr, KM_USER1);
- }
- return 0;
+ goto retry_locked;
fail:
- unlock_page(page);
up_read(&mdsc->snap_rwsem);
+fail_nosnap:
+ unlock_page(page);
return r;
}
int mds;
u64 follows = 0;
struct list_head *p;
- struct ceph_cap_snap *snapcap;
+ struct ceph_cap_snap *capsnap;
int issued;
u64 size;
struct timespec mtime, atime, ctime;
dout(10, "__flush_snaps %p\n", inode);
retry:
list_for_each(p, &ci->i_cap_snaps) {
- snapcap = list_entry(p, struct ceph_cap_snap, ci_item);
- if (snapcap->follows <= follows)
+ capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+ if (capsnap->follows <= follows)
continue;
- if (snapcap->dirty || snapcap->writing)
+ if (capsnap->dirty || capsnap->writing)
continue;
/* pick mds, take s_mutex */
goto retry;
}
- follows = snapcap->follows;
- size = snapcap->size;
- atime = snapcap->atime;
- mtime = snapcap->mtime;
- ctime = snapcap->ctime;
- issued = snapcap->issued;
+ follows = capsnap->follows;
+ size = capsnap->size;
+ atime = capsnap->atime;
+ mtime = capsnap->mtime;
+ ctime = capsnap->ctime;
+ issued = capsnap->issued;
spin_unlock(&inode->i_lock);
+ dout(10, "flush_snaps cap_snap %p follows %lld size %llu\n",
+ capsnap, follows, size);
+
send_cap(mdsc, ceph_vino(inode).ino,
CEPH_CAP_OP_FLUSHSNAP, issued, 0, 0, mseq,
size, 0, &mtime, &atime, 0,
}
}
BUG_ON(!capsnap);
- dout(30, "put_wrbuffer_cap_refs on %p snap %lld %d/%d -> %d/%d"
- " %s/%s\n", inode, capsnap->context->seq,
+ dout(30, "put_wrbuffer_cap_refs on %p cap_snap %p "
+ " snap %lld %d/%d -> %d/%d %s%s\n",
+ inode, capsnap, capsnap->context->seq,
ci->i_wrbuffer_ref+nr, capsnap->dirty + nr,
ci->i_wrbuffer_ref, capsnap->dirty,
- last ? " wrbuffer last,":"",
- last_snap ? " capsnap last":"");
+ last ? " (wrbuffer last)":"",
+ last_snap ? " (capsnap last)":"");
}
spin_unlock(&inode->i_lock);
struct list_head *p;
struct ceph_cap_snap *capsnap;
- dout(10, "handle_cap_flushednsap inode %p ci %p mds%d follows %lld\n",
+ dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d follows %lld\n",
inode, ci, session->s_mds, follows);
spin_lock(&inode->i_lock);
capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
if (capsnap->follows == follows) {
WARN_ON(capsnap->dirty);
- dout(10, " removing capsnap %p follows %lld\n",
+ dout(10, " removing cap_snap %p follows %lld\n",
capsnap, follows);
ceph_put_snap_context(capsnap->context);
list_del(&capsnap->ci_item);
if (num) {
*dst = kmalloc(sizeof(u64) * num, GFP_NOFS);
if (!*dst)
- return -1;
+ return -ENOMEM;
for (i = 0; i < num; i++)
(*dst)[i] = le64_to_cpu(src[i]);
} else
capsnap->atime = inode->i_atime;
capsnap->ctime = inode->i_ctime;
if (used & CEPH_CAP_WRBUFFER) {
- dout(10, "queue_cap_snap %p snapc %p %llu used %d,"
- " WRBUFFER, delaying\n", inode, capsnap->context,
+ dout(10, "finish_cap_snap %p cap_snap %p snapc %p %llu used %d,"
+ " WRBUFFER, delaying\n", inode, capsnap, capsnap->context,
capsnap->context->seq, used);
} else {
BUG_ON(ci->i_wrbuffer_ref_head);
list_add(&capsnap->ci_item, &ci->i_cap_snaps);
if (used & CEPH_CAP_WR) {
- dout(10, "queue_cap_snap %p snapc %p seq %llu used WR,"
- " now pending\n", inode, snapc, snapc->seq);
+ dout(10, "queue_cap_snap %p cap_snap %p snapc %p"
+ " seq %llu used WR, now pending\n", inode,
+ capsnap, snapc, snapc->seq);
capsnap->writing = 1;
} else {
__ceph_finish_cap_snap(ci, capsnap, used);