]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: fix up cap vs session locking mess with s_mutex
authorSage Weil <sage@newdream.net>
Mon, 14 Apr 2008 18:29:13 +0000 (11:29 -0700)
committerSage Weil <sage@newdream.net>
Mon, 14 Apr 2008 18:29:13 +0000 (11:29 -0700)
src/kernel/client.c
src/kernel/inode.c
src/kernel/mds_client.c
src/kernel/mds_client.h
src/kernel/super.h

index 06a6945113b663adf995dc372238a4aad0ba1935..ce8e8deadc2235fa5144158d95e31196a6027e00 100644 (file)
@@ -202,6 +202,7 @@ struct ceph_client *ceph_create_client(struct ceph_mount_args *args, struct supe
        cl->msgr = ceph_messenger_create(myaddr);
        if (IS_ERR(cl->msgr)) {
                err = PTR_ERR(cl->msgr);
+               cl->msgr = 0;
                goto fail;
        }
        cl->msgr->parent = cl;
@@ -238,9 +239,10 @@ void ceph_destroy_client(struct ceph_client *cl)
        /* unmount */
        /* ... */
 
+       if (cl->wb_wq)
+               destroy_workqueue(cl->wb_wq);
        ceph_messenger_destroy(cl->msgr);
        put_client_counter();
-       destroy_workqueue(cl->wb_wq);
        kfree(cl);
        dout(10, "destroy_client %p done\n", cl);
 }
index a3ef360e1e8ded434ccf8a534b83350fed2bb27c..1aab5d3940b44232e3d46d6268be9155a73edea2 100644 (file)
@@ -181,9 +181,7 @@ int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info)
 }
 
 /* 
- * inode lease lock order is
- *     inode->i_lock
- *          session->s_cap_lock
+ * caller should hold session s_mutex.
  */
 void ceph_update_inode_lease(struct inode *inode, 
                             struct ceph_mds_reply_lease *lease,
@@ -209,15 +207,11 @@ void ceph_update_inode_lease(struct inode *inode,
                ci->i_lease_ttl = ttl;
                ci->i_lease_mask = le16_to_cpu(lease->mask);
                if (ci->i_lease_session) {
-                       spin_lock(&ci->i_lease_session->s_cap_lock);
                        list_del(&ci->i_lease_item);
-                       spin_unlock(&ci->i_lease_session->s_cap_lock);
                } else
                        is_new = 1;
                ci->i_lease_session = session;
-               spin_lock(&session->s_cap_lock);
                list_add(&ci->i_lease_item, &session->s_inode_leases);
-               spin_unlock(&session->s_cap_lock);
        }
        spin_unlock(&inode->i_lock);
        if (is_new) {
@@ -235,9 +229,7 @@ void ceph_revoke_inode_lease(struct ceph_inode_info *ci, int mask)
             &ci->vfs_inode, ci->i_lease_mask, ci->i_lease_mask & ~mask);
        ci->i_lease_mask &= ~mask;
        if (ci->i_lease_mask == 0) {
-               spin_lock(&ci->i_lease_session->s_cap_lock);
                list_del(&ci->i_lease_item);
-               spin_unlock(&ci->i_lease_session->s_cap_lock);
                ci->i_lease_session = 0;
                drop = 1;
        }
@@ -280,9 +272,7 @@ out:
 
 
 /*
- * dentry lease lock order is
- *     dentry->d_lock
- *         session->s_cap_lock
+ * caller should hold session s_mutex.
  */
 void ceph_update_dentry_lease(struct dentry *dentry, 
                              struct ceph_mds_reply_lease *lease,
@@ -327,15 +317,10 @@ void ceph_update_dentry_lease(struct dentry *dentry,
        dentry->d_time = ttl;
 
        /* (re)add to session lru */
-       if (!is_new && di->lease_session) {
-               spin_lock(&di->lease_session->s_cap_lock);
+       if (!is_new && di->lease_session)
                list_del(&di->lease_item);
-               spin_unlock(&di->lease_session->s_cap_lock);
-       }
        di->lease_session = session;          
-       spin_lock(&session->s_cap_lock);
        list_add(&di->lease_item, &session->s_dentry_leases);
-       spin_unlock(&session->s_cap_lock);
        
        spin_unlock(&dentry->d_lock);
        if (is_new) {
@@ -359,9 +344,7 @@ void ceph_revoke_dentry_lease(struct dentry *dentry)
        di = ceph_dentry(dentry);
        if (di) {
                session = di->lease_session;
-               spin_lock(&session->s_cap_lock);
                list_del(&di->lease_item);
-               spin_unlock(&session->s_cap_lock);
                kfree(di);
                drop = 1;
                dentry->d_fsdata = 0;
@@ -652,9 +635,7 @@ static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds)
 }
 
 /*
- * lock ordering is
- *    inode->i_lock
- *         session->s_cap_lock
+ * caller shoudl hold session s_mutex.
  */
 struct ceph_inode_cap *ceph_add_cap(struct inode *inode,
                                    struct ceph_mds_session *session, 
@@ -699,10 +680,8 @@ struct ceph_inode_cap *ceph_add_cap(struct inode *inode,
 
                /* add to session cap list */
                cap->session = session;
-               spin_lock(&session->s_cap_lock);
                list_add(&cap->session_caps, &session->s_caps);
                session->s_nr_caps++;
-               spin_unlock(&session->s_cap_lock);
        }
 
        dout(10, "add_cap inode %p (%llx) got cap %xh now %xh seq %d from %d\n",
@@ -729,6 +708,9 @@ int __ceph_caps_issued(struct ceph_inode_info *ci)
        return have;
 }
 
+/*
+ * caller should hold i_lock and session s_mutex.
+ */
 void __ceph_remove_cap(struct ceph_inode_cap *cap)
 {
        struct ceph_mds_session *session = cap->session;
@@ -736,10 +718,8 @@ void __ceph_remove_cap(struct ceph_inode_cap *cap)
        dout(10, "__ceph_remove_cap %p from %p\n", cap, &cap->ci->vfs_inode);
 
        /* remove from session list */
-       spin_lock(&session->s_cap_lock);
        list_del(&cap->session_caps);
        session->s_nr_caps--;
-       spin_unlock(&session->s_cap_lock);
 
        /* remove from inode list */
        list_del(&cap->ci_caps);
@@ -751,6 +731,9 @@ void __ceph_remove_cap(struct ceph_inode_cap *cap)
                kfree(cap);
 }
 
+/*
+ * caller should hold session s_mutex.
+ */
 void ceph_remove_cap(struct ceph_inode_cap *cap)
 {
        struct inode *inode = &cap->ci->vfs_inode;
@@ -777,6 +760,7 @@ void ceph_check_caps(struct ceph_inode_info *ci)
        __u64 size, max_size;
        struct timespec mtime, atime;
        int mds;
+       struct ceph_mds_session *session = 0;  /* if non-NULL, i hold s_mutex */
 
 retry:
        spin_lock(&ci->vfs_inode.i_lock);
@@ -787,18 +771,17 @@ retry:
                int revoking, dropping;
                cap = list_entry(p, struct ceph_inode_cap, ci_caps);
 
+               /* note: no side-effects allowed, until we take s_mutex */
+               revoking = cap->implemented & ~cap->issued;
+
                if (ci->i_wanted_max_size > ci->i_max_size &&
                    ci->i_wanted_max_size > ci->i_requested_max_size)
                        goto ack;
 
                /* completed revocation? */
-               revoking = cap->implemented & ~cap->issued;
-               dout(20, "cap %p issued %d impl %d revoking %d used %d\n",
-                    cap, cap->issued, cap->implemented, revoking, used);
                if (revoking && (revoking && used) == 0) {
                        dout(10, "completed revocation of %d\n",
                             cap->implemented & ~cap->issued);
-                       cap->implemented = cap->issued;
                        goto ack;
                }
 
@@ -807,7 +790,6 @@ retry:
                    (ci->vfs_inode.i_size << 1) >= ci->i_max_size &&
                    (ci->i_reported_size << 1) < ci->i_max_size) {
                        dout(10, "i_size approaching max_size\n");
-                       ci->i_reported_size = ci->vfs_inode.i_size;
                        goto ack;
                }
                
@@ -815,12 +797,38 @@ retry:
                        continue;     /* nothing extra, all good */
 
        ack:
+               /* take s_mutex, one way or another */
+               if (session && session != cap->session) {
+                       dout(30, "oops, wrong session mutex\n");
+                       up(&session->s_mutex);
+                       session = 0;
+               }
+               if (!session) {
+                       session = cap->session;
+                       if (down_trylock(&session->s_mutex) != 0) {
+                               dout(10, "inverting session/inode locking\n");
+                               spin_unlock(&ci->vfs_inode.i_lock);
+                               down(&session->s_mutex);
+                               spin_unlock(&ci->vfs_inode.i_lock);
+                               goto retry;
+                       }
+               }
+
+               /* ok */
                dropping = cap->issued & ~wanted;
+               if (dropping & CEPH_CAP_RDCACHE) {
+                       dout(20, "invalidating pages on %p\n", &ci->vfs_inode);
+                       invalidate_mapping_pages(&ci->vfs_inode.i_data, 0, -1);
+               }
                cap->issued &= wanted;  /* drop bits we don't want */
 
+               if (revoking && (revoking && used) == 0) 
+                       cap->implemented = cap->issued;
+               
                keep = cap->issued;
                seq = cap->seq;
                size = ci->vfs_inode.i_size;
+               ci->i_reported_size = size;
                max_size = ci->i_wanted_max_size;
                ci->i_requested_max_size = max_size;
                mtime = ci->vfs_inode.i_mtime;
@@ -834,17 +842,17 @@ retry:
                                       keep, wanted, seq, 
                                       size, max_size, &mtime, &atime, mds);
 
-               if (dropping & CEPH_CAP_RDCACHE) {
-                       dout(20, "invalidating pages on %p\n", &ci->vfs_inode);
-                       invalidate_mapping_pages(&ci->vfs_inode.i_data, 0, -1);
-               }
                if (wanted == 0)
                        iput(&ci->vfs_inode);  /* removed cap */
+               up(&session->s_mutex);
                goto retry;
        }
 
        /* okay */
        spin_unlock(&ci->vfs_inode.i_lock);
+
+       if (session)
+               up(&session->s_mutex);
 }
 
 void ceph_inode_set_size(struct inode *inode, loff_t size)
@@ -1044,7 +1052,6 @@ void ceph_inode_writeback(struct work_struct *work)
        write_inode_now(&ci->vfs_inode, 0);
 }
 
-
 void apply_truncate(struct inode *inode, loff_t size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
index c6b861ba13aa17b1eb28abeb404b9bd1889cb05d..036688da3be3932675d02b39f5e3f12a0f1597fe 100644 (file)
@@ -287,7 +287,7 @@ __register_session(struct ceph_mds_client *mdsc, int mds)
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_cap_seq = 0;
-       spin_lock_init(&s->s_cap_lock);
+       init_MUTEX(&s->s_mutex);
        INIT_LIST_HEAD(&s->s_caps);
        INIT_LIST_HEAD(&s->s_inode_leases);
        INIT_LIST_HEAD(&s->s_dentry_leases);
@@ -544,12 +544,10 @@ static int resume_session(struct ceph_mds_client *mdsc,
        dout(10, "resume_session to mds%d\n", mds);
 
        /* note cap staleness */
-       spin_lock(&session->s_cap_lock);
        list_for_each(cp, &session->s_caps) {
                cap = list_entry(cp, struct ceph_inode_cap, session_caps);
                cap->issued = cap->implemented = 0;
        }
-       spin_unlock(&session->s_cap_lock);
 
        session->s_state = CEPH_MDS_SESSION_RESUMING;
 
@@ -603,19 +601,15 @@ static void remove_session_caps(struct ceph_mds_session *session)
         * we don't deadlock with __remove_cap in inode.c.
         */
        dout(10, "remove_session_caps on %p\n", session);
-       spin_lock(&session->s_cap_lock);
        while (session->s_nr_caps > 0) {
                cap = list_entry(session->s_caps.next, struct ceph_inode_cap,
                                 session_caps);
                ci = cap->ci;
                dout(10, "removing cap %p, ci is %p, inode is %p\n",
                     cap, ci, &ci->vfs_inode);
-               spin_unlock(&session->s_cap_lock);
                ceph_remove_cap(cap);
-               spin_lock(&session->s_cap_lock);
        }
        BUG_ON(session->s_nr_caps > 0);
-       spin_unlock(&session->s_cap_lock);
 }
 
 static void remove_session_leases(struct ceph_mds_session *session)
@@ -625,16 +619,12 @@ static void remove_session_leases(struct ceph_mds_session *session)
 
        dout(10, "remove_session_leases on %p\n", session);
 
-       spin_lock(&session->s_cap_lock);
-
        /* inodes */
        while (!list_empty(&session->s_inode_leases)) {
                ci = list_entry(session->s_inode_leases.next, 
                                struct ceph_inode_info, i_lease_item);
                dout(10, "removing lease from inode %p\n", &ci->vfs_inode);
-               spin_unlock(&session->s_cap_lock);
                ceph_revoke_inode_lease(ci, ci->i_lease_mask);
-               spin_lock(&session->s_cap_lock);
        }
 
        /* dentries */
@@ -642,12 +632,8 @@ static void remove_session_leases(struct ceph_mds_session *session)
                di = list_entry(session->s_dentry_leases.next, 
                                struct ceph_dentry_info, lease_item);
                dout(10, "removing lease from dentry %p\n", di->dentry);
-               spin_unlock(&session->s_cap_lock);
                ceph_revoke_dentry_lease(di->dentry);
-               spin_lock(&session->s_cap_lock);
        }
-
-       spin_unlock(&session->s_cap_lock);
 }
 
 void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
@@ -655,7 +641,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
 {
        __u32 op;
        __u64 seq;
-       struct ceph_mds_session *session;
+       struct ceph_mds_session *session = 0;
        int mds = le32_to_cpu(msg->hdr.src.name.num);
        struct ceph_mds_session_head *h = msg->front.iov_base;
 
@@ -669,6 +655,8 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
        /* handle */
        spin_lock(&mdsc->lock);
        session = __get_session(mdsc, mds);
+       down(&session->s_mutex);
+       
        dout(1, "handle_session %p op %d seq %llu\n", session, op, seq);
        switch (op) {
        case CEPH_SESSION_OPEN:
@@ -713,13 +701,13 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
        put_session(session);
        spin_unlock(&mdsc->lock);
 
-out:
+       up(&session->s_mutex);
        return;
 
 bad:
        dout(1, "corrupt session message, len %d, expected %d\n",
             (int)msg->front.iov_len, (int)sizeof(*h));
-       goto out;
+       return;
 }
 
 
@@ -916,6 +904,8 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        BUG_ON(req->r_reply);
        spin_unlock(&mdsc->lock);
 
+       down(&req->r_session->s_mutex);
+
        /* parse */
        rinfo = &req->r_reply_info;
        err = parse_reply_info(msg, rinfo);
@@ -951,6 +941,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        }
 
 done:
+       up(&req->r_session->s_mutex);
        spin_lock(&mdsc->lock);
        if (err) {
                req->r_reply = ERR_PTR(err);
@@ -1082,13 +1073,11 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                session->s_state = CEPH_MDS_SESSION_RECONNECTING;
 
                /* estimate needed space */
-               spin_lock(&session->s_cap_lock);
                len += session->s_nr_caps *
                        sizeof(struct ceph_mds_cap_reconnect);
                len += session->s_nr_caps * (100); /* guess! */
                dout(40, "estimating i need %d bytes for %d caps\n",
                     len, session->s_nr_caps);
-               spin_unlock(&session->s_cap_lock);
        } else {
                dout(20, "no session for mds%d, will send short reconnect\n",
                     mds);
@@ -1096,6 +1085,9 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 
        spin_unlock(&mdsc->lock);  /* drop lock for duration */
 
+       if (session)
+               down(&session->s_mutex);
+
 retry:
        /* build reply */
        reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0);
@@ -1114,7 +1106,6 @@ retry:
        dout(10, "session %p state %d\n", session, session->s_state);
 
        /* traverse this session's caps */
-       spin_lock(&session->s_cap_lock);
        ceph_encode_8(&p, 0);
        ceph_encode_32(&p, session->s_nr_caps);
        count = 0;
@@ -1149,7 +1140,6 @@ retry:
                dput(dentry);
                count++;
        }
-       spin_unlock(&session->s_cap_lock);
 
 send:
        reply->front.iov_len = p - reply->front.iov_base;
@@ -1167,6 +1157,10 @@ send:
                        dout(0, "WARNING: reconnect on %p raced and lost?\n",
                             session);
                }
+       }
+out:
+       if (session) {
+               up(&session->s_mutex);
                put_session(session);
        }
        return;
@@ -1177,7 +1171,6 @@ needmore:
        dout(30, "i guessed %d, and did %d of %d, retrying with %d\n",
             len, count, session->s_nr_caps, newlen);
        len = newlen;
-       spin_unlock(&session->s_cap_lock);
        ceph_msg_put(reply);
        goto retry;
 
@@ -1186,6 +1179,7 @@ bad:
        derr(0, "error %d generating reconnect.  what to do?\n", err);
        /* fixme */
        BUG_ON(1);
+       goto out;
 }
 
 /*
@@ -1302,11 +1296,15 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc,
        max_size = le64_to_cpu(h->max_size);
 
        /* find session */
+       spin_lock(&mdsc->lock);
        session = __get_session(&client->mdsc, mds);
+       spin_unlock(&mdsc->lock);
        if (!session) {
                dout(10, "WTF, got filecap but no session for mds%d\n", mds);
                return;
        }
+
+       down(&session->s_mutex);
        session->s_cap_seq++;
 
        /* lookup ino */
@@ -1318,19 +1316,12 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc,
 #endif
        dout(20, "op is %d, ino %llx %p\n", op, ino, inode);
 
-       if (inode && ceph_ino(inode) != ino) {
-               BUG_ON(sizeof(ino_t) >= sizeof(u64));
-               dout(10, "UH OH, lame ceph ino %llx -> %lu ino_t hash collided?"
-                    "  inode is %llx\n", ino, inot, ceph_ino(inode));
-               inode = 0;
-       }
-
        if (!inode) {
                dout(10, "wtf, i don't have ino %lu=%llx?  closing out cap\n",
                     inot, ino);
                ceph_mdsc_send_cap_ack(mdsc, ino, 0, 0, seq, 
                                       size, 0, 0, 0, mds);
-               return;
+               goto no_inode;
        }
 
        switch (op) {
@@ -1353,7 +1344,10 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc,
        }
 
        iput(inode);
+no_inode:
+       up(&session->s_mutex);
        return;
+
 bad:
        dout(10, "corrupt filecaps message\n");
        return;
@@ -1414,13 +1408,17 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        dname.len = msg->front.iov_len - sizeof(*h) - sizeof(__u32);
 
        /* find session */
+       spin_lock(&mdsc->lock);
        session = __get_session(mdsc, mds);
+       spin_unlock(&mdsc->lock);
        if (!session) {
                dout(10, "WTF, got lease but no session for mds%d\n", mds);
                return;
        }
        session->s_cap_seq++;
 
+       down(&session->s_mutex);
+
        /* lookup inode */
        inot = ceph_ino_to_ino(ino);
 #if BITS_PER_LONG == 64
@@ -1463,6 +1461,7 @@ release:
        h->action = CEPH_MDS_LEASE_RELEASE;
        ceph_msg_get(msg);
        send_msg_mds(mdsc, msg, mds);
+       up(&session->s_mutex);
        return;
 
 bad:
index 75a70244d13f918c90d4198de23197e7b9d7c2f1..1b57c5de1cc9f0c08a50d03b1e38d36cbdd5004b 100644 (file)
@@ -55,6 +55,7 @@ struct ceph_mds_session {
        int               s_mds;
        int               s_state;
        __u64             s_cap_seq;    /* cap message count/seq from mds */
+       struct semaphore  s_mutex;
        spinlock_t        s_cap_lock;
        struct list_head  s_caps;
        struct list_head  s_inode_leases, s_dentry_leases;
index 74e93a4814a7fe0a83cba150c6dd1669bf4357d1..5d41a000df3bb179a1ce6551c14e2229ebb31fea 100644 (file)
@@ -175,6 +175,7 @@ struct ceph_inode_info {
        unsigned long i_hashval;
 
        struct work_struct i_wb_work;  /* writeback work */
+       struct work_struct i_cap_work;  /* cap work */
 
        struct inode vfs_inode; /* at end */
 };