From: Haomai Wang Date: Tue, 9 Dec 2014 05:16:52 +0000 (+0800) Subject: AsyncConnection: Using buffer read to avoid small read overhead X-Git-Tag: v0.93~247^2~25 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d93bdade3ec07ee438d36a9f8c931279e3408475;p=ceph.git AsyncConnection: Using buffer read to avoid small read overhead Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index b071d79abc00..bdb1296f4d3b 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -39,6 +39,8 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { << ")."; } +const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512; + class C_time_wakeup : public EventCallback { AsyncConnectionRef conn; @@ -185,10 +187,11 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), - port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), - stop_lock("AsyncConnection::stop_lock"), - got_bad_auth(false), authorizer(NULL), replacing(false), stopping(0), - state_buffer(4096), state_offset(0), net(cct), center(c) + port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL), + recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), + recv_start(0), recv_end(0), stop_lock("AsyncConnection::stop_lock"), + got_bad_auth(false), authorizer(NULL), replacing(false), + state_buffer(NULL), state_offset(0), net(cct), center(c) { read_handler.reset(new C_handle_read(this)); write_handler.reset(new C_handle_write(this)); @@ -200,11 +203,18 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCente accept_handler.reset(new C_deliver_accept(async_msgr, this)); local_deliver_handler.reset(new C_local_deliver(this)); memset(msgvec, 0, sizeof(msgvec)); + // double recv_max_prefetch see "read_until" + recv_buf = new char[2*recv_max_prefetch]; + state_buffer = new char[4096]; } AsyncConnection::~AsyncConnection() { assert(!authorizer); + if (recv_buf) + delete recv_buf; + if (state_buffer) + delete state_buffer; } /* return -1 means `fd` occurs error or closed, it should be closed @@ -364,31 +374,77 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) // the needed buffer, so the passed in bufferptr must be the same. // Normally, only "read_message" will pass existing bufferptr in // +// And it will uses readahead method to reduce small read overhead, +// "recv_buf" is used to store read buffer +// // return the remaining bytes, 0 means this buffer is finished // else return < 0 means error -int AsyncConnection::read_until(uint64_t needed, bufferptr &p) +int AsyncConnection::read_until(uint64_t len, char *p) { - assert(needed); - int offset = state_offset; - int left = needed - offset; - int r; - do { - r = read_bulk(sd, p.c_str()+offset, left); - if (r < 0) { - ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl; - return -1; - } else if (r == left) { - state_offset = 0; + assert(len); + ldout(async_msgr->cct, 20) << __func__ << " len is " << len << " state_offset is " + << state_offset << dendl; + int r = 0; + uint64_t left = len - state_offset; + if (recv_end > recv_start) { + assert(state_offset == 0); + uint64_t to_read = MIN(recv_end - recv_start, left); + memcpy(p, recv_buf+recv_start, to_read); + recv_start += to_read; + left -= to_read; + ldout(async_msgr->cct, 20) << __func__ << " got " << to_read << " in buffer " + << " left is " << left << " buffer still has " + << recv_end - recv_start << dendl; + if (left == 0) { return 0; } - left -= r; - offset += r; - } while (r > 0); - - state_offset = offset; - ldout(async_msgr->cct, 20) << __func__ << " read " << r << " bytes, state is " - << get_state_name(state) << dendl; - return needed - offset; + state_offset += to_read; + } + + assert(recv_end == recv_start); + recv_end = recv_start = 0; + /* nothing left in the prefetch buffer */ + if (len > recv_max_prefetch) { + /* this was a large read, we don't prefetch for these */ + do { + r = read_bulk(sd, p+state_offset, left); + ldout(async_msgr->cct, 20) << __func__ << " read_bulk left is " << left << " got " << r << dendl; + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl; + return -1; + } else if (r == static_cast(left)) { + state_offset = 0; + return 0; + } + state_offset += r; + left -= r; + } while (r > 0); + } else { + do { + r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch); + ldout(async_msgr->cct, 20) << __func__ << " read_bulk recv_end is " << recv_end + << " left is " << left << " got " << r << dendl; + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " read failed, state is " << get_state_name(state) << dendl; + return -1; + } + recv_end += r; + if (r >= static_cast(left)) { + recv_start = len - state_offset; + memcpy(p+state_offset, recv_buf, recv_start); + state_offset = 0; + return 0; + } + left -= r; + } while (r > 0); + memcpy(p+state_offset, recv_buf, recv_end-recv_start); + state_offset += (recv_end - recv_start); + recv_end = recv_start = 0; + } + ldout(async_msgr->cct, 20) << __func__ << " need len " << len << " remaining " + << len - state_offset << " bytes, state is " + << get_state_name(state) << dendl; + return len - state_offset; } void AsyncConnection::process() @@ -404,15 +460,14 @@ void AsyncConnection::process() case STATE_OPEN: { char tag = -1; - r = read_bulk(sd, &tag, sizeof(tag)); + r = read_until(sizeof(tag), &tag); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read tag failed, state is " << get_state_name(state) << dendl; goto fail; - } else if (r == 0) { + } else if (r > 0) { break; } - assert(r == 1); if (tag == CEPH_MSGR_TAG_KEEPALIVE) { ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE" << dendl; @@ -446,7 +501,7 @@ void AsyncConnection::process() } ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; - t = (ceph_timespec*)(state_buffer.c_str()); + t = (ceph_timespec*)state_buffer; utime_t kp_t = utime_t(*t); _send_keepalive_or_ack(true, &kp_t); ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; @@ -465,7 +520,7 @@ void AsyncConnection::process() break; } - t = (ceph_timespec*)(state_buffer.c_str()); + t = (ceph_timespec*)state_buffer; last_keepalive_ack = utime_t(*t); ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; state = STATE_OPEN; @@ -483,7 +538,7 @@ void AsyncConnection::process() break; } - seq = (ceph_le64*)(state_buffer.c_str()); + seq = (ceph_le64*)state_buffer; ldout(async_msgr->cct, 20) << __func__ << " got ACK" << dendl; handle_ack(*seq); state = STATE_OPEN; @@ -513,11 +568,11 @@ void AsyncConnection::process() ldout(async_msgr->cct, 20) << __func__ << " got MSG header" << dendl; if (has_feature(CEPH_FEATURE_NOSRCADDR)) { - header = *((ceph_msg_header*)state_buffer.c_str()); + header = *((ceph_msg_header*)state_buffer); header_crc = ceph_crc32c(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } else { - oldheader = *((ceph_msg_header_old*)state_buffer.c_str()); + oldheader = *((ceph_msg_header_old*)state_buffer); // this is fugly memcpy(&header, &oldheader, sizeof(header)); header.src = oldheader.src.name; @@ -588,7 +643,7 @@ void AsyncConnection::process() int front_len = current_header.front_len; if (front_len) { bufferptr ptr = buffer::create(front_len); - r = read_until(front_len, ptr); + r = read_until(front_len, ptr.c_str()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read message front failed" << dendl; goto fail; @@ -609,7 +664,7 @@ void AsyncConnection::process() int middle_len = current_header.middle_len; if (middle_len) { bufferptr ptr = buffer::create(middle_len); - r = read_until(middle_len, ptr); + r = read_until(middle_len, ptr.c_str()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read message middle failed" << dendl; goto fail; @@ -658,7 +713,7 @@ void AsyncConnection::process() while (msg_left > 0) { bufferptr bp = data_blp.get_current_ptr(); uint64_t read = MIN(bp.length(), msg_left); - r = read_until(read, bp); + r = read_until(read, bp.c_str()); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl; goto fail; @@ -697,9 +752,9 @@ void AsyncConnection::process() } if (has_feature(CEPH_FEATURE_MSG_AUTH)) { - footer = *((ceph_msg_footer*)state_buffer.c_str()); + footer = *((ceph_msg_footer*)state_buffer); } else { - old_footer = *((ceph_msg_footer_old*)state_buffer.c_str()); + old_footer = *((ceph_msg_footer_old*)state_buffer); footer.front_crc = old_footer.front_crc; footer.middle_crc = old_footer.middle_crc; footer.data_crc = old_footer.data_crc; @@ -910,7 +965,7 @@ int AsyncConnection::_process_connection() break; } - if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) { + if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) { ldout(async_msgr->cct, 0) << __func__ << " connect protocol error (bad banner) on peer " << get_peer_addr() << dendl; goto fail; @@ -919,7 +974,7 @@ int AsyncConnection::_process_connection() ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl; bufferlist bl; - bl.append(state_buffer.c_str(), strlen(CEPH_BANNER)); + bl.append(state_buffer, strlen(CEPH_BANNER)); r = _try_send(bl); if (r == 0) { state = STATE_CONNECTING_WAIT_IDENTIFY_PEER; @@ -950,7 +1005,7 @@ int AsyncConnection::_process_connection() } bufferlist bl; - bl.append(state_buffer); + bl.append(state_buffer, sizeof(paddr)*2); bufferlist::iterator p = bl.begin(); try { ::decode(paddr, p); @@ -1057,7 +1112,7 @@ int AsyncConnection::_process_connection() break; } - connect_reply = *((ceph_msg_connect_reply*)state_buffer.c_str()); + connect_reply = *((ceph_msg_connect_reply*)state_buffer); connect_reply.features = ceph_sanitize_features(connect_reply.features); ldout(async_msgr->cct, 20) << __func__ << " connect got reply tag " << (int)connect_reply.tag @@ -1083,7 +1138,7 @@ int AsyncConnection::_process_connection() break; } - authorizer_reply.push_back(state_buffer); + authorizer_reply.append(state_buffer, connect_reply.authorizer_len); bufferlist::iterator iter = authorizer_reply.begin(); if (authorizer && !authorizer->verify_reply(iter)) { ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl; @@ -1112,7 +1167,7 @@ int AsyncConnection::_process_connection() break; } - newly_acked_seq = *((uint64_t*)state_buffer.c_str()); + newly_acked_seq = *((uint64_t*)state_buffer); ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq << " vs out_seq " << out_seq << dendl; while (newly_acked_seq > out_seq) { @@ -1231,13 +1286,13 @@ int AsyncConnection::_process_connection() break; } - if (memcmp(state_buffer.c_str(), CEPH_BANNER, strlen(CEPH_BANNER))) { - ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer.c_str() + if (memcmp(state_buffer, CEPH_BANNER, strlen(CEPH_BANNER))) { + ldout(async_msgr->cct, 1) << __func__ << " accept peer sent bad banner '" << state_buffer << "' (should be '" << CEPH_BANNER << "')" << dendl; goto fail; } - addr_bl.append(state_buffer, strlen(CEPH_BANNER), sizeof(peer_addr)); + addr_bl.append(state_buffer+strlen(CEPH_BANNER), sizeof(peer_addr)); { bufferlist::iterator ti = addr_bl.begin(); ::decode(peer_addr, ti); @@ -1267,7 +1322,7 @@ int AsyncConnection::_process_connection() break; } - connect_msg = *((ceph_msg_connect*)state_buffer.c_str()); + connect_msg = *((ceph_msg_connect*)state_buffer); // sanitize features connect_msg.features = ceph_sanitize_features(connect_msg.features); state = STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH; @@ -1286,7 +1341,7 @@ int AsyncConnection::_process_connection() } else if (r > 0) { break; } - authorizer_bl.push_back(state_buffer); + authorizer_bl.append(state_buffer, connect_msg.authorizer_len); } ldout(async_msgr->cct, 20) << __func__ << " accept got peer connect_seq " @@ -1319,7 +1374,7 @@ int AsyncConnection::_process_connection() break; } - newly_acked_seq = *((uint64_t*)state_buffer.c_str()); + newly_acked_seq = *((uint64_t*)state_buffer); ldout(async_msgr->cct, 2) << __func__ << " accept get newly_acked_seq " << newly_acked_seq << dendl; discard_requeued_up_to(newly_acked_seq); state = STATE_ACCEPTING_READY; @@ -1855,6 +1910,8 @@ void AsyncConnection::fault() // requeue sent items requeue_sent(); + recv_start = recv_end = 0; + state_offset = 0; outcoming_bl.clear(); if (policy.standby && !is_queued()) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index c3572c4f1713..9b40a692f6f5 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -50,7 +50,7 @@ class AsyncConnection : public Connection { // the main usage is avoid error happen outside messenger threads int _try_send(bufferlist bl, bool send=true); int _send(Message *m); - int read_until(uint64_t needed, bufferptr &p); + int read_until(uint64_t needed, char *p); int _process_connection(); void _connect(); void _stop(); @@ -195,6 +195,7 @@ class AsyncConnection : public Connection { STATE_WAIT, // just wait for racing connection }; + static const int TCP_PREFETCH_MIN_SIZE; static const char *get_state_name(int state) { const char* const statenames[] = {"STATE_NONE", "STATE_OPEN", @@ -263,6 +264,10 @@ class AsyncConnection : public Connection { EventCallbackRef local_deliver_handler; bool keepalive; struct iovec msgvec[IOV_MAX]; + char *recv_buf; + uint32_t recv_max_prefetch; + uint32_t recv_start; + uint32_t recv_end; Mutex stop_lock; // used to protect `mark_down_cond` Cond stop_cond; set register_time_events; // need to delete it if stop @@ -294,7 +299,7 @@ class AsyncConnection : public Connection { atomic_t stopping; // used only for local state, it will be overwrite when state transition - bufferptr state_buffer; + char *state_buffer; // used only by "read_until" uint64_t state_offset; bufferlist outcoming_bl;