From: Haomai Wang Date: Thu, 28 May 2015 17:24:36 +0000 (+0800) Subject: AsyncConnection: Make header insert when sending X-Git-Tag: v9.0.2~23^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b8954e6efbee3163e1596079f212c0232f62c35d;p=ceph.git AsyncConnection: Make header insert when sending Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index eed275db5961..43b4ca515f28 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -1804,7 +1804,6 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->lock.Unlock(); return 0; } - existing->write_lock.Unlock(); existing->lock.Unlock(); open: @@ -1848,9 +1847,10 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a if (reply.authorizer_len) reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); - uint64_t s = in_seq.read(); - if (reply.tag == CEPH_MSGR_TAG_SEQ) + if (reply.tag == CEPH_MSGR_TAG_SEQ) { + uint64_t s = in_seq.read(); reply_bl.append((char*)&s, sizeof(s)); + } lock.Unlock(); // Because "replacing" will prevent other connections preempt this addr, @@ -1963,15 +1963,13 @@ int AsyncConnection::send_message(Message *m) prepare_send_message(f, m, bl); Mutex::Locker l(write_lock); - m->set_req(out_seq.inc()); + m->set_seq(out_seq.inc()); // "features" changes will change the payload encoding if (can_write == NOWRITE || get_features() != f) { // ensure the correctness of message encoding bl.clear(); ldout(async_msgr->cct, 5) << __func__ << " clear encoded buffer, can_write=" << can_write << " previous " - << f << " != " << get_features() << dedl; - } else { - inject_msg_header_crc(m, bl); + << f << " != " << get_features() << dendl; } if (!is_queued() && can_write == CANWRITE) { if (write_message(m, bl) < 0) { @@ -1999,11 +1997,11 @@ void AsyncConnection::requeue_sent() list >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!sent.empty()) { - pair p = sent.back(); + Message* m = sent.back(); sent.pop_back(); - ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " - << " (" << p.second->get_seq() << ")" << dendl; - rq.push_front(p); + ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend " + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(make_pair(bufferlist(), m)); } } @@ -2036,9 +2034,9 @@ void AsyncConnection::discard_out_queue() ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; assert(write_lock.is_locked()); - for (list >::iterator p = sent.begin(); p != sent.end(); ++p) { - ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl; - p->second->put(); + for (list::iterator p = sent.begin(); p != sent.end(); ++p) { + ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; + (*p)->put(); } sent.clear(); for (map > >::iterator p = out_q.begin(); p != out_q.end(); ++p) @@ -2072,7 +2070,6 @@ void AsyncConnection::fault() { if (state == STATE_CLOSED) { ldout(async_msgr->cct, 10) << __func__ << " state is already " << get_state_name(state) << dendl; - center->dispatch_event_external(reset_handler); return ; } @@ -2200,7 +2197,6 @@ void AsyncConnection::_stop() void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl) { - assert(write_lock.is_locked()); ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl; // associate message with Connection (for benefit of encode_payload) @@ -2243,25 +2239,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer } } - // send tag - char tag = CEPH_MSGR_TAG_MSG; - bl.append(&tag, sizeof(tag)); - - ceph_msg_header_old oldheader; - if (has_feature(CEPH_FEATURE_NOSRCADDR)) { - bl.append((char*)&header, sizeof(header)); - } else { - memcpy(&oldheader, &header, sizeof(header)); - oldheader.src.name = header.src; - oldheader.src.addr = get_peer_addr(); - oldheader.orig_src = oldheader.src; - oldheader.reserved = header.reserved; - // delay crc calculate to "inject_msg_header_crc" - oldheader.crc = 0; - bl.append((char*)&oldheader, sizeof(oldheader)); - } - - bl.claim_append(m->get_payload()); + bl.append(m->get_payload()); bl.append(m->get_middle()); bl.append(m->get_data()); @@ -2288,15 +2266,40 @@ int AsyncConnection::write_message(Message *m, bufferlist& bl) { assert(write_lock.is_locked()); assert(can_write == CANWRITE); + if (!policy.lossy) { // put on sent list - sent.push_back(make_pair(bl, m)); + sent.push_back(m); m->get(); } + bufferlist complete_bl; + // send tag + char tag = CEPH_MSGR_TAG_MSG; + complete_bl.append(&tag, sizeof(tag)); + + m->calc_header_crc(); + ceph_msg_header& header = m->get_header(); + if (has_feature(CEPH_FEATURE_NOSRCADDR)) { + complete_bl.append((char*)&header, sizeof(header)); + } else { + ceph_msg_header_old oldheader; + memcpy(&oldheader, &header, sizeof(header)); + oldheader.src.name = header.src; + oldheader.src.addr = get_peer_addr(); + oldheader.orig_src = oldheader.src; + oldheader.reserved = header.reserved; + // delay crc calculate to "inject_msg_header_crc" + oldheader.crc = ceph_crc32c(0, (unsigned char*)&oldheader, + sizeof(oldheader) - sizeof(oldheader.crc)); + complete_bl.append((char*)&oldheader, sizeof(oldheader)); + } + + complete_bl.claim_append(bl); + ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() << " " << m << dendl; - int rc = _try_send(bl); + int rc = _try_send(complete_bl); if (rc < 0) { ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " << cpp_strerror(errno) << dendl; @@ -2315,13 +2318,13 @@ void AsyncConnection::handle_ack(uint64_t seq) ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; // trim sent list Mutex::Locker l(write_lock); - while (!sent.empty() && sent.front().second->get_seq() <= seq) { - pair p = sent.front(); + while (!sent.empty() && sent.front()->get_seq() <= seq) { + Message* m = sent.front(); sent.pop_front(); ldout(async_msgr->cct, 10) << __func__ << " got ack seq " - << seq << " >= " << p.second->get_seq() << " on " - << p.second << " " << *(p.second) << dendl; - p.second->put(); + << seq << " >= " << m->get_seq() << " on " + << m << " " << *m << dendl; + m->put(); } } @@ -2387,15 +2390,14 @@ void AsyncConnection::handle_write() if (!m) break; - // send_message may not encode message - if (!data.length()) { + // send_message or requeue messages may not encode message + if (!data.length()) prepare_send_message(get_features(), m, data); - inject_msg_header_crc(m, bl); - } r = write_message(m, data); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; + write_lock.Unlock(); goto fail; } else if (r > 0) { break; @@ -2409,30 +2411,19 @@ void AsyncConnection::handle_write() bl.append(CEPH_MSGR_TAG_ACK); bl.append((char*)&s, sizeof(s)); ldout(async_msgr->cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; - ack_left.sub(ack_left); + ack_left.sub(left); r = _try_send(bl); } else if (is_queued()) { r = _try_send(bl); } + write_lock.Unlock(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; } - write_lock.Unlock(); } else { write_lock.Unlock(); - - if (async_msgr->cct->_conf->ms_inject_internal_delays) { - if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { - ldout(msgr->cct, 10) << __func__ << " sleep for " - << async_msgr->cct->_conf->ms_inject_internal_delays << dendl; - utime_t t; - t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays); - t.sleep(); - } - } - lock.Lock(); write_lock.Lock(); if (state == STATE_STANDBY && !policy.server && is_queued()) { @@ -2444,7 +2435,9 @@ void AsyncConnection::handle_write() if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; write_lock.Unlock(); - goto fail; + fault(); + lock.Unlock(); + return ; } } write_lock.Unlock(); @@ -2452,7 +2445,9 @@ void AsyncConnection::handle_write() } return ; + fail: + lock.Lock(); fault(); lock.Unlock(); } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 9a6d24164824..64c2921d904d 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -55,20 +55,6 @@ class AsyncConnection : public Connection { int _try_send(bufferlist &bl, bool send=true); int _send(Message *m); void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); -#define HEADER_CRC_OFF (sizeof(char)+offset(ceph_msg_header, crc)) - void inject_msg_header_crc(m, bl) { - if (msgr->crcflags & MSG_CRC_HEADER) { - if (has_feature(CEPH_FEATURE_NOSRCADDR)) { - __le32 *header_crc = static_cast<__le32*>(&bl[HEADER_CRC_OFF]); - m->calc_header_crc(); - *header_crc = m->get_header().crc; - } else { - ceph_msg_header_old *oldheader = static_cast(&bl[sizeof(char)]); - oldheader->crc = ceph_crc32c(0, (unsigned char*)oldheader, - sizeof(*oldheader) - sizeof(oldheader->crc)); - } - } - } int read_until(uint64_t needed, char *p); int _process_connection(); void _connect(); @@ -249,7 +235,7 @@ class AsyncConnection : public Connection { } can_write; bool open_write; map > > out_q; // priority queue for outbound msgs - list > sent; // the first bufferlist need to inject seq + list sent; // the first bufferlist need to inject seq list local_messages; // local deliver bufferlist outcoming_bl; bool keepalive;