From: Haomai Wang Date: Tue, 26 May 2015 07:03:58 +0000 (+0800) Subject: AsyncConnection: Avoid encoding message with lock holding X-Git-Tag: v9.0.2~23^2~12 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=84b9088e164b39919065194831455e441a06ef85;p=ceph.git AsyncConnection: Avoid encoding message with lock holding Signed-off-by: Haomai Wang --- diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 8373432bc5c4..9ef88ab8d340 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -39,6 +39,9 @@ ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { << ")."; } +// Notes: +// 1. Don't dispatch any event when closed! It may cause AsyncConnection alive even if AsyncMessenger dead + const int AsyncConnection::TCP_PREFETCH_MIN_SIZE = 512; class C_time_wakeup : public EventCallback { @@ -100,12 +103,7 @@ class C_handle_dispatch : public EventCallback { public: C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {} void do_request(int id) { - //msgr->ms_fast_preprocess(m); - //if (msgr->ms_can_fast_dispatch(m)) { - // msgr->ms_fast_dispatch(m); - //} else { - msgr->ms_deliver_dispatch(m); - //} + msgr->ms_deliver_dispatch(m); } }; @@ -176,8 +174,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), - port(-1), lock("AsyncConnection::lock"), open_write(false), keepalive(false), recv_buf(NULL), + out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), + sd(-1), port(-1), write_lock("AsyncConnection::write_lock"), can_write(0), + open_write(false), lock("AsyncConnection::lock"), keepalive(false), recv_buf(NULL), recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false), is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), center(c) @@ -272,7 +271,7 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more) // else return < 0 means error int AsyncConnection::_try_send(bufferlist send_bl, bool send) { - assert(lock.is_locked()); + assert(write_lock.is_locked()); if (send_bl.length()) { if (outcoming_bl.length()) outcoming_bl.claim_append(send_bl); @@ -307,7 +306,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) } } - uint64_t sent = 0; + uint64_t sent_bytes = 0; list::const_iterator pb = outcoming_bl.buffers().begin(); uint64_t left_pbrs = outcoming_bl.buffers().size(); while (left_pbrs) { @@ -332,7 +331,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) return r; // "r" is the remaining length - sent += msglen - r; + sent_bytes += msglen - r; if (r > 0) { ldout(async_msgr->cct, 5) << __func__ << " remaining " << r << " needed to be sent, creating event for writing" @@ -343,14 +342,14 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) } // trim already sent for outcoming_bl - if (sent) { + if (sent_bytes) { bufferlist bl; - if (sent < outcoming_bl.length()) - outcoming_bl.splice(sent, outcoming_bl.length()-sent, &bl); + if (sent_bytes < outcoming_bl.length()) + outcoming_bl.splice(sent_bytes, outcoming_bl.length()-sent_bytes, &bl); bl.swap(outcoming_bl); } - ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent + ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes << " remaining bytes " << outcoming_bl.length() << dendl; if (!open_write && is_queued()) { @@ -507,7 +506,9 @@ void AsyncConnection::process() ldout(async_msgr->cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; t = (ceph_timespec*)state_buffer; utime_t kp_t = utime_t(*t); + write_lock.Lock(); _send_keepalive_or_ack(true, &kp_t); + write_lock.Unlock(); ldout(async_msgr->cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; state = STATE_OPEN; break; @@ -955,6 +956,7 @@ int AsyncConnection::_process_connection() switch(state) { case STATE_WAIT_SEND: { + Mutex::Locker l(write_lock); if (!outcoming_bl.length()) { assert(state_after_send); state = state_after_send; @@ -1016,7 +1018,7 @@ int AsyncConnection::_process_connection() bufferlist bl; bl.append(state_buffer, strlen(CEPH_BANNER)); - r = _try_send(bl); + r = try_send(bl); if (r == 0) { state = STATE_CONNECTING_WAIT_IDENTIFY_PEER; ldout(async_msgr->cct, 10) << __func__ << " connect write banner done: " @@ -1091,7 +1093,7 @@ int AsyncConnection::_process_connection() } ::encode(async_msgr->get_myaddr(), myaddrbl); - r = _try_send(myaddrbl); + r = try_send(myaddrbl); if (r == 0) { state = STATE_CONNECTING_SEND_CONNECT_MSG; ldout(async_msgr->cct, 10) << __func__ << " connect sent my addr " @@ -1139,7 +1141,7 @@ int AsyncConnection::_process_connection() ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq=" << connect_seq << " proto=" << connect_msg.protocol_version << dendl; - r = _try_send(bl); + r = try_send(bl); if (r == 0) { state = STATE_CONNECTING_WAIT_CONNECT_REPLY; ldout(async_msgr->cct,20) << __func__ << " connect wrote (self +) cseq, waiting for reply" << dendl; @@ -1212,7 +1214,6 @@ int AsyncConnection::_process_connection() case STATE_CONNECTING_WAIT_ACK_SEQ: { uint64_t newly_acked_seq = 0; - bufferlist bl; r = read_until(sizeof(newly_acked_seq), state_buffer); if (r < 0) { @@ -1224,19 +1225,21 @@ int AsyncConnection::_process_connection() newly_acked_seq = *((uint64_t*)state_buffer); ldout(async_msgr->cct, 2) << __func__ << " got newly_acked_seq " << newly_acked_seq - << " vs out_seq " << out_seq << dendl; - while (newly_acked_seq > out_seq) { - Message *m = _get_next_outgoing(); - assert(m); - ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() - << " " << *m << dendl; - assert(m->get_seq() <= newly_acked_seq); - m->put(); - ++out_seq; - } + << " vs out_seq " << out_seq.read() << dendl; + discard_requeued_up_to(newly_acked_seq); + //while (newly_acked_seq > out_seq.read()) { + // Message *m = _get_next_outgoing(NULL); + // assert(m); + // ldout(async_msgr->cct, 2) << __func__ << " discarding previously sent " << m->get_seq() + // << " " << *m << dendl; + // assert(m->get_seq() <= newly_acked_seq); + // m->put(); + // out_seq.inc(); + //} + bufferlist bl; bl.append((char*)&in_seq, sizeof(in_seq)); - r = _try_send(bl); + r = try_send(bl); if (r == 0) { state = STATE_CONNECTING_READY; ldout(async_msgr->cct, 10) << __func__ << " send in_seq done " << dendl; @@ -1283,8 +1286,11 @@ int AsyncConnection::_process_connection() // message may in queue between last _try_send and connection ready // write event may already notify and we need to force scheduler again + write_lock.Lock(); + can_write = 1; if (is_queued()) center->dispatch_event_external(write_handler); + write_lock.Unlock(); break; } @@ -1313,7 +1319,7 @@ int AsyncConnection::_process_connection() ::encode(socket_addr, bl); ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl; - r = _try_send(bl); + r = try_send(bl); if (r == 0) { state = STATE_ACCEPTING_WAIT_BANNER_ADDR; ldout(async_msgr->cct, 10) << __func__ << " write banner and addr done: " @@ -1442,6 +1448,9 @@ int AsyncConnection::_process_connection() ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; state = STATE_OPEN; memset(&connect_msg, 0, sizeof(connect_msg)); + write_lock.Lock(); + can_write = 1; + write_lock.Unlock(); break; } @@ -1743,6 +1752,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->center->dispatch_event_external(existing->reset_handler); existing->_stop(); } else { + assert(can_write == 0); + existing->write_lock.Lock(true); // queue a reset on the new connection, which we're dumping for the old center->dispatch_event_external(reset_handler); @@ -1767,6 +1778,8 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->requeue_sent(); swap(existing->sd, sd); + swap(existing->can_write, can_write); + existing->can_write = 0; existing->open_write = false; existing->replacing = true; existing->state_offset = 0; @@ -1776,6 +1789,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a // there shouldn't exist any buffer assert(recv_start == recv_end); + existing->write_lock.Unlock(); if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error existing->fault(); @@ -1785,6 +1799,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a existing->lock.Unlock(); return 0; } + existing->write_lock.Unlock(); existing->lock.Unlock(); open: @@ -1858,7 +1873,7 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a goto fail_registered; } - r = _try_send(reply_bl); + r = try_send(reply_bl); if (r < 0) goto fail_registered; @@ -1919,34 +1934,40 @@ void AsyncConnection::accept(int incoming) int AsyncConnection::send_message(Message *m) { ldout(async_msgr->cct, 10) << __func__ << dendl; + + if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection + ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; + Mutex::Locker l(write_lock); + local_messages.push_back(m); + center->dispatch_event_external(local_deliver_handler); + return 0; + } + + bufferlist bl; + Mutex::Locker l(write_lock); + m->set_seq(out_seq.inc()); m->get_header().src = async_msgr->get_myname(); if (!m->get_priority()) m->set_priority(async_msgr->get_default_send_priority()); + if (can_write == 1) + prepare_send_message(m, bl); - Mutex::Locker l(lock); - if (!is_queued() && state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) { + if (!is_queued() && can_write == 1) { ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl; - int r = _send(m); + int r = write_message(m, bl); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; // we want to handle fault within internal thread center->dispatch_event_external(write_handler); } - } else if (state == STATE_CLOSED) { + } else if (can_write == 2) { ldout(async_msgr->cct, 10) << __func__ << " connection closed." << " Drop message " << m << dendl; m->put(); - } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection - ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl; - local_messages.push_back(m); - center->dispatch_event_external(local_deliver_handler); } else { - out_q[m->get_priority()].push_back(m); - if (state == STATE_STANDBY && !policy.server) { - ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) - << " policy.server is false" << dendl; - _connect(); - } else if (sd >= 0 && !open_write) { + out_q[m->get_priority()].push_back(make_pair(bl, m)); + if (can_write == 0) { + ldout(async_msgr->cct, 10) << __func__ << " write is denied, reschedule m=" << m << dendl; center->dispatch_event_external(write_handler); } } @@ -1955,35 +1976,35 @@ int AsyncConnection::send_message(Message *m) void AsyncConnection::requeue_sent() { + assert(write_lock.is_locked()); if (sent.empty()) return; - list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!sent.empty()) { - Message *m = sent.back(); + pair p = sent.back(); sent.pop_back(); - ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq - << " (" << m->get_seq() << ")" << dendl; - rq.push_front(m); - out_seq--; + ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " + << " (" << p.second->get_seq() << ")" << dendl; + rq.push_front(p); } } void AsyncConnection::discard_requeued_up_to(uint64_t seq) { ldout(async_msgr->cct, 10) << __func__ << " " << seq << dendl; + Mutex::Locker l(write_lock); if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) return; - list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list >& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!rq.empty()) { - Message *m = rq.front(); - if (m->get_seq() == 0 || m->get_seq() > seq) + pair p = rq.front(); + if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; - ldout(async_msgr->cct, 10) << __func__ << " " << *m << " for resend seq " << out_seq + ldout(async_msgr->cct, 10) << __func__ << " " << *(p.second) << " for resend seq " << p.second->get_seq() << " <= " << seq << ", discarding" << dendl; - m->put(); + p.second->put(); rq.pop_front(); - out_seq++; } if (rq.empty()) out_q.erase(CEPH_MSG_PRIO_HIGHEST); @@ -1997,15 +2018,16 @@ void AsyncConnection::discard_out_queue() { ldout(async_msgr->cct, 10) << __func__ << " started" << dendl; - for (list::iterator p = sent.begin(); p != sent.end(); ++p) { - ldout(async_msgr->cct, 20) << __func__ << " discard " << *p << dendl; - (*p)->put(); + Mutex::Locker l(write_lock); + for (list >::iterator p = sent.begin(); p != sent.end(); ++p) { + ldout(async_msgr->cct, 20) << __func__ << " discard " << p->second << dendl; + p->second->put(); } sent.clear(); - for (map >::iterator p = out_q.begin(); p != out_q.end(); ++p) - for (list::iterator r = p->second.begin(); r != p->second.end(); ++r) { - ldout(async_msgr->cct, 20) << __func__ << " discard " << *r << dendl; - (*r)->put(); + for (map > >::iterator p = out_q.begin(); p != out_q.end(); ++p) + for (list >::iterator r = p->second.begin(); r != p->second.end(); ++r) { + ldout(async_msgr->cct, 20) << __func__ << " discard " << r->second << dendl; + r->second->put(); } out_q.clear(); outcoming_bl.clear(); @@ -2016,13 +2038,15 @@ int AsyncConnection::randomize_out_seq() if (get_features() & CEPH_FEATURE_MSG_AUTH) { // Set out_seq to a random value, so CRC won't be predictable. Don't bother checking seq_error // here. We'll check it on the call. PLR - int seq_error = get_random_bytes((char *)&out_seq, sizeof(out_seq)); - out_seq &= SEQ_MASK; - lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << out_seq << dendl; + uint64_t rand_seq; + int seq_error = get_random_bytes((char *)&rand_seq, sizeof(rand_seq)); + rand_seq &= SEQ_MASK; + lsubdout(async_msgr->cct, ms, 10) << __func__ << " randomize_out_seq " << rand_seq << dendl; + out_seq.set(rand_seq); return seq_error; } else { // previously, seq #'s always started at 0. - out_seq = 0; + out_seq.set(0); return 0; } } @@ -2042,12 +2066,14 @@ void AsyncConnection::fault() return ; } + write_lock.Lock(); if (sd >= 0) { shutdown_socket(); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); ::close(sd); sd = -1; } + can_write = 0; open_write = false; // requeue sent items @@ -2063,15 +2089,19 @@ void AsyncConnection::fault() << " accept state just closed, state=" << get_state_name(state) << dendl; center->dispatch_event_external(reset_handler); + + write_lock.Unlock(); _stop(); return ; } if (policy.standby && !is_queued()) { ldout(async_msgr->cct,0) << __func__ << " with nothing to send, going to standby" << dendl; state = STATE_STANDBY; + write_lock.Unlock(); return; } + write_lock.Unlock(); if (!(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) { // policy maybe empty when state is in accept if (policy.server) { @@ -2108,7 +2138,7 @@ void AsyncConnection::was_session_reset() center->dispatch_event_external(remote_reset_handler); if (randomize_out_seq()) { - ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl; + ldout(async_msgr->cct, 15) << __func__ << " could not get random bytes to set seq number for session reset; set seq number to " << out_seq.read() << dendl; } in_seq = 0; @@ -2131,7 +2161,9 @@ void AsyncConnection::_stop() async_msgr->unregister_conn(this); state = STATE_CLOSED; + Mutex::Locker l(write_lock); open_write = false; + can_write = 2; state_offset = 0; if (sd >= 0) { shutdown_socket(); @@ -2145,33 +2177,35 @@ void AsyncConnection::_stop() center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this))); } -int AsyncConnection::_send(Message *m) +void AsyncConnection::prepare_send_message(Message *m, bufferlist &bl) { - m->set_seq(++out_seq); - if (!policy.lossy) { - // put on sent list - sent.push_back(m); - m->get(); - } + assert(write_lock.is_locked()); + ldout(async_msgr->cct, 20) << __func__ << " m" << " " << *m << dendl; // associate message with Connection (for benefit of encode_payload) m->set_connection(this); - uint64_t features = get_features(); if (m->empty_payload()) - ldout(async_msgr->cct, 20) << __func__ << " encoding " << m->get_seq() << " features " << features - << " " << m << " " << *m << dendl; + ldout(async_msgr->cct, 20) << __func__ << " encoding features " + << features << " " << m << " " << *m << dendl; else - ldout(async_msgr->cct, 20) << __func__ << " half-reencoding " << m->get_seq() << " features " - << features << " " << m << " " << *m << dendl; + ldout(async_msgr->cct, 20) << __func__ << " half-reencoding features " + << features << " " << m << " " << *m << dendl; // encode and copy out of *m - m->encode(features, async_msgr->crcflags); + m->encode(features, msgr->crcflags); // prepare everything ceph_msg_header& header = m->get_header(); ceph_msg_footer& footer = m->get_footer(); + ldout(async_msgr->cct, 20) << __func__ << " sending message type=" << header.type + << " src " << entity_name_t(header.src) + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off << dendl; + + // Now that we have all the crcs calculated, handle the // digital signature for the message, if the AsyncConnection has session // security set up. Some session security options do not @@ -2181,46 +2215,18 @@ int AsyncConnection::_send(Message *m) ldout(async_msgr->cct, 20) << __func__ << " no session security" << dendl; } else { if (session_security->sign_message(m)) { - ldout(async_msgr->cct, 20) << __func__ << " failed to sign seq # " - << header.seq << "): sig = " << footer.sig << dendl; + ldout(async_msgr->cct, 20) << __func__ << " failed to sign m=" + << m << "): sig = " << footer.sig << dendl; } else { - ldout(async_msgr->cct, 20) << __func__ << " signed seq # " << header.seq - << "): sig = " << footer.sig << dendl; + ldout(async_msgr->cct, 20) << __func__ << " signed m=" << m + << "): sig = " << footer.sig << dendl; } } - bufferlist blist = m->get_payload(); - blist.append(m->get_middle()); - blist.append(m->get_data()); - - ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() - << " " << m << dendl; - int rc = write_message(header, footer, blist); - - if (rc < 0) { - ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " - << cpp_strerror(errno) << dendl; - } else if (rc == 0) { - ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; - } else { - ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl; - } - m->put(); - - return rc; -} - -int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, - bufferlist& blist) -{ - bufferlist bl; - int ret; - // send tag char tag = CEPH_MSGR_TAG_MSG; bl.append(&tag, sizeof(tag)); - // send envelope ceph_msg_header_old oldheader; if (has_feature(CEPH_FEATURE_NOSRCADDR)) { bl.append((char*)&header, sizeof(header)); @@ -2239,7 +2245,9 @@ int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg bl.append((char*)&oldheader, sizeof(oldheader)); } - bl.claim_append(blist); + bl.claim_append(m->get_payload()); + bl.append(m->get_middle()); + bl.append(m->get_data()); // send footer; if receiver doesn't support signatures, use the old footer format ceph_msg_footer_old old_footer; @@ -2257,26 +2265,46 @@ int AsyncConnection::write_message(const ceph_msg_header& header, const ceph_msg old_footer.flags = footer.flags; bl.append((char*)&old_footer, sizeof(old_footer)); } +} - // send - ret = _try_send(bl); - if (ret < 0) - return ret; +int AsyncConnection::write_message(Message *m, bufferlist& bl) +{ + assert(write_lock.is_locked()); + assert(can_write == 1); + if (!policy.lossy) { + // put on sent list + sent.push_back(make_pair(bl, m)); + m->get(); + } - return ret; + ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq() + << " " << m << dendl; + int rc = _try_send(bl); + if (rc < 0) { + ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " + << cpp_strerror(errno) << dendl; + } else if (rc == 0) { + ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; + } else { + ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " continuely." << dendl; + } + m->put(); + + return rc; } void AsyncConnection::handle_ack(uint64_t seq) { ldout(async_msgr->cct, 15) << __func__ << " got ack seq " << seq << dendl; // trim sent list - while (!sent.empty() && sent.front()->get_seq() <= seq) { - Message *m = sent.front(); + Mutex::Locker l(write_lock); + while (!sent.empty() && sent.front().second->get_seq() <= seq) { + pair p = sent.front(); sent.pop_front(); ldout(async_msgr->cct, 10) << __func__ << " got ack seq " - << seq << " >= " << m->get_seq() << " on " - << m << " " << *m << dendl; - m->put(); + << seq << " >= " << p.second->get_seq() << " on " + << p.second << " " << *(p.second) << dendl; + p.second->put(); } } @@ -2299,7 +2327,7 @@ void AsyncConnection::mark_down() void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) { - assert(lock.is_locked()); + assert(write_lock.is_locked()); bufferlist bl; utime_t t = ceph_clock_now(async_msgr->cct); @@ -2329,6 +2357,8 @@ void AsyncConnection::handle_write() Mutex::Locker l(lock); bufferlist bl; int r = 0; + + write_lock.Lock(); if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) { if (keepalive) { _send_keepalive_or_ack(); @@ -2336,12 +2366,17 @@ void AsyncConnection::handle_write() } while (1) { - Message *m = _get_next_outgoing(); + bufferlist data; + Message *m = _get_next_outgoing(&data); if (!m) break; + // send_message may not encode message + if (!data.length()) + prepare_send_message(m, data); + ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl; - r = _send(m); + r = write_message(m, data); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; @@ -2366,16 +2401,22 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 1) << __func__ << " send msg failed" << dendl; goto fail; } - } else if (state != STATE_CONNECTING && state != STATE_CLOSED) { + } else if (state == STATE_STANDBY && !policy.server && is_queued()) { + ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) + << " policy.server is false" << dendl; + _connect(); + } else if (sd >= 0 && state != STATE_CONNECTING && state != STATE_CLOSED) { r = _try_send(bl); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; goto fail; } } + write_lock.Unlock(); return ; fail: + write_lock.Unlock(); fault(); } @@ -2390,7 +2431,7 @@ void AsyncConnection::wakeup_from(uint64_t id) void AsyncConnection::local_deliver() { ldout(async_msgr->cct, 10) << __func__ << dendl; - Mutex::Locker l(lock); + Mutex::Locker l(write_lock); while (!local_messages.empty()) { Message *m = local_messages.back(); local_messages.pop_back(); @@ -2398,12 +2439,12 @@ void AsyncConnection::local_deliver() m->set_recv_stamp(ceph_clock_now(async_msgr->cct)); ldout(async_msgr->cct, 10) << __func__ << " " << *m << " local deliver " << dendl; async_msgr->ms_fast_preprocess(m); - lock.Unlock(); + write_lock.Unlock(); if (async_msgr->ms_can_fast_dispatch(m)) { async_msgr->ms_fast_dispatch(m); } else { msgr->ms_deliver_dispatch(m); } - lock.Lock(); + write_lock.Lock(); } } diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 8c677f524501..85a203ba357b 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -45,10 +45,15 @@ class AsyncConnection : public Connection { int read_bulk(int fd, char *buf, int len); int do_sendmsg(struct msghdr &msg, int len, bool more); + int try_send(bufferlist bl, bool send=true) { + Mutex::Locker l(write_lock); + return _try_send(bl, send); + } // if "send" is false, it will only append bl to send buffer // the main usage is avoid error happen outside messenger threads int _try_send(bufferlist bl, bool send=true); int _send(Message *m); + void prepare_send_message(Message *m, bufferlist &bl); int read_until(uint64_t needed, char *p); int _process_connection(); void _connect(); @@ -63,7 +68,7 @@ class AsyncConnection : public Connection { int randomize_out_seq(); void handle_ack(uint64_t seq); void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL); - int write_message(const ceph_msg_header& header, const ceph_msg_footer& footer, bufferlist& blist); + int write_message(Message *m, bufferlist& bl); int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply, bufferlist authorizer_reply) { bufferlist reply_bl; @@ -74,7 +79,7 @@ class AsyncConnection : public Connection { if (reply.authorizer_len) { reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length()); } - int r = _try_send(reply_bl); + int r = try_send(reply_bl); if (r < 0) return -1; @@ -82,25 +87,31 @@ class AsyncConnection : public Connection { return 0; } bool is_queued() { + assert(write_lock.is_locked()); return !out_q.empty() || outcoming_bl.length(); } void shutdown_socket() { if (sd >= 0) ::shutdown(sd, SHUT_RDWR); } - Message *_get_next_outgoing() { + Message *_get_next_outgoing(bufferlist *bl) { + assert(write_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { - map >::reverse_iterator p = out_q.rbegin(); - if (!p->second.empty()) { - m = p->second.front(); - p->second.pop_front(); + map > >::reverse_iterator it = out_q.rbegin(); + if (!it->second.empty()) { + list >::iterator p = it->second.begin(); + m = p->second; + if (bl) + bl->swap(p->first); + it->second.erase(p); } - if (p->second.empty()) - out_q.erase(p->first); + if (it->second.empty()) + out_q.erase(it->first); } return m; } + public: AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c); ~AsyncConnection(); @@ -206,19 +217,24 @@ class AsyncConnection : public Connection { AsyncMessenger *async_msgr; int global_seq; __u32 connect_seq, peer_global_seq; - uint64_t out_seq; + atomic_t out_seq; uint64_t in_seq, in_seq_acked; int state; int state_after_send; int sd; int port; Messenger::Policy policy; - map > out_q; // priority queue for outbound msgs - list sent; + + Mutex write_lock; + int can_write; // 0. can't send 1. can send_message 2. connection is closed + bool open_write; + map > > out_q; // priority queue for outbound msgs + list > sent; // the first bufferlist need to inject seq list local_messages; // local deliver + bufferlist outcoming_bl; + Mutex lock; utime_t backoff; // backoff time - bool open_write; EventCallbackRef read_handler; EventCallbackRef write_handler; EventCallbackRef reset_handler; @@ -265,7 +281,6 @@ class AsyncConnection : public Connection { char *state_buffer; // used only by "read_until" uint64_t state_offset; - bufferlist outcoming_bl; NetHandler net; EventCenter *center; ceph::shared_ptr session_security; diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index 1d32b8f18eab..1a23b92edff6 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -745,10 +745,10 @@ class SyntheticDispatcher : public Dispatcher { ::decode(i, blp); ::decode(reply, blp); if (reply) { - cerr << __func__ << " reply=" << reply << " i=" << i << std::endl; + //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl; reply_message(m, i); } else if (sent.count(i)) { - cerr << __func__ << " reply=" << reply << " i=" << i << std::endl; + //cerr << __func__ << " reply=" << reply << " i=" << i << std::endl; ASSERT_EQ(conn_sent[m->get_connection()].front(), i); ASSERT_TRUE(m->get_data().contents_equal(sent[i])); conn_sent[m->get_connection()].pop_front(); @@ -776,6 +776,7 @@ class SyntheticDispatcher : public Dispatcher { if (m->get_middle().length()) rm->set_middle(bl); m->get_connection()->send_message(rm); + //cerr << __func__ << " conn=" << m->get_connection() << " reply m=" << m << " i=" << i << std::endl; } void send_message_wrap(ConnectionRef con, Message *m) { @@ -791,6 +792,7 @@ class SyntheticDispatcher : public Dispatcher { sent[i] = m->get_data(); conn_sent[con].push_back(i); } + //cerr << __func__ << " conn=" << con.get() << " send m=" << m << " i=" << i << std::endl; } ASSERT_EQ(con->send_message(m), 0); }