cookie(0),
message_seq(0),
can_write(false),
+ connect_seq(0),
bannerExchangeCallback(nullptr),
next_frame_len(0),
keepalive(false) {
state = CLOSED;
}
-void ProtocolV2::fault() {
+void ProtocolV2::fault() { _fault(); }
+
+void ProtocolV2::requeue_sent() {
+ if (sent.empty()) {
+ return;
+ }
+
+ list<pair<bufferlist, Message *> > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+ out_seq -= sent.size();
+ while (!sent.empty()) {
+ Message *m = sent.back();
+ sent.pop_back();
+ ldout(cct, 10) << __func__ << " " << *m << " for resend "
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(make_pair(bufferlist(), m));
+ }
+}
+
+void ProtocolV2::reset_recv_state() {
+ // clean read and write callbacks
+ connection->pendingReadLen.reset();
+ connection->writeCallback.reset();
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+
+ if (state > THROTTLE_MESSAGE && state <= READ_MESSAGE_FRONT &&
+ connection->policy.throttler_messages) {
+ ldout(cct, 10) << __func__ << " releasing " << 1
+ << " message to policy throttler "
+ << connection->policy.throttler_messages->get_current()
+ << "/" << connection->policy.throttler_messages->get_max()
+ << dendl;
+ connection->policy.throttler_messages->put();
+ }
+ if (state > THROTTLE_BYTES && state <= READ_MESSAGE_FRONT) {
+ if (connection->policy.throttler_bytes) {
+ ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
+ << " bytes to policy throttler "
+ << connection->policy.throttler_bytes->get_current() << "/"
+ << connection->policy.throttler_bytes->get_max() << dendl;
+ connection->policy.throttler_bytes->put(cur_msg_size);
+ }
+ }
+ if (state > THROTTLE_DISPATCH_QUEUE && state <= READ_MESSAGE_FRONT) {
+ ldout(cct, 10)
+ << __func__ << " releasing " << cur_msg_size
+ << " bytes to dispatch_queue throttler "
+ << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+ << connection->dispatch_queue->dispatch_throttler.get_max() << dendl;
+ connection->dispatch_queue->dispatch_throttle_release(cur_msg_size);
+ }
+}
+
+CtPtr ProtocolV2::_fault() {
ldout(cct, 10) << __func__ << dendl;
if (state == CLOSED || state == NONE) {
ldout(cct, 10) << __func__ << " connection is already closed" << dendl;
- return;
+ return nullptr;
}
if (connection->policy.lossy && state != START_CONNECT &&
ldout(cct, 1) << __func__ << " on lossy channel, failing" << dendl;
stop();
connection->dispatch_queue->queue_reset(connection);
- return;
+ return nullptr;
}
+
+ connection->write_lock.lock();
+
+ can_write = false;
+ // requeue sent items
+ requeue_sent();
+
+ if (out_queue.empty() && state >= START_ACCEPT &&
+ state <= ACCEPTED_CLIENT_IDENT) {
+ ldout(cct, 10) << __func__ << " with nothing to send and in the half "
+ << " accept state just closed" << dendl;
+ stop();
+ connection->dispatch_queue->queue_reset(connection);
+ connection->write_lock.unlock();
+ return nullptr;
+ }
+
+ connection->fault();
+ reset_recv_state();
+
+ if (connection->policy.standby && out_queue.empty() && state != WAIT) {
+ ldout(cct, 10) << __func__ << " with nothing to send, going to standby"
+ << dendl;
+ state = STANDBY;
+ connection->write_lock.unlock();
+ return nullptr;
+ }
+
+ connection->write_lock.unlock();
+
+ if (state != START_CONNECT && state != CONNECTING && state != WAIT) {
+ // policy maybe empty when state is in accept
+ if (connection->policy.server) {
+ ldout(cct, 0) << __func__ << " server, going to standby" << dendl;
+ state = STANDBY;
+ } else {
+ ldout(cct, 0) << __func__ << " initiating reconnect" << dendl;
+ connect_seq++;
+ state = START_CONNECT;
+ connection->state = AsyncConnection::STATE_CONNECTING;
+ }
+ backoff = utime_t();
+ connection->center->dispatch_event_external(connection->read_handler);
+ } else {
+ if (state == WAIT) {
+ backoff.set_from_double(cct->_conf->ms_max_backoff);
+ } else if (backoff == utime_t()) {
+ backoff.set_from_double(cct->_conf->ms_initial_backoff);
+ } else {
+ backoff += backoff;
+ if (backoff > cct->_conf->ms_max_backoff)
+ backoff.set_from_double(cct->_conf->ms_max_backoff);
+ }
+
+ state = START_CONNECT;
+ connection->state = AsyncConnection::STATE_CONNECTING;
+ ldout(cct, 10) << __func__ << " waiting " << backoff << dendl;
+ // woke up again;
+ connection->register_time_events.insert(
+ connection->center->create_time_event(backoff.to_nsec() / 1000,
+ connection->wakeup_handler));
+ }
+ return nullptr;
}
void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
connection->write_lock.unlock();
connection->lock.lock();
connection->write_lock.lock();
- // if (state == STANDBY && !connection->policy.server && is_queued()) {
- // ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
- // connection->_connect();
- // } else
- if (connection->cs && state != NONE && state != CLOSED &&
- state != START_CONNECT) {
+ if (state == STANDBY && !connection->policy.server && is_queued()) {
+ ldout(cct, 10) << __func__ << " policy.server is false" << dendl;
+ connection->_connect();
+ } else if (connection->cs && state != NONE && state != CLOSED &&
+ state != START_CONNECT) {
r = connection->_try_send();
if (r < 0) {
ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl;
if (connection->get_peer_type() == -1) {
connection->set_peer_type(peer_type);
+
+ ceph_assert(state == ACCEPTING);
+ connection->policy = messenger->get_policy(peer_type);
+ ldout(cct, 10) << __func__ << " accept of host_type " << peer_type
+ << ", policy.lossy=" << connection->policy.lossy
+ << " policy.server=" << connection->policy.server
+ << " policy.standby=" << connection->policy.standby
+ << " policy.resetcheck=" << connection->policy.resetcheck
+ << dendl;
} else {
if (connection->get_peer_type() != peer_type) {
ldout(cct, 1) << __func__ << " connection peer type does not match what"
return WRITE(bl, handle_ident_missing_features_write);
}
+ state = ACCEPTED_CLIENT_IDENT;
+
// if everything is OK reply with server identification
connection->peer_global_id = client_ident.gid;
cookie = client_ident.cookie;
connection->inject_delay();
return _fault();
}
- if (state != ACCEPTING) {
+ if (state != ACCEPTED_CLIENT_IDENT) {
ldout(cct, 1) << __func__
<< " state changed while accept_conn, it must be mark_down"
<< dendl;