git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: track and kick flushing caps per-mds
author Sage Weil <sage@newdream.net>
Wed, 8 Jul 2009 22:55:03 +0000 (15:55 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 8 Jul 2009 22:55:03 +0000 (15:55 -0700)
After an MDS restarts, we need to resend cap flush messages we
were trying to write back.  So, track flushing caps per-session,
with a global counter to make ceph_mdsc_sync simple.  When the
MDS goes active, re-flush anything that was flushing.

Also change old 'sync' terminology to more descriptive 'flushing'.

src/kernel/caps.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.h

index c4f0a3ed85d1e7da3066d9d082243e03669ff75c..7dc820e3d1380b51b7eacc7ad492e2dde861f458 100644 (file)
@@ -1138,16 +1138,18 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
 /*
  * Add dirty inode to the sync (currently flushing) list.
  */
-static void __mark_caps_sync(struct inode *inode)
+static void __mark_caps_flushing(struct inode *inode,
+                                struct ceph_mds_session *session)
 {
        struct ceph_mds_client *mdsc = &ceph_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
 
        BUG_ON(list_empty(&ci->i_dirty_item));
        spin_lock(&mdsc->cap_dirty_lock);
-       if (list_empty(&ci->i_sync_item)) {
-               dout(20, " inode %p now sync\n", &ci->vfs_inode);
-               list_add(&ci->i_sync_item, &mdsc->cap_sync);
+       if (list_empty(&ci->i_flushing_item)) {
+               dout(20, " inode %p now flushing\n", &ci->vfs_inode);
+               list_add(&ci->i_flushing_item, &session->s_cap_flushing);
+               mdsc->num_cap_flushing++;
        }
        spin_unlock(&mdsc->cap_dirty_lock);
 }
@@ -1359,7 +1361,7 @@ ack:
                             ceph_cap_string(ci->i_flushing_caps | flushing));
                        ci->i_flushing_caps |= flushing;
                        ci->i_dirty_caps = 0;
-                       __mark_caps_sync(inode);
+                       __mark_caps_flushing(inode, session);
                }
 
                mds = cap->mds;  /* remember mds, so we don't repeat */
@@ -1452,7 +1454,7 @@ retry:
                if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
                        goto out;
 
-               __mark_caps_sync(inode);
+               __mark_caps_flushing(inode, session);
 
                flushing = ci->i_dirty_caps;
                dout(10, " flushing %s, flushing_caps %s -> %s\n",
@@ -1511,6 +1513,41 @@ int ceph_write_inode(struct inode *inode, int wait)
 }
 
 
+/*
+ * After a recovering MDS goes active, we need to resend any caps
+ * we were flushing.
+ *
+ * Caller holds session->s_mutex.
+ */
+void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
+                            struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci;
+
+       dout(10, "kick_flushing_caps mds%d\n", session->s_mds); 
+       list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
+               struct inode *inode = &ci->vfs_inode;
+               struct ceph_cap *cap;
+
+               spin_lock(&inode->i_lock);
+               cap = ci->i_auth_cap;
+               if (cap && cap->session == session) {
+                       dout(20, "kick_flushing_caps %p cap %p %s\n", inode,
+                            cap, ceph_cap_string(ci->i_flushing_caps));
+                       __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+                                  __ceph_caps_used(ci),
+                                  __ceph_caps_wanted(ci),
+                                  cap->issued | cap->implemented,
+                                  ci->i_flushing_caps);
+               } else {
+                       dout(0, " %p auth cap %p not mds%d ???\n", inode, cap,
+                            session->s_mds);
+                       spin_unlock(&inode->i_lock);
+               }
+       }
+}
+
+
 /*
  * Take references to capabilities we hold, so that we don't release
  * them to the MDS prematurely.
@@ -2030,10 +2067,20 @@ static void handle_cap_flush_ack(struct inode *inode,
        new_dirty = ci->i_dirty_caps | ci->i_flushing_caps;
        if (old_dirty) {
                spin_lock(&mdsc->cap_dirty_lock);
-               list_del_init(&ci->i_sync_item);
-               if (list_empty(&mdsc->cap_sync))
-                       wake_up(&mdsc->cap_sync_wq);
-               dout(20, " inode %p now !sync\n", inode);
+               list_del_init(&ci->i_flushing_item);
+               if (!list_empty(&session->s_cap_flushing))
+                       dout(20, " mds%d still flushing cap on %p\n",
+                            session->s_mds,
+                            &list_entry(session->s_cap_flushing.next,
+                                        struct ceph_inode_info,
+                                        i_flushing_item)->vfs_inode);
+               mdsc->num_cap_flushing--;
+               if (!mdsc->num_cap_flushing)
+                       wake_up(&mdsc->cap_flushing_wq);
+               else
+                       dout(20, " still %d caps flushing\n",
+                            mdsc->num_cap_flushing);
+               dout(20, " inode %p now !flushing\n", inode);
                if (!new_dirty) {
                        dout(20, " inode %p now clean\n", inode);
                        list_del_init(&ci->i_dirty_item);
index 149d5b402cb98b708a3360f745475ce403fffa7c..458f69083d4d0cfc2bf90be54e5bbf099f811329 100644 (file)
@@ -274,7 +274,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_dirty_caps = 0;
        ci->i_flushing_caps = 0;
        INIT_LIST_HEAD(&ci->i_dirty_item);
-       INIT_LIST_HEAD(&ci->i_sync_item);
+       INIT_LIST_HEAD(&ci->i_flushing_item);
        init_waitqueue_head(&ci->i_cap_wq);
        ci->i_hold_caps_min = 0;
        ci->i_hold_caps_max = 0;
index 8b94fa9d2a3948197b5e78e857727d158bd1025d..73d45a7b7364c5d245b1fb8b50467c3be05cf630 100644 (file)
@@ -307,6 +307,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        s->s_num_cap_releases = 0;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
+       INIT_LIST_HEAD(&s->s_cap_flushing);
 
        dout(10, "register_session mds%d\n", mds);
        if (mds >= mdsc->max_sessions) {
@@ -772,7 +773,6 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
 }
 
 
-
 static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
 {
@@ -2134,8 +2134,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                 * sure it successfully forwarded our request before
                 * it died.
                 */
-               if (newstate >= CEPH_MDS_STATE_ACTIVE)
+               if (oldstate < CEPH_MDS_STATE_ACTIVE &&
+                   newstate >= CEPH_MDS_STATE_ACTIVE) {
                        kick_requests(mdsc, i, 1);
+                       ceph_kick_flushing_caps(mdsc, s);
+               }
        }
 }
 
@@ -2445,9 +2448,9 @@ void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
        INIT_LIST_HEAD(&mdsc->cap_dirty);
-       INIT_LIST_HEAD(&mdsc->cap_sync);
+       mdsc->num_cap_flushing = 0;
        spin_lock_init(&mdsc->cap_dirty_lock);
-       init_waitqueue_head(&mdsc->cap_sync_wq);
+       init_waitqueue_head(&mdsc->cap_flushing_wq);
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
 }
@@ -2523,19 +2526,19 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
  */
 static int are_no_sync_caps(struct ceph_mds_client *mdsc)
 {
-       int empty;
+       int num;
        spin_lock(&mdsc->cap_dirty_lock);
-       empty = list_empty(&mdsc->cap_sync);
+       num = mdsc->num_cap_flushing;
        spin_unlock(&mdsc->cap_dirty_lock);
-       dout(20, "are_no_sync_caps = %d\n", empty);
-       return empty;
+       dout(20, "are_no_sync_caps = %d\n", num);
+       return num == 0;
 }
 
 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
        dout(10, "sync\n");
        ceph_check_delayed_caps(mdsc);
-       wait_event(mdsc->cap_sync_wq, are_no_sync_caps(mdsc));
+       wait_event(mdsc->cap_flushing_wq, are_no_sync_caps(mdsc));
 }
 
 
index 039b9e5c713e36695c89eba3b38ace052a6f0d55..c3dd837d10e2964900a9d58675098eeb37042870 100644 (file)
@@ -128,6 +128,8 @@ struct ceph_mds_session {
        int               s_num_cap_releases;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
        struct list_head  s_cap_releases_done; /* ready to send */
+
+       struct list_head  s_cap_flushing;      /* inodes w/ flushing caps */
 };
 
 /*
@@ -262,9 +264,10 @@ struct ceph_mds_client {
        spinlock_t       cap_delay_lock;   /* protects cap_delay_list */
        struct list_head snap_flush_list;  /* cap_snaps ready to flush */
        spinlock_t       snap_flush_lock;
-       struct list_head cap_dirty, cap_sync; /* inodes with dirty cap data */
+       struct list_head cap_dirty;        /* inodes with dirty caps */
+       int num_cap_flushing;              /* # caps we are flushing */
        spinlock_t       cap_dirty_lock;
-       wait_queue_head_t cap_sync_wq;
+       wait_queue_head_t cap_flushing_wq;
 
        struct dentry           *debugfs_file;
 
index db184d4efcfb633a18b72902790c6e39ffae8c10..02a4372243704d099e59648487fe63e5243a2d42 100644 (file)
@@ -306,7 +306,7 @@ struct ceph_inode_info {
        struct rb_root i_caps;           /* cap list */
        struct ceph_cap *i_auth_cap;     /* authoritative cap, if any */
        unsigned i_dirty_caps, i_flushing_caps;     /* mask of dirtied fields */
-       struct list_head i_dirty_item, i_sync_item;
+       struct list_head i_dirty_item, i_flushing_item;
        wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
        unsigned long i_hold_caps_min; /* jiffies */
        unsigned long i_hold_caps_max; /* jiffies */
@@ -848,6 +848,8 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, int unused);
+extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
+                                   struct ceph_mds_session *session);
 extern int ceph_get_cap_mds(struct inode *inode);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);