}
void ProtocolV2::connect() {
+ ldout(cct, 1) << __func__ << dendl;
state = START_CONNECT;
got_bad_method = 0;
}
}
-void ProtocolV2::accept() { state = START_ACCEPT; }
+void ProtocolV2::accept() {
+ ldout(cct, 1) << __func__ << dendl;
+ state = START_ACCEPT;
+}
bool ProtocolV2::is_connected() { return can_write; }
}
void ProtocolV2::reset_session() {
- ldout(cct, 20) << __func__ << dendl;
+ ldout(cct, 1) << __func__ << dendl;
std::lock_guard<std::mutex> l(connection->write_lock);
if (connection->delay_state) {
}
void ProtocolV2::stop() {
- ldout(cct, 2) << __func__ << dendl;
+ ldout(cct, 1) << __func__ << dendl;
if (state == CLOSED) {
return;
}
while (!sent.empty()) {
Message *m = sent.back();
sent.pop_back();
- ldout(cct, 10) << __func__ << " " << *m << " for resend "
- << " (" << m->get_seq() << ")" << dendl;
+ ldout(cct, 5) << __func__ << " requeueing message m=" << m
+ << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
+ << *m << dendl;
rq.push_front(make_pair(bufferlist(), m));
}
}
while (!rq.empty()) {
pair<bufferlist, Message *> p = rq.front();
if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
- ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
- << p.second->get_seq() << " <= " << seq << ", discarding"
- << dendl;
+ ldout(cct, 5) << __func__ << " discarding message m=" << p.second
+ << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " "
+ << *(p.second) << dendl;
p.second->put();
rq.pop_front();
count++;
if (connection->policy.lossy && state != START_CONNECT &&
state != CONNECTING) {
- ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
+ ldout(cct, 2) << __func__ << " on lossy channel, failing" << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
return nullptr;
if (out_queue.empty() && state >= START_ACCEPT &&
state <= ACCEPTING_SESSION && !replacing) {
- ldout(cct, 10) << __func__ << " with nothing to send and in the half "
+ ldout(cct, 2) << __func__ << " with nothing to send and in the half "
<< " accept state just closed" << dendl;
connection->write_lock.unlock();
stop();
if (connection->policy.standby && out_queue.empty() && !keepalive &&
state != WAIT) {
- ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
- << dendl;
+ ldout(cct, 1) << __func__ << " with nothing to send, going to standby"
+ << dendl;
state = STANDBY;
connection->write_lock.unlock();
return nullptr;
if (state != START_CONNECT && state != CONNECTING && state != WAIT) {
// policy maybe empty when state is in accept
if (connection->policy.server) {
- ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
+ ldout(cct, 1) << __func__ << " server, going to standby" << dendl;
state = STANDBY;
} else {
- ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
+ ldout(cct, 1) << __func__ << " initiating reconnect" << dendl;
connect_seq++;
global_seq = messenger->get_global_seq();
state = START_CONNECT;
global_seq = messenger->get_global_seq();
state = START_CONNECT;
connection->state = AsyncConnection::STATE_CONNECTING;
- ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
+ ldout(cct, 1) << __func__ << " waiting " << backoff << dendl;
// woke up again;
connection->register_time_events.insert(
connection->center->create_time_event(backoff.to_nsec() / 1000,
// ensure the correctness of message encoding
bl.clear();
m->clear_payload();
- ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f
- << " != " << connection->get_features() << dendl;
+ ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
+ << " != " << connection->get_features() << dendl;
}
if (state == CLOSED) {
ldout(cct, 10) << __func__ << " connection closed."
<< " Drop message " << m << dendl;
m->put();
} else {
+ ldout(cct, 5) << __func__ << " enqueueing message m=" << m
+ << " type=" << m->get_type() << " " << *m << dendl;
m->trace.event("async enqueueing message");
out_queue[m->get_priority()].emplace_back(std::move(bl), m);
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
encrypt_payload(flat_bl);
MessageFrame message(this, header2, flat_bl);
- ldout(cct, 5) << __func__ << " sending message type=" << header2.type
- << " src " << entity_name_t(messenger->get_myname())
- << " front=" << header2.front_len
- << " data=" << header2.data_len << " off " << header2.data_off
- << dendl;
+ ldout(cct, 5) << __func__ << " sending message m=" << m
+ << " seq=" << m->get_seq() << " " << *m << dendl;
bufferlist &msg_bl = message.get_buffer();
connection->outcoming_bl.claim_append(msg_bl);
m->trace.event("async writing message");
- ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m
+ ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
+ << " src=" << entity_name_t(messenger->get_myname())
+ << " front=" << header2.front_len
+ << " data=" << header2.data_len << " off=" << header2.data_off
<< dendl;
ssize_t total_send_size = connection->outcoming_bl.length();
ssize_t rc = connection->_try_send(more);
return _fault();
}
-unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
+ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
if (memcmp(buffer, CEPH_BANNER_V2_PREFIX, banner_prefix_len)) {
if (memcmp(buffer, CEPH_BANNER, strlen(CEPH_BANNER))) {
offset += sizeof(__le64);
peer_required_features = *(__le64 *)(buffer + offset);
- ldout(cct, 1) << __func__ << " banner peer_type=" << (int)peer_type
- << " supported=" << std::hex << peer_supported_features
- << " required=" << std::hex << peer_required_features
- << std::dec << dendl;
-
if (connection->get_peer_type() == -1) {
connection->set_peer_type(peer_type);
}
}
+ ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type
+ << " supported=" << std::hex << peer_supported_features
+ << " required=" << std::hex << peer_required_features
+ << std::dec << dendl;
+
// Check feature bit compatibility
__le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
connection->maybe_start_delay_thread();
state = READY;
+ ldout(cct, 1) << __func__ << " entity=" << peer_name << " cookie=" << std::hex
+ << cookie << std::dec << " in_seq=" << in_seq
+ << " out_seq=" << out_seq << dendl;
+
return CONTINUE(read_frame);
}
// note last received message.
in_seq = message->get_seq();
- ldout(cct, 5) << " rx " << message->get_source() << " seq "
- << message->get_seq() << " " << message << " " << *message
- << dendl;
+ ldout(cct, 5) << __func__ << " received message m=" << message
+ << " seq=" << message->get_seq()
+ << " from=" << message->get_source() << " type=" << header.type
+ << " " << *message << dendl;
bool need_dispatch_writer = true;
if (!connection->policy.lossy) {
return WRITE(bl, "session retry", read_frame);
}
+ ldout(cct, 1) << __func__ << " reconnect to existing=" << existing << dendl;
+
// everything looks good
exproto->connect_seq = reconnect.connect_seq();
exproto->message_seq = reconnect.msg_seq();
if (existing->policy.lossy) {
// existing connection can be thrown out in favor of this one
ldout(cct, 1)
- << __func__
- << " accept replacing existing (lossy) channel (new one lossy="
- << connection->policy.lossy << ")" << dendl;
+ << __func__ << " existing=" << existing
+ << " is a lossy channel. Stopping existing in favor of this connection"
+ << dendl;
existing->protocol->stop();
existing->dispatch_queue->queue_reset(existing.get());
return send_server_ident();
// Found previous session
// peer has reseted and we're going to reuse the existing connection
// by replacing the communication socket
- ldout(cct, 1) << __func__
- << " found previous session, peer must have reseted."
- << dendl;
+ ldout(cct, 1) << __func__ << " found previous session existing=" << existing
+ << ", peer must have reseted." << dendl;
if (connection->policy.resetcheck) {
exproto->reset_session();
}
}
if (exproto->state == READY || exproto->state == STANDBY) {
- ldout(cct, 1) << __func__
- << " existing connection is READY/STANDBY, lets reuse it"
- << dendl;
+ ldout(cct, 1) << __func__ << " existing=" << existing
+ << " is READY/STANDBY, lets reuse it" << dendl;
return reuse_connection(existing, exproto, false);
}
// this connection wins
ldout(cct, 1) << __func__
<< " connection race detected, replacing existing="
- << existing << " socket by this connection's socket"
- << dendl;
+ << existing << " socket by this connection's socket" << dendl;
return reuse_connection(existing, exproto, false);
} else {
// the existing connection wins
- ldout(cct, 1) << __func__
- << " connection race detected, this connection loses"
- << dendl;
+ ldout(cct, 1)
+ << __func__
+ << " connection race detected, this connection loses to existing="
+ << existing << dendl;
ceph_assert(connection->peer_addrs->msgr2_addr() >
messenger->get_myaddrs().msgr2_addr());
auto temp_cs = std::move(connection->cs);
EventCenter *new_center = connection->center;
Worker *new_worker = connection->worker;
+
+ ldout(messenger->cct, 5) << __func__ << " stop myself to swap existing"
+ << dendl;
// avoid _stop shutdown replacing socket
// queue a reset on the new connection, which we're dumping for the old
stop();
connection->dispatch_queue->queue_reset(connection);
- ldout(messenger->cct, 1) << __func__ << " stop myself to swap existing"
- << dendl;
+
exproto->can_write = false;
exproto->replacing = true;
exproto->session_security = session_security;