From db131b313093b5349eda31b6b1f71dc954a8b1fe Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 22 Jan 2019 17:25:46 +0000 Subject: [PATCH] msg/async: msgr2: organizing log messages Signed-off-by: Ricardo Dias --- src/msg/async/AsyncConnection.cc | 2 +- src/msg/async/ProtocolV2.cc | 110 +++++++++++++++++-------------- 2 files changed, 63 insertions(+), 49 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 13711c98bc9..e6727d5872b 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -650,7 +650,7 @@ void AsyncConnection::mark_down() void AsyncConnection::handle_write() { - ldout(async_msgr->cct, 4) << __func__ << dendl; + ldout(async_msgr->cct, 10) << __func__ << dendl; protocol->write_event(); } diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 392c61690bb..dc81e23e822 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -439,6 +439,7 @@ ProtocolV2::~ProtocolV2() { } void ProtocolV2::connect() { + ldout(cct, 1) << __func__ << dendl; state = START_CONNECT; got_bad_method = 0; @@ -448,7 +449,10 @@ void ProtocolV2::connect() { } } -void ProtocolV2::accept() { state = START_ACCEPT; } +void ProtocolV2::accept() { + ldout(cct, 1) << __func__ << dendl; + state = START_ACCEPT; +} bool ProtocolV2::is_connected() { return can_write; } @@ -477,7 +481,7 @@ void ProtocolV2::discard_out_queue() { } void ProtocolV2::reset_session() { - ldout(cct, 20) << __func__ << dendl; + ldout(cct, 1) << __func__ << dendl; std::lock_guard l(connection->write_lock); if (connection->delay_state) { @@ -501,7 +505,7 @@ void ProtocolV2::reset_session() { } void ProtocolV2::stop() { - ldout(cct, 2) << __func__ << dendl; + ldout(cct, 1) << __func__ << dendl; if (state == CLOSED) { return; } @@ -530,8 +534,9 @@ void ProtocolV2::requeue_sent() { while (!sent.empty()) { Message *m = sent.back(); sent.pop_back(); - ldout(cct, 10) << __func__ << " " << *m << " for resend " - << " (" << m->get_seq() << ")" << dendl; + ldout(cct, 5) << __func__ << " requeueing message m=" << m + << " seq=" << m->get_seq() << " type=" << m->get_type() << " " + << *m << dendl; rq.push_front(make_pair(bufferlist(), m)); } } @@ -547,9 +552,9 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { while (!rq.empty()) { pair p = rq.front(); if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; - ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq " - << p.second->get_seq() << " <= " << seq << ", discarding" - << dendl; + ldout(cct, 5) << __func__ << " discarding message m=" << p.second + << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " " + << *(p.second) << dendl; p.second->put(); rq.pop_front(); count++; @@ -612,7 +617,7 @@ CtPtr ProtocolV2::_fault() { if (connection->policy.lossy && state != START_CONNECT && state != CONNECTING) { - ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl; + ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl; stop(); connection->dispatch_queue->queue_reset(connection); return nullptr; @@ -626,7 +631,7 @@ CtPtr ProtocolV2::_fault() { if (out_queue.empty() && state >= START_ACCEPT && state <= ACCEPTING_SESSION && !replacing) { - ldout(cct, 10) << __func__ << " with nothing to send and in the half " + ldout(cct, 2) << __func__ << " with nothing to send and in the half " << " accept state just closed" << dendl; connection->write_lock.unlock(); stop(); @@ -640,8 +645,8 @@ CtPtr ProtocolV2::_fault() { if (connection->policy.standby && out_queue.empty() && !keepalive && state != WAIT) { - ldout(cct, 10) << __func__ << " with nothing to send, going to standby" - << dendl; + ldout(cct, 1) << __func__ << " with nothing to send, going to standby" + << dendl; state = STANDBY; connection->write_lock.unlock(); return nullptr; @@ -652,10 +657,10 @@ CtPtr ProtocolV2::_fault() { if (state != START_CONNECT && state != CONNECTING && state != WAIT) { // policy maybe empty when state is in accept if (connection->policy.server) { - ldout(cct, 0) << __func__ << " server, going to standby" << dendl; + ldout(cct, 1) << __func__ << " server, going to standby" << dendl; state = STANDBY; } else { - ldout(cct, 0) << __func__ << " initiating reconnect" << dendl; + ldout(cct, 1) << __func__ << " initiating reconnect" << dendl; connect_seq++; global_seq = messenger->get_global_seq(); state = START_CONNECT; @@ -677,7 +682,7 @@ CtPtr ProtocolV2::_fault() { global_seq = messenger->get_global_seq(); state = START_CONNECT; connection->state = AsyncConnection::STATE_CONNECTING; - ldout(cct, 10) << __func__ << " waiting " << backoff << dendl; + ldout(cct, 1) << __func__ << " waiting " << backoff << dendl; // woke up again; connection->register_time_events.insert( connection->center->create_time_event(backoff.to_nsec() / 1000, @@ -724,14 +729,16 @@ void ProtocolV2::send_message(Message *m) { // ensure the correctness of message encoding bl.clear(); m->clear_payload(); - ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f - << " != " << connection->get_features() << dendl; + ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f + << " != " << connection->get_features() << dendl; } if (state == CLOSED) { ldout(cct, 10) << __func__ << " connection closed." << " Drop message " << m << dendl; m->put(); } else { + ldout(cct, 5) << __func__ << " enqueueing message m=" << m + << " type=" << m->get_type() << " " << *m << dendl; m->trace.event("async enqueueing message"); out_queue[m->get_priority()].emplace_back(std::move(bl), m); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m @@ -839,17 +846,17 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) { encrypt_payload(flat_bl); MessageFrame message(this, header2, flat_bl); - ldout(cct, 5) << __func__ << " sending message type=" << header2.type - << " src " << entity_name_t(messenger->get_myname()) - << " front=" << header2.front_len - << " data=" << header2.data_len << " off " << header2.data_off - << dendl; + ldout(cct, 5) << __func__ << " sending message m=" << m + << " seq=" << m->get_seq() << " " << *m << dendl; bufferlist &msg_bl = message.get_buffer(); connection->outcoming_bl.claim_append(msg_bl); m->trace.event("async writing message"); - ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m + ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq() + << " src=" << entity_name_t(messenger->get_myname()) + << " front=" << header2.front_len + << " data=" << header2.data_len << " off=" << header2.data_off << dendl; ssize_t total_send_size = connection->outcoming_bl.length(); ssize_t rc = connection->_try_send(more); @@ -1217,7 +1224,7 @@ CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) { return _fault(); } -unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); + unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); if (memcmp(buffer, CEPH_BANNER_V2_PREFIX, banner_prefix_len)) { if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) { @@ -1250,11 +1257,6 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); offset += sizeof(__le64); peer_required_features = *(__le64 *)(buffer + offset); - ldout(cct, 1) << __func__ << " banner peer_type=" << (int)peer_type - << " supported=" << std::hex << peer_supported_features - << " required=" << std::hex << peer_required_features - << std::dec << dendl; - if (connection->get_peer_type() == -1) { connection->set_peer_type(peer_type); @@ -1277,6 +1279,11 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); } } + ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type + << " supported=" << std::hex << peer_supported_features + << " required=" << std::hex << peer_required_features + << std::dec << dendl; + // Check feature bit compatibility __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES; @@ -1498,6 +1505,10 @@ CtPtr ProtocolV2::ready() { connection->maybe_start_delay_thread(); state = READY; + ldout(cct, 1) << __func__ << " entity=" << peer_name << " cookie=" << std::hex + << cookie << std::dec << " in_seq=" << in_seq + << " out_seq=" << out_seq << dendl; + return CONTINUE(read_frame); } @@ -1914,9 +1925,10 @@ CtPtr ProtocolV2::handle_message_complete() { // note last received message. in_seq = message->get_seq(); - ldout(cct, 5) << " rx " << message->get_source() << " seq " - << message->get_seq() << " " << message << " " << *message - << dendl; + ldout(cct, 5) << __func__ << " received message m=" << message + << " seq=" << message->get_seq() + << " from=" << message->get_source() << " type=" << header.type + << " " << *message << dendl; bool need_dispatch_writer = true; if (!connection->policy.lossy) { @@ -2693,6 +2705,8 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { return WRITE(bl, "session retry", read_frame); } + ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl; + // everything looks good exproto->connect_seq = reconnect.connect_seq(); exproto->message_seq = reconnect.msg_seq(); @@ -2737,9 +2751,9 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { if (existing->policy.lossy) { // existing connection can be thrown out in favor of this one ldout(cct, 1) - << __func__ - << " accept replacing existing (lossy) channel (new one lossy=" - << connection->policy.lossy << ")" << dendl; + << __func__ << " existing=" << existing + << " is a lossy channel. Stopping existing in favor of this connection" + << dendl; existing->protocol->stop(); existing->dispatch_queue->queue_reset(existing.get()); return send_server_ident(); @@ -2749,9 +2763,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { // Found previous session // peer has reseted and we're going to reuse the existing connection // by replacing the communication socket - ldout(cct, 1) << __func__ - << " found previous session, peer must have reseted." - << dendl; + ldout(cct, 1) << __func__ << " found previous session existing=" << existing + << ", peer must have reseted." << dendl; if (connection->policy.resetcheck) { exproto->reset_session(); } @@ -2759,9 +2772,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { } if (exproto->state == READY || exproto->state == STANDBY) { - ldout(cct, 1) << __func__ - << " existing connection is READY/STANDBY, lets reuse it" - << dendl; + ldout(cct, 1) << __func__ << " existing=" << existing + << " is READY/STANDBY, lets reuse it" << dendl; return reuse_connection(existing, exproto, false); } @@ -2773,14 +2785,14 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { // this connection wins ldout(cct, 1) << __func__ << " connection race detected, replacing existing=" - << existing << " socket by this connection's socket" - << dendl; + << existing << " socket by this connection's socket" << dendl; return reuse_connection(existing, exproto, false); } else { // the existing connection wins - ldout(cct, 1) << __func__ - << " connection race detected, this connection loses" - << dendl; + ldout(cct, 1) + << __func__ + << " connection race detected, this connection loses to existing=" + << existing << dendl; ceph_assert(connection->peer_addrs->msgr2_addr() > messenger->get_myaddrs().msgr2_addr()); @@ -2822,13 +2834,15 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, auto temp_cs = std::move(connection->cs); EventCenter *new_center = connection->center; Worker *new_worker = connection->worker; + + ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing" + << dendl; // avoid _stop shutdown replacing socket // queue a reset on the new connection, which we're dumping for the old stop(); connection->dispatch_queue->queue_reset(connection); - ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing" - << dendl; + exproto->can_write = false; exproto->replacing = true; exproto->session_security = session_security; -- 2.39.5