<< ").";
}
+const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512;  // combined with the ms_tcp_prefetch_max_size config value in the constructor to pick recv_max_prefetch
+
class C_time_wakeup : public EventCallback {
AsyncConnectionRef conn;
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0),
out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1),
- port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false),
- stop_lock("AsyncConnection::stop_lock"),
- got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0),
- state_buffer(4096), state_offset(0), net(cct), center(c)
+ port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL),
+ recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
+ recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"),
+ got_bad_auth(false), authorizer(NULL), replacing(false),
+ state_buffer(NULL), state_offset(0), net(cct), center(c)
{
read_handler.reset(new C_handle_read(this));
write_handler.reset(new C_handle_write(this));
accept_handler.reset(new C_deliver_accept(async_msgr, this));
local_deliver_handler.reset(new C_local_deliver(this));
memset(msgvec, 0, sizeof(msgvec));
+ // double recv_max_prefetch see "read_until"
+ recv_buf = new char[2*recv_max_prefetch];
+ state_buffer = new char[4096];
}
AsyncConnection::~AsyncConnection()
{
assert(!authorizer);
+ if (recv_buf)
+ delete recv_buf;
+ if (state_buffer)
+ delete state_buffer;
}
/* return -1 means `fd` occurs error or closed, it should be closed
// the needed buffer, so the passed in bufferptr must be the same.
// Normally, only "read_message" will pass existing bufferptr in
//
+// Small reads go through a readahead ("prefetch") buffer to reduce per-read overhead:
+// up to recv_max_prefetch bytes are read into "recv_buf" at once and later calls are served from it first.
+//
// return the remaining bytes, 0 means this buffer is finished
// else return < 0 means error
-int AsyncConnection::read_until(uint64_t needed, bufferptr &p)
+int AsyncConnection::read_until(uint64_t len, char *p)
{
- assert(needed);
- int offset = state_offset;
- int left = needed - offset;
- int r;
- do {
- r = read_bulk(sd, p.c_str()+offset, left);
- if (r < 0) {
- ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
- return -1;
- } else if (r == left) {
- state_offset = 0;
+ assert(len);
+ ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is "
+ << state_offset << dendl;
+ int r = 0;
+ uint64_t left = len - state_offset;  // bytes of this request not yet stored in p
+ if (recv_end > recv_start) {  // unread prefetched bytes exist: serve from recv_buf first
+ assert(state_offset == 0);
+ uint64_t to_read = MIN(recv_end - recv_start, left);
+ memcpy(p, recv_buf+recv_start, to_read);
+ recv_start += to_read;
+ left -= to_read;
+ ldout(async_msgr->cct, 20) << __func__ << " got " << to_read << " in buffer "
+ << " left is " << left << " buffer still has "
+ << recv_end - recv_start << dendl;
+ if (left == 0) {  // request fully satisfied from the prefetch buffer
return 0;
}
- left -= r;
- offset += r;
- } while (r > 0);
-
- state_offset = offset;
- ldout(async_msgr->cct, 20) << __func__ << " read " << r << " bytes, state is "
- << get_state_name(state) << dendl;
- return needed - offset;
+ state_offset += to_read;  // remember how much of p is already filled
+ }
+
+ assert(recv_end == recv_start);
+ recv_end = recv_start = 0;  // buffer is drained, so rewind it to the start
+ /* nothing left in the prefetch buffer */
+ if (len > recv_max_prefetch) {
+ /* this was a large read, we don't prefetch for these; read straight into the caller's buffer */
+ do {
+ r = read_bulk(sd, p+state_offset, left);
+ ldout(async_msgr->cct, 20) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
+ return -1;
+ } else if (r == static_cast<int>(left)) {  // request complete; reset progress for the next call
+ state_offset = 0;
+ return 0;
+ }
+ state_offset += r;
+ left -= r;
+ } while (r > 0);  // stops once read_bulk returns 0 (presumably no more data available right now; confirm against read_bulk)
+ } else {
+ do {
+ r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch);  // may read more than this request needs; the surplus stays in recv_buf
+ ldout(async_msgr->cct, 20) << __func__ << " read_bulk recv_end is " << recv_end
+ << " left is " << left << " got " << r << dendl;
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl;
+ return -1;
+ }
+ recv_end += r;
+ if (r >= static_cast<int>(left)) {  // enough data has arrived to finish the request
+ recv_start = len - state_offset;  // bytes of recv_buf consumed here; [recv_start, recv_end) is kept for later calls
+ memcpy(p+state_offset, recv_buf, recv_start);
+ state_offset = 0;
+ return 0;
+ }
+ left -= r;
+ } while (r > 0);
+ memcpy(p+state_offset, recv_buf, recv_end-recv_start);  // request still incomplete: hand over what has arrived so far
+ state_offset += (recv_end - recv_start);
+ recv_end = recv_start = 0;  // everything was copied out, so the prefetch buffer is empty again
+ }
+ ldout(async_msgr->cct, 20) << __func__ << " need len " << len << " remaining "
+ << len - state_offset << " bytes, state is "
+ << get_state_name(state) << dendl;
+ return len - state_offset;  // bytes still missing; state_offset carries the progress into the next call
}
void AsyncConnection::process()
case STATE_OPEN:
{
char tag = -1;
- r = read_bulk(sd, &tag, sizeof(tag));
+ r = read_until(sizeof(tag), &tag);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read tag failed, state is "
<< get_state_name(state) << dendl;
goto fail;
- } else if (r == 0) {
+ } else if (r > 0) {
break;
}
- assert(r == 1);
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl;
}
ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
- t = (ceph_timespec*)(state_buffer.c_str());
+ t = (ceph_timespec*)state_buffer;
utime_t kp_t = utime_t(*t);
_send_keepalive_or_ack(true, &kp_t);
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl;
break;
}
- t = (ceph_timespec*)(state_buffer.c_str());
+ t = (ceph_timespec*)state_buffer;
last_keepalive_ack = utime_t(*t);
ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl;
state = STATE_OPEN;
break;
}
- seq = (ceph_le64*)(state_buffer.c_str());
+ seq = (ceph_le64*)state_buffer;
ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl;
handle_ack(*seq);
state = STATE_OPEN;
ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl;
if (has_feature(CEPH_FEATURE_NOSRCADDR)) {
- header = *((ceph_msg_header*)state_buffer.c_str());
+ header = *((ceph_msg_header*)state_buffer);
header_crc = ceph_crc32c(0, (unsigned char *)&header,
sizeof(header) - sizeof(header.crc));
} else {
- oldheader = *((ceph_msg_header_old*)state_buffer.c_str());
+ oldheader = *((ceph_msg_header_old*)state_buffer);
// this is fugly
memcpy(&header, &oldheader, sizeof(header));
header.src = oldheader.src.name;
int front_len = current_header.front_len;
if (front_len) {
bufferptr ptr = buffer::create(front_len);
- r = read_until(front_len, ptr);
+ r = read_until(front_len, ptr.c_str());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl;
goto fail;
int middle_len = current_header.middle_len;
if (middle_len) {
bufferptr ptr = buffer::create(middle_len);
- r = read_until(middle_len, ptr);
+ r = read_until(middle_len, ptr.c_str());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl;
goto fail;
while (msg_left > 0) {
bufferptr bp = data_blp.get_current_ptr();
uint64_t read = MIN(bp.length(), msg_left);
- r = read_until(read, bp);
+ r = read_until(read, bp.c_str());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
goto fail;
}
if (has_feature(CEPH_FEATURE_MSG_AUTH)) {
- footer = *((ceph_msg_footer*)state_buffer.c_str());
+ footer = *((ceph_msg_footer*)state_buffer);
} else {
- old_footer = *((ceph_msg_footer_old*)state_buffer.c_str());
+ old_footer = *((ceph_msg_footer_old*)state_buffer);
footer.front_crc = old_footer.front_crc;
footer.middle_crc = old_footer.middle_crc;
footer.data_crc = old_footer.data_crc;
break;
}
- if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) {
+ if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer "
<< get_peer_addr() << dendl;
goto fail;
ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl;
bufferlist bl;
- bl.append(state_buffer.c_str(), strlen(CEPH_BANNER));
+ bl.append(state_buffer, strlen(CEPH_BANNER));
r = _try_send(bl);
if (r == 0) {
state = STATE_CONNECTING_WAIT_IDENTIFY_PEER;
}
bufferlist bl;
- bl.append(state_buffer);
+ bl.append(state_buffer, sizeof(paddr)*2);
bufferlist::iterator p = bl.begin();
try {
::decode(paddr, p);
break;
}
- connect_reply = *((ceph_msg_connect_reply*)state_buffer.c_str());
+ connect_reply = *((ceph_msg_connect_reply*)state_buffer);
connect_reply.features = ceph_sanitize_features(connect_reply.features);
ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag
break;
}
- authorizer_reply.push_back(state_buffer);
+ authorizer_reply.append(state_buffer, connect_reply.authorizer_len);
bufferlist::iterator iter = authorizer_reply.begin();
if (authorizer && !authorizer->verify_reply(iter)) {
ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
break;
}
- newly_acked_seq = *((uint64_t*)state_buffer.c_str());
+ newly_acked_seq = *((uint64_t*)state_buffer);
ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq
<< " vs out_seq " << out_seq << dendl;
while (newly_acked_seq > out_seq) {
break;
}
- if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) {
- ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer.c_str()
+ if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer
<< "' (should be '" << CEPH_BANNER << "')" << dendl;
goto fail;
}
- addr_bl.append(state_buffer, strlen(CEPH_BANNER), sizeof(peer_addr));
+ addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(peer_addr));
{
bufferlist::iterator ti = addr_bl.begin();
::decode(peer_addr, ti);
break;
}
- connect_msg = *((ceph_msg_connect*)state_buffer.c_str());
+ connect_msg = *((ceph_msg_connect*)state_buffer);
// sanitize features
connect_msg.features = ceph_sanitize_features(connect_msg.features);
state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH;
} else if (r > 0) {
break;
}
- authorizer_bl.push_back(state_buffer);
+ authorizer_bl.append(state_buffer, connect_msg.authorizer_len);
}
ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq "
break;
}
- newly_acked_seq = *((uint64_t*)state_buffer.c_str());
+ newly_acked_seq = *((uint64_t*)state_buffer);
ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl;
discard_requeued_up_to(newly_acked_seq);
state = STATE_ACCEPTING_READY;
// requeue sent items
requeue_sent();
+ recv_start = recv_end = 0;
+ state_offset = 0;
outcoming_bl.clear();
if (policy.standby && !is_queued()) {
ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl;