atomic_inc(&req->r_ref);
}
-static void drop_request_session_attempt_refs(struct ceph_mds_request *req)
+static void put_request_sessions(struct ceph_mds_request *req)
{
- int i;
- for (i = 0; i < req->r_num_mds; i++)
- put_session(req->r_mds[i]);
- req->r_num_mds = 0;
+ if (req->r_session) {
+ put_session(req->r_session);
+ req->r_session = 0;
+ }
+ if (req->r_fwd_session) {
+ put_session(req->r_fwd_session);
+ req->r_fwd_session = 0;
+ }
}
void ceph_mdsc_put_request(struct ceph_mds_request *req)
ceph_msg_put(req->r_reply);
destroy_reply_info(&req->r_reply_info);
}
- if (req->r_session)
- put_session(req->r_session);
if (req->r_last_inode)
iput(req->r_last_inode);
if (req->r_last_dentry)
dput(req->r_last_dentry);
- drop_request_session_attempt_refs(req);
+ put_request_sessions(req);
kfree(req);
}
}
req->r_fmode = 0;
req->r_cap = 0;
req->r_session = 0;
- req->r_num_mds = 0;
+ req->r_fwd_session = 0;
req->r_attempts = 0;
req->r_num_fwd = 0;
req->r_resend_mds = -1;
}
}
+/*
+ * kick outstanding requests
+ */
+void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+ struct ceph_mds_request *reqs[10];
+ u64 nexttid = 0;
+ int i, got;
+
+ dout(20, "kick_requests mds%d\n", mds);
+ while (nexttid < mdsc->last_tid) {
+ got = radix_tree_gang_lookup(&mdsc->request_tree,
+ (void **)&reqs, nexttid, 10);
+ if (got == 0)
+ break;
+ nexttid = reqs[got-1]->r_tid + 1;
+ for (i = 0; i < got; i++) {
+ if ((reqs[i]->r_session &&
+ reqs[i]->r_session->s_mds == mds) ||
+ (all && reqs[i]->r_fwd_session &&
+ reqs[i]->r_fwd_session->s_mds == mds)) {
+ dout(10, " kicking req %llu\n", reqs[i]->r_tid);
+ put_request_sessions(reqs[i]);
+ complete(&reqs[i]->r_completion);
+ }
+ }
+ }
+}
+
void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
struct ceph_msg *msg)
{
break;
case CEPH_SESSION_CLOSE:
- if (session->s_cap_seq == seq) {
- dout(2, "session close from mds%d\n", mds);
- complete(&session->s_completion); /* for good measure */
- __unregister_session(mdsc, mds);
- } else {
- dout(2, "ignoring session close from mds%d, "
- "seq %llu < my seq %llu\n",
- le32_to_cpu(msg->hdr.src.name.num),
- seq, session->s_cap_seq);
- }
+ dout(2, "session close from mds%d\n", mds);
+ complete(&session->s_completion); /* for good measure */
+ __unregister_session(mdsc, mds);
remove_session_caps(session);
remove_session_leases(session);
complete(&mdsc->session_close_waiters);
+ kick_requests(mdsc, mds, 0); /* cur only */
break;
case CEPH_SESSION_RENEWCAPS:
}
/* make request? */
- if (req->r_num_mds == 0) {
+ if (req->r_from_time == 0) {
req->r_from_time = jiffies;
dout(30, "do_request from_time %lu\n", req->r_from_time);
}
- BUG_ON(req->r_num_mds >= 2);
- req->r_mds[req->r_num_mds++] = session;
+ BUG_ON(req->r_session);
+ req->r_session = session;
req->r_resend_mds = -1; /* forget any specific mds hint */
req->r_attempts++;
rhead = req->r_request->front.iov_base;
/* send and wait */
spin_unlock(&mdsc->lock);
dout(10, "do_request %p r_expects_cap=%d\n", req, req->r_expects_cap);
+ req->r_request = ceph_msg_maybe_dup(req->r_request);
ceph_msg_get(req->r_request);
send_msg_mds(mdsc, req->r_request, mds);
wait_for_completion(&req->r_completion);
ceph_msg_put(req->r_request);
req->r_request = 0;
- drop_request_session_attempt_refs(req);
err = le32_to_cpu(req->r_reply_info.head->result);
dout(30, "do_request done on %p result %d\n", req, err);
}
dout(10, "handle_reply %p r_expects_cap=%d\n", req, req->r_expects_cap);
mds = le32_to_cpu(msg->hdr.src.name.num);
- req->r_session = __get_session(mdsc, mds);
- BUG_ON(req->r_session == 0);
+ if (req->r_session && req->r_session->s_mds != mds) {
+ put_session(req->r_session);
+ req->r_session = __get_session(mdsc, mds);
+ }
+ if (req->r_session == 0) {
+ derr(1, "got reply on %llu, but no session for mds%d\n",
+ tid, mds);
+ spin_unlock(&mdsc->lock);
+ return;
+ }
BUG_ON(req->r_reply);
spin_unlock(&mdsc->lock);
int err = -EINVAL;
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
- int frommds = le32_to_cpu(msg->hdr.src.name.num);
+ int from_mds = le32_to_cpu(msg->hdr.src.name.num);
/* decode */
ceph_decode_need(&p, end, sizeof(__u64)+2*sizeof(__u32), bad);
if (fwd_seq > req->r_num_fwd) {
req->r_num_fwd = fwd_seq;
req->r_resend_mds = next_mds;
- drop_request_session_attempt_refs(req);
- req->r_num_mds = 1;
- req->r_mds[0] = __get_session(mdsc, frommds);
+ put_request_sessions(req);
+ req->r_session = __get_session(mdsc, next_mds);
+ req->r_fwd_session = __get_session(mdsc, from_mds);
}
spin_unlock(&mdsc->lock);
} else {
/* no, resend. */
/* forward race not possible; mds would drop */
BUG_ON(fwd_seq <= req->r_num_fwd);
- drop_request_session_attempt_refs(req);
+ put_request_sessions(req);
req->r_resend_mds = next_mds;
spin_unlock(&mdsc->lock);
complete(&req->r_completion);
-/*
- * kick outstanding requests
- */
-void kick_requests(struct ceph_mds_client *mdsc, int mds)
-{
- struct ceph_mds_request *reqs[10];
- u64 nexttid = 0;
- int i, got;
-
- dout(20, "kick_requests mds%d\n", mds);
- while (nexttid < mdsc->last_tid) {
- got = radix_tree_gang_lookup(&mdsc->request_tree,
- (void **)&reqs, nexttid, 10);
- if (got == 0)
- break;
- nexttid = reqs[got-1]->r_tid + 1;
- for (i = 0; i < got; i++) {
- if ((reqs[i]->r_num_mds >= 1 &&
- reqs[i]->r_mds[0]->s_mds == mds) ||
- (reqs[i]->r_num_mds >= 2 &&
- reqs[i]->r_mds[1]->s_mds == mds)) {
- dout(10, " kicking req %llu\n", reqs[i]->r_tid);
- /* FIXME */
- complete(&reqs[i]->r_completion);
- }
- }
- }
-}
-
/*
* send an reconnect to a recovering mds
*/
list_for_each(cp, &session->s_caps) {
cap = list_entry(cp, struct ceph_inode_cap, session_caps);
ci = cap->ci;
+ dentry = d_find_alias(&ci->vfs_inode);
+ if (dentry == NULL)
+ continue;
ceph_decode_need(&p, end, sizeof(u64) +
sizeof(struct ceph_mds_cap_reconnect),
needmore);
ceph_encode_timespec(&rec->mtime, &ci->vfs_inode.i_mtime);
ceph_encode_timespec(&rec->atime, &ci->vfs_inode.i_atime);
spin_unlock(&ci->vfs_inode.i_lock);
- dentry = d_find_alias(&ci->vfs_inode);
- if (dentry == NULL)
- continue;
path = ceph_build_dentry_path(dentry, &pathlen);
if (IS_ERR(path)) {
err = PTR_ERR(path);
__unregister_session(mdsc, i);
break;
case CEPH_MDS_SESSION_OPEN:
- kick_requests(mdsc, i);
+ kick_requests(mdsc, i, 1); /* cur or forwarder */
break;
}
}