From: Sage Weil Date: Mon, 14 Apr 2008 18:29:13 +0000 (-0700) Subject: kclient: fix up cap vs session locking mess with s_mutex X-Git-Tag: v0.2~204^2~38 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=68f035b9c1c8fde2c1ba33b8f3420f4608c52195;p=ceph.git kclient: fix up cap vs session locking mess with s_mutex --- diff --git a/src/kernel/client.c b/src/kernel/client.c index 06a6945113b66..ce8e8deadc223 100644 --- a/src/kernel/client.c +++ b/src/kernel/client.c @@ -202,6 +202,7 @@ struct ceph_client *ceph_create_client(struct ceph_mount_args *args, struct supe cl->msgr = ceph_messenger_create(myaddr); if (IS_ERR(cl->msgr)) { err = PTR_ERR(cl->msgr); + cl->msgr = 0; goto fail; } cl->msgr->parent = cl; @@ -238,9 +239,10 @@ void ceph_destroy_client(struct ceph_client *cl) /* unmount */ /* ... */ + if (cl->wb_wq) + destroy_workqueue(cl->wb_wq); ceph_messenger_destroy(cl->msgr); put_client_counter(); - destroy_workqueue(cl->wb_wq); kfree(cl); dout(10, "destroy_client %p done\n", cl); } diff --git a/src/kernel/inode.c b/src/kernel/inode.c index a3ef360e1e8de..1aab5d3940b44 100644 --- a/src/kernel/inode.c +++ b/src/kernel/inode.c @@ -181,9 +181,7 @@ int ceph_fill_inode(struct inode *inode, struct ceph_mds_reply_inode *info) } /* - * inode lease lock order is - * inode->i_lock - * session->s_cap_lock + * caller should hold session s_mutex. */ void ceph_update_inode_lease(struct inode *inode, struct ceph_mds_reply_lease *lease, @@ -209,15 +207,11 @@ void ceph_update_inode_lease(struct inode *inode, ci->i_lease_ttl = ttl; ci->i_lease_mask = le16_to_cpu(lease->mask); if (ci->i_lease_session) { - spin_lock(&ci->i_lease_session->s_cap_lock); list_del(&ci->i_lease_item); - spin_unlock(&ci->i_lease_session->s_cap_lock); } else is_new = 1; ci->i_lease_session = session; - spin_lock(&session->s_cap_lock); list_add(&ci->i_lease_item, &session->s_inode_leases); - spin_unlock(&session->s_cap_lock); } spin_unlock(&inode->i_lock); if (is_new) { @@ -235,9 +229,7 @@ void ceph_revoke_inode_lease(struct ceph_inode_info *ci, int mask) &ci->vfs_inode, ci->i_lease_mask, ci->i_lease_mask & ~mask); ci->i_lease_mask &= ~mask; if (ci->i_lease_mask == 0) { - spin_lock(&ci->i_lease_session->s_cap_lock); list_del(&ci->i_lease_item); - spin_unlock(&ci->i_lease_session->s_cap_lock); ci->i_lease_session = 0; drop = 1; } @@ -280,9 +272,7 @@ out: /* - * dentry lease lock order is - * dentry->d_lock - * session->s_cap_lock + * caller should hold session s_mutex. */ void ceph_update_dentry_lease(struct dentry *dentry, struct ceph_mds_reply_lease *lease, @@ -327,15 +317,10 @@ void ceph_update_dentry_lease(struct dentry *dentry, dentry->d_time = ttl; /* (re)add to session lru */ - if (!is_new && di->lease_session) { - spin_lock(&di->lease_session->s_cap_lock); + if (!is_new && di->lease_session) list_del(&di->lease_item); - spin_unlock(&di->lease_session->s_cap_lock); - } di->lease_session = session; - spin_lock(&session->s_cap_lock); list_add(&di->lease_item, &session->s_dentry_leases); - spin_unlock(&session->s_cap_lock); spin_unlock(&dentry->d_lock); if (is_new) { @@ -359,9 +344,7 @@ void ceph_revoke_dentry_lease(struct dentry *dentry) di = ceph_dentry(dentry); if (di) { session = di->lease_session; - spin_lock(&session->s_cap_lock); list_del(&di->lease_item); - spin_unlock(&session->s_cap_lock); kfree(di); drop = 1; dentry->d_fsdata = 0; @@ -652,9 +635,7 @@ static struct ceph_inode_cap *__get_cap_for_mds(struct inode *inode, int mds) } /* - * lock ordering is - * inode->i_lock - * session->s_cap_lock + * caller shoudl hold session s_mutex. */ struct ceph_inode_cap *ceph_add_cap(struct inode *inode, struct ceph_mds_session *session, @@ -699,10 +680,8 @@ struct ceph_inode_cap *ceph_add_cap(struct inode *inode, /* add to session cap list */ cap->session = session; - spin_lock(&session->s_cap_lock); list_add(&cap->session_caps, &session->s_caps); session->s_nr_caps++; - spin_unlock(&session->s_cap_lock); } dout(10, "add_cap inode %p (%llx) got cap %xh now %xh seq %d from %d\n", @@ -729,6 +708,9 @@ int __ceph_caps_issued(struct ceph_inode_info *ci) return have; } +/* + * caller should hold i_lock and session s_mutex. + */ void __ceph_remove_cap(struct ceph_inode_cap *cap) { struct ceph_mds_session *session = cap->session; @@ -736,10 +718,8 @@ void __ceph_remove_cap(struct ceph_inode_cap *cap) dout(10, "__ceph_remove_cap %p from %p\n", cap, &cap->ci->vfs_inode); /* remove from session list */ - spin_lock(&session->s_cap_lock); list_del(&cap->session_caps); session->s_nr_caps--; - spin_unlock(&session->s_cap_lock); /* remove from inode list */ list_del(&cap->ci_caps); @@ -751,6 +731,9 @@ void __ceph_remove_cap(struct ceph_inode_cap *cap) kfree(cap); } +/* + * caller should hold session s_mutex. + */ void ceph_remove_cap(struct ceph_inode_cap *cap) { struct inode *inode = &cap->ci->vfs_inode; @@ -777,6 +760,7 @@ void ceph_check_caps(struct ceph_inode_info *ci) __u64 size, max_size; struct timespec mtime, atime; int mds; + struct ceph_mds_session *session = 0; /* if non-NULL, i hold s_mutex */ retry: spin_lock(&ci->vfs_inode.i_lock); @@ -787,18 +771,17 @@ retry: int revoking, dropping; cap = list_entry(p, struct ceph_inode_cap, ci_caps); + /* note: no side-effects allowed, until we take s_mutex */ + revoking = cap->implemented & ~cap->issued; + if (ci->i_wanted_max_size > ci->i_max_size && ci->i_wanted_max_size > ci->i_requested_max_size) goto ack; /* completed revocation? */ - revoking = cap->implemented & ~cap->issued; - dout(20, "cap %p issued %d impl %d revoking %d used %d\n", - cap, cap->issued, cap->implemented, revoking, used); if (revoking && (revoking && used) == 0) { dout(10, "completed revocation of %d\n", cap->implemented & ~cap->issued); - cap->implemented = cap->issued; goto ack; } @@ -807,7 +790,6 @@ retry: (ci->vfs_inode.i_size << 1) >= ci->i_max_size && (ci->i_reported_size << 1) < ci->i_max_size) { dout(10, "i_size approaching max_size\n"); - ci->i_reported_size = ci->vfs_inode.i_size; goto ack; } @@ -815,12 +797,38 @@ retry: continue; /* nothing extra, all good */ ack: + /* take s_mutex, one way or another */ + if (session && session != cap->session) { + dout(30, "oops, wrong session mutex\n"); + up(&session->s_mutex); + session = 0; + } + if (!session) { + session = cap->session; + if (down_trylock(&session->s_mutex) != 0) { + dout(10, "inverting session/inode locking\n"); + spin_unlock(&ci->vfs_inode.i_lock); + down(&session->s_mutex); + spin_unlock(&ci->vfs_inode.i_lock); + goto retry; + } + } + + /* ok */ dropping = cap->issued & ~wanted; + if (dropping & CEPH_CAP_RDCACHE) { + dout(20, "invalidating pages on %p\n", &ci->vfs_inode); + invalidate_mapping_pages(&ci->vfs_inode.i_data, 0, -1); + } cap->issued &= wanted; /* drop bits we don't want */ + if (revoking && (revoking && used) == 0) + cap->implemented = cap->issued; + keep = cap->issued; seq = cap->seq; size = ci->vfs_inode.i_size; + ci->i_reported_size = size; max_size = ci->i_wanted_max_size; ci->i_requested_max_size = max_size; mtime = ci->vfs_inode.i_mtime; @@ -834,17 +842,17 @@ retry: keep, wanted, seq, size, max_size, &mtime, &atime, mds); - if (dropping & CEPH_CAP_RDCACHE) { - dout(20, "invalidating pages on %p\n", &ci->vfs_inode); - invalidate_mapping_pages(&ci->vfs_inode.i_data, 0, -1); - } if (wanted == 0) iput(&ci->vfs_inode); /* removed cap */ + up(&session->s_mutex); goto retry; } /* okay */ spin_unlock(&ci->vfs_inode.i_lock); + + if (session) + up(&session->s_mutex); } void ceph_inode_set_size(struct inode *inode, loff_t size) @@ -1044,7 +1052,6 @@ void ceph_inode_writeback(struct work_struct *work) write_inode_now(&ci->vfs_inode, 0); } - void apply_truncate(struct inode *inode, loff_t size) { struct ceph_inode_info *ci = ceph_inode(inode); diff --git a/src/kernel/mds_client.c b/src/kernel/mds_client.c index c6b861ba13aa1..036688da3be39 100644 --- a/src/kernel/mds_client.c +++ b/src/kernel/mds_client.c @@ -287,7 +287,7 @@ __register_session(struct ceph_mds_client *mdsc, int mds) s->s_mds = mds; s->s_state = CEPH_MDS_SESSION_NEW; s->s_cap_seq = 0; - spin_lock_init(&s->s_cap_lock); + init_MUTEX(&s->s_mutex); INIT_LIST_HEAD(&s->s_caps); INIT_LIST_HEAD(&s->s_inode_leases); INIT_LIST_HEAD(&s->s_dentry_leases); @@ -544,12 +544,10 @@ static int resume_session(struct ceph_mds_client *mdsc, dout(10, "resume_session to mds%d\n", mds); /* note cap staleness */ - spin_lock(&session->s_cap_lock); list_for_each(cp, &session->s_caps) { cap = list_entry(cp, struct ceph_inode_cap, session_caps); cap->issued = cap->implemented = 0; } - spin_unlock(&session->s_cap_lock); session->s_state = CEPH_MDS_SESSION_RESUMING; @@ -603,19 +601,15 @@ static void remove_session_caps(struct ceph_mds_session *session) * we don't deadlock with __remove_cap in inode.c. */ dout(10, "remove_session_caps on %p\n", session); - spin_lock(&session->s_cap_lock); while (session->s_nr_caps > 0) { cap = list_entry(session->s_caps.next, struct ceph_inode_cap, session_caps); ci = cap->ci; dout(10, "removing cap %p, ci is %p, inode is %p\n", cap, ci, &ci->vfs_inode); - spin_unlock(&session->s_cap_lock); ceph_remove_cap(cap); - spin_lock(&session->s_cap_lock); } BUG_ON(session->s_nr_caps > 0); - spin_unlock(&session->s_cap_lock); } static void remove_session_leases(struct ceph_mds_session *session) @@ -625,16 +619,12 @@ static void remove_session_leases(struct ceph_mds_session *session) dout(10, "remove_session_leases on %p\n", session); - spin_lock(&session->s_cap_lock); - /* inodes */ while (!list_empty(&session->s_inode_leases)) { ci = list_entry(session->s_inode_leases.next, struct ceph_inode_info, i_lease_item); dout(10, "removing lease from inode %p\n", &ci->vfs_inode); - spin_unlock(&session->s_cap_lock); ceph_revoke_inode_lease(ci, ci->i_lease_mask); - spin_lock(&session->s_cap_lock); } /* dentries */ @@ -642,12 +632,8 @@ static void remove_session_leases(struct ceph_mds_session *session) di = list_entry(session->s_dentry_leases.next, struct ceph_dentry_info, lease_item); dout(10, "removing lease from dentry %p\n", di->dentry); - spin_unlock(&session->s_cap_lock); ceph_revoke_dentry_lease(di->dentry); - spin_lock(&session->s_cap_lock); } - - spin_unlock(&session->s_cap_lock); } void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, @@ -655,7 +641,7 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, { __u32 op; __u64 seq; - struct ceph_mds_session *session; + struct ceph_mds_session *session = 0; int mds = le32_to_cpu(msg->hdr.src.name.num); struct ceph_mds_session_head *h = msg->front.iov_base; @@ -669,6 +655,8 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, /* handle */ spin_lock(&mdsc->lock); session = __get_session(mdsc, mds); + down(&session->s_mutex); + dout(1, "handle_session %p op %d seq %llu\n", session, op, seq); switch (op) { case CEPH_SESSION_OPEN: @@ -713,13 +701,13 @@ void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc, put_session(session); spin_unlock(&mdsc->lock); -out: + up(&session->s_mutex); return; bad: dout(1, "corrupt session message, len %d, expected %d\n", (int)msg->front.iov_len, (int)sizeof(*h)); - goto out; + return; } @@ -916,6 +904,8 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) BUG_ON(req->r_reply); spin_unlock(&mdsc->lock); + down(&req->r_session->s_mutex); + /* parse */ rinfo = &req->r_reply_info; err = parse_reply_info(msg, rinfo); @@ -951,6 +941,7 @@ void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg) } done: + up(&req->r_session->s_mutex); spin_lock(&mdsc->lock); if (err) { req->r_reply = ERR_PTR(err); @@ -1082,13 +1073,11 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) session->s_state = CEPH_MDS_SESSION_RECONNECTING; /* estimate needed space */ - spin_lock(&session->s_cap_lock); len += session->s_nr_caps * sizeof(struct ceph_mds_cap_reconnect); len += session->s_nr_caps * (100); /* guess! */ dout(40, "estimating i need %d bytes for %d caps\n", len, session->s_nr_caps); - spin_unlock(&session->s_cap_lock); } else { dout(20, "no session for mds%d, will send short reconnect\n", mds); @@ -1096,6 +1085,9 @@ void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds) spin_unlock(&mdsc->lock); /* drop lock for duration */ + if (session) + down(&session->s_mutex); + retry: /* build reply */ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0); @@ -1114,7 +1106,6 @@ retry: dout(10, "session %p state %d\n", session, session->s_state); /* traverse this session's caps */ - spin_lock(&session->s_cap_lock); ceph_encode_8(&p, 0); ceph_encode_32(&p, session->s_nr_caps); count = 0; @@ -1149,7 +1140,6 @@ retry: dput(dentry); count++; } - spin_unlock(&session->s_cap_lock); send: reply->front.iov_len = p - reply->front.iov_base; @@ -1167,6 +1157,10 @@ send: dout(0, "WARNING: reconnect on %p raced and lost?\n", session); } + } +out: + if (session) { + up(&session->s_mutex); put_session(session); } return; @@ -1177,7 +1171,6 @@ needmore: dout(30, "i guessed %d, and did %d of %d, retrying with %d\n", len, count, session->s_nr_caps, newlen); len = newlen; - spin_unlock(&session->s_cap_lock); ceph_msg_put(reply); goto retry; @@ -1186,6 +1179,7 @@ bad: derr(0, "error %d generating reconnect. what to do?\n", err); /* fixme */ BUG_ON(1); + goto out; } /* @@ -1302,11 +1296,15 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, max_size = le64_to_cpu(h->max_size); /* find session */ + spin_lock(&mdsc->lock); session = __get_session(&client->mdsc, mds); + spin_unlock(&mdsc->lock); if (!session) { dout(10, "WTF, got filecap but no session for mds%d\n", mds); return; } + + down(&session->s_mutex); session->s_cap_seq++; /* lookup ino */ @@ -1318,19 +1316,12 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, #endif dout(20, "op is %d, ino %llx %p\n", op, ino, inode); - if (inode && ceph_ino(inode) != ino) { - BUG_ON(sizeof(ino_t) >= sizeof(u64)); - dout(10, "UH OH, lame ceph ino %llx -> %lu ino_t hash collided?" - " inode is %llx\n", ino, inot, ceph_ino(inode)); - inode = 0; - } - if (!inode) { dout(10, "wtf, i don't have ino %lu=%llx? closing out cap\n", inot, ino); ceph_mdsc_send_cap_ack(mdsc, ino, 0, 0, seq, size, 0, 0, 0, mds); - return; + goto no_inode; } switch (op) { @@ -1353,7 +1344,10 @@ void ceph_mdsc_handle_filecaps(struct ceph_mds_client *mdsc, } iput(inode); +no_inode: + up(&session->s_mutex); return; + bad: dout(10, "corrupt filecaps message\n"); return; @@ -1414,13 +1408,17 @@ void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg) dname.len = msg->front.iov_len - sizeof(*h) - sizeof(__u32); /* find session */ + spin_lock(&mdsc->lock); session = __get_session(mdsc, mds); + spin_unlock(&mdsc->lock); if (!session) { dout(10, "WTF, got lease but no session for mds%d\n", mds); return; } session->s_cap_seq++; + down(&session->s_mutex); + /* lookup inode */ inot = ceph_ino_to_ino(ino); #if BITS_PER_LONG == 64 @@ -1463,6 +1461,7 @@ release: h->action = CEPH_MDS_LEASE_RELEASE; ceph_msg_get(msg); send_msg_mds(mdsc, msg, mds); + up(&session->s_mutex); return; bad: diff --git a/src/kernel/mds_client.h b/src/kernel/mds_client.h index 75a70244d13f9..1b57c5de1cc9f 100644 --- a/src/kernel/mds_client.h +++ b/src/kernel/mds_client.h @@ -55,6 +55,7 @@ struct ceph_mds_session { int s_mds; int s_state; __u64 s_cap_seq; /* cap message count/seq from mds */ + struct semaphore s_mutex; spinlock_t s_cap_lock; struct list_head s_caps; struct list_head s_inode_leases, s_dentry_leases; diff --git a/src/kernel/super.h b/src/kernel/super.h index 74e93a4814a7f..5d41a000df3bb 100644 --- a/src/kernel/super.h +++ b/src/kernel/super.h @@ -175,6 +175,7 @@ struct ceph_inode_info { unsigned long i_hashval; struct work_struct i_wb_work; /* writeback work */ + struct work_struct i_cap_work; /* cap work */ struct inode vfs_inode; /* at end */ };