From 38eafa721e3be219fcdaca72cb1e9d1b5d7802da Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Wed, 31 Oct 2018 16:30:32 +0000 Subject: [PATCH] msg/async: msgr2: messange exchange phase Signed-off-by: Ricardo Dias --- src/include/msgr.h | 24 + src/msg/async/ProtocolV2.cc | 941 ++++++++++++++++++++++++++++++++++-- src/msg/async/ProtocolV2.h | 287 ++++++++--- 3 files changed, 1138 insertions(+), 114 deletions(-) diff --git a/src/include/msgr.h b/src/include/msgr.h index 0c342fc1e0a..687f4a82b5e 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -193,6 +193,30 @@ struct ceph_msg_header { __le32 crc; /* header crc32c */ } __attribute__ ((packed)); +struct ceph_msg_header2 { + __le64 seq; /* message seq# for this session */ + __le64 tid; /* transaction id */ + __le16 type; /* message type */ + __le16 priority; /* priority. higher value == higher priority */ + __le16 version; /* version of message encoding */ + + __le32 front_len; /* bytes in main payload */ + __le32 middle_len;/* bytes in middle payload */ + __le32 data_pre_padding_len; + __le32 data_len; /* bytes of data payload */ + __le16 data_off; /* sender: include full offset; + receiver: mask against ~PAGE_MASK */ + + __le64 ack_seq; + + __le32 front_crc, middle_crc, data_crc; + __u8 flags; + /* oldest code we think can decode this. unknown if zero. */ + __le16 compat_version; + __le16 reserved; + __le32 header_crc; +} __attribute__ ((packed)); + #define CEPH_MSG_PRIO_LOW 64 #define CEPH_MSG_PRIO_DEFAULT 127 #define CEPH_MSG_PRIO_HIGH 196 diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index d526210eb2d..8cdd9658e49 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -4,6 +4,7 @@ #include "ProtocolV2.h" #include "AsyncMessenger.h" +#include "common/EventTrace.h" #include "common/errno.h" #include "include/random.h" @@ -20,6 +21,8 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { << " l=" << connection->policy.lossy << ")."; } +const int ASYNC_COALESCE_THRESHOLD = 256; + #define WRITE(B, C) write(CONTINUATION(C), B) #define READ(L, C) read(CONTINUATION(C), L) @@ -28,14 +31,34 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { using CtPtr = Ct *; +static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { + // create a buffer to read into that matches the data alignment + unsigned alloc_len = 0; + unsigned left = len; + unsigned head = 0; + if (off & ~CEPH_PAGE_MASK) { + // head + alloc_len += CEPH_PAGE_SIZE; + head = std::min(CEPH_PAGE_SIZE - (off & ~CEPH_PAGE_MASK), left); + left -= head; + } + alloc_len += left; + bufferptr ptr(buffer::create_small_page_aligned(alloc_len)); + if (head) ptr.set_offset(CEPH_PAGE_SIZE - head); + data.push_back(std::move(ptr)); +} + ProtocolV2::ProtocolV2(AsyncConnection *connection) : Protocol(2, connection), temp_buffer(nullptr), state(NONE), peer_required_features(0), cookie(0), + message_seq(0), + can_write(false), bannerExchangeCallback(nullptr), - next_frame_len(0) { + next_frame_len(0), + keepalive(false) { temp_buffer = new char[4096]; } @@ -45,7 +68,31 @@ void ProtocolV2::connect() { state = START_CONNECT; } void ProtocolV2::accept() { state = START_ACCEPT; } -bool ProtocolV2::is_connected() { return false; } +bool ProtocolV2::is_connected() { return can_write; } + +/* + * Tears down the message queues, and removes them from the + * DispatchQueue Must hold write_lock prior to calling. + */ +void ProtocolV2::discard_out_queue() { + ldout(cct, 10) << __func__ << " started" << dendl; + + for (list::iterator p = sent.begin(); p != sent.end(); ++p) { + ldout(cct, 20) << __func__ << " discard " << *p << dendl; + (*p)->put(); + } + sent.clear(); + for (map > >::iterator p = + out_queue.begin(); + p != out_queue.end(); ++p) { + for (list >::iterator r = p->second.begin(); + r != p->second.end(); ++r) { + ldout(cct, 20) << __func__ << " discard " << r->second << dendl; + r->second->put(); + } + } + out_queue.clear(); +} void ProtocolV2::stop() { ldout(cct, 2) << __func__ << dendl; @@ -55,8 +102,13 @@ void ProtocolV2::stop() { if (connection->delay_state) connection->delay_state->flush(); + std::lock_guard l(connection->write_lock); + + discard_out_queue(); + connection->_stop(); + can_write = false; state = CLOSED; } @@ -77,9 +129,70 @@ void ProtocolV2::fault() { } } -void ProtocolV2::send_message(Message *m) {} +void ProtocolV2::prepare_send_message(uint64_t features, Message *m, + bufferlist &bl) { + ldout(cct, 20) << __func__ << " m=" << *m << dendl; + + // associate message with Connection (for benefit of encode_payload) + if (m->empty_payload()) { + ldout(cct, 20) << __func__ << " encoding features " << features << " " << m + << " " << *m << dendl; + } else { + ldout(cct, 20) << __func__ << " half-reencoding features " << features + << " " << m << " " << *m << dendl; + } -void ProtocolV2::send_keepalive() {} + // encode and copy out of *m + m->encode(features, messenger->crcflags); + + bl.append(m->get_payload()); + bl.append(m->get_middle()); + bl.append(m->get_data()); +} + +void ProtocolV2::send_message(Message *m) { + bufferlist bl; + uint64_t f = connection->get_features(); + + // TODO: Currently not all messages supports reencode like MOSDMap, so here + // only let fast dispatch support messages prepare message + bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); + if (can_fast_prepare) { + prepare_send_message(f, m, bl); + } + + std::lock_guard l(connection->write_lock); + // "features" changes will change the payload encoding + if (can_fast_prepare && (!can_write || connection->get_features() != f)) { + // ensure the correctness of message encoding + bl.clear(); + m->clear_payload(); + ldout(cct, 5) << __func__ << " clear encoded buffer previous " << f + << " != " << connection->get_features() << dendl; + } + if (state == CLOSED) { + ldout(cct, 10) << __func__ << " connection closed." + << " Drop message " << m << dendl; + m->put(); + } else { + m->trace.event("async enqueueing message"); + out_queue[m->get_priority()].emplace_back(std::move(bl), m); + ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m + << dendl; + if (can_write) { + connection->center->dispatch_event_external(connection->write_handler); + } + } +} + +void ProtocolV2::send_keepalive() { + ldout(cct, 10) << __func__ << dendl; + std::lock_guard l(connection->write_lock); + if (can_write) { + keepalive = true; + connection->center->dispatch_event_external(connection->write_handler); + } +} void ProtocolV2::read_event() { ldout(cct, 20) << __func__ << dendl; @@ -91,12 +204,221 @@ void ProtocolV2::read_event() { case START_ACCEPT: CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange)); break; + case READY: + CONTINUATION_RUN(CONTINUATION(read_frame)); + break; + case THROTTLE_MESSAGE: + CONTINUATION_RUN(CONTINUATION(throttle_message)); + break; + case THROTTLE_BYTES: + CONTINUATION_RUN(CONTINUATION(throttle_bytes)); + break; + case THROTTLE_DISPATCH_QUEUE: + CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue)); + break; default: break; } } -void ProtocolV2::write_event() {} +Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) { + Message *m = 0; + if (!out_queue.empty()) { + map > >::reverse_iterator it = + out_queue.rbegin(); + ceph_assert(!it->second.empty()); + list >::iterator p = it->second.begin(); + m = p->second; + if (bl) { + bl->swap(p->first); + } + it->second.erase(p); + if (it->second.empty()) { + out_queue.erase(it->first); + } + } + return m; +} + +ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) { + FUNCTRACE(cct); + ceph_assert(connection->center->in_thread()); + m->set_seq(++out_seq); + + connection->lock.lock(); + uint64_t ack_seq = in_seq; + ack_left = 0; + connection->lock.unlock(); + + MessageFrame message(m, bl, ack_seq, messenger->crcflags & MSG_CRC_HEADER); + + ldout(cct, 20) << __func__ << " sending message type=" << message.header2.type + << " src " << entity_name_t(messenger->get_myname()) + << " front=" << message.header2.front_len + << " data=" << message.header2.data_len << " off " + << message.header2.data_off << dendl; + + bufferlist &msg_bl = message.get_buffer(); + connection->outcoming_bl.claim_append(msg_bl); + + m->trace.event("async writing message"); + ldout(cct, 20) << __func__ << " sending " << m->get_seq() << " " << m + << dendl; + ssize_t total_send_size = connection->outcoming_bl.length(); + ssize_t rc = connection->_try_send(more); + if (rc < 0) { + ldout(cct, 1) << __func__ << " error sending " << m << ", " + << cpp_strerror(rc) << dendl; + } else { + connection->logger->inc( + l_msgr_send_bytes, total_send_size - connection->outcoming_bl.length()); + ldout(cct, 10) << __func__ << " sending " << m + << (rc ? " continuely." : " done.") << dendl; + } + if (m->get_type() == CEPH_MSG_OSD_OP) + OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OP_END", false); + else if (m->get_type() == CEPH_MSG_OSD_OPREPLY) + OID_EVENT_TRACE_WITH_MSG(m, "SEND_MSG_OSD_OPREPLY_END", false); + m->put(); + + return rc; +} + +void ProtocolV2::append_keepalive() { + ldout(cct, 10) << __func__ << dendl; + KeepAliveFrame keepalive_frame; + connection->outcoming_bl.claim_append(keepalive_frame.get_buffer()); +} + +void ProtocolV2::append_keepalive_ack(utime_t ×tamp) { + struct ceph_timespec ts; + timestamp.encode_timeval(&ts); + KeepAliveFrame keepalive_ack_frame(ts); + connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer()); +} + +void ProtocolV2::handle_message_ack(uint64_t seq) { + if (connection->policy.lossy) { // lossy connections don't keep sent messages + return; + } + + ldout(cct, 15) << __func__ << " seq=" << seq << dendl; + + // trim sent list + static const int max_pending = 128; + int i = 0; + Message *pending[max_pending]; + connection->write_lock.lock(); + while (!sent.empty() && sent.front()->get_seq() <= seq && i < max_pending) { + Message *m = sent.front(); + sent.pop_front(); + pending[i++] = m; + ldout(cct, 10) << __func__ << " got ack seq " << seq + << " >= " << m->get_seq() << " on " << m << " " << *m + << dendl; + } + connection->write_lock.unlock(); + for (int k = 0; k < i; k++) { + pending[k]->put(); + } +} + +void ProtocolV2::write_event() { + ldout(cct, 10) << __func__ << dendl; + ssize_t r = 0; + + connection->write_lock.lock(); + if (can_write) { + if (keepalive) { + append_keepalive(); + keepalive = false; + } + + auto start = ceph::mono_clock::now(); + bool more; + do { + bufferlist data; + Message *m = _get_next_outgoing(&data); + if (!m) { + break; + } + + if (!connection->policy.lossy) { + // put on sent list + sent.push_back(m); + m->get(); + } + more = !out_queue.empty(); + connection->write_lock.unlock(); + + // send_message or requeue messages may not encode message + if (!data.length()) { + prepare_send_message(connection->get_features(), m, data); + } + + r = write_message(m, data, more); + + connection->write_lock.lock(); + if (r == 0) { + ; + } else if (r < 0) { + ldout(cct, 1) << __func__ << " send msg failed" << dendl; + break; + } else if (r > 0) + break; + } while (can_write); + connection->write_lock.unlock(); + + // if r > 0 mean data still lefted, so no need _try_send. + if (r == 0) { + uint64_t left = ack_left; + if (left) { + ceph_le64 s; + s = in_seq; + AckFrame ack(in_seq); + connection->outcoming_bl.claim_append(ack.get_buffer()); + ldout(cct, 10) << __func__ << " try send msg ack, acked " << left + << " messages" << dendl; + ack_left -= left; + left = ack_left; + r = connection->_try_send(left); + } else if (is_queued()) { + r = connection->_try_send(); + } + } + + connection->logger->tinc(l_msgr_running_send_time, + ceph::mono_clock::now() - start); + if (r < 0) { + ldout(cct, 1) << __func__ << " send msg failed" << dendl; + connection->lock.lock(); + fault(); + connection->lock.unlock(); + return; + } + } else { + connection->write_lock.unlock(); + connection->lock.lock(); + connection->write_lock.lock(); + // if (state == STANDBY && !connection->policy.server && is_queued()) { + // ldout(cct, 10) << __func__ << " policy.server is false" << dendl; + // connection->_connect(); + // } else + if (connection->cs && state != NONE && state != CLOSED && + state != START_CONNECT) { + r = connection->_try_send(); + if (r < 0) { + ldout(cct, 1) << __func__ << " send outcoming bl failed" << dendl; + connection->write_lock.unlock(); + fault(); + connection->lock.unlock(); + return; + } + } + connection->write_lock.unlock(); + connection->lock.unlock(); + } +} bool ProtocolV2::is_queued() { return false; } @@ -252,25 +574,49 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); } CtPtr ProtocolV2::read_frame() { + if (state == CLOSED) { + return nullptr; + } + ldout(cct, 20) << __func__ << dendl; - return READ(sizeof(__le32), handle_read_frame_length); + return READ(sizeof(__le32) * 2, handle_read_frame_length_and_tag); } -CtPtr ProtocolV2::handle_read_frame_length(char *buffer, int r) { +CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { - ldout(cct, 1) << __func__ << " read frame length failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; + ldout(cct, 1) << __func__ << " read frame length and tag failed r=" << r + << " (" << cpp_strerror(r) << ")" << dendl; return _fault(); } - next_frame_len = *(__le32 *)buffer; + next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t); + next_tag = static_cast(*(uint32_t *)(buffer + sizeof(uint32_t))); - return READ(next_frame_len, handle_frame); + ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len + << " tag=" << static_cast(next_tag) << dendl; + + switch (next_tag) { + case Tag::AUTH_REQUEST: + case Tag::AUTH_BAD_METHOD: + case Tag::AUTH_BAD_AUTH: + case Tag::AUTH_MORE: + case Tag::AUTH_DONE: + case Tag::IDENT: + case Tag::IDENT_MISSING_FEATURES: + case Tag::KEEPALIVE2: + case Tag::KEEPALIVE2_ACK: + case Tag::ACK: + return READ(next_frame_len, handle_frame_payload); + case Tag::MESSAGE: + return handle_message(); + } + + return nullptr; } -CtPtr ProtocolV2::handle_frame(char *buffer, int r) { +CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -279,27 +625,27 @@ CtPtr ProtocolV2::handle_frame(char *buffer, int r) { return _fault(); } - Tag tag = static_cast(*(__le32 *)buffer); - buffer += sizeof(__le32); - uint32_t payload_len = next_frame_len - sizeof(__le32); - - ldout(cct, 10) << __func__ << " tag=" << static_cast(tag) << dendl; - - switch (tag) { + switch (next_tag) { case Tag::AUTH_REQUEST: - return handle_auth_request(buffer, payload_len); + return handle_auth_request(buffer, next_frame_len); case Tag::AUTH_BAD_METHOD: - return handle_auth_bad_method(buffer, payload_len); + return handle_auth_bad_method(buffer, next_frame_len); case Tag::AUTH_BAD_AUTH: - return handle_auth_bad_auth(buffer, payload_len); + return handle_auth_bad_auth(buffer, next_frame_len); case Tag::AUTH_MORE: - return handle_auth_more(buffer, payload_len); + return handle_auth_more(buffer, next_frame_len); case Tag::AUTH_DONE: - return handle_auth_done(buffer, payload_len); + return handle_auth_done(buffer, next_frame_len); case Tag::IDENT: - return handle_ident(buffer, payload_len); + return handle_ident(buffer, next_frame_len); case Tag::IDENT_MISSING_FEATURES: - return handle_ident_missing_features(buffer, payload_len); + return handle_ident_missing_features(buffer, next_frame_len); + case Tag::KEEPALIVE2: + return handle_keepalive2(buffer, next_frame_len); + case Tag::KEEPALIVE2_ACK: + return handle_keepalive2_ack(buffer, next_frame_len); + case Tag::ACK: + return handle_message_ack(buffer, next_frame_len); default: ceph_abort(); } @@ -334,14 +680,14 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) { encode(hello, auth_bl, 0); /* END TO REMOVE */ AuthMoreFrame more(auth_bl); - bufferlist bl = more.to_bufferlist(); + bufferlist &bl = more.get_buffer(); return WRITE(bl, handle_auth_more_write); } /* END TO REMOVE */ AuthDoneFrame auth_done(0); - auto bl = auth_done.to_bufferlist(); + bufferlist &bl = auth_done.get_buffer(); return WRITE(bl, handle_auth_done_write); } @@ -367,6 +713,491 @@ CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) { ceph_abort("wrong state at handle_ident"); } +CtPtr ProtocolV2::ready() { + ldout(cct, 25) << __func__ << dendl; + + // make sure no pending tick timer + if (connection->last_tick_id) { + connection->center->delete_time_event(connection->last_tick_id); + } + connection->last_tick_id = connection->center->create_time_event( + connection->inactive_timeout_us, connection->tick_handler); + + { + std::lock_guard l(connection->write_lock); + can_write = true; + if (!out_queue.empty()) { + connection->center->dispatch_event_external(connection->write_handler); + } + } + + state = READY; + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_message() { + ldout(cct, 20) << __func__ << dendl; + + ceph_assert(state == READY); + + return READ(sizeof(ceph_msg_header2), handle_message_header); +} + +CtPtr ProtocolV2::handle_message_header(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read message header failed" << dendl; + return _fault(); + } + + ceph_msg_header2 header; + header = *((ceph_msg_header2 *)buffer); + + entity_name_t src(connection->peer_type, connection->peer_global_id); + + ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src " + << src << " front=" << header.front_len + << " data=" << header.data_len << " off " << header.data_off + << dendl; + + if (messenger->crcflags & MSG_CRC_HEADER) { + __u32 header_crc = 0; + header_crc = ceph_crc32c(0, (unsigned char *)&header, + sizeof(header) - sizeof(header.header_crc)); + // verify header crc + if (header_crc != header.header_crc) { + ldout(cct, 0) << __func__ << " got bad header crc " << header_crc + << " != " << header.header_crc << dendl; + return _fault(); + } + } + + // Reset state + data_buf.clear(); + front.clear(); + middle.clear(); + data.clear(); + current_header = header; + + state = THROTTLE_MESSAGE; + return CONTINUE(throttle_message); +} + +CtPtr ProtocolV2::throttle_message() { + ldout(cct, 20) << __func__ << dendl; + + if (connection->policy.throttler_messages) { + ldout(cct, 10) << __func__ << " wants " << 1 + << " message from policy throttler " + << connection->policy.throttler_messages->get_current() + << "/" << connection->policy.throttler_messages->get_max() + << dendl; + if (!connection->policy.throttler_messages->get_or_fail()) { + ldout(cct, 10) << __func__ << " wants 1 message from policy throttle " + << connection->policy.throttler_messages->get_current() + << "/" << connection->policy.throttler_messages->get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event(1000, + connection->wakeup_handler)); + } + return nullptr; + } + } + + state = THROTTLE_BYTES; + return CONTINUE(throttle_bytes); +} + +CtPtr ProtocolV2::throttle_bytes() { + ldout(cct, 20) << __func__ << dendl; + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + if (cur_msg_size) { + if (connection->policy.throttler_bytes) { + ldout(cct, 10) << __func__ << " wants " << cur_msg_size + << " bytes from policy throttler " + << connection->policy.throttler_bytes->get_current() << "/" + << connection->policy.throttler_bytes->get_max() << dendl; + if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) { + ldout(cct, 10) << __func__ << " wants " << cur_msg_size + << " bytes from policy throttler " + << connection->policy.throttler_bytes->get_current() + << "/" << connection->policy.throttler_bytes->get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event( + 1000, connection->wakeup_handler)); + } + return nullptr; + } + } + } + + state = THROTTLE_DISPATCH_QUEUE; + return CONTINUE(throttle_dispatch_queue); +} + +CtPtr ProtocolV2::throttle_dispatch_queue() { + ldout(cct, 20) << __func__ << dendl; + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + + if (cur_msg_size) { + if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( + cur_msg_size)) { + ldout(cct, 10) + << __func__ << " wants " << cur_msg_size + << " bytes from dispatch throttle " + << connection->dispatch_queue->dispatch_throttler.get_current() << "/" + << connection->dispatch_queue->dispatch_throttler.get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event(1000, + connection->wakeup_handler)); + } + return nullptr; + } + } + + throttle_stamp = ceph_clock_now(); + + state = READ_MESSAGE_FRONT; + return read_message_front(); +} + +CtPtr ProtocolV2::read_message_front() { + ldout(cct, 20) << __func__ << dendl; + + unsigned front_len = current_header.front_len; + if (front_len) { + if (!front.length()) { + front.push_back(buffer::create(front_len)); + } + return READB(front_len, front.c_str(), handle_message_front); + } + return read_message_middle(); +} + +CtPtr ProtocolV2::handle_message_front(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read message front failed" << dendl; + return _fault(); + } + + ldout(cct, 20) << __func__ << " got front " << front.length() << dendl; + + return read_message_middle(); +} + +CtPtr ProtocolV2::read_message_middle() { + ldout(cct, 20) << __func__ << dendl; + + if (current_header.middle_len) { + if (!middle.length()) { + middle.push_back(buffer::create(current_header.middle_len)); + } + return READB(current_header.middle_len, middle.c_str(), + handle_message_middle); + } + + return read_message_data_prepare(); +} + +CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read message middle failed" << dendl; + return _fault(); + } + + ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl; + + return read_message_data_prepare(); +} + +CtPtr ProtocolV2::read_message_data_prepare() { + ldout(cct, 20) << __func__ << dendl; + + unsigned data_len = le32_to_cpu(current_header.data_len); + unsigned data_off = le32_to_cpu(current_header.data_off); + + if (data_len) { + // get a buffer + map >::iterator p = + connection->rx_buffers.find(current_header.tid); + if (p != connection->rx_buffers.end()) { + ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second + << " at offset " << data_off << " len " + << p->second.first.length() << dendl; + data_buf = p->second.first; + // make sure it's big enough + if (data_buf.length() < data_len) + data_buf.push_back(buffer::create(data_len - data_buf.length())); + data_blp = data_buf.begin(); + } else { + ldout(cct, 20) << __func__ << " allocating new rx buffer at offset " + << data_off << dendl; + alloc_aligned_buffer(data_buf, data_len, data_off); + data_blp = data_buf.begin(); + } + } + + msg_left = data_len; + + return CONTINUE(read_message_data); +} + +CtPtr ProtocolV2::read_message_data() { + ldout(cct, 20) << __func__ << " msg_left=" << msg_left << dendl; + + if (msg_left > 0) { + bufferptr bp = data_blp.get_current_ptr(); + unsigned read_len = std::min(bp.length(), msg_left); + + return READB(read_len, bp.c_str(), handle_message_data); + } + + state = READ_MESSAGE_COMPLETE; + return handle_message_complete(); +} + +CtPtr ProtocolV2::handle_message_data(char *buffer, int r) { + ldout(cct, 20) << __func__ << " r=" << r << dendl; + + if (r < 0) { + ldout(cct, 1) << __func__ << " read data error " << dendl; + return _fault(); + } + + bufferptr bp = data_blp.get_current_ptr(); + unsigned read_len = std::min(bp.length(), msg_left); + ceph_assert(read_len < std::numeric_limits::max()); + data_blp.advance(read_len); + data.append(bp, 0, read_len); + msg_left -= read_len; + + return CONTINUE(read_message_data); +} + +CtPtr ProtocolV2::handle_message_complete() { + ldout(cct, 20) << __func__ << dendl; + + ldout(cct, 20) << __func__ << " got " << front.length() << " + " + << middle.length() << " + " << data.length() << " byte message" + << dendl; + + ceph_msg_header header{ + current_header.seq, + current_header.tid, + current_header.type, + current_header.priority, + current_header.version, + current_header.front_len, + current_header.middle_len, + current_header.data_len, + current_header.data_off, + entity_name_t(connection->peer_type, connection->peer_global_id), + current_header.compat_version, + current_header.reserved, + 0}; + ceph_msg_footer footer{current_header.front_crc, current_header.middle_crc, + current_header.data_crc, 0, current_header.flags}; + + Message *message = decode_message(cct, messenger->crcflags, header, footer, + front, middle, data, connection); + if (!message) { + ldout(cct, 1) << __func__ << " decode message failed " << dendl; + return _fault(); + } + + // + // Check the signature if one should be present. A zero return indicates + // success. PLR + // + + // if (session_security.get() == NULL) { + // ldout(cct, 10) << __func__ << " no session security set" << dendl; + // } else { + // if (session_security->check_message_signature(message)) { + // ldout(cct, 0) << __func__ << " Signature check failed" << dendl; + // message->put(); + // return _fault(); + // } + // } + message->set_byte_throttler(connection->policy.throttler_bytes); + message->set_message_throttler(connection->policy.throttler_messages); + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + + // store reservation size in message, so we don't get confused + // by messages entering the dispatch queue through other paths. + message->set_dispatch_throttle_size(cur_msg_size); + + message->set_recv_stamp(recv_stamp); + message->set_throttle_stamp(throttle_stamp); + message->set_recv_complete_stamp(ceph_clock_now()); + + // check received seq#. if it is old, drop the message. + // note that incoming messages may skip ahead. this is convenient for the + // client side queueing because messages can't be renumbered, but the (kernel) + // client will occasionally pull a message out of the sent queue to send + // elsewhere. in that case it doesn't matter if we "got" it or not. + uint64_t cur_seq = in_seq; + if (message->get_seq() <= cur_seq) { + ldout(cct, 0) << __func__ << " got old message " << message->get_seq() + << " <= " << cur_seq << " " << message << " " << *message + << ", discarding" << dendl; + message->put(); + if (connection->has_feature(CEPH_FEATURE_RECONNECT_SEQ) && + cct->_conf->ms_die_on_old_message) { + ceph_assert(0 == "old msgs despite reconnect_seq feature"); + } + return nullptr; + } + if (message->get_seq() > cur_seq + 1) { + ldout(cct, 0) << __func__ << " missed message? skipped from seq " + << cur_seq << " to " << message->get_seq() << dendl; + if (cct->_conf->ms_die_on_skipped_message) { + ceph_assert(0 == "skipped incoming seq"); + } + } + + message->set_connection(connection); + +#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) + if (message->get_type() == CEPH_MSG_OSD_OP || + message->get_type() == CEPH_MSG_OSD_OPREPLY) { + utime_t ltt_processed_stamp = ceph_clock_now(); + double usecs_elapsed = + (ltt_processed_stamp.to_nsec() - ltt_recv_stamp.to_nsec()) / 1000; + ostringstream buf; + if (message->get_type() == CEPH_MSG_OSD_OP) + OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OP", + false); + else + OID_ELAPSED_WITH_MSG(message, usecs_elapsed, "TIME_TO_DECODE_OSD_OPREPLY", + false); + } +#endif + + // note last received message. + in_seq = message->get_seq(); + ldout(cct, 5) << " rx " << message->get_source() << " seq " + << message->get_seq() << " " << message << " " << *message + << dendl; + + bool need_dispatch_writer = true; + if (!connection->policy.lossy) { + ack_left++; + need_dispatch_writer = true; + } + + state = READY; + + connection->logger->inc(l_msgr_recv_messages); + connection->logger->inc( + l_msgr_recv_bytes, + cur_msg_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer)); + + messenger->ms_fast_preprocess(message); + auto fast_dispatch_time = ceph::mono_clock::now(); + connection->logger->tinc(l_msgr_running_recv_time, + fast_dispatch_time - connection->recv_start_time); + if (connection->delay_state) { + double delay_period = 0; + if (rand() % 10000 < cct->_conf->ms_inject_delay_probability * 10000.0) { + delay_period = + cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + ldout(cct, 1) << "queue_received will delay after " + << (ceph_clock_now() + delay_period) << " on " << message + << " " << *message << dendl; + } + connection->delay_state->queue(delay_period, message); + } else if (messenger->ms_can_fast_dispatch(message)) { + connection->lock.unlock(); + connection->dispatch_queue->fast_dispatch(message); + connection->recv_start_time = ceph::mono_clock::now(); + connection->logger->tinc(l_msgr_running_fast_dispatch_time, + connection->recv_start_time - fast_dispatch_time); + connection->lock.lock(); + } else { + connection->dispatch_queue->enqueue(message, message->get_priority(), + connection->conn_id); + } + + handle_message_ack(current_header.ack_seq); + + // clean up local buffer references + data_buf.clear(); + front.clear(); + middle.clear(); + data.clear(); + + if (need_dispatch_writer && connection->is_connected()) { + connection->center->dispatch_event_external(connection->write_handler); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + KeepAliveFrame keepalive_frame(payload, length); + + ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; + + utime_t kp_t = utime_t(keepalive_frame.timestamp); + connection->write_lock.lock(); + append_keepalive_ack(kp_t); + connection->write_lock.unlock(); + + ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; + connection->set_last_keepalive(ceph_clock_now()); + + if (is_connected()) { + connection->center->dispatch_event_external(connection->write_handler); + } + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + KeepAliveFrame keepalive_ack_frame(payload, length); + connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp)); + ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; + + return CONTINUE(read_frame); +} + +CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; + + AckFrame ack(payload, length); + handle_message_ack(ack.seq); + return CONTINUE(read_frame); +} + /* Client Protocol Methods */ CtPtr ProtocolV2::start_client_banner_exchange() { @@ -407,7 +1238,7 @@ CtPtr ProtocolV2::send_auth_request(std::vector<__u32> allowed_methods) { } AuthRequestFrame authFrame(method, auth_bl); - bufferlist bl = authFrame.to_bufferlist(); + bufferlist &bl = authFrame.get_buffer(); return WRITE(bl, handle_auth_request_write); } @@ -477,7 +1308,7 @@ CtPtr ProtocolV2::send_client_ident() { << " flags: " << ident.flags << " cookie: " << std::dec << ident.cookie << dendl; - bufferlist bl = ident.to_bufferlist(); + bufferlist &bl = ident.get_buffer(); return WRITE(bl, handle_client_ident_write); } @@ -523,9 +1354,7 @@ CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { connection->set_features(server_ident.required_features & connection->policy.features_supported); - state = READY; - - return CONTINUE(read_frame); + return ready(); } /* Server Protocol Methods */ @@ -594,7 +1423,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { ldout(cct, 1) << __func__ << " auth method=" << auth_request.method << " not allowed" << dendl; AuthBadMethodFrame bad_method(auth_request.method, allowed_methods); - bufferlist bl = bad_method.to_bufferlist(); + bufferlist &bl = bad_method.get_buffer(); return WRITE(bl, handle_auth_bad_method_write); } @@ -605,7 +1434,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { if (!valid) { AuthBadAuthFrame bad_auth(12, "Permission denied"); - bufferlist bl = bad_auth.to_bufferlist(); + bufferlist &bl = bad_auth.get_buffer(); return WRITE(bl, handle_auth_bad_auth_write); } @@ -616,7 +1445,7 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { encode(hello, auth_bl, 0); /* END TO REMOVE */ AuthMoreFrame more(auth_bl); - bufferlist bl = more.to_bufferlist(); + bufferlist &bl = more.get_buffer(); return WRITE(bl, handle_auth_more_write); } @@ -685,8 +1514,7 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { << feat_missing << std::dec << dendl; IdentMissingFeaturesFrame ident_missing_features(feat_missing); - bufferlist bl; - bl = ident_missing_features.to_bufferlist(); + bufferlist &bl = ident_missing_features.get_buffer(); return WRITE(bl, handle_ident_missing_features_write); } @@ -710,7 +1538,34 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { << " flags: " << ident.flags << " cookie: " << std::dec << ident.cookie << dendl; - bufferlist bl = ident.to_bufferlist(); + connection->lock.unlock(); + // Because "replacing" will prevent other connections preempt this addr, + // it's safe that here we don't acquire Connection's lock + ssize_t r = messenger->accept_conn(connection); + + connection->inject_delay(); + + connection->lock.lock(); + + if (r < 0) { + ldout(cct, 1) << __func__ << " existing race replacing process for addr = " + << connection->peer_addrs.msgr2_addr() + << " just fail later one(this)" << dendl; + ldout(cct, 10) << "accept fault after register" << dendl; + connection->inject_delay(); + return _fault(); + } + if (state != ACCEPTING) { + ldout(cct, 1) << __func__ + << " state changed while accept_conn, it must be mark_down" + << dendl; + ceph_assert(state == CLOSED || state == NONE); + ldout(cct, 10) << "accept fault after register" << dendl; + connection->inject_delay(); + return _fault(); + } + + bufferlist &bl = ident.get_buffer(); return WRITE(bl, handle_send_server_ident_write); } @@ -723,6 +1578,10 @@ CtPtr ProtocolV2::handle_ident_missing_features_write(int r) { return _fault(); } + // notify + connection->dispatch_queue->queue_accept(connection); + messenger->ms_deliver_handle_fast_accept(connection); + return CONTINUE(read_frame); } @@ -735,7 +1594,5 @@ CtPtr ProtocolV2::handle_send_server_ident_write(int r) { return _fault(); } - state = READY; - - return CONTINUE(read_frame); + return ready(); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index e8a7b60b0af..7de850756d0 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -15,17 +15,31 @@ private: START_ACCEPT, ACCEPTING, READY, + THROTTLE_MESSAGE, + THROTTLE_BYTES, + THROTTLE_DISPATCH_QUEUE, + READ_MESSAGE_FRONT, + READ_MESSAGE_COMPLETE, CLOSED }; static const char *get_state_name(int state) { - const char *const statenames[] = {"NONE", "START_CONNECT", - "CONNECTING", "START_ACCEPT", - "ACCEPTING", "CLOSED"}; + const char *const statenames[] = {"NONE", + "START_CONNECT", + "CONNECTING", + "START_ACCEPT", + "ACCEPTING", + "READY", + "THROTTLE_MESSAGE", + "THROTTLE_BYTES", + "THROTTLE_DISPATCH_QUEUE", + "READ_MESSAGE_FRONT", + "READ_MESSAGE_COMPLETE", + "CLOSED"}; return statenames[state]; } - enum class Tag : __le32 { + enum class Tag : uint32_t { AUTH_REQUEST, AUTH_BAD_METHOD, AUTH_BAD_AUTH, @@ -33,40 +47,46 @@ private: AUTH_DONE, IDENT, IDENT_MISSING_FEATURES, + MESSAGE, + KEEPALIVE2, + KEEPALIVE2_ACK, + ACK }; struct Frame { - __le32 frame_len; - __le32 tag; + uint32_t tag; bufferlist payload; + bufferlist frame_buffer; - Frame(Tag tag, __le32 payload_len) - : frame_len(sizeof(__le32) + payload_len), - tag(static_cast<__le32>(tag)) {} + Frame(Tag tag) : tag(static_cast(tag)) { + encode(this->tag, payload, 0); + } - bufferlist to_bufferlist() { - ceph_assert(payload.length() == (frame_len - sizeof(__le32))); - bufferlist bl; - encode(frame_len, bl, 0); - encode(tag, bl, 0); - bl.claim_append(payload); - return bl; + Frame() {} + + bufferlist &get_buffer() { + if (frame_buffer.length()) { + return frame_buffer; + } + encode((uint32_t)payload.length(), frame_buffer, 0); + frame_buffer.claim_append(payload); + return frame_buffer; } }; struct SignedEncryptedFrame : public Frame { - SignedEncryptedFrame(Tag tag, __le32 payload_len) - : Frame(tag, payload_len) {} - bufferlist to_bufferlist() { return Frame::to_bufferlist(); } + SignedEncryptedFrame(Tag tag) : Frame(tag) {} + SignedEncryptedFrame() : Frame() {} + bufferlist &get_buffer() { return Frame::get_buffer(); } }; struct AuthRequestFrame : public Frame { - __le32 method; - __le32 len; + uint32_t method; + uint32_t len; bufferlist auth_payload; - AuthRequestFrame(__le32 method, bufferlist &auth_payload) - : Frame(Tag::AUTH_REQUEST, sizeof(__le32) * 2 + auth_payload.length()), + AuthRequestFrame(uint32_t method, bufferlist &auth_payload) + : Frame(Tag::AUTH_REQUEST), method(method), len(auth_payload.length()), auth_payload(auth_payload) { @@ -75,92 +95,87 @@ private: payload.claim_append(auth_payload); } - AuthRequestFrame(char *payload, uint32_t length) - : Frame(Tag::AUTH_REQUEST, length) { - method = *(__le32 *)payload; - len = *(__le32 *)(payload + sizeof(__le32)); - ceph_assert((length - (sizeof(__le32) * 2)) == len); - auth_payload.append((payload + (sizeof(__le32) * 2)), len); + AuthRequestFrame(char *payload, uint32_t length) : Frame() { + method = *(uint32_t *)payload; + len = *(uint32_t *)(payload + sizeof(uint32_t)); + ceph_assert((length - (sizeof(uint32_t) * 2)) == len); + auth_payload.append((payload + (sizeof(uint32_t) * 2)), len); } }; struct AuthBadMethodFrame : public Frame { - __le32 method; + uint32_t method; std::vector<__u32> allowed_methods; - AuthBadMethodFrame(__le32 method, std::vector<__u32> methods) - : Frame(Tag::AUTH_BAD_METHOD, sizeof(__le32) * (2 + methods.size())), + AuthBadMethodFrame(uint32_t method, std::vector<__u32> methods) + : Frame(Tag::AUTH_BAD_METHOD), method(method), allowed_methods(methods) { encode(method, payload, 0); - encode((__le32)allowed_methods.size(), payload, 0); + encode((uint32_t)allowed_methods.size(), payload, 0); for (const auto &a_meth : allowed_methods) { encode(a_meth, payload, 0); } } - AuthBadMethodFrame(char *payload, uint32_t length) - : Frame(Tag::AUTH_BAD_METHOD, length) { - method = *(__le32 *)payload; - __le32 num_methods = *(__le32 *)(payload + sizeof(__le32)); - payload += sizeof(__le32) * 2; + AuthBadMethodFrame(char *payload, uint32_t length) : Frame() { + method = *(uint32_t *)payload; + uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t)); + payload += sizeof(uint32_t) * 2; for (unsigned i = 0; i < num_methods; ++i) { - allowed_methods.push_back(*(__le32 *)(payload + sizeof(__le32) * i)); + allowed_methods.push_back( + *(uint32_t *)(payload + sizeof(uint32_t) * i)); } } }; struct AuthBadAuthFrame : public Frame { - __le32 error_code; + uint32_t error_code; std::string error_msg; - AuthBadAuthFrame(__le32 error_code, std::string error_msg) - : Frame(Tag::AUTH_BAD_AUTH, sizeof(__le32) * 2 + error_msg.size()), + AuthBadAuthFrame(uint32_t error_code, std::string error_msg) + : Frame(Tag::AUTH_BAD_AUTH), error_code(error_code), error_msg(error_msg) { encode(error_code, payload, 0); encode(error_msg, payload, 0); } - AuthBadAuthFrame(char *payload, uint32_t length) - : Frame(Tag::AUTH_BAD_AUTH, length) { - error_code = *(__le32 *)payload; - __le32 len = *(__le32 *)(payload + sizeof(__le32)); - error_msg = std::string(payload + sizeof(__le32) * 2, len); + AuthBadAuthFrame(char *payload, uint32_t length) : Frame() { + error_code = *(uint32_t *)payload; + uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t)); + error_msg = std::string(payload + sizeof(uint32_t) * 2, len); } }; struct AuthMoreFrame : public Frame { - __le32 len; + uint32_t len; bufferlist auth_payload; AuthMoreFrame(bufferlist &auth_payload) - : Frame(Tag::AUTH_MORE, sizeof(__le32) + auth_payload.length()), + : Frame(Tag::AUTH_MORE), len(auth_payload.length()), auth_payload(auth_payload) { encode(len, payload, 0); payload.claim_append(auth_payload); } - AuthMoreFrame(char *payload, uint32_t length) - : Frame(Tag::AUTH_BAD_AUTH, length) { - len = *(__le32 *)payload; - ceph_assert((length - sizeof(__le32)) == len); - auth_payload.append(payload + sizeof(__le32), len); + AuthMoreFrame(char *payload, uint32_t length) : Frame() { + len = *(uint32_t *)payload; + ceph_assert((length - sizeof(uint32_t)) == len); + auth_payload.append(payload + sizeof(uint32_t), len); } }; struct AuthDoneFrame : public Frame { - __le64 flags; + uint64_t flags; - AuthDoneFrame(uint64_t flags) - : Frame(Tag::AUTH_DONE, sizeof(__le64)), flags(flags) { + AuthDoneFrame(uint64_t flags) : Frame(Tag::AUTH_DONE), flags(flags) { encode(flags, payload, 0); } - AuthDoneFrame(char *payload, uint32_t length) - : Frame(Tag::AUTH_DONE, length) { - flags = *(__le64 *)payload; + AuthDoneFrame(char *payload, uint32_t length) : Frame() { + flags = *(uint64_t *)payload; } }; @@ -174,7 +189,7 @@ private: IdentFrame(entity_addrvec_t addrs, int64_t gid, uint64_t supported_features, uint64_t required_features, uint64_t flags, uint64_t cookie) - : SignedEncryptedFrame(Tag::IDENT, 0), + : SignedEncryptedFrame(Tag::IDENT), addrs(addrs), gid(gid), supported_features(supported_features), @@ -187,13 +202,11 @@ private: encode(required_features, payload, -1ll); encode(flags, payload, -1ll); encode(cookie, payload, -1ll); - frame_len = sizeof(uint32_t) + payload.length(); } - IdentFrame(char *payload, uint32_t length) - : SignedEncryptedFrame(Tag::IDENT, length) { + IdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { bufferlist bl; - bl.append(payload, length); + bl.push_back(buffer::create_static(length, payload)); try { auto ti = bl.cbegin(); decode(addrs, ti); @@ -211,25 +224,117 @@ private: __le64 features; IdentMissingFeaturesFrame(uint64_t features) - : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, sizeof(uint64_t)), + : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES), features(features) { encode(features, payload, -1ll); } IdentMissingFeaturesFrame(char *payload, uint32_t length) - : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES, length) { + : SignedEncryptedFrame() { features = *(uint64_t *)payload; } }; + struct MessageFrame : public SignedEncryptedFrame { + const unsigned int ASYNC_COALESCE_THRESHOLD = 256; + + ceph_msg_header2 header2; + + MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq, + bool calc_crc) + : SignedEncryptedFrame(Tag::MESSAGE) { + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + header2 = ceph_msg_header2{header.seq, header.tid, + header.type, header.priority, + header.version, header.front_len, + header.middle_len, 0, + header.data_len, header.data_off, + ack_seq, footer.front_crc, + footer.middle_crc, footer.data_crc, + footer.flags, header.compat_version, + header.reserved, 0}; + + if (calc_crc) { + header2.header_crc = + ceph_crc32c(0, (unsigned char *)&header2, + sizeof(header2) - sizeof(header2.header_crc)); + } + + payload.append((char *)&header2, sizeof(header2)); + if ((data.length() <= ASYNC_COALESCE_THRESHOLD) && + (data.buffers().size() > 1)) { + for (const auto &pb : data.buffers()) { + payload.append((char *)pb.c_str(), pb.length()); + } + } else { + payload.claim_append(data); + } + } + }; + + struct KeepAliveFrame : public SignedEncryptedFrame { + struct ceph_timespec timestamp; + + KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) { + struct ceph_timespec ts; + utime_t t = ceph_clock_now(); + t.encode_timeval(&ts); + payload.append((char *)&ts, sizeof(ts)); + } + + KeepAliveFrame(struct ceph_timespec ×tamp) + : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) { + payload.append((char *)×tamp, sizeof(timestamp)); + } + + KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + ceph_assert(length == sizeof(timestamp)); + timestamp = *(struct ceph_timespec *)payload; + } + }; + + struct AckFrame : public SignedEncryptedFrame { + uint64_t seq; + + AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) { + encode(seq, payload, 0); + } + + AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { + seq = *(uint64_t *)payload; + } + }; + char *temp_buffer; State state; uint64_t peer_required_features; uint64_t cookie; + uint64_t message_seq; + bool can_write; + std::map>> out_queue; + std::list sent; + std::atomic out_seq{0}; + std::atomic in_seq{0}; + std::atomic ack_left{0}; using ProtFuncPtr = void (ProtocolV2::*)(); Ct *bannerExchangeCallback; + uint32_t next_frame_len; + Tag next_tag; + ceph_msg_header2 current_header; + utime_t backoff; // backoff time + utime_t recv_stamp; + utime_t throttle_stamp; + unsigned msg_left; + bufferlist data_buf; + bufferlist::iterator data_blp; + bufferlist front, middle, data; + + bool keepalive; + ostream &_conn_prefix(std::ostream *_dout); Ct *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), @@ -242,6 +347,14 @@ private: return nullptr; } + void discard_out_queue(); + void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); + Message *_get_next_outgoing(bufferlist *bl); + ssize_t write_message(Message *m, bufferlist &bl, bool more); + void append_keepalive(); + void append_keepalive_ack(utime_t ×tamp); + void handle_message_ack(uint64_t seq); + WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_peer_banner); @@ -250,19 +363,49 @@ private: Ct *_banner_exchange_handle_write(int r); Ct *_banner_exchange_handle_peer_banner(char *buffer, int r); - uint32_t next_frame_len; CONTINUATION_DECL(ProtocolV2, read_frame); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload); WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header); + CONTINUATION_DECL(ProtocolV2, throttle_message); + CONTINUATION_DECL(ProtocolV2, throttle_bytes); + CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_front); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_middle); + CONTINUATION_DECL(ProtocolV2, read_message_data); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data); Ct *read_frame(); - Ct *handle_read_frame_length(char *buffer, int r); - Ct *handle_frame(char *buffer, int r); + Ct *handle_read_frame_length_and_tag(char *buffer, int r); + Ct *handle_frame_payload(char *buffer, int r); + Ct *handle_auth_more(char *payload, uint32_t length); Ct *handle_auth_more_write(int r); + Ct *handle_ident(char *payload, uint32_t length); + Ct *ready(); + + Ct *handle_message(); + Ct *handle_message_header(char *buffer, int r); + Ct *throttle_message(); + Ct *throttle_bytes(); + Ct *throttle_dispatch_queue(); + Ct *read_message_front(); + Ct *handle_message_front(char *buffer, int r); + Ct *read_message_middle(); + Ct *handle_message_middle(char *buffer, int r); + Ct *read_message_data_prepare(); + Ct *read_message_data(); + Ct *handle_message_data(char *buffer, int r); + Ct *handle_message_complete(); + + Ct *handle_keepalive2(char *payload, uint32_t length); + Ct *handle_keepalive2_ack(char *payload, uint32_t length); + + Ct *handle_message_ack(char *payload, uint32_t length); + public: ProtocolV2(AsyncConnection *connection); virtual ~ProtocolV2(); -- 2.39.5