git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: ceph_snap_context, pass context with osd writes
author Sage Weil <sage@newdream.net>
Mon, 11 Aug 2008 17:37:04 +0000 (10:37 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 11 Aug 2008 17:37:04 +0000 (10:37 -0700)
src/kernel/addr.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/osd_client.c
src/kernel/osd_client.h
src/kernel/snap.c
src/kernel/super.c
src/kernel/super.h

diff --git a/src/kernel/addr.c b/src/kernel/addr.c
index 298ca22393a8ffc407dbd0e858548fc91e91c590..776121d20f6ae5f17403d069641c46e293fa7137 100644 (file)
@@ -212,6 +212,7 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
        was_dirty = PageDirty(page);
        set_page_writeback(page);
        err = ceph_osdc_writepages(osdc, ceph_vino(inode), &ci->i_layout,
+                                  ci->i_snaprealm->cached_context,
                                   page_off, len, &page, 1);
        if (err >= 0) {
                if (was_dirty) {
@@ -431,6 +432,7 @@ get_more_pages:
                        rc = ceph_osdc_writepages(&client->osdc,
                                                  ceph_vino(inode),
                                                  &ci->i_layout,
+                                                 ci->i_snaprealm->cached_context,
                                                  offset, len,
                                                  pagep,
                                                  locked_pages);
@@ -504,7 +506,7 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                            struct page **pagep, void **fsdata)
 {
        struct inode *inode = file->f_dentry->d_inode;
-       struct ceph_inode_info *ci;
+       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->osdc;
        struct page *page;
        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
@@ -523,6 +525,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
        dout(10, "write_begin file %p inode %p page %p %d~%d\n", file,
             inode, page, (int)pos, (int)len);
 
+       /* build snap context */
+       if (!ci->i_snaprealm->cached_context) {
+               r = ceph_snaprealm_build_context(ci->i_snaprealm);
+               if (r < 0)
+                       return r;
+       }               
+
        if (PageUptodate(page))
                return 0;
 
@@ -541,7 +550,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
        /* we need to read it. */
        /* or, do sub-page granularity dirty accounting? */
        /* try to read the full page */
-       ci = ceph_inode(inode);
        r = ceph_osdc_readpage(osdc, ceph_vino(inode), &ci->i_layout,
                               page_off, PAGE_SIZE, page);
        if (r < 0)
diff --git a/src/kernel/file.c b/src/kernel/file.c
index 16077650499be34d7c3b28a026268a16fc92dff7..3141c66bad02d4b04dafc0988f8445ccda392ee2 100644 (file)
@@ -237,6 +237,7 @@ static ssize_t ceph_sync_write(struct file *file, const char __user *data,
 
        ret = ceph_osdc_sync_write(&client->osdc, ceph_vino(inode),
                                   &ci->i_layout,
+                                  ci->i_snaprealm->cached_context,
                                   pos, count, data);
        if (ret > 0) {
                pos += ret;
diff --git a/src/kernel/inode.c b/src/kernel/inode.c
index 19a3a1fc855838528b96c5f2639f4c0eb928ba8b..454e25311f479cc7c15da177b57aa93e829b77b0 100644 (file)
@@ -1104,14 +1104,21 @@ int ceph_get_cap_mds(struct inode *inode)
 int ceph_add_cap(struct inode *inode,
                 struct ceph_mds_session *session,
                 int fmode, unsigned issued,
-                unsigned seq, unsigned mseq)
+                unsigned seq, unsigned mseq,
+                void *snapblob, int snapblob_len)
 {
        int mds = session->s_mds;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_inode_cap *cap, *new_cap = 0;
        int i;
        int is_new = 0;
+       struct ceph_snaprealm *realm = 0;
 
+       if (snapblob_len)
+               realm = ceph_update_snap_trace(ceph_inode_to_client(inode),
+                                              snapblob, snapblob+snapblob_len,
+                                              0);
+       
        dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
             session->s_mds, issued, seq);
        spin_lock(&inode->i_lock);
@@ -1154,6 +1161,11 @@ int ceph_add_cap(struct inode *inode,
                        ci->i_cap_exporting_mds = -1;
                }
        }
+       if (!ci->i_snaprealm) {
+               ci->i_snaprealm = realm;
+               list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps);
+       } else
+               ceph_put_snaprealm(realm);
 
        dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
             inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
@@ -1679,7 +1691,7 @@ void ceph_handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
                     inode, ci, mds, mseq);
        }
 
-       ceph_add_cap(inode, session, -1, issued, seq, mseq);
+       ceph_add_cap(inode, session, -1, issued, seq, mseq, 0, 0); /* FIXME */
 }
 
 
diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c
index b2b5e6959a12e7cfe94a2e4fa37e1dd84f9ba46e..f7cc3539227c89c60577251feaf49b1f7f59a23d 100644 (file)
@@ -1178,11 +1178,6 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        if (err)
                goto done;
        if (result == 0) {
-               /* snap trace? */
-               if (rinfo->snapblob_len)
-                       ceph_update_snap_trace(mdsc->client, rinfo->snapblob,
-                                      rinfo->snapblob+rinfo->snapblob_len, 0);
-
                /* caps? */
                if (req->r_expects_cap && req->r_last_inode) {
                        cap = le32_to_cpu(rinfo->head->file_caps);
@@ -1191,7 +1186,9 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
                        err = ceph_add_cap(req->r_last_inode,
                                           req->r_session,
                                           req->r_fmode,
-                                          cap, capseq, mseq);
+                                          cap, capseq, mseq,
+                                          rinfo->snapblob,
+                                          rinfo->snapblob_len);
                        if (err)
                                goto done;
                }
diff --git a/src/kernel/osd_client.c b/src/kernel/osd_client.c
index fb29e1d5da9738957ac2afbc0f189c4077a47716..9cbf41d926b7cbdf39ad3c65a93dd35e35820486 100644 (file)
@@ -43,13 +43,16 @@ static void put_request(struct ceph_osd_request *req)
        }
 }
 
-struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op)
+struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
+                                struct ceph_snap_context *snapc)
 {
        struct ceph_msg *req;
        struct ceph_osd_request_head *head;
+       size_t size = sizeof(struct ceph_osd_request_head);
 
-       req = ceph_msg_new(CEPH_MSG_OSD_OP,
-                          sizeof(struct ceph_osd_request_head), 0, 0, 0);
+       if (snapc)
+               size += sizeof(u64) + snapc->num_snaps;
+       req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0);
        if (IS_ERR(req))
                return req;
        memset(req->front.iov_base, 0, req->front.iov_len);
@@ -60,20 +63,29 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op)
        head->client_inc = 1; /* always, for now. */
        head->flags = 0;
 
+       /* snaps */
+       if (snapc) {
+               head->snap_seq = cpu_to_le64(snapc->seq);
+               head->num_snaps = cpu_to_le32(snapc->num_snaps);
+               memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
+                      snapc->num_snaps*sizeof(u64));
+               dout(10, "snapc seq %lld %d snaps\n", snapc->seq,
+                    snapc->num_snaps);
+       }
        return req;
 }
 
-static struct ceph_osd_request *alloc_request(int nr_pages,
+static struct ceph_osd_request *alloc_request(int num_pages,
                                              struct ceph_msg *msg)
 {
        struct ceph_osd_request *req;
 
-       req = kmalloc(sizeof(*req) + nr_pages*sizeof(void *), GFP_NOFS);
+       req = kmalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
        req->r_aborted = 0;
        req->r_request = msg;
-       req->r_nr_pages = nr_pages;
+       req->r_num_pages = num_pages;
        atomic_set(&req->r_ref, 1);
        memset(&req->r_last_osd, 0, sizeof(req->r_last_osd));
        return req;
@@ -440,10 +452,10 @@ int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
                goto out;
        }
        dout(10, "prepare_pages tid %llu have %d pages, want %d\n",
-            tid, req->r_nr_pages, want);
-       if (likely(req->r_nr_pages >= want)) {
+            tid, req->r_num_pages, want);
+       if (likely(req->r_num_pages >= want)) {
                m->pages = req->r_pages;
-               m->nr_pages = req->r_nr_pages;
+               m->nr_pages = req->r_num_pages;
                ceph_msg_get(m);
                req->r_reply = m;
                ret = 0; /* success */
@@ -588,36 +600,36 @@ int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request *req;
-       int nr_pages, i, po, left, l;
+       int num_pages, i, po, left, l;
        __s32 rc;
 
        dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
             vino.snap, off, len);
 
        /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
 
-       nr_pages = calc_pages_for(off, len);
-       req = alloc_request(nr_pages, reqm);
+       num_pages = calc_pages_for(off, len);
+       req = alloc_request(num_pages, reqm);
        if (IS_ERR(req))
                return PTR_ERR(req);
 
        len = calc_layout(osdc, vino, layout, off, len, req);
-       nr_pages = calc_pages_for(off, len);  /* recalc */
-       dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, nr_pages);
+       num_pages = calc_pages_for(off, len);  /* recalc */
+       dout(10, "sync_read %llu~%llu -> %d pages\n", off, len, num_pages);
 
        /* allocate temp pages to hold data */
-       for (i = 0; i < nr_pages; i++) {
+       for (i = 0; i < num_pages; i++) {
                req->r_pages[i] = alloc_page(GFP_NOFS);
                if (req->r_pages[i] == NULL) {
-                       req->r_nr_pages = i+1;
+                       req->r_num_pages = i+1;
                        put_request(req);
                        return -ENOMEM;
                }
        }
-       reqm->nr_pages = nr_pages;
+       reqm->nr_pages = num_pages;
        reqm->pages = req->r_pages;
        reqm->hdr.data_len = cpu_to_le32(len);
        reqm->hdr.data_off = cpu_to_le32(off);
@@ -673,7 +685,7 @@ int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
             vino.snap, off, len);
 
        /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
        reqhead = reqm->front.iov_base;
@@ -704,7 +716,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                        struct address_space *mapping,
                        struct ceph_vino vino, struct ceph_file_layout *layout,
                        __u64 off, __u64 len,
-                       struct list_head *page_list, int nr_pages)
+                       struct list_head *page_list, int num_pages)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request *req;
@@ -717,16 +729,16 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
         * for now, our strategy is simple: start with the
         * initial page, and fetch as much of that object as
         * we can that falls within the range specified by
-        * nr_pages.
+        * num_pages.
         */
        dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
             vino.snap, off, len);
 
        /* alloc request, w/ optimistically-sized page vector */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ);
+       reqm = new_request_msg(osdc, CEPH_OSD_OP_READ, 0);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
-       req = alloc_request(nr_pages, reqm);
+       req = alloc_request(num_pages, reqm);
        if (req == 0) {
                ceph_msg_put(reqm);
                return -ENOMEM;
@@ -743,7 +755,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                } else
                        break;
        }
-       dout(10, "readpages found %d/%d contig\n", contig_pages, nr_pages);
+       dout(10, "readpages found %d/%d contig\n", contig_pages, num_pages);
        if (contig_pages == 0)
                goto out;
        len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
@@ -752,9 +764,9 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 
        /* request msg */
        len = calc_layout(osdc, vino, layout, off, len, req);
-       req->r_nr_pages = calc_pages_for(off, len);
+       req->r_num_pages = calc_pages_for(off, len);
        dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
-            off, len, req->r_nr_pages);
+            off, len, req->r_num_pages);
        rc = do_request(osdc, req);
 
 out:
@@ -771,45 +783,46 @@ out:
  */
 int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
+                        struct ceph_snap_context *snapc,
                         __u64 off, __u64 len, const char __user *data)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_request *req;
-       int nr_pages, i, po, l, left;
+       int num_pages, i, po, l, left;
        __s32 rc;
 
        dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
             vino.snap, off, len);
 
        /* request msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
+       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
        reqhead = reqm->front.iov_base;
        reqhead->flags = CEPH_OSD_OP_ACK;  /* just ack.. FIXME */
 
        /* how many pages? */
-       nr_pages = calc_pages_for(off, len);
-       req = alloc_request(nr_pages, reqm);
+       num_pages = calc_pages_for(off, len);
+       req = alloc_request(num_pages, reqm);
        if (IS_ERR(req)) {
                ceph_msg_put(reqm);
                return PTR_ERR(req);
        }
 
        len = calc_layout(osdc, vino, layout, off, len, req);
-       nr_pages = calc_pages_for(off, len);  /* recalc */
-       dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, nr_pages);
+       num_pages = calc_pages_for(off, len);  /* recalc */
+       dout(10, "sync_write %llu~%llu -> %d pages\n", off, len, num_pages);
 
        /* copy data into a set of pages */
        left = len;
        po = off & ~PAGE_MASK;
        rc = -EFAULT;
-       for (i = 0; i < nr_pages; i++) {
+       for (i = 0; i < num_pages; i++) {
                int bad;
                req->r_pages[i] = alloc_page(GFP_NOFS);
                if (req->r_pages[i] == NULL) {
-                       req->r_nr_pages = i+1;
+                       req->r_num_pages = i+1;
                        put_request(req);
                        return -ENOMEM;
                }
@@ -827,7 +840,7 @@ int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
                }
        }
        reqm->pages = req->r_pages;
-       reqm->nr_pages = nr_pages;
+       reqm->nr_pages = num_pages;
        reqm->hdr.data_len = cpu_to_le32(len);
        reqm->hdr.data_off = cpu_to_le32(off);
 
@@ -845,8 +858,9 @@ out:
  */
 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                         struct ceph_file_layout *layout,
+                        struct ceph_snap_context *snapc,
                         loff_t off, loff_t len,
-                        struct page **pages, int nr_pages)
+                        struct page **pages, int num_pages)
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
@@ -856,10 +870,10 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
        BUG_ON(vino.snap != CEPH_NOSNAP);
 
        /* request + msg */
-       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE);
+       reqm = new_request_msg(osdc, CEPH_OSD_OP_WRITE, snapc);
        if (IS_ERR(reqm))
                return PTR_ERR(reqm);
-       req = alloc_request(nr_pages, reqm);
+       req = alloc_request(num_pages, reqm);
        if (IS_ERR(req)) {
                ceph_msg_put(reqm);
                return PTR_ERR(req);
@@ -872,13 +886,13 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                reqhead->flags = CEPH_OSD_OP_SAFE;
 
        len = calc_layout(osdc, vino, layout, off, len, req);
-       nr_pages = calc_pages_for(off, len);
-       dout(10, "writepages %llu~%llu -> %d pages\n", off, len, nr_pages);
-
+       num_pages = calc_pages_for(off, len);
+       dout(10, "writepages %llu~%llu -> %d pages\n", off, len, num_pages);
+       
        /* copy pages */
-       memcpy(req->r_pages, pages, nr_pages * sizeof(struct page *));
+       memcpy(req->r_pages, pages, num_pages * sizeof(struct page *));
        reqm->pages = req->r_pages;
-       reqm->nr_pages = req->r_nr_pages = nr_pages;
+       reqm->nr_pages = req->r_num_pages = num_pages;
        reqm->hdr.data_len = len;
        reqm->hdr.data_off = off;
 
diff --git a/src/kernel/osd_client.h b/src/kernel/osd_client.h
index 72e942e02721020f800fd1f0482a9f84761e70a5..20cf1ac4dca325be222ef71196d89da4379e2b75 100644 (file)
@@ -11,6 +11,7 @@
 #include "osdmap.h"
 
 struct ceph_msg;
+struct ceph_snap_context;
 
 /*
  * pending request
@@ -32,7 +33,7 @@ struct ceph_osd_request {
        int               r_result;
        atomic_t          r_ref;
        struct completion r_completion;      /* on ack or commit or read? */
-       unsigned          r_nr_pages;        /* size of page array (follows) */
+       unsigned          r_num_pages;        /* size of page array (follows) */
        struct page      *r_pages[0];        /* pages for data payload */
 };
 
@@ -72,19 +73,11 @@ extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                               struct ceph_file_layout *layout,
                               __u64 off, __u64 len,
                               struct list_head *page_list, int nr_pages);
-extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc,
-                                  struct ceph_vino vino,
-                                  struct ceph_file_layout *layout,
-                                  loff_t off, loff_t len,
-                                  struct page *page);
-extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc,
-                                 struct ceph_vino vino,
-                                 struct ceph_file_layout *layout,
-                                 loff_t off, loff_t len,
-                                 struct page *page);
+
 extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
+                               struct ceph_snap_context *sc,
                                loff_t off, loff_t len,
                                struct page **pagevec, int nr_pages);
 
@@ -96,24 +89,9 @@ extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
 extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
+                               struct ceph_snap_context *sc,
                                __u64 off, __u64 len,
                                const char __user *data);
 
-extern int ceph_osdc_prepare_write(struct ceph_osd_client *osdc,
-                                  struct ceph_vino vino,
-                                  struct ceph_file_layout *layout,
-                                  loff_t off, loff_t len,
-                                  struct page *page);
-extern int ceph_osdc_commit_write(struct ceph_osd_client *osdc,
-                                 struct ceph_vino vino,
-                                 struct ceph_file_layout *layout,
-                                 loff_t off, loff_t len,
-                                 struct page *page);
-extern int ceph_osdc_writepage(struct ceph_osd_client *osdc,
-                              struct ceph_vino vino,
-                              struct ceph_file_layout *layout,
-                              loff_t off, loff_t len,
-                              struct page *page);
-
 #endif
 
diff --git a/src/kernel/snap.c b/src/kernel/snap.c
index 6eed93480fd5139873f3ff84505ab7b9e61c0e0f..74c396cf1d4647d85386eb154fc2709498522079 100644 (file)
@@ -47,7 +47,7 @@ void ceph_put_snaprealm(struct ceph_snaprealm *realm)
        if (realm->nref == 0) {
                kfree(realm->prior_parent_snaps);
                kfree(realm->snaps);
-               kfree(realm->cached_snaps);
+               ceph_put_snap_context(realm->cached_context);
                kfree(realm);
        }
 }
@@ -89,54 +89,58 @@ static int cmpu64_rev(const void *a, const void *b)
 int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
 {
        struct ceph_snaprealm *parent = realm->parent;
+       struct ceph_snap_context *sc;
        int err = 0;
        int i;
        int num = realm->num_prior_parent_snaps + realm->num_snaps;
 
        if (parent) {
-               if (!parent->cached_seq) {
+               if (!parent->cached_context) {
                        err = ceph_snaprealm_build_context(parent);
                        if (err)
                                goto fail;
                }
-               num += parent->num_cached_snaps;  /* possible overestimate */
+               num += parent->cached_context->num_snaps;
        }
 
-       if (realm->cached_snaps)
-               kfree(realm->cached_snaps);
+       if (realm->cached_context)
+               ceph_put_snap_context(realm->cached_context);
        err = -ENOMEM;
-       realm->cached_snaps = kmalloc(num * sizeof(u64), GFP_NOFS);
-       if (!realm->cached_snaps)
+       realm->cached_context = sc = kzalloc(sizeof(*sc) + num*sizeof(u64),
+                                            GFP_NOFS);
+       if (!realm->cached_context)
                goto fail;
+       atomic_set(&sc->nref, 1);
 
        /* build (reverse sorted) snap vector */
        num = 0;
-       realm->cached_seq = realm->seq;
+       sc->seq = realm->seq;
        if (parent) {
-               for (i = 0; i < parent->num_cached_snaps; i++)
-                       if (parent->cached_snaps[i] >= realm->parent_since)
-                               realm->cached_snaps[num++] =
-                                       parent->cached_snaps[i];
-               if (parent->cached_seq > realm->cached_seq)
-                       realm->cached_seq = parent->cached_seq;
+               for (i = 0; i < parent->cached_context->num_snaps; i++)
+                       if (parent->cached_context->snaps[i] >=
+                           realm->parent_since)
+                               sc->snaps[num++] =
+                                       parent->cached_context->snaps[i];
+               if (parent->cached_context->seq > sc->seq)
+                       sc->seq = parent->cached_context->seq;
        }
-       memcpy(realm->cached_snaps + num, realm->snaps,
+       memcpy(sc->snaps + num, realm->snaps,
               sizeof(u64)*realm->num_snaps);
        num += realm->num_snaps;
-       memcpy(realm->cached_snaps + num, realm->prior_parent_snaps,
+       memcpy(sc->snaps + num, realm->prior_parent_snaps,
               sizeof(u64)*realm->num_prior_parent_snaps);
        num += realm->num_prior_parent_snaps;
 
-       sort(realm->cached_snaps, num, sizeof(u64), cmpu64_rev, NULL);
-       realm->num_cached_snaps = num;
+       sort(sc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
+       sc->num_snaps = num;
        dout(10, "snaprealm_build_context %llx %p : seq %lld %d snaps\n",
-            realm->ino, realm, realm->cached_seq, realm->num_cached_snaps);
+            realm->ino, realm, sc->seq, sc->num_snaps);
        return 0;
 
 fail:
-       if (realm->cached_snaps) {
-               kfree(realm->cached_snaps);
-               realm->cached_snaps = 0;
+       if (realm->cached_context) {
+               ceph_put_snap_context(realm->cached_context);
+               realm->cached_context = 0;
        }
        derr(0, "snaprealm_build_context %llx %p fail %d\n", realm->ino,
             realm, err);
@@ -149,8 +153,11 @@ void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm)
        struct ceph_snaprealm *child;
 
        dout(10, "invalidate_snaprealm %llx %p\n", realm->ino, realm);
-       realm->cached_seq = 0;
-       
+       if (realm->cached_context) {
+               ceph_put_snap_context(realm->cached_context);
+               realm->cached_context = 0;
+       }
+
        list_for_each(p, &realm->children) {
                child = list_entry(p, struct ceph_snaprealm, child_item);
                ceph_invalidate_snaprealm(child);
@@ -160,27 +167,29 @@ void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm)
 
 static int dup_array(u64 **dst, u64 *src, int num)
 {
+       int i;
+
        if (*dst)
                kfree(*dst);
        if (num) {
                *dst = kmalloc(sizeof(u64) * num, GFP_NOFS);
                if (!*dst)
                        return -1;
-               memcpy(*dst, src, sizeof(u64) * num);
+               for (i = 0; i < num; i++)
+                       (*dst)[i] = le64_to_cpu(src[i]);
        } else
                *dst = 0;
        return 0;
 }
 
-u64 ceph_update_snap_trace(struct ceph_client *client,
-                          void *p, void *e, int must_flush)
+struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
+                                             void *p, void *e, int must_flush)
 {
        struct ceph_mds_snap_realm *ri;
        int err = -ENOMEM;
-       u64 first = 0;
        u64 *snaps;
        u64 *prior_parent_snaps;
-       struct ceph_snaprealm *realm;
+       struct ceph_snaprealm *realm, *first = 0;
        int invalidate;
 
 more:
@@ -197,8 +206,10 @@ more:
        realm = ceph_get_snaprealm(client, le64_to_cpu(ri->ino));
        if (!realm)
                goto fail;
-       if (!first)
-               first = realm->ino;
+       if (!first) {
+               first = realm;
+               realm->nref++;
+       }
 
        if (le64_to_cpu(ri->seq) > realm->seq) {
                dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
diff --git a/src/kernel/super.c b/src/kernel/super.c
index 5b563d56b65de00e8c77e6d6372246a32464e785..62326dcb64ffd035b3bf28b754eb2b2309e7fc1a 100644 (file)
@@ -182,6 +182,8 @@ static struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_hold_caps_until = 0;
        INIT_LIST_HEAD(&ci->i_cap_delay_list);
 
+       ci->i_snaprealm = 0;
+
        ci->i_hashval = 0;
 
        INIT_WORK(&ci->i_wb_work, ceph_inode_writeback);
diff --git a/src/kernel/super.h b/src/kernel/super.h
index cc849e9b89d7e5747ef2a528e92023e6810a190b..1afac0421e9e0489d95d4e10538100e604fea388 100644 (file)
@@ -253,8 +253,8 @@ struct ceph_inode_info {
        int i_rd_ref, i_rdcache_ref, i_wr_ref;
        atomic_t i_wrbuffer_ref;
 
-       struct ceph_snaprealm *snaprealm;
-       struct list_head snaprealm_item;
+       struct ceph_snaprealm *i_snaprealm;
+       struct list_head i_snaprealm_item;
 
        struct work_struct i_wb_work;  /* writeback work */
 
@@ -437,6 +437,21 @@ struct ceph_file_info {
  * snapshots
  */
 
+struct ceph_snap_context {
+       atomic_t nref;
+       u64 seq;
+       int num_snaps;
+       u64 snaps[];
+};
+
+static inline void ceph_put_snap_context(struct ceph_snap_context *sc)
+{
+       if (!sc)
+               return;
+       if (atomic_dec_and_test(&sc->nref))
+               kfree(sc);
+}
+
 struct ceph_snaprealm {
        u64 ino;
        int nref;
@@ -453,10 +468,7 @@ struct ceph_snaprealm {
        struct list_head child_item;
        struct list_head children;
 
-       /* cached snap context */
-       u64 cached_seq;       /* 0 => invalidated */
-       u64 *cached_snaps;
-       int num_cached_snaps;
+       struct ceph_snap_context *cached_context;
 
        struct list_head inodes_with_caps;
 };
@@ -469,8 +481,9 @@ extern struct ceph_snaprealm *ceph_find_snaprealm(struct ceph_client *client,
 extern void ceph_put_snaprealm(struct ceph_snaprealm *realm);
 extern int ceph_adjust_snaprealm_parent(struct ceph_client *client,
                                        struct ceph_snaprealm *realm, u64 p);
-extern u64 ceph_update_snap_trace(struct ceph_client *client,
-                                 void *p, void *e, int must_flush);
+extern struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
+                                                    void *p, void *e,
+                                                    int must_flush);
 extern int ceph_snaprealm_build_context(struct ceph_snaprealm *realm);
 extern void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm);
 
@@ -508,7 +521,8 @@ extern int ceph_dentry_lease_valid(struct dentry *dentry);
 extern int ceph_add_cap(struct inode *inode,
                        struct ceph_mds_session *session,
                        int fmode, unsigned issued,
-                       unsigned cap, unsigned seq);
+                       unsigned cap, unsigned seq,
+                       void *snapblob, int snapblob_len);
 extern void __ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_all_caps(struct ceph_inode_info *ci);