]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: handle_snap
authorSage Weil <sage@newdream.net>
Wed, 13 Aug 2008 17:00:55 +0000 (10:00 -0700)
committerSage Weil <sage@newdream.net>
Wed, 13 Aug 2008 19:37:21 +0000 (12:37 -0700)
src/include/ceph_fs.h
src/kernel/file.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/osd_client.c
src/kernel/snap.c
src/kernel/super.c
src/kernel/super.h
src/messages/MClientSnap.h

index fab56442fed323f7ca0aea999994865f35850cbc..2ff8a020e6db207b01d939dddce8f703c1ea3b96 100644 (file)
@@ -873,7 +873,7 @@ struct ceph_mds_snap_head {
        __le32 num_split_inos;
        __le32 num_split_realms;
        __le32 trace_len;
-};
+} __attribute__ ((packed));
 /* followed by split inos, then split realms, then the trace blob */
 
 /*
@@ -887,7 +887,7 @@ struct ceph_mds_snap_realm {
        __le64 seq;           /* snap: version */
        __le32 num_snaps;
        __le32 num_prior_parent_snaps;
-};
+} __attribute__ ((packed));
 /* followed by my snaps, then prior parent snaps */
 
 /*
index 230d07dc7edc620c6107660221bae5f032e266b1..6903aa3501e3af82db6aba77fce6bb5e27512c7b 100644 (file)
@@ -317,7 +317,7 @@ static void check_max_size(struct inode *inode, loff_t endoff)
        }
        spin_unlock(&inode->i_lock);
        if (check)
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
 }
 
 /*
index 9eba15330afbb8a54012607c9f2c5ac02492ade8..38676905c67d16b11592c6ccc09ba300c13166ac 100644 (file)
@@ -1172,6 +1172,7 @@ int ceph_add_cap(struct inode *inode,
                cap->issued = cap->implemented = 0;
                cap->mds = mds;
                cap->flags = 0;
+               cap->flushed_snap = 0;
 
                cap->ci = ci;
                list_add(&cap->ci_caps, &ci->i_caps);
@@ -1301,7 +1302,7 @@ void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc,
  *  release, ack revoked caps to mds as appropriate.
  * @is_delayed if caller just dropped a cap ref, and we probably want to delay
  */
-void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed)
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1348,6 +1349,21 @@ retry:
                        goto ack;
                }
 
+               /* flush snap? */
+               if (flush_snap &&
+                   (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
+                       if (cap->flushed_snap >=
+                           ci->i_snaprealm->cached_context->seq) {
+                               dout(10, "flushed_snap %llu >= seq %lld, "
+                                    "not flushing mds%d\n",
+                                    cap->flushed_snap,
+                                    ci->i_snaprealm->cached_context->seq,
+                                    cap->session->s_mds);
+                               continue;  /* already flushed for this snap */
+                       }
+                       goto ack;
+               }
+
                if ((cap->issued & ~wanted) == 0)
                        continue;     /* nothing extra, all good */
 
@@ -1377,9 +1393,11 @@ ack:
 
                /* send_cap drops i_lock */
                removed_last = __ceph_mdsc_send_cap(mdsc, session, cap,
-                                                   used, wanted, !is_delayed);
+                                                   used, wanted, !is_delayed,
+                                                   flush_snap);
                if (removed_last)
                        goto out;
+               /* retake i_lock and restart our cap scan. */
                goto retry;
        }
 
@@ -1403,7 +1421,7 @@ void ceph_inode_set_size(struct inode *inode, loff_t size)
        if ((size << 1) >= ci->i_max_size &&
            (ci->i_reported_size << 1) < ci->i_max_size) {
                spin_unlock(&inode->i_lock);
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
        } else
                spin_unlock(&inode->i_lock);
 }
@@ -1420,7 +1438,7 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
        spin_unlock(&ci->vfs_inode.i_lock);
 
        if (last && ci->i_vino.snap == CEPH_NOSNAP)
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
 }
 
 
@@ -1577,7 +1595,7 @@ static int apply_truncate(struct inode *inode, loff_t size)
                spin_unlock(&inode->i_lock);
        }
        if (atomic_read(&ci->i_wrbuffer_ref) == 0)
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
        return rc;
 }
 
@@ -1610,7 +1628,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
                dout(10, "__do_pending_vmtruncate %p to %lld\n", inode, to);
                vmtruncate(inode, to);
                if (atomic_read(&ci->i_wrbuffer_ref) == 0)
-                       ceph_check_caps(ci, 0);
+                       ceph_check_caps(ci, 0, 0);
        } else
                dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
 }
@@ -1815,7 +1833,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
             last ? "last":"");
 
        if (last)
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
 }
 
 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
@@ -1833,7 +1851,7 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
        WARN_ON(v < 0);
 
        if (was_last)
-               ceph_check_caps(ci, 0);
+               ceph_check_caps(ci, 0, 0);
 }
 
 
index 2d4fc2541a6189b65422650f68a8d74b9860ccb1..b8f09f3b342c255a1fd497c12dd494b1dc627c0a 100644 (file)
@@ -1484,16 +1484,18 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 
 /* caps */
 
-static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int caps,
-                        int wanted, __u32 seq, __u64 size, __u64 max_size,
+static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int op,
+                        int caps, int wanted, __u32 seq,
+                        __u64 size, __u64 max_size,
                         struct timespec *mtime, struct timespec *atime,
                         u64 time_warp_seq, u64 follows, int mds)
 {
        struct ceph_mds_caps *fc;
        struct ceph_msg *msg;
 
-       dout(10, "send_cap_ack %llx ca %d wa %d seq %u follows %lld sz %llu\n",
-            ino, caps, wanted, (unsigned)seq, follows, size);
+       dout(10, "send_cap_ack %s %llx caps %d wanted %d seq %u follows %lld"
+            " size %llu\n", ceph_cap_op_name(op), ino, caps, wanted,
+            (unsigned)seq, follows, size);
 
        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
        if (IS_ERR(msg))
@@ -1537,7 +1539,7 @@ void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
        dout(10, "handle_caps from mds%d\n", mds);
 
        /* decode */
-       if (msg->front.iov_len != sizeof(*h))
+       if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
        op = le32_to_cpu(h->op);
@@ -1565,7 +1567,8 @@ void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
 
        if (!inode) {
                dout(10, "i don't have ino %llx, sending release\n", vino.ino);
-               send_cap_ack(mdsc, vino.ino, 0, 0, seq, size, 0, 0, 0, 0, 0, mds);
+               send_cap_ack(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+                            size, 0, 0, 0, 0, 0, mds);
                goto no_inode;
        }
 
@@ -1598,7 +1601,7 @@ no_inode:
        return;
 
 bad:
-       dout(10, "corrupt caps message\n");
+       derr(10, "corrupt caps message\n");
        return;
 }
 
@@ -1623,7 +1626,8 @@ static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
 int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session,
                         struct ceph_inode_cap *cap,
-                        int used, int wanted, int cancel_work)
+                        int used, int wanted,
+                        int cancel_work, int flush_snap)
 {
        struct ceph_inode_info *ci = cap->ci;
        struct inode *inode = &ci->vfs_inode;
@@ -1635,6 +1639,12 @@ int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
        struct timespec mtime, atime;
        int removed_last = 0;
        int wake = 0;
+       int op = CEPH_CAP_OP_ACK;
+
+       if (flush_snap)
+               op = CEPH_CAP_OP_FLUSHSNAP;
+       else if (wanted == 0)
+               op = CEPH_CAP_OP_RELEASE;
 
        dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
             cap->issued, cap->issued & wanted);
@@ -1655,12 +1665,14 @@ int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
        atime = inode->i_atime;
        time_warp_seq = ci->i_time_warp_seq;
        follows = ci->i_snaprealm->cached_context->seq;
-       if (wanted == 0) {
+       if (wanted == 0 && !flush_snap) {
                __ceph_remove_cap(cap);
                removed_last = list_empty(&ci->i_caps);
                if (removed_last && cancel_work)
                        __cap_delay_cancel(mdsc, ci);
        }
+       if (flush_snap)
+               cap->flushed_snap = follows; /* so we only flush it once */
        spin_unlock(&inode->i_lock);
 
        if (dropping & CEPH_CAP_RDCACHE) {
@@ -1673,13 +1685,13 @@ int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
        }
 
        send_cap_ack(mdsc, ceph_vino(inode).ino,
-                    keep, wanted, seq,
+                    op, keep, wanted, seq,
                     size, max_size, &mtime, &atime, time_warp_seq,
                     follows, session->s_mds);
 
        if (wake)
                wake_up(&ci->i_cap_wq);
-       if (wanted == 0)
+       if (wanted == 0 && !flush_snap)
                iput(inode);  /* removed cap */
 
        return removed_last;
@@ -1702,7 +1714,7 @@ static void check_delayed_caps(struct ceph_mds_client *mdsc)
                list_del_init(&ci->i_cap_delay_list);
                spin_unlock(&mdsc->cap_delay_lock);
                dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
-               ceph_check_caps(ci, 1);
+               ceph_check_caps(ci, 1, 0);
                iput(&ci->vfs_inode);
        }
        spin_unlock(&mdsc->cap_delay_lock);
@@ -1736,10 +1748,165 @@ static void flush_write_caps(struct ceph_mds_client *mdsc,
                        used = wanted = 0;
                }
 
-               __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 1);
+               __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 1, 0);
+       }
+}
+
+
+/*
+ * snap
+ */
+
+void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
+                          struct ceph_msg *msg)
+{
+       struct super_block *sb = mdsc->client->sb;
+       struct ceph_client *client = ceph_sb_to_client(sb);
+       struct ceph_mds_session *session;
+       int mds = le32_to_cpu(msg->hdr.src.name.num);
+       u64 split;
+       int op;
+       int trace_len;
+       struct ceph_snaprealm *realm = 0;
+       void *p = msg->front.iov_base;
+       void *e = p + msg->front.iov_len;
+       struct ceph_mds_snap_head *h;
+       int num_split_inos, num_split_realms;
+       __le64 *split_inos = 0, *split_realms = 0;
+       int i;
+
+       /* decode */
+       if (msg->front.iov_len < sizeof(*h))
+               goto bad;
+       h = p;
+       op = le32_to_cpu(h->op);
+       split = le64_to_cpu(h->split);
+       trace_len = le32_to_cpu(h->trace_len);
+       num_split_inos = le32_to_cpu(h->num_split_inos);
+       num_split_realms = le32_to_cpu(h->num_split_realms);
+       p += sizeof(*h);
+
+       dout(10, "handle_snap from mds%d op %s split %llx tracelen %d\n", mds,
+            ceph_snap_op_name(op), split, trace_len);
+
+       /* find session */
+       spin_lock(&mdsc->lock);
+       session = __get_session(&client->mdsc, mds);
+       spin_unlock(&mdsc->lock);
+       if (!session) {
+               dout(10, "WTF, got snap but no session for mds%d\n", mds);
+               return;
+       }
+
+       mutex_lock(&session->s_mutex);
+       session->s_seq++;
+       mutex_unlock(&session->s_mutex);
+
+       if (op == CEPH_SNAP_OP_SPLIT) {
+               struct ceph_mds_snap_realm *ri;
+
+               split_inos = p;
+               p += sizeof(u64) * num_split_inos;
+               split_realms = p;
+               p += sizeof(u64) * num_split_realms;
+               ceph_decode_need(&p, e, sizeof(*ri), bad);
+               ri = p;
+
+               realm = ceph_get_snaprealm(client, split);
+               if (IS_ERR(realm))
+                       goto out;
+               dout(10, "splitting snaprealm %llx %p\n", realm->ino, realm);
+
+               for (i = 0; i < num_split_inos; i++) {
+                       struct ceph_vino vino = {
+                               .ino = le64_to_cpu(split_inos[i]),
+                               .snap = CEPH_NOSNAP,
+                       };
+                       struct inode *inode = ceph_find_inode(sb, vino);
+                       struct ceph_inode_info *ci;
+                       if (!inode)
+                               continue;
+                       ci = ceph_inode(inode);
+                       spin_lock(&inode->i_lock);
+                       if (!ci->i_snaprealm)
+                               goto skip_inode;
+                       if (ci->i_snaprealm->created > le64_to_cpu(ri->created)) {
+                               dout(15, " leaving %p in newer realm %llx %p\n",
+                                    inode, ci->i_snaprealm->ino,
+                                    ci->i_snaprealm);
+                               goto skip_inode;
+                       }
+                       dout(15, " will move %p to split realm %llx %p\n",
+                            inode, realm->ino, realm);
+                       /*
+                        * remove from list, but don't re-add yet.  we
+                        * don't want the caps to be flushed (again) by
+                        * ceph_update_snap_trace below.
+                        */
+                       list_del_init(&ci->i_snaprealm_item);
+                       spin_unlock(&inode->i_lock);
+
+                       ceph_check_caps(ci, 0, 1);
+
+                       iput(inode);
+                       continue;
+
+               skip_inode:
+                       spin_unlock(&inode->i_lock);
+                       iput(inode);
+               }
+
+               for (i = 0; i < num_split_realms; i++) {
+                       struct ceph_snaprealm *child =
+                               ceph_get_snaprealm(client,
+                                          le64_to_cpu(split_realms[i]));
+                       if (!child)
+                               continue;
+                       ceph_adjust_snaprealm_parent(client, child, realm->ino);
+                       ceph_put_snaprealm(child);
+               }
+
+               ceph_put_snaprealm(realm);
+       }
+
+       realm = ceph_update_snap_trace(client, p, e,
+                                      op != CEPH_SNAP_OP_DESTROY);
+       if (IS_ERR(realm))
+               goto bad;
+
+       if (op == CEPH_SNAP_OP_SPLIT) {
+               for (i = 0; i < num_split_inos; i++) {
+                       struct ceph_vino vino = {
+                               .ino = le64_to_cpu(split_inos[i]),
+                               .snap = CEPH_NOSNAP,
+                       };
+                       struct inode *inode = ceph_find_inode(sb, vino);
+                       struct ceph_inode_info *ci;
+                       if (!inode)
+                               continue;
+                       ci = ceph_inode(inode);
+                       spin_lock(&inode->i_lock);
+                       /* _now_ add to newly split realm */
+                       ceph_put_snaprealm(ci->i_snaprealm);
+                       list_add(&ci->i_snaprealm_item,
+                                &realm->inodes_with_caps);
+                       ci->i_snaprealm = realm;
+                       realm->nref++;
+                       spin_unlock(&inode->i_lock);
+               }
        }
+
+       ceph_put_snaprealm(realm);
+       return;
+
+bad:
+       derr(10, "corrupt snap message from mds%d\n", mds);
+out:
+       return;
 }
 
+
+
 static int close_session(struct ceph_mds_client *mdsc,
                         struct ceph_mds_session *session)
 {
index 223b50f3d48c212e9cabf5045d3336753b85cccb..f83f0f4bc003365f6b113a2b44c9237f30821921 100644 (file)
@@ -151,6 +151,8 @@ extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
 
 extern void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
                                  struct ceph_msg *msg);
+extern void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
+                                 struct ceph_msg *msg);
 
 extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
                                   struct ceph_msg *msg);
@@ -171,7 +173,8 @@ extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
 extern int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
                                struct ceph_mds_session *session,
                                struct ceph_inode_cap *cap,
-                               int used, int wanted, int cancel_work);
+                               int used, int wanted,
+                               int cancel_work, int flush_snap);
 extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 #endif
index 9cbf41d926b7cbdf39ad3c65a93dd35e35820486..720c5eeff1552a9698eab4ec1a13b012071b1fd2 100644 (file)
@@ -51,7 +51,7 @@ struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
        size_t size = sizeof(struct ceph_osd_request_head);
 
        if (snapc)
-               size += sizeof(u64) + snapc->num_snaps;
+               size += sizeof(u64) * snapc->num_snaps;
        req = ceph_msg_new(CEPH_MSG_OSD_OP, size, 0, 0, 0);
        if (IS_ERR(req))
                return req;
index 763ad5679f9f77031109778719ae1250d19824f3..9de4950c338c9a2e100163d42bea98d98667b39a 100644 (file)
@@ -86,7 +86,7 @@ static int cmpu64_rev(const void *a, const void *b)
        return 0;
 }
 
-int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
+int ceph_build_snap_context(struct ceph_snaprealm *realm)
 {
        struct ceph_snaprealm *parent = realm->parent;
        struct ceph_snap_context *sc;
@@ -96,13 +96,26 @@ int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
 
        if (parent) {
                if (!parent->cached_context) {
-                       err = ceph_snaprealm_build_context(parent);
+                       err = ceph_build_snap_context(parent);
                        if (err)
                                goto fail;
                }
                num += parent->cached_context->num_snaps;
        }
 
+       /* do i need to update? */
+       if (realm->cached_context && realm->cached_context->seq <= realm->seq &&
+           (!parent ||
+            realm->cached_context->seq <= parent->cached_context->seq)) {
+               dout(10, "build_snap_context %llx %p: %p seq %lld (%d snaps)"
+                    " (unchanged)\n",
+                    realm->ino, realm, realm->cached_context, 
+                    realm->cached_context->seq,
+                    realm->cached_context->num_snaps);
+               return 0;
+       }
+
+       /* build new */
        err = -ENOMEM;
        sc = kzalloc(sizeof(*sc) + num*sizeof(u64), GFP_NOFS);
        if (!sc)
@@ -130,8 +143,8 @@ int ceph_snaprealm_build_context(struct ceph_snaprealm *realm)
 
        sort(sc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
        sc->num_snaps = num;
-       dout(10, "snaprealm_build_context %llx %p : seq %lld %d snaps\n",
-            realm->ino, realm, sc->seq, sc->num_snaps);
+       dout(10, "build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
+            realm->ino, realm, sc, sc->seq, sc->num_snaps);
 
        if (realm->cached_context)
                ceph_put_snap_context(realm->cached_context);
@@ -143,7 +156,7 @@ fail:
                ceph_put_snap_context(realm->cached_context);
                realm->cached_context = 0;
        }
-       derr(0, "snaprealm_build_context %llx %p fail %d\n", realm->ino,
+       derr(0, "build_snap_context %llx %p fail %d\n", realm->ino,
             realm, err);
        return err;
 }
@@ -154,7 +167,7 @@ void ceph_rebuild_snaprealms(struct ceph_snaprealm *realm)
        struct ceph_snaprealm *child;
 
        dout(10, "rebuild_snaprealms %llx %p\n", realm->ino, realm);
-       ceph_snaprealm_build_context(realm);
+       ceph_build_snap_context(realm);
 
        list_for_each(p, &realm->children) {
                child = list_entry(p, struct ceph_snaprealm, child_item);
@@ -190,6 +203,7 @@ struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
        struct ceph_snaprealm *realm, *first = 0;
        int invalidate = 0;
 
+       dout(10, "update_snap_trace must_flush=%d\n", must_flush);
 more:
        ceph_decode_need(&p, e, sizeof(*ri), bad);
        ri = p;
@@ -210,10 +224,17 @@ more:
        }
 
        if (le64_to_cpu(ri->seq) > realm->seq) {
+               struct list_head *p;
                dout(10, "update_snap_trace updating %llx %p %lld -> %lld\n",
                     realm->ino, realm, realm->seq, le64_to_cpu(ri->seq));
                
-               // flush caps... and data?
+               list_for_each(p, &realm->inodes_with_caps) {
+                       struct ceph_inode_info *ci =
+                               list_entry(p, struct ceph_inode_info,
+                                          i_snaprealm_item);
+                       ceph_check_caps(ci, 0, 1);
+               }
+               dout(20, "update_snap_trace cap flush done\n");
 
        } else
                dout(10, "update_snap_trace %llx %p seq %lld unchanged\n",
@@ -240,6 +261,9 @@ more:
        } else if (!realm->cached_context)
                invalidate = 1;
 
+       dout(10, "done with %llx %p, invalidated=%d, %p %p\n", realm->ino,
+            realm, invalidate, p, e);
+
        if (p >= e && invalidate)
                ceph_rebuild_snaprealms(realm);
 
@@ -253,7 +277,7 @@ bad:
        err = -EINVAL;
 fail:
        derr(10, "update_snap_trace error %d\n", err);
-       return 0;       
+       return ERR_PTR(err);    
 }
 
 
index 73e13aec97f2015a308f4b84c119dd803039c611..178fb510e161084890926132483f4849f5e5bd90 100644 (file)
@@ -310,6 +310,7 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_CLIENT_REQUEST_FORWARD: return "client_request_forward";
        case CEPH_MSG_CLIENT_REPLY: return "client_reply";
        case CEPH_MSG_CLIENT_CAPS: return "client_caps";
+       case CEPH_MSG_CLIENT_SNAP: return "client_snap";
        case CEPH_MSG_CLIENT_LEASE: return "client_lease";
        case CEPH_MSG_OSD_GETMAP: return "osd_getmap";
        case CEPH_MSG_OSD_MAP: return "osd_map";
@@ -803,6 +804,9 @@ void ceph_dispatch(void *p, struct ceph_msg *msg)
        case CEPH_MSG_CLIENT_CAPS:
                ceph_mdsc_handle_caps(&client->mdsc, msg);
                break;
+       case CEPH_MSG_CLIENT_SNAP:
+               ceph_mdsc_handle_snap(&client->mdsc, msg);
+               break;
        case CEPH_MSG_CLIENT_LEASE:
                ceph_mdsc_handle_lease(&client->mdsc, msg);
                break;
index 5e1e3e942257881f44460fe92ac05a7c2178b470..6a29aa79629d8a2901ce99eaad3b74ded767682a 100644 (file)
@@ -176,6 +176,7 @@ struct ceph_inode_cap {
        int implemented;  /* what we've implemneted (for tracking revocation) */
        u32 seq, mseq, gen;
        int flags;  /* stale, etc.? */
+       u64 flushed_snap;
        struct ceph_inode_info *ci;
        struct list_head ci_caps;       /* per-ci caplist */
        struct ceph_mds_session *session;
@@ -449,6 +450,10 @@ struct ceph_snap_context {
 
 static inline struct ceph_snap_context *ceph_get_snap_context(struct ceph_snap_context *sc)
 {
+       /*
+       printk("get_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
+              atomic_read(&sc->nref)+1);
+       */
        atomic_inc(&sc->nref);
        return sc;
 }
@@ -457,8 +462,14 @@ static inline void ceph_put_snap_context(struct ceph_snap_context *sc)
 {
        if (!sc)
                return;
-       if (atomic_dec_and_test(&sc->nref))
+       /*
+       printk("put_snap_context %p %d -> %d\n", sc, atomic_read(&sc->nref),
+              atomic_read(&sc->nref)-1);
+       */
+       if (atomic_dec_and_test(&sc->nref)) {
+               /*printk(" deleting snap_context %p\n", sc);*/
                kfree(sc);
+       }
 }
 
 struct ceph_snaprealm {
@@ -493,7 +504,7 @@ extern int ceph_adjust_snaprealm_parent(struct ceph_client *client,
 extern struct ceph_snaprealm *ceph_update_snap_trace(struct ceph_client *client,
                                                     void *p, void *e,
                                                     int must_flush);
-extern int ceph_snaprealm_build_context(struct ceph_snaprealm *realm);
+extern int ceph_build_snap_context(struct ceph_snaprealm *realm);
 extern void ceph_invalidate_snaprealm(struct ceph_snaprealm *realm);
 
 
@@ -553,7 +564,7 @@ extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int
 extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
-extern void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed);
+extern void ceph_check_caps(struct ceph_inode_info *ci, int delayed, int flush);
 extern void ceph_inode_set_size(struct inode *inode, loff_t size);
 extern void ceph_inode_writeback(struct work_struct *work);
 extern void ceph_vmtruncate_work(struct work_struct *work);
index c3fe5caefd79e72ddd4c2c17e973fa912f418eb3..24263df1bc0a7d7cf08336eed1e66468447b9d43 100644 (file)
@@ -35,7 +35,8 @@ struct MClientSnap : public Message {
   void print(ostream& out) {
     out << "client_snap(" << ceph_snap_op_name(head.op);
     if (head.split)
-      out << " split=" << head.split;
+      out << " split=" << inodeno_t(head.split);
+    out << " tracelen=" << bl.length();
     out << ")";
   }