obj-$(CONFIG_CEPH_FS) += ceph.o
-ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o export.o snap.o \
+ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
+ export.o caps.o snap.o \
messenger.o \
mds_client.o mdsmap.o \
mon_client.o \
--- /dev/null
+#include <linux/fs.h>
+#include <linux/kernel.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+
+int ceph_debug_caps = -1;
+#define DOUT_VAR ceph_debug_caps
+#define DOUT_PREFIX "caps: "
+#include "super.h"
+
+#include "decode.h"
+#include "messenger.h"
+
+
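+/*
+ * Find the cap on this inode issued by the given mds, if any.
+ * Called with i_lock held.
+ */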
+static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_inode_cap *cap;
+ struct list_head *p;
+
+ list_for_each(p, &ci->i_caps) {
+ cap = list_entry(p, struct ceph_inode_cap, ci_caps);
+ if (cap->mds == mds)
+ return cap;
+ }
+ return 0;
+}
+
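+/*
+ * Return the mds for one of this inode's caps (the first on the list),
+ * or -1 if we hold no caps at all.
+ */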
+int ceph_get_cap_mds(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_inode_cap *cap;
+ int mds = -1;
+
+ spin_lock(&inode->i_lock);
+ if (!list_empty(&ci->i_caps)) {
+ cap = list_first_entry(&ci->i_caps, struct ceph_inode_cap,
+ ci_caps);
+ mds = cap->mds;
+ }
+ spin_unlock(&inode->i_lock);
+ return mds;
+}
+
+/*
+ * Add a capability under the given MDS session, or update an existing
+ * one.
+ *
+ * Caller should hold snap_rwsem and the session s_mutex.
+ *
+ * @fmode can be negative, in which case it is ignored.
+ */
+int ceph_add_cap(struct inode *inode,
+ struct ceph_mds_session *session,
+ int fmode, unsigned issued,
+ unsigned seq, unsigned mseq,
+ void *snapblob, int snapblob_len)
+{
+ int mds = session->s_mds;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_inode_cap *cap, *new_cap = 0;
+ int i;
+ int is_new = 0;
+ struct ceph_snaprealm *realm = 0;
+ struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+
+ if (snapblob_len)
+ realm = ceph_update_snap_trace(mdsc,
+ snapblob, snapblob+snapblob_len,
+ 0);
+
+ dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
+ session->s_mds, issued, seq);
+retry:
+ spin_lock(&inode->i_lock);
+ cap = __get_cap_for_mds(inode, mds);
+ if (!cap) {
+ for (i = 0; i < STATIC_CAPS; i++)
+ if (ci->i_static_caps[i].mds == -1) {
+ cap = &ci->i_static_caps[i];
+ break;
+ }
+ if (!cap) {
+ if (new_cap) {
+ cap = new_cap;
+ new_cap = 0;
+ } else {
+ spin_unlock(&inode->i_lock);
+ new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
+ if (new_cap == 0)
+ return -ENOMEM;
+ goto retry;
+ }
+ }
+
+ is_new = 1; /* grab inode later */
+ cap->issued = cap->implemented = 0;
+ cap->mds = mds;
+ cap->flags = 0;
+ cap->flushed_snap = 0;
+
+ cap->ci = ci;
+ list_add(&cap->ci_caps, &ci->i_caps);
+
+ /* add to session cap list */
+ cap->session = session;
+ list_add(&cap->session_caps, &session->s_caps);
+ session->s_nr_caps++;
+
+ /* clear out old exporting info? */
+ if (ci->i_cap_exporting_mds == mds) {
+ ci->i_cap_exporting_issued = 0;
+ ci->i_cap_exporting_mseq = 0;
+ ci->i_cap_exporting_mds = -1;
+ }
+ }
+ if (!ci->i_snaprealm) {
+ ci->i_snaprealm = realm;
+ list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps);
+ } else
+ ceph_put_snaprealm(realm);
+
+ dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
+ inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+ cap->issued |= issued;
+ cap->implemented |= issued;
+ cap->seq = seq;
+ cap->mseq = mseq;
+ cap->gen = session->s_cap_gen;
+ if (fmode >= 0)
+ __ceph_get_fmode(ci, fmode);
+ spin_unlock(&inode->i_lock);
+ if (is_new)
+ igrab(inode);
+ if (new_cap)
+ kfree(new_cap);
+ return 0;
+}
+
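+/*
+ * Return the set of cap bits currently issued to us: i_snap_caps plus
+ * the union of issued bits on each cap, skipping caps from sessions
+ * whose cap gen/ttl indicate they have gone stale.  If @implemented is
+ * non-NULL, also OR in the implemented bits.  Caller holds i_lock.
+ */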
+int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
+{
+ int have = ci->i_snap_caps;
+ struct ceph_inode_cap *cap;
+ struct list_head *p;
+ u32 gen;
+ unsigned long ttl;
+
+ list_for_each(p, &ci->i_caps) {
+ cap = list_entry(p, struct ceph_inode_cap, ci_caps);
+
+ spin_lock(&cap->session->s_cap_lock);
+ gen = cap->session->s_cap_gen;
+ ttl = cap->session->s_cap_ttl;
+ spin_unlock(&cap->session->s_cap_lock);
+
+ if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
+ dout(30, "__ceph_caps_issued %p cap %p issued %d "
+ "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
+ cap, cap->issued, cap->gen, gen);
+ continue;
+ }
+ dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
+ &ci->vfs_inode, cap, cap->issued);
+ have |= cap->issued;
+ if (implemented)
+ *implemented |= cap->implemented;
+ }
+ return have;
+}
+
+/*
+ * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * returns true if this is the last cap. if so, caller should iput.
+ */
+int __ceph_remove_cap(struct ceph_inode_cap *cap)
+{
+ struct ceph_mds_session *session = cap->session;
+ struct ceph_inode_info *ci = cap->ci;
+
+ dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
+
+ /* remove from session list */
+ list_del_init(&cap->session_caps);
+ session->s_nr_caps--;
+
+ /* remove from inode list */
+ list_del_init(&cap->ci_caps);
+ cap->session = 0;
+ cap->mds = -1; /* mark unused */
+
+ if (cap < ci->i_static_caps ||
+ cap >= ci->i_static_caps + STATIC_CAPS)
+ kfree(cap);
+
+ if (list_empty(&ci->i_caps)) {
+ list_del_init(&ci->i_snaprealm_item);
+ return 1;
+ }
+ return 0;
+}
+
+/*
+ * caller should hold snap_rwsem and session s_mutex.
+ */
+void ceph_remove_cap(struct ceph_inode_cap *cap)
+{
+ struct inode *inode = &cap->ci->vfs_inode;
+ int was_last;
+
+ spin_lock(&inode->i_lock);
+ was_last = __ceph_remove_cap(cap);
+ spin_unlock(&inode->i_lock);
+ if (was_last)
+ iput(inode);
+}
+
+/*
+ * (Re)queue the inode on the delayed cap-check list, so that unwanted
+ * caps are released a few seconds from now rather than immediately
+ * (see ceph_check_delayed_caps).
+ *
+ * Caller holds i_lock; we take mdsc->cap_delay_lock here, so the lock
+ * ordering is i_lock -> cap_delay_lock.
+ */
+void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc,
+ struct ceph_inode_info *ci)
+{
+ ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+ dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
+ ci->i_hold_caps_until);
+ spin_lock(&mdsc->cap_delay_lock);
+ if (list_empty(&ci->i_cap_delay_list))
+ igrab(&ci->vfs_inode);
+ else
+ list_del_init(&ci->i_cap_delay_list);
+ list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
+ spin_unlock(&mdsc->cap_delay_lock);
+}
+
+
+/*
+ * Examine currently used and wanted caps versus what we hold, and
+ * release unneeded caps or ack revoked caps to the mds as appropriate.
+ *
+ * @is_delayed: a delayed cap check is already queued for this inode,
+ *   so don't requeue one here.
+ * @flush_snap: send a FLUSHSNAP for caps with write state that has not
+ *   yet been flushed for the current snap context.
+ */
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
+{
+ struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
+ struct ceph_mds_client *mdsc = &client->mdsc;
+ struct inode *inode = &ci->vfs_inode;
+ struct ceph_inode_cap *cap;
+ struct list_head *p;
+ int wanted, used;
+ struct ceph_mds_session *session = 0; /* if non-NULL, i hold s_mutex */
+ int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
+
+retry:
+ spin_lock(&inode->i_lock);
+ wanted = __ceph_caps_wanted(ci);
+ used = __ceph_caps_used(ci);
+ dout(10, "check_caps %p wanted %d used %d issued %d\n", inode,
+ wanted, used, __ceph_caps_issued(ci, 0));
+
+ if (!is_delayed)
+ __ceph_cap_delay_requeue(mdsc, ci);
+
+ list_for_each(p, &ci->i_caps) {
+ int revoking;
+ cap = list_entry(p, struct ceph_inode_cap, ci_caps);
+
+ /* note: no side-effects allowed, until we take s_mutex */
+ revoking = cap->implemented & ~cap->issued;
+
+ if (ci->i_wanted_max_size > ci->i_max_size &&
+ ci->i_wanted_max_size > ci->i_requested_max_size)
+ goto ack;
+
+ /* completed revocation? */
+ if (revoking && (revoking & used) == 0) {
+ dout(10, "completed revocation of %d\n",
+ cap->implemented & ~cap->issued);
+ goto ack;
+ }
+
+ /* approaching file_max? */
+ if ((cap->issued & CEPH_CAP_WR) &&
+ (inode->i_size << 1) >= ci->i_max_size &&
+ (ci->i_reported_size << 1) < ci->i_max_size) {
+ dout(10, "i_size approaching max_size\n");
+ goto ack;
+ }
+
+ /* flush snap? */
+ if (flush_snap &&
+ (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
+ if (cap->flushed_snap >=
+ ci->i_snaprealm->cached_context->seq) {
+ dout(10, "flushed_snap %llu >= seq %lld, "
+ "not flushing mds%d\n",
+ cap->flushed_snap,
+ ci->i_snaprealm->cached_context->seq,
+ cap->session->s_mds);
+ continue; /* already flushed for this snap */
+ }
+ goto ack;
+ }
+
+ if ((cap->issued & ~wanted) == 0)
+ continue; /* nothing extra, all good */
+
+ if (time_before(jiffies, ci->i_hold_caps_until)) {
+ /* delaying cap release for a bit */
+ dout(30, "delaying cap release\n");
+ continue;
+ }
+
+ack:
+ /* take s_mutex, one way or another */
+ if (session && session != cap->session) {
+ dout(30, "oops, wrong session %p mutex\n", session);
+ mutex_unlock(&session->s_mutex);
+ session = 0;
+ }
+ /* take snap_rwsem before session mutex */
+ if (!flush_snap && !took_snap_rwsem) {
+ if (down_write_trylock(&mdsc->snap_rwsem) == 0) {
+ dout(10, "inverting snap/in locks on %p\n",
+ inode);
+ spin_unlock(&inode->i_lock);
+ down_write(&mdsc->snap_rwsem);
+ took_snap_rwsem = 1;
+ goto retry;
+ }
+ took_snap_rwsem = 1;
+ }
+ if (!session) {
+ session = cap->session;
+ if (mutex_trylock(&session->s_mutex) == 0) {
+ dout(10, "inverting session/ino locks on %p\n",
+ session);
+ spin_unlock(&inode->i_lock);
+ mutex_lock(&session->s_mutex);
+ goto retry;
+ }
+ }
+
+ /* send_cap drops i_lock */
+ __ceph_mdsc_send_cap(mdsc, session, cap,
+ used, wanted, flush_snap);
+
+ goto retry; /* retake i_lock and restart our cap scan. */
+ }
+
+ /* okay */
+ spin_unlock(&inode->i_lock);
+
+ if (session)
+ mutex_unlock(&session->s_mutex);
+ if (took_snap_rwsem)
+ up_write(&mdsc->snap_rwsem);
+}
+
+
+/*
+ * cap refs
+ */
+
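+/*
+ * Track a reference for each cap bit the caller is actively using, so
+ * that ceph_check_caps knows which caps cannot be released yet.
+ * Called with i_lock held (or via ceph_take_cap_refs, which takes it).
+ */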
+static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+ if (got & CEPH_CAP_RD)
+ ci->i_rd_ref++;
+ if (got & CEPH_CAP_RDCACHE)
+ ci->i_rdcache_ref++;
+ if (got & CEPH_CAP_WR)
+ ci->i_wr_ref++;
+ if (got & CEPH_CAP_WRBUFFER) {
+ atomic_inc(&ci->i_wrbuffer_ref);
+ dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
+ &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)-1,
+ atomic_read(&ci->i_wrbuffer_ref));
+ }
+}
+
+void ceph_take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+ dout(30, "take_cap_refs on %p taking %d\n", &ci->vfs_inode, got);
+ spin_lock(&ci->vfs_inode.i_lock);
+ __take_cap_refs(ci, got);
+ spin_unlock(&ci->vfs_inode.i_lock);
+}
+
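+/*
+ * Try to take references on the caps in @need (plus whatever subset of
+ * @want is also issued).  Returns 1 and fills *got on success; returns
+ * 0 if a needed cap is not issued, a wanted cap is being revoked, or
+ * @endoff is beyond the current max_size, so the caller can wait and
+ * retry.
+ */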
+int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
+ loff_t endoff)
+{
+ int ret = 0;
+ int have, implemented;
+
+ dout(30, "get_cap_refs on %p need %d want %d\n", &ci->vfs_inode,
+ need, want);
+ spin_lock(&ci->vfs_inode.i_lock);
+ if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
+ dout(20, "get_cap_refs endoff %llu > max_size %llu\n",
+ endoff, ci->i_max_size);
+ goto sorry;
+ }
+ have = __ceph_caps_issued(ci, &implemented);
+ if ((have & need) == need) {
+ /*
+ * look at (implemented & ~have & not) so that we keep waiting
+ * on transition from wanted -> needed caps. this is needed
+ * for WRBUFFER|WR -> WR to avoid a new WR sync write from
+ * going before a prior buffered writeback happens.
+ */
+ int not = want & ~(have & need);
+ int revoking = implemented & ~have;
+ dout(30, "get_cap_refs have %d but not %d (revoking %d)\n",
+ have, not, revoking);
+ if ((revoking & not) == 0) {
+ *got = need | (have & want);
+ __take_cap_refs(ci, *got);
+ ret = 1;
+ }
+ } else
+ dout(30, "get_cap_refs have %d needed %d\n", have, need);
+sorry:
+ spin_unlock(&ci->vfs_inode.i_lock);
+ dout(30, "get_cap_refs on %p ret %d got %d\n", &ci->vfs_inode,
+ ret, *got);
+ return ret;
+}
+
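+/*
+ * Drop references taken above.  When the last reference on any cap bit
+ * is dropped, call ceph_check_caps so that unneeded or revoked caps can
+ * be released back to the mds.
+ */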
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+ int last = 0;
+
+ spin_lock(&ci->vfs_inode.i_lock);
+ if (had & CEPH_CAP_RD)
+ if (--ci->i_rd_ref == 0)
+ last++;
+ if (had & CEPH_CAP_RDCACHE)
+ if (--ci->i_rdcache_ref == 0)
+ last++;
+ if (had & CEPH_CAP_WR)
+ if (--ci->i_wr_ref == 0)
+ last++;
+ if (had & CEPH_CAP_WRBUFFER) {
+ if (atomic_dec_and_test(&ci->i_wrbuffer_ref))
+ last++;
+ dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
+ &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)+1,
+ atomic_read(&ci->i_wrbuffer_ref));
+ }
+ spin_unlock(&ci->vfs_inode.i_lock);
+
+ dout(30, "put_cap_refs on %p had %d %s\n", &ci->vfs_inode, had,
+ last ? "last":"");
+
+ if (last)
+ ceph_check_caps(ci, 0, 0);
+}
+
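+/*
+ * Drop @nr WRBUFFER references at once, checking caps if that drops the
+ * count to zero.
+ */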
+void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
+{
+ int was_last;
+ int v;
+
+ spin_lock(&ci->vfs_inode.i_lock);
+ was_last = atomic_sub_and_test(nr, &ci->i_wrbuffer_ref);
+ v = atomic_read(&ci->i_wrbuffer_ref);
+ spin_unlock(&ci->vfs_inode.i_lock);
+
+ dout(30, "put_wrbuffer_cap_refs on %p %d -> %d (?)%s\n",
+ &ci->vfs_inode, v+nr, v, was_last ? " LAST":"");
+ WARN_ON(v < 0);
+
+ if (was_last)
+ ceph_check_caps(ci, 0, 0);
+}
+
+
+
+
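+/*
+ * Build and send a CEPH_MSG_CLIENT_CAPS message to the given mds,
+ * reporting the caps we hold and want along with our current seq
+ * numbers, file size, timestamps, and the snap seq we follow.
+ */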
+static void send_cap(struct ceph_mds_client *mdsc, __u64 ino, int op,
+ int caps, int wanted, __u64 seq, __u64 mseq,
+ __u64 size, __u64 max_size,
+ struct timespec *mtime, struct timespec *atime,
+ u64 time_warp_seq, u64 follows, int mds)
+{
+ struct ceph_mds_caps *fc;
+ struct ceph_msg *msg;
+
+ dout(10, "send_cap %s %llx caps %d wanted %d seq %llu/%llu"
+ " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
+ caps, wanted, seq, mseq, follows, size);
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
+ if (IS_ERR(msg))
+ return;
+
+ fc = msg->front.iov_base;
+
+ memset(fc, 0, sizeof(*fc));
+
+ fc->op = cpu_to_le32(op);
+ fc->seq = cpu_to_le64(seq);
+ fc->migrate_seq = cpu_to_le64(mseq);
+ fc->caps = cpu_to_le32(caps);
+ fc->wanted = cpu_to_le32(wanted);
+ fc->ino = cpu_to_le64(ino);
+ fc->size = cpu_to_le64(size);
+ fc->max_size = cpu_to_le64(max_size);
+ fc->snap_follows = cpu_to_le64(follows);
+ if (mtime)
+ ceph_encode_timespec(&fc->mtime, mtime);
+ if (atime)
+ ceph_encode_timespec(&fc->atime, atime);
+ fc->time_warp_seq = cpu_to_le64(time_warp_seq);
+
+ ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+
+
+
+
+/*
+ * Handle a cap GRANT message from the mds (possibly a revocation of
+ * previously issued caps).
+ *
+ * Caller holds s_mutex, NOT snap_rwsem.
+ *
+ * return value:
+ *  0 - ok
+ *  1 - send the (updated) msg back to the mds as an ack
+ */
+static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_cap *cap;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ int seq = le32_to_cpu(grant->seq);
+ int newcaps = le32_to_cpu(grant->caps);
+ int used;
+ int issued; /* to me, before */
+ int wanted;
+ int reply = 0;
+ u64 size = le64_to_cpu(grant->size);
+ u64 max_size = le64_to_cpu(grant->max_size);
+ struct timespec mtime, atime, ctime;
+ int wake = 0;
+ int writeback_now = 0;
+ int invalidate = 0;
+
+ dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
+ inode, ci, mds, seq);
+ dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
+ inode->i_size);
+
+ spin_lock(&inode->i_lock);
+
+ /* do we have this cap? */
+ cap = __get_cap_for_mds(inode, mds);
+ if (!cap) {
+ /*
+ * then ignore. never reply to cap messages out of turn,
+ * or we'll be mixing up different instances of caps on the
+ * same inode, and confuse the mds.
+ */
+ dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
+ inode, ci->i_vino.ino, ci->i_vino.snap, mds);
+ goto out;
+ }
+ dout(10, " cap %p\n", cap);
+ cap->gen = session->s_cap_gen;
+
+ /* size/ctime/mtime/atime? */
+ issued = __ceph_caps_issued(ci, 0);
+ ceph_decode_timespec(&mtime, &grant->mtime);
+ ceph_decode_timespec(&atime, &grant->atime);
+ ceph_decode_timespec(&ctime, &grant->ctime);
+ ceph_fill_file_bits(inode, issued, le64_to_cpu(grant->time_warp_seq),
+ size, &ctime, &mtime, &atime);
+
+ /* max size increase? */
+ if (max_size != ci->i_max_size) {
+ dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
+ ci->i_max_size = max_size;
+ if (max_size >= ci->i_wanted_max_size) {
+ ci->i_wanted_max_size = 0; /* reset */
+ ci->i_requested_max_size = 0;
+ }
+ wake = 1;
+ }
+
+ /* check cap bits */
+ wanted = __ceph_caps_wanted(ci);
+ used = __ceph_caps_used(ci);
+ dout(10, " my wanted = %d, used = %d\n", wanted, used);
+ if (wanted != le32_to_cpu(grant->wanted)) {
+ dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
+ wanted);
+ grant->wanted = cpu_to_le32(wanted);
+ }
+
+ cap->seq = seq;
+
+ /* layout may have changed */
+ ci->i_layout = grant->layout;
+
+ /* revocation? */
+ if (cap->issued & ~newcaps) {
+ dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+ if ((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE)
+ invalidate = 1;
+ if ((used & ~newcaps) & CEPH_CAP_WRBUFFER)
+ writeback_now = 1; /* will delay ack */
+ else {
+ cap->implemented = newcaps;
+ /* ack now. re-use incoming message. */
+ grant->size = cpu_to_le64(inode->i_size);
+ grant->max_size = 0; /* don't re-request */
+ ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
+ ceph_encode_timespec(&grant->atime, &inode->i_atime);
+ grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
+ grant->snap_follows =
+ cpu_to_le64(ci->i_snaprealm->cached_context->seq);
+ reply = 1;
+ wake = 1;
+ }
+ cap->issued = newcaps;
+ goto out;
+ }
+
+ /* grant or no-op */
+ if (cap->issued == newcaps) {
+ dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+ } else {
+ dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+ cap->implemented = cap->issued = newcaps;
+ wake = 1;
+ }
+
+out:
+ spin_unlock(&inode->i_lock);
+ if (wake)
+ wake_up(&ci->i_cap_wq);
+ if (writeback_now) {
+ /*
+ * queue inode for writeback; we can't actually call
+ * write_inode_now, writepages, etc. from this
+ * context.
+ */
+ dout(10, "queueing %p for writeback\n", inode);
+ ceph_queue_writeback(ceph_client(inode->i_sb), ci);
+ }
+ if (invalidate)
+ invalidate_mapping_pages(&inode->i_data, 0, -1);
+ return reply;
+}
+
+
+/*
+ * caller holds s_mutex, snap_rwsem.
+ */
+static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
+ struct ceph_inode_info *ci)
+{
+ dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode);
+ if (list_empty(&ci->i_cap_delay_list))
+ return;
+ spin_lock(&mdsc->cap_delay_lock);
+ list_del_init(&ci->i_cap_delay_list);
+ spin_unlock(&mdsc->cap_delay_lock);
+ iput(&ci->vfs_inode);
+}
+
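+/*
+ * The mds has released our cap: remove it.  If it was the last cap on
+ * the inode, also cancel any pending delayed cap check and drop the
+ * inode reference we held for it.
+ */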
+static void handle_cap_released(struct inode *inode,
+ struct ceph_mds_caps *m,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int seq = le32_to_cpu(m->seq);
+ int removed_last;
+ struct ceph_inode_cap *cap;
+
+ dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
+ session->s_mds, seq);
+
+ spin_lock(&inode->i_lock);
+ cap = __get_cap_for_mds(inode, session->s_mds);
+ BUG_ON(!cap);
+ removed_last = __ceph_remove_cap(cap);
+ if (removed_last)
+ __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+ spin_unlock(&inode->i_lock);
+ if (removed_last)
+ iput(inode);
+}
+
+
+/*
+ * caller holds s_mutex, snap_rwsem.
+ */
+static void handle_cap_flushedsnap(struct inode *inode,
+ struct ceph_mds_caps *m,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int seq = le32_to_cpu(m->seq);
+
+ dout(10, "handle_cap_flushednsap inode %p ci %p mds%d seq %d\n", inode,
+ ci, session->s_mds, seq);
+
+ /* **** WRITE ME **** */
+}
+
+
+/*
+ * caller holds s_mutex, NOT snap_rwsem.
+ */
+static void handle_cap_trunc(struct inode *inode,
+ struct ceph_mds_caps *trunc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ int seq = le32_to_cpu(trunc->seq);
+ u64 size = le64_to_cpu(trunc->size);
+ int queue_trunc = 0;
+
+ dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
+ mds, seq);
+
+ /*
+ * vmtruncate lazily; we can't block on i_mutex in the message
+ * handler path, or we deadlock against osd op replies needed
+ * to complete the writes holding i_lock. vmtruncate will
+ * also block on page locks held by writes...
+ *
+ * if it's an expansion, and there is no truncate pending, we
+ * don't need to truncate.
+ */
+
+ spin_lock(&inode->i_lock);
+ if (ci->i_vmtruncate_to < 0 && size > inode->i_size)
+ dout(10, "clean fwd truncate, no vmtruncate needed\n");
+ else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to)
+ dout(10, "trunc to %lld < %lld already queued\n",
+ ci->i_vmtruncate_to, size);
+ else {
+ /* we need to trunc even smaller */
+ dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
+ ci->i_vmtruncate_to = size;
+ queue_trunc = 1;
+ }
+ i_size_write(inode, size);
+ ci->i_reported_size = size;
+ spin_unlock(&inode->i_lock);
+
+ if (queue_trunc)
+ queue_work(ceph_client(inode->i_sb)->trunc_wq,
+ &ci->i_vmtruncate_work);
+}
+
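+/*
+ * The mds is migrating our cap to another mds (EXPORT).  Remember the
+ * issued bits and migrate_seq so the subsequent IMPORT from the new mds
+ * can pick them up, then remove the cap from this session.
+ */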
+static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ unsigned mseq = le32_to_cpu(ex->migrate_seq);
+ struct ceph_inode_cap *cap = 0, *t;
+ struct list_head *p;
+ int was_last = 0;
+
+ dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
+ inode, ci, mds, mseq);
+
+ spin_lock(&inode->i_lock);
+
+ /* make sure we haven't seen a higher mseq */
+ list_for_each(p, &ci->i_caps) {
+ t = list_entry(p, struct ceph_inode_cap, ci_caps);
+ if (t->mseq > mseq) {
+ dout(10, " higher mseq on cap from mds%d\n",
+ t->session->s_mds);
+ goto out;
+ }
+ if (t->session->s_mds == mds)
+ cap = t;
+ }
+
+ if (cap) {
+ /* make note, and remove */
+ ci->i_cap_exporting_mds = mds;
+ ci->i_cap_exporting_mseq = mseq;
+ ci->i_cap_exporting_issued = cap->issued;
+ was_last = __ceph_remove_cap(cap);
+ } else
+ WARN_ON(!cap);
+
+out:
+ spin_unlock(&inode->i_lock);
+ if (was_last)
+ iput(inode);
+}
+
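+/*
+ * Handle a cap IMPORT from an mds that has taken over our cap from
+ * another mds.  Clear any stale exporting state this import supersedes
+ * (higher migrate_seq), then add/update the cap for this session.
+ */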
+static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
+ struct ceph_mds_session *session,
+ void *snaptrace, int snaptrace_len)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ unsigned issued = le32_to_cpu(im->caps);
+ unsigned seq = le32_to_cpu(im->seq);
+ unsigned mseq = le32_to_cpu(im->migrate_seq);
+
+ if (ci->i_cap_exporting_mds >= 0 &&
+ ci->i_cap_exporting_mseq < mseq) {
+ dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
+ " - cleared exporting from mds%d\n",
+ inode, ci, mds, mseq,
+ ci->i_cap_exporting_mds);
+ ci->i_cap_exporting_issued = 0;
+ ci->i_cap_exporting_mseq = 0;
+ ci->i_cap_exporting_mds = -1;
+ } else {
+ dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
+ inode, ci, mds, mseq);
+ }
+
+ ceph_add_cap(inode, session, -1, issued, seq, mseq,
+ snaptrace, snaptrace_len);
+}
+
+
+
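+/*
+ * Handle an incoming CEPH_MSG_CLIENT_CAPS message: decode the header,
+ * look up the session and inode, and dispatch on the cap operation.
+ * Takes snap_rwsem and the session s_mutex here; the per-op helpers
+ * note which of those they expect to be held.
+ */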
+void ceph_handle_caps(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg)
+{
+ struct super_block *sb = mdsc->client->sb;
+ struct ceph_mds_session *session;
+ struct inode *inode;
+ struct ceph_mds_caps *h;
+ int mds = le32_to_cpu(msg->hdr.src.name.num);
+ int op;
+ u32 seq;
+ struct ceph_vino vino;
+ u64 size, max_size;
+
+ dout(10, "handle_caps from mds%d\n", mds);
+
+ /* decode */
+ if (msg->front.iov_len < sizeof(*h))
+ goto bad;
+ h = msg->front.iov_base;
+ op = le32_to_cpu(h->op);
+ vino.ino = le64_to_cpu(h->ino);
+ vino.snap = CEPH_NOSNAP;
+ seq = le32_to_cpu(h->seq);
+ size = le64_to_cpu(h->size);
+ max_size = le64_to_cpu(h->max_size);
+
+ /* find session */
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_get_mds_session(mdsc, mds);
+ if (session)
+ down_write(&mdsc->snap_rwsem);
+ mutex_unlock(&mdsc->mutex);
+ if (!session) {
+ dout(10, "WTF, got cap but no session for mds%d\n", mds);
+ return;
+ }
+
+ mutex_lock(&session->s_mutex);
+ session->s_seq++;
+
+ /* lookup ino */
+ inode = ceph_find_inode(sb, vino);
+ dout(20, "op %d ino %llx inode %p\n", op, vino.ino, inode);
+ if (!inode) {
+ dout(10, "i don't have ino %llx, sending release\n", vino.ino);
+ send_cap(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+ 0, size, 0, 0, 0, 0, 0, mds);
+ goto no_inode;
+ }
+
+ switch (op) {
+ case CEPH_CAP_OP_GRANT:
+ up_write(&mdsc->snap_rwsem);
+ if (handle_cap_grant(inode, h, session) == 1) {
+ dout(10, "sending reply back to mds%d\n", mds);
+ ceph_msg_get(msg);
+ ceph_send_msg_mds(mdsc, msg, mds);
+ }
+ break;
+
+ case CEPH_CAP_OP_TRUNC:
+ up_write(&mdsc->snap_rwsem);
+ handle_cap_trunc(inode, h, session);
+ break;
+
+ case CEPH_CAP_OP_RELEASED:
+ handle_cap_released(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ break;
+
+ case CEPH_CAP_OP_FLUSHEDSNAP:
+ handle_cap_flushedsnap(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ break;
+
+ case CEPH_CAP_OP_EXPORT:
+ handle_cap_export(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ break;
+
+ case CEPH_CAP_OP_IMPORT:
+ handle_cap_import(inode, h, session,
+ msg->front.iov_base + sizeof(*h),
+ le32_to_cpu(h->snap_trace_len));
+ up_write(&mdsc->snap_rwsem);
+ break;
+
+ default:
+ up_write(&mdsc->snap_rwsem);
+ derr(10, "unknown cap op %d %s\n", op, ceph_cap_op_name(op));
+ }
+
+ iput(inode);
+no_inode:
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ return;
+
+bad:
+ derr(10, "corrupt caps message\n");
+ return;
+}
+
+/*
+ * Send a cap message to the mds reflecting this cap's current
+ * used/wanted state, dropping any issued bits we no longer want.
+ *
+ * Called with i_lock held; drops it before returning.
+ * Caller should hold snap_rwsem and the session s_mutex.
+ */
+int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ struct ceph_inode_cap *cap,
+ int used, int wanted,
+ int flush_snap)
+{
+ struct ceph_inode_info *ci = cap->ci;
+ struct inode *inode = &ci->vfs_inode;
+ int revoking = cap->implemented & ~cap->issued;
+ int dropping = cap->issued & ~wanted;
+ int keep;
+ u64 seq, mseq, time_warp_seq, follows;
+ u64 size, max_size;
+ struct timespec mtime, atime;
+ int wake = 0;
+ int op = CEPH_CAP_OP_ACK;
+
+ if (flush_snap)
+ op = CEPH_CAP_OP_FLUSHSNAP;
+ else if (wanted == 0)
+ op = CEPH_CAP_OP_RELEASE;
+
+ dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
+ cap->issued, cap->issued & wanted);
+ cap->issued &= wanted; /* drop bits we don't want */
+
+ if (revoking && (revoking & used) == 0) {
+ cap->implemented = cap->issued;
+ wake = 1; /* for waiters on wanted -> needed transition */
+ }
+
+ keep = cap->issued;
+ seq = cap->seq;
+ mseq = cap->mseq;
+ size = inode->i_size;
+ ci->i_reported_size = size;
+ max_size = ci->i_wanted_max_size;
+ ci->i_requested_max_size = max_size;
+ mtime = inode->i_mtime;
+ atime = inode->i_atime;
+ time_warp_seq = ci->i_time_warp_seq;
+ follows = ci->i_snaprealm->cached_context->seq;
+ if (flush_snap)
+ cap->flushed_snap = follows; /* so we only flush it once */
+ spin_unlock(&inode->i_lock);
+
+ if (dropping & CEPH_CAP_RDCACHE) {
+ /*
+ * FIXME: this will block if there is a locked page..
+ */
+ dout(20, "invalidating pages on %p\n", inode);
+ invalidate_mapping_pages(&inode->i_data, 0, -1);
+ dout(20, "done invalidating pages on %p\n", inode);
+ }
+
+ send_cap(mdsc, ceph_vino(inode).ino,
+ op, keep, wanted, seq, mseq,
+ size, max_size, &mtime, &atime, time_warp_seq,
+ follows, session->s_mds);
+
+ if (wake)
+ wake_up(&ci->i_cap_wq);
+
+ return 0;
+}
+
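+/*
+ * Walk the delayed-cap list and run ceph_check_caps on each inode whose
+ * hold period has expired, dropping the reference taken when it was
+ * queued.
+ */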
+void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+{
+ struct ceph_inode_info *ci;
+
+ dout(10, "check_delayed_caps\n");
+ while (1) {
+ spin_lock(&mdsc->cap_delay_lock);
+ if (list_empty(&mdsc->cap_delay_list))
+ break;
+ ci = list_first_entry(&mdsc->cap_delay_list,
+ struct ceph_inode_info,
+ i_cap_delay_list);
+ if (time_before(jiffies, ci->i_hold_caps_until))
+ break;
+ list_del_init(&ci->i_cap_delay_list);
+ spin_unlock(&mdsc->cap_delay_lock);
+ dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
+ ceph_check_caps(ci, 1, 0);
+ iput(&ci->vfs_inode);
+ }
+ spin_unlock(&mdsc->cap_delay_lock);
+}
+
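+/*
+ * Send cap messages for any caps on this session with write state
+ * (WR or WRBUFFER implemented).  If @purge is set, complain about and
+ * ignore any residual used/wanted state so the caps can be dropped.
+ */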
+void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int purge)
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe (p, n, &session->s_caps) {
+ struct ceph_inode_cap *cap =
+ list_entry(p, struct ceph_inode_cap, session_caps);
+ struct inode *inode = &cap->ci->vfs_inode;
+ int used, wanted;
+
+ spin_lock(&inode->i_lock);
+ if ((cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) {
+ spin_unlock(&inode->i_lock);
+ continue;
+ }
+
+ used = __ceph_caps_used(cap->ci);
+ wanted = __ceph_caps_wanted(cap->ci);
+
+ if (purge && (used || wanted)) {
+ derr(0, "residual caps on %p used %d wanted %d s=%llu wrb=%d\n",
+ inode, used, wanted, inode->i_size,
+ atomic_read(&cap->ci->i_wrbuffer_ref));
+ used = wanted = 0;
+ }
+
+ __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 0);
+ }
+}
+
+
* depending on which capabilities were held, and on the time_warp_seq
* (which we increment on utimes()).
*/
-static void fill_file_bits(struct inode *inode, int issued, u64 time_warp_seq,
- u64 size, struct timespec *ctime,
- struct timespec *mtime, struct timespec *atime)
+void ceph_fill_file_bits(struct inode *inode, int issued, u64 time_warp_seq,
+ u64 size, struct timespec *ctime,
+ struct timespec *mtime, struct timespec *atime)
{
struct ceph_inode_info *ci = ceph_inode(inode);
u64 blocks = (size + (1<<9) - 1) >> 9;
ceph_decode_timespec(&ctime, &info->ctime);
issued = __ceph_caps_issued(ci, 0);
- fill_file_bits(inode, issued, le64_to_cpu(info->time_warp_seq), size,
- &ctime, &mtime, &atime);
+ ceph_fill_file_bits(inode, issued, le64_to_cpu(info->time_warp_seq),
+ size, &ctime, &mtime, &atime);
inode->i_blkbits = blkbits;
}
-/*
- * capabilities
- */
-
-static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_inode_cap *cap;
- struct list_head *p;
-
- list_for_each(p, &ci->i_caps) {
- cap = list_entry(p, struct ceph_inode_cap, ci_caps);
- if (cap->mds == mds)
- return cap;
- }
- return 0;
-}
-
-int ceph_get_cap_mds(struct inode *inode)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_inode_cap *cap;
- int mds = -1;
-
- spin_lock(&inode->i_lock);
- if (!list_empty(&ci->i_caps)) {
- cap = list_first_entry(&ci->i_caps, struct ceph_inode_cap,
- ci_caps);
- mds = cap->mds;
- }
- spin_unlock(&inode->i_lock);
- return mds;
-}
-
-/*
- * caller should hold session snap_rwsem, s_mutex.
- *
- * @fmode can be negative, in which case it is ignored.
- */
-int ceph_add_cap(struct inode *inode,
- struct ceph_mds_session *session,
- int fmode, unsigned issued,
- unsigned seq, unsigned mseq,
- void *snapblob, int snapblob_len)
-{
- int mds = session->s_mds;
- struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_inode_cap *cap, *new_cap = 0;
- int i;
- int is_new = 0;
- struct ceph_snaprealm *realm = 0;
- struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
-
- if (snapblob_len)
- realm = ceph_update_snap_trace(mdsc,
- snapblob, snapblob+snapblob_len,
- 0);
-
- dout(10, "ceph_add_cap on %p mds%d cap %d seq %d\n", inode,
- session->s_mds, issued, seq);
-retry:
- spin_lock(&inode->i_lock);
- cap = __get_cap_for_mds(inode, mds);
- if (!cap) {
- for (i = 0; i < STATIC_CAPS; i++)
- if (ci->i_static_caps[i].mds == -1) {
- cap = &ci->i_static_caps[i];
- break;
- }
- if (!cap) {
- if (new_cap) {
- cap = new_cap;
- new_cap = 0;
- } else {
- spin_unlock(&inode->i_lock);
- new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
- if (new_cap == 0)
- return -ENOMEM;
- goto retry;
- }
- }
-
- is_new = 1; /* grab inode later */
- cap->issued = cap->implemented = 0;
- cap->mds = mds;
- cap->flags = 0;
- cap->flushed_snap = 0;
-
- cap->ci = ci;
- list_add(&cap->ci_caps, &ci->i_caps);
-
- /* add to session cap list */
- cap->session = session;
- list_add(&cap->session_caps, &session->s_caps);
- session->s_nr_caps++;
-
- /* clear out old exporting info? */
- if (ci->i_cap_exporting_mds == mds) {
- ci->i_cap_exporting_issued = 0;
- ci->i_cap_exporting_mseq = 0;
- ci->i_cap_exporting_mds = -1;
- }
- }
- if (!ci->i_snaprealm) {
- ci->i_snaprealm = realm;
- list_add(&ci->i_snaprealm_item, &realm->inodes_with_caps);
- } else
- ceph_put_snaprealm(realm);
-
- dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
- inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
- cap->issued |= issued;
- cap->implemented |= issued;
- cap->seq = seq;
- cap->mseq = mseq;
- cap->gen = session->s_cap_gen;
- if (fmode >= 0)
- __ceph_get_fmode(ci, fmode);
- spin_unlock(&inode->i_lock);
- if (is_new)
- igrab(inode);
- if (new_cap)
- kfree(new_cap);
- return 0;
-}
-
-int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
-{
- int have = ci->i_snap_caps;
- struct ceph_inode_cap *cap;
- struct list_head *p;
- u32 gen;
- unsigned long ttl;
-
- list_for_each(p, &ci->i_caps) {
- cap = list_entry(p, struct ceph_inode_cap, ci_caps);
-
- spin_lock(&cap->session->s_cap_lock);
- gen = cap->session->s_cap_gen;
- ttl = cap->session->s_cap_ttl;
- spin_unlock(&cap->session->s_cap_lock);
- if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
- dout(30, "__ceph_caps_issued %p cap %p issued %d "
- "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
- cap, cap->issued, cap->gen, gen);
- continue;
- }
- dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
- &ci->vfs_inode, cap, cap->issued);
- have |= cap->issued;
- if (implemented)
- *implemented |= cap->implemented;
- }
- return have;
-}
-
-/*
- * caller should hold i_lock, snap_rwsem, and session s_mutex.
- * returns true if this is the last cap. if so, caller should iput.
- */
-int __ceph_remove_cap(struct ceph_inode_cap *cap)
-{
- struct ceph_mds_session *session = cap->session;
- struct ceph_inode_info *ci = cap->ci;
-
- dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
-
- /* remove from session list */
- list_del_init(&cap->session_caps);
- session->s_nr_caps--;
-
- /* remove from inode list */
- list_del_init(&cap->ci_caps);
- cap->session = 0;
- cap->mds = -1; /* mark unused */
-
- if (cap < ci->i_static_caps ||
- cap >= ci->i_static_caps + STATIC_CAPS)
- kfree(cap);
-
- if (list_empty(&ci->i_caps)) {
- list_del_init(&ci->i_snaprealm_item);
- return 1;
- }
- return 0;
-}
-
-/*
- * caller should hold snap_rwsem and session s_mutex.
- */
-void ceph_remove_cap(struct ceph_inode_cap *cap)
-{
- struct inode *inode = &cap->ci->vfs_inode;
- int was_last;
-
- spin_lock(&inode->i_lock);
- was_last = __ceph_remove_cap(cap);
- spin_unlock(&inode->i_lock);
- if (was_last)
- iput(inode);
-}
-
-/*
- * caller holds i_lock
- * -> client->cap_delay_lock
- */
-void __ceph_cap_delay_requeue(struct ceph_mds_client *mdsc,
- struct ceph_inode_info *ci)
-{
- ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
- dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
- ci->i_hold_caps_until);
- spin_lock(&mdsc->cap_delay_lock);
- if (list_empty(&ci->i_cap_delay_list))
- igrab(&ci->vfs_inode);
- else
- list_del_init(&ci->i_cap_delay_list);
- list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
- spin_unlock(&mdsc->cap_delay_lock);
-}
-
-
-/*
- * examine currently used, wanted versus held caps.
- * release, ack revoked caps to mds as appropriate.
- * @is_delayed if caller just dropped a cap ref, and we probably want to delay
- */
-void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed, int flush_snap)
-{
- struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
- struct ceph_mds_client *mdsc = &client->mdsc;
- struct inode *inode = &ci->vfs_inode;
- struct ceph_inode_cap *cap;
- struct list_head *p;
- int wanted, used;
- struct ceph_mds_session *session = 0; /* if non-NULL, i hold s_mutex */
- int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
-
-retry:
- spin_lock(&inode->i_lock);
- wanted = __ceph_caps_wanted(ci);
- used = __ceph_caps_used(ci);
- dout(10, "check_caps %p wanted %d used %d issued %d\n", inode,
- wanted, used, __ceph_caps_issued(ci, 0));
-
- if (!is_delayed)
- __ceph_cap_delay_requeue(mdsc, ci);
-
- list_for_each(p, &ci->i_caps) {
- int revoking;
- cap = list_entry(p, struct ceph_inode_cap, ci_caps);
-
- /* note: no side-effects allowed, until we take s_mutex */
- revoking = cap->implemented & ~cap->issued;
-
- if (ci->i_wanted_max_size > ci->i_max_size &&
- ci->i_wanted_max_size > ci->i_requested_max_size)
- goto ack;
-
- /* completed revocation? */
- if (revoking && (revoking && used) == 0) {
- dout(10, "completed revocation of %d\n",
- cap->implemented & ~cap->issued);
- goto ack;
- }
-
- /* approaching file_max? */
- if ((cap->issued & CEPH_CAP_WR) &&
- (inode->i_size << 1) >= ci->i_max_size &&
- (ci->i_reported_size << 1) < ci->i_max_size) {
- dout(10, "i_size approaching max_size\n");
- goto ack;
- }
-
- /* flush snap? */
- if (flush_snap &&
- (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER))) {
- if (cap->flushed_snap >=
- ci->i_snaprealm->cached_context->seq) {
- dout(10, "flushed_snap %llu >= seq %lld, "
- "not flushing mds%d\n",
- cap->flushed_snap,
- ci->i_snaprealm->cached_context->seq,
- cap->session->s_mds);
- continue; /* already flushed for this snap */
- }
- goto ack;
- }
-
- if ((cap->issued & ~wanted) == 0)
- continue; /* nothing extra, all good */
-
- if (time_before(jiffies, ci->i_hold_caps_until)) {
- /* delaying cap release for a bit */
- dout(30, "delaying cap release\n");
- continue;
- }
-
-ack:
- /* take s_mutex, one way or another */
- if (session && session != cap->session) {
- dout(30, "oops, wrong session %p mutex\n", session);
- mutex_unlock(&session->s_mutex);
- session = 0;
- }
- /* take snap_rwsem before session mutex */
- if (!flush_snap && !took_snap_rwsem) {
- if (down_write_trylock(&mdsc->snap_rwsem) == 0) {
- dout(10, "inverting snap/in locks on %p\n",
- inode);
- spin_unlock(&inode->i_lock);
- down_write(&mdsc->snap_rwsem);
- took_snap_rwsem = 1;
- goto retry;
- }
- took_snap_rwsem = 1;
- }
- if (!session) {
- session = cap->session;
- if (mutex_trylock(&session->s_mutex) == 0) {
- dout(10, "inverting session/ino locks on %p\n",
- session);
- spin_unlock(&inode->i_lock);
- mutex_lock(&session->s_mutex);
- goto retry;
- }
- }
-
- /* send_cap drops i_lock */
- __ceph_mdsc_send_cap(mdsc, session, cap,
- used, wanted, flush_snap);
-
- goto retry; /* retake i_lock and restart our cap scan. */
- }
-
- /* okay */
- spin_unlock(&inode->i_lock);
-
- if (session)
- mutex_unlock(&session->s_mutex);
- if (took_snap_rwsem)
- up_write(&mdsc->snap_rwsem);
-}
void ceph_inode_set_size(struct inode *inode, loff_t size)
{
}
-/*
- * caller holds s_mutex. NOT snap_rwsem.
- * return value:
- * 0 - ok
- * 1 - send the msg back to mds
- */
-int ceph_handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
- struct ceph_mds_session *session)
-{
- struct ceph_inode_cap *cap;
- struct ceph_inode_info *ci = ceph_inode(inode);
- int mds = session->s_mds;
- int seq = le32_to_cpu(grant->seq);
- int newcaps = le32_to_cpu(grant->caps);
- int used;
- int issued; /* to me, before */
- int wanted;
- int reply = 0;
- u64 size = le64_to_cpu(grant->size);
- u64 max_size = le64_to_cpu(grant->max_size);
- struct timespec mtime, atime, ctime;
- int wake = 0;
- int writeback_now = 0;
- int invalidate = 0;
-
- dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
- inode, ci, mds, seq);
- dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
- inode->i_size);
-
- spin_lock(&inode->i_lock);
-
- /* do we have this cap? */
- cap = __get_cap_for_mds(inode, mds);
- if (!cap) {
- /*
- * then ignore. never reply to cap messages out of turn,
- * or we'll be mixing up different instances of caps on the
- * same inode, and confuse the mds.
- */
- dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
- inode, ci->i_vino.ino, ci->i_vino.snap, mds);
- goto out;
- }
- dout(10, " cap %p\n", cap);
- cap->gen = session->s_cap_gen;
-
- /* size/ctime/mtime/atime? */
- issued = __ceph_caps_issued(ci, 0);
- ceph_decode_timespec(&mtime, &grant->mtime);
- ceph_decode_timespec(&atime, &grant->atime);
- ceph_decode_timespec(&ctime, &grant->ctime);
- fill_file_bits(inode, issued, le64_to_cpu(grant->time_warp_seq),
- size, &ctime, &mtime, &atime);
-
- /* max size increase? */
- if (max_size != ci->i_max_size) {
- dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
- ci->i_max_size = max_size;
- if (max_size >= ci->i_wanted_max_size) {
- ci->i_wanted_max_size = 0; /* reset */
- ci->i_requested_max_size = 0;
- }
- wake = 1;
- }
-
- /* check cap bits */
- wanted = __ceph_caps_wanted(ci);
- used = __ceph_caps_used(ci);
- dout(10, " my wanted = %d, used = %d\n", wanted, used);
- if (wanted != le32_to_cpu(grant->wanted)) {
- dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
- wanted);
- grant->wanted = cpu_to_le32(wanted);
- }
-
- cap->seq = seq;
-
- /* layout may have changed */
- ci->i_layout = grant->layout;
-
- /* revocation? */
- if (cap->issued & ~newcaps) {
- dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
- if ((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE)
- invalidate = 1;
- if ((used & ~newcaps) & CEPH_CAP_WRBUFFER)
- writeback_now = 1; /* will delay ack */
- else {
- cap->implemented = newcaps;
- /* ack now. re-use incoming message. */
- grant->size = le64_to_cpu(inode->i_size);
- grant->max_size = 0; /* don't re-request */
- ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
- ceph_encode_timespec(&grant->atime, &inode->i_atime);
- grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
- grant->snap_follows =
- cpu_to_le64(ci->i_snaprealm->cached_context->seq);
- reply = 1;
- wake = 1;
- }
- cap->issued = newcaps;
- goto out;
- }
-
- /* grant or no-op */
- if (cap->issued == newcaps) {
- dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
- } else {
- dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
- cap->implemented = cap->issued = newcaps;
- wake = 1;
- }
-
-out:
- spin_unlock(&inode->i_lock);
- if (wake)
- wake_up(&ci->i_cap_wq);
- if (writeback_now) {
- /*
- * queue inode for writeback; we can't actually call
- * write_inode_now, writepages, etc. from this
- * context.
- */
- dout(10, "queueing %p for writeback\n", inode);
- ceph_queue_writeback(ceph_client(inode->i_sb), ci);
- }
- if (invalidate)
- invalidate_mapping_pages(&inode->i_data, 0, -1);
- return reply;
-}
-
void ceph_inode_writeback(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
dout(10, "__do_pending_vmtruncate %p nothing to do\n", inode);
}
-/*
- * caller hold s_mutex, snap_rwsem.
- */
-static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
- struct ceph_inode_info *ci)
-{
- dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode);
- if (list_empty(&ci->i_cap_delay_list))
- return;
- spin_lock(&mdsc->cap_delay_lock);
- list_del_init(&ci->i_cap_delay_list);
- spin_unlock(&mdsc->cap_delay_lock);
- iput(&ci->vfs_inode);
-}
-
-void ceph_handle_cap_released(struct inode *inode,
- struct ceph_mds_caps *m,
- struct ceph_mds_session *session)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int seq = le32_to_cpu(m->seq);
- int removed_last;
- struct ceph_inode_cap *cap;
-
- dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
- session->s_mds, seq);
-
- spin_lock(&inode->i_lock);
- cap = __get_cap_for_mds(inode, session->s_mds);
- BUG_ON(!cap);
- removed_last = __ceph_remove_cap(cap);
- if (removed_last)
- __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
- spin_unlock(&inode->i_lock);
- if (removed_last)
- iput(inode);
-}
-
-/*
- * caller hold s_mutex, snap_rwsem.
- */
-void ceph_handle_cap_flushedsnap(struct inode *inode,
- struct ceph_mds_caps *m,
- struct ceph_mds_session *session)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int seq = le32_to_cpu(m->seq);
-
- dout(10, "handle_cap_flushednsap inode %p ci %p mds%d seq %d\n", inode,
- ci, session->s_mds, seq);
-
- /* **** WRITE ME **** */
-}
-
-
-/*
- * caller hold s_mutex, NOT snap_rwsem.
- */
-void ceph_handle_cap_trunc(struct inode *inode,
- struct ceph_mds_caps *trunc,
- struct ceph_mds_session *session)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int mds = session->s_mds;
- int seq = le32_to_cpu(trunc->seq);
- u64 size = le64_to_cpu(trunc->size);
- int queue_trunc = 0;
-
- dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
- mds, seq);
-
- /*
- * vmtruncate lazily; we can't block on i_mutex in the message
- * handler path, or we deadlock against osd op replies needed
- * to complete the writes holding i_lock. vmtruncate will
- * also block on page locks held by writes...
- *
- * if its an expansion, and there is no truncate pending, we
- * don't need to truncate.
- */
-
- spin_lock(&inode->i_lock);
- if (ci->i_vmtruncate_to < 0 && size > inode->i_size)
- dout(10, "clean fwd truncate, no vmtruncate needed\n");
- else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to)
- dout(10, "trunc to %lld < %lld already queued\n",
- ci->i_vmtruncate_to, size);
- else {
- /* we need to trunc even smaller */
- dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
- ci->i_vmtruncate_to = size;
- queue_trunc = 1;
- }
- i_size_write(inode, size);
- ci->i_reported_size = size;
- spin_unlock(&inode->i_lock);
-
- if (queue_trunc)
- queue_work(ceph_client(inode->i_sb)->trunc_wq,
- &ci->i_vmtruncate_work);
-}
-
-void ceph_handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
- struct ceph_mds_session *session)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int mds = session->s_mds;
- unsigned mseq = le32_to_cpu(ex->migrate_seq);
- struct ceph_inode_cap *cap = 0, *t;
- struct list_head *p;
- int was_last = 0;
-
- dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
- inode, ci, mds, mseq);
-
- spin_lock(&inode->i_lock);
-
- /* make sure we haven't seen a higher mseq */
- list_for_each(p, &ci->i_caps) {
- t = list_entry(p, struct ceph_inode_cap, ci_caps);
- if (t->mseq > mseq) {
- dout(10, " higher mseq on cap from mds%d\n",
- t->session->s_mds);
- goto out;
- }
- if (t->session->s_mds == mds)
- cap = t;
- }
-
- if (cap) {
- /* make note, and remove */
- ci->i_cap_exporting_mds = mds;
- ci->i_cap_exporting_mseq = mseq;
- ci->i_cap_exporting_issued = cap->issued;
- was_last = __ceph_remove_cap(cap);
- } else
- WARN_ON(!cap);
-
-out:
- spin_unlock(&inode->i_lock);
- if (was_last)
- iput(inode);
-}
-
-void ceph_handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
- struct ceph_mds_session *session,
- void *snaptrace, int snaptrace_len)
-{
- struct ceph_inode_info *ci = ceph_inode(inode);
- int mds = session->s_mds;
- unsigned issued = le32_to_cpu(im->caps);
- unsigned seq = le32_to_cpu(im->seq);
- unsigned mseq = le32_to_cpu(im->migrate_seq);
-
- if (ci->i_cap_exporting_mds >= 0 &&
- ci->i_cap_exporting_mseq < mseq) {
- dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
- " - cleared exporting from mds%d\n",
- inode, ci, mds, mseq,
- ci->i_cap_exporting_mds);
- ci->i_cap_exporting_issued = 0;
- ci->i_cap_exporting_mseq = 0;
- ci->i_cap_exporting_mds = -1;
- } else {
- dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
- inode, ci, mds, mseq);
- }
-
- ceph_add_cap(inode, session, -1, issued, seq, mseq,
- snaptrace, snaptrace_len);
-}
-
-
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
-{
- if (got & CEPH_CAP_RD)
- ci->i_rd_ref++;
- if (got & CEPH_CAP_RDCACHE)
- ci->i_rdcache_ref++;
- if (got & CEPH_CAP_WR)
- ci->i_wr_ref++;
- if (got & CEPH_CAP_WRBUFFER) {
- atomic_inc(&ci->i_wrbuffer_ref);
- dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
- &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)-1,
- atomic_read(&ci->i_wrbuffer_ref));
- }
-}
-
-void ceph_take_cap_refs(struct ceph_inode_info *ci, int got)
-{
- dout(30, "take_cap_refs on %p taking %d\n", &ci->vfs_inode, got);
- spin_lock(&ci->vfs_inode.i_lock);
- __take_cap_refs(ci, got);
- spin_unlock(&ci->vfs_inode.i_lock);
-}
-
-int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
- loff_t endoff)
-{
- int ret = 0;
- int have, implemented;
-
- dout(30, "get_cap_refs on %p need %d want %d\n", &ci->vfs_inode,
- need, want);
- spin_lock(&ci->vfs_inode.i_lock);
- if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
- dout(20, "get_cap_refs endoff %llu > max_size %llu\n",
- endoff, ci->i_max_size);
- goto sorry;
- }
- have = __ceph_caps_issued(ci, &implemented);
- if ((have & need) == need) {
- /*
- * look at (implemented & ~have & not) so that we keep waiting
- * on transition from wanted -> needed caps. this is needed
- * for WRBUFFER|WR -> WR to avoid a new WR sync write from
- * going before a prior buffered writeback happens.
- */
- int not = want & ~(have & need);
- int revoking = implemented & ~have;
- dout(30, "get_cap_refs have %d but not %d (revoking %d)\n",
- have, not, revoking);
- if ((revoking & not) == 0) {
- *got = need | (have & want);
- __take_cap_refs(ci, *got);
- ret = 1;
- }
- } else
- dout(30, "get_cap_refs have %d needed %d\n", have, need);
-sorry:
- spin_unlock(&ci->vfs_inode.i_lock);
- dout(30, "get_cap_refs on %p ret %d got %d\n", &ci->vfs_inode,
- ret, *got);
- return ret;
-}
-
-void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
-{
- int last = 0;
-
- spin_lock(&ci->vfs_inode.i_lock);
- if (had & CEPH_CAP_RD)
- if (--ci->i_rd_ref == 0)
- last++;
- if (had & CEPH_CAP_RDCACHE)
- if (--ci->i_rdcache_ref == 0)
- last++;
- if (had & CEPH_CAP_WR)
- if (--ci->i_wr_ref == 0)
- last++;
- if (had & CEPH_CAP_WRBUFFER) {
- if (atomic_dec_and_test(&ci->i_wrbuffer_ref))
- last++;
- dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
- &ci->vfs_inode, atomic_read(&ci->i_wrbuffer_ref)+1,
- atomic_read(&ci->i_wrbuffer_ref));
- }
- spin_unlock(&ci->vfs_inode.i_lock);
-
- dout(30, "put_cap_refs on %p had %d %s\n", &ci->vfs_inode, had,
- last ? "last":"");
-
- if (last)
- ceph_check_caps(ci, 0, 0);
-}
-
-void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr)
-{
- int was_last;
- int v;
-
- spin_lock(&ci->vfs_inode.i_lock);
- was_last = atomic_sub_and_test(nr, &ci->i_wrbuffer_ref);
- v = atomic_read(&ci->i_wrbuffer_ref);
- spin_unlock(&ci->vfs_inode.i_lock);
-
- dout(30, "put_wrbuffer_cap_refs on %p %d -> %d (?)%s\n",
- &ci->vfs_inode, v+nr, v, was_last ? " LAST":"");
- WARN_ON(v < 0);
-
- if (was_last)
- ceph_check_caps(ci, 0, 0);
-}
-
-
/*
* symlinks
*/
#include "messenger.h"
#include "decode.h"
-static void send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
- int mds)
+void ceph_send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
+ int mds)
{
msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
* sessions
*/
-static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc,
- int mds)
+struct ceph_mds_session *__ceph_get_mds_session(struct ceph_mds_client *mdsc,
+ int mds)
{
struct ceph_mds_session *session;
if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0)
return session;
}
-static void put_session(struct ceph_mds_session *s)
+void ceph_put_mds_session(struct ceph_mds_session *s)
{
BUG_ON(s == NULL);
- dout(30, "put_session %p %d -> %d\n", s,
+ dout(30, "put_mds_session %p %d -> %d\n", s,
atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
if (atomic_dec_and_test(&s->s_ref))
kfree(s);
mdsc->max_sessions = newmax;
}
if (mdsc->sessions[mds]) {
- put_session(s); /* lost race */
+ ceph_put_mds_session(s); /* lost race */
return mdsc->sessions[mds];
} else {
mdsc->sessions[mds] = s;
static void __unregister_session(struct ceph_mds_client *mdsc, int mds)
{
dout(10, "__unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
- put_session(mdsc->sessions[mds]);
+ ceph_put_mds_session(mdsc->sessions[mds]);
mdsc->sessions[mds] = 0;
}
static void put_request_sessions(struct ceph_mds_request *req)
{
if (req->r_session) {
- put_session(req->r_session);
+ ceph_put_mds_session(req->r_session);
req->r_session = 0;
}
if (req->r_fwd_session) {
- put_session(req->r_fwd_session);
+ ceph_put_mds_session(req->r_fwd_session);
req->r_fwd_session = 0;
}
}
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
if (IS_ERR(msg))
return PTR_ERR(msg);
- send_msg_mds(mdsc, msg, mds);
+ ceph_send_msg_mds(mdsc, msg, mds);
/* wait for session to open (or fail, or close) */
dout(30, "open_session waiting on session %p\n", session);
msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 0);
if (IS_ERR(msg))
return PTR_ERR(msg);
- send_msg_mds(mdsc, msg, session->s_mds);
+ ceph_send_msg_mds(mdsc, msg, session->s_mds);
return 0;
}
/* handle */
mutex_lock(&mdsc->mutex);
- session = __get_session(mdsc, mds);
+ session = __ceph_get_mds_session(mdsc, mds);
if (session && mdsc->mdsmap)
session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
mutex_unlock(&mdsc->mutex);
}
mutex_unlock(&session->s_mutex);
- put_session(session);
+ ceph_put_mds_session(session);
return;
bad:
}
/* get session */
- session = __get_session(mdsc, mds);
+ session = __ceph_get_mds_session(mdsc, mds);
if (!session)
session = __register_session(mdsc, mds);
dout(30, "do_request mds%d session %p state %d\n", mds, session,
err == -EAGAIN) {
dout(30, "do_request session %p not open, state=%d, waiting\n",
session, session->s_state);
- put_session(session);
+ ceph_put_mds_session(session);
goto retry;
}
dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
req->r_request = ceph_msg_maybe_dup(req->r_request);
ceph_msg_get(req->r_request);
- send_msg_mds(mdsc, req->r_request, mds);
+ ceph_send_msg_mds(mdsc, req->r_request, mds);
wait_for_completion(&req->r_completion);
mutex_lock(&mdsc->mutex);
if (req->r_reply == NULL) {
dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
mds = le32_to_cpu(msg->hdr.src.name.num);
if (req->r_session && req->r_session->s_mds != mds) {
- put_session(req->r_session);
- req->r_session = __get_session(mdsc, mds);
+ ceph_put_mds_session(req->r_session);
+ req->r_session = __ceph_get_mds_session(mdsc, mds);
}
if (req->r_session == 0) {
derr(1, "got reply on %llu, but no session for mds%d\n",
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
put_request_sessions(req);
- req->r_session = __get_session(mdsc, next_mds);
- req->r_fwd_session = __get_session(mdsc, from_mds);
+ req->r_session = __ceph_get_mds_session(mdsc, next_mds);
+ req->r_fwd_session = __ceph_get_mds_session(mdsc, from_mds);
} else {
/* no, resend. */
/* forward race not possible; mds would drop */
dout(1, "reconnect to recovering mds%d\n", mds);
/* find session */
- session = __get_session(mdsc, mds);
+ session = __ceph_get_mds_session(mdsc, mds);
if (session) {
session->s_state = CEPH_MDS_SESSION_RECONNECTING;
session->s_seq = 0;
reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
dout(10, "final len was %u (guessed %d)\n",
(unsigned)reply->front.iov_len, len);
- send_msg_mds(mdsc, reply, mds);
+ ceph_send_msg_mds(mdsc, reply, mds);
if (session) {
if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) {
out:
if (session) {
mutex_unlock(&session->s_mutex);
- put_session(session);
+ ceph_put_mds_session(session);
}
mutex_lock(&mdsc->mutex);
return;
}
-/* caps */
-
-static void send_cap_ack(struct ceph_mds_client *mdsc, __u64 ino, int op,
- int caps, int wanted, __u64 seq, __u64 mseq,
- __u64 size, __u64 max_size,
- struct timespec *mtime, struct timespec *atime,
- u64 time_warp_seq, u64 follows, int mds)
-{
- struct ceph_mds_caps *fc;
- struct ceph_msg *msg;
-
- dout(10, "send_cap_ack %s %llx caps %d wanted %d seq %llu/%llu"
- " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
- caps, wanted, seq, mseq, follows, size);
-
- msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, 0);
- if (IS_ERR(msg))
- return;
-
- fc = msg->front.iov_base;
-
- memset(fc, 0, sizeof(*fc));
-
- fc->op = cpu_to_le32(op);
- fc->seq = cpu_to_le64(seq);
- fc->migrate_seq = cpu_to_le64(mseq);
- fc->caps = cpu_to_le32(caps);
- fc->wanted = cpu_to_le32(wanted);
- fc->ino = cpu_to_le64(ino);
- fc->size = cpu_to_le64(size);
- fc->max_size = cpu_to_le64(max_size);
- fc->snap_follows = cpu_to_le64(follows);
- if (mtime)
- ceph_encode_timespec(&fc->mtime, mtime);
- if (atime)
- ceph_encode_timespec(&fc->atime, atime);
- fc->time_warp_seq = cpu_to_le64(time_warp_seq);
-
- send_msg_mds(mdsc, msg, mds);
-}
-
-void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg)
-{
- struct super_block *sb = mdsc->client->sb;
- struct ceph_mds_session *session;
- struct inode *inode;
- struct ceph_mds_caps *h;
- int mds = le32_to_cpu(msg->hdr.src.name.num);
- int op;
- u32 seq;
- struct ceph_vino vino;
- u64 size, max_size;
-
- dout(10, "handle_caps from mds%d\n", mds);
-
- /* decode */
- if (msg->front.iov_len < sizeof(*h))
- goto bad;
- h = msg->front.iov_base;
- op = le32_to_cpu(h->op);
- vino.ino = le64_to_cpu(h->ino);
- vino.snap = CEPH_NOSNAP;
- seq = le32_to_cpu(h->seq);
- size = le64_to_cpu(h->size);
- max_size = le64_to_cpu(h->max_size);
-
- /* find session */
- mutex_lock(&mdsc->mutex);
- session = __get_session(mdsc, mds);
- if (session)
- down_write(&mdsc->snap_rwsem);
- mutex_unlock(&mdsc->mutex);
- if (!session) {
- dout(10, "WTF, got cap but no session for mds%d\n", mds);
- return;
- }
-
- mutex_lock(&session->s_mutex);
- session->s_seq++;
-
- /* lookup ino */
- inode = ceph_find_inode(sb, vino);
- dout(20, "op %d ino %llx inode %p\n", op, vino.ino, inode);
- if (!inode) {
- dout(10, "i don't have ino %llx, sending release\n", vino.ino);
- send_cap_ack(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
- size, 0, 0, 0, 0, 0, 0, mds);
- goto no_inode;
- }
-
- switch (op) {
- case CEPH_CAP_OP_GRANT:
- up_write(&mdsc->snap_rwsem);
- if (ceph_handle_cap_grant(inode, h, session) == 1) {
- dout(10, "sending reply back to mds%d\n", mds);
- ceph_msg_get(msg);
- send_msg_mds(mdsc, msg, mds);
- }
- break;
-
- case CEPH_CAP_OP_TRUNC:
- up_write(&mdsc->snap_rwsem);
- ceph_handle_cap_trunc(inode, h, session);
- break;
-
- case CEPH_CAP_OP_RELEASED:
- ceph_handle_cap_released(inode, h, session);
- up_write(&mdsc->snap_rwsem);
- break;
-
- case CEPH_CAP_OP_FLUSHEDSNAP:
- ceph_handle_cap_flushedsnap(inode, h, session);
- up_write(&mdsc->snap_rwsem);
- break;
-
- case CEPH_CAP_OP_EXPORT:
- ceph_handle_cap_export(inode, h, session);
- up_write(&mdsc->snap_rwsem);
- break;
-
- case CEPH_CAP_OP_IMPORT:
- ceph_handle_cap_import(inode, h, session,
- msg->front.iov_base + sizeof(*h),
- le32_to_cpu(h->snap_trace_len));
- up_write(&mdsc->snap_rwsem);
- break;
-
- default:
- up_write(&mdsc->snap_rwsem);
- derr(10, "unknown cap op %d %s\n", op, ceph_cap_op_name(op));
- }
-
- iput(inode);
-no_inode:
- mutex_unlock(&session->s_mutex);
- put_session(session);
- return;
-
-bad:
- derr(10, "corrupt caps message\n");
- return;
-}
-
-/*
- * called with i_lock, then drops it.
- * caller should hold snap_rwsem, s_mutex.
- *
- * returns true if we removed the last cap on this inode.
- */
-int __ceph_mdsc_send_cap(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- struct ceph_inode_cap *cap,
- int used, int wanted,
- int flush_snap)
-{
- struct ceph_inode_info *ci = cap->ci;
- struct inode *inode = &ci->vfs_inode;
- int revoking = cap->implemented & ~cap->issued;
- int dropping = cap->issued & ~wanted;
- int keep;
- u64 seq, mseq, time_warp_seq, follows;
- u64 size, max_size;
- struct timespec mtime, atime;
- int wake = 0;
- int op = CEPH_CAP_OP_ACK;
-
- if (flush_snap)
- op = CEPH_CAP_OP_FLUSHSNAP;
- else if (wanted == 0)
- op = CEPH_CAP_OP_RELEASE;
-
- dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
- cap->issued, cap->issued & wanted);
- cap->issued &= wanted; /* drop bits we don't want */
-
- if (revoking && (revoking && used) == 0) {
- cap->implemented = cap->issued;
- wake = 1; /* for waiters on wanted -> needed transition */
- }
-
- keep = cap->issued;
- seq = cap->seq;
- mseq = cap->mseq;
- size = inode->i_size;
- ci->i_reported_size = size;
- max_size = ci->i_wanted_max_size;
- ci->i_requested_max_size = max_size;
- mtime = inode->i_mtime;
- atime = inode->i_atime;
- time_warp_seq = ci->i_time_warp_seq;
- follows = ci->i_snaprealm->cached_context->seq;
- if (flush_snap)
- cap->flushed_snap = follows; /* so we only flush it once */
- spin_unlock(&inode->i_lock);
-
- if (dropping & CEPH_CAP_RDCACHE) {
- /*
- * FIXME: this will block if there is a locked page..
- */
- dout(20, "invalidating pages on %p\n", inode);
- invalidate_mapping_pages(&inode->i_data, 0, -1);
- dout(20, "done invalidating pages on %p\n", inode);
- }
-
- send_cap_ack(mdsc, ceph_vino(inode).ino,
- op, keep, wanted, seq, mseq,
- size, max_size, &mtime, &atime, time_warp_seq,
- follows, session->s_mds);
-
- if (wake)
- wake_up(&ci->i_cap_wq);
-
- return 0;
-}
-
-static void check_delayed_caps(struct ceph_mds_client *mdsc)
-{
- struct ceph_inode_info *ci;
-
- dout(10, "check_delayed_caps\n");
- while (1) {
- spin_lock(&mdsc->cap_delay_lock);
- if (list_empty(&mdsc->cap_delay_list))
- break;
- ci = list_first_entry(&mdsc->cap_delay_list,
- struct ceph_inode_info,
- i_cap_delay_list);
- if (time_before(jiffies, ci->i_hold_caps_until))
- break;
- list_del_init(&ci->i_cap_delay_list);
- spin_unlock(&mdsc->cap_delay_lock);
- dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
- ceph_check_caps(ci, 1, 0);
- iput(&ci->vfs_inode);
- }
- spin_unlock(&mdsc->cap_delay_lock);
-}
-
-static void flush_write_caps(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- int purge)
-{
- struct list_head *p, *n;
-
- list_for_each_safe (p, n, &session->s_caps) {
- struct ceph_inode_cap *cap =
- list_entry(p, struct ceph_inode_cap, session_caps);
- struct inode *inode = &cap->ci->vfs_inode;
- int used, wanted;
-
- spin_lock(&inode->i_lock);
- if ((cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) {
- spin_unlock(&inode->i_lock);
- continue;
- }
-
- used = __ceph_caps_used(cap->ci);
- wanted = __ceph_caps_wanted(cap->ci);
-
- if (purge && (used || wanted)) {
- derr(0, "residual caps on %p used %d wanted %d s=%llu wrb=%d\n",
- inode, used, wanted, inode->i_size,
- atomic_read(&cap->ci->i_wrbuffer_ref));
- used = wanted = 0;
- }
-
- __ceph_mdsc_send_cap(mdsc, session, cap, used, wanted, 0);
- }
-}
-
/*
* snap
/* find session */
mutex_lock(&mdsc->mutex);
- session = __get_session(mdsc, mds);
+ session = __ceph_get_mds_session(mdsc, mds);
if (session)
down_write(&mdsc->snap_rwsem);
mutex_unlock(&mdsc->mutex);
if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
goto done;
- flush_write_caps(mdsc, session, 1);
+ ceph_flush_write_caps(mdsc, session, 1);
session->s_state = CEPH_MDS_SESSION_CLOSING;
msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
err = PTR_ERR(msg);
goto done;
}
- send_msg_mds(mdsc, msg, mds);
+ ceph_send_msg_mds(mdsc, msg, mds);
done:
mutex_unlock(&session->s_mutex);
/* find session */
mutex_lock(&mdsc->mutex);
- session = __get_session(mdsc, mds);
+ session = __ceph_get_mds_session(mdsc, mds);
mutex_unlock(&mdsc->mutex);
if (!session) {
dout(10, "WTF, got lease but no session for mds%d\n", mds);
dout(10, "sending release\n");
h->action = CEPH_MDS_LEASE_RELEASE;
ceph_msg_get(msg);
- send_msg_mds(mdsc, msg, mds);
+ ceph_send_msg_mds(mdsc, msg, mds);
mutex_unlock(&session->s_mutex);
return;
memcpy((void *)lease + sizeof(*lease) + 4, dentry->d_name.name,
dnamelen);
- send_msg_mds(mdsc, msg, mds);
+ ceph_send_msg_mds(mdsc, msg, mds);
}
dout(10, "delayed_work on %p renew_caps=%d\n", mdsc, renew_caps);
- check_delayed_caps(mdsc);
+ ceph_check_delayed_caps(mdsc);
mutex_lock(&mdsc->mutex);
if (renew_caps)
mdsc->last_renew_caps = jiffies;
for (i = 0; i < mdsc->max_sessions; i++) {
- struct ceph_mds_session *session = __get_session(mdsc, i);
+ struct ceph_mds_session *session = __ceph_get_mds_session(mdsc, i);
if (session == 0)
continue;
if (session->s_ttl && time_after(jiffies, session->s_ttl)) {
want_map = mdsc->mdsmap->m_epoch;
}
if (session->s_state < CEPH_MDS_SESSION_OPEN) {
- put_session(session);
+ ceph_put_mds_session(session);
continue;
}
//mutex_unlock(&mdsc->mutex);
trim_session_leases(session);
mutex_unlock(&session->s_mutex);
- put_session(session);
+ ceph_put_mds_session(session);
//mutex_lock(&mdsc->mutex);
}
mutex_unlock(&mdsc->mutex);
mutex_lock(&mdsc->mutex);
for (i = 0; i < mdsc->max_sessions; i++) {
- struct ceph_mds_session *session = __get_session(mdsc, i);
+ struct ceph_mds_session *session = __ceph_get_mds_session(mdsc, i);
if (!session)
continue;
//mutex_unlock(&mdsc->mutex);
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
drop_leases(mdsc);
- check_delayed_caps(mdsc);
+ ceph_check_delayed_caps(mdsc);
}
/*
dout(10, "closing sessions\n");
n = 0;
for (i = 0; i < mdsc->max_sessions; i++) {
- session = __get_session(mdsc, i);
+ session = __ceph_get_mds_session(mdsc, i);
if (!session)
continue;
//mutex_unlock(&mdsc->mutex);
extern const char *ceph_mds_op_name(int op);
+extern struct ceph_mds_session *
+__ceph_get_mds_session(struct ceph_mds_client *mdsc, int mds);
+extern void ceph_put_mds_session(struct ceph_mds_session *s);
+
+extern void ceph_send_msg_mds(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg, int mds);
+
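These helpers are exported so the new caps.c can look up, message, and release MDS sessions without duplicating mds_client internals. A minimal sketch of the intended call pattern, mirroring the lookup/send/put sequence of the cap handler moved out of mds_client.c above; the example_send_to_mds() wrapper is hypothetical and error handling is elided:

static void example_send_to_mds(struct ceph_mds_client *mdsc,
				struct ceph_msg *msg, int mds)
{
	struct ceph_mds_session *session;

	mutex_lock(&mdsc->mutex);	/* session lookup is under mdsc->mutex */
	session = __ceph_get_mds_session(mdsc, mds);
	mutex_unlock(&mdsc->mutex);
	if (!session)
		return;

	mutex_lock(&session->s_mutex);
	ceph_send_msg_mds(mdsc, msg, mds);	/* route the message to that mds */
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);		/* drop the reference taken above */
}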
extern void ceph_mdsc_init(struct ceph_mds_client *mdsc,
struct ceph_client *client);
extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc);
extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
-extern void ceph_mdsc_handle_caps(struct ceph_mds_client *mdsc,
- struct ceph_msg *msg);
extern void ceph_mdsc_handle_snap(struct ceph_mds_client *mdsc,
struct ceph_msg *msg);
Opt_debug_osdc,
Opt_debug_addr,
Opt_debug_inode,
+ Opt_debug_snap,
+ Opt_debug_ioctl,
+ Opt_debug_caps,
Opt_monport,
Opt_port,
Opt_wsize,
{Opt_debug_osdc, "debug_osdc=%d"},
{Opt_debug_addr, "debug_addr=%d"},
{Opt_debug_inode, "debug_inode=%d"},
+ {Opt_debug_snap, "debug_snap=%d"},
+ {Opt_debug_ioctl, "debug_ioctl=%d"},
+ {Opt_debug_caps, "debug_caps=%d"},
{Opt_monport, "monport=%d"},
{Opt_port, "port=%d"},
{Opt_wsize, "wsize=%d"},
case Opt_debug_inode:
ceph_debug_inode = intval;
break;
+ case Opt_debug_snap:
+ ceph_debug_snap = intval;
+ break;
+ case Opt_debug_ioctl:
+ ceph_debug_ioctl = intval;
+ break;
+ case Opt_debug_caps:
+ ceph_debug_caps = intval;
+ break;
case Opt_debug_console:
ceph_debug_console = 1;
break;
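The three new debug_* options are parsed like the existing ones and simply set the matching ceph_debug_* variables. A hedged usage note; the monitor address, mount point, and verbosity value are placeholders:

/*
 * Hypothetical example: raise cap and snap debugging at mount time.
 *
 *   mount -t ceph 192.168.0.1:/ /mnt/ceph -o debug_caps=20,debug_snap=20
 *
 * This sets ceph_debug_caps and ceph_debug_snap to 20, so dout() messages
 * in caps.c and snap.c up to that verbosity are printed.
 */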
ceph_mdsc_handle_forward(&client->mdsc, msg);
break;
case CEPH_MSG_CLIENT_CAPS:
- ceph_mdsc_handle_caps(&client->mdsc, msg);
+ ceph_handle_caps(&client->mdsc, msg);
break;
case CEPH_MSG_CLIENT_SNAP:
ceph_mdsc_handle_snap(&client->mdsc, msg);
extern int ceph_debug_inode;
extern int ceph_debug_snap;
extern int ceph_debug_ioctl;
+extern int ceph_debug_caps;
#define CEPH_DUMP_ERROR_ALWAYS
extern int ceph_fill_inode(struct inode *inode,
struct ceph_mds_reply_info_in *iinfo,
struct ceph_mds_reply_dirfrag *dirinfo);
+extern void ceph_fill_file_bits(struct inode *inode, int issued,
+ u64 time_warp_seq,
+ u64 size, struct timespec *ctime,
+ struct timespec *mtime, struct timespec *atime);
extern int ceph_fill_trace(struct super_block *sb,
struct ceph_mds_request *req,
struct ceph_mds_session *session);
extern int ceph_inode_lease_valid(struct inode *inode, int mask);
extern int ceph_dentry_lease_valid(struct dentry *dentry);
+extern void ceph_inode_set_size(struct inode *inode, loff_t size);
+extern void ceph_inode_writeback(struct work_struct *work);
+extern void ceph_vmtruncate_work(struct work_struct *work);
+extern void __ceph_do_pending_vmtruncate(struct inode *inode);
+
+extern int ceph_do_getattr(struct dentry *dentry, int mask);
+extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
+extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
+ struct kstat *stat);
+extern int ceph_setxattr(struct dentry *, const char *,const void *,size_t,int);
+extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
+extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
+extern int ceph_removexattr(struct dentry *, const char *);
+
+/* caps.c */
+extern void ceph_handle_caps(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
extern int ceph_add_cap(struct inode *inode,
struct ceph_mds_session *session,
int fmode, unsigned issued,
extern void ceph_remove_cap(struct ceph_inode_cap *cap);
extern void ceph_remove_all_caps(struct ceph_inode_info *ci);
extern int ceph_get_cap_mds(struct inode *inode);
-extern int ceph_handle_cap_grant(struct inode *inode,
- struct ceph_mds_caps *grant,
- struct ceph_mds_session *session);
-extern void ceph_handle_cap_trunc(struct inode *inode,
- struct ceph_mds_caps *trunc,
- struct ceph_mds_session *session);
-extern void ceph_handle_cap_released(struct inode *inode,
- struct ceph_mds_caps *trunc,
- struct ceph_mds_session *session);
-extern void ceph_handle_cap_flushedsnap(struct inode *inode,
- struct ceph_mds_caps *trunc,
- struct ceph_mds_session *session);
-extern void ceph_handle_cap_export(struct inode *inode,
- struct ceph_mds_caps *ex,
- struct ceph_mds_session *session);
-extern void ceph_handle_cap_import(struct inode *inode,
- struct ceph_mds_caps *im,
- struct ceph_mds_session *session,
- void *snaptrace, int snaptrace_len);
extern int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got, loff_t offset);
extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int got);
extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr);
extern void ceph_check_caps(struct ceph_inode_info *ci, int delayed, int flush);
-extern void ceph_inode_set_size(struct inode *inode, loff_t size);
-extern void ceph_inode_writeback(struct work_struct *work);
-extern void ceph_vmtruncate_work(struct work_struct *work);
-extern void __ceph_do_pending_vmtruncate(struct inode *inode);
-
-extern int ceph_do_getattr(struct dentry *dentry, int mask);
-extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
-extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
- struct kstat *stat);
-extern int ceph_setxattr(struct dentry *, const char *,const void *,size_t,int);
-extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
-extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
-extern int ceph_removexattr(struct dentry *, const char *);
+extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
+extern void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int purge);
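A minimal sketch of how a writer is expected to use the cap-reference interface declared above. This is an illustration only: example_write_with_caps() is a hypothetical caller, the return convention of ceph_get_cap_refs() is assumed to be negative on error and positive once the requested references (reported in *got) are held, and any wait-and-retry path is elided.

static int example_write_with_caps(struct ceph_inode_info *ci, loff_t endoff)
{
	int got = 0;
	int err;

	/* need WR to write; also want WRBUFFER so the write may be buffered */
	err = ceph_get_cap_refs(ci, CEPH_CAP_WR, CEPH_CAP_WRBUFFER,
				&got, endoff);
	if (err <= 0)
		return err;	/* error, or caps not currently available */

	/* ... dirty pages / perform the write ... */

	ceph_put_cap_refs(ci, got);	/* drop the references taken above */
	ceph_check_caps(ci, 0, 0);	/* re-examine what to report to the MDS */
	return 0;
}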
/* addr.c */
extern const struct address_space_operations ceph_aops;