kclient: flush/release all caps back to mds before closing a session
author    Sage Weil <sage@newdream.net>
          Fri, 24 Oct 2008 20:56:45 +0000 (13:56 -0700)
committer Sage Weil <sage@newdream.net>
          Fri, 24 Oct 2008 20:56:45 +0000 (13:56 -0700)
This allows us to handle races that can occur if an inode moves between
MDSs.  It also ensures that _all_ dirty cap data is safely flushed before
we throw it out.

To make this work, we drop the PIN reference on the subdirectory that
we mounted.  Since the VFS doesn't tell us when each vfsmount goes away,
we can't drop these references as we go.  And we're more or less screwed
anyway if the directory gets renamed (currently, at least), so there's
not much point in holding the reference in the first place.
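
The close sequence this introduces is a small state machine:
__close_session() flushes what it can and sends REQUEST_CLOSE
immediately only if the session's cap list is already empty; otherwise
the session parks in the new FLUSHING state until the MDS acks the last
cap (RELEASED or EXPORT), at which point ceph_mdsc_flushed_all_caps()
flips it to CLOSING.  A toy user-space model of that flow (a sketch
only -- names mirror the kernel code in the mds_client.c hunks below):

/* Toy model of the session-close state machine added below.
 * This is user-space C for illustration, not the kernel code. */
#include <stdio.h>

enum session_state {
        SESSION_OPEN,
        SESSION_FLUSHING,       /* new in this commit: caps still draining */
        SESSION_CLOSING,
};

struct session {
        enum session_state state;
        int num_caps;           /* stands in for the s_caps list */
};

/* cf. __close_session(): close now only if no caps remain. */
static void close_session(struct session *s)
{
        if (s->state >= SESSION_CLOSING)
                return;
        /* check_all_caps() would flush/release each cap here */
        if (s->num_caps == 0) {
                s->state = SESSION_CLOSING;
                printf("no caps held, sending REQUEST_CLOSE\n");
        } else {
                s->state = SESSION_FLUSHING;
                printf("%d caps still held, flushing first\n", s->num_caps);
        }
}

/* cf. ceph_mdsc_flushed_all_caps(): last cap was flushed/exported. */
static void flushed_all_caps(struct session *s)
{
        if (s->state == SESSION_FLUSHING) {
                s->state = SESSION_CLOSING;
                printf("last cap gone, sending REQUEST_CLOSE\n");
        }
}

int main(void)
{
        struct session s = { SESSION_OPEN, 2 };

        close_session(&s);      /* caps outstanding -> FLUSHING */
        s.num_caps = 0;         /* MDS acks RELEASED/EXPORT for each cap */
        flushed_all_caps(&s);   /* FLUSHING -> CLOSING */
        return 0;
}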

src/kernel/caps.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.c
src/kernel/super.h

diff --git a/src/kernel/caps.c b/src/kernel/caps.c
index f5f12cf13ac74eca528c207684d468acc7740e74..f74ab0cc7509d42ff3ca3db48eb1aaf29a7fe47a 100644
--- a/src/kernel/caps.c
+++ b/src/kernel/caps.c
@@ -499,8 +499,6 @@ retry:
                }
 
                follows = capsnap->follows;
-               next_follows = follows + 1;
-
                size = capsnap->size;
                atime = capsnap->atime;
                mtime = capsnap->mtime;
@@ -518,6 +516,7 @@ retry:
                             follows, mds);
 
                spin_lock(&inode->i_lock);
+               next_follows = follows + 1;
                goto retry;
        }
 
@@ -656,7 +655,8 @@ retry_locked:
                        continue;     /* nothing extra, all good */
 
                /* delay cap release for a bit? */
-               if (time_before(jiffies, ci->i_hold_caps_until)) {
+               if (!is_delayed &&
+                   time_before(jiffies, ci->i_hold_caps_until)) {
                        dout(30, "delaying cap release\n");
                        continue;
                }
@@ -1375,6 +1375,8 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        case CEPH_CAP_OP_RELEASED:
                handle_cap_released(inode, h, session);
                up_write(&mdsc->snap_rwsem);
+               if (list_empty(&session->s_caps))
+                       ceph_mdsc_flushed_all_caps(mdsc, session);
                break;
 
        case CEPH_CAP_OP_FLUSHEDSNAP:
@@ -1385,6 +1387,8 @@ void ceph_handle_caps(struct ceph_mds_client *mdsc,
        case CEPH_CAP_OP_EXPORT:
                handle_cap_export(inode, h, session);
                up_write(&mdsc->snap_rwsem);
+               if (list_empty(&session->s_caps))
+                       ceph_mdsc_flushed_all_caps(mdsc, session);
                break;
 
        case CEPH_CAP_OP_IMPORT:
@@ -1442,47 +1446,3 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
        spin_unlock(&mdsc->cap_delay_lock);
 }
 
-
-/*
- * Force a flush of any snap_caps and write caps we hold.
- *
- * Caller holds snap_rwsem, s_mutex.
- */
-void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
-                          struct ceph_mds_session *session)
-{
-       struct list_head *p, *n;
-
-       dout(10, "flush_write_caps mds%d\n", session->s_mds);
-       list_for_each_safe (p, n, &session->s_caps) {
-               struct ceph_cap *cap =
-                       list_entry(p, struct ceph_cap, session_caps);
-               struct inode *inode = &cap->ci->vfs_inode;
-               int used, wanted;
-
-               spin_lock(&inode->i_lock);
-
-               if ((cap->implemented & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER)) == 0) {
-                       spin_unlock(&inode->i_lock);
-                       continue;
-               }
-
-               /* FIXME */
-               if (!list_empty(&cap->ci->i_cap_snaps))
-                       __ceph_flush_snaps(cap->ci, NULL);
-
-               used = __ceph_caps_used(cap->ci);
-               wanted = __ceph_caps_wanted(cap->ci);
-               if (used || wanted) {
-                       derr(0, "residual caps on %p u %d w %d s=%llu wrb=%d\n",
-                            inode, used, wanted, inode->i_size,
-                            cap->ci->i_wrbuffer_ref);
-                       used = wanted = 0;
-               }
-
-               /* __send_cap drops i_lock */
-               __send_cap(mdsc, session, cap, used, wanted);
-       }
-}
-
-
diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c
index 282dec87e3ff65c84d5b31bd9a5250a5277c6bcd..7526dde3db9bcb0784407d1c8064be6c61b0c043 100644
--- a/src/kernel/mds_client.c
+++ b/src/kernel/mds_client.c
@@ -278,6 +278,7 @@ static const char *session_state_name(int s)
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
+       case CEPH_MDS_SESSION_FLUSHING: return "flushing";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
@@ -951,6 +952,86 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
                wake_up_session_caps(session);
 }
 
+
+
+static int request_close_session(struct ceph_mds_client *mdsc,
+                                struct ceph_mds_session *session)
+{
+       struct ceph_msg *msg;
+       int err = 0;
+
+       msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+                                session->s_seq);
+       if (IS_ERR(msg))
+               err = PTR_ERR(msg);
+       else
+               ceph_send_msg_mds(mdsc, msg, session->s_mds);
+       return err;
+}
+
+/*
+ * check all caps on a session, without allowing release to
+ * be delayed.
+ */
+static void check_all_caps(struct ceph_mds_client *mdsc,
+                        struct ceph_mds_session *session)
+{
+       struct list_head *p, *n;
+
+       list_for_each_safe (p, n, &session->s_caps) {
+               struct ceph_cap *cap =
+                       list_entry(p, struct ceph_cap, session_caps);
+               struct inode *inode = &cap->ci->vfs_inode;
+
+               igrab(inode);
+               mutex_unlock(&session->s_mutex);
+               ceph_check_caps(ceph_inode(inode), 1);
+               mutex_lock(&session->s_mutex);
+               iput(inode);
+       }
+}
+
+/*
+ * Called with s_mutex held.
+ */
+static int __close_session(struct ceph_mds_client *mdsc,
+                        struct ceph_mds_session *session)
+{
+       int mds = session->s_mds;
+       int err = 0;
+
+       dout(10, "close_session mds%d state=%s\n", mds,
+            session_state_name(session->s_state));
+       if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
+               return 0;
+
+       check_all_caps(mdsc, session);
+
+       if (list_empty(&session->s_caps)) {
+               session->s_state = CEPH_MDS_SESSION_CLOSING;
+               err = request_close_session(mdsc, session);
+       } else {
+               session->s_state = CEPH_MDS_SESSION_FLUSHING;
+       }
+       return err;
+}
+
+/*
+ * Called when the last cap for a session has been flushed or
+ * exported.
+ */
+void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
+                               struct ceph_mds_session *session)
+{
+       dout(10, "flushed_all_caps for mds%d state %s\n", session->s_mds,
+            session_state_name(session->s_state));
+       if (session->s_state == CEPH_MDS_SESSION_FLUSHING) {
+               session->s_state = CEPH_MDS_SESSION_CLOSING;
+               request_close_session(mdsc, session);
+       }
+}
+
+
 /*
  * handle a mds session control message
  */
@@ -987,13 +1068,16 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
 
        mutex_lock(&session->s_mutex);
 
-       dout(2, "handle_session mds%d %s %p seq %llu\n",
-            mds, ceph_session_op_name(op), session, seq);
+       dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+            mds, ceph_session_op_name(op), session,
+            session_state_name(session->s_state), seq);
        switch (op) {
        case CEPH_SESSION_OPEN:
                session->s_state = CEPH_MDS_SESSION_OPEN;
                renewed_caps(mdsc, session, 0);
                complete(&session->s_completion);
+               if (mdsc->stopping)
+                       __close_session(mdsc, session);
                break;
 
        case CEPH_SESSION_RENEWCAPS:
@@ -1665,7 +1749,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                oldstate = ceph_mdsmap_get_state(oldmap, i);
                newstate = ceph_mdsmap_get_state(newmap, i);
 
-               dout(20, "check_new_map mds%d state %d -> %d state %s\n",
+               dout(20, "check_new_map mds%d state %d -> %d (session %s)\n",
                     i, oldstate, newstate, session_state_name(s->s_state));
                if (newstate < oldstate) {
                        /* if the state moved backwards, that means
@@ -1695,44 +1779,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 
 
 
-static int request_close_session(struct ceph_mds_client *mdsc,
-                                struct ceph_mds_session *session)
-{
-       struct ceph_msg *msg;
-       int err = 0;
-
-       msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
-                                session->s_seq);
-       if (IS_ERR(msg))
-               err = PTR_ERR(msg);
-       else
-               ceph_send_msg_mds(mdsc, msg, session->s_mds);
-       return err;
-}
-
-static int close_session(struct ceph_mds_client *mdsc,
-                        struct ceph_mds_session *session)
-{
-       int mds = session->s_mds;
-       int err = 0;
-
-       dout(10, "close_session mds%d\n", mds);
-       mutex_lock(&session->s_mutex);
-
-       if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
-               goto done;
-
-       ceph_flush_write_caps(mdsc, session);
-
-       session->s_state = CEPH_MDS_SESSION_CLOSING;
-       err = request_close_session(mdsc, session);
-
-done:
-       mutex_unlock(&session->s_mutex);
-       return err;
-}
-
-
 /*
  * leases
  */
@@ -2048,7 +2094,10 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        dout(10, "close_sessions\n");
        mdsc->stopping = 1;
 
-       /* clean out cap delay list */
+       /*
+        * clean out the delayed cap list; we will flush everything
+        * explicitly below.
+        */
        spin_lock(&mdsc->cap_delay_lock);
        while (!list_empty(&mdsc->cap_delay_list)) {
                struct ceph_inode_info *ci;
@@ -2079,7 +2128,9 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                        if (!session)
                                continue;
                        mutex_unlock(&mdsc->mutex);
-                       close_session(mdsc, session);
+                       mutex_lock(&session->s_mutex);
+                       __close_session(mdsc, session);
+                       mutex_unlock(&session->s_mutex);
                        ceph_put_mds_session(session);
                        mutex_lock(&mdsc->mutex);
                        n++;
@@ -2092,7 +2143,6 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 
                dout(10, "waiting for sessions to close\n");
                mutex_unlock(&mdsc->mutex);
-
                wait_for_completion_timeout(&mdsc->session_close_waiters,
                                            timeout);
                mutex_lock(&mdsc->mutex);
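
Note the locking pattern in check_all_caps() above: each inode is
pinned with igrab() before s_mutex is dropped, so ceph_check_caps() can
run without the session mutex held (it takes i_lock itself), and the
pin is released with iput() only after the mutex is retaken.  A rough
user-space analogue of this pin/drop-lock/relock pattern, with
hypothetical names and the simplifying assumption that a pinned entry
stays on the list:

#include <pthread.h>
#include <stdio.h>

/* Hypothetical refcounted object standing in for an inode. */
struct obj {
        int refcount;           /* touched only under list_lock here */
        struct obj *next;
};

static pthread_mutex_t list_lock = PTHREAD_MUTEX_INITIALIZER;
static struct obj *obj_list;

static void get_obj(struct obj *o) { o->refcount++; }   /* cf. igrab() */
static void put_obj(struct obj *o) { o->refcount--; }   /* cf. iput() */

/* Per-object work that must run without list_lock held,
 * as ceph_check_caps() must run without s_mutex. */
static void check_obj(struct obj *o)
{
        printf("checking %p\n", (void *)o);
}

static void check_all(void)
{
        struct obj *o;

        pthread_mutex_lock(&list_lock);
        for (o = obj_list; o; o = o->next) {
                get_obj(o);                       /* keep o alive...      */
                pthread_mutex_unlock(&list_lock); /* ...across the unlock */
                check_obj(o);
                pthread_mutex_lock(&list_lock);
                put_obj(o);     /* assumes o stayed linked while pinned */
        }
        pthread_mutex_unlock(&list_lock);
}

int main(void)
{
        struct obj a = { 0, NULL }, b = { 0, &a };

        obj_list = &b;
        check_all();
        return 0;
}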
diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h
index 52e5d300d242f7982c34e20f9a1ef03173b0d7f7..ab13c6b2ee044d237d9825b23b575c544491ca99 100644
--- a/src/kernel/mds_client.h
+++ b/src/kernel/mds_client.h
@@ -101,7 +101,8 @@ enum {
        CEPH_MDS_SESSION_NEW = 1,
        CEPH_MDS_SESSION_OPENING = 2,
        CEPH_MDS_SESSION_OPEN = 3,
-       CEPH_MDS_SESSION_CLOSING = 4,
+       CEPH_MDS_SESSION_FLUSHING = 4,
+       CEPH_MDS_SESSION_CLOSING = 5,
        CEPH_MDS_SESSION_RECONNECTING = 6
 };
 
@@ -248,4 +249,7 @@ extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
 
 extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds);
 
+extern void ceph_mdsc_flushed_all_caps(struct ceph_mds_client *mdsc,
+                                      struct ceph_mds_session *session);
+
 #endif
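
One subtlety in the renumbering above: session states are compared
numerically, so the new FLUSHING state must sort below CLOSING for
checks like the one in __close_session() to keep working:

        if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
                return 0;   /* already closing (or beyond), nothing to do */

hence CLOSING moves from 4 to 5 while RECONNECTING keeps its value of 6.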
diff --git a/src/kernel/super.c b/src/kernel/super.c
index 23f1cb3932b3115cb32e522fe257a40bcefb825f..f4842505780486705bfc9e9d305e54d78ed30d7b 100644
--- a/src/kernel/super.c
+++ b/src/kernel/super.c
@@ -813,6 +813,15 @@ static int ceph_mount(struct ceph_client *client, struct vfsmount *mnt,
                err = PTR_ERR(root);
                goto out;
        }
+
+       /*
+        * Drop the reference we just got, since the VFS doesn't give
+        * us a reliable way to drop it later when a particular
+        * vfsmount goes away.  If the directory we just mounted on is
+        * renamed on the server, we are screwed.
+        */
+       ceph_put_fmode(ceph_inode(root->d_inode), CEPH_FILE_MODE_PIN);
+
        mnt->mnt_root = root;
        mnt->mnt_sb = client->sb;
        client->mount_state = CEPH_MOUNT_MOUNTED;
diff --git a/src/kernel/super.h b/src/kernel/super.h
index e1c5f0e4868d930d4355b6b2b1f3338ef4c70218..1026ebab42b6d3bae44ed64e2c0c03a30d6937ad 100644
--- a/src/kernel/super.h
+++ b/src/kernel/super.h
@@ -629,8 +629,6 @@ extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
                               struct ceph_mds_session **psession);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int delayed);
 extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
-extern void ceph_flush_write_caps(struct ceph_mds_client *mdsc,
-                                 struct ceph_mds_session *session);
 
 /* addr.c */
 extern const struct address_space_operations ceph_aops;