]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: use new caps protocol
authorSage Weil <sage@newdream.net>
Tue, 6 Jan 2009 20:47:29 +0000 (12:47 -0800)
committerSage Weil <sage@newdream.net>
Tue, 6 Jan 2009 20:47:29 +0000 (12:47 -0800)
Also,
- Stat root inode instead of opening it.  We will keep the PIN cap
  and be happy.
- add ceph_cap_string helper to make cap values readable

src/TODO
src/kernel/addr.c
src/kernel/caps.c
src/kernel/file.c
src/kernel/inode.c
src/kernel/snap.c
src/kernel/super.c
src/kernel/super.h

index 55b652e8c2dca6fd7d829885ee73925a91f35e60..0f1224af65781b244b9a81c40ae50e0ab9b40c0c 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -46,6 +46,11 @@ timer
 
 
 caps
+- flatten Capability tracking of issued/pending caps.
+- exploit excl auth lease on client
+- issue excl leases to loner if open for write...
+
+
 
 - two types of cap events from mds -> client
   - cap issue .. in a reply, or an IMPORT cap op.
index 988f9fd84f12a395315e51d2fafa943ef0f8cad3..0bf78bdc5c84213c4a773d5f4e46083129df8547 100644 (file)
@@ -107,12 +107,12 @@ static int ceph_set_page_dirty(struct page *page,
                }
                BUG_ON(!capsnap);
                BUG_ON(capsnap->context != snapc);
-               capsnap->dirty++;
+               capsnap->dirty_pages++;
                dout(20, "%p set_page_dirty %p snap %lld %d/%d -> %d/%d"
                     " snapc %p seq %lld (%d snaps)\n",
                     mapping->host, page, capsnap->follows,
-                    ci->i_wrbuffer_ref-1, capsnap->dirty-1,
-                    ci->i_wrbuffer_ref, capsnap->dirty,
+                    ci->i_wrbuffer_ref-1, capsnap->dirty_pages-1,
+                    ci->i_wrbuffer_ref, capsnap->dirty_pages,
                     snapc, snapc->seq, snapc->num_snaps);
        }
        spin_unlock(&inode->i_lock);
@@ -333,11 +333,11 @@ static struct ceph_snap_context *__get_oldest_context(struct inode *inode)
        list_for_each(p, &ci->i_cap_snaps) {
                capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
                dout(20, " cap_snap %p snapc %p has %d dirty pages\n", capsnap,
-                    capsnap->context, capsnap->dirty);
-               if (capsnap->dirty)
+                    capsnap->context, capsnap->dirty_pages);
+               if (capsnap->dirty_pages)
                        break;
        }
-       if (capsnap && capsnap->dirty) {
+       if (capsnap && capsnap->dirty_pages) {
                snapc = ceph_get_snap_context(capsnap->context);
        } else if (ci->i_snap_realm) {
                snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
index aa3c05e6c7deba3bfcacc8c436b52edab65ba28e..fcdde37553db8d7b628c1381e32c7fb69a22da89 100644 (file)
@@ -14,6 +14,76 @@ int ceph_debug_caps = -1;
 #include "messenger.h"
 
 
+#define MAX_CAP_STR 20
+static char cap_str[MAX_CAP_STR][40];
+static spinlock_t cap_str_lock = SPIN_LOCK_UNLOCKED;
+static int last_cap_str;
+
+static char *gcap_string(char *s, int c)
+{
+       if (c & CEPH_CAP_GRDCACHE)
+               *s++ = 'c';
+       if (c & CEPH_CAP_GEXCL)
+               *s++ = 'x';
+       if (c & CEPH_CAP_GRD)
+               *s++ = 'r';
+       if (c & CEPH_CAP_GWR)
+               *s++ = 'w';
+       if (c & CEPH_CAP_GWRBUFFER)
+               *s++ = 'b';
+       if (c & CEPH_CAP_GLAZYIO)
+               *s++ = 'l';
+       return s;
+}
+
+const char *ceph_cap_string(int caps)
+{
+       int i;
+       char *s;
+       int c;
+
+       spin_lock(&cap_str_lock);
+       i = last_cap_str++;
+       if (last_cap_str == MAX_CAP_STR)
+               last_cap_str = 0;
+       spin_unlock(&cap_str_lock);
+
+       s = cap_str[i];
+
+       if (caps & CEPH_CAP_PIN)
+               *s++ = 'p';
+
+       c = (caps >> CEPH_CAP_SAUTH) & 3;
+       if (c) {
+               *s++ = 'A';
+               s = gcap_string(s, c);
+       }
+
+       c = (caps >> CEPH_CAP_SLINK) & 3;
+       if (c) {
+               *s++ = 'L';
+               s = gcap_string(s, c);
+       }
+
+       c = (caps >> CEPH_CAP_SXATTR) & 3;
+       if (c) {
+               *s++ = 'X';
+               s = gcap_string(s, c);
+       }
+
+       c = caps >> CEPH_CAP_SFILE;
+       if (c) {
+               *s++ = 'F';
+               s = gcap_string(s, c);
+       }
+       
+       if (s == cap_str[i])
+               *s++ = '-';
+       *s = 0;
+       return cap_str[i];
+}
+
+
 /*
  * Find ceph_cap for given mds, if any.
  *
@@ -117,8 +187,8 @@ int ceph_add_cap(struct inode *inode,
        int mds = session->s_mds;
        int is_first = 0;
 
-       dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode,
-            session->s_mds, issued, seq);
+       dout(10, "add_cap on %p mds%d cap %s seq %d\n", inode,
+            session->s_mds, ceph_cap_string(issued), seq);
 retry:
        spin_lock(&inode->i_lock);
        cap = __get_cap_for_mds(inode, mds);
@@ -135,6 +205,7 @@ retry:
                }
 
                cap->issued = cap->implemented = 0;
+               cap->flushing = 0;
                cap->mds = mds;
 
                is_first = RB_EMPTY_ROOT(&ci->i_caps);  /* grab inode later */
@@ -160,8 +231,9 @@ retry:
                list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
        }
 
-       dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
-            inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+       dout(10, "add_cap inode %p (%llx.%llx) cap %s now %s seq %d mds%d\n",
+            inode, ceph_vinop(inode), ceph_cap_string(issued),
+            ceph_cap_string(issued|cap->issued), seq, mds);
        cap->issued |= issued;
        cap->implemented |= issued;
        cap->seq = seq;
@@ -198,13 +270,13 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
                spin_unlock(&cap->session->s_cap_lock);
 
                if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
-                       dout(30, "__ceph_caps_issued %p cap %p issued %d "
+                       dout(30, "__ceph_caps_issued %p cap %p issued %s "
                             "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
-                            cap, cap->issued, cap->gen, gen);
+                            cap, ceph_cap_string(cap->issued), cap->gen, gen);
                        continue;
                }
-               dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
-                    &ci->vfs_inode, cap, cap->issued);
+               dout(30, "__ceph_caps_issued %p cap %p issued %s\n",
+                    &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
                have |= cap->issued;
                if (implemented)
                        *implemented |= cap->implemented;
@@ -212,6 +284,28 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
        return have;
 }
 
+/*
+ * Return mask of fields dirtied while holding *_EXCL or FILE_WR caps.
+ * Include both per-inode 'dirty' fields and 'flushing' caps.
+ */
+int __ceph_caps_dirty(struct ceph_inode_info *ci)
+{
+       struct inode *inode = &ci->vfs_inode;
+       int dirty = ci->i_dirty_caps;
+       struct ceph_cap *cap;
+       struct rb_node *p;
+
+       for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+               cap = rb_entry(p, struct ceph_cap, ci_node);
+               dout(30, "__ceph_caps_dirty %p cap %p flushing %s\n",
+                    inode, cap, ceph_cap_string(cap->flushing));
+               dirty |= cap->flushing;
+       }
+       dout(30, "__ceph_caps_dirty %p dirty %s\n", inode,
+            ceph_cap_string(dirty));
+       return dirty;
+}
+
 /*
  * caller should hold i_lock, snap_rwsem, and session s_mutex.
  * returns true if this is the last cap.  if so, caller should iput.
@@ -304,7 +398,7 @@ static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
  * Caller should be holding s_mutex.
  */
 static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
-                        int caps, int wanted, u64 seq, u64 mseq,
+                        int caps, int wanted, int dirty, u64 seq, u64 mseq,
                         u64 size, u64 max_size,
                         struct timespec *mtime, struct timespec *atime,
                         u64 time_warp_seq,
@@ -314,9 +408,11 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
        struct ceph_mds_caps *fc;
        struct ceph_msg *msg;
 
-       dout(10, "send_cap_msg %s %llx caps %d wanted %d seq %llu/%llu"
+       dout(10, "send_cap_msg %s %llx caps %s wanted %s dirty %s seq %llu/%llu"
             " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
-            caps, wanted, seq, mseq, follows, size);
+            ceph_cap_string(caps), ceph_cap_string(wanted),
+            ceph_cap_string(dirty),
+            seq, mseq, follows, size);
 
        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
        if (IS_ERR(msg))
@@ -331,6 +427,7 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
        fc->migrate_seq = cpu_to_le32(mseq);
        fc->caps = cpu_to_le32(caps);
        fc->wanted = cpu_to_le32(wanted);
+       fc->dirty = cpu_to_le32(dirty);
        fc->ino = cpu_to_le64(ino);
        fc->size = cpu_to_le64(size);
        fc->max_size = cpu_to_le64(max_size);
@@ -357,30 +454,44 @@ static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
  * caller should hold snap_rwsem, s_mutex.
  */
 static void __send_cap(struct ceph_mds_client *mdsc,
-               struct ceph_mds_session *session,
-               struct ceph_cap *cap,
-               int used, int wanted) __releases(cap->ci->vfs_inode->i_lock)
+                      struct ceph_mds_session *session,
+                      struct ceph_cap *cap,
+                      int used, int want, int retain)
+       __releases(cap->ci->vfs_inode->i_lock)
 {
        struct ceph_inode_info *ci = cap->ci;
        struct inode *inode = &ci->vfs_inode;
        int revoking = cap->implemented & ~cap->issued;
-       int dropping = cap->issued & ~wanted;
+       int dropping = cap->issued & ~retain;
        int keep;
        u64 seq, mseq, time_warp_seq, follows;
        u64 size, max_size;
        struct timespec mtime, atime;
        int wake = 0;
-       int op = CEPH_CAP_OP_ACK;
+       int op = CEPH_CAP_OP_UPDATE;
        mode_t mode;
        uid_t uid;
        gid_t gid;
+       int dirty;
+       int flushing;
+       int last_cap = 0;       
 
-       if (wanted == 0)
+       if (retain == 0)
                op = CEPH_CAP_OP_RELEASE;
 
        dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
-            cap->issued, cap->issued & wanted);
-       cap->issued &= wanted;  /* drop bits we don't want */
+            cap->issued, cap->issued & retain);
+
+       dirty = __ceph_caps_dirty(ci);
+       cap->flushing |= dirty & cap->issued;
+       if (cap->flushing) {
+               ci->i_dirty_caps &= ~cap->flushing;
+               dout(10, "__send_cap flushing %s, dirty_caps now %s\n",
+                    ceph_cap_string(cap->flushing),
+                    ceph_cap_string(ci->i_dirty_caps));
+       }
+
+       cap->issued &= retain;  /* drop bits we don't want */
 
        if (revoking && (revoking & used) == 0) {
                cap->implemented = cap->issued;
@@ -394,6 +505,7 @@ static void __send_cap(struct ceph_mds_client *mdsc,
        }
 
        keep = cap->issued;
+       flushing = cap->flushing;
        seq = cap->seq;
        mseq = cap->mseq;
        size = inode->i_size;
@@ -407,6 +519,11 @@ static void __send_cap(struct ceph_mds_client *mdsc,
        uid = inode->i_uid;
        gid = inode->i_gid;
        mode = inode->i_mode;
+
+       /* close out cap? */
+       if (flushing == 0 && keep == 0)
+               last_cap = __ceph_remove_cap(cap);
+       
        spin_unlock(&inode->i_lock);
 
        if (dropping & CEPH_CAP_FILE_RDCACHE) {
@@ -416,13 +533,15 @@ static void __send_cap(struct ceph_mds_client *mdsc,
        }
 
        send_cap_msg(mdsc, ceph_vino(inode).ino,
-                    op, keep, wanted, seq, mseq,
+                    op, keep, want, flushing, seq, mseq,
                     size, max_size, &mtime, &atime, time_warp_seq,
                     uid, gid, mode,
                     follows, session->s_mds);
 
        if (wake)
                wake_up(&ci->i_cap_wq);
+       if (last_cap)
+               iput(inode);
 }
 
 
@@ -465,7 +584,7 @@ retry:
                 * we need to wait for sync writes to complete and for dirty
                 * pages to be written out.
                 */
-               if (capsnap->dirty || capsnap->writing)
+               if (capsnap->dirty_pages || capsnap->writing)
                        continue;
 
                /* pick mds, take s_mutex */
@@ -501,7 +620,8 @@ retry:
                dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n",
                     inode, capsnap, next_follows, capsnap->size);
                send_cap_msg(mdsc, ceph_vino(inode).ino,
-                            CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, 0, mseq,
+                            CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
+                            capsnap->dirty, 0, mseq,
                             capsnap->size, 0,
                             &capsnap->mtime, &capsnap->atime,
                             capsnap->time_warp_seq,
@@ -554,7 +674,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed)
        int file_wanted, used;
        struct ceph_mds_session *session = NULL;    /* if set, i hold s_mutex */
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-       int revoking;
+       int retain, revoking;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
        struct rb_node *p;
@@ -575,8 +695,17 @@ retry:
 retry_locked:
        file_wanted = __ceph_caps_file_wanted(ci);
        used = __ceph_caps_used(ci);
-       dout(10, "check_caps %p file_wanted %d used %d issued %d\n",
-            inode, file_wanted, used, __ceph_caps_issued(ci, NULL));
+
+       retain = file_wanted | used;
+       if (!mdsc->stopping)
+               retain |= CEPH_CAP_PIN |
+                       (S_ISDIR(inode->i_mode) ? CEPH_CAP_ANY_RDCACHE :
+                        CEPH_CAP_ANY_RD);
+
+       dout(10, "check_caps %p file_wanted %s used %s retain %s issued %s\n",
+            inode, ceph_cap_string(file_wanted), ceph_cap_string(used),
+            ceph_cap_string(retain),
+            ceph_cap_string(__ceph_caps_issued(ci, NULL)));
 
        /*
         * Reschedule delayed caps release, unless we are called from
@@ -628,7 +757,8 @@ retry_locked:
 
                revoking = cap->implemented & ~cap->issued;
                if (revoking)
-                       dout(10, "mds%d revoking %d\n", cap->mds, revoking);
+                       dout(10, "mds%d revoking %s\n", cap->mds,
+                            ceph_cap_string(revoking));
 
                /* request larger max_size from MDS? */
                if (ci->i_wanted_max_size > ci->i_max_size &&
@@ -645,8 +775,8 @@ retry_locked:
 
                /* completed revocation? going down and there are no caps? */
                if (revoking && (revoking & used) == 0) {
-                       dout(10, "completed revocation of %d\n",
-                            cap->implemented & ~cap->issued);
+                       dout(10, "completed revocation of %s\n",
+                            ceph_cap_string(cap->implemented & ~cap->issued));
                        goto ack;
                }
 
@@ -654,7 +784,7 @@ retry_locked:
                if (!revoking && mdsc->stopping && (used == 0))
                        goto ack;
 
-               if ((cap->issued & ~(file_wanted | used)) == 0)
+               if ((cap->issued & ~retain) == 0)
                        continue;     /* nothing extra, all good */
 
                /* delay cap release for a bit? */
@@ -696,7 +826,7 @@ ack:
                mds = cap->mds;  /* remember mds, so we don't repeat */
 
                /* __send_cap drops i_lock */
-               __send_cap(mdsc, session, cap, used, used | file_wanted);
+               __send_cap(mdsc, session, cap, used, file_wanted|used, retain);
 
                goto retry; /* retake i_lock and restart our cap scan. */
        }
@@ -746,7 +876,8 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
        int ret = 0;
        int have, implemented;
 
-       dout(30, "get_cap_refs %p need %d want %d\n", inode, need, want);
+       dout(30, "get_cap_refs %p need %s want %s\n", inode,
+            ceph_cap_string(need), ceph_cap_string(want));
        spin_lock(&inode->i_lock);
        if (need & CEPH_CAP_FILE_WR) {
                if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
@@ -777,21 +908,22 @@ int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
                 */
                int not = want & ~(have & need);
                int revoking = implemented & ~have;
-               dout(30, "get_cap_refs %p have %d but not %d (revoking %d)\n",
-                    inode, have, not, revoking);
+               dout(30, "get_cap_refs %p have %s but not %s (revoking %s)\n",
+                    inode, ceph_cap_string(have), ceph_cap_string(not),
+                    ceph_cap_string(revoking));
                if ((revoking & not) == 0) {
                        *got = need | (have & want);
                        __take_cap_refs(ci, *got);
                        ret = 1;
                }
        } else {
-               dout(30, "get_cap_refs %p have %d needed %d\n", inode,
-                    have, need);
+               dout(30, "get_cap_refs %p have %s needed %s\n", inode,
+                    ceph_cap_string(have), ceph_cap_string(need));
        }
 sorry:
        spin_unlock(&inode->i_lock);
-       dout(30, "get_cap_refs %p ret %d got %d\n", inode,
-            ret, *got);
+       dout(30, "get_cap_refs %p ret %d got %s\n", inode,
+            ret, ceph_cap_string(*got));
        return ret;
 }
 
@@ -841,7 +973,8 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
                }
        spin_unlock(&inode->i_lock);
 
-       dout(30, "put_cap_refs %p had %d %s\n", inode, had, last ? "last" : "");
+       dout(30, "put_cap_refs %p had %s %s\n", inode, ceph_cap_string(had),
+            last ? "last" : "");
 
        if (last && !flushsnaps)
                ceph_check_caps(ci, 0);
@@ -888,8 +1021,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                        capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
                        if (capsnap->context == snapc) {
                                found = 1;
-                               capsnap->dirty -= nr;
-                               last_snap = !capsnap->dirty;
+                               capsnap->dirty_pages -= nr;
+                               last_snap = !capsnap->dirty_pages;
                                break;
                        }
                }
@@ -897,8 +1030,8 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
                dout(30, "put_wrbuffer_cap_refs on %p cap_snap %p "
                     " snap %lld %d/%d -> %d/%d %s%s\n",
                     inode, capsnap, capsnap->context->seq,
-                    ci->i_wrbuffer_ref+nr, capsnap->dirty + nr,
-                    ci->i_wrbuffer_ref, capsnap->dirty,
+                    ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
+                    ci->i_wrbuffer_ref, capsnap->dirty_pages,
                     last ? " (wrbuffer last)" : "",
                     last_snap ? " (capsnap last)" : "");
        }
@@ -946,8 +1079,8 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        int tried_invalidate = 0;
        int ret;
 
-       dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
-            inode, ci, mds, seq);
+       dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d %s\n",
+            inode, ci, mds, seq, ceph_cap_string(newcaps));
        dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
                inode->i_size);
        spin_lock(&inode->i_lock);
@@ -1005,9 +1138,23 @@ start:
                goto start;
        }
 
+       issued = __ceph_caps_issued(ci, NULL);
+
+       if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+               inode->i_mode = le32_to_cpu(grant->mode);
+               inode->i_uid = le32_to_cpu(grant->uid);
+               inode->i_gid = le32_to_cpu(grant->gid);
+       }
+       
+       if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+               inode->i_nlink = le32_to_cpu(grant->nlink);
+       }
+
+       if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
+#warning fix xattr cap update
+       }
 
        /* size/ctime/mtime/atime? */
-       issued = __ceph_caps_issued(ci, NULL);
        ceph_decode_timespec(&mtime, &grant->mtime);
        ceph_decode_timespec(&atime, &grant->atime);
        ceph_decode_timespec(&ctime, &grant->ctime);
@@ -1030,10 +1177,12 @@ start:
        /* check cap bits */
        wanted = __ceph_caps_wanted(ci);
        used = __ceph_caps_used(ci);
-       dout(10, " my wanted = %d, used = %d\n", wanted, used);
+       dout(10, " my wanted = %s, used = %s\n", ceph_cap_string(wanted),
+            ceph_cap_string(used));
        if (wanted != le32_to_cpu(grant->wanted)) {
-               dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
-                    wanted);
+               dout(10, "mds wanted %s -> %s\n",
+                    ceph_cap_string(le32_to_cpu(grant->wanted)),
+                    ceph_cap_string(wanted));
                grant->wanted = cpu_to_le32(wanted);
        }
 
@@ -1044,7 +1193,8 @@ start:
 
        /* revocation? */
        if (cap->issued & ~newcaps) {
-               dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+               dout(10, "revocation: %s -> %s\n", ceph_cap_string(cap->issued),
+                    ceph_cap_string(newcaps));
                if ((used & ~newcaps) & CEPH_CAP_FILE_WRBUFFER) {
                        writeback = 1; /* will delay ack */
                } else if (((used & ~newcaps) & CEPH_CAP_FILE_RDCACHE) == 0 ||
@@ -1071,9 +1221,11 @@ start:
 
        /* grant or no-op */
        if (cap->issued == newcaps) {
-               dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+               dout(10, "caps unchanged: %s -> %s\n",
+                    ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
        } else {
-               dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+               dout(10, "grant: %s -> %s\n", ceph_cap_string(cap->issued),
+                    ceph_cap_string(newcaps));
                cap->issued = newcaps;
                cap->implemented |= newcaps;    /* add bits only, to
                                                 * avoid stepping on a
@@ -1105,27 +1257,39 @@ out:
 
 
 /*
- * Handle RELEASE from MDS.  That means we can throw away our cap
- * state as the MDS has fully flushed that metadata to disk.
+ * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
+ * MDS has been safely recorded.
  */
-static void handle_cap_released(struct inode *inode,
+static void handle_cap_flush_ack(struct inode *inode,
                                struct ceph_mds_caps *m,
                                struct ceph_mds_session *session)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int seq = le32_to_cpu(m->seq);
-       int removed_last;
        struct ceph_cap *cap;
-
-       dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
-            session->s_mds, seq);
+       unsigned seq = le32_to_cpu(m->seq);
+       int caps = le32_to_cpu(m->caps);
+       int dirty = le32_to_cpu(m->dirty);
+       int cleaned = dirty & ~caps;
+       int removed_last = 0;
 
        spin_lock(&inode->i_lock);
        cap = __get_cap_for_mds(inode, session->s_mds);
        BUG_ON(!cap);
-       removed_last = __ceph_remove_cap(cap);
-       if (removed_last)
-               __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+       dout(10, "handle_cap_flush_ack inode %p mds%d seq %d cleaned %s,"
+            " flushing %s -> %s\n",
+            inode, session->s_mds, seq, ceph_cap_string(cleaned),
+            ceph_cap_string(cap->flushing),
+            ceph_cap_string(cap->flushing & ~cleaned));
+       cap->flushing &= ~cleaned;
+
+       /* did we release this cap, too? */
+       if (caps == 0 && seq == cap->seq) {
+               BUG_ON(cap->flushing);
+               removed_last = __ceph_remove_cap(cap);
+               if (removed_last)
+                       __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc,
+                                          ci);
+       }
        spin_unlock(&inode->i_lock);
        if (removed_last)
                iput(inode);
@@ -1133,14 +1297,14 @@ static void handle_cap_released(struct inode *inode,
 
 
 /*
- * Handle FLUSHEDSNAP.  MDS has flushed snap data to disk and we can
+ * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
  * throw away our cap_snap.
  *
  * Caller hold s_mutex, snap_rwsem.
  */
-static void handle_cap_flushedsnap(struct inode *inode,
-                                  struct ceph_mds_caps *m,
-                                  struct ceph_mds_session *session)
+static void handle_cap_flushsnap_ack(struct inode *inode,
+                                    struct ceph_mds_caps *m,
+                                    struct ceph_mds_session *session)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 follows = le64_to_cpu(m->snap_follows);
@@ -1148,14 +1312,14 @@ static void handle_cap_flushedsnap(struct inode *inode,
        struct ceph_cap_snap *capsnap;
        int drop = 0;
 
-       dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d follows %lld\n",
+       dout(10, "handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
             inode, ci, session->s_mds, follows);
 
        spin_lock(&inode->i_lock);
        list_for_each(p, &ci->i_cap_snaps) {
                capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
                if (capsnap->follows == follows) {
-                       WARN_ON(capsnap->dirty || capsnap->writing);
+                       WARN_ON(capsnap->dirty_pages || capsnap->writing);
                        dout(10, " removing cap_snap %p follows %lld\n",
                             capsnap, follows);
                        ceph_put_snap_context(capsnap->context);
@@ -1371,9 +1535,14 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        dout(20, " op %s ino %llx inode %p\n", ceph_cap_op_name(op), vino.ino,
             inode);
        if (!inode) {
-               dout(10, " i don't have ino %llx, sending release\n", vino.ino);
-               send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
-                            size, 0, 0, NULL, NULL, 0, 0, 0, 0, 0, mds);
+               dout(10, " i don't have ino %llx\n", vino.ino);
+               if (op == CEPH_CAP_OP_IMPORT)
+                       send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE,
+                                    0, 0, 0,
+                                    seq, 0,
+                                    size, 0, NULL, NULL, 0,
+                                    0, 0, 0,
+                                    0, mds);
                goto no_inode;
        }
 
@@ -1392,15 +1561,15 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
                handle_cap_trunc(inode, h, session);
                break;
 
-       case CEPH_CAP_OP_RELEASED:
-               handle_cap_released(inode, h, session);
+       case CEPH_CAP_OP_FLUSH_ACK:
+               handle_cap_flush_ack(inode, h, session);
                up_write(&mdsc->snap_rwsem);
                if (list_empty(&session->s_caps))
                        ceph_mdsc_flushed_all_caps(mdsc, session);
                break;
 
-       case CEPH_CAP_OP_FLUSHEDSNAP:
-               handle_cap_flushedsnap(inode, h, session);
+       case CEPH_CAP_OP_FLUSHSNAP_ACK:
+               handle_cap_flushsnap_ack(inode, h, session);
                up_write(&mdsc->snap_rwsem);
                break;
 
index f7bd10a97adef62614043500218160dd76bb6b66..9b1795a97f12d8288dd5edce4f066b07d6c935c9 100644 (file)
@@ -399,6 +399,9 @@ retry_snap:
                        ret = sync_page_range(inode, mapping, pos, ret);
                }
        }
+       if (ret >= 0)
+               ci->i_dirty_caps |= CEPH_CAP_FILE_WR;
+
 out:
        dout(10, "aio_write %p %llu~%u  dropping cap refs on %d\n",
             inode, pos, (unsigned)iov->iov_len, got);
index 417abcc1cd2630003f0c60813399fb3a54eb8f92..b91a4f7e634e733d75ba0e12df0489ff687f5c43 100644 (file)
@@ -258,6 +258,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_xattr_data = NULL;
 
        ci->i_caps = RB_ROOT;
+       ci->i_dirty_caps = 0;
        for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
                ci->i_nr_by_mode[i] = 0;
        init_waitqueue_head(&ci->i_cap_wq);
@@ -1204,6 +1205,7 @@ void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
        spin_lock(&ci->vfs_inode.i_lock);
        dout(20, "put_mode %p fmode %d %d -> %d\n", &ci->vfs_inode, fmode,
             ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1);
+       WARN_ON(ci->i_nr_by_mode[fmode] == 0);
        if (--ci->i_nr_by_mode[fmode] == 0)
                last++;
        spin_unlock(&ci->vfs_inode.i_lock);
@@ -1445,6 +1447,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
                if (ia_valid & ATTR_MTIME)
                        inode->i_mtime = attr->ia_mtime;
                inode->i_ctime = CURRENT_TIME;
+               ci->i_dirty_caps |= CEPH_CAP_FILE_EXCL;
                return 0;
        }
 
@@ -1460,6 +1463,7 @@ static int ceph_setattr_time(struct dentry *dentry, struct iattr *attr)
                if (ia_valid & ATTR_MTIME)
                        inode->i_mtime = attr->ia_mtime;
                inode->i_ctime = CURRENT_TIME;
+               ci->i_dirty_caps |= CEPH_CAP_FILE_WR;
                return 0;
        }
        /* if i have valid values, this may be a no-op */
index 8d8080f886c0ceb4065b550534031be8cec7074d..27d08899178adb7c65a2b351055315f0793e4e1b 100644 (file)
@@ -327,6 +327,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                capsnap->follows = snapc->seq - 1;
                capsnap->context = ceph_get_snap_context(snapc);
                capsnap->issued = __ceph_caps_issued(ci, NULL);
+               capsnap->dirty = __ceph_caps_dirty(ci);
 
                capsnap->mode = inode->i_mode;
                capsnap->uid = inode->i_uid;
@@ -339,7 +340,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci,
                /* dirty page count moved from _head to this cap_snap;
                   all subsequent writes page dirties occur _after_ this
                   snapshot. */
-               capsnap->dirty = ci->i_wrbuffer_ref_head;
+               capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
                ci->i_wrbuffer_ref_head = 0;
                ceph_put_snap_context(ci->i_head_snapc);
                ci->i_head_snapc = NULL;
@@ -382,11 +383,11 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        capsnap->atime = inode->i_atime;
        capsnap->ctime = inode->i_ctime;
        capsnap->time_warp_seq = ci->i_time_warp_seq;
-       if (capsnap->dirty) {
+       if (capsnap->dirty_pages) {
                dout(10, "finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu "
                     "still has %d dirty pages\n", inode, capsnap,
                     capsnap->context, capsnap->context->seq,
-                    capsnap->size, capsnap->dirty);
+                    capsnap->size, capsnap->dirty_pages);
                return 0;
        }
        dout(10, "finish_cap_snap %p cap_snap %p snapc %p %llu s=%llu clean\n",
index e7c20db2dd05aa6ec5b0b8122d9d1b652ae61189..489c1eea05b3fb2dbfd86f2ff2ab297e6e6eb07b 100644 (file)
@@ -718,21 +718,15 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
 
        /* open dir */
        dout(30, "open_root_inode opening '%s'\n", path);
-       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_OPEN,
+       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LSTAT,
                                       1, path, 0, NULL,
                                       NULL, USE_ANY_MDS);
        if (IS_ERR(req))
                return ERR_PTR(PTR_ERR(req));
        req->r_started = started;
        req->r_timeout = client->mount_args.mount_timeout * HZ;
-       req->r_expected_cap = kmalloc(sizeof(struct ceph_cap), GFP_NOFS);
-       if (!req->r_expected_cap) {
-               root = ERR_PTR(-ENOMEM);
-               goto out;
-       }
        reqhead = req->r_request->front.iov_base;
-       reqhead->args.open.flags = cpu_to_le32(O_DIRECTORY);
-       reqhead->args.open.mode = 0;
+       reqhead->args.stat.mask = CEPH_STAT_CAP_INODE;
        err = ceph_mdsc_do_request(mdsc, req);
        if (err == 0) {
                root = req->r_last_dentry;
@@ -740,7 +734,6 @@ static struct dentry *open_root_dentry(struct ceph_client *client,
                dout(30, "open_root_inode success, root dentry is %p\n", root);
        } else
                root = ERR_PTR(err);
-out:
        ceph_mdsc_put_request(req);
        return root;
 }
@@ -816,14 +809,6 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                goto out;
        }
 
-       /*
-        * Drop the reference we just got, since the VFS doesn't give
-        * us a reliable way to drop it later when a particular
-        * vfsmount goes away.  If the directory we just mounted on is
-        * renamed on the server, we are screwed.
-        */
-       ceph_put_fmode(ceph_inode(root->d_inode), CEPH_FILE_MODE_PIN);
-
        mnt->mnt_root = root;
        mnt->mnt_sb = client->sb;
        client->mount_state = CEPH_MOUNT_MOUNTED;
index 91db5b7bae7361fd8991bfbb56e1030445be1dfe..da09b62d46491c342751908dd8b361a87c0c739b 100644 (file)
@@ -135,6 +135,7 @@ struct ceph_cap {
        int mds;
        int issued;       /* latest, from the mds */
        int implemented;  /* what we've implemented (for tracking revocation) */
+       int flushing;     /* dirty fields being written back to mds */
        u32 seq, mseq, gen;
 };
 
@@ -148,7 +149,7 @@ struct ceph_cap_snap {
 
        struct list_head ci_item;
        u64 follows;
-       int issued;
+       int issued, dirty;
        struct ceph_snap_context *context;
        
        mode_t mode;
@@ -162,7 +163,7 @@ struct ceph_cap_snap {
        struct timespec mtime, atime, ctime;
        u64 time_warp_seq;
        int writing;   /* a sync write is still in progress */
-       int dirty;     /* dirty pages awaiting writeback */
+       int dirty_pages;     /* dirty pages awaiting writeback */
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
@@ -224,6 +225,7 @@ struct ceph_inode_info {
        /* capabilities.  protected _both_ by i_lock and cap->session's
         * s_mutex. */
        struct rb_root i_caps;           /* cap list */
+       unsigned i_dirty_caps;           /* mask of dirtied fields */
        wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
        unsigned long i_hold_caps_until; /* jiffies */
        struct list_head i_cap_delay_list;  /* for delayed cap release to mds */
@@ -381,6 +383,8 @@ static inline int ceph_caps_issued(struct ceph_inode_info *ci)
        return issued;
 }
 
+extern int __ceph_caps_dirty(struct ceph_inode_info *ci);
+
 static inline int __ceph_caps_used(struct ceph_inode_info *ci)
 {
        int used = 0;
@@ -643,6 +647,7 @@ extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
 extern int ceph_removexattr(struct dentry *, const char *);
 
 /* caps.c */
+extern const char *ceph_cap_string(int c);
 extern void ceph_handle_caps(struct ceph_mds_client *mdsc,
                             struct ceph_msg *msg);
 extern int ceph_add_cap(struct inode *inode,