namespace {
+// TODO: apply the same logging policy to Protocol V1
+// Log levels in V2 Protocol:
+// * error level, something error that cause connection to terminate:
+// - fatal errors;
+// - bugs;
+// * warn level: something unusual that identifies connection fault or replacement:
+// - unstable network;
+// - incompatible peer;
+// - auth failure;
+// - connection race;
+// - connection reset;
+// * info level, something very important to show connection lifecycle,
+// which doesn't happen very frequently;
+// * debug level, important logs for debugging, including:
+// - all the messages sent/received (-->/<==);
+// - all the frames exchanged (WRITE/GOT);
+// - important fields updated (UPDATE);
+// - connection state transitions (TRIGGER);
+// * trace level, trivial logs showing:
+// - the exact bytes being sent/received (SEND/RECV(bytes));
+// - detailed information of sub-frames;
+// - integrity checks;
+// - etc.
seastar::logger& logger() {
return ceph::get_logger(ceph_subsys_ms);
}
ceph::net::SocketConnection& conn,
const char *where) {
if (actual != expected) {
- logger().error("{} {} received wrong tag: {}, expected {}",
- conn, where,
- static_cast<uint32_t>(actual),
- static_cast<uint32_t>(expected));
+ logger().warn("{} {} received wrong tag: {}, expected {}",
+ conn, where,
+ static_cast<uint32_t>(actual),
+ static_cast<uint32_t>(expected));
abort_in_fault();
}
}
inline void unexpected_tag(const Tag& unexpected,
ceph::net::SocketConnection& conn,
const char *where) {
- logger().error("{} {} received unexpected tag: {}",
- conn, where, static_cast<uint32_t>(unexpected));
+ logger().warn("{} {} received unexpected tag: {}",
+ conn, where, static_cast<uint32_t>(unexpected));
abort_in_fault();
}
conn.target_addr = _peer_addr;
conn.set_peer_type(_peer_type);
conn.policy = messenger.get_policy(_peer_type);
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={},"
+ " policy(lossy={}, server={}, standby={}, resetcheck={})",
+ conn, _peer_addr, ceph_entity_type_name(_peer_type), conn.policy.lossy,
+ conn.policy.server, conn.policy.standby, conn.policy.resetcheck);
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
execute_connecting();
ceph_assert(!socket);
conn.target_addr = _peer_addr;
socket = std::move(sock);
+ logger().info("{} ProtocolV2::start_accept(): peer_addr={}", conn, _peer_addr);
messenger.accept_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
execute_accepting();
// everything through ::Decode is unnecessary.
const auto& main_preamble = \
*reinterpret_cast<const preamble_block_t*>(bl.get());
+ logger().trace("{} RECV({}) main preamble: tag={}, num_segments={}, crc={}",
+ conn, bl.size(), (int)main_preamble.tag,
+ (int)main_preamble.num_segments, main_preamble.crc);
// verify preamble's CRC before any further processing
const auto rx_crc = ceph_crc32c(0,
reinterpret_cast<const unsigned char*>(&main_preamble),
sizeof(main_preamble) - sizeof(main_preamble.crc));
if (rx_crc != main_preamble.crc) {
- logger().error("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
- conn, rx_crc, main_preamble.crc);
+ logger().warn("{} crc mismatch for main preamble rx_crc={} tx_crc={}",
+ conn, rx_crc, main_preamble.crc);
abort_in_fault();
}
- logger().trace("{} read main preamble: tag={}, len={}", conn, (int)main_preamble.tag, bl.size());
// currently we do support between 1 and MAX_NUM_SEGMENTS segments
if (main_preamble.num_segments < 1 ||
main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().error("{} unsupported num_segments={}",
- conn, main_preamble.num_segments);
+ logger().warn("{} unsupported num_segments={}",
+ conn, main_preamble.num_segments);
abort_in_fault();
}
if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
- logger().error("{} num_segments too much: {}",
- conn, main_preamble.num_segments);
+ logger().warn("{} num_segments too much: {}",
+ conn, main_preamble.num_segments);
abort_in_fault();
}
rx_segments_data.clear();
for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
- logger().trace("{} got new segment: len={} align={}",
+ logger().trace("{} GOT frame segment: len={} align={}",
conn, main_preamble.segments[idx].length,
main_preamble.segments[idx].alignment);
rx_segments_desc.emplace_back(main_preamble.segments[idx]);
// TODO: create aligned and contiguous buffer from socket
return read_exactly(cur_rx_desc.length)
.then([this] (auto tmp_bl) {
+ logger().trace("{} RECV({}) frame segment[{}]",
+ conn, tmp_bl.size(), rx_segments_data.size());
bufferlist data;
data.append(buffer::create(std::move(tmp_bl)));
- logger().trace("{} read frame segment[{}], length={}",
- conn, rx_segments_data.size(), data.length());
if (session_stream_handlers.rx) {
// TODO
ceph_assert(false);
ceph_assert(!session_stream_handlers.rx);
return read_exactly(FRAME_PLAIN_EPILOGUE_SIZE);
}).then([this] (auto bl) {
- logger().trace("{} read frame epilogue length={}", conn, bl.size());
+ logger().trace("{} RECV({}) frame epilogue", conn, bl.size());
__u8 late_flags;
if (session_stream_handlers.rx) {
const __u32 expected_crc = epilogue.crc_values[idx];
const __u32 calculated_crc = rx_segments_data[idx].crc32c(-1);
if (expected_crc != calculated_crc) {
- logger().error("{} message integrity check failed at index {}:"
- " expected_crc={} calculated_crc={}",
- conn, (unsigned int)idx, expected_crc, calculated_crc);
+ logger().warn("{} message integrity check failed at index {}:"
+ " expected_crc={} calculated_crc={}",
+ conn, (unsigned int)idx, expected_crc, calculated_crc);
abort_in_fault();
} else {
logger().trace("{} message integrity check success at index {}: crc={}",
}
late_flags = epilogue.late_flags;
}
+ logger().trace("{} GOT frame epilogue: late_flags={}",
+ conn, (unsigned)late_flags);
// we do have a mechanism that allows transmitter to start sending message
// and abort after putting entire data field on wire. This will be used by
seastar::future<> ProtocolV2::write_frame(F &frame, bool flush)
{
auto bl = frame.get_buffer(session_stream_handlers);
- logger().trace("{} write frame: tag={}, len={}", conn,
- static_cast<uint32_t>(frame.tag), bl.length());
+ const auto main_preamble = reinterpret_cast<const preamble_block_t*>(bl.front().c_str());
+ logger().trace("{} SEND({}) frame: tag={}, num_segments={}, crc={}",
+ conn, bl.length(), (int)main_preamble->tag,
+ (int)main_preamble->num_segments, main_preamble->crc);
if (flush) {
return write_flush(std::move(bl));
} else {
conn, get_state_name(state));
ceph_assert(false);
}
- logger().trace("{} trigger {}, was {}",
+ logger().debug("{} TRIGGER {}, was {}",
conn, get_state_name(_state), get_state_name(state));
state = _state;
set_write_state(_write_state);
bufferlist bl;
bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
- encode((uint16_t)banner_payload.length(), bl, 0);
+ auto len_payload = static_cast<uint16_t>(banner_payload.length());
+ encode(len_payload, bl, 0);
bl.claim_append(banner_payload);
+ logger().debug("{} SEND({}) banner: len_payload={}, supported={}, "
+ "required={}, banner=\"{}\"",
+ conn, bl.length(), len_payload,
+ CEPH_MSGR2_SUPPORTED_FEATURES, CEPH_MSGR2_REQUIRED_FEATURES,
+ CEPH_BANNER_V2_PREFIX);
return write_flush(std::move(bl)).then([this] {
// 2. read peer banner
unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
}).then([this] (auto bl) {
// 3. process peer banner and read banner_payload
unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ logger().debug("{} RECV({}) banner: \"{}\"",
+ conn, bl.size(),
+ std::string((const char*)bl.get(), banner_prefix_len));
if (memcmp(bl.get(), CEPH_BANNER_V2_PREFIX, banner_prefix_len) != 0) {
if (memcmp(bl.get(), CEPH_BANNER, strlen(CEPH_BANNER)) == 0) {
- logger().error("{} peer is using V1 protocol", conn);
+ logger().warn("{} peer is using V1 protocol", conn);
} else {
- logger().error("{} peer sent bad banner", conn);
+ logger().warn("{} peer sent bad banner", conn);
}
abort_in_fault();
}
try {
decode(payload_len, ti);
} catch (const buffer::error &e) {
- logger().error("{} decode banner payload len failed", conn);
+ logger().warn("{} decode banner payload len failed", conn);
abort_in_fault();
}
+ logger().debug("{} GOT banner: payload_len={}", conn, payload_len);
return read(payload_len);
}).then([this] (bufferlist bl) {
// 4. process peer banner_payload and send HelloFrame
decode(peer_supported_features, p);
decode(peer_required_features, p);
} catch (const buffer::error &e) {
- logger().error("{} decode banner payload failed", conn);
+ logger().warn("{} decode banner payload failed", conn);
abort_in_fault();
}
- logger().trace("{} supported={} required={}",
- conn, peer_supported_features, peer_required_features);
+ logger().debug("{} RECV({}) banner features: supported={} required={}",
+ conn, bl.length(),
+ peer_supported_features, peer_required_features);
// Check feature bit compatibility
uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
auto hello = HelloFrame::Encode(messenger.get_mytype(),
conn.target_addr);
+ logger().debug("{} WRITE HelloFrame: my_type={}, peer_addr={}",
+ conn, ceph_entity_type_name(messenger.get_mytype()),
+ conn.target_addr);
return write_frame(hello);
}).then([this] {
//5. read peer HelloFrame
}).then([this] {
// 6. process peer HelloFrame
auto hello = HelloFrame::Decode(rx_segments_data.back());
- logger().trace("{} received hello: peer_type={} peer_addr_for_me={}",
+ logger().debug("{} GOT HelloFrame: my_type={} peer_addr={}",
conn, ceph_entity_type_name(hello.entity_type()),
hello.peer_addr());
return seastar::make_ready_future<entity_type_t, entity_addr_t>(
return read_frame_payload().then([this] {
// handle_auth_bad_method() logic
auto bad_method = AuthBadMethodFrame::Decode(rx_segments_data.back());
- logger().warn("{} got AuthBadMethod, method={} reslt={}, "
- "allowed methods={}, allowed modes={}",
+ logger().warn("{} GOT AuthBadMethodFrame: method={} result={}, "
+ "allowed_methods={}, allowed_modes={}",
conn, bad_method.method(), cpp_strerror(bad_method.result()),
bad_method.allowed_methods(), bad_method.allowed_modes());
ceph_assert(messenger.get_auth_client());
bad_method.method(), bad_method.result(),
bad_method.allowed_methods(), bad_method.allowed_modes());
if (r < 0) {
- logger().error("{} auth_client handle_auth_bad_method returned {}",
- conn, r);
+ logger().warn("{} auth_client handle_auth_bad_method returned {}",
+ conn, r);
abort_in_fault();
}
return client_auth(bad_method.allowed_methods());
return read_frame_payload().then([this] {
// handle_auth_reply_more() logic
auto auth_more = AuthReplyMoreFrame::Decode(rx_segments_data.back());
- logger().trace("{} auth reply more len={}",
+ logger().debug("{} GOT AuthReplyMoreFrame: payload_len={}",
conn, auth_more.auth_payload().length());
ceph_assert(messenger.get_auth_client());
// let execute_connecting() take care of the thrown exception
auto reply = messenger.get_auth_client()->handle_auth_reply_more(
conn.shared_from_this(), auth_meta, auth_more.auth_payload());
auto more_reply = AuthRequestMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthRequestMoreFrame: payload_len={}",
+ conn, reply.length());
return write_frame(more_reply);
}).then([this] {
return handle_auth_reply();
return read_frame_payload().then([this] {
// handle_auth_done() logic
auto auth_done = AuthDoneFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, auth_done.global_id(),
+ ceph_con_mode_name(auth_done.con_mode()),
+ auth_done.auth_payload().length());
ceph_assert(messenger.get_auth_client());
int r = messenger.get_auth_client()->handle_auth_done(
conn.shared_from_this(), auth_meta,
auth_done.con_mode(),
auth_done.auth_payload());
if (r < 0) {
- logger().error("{} auth_client handle_auth_done returned {}", conn, r);
+ logger().warn("{} auth_client handle_auth_done returned {}", conn, r);
abort_in_fault();
}
auth_meta->con_mode = auth_done.con_mode();
messenger.get_auth_client()->get_auth_request(conn.shared_from_this(), auth_meta);
auth_meta->auth_method = auth_method;
auto frame = AuthRequestFrame::Encode(auth_method, preferred_modes, bl);
+ logger().debug("{} WRITE AuthRequestFrame: method={},"
+ " preferred_modes={}, payload_len={}",
+ conn, auth_method, preferred_modes, bl.length());
return write_frame(frame).then([this] {
return handle_auth_reply();
});
{
return read_frame_payload().then([this] {
// handle_wait() logic
- logger().trace("{} received WAIT (connection race)", conn);
+ logger().warn("{} GOT WaitFrame", conn);
WaitFrame::Decode(rx_segments_data.back());
return false;
});
entity_addr_t a;
a.u.sa.sa_family = AF_INET;
a.set_type(entity_addr_t::TYPE_MSGR2);
- logger().debug("{} learn from addr {}", conn, a);
return messenger.learned_addr(a).then([this] {
uint64_t flags = 0;
if (conn.policy.lossy) {
conn.policy.features_required | msgr2_required, flags,
client_cookie);
- logger().trace("{} sending identification: addrs={} target={} gid={}"
- " global_seq={} features_supported={} features_required={}"
- " flags={} cookie={}",
+ logger().debug("{} WRITE ClientIdentFrame: addrs={}, target={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
conn, messenger.get_myaddrs(), conn.target_addr,
messenger.get_myname().num(), global_seq,
conn.policy.features_supported,
return read_frame_payload().then([this] {
// handle_ident_missing_features() logic
auto ident_missing = IdentMissingFeaturesFrame::Decode(rx_segments_data.back());
- logger().error("{} client does not support all server features: {}",
- conn, ident_missing.features());
+ logger().warn("{} GOT IdentMissingFeaturesFrame: features={}"
+ " (client does not support all server features)",
+ conn, ident_missing.features());
abort_in_fault();
// won't be executed
return false;
return read_frame_payload().then([this] {
// handle_server_ident() logic
auto server_ident = ServerIdentFrame::Decode(rx_segments_data.back());
- logger().trace("{} received server identification:"
- " addrs={} gid={} global_seq={}"
- " features_supported={} features_required={}"
- " flags={} cookie={}",
+ logger().debug("{} GOT ServerIdentFrame:"
+ " addrs={}, gid={}, gs={},"
+ " features_supported={}, features_required={},"
+ " flags={}, cookie={}",
conn,
server_ident.addrs(), server_ident.gid(),
server_ident.global_seq(),
ceph_assert(server_ident.flags() & CEPH_MSG_CONNECT_LOSSY);
conn.policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY;
// TODO: backoff = utime_t();
- logger().trace("{} connect success {}, lossy={}, features={}",
- conn, connect_seq, conn.policy.lossy, conn.features);
return dispatcher.ms_handle_connect(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()))
global_seq,
connect_seq,
conn.in_seq);
- logger().trace("{} reconnect to session: client_cookie={}"
- " server_cookie={} gs={} cs={} ms={}",
+ logger().debug("{} WRITE ReconnectFrame: client_cookie={},"
+ " server_cookie={}, gs={}, cs={}, msg_seq={}",
conn, client_cookie, server_cookie,
global_seq, connect_seq, conn.in_seq);
return write_frame(reconnect).then([this] {
return read_frame_payload().then([this] {
// handle_session_retry_global() logic
auto retry = RetryGlobalFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryGlobalFrame: gs={}",
+ conn, retry.global_seq());
global_seq = messenger.get_global_seq(retry.global_seq());
- logger().warn("{} received session retry global "
- "global_seq={}, choose new gs={}",
- conn, retry.global_seq(), global_seq);
+ logger().warn("{} UPDATE: gs={}", conn, global_seq);
return client_reconnect();
});
case Tag::SESSION_RETRY:
return read_frame_payload().then([this] {
// handle_session_retry() logic
auto retry = RetryFrame::Decode(rx_segments_data.back());
+ logger().warn("{} GOT RetryFrame: cs={}",
+ conn, retry.connect_seq());
connect_seq = retry.connect_seq() + 1;
- logger().warn("{} received session retry connect_seq={}, inc to cs={}",
- conn, retry.connect_seq(), connect_seq);
+ logger().warn("{} UPDATE: cs={}", conn, connect_seq);
return client_reconnect();
});
case Tag::SESSION_RESET:
return read_frame_payload().then([this] {
// handle_session_reset() logic
auto reset = ResetFrame::Decode(rx_segments_data.back());
- logger().warn("{} received session reset full={}", reset.full());
+ logger().warn("{} GOT ResetFrame: full={}", conn, reset.full());
reset_session(reset.full());
return client_connect();
});
return read_frame_payload().then([this] {
// handle_reconnect_ok() logic
auto reconnect_ok = ReconnectOkFrame::Decode(rx_segments_data.back());
- logger().trace("{} received reconnect ok:"
- "sms={}, lossy={}, features={}",
- conn, reconnect_ok.msg_seq(),
- connect_seq, conn.policy.lossy, conn.features);
+ logger().debug("{} GOT ReconnectOkFrame: msg_seq={}",
+ conn, reconnect_ok.msg_seq());
// TODO
// discard_requeued_up_to()
// backoff = utime_t();
seastar::with_gate(pending_dispatch, [this] {
enable_recording();
global_seq = messenger.get_global_seq();
+ logger().debug("{} UPDATE: gs={}", conn, global_seq);
return Socket::connect(conn.peer_addr)
.then([this](SocketFRef sock) {
+ logger().debug("{} socket connected", conn);
socket = std::move(sock);
if (state == state_t::CLOSING) {
return socket->close().then([this] {
- logger().info("{} is closed during Socket::connect()", conn);
+ logger().warn("{} is closed during Socket::connect()", conn);
abort_in_fault();
});
}
}).then([this] (entity_type_t _peer_type,
entity_addr_t _my_addr_from_peer) {
if (conn.get_peer_type() != _peer_type) {
- logger().debug("{} connection peer type does not match what peer advertises {} != {}",
- conn, ceph_entity_type_name(conn.get_peer_type()),
- ceph_entity_type_name(_peer_type));
+ logger().warn("{} connection peer type does not match what peer advertises {} != {}",
+ conn, ceph_entity_type_name(conn.get_peer_type()),
+ ceph_entity_type_name(_peer_type));
dispatch_reset();
abort_in_close();
}
if (messenger.get_myaddrs().empty() ||
messenger.get_myaddrs().front().is_blank_ip()) {
- logger().trace("peer {} says I am {}", conn.target_addr, _my_addr_from_peer);
return messenger.learned_addr(_my_addr_from_peer);
} else {
return seastar::now();
}
}).then([this] (bool proceed_or_wait) {
if (proceed_or_wait) {
+ logger().info("{} connected: gs={}, pgs={}, cs={},"
+ " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execute_ready();
} else {
execute_wait();
ceph_assert(r < 0);
auto [allowed_methods, allowed_modes] =
messenger.get_auth_server()->get_supported_auth_methods(conn.get_peer_type());
- logger().warn("{} send AuthBadMethod(auth_method={}, r={}, "
+ auto bad_method = AuthBadMethodFrame::Encode(
+ auth_meta->auth_method, r, allowed_methods, allowed_modes);
+ logger().warn("{} WRITE AuthBadMethodFrame: method={}, result={}, "
"allowed_methods={}, allowed_modes={})",
conn, auth_meta->auth_method, cpp_strerror(r),
allowed_methods, allowed_modes);
- auto bad_method = AuthBadMethodFrame::Encode(
- auth_meta->auth_method, r, allowed_methods, allowed_modes);
return write_frame(bad_method).then([this] {
return server_auth();
});
case 1: {
auto auth_done = AuthDoneFrame::Encode(
conn.peer_global_id, auth_meta->con_mode, reply);
+ logger().debug("{} WRITE AuthDoneFrame: gid={}, con_mode={}, payload_len={}",
+ conn, conn.peer_global_id,
+ ceph_con_mode_name(auth_meta->con_mode), reply.length());
return write_frame(auth_done).then([this] {
ceph_assert(auth_meta);
// TODO
// auth more
case 0: {
auto more = AuthReplyMoreFrame::Encode(reply);
+ logger().debug("{} WRITE AuthReplyMoreFrame: payload_len={}",
+ conn, reply.length());
return write_frame(more).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
return read_frame_payload();
}).then([this] {
auto auth_more = AuthRequestMoreFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthRequestMoreFrame: payload_len={}",
+ conn, auth_more.auth_payload().length());
return _handle_auth_request(auth_more.auth_payload(), true);
});
}
}).then([this] {
// handle_auth_request() logic
auto request = AuthRequestFrame::Decode(rx_segments_data.back());
- logger().trace("{} got AuthRequest(method={}, preferred_modes={}, payload_len={})",
+ logger().debug("{} GOT AuthRequestFrame: method={}, preferred_modes={},"
+ " payload_len={}",
conn, request.method(), request.preferred_modes(),
request.auth_payload().length());
auth_meta->auth_method = request.method();
seastar::future<bool> ProtocolV2::send_wait()
{
auto wait = WaitFrame::Encode();
+ logger().warn("{} WRITE WaitFrame", conn);
return write_frame(wait).then([this] {
return false;
});
ceph_assert(exproto);
if (exproto->state == state_t::CLOSING) {
- logger().warn("{} existing {} already closed.", conn, *existing);
+ logger().warn("{} existing connection {} already closed.", conn, *existing);
return send_server_ident().then([this] {
return true;
});
}
if (exproto->state == state_t::REPLACING) {
- logger().warn("{} existing racing replace happened while replacing: {}",
+ logger().warn("{} racing replace happened while replacing existing connection {}",
conn, *existing);
return send_wait();
}
if (exproto->peer_global_seq > peer_global_seq) {
- logger().warn("{} this is a stale connection, peer_global_seq={}"
- "existing->peer_global_seq={} close this connection",
- conn, peer_global_seq, exproto->peer_global_seq);
+ logger().warn("{} this is a stale connection, because peer_global_seq({})"
+ "< existing->peer_global_seq({}), close this connection"
+ " in favor of existing connection {}",
+ conn, peer_global_seq, exproto->peer_global_seq, *existing);
dispatch_reset();
abort_in_close();
}
if (existing->policy.lossy) {
// existing connection can be thrown out in favor of this one
- logger().warn("{} existing={} is a lossy channel. Close existing in favor of"
+ logger().warn("{} existing connection {} is a lossy channel. Close existing in favor of"
" this connection", conn, *existing);
exproto->dispatch_reset();
exproto->close();
return read_frame_payload().then([this] {
// handle_client_ident() logic
auto client_ident = ClientIdentFrame::Decode(rx_segments_data.back());
- logger().trace("{} received client identification: addrs={} target={}"
- " gid={} global_seq={} features_supported={}"
- " features_required={} flags={} cookie={}",
+ logger().debug("{} GOT ClientIdentFrame: addrs={}, target={},"
+ " gid={}, gs={}, features_supported={},"
+ " features_required={}, flags={}, cookie={}",
conn, client_ident.addrs(), client_ident.target_addr(),
client_ident.gid(), client_ident.global_seq(),
client_ident.supported_features(),
if (client_ident.addrs().empty() ||
client_ident.addrs().front() == entity_addr_t()) {
- logger().error("{} oops, client_ident.addrs() is empty", conn);
+ logger().warn("{} oops, client_ident.addrs() is empty", conn);
abort_in_fault();
}
if (!messenger.get_myaddrs().contains(client_ident.target_addr())) {
- logger().error("{} peer is trying to reach {} which is not us ({})",
- conn, client_ident.target_addr(), messenger.get_myaddrs());
+ logger().warn("{} peer is trying to reach {} which is not us ({})",
+ conn, client_ident.target_addr(), messenger.get_myaddrs());
abort_in_fault();
}
// TODO: change peer_addr to entity_addrvec_t
conn.peer_addr.set_type(paddr.get_type());
conn.peer_addr.set_port(paddr.get_port());
conn.peer_addr.set_nonce(paddr.get_nonce());
- logger().trace("{} got paddr={}, conn.peer_addr={}", conn, paddr, conn.peer_addr);
+ logger().debug("{} UPDATE: peer_addr={}", conn, conn.peer_addr);
conn.target_addr = conn.peer_addr;
conn.set_peer_id(client_ident.gid());
(conn.policy.features_required | msgr2_required) &
~(uint64_t)client_ident.supported_features();
if (feat_missing) {
- logger().warn("{} peer missing required features {}", conn, feat_missing);
auto ident_missing_features = IdentMissingFeaturesFrame::Encode(feat_missing);
+ logger().warn("{} WRITE IdentMissingFeaturesFrame: features={} (peer missing)",
+ conn, feat_missing);
return write_frame(ident_missing_features).then([this] {
return false;
});
}
connection_features =
client_ident.supported_features() & conn.policy.features_supported;
+ logger().debug("{} UPDATE: connection_features={}", conn, connection_features);
peer_global_seq = client_ident.global_seq();
if (existing) {
if (existing->protocol->proto_type != proto_t::v2) {
- logger().warn("{} existing {} proto version is {}, close",
+ logger().warn("{} existing connection {} proto version is {}, close existing",
conn, *existing,
static_cast<int>(existing->protocol->proto_type));
// should unregister the existing from msgr atomically
seastar::future<bool> ProtocolV2::send_retry(uint64_t connect_seq)
{
auto retry = RetryFrame::Encode(connect_seq);
+ logger().warn("{} WRITE RetryFrame: cs={}", conn, connect_seq);
return write_frame(retry).then([this] {
return read_reconnect();
});
seastar::future<bool> ProtocolV2::send_retry_global(uint64_t global_seq)
{
auto retry = RetryGlobalFrame::Encode(global_seq);
+ logger().warn("{} WRITE RetryGlobalFrame: gs={}", conn, global_seq);
return write_frame(retry).then([this] {
return read_reconnect();
});
seastar::future<bool> ProtocolV2::send_reset(bool full)
{
auto reset = ResetFrame::Encode(full);
+ logger().warn("{} WRITE ResetFrame: full={}", conn, full);
return write_frame(reset).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
// handle_reconnect() logic
auto reconnect = ReconnectFrame::Decode(rx_segments_data.back());
- logger().trace("{} received reconnect: client_cookie={} server_cookie={}"
- " gs={} cs={} ms={}",
+ logger().debug("{} GOT ReconnectFrame: client_cookie={}, server_cookie={},"
+ " gs={}, cs={}, msg_seq={}",
conn, reconnect.client_cookie(), reconnect.server_cookie(),
reconnect.global_seq(), reconnect.connect_seq(),
reconnect.msg_seq());
if (!existing) {
// there is no existing connection therefore cannot reconnect to previous
// session
- logger().error("{} server_reconnect: no existing connection,"
- " reseting client", conn);
+ logger().warn("{} server_reconnect: no existing connection,"
+ " reseting client", conn);
return send_reset(true);
}
if (existing->protocol->proto_type != proto_t::v2) {
- logger().warn("{} server_reconnect: existing {} proto version is {},"
- "close existing and resetting client.",
+ logger().warn("{} server_reconnect: existing connection {} proto version is {},"
+ "close existing and reset client.",
conn, *existing,
static_cast<int>(existing->protocol->proto_type));
existing->close();
ceph_assert(exproto);
if (exproto->state == state_t::REPLACING) {
- logger().warn("{} server_reconnect: existing racing replace happened while "
- " replacing, retry_global. existing={}", conn, *existing);
+ logger().warn("{} server_reconnect: racing replace happened while "
+ " replacing existing connection {}, retry global.",
+ conn, *existing);
return send_retry_global(exproto->peer_global_seq);
}
if (exproto->client_cookie != reconnect.client_cookie()) {
- logger().warn("{} server_reconnect: existing={} client cookie mismatch,"
- " I must have reseted: cc={} rcc={}, reseting client.",
+ logger().warn("{} server_reconnect:"
+ " client_cookie mismatch with existing connection {},"
+ " cc={} rcc={}. I must have reseted, reseting client.",
conn, *existing, exproto->client_cookie, reconnect.client_cookie());
return send_reset(conn.policy.resetcheck);
} else if (exproto->server_cookie == 0) {
// - b reconnects to a with cookie X, connect_seq=1
// - a has cookie==0
logger().warn("{} server_reconnect: I was a client and didn't received the"
- " server_ident. Asking peer to resume session establishment",
- conn);
+ " server_ident with existing connection {}."
+ " Asking peer to resume session establishment",
+ conn, *existing);
return send_reset(false);
}
if (exproto->peer_global_seq > reconnect.global_seq()) {
- logger().trace("{} server_reconnect: stale global_seq: sgs={} cgs={},"
- " ask client to retry global",
- conn, exproto->peer_global_seq, reconnect.global_seq());
+ logger().warn("{} server_reconnect: stale global_seq: exist_pgs={} peer_gs={},"
+ " with existing connection {},"
+ " ask client to retry global",
+ conn, exproto->peer_global_seq, reconnect.global_seq(),
+ *existing);
return send_retry_global(exproto->peer_global_seq);
}
if (exproto->connect_seq > reconnect.connect_seq()) {
- logger().trace("{} server_reconnect: stale connect_seq scs={} ccs={},"
- " ask client to retry",
- conn, exproto->connect_seq, reconnect.connect_seq());
+ logger().warn("{} server_reconnect: stale connect_seq exist_cs={} peer_cs={},"
+ " with existing connection {},"
+ " ask client to retry",
+ conn, exproto->connect_seq, reconnect.connect_seq(),
+ *existing);
return send_retry(exproto->connect_seq);
}
!existing->policy.server) {
// the existing connection wins
logger().warn("{} server_reconnect: reconnect race detected,"
- " this connection loses to existing={},"
+ " this connection loses to existing connection {},"
" ask client to wait", conn, *existing);
return send_wait();
} else {
// this connection wins
logger().warn("{} server_reconnect: reconnect race detected,"
- " replacing existing={} socket by this connection's socket",
+ " replacing existing connection {}"
+ " socket by this connection's socket",
conn, *existing);
}
}
- logger().warn("{} server_reconnect: reconnect to exsiting={}", conn, *existing);
+ logger().warn("{} server_reconnect: reconnect to exsiting connection {}",
+ conn, *existing);
// everything looks good
exproto->connect_seq = reconnect.connect_seq();
conn.set_peer_type(_peer_type);
conn.policy = messenger.get_policy(_peer_type);
- logger().trace("{} accept of host type {}, lossy={} server={} standby={} resetcheck={}",
- conn, ceph_entity_type_name(_peer_type),
- conn.policy.lossy, conn.policy.server,
- conn.policy.standby, conn.policy.resetcheck);
+ logger().info("{} UPDATE: peer_type={},"
+ " policy(lossy={} server={} standby={} resetcheck={})",
+ conn, ceph_entity_type_name(_peer_type),
+ conn.policy.lossy, conn.policy.server,
+ conn.policy.standby, conn.policy.resetcheck);
return server_auth();
}).then([this] {
return read_main_preamble();
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
+ logger().info("{} accepted: gs={}, pgs={}, cs={},"
+ " client_cookie={}, server_cookie={}, in_seq={}, out_seq={}",
+ conn, global_seq, peer_global_seq, connect_seq,
+ client_cookie, server_cookie, conn.in_seq, conn.out_seq);
execute_ready();
} else {
execute_server_wait();
ceph_assert(record_io);
record_io = false;
rxbuf.clear();
+ logger().debug("{} WRITE AuthSignatureFrame: signature={}", conn, sig);
return write_frame(sig_frame).then([this] {
return read_main_preamble();
}).then([this] (Tag tag) {
}).then([this] {
// handle_auth_signature() logic
auto sig_frame = AuthSignatureFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AuthSignatureFrame: signature={}", conn, sig_frame.signature());
const auto actual_tx_sig = auth_meta->session_key.empty() ?
sha256_digest_t() : auth_meta->session_key.hmac_sha256(nullptr, txbuf);
conn, actual_tx_sig, sig_frame.signature());
abort_in_fault();
}
- logger().trace("{} pre-auth signature success sig_frame.signature()={}",
- conn, sig_frame.signature());
txbuf.clear();
});
}
flags,
server_cookie);
- logger().trace("{} sending server identification: addrs={} gid={}"
- " global_seq={} features_supported={} features_required={}"
- " flags={} cookie={}",
+ logger().debug("{} WRITE ServerIdentFrame: addrs={}, gid={},"
+ " gs={}, features_supported={}, features_required={},"
+ " flags={}, cookie={}",
conn, messenger.get_myaddrs(), messenger.get_myname().num(),
gs, conn.policy.features_supported,
conn.policy.features_required | msgr2_required,
return read_frame_payload().then([this] {
// handle_message_ack() logic
auto ack = AckFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT AckFrame: seq={}", ack.seq());
handle_message_ack(ack.seq());
});
case Tag::KEEPALIVE2:
return read_frame_payload().then([this] {
// handle_keepalive2() logic
auto keepalive_frame = KeepAliveFrame::Decode(rx_segments_data.back());
+ logger().debug("{} GOT KeepAliveFrame: timestamp={}",
+ conn, keepalive_frame.timestamp());
notify_keepalive_ack(keepalive_frame.timestamp());
conn.set_last_keepalive(seastar::lowres_system_clock::now());
});
auto keepalive_ack_frame = KeepAliveFrameAck::Decode(rx_segments_data.back());
conn.set_last_keepalive_ack(
seastar::lowres_system_clock::time_point{keepalive_ack_frame.timestamp()});
- logger().trace("{} got KEEPALIVE_ACK {}",
+ logger().debug("{} GOT KeepAliveFrameAck: timestamp={}",
conn, conn.last_keepalive_ack);
});
default: {