// identify peer
char banner[strlen(CEPH_BANNER)+1];
- rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
+ rc = tcp_read(sd, banner, strlen(CEPH_BANNER), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read banner" << dendl;
state = STATE_CLOSED;
bufferptr tp(sizeof(peer_addr));
addrbl.push_back(tp);
}
- rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
+ rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
int reply_tag = 0;
bool replace = false;
while (1) {
- rc = tcp_read(sd, (char*)&connect, sizeof(connect));
+ rc = tcp_read(sd, (char*)&connect, sizeof(connect), messenger->timeout);
if (rc < 0) {
dout(10) << "accept couldn't read connect" << dendl;
goto fail_unlocked;
authorizer.clear();
if (connect.authorizer_len) {
bp = buffer::create(connect.authorizer_len);
- if (tcp_read(sd, bp.c_str(), connect.authorizer_len) < 0) {
+ if (tcp_read(sd, bp.c_str(), connect.authorizer_len, messenger->timeout) < 0) {
dout(10) << "accept couldn't read connect authorizer" << dendl;
goto fail_unlocked;
}
// verify banner
// FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
- rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
+ rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), messenger->timeout);
if (rc < 0) {
dout(2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
bufferptr p(sizeof(paddr) * 2);
addrbl.push_back(p);
}
- rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
+ rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout);
if (rc < 0) {
dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
ceph_msg_connect_reply reply;
- if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
+ if (tcp_read(sd, (char*)&reply, sizeof(reply), messenger->timeout) < 0) {
dout(2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
if (reply.authorizer_len) {
dout(10) << "reply.authorizer_len=" << reply.authorizer_len << dendl;
bufferptr bp = buffer::create(reply.authorizer_len);
- if (tcp_read(sd, bp.c_str(), reply.authorizer_len) < 0) {
+ if (tcp_read(sd, bp.c_str(), reply.authorizer_len, messenger->timeout) < 0) {
dout(10) << "connect couldn't read connect authorizer_reply" << dendl;
goto fail;
}
char buf[80];
char tag = -1;
dout(20) << "reader reading tag..." << dendl;
- int rc = tcp_read(sd, (char*)&tag, 1);
+ int rc = tcp_read(sd, (char*)&tag, 1, messenger->timeout);
if (rc < 0) {
pipe_lock.Lock();
dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
if (tag == CEPH_MSGR_TAG_ACK) {
dout(20) << "reader got ACK" << dendl;
ceph_le64 seq;
- int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
+ int rc = tcp_read( sd, (char*)&seq, sizeof(seq), messenger->timeout);
pipe_lock.Lock();
if (rc < 0) {
dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
__u32 header_crc;
if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) {
- if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
+ if (tcp_read( sd, (char*)&header, sizeof(header), messenger->timeout ) < 0)
return -1;
header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
} else {
ceph_msg_header_old oldheader;
- if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader) ) < 0)
+ if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), messenger->timeout ) < 0)
return -1;
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
front_len = header.front_len;
if (front_len) {
bufferptr bp = buffer::create(front_len);
- if (tcp_read( sd, bp.c_str(), front_len ) < 0)
+ if (tcp_read( sd, bp.c_str(), front_len, messenger->timeout ) < 0)
goto out_dethrottle;
front.push_back(bp);
dout(20) << "reader got front " << front.length() << dendl;
middle_len = header.middle_len;
if (middle_len) {
bufferptr bp = buffer::create(middle_len);
- if (tcp_read( sd, bp.c_str(), middle_len ) < 0)
+ if (tcp_read( sd, bp.c_str(), middle_len, messenger->timeout ) < 0)
goto out_dethrottle;
middle.push_back(bp);
dout(20) << "reader got middle " << middle.length() << dendl;
int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
(unsigned)left);
bufferptr bp = buffer::create(head);
- if (tcp_read( sd, bp.c_str(), head ) < 0)
+ if (tcp_read( sd, bp.c_str(), head, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
left -= head;
int middle = left & PAGE_MASK;
if (middle > 0) {
bufferptr bp = buffer::create_page_aligned(middle);
- if (tcp_read( sd, bp.c_str(), middle ) < 0)
+ if (tcp_read( sd, bp.c_str(), middle, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
left -= middle;
if (left) {
bufferptr bp = buffer::create(left);
- if (tcp_read( sd, bp.c_str(), left ) < 0)
+ if (tcp_read( sd, bp.c_str(), left, messenger->timeout ) < 0)
goto out_dethrottle;
data.push_back(bp);
dout(20) << "reader got data tail " << left << dendl;
}
// footer
- if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
+ if (tcp_read(sd, (char*)&footer, sizeof(footer), messenger->timeout) < 0)
goto out_dethrottle;
aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0;