was_dirty = PageDirty(page);
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout,
+ ci->i_snaprealm->cached_context,
page_off, len, &page, 1);
if (err >= 0) {
if (was_dirty) {
rc = ceph_osdc_writepages(&client->osdc,
ceph_vino(inode),
&ci->i_layout,
+ ci->i_snaprealm->cached_context,
offset, len,
pagep,
locked_pages);
struct page **pagep, void **fsdata)
{
struct inode *inode = file->f_dentry->d_inode;
- struct ceph_inode_info *ci;
+ struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
struct page *page;
pgoff_t index = pos >> PAGE_CACHE_SHIFT;
dout(10, "write_begin file %p inode %p page %p %d~%d\n", file,
inode, page, (int)pos, (int)len);
+ /* build snap context */
+ if (!ci->i_snaprealm->cached_context) {
+ r = ceph_snaprealm_build_context(ci->i_snaprealm);
+ if (r < 0)
+ return r;
+ }
+
if (PageUptodate(page))
return 0;
/* we need to read it. */
/* or, do sub-page granularity dirty accounting? */
/* try to read the full page */
- ci = ceph_inode(inode);
r = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
page_off, PAGE_SIZE, page);
if (r < 0)
ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
&ci->i_layout,
+ ci->i_snaprealm->cached_context,
pos, count, data);
if (ret > 0) {
pos += ret;
int ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session,
int fmode, unsigned issued,
- unsigned seq, unsigned mseq)
+ unsigned seq, unsigned mseq,
+ void *snapblob, int snapblob_len)
{
int mds = session->s_mds;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_inode_cap *cap, *new_cap = 0;
int i;
int is_new = 0;
+ struct ceph_snaprealm *realm = 0;
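+
+ /* decode any snap trace sent with the cap, so the inode can be attached to its realm below */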
+ if (snapblob_len)
+ realm = ceph_update_snap_trace(ceph_inode_to_client(inode),
+ snapblob, snapblob+snapblob_len,
+ 0);
+
dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
session->s_mds, issued, seq);
spin_lock(&inode->i_lock);
ci->i_cap_exporting_mds = -1;
}
}
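+ /* keep the realm reference if the inode isn't attached to a realm yet; otherwise drop it */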
+ if (realm && !ci->i_snaprealm) {
+ ci->i_snaprealm = realm;
+ list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps);
+ } else if (realm)
+ ceph_put_snaprealm(realm);
dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
inode, ci, mds, mseq);
}
- ceph_add_cap(inode, session, -1, issued, seq, mseq);
+ ceph_add_cap(inode, session, -1, issued, seq, mseq, 0, 0); /* FIXME */
}
if (err)
goto done;
if (result == 0) {
- /* snap trace? */
- if (rinfo->snapblob_len)
- ceph_update_snap_trace(mdsc->client, rinfo->snapblob,
- rinfo->snapblob+rinfo->snapblob_len, 0);
-
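+ /* the snap trace, if any, is now decoded inside ceph_add_cap */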
/* caps? */
if (req->r_expects_cap && req->r_last_inode) {
cap = le32_to_cpu(rinfo->head->file_caps);
err = ceph_add_cap(req->r_last_inode,
req->r_session,
req->r_fmode,
- cap, capseq, mseq);
+ cap, capseq, mseq,
+ rinfo->snapblob,
+ rinfo->snapblob_len);
if (err)
goto done;
}
}
}
-struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op)
+struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
+ struct ceph_snap_context *snapc)
{
struct ceph_msg *req;
struct ceph_osd_request_head *head;
+ size_t size = sizeof(struct ceph_osd_request_head);
- req = ceph_msg_new(CEPH_MSG_OSD_OP,
- sizeof(struct ceph_osd_request_head), 0, 0, 0);
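+ /* leave room for the snap list after the request head */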
+ if (snapc)
+ size += snapc->num_snaps * sizeof(u64);
+ req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0);
if (IS_ERR(req))
return req;
memset(req->front.iov_base, 0, req->front.iov_len);
head->client_inc = 1; /* always, for now. */
head->flags = 0;
+ /* snaps */
+ if (snapc) {
+ head->snap_seq = cpu_to_le64(snapc->seq);
+ head->num_snaps = cpu_to_le32(snapc->num_snaps);
+ memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
+ snapc->num_snaps*sizeof(u64));
+ dout(10, "snapc seq %lld %d snaps\n", snapc->seq,
+ snapc->num_snaps);
+ }
return req;
}
-static struct ceph_osd_request *alloc_request(int nr_pages,
+static struct ceph_osd_request *alloc_request(int num_pages,
struct ceph_msg *msg)
{
struct ceph_osd_request *req;
- req = kmalloc(sizeof(*req) + nr_pages*sizeof(void *), GFP_NOFS);
+ req = kmalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
if (req == NULL)
return ERR_PTR(-ENOMEM);
req->r_aborted = 0;
req->r_request = msg;
- req->r_nr_pages = nr_pages;
+ req->r_num_pages = num_pages;
atomic_set(&req->r_ref, 1);
memset(&req->r_last_osd, 0, sizeof(req->r_last_osd));
return req;
goto out;
}
dout(10, "prepare_pages tid %llu have %d pages, want %d\n",
- tid, req->r_nr_pages, want);
- if (likely(req->r_nr_pages >= want)) {
+ tid, req->r_num_pages, want);
+ if (likely(req->r_num_pages >= want)) {
m->pages = req->r_pages;
- m->nr_pages = req->r_nr_pages;
+ m->nr_pages = req->r_num_pages;
ceph_msg_get(m);
req->r_reply = m;
ret = 0; /* success */
{
struct ceph_msg *reqm;
struct ceph_osd_request *req;
- int nr_pages, i, po, left, l;
+ int num_pages, i, po, left, l;
__s32 rc;
dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
vino.snap, off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- nr_pages = calc_pages_for(off, len);
- req = alloc_request(nr_pages, reqm);
+ num_pages = calc_pages_for(off, len);
+ req = alloc_request(num_pages, reqm);
if (IS_ERR(req))
return PTR_ERR(req);
len = calc_layout(osdc, vino, layout, off, len, req);
- nr_pages = calc_pages_for(off, len); /* recalc */
- dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, nr_pages);
+ num_pages = calc_pages_for(off, len); /* recalc */
+ dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, num_pages);
/* allocate temp pages to hold data */
- for (i = 0; i < nr_pages; i++) {
+ for (i = 0; i < num_pages; i++) {
req->r_pages[i] = alloc_page(GFP_NOFS);
if (req->r_pages[i] == NULL) {
- req->r_nr_pages = i+1;
+ req->r_num_pages = i+1;
put_request(req);
return -ENOMEM;
}
}
- reqm->nr_pages = nr_pages;
+ reqm->nr_pages = num_pages;
reqm->pages = req->r_pages;
reqm->hdr.data_len = cpu_to_le32(len);
reqm->hdr.data_off = cpu_to_le32(off);
vino.snap, off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
reqhead = reqm->front.iov_base;
struct address_space *mapping,
struct ceph_vino vino, struct ceph_file_layout *layout,
__u64 off, __u64 len,
- struct list_head *page_list, int nr_pages)
+ struct list_head *page_list, int num_pages)
{
struct ceph_msg *reqm;
struct ceph_osd_request *req;
* for now, our strategy is simple: start with the
* initial page, and fetch as much of that object as
* we can that falls within the range specified by
- * nr_pages.
+ * num_pages.
*/
dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
vino.snap, off, len);
/* alloc request, w/ optimistically-sized page vector */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- req = alloc_request(nr_pages, reqm);
+ req = alloc_request(num_pages, reqm);
if (req == 0) {
ceph_msg_put(reqm);
return -ENOMEM;
} else
break;
}
- dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
+ dout(10, "readpages found %d/%d contig\n", contig_pages, num_pages);
if (contig_pages == 0)
goto out;
len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
/* request msg */
len = calc_layout(osdc, vino, layout, off, len, req);
- req->r_nr_pages = calc_pages_for(off, len);
+ req->r_num_pages = calc_pages_for(off, len);
dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
- off, len, req->r_nr_pages);
+ off, len, req->r_num_pages);
rc = do_request(osdc, req);
out:
*/
int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
+ struct ceph_snap_context *snapc,
__u64 off, __u64 len, const char __user *data)
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
struct ceph_osd_request *req;
- int nr_pages, i, po, l, left;
+ int num_pages, i, po, l, left;
__s32 rc;
dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
vino.snap, off, len);
/* request msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
reqhead = reqm->front.iov_base;
reqhead->flags = CEPH_OSD_OP_ACK; /* just ack.. FIXME */
/* how many pages? */
- nr_pages = calc_pages_for(off, len);
- req = alloc_request(nr_pages, reqm);
+ num_pages = calc_pages_for(off, len);
+ req = alloc_request(num_pages, reqm);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
}
len = calc_layout(osdc, vino, layout, off, len, req);
- nr_pages = calc_pages_for(off, len); /* recalc */
- dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages);
+ num_pages = calc_pages_for(off, len); /* recalc */
+ dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, num_pages);
/* copy data into a set of pages */
left = len;
po = off & ~PAGE_MASK;
rc = -EFAULT;
- for (i = 0; i < nr_pages; i++) {
+ for (i = 0; i < num_pages; i++) {
int bad;
req->r_pages[i] = alloc_page(GFP_NOFS);
if (req->r_pages[i] == NULL) {
- req->r_nr_pages = i+1;
+ req->r_num_pages = i+1;
put_request(req);
return -ENOMEM;
}
}
}
reqm->pages = req->r_pages;
- reqm->nr_pages = nr_pages;
+ reqm->nr_pages = num_pages;
reqm->hdr.data_len = cpu_to_le32(len);
reqm->hdr.data_off = cpu_to_le32(off);
*/
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
struct ceph_file_layout *layout,
+ struct ceph_snap_context *snapc,
loff_t off, loff_t len,
- struct page **pages, int nr_pages)
+ struct page **pages, int num_pages)
{
struct ceph_msg *reqm;
struct ceph_osd_request_head *reqhead;
BUG_ON(vino.snap != CEPH_NOSNAP);
/* request + msg */
- reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
+ reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
if (IS_ERR(reqm))
return PTR_ERR(reqm);
- req = alloc_request(nr_pages, reqm);
+ req = alloc_request(num_pages, reqm);
if (IS_ERR(req)) {
ceph_msg_put(reqm);
return PTR_ERR(req);
reqhead->flags = CEPH_OSD_OP_SAFE;
len = calc_layout(osdc, vino, layout, off, len, req);
- nr_pages = calc_pages_for(off, len);
- dout(10, "writepages %llu~%llu -> %d pages\n", off, len, nr_pages);
-
+ num_pages = calc_pages_for(off, len);
+ dout(10, "writepages %llu~%llu -> %d pages\n", off, len, num_pages);
+
/* copy pages */
- memcpy(req->r_pages, pages, nr_pages * sizeof(struct page *));
+ memcpy(req->r_pages, pages, num_pages * sizeof(struct page *));
reqm->pages = req->r_pages;
- reqm->nr_pages = req->r_nr_pages = nr_pages;
+ reqm->nr_pages = req->r_num_pages = num_pages;
reqm->hdr.data_len = len;
reqm->hdr.data_off = off;
#include "osdmap.h"
struct ceph_msg;
+struct ceph_snap_context;
/*
* pending request
int r_result;
atomic_t r_ref;
struct completion r_completion; /* on ack or commit or read? */
- unsigned r_nr_pages; /* size of page array (follows) */
+ unsigned r_num_pages; /* size of page array (follows) */
struct page *r_pages[0]; /* pages for data payload */
};
struct ceph_file_layout *layout,
__u64 off, __u64 len,
struct list_head *page_list, int nr_pages);
-extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page);
-extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page);
+
extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
+ struct ceph_snap_context *sc,
loff_t off, loff_t len,
struct page **pagevec, int nr_pages);
extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
struct ceph_vino vino,
struct ceph_file_layout *layout,
+ struct ceph_snap_context *sc,
__u64 off, __u64 len,
const char __user *data);
-extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page);
-extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page);
-extern int ceph_osdc_writepage(struct ceph_osd_client *osdc,
- struct ceph_vino vino,
- struct ceph_file_layout *layout,
- loff_t off, loff_t len,
- struct page *page);
-
#endif
if (realm->nref == 0) {
kfree(realm->prior_parent_snaps);
kfree(realm->snaps);
- kfree(realm->cached_snaps);
+ ceph_put_snap_context(realm->cached_context);
kfree(realm);
}
}
int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
{
struct ceph_snaprealm *parent = realm->parent;
+ struct ceph_snap_context *sc;
int err = 0;
int i;
int num = realm->num_prior_parent_snaps + realm->num_snaps;
if (parent) {
- if (!parent->cached_seq) {
+ if (!parent->cached_context) {
err = ceph_snaprealm_build_context(parent);
if (err)
goto fail;
}
- num += parent->num_cached_snaps; /* possible overestimate */
+ num += parent->cached_context->num_snaps;
}
- if (realm->cached_snaps)
- kfree(realm->cached_snaps);
+ if (realm->cached_context)
+ ceph_put_snap_context(realm->cached_context);
err = -ENOMEM;
- realm->cached_snaps = kmalloc(num * sizeof(u64), GFP_NOFS);
- if (!realm->cached_snaps)
+ realm->cached_context = sc = kzalloc(sizeof(*sc) + num*sizeof(u64),
+ GFP_NOFS);
+ if (!realm->cached_context)
goto fail;
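+ /* the realm holds the initial reference on its cached context */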
+ atomic_set(&sc->nref, 1);
/* build (reverse sorted) snap vector */
num = 0;
- realm->cached_seq = realm->seq;
+ sc->seq = realm->seq;
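+ /* include parent snaps newer than our parent_since; inherit the parent's seq if larger */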
if (parent) {
- for (i = 0; i < parent->num_cached_snaps; i++)
- if (parent->cached_snaps[i] >= realm->parent_since)
- realm->cached_snaps[num++] =
- parent->cached_snaps[i];
- if (parent->cached_seq > realm->cached_seq)
- realm->cached_seq = parent->cached_seq;
+ for (i = 0; i < parent->cached_context->num_snaps; i++)
+ if (parent->cached_context->snaps[i] >=
+ realm->parent_since)
+ sc->snaps[num++] =
+ parent->cached_context->snaps[i];
+ if (parent->cached_context->seq > sc->seq)
+ sc->seq = parent->cached_context->seq;
}
- memcpy(realm->cached_snaps + num, realm->snaps,
+ memcpy(sc->snaps + num, realm->snaps,
sizeof(u64)*realm->num_snaps);
num += realm->num_snaps;
- memcpy(realm->cached_snaps + num, realm->prior_parent_snaps,
+ memcpy(sc->snaps + num, realm->prior_parent_snaps,
sizeof(u64)*realm->num_prior_parent_snaps);
num += realm->num_prior_parent_snaps;
- sort(realm->cached_snaps, num, sizeof(u64), cmpu64_rev, NULL);
- realm->num_cached_snaps = num;
+ sort(sc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
+ sc->num_snaps = num;
dout(10, "snaprealm_build_context %llx %p : seq %lld %d snaps\n",
- realm->ino, realm, realm->cached_seq, realm->num_cached_snaps);
+ realm->ino, realm, sc->seq, sc->num_snaps);
return 0;
fail:
- if (realm->cached_snaps) {
- kfree(realm->cached_snaps);
- realm->cached_snaps = 0;
+ if (realm->cached_context) {
+ ceph_put_snap_context(realm->cached_context);
+ realm->cached_context = 0;
}
derr(0, "snaprealm_build_context %llx %p fail %d\n", realm->ino,
realm, err);
struct ceph_snaprealm *child;
dout(10, "invalidate_snaprealm %llx %p\n", realm->ino, realm);
- realm->cached_seq = 0;
-
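+ /* drop the cached context; it is rebuilt on demand */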
+ if (realm->cached_context) {
+ ceph_put_snap_context(realm->cached_context);
+ realm->cached_context = 0;
+ }
+
list_for_each(p, &realm->children) {
child = list_entry(p, struct ceph_snaprealm, child_item);
ceph_invalidate_snaprealm(child);
static int dup_array(u64 **dst, u64 *src, int num)
{
+ int i;
+
if (*dst)
kfree(*dst);
if (num) {
*dst = kmalloc(sizeof(u64) * num, GFP_NOFS);
if (!*dst)
return -1;
- memcpy(*dst, src, sizeof(u64) * num);
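+ /* values are little-endian on the wire; convert while copying */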
+ for (i = 0; i < num; i++)
+ (*dst)[i] = le64_to_cpu(src[i]);
} else
*dst = 0;
return 0;
}
-u64 ceph_update_snap_trace(struct ceph_client *client,
- void *p, void *e, int must_flush)
+struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
+ void *p, void *e, int must_flush)
{
struct ceph_mds_snap_realm *ri;
int err = -ENOMEM;
- u64 first = 0;
u64 *snaps;
u64 *prior_parent_snaps;
- struct ceph_snaprealm *realm;
+ struct ceph_snaprealm *realm, *first = 0;
int invalidate;
more:
realm = ceph_get_snaprealm(client, le64_to_cpu(ri->ino));
if (!realm)
goto fail;
- if (!first)
- first = realm->ino;
+ if (!first) {
+ first = realm;
+ realm->nref++;
+ }
if (le64_to_cpu(ri->seq) > realm->seq) {
dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
ci->i_hold_caps_until = 0;
INIT_LIST_HEAD(&ci->i_cap_delay_list);
+ ci->i_snaprealm = 0;
+
ci->i_hashval = 0;
INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
int i_rd_ref, i_rdcache_ref, i_wr_ref;
atomic_t i_wrbuffer_ref;
- struct ceph_snaprealm *snaprealm;
- struct list_head snaprealm_item;
+ struct ceph_snaprealm *i_snaprealm;
+ struct list_head i_snaprealm_item;
struct work_struct i_wb_work; /* writeback work */
* snapshots
*/
+struct ceph_snap_context {
+ atomic_t nref; /* reference count */
+ u64 seq; /* highest snap seq covered by this context */
+ int num_snaps; /* number of entries in snaps[] */
+ u64 snaps[]; /* snap ids, in descending order */
+};
+
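+/* drop a reference to a snap context; it is freed when the last reference goes away */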
+static inline void ceph_put_snap_context(struct ceph_snap_context *sc)
+{
+ if (!sc)
+ return;
+ if (atomic_dec_and_test(&sc->nref))
+ kfree(sc);
+}
+
struct ceph_snaprealm {
u64 ino;
int nref;
struct list_head child_item;
struct list_head children;
- /* cached snap context */
- u64 cached_seq; /* 0 => invalidated */
- u64 *cached_snaps;
- int num_cached_snaps;
+ struct ceph_snap_context *cached_context; /* 0 => invalidated */
struct list_head inodes_with_caps;
};
extern void ceph_put_snaprealm(struct ceph_snaprealm *realm);
extern int ceph_adjust_snaprealm_parent(struct ceph_client *client,
struct ceph_snaprealm *realm, u64 p);
-extern u64 ceph_update_snap_trace(struct ceph_client *client,
- void *p, void *e, int must_flush);
+extern struct ceph_snaprealm *ceph_update_snap_trace(
+ struct ceph_client *client,
+ void *p, void *e, int must_flush);
extern int ceph_snaprealm_build_context(struct ceph_snaprealm *realm);
extern void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm);
extern int ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session,
int fmode, unsigned issued,
- unsigned cap, unsigned seq);
+ unsigned cap, unsigned seq,
+ void *snapblob, int snapblob_len);
extern void __ceph_remove_cap(struct ceph_inode_cap *cap);
extern void ceph_remove_cap(struct ceph_inode_cap *cap);
extern void ceph_remove_all_caps(struct ceph_inode_info *ci);