From 87ceca6e83372cc9ddc0e24a872ad1df36927ef8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 11 Aug 2008 10:37:04 -0700 Subject: [PATCH] kclient: ceph_snap_context, pass context with osd writes --- src/kernel/addr.c | 12 ++++- src/kernel/file.c | 1 + src/kernel/inode.c | 16 ++++++- src/kernel/mds_client.c | 9 ++-- src/kernel/osd_client.c | 100 +++++++++++++++++++++++----------------- src/kernel/osd_client.h | 32 ++----------- src/kernel/snap.c | 73 ++++++++++++++++------------- src/kernel/super.c | 2 + src/kernel/super.h | 32 +++++++++---- 9 files changed, 157 insertions(+), 120 deletions(-) diff --git a/src/kernel/addr.c b/src/kernel/addr.c index 298ca22393a8f..776121d20f6ae 100644 --- a/src/kernel/addr.c +++ b/src/kernel/addr.c @@ -212,6 +212,7 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc) was_dirty = PageDirty(page); set_page_writeback(page); err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout, + ci->i_snaprealm->cached_context, page_off, len, &page, 1); if (err >= 0) { if (was_dirty) { @@ -431,6 +432,7 @@ get_more_pages: rc = ceph_osdc_writepages(&client->osdc, ceph_vino(inode), &ci->i_layout, + ci->i_snaprealm->cached_context, offset, len, pagep, locked_pages); @@ -504,7 +506,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, struct page **pagep, void **fsdata) { struct inode *inode = file->f_dentry->d_inode; - struct ceph_inode_info *ci; + struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc; struct page *page; pgoff_t index = pos >> PAGE_CACHE_SHIFT; @@ -523,6 +525,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, dout(10, "write_begin file %p inode %p page %p %d~%d\n", file, inode, page, (int)pos, (int)len); + /* build snap context */ + if (!ci->i_snaprealm->cached_context) { + r = ceph_snaprealm_build_context(ci->i_snaprealm); + if (r < 0) + return r; + } + if (PageUptodate(page)) return 0; @@ -541,7 +550,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, /* we need to read it. */ /* or, do sub-page granularity dirty accounting? */ /* try to read the full page */ - ci = ceph_inode(inode); r = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout, page_off, PAGE_SIZE, page); if (r < 0) diff --git a/src/kernel/file.c b/src/kernel/file.c index 16077650499be..3141c66bad02d 100644 --- a/src/kernel/file.c +++ b/src/kernel/file.c @@ -237,6 +237,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data, ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode), &ci->i_layout, + ci->i_snaprealm->cached_context, pos, count, data); if (ret > 0) { pos += ret; diff --git a/src/kernel/inode.c b/src/kernel/inode.c index 19a3a1fc85583..454e25311f479 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -1104,14 +1104,21 @@ int ceph_get_cap_mds(struct inode *inode) int ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, int fmode, unsigned issued, - unsigned seq, unsigned mseq) + unsigned seq, unsigned mseq, + void *snapblob, int snapblob_len) { int mds = session->s_mds; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_cap *cap, *new_cap = 0; int i; int is_new = 0; + struct ceph_snaprealm *realm = 0; + if (snapblob_len) + realm = ceph_update_snap_trace(ceph_inode_to_client(inode), + snapblob, snapblob+snapblob_len, + 0); + dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode, session->s_mds, issued, seq); spin_lock(&inode->i_lock); @@ -1154,6 +1161,11 @@ int ceph_add_cap(struct inode *inode, ci->i_cap_exporting_mds = -1; } } + if (!ci->i_snaprealm) { + ci->i_snaprealm = realm; + list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps); + } else + ceph_put_snaprealm(realm); dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n", inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds); @@ -1679,7 +1691,7 @@ void ceph_handle_cap_import(struct inode *inode, struct ceph_mds_caps *im, inode, ci, mds, mseq); } - ceph_add_cap(inode, session, -1, issued, seq, mseq); + ceph_add_cap(inode, session, -1, issued, seq, mseq, 0, 0); /* FIXME */ } diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index b2b5e6959a12e..f7cc3539227c8 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -1178,11 +1178,6 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) if (err) goto done; if (result == 0) { - /* snap trace? */ - if (rinfo->snapblob_len) - ceph_update_snap_trace(mdsc->client, rinfo->snapblob, - rinfo->snapblob+rinfo->snapblob_len, 0); - /* caps? */ if (req->r_expects_cap && req->r_last_inode) { cap = le32_to_cpu(rinfo->head->file_caps); @@ -1191,7 +1186,9 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) err = ceph_add_cap(req->r_last_inode, req->r_session, req->r_fmode, - cap, capseq, mseq); + cap, capseq, mseq, + rinfo->snapblob, + rinfo->snapblob_len); if (err) goto done; } diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c index fb29e1d5da973..9cbf41d926b7c 100644 --- a/src/kernel/osd_client.c +++ b/src/kernel/osd_client.c @@ -43,13 +43,16 @@ static void put_request(struct ceph_osd_request *req) } } -struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op) +struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op, + struct ceph_snap_context *snapc) { struct ceph_msg *req; struct ceph_osd_request_head *head; + size_t size = sizeof(struct ceph_osd_request_head); - req = ceph_msg_new(CEPH_MSG_OSD_OP, - sizeof(struct ceph_osd_request_head), 0, 0, 0); + if (snapc) + size += sizeof(u64) + snapc->num_snaps; + req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0); if (IS_ERR(req)) return req; memset(req->front.iov_base, 0, req->front.iov_len); @@ -60,20 +63,29 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op) head->client_inc = 1; /* always, for now. */ head->flags = 0; + /* snaps */ + if (snapc) { + head->snap_seq = cpu_to_le64(snapc->seq); + head->num_snaps = cpu_to_le32(snapc->num_snaps); + memcpy(req->front.iov_base + sizeof(*head), snapc->snaps, + snapc->num_snaps*sizeof(u64)); + dout(10, "snapc seq %lld %d snaps\n", snapc->seq, + snapc->num_snaps); + } return req; } -static struct ceph_osd_request *alloc_request(int nr_pages, +static struct ceph_osd_request *alloc_request(int num_pages, struct ceph_msg *msg) { struct ceph_osd_request *req; - req = kmalloc(sizeof(*req) + nr_pages*sizeof(void *), GFP_NOFS); + req = kmalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS); if (req == NULL) return ERR_PTR(-ENOMEM); req->r_aborted = 0; req->r_request = msg; - req->r_nr_pages = nr_pages; + req->r_num_pages = num_pages; atomic_set(&req->r_ref, 1); memset(&req->r_last_osd, 0, sizeof(req->r_last_osd)); return req; @@ -440,10 +452,10 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want) goto out; } dout(10, "prepare_pages tid %llu have %d pages, want %d\n", - tid, req->r_nr_pages, want); - if (likely(req->r_nr_pages >= want)) { + tid, req->r_num_pages, want); + if (likely(req->r_num_pages >= want)) { m->pages = req->r_pages; - m->nr_pages = req->r_nr_pages; + m->nr_pages = req->r_num_pages; ceph_msg_get(m); req->r_reply = m; ret = 0; /* success */ @@ -588,36 +600,36 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino, { struct ceph_msg *reqm; struct ceph_osd_request *req; - int nr_pages, i, po, left, l; + int num_pages, i, po, left, l; __s32 rc; dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino, vino.snap, off, len); /* request msg */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0); if (IS_ERR(reqm)) return PTR_ERR(reqm); - nr_pages = calc_pages_for(off, len); - req = alloc_request(nr_pages, reqm); + num_pages = calc_pages_for(off, len); + req = alloc_request(num_pages, reqm); if (IS_ERR(req)) return PTR_ERR(req); len = calc_layout(osdc, vino, layout, off, len, req); - nr_pages = calc_pages_for(off, len); /* recalc */ - dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, nr_pages); + num_pages = calc_pages_for(off, len); /* recalc */ + dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, num_pages); /* allocate temp pages to hold data */ - for (i = 0; i < nr_pages; i++) { + for (i = 0; i < num_pages; i++) { req->r_pages[i] = alloc_page(GFP_NOFS); if (req->r_pages[i] == NULL) { - req->r_nr_pages = i+1; + req->r_num_pages = i+1; put_request(req); return -ENOMEM; } } - reqm->nr_pages = nr_pages; + reqm->nr_pages = num_pages; reqm->pages = req->r_pages; reqm->hdr.data_len = cpu_to_le32(len); reqm->hdr.data_off = cpu_to_le32(off); @@ -673,7 +685,7 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino, vino.snap, off, len); /* request msg */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0); if (IS_ERR(reqm)) return PTR_ERR(reqm); reqhead = reqm->front.iov_base; @@ -704,7 +716,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct address_space *mapping, struct ceph_vino vino, struct ceph_file_layout *layout, __u64 off, __u64 len, - struct list_head *page_list, int nr_pages) + struct list_head *page_list, int num_pages) { struct ceph_msg *reqm; struct ceph_osd_request *req; @@ -717,16 +729,16 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, * for now, our strategy is simple: start with the * initial page, and fetch as much of that object as * we can that falls within the range specified by - * nr_pages. + * num_pages. */ dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, len); /* alloc request, w/ optimistically-sized page vector */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_READ); + reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0); if (IS_ERR(reqm)) return PTR_ERR(reqm); - req = alloc_request(nr_pages, reqm); + req = alloc_request(num_pages, reqm); if (req == 0) { ceph_msg_put(reqm); return -ENOMEM; @@ -743,7 +755,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, } else break; } - dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages); + dout(10, "readpages found %d/%d contig\n", contig_pages, num_pages); if (contig_pages == 0) goto out; len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK), @@ -752,9 +764,9 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, /* request msg */ len = calc_layout(osdc, vino, layout, off, len, req); - req->r_nr_pages = calc_pages_for(off, len); + req->r_num_pages = calc_pages_for(off, len); dout(10, "readpages final extent is %llu~%llu -> %d pages\n", - off, len, req->r_nr_pages); + off, len, req->r_num_pages); rc = do_request(osdc, req); out: @@ -771,45 +783,46 @@ out: */ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, + struct ceph_snap_context *snapc, __u64 off, __u64 len, const char __user *data) { struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; struct ceph_osd_request *req; - int nr_pages, i, po, l, left; + int num_pages, i, po, l, left; __s32 rc; dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino, vino.snap, off, len); /* request msg */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE); + reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc); if (IS_ERR(reqm)) return PTR_ERR(reqm); reqhead = reqm->front.iov_base; reqhead->flags = CEPH_OSD_OP_ACK; /* just ack.. FIXME */ /* how many pages? */ - nr_pages = calc_pages_for(off, len); - req = alloc_request(nr_pages, reqm); + num_pages = calc_pages_for(off, len); + req = alloc_request(num_pages, reqm); if (IS_ERR(req)) { ceph_msg_put(reqm); return PTR_ERR(req); } len = calc_layout(osdc, vino, layout, off, len, req); - nr_pages = calc_pages_for(off, len); /* recalc */ - dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages); + num_pages = calc_pages_for(off, len); /* recalc */ + dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, num_pages); /* copy data into a set of pages */ left = len; po = off & ~PAGE_MASK; rc = -EFAULT; - for (i = 0; i < nr_pages; i++) { + for (i = 0; i < num_pages; i++) { int bad; req->r_pages[i] = alloc_page(GFP_NOFS); if (req->r_pages[i] == NULL) { - req->r_nr_pages = i+1; + req->r_num_pages = i+1; put_request(req); return -ENOMEM; } @@ -827,7 +840,7 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, } } reqm->pages = req->r_pages; - reqm->nr_pages = nr_pages; + reqm->nr_pages = num_pages; reqm->hdr.data_len = cpu_to_le32(len); reqm->hdr.data_off = cpu_to_le32(off); @@ -845,8 +858,9 @@ out: */ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, + struct ceph_snap_context *snapc, loff_t off, loff_t len, - struct page **pages, int nr_pages) + struct page **pages, int num_pages) { struct ceph_msg *reqm; struct ceph_osd_request_head *reqhead; @@ -856,10 +870,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, BUG_ON(vino.snap != CEPH_NOSNAP); /* request + msg */ - reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE); + reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc); if (IS_ERR(reqm)) return PTR_ERR(reqm); - req = alloc_request(nr_pages, reqm); + req = alloc_request(num_pages, reqm); if (IS_ERR(req)) { ceph_msg_put(reqm); return PTR_ERR(req); @@ -872,13 +886,13 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, reqhead->flags = CEPH_OSD_OP_SAFE; len = calc_layout(osdc, vino, layout, off, len, req); - nr_pages = calc_pages_for(off, len); - dout(10, "writepages %llu~%llu -> %d pages\n", off, len, nr_pages); - + num_pages = calc_pages_for(off, len); + dout(10, "writepages %llu~%llu -> %d pages\n", off, len, num_pages); + /* copy pages */ - memcpy(req->r_pages, pages, nr_pages * sizeof(struct page *)); + memcpy(req->r_pages, pages, num_pages * sizeof(struct page *)); reqm->pages = req->r_pages; - reqm->nr_pages = req->r_nr_pages = nr_pages; + reqm->nr_pages = req->r_num_pages = num_pages; reqm->hdr.data_len = len; reqm->hdr.data_off = off; diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h index 72e942e027210..20cf1ac4dca32 100644 --- a/src/kernel/osd_client.h +++ b/src/kernel/osd_client.h @@ -11,6 +11,7 @@ #include "osdmap.h" struct ceph_msg; +struct ceph_snap_context; /* * pending request @@ -32,7 +33,7 @@ struct ceph_osd_request { int r_result; atomic_t r_ref; struct completion r_completion; /* on ack or commit or read? */ - unsigned r_nr_pages; /* size of page array (follows) */ + unsigned r_num_pages; /* size of page array (follows) */ struct page *r_pages[0]; /* pages for data payload */ }; @@ -72,19 +73,11 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, __u64 off, __u64 len, struct list_head *page_list, int nr_pages); -extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - loff_t off, loff_t len, - struct page *page); -extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - loff_t off, loff_t len, - struct page *page); + extern int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, + struct ceph_snap_context *sc, loff_t off, loff_t len, struct page **pagevec, int nr_pages); @@ -96,24 +89,9 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc, extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino, struct ceph_file_layout *layout, + struct ceph_snap_context *sc, __u64 off, __u64 len, const char __user *data); -extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - loff_t off, loff_t len, - struct page *page); -extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - loff_t off, loff_t len, - struct page *page); -extern int ceph_osdc_writepage(struct ceph_osd_client *osdc, - struct ceph_vino vino, - struct ceph_file_layout *layout, - loff_t off, loff_t len, - struct page *page); - #endif diff --git a/src/kernel/snap.c b/src/kernel/snap.c index 6eed93480fd51..74c396cf1d464 100644 --- a/src/kernel/snap.c +++ b/src/kernel/snap.c @@ -47,7 +47,7 @@ void ceph_put_snaprealm(struct ceph_snaprealm *realm) if (realm->nref == 0) { kfree(realm->prior_parent_snaps); kfree(realm->snaps); - kfree(realm->cached_snaps); + ceph_put_snap_context(realm->cached_context); kfree(realm); } } @@ -89,54 +89,58 @@ static int cmpu64_rev(const void *a, const void *b) int ceph_snaprealm_build_context(struct ceph_snaprealm *realm) { struct ceph_snaprealm *parent = realm->parent; + struct ceph_snap_context *sc; int err = 0; int i; int num = realm->num_prior_parent_snaps + realm->num_snaps; if (parent) { - if (!parent->cached_seq) { + if (!parent->cached_context) { err = ceph_snaprealm_build_context(parent); if (err) goto fail; } - num += parent->num_cached_snaps; /* possible overestimate */ + num += parent->cached_context->num_snaps; } - if (realm->cached_snaps) - kfree(realm->cached_snaps); + if (realm->cached_context) + ceph_put_snap_context(realm->cached_context); err = -ENOMEM; - realm->cached_snaps = kmalloc(num * sizeof(u64), GFP_NOFS); - if (!realm->cached_snaps) + realm->cached_context = sc = kzalloc(sizeof(*sc) + num*sizeof(u64), + GFP_NOFS); + if (!realm->cached_context) goto fail; + atomic_set(&sc->nref, 1); /* build (reverse sorted) snap vector */ num = 0; - realm->cached_seq = realm->seq; + sc->seq = realm->seq; if (parent) { - for (i = 0; i < parent->num_cached_snaps; i++) - if (parent->cached_snaps[i] >= realm->parent_since) - realm->cached_snaps[num++] = - parent->cached_snaps[i]; - if (parent->cached_seq > realm->cached_seq) - realm->cached_seq = parent->cached_seq; + for (i = 0; i < parent->cached_context->num_snaps; i++) + if (parent->cached_context->snaps[i] >= + realm->parent_since) + sc->snaps[num++] = + parent->cached_context->snaps[i]; + if (parent->cached_context->seq > sc->seq) + sc->seq = parent->cached_context->seq; } - memcpy(realm->cached_snaps + num, realm->snaps, + memcpy(sc->snaps + num, realm->snaps, sizeof(u64)*realm->num_snaps); num += realm->num_snaps; - memcpy(realm->cached_snaps + num, realm->prior_parent_snaps, + memcpy(sc->snaps + num, realm->prior_parent_snaps, sizeof(u64)*realm->num_prior_parent_snaps); num += realm->num_prior_parent_snaps; - sort(realm->cached_snaps, num, sizeof(u64), cmpu64_rev, NULL); - realm->num_cached_snaps = num; + sort(sc->snaps, num, sizeof(u64), cmpu64_rev, NULL); + sc->num_snaps = num; dout(10, "snaprealm_build_context %llx %p : seq %lld %d snaps\n", - realm->ino, realm, realm->cached_seq, realm->num_cached_snaps); + realm->ino, realm, sc->seq, sc->num_snaps); return 0; fail: - if (realm->cached_snaps) { - kfree(realm->cached_snaps); - realm->cached_snaps = 0; + if (realm->cached_context) { + ceph_put_snap_context(realm->cached_context); + realm->cached_context = 0; } derr(0, "snaprealm_build_context %llx %p fail %d\n", realm->ino, realm, err); @@ -149,8 +153,11 @@ void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm) struct ceph_snaprealm *child; dout(10, "invalidate_snaprealm %llx %p\n", realm->ino, realm); - realm->cached_seq = 0; - + if (realm->cached_context) { + ceph_put_snap_context(realm->cached_context); + realm->cached_context = 0; + } + list_for_each(p, &realm->children) { child = list_entry(p, struct ceph_snaprealm, child_item); ceph_invalidate_snaprealm(child); @@ -160,27 +167,29 @@ void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm) static int dup_array(u64 **dst, u64 *src, int num) { + int i; + if (*dst) kfree(*dst); if (num) { *dst = kmalloc(sizeof(u64) * num, GFP_NOFS); if (!*dst) return -1; - memcpy(*dst, src, sizeof(u64) * num); + for (i = 0; i < num; i++) + (*dst)[i] = le64_to_cpu(src[i]); } else *dst = 0; return 0; } -u64 ceph_update_snap_trace(struct ceph_client *client, - void *p, void *e, int must_flush) +struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client, + void *p, void *e, int must_flush) { struct ceph_mds_snap_realm *ri; int err = -ENOMEM; - u64 first = 0; u64 *snaps; u64 *prior_parent_snaps; - struct ceph_snaprealm *realm; + struct ceph_snaprealm *realm, *first = 0; int invalidate; more: @@ -197,8 +206,10 @@ more: realm = ceph_get_snaprealm(client, le64_to_cpu(ri->ino)); if (!realm) goto fail; - if (!first) - first = realm->ino; + if (!first) { + first = realm; + realm->nref++; + } if (le64_to_cpu(ri->seq) > realm->seq) { dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n", diff --git a/src/kernel/super.c b/src/kernel/super.c index 5b563d56b65de..62326dcb64ffd 100644 --- a/src/kernel/super.c +++ b/src/kernel/super.c @@ -182,6 +182,8 @@ static struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_hold_caps_until = 0; INIT_LIST_HEAD(&ci->i_cap_delay_list); + ci->i_snaprealm = 0; + ci->i_hashval = 0; INIT_WORK(&ci->i_wb_work, ceph_inode_writeback); diff --git a/src/kernel/super.h b/src/kernel/super.h index cc849e9b89d7e..1afac0421e9e0 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -253,8 +253,8 @@ struct ceph_inode_info { int i_rd_ref, i_rdcache_ref, i_wr_ref; atomic_t i_wrbuffer_ref; - struct ceph_snaprealm *snaprealm; - struct list_head snaprealm_item; + struct ceph_snaprealm *i_snaprealm; + struct list_head i_snaprealm_item; struct work_struct i_wb_work; /* writeback work */ @@ -437,6 +437,21 @@ struct ceph_file_info { * snapshots */ +struct ceph_snap_context { + atomic_t nref; + u64 seq; + int num_snaps; + u64 snaps[]; +}; + +static inline void ceph_put_snap_context(struct ceph_snap_context *sc) +{ + if (!sc) + return; + if (atomic_dec_and_test(&sc->nref)) + kfree(sc); +} + struct ceph_snaprealm { u64 ino; int nref; @@ -453,10 +468,7 @@ struct ceph_snaprealm { struct list_head child_item; struct list_head children; - /* cached snap context */ - u64 cached_seq; /* 0 => invalidated */ - u64 *cached_snaps; - int num_cached_snaps; + struct ceph_snap_context *cached_context; struct list_head inodes_with_caps; }; @@ -469,8 +481,9 @@ extern struct ceph_snaprealm *ceph_find_snaprealm(struct ceph_client *client, extern void ceph_put_snaprealm(struct ceph_snaprealm *realm); extern int ceph_adjust_snaprealm_parent(struct ceph_client *client, struct ceph_snaprealm *realm, u64 p); -extern u64 ceph_update_snap_trace(struct ceph_client *client, - void *p, void *e, int must_flush); +extern struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client, + void *p, void *e, + int must_flush); extern int ceph_snaprealm_build_context(struct ceph_snaprealm *realm); extern void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm); @@ -508,7 +521,8 @@ extern int ceph_dentry_lease_valid(struct dentry *dentry); extern int ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, int fmode, unsigned issued, - unsigned cap, unsigned seq); + unsigned cap, unsigned seq, + void *snapblob, int snapblob_len); extern void __ceph_remove_cap(struct ceph_inode_cap *cap); extern void ceph_remove_cap(struct ceph_inode_cap *cap); extern void ceph_remove_all_caps(struct ceph_inode_info *ci); -- 2.39.5