ret = con->msgr->prepare_pages(con->msgr->parent, m, want);
if (ret < 0) {
dout(10, "prepare_pages failed, skipping payload\n");
- con->in_base_pos = -data_len; /* skip payload */
+ con->in_base_pos = -data_len - sizeof(m->footer);
ceph_msg_put(con->in_msg);
con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
if (!m->pages) {
dout(10, "pages revoked during msg read\n");
mutex_unlock(&m->page_mutex);
- con->in_base_pos = con->in_msg_pos.data_pos - data_len;
+ con->in_base_pos = con->in_msg_pos.data_pos - data_len -
+ sizeof(m->footer);
ceph_msg_put(m);
con->in_msg = 0;
con->in_tag = CEPH_MSGR_TAG_READY;
le32_to_cpu(con->in_connect_seq));
reset_connection(con);
prepare_write_connect_retry(con->msgr, con);
+ prepare_read_connect(con);
con->msgr->peer_reset(con->msgr->parent, &con->peer_name);
break;
case CEPH_MSGR_TAG_RETRY:
le32_to_cpu(con->in_connect_seq));
con->connect_seq = le32_to_cpu(con->in_connect_seq);
prepare_write_connect_retry(con->msgr, con);
+ prepare_read_connect(con);
break;
case CEPH_MSGR_TAG_WAIT:
dout(10, "process_connect peer connecting WAIT\n");
if (con->in_base_pos < 0) {
/* skipping + discarding content */
static char buf[1024];
- ret = ceph_tcp_recvmsg(con->sock, buf,
- min(1024, -con->in_base_pos));
+ int skip = min(1024, -con->in_base_pos);
+ dout(20, "skipping %d / %d bytes\n", skip, -con->in_base_pos);
+ ret = ceph_tcp_recvmsg(con->sock, buf, skip);
if (ret <= 0)
goto done;
con->in_base_pos += ret;
- goto more;
+ if (con->in_base_pos)
+ goto more;
}
if (con->in_tag == CEPH_MSGR_TAG_READY) {
ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1);