* Identify the appropriate session, inode, and call the right handler
* based on the cap op.
*/
-void ceph_handle_caps(struct ceph_mds_client *mdsc,
+void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg)
{
+ struct ceph_mds_client *mdsc = session->s_mdsc;
struct super_block *sb = mdsc->client->sb;
- struct ceph_mds_session *session;
struct inode *inode;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
size = le64_to_cpu(h->size);
max_size = le64_to_cpu(h->max_size);
- /* find session */
- mutex_lock(&mdsc->mutex);
- session = __ceph_lookup_mds_session(mdsc, mds);
- mutex_unlock(&mdsc->mutex);
- if (!session) {
- dout("WTF, got cap but no session for mds%d\n", mds);
- return;
- }
-
mutex_lock(&session->s_mutex);
session->s_seq++;
dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
* This preserves the logical ordering of replies, capabilities, etc., sent
* by the MDS as they are applied to our local cache.
*/
-static void handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
+ struct ceph_mds_client *mdsc = session->s_mdsc;
struct ceph_mds_request *req;
struct ceph_mds_reply_head *head = msg->front.iov_base;
struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
dout("handle_reply %p\n", req);
mds = le32_to_cpu(msg->hdr.src.name.num);
+ /* correct session? */
+ if (!req->r_session && req->r_session != session) {
+ pr_err("ceph_mdsc_handle_reply got %llu on session mds%d"
+ " not mds%d\n", tid, session->s_mds,
+ req->r_session ? req->r_session->s_mds : -1);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
+
/* dup? */
if ((req->r_got_unsafe && !head->safe) ||
(req->r_got_safe && head->safe)) {
}
}
- if (req->r_session && req->r_session->s_mds != mds) {
- ceph_put_mds_session(req->r_session);
- req->r_session = __ceph_lookup_mds_session(mdsc, mds);
- }
- if (req->r_session == NULL) {
- pr_err("ceph_mdsc_handle_reply got %llu, but no session for"
- " mds%d\n", tid, mds);
- mutex_unlock(&mdsc->mutex);
- goto out;
- }
BUG_ON(req->r_reply);
if (!head->safe) {
mutex_unlock(&mdsc->mutex);
- mutex_lock(&req->r_session->s_mutex);
+ mutex_lock(&session->s_mutex);
/* parse */
rinfo = &req->r_reply_info;
}
add_cap_releases(mdsc, req->r_session, -1);
- mutex_unlock(&req->r_session->s_mutex);
+ mutex_unlock(&session->s_mutex);
/* kick calling process */
complete_request(mdsc, req);
/*
* handle a mds session control message
*/
-static void handle_session(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_session(struct ceph_mds_session *session,
+ struct ceph_msg *msg)
{
+ struct ceph_mds_client *mdsc = session->s_mdsc;
u32 op;
u64 seq;
- struct ceph_mds_session *session = NULL;
int mds;
struct ceph_mds_session_head *h = msg->front.iov_base;
int wake = 0;
seq = le64_to_cpu(h->seq);
mutex_lock(&mdsc->mutex);
- session = __ceph_lookup_mds_session(mdsc, mds);
- if (session && mdsc->mdsmap)
- /* FIXME: this ttl calculation is generous */
- session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
- mutex_unlock(&mdsc->mutex);
-
- if (!session) {
- if (op != CEPH_SESSION_OPEN) {
- dout("handle_session no session for mds%d\n", mds);
- return;
- }
- dout("handle_session creating session for mds%d\n", mds);
- session = register_session(mdsc, mds);
- }
+ /* FIXME: this ttl calculation is generous */
+ session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
+ mutex_unlock(&mdsc->mutex);
mutex_lock(&session->s_mutex);
__wake_requests(mdsc, &session->s_waiting);
mutex_unlock(&mdsc->mutex);
}
- ceph_put_mds_session(session);
return;
bad:
ceph_mdsc_handle_map(mdsc, msg);
break;
case CEPH_MSG_CLIENT_SESSION:
- handle_session(mdsc, msg);
+ handle_session(s, msg);
break;
case CEPH_MSG_CLIENT_REPLY:
- handle_reply(mdsc, msg);
+ handle_reply(s, msg);
break;
case CEPH_MSG_CLIENT_REQUEST_FORWARD:
handle_forward(mdsc, msg);
break;
case CEPH_MSG_CLIENT_CAPS:
- ceph_handle_caps(mdsc, msg);
+ ceph_handle_caps(s, msg);
break;
case CEPH_MSG_CLIENT_SNAP:
ceph_handle_snap(mdsc, msg);