/* move to sending/sent list */
list_del_init(&m->list_head);
list_add_tail(&m->list_head, &con->out_sent);
- con->out_msg = m; /* FIXME: do we want to take a reference here? */
+ con->out_msg = m; /* we dont bother taking a reference here. */
/* encode header */
dout(20, "prepare_write_message %p seq %lld type %d len %d+%d %d pgs\n",
con->connect_seq++;
prepare_write_connect(msgr, con);
set_bit(CONNECTING, &con->state);
+ con->in_tag = CEPH_MSGR_TAG_READY;
dout(5, "try_write initiating connect on %p new state %lu\n",
con, con->state);
ret = ceph_tcp_connect(con);
{
int err;
BUG_ON(con->in_msg != NULL);
- con->in_tag = CEPH_MSGR_TAG_MSG;
con->in_base_pos = 0;
con->in_msg = ceph_msg_new(0, 0, 0, 0, 0);
if (IS_ERR(con->in_msg)) {
return 1; /* done! */
}
+static void process_message(struct ceph_connection *con)
+{
+ /* if first message, set peer_name */
+ if (con->peer_name.type == 0)
+ con->peer_name = con->in_msg->hdr.src.name;
+
+ spin_lock(&con->out_queue_lock);
+ con->in_seq++;
+ spin_unlock(&con->out_queue_lock);
+ ceph_queue_write(con);
+
+ dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
+ con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
+ ENTITY_NAME(con->in_msg->hdr.src.name),
+ le32_to_cpu(con->in_msg->hdr.type),
+ ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
+ le32_to_cpu(con->in_msg->hdr.front_len),
+ le32_to_cpu(con->in_msg->hdr.data_len));
+ con->msgr->dispatch(con->msgr->parent, con->in_msg);
+ con->in_msg = 0;
+ con->in_tag = CEPH_MSGR_TAG_READY;
+}
/*
* prepare to read an ack
*/
static void prepare_read_ack(struct ceph_connection *con)
{
- con->in_tag = CEPH_MSGR_TAG_ACK;
con->in_base_pos = 0;
}
return 1; /* done */
}
-static void process_ack(struct ceph_connection *con, __u32 ack)
+static void process_ack(struct ceph_connection *con)
{
struct ceph_msg *m;
- __u64 seq;
+ u32 ack = le32_to_cpu(con->in_partial_ack);
+ u64 seq;
+
while (!list_empty(&con->out_sent)) {
m = list_entry(con->out_sent.next, struct ceph_msg, list_head);
seq = le64_to_cpu(m->hdr.seq);
list_del_init(&m->list_head);
ceph_msg_put(m);
}
+ con->in_tag = CEPH_MSGR_TAG_READY;
}
default:
derr(1, "process_connect protocol error, will retry\n");
con->delay = BASE_DELAY_INTERVAL;
- con->error_msg = "protocol error";
+ con->error_msg = "protocol error, garbage tag during connect";
ceph_fault(con);
}
if (test_bit(WRITE_PENDING, &con->state))
if (ret <= 0)
goto done;
process_connect(con);
- if (con->in_tag == CEPH_MSGR_TAG_RETRY)
- goto more;
- if (test_bit(CLOSED, &con->state))
- goto done;
+ goto more;
}
if (con->in_base_pos < 0) {
if (ret <= 0)
goto done;
dout(30, "try_read got tag %d\n", (int)con->in_tag);
- if (con->in_tag == CEPH_MSGR_TAG_MSG)
+ switch (con->in_tag) {
+ case CEPH_MSGR_TAG_MSG:
prepare_read_message(con);
- else if (con->in_tag == CEPH_MSGR_TAG_ACK)
+ break;
+ case CEPH_MSGR_TAG_ACK:
prepare_read_ack(con);
- else if (con->in_tag == CEPH_MSGR_TAG_CLOSE) {
+ break;
+ case CEPH_MSGR_TAG_CLOSE:
set_bit(CLOSED, &con->state); /* fixme */
goto done;
- } else {
- derr(2, "try_read got bad tag %d\n", (int)con->in_tag);
- ret = -EINVAL;
- goto bad;
+ default:
+ goto bad_tag;
}
- goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_MSG) {
ret = read_message_partial(con);
if (ret <= 0)
goto done;
-
- /* if first message, set peer_name */
- if (con->peer_name.type == 0)
- con->peer_name = con->in_msg->hdr.src.name;
-
- spin_lock(&con->out_queue_lock);
- con->in_seq++;
- spin_unlock(&con->out_queue_lock);
- ceph_queue_write(con);
-
- dout(1, "===== %p %u from %s%d %d=%s len %d+%d =====\n",
- con->in_msg, le32_to_cpu(con->in_msg->hdr.seq),
- ENTITY_NAME(con->in_msg->hdr.src.name),
- le32_to_cpu(con->in_msg->hdr.type),
- ceph_msg_type_name(le32_to_cpu(con->in_msg->hdr.type)),
- le32_to_cpu(con->in_msg->hdr.front_len),
- le32_to_cpu(con->in_msg->hdr.data_len));
- msgr->dispatch(con->msgr->parent, con->in_msg);
- con->in_msg = 0;
- con->in_tag = CEPH_MSGR_TAG_READY;
+ process_message(con);
goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_ACK) {
ret = read_ack_partial(con);
if (ret <= 0)
goto done;
- /* got an ack */
- process_ack(con, con->in_partial_ack);
- con->in_tag = CEPH_MSGR_TAG_READY;
+ process_ack(con);
goto more;
}
+
+bad_tag:
derr(2, "try_read bad con->in_tag = %d\n", (int)con->in_tag);
-bad:
- BUG_ON(1); /* shouldn't get here */
+ con->error_msg = "protocol error, garbage tag";
+ ceph_fault(con);
+
done:
clear_bit(READING, &con->state);
if (test_bit(READABLE, &con->state)) {
/* initialize the msgr connection */
new_con = new_connection(msgr);
if (new_con == NULL) {
- derr(1, "malloc failure\n");
+ derr(1, "kmalloc failure accepting new connection\n");
goto done;
}
if (ceph_tcp_accept(msgr->listen_sock, new_con) < 0) {
}
dout(5, "accepted connection \n");
- new_con->in_tag = CEPH_MSGR_TAG_READY;
new_con->connect_seq = 1;
set_bit(ACCEPTING, &new_con->state);
clear_bit(NEW, &new_con->state);