#include "messenger.h"
+#define MAX_CAP_STR 20
+static char cap_str[MAX_CAP_STR][40];
+static spinlock_t cap_str_lock = SPIN_LOCK_UNLOCKED;
+static int last_cap_str;
+
+static char *gcap_string(char *s, int c)
+{
+ if (c & CEPH_CAP_GRDCACHE)
+ *s++ = 'c';
+ if (c & CEPH_CAP_GEXCL)
+ *s++ = 'x';
+ if (c & CEPH_CAP_GRD)
+ *s++ = 'r';
+ if (c & CEPH_CAP_GWR)
+ *s++ = 'w';
+ if (c & CEPH_CAP_GWRBUFFER)
+ *s++ = 'b';
+ if (c & CEPH_CAP_GLAZYIO)
+ *s++ = 'l';
+ return s;
+}
+
+const char *ceph_cap_string(int caps)
+{
+ int i;
+ char *s;
+ int c;
+
+ spin_lock(&cap_str_lock);
+ i = last_cap_str++;
+ if (last_cap_str == MAX_CAP_STR)
+ last_cap_str = 0;
+ spin_unlock(&cap_str_lock);
+
+ s = cap_str[i];
+
+ if (caps & CEPH_CAP_PIN)
+ *s++ = 'p';
+
+ c = (caps >> CEPH_CAP_SAUTH) & 3;
+ if (c) {
+ *s++ = 'A';
+ s = gcap_string(s, c);
+ }
+
+ c = (caps >> CEPH_CAP_SLINK) & 3;
+ if (c) {
+ *s++ = 'L';
+ s = gcap_string(s, c);
+ }
+
+ c = (caps >> CEPH_CAP_SXATTR) & 3;
+ if (c) {
+ *s++ = 'X';
+ s = gcap_string(s, c);
+ }
+
+ c = caps >> CEPH_CAP_SFILE;
+ if (c) {
+ *s++ = 'F';
+ s = gcap_string(s, c);
+ }
+
+ if (s == cap_str[i])
+ *s++ = '-';
+ *s = 0;
+ return cap_str[i];
+}
+
+
/*
* Find ceph_cap for given mds, if any.
*
int mds = session->s_mds;
int is_first = 0;
- dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode,
- session->s_mds, issued, seq);
+ dout(10, "add_cap on %p mds%d cap %s seq %d\n", inode,
+ session->s_mds, ceph_cap_string(issued), seq);
retry:
spin_lock(&inode->i_lock);
cap = __get_cap_for_mds(inode, mds);
}
cap->issued = cap->implemented = 0;
+ cap->flushing = 0;
cap->mds = mds;
is_first = RB_EMPTY_ROOT(&ci->i_caps); /* grab inode later */
list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
}
- dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
- inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+ dout(10, "add_cap inode %p (%llx.%llx) cap %s now %s seq %d mds%d\n",
+ inode, ceph_vinop(inode), ceph_cap_string(issued),
+ ceph_cap_string(issued|cap->issued), seq, mds);
cap->issued |= issued;
cap->implemented |= issued;
cap->seq = seq;
spin_unlock(&cap->session->s_cap_lock);
if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
- dout(30, "__ceph_caps_issued %p cap %p issued %d "
+ dout(30, "__ceph_caps_issued %p cap %p issued %s "
"but STALE (gen %u vs %u)\n", &ci->vfs_inode,
- cap, cap->issued, cap->gen, gen);
+ cap, ceph_cap_string(cap->issued), cap->gen, gen);
continue;
}
- dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
- &ci->vfs_inode, cap, cap->issued);
+ dout(30, "__ceph_caps_issued %p cap %p issued %s\n",
+ &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
have |= cap->issued;
if (implemented)
*implemented |= cap->implemented;
return have;
}
+/*
+ * Return mask of fields dirtied while holding *_EXCL or FILE_WR caps.
+ * Include both per-inode 'dirty' fields and 'flushing' caps.
+ */
+int __ceph_caps_dirty(struct ceph_inode_info *ci)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int dirty = ci->i_dirty_caps;
+ struct ceph_cap *cap;
+ struct rb_node *p;
+
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ cap = rb_entry(p, struct ceph_cap, ci_node);
+ dout(30, "__ceph_caps_dirty %p cap %p flushing %s\n",
+ inode, cap, ceph_cap_string(cap->flushing));
+ dirty |= cap->flushing;
+ }
+ dout(30, "__ceph_caps_dirty %p dirty %s\n", inode,
+ ceph_cap_string(dirty));
+ return dirty;
+}
+
/*
* caller should hold i_lock, snap_rwsem, and session s_mutex.
* returns true if this is the last cap. if so, caller should iput.
* Caller should be holding s_mutex.
*/
static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
- int caps, int wanted, u64 seq, u64 mseq,
+ int caps, int wanted, int dirty, u64 seq, u64 mseq,
u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq,
struct ceph_mds_caps *fc;
struct ceph_msg *msg;
- dout(10, "send_cap_msg %s %llx caps %d wanted %d seq %llu/%llu"
+ dout(10, "send_cap_msg %s %llx caps %s wanted %s dirty %s seq %llu/%llu"
" follows %lld size %llu\n", ceph_cap_op_name(op), ino,
- caps, wanted, seq, mseq, follows, size);
+ ceph_cap_string(caps), ceph_cap_string(wanted),
+ ceph_cap_string(dirty),
+ seq, mseq, follows, size);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
if (IS_ERR(msg))
fc->migrate_seq = cpu_to_le32(mseq);
fc->caps = cpu_to_le32(caps);
fc->wanted = cpu_to_le32(wanted);
+ fc->dirty = cpu_to_le32(dirty);
fc->ino = cpu_to_le64(ino);
fc->size = cpu_to_le64(size);
fc->max_size = cpu_to_le64(max_size);
* caller should hold snap_rwsem, s_mutex.
*/
static void __send_cap(struct ceph_mds_client *mdsc,
- struct ceph_mds_session *session,
- struct ceph_cap *cap,
- int used, int wanted) __releases(cap->ci->vfs_inode->i_lock)
+ struct ceph_mds_session *session,
+ struct ceph_cap *cap,
+ int used, int want, int retain)
+ __releases(cap->ci->vfs_inode->i_lock)
{
struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode;
int revoking = cap->implemented & ~cap->issued;
- int dropping = cap->issued & ~wanted;
+ int dropping = cap->issued & ~retain;
int keep;
u64 seq, mseq, time_warp_seq, follows;
u64 size, max_size;
struct timespec mtime, atime;
int wake = 0;
- int op = CEPH_CAP_OP_ACK;
+ int op = CEPH_CAP_OP_UPDATE;
mode_t mode;
uid_t uid;
gid_t gid;
+ int dirty;
+ int flushing;
+ int last_cap = 0;
- if (wanted == 0)
+ if (retain == 0)
op = CEPH_CAP_OP_RELEASE;
dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
- cap->issued, cap->issued & wanted);
- cap->issued &= wanted; /* drop bits we don't want */
+ cap->issued, cap->issued & retain);
+
+ dirty = __ceph_caps_dirty(ci);
+ cap->flushing |= dirty & cap->issued;
+ if (cap->flushing) {
+ ci->i_dirty_caps &= ~cap->flushing;
+ dout(10, "__send_cap flushing %s, dirty_caps now %s\n",
+ ceph_cap_string(cap->flushing),
+ ceph_cap_string(ci->i_dirty_caps));
+ }
+
+ cap->issued &= retain; /* drop bits we don't want */
if (revoking && (revoking & used) == 0) {
cap->implemented = cap->issued;
}
keep = cap->issued;
+ flushing = cap->flushing;
seq = cap->seq;
mseq = cap->mseq;
size = inode->i_size;
uid = inode->i_uid;
gid = inode->i_gid;
mode = inode->i_mode;
+
+ /* close out cap? */
+ if (flushing == 0 && keep == 0)
+ last_cap = __ceph_remove_cap(cap);
+
spin_unlock(&inode->i_lock);
if (dropping & CEPH_CAP_FILE_RDCACHE) {
}
send_cap_msg(mdsc, ceph_vino(inode).ino,
- op, keep, wanted, seq, mseq,
+ op, keep, want, flushing, seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode,
follows, session->s_mds);
if (wake)
wake_up(&ci->i_cap_wq);
+ if (last_cap)
+ iput(inode);
}
* we need to wait for sync writes to complete and for dirty
* pages to be written out.
*/
- if (capsnap->dirty || capsnap->writing)
+ if (capsnap->dirty_pages || capsnap->writing)
continue;
/* pick mds, take s_mutex */
dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n",
inode, capsnap, next_follows, capsnap->size);
send_cap_msg(mdsc, ceph_vino(inode).ino,
- CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, 0, mseq,
+ CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
+ capsnap->dirty, 0, mseq,
capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
int file_wanted, used;
struct ceph_mds_session *session = NULL; /* if set, i hold s_mutex */
int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
- int revoking;
+ int retain, revoking;
int mds = -1; /* keep track of how far we've gone through i_caps list
to avoid an infinite loop on retry */
struct rb_node *p;
retry_locked:
file_wanted = __ceph_caps_file_wanted(ci);
used = __ceph_caps_used(ci);
- dout(10, "check_caps %p file_wanted %d used %d issued %d\n",
- inode, file_wanted, used, __ceph_caps_issued(ci, NULL));
+
+ retain = file_wanted | used;
+ if (!mdsc->stopping)
+ retain |= CEPH_CAP_PIN |
+ (S_ISDIR(inode->i_mode) ? CEPH_CAP_ANY_RDCACHE :
+ CEPH_CAP_ANY_RD);
+
+ dout(10, "check_caps %p file_wanted %s used %s retain %s issued %s\n",
+ inode, ceph_cap_string(file_wanted), ceph_cap_string(used),
+ ceph_cap_string(retain),
+ ceph_cap_string(__ceph_caps_issued(ci, NULL)));
/*
* Reschedule delayed caps release, unless we are called from
revoking = cap->implemented & ~cap->issued;
if (revoking)
- dout(10, "mds%d revoking %d\n", cap->mds, revoking);
+ dout(10, "mds%d revoking %s\n", cap->mds,
+ ceph_cap_string(revoking));
/* request larger max_size from MDS? */
if (ci->i_wanted_max_size > ci->i_max_size &&
/* completed revocation? going down and there are no caps? */
if (revoking && (revoking & used) == 0) {
- dout(10, "completed revocation of %d\n",
- cap->implemented & ~cap->issued);
+ dout(10, "completed revocation of %s\n",
+ ceph_cap_string(cap->implemented & ~cap->issued));
goto ack;
}
if (!revoking && mdsc->stopping && (used == 0))
goto ack;
- if ((cap->issued & ~(file_wanted | used)) == 0)
+ if ((cap->issued & ~retain) == 0)
continue; /* nothing extra, all good */
/* delay cap release for a bit? */
mds = cap->mds; /* remember mds, so we don't repeat */
/* __send_cap drops i_lock */
- __send_cap(mdsc, session, cap, used, used | file_wanted);
+ __send_cap(mdsc, session, cap, used, file_wanted|used, retain);
goto retry; /* retake i_lock and restart our cap scan. */
}
int ret = 0;
int have, implemented;
- dout(30, "get_cap_refs %p need %d want %d\n", inode, need, want);
+ dout(30, "get_cap_refs %p need %s want %s\n", inode,
+ ceph_cap_string(need), ceph_cap_string(want));
spin_lock(&inode->i_lock);
if (need & CEPH_CAP_FILE_WR) {
if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
*/
int not = want & ~(have & need);
int revoking = implemented & ~have;
- dout(30, "get_cap_refs %p have %d but not %d (revoking %d)\n",
- inode, have, not, revoking);
+ dout(30, "get_cap_refs %p have %s but not %s (revoking %s)\n",
+ inode, ceph_cap_string(have), ceph_cap_string(not),
+ ceph_cap_string(revoking));
if ((revoking & not) == 0) {
*got = need | (have & want);
__take_cap_refs(ci, *got);
ret = 1;
}
} else {
- dout(30, "get_cap_refs %p have %d needed %d\n", inode,
- have, need);
+ dout(30, "get_cap_refs %p have %s needed %s\n", inode,
+ ceph_cap_string(have), ceph_cap_string(need));
}
sorry:
spin_unlock(&inode->i_lock);
- dout(30, "get_cap_refs %p ret %d got %d\n", inode,
- ret, *got);
+ dout(30, "get_cap_refs %p ret %d got %s\n", inode,
+ ret, ceph_cap_string(*got));
return ret;
}
}
spin_unlock(&inode->i_lock);
- dout(30, "put_cap_refs %p had %d %s\n", inode, had, last ? "last" : "");
+ dout(30, "put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had),
+ last ? "last" : "");
if (last && !flushsnaps)
ceph_check_caps(ci, 0);
capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
if (capsnap->context == snapc) {
found = 1;
- capsnap->dirty -= nr;
- last_snap = !capsnap->dirty;
+ capsnap->dirty_pages -= nr;
+ last_snap = !capsnap->dirty_pages;
break;
}
}
dout(30, "put_wrbuffer_cap_refs on %p cap_snap %p "
" snap %lld %d/%d -> %d/%d %s%s\n",
inode, capsnap, capsnap->context->seq,
- ci->i_wrbuffer_ref+nr, capsnap->dirty + nr,
- ci->i_wrbuffer_ref, capsnap->dirty,
+ ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
+ ci->i_wrbuffer_ref, capsnap->dirty_pages,
last ? " (wrbuffer last)" : "",
last_snap ? " (capsnap last)" : "");
}
int tried_invalidate = 0;
int ret;
- dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
- inode, ci, mds, seq);
+ dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d %s\n",
+ inode, ci, mds, seq, ceph_cap_string(newcaps));
dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
inode->i_size);
spin_lock(&inode->i_lock);
goto start;
}
+ issued = __ceph_caps_issued(ci, NULL);
+
+ if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+ inode->i_mode = le32_to_cpu(grant->mode);
+ inode->i_uid = le32_to_cpu(grant->uid);
+ inode->i_gid = le32_to_cpu(grant->gid);
+ }
+
+ if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+ inode->i_nlink = le32_to_cpu(grant->nlink);
+ }
+
+ if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
+#warning fix xattr cap update
+ }
/* size/ctime/mtime/atime? */
- issued = __ceph_caps_issued(ci, NULL);
ceph_decode_timespec(&mtime, &grant->mtime);
ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&ctime, &grant->ctime);
/* check cap bits */
wanted = __ceph_caps_wanted(ci);
used = __ceph_caps_used(ci);
- dout(10, " my wanted = %d, used = %d\n", wanted, used);
+ dout(10, " my wanted = %s, used = %s\n", ceph_cap_string(wanted),
+ ceph_cap_string(used));
if (wanted != le32_to_cpu(grant->wanted)) {
- dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
- wanted);
+ dout(10, "mds wanted %s -> %s\n",
+ ceph_cap_string(le32_to_cpu(grant->wanted)),
+ ceph_cap_string(wanted));
grant->wanted = cpu_to_le32(wanted);
}
/* revocation? */
if (cap->issued & ~newcaps) {
- dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+ dout(10, "revocation: %s -> %s\n", ceph_cap_string(cap->issued),
+ ceph_cap_string(newcaps));
if ((used & ~newcaps) & CEPH_CAP_FILE_WRBUFFER) {
writeback = 1; /* will delay ack */
} else if (((used & ~newcaps) & CEPH_CAP_FILE_RDCACHE) == 0 ||
/* grant or no-op */
if (cap->issued == newcaps) {
- dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+ dout(10, "caps unchanged: %s -> %s\n",
+ ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
} else {
- dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+ dout(10, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
+ ceph_cap_string(newcaps));
cap->issued = newcaps;
cap->implemented |= newcaps; /* add bits only, to
* avoid stepping on a
/*
- * Handle RELEASE from MDS. That means we can throw away our cap
- * state as the MDS has fully flushed that metadata to disk.
+ * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
+ * MDS has been safely recorded.
*/
-static void handle_cap_released(struct inode *inode,
+static void handle_cap_flush_ack(struct inode *inode,
struct ceph_mds_caps *m,
struct ceph_mds_session *session)
{
struct ceph_inode_info *ci = ceph_inode(inode);
- int seq = le32_to_cpu(m->seq);
- int removed_last;
struct ceph_cap *cap;
-
- dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
- session->s_mds, seq);
+ unsigned seq = le32_to_cpu(m->seq);
+ int caps = le32_to_cpu(m->caps);
+ int dirty = le32_to_cpu(m->dirty);
+ int cleaned = dirty & ~caps;
+ int removed_last = 0;
spin_lock(&inode->i_lock);
cap = __get_cap_for_mds(inode, session->s_mds);
BUG_ON(!cap);
- removed_last = __ceph_remove_cap(cap);
- if (removed_last)
- __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+ dout(10, "handle_cap_flush_ack inode %p mds%d seq %d cleaned %s,"
+ " flushing %s -> %s\n",
+ inode, session->s_mds, seq, ceph_cap_string(cleaned),
+ ceph_cap_string(cap->flushing),
+ ceph_cap_string(cap->flushing & ~cleaned));
+ cap->flushing &= ~cleaned;
+
+ /* did we release this cap, too? */
+ if (caps == 0 && seq == cap->seq) {
+ BUG_ON(cap->flushing);
+ removed_last = __ceph_remove_cap(cap);
+ if (removed_last)
+ __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc,
+ ci);
+ }
spin_unlock(&inode->i_lock);
if (removed_last)
iput(inode);
/*
- * Handle FLUSHEDSNAP. MDS has flushed snap data to disk and we can
+ * Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can
* throw away our cap_snap.
*
* Caller hold s_mutex, snap_rwsem.
*/
-static void handle_cap_flushedsnap(struct inode *inode,
- struct ceph_mds_caps *m,
- struct ceph_mds_session *session)
+static void handle_cap_flushsnap_ack(struct inode *inode,
+ struct ceph_mds_caps *m,
+ struct ceph_mds_session *session)
{
struct ceph_inode_info *ci = ceph_inode(inode);
u64 follows = le64_to_cpu(m->snap_follows);
struct ceph_cap_snap *capsnap;
int drop = 0;
- dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d follows %lld\n",
+ dout(10, "handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
inode, ci, session->s_mds, follows);
spin_lock(&inode->i_lock);
list_for_each(p, &ci->i_cap_snaps) {
capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
if (capsnap->follows == follows) {
- WARN_ON(capsnap->dirty || capsnap->writing);
+ WARN_ON(capsnap->dirty_pages || capsnap->writing);
dout(10, " removing cap_snap %p follows %lld\n",
capsnap, follows);
ceph_put_snap_context(capsnap->context);
dout(20, " op %s ino %llx inode %p\n", ceph_cap_op_name(op), vino.ino,
inode);
if (!inode) {
- dout(10, " i don't have ino %llx, sending release\n", vino.ino);
- send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
- size, 0, 0, NULL, NULL, 0, 0, 0, 0, 0, mds);
+ dout(10, " i don't have ino %llx\n", vino.ino);
+ if (op == CEPH_CAP_OP_IMPORT)
+ send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
+ 0, 0, 0,
+ seq, 0,
+ size, 0, NULL, NULL, 0,
+ 0, 0, 0,
+ 0, mds);
goto no_inode;
}
handle_cap_trunc(inode, h, session);
break;
- case CEPH_CAP_OP_RELEASED:
- handle_cap_released(inode, h, session);
+ case CEPH_CAP_OP_FLUSH_ACK:
+ handle_cap_flush_ack(inode, h, session);
up_write(&mdsc->snap_rwsem);
if (list_empty(&session->s_caps))
ceph_mdsc_flushed_all_caps(mdsc, session);
break;
- case CEPH_CAP_OP_FLUSHEDSNAP:
- handle_cap_flushedsnap(inode, h, session);
+ case CEPH_CAP_OP_FLUSHSNAP_ACK:
+ handle_cap_flushsnap_ack(inode, h, session);
up_write(&mdsc->snap_rwsem);
break;