]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: move caps code into caps.c
author Sage Weil <sage@newdream.net>
Tue, 19 Aug 2008 21:19:55 +0000 (14:19 -0700)
committer Sage Weil <sage@newdream.net>
Tue, 19 Aug 2008 21:19:55 +0000 (14:19 -0700)
src/kernel/Makefile
src/kernel/caps.c [new file with mode: 0644]
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.c
src/kernel/super.h

index 2291090d8b936f6a5756b2ed4234616a88ca8cfc..74ec2c8a0a3ea52e2b39a86f6e8a847c3eed2170 100644 (file)
@@ -6,7 +6,8 @@ ifneq ($(KERNELRELEASE),)
 
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
-ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o export.o snap.o \
+ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
+       export.o caps.o snap.o \
        messenger.o \
        mds_client.o mdsmap.o \
        mon_client.o \
diff --git a/src/kernel/caps.c b/src/kernel/caps.c
new file mode 100644 (file)
index 0000000..240568a
--- /dev/null
@@ -0,0 +1,1057 @@
+#include <linux/fs.h>
+#include <linux/kernel.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+
+int ceph_debug_caps = -1;
+#define DOUT_VAR ceph_debug_caps
+#define DOUT_PREFIX "caps: "
+#include "super.h"
+
+#include "decode.h"
+#include "messenger.h"
+
+
+/*
+ * Look up the cap that @inode holds from @mds by walking ci->i_caps.
+ * Returns NULL when no cap on the list has a matching mds.
+ *
+ * No lock is taken here.
+ */
+static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_cap *cur;
+
+       list_for_each_entry(cur, &ci->i_caps, ci_caps)
+               if (cur->mds == mds)
+                       return cur;
+       return NULL;
+}
+
+/*
+ * Return the mds of the first cap on the inode's i_caps list, or -1 if
+ * the list is empty.  i_lock is taken and released around the lookup.
+ */
+int ceph_get_cap_mds(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int result = -1;
+
+       spin_lock(&inode->i_lock);
+       if (!list_empty(&ci->i_caps))
+               result = list_first_entry(&ci->i_caps, struct ceph_inode_cap,
+                                         ci_caps)->mds;
+       spin_unlock(&inode->i_lock);
+       return result;
+}
+
+/*
+ * Record a cap issued on @inode by @session's mds: create the cap if the
+ * inode has none for that mds yet, then fold @issued into it and update
+ * its seq, mseq and gen.
+ *
+ * caller should hold session snap_rwsem, s_mutex.
+ *
+ * @fmode can be negative, in which case it is ignored.
+ * @snapblob, @snapblob_len: snap trace handed to ceph_update_snap_trace().
+ * When the length is 0 no realm is looked up, and none is attached or
+ * released here.
+ *
+ * Returns 0 on success, or -ENOMEM if a new cap could not be allocated.
+ */
+int ceph_add_cap(struct inode *inode,
+                struct ceph_mds_session *session,
+                int fmode, unsigned issued,
+                unsigned seq, unsigned mseq,
+                void *snapblob, int snapblob_len)
+{
+       int mds = session->s_mds;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_cap *cap, *new_cap = NULL;
+       int i;
+       int is_new = 0;
+       struct ceph_snaprealm *realm = NULL;
+       struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+
+       if (snapblob_len)
+               realm = ceph_update_snap_trace(mdsc,
+                                              snapblob, snapblob+snapblob_len,
+                                              0);
+
+       dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
+            session->s_mds, issued, seq);
+retry:
+       spin_lock(&inode->i_lock);
+       cap = __get_cap_for_mds(inode, mds);
+       if (!cap) {
+               /* prefer a free slot embedded in the inode */
+               for (i = 0; i < STATIC_CAPS; i++)
+                       if (ci->i_static_caps[i].mds == -1) {
+                               cap = &ci->i_static_caps[i];
+                               break;
+                       }
+               if (!cap) {
+                       if (new_cap) {
+                               cap = new_cap;
+                               new_cap = NULL;
+                       } else {
+                               /*
+                                * allocate outside i_lock, then rescan: the
+                                * cap list may have changed in the meantime.
+                                */
+                               spin_unlock(&inode->i_lock);
+                               new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
+                               if (!new_cap) {
+                                       /*
+                                        * release the realm, as the
+                                        * not-attached path below does.
+                                        */
+                                       if (realm)
+                                               ceph_put_snaprealm(realm);
+                                       return -ENOMEM;
+                               }
+                               goto retry;
+                       }
+               }
+
+               is_new = 1;    /* grab inode later */
+               cap->issued = cap->implemented = 0;
+               cap->mds = mds;
+               cap->flags = 0;
+               cap->flushed_snap = 0;
+
+               cap->ci = ci;
+               list_add(&cap->ci_caps, &ci->i_caps);
+
+               /* add to session cap list */
+               cap->session = session;
+               list_add(&cap->session_caps, &session->s_caps);
+               session->s_nr_caps++;
+
+               /* clear out old exporting info? */
+               if (ci->i_cap_exporting_mds == mds) {
+                       ci->i_cap_exporting_issued = 0;
+                       ci->i_cap_exporting_mseq = 0;
+                       ci->i_cap_exporting_mds = -1;
+               }
+       }
+
+       /*
+        * realm is NULL when no snap trace was supplied; touching
+        * realm->inodes_with_caps in that case would be a NULL dereference.
+        */
+       if (realm) {
+               if (!ci->i_snaprealm) {
+                       ci->i_snaprealm = realm;
+                       list_add(&ci->i_snaprealm_item,
+                                &realm->inodes_with_caps);
+               } else {
+                       ceph_put_snaprealm(realm);
+               }
+       }
+
+       dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
+            inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+       cap->issued |= issued;
+       cap->implemented |= issued;
+       cap->seq = seq;
+       cap->mseq = mseq;
+       cap->gen = session->s_cap_gen;
+       if (fmode >= 0)
+               __ceph_get_fmode(ci, fmode);
+       spin_unlock(&inode->i_lock);
+       if (is_new)
+               igrab(inode);
+       kfree(new_cap);         /* NULL unless the preallocation went unused */
+       return 0;
+}
+
+/*
+ * Return the union of ci->i_snap_caps and the ->issued bits of every cap
+ * on ci->i_caps that is not stale.  A cap is skipped as stale when its
+ * gen is older than its session's s_cap_gen, or when the session's
+ * s_cap_ttl has been reached.
+ *
+ * If @implemented is non-NULL, the ->implemented bits of the same caps
+ * are OR'ed into *implemented.  It is never cleared here, so the caller
+ * must initialize it.
+ */
+int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
+{
+       struct ceph_inode_cap *cap;
+       int have = ci->i_snap_caps;
+
+       list_for_each_entry(cap, &ci->i_caps, ci_caps) {
+               unsigned long ttl;
+               u32 gen;
+
+               /* sample the session's generation and ttl under its lock */
+               spin_lock(&cap->session->s_cap_lock);
+               gen = cap->session->s_cap_gen;
+               ttl = cap->session->s_cap_ttl;
+               spin_unlock(&cap->session->s_cap_lock);
+
+               if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
+                       dout(30, "__ceph_caps_issued %p cap %p issued %d "
+                            "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
+                            cap, cap->issued, cap->gen, gen);
+                       continue;
+               }
+
+               dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
+                    &ci->vfs_inode, cap, cap->issued);
+               have |= cap->issued;
+               if (implemented)
+                       *implemented |= cap->implemented;
+       }
+       return have;
+}
+
+/*
+ * Unlink @cap from both its session and its inode, and free it unless
+ * it lives inside ci->i_static_caps[].
+ *
+ * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * returns true if this is the last cap.  if so, caller should iput.
+ */
+int __ceph_remove_cap(struct ceph_inode_cap *cap)
+{
+       struct ceph_inode_info *ci = cap->ci;
+       struct ceph_mds_session *s = cap->session;
+       int embedded;
+
+       dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
+
+       /* drop it from the session's list and count */
+       list_del_init(&cap->session_caps);
+       s->s_nr_caps--;
+
+       /* drop it from the inode's list and mark the slot unused */
+       list_del_init(&cap->ci_caps);
+       cap->session = 0;
+       cap->mds = -1;
+
+       /* caps outside the embedded array are freed */
+       embedded = cap >= ci->i_static_caps &&
+               cap < ci->i_static_caps + STATIC_CAPS;
+       if (!embedded)
+               kfree(cap);
+
+       if (!list_empty(&ci->i_caps))
+               return 0;
+
+       /* that was the last cap: leave the snaprealm's inode list too */
+       list_del_init(&ci->i_snaprealm_item);
+       return 1;
+}
+
+/*
+ * Locked wrapper around __ceph_remove_cap(): takes i_lock, removes the
+ * cap, and drops an inode reference if that was the inode's last cap.
+ *
+ * caller should hold snap_rwsem and session s_mutex.
+ */
+void ceph_remove_cap(struct ceph_inode_cap *cap)
+{
+       struct ceph_inode_info *ci = cap->ci;
+       struct inode *inode = &ci->vfs_inode;
+       int last;
+
+       spin_lock(&inode->i_lock);
+       last = __ceph_remove_cap(cap);
+       spin_unlock(&inode->i_lock);
+
+       if (!last)
+               return;
+       iput(inode);
+}
+
+/*
+ * Set the inode's cap hold deadline to round_jiffies(now + 5s) and put
+ * it at the tail of mdsc->cap_delay_list.  An inode reference is taken
+ * only if the inode was not already on the list.
+ *
+ * caller holds i_lock
+ *    -> client->cap_delay_lock
+ */
+void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc,
+                             struct ceph_inode_info *ci)
+{
+       struct inode *inode = &ci->vfs_inode;
+       struct list_head *item = &ci->i_cap_delay_list;
+
+       ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+       dout(10, "__cap_delay_requeue %p at %lu\n", inode,
+            ci->i_hold_caps_until);
+
+       spin_lock(&mdsc->cap_delay_lock);
+       if (!list_empty(item))
+               list_del_init(item);    /* already queued: just move it */
+       else
+               igrab(inode);           /* the list holds a reference */
+       list_add_tail(item, &mdsc->cap_delay_list);
+       spin_unlock(&mdsc->cap_delay_lock);
+}
+
+
+/*
+ * examine currently used, wanted versus held caps.
+ *  release, ack revoked caps to mds as appropriate.
+ *
+ * @is_delayed: when zero, the inode is first (re)queued on the cap delay
+ *   list, which pushes i_hold_caps_until into the future.
+ * @flush_snap: when set, snap_rwsem is not taken here, and caps holding
+ *   WR or WRBUFFER are sent unless flushed_snap already covers the
+ *   realm's cached_context seq.
+ *
+ * Locking: i_lock is held while scanning.  snap_rwsem and the session
+ * s_mutex are taken with trylock under i_lock; on contention i_lock is
+ * dropped, the lock is taken blocking, and the scan restarts.  The scan
+ * also restarts after every cap message sent.
+ */
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
+{
+       struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
+       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct inode *inode = &ci->vfs_inode;
+       struct ceph_inode_cap *cap;
+       struct list_head *p;
+       int wanted, used;
+       struct ceph_mds_session *session = NULL; /* if non-NULL, i hold s_mutex */
+       int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
+
+retry:
+       spin_lock(&inode->i_lock);
+       wanted = __ceph_caps_wanted(ci);
+       used = __ceph_caps_used(ci);
+       dout(10, "check_caps %p wanted %d used %d issued %d\n", inode,
+            wanted, used, __ceph_caps_issued(ci, 0));
+
+       if (!is_delayed)
+               __ceph_cap_delay_requeue(mdsc, ci);
+
+       list_for_each(p, &ci->i_caps) {
+               int revoking;
+               cap = list_entry(p, struct ceph_inode_cap, ci_caps);
+
+               /* note: no side-effects allowed, until we take s_mutex */
+               revoking = cap->implemented & ~cap->issued;
+
+               if (ci->i_wanted_max_size > ci->i_max_size &&
+                   ci->i_wanted_max_size > ci->i_requested_max_size)
+                       goto ack;
+
+               /*
+                * completed revocation?  i.e. none of the bits being
+                * revoked are still in use.  this must be a bitwise test:
+                * a logical && would only fire when nothing at all is in
+                * use.
+                */
+               if (revoking && (revoking & used) == 0) {
+                       dout(10, "completed revocation of %d\n",
+                            cap->implemented & ~cap->issued);
+                       goto ack;
+               }
+
+               /* approaching file_max? */
+               if ((cap->issued & CEPH_CAP_WR) &&
+                   (inode->i_size << 1) >= ci->i_max_size &&
+                   (ci->i_reported_size << 1) < ci->i_max_size) {
+                       dout(10, "i_size approaching max_size\n");
+                       goto ack;
+               }
+
+               /* flush snap? */
+               if (flush_snap &&
+                   (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
+                       if (cap->flushed_snap >=
+                           ci->i_snaprealm->cached_context->seq) {
+                               dout(10, "flushed_snap %llu >= seq %lld, "
+                                    "not flushing mds%d\n",
+                                    cap->flushed_snap,
+                                    ci->i_snaprealm->cached_context->seq,
+                                    cap->session->s_mds);
+                               continue;  /* already flushed for this snap */
+                       }
+                       goto ack;
+               }
+
+               if ((cap->issued & ~wanted) == 0)
+                       continue;     /* nothing extra, all good */
+
+               if (time_before(jiffies, ci->i_hold_caps_until)) {
+                       /* delaying cap release for a bit */
+                       dout(30, "delaying cap release\n");
+                       continue;
+               }
+
+ack:
+               /* take s_mutex, one way or another */
+               if (session && session != cap->session) {
+                       dout(30, "oops, wrong session %p mutex\n", session);
+                       mutex_unlock(&session->s_mutex);
+                       session = NULL;
+               }
+               /* take snap_rwsem before session mutex */
+               if (!flush_snap && !took_snap_rwsem) {
+                       if (down_write_trylock(&mdsc->snap_rwsem) == 0) {
+                               dout(10, "inverting snap/in locks on %p\n",
+                                    inode);
+                               spin_unlock(&inode->i_lock);
+                               down_write(&mdsc->snap_rwsem);
+                               took_snap_rwsem = 1;
+                               goto retry;
+                       }
+                       took_snap_rwsem = 1;
+               }
+               if (!session) {
+                       session = cap->session;
+                       if (mutex_trylock(&session->s_mutex) == 0) {
+                               dout(10, "inverting session/ino locks on %p\n",
+                                    session);
+                               spin_unlock(&inode->i_lock);
+                               mutex_lock(&session->s_mutex);
+                               goto retry;
+                       }
+               }
+
+               /* send_cap drops i_lock */
+               __ceph_mdsc_send_cap(mdsc, session, cap,
+                                    used, wanted, flush_snap);
+
+               goto retry; /* retake i_lock and restart our cap scan. */
+       }
+
+       /* okay */
+       spin_unlock(&inode->i_lock);
+
+       if (session)
+               mutex_unlock(&session->s_mutex);
+       if (took_snap_rwsem)
+               up_write(&mdsc->snap_rwsem);
+}
+
+
+/*
+ * cap refs
+ */
+
+/*
+ * Bump the per-capability reference counters on @ci for every cap bit
+ * set in @got.  No lock is taken here; ceph_take_cap_refs() is the
+ * variant that takes i_lock.
+ */
+static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+       if (got & CEPH_CAP_RD)
+               ci->i_rd_ref += 1;
+       if (got & CEPH_CAP_RDCACHE)
+               ci->i_rdcache_ref += 1;
+       if (got & CEPH_CAP_WR)
+               ci->i_wr_ref += 1;
+
+       if (!(got & CEPH_CAP_WRBUFFER))
+               return;
+
+       /* the wrbuffer count is the only atomic one */
+       atomic_inc(&ci->i_wrbuffer_ref);
+       dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
+            &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)-1,
+            atomic_read(&ci->i_wrbuffer_ref));
+}
+
+/*
+ * Take references on the cap bits in @got, holding i_lock around the
+ * counter updates.
+ */
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+       struct inode *inode = &ci->vfs_inode;
+
+       dout(30, "take_cap_refs on %p taking %d\n", inode, got);
+
+       spin_lock(&inode->i_lock);
+       __take_cap_refs(ci, got);
+       spin_unlock(&inode->i_lock);
+}
+
+/*
+ * Try to take references on the caps in @need (all required), plus
+ * whichever caps in @want are currently issued.
+ *
+ * @endoff: if >= 0 and beyond i_max_size, no references are taken.
+ * @got: on success, set to the cap bits that were referenced; not
+ *       written otherwise.
+ *
+ * Returns 1 if references were taken, 0 if not.
+ */
+int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
+                     loff_t endoff)
+{
+       int ret = 0;
+       int have;
+       /*
+        * __ceph_caps_issued() only ORs bits into this, so it has to
+        * start out cleared.
+        */
+       int implemented = 0;
+       int taken = 0;          /* what we stored in *got, for the log */
+
+       dout(30, "get_cap_refs on %p need %d want %d\n", &ci->vfs_inode,
+            need, want);
+       spin_lock(&ci->vfs_inode.i_lock);
+       if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
+               dout(20, "get_cap_refs endoff %llu > max_size %llu\n",
+                    endoff, ci->i_max_size);
+               goto sorry;
+       }
+       have = __ceph_caps_issued(ci, &implemented);
+       if ((have & need) == need) {
+               /*
+                * look at (implemented & ~have & not) so that we keep waiting
+                * on transition from wanted -> needed caps.  this is needed
+                * for WRBUFFER|WR -> WR to avoid a new WR sync write from
+                * going before a prior buffered writeback happens.
+                */
+               int not = want & ~(have & need);
+               int revoking = implemented & ~have;
+               dout(30, "get_cap_refs have %d but not %d (revoking %d)\n",
+                    have, not, revoking);
+               if ((revoking & not) == 0) {
+                       taken = need | (have & want);
+                       *got = taken;
+                       __take_cap_refs(ci, taken);
+                       ret = 1;
+               }
+       } else
+               dout(30, "get_cap_refs have %d needed %d\n", have, need);
+sorry:
+       spin_unlock(&ci->vfs_inode.i_lock);
+       /* log our own copy: *got may be uninitialized when ret == 0 */
+       dout(30, "get_cap_refs on %p ret %d got %d\n", &ci->vfs_inode,
+            ret, taken);
+       return ret;
+}
+
+/*
+ * Drop one reference for each cap bit set in @had.  If any of those
+ * counters reached zero, run ceph_check_caps() on the inode after
+ * i_lock has been released.
+ */
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+       struct inode *inode = &ci->vfs_inode;
+       int last = 0;
+
+       spin_lock(&inode->i_lock);
+       if ((had & CEPH_CAP_RD) && --ci->i_rd_ref == 0)
+               last++;
+       if ((had & CEPH_CAP_RDCACHE) && --ci->i_rdcache_ref == 0)
+               last++;
+       if ((had & CEPH_CAP_WR) && --ci->i_wr_ref == 0)
+               last++;
+       if (had & CEPH_CAP_WRBUFFER) {
+               if (atomic_dec_and_test(&ci->i_wrbuffer_ref))
+                       last++;
+               dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
+                    inode, atomic_read(&ci->i_wrbuffer_ref)+1,
+                    atomic_read(&ci->i_wrbuffer_ref));
+       }
+       spin_unlock(&inode->i_lock);
+
+       dout(30, "put_cap_refs on %p had %d %s\n", inode, had,
+            last ? "last":"");
+
+       if (!last)
+               return;
+       ceph_check_caps(ci, 0, 0);
+}
+
+/*
+ * Drop @nr references from the inode's wrbuffer count under i_lock.
+ * Warns if the count went negative, and runs ceph_check_caps() when it
+ * reached zero.
+ */
+void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
+{
+       struct inode *inode = &ci->vfs_inode;
+       int remaining;
+       int hit_zero;
+
+       spin_lock(&inode->i_lock);
+       hit_zero = atomic_sub_and_test(nr, &ci->i_wrbuffer_ref);
+       remaining = atomic_read(&ci->i_wrbuffer_ref);
+       spin_unlock(&inode->i_lock);
+
+       dout(30, "put_wrbuffer_cap_refs on %p %d -> %d (?)%s\n",
+            inode, remaining+nr, remaining, hit_zero ? " LAST":"");
+       WARN_ON(remaining < 0);
+
+       if (!hit_zero)
+               return;
+       ceph_check_caps(ci, 0, 0);
+}
+
+
+
+
+/*
+ * Build a CEPH_MSG_CLIENT_CAPS message from the given fields and send
+ * it to @mds.  @mtime and @atime may be NULL, in which case the matching
+ * message fields stay zeroed.  If the message cannot be allocated the
+ * request is dropped and nothing is reported to the caller.
+ */
+static void send_cap(struct ceph_mds_client *mdsc, __u64 ino, int op,
+                    int caps, int wanted, __u64 seq, __u64 mseq,
+                    __u64 size, __u64 max_size,
+                    struct timespec *mtime, struct timespec *atime,
+                    u64 time_warp_seq, u64 follows, int mds)
+{
+       struct ceph_msg *msg;
+       struct ceph_mds_caps *fc;
+
+       dout(10, "send_cap %s %llx caps %d wanted %d seq %llu/%llu"
+            " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
+            caps, wanted, seq, mseq, follows, size);
+
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
+       if (IS_ERR(msg))
+               return;
+
+       /* start from an all-zero payload */
+       fc = msg->front.iov_base;
+       memset(fc, 0, sizeof(*fc));
+
+       /* which cap, and what we are saying about it */
+       fc->ino = cpu_to_le64(ino);
+       fc->op = cpu_to_le32(op);
+       fc->seq = cpu_to_le64(seq);
+       fc->migrate_seq = cpu_to_le64(mseq);
+       fc->caps = cpu_to_le32(caps);
+       fc->wanted = cpu_to_le32(wanted);
+       fc->snap_follows = cpu_to_le64(follows);
+
+       /* file metadata */
+       fc->size = cpu_to_le64(size);
+       fc->max_size = cpu_to_le64(max_size);
+       fc->time_warp_seq = cpu_to_le64(time_warp_seq);
+       if (mtime)
+               ceph_encode_timespec(&fc->mtime, mtime);
+       if (atime)
+               ceph_encode_timespec(&fc->atime, atime);
+
+       ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+
+
+
+
+/*
+ * Process a cap grant/revoke message from the mds for @inode.
+ *
+ * Updates file size/time bits, max_size and layout from @grant, then
+ * compares the new cap bits against what this cap currently has:
+ *  - revocation of bits still used for buffered writes queues a
+ *    writeback and delays the ack;
+ *  - any other revocation rewrites @grant in place as the ack;
+ *  - otherwise the new bits are simply recorded.
+ * A message for an mds we hold no cap from is ignored.
+ *
+ * caller holds s_mutex.  NOT snap_rwsem.
+ * return value:
+ *  0 - ok
+ *  1 - send the msg back to mds
+ */
+static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+                           struct ceph_mds_session *session)
+{
+       struct ceph_inode_cap *cap;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int mds = session->s_mds;
+       int seq = le32_to_cpu(grant->seq);
+       int newcaps = le32_to_cpu(grant->caps);
+       int used;
+       int issued; /* to me, before */
+       int wanted;
+       int reply = 0;
+       u64 size = le64_to_cpu(grant->size);
+       u64 max_size = le64_to_cpu(grant->max_size);
+       struct timespec mtime, atime, ctime;
+       int wake = 0;
+       int writeback_now = 0;
+       int invalidate = 0;
+
+       dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
+            inode, ci, mds, seq);
+       dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
+               inode->i_size);
+
+       spin_lock(&inode->i_lock);
+
+       /* do we have this cap? */
+       cap = __get_cap_for_mds(inode, mds);
+       if (!cap) {
+               /*
+                * then ignore.  never reply to cap messages out of turn,
+                * or we'll be mixing up different instances of caps on the
+                * same inode, and confuse the mds.
+                */
+               dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
+                    inode, ci->i_vino.ino, ci->i_vino.snap, mds);
+               goto out;
+       }
+       dout(10, " cap %p\n", cap);
+       cap->gen = session->s_cap_gen;
+
+       /* size/ctime/mtime/atime? */
+       issued = __ceph_caps_issued(ci, 0);
+       ceph_decode_timespec(&mtime, &grant->mtime);
+       ceph_decode_timespec(&atime, &grant->atime);
+       ceph_decode_timespec(&ctime, &grant->ctime);
+       ceph_fill_file_bits(inode, issued, le64_to_cpu(grant->time_warp_seq),
+                           size, &ctime, &mtime, &atime);
+
+       /* max size increase? */
+       if (max_size != ci->i_max_size) {
+               dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
+               ci->i_max_size = max_size;
+               if (max_size >= ci->i_wanted_max_size) {
+                       ci->i_wanted_max_size = 0;  /* reset */
+                       ci->i_requested_max_size = 0;
+               }
+               wake = 1;
+       }
+
+       /* check cap bits */
+       wanted = __ceph_caps_wanted(ci);
+       used = __ceph_caps_used(ci);
+       dout(10, " my wanted = %d, used = %d\n", wanted, used);
+       if (wanted != le32_to_cpu(grant->wanted)) {
+               dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
+                    wanted);
+               grant->wanted = cpu_to_le32(wanted);
+       }
+
+       cap->seq = seq;
+
+       /* layout may have changed */
+       ci->i_layout = grant->layout;
+
+       /* revocation? */
+       if (cap->issued & ~newcaps) {
+               dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+               if ((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE)
+                       invalidate = 1;
+               if ((used & ~newcaps) & CEPH_CAP_WRBUFFER)
+                       writeback_now = 1; /* will delay ack */
+               else {
+                       cap->implemented = newcaps;
+                       /* ack now.  re-use incoming message. */
+                       /* cpu -> wire order, like the other fields here */
+                       grant->size = cpu_to_le64(inode->i_size);
+                       grant->max_size = 0;  /* don't re-request */
+                       ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
+                       ceph_encode_timespec(&grant->atime, &inode->i_atime);
+                       grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
+                       grant->snap_follows =
+                            cpu_to_le64(ci->i_snaprealm->cached_context->seq);
+                       reply = 1;
+                       wake = 1;
+               }
+               cap->issued = newcaps;
+               goto out;
+       }
+
+       /* grant or no-op */
+       if (cap->issued == newcaps) {
+               dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+       } else {
+               dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+               cap->implemented = cap->issued = newcaps;
+               wake = 1;
+       }
+
+out:
+       spin_unlock(&inode->i_lock);
+       if (wake)
+               wake_up(&ci->i_cap_wq);
+       if (writeback_now) {
+               /*
+                * queue inode for writeback; we can't actually call
+                * write_inode_now, writepages, etc. from this
+                * context.
+                */
+               dout(10, "queueing %p for writeback\n", inode);
+               ceph_queue_writeback(ceph_client(inode->i_sb), ci);
+       }
+       if (invalidate)
+               invalidate_mapping_pages(&inode->i_data, 0, -1);
+       return reply;
+}
+
+
+/*
+ * Take the inode off the cap delay list, if it is on it, and drop the
+ * inode reference that __ceph_cap_delay_requeue() took when queueing it.
+ * The emptiness check is made before cap_delay_lock is taken.
+ *
+ * caller hold s_mutex, snap_rwsem.
+ */
+static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
+                              struct ceph_inode_info *ci)
+{
+       struct inode *inode = &ci->vfs_inode;
+
+       dout(10, "__cap_delay_cancel %p\n", inode);
+       if (!list_empty(&ci->i_cap_delay_list)) {
+               spin_lock(&mdsc->cap_delay_lock);
+               list_del_init(&ci->i_cap_delay_list);
+               spin_unlock(&mdsc->cap_delay_lock);
+               iput(inode);
+       }
+}
+
+/*
+ * The mds has confirmed release of our cap: remove the cap we hold from
+ * @session's mds (it must exist).  If it was the inode's last cap, also
+ * cancel any delayed cap check and drop an inode reference.
+ */
+static void handle_cap_released(struct inode *inode,
+                               struct ceph_mds_caps *m,
+                               struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_inode_cap *cap;
+       int seq = le32_to_cpu(m->seq);
+       int last;
+
+       dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
+            session->s_mds, seq);
+
+       spin_lock(&inode->i_lock);
+       cap = __get_cap_for_mds(inode, session->s_mds);
+       BUG_ON(!cap);
+       last = __ceph_remove_cap(cap);
+       if (last)
+               __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+       spin_unlock(&inode->i_lock);
+
+       if (!last)
+               return;
+       iput(inode);
+}
+
+
+/*
+ * Handler for CEPH_CAP_OP_FLUSHEDSNAP.  Not implemented yet: it only
+ * logs the message.
+ *
+ * caller hold s_mutex, snap_rwsem.
+ */
+static void handle_cap_flushedsnap(struct inode *inode,
+                                  struct ceph_mds_caps *m,
+                                  struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int seq = le32_to_cpu(m->seq);
+
+       /* log under the real function name so the message can be grepped */
+       dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d seq %d\n", inode,
+            ci, session->s_mds, seq);
+
+       /* **** WRITE ME **** */
+}
+
+
+/*
+ * Apply a truncation announced by the mds: update i_size and
+ * i_reported_size right away, and queue the vmtruncate work if one is
+ * needed.
+ *
+ * vmtruncate lazily; we can't block on i_mutex in the message
+ * handler path, or we deadlock against osd op replies needed
+ * to complete the writes holding i_lock.  vmtruncate will
+ * also block on page locks held by writes...
+ *
+ * if its an expansion, and there is no truncate pending, we
+ * don't need to truncate.
+ *
+ * caller hold s_mutex, NOT snap_rwsem.
+ */
+static void handle_cap_trunc(struct inode *inode,
+                            struct ceph_mds_caps *trunc,
+                            struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       u64 size = le64_to_cpu(trunc->size);
+       int seq = le32_to_cpu(trunc->seq);
+       int mds = session->s_mds;
+       int need_work = 0;
+       int pending;
+
+       dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
+            mds, seq);
+
+       spin_lock(&inode->i_lock);
+       /* a non-negative i_vmtruncate_to means a truncate is already queued */
+       pending = ci->i_vmtruncate_to >= 0;
+       if (!pending && size > inode->i_size) {
+               dout(10, "clean fwd truncate, no vmtruncate needed\n");
+       } else if (pending && size >= ci->i_vmtruncate_to) {
+               dout(10, "trunc to %lld < %lld already queued\n",
+                    ci->i_vmtruncate_to, size);
+       } else {
+               /* we need to trunc even smaller */
+               dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
+               ci->i_vmtruncate_to = size;
+               need_work = 1;
+       }
+       i_size_write(inode, size);
+       ci->i_reported_size = size;
+       spin_unlock(&inode->i_lock);
+
+       if (!need_work)
+               return;
+       queue_work(ceph_client(inode->i_sb)->trunc_wq,
+                  &ci->i_vmtruncate_work);
+}
+
+/*
+ * The mds is exporting our cap to another mds.  Unless some cap on the
+ * inode already carries a higher mseq, remember the exporting mds, its
+ * mseq and the issued bits in the inode, then remove the cap.  Warns if
+ * we hold no cap from this mds.  Drops an inode reference if the
+ * removed cap was the last one.
+ */
+static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
+                             struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       unsigned mseq = le32_to_cpu(ex->migrate_seq);
+       int mds = session->s_mds;
+       struct ceph_inode_cap *cap = 0;
+       struct ceph_inode_cap *cur;
+       int last = 0;
+
+       dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
+            inode, ci, mds, mseq);
+
+       spin_lock(&inode->i_lock);
+
+       /* make sure we haven't seen a higher mseq */
+       list_for_each_entry(cur, &ci->i_caps, ci_caps) {
+               if (cur->mseq > mseq) {
+                       dout(10, " higher mseq on cap from mds%d\n",
+                            cur->session->s_mds);
+                       goto out;
+               }
+               if (cur->session->s_mds == mds)
+                       cap = cur;
+       }
+
+       if (!cap) {
+               WARN_ON(!cap);
+               goto out;
+       }
+
+       /* make note, and remove */
+       ci->i_cap_exporting_mds = mds;
+       ci->i_cap_exporting_mseq = mseq;
+       ci->i_cap_exporting_issued = cap->issued;
+       last = __ceph_remove_cap(cap);
+
+out:
+       spin_unlock(&inode->i_lock);
+       if (last)
+               iput(inode);
+}
+
+/*
+ * Another mds is importing a cap to us.  If an export is on record with
+ * a lower mseq than this import, clear that record.  Then add the cap
+ * via ceph_add_cap(), passing along the snap trace that followed the
+ * message header.
+ *
+ * A failure from ceph_add_cap() cannot be returned to the caller, so it
+ * is logged instead of being dropped silently.
+ */
+static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
+                             struct ceph_mds_session *session,
+                             void *snaptrace, int snaptrace_len)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int mds = session->s_mds;
+       unsigned issued = le32_to_cpu(im->caps);
+       unsigned seq = le32_to_cpu(im->seq);
+       unsigned mseq = le32_to_cpu(im->migrate_seq);
+       int err;
+
+       if (ci->i_cap_exporting_mds >= 0 &&
+           ci->i_cap_exporting_mseq < mseq) {
+               dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
+                    " - cleared exporting from mds%d\n",
+                    inode, ci, mds, mseq,
+                    ci->i_cap_exporting_mds);
+               ci->i_cap_exporting_issued = 0;
+               ci->i_cap_exporting_mseq = 0;
+               ci->i_cap_exporting_mds = -1;
+       } else {
+               dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
+                    inode, ci, mds, mseq);
+       }
+
+       err = ceph_add_cap(inode, session, -1, issued, seq, mseq,
+                          snaptrace, snaptrace_len);
+       if (err < 0)
+               derr(10, "handle_cap_import inode %p mds%d add_cap failed %d\n",
+                    inode, mds, err);
+}
+
+
+
+/*
+ * Entry point for an incoming CEPH_MSG_CLIENT_CAPS message.
+ *
+ * Decodes the header, finds the sending mds's session and the inode,
+ * and dispatches on the cap op.  snap_rwsem is taken for write before
+ * dispatch; GRANT and TRUNC release it before their handler runs, the
+ * other ops release it afterwards.  s_mutex is held across the
+ * handler.  If we do not know the inode, a RELEASE is sent back.
+ *
+ * Every path that took snap_rwsem releases it before returning.
+ */
+void ceph_handle_caps(struct ceph_mds_client *mdsc,
+                     struct ceph_msg *msg)
+{
+       struct super_block *sb = mdsc->client->sb;
+       struct ceph_mds_session *session;
+       struct inode *inode;
+       struct ceph_mds_caps *h;
+       int mds = le32_to_cpu(msg->hdr.src.name.num);
+       int op;
+       u32 seq;
+       u32 snap_trace_len;
+       struct ceph_vino vino;
+       u64 size;
+
+       dout(10, "handle_caps from mds%d\n", mds);
+
+       /* decode */
+       if (msg->front.iov_len < sizeof(*h))
+               goto bad;
+       h = msg->front.iov_base;
+       op = le32_to_cpu(h->op);
+       vino.ino = le64_to_cpu(h->ino);
+       vino.snap = CEPH_NOSNAP;
+       seq = le32_to_cpu(h->seq);
+       size = le64_to_cpu(h->size);
+
+       /* find session */
+       mutex_lock(&mdsc->mutex);
+       session = __ceph_get_mds_session(mdsc, mds);
+       if (session)
+               down_write(&mdsc->snap_rwsem);
+       mutex_unlock(&mdsc->mutex);
+       if (!session) {
+               dout(10, "WTF, got cap but no session for mds%d\n", mds);
+               return;
+       }
+
+       mutex_lock(&session->s_mutex);
+       session->s_seq++;
+
+       /* lookup ino */
+       inode = ceph_find_inode(sb, vino);
+       dout(20, "op %d ino %llx inode %p\n", op, vino.ino, inode);
+       if (!inode) {
+               dout(10, "i don't have ino %llx, sending release\n", vino.ino);
+               /*
+                * NOTE(review): 'size' lands in send_cap()'s mseq slot and
+                * its size slot gets 0 -- confirm this argument order is
+                * what was intended.
+                */
+               send_cap(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+                        size, 0, 0, 0, 0, 0, 0, mds);
+               /* the switch below is skipped, so drop snap_rwsem here */
+               up_write(&mdsc->snap_rwsem);
+               goto no_inode;
+       }
+
+       switch (op) {
+       case CEPH_CAP_OP_GRANT:
+               up_write(&mdsc->snap_rwsem);
+               if (handle_cap_grant(inode, h, session) == 1) {
+                       dout(10, "sending reply back to mds%d\n", mds);
+                       ceph_msg_get(msg);
+                       ceph_send_msg_mds(mdsc, msg, mds);
+               }
+               break;
+
+       case CEPH_CAP_OP_TRUNC:
+               up_write(&mdsc->snap_rwsem);
+               handle_cap_trunc(inode, h, session);
+               break;
+
+       case CEPH_CAP_OP_RELEASED:
+               handle_cap_released(inode, h, session);
+               up_write(&mdsc->snap_rwsem);
+               break;
+
+       case CEPH_CAP_OP_FLUSHEDSNAP:
+               handle_cap_flushedsnap(inode, h, session);
+               up_write(&mdsc->snap_rwsem);
+               break;
+
+       case CEPH_CAP_OP_EXPORT:
+               handle_cap_export(inode, h, session);
+               up_write(&mdsc->snap_rwsem);
+               break;
+
+       case CEPH_CAP_OP_IMPORT:
+               /*
+                * the snap trace follows the header inside the front
+                * buffer; don't trust a length that runs past its end.
+                * (iov_len >= sizeof(*h) was checked above.)
+                */
+               snap_trace_len = le32_to_cpu(h->snap_trace_len);
+               if (snap_trace_len > msg->front.iov_len - sizeof(*h))
+                       derr(10, "corrupt caps message\n");
+               else
+                       handle_cap_import(inode, h, session,
+                                         msg->front.iov_base + sizeof(*h),
+                                         snap_trace_len);
+               up_write(&mdsc->snap_rwsem);
+               break;
+
+       default:
+               up_write(&mdsc->snap_rwsem);
+               derr(10, "unknown cap op %d %s\n", op, ceph_cap_op_name(op));
+       }
+
+       iput(inode);
+no_inode:
+       mutex_unlock(&session->s_mutex);
+       ceph_put_mds_session(session);
+       return;
+
+bad:
+       derr(10, "corrupt caps message\n");
+       return;
+}
+
+/*
+ * Send a cap message for @cap to the mds of @session.
+ *
+ * called with i_lock, then drops it.
+ * caller should hold snap_rwsem, s_mutex.
+ *
+ * @used, @wanted: cap bits currently in use on / wanted for the inode.
+ *     issued bits that are not in @wanted are dropped before sending.
+ * @flush_snap: if nonzero the op is FLUSHSNAP; otherwise the op is
+ *     RELEASE when @wanted is 0 and ACK in all other cases.
+ *
+ * currently always returns 0.
+ */
+int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
+                        struct ceph_mds_session *session,
+                        struct ceph_inode_cap *cap,
+                        int used, int wanted,
+                        int flush_snap)
+{
+       struct ceph_inode_info *ci = cap->ci;
+       struct inode *inode = &ci->vfs_inode;
+       int revoking = cap->implemented & ~cap->issued;
+       int dropping = cap->issued & ~wanted;
+       int keep;
+       u64 seq, mseq, time_warp_seq, follows;
+       u64 size, max_size;
+       struct timespec mtime, atime;
+       int wake = 0;
+       int op = CEPH_CAP_OP_ACK;
+
+       if (flush_snap)
+               op = CEPH_CAP_OP_FLUSHSNAP;
+       else if (wanted == 0)
+               op = CEPH_CAP_OP_RELEASE;
+
+       dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
+            cap->issued, cap->issued & wanted);
+       cap->issued &= wanted;  /* drop bits we don't want */
+
+       /*
+        * a revocation is complete once none of the bits being revoked
+        * are still in use.  this is a bitmask intersection, so it must
+        * be '&'; a logical '&&' would hold the revocation open for as
+        * long as *any* cap bit is in use.
+        */
+       if (revoking && (revoking & used) == 0) {
+               cap->implemented = cap->issued;
+               wake = 1;  /* for waiters on wanted -> needed transition */
+       }
+
+       /* snapshot everything the message needs while i_lock is held */
+       keep = cap->issued;
+       seq = cap->seq;
+       mseq = cap->mseq;
+       size = inode->i_size;
+       ci->i_reported_size = size;
+       max_size = ci->i_wanted_max_size;
+       ci->i_requested_max_size = max_size;
+       mtime = inode->i_mtime;
+       atime = inode->i_atime;
+       time_warp_seq = ci->i_time_warp_seq;
+       follows = ci->i_snaprealm->cached_context->seq;
+       if (flush_snap)
+               cap->flushed_snap = follows; /* so we only flush it once */
+       spin_unlock(&inode->i_lock);
+
+       if (dropping & CEPH_CAP_RDCACHE) {
+               /*
+                * FIXME: this will block if there is a locked page..
+                */
+               dout(20, "invalidating pages on %p\n", inode);
+               invalidate_mapping_pages(&inode->i_data, 0, -1);
+               dout(20, "done invalidating pages on %p\n", inode);
+       }
+
+       send_cap(mdsc, ceph_vino(inode).ino,
+                op, keep, wanted, seq, mseq,
+                size, max_size, &mtime, &atime, time_warp_seq,
+                follows, session->s_mds);
+
+       if (wake)
+               wake_up(&ci->i_cap_wq);
+
+       return 0;
+}
+
+/*
+ * Run cap checks on inodes whose delayed-cap hold time has expired.
+ *
+ * Entries are taken from the front of mdsc->cap_delay_list.  The scan
+ * ends when the list is empty or when the front entry's
+ * i_hold_caps_until is still in the future.  cap_delay_lock is dropped
+ * around each ceph_check_caps() call and retaken before the list is
+ * looked at again.
+ */
+void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+{
+       struct ceph_inode_info *head;
+
+       dout(10, "check_delayed_caps\n");
+       spin_lock(&mdsc->cap_delay_lock);
+       while (!list_empty(&mdsc->cap_delay_list)) {
+               head = list_first_entry(&mdsc->cap_delay_list,
+                                       struct ceph_inode_info,
+                                       i_cap_delay_list);
+               if (time_before(jiffies, head->i_hold_caps_until))
+                       break;  /* front entry is not due yet */
+
+               /* unlink under the lock, do the actual check without it */
+               list_del_init(&head->i_cap_delay_list);
+               spin_unlock(&mdsc->cap_delay_lock);
+
+               dout(10, "check_delayed_caps on %p\n", &head->vfs_inode);
+               ceph_check_caps(head, 1, 0);
+               /* presumably pairs with the ref taken when it was queued */
+               iput(&head->vfs_inode);
+
+               spin_lock(&mdsc->cap_delay_lock);
+       }
+       spin_unlock(&mdsc->cap_delay_lock);
+}
+
+/*
+ * Send a cap message for every cap on @session that has WR or WRBUFFER
+ * implemented; caps without either bit are skipped.
+ *
+ * @purge: if nonzero and the inode still has caps used or wanted, the
+ *     residue is logged and both are forced to 0 before sending.
+ *
+ * NOTE(review): __ceph_mdsc_send_cap() expects its caller to hold
+ * snap_rwsem and s_mutex, so callers of this function presumably must
+ * hold them too -- confirm against the call sites.
+ */
+void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
+                          struct ceph_mds_session *session,
+                          int purge)
+{
+       struct list_head *pos, *next;
+
+       list_for_each_safe (pos, next, &session->s_caps) {
+               struct ceph_inode_cap *cap =
+                       list_entry(pos, struct ceph_inode_cap, session_caps);
+               struct ceph_inode_info *ci = cap->ci;
+               struct inode *inode = &ci->vfs_inode;
+               int used, wanted;
+
+               spin_lock(&inode->i_lock);
+               if (cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) {
+                       used = __ceph_caps_used(ci);
+                       wanted = __ceph_caps_wanted(ci);
+
+                       if (purge && (used || wanted)) {
+                               derr(0, "residual caps on %p used %d wanted %d s=%llu wrb=%d\n",
+                                    inode, used, wanted, inode->i_size,
+                                    atomic_read(&ci->i_wrbuffer_ref));
+                               used = 0;
+                               wanted = 0;
+                       }
+
+                       /* drops i_lock for us */
+                       __ceph_mdsc_send_cap(mdsc, session, cap,
+                                            used, wanted, 0);
+               } else {
+                       /* no write caps here; nothing to flush */
+                       spin_unlock(&inode->i_lock);
+               }
+       }
+}
+
+
index ed81dd0942834407d72a9834d702f2a6150dc6a2..768b364a5f688de2dffc83ce8c467e85a3c085ce 100644 (file)
@@ -238,9 +238,9 @@ out:
  * depending on which capabilities/were help, and on the time_warp_seq
  * (which we increment on utimes()).
  */
-static void fill_file_bits(struct inode *inode, int issued, u64 time_warp_seq,
-                          u64 size, struct timespec *ctime,
-                          struct timespec *mtime, struct timespec *atime)
+void ceph_fill_file_bits(struct inode *inode, int issued, u64 time_warp_seq,
+                        u64 size, struct timespec *ctime,
+                        struct timespec *mtime, struct timespec *atime)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 blocks = (size + (1<<9) - 1) >> 9;
@@ -335,8 +335,8 @@ int ceph_fill_inode(struct inode *inode,
        ceph_decode_timespec(&ctime, &info->ctime);
        issued = __ceph_caps_issued(ci, 0);
 
-       fill_file_bits(inode, issued, le64_to_cpu(info->time_warp_seq), size,
-                      &ctime, &mtime, &atime);
+       ceph_fill_file_bits(inode, issued, le64_to_cpu(info->time_warp_seq),
+                           size, &ctime, &mtime, &atime);
 
        inode->i_blkbits = blkbits;
 
@@ -1101,350 +1101,7 @@ retry_lookup:
 }
 
 
-/*
- * capabilities
- */
-
-static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_inode_cap *cap;
-       struct list_head *p;
-
-       list_for_each(p, &ci->i_caps) {
-               cap = list_entry(p, struct ceph_inode_cap, ci_caps);
-               if (cap->mds == mds)
-                       return cap;
-       }
-       return 0;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_inode_cap *cap;
-       int mds = -1;
-
-       spin_lock(&inode->i_lock);
-       if (!list_empty(&ci->i_caps)) {
-               cap = list_first_entry(&ci->i_caps, struct ceph_inode_cap,
-                                      ci_caps);
-               mds = cap->mds;
-       }
-       spin_unlock(&inode->i_lock);
-       return mds;
-}
-
-/*
- * caller should hold session snap_rwsem, s_mutex.
- *
- * @fmode can be negative, in which case it is ignored.
- */
-int ceph_add_cap(struct inode *inode,
-                struct ceph_mds_session *session,
-                int fmode, unsigned issued,
-                unsigned seq, unsigned mseq,
-                void *snapblob, int snapblob_len)
-{
-       int mds = session->s_mds;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_inode_cap *cap, *new_cap = 0;
-       int i;
-       int is_new = 0;
-       struct ceph_snaprealm *realm = 0;
-       struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
-
-       if (snapblob_len)
-               realm = ceph_update_snap_trace(mdsc,
-                                              snapblob, snapblob+snapblob_len,
-                                              0);
-
-       dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
-            session->s_mds, issued, seq);
-retry:
-       spin_lock(&inode->i_lock);
-       cap = __get_cap_for_mds(inode, mds);
-       if (!cap) {
-               for (i = 0; i < STATIC_CAPS; i++)
-                       if (ci->i_static_caps[i].mds == -1) {
-                               cap = &ci->i_static_caps[i];
-                               break;
-                       }
-               if (!cap) {
-                       if (new_cap) {
-                               cap = new_cap;
-                               new_cap = 0;
-                       } else {
-                               spin_unlock(&inode->i_lock);
-                               new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
-                               if (new_cap == 0)
-                                       return -ENOMEM;
-                               goto retry;
-                       }
-               }
-
-               is_new = 1;    /* grab inode later */
-               cap->issued = cap->implemented = 0;
-               cap->mds = mds;
-               cap->flags = 0;
-               cap->flushed_snap = 0;
-
-               cap->ci = ci;
-               list_add(&cap->ci_caps, &ci->i_caps);
-
-               /* add to session cap list */
-               cap->session = session;
-               list_add(&cap->session_caps, &session->s_caps);
-               session->s_nr_caps++;
-
-               /* clear out old exporting info? */
-               if (ci->i_cap_exporting_mds == mds) {
-                       ci->i_cap_exporting_issued = 0;
-                       ci->i_cap_exporting_mseq = 0;
-                       ci->i_cap_exporting_mds = -1;
-               }
-       }
-       if (!ci->i_snaprealm) {
-               ci->i_snaprealm = realm;
-               list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps);
-       } else
-               ceph_put_snaprealm(realm);
-
-       dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
-            inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
-       cap->issued |= issued;
-       cap->implemented |= issued;
-       cap->seq = seq;
-       cap->mseq = mseq;
-       cap->gen = session->s_cap_gen;
-       if (fmode >= 0)
-               __ceph_get_fmode(ci, fmode);
-       spin_unlock(&inode->i_lock);
-       if (is_new)
-               igrab(inode);
-       if (new_cap)
-               kfree(new_cap);
-       return 0;
-}
-
-int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
-{
-       int have = ci->i_snap_caps;
-       struct ceph_inode_cap *cap;
-       struct list_head *p;
-       u32 gen;
-       unsigned long ttl;
-
-       list_for_each(p, &ci->i_caps) {
-               cap = list_entry(p, struct ceph_inode_cap, ci_caps);
-
-               spin_lock(&cap->session->s_cap_lock);
-               gen = cap->session->s_cap_gen;
-               ttl = cap->session->s_cap_ttl;
-               spin_unlock(&cap->session->s_cap_lock);
 
-               if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
-                       dout(30, "__ceph_caps_issued %p cap %p issued %d "
-                            "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
-                            cap, cap->issued, cap->gen, gen);
-                       continue;
-               }
-               dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
-                    &ci->vfs_inode, cap, cap->issued);
-               have |= cap->issued;
-               if (implemented)
-                       *implemented |= cap->implemented;
-       }
-       return have;
-}
-
-/*
- * caller should hold i_lock, snap_rwsem, and session s_mutex.
- * returns true if this is the last cap.  if so, caller should iput.
- */
-int __ceph_remove_cap(struct ceph_inode_cap *cap)
-{
-       struct ceph_mds_session *session = cap->session;
-       struct ceph_inode_info *ci = cap->ci;
-
-       dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
-
-       /* remove from session list */
-       list_del_init(&cap->session_caps);
-       session->s_nr_caps--;
-
-       /* remove from inode list */
-       list_del_init(&cap->ci_caps);
-       cap->session = 0;
-       cap->mds = -1;  /* mark unused */
-
-       if (cap < ci->i_static_caps ||
-           cap >= ci->i_static_caps + STATIC_CAPS)
-               kfree(cap);
-
-       if (list_empty(&ci->i_caps)) {
-               list_del_init(&ci->i_snaprealm_item);
-               return 1;
-       }
-       return 0;
-}
-
-/*
- * caller should hold snap_rwsem and session s_mutex.
- */
-void ceph_remove_cap(struct ceph_inode_cap *cap)
-{
-       struct inode *inode = &cap->ci->vfs_inode;
-       int was_last;
-
-       spin_lock(&inode->i_lock);
-       was_last = __ceph_remove_cap(cap);
-       spin_unlock(&inode->i_lock);
-       if (was_last)
-               iput(inode);
-}
-
-/*
- * caller holds i_lock
- *    -> client->cap_delay_lock
- */
-void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc,
-                             struct ceph_inode_info *ci)
-{
-       ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
-       dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
-            ci->i_hold_caps_until);
-       spin_lock(&mdsc->cap_delay_lock);
-       if (list_empty(&ci->i_cap_delay_list))
-               igrab(&ci->vfs_inode);
-       else
-               list_del_init(&ci->i_cap_delay_list);
-       list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
-       spin_unlock(&mdsc->cap_delay_lock);
-}
-
-
-/*
- * examine currently used, wanted versus held caps.
- *  release, ack revoked caps to mds as appropriate.
- * @is_delayed if caller just dropped a cap ref, and we probably want to delay
- */
-void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
-{
-       struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
-       struct ceph_mds_client *mdsc = &client->mdsc;
-       struct inode *inode = &ci->vfs_inode;
-       struct ceph_inode_cap *cap;
-       struct list_head *p;
-       int wanted, used;
-       struct ceph_mds_session *session = 0;  /* if non-NULL, i hold s_mutex */
-       int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-
-retry:
-       spin_lock(&inode->i_lock);
-       wanted = __ceph_caps_wanted(ci);
-       used = __ceph_caps_used(ci);
-       dout(10, "check_caps %p wanted %d used %d issued %d\n", inode,
-            wanted, used, __ceph_caps_issued(ci, 0));
-
-       if (!is_delayed)
-               __ceph_cap_delay_requeue(mdsc, ci);
-
-       list_for_each(p, &ci->i_caps) {
-               int revoking;
-               cap = list_entry(p, struct ceph_inode_cap, ci_caps);
-
-               /* note: no side-effects allowed, until we take s_mutex */
-               revoking = cap->implemented & ~cap->issued;
-
-               if (ci->i_wanted_max_size > ci->i_max_size &&
-                   ci->i_wanted_max_size > ci->i_requested_max_size)
-                       goto ack;
-
-               /* completed revocation? */
-               if (revoking && (revoking && used) == 0) {
-                       dout(10, "completed revocation of %d\n",
-                            cap->implemented & ~cap->issued);
-                       goto ack;
-               }
-
-               /* approaching file_max? */
-               if ((cap->issued & CEPH_CAP_WR) &&
-                   (inode->i_size << 1) >= ci->i_max_size &&
-                   (ci->i_reported_size << 1) < ci->i_max_size) {
-                       dout(10, "i_size approaching max_size\n");
-                       goto ack;
-               }
-
-               /* flush snap? */
-               if (flush_snap &&
-                   (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
-                       if (cap->flushed_snap >=
-                           ci->i_snaprealm->cached_context->seq) {
-                               dout(10, "flushed_snap %llu >= seq %lld, "
-                                    "not flushing mds%d\n",
-                                    cap->flushed_snap,
-                                    ci->i_snaprealm->cached_context->seq,
-                                    cap->session->s_mds);
-                               continue;  /* already flushed for this snap */
-                       }
-                       goto ack;
-               }
-
-               if ((cap->issued & ~wanted) == 0)
-                       continue;     /* nothing extra, all good */
-
-               if (time_before(jiffies, ci->i_hold_caps_until)) {
-                       /* delaying cap release for a bit */
-                       dout(30, "delaying cap release\n");
-                       continue;
-               }
-
-ack:
-               /* take s_mutex, one way or another */
-               if (session && session != cap->session) {
-                       dout(30, "oops, wrong session %p mutex\n", session);
-                       mutex_unlock(&session->s_mutex);
-                       session = 0;
-               }
-               /* take snap_rwsem before session mutex */
-               if (!flush_snap && !took_snap_rwsem) {
-                       if (down_write_trylock(&mdsc->snap_rwsem) == 0) {
-                               dout(10, "inverting snap/in locks on %p\n",
-                                    inode);
-                               spin_unlock(&inode->i_lock);
-                               down_write(&mdsc->snap_rwsem);
-                               took_snap_rwsem = 1;
-                               goto retry;
-                       }
-                       took_snap_rwsem = 1;
-               }
-               if (!session) {
-                       session = cap->session;
-                       if (mutex_trylock(&session->s_mutex) == 0) {
-                               dout(10, "inverting session/ino locks on %p\n",
-                                    session);
-                               spin_unlock(&inode->i_lock);
-                               mutex_lock(&session->s_mutex);
-                               goto retry;
-                       }
-               }
-
-               /* send_cap drops i_lock */
-               __ceph_mdsc_send_cap(mdsc, session, cap,
-                                    used, wanted, flush_snap);
-
-               goto retry; /* retake i_lock and restart our cap scan. */
-       }
-
-       /* okay */
-       spin_unlock(&inode->i_lock);
-
-       if (session)
-               mutex_unlock(&session->s_mutex);
-       if (took_snap_rwsem)
-               up_write(&mdsc->snap_rwsem);
-}
 
 void ceph_inode_set_size(struct inode *inode, loff_t size)
 {
@@ -1479,138 +1136,6 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
 }
 
 
-/*
- * caller holds s_mutex.  NOT snap_rwsem.
- * return value:
- *  0 - ok
- *  1 - send the msg back to mds
- */
-int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-                         struct ceph_mds_session *session)
-{
-       struct ceph_inode_cap *cap;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int mds = session->s_mds;
-       int seq = le32_to_cpu(grant->seq);
-       int newcaps = le32_to_cpu(grant->caps);
-       int used;
-       int issued; /* to me, before */
-       int wanted;
-       int reply = 0;
-       u64 size = le64_to_cpu(grant->size);
-       u64 max_size = le64_to_cpu(grant->max_size);
-       struct timespec mtime, atime, ctime;
-       int wake = 0;
-       int writeback_now = 0;
-       int invalidate = 0;
-
-       dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
-            inode, ci, mds, seq);
-       dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
-               inode->i_size);
-
-       spin_lock(&inode->i_lock);
-
-       /* do we have this cap? */
-       cap = __get_cap_for_mds(inode, mds);
-       if (!cap) {
-               /*
-                * then ignore.  never reply to cap messages out of turn,
-                * or we'll be mixing up different instances of caps on the
-                * same inode, and confuse the mds.
-                */
-               dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
-                    inode, ci->i_vino.ino, ci->i_vino.snap, mds);
-               goto out;
-       }
-       dout(10, " cap %p\n", cap);
-       cap->gen = session->s_cap_gen;
-
-       /* size/ctime/mtime/atime? */
-       issued = __ceph_caps_issued(ci, 0);
-       ceph_decode_timespec(&mtime, &grant->mtime);
-       ceph_decode_timespec(&atime, &grant->atime);
-       ceph_decode_timespec(&ctime, &grant->ctime);
-       fill_file_bits(inode, issued, le64_to_cpu(grant->time_warp_seq),
-                      size, &ctime, &mtime, &atime);
-
-       /* max size increase? */
-       if (max_size != ci->i_max_size) {
-               dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
-               ci->i_max_size = max_size;
-               if (max_size >= ci->i_wanted_max_size) {
-                       ci->i_wanted_max_size = 0;  /* reset */
-                       ci->i_requested_max_size = 0;
-               }
-               wake = 1;
-       }
-
-       /* check cap bits */
-       wanted = __ceph_caps_wanted(ci);
-       used = __ceph_caps_used(ci);
-       dout(10, " my wanted = %d, used = %d\n", wanted, used);
-       if (wanted != le32_to_cpu(grant->wanted)) {
-               dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
-                    wanted);
-               grant->wanted = cpu_to_le32(wanted);
-       }
-
-       cap->seq = seq;
-
-       /* layout may have changed */
-       ci->i_layout = grant->layout;
-
-       /* revocation? */
-       if (cap->issued & ~newcaps) {
-               dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
-               if ((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE)
-                       invalidate = 1;
-               if ((used & ~newcaps) & CEPH_CAP_WRBUFFER)
-                       writeback_now = 1; /* will delay ack */
-               else {
-                       cap->implemented = newcaps;
-                       /* ack now.  re-use incoming message. */
-                       grant->size = le64_to_cpu(inode->i_size);
-                       grant->max_size = 0;  /* don't re-request */
-                       ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
-                       ceph_encode_timespec(&grant->atime, &inode->i_atime);
-                       grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
-                       grant->snap_follows =
-                            cpu_to_le64(ci->i_snaprealm->cached_context->seq);
-                       reply = 1;
-                       wake = 1;
-               }
-               cap->issued = newcaps;
-               goto out;
-       }
-
-       /* grant or no-op */
-       if (cap->issued == newcaps) {
-               dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
-       } else {
-               dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
-               cap->implemented = cap->issued = newcaps;
-               wake = 1;
-       }
-
-out:
-       spin_unlock(&inode->i_lock);
-       if (wake)
-               wake_up(&ci->i_cap_wq);
-       if (writeback_now) {
-               /*
-                * queue inode for writeback; we can't actually call
-                * write_inode_now, writepages, etc. from this
-                * context.
-                */
-               dout(10, "queueing %p for writeback\n", inode);
-               ceph_queue_writeback(ceph_client(inode->i_sb), ci);
-       }
-       if (invalidate)
-               invalidate_mapping_pages(&inode->i_data, 0, -1);
-       return reply;
-}
-
 void ceph_inode_writeback(struct work_struct *work)
 {
        struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
@@ -1673,292 +1198,6 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
                dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
 }
 
-/*
- * caller hold s_mutex, snap_rwsem.
- */
-static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
-                              struct ceph_inode_info *ci)
-{
-       dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode);
-       if (list_empty(&ci->i_cap_delay_list))
-               return;
-       spin_lock(&mdsc->cap_delay_lock);
-       list_del_init(&ci->i_cap_delay_list);
-       spin_unlock(&mdsc->cap_delay_lock);
-       iput(&ci->vfs_inode);
-}
-
-void ceph_handle_cap_released(struct inode *inode,
-                             struct ceph_mds_caps *m,
-                             struct ceph_mds_session *session)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int seq = le32_to_cpu(m->seq);
-       int removed_last;
-       struct ceph_inode_cap *cap;
-
-       dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
-            session->s_mds, seq);
-
-       spin_lock(&inode->i_lock);
-       cap = __get_cap_for_mds(inode, session->s_mds);
-       BUG_ON(!cap);
-       removed_last = __ceph_remove_cap(cap);
-       if (removed_last)
-               __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
-       spin_unlock(&inode->i_lock);
-       if (removed_last)
-               iput(inode);
-}
-
-/*
- * caller hold s_mutex, snap_rwsem.
- */
-void ceph_handle_cap_flushedsnap(struct inode *inode,
-                                struct ceph_mds_caps *m,
-                                struct ceph_mds_session *session)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int seq = le32_to_cpu(m->seq);
-
-       dout(10, "handle_cap_flushednsap inode %p ci %p mds%d seq %d\n", inode,
-            ci, session->s_mds, seq);
-
-       /* **** WRITE ME **** */
-}
-
-
-/*
- * caller hold s_mutex, NOT snap_rwsem.
- */
-void ceph_handle_cap_trunc(struct inode *inode,
-                          struct ceph_mds_caps *trunc,
-                          struct ceph_mds_session *session)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int mds = session->s_mds;
-       int seq = le32_to_cpu(trunc->seq);
-       u64 size = le64_to_cpu(trunc->size);
-       int queue_trunc = 0;
-
-       dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
-            mds, seq);
-
-       /*
-        * vmtruncate lazily; we can't block on i_mutex in the message
-        * handler path, or we deadlock against osd op replies needed
-        * to complete the writes holding i_lock.  vmtruncate will
-        * also block on page locks held by writes...
-        *
-        * if its an expansion, and there is no truncate pending, we
-        * don't need to truncate.
-        */
-
-       spin_lock(&inode->i_lock);
-       if (ci->i_vmtruncate_to < 0 && size > inode->i_size)
-               dout(10, "clean fwd truncate, no vmtruncate needed\n");
-       else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to)
-               dout(10, "trunc to %lld < %lld already queued\n",
-                    ci->i_vmtruncate_to, size);
-       else {
-               /* we need to trunc even smaller */
-               dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
-               ci->i_vmtruncate_to = size;
-               queue_trunc = 1;
-       }
-       i_size_write(inode, size);
-       ci->i_reported_size = size;
-       spin_unlock(&inode->i_lock);
-
-       if (queue_trunc)
-               queue_work(ceph_client(inode->i_sb)->trunc_wq,
-                          &ci->i_vmtruncate_work);
-}
-
-void ceph_handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
-                           struct ceph_mds_session *session)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int mds = session->s_mds;
-       unsigned mseq = le32_to_cpu(ex->migrate_seq);
-       struct ceph_inode_cap *cap = 0, *t;
-       struct list_head *p;
-       int was_last = 0;
-
-       dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
-            inode, ci, mds, mseq);
-
-       spin_lock(&inode->i_lock);
-
-       /* make sure we haven't seen a higher mseq */
-       list_for_each(p, &ci->i_caps) {
-               t = list_entry(p, struct ceph_inode_cap, ci_caps);
-               if (t->mseq > mseq) {
-                       dout(10, " higher mseq on cap from mds%d\n",
-                            t->session->s_mds);
-                       goto out;
-               }
-               if (t->session->s_mds == mds)
-                       cap = t;
-       }
-
-       if (cap) {
-               /* make note, and remove */
-               ci->i_cap_exporting_mds = mds;
-               ci->i_cap_exporting_mseq = mseq;
-               ci->i_cap_exporting_issued = cap->issued;
-               was_last = __ceph_remove_cap(cap);
-       } else
-               WARN_ON(!cap);
-
-out:
-       spin_unlock(&inode->i_lock);
-       if (was_last)
-               iput(inode);
-}
-
-void ceph_handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
-                           struct ceph_mds_session *session,
-                           void *snaptrace, int snaptrace_len)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int mds = session->s_mds;
-       unsigned issued = le32_to_cpu(im->caps);
-       unsigned seq = le32_to_cpu(im->seq);
-       unsigned mseq = le32_to_cpu(im->migrate_seq);
-
-       if (ci->i_cap_exporting_mds >= 0 &&
-           ci->i_cap_exporting_mseq < mseq) {
-               dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
-                    " - cleared exporting from mds%d\n",
-                    inode, ci, mds, mseq,
-                    ci->i_cap_exporting_mds);
-               ci->i_cap_exporting_issued = 0;
-               ci->i_cap_exporting_mseq = 0;
-               ci->i_cap_exporting_mds = -1;
-       } else {
-               dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
-                    inode, ci, mds, mseq);
-       }
-
-       ceph_add_cap(inode, session, -1, issued, seq, mseq,
-                    snaptrace, snaptrace_len);
-}
-
-
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
-{
-       if (got & CEPH_CAP_RD)
-               ci->i_rd_ref++;
-       if (got & CEPH_CAP_RDCACHE)
-               ci->i_rdcache_ref++;
-       if (got & CEPH_CAP_WR)
-               ci->i_wr_ref++;
-       if (got & CEPH_CAP_WRBUFFER) {
-               atomic_inc(&ci->i_wrbuffer_ref);
-               dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
-                    &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)-1,
-                    atomic_read(&ci->i_wrbuffer_ref));
-       }
-}
-
-void ceph_take_cap_refs(struct ceph_inode_info *ci, int got)
-{
-       dout(30, "take_cap_refs on %p taking %d\n", &ci->vfs_inode, got);
-       spin_lock(&ci->vfs_inode.i_lock);
-       __take_cap_refs(ci, got);
-       spin_unlock(&ci->vfs_inode.i_lock);
-}
-
-int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
-                     loff_t endoff)
-{
-       int ret = 0;
-       int have, implemented;
-
-       dout(30, "get_cap_refs on %p need %d want %d\n", &ci->vfs_inode,
-            need, want);
-       spin_lock(&ci->vfs_inode.i_lock);
-       if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
-               dout(20, "get_cap_refs endoff %llu > max_size %llu\n",
-                    endoff, ci->i_max_size);
-               goto sorry;
-       }
-       have = __ceph_caps_issued(ci, &implemented);
-       if ((have & need) == need) {
-               /*
-                * look at (implemented & ~have & not) so that we keep waiting
-                * on transition from wanted -> needed caps.  this is needed
-                * for WRBUFFER|WR -> WR to avoid a new WR sync write from
-                * going before a prior buffered writeback happens.
-                */
-               int not = want & ~(have & need);
-               int revoking = implemented & ~have;
-               dout(30, "get_cap_refs have %d but not %d (revoking %d)\n",
-                    have, not, revoking);
-               if ((revoking & not) == 0) {
-                       *got = need | (have & want);
-                       __take_cap_refs(ci, *got);
-                       ret = 1;
-               }
-       } else
-               dout(30, "get_cap_refs have %d needed %d\n", have, need);
-sorry:
-       spin_unlock(&ci->vfs_inode.i_lock);
-       dout(30, "get_cap_refs on %p ret %d got %d\n", &ci->vfs_inode,
-            ret, *got);
-       return ret;
-}
-
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
-{
-       int last = 0;
-
-       spin_lock(&ci->vfs_inode.i_lock);
-       if (had & CEPH_CAP_RD)
-               if (--ci->i_rd_ref == 0)
-                       last++;
-       if (had & CEPH_CAP_RDCACHE)
-               if (--ci->i_rdcache_ref == 0)
-                       last++;
-       if (had & CEPH_CAP_WR)
-               if (--ci->i_wr_ref == 0)
-                       last++;
-       if (had & CEPH_CAP_WRBUFFER) {
-               if (atomic_dec_and_test(&ci->i_wrbuffer_ref))
-                       last++;
-               dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
-                    &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)+1,
-                    atomic_read(&ci->i_wrbuffer_ref));
-       }
-       spin_unlock(&ci->vfs_inode.i_lock);
-
-       dout(30, "put_cap_refs on %p had %d %s\n", &ci->vfs_inode, had,
-            last ? "last":"");
-
-       if (last)
-               ceph_check_caps(ci, 0, 0);
-}
-
-void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
-{
-       int was_last;
-       int v;
-
-       spin_lock(&ci->vfs_inode.i_lock);
-       was_last = atomic_sub_and_test(nr, &ci->i_wrbuffer_ref);
-       v = atomic_read(&ci->i_wrbuffer_ref);
-       spin_unlock(&ci->vfs_inode.i_lock);
-
-       dout(30, "put_wrbuffer_cap_refs on %p %d -> %d (?)%s\n",
-            &ci->vfs_inode, v+nr, v, was_last ? " LAST":"");
-       WARN_ON(v < 0);
-
-       if (was_last)
-               ceph_check_caps(ci, 0, 0);
-}
-
-
 /*
  * symlinks
  */
index 0d4fd879bfd4cb2160cc9ef777b771485c133e7b..a94f442c1cb84ba68f4dea934dd7bf9ebcedd983 100644 (file)
@@ -13,8 +13,8 @@ int ceph_debug_mdsc = -1;
 #include "messenger.h"
 #include "decode.h"
 
-static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
-                        int mds)
+void ceph_send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
+                      int mds)
 {
        msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
        msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
@@ -242,8 +242,8 @@ static void destroy_reply_info(struct ceph_mds_reply_info *info)
  * sessions
  */
 
-static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc,
-                                             int mds)
+struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc,
+                                               int mds)
 {
        struct ceph_mds_session *session;
        if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0)
@@ -253,10 +253,10 @@ static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc,
        return session;
 }
 
-static void put_session(struct ceph_mds_session *s)
+void ceph_put_mds_session(struct ceph_mds_session *s)
 {
        BUG_ON(s == NULL);
-       dout(30, "put_session %p %d -> %d\n", s,
+       dout(30, "put_mds_session %p %d -> %d\n", s,
             atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
        if (atomic_dec_and_test(&s->s_ref))
                kfree(s);
@@ -307,7 +307,7 @@ __register_session(struct ceph_mds_client *mdsc, int mds)
                mdsc->max_sessions = newmax;
        }
        if (mdsc->sessions[mds]) {
-               put_session(s); /* lost race */
+               ceph_put_mds_session(s); /* lost race */
                return mdsc->sessions[mds];
        } else {
                mdsc->sessions[mds] = s;
@@ -319,7 +319,7 @@ __register_session(struct ceph_mds_client *mdsc, int mds)
 static void __unregister_session(struct ceph_mds_client *mdsc, int mds)
 {
        dout(10, "__unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
-       put_session(mdsc->sessions[mds]);
+       ceph_put_mds_session(mdsc->sessions[mds]);
        mdsc->sessions[mds] = 0;
 }
 
@@ -336,11 +336,11 @@ static void get_request(struct ceph_mds_request *req)
 static void put_request_sessions(struct ceph_mds_request *req)
 {
        if (req->r_session) {
-               put_session(req->r_session);
+               ceph_put_mds_session(req->r_session);
                req->r_session = 0;
        }
        if (req->r_fwd_session) {
-               put_session(req->r_fwd_session);
+               ceph_put_mds_session(req->r_fwd_session);
                req->r_fwd_session = 0;
        }
 }
@@ -589,7 +589,7 @@ static int open_session(struct ceph_mds_client *mdsc,
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
        if (IS_ERR(msg))
                return PTR_ERR(msg);
-       send_msg_mds(mdsc, msg, mds);
+       ceph_send_msg_mds(mdsc, msg, mds);
 
        /* wait for session to open (or fail, or close) */
        dout(30, "open_session waiting on session %p\n", session);
@@ -814,7 +814,7 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 0);
        if (IS_ERR(msg))
                return PTR_ERR(msg);
-       send_msg_mds(mdsc, msg, session->s_mds);
+       ceph_send_msg_mds(mdsc, msg, session->s_mds);
        return 0;
 }
 
@@ -838,7 +838,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
 
        /* handle */
        mutex_lock(&mdsc->mutex);
-       session = __get_session(mdsc, mds);
+       session = __ceph_get_mds_session(mdsc, mds);
        if (session && mdsc->mdsmap)
                session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
        mutex_unlock(&mdsc->mutex);
@@ -905,7 +905,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
        }
 
        mutex_unlock(&session->s_mutex);
-       put_session(session);
+       ceph_put_mds_session(session);
        return;
 
 bad:
@@ -1033,7 +1033,7 @@ retry:
        }
 
        /* get session */
-       session = __get_session(mdsc, mds);
+       session = __ceph_get_mds_session(mdsc, mds);
        if (!session)
                session = __register_session(mdsc, mds);
        dout(30, "do_request mds%d session %p state %d\n", mds, session,
@@ -1051,7 +1051,7 @@ retry:
                err == -EAGAIN) {
                dout(30, "do_request session %p not open, state=%d, waiting\n",
                     session, session->s_state);
-               put_session(session);
+               ceph_put_mds_session(session);
                goto retry;
        }
 
@@ -1071,7 +1071,7 @@ retry:
        dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
        req->r_request = ceph_msg_maybe_dup(req->r_request);
        ceph_msg_get(req->r_request);
-       send_msg_mds(mdsc, req->r_request, mds);
+       ceph_send_msg_mds(mdsc, req->r_request, mds);
        wait_for_completion(&req->r_completion);
        mutex_lock(&mdsc->mutex);
        if (req->r_reply == NULL) {
@@ -1127,8 +1127,8 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
        mds = le32_to_cpu(msg->hdr.src.name.num);
        if (req->r_session && req->r_session->s_mds != mds) {
-               put_session(req->r_session);
-               req->r_session = __get_session(mdsc, mds);
+               ceph_put_mds_session(req->r_session);
+               req->r_session = __ceph_get_mds_session(mdsc, mds);
        }
        if (req->r_session == 0) {
                derr(1, "got reply on %llu, but no session for mds%d\n",
@@ -1252,8 +1252,8 @@ void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                req->r_num_fwd = fwd_seq;
                req->r_resend_mds = next_mds;
                put_request_sessions(req);
-               req->r_session = __get_session(mdsc, next_mds);
-               req->r_fwd_session = __get_session(mdsc, from_mds);
+               req->r_session = __ceph_get_mds_session(mdsc, next_mds);
+               req->r_fwd_session = __ceph_get_mds_session(mdsc, from_mds);
        } else {
                /* no, resend. */
                /* forward race not possible; mds would drop */
@@ -1297,7 +1297,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        dout(1, "reconnect to recovering mds%d\n", mds);
 
        /* find session */
-       session = __get_session(mdsc, mds);
+       session = __ceph_get_mds_session(mdsc, mds);
        if (session) {
                session->s_state = CEPH_MDS_SESSION_RECONNECTING;
                session->s_seq = 0;
@@ -1384,7 +1384,7 @@ send:
        reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
        dout(10, "final len was %u (guessed %d)\n",
             (unsigned)reply->front.iov_len, len);
-       send_msg_mds(mdsc, reply, mds);
+       ceph_send_msg_mds(mdsc, reply, mds);
 
        if (session) {
                if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) {
@@ -1398,7 +1398,7 @@ send:
 out:
        if (session) {
                mutex_unlock(&session->s_mutex);
-               put_session(session);
+               ceph_put_mds_session(session);
        }
        mutex_lock(&mdsc->mutex);
        return;
@@ -1467,277 +1467,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 }
 
 
-/* caps */
-
-static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int op,
-                        int caps, int wanted, __u64 seq, __u64 mseq,
-                        __u64 size, __u64 max_size,
-                        struct timespec *mtime, struct timespec *atime,
-                        u64 time_warp_seq, u64 follows, int mds)
-{
-       struct ceph_mds_caps *fc;
-       struct ceph_msg *msg;
-
-       dout(10, "send_cap_ack %s %llx caps %d wanted %d seq %llu/%llu"
-            " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
-            caps, wanted, seq, mseq, follows, size);
-
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
-       if (IS_ERR(msg))
-               return;
-
-       fc = msg->front.iov_base;
-
-       memset(fc, 0, sizeof(*fc));
-
-       fc->op = cpu_to_le32(op);
-       fc->seq = cpu_to_le64(seq);
-       fc->migrate_seq = cpu_to_le64(mseq);
-       fc->caps = cpu_to_le32(caps);
-       fc->wanted = cpu_to_le32(wanted);
-       fc->ino = cpu_to_le64(ino);
-       fc->size = cpu_to_le64(size);
-       fc->max_size = cpu_to_le64(max_size);
-       fc->snap_follows = cpu_to_le64(follows);
-       if (mtime)
-               ceph_encode_timespec(&fc->mtime, mtime);
-       if (atime)
-               ceph_encode_timespec(&fc->atime, atime);
-       fc->time_warp_seq = cpu_to_le64(time_warp_seq);
-
-       send_msg_mds(mdsc, msg, mds);
-}
-
-void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
-                          struct ceph_msg *msg)
-{
-       struct super_block *sb = mdsc->client->sb;
-       struct ceph_mds_session *session;
-       struct inode *inode;
-       struct ceph_mds_caps *h;
-       int mds = le32_to_cpu(msg->hdr.src.name.num);
-       int op;
-       u32 seq;
-       struct ceph_vino vino;
-       u64 size, max_size;
-
-       dout(10, "handle_caps from mds%d\n", mds);
-
-       /* decode */
-       if (msg->front.iov_len < sizeof(*h))
-               goto bad;
-       h = msg->front.iov_base;
-       op = le32_to_cpu(h->op);
-       vino.ino = le64_to_cpu(h->ino);
-       vino.snap = CEPH_NOSNAP;
-       seq = le32_to_cpu(h->seq);
-       size = le64_to_cpu(h->size);
-       max_size = le64_to_cpu(h->max_size);
-
-       /* find session */
-       mutex_lock(&mdsc->mutex);
-       session = __get_session(mdsc, mds);
-       if (session)
-               down_write(&mdsc->snap_rwsem);
-       mutex_unlock(&mdsc->mutex);
-       if (!session) {
-               dout(10, "WTF, got cap but no session for mds%d\n", mds);
-               return;
-       }
-
-       mutex_lock(&session->s_mutex);
-       session->s_seq++;
-
-       /* lookup ino */
-       inode = ceph_find_inode(sb, vino);
-       dout(20, "op %d ino %llx inode %p\n", op, vino.ino, inode);
-       if (!inode) {
-               dout(10, "i don't have ino %llx, sending release\n", vino.ino);
-               send_cap_ack(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
-                            size, 0, 0, 0, 0, 0, 0, mds);
-               goto no_inode;
-       }
-
-       switch (op) {
-       case CEPH_CAP_OP_GRANT:
-               up_write(&mdsc->snap_rwsem);
-               if (ceph_handle_cap_grant(inode, h, session) == 1) {
-                       dout(10, "sending reply back to mds%d\n", mds);
-                       ceph_msg_get(msg);
-                       send_msg_mds(mdsc, msg, mds);
-               }
-               break;
-
-       case CEPH_CAP_OP_TRUNC:
-               up_write(&mdsc->snap_rwsem);
-               ceph_handle_cap_trunc(inode, h, session);
-               break;
-
-       case CEPH_CAP_OP_RELEASED:
-               ceph_handle_cap_released(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
-               break;
-
-       case CEPH_CAP_OP_FLUSHEDSNAP:
-               ceph_handle_cap_flushedsnap(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
-               break;
-
-       case CEPH_CAP_OP_EXPORT:
-               ceph_handle_cap_export(inode, h, session);
-               up_write(&mdsc->snap_rwsem);
-               break;
-
-       case CEPH_CAP_OP_IMPORT:
-               ceph_handle_cap_import(inode, h, session,
-                                      msg->front.iov_base + sizeof(*h),
-                                      le32_to_cpu(h->snap_trace_len));
-               up_write(&mdsc->snap_rwsem);
-               break;
-
-       default:
-               up_write(&mdsc->snap_rwsem);
-               derr(10, "unknown cap op %d %s\n", op, ceph_cap_op_name(op));
-       }
-
-       iput(inode);
-no_inode:
-       mutex_unlock(&session->s_mutex);
-       put_session(session);
-       return;
-
-bad:
-       derr(10, "corrupt caps message\n");
-       return;
-}
-
-/*
- * called with i_lock, then drops it.
- * caller should hold snap_rwsem, s_mutex.
- *
- * returns true if we removed the last cap on this inode.
- */
-int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
-                        struct ceph_mds_session *session,
-                        struct ceph_inode_cap *cap,
-                        int used, int wanted,
-                        int flush_snap)
-{
-       struct ceph_inode_info *ci = cap->ci;
-       struct inode *inode = &ci->vfs_inode;
-       int revoking = cap->implemented & ~cap->issued;
-       int dropping = cap->issued & ~wanted;
-       int keep;
-       u64 seq, mseq, time_warp_seq, follows;
-       u64 size, max_size;
-       struct timespec mtime, atime;
-       int wake = 0;
-       int op = CEPH_CAP_OP_ACK;
-
-       if (flush_snap)
-               op = CEPH_CAP_OP_FLUSHSNAP;
-       else if (wanted == 0)
-               op = CEPH_CAP_OP_RELEASE;
-
-       dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
-            cap->issued, cap->issued & wanted);
-       cap->issued &= wanted;  /* drop bits we don't want */
-
-       if (revoking && (revoking && used) == 0) {
-               cap->implemented = cap->issued;
-               wake = 1;  /* for waiters on wanted -> needed transition */
-       }
-
-       keep = cap->issued;
-       seq = cap->seq;
-       mseq = cap->mseq;
-       size = inode->i_size;
-       ci->i_reported_size = size;
-       max_size = ci->i_wanted_max_size;
-       ci->i_requested_max_size = max_size;
-       mtime = inode->i_mtime;
-       atime = inode->i_atime;
-       time_warp_seq = ci->i_time_warp_seq;
-       follows = ci->i_snaprealm->cached_context->seq;
-       if (flush_snap)
-               cap->flushed_snap = follows; /* so we only flush it once */
-       spin_unlock(&inode->i_lock);
-
-       if (dropping & CEPH_CAP_RDCACHE) {
-               /*
-                * FIXME: this will block if there is a locked page..
-                */
-               dout(20, "invalidating pages on %p\n", inode);
-               invalidate_mapping_pages(&inode->i_data, 0, -1);
-               dout(20, "done invalidating pages on %p\n", inode);
-       }
-
-       send_cap_ack(mdsc, ceph_vino(inode).ino,
-                    op, keep, wanted, seq, mseq,
-                    size, max_size, &mtime, &atime, time_warp_seq,
-                    follows, session->s_mds);
-
-       if (wake)
-               wake_up(&ci->i_cap_wq);
-
-       return 0;
-}
-
-static void check_delayed_caps(struct ceph_mds_client *mdsc)
-{
-       struct ceph_inode_info *ci;
-
-       dout(10, "check_delayed_caps\n");
-       while (1) {
-               spin_lock(&mdsc->cap_delay_lock);
-               if (list_empty(&mdsc->cap_delay_list))
-                       break;
-               ci = list_first_entry(&mdsc->cap_delay_list,
-                                     struct ceph_inode_info,
-                                     i_cap_delay_list);
-               if (time_before(jiffies, ci->i_hold_caps_until))
-                       break;
-               list_del_init(&ci->i_cap_delay_list);
-               spin_unlock(&mdsc->cap_delay_lock);
-               dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
-               ceph_check_caps(ci, 1, 0);
-               iput(&ci->vfs_inode);
-       }
-       spin_unlock(&mdsc->cap_delay_lock);
-}
-
-static void flush_write_caps(struct ceph_mds_client *mdsc,
-                            struct ceph_mds_session *session,
-                            int purge)
-{
-       struct list_head *p, *n;
-
-       list_for_each_safe (p, n, &session->s_caps) {
-               struct ceph_inode_cap *cap =
-                       list_entry(p, struct ceph_inode_cap, session_caps);
-               struct inode *inode = &cap->ci->vfs_inode;
-               int used, wanted;
-
-               spin_lock(&inode->i_lock);
-               if ((cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) {
-                       spin_unlock(&inode->i_lock);
-                       continue;
-               }
-
-               used = __ceph_caps_used(cap->ci);
-               wanted = __ceph_caps_wanted(cap->ci);
-
-               if (purge && (used || wanted)) {
-                       derr(0, "residual caps on %p used %d wanted %d s=%llu wrb=%d\n",
-                            inode, used, wanted, inode->i_size,
-                            atomic_read(&cap->ci->i_wrbuffer_ref));
-                       used = wanted = 0;
-               }
-
-               __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 0);
-       }
-}
-
 
 /*
  * snap
@@ -1776,7 +1505,7 @@ void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
 
        /* find session */
        mutex_lock(&mdsc->mutex);
-       session = __get_session(mdsc, mds);
+       session = __ceph_get_mds_session(mdsc, mds);
        if (session)
                down_write(&mdsc->snap_rwsem);
        mutex_unlock(&mdsc->mutex);
@@ -1908,7 +1637,7 @@ static int close_session(struct ceph_mds_client *mdsc,
        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                goto done;
 
-       flush_write_caps(mdsc, session, 1);
+       ceph_flush_write_caps(mdsc, session, 1);
 
        session->s_state = CEPH_MDS_SESSION_CLOSING;
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
@@ -1917,7 +1646,7 @@ static int close_session(struct ceph_mds_client *mdsc,
                err = PTR_ERR(msg);
                goto done;
        }
-       send_msg_mds(mdsc, msg, mds);
+       ceph_send_msg_mds(mdsc, msg, mds);
 
 done:
        mutex_unlock(&session->s_mutex);
@@ -1956,7 +1685,7 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 
        /* find session */
        mutex_lock(&mdsc->mutex);
-       session = __get_session(mdsc, mds);
+       session = __ceph_get_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);
        if (!session) {
                dout(10, "WTF, got lease but no session for mds%d\n", mds);
@@ -2005,7 +1734,7 @@ release:
        dout(10, "sending release\n");
        h->action = CEPH_MDS_LEASE_RELEASE;
        ceph_msg_get(msg);
-       send_msg_mds(mdsc, msg, mds);
+       ceph_send_msg_mds(mdsc, msg, mds);
        mutex_unlock(&session->s_mutex);
        return;
 
@@ -2085,7 +1814,7 @@ void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
                memcpy((void *)lease + sizeof(*lease) + 4, dentry->d_name.name,
                       dnamelen);
 
-       send_msg_mds(mdsc, msg, mds);
+       ceph_send_msg_mds(mdsc, msg, mds);
 }
 
 
@@ -2115,14 +1844,14 @@ static void delayed_work(struct work_struct *work)
 
        dout(10, "delayed_work on %p renew_caps=%d\n", mdsc, renew_caps);
 
-       check_delayed_caps(mdsc);
+       ceph_check_delayed_caps(mdsc);
 
        mutex_lock(&mdsc->mutex);
        if (renew_caps)
                mdsc->last_renew_caps = jiffies;
 
        for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *session = __get_session(mdsc, i);
+               struct ceph_mds_session *session = __ceph_get_mds_session(mdsc, i);
                if (session == 0)
                        continue;
                if (session->s_ttl && time_after(jiffies, session->s_ttl)) {
@@ -2131,7 +1860,7 @@ static void delayed_work(struct work_struct *work)
                        want_map = mdsc->mdsmap->m_epoch;
                }
                if (session->s_state < CEPH_MDS_SESSION_OPEN) {
-                       put_session(session);
+                       ceph_put_mds_session(session);
                        continue;
                }
                //mutex_unlock(&mdsc->mutex);
@@ -2142,7 +1871,7 @@ static void delayed_work(struct work_struct *work)
                trim_session_leases(session);
 
                mutex_unlock(&session->s_mutex);
-               put_session(session);
+               ceph_put_mds_session(session);
                //mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);
@@ -2182,7 +1911,7 @@ static void drop_leases(struct ceph_mds_client *mdsc)
 
        mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
-               struct ceph_mds_session *session = __get_session(mdsc, i);
+               struct ceph_mds_session *session = __ceph_get_mds_session(mdsc, i);
                if (!session)
                        continue;
                //mutex_unlock(&mdsc->mutex);
@@ -2199,7 +1928,7 @@ static void drop_leases(struct ceph_mds_client *mdsc)
 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
 {
        drop_leases(mdsc);
-       check_delayed_caps(mdsc);
+       ceph_check_delayed_caps(mdsc);
 }
 
 /*
@@ -2236,7 +1965,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                dout(10, "closing sessions\n");
                n = 0;
                for (i = 0; i < mdsc->max_sessions; i++) {
-                       session = __get_session(mdsc, i);
+                       session = __ceph_get_mds_session(mdsc, i);
                        if (!session)
                                continue;
                        //mutex_unlock(&mdsc->mutex);
index f15c9fe117bdb84e5ce9aef6d4daaaed5162f16c..4493a8180edb40dd4d222432175e21352bcfaad8 100644 (file)
@@ -146,6 +146,12 @@ struct ceph_mds_client {
 
 extern const char *ceph_mds_op_name(int op);
 
+extern struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc, int mds);
+       extern void ceph_put_mds_session(struct ceph_mds_session *s);
+
+extern void ceph_send_msg_mds(struct ceph_mds_client *mdsc,
+                             struct ceph_msg *msg, int mds);
+
 extern void ceph_mdsc_init(struct ceph_mds_client *mdsc,
                           struct ceph_client *client);
 extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc);
@@ -160,8 +166,6 @@ extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc,
 extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
                                     struct ceph_msg *msg);
 
-extern void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
-                                 struct ceph_msg *msg);
 extern void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
                                  struct ceph_msg *msg);
 
index 37ada5e192ab9a90aa455f79338fe26e8b21595a..60ccdb6165a10683a640de4e24852543978400f7 100644 (file)
@@ -351,6 +351,9 @@ enum {
        Opt_debug_osdc,
        Opt_debug_addr,
        Opt_debug_inode,
+       Opt_debug_snap,
+       Opt_debug_ioctl,
+       Opt_debug_caps,
        Opt_monport,
        Opt_port,
        Opt_wsize,
@@ -374,6 +377,9 @@ static match_table_t arg_tokens = {
        {Opt_debug_osdc, "debug_osdc=%d"},
        {Opt_debug_addr, "debug_addr=%d"},
        {Opt_debug_inode, "debug_inode=%d"},
+       {Opt_debug_snap, "debug_snap=%d"},
+       {Opt_debug_ioctl, "debug_ioctl=%d"},
+       {Opt_debug_caps, "debug_caps=%d"},
        {Opt_monport, "monport=%d"},
        {Opt_port, "port=%d"},
        {Opt_wsize, "wsize=%d"},
@@ -532,6 +538,15 @@ static int parse_mount_args(int flags, char *options, const char *dev_name,
                case Opt_debug_inode:
                        ceph_debug_inode = intval;
                        break;
+               case Opt_debug_snap:
+                       ceph_debug_snap = intval;
+                       break;
+               case Opt_debug_ioctl:
+                       ceph_debug_ioctl = intval;
+                       break;
+               case Opt_debug_caps:
+                       ceph_debug_caps = intval;
+                       break;
                case Opt_debug_console:
                        ceph_debug_console = 1;
                        break;
@@ -806,7 +821,7 @@ void ceph_dispatch(void *p, struct ceph_msg *msg)
                ceph_mdsc_handle_forward(&client->mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
-               ceph_mdsc_handle_caps(&client->mdsc, msg);
+               ceph_handle_caps(&client->mdsc, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
                ceph_mdsc_handle_snap(&client->mdsc, msg);
index d2fa359e9409f4518062830c7eb8c94bc007c92b..cc7ac0d4f23e38973eda5bab55d8aa013bed20e3 100644 (file)
@@ -25,6 +25,7 @@ extern int ceph_debug_addr;
 extern int ceph_debug_inode;
 extern int ceph_debug_snap;
 extern int ceph_debug_ioctl;
+extern int ceph_debug_caps;
 
 #define CEPH_DUMP_ERROR_ALWAYS
 
@@ -529,6 +530,10 @@ extern struct inode *ceph_get_snapdir(struct inode *parent);
 extern int ceph_fill_inode(struct inode *inode,
                           struct ceph_mds_reply_info_in *iinfo,
                           struct ceph_mds_reply_dirfrag *dirinfo);
+extern void ceph_fill_file_bits(struct inode *inode, int issued,
+                               u64 time_warp_seq,
+                               u64 size, struct timespec *ctime,
+                               struct timespec *mtime, struct timespec *atime);
 extern int ceph_fill_trace(struct super_block *sb,
                           struct ceph_mds_request *req,
                           struct ceph_mds_session *session);
@@ -537,6 +542,23 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req);
 extern int ceph_inode_lease_valid(struct inode *inode, int mask);
 extern int ceph_dentry_lease_valid(struct dentry *dentry);
 
+extern void ceph_inode_set_size(struct inode *inode, loff_t size);
+extern void ceph_inode_writeback(struct work_struct *work);
+extern void ceph_vmtruncate_work(struct work_struct *work);
+extern void __ceph_do_pending_vmtruncate(struct inode *inode);
+
+extern int ceph_do_getattr(struct dentry *dentry, int mask);
+extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
+extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
+                       struct kstat *stat);
+extern int ceph_setxattr(struct dentry *, const char *,const void *,size_t,int);
+extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
+extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
+extern int ceph_removexattr(struct dentry *, const char *);
+
+/* caps.c */
+extern void ceph_handle_caps(struct ceph_mds_client *mdsc,
+                            struct ceph_msg *msg);
 extern int ceph_add_cap(struct inode *inode,
                        struct ceph_mds_session *session,
                        int fmode, unsigned issued,
@@ -546,43 +568,15 @@ extern int __ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_cap(struct ceph_inode_cap *cap);
 extern void ceph_remove_all_caps(struct ceph_inode_info *ci);
 extern int ceph_get_cap_mds(struct inode *inode);
-extern int ceph_handle_cap_grant(struct inode *inode,
-                                struct ceph_mds_caps *grant,
-                                struct ceph_mds_session *session);
-extern void ceph_handle_cap_trunc(struct inode *inode,
-                                 struct ceph_mds_caps *trunc,
-                                 struct ceph_mds_session *session);
-extern void ceph_handle_cap_released(struct inode *inode,
-                                 struct ceph_mds_caps *trunc,
-                                 struct ceph_mds_session *session);
-extern void ceph_handle_cap_flushedsnap(struct inode *inode,
-                                 struct ceph_mds_caps *trunc,
-                                 struct ceph_mds_session *session);
-extern void ceph_handle_cap_export(struct inode *inode,
-                                  struct ceph_mds_caps *ex,
-                                  struct ceph_mds_session *session);
-extern void ceph_handle_cap_import(struct inode *inode,
-                                  struct ceph_mds_caps *im,
-                                  struct ceph_mds_session *session,
-                                  void *snaptrace, int snaptrace_len);
 extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, loff_t offset);
 extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int delayed, int flush);
-extern void ceph_inode_set_size(struct inode *inode, loff_t size);
-extern void ceph_inode_writeback(struct work_struct *work);
-extern void ceph_vmtruncate_work(struct work_struct *work);
-extern void __ceph_do_pending_vmtruncate(struct inode *inode);
-
-extern int ceph_do_getattr(struct dentry *dentry, int mask);
-extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
-extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
-                       struct kstat *stat);
-extern int ceph_setxattr(struct dentry *, const char *,const void *,size_t,int);
-extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
-extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
-extern int ceph_removexattr(struct dentry *, const char *);
+extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
+extern void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
+                                 struct ceph_mds_session *session,
+                                 int purge);
 
 /* addr.c */
 extern const struct address_space_operations ceph_aops;