struct ceph_mds_request *req;
req = kmalloc(sizeof(*req), GFP_KERNEL);
-
+ req->r_tid = ++mdsc->last_tid;
req->r_request = msg;
- ceph_msg_get(msg); /* grab reference */
req->r_reply = 0;
req->r_num_mds = 0;
req->r_attempts = 0;
req->r_num_fwd = 0;
req->r_resend_mds = mds;
atomic_set(&req->r_ref, 2); /* one for request_tree, one for caller */
+ init_completion(&req->r_completion);
- req->r_tid = ++mdsc->last_tid;
radix_tree_insert(&mdsc->request_tree, req->r_tid, (void*)req);
-
+ ceph_msg_get(msg); /* grab reference */
return req;
}
if (mds >= mdsc->max_sessions) {
struct ceph_mds_session **sa;
/* realloc */
- dout(50, "mdsc register_session realloc to %d\n", mds);
- sa = kzalloc(mds * sizeof(struct ceph_mds_session), GFP_KERNEL);
+ dout(50, "mdsc register_session realloc to %d\n", mds+1);
+ sa = kzalloc((mds+1) * sizeof(struct ceph_mds_session), GFP_KERNEL);
BUG_ON(sa == NULL); /* i am lazy */
if (mdsc->sessions) {
memcpy(sa, mdsc->sessions,
kfree(mdsc->sessions);
}
mdsc->sessions = sa;
+ mdsc->max_sessions = mds+1;
}
s = kmalloc(sizeof(struct ceph_mds_session), GFP_KERNEL);
s->s_state = 0;
s->s_cap_seq = 0;
- init_completion(&s->s_completion);
atomic_set(&s->s_ref, 1);
+ init_completion(&s->s_completion);
mdsc->sessions[mds] = s;
}
/*
 * Block the caller until a newer mdsmap has been received.
 * NOTE(review): this span is a diff hunk; '+' lines are patch additions
 * (entry/exit debug tracing). Caller locking requirements not visible
 * from here — presumably called without mdsc->lock held, since it
 * sleeps in wait_for_completion; verify against callers.
 */
static void wait_for_new_map(struct ceph_mds_client *mdsc)
{
+ dout(30, "wait_for_new_map enter\n");
/* only ask the monitor for a map if we haven't already requested one
 * at least as new as our current epoch */
if (mdsc->last_requested_map < mdsc->mdsmap->m_epoch)
ceph_monc_request_mdsmap(&mdsc->client->monc, mdsc->mdsmap->m_epoch);
wait_for_completion(&mdsc->map_waiters);
+ dout(30, "wait_for_new_map exit\n");
}
/* exported functions */
struct ceph_msg *req;
struct ceph_client_request_head *head;
void *p, *end;
+ int pathlen = 2*(sizeof(ino1) + sizeof(__u32));
+ if (path1) pathlen += strlen(path1);
+ if (path2) pathlen += strlen(path2);
req = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST,
- sizeof(struct ceph_client_request_head) +
- sizeof(ino1)*2 + sizeof(__u32)*2 + strlen(path1) + strlen(path2),
+ sizeof(struct ceph_client_request_head) + pathlen,
0, 0);
if (IS_ERR(req))
return req;
}
/* wait */
+ dout(30, "mdsc_do_request 1\n");
spin_unlock(&mdsc->lock);
+ dout(30, "mdsc_do_request 2\n");
wait_for_completion(&req->r_completion);
+ dout(30, "mdsc_do_request 3\n");
if (!req->r_reply) {
+ dout(30, "mdsc_do_request 4\n");
spin_lock(&mdsc->lock);
+ dout(30, "mdsc_do_request 5\n");
goto retry;
}
reply = req->r_reply;
+ dout(30, "mdsc_do_request 6\n");
spin_lock(&mdsc->lock);
+ dout(30, "mdsc_do_request 7\n");
unregister_request(mdsc, req);
spin_unlock(&mdsc->lock);
struct ceph_client_reply_head *head;
int ret;
+ dout(30, "mdsc do 1\n");
req = ceph_mdsc_create_request_msg(mdsc, op, ino1, path1, ino2, path2);
if (IS_ERR(req))
return PTR_ERR(req);
+ dout(30, "mdsc do 2\n");
reply = ceph_mdsc_do_request(mdsc, req, -1);
if (IS_ERR(reply))
return PTR_ERR(reply);
-
+ dout(30, "mdsc do 3\n");
head = reply->front.iov_base;
ret = head->result;
-
+ dout(30, "mdsc do 4\n");
ceph_msg_put(reply);
return ret;
}
return NULL;
con->msgr = msgr;
+ set_bit(NEW, &con->state);
+ atomic_set(&con->nref, 1);
INIT_LIST_HEAD(&con->list_all);
INIT_LIST_HEAD(&con->list_bucket);
+
+ spin_lock_init(&con->out_queue_lock);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
- spin_lock_init(&con->lock);
- set_bit(NEW, &con->state);
- INIT_WORK(&con->rwork, try_read); /* setup work structure */
- INIT_WORK(&con->swork, try_write); /* setup work structure */
+ INIT_WORK(&con->rwork, try_read);
+ INIT_WORK(&con->swork, try_write);
- atomic_inc(&con->nref);
return con;
}
struct ceph_messenger *msgr;
int ret = 1;
- dout(30, "try_write start\n");
con = container_of(work, struct ceph_connection, swork);
msgr = con->msgr;
+ dout(30, "try_write start %p state %d\n", con, con->state);
+
+retry:
+ if (test_and_set_bit(WRITING, &con->state) != 0) {
+ dout(30, "try_write connection already writing\n");
+ return;
+ }
+ clear_bit(WRITEABLE, &con->state);
more:
dout(30, "try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
+ /* initiate connect? */
+ if (test_and_clear_bit(NEW, &con->state)) {
+ prepare_write_connect(msgr, con);
+ set_bit(CONNECTING, &con->state);
+ dout(5, "try_write initiating connect on %p new state %u\n", con, con->state);
+ ret = ceph_tcp_connect(con);
+ dout(5, "try_write initiated connect\n");
+ if (ret < 0) {
+ /* fault */
+ derr(1, "connect error, FIXME\n");
+ goto done;
+ }
+ }
+ /*if (test_bit(CONNECTING, &con->state)) {
+ dout(30, "try_write still connecting, doing nothing for now\n");
+ goto done;
+ }
+ */
+
/* kvec data queued? */
if (con->out_kvec_left) {
ret = write_partial_kvec(con);
+ if (ret == 0)
+ goto done;
if (test_and_clear_bit(REJECTING, &con->state)) {
dout(30, "try_write done rejecting, state %u, closing\n", con->state);
/* FIXME do something else here, pbly? */
set_bit(CLOSED, &con->state);
put_connection(con);
}
- if (ret <= 0) {
+ if (ret < 0) {
/* TBD: handle error; return for now */
con->error = ret;
goto done; /* error */
}
/* anything else pending? */
+ spin_lock(&con->out_queue_lock);
if (con->in_seq > con->in_seq_acked) {
prepare_write_ack(con);
- goto more;
- }
- if (!list_empty(&con->out_queue)) {
+ } else if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
- goto more;
+ } else {
+ /* hmm, nothing to do! No more writes pending? */
+ dout(30, "try_write nothing else to write.\n");
+ clear_bit(WRITING, &con->state); /* clear this first */
+ clear_bit(WRITE_PENDING, &con->state); /* and this second, to avoid a race. */
+ spin_unlock(&con->out_queue_lock);
+ return;
}
-
- /* hmm, nothing to do! No more writes pending? */
- dout(30, "try_write nothing else to write\n");
- clear_bit(WRITE_PENDING, &con->state);
+ spin_unlock(&con->out_queue_lock);
+ goto more;
done:
dout(30, "try_write done\n");
+ clear_bit(WRITING, &con->state);
+
+ /*
+ * See if we became WRITEABLE again to avoid race against socket.
+ * Otherwise, this would be bad:
+ * A B
+ * - enter try_write, do some work
+ * - socket fills, we get -EAGAIN or whatever
+ * - socket becomes writeable again, work is queued
+ * - new try_write sees WRITING bit, exits
+ * - original try_write clears WRITING bit
+ */
+ if (test_bit(WRITEABLE, &con->state)) {
+ dout(30, "try_write writeable flag got set again, looping just in case\n");
+ goto retry;
+ }
return;
}
spin_lock(&con->msgr->con_lock);
existing = get_connection(con->msgr, &con->peer_addr);
if (existing) {
- spin_lock(&existing->lock);
+ //spin_lock(&existing->lock);
/* replace existing connection? */
if ((test_bit(CONNECTING, &existing->state) &&
compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
set_bit(REJECTING, &con->state);
con->connect_seq = existing->connect_seq; /* send this with the reject */
}
- spin_unlock(&existing->lock);
+ //spin_unlock(&existing->lock);
put_connection(existing);
} else {
add_connection(con->msgr, con);
struct ceph_connection *con;
struct ceph_messenger *msgr;
+ dout(20, "Entering try_read\n");
con = container_of(work, struct ceph_connection, rwork);
- spin_lock(&con->lock);
msgr = con->msgr;
- dout(20, "Entered try_read\n");
+
+retry:
+ if (test_and_set_bit(READING, &con->state)) {
+ dout(20, "try_read already reading\n");
+ return;
+ }
+ clear_bit(READABLE, &con->state);
more:
/*
dout(20, "try_read accepting\n");
ret = read_accept_partial(con);
if (ret <= 0) goto done;
- /* accepted */
- process_accept(con);
+ process_accept(con); /* accepted */
goto more;
}
if (test_bit(CONNECTING, &con->state)) {
bad:
BUG_ON(1); /* shouldn't get here */
done:
- con->error = ret;
- spin_unlock(&con->lock);
+ clear_bit(READING, &con->state);
+ if (test_bit(READABLE, &con->state)) {
+ dout(30, "try_read readable flag set again, looping\n");
+ goto retry;
+ }
dout(20, "Exited try_read\n");
return;
}
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
}
- spin_lock(&con->lock);
-
- /* initiate connect? */
- dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state);
- if (test_bit(NEW, &con->state)) {
- prepare_write_connect(msgr, con);
- spin_unlock(&con->lock); /* hrm */
- dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state);
- ret = ceph_tcp_connect(con);
- dout(5, "ceph_msg_send done initiating connect on %p new state %u\n", con, con->state);
- spin_lock(&con->lock);
- if (ret < 0) {
- derr(1, "connection failure to peer %x:%d\n",
- ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
- ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
- remove_connection(msgr, con);
- goto out;
- }
- set_bit(CONNECTING, &con->state);
- }
-
/* queue */
+ spin_lock(&con->out_queue_lock);
msg->hdr.seq = ++con->out_seq;
dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
ceph_msg_get(msg);
list_add_tail(&msg->list_head, &con->out_queue);
-
- if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+ spin_unlock(&con->out_queue_lock);
+
+ if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) {
+ dout(30, "ceph_msg_send queuing new swork on %p\n", con);
queue_work(send_wq, &con->swork);
+ dout(30, "ceph_msg_send queued\n");
+ }
-out:
- spin_unlock(&con->lock);
put_connection(con);
+ dout(30, "ceph_msg_send done\n");
return ret;
}
};
/*
 * NOTE(review): diff hunk — '-' lines are the old state constants,
 * '+' lines the replacement. The new values are 0-based *bit numbers*
 * (used with test_bit/set_bit/test_and_set_bit on con->state elsewhere
 * in this patch), so several flags can be set simultaneously; the old
 * 1-based numbering wasted bit 0.
 */
-/* current state of connection */
-#define NEW 1
-#define CONNECTING 2
-#define ACCEPTING 3
-#define OPEN 4
-#define WRITE_PENDING 5
-#define REJECTING 6
-#define CLOSED 7
+/* ceph_connection state bit flags */
+#define NEW 0
+#define CONNECTING 1
+#define ACCEPTING 2
+#define OPEN 3
+#define WRITE_PENDING 4 /* we have data to send */
+#define WRITEABLE 5 /* set when socket becomes writeable */
+#define WRITING 6 /* provides mutual exclusion, protecting out_kvec, etc. */
+#define READABLE 7 /* set when socket gets new data */
+#define READING 8 /* provides mutual exclusion, protecting in_* */
+#define REJECTING 9
+#define CLOSED 10
struct ceph_connection {
struct ceph_messenger *msgr;
__u32 state; /* connection state */
atomic_t nref;
- spinlock_t lock; /* connection lock */
struct list_head list_all; /* msgr->con_all */
struct list_head list_bucket; /* msgr->con_open or con_accepting */
__u32 peer_connect_seq;
/* out queue */
+ spinlock_t out_queue_lock; /* protects out_queue, out_sent, out_seq */
struct list_head out_queue;
+ struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */
+
struct ceph_msg_header out_hdr;
struct kvec out_kvec[4],
*out_kvec_cur;
struct ceph_msg *out_msg;
struct ceph_msg_pos out_msg_pos;
- struct list_head out_sent; /* sending/sent but unacked; resend if connection drops */
/* partially read message contents */
char in_tag; /* READY (accepting, or no in-progress read) or ACK or MSG */
static __inline__ int ceph_encode_filepath(void **p, void *end, ceph_ino_t ino, const char *path)
{
- __u32 len = strlen(path);
+ __u32 len = path ? strlen(path):0;
BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end);
ceph_encode_64(p, end, ino);
ceph_encode_32(p, end, len);
- memcpy(*p, path, len);
+ if (len) memcpy(*p, path, len);
*p += len;
return 0;
}