struct ceph_connection *con;
struct ceph_messenger *msgr;
- printk(KERN_INFO "Entered ceph_data_ready \n");
-
if (sk->sk_state == TCP_LISTEN) {
msgr = (struct ceph_messenger *)sk->sk_user_data;
+ dout(30, "ceph_data_ready listener %p\n", msgr);
queue_work(recv_wq, &msgr->awork);
} else {
con = (struct ceph_connection *)sk->sk_user_data;
+ dout(30, "ceph_data_ready connection %p state = %u, queuing rwork\n",
+ con, con->state);
queue_work(recv_wq, &con->rwork);
}
}
{
struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
- printk(KERN_INFO "Entered ceph_write_space state = %u\n",con->state);
- if (test_bit(WRITE_PEND, &con->state)) {
- printk(KERN_INFO "WRITE_PEND set in connection\n");
+ dout(30, "ceph_write_space %p state = %u\n", con, con->state);
+ if (test_bit(WRITE_PENDING, &con->state)) {
+ dout(30, "ceph_write_space %p queueing write work\n", con);
queue_work(send_wq, &con->swork);
}
}
static void ceph_state_change(struct sock *sk)
{
struct ceph_connection *con = (struct ceph_connection *)sk->sk_user_data;
- printk(KERN_INFO "Entered ceph_state_change state = %u\n", con->state);
+ dout(30, "ceph_state_change %p state = %u\n", con, con->state);
if (sk->sk_state == TCP_ESTABLISHED) {
- if (test_and_clear_bit(CONNECTING, &con->state) ||
- test_bit(ACCEPTING, &con->state))
- set_bit(OPEN, &con->state);
- ceph_write_space(sk);
+ /*if (test_bit(CONNECTING, &con->state) ||
+ test_bit(ACCEPTING, &con->state)) {*/
+ dout(30, "ceph_state_change %p socket established, queuing swork\n", con);
+ queue_work(send_wq, &con->swork);
+ /*}*/
}
}
int ret;
struct sockaddr *paddr = (struct sockaddr *)&con->peer_addr.ipaddr;
- set_bit(CONNECTING, &con->state);
-
ret = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &con->sock);
if (ret < 0) {
derr(1, "ceph_tcp_connect sock_create_kern error: %d\n", ret);
goto done;
}
- /* setup callbacks */
set_sock_callbacks(con->sock, (void *)con);
-
- ret = con->sock->ops->connect(con->sock, paddr,
+
+ ret = con->sock->ops->connect(con->sock, paddr,
sizeof(struct sockaddr_in), O_NONBLOCK);
- if (ret == -EINPROGRESS) return 0;
+ if (ret == -EINPROGRESS)
+ return 0;
if (ret < 0) {
/* TBD check for fatal errors, retry if not fatal.. */
derr(1, "ceph_tcp_connect kernel_connect error: %d\n", ret);
derr(0, "failed to getsockname: %d\n", ret);
goto err;
}
- dout(0, "ceph_tcp_listen on %x:%d\n",
- ntohl(myaddr->sin_addr.s_addr),
- ntohs(myaddr->sin_port));
+ dout(0, "ceph_tcp_listen on port %d\n", ntohs(myaddr->sin_port));
ret = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
(char *)&optval, sizeof(optval));
goto err;
}
- set_bit(ACCEPTING, &con->state);
done:
return ret;
err:
struct msghdr msg = {.msg_flags = 0};
int rlen = 0; /* length read */
- printk(KERN_INFO "entered krevmsg\n");
+ dout(30, "ceph_tcp_recvmsg %p len %d\n", sock, (int)len);
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-
/* receive one kvec for now... */
rlen = kernel_recvmsg(sock, &msg, &iov, 1, len, msg.msg_flags);
- if (rlen < 0) {
- printk(KERN_INFO "kernel_recvmsg error: %d\n", rlen);
- }
- /* TBD: kernel_recvmsg doesn't fill in the name and namelen
- */
+ dout(30, "ceph_tcp_recvmsg %p len %d ret = %d\n", sock, (int)len, rlen);
return(rlen);
-
}
/*
struct msghdr msg = {.msg_flags = 0};
int rlen = 0;
- printk(KERN_INFO "entered ksendmsg\n");
+ dout(30, "ceph_tcp_sendmsg %p len %d\n", sock, (int)len);
msg.msg_flags |= MSG_DONTWAIT | MSG_NOSIGNAL;
-
rlen = kernel_sendmsg(sock, &msg, iov, kvlen, len);
- if (rlen < 0) {
- printk(KERN_INFO "kernel_sendmsg error: %d\n", rlen);
- }
+ dout(30, "ceph_tcp_sendmsg %p len %d ret = %d\n", sock, (int)len, rlen);
return(rlen);
}
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
- spin_lock_init(&con->con_lock);
+ spin_lock_init(&con->lock);
set_bit(NEW, &con->state);
INIT_WORK(&con->rwork, try_read); /* setup work structure */
INIT_WORK(&con->swork, try_write); /* setup work structure */
dout(20, "__remove_connection %p from %p\n", con, msgr);
list_del(&con->list_all);
- if (test_bit(CONNECTING, &con->state) ||
+ if (test_bit(CONNECTING, &con->state) ||
test_bit(OPEN, &con->state)) {
/* remove from con_open too */
key = hash_addr(&con->peer_addr);
{
int ret;
+ dout(30, "write_partial_kvec %p left %d vec %d bytes\n", con,
+ con->out_kvec_left, con->out_kvec_bytes);
while (con->out_kvec_bytes > 0) {
ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, con->out_kvec_left, con->out_kvec_bytes);
- if (ret < 0) return ret; /* error */
- if (ret == 0) return 0; /* socket full */
+ if (ret <= 0) goto out;
con->out_kvec_bytes -= ret;
if (con->out_kvec_bytes == 0)
break; /* done */
}
}
con->out_kvec_left = 0;
- return 1; /* done! */
+ ret = 1;
+out:
+ dout(30, "write_partial_kvec %p left %d vec %d bytes ret = %d\n", con,
+ con->out_kvec_left, con->out_kvec_bytes, ret);
+ return ret; /* done! */
}
static int write_partial_msg_pages(struct ceph_connection *con, struct ceph_msg *msg)
/* move to sending/sent list */
list_del(&m->list_head);
- list_add(&m->list_head, &con->out_sent);
+ list_add_tail(&m->list_head, &con->out_sent);
con->out_msg = m;
/* encode header */
ceph_encode_header(&con->out_hdr, &m->hdr);
+ dout(20, "prepare_write_message %p seq %d type %d len %d+%d\n",
+ m, m->hdr.seq, m->hdr.type, m->hdr.front_len, m->hdr.data_len);
+
/* tag + hdr + front */
con->out_kvec[0].iov_base = &tag_msg;
con->out_kvec[0].iov_len = 1;
con->out_msg_pos.page = 0;
con->out_msg_pos.page_pos = m->hdr.data_off & PAGE_MASK;
con->out_msg_pos.data_pos = 0;
+
+ set_bit(WRITE_PENDING, &con->state);
}
/*
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + sizeof(con->in_seq_acked);
con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
+}
+
+static void prepare_write_connect(struct ceph_messenger *msgr, struct ceph_connection *con)
+{
+ con->out_kvec[0].iov_base = &msgr->inst.addr;
+ con->out_kvec[0].iov_len = sizeof(msgr->inst.addr);
+ con->out_kvec[1].iov_base = &con->connect_seq;
+ con->out_kvec[1].iov_len = sizeof(con->connect_seq);
+ con->out_kvec_left = 2;
+ con->out_kvec_bytes = sizeof(msgr->inst.addr) + sizeof(con->connect_seq);
+ con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
}
static void prepare_write_accept_announce(struct ceph_messenger *msgr, struct ceph_connection *con)
con->out_kvec_left = 1;
con->out_kvec_bytes = sizeof(msgr->inst.addr);
con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
}
static void prepare_write_accept_ready(struct ceph_connection *con)
con->out_kvec_left = 1;
con->out_kvec_bytes = 1;
con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
}
+
static void prepare_write_accept_reject(struct ceph_connection *con)
{
con->out_kvec[0].iov_base = &tag_reject;
con->out_kvec_left = 2;
con->out_kvec_bytes = 1 + sizeof(con->connect_seq);
con->out_kvec_cur = con->out_kvec;
+ set_bit(WRITE_PENDING, &con->state);
}
/*
/* kvec data queued? */
if (con->out_kvec_left) {
ret = write_partial_kvec(con);
- if (ret == 0)
- goto done;
-
- if (test_bit(REJECTING, &con->state)) {
+ if (test_and_clear_bit(REJECTING, &con->state)) {
+ dout(30, "try_write done rejecting, state %u, closing\n", con->state);
/* FIXME do something else here, pbly? */
remove_connection(msgr, con);
set_bit(CLOSED, &con->state);
put_connection(con);
}
-
- /* TBD: handle error; return for now */
- if (ret < 0) {
+ if (ret <= 0) {
+ /* TBD: handle error; return for now */
con->error = ret;
goto done; /* error */
}
}
/* hmm, nothing to do! No more writes pending? */
- if (ret)
- clear_bit(WRITE_PEND, &con->state);
+ dout(30, "try_write nothing else to write\n");
+ clear_bit(WRITE_PENDING, &con->state);
+
done:
return;
}
}
+/*
+ * read portion of connect-side handshake on a new connection
+ */
+static int read_connect_partial(struct ceph_connection *con)
+{
+ int ret, to;
+ dout(20, "read_connect_partial %p start at %d\n", con, con->in_base_pos);
+
+ /* actual_peer_addr */
+ to = sizeof(con->actual_peer_addr);
+ while (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = con->in_base_pos;
+ ret = ceph_tcp_recvmsg(con->sock, (char*)&con->actual_peer_addr + have, left);
+ if (ret <= 0) goto out;
+ con->in_base_pos += ret;
+ }
+
+ /* in_tag */
+ to = sizeof(con->actual_peer_addr) + 1;
+ if (con->in_base_pos < to) {
+ ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
+ if (ret <= 0) goto out;
+ con->in_base_pos += ret;
+ }
+
+ /* peer_connect_seq */
+ to += sizeof(con->peer_connect_seq);
+ if (con->in_base_pos < to) {
+ int left = to - con->in_base_pos;
+ int have = sizeof(con->peer_connect_seq) - left;
+ ret = ceph_tcp_recvmsg(con->sock, (char*)&con->peer_connect_seq + have, left);
+ if (ret <= 0) goto out;
+ con->in_base_pos += ret;
+ }
+ ret = 1;
+out:
+ dout(20, "read_connect_partial %p end at %d ret %d\n", con, con->in_base_pos, ret);
+ return ret; /* done */
+}
+
+static void process_connect(struct ceph_connection *con)
+{
+ dout(20, "process_connect on %p tag %d\n", con, (int)con->in_tag);
+ clear_bit(CONNECTING, &con->state);
+ if (!ceph_entity_addr_is_local(con->peer_addr, con->actual_peer_addr)) {
+ derr(1, "process_connect wrong peer, want %x:%d/%d, got %x:%d/%d, wtf\n",
+ ntohl(con->peer_addr.ipaddr.sin_addr.s_addr),
+ ntohs(con->peer_addr.ipaddr.sin_port),
+ con->peer_addr.nonce,
+ ntohl(con->actual_peer_addr.ipaddr.sin_addr.s_addr),
+ ntohs(con->actual_peer_addr.ipaddr.sin_port),
+ con->actual_peer_addr.nonce);
+ con->in_tag = CEPH_MSGR_TAG_REJECT;
+ }
+ if (con->in_tag == CEPH_MSGR_TAG_REJECT) {
+ dout(10, "process_connect got REJECT peer seq %u\n", con->peer_connect_seq);
+ set_bit(CLOSED, &con->state);
+ }
+ if (con->in_tag == CEPH_MSGR_TAG_READY) {
+ dout(10, "process_connect got READY, now open\n");
+ set_bit(OPEN, &con->state);
+ }
+}
+
+
+
/*
- * read portion of handshake on a newly accepted connection
+ * read portion of accept-side handshake on a newly accepted connection
*/
static int read_accept_partial(struct ceph_connection *con)
{
spin_lock(&con->msgr->con_lock);
existing = get_connection(con->msgr, &con->peer_addr);
if (existing) {
- spin_lock(&existing->con_lock);
+ spin_lock(&existing->lock);
if ((test_bit(CONNECTING, &existing->state) &&
compare_addr(&con->msgr->inst.addr, &con->peer_addr)) ||
(test_bit(OPEN, &existing->state) &&
con->out_seq = existing->out_seq;
set_bit(OPEN, &con->state);
set_bit(CLOSED, &existing->state);
+ clear_bit(OPEN, &existing->state);
} else {
/* reject new connection */
set_bit(REJECTING, &con->state);
con->connect_seq = existing->connect_seq; /* send this with the reject */
}
- spin_unlock(&existing->con_lock);
+ spin_unlock(&existing->lock);
put_connection(existing);
} else {
add_connection(con->msgr, con);
- set_bit(OPEN, &con->state);
+ con->state = OPEN;
}
spin_unlock(&con->msgr->con_lock);
/* the result? */
+ clear_bit(ACCEPTING, &con->state);
if (test_bit(REJECTING, &con->state))
prepare_write_accept_reject(con);
else
struct ceph_messenger *msgr;
con = container_of(work, struct ceph_connection, rwork);
+ spin_lock(&con->lock);
msgr = con->msgr;
more:
if (test_bit(CLOSED, &con->state)) goto done;
if (test_bit(ACCEPTING, &con->state)) {
+ dout(20, "try_read accepting\n");
ret = read_accept_partial(con);
if (ret <= 0) goto done;
/* accepted */
process_accept(con);
goto more;
}
+ if (test_bit(CONNECTING, &con->state)) {
+ dout(20, "try_read connecting\n");
+ ret = read_connect_partial(con);
+ if (ret <= 0) goto done;
+ process_connect(con);
+ if (test_bit(CLOSED, &con->state)) goto done;
+ }
if (con->in_tag == CEPH_MSGR_TAG_READY) {
ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);
else if (con->in_tag == CEPH_MSGR_TAG_ACK)
prepare_read_ack(con);
else {
- printk(KERN_INFO "bad tag %d\n", (int)con->in_tag);
+ derr(2, "try_read got bad tag %d\n", (int)con->in_tag);
+ ret = -EINVAL;
goto bad;
}
goto more;
con->in_tag = CEPH_MSGR_TAG_READY;
goto more;
}
+ derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag);
bad:
BUG_ON(1); /* shouldn't get here */
done:
con->error = ret;
+ spin_unlock(&con->lock);
return;
}
derr(1, "malloc failure\n");
goto done;
}
-
- if(ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
+ if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
derr(1, "error accepting connection\n");
- kfree(new_con);
+ put_connection(new_con);
goto done;
}
dout(5, "accepted connection \n");
new_con->in_tag = CEPH_MSGR_TAG_READY;
-
+ set_bit(ACCEPTING, &new_con->state);
prepare_write_accept_announce(msgr, new_con);
-
add_connection_accepting(msgr, new_con);
- set_bit(WRITE_PEND, &new_con->state);
/*
* hand off to worker threads ,should be able to write, we want to
* try to write right away, we may have missed socket state change
/*
* create a new messenger instance, creates listening socket
*/
-struct ceph_messenger *ceph_messenger_create()
+struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr)
{
struct ceph_messenger *msgr;
int ret = 0;
kfree(msgr);
return ERR_PTR(ret);
}
-
+ if (myaddr)
+ msgr->inst.addr.ipaddr.sin_addr = myaddr->ipaddr.sin_addr;
dout(1, "ceph_messenger_create %p listening on %x:%d\n", msgr,
ntohl(msgr->inst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
}
- spin_lock(&con->con_lock);
+ spin_lock(&con->lock);
/* initiate connect? */
- if (test_bit(NEW, &con->state)) {
- dout(5, "ceph_msg_send initiating connect on %p\n", con);
+ dout(5, "ceph_msg_send connection %p state is %u\n", con, con->state);
+ if (test_and_clear_bit(NEW, &con->state)) {
+ set_bit(CONNECTING, &con->state);
+ dout(5, "ceph_msg_send initiating connect on %p new state %u\n", con, con->state);
ret = ceph_tcp_connect(con);
- if (ret < 0){
+ if (ret < 0) {
derr(1, "connection failure to peer %x:%d\n",
ntohl(msg->hdr.dst.addr.ipaddr.sin_addr.s_addr),
ntohs(msg->hdr.dst.addr.ipaddr.sin_port));
put_connection(con);
return(ret);
}
+ prepare_write_connect(msgr, con);
}
/* queue */
- dout(1, "ceph_msg_send queuing outgoing message for %s%d on %p\n",
+ msg->hdr.seq = ++con->out_seq;
+ dout(1, "ceph_msg_send queuing %p seq %u for %s%d on %p\n", msg, msg->hdr.seq,
ceph_name_type_str(msg->hdr.dst.name.type), msg->hdr.dst.name.num, con);
ceph_msg_get(msg);
+ list_add_tail(&msg->list_head, &con->out_queue);
- list_add(&msg->list_head, &con->out_queue);
- set_bit(WRITE_PEND, &con->state);
- spin_unlock(&con->con_lock);
+ spin_unlock(&con->lock);
put_connection(con);
return ret;
}
struct ceph_msg *m;
int i;
- m = kzalloc(sizeof(*m), GFP_KERNEL);
+ m = kmalloc(sizeof(*m), GFP_KERNEL);
if (m == NULL)
goto out;
atomic_set(&m->nref, 1);
if (m->pages[i] == NULL)
goto out2;
}
+ } else {
+ m->pages = 0;
}
+
+ INIT_LIST_HEAD(&m->list_head);
return m;
out2:
void ceph_msg_put(struct ceph_msg *m)
{
+ int i;
if (atomic_dec_and_test(&m->nref)) {
- int i;
+ dout(30, "ceph_msg_put last one on %p\n", m);
if (m->pages) {
for (i=0; i<m->nr_pages; i++)
if (m->pages[i])