}
}
+static struct ceph_mds_request *find_request_and_lock(struct ceph_mds_client *mdsc, __u64 tid)
+{
+ struct ceph_mds_request *req;
+ spin_lock(&mdsc->lock);
+ req = radix_tree_lookup(&mdsc->request_tree, tid);
+ if (!req) {
+ spin_unlock(&mdsc->lock);
+ return NULL;
+ }
+ get_request(req);
+ return req;
+}
+
static struct ceph_mds_request *new_request(struct ceph_msg *msg, int mds)
{
struct ceph_mds_request *req;
return ceph_mdsmap_get_random_mds(mdsc->mdsmap);
}
-static void register_session(struct ceph_mds_client *mdsc, int mds)
+/*
+ * create+register a new session for given mds.
+ * drop locks for kmalloc, check for races.
+ */
+static struct ceph_mds_session *__register_session(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_session *s;
- /* register */
- dout(10, "register_session mds%d\n", mds);
- if (mds >= mdsc->max_sessions) {
- struct ceph_mds_session **sa;
- /* realloc */
- dout(50, "register_session realloc to %d\n", mds+1);
- sa = kzalloc((mds+1) * sizeof(struct ceph_mds_session), GFP_KERNEL);
- BUG_ON(sa == NULL); /* i am lazy */
- if (mdsc->sessions) {
- memcpy(sa, mdsc->sessions,
- mdsc->max_sessions*sizeof(struct ceph_mds_session));
- kfree(mdsc->sessions);
- }
- mdsc->sessions = sa;
- mdsc->max_sessions = mds+1;
- }
+ spin_unlock(&mdsc->lock);
+
s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
s->s_mds = mds;
s->s_state = CEPH_MDS_SESSION_NEW;
s->s_nr_caps = 0;
atomic_set(&s->s_ref, 1);
init_completion(&s->s_completion);
- mdsc->sessions[mds] = s;
+
+ spin_lock(&mdsc->lock);
+
+ /* register */
+ dout(10, "register_session mds%d\n", mds);
+ if (mds >= mdsc->max_sessions) {
+ int newmax = 1 << get_count_order(mds+1);
+ struct ceph_mds_session **sa;
+
+ dout(50, "register_session realloc to %d\n", newmax);
+ spin_unlock(&mdsc->lock);
+ sa = kzalloc(newmax * sizeof(void*), GFP_KERNEL);
+ spin_lock(&mdsc->lock);
+ if (sa == NULL)
+ return ERR_PTR(-ENOMEM);
+ if (mdsc->max_sessions < newmax) {
+ if (mdsc->sessions) {
+ memcpy(sa, mdsc->sessions,
+ mdsc->max_sessions*sizeof(struct ceph_mds_session));
+ kfree(mdsc->sessions);
+ }
+ mdsc->sessions = sa;
+ mdsc->max_sessions = newmax;
+ } else {
+ kfree(sa); /* lost race */
+ }
+ }
+ if (mdsc->sessions[mds]) {
+ ceph_mdsc_put_session(s); /* lost race */
+ return mdsc->sessions[mds];
+ } else {
+ mdsc->sessions[mds] = s;
+ return s;
+ }
}
-static struct ceph_mds_session *get_session(struct ceph_mds_client *mdsc, int mds)
+static struct ceph_mds_session *__get_session(struct ceph_mds_client *mdsc, int mds)
{
struct ceph_mds_session *session;
- dout(10, "get_session %d max %d\n", mds, mdsc->max_sessions);
+ dout(10, "__get_session %d max %d\n", mds, mdsc->max_sessions);
if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == 0)
- register_session(mdsc, mds);
+ return NULL;
session = mdsc->sessions[mds];
atomic_inc(&session->s_ref);
}
}
+ session->s_state = CEPH_MDS_SESSION_OPENING;
+ spin_unlock(&mdsc->lock);
+
/* send connect message */
msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_cap_seq);
if (IS_ERR(msg))
return PTR_ERR(msg); /* fixme */
- session->s_state = CEPH_MDS_SESSION_OPENING;
send_msg_mds(mdsc, msg, mds);
/* wait for session to open (or fail, or close) */
- spin_unlock(&mdsc->lock);
dout(30, "open_session waiting on session %p\n", session);
wait_for_completion(&session->s_completion);
dout(30, "open_session done waiting on session %p, state %d\n",
session, session->s_state);
+
spin_lock(&mdsc->lock);
return 0;
}
}
spin_unlock(&session->s_cap_lock);
+ /*
+ * fixme: drop mdsc->lock?
+ */
+
/* send request_resume message */
msg = create_session_msg(CEPH_SESSION_REQUEST_RESUME, session->s_cap_seq);
if (IS_ERR(msg))
seq = le64_to_cpu(h->seq);
/* handle */
- session = get_session(mdsc, mds);
+ session = __get_session(mdsc, mds);
dout(1, "handle_session op %d seq %llu\n", op, seq);
spin_lock(&mdsc->lock);
switch (op) {
dout(30, "do_request chose mds%d\n", mds);
/* get session */
- session = get_session(mdsc, mds);
+ session = __get_session(mdsc, mds);
+ if (!session)
+ session = __register_session(mdsc, mds);
dout(30, "do_request session %p state %d\n", session, session->s_state);
/* open? */
rhead = req->r_request->front.iov_base;
rhead->retry_attempt = cpu_to_le32(req->r_attempts-1);
rhead->oldest_client_tid = cpu_to_le64(get_oldest_tid(mdsc));
- send_msg_mds(mdsc, req->r_request, mds);
- /* wait */
+ /* send and wait */
spin_unlock(&mdsc->lock);
+ send_msg_mds(mdsc, req->r_request, mds);
wait_for_completion(&req->r_completion);
spin_lock(&mdsc->lock);
__unregister_request(mdsc, req);
spin_unlock(&mdsc->lock);
put_request(req);
+
if ((err = ceph_mdsc_parse_reply_info(reply, rinfo)) < 0) {
ceph_mdsc_put_session(session);
return err;
}
-
dout(30, "do_request done on %p result %d tracelen %d\n", msg,
rinfo->head->result, rinfo->trace_nr);
+
if (psession)
*psession = session;
else
tid = le64_to_cpu(head->tid);
/* pass to blocked caller */
- spin_lock(&mdsc->lock);
- req = radix_tree_lookup(&mdsc->request_tree, tid);
+ req = find_request_and_lock(mdsc, tid);
if (!req) {
- spin_unlock(&mdsc->lock);
dout(1, "got reply on unknown tid %llu\n", tid);
- goto done;
+ } else {
+ BUG_ON(req->r_reply);
+ req->r_reply = msg;
+ spin_unlock(&mdsc->lock);
+
+ ceph_msg_get(msg);
+ complete(&req->r_completion);
+ put_request(req);
}
- get_request(req);
- spin_unlock(&mdsc->lock);
-
- /* FIXME: locking on request? */
- BUG_ON(req->r_reply);
- req->r_reply = msg;
- ceph_msg_get(msg);
- complete(&req->r_completion);
- put_request(req);
-
done:
return;
}
goto bad;
/* handle */
- spin_lock(&mdsc->lock);
- req = radix_tree_lookup(&mdsc->request_tree, tid);
- if (req) get_request(req);
- if (!req) {
- spin_unlock(&mdsc->lock);
+ req = find_request_and_lock(mdsc, tid);
+ if (!req)
return; /* dup reply? */
- }
/* do we have a session with the dest mds? */
if (next_mds < mdsc->max_sessions &&
/*
* send an MClientReconnect to a recovering mds
+ *
+ * FIXME: needs to drop mdsc->lock ...
*/
void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
{
dout(10, "send_mds_reconnect mds%d\n", mds);
/* find session */
- session = get_session(mdsc, mds);
+ session = __get_session(mdsc, mds);
if (!session) {
dout(20, "no session for mds%d, sending short reconnect\n", mds);
reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, 0);
if (p + pathlen + 4 + sizeof(struct ceph_mds_cap_reconnect) > end) {
/* der, realloc front */
/*
- * this is broken (kmalloc under spinlock). should probably drop
+ * fixme: this is broken (kmalloc under spinlock). should probably drop
* lock, and retry with higher initial estimate?
*/
int off = end-p;
size = le64_to_cpu(h->seq);
/* find session */
- session = get_session(&client->mdsc, mds);
+ session = __get_session(&client->mdsc, mds);
if (!session) {
dout(10, "WTF, got filecap msg but no session for mds%d\n", mds);
return;
for (i=0; i<ci->i_nr_caps; i++) {
cap = &ci->i_caps[i];
- session = get_session(mdsc, cap->mds);
+ session = __get_session(mdsc, cap->mds);
BUG_ON(!session);
cap->caps &= wanted; /* drop caps we don't want */