From: Radoslaw Zarzynski Date: Fri, 15 Feb 2019 02:09:19 +0000 (+0100) Subject: msg/async: initial multi-segment support for V2. X-Git-Tag: v14.1.1~157^2~26 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=eb444b81841974434cbd71840fed7c6bf56697f7;p=ceph-ci.git msg/async: initial multi-segment support for V2. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 74e45fbe2a7..e6a6a9a000a 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -544,7 +544,10 @@ struct MessageHeaderFrame { // FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size fill_preamble({ - segment_t { this->payload.length() + front_len + middle_len + data_len + 16 - FRAME_PREAMBLE_SIZE, 16 }, + segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, 8 }, + segment_t{ front_len, 8 }, + segment_t{ middle_len, 8 }, + segment_t{ data_len + 16, segment_t::DEFERRED_ALLOCATION }, }, {}); } @@ -1446,13 +1449,13 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { next_tag = static_cast(main_preamble.tag); // FIXME: makeshift solution - ceph_assert_always(main_preamble.num_segments == 1); + ceph_assert_always(main_preamble.num_segments == 1 || + main_preamble.num_segments == 4); // I expect ceph_le32 will make the endian conversion for me. Passing // everything through ::decode is unnecessary. - next_payload_len = main_preamble.segments[0].length; - rx_segments_todo_rev.clear(); + next_payload_len = 0; for (std::uint8_t idx = main_preamble.num_segments; idx <= rx_segments_todo_rev.capacity() && idx > 0; /* NOP */) @@ -1463,6 +1466,7 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { << " align=" << main_preamble.segments[idx].alignment << dendl; rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]); + next_payload_len += main_preamble.segments[idx].length; } // TODO: move this ugliness into dedicated procedure @@ -1477,7 +1481,8 @@ CtPtr ProtocolV2::handle_read_frame_preamble_main(char *buffer, int r) { } } - return handle_read_frame_dispatch(); + rx_segments_data.clear(); + return read_frame_segment(); } CtPtr ProtocolV2::handle_read_frame_dispatch() { @@ -1502,8 +1507,7 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() { case Tag::KEEPALIVE2: case Tag::KEEPALIVE2_ACK: case Tag::ACK: - rx_segments_data.clear(); - return read_frame_segment(); + return handle_frame_payload(); case Tag::WAIT: return handle_wait(); case Tag::MESSAGE: @@ -1532,15 +1536,20 @@ CtPtr ProtocolV2::read_frame_segment() { buffer::create(rx_segments_todo_rev.back().length)); #endif + if (rx_segments_todo_rev.back().alignment == 0) { + rx_segments_data.emplace_back(); + return handle_read_frame_dispatch(); + } + rx_segments_todo_rev.pop_back(); rx_segments_data.emplace_back(); rx_segments_data.back().push_back(std::move(rx_buffer)); return READB(rx_segments_data.back().length(), rx_segments_data.back().c_str(), - handle_frame_segment); + handle_read_frame_segment); } -CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) { +CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) { ldout(cct, 20) << __func__ << " r=" << r << dendl; if (r < 0) { @@ -1553,20 +1562,13 @@ CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) { return read_frame_segment(); } else { // TODO: for makeshift only. This will be more generic and throttled - return handle_frame_payload(buffer, r); + return handle_read_frame_dispatch(); } } -CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " read frame payload failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - +CtPtr ProtocolV2::handle_frame_payload() { ceph_assert(!rx_segments_data.empty()); + auto* buffer = rx_segments_data.back().c_str(); const auto this_payload_len = rx_segments_data.back().length(); ldout(cct, 30) << __func__ << "\n"; @@ -1653,6 +1655,7 @@ CtPtr ProtocolV2::ready() { CtPtr ProtocolV2::handle_message() { ldout(cct, 20) << __func__ << dendl; + ceph_assert(rx_segments_data.size() == 4); ceph_assert(state == READY); #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE) @@ -1660,36 +1663,23 @@ CtPtr ProtocolV2::handle_message() { #endif recv_stamp = ceph_clock_now(); - const uint32_t header_len = calculate_payload_size( - session_security.rx.get(), sizeof(ceph_msg_header2)); - return READ(header_len, handle_message_header); -} - -CtPtr ProtocolV2::handle_message_header(char *buffer, int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " read message header failed" << dendl; - return _fault(); - } - - const uint32_t header_len = calculate_payload_size( - session_security.rx.get(), sizeof(ceph_msg_header2)); - - ceph::bufferlist text; - text.push_back(buffer::create_static(header_len, buffer)); + // TODO: move crypto processing to segment reader if (auth_meta->is_mode_secure()) { ceph_assert(session_stream_handlers.rx); - text = session_stream_handlers.rx->authenticated_decrypt_update( - std::move(text), 8); + rx_segments_data[0] = \ + session_stream_handlers.rx->authenticated_decrypt_update( + std::move(rx_segments_data[0]), 8); } - MessageHeaderFrame header_frame(std::move(text)); + MessageHeaderFrame header_frame(std::move(rx_segments_data[0])); ceph_msg_header2 &header = header_frame.header(); - ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src " - << peer_name << " front=" << header.front_len - << " data=" << header.data_len << " off " << header.data_off + ldout(cct, 20) << __func__ + << " got envelope type=" << header.type + << " src " << peer_name + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off << dendl; if (messenger->crcflags & MSG_CRC_HEADER) { @@ -1714,171 +1704,29 @@ CtPtr ProtocolV2::handle_message_header(char *buffer, int r) { extra.clear(); current_header = header; - next_payload_len -= header_len; - #if 0 state = THROTTLE_MESSAGE; return CONTINUE(throttle_message); -#else - state = READ_MESSAGE_FRONT; - return read_message_front(); #endif -} - -CtPtr ProtocolV2::throttle_message() { - ldout(cct, 20) << __func__ << dendl; - - if (connection->policy.throttler_messages) { - ldout(cct, 10) << __func__ << " wants " << 1 - << " message from policy throttler " - << connection->policy.throttler_messages->get_current() - << "/" << connection->policy.throttler_messages->get_max() - << dendl; - if (!connection->policy.throttler_messages->get_or_fail()) { - ldout(cct, 10) << __func__ << " wants 1 message from policy throttle " - << connection->policy.throttler_messages->get_current() - << "/" << connection->policy.throttler_messages->get_max() - << " failed, just wait." << dendl; - // following thread pool deal with th full message queue isn't a - // short time, so we can wait a ms. - if (connection->register_time_events.empty()) { - connection->register_time_events.insert( - connection->center->create_time_event(1000, - connection->wakeup_handler)); - } - return nullptr; - } - } - - state = THROTTLE_BYTES; - return CONTINUE(throttle_bytes); -} - -CtPtr ProtocolV2::throttle_bytes() { - ldout(cct, 20) << __func__ << dendl; - - uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + - current_header.data_len; - if (cur_msg_size) { - if (connection->policy.throttler_bytes) { - ldout(cct, 10) << __func__ << " wants " << cur_msg_size - << " bytes from policy throttler " - << connection->policy.throttler_bytes->get_current() << "/" - << connection->policy.throttler_bytes->get_max() << dendl; - if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) { - ldout(cct, 10) << __func__ << " wants " << cur_msg_size - << " bytes from policy throttler " - << connection->policy.throttler_bytes->get_current() - << "/" << connection->policy.throttler_bytes->get_max() - << " failed, just wait." << dendl; - // following thread pool deal with th full message queue isn't a - // short time, so we can wait a ms. - if (connection->register_time_events.empty()) { - connection->register_time_events.insert( - connection->center->create_time_event( - 1000, connection->wakeup_handler)); - } - return nullptr; - } - } - } - state = THROTTLE_DISPATCH_QUEUE; - return CONTINUE(throttle_dispatch_queue); -} + // front + ceph_assert(current_header.front_len == rx_segments_data[1].length()); + ceph_assert(!front.length()); + front = std::move(rx_segments_data[1]); -CtPtr ProtocolV2::throttle_dispatch_queue() { - ldout(cct, 20) << __func__ << dendl; - - uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + - current_header.data_len; - - if (cur_msg_size) { - if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( - cur_msg_size)) { - ldout(cct, 10) - << __func__ << " wants " << cur_msg_size - << " bytes from dispatch throttle " - << connection->dispatch_queue->dispatch_throttler.get_current() << "/" - << connection->dispatch_queue->dispatch_throttler.get_max() - << " failed, just wait." << dendl; - // following thread pool deal with th full message queue isn't a - // short time, so we can wait a ms. - if (connection->register_time_events.empty()) { - connection->register_time_events.insert( - connection->center->create_time_event(1000, - connection->wakeup_handler)); - } - return nullptr; - } - } - - throttle_stamp = ceph_clock_now(); - - state = READ_MESSAGE_FRONT; - return read_message_front(); -} - -CtPtr ProtocolV2::read_message_front() { - ldout(cct, 20) << __func__ << dendl; - - unsigned front_len = current_header.front_len; - if (front_len) { - if (!front.length()) { - front.push_back(buffer::create(front_len)); - } - return READB(front_len, front.c_str(), handle_message_front); - } - return read_message_middle(); -} - -CtPtr ProtocolV2::handle_message_front(char *buffer, int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " read message front failed" << dendl; - return _fault(); - } - - ldout(cct, 20) << __func__ << " got front " << front.length() << dendl; - - next_payload_len -= current_header.front_len; - - return read_message_middle(); -} - -CtPtr ProtocolV2::read_message_middle() { - ldout(cct, 20) << __func__ << dendl; - - if (current_header.middle_len) { - if (!middle.length()) { - middle.push_back(buffer::create(current_header.middle_len)); - } - return READB(current_header.middle_len, middle.c_str(), - handle_message_middle); - } - - return read_message_data_prepare(); -} - -CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) { - ldout(cct, 20) << __func__ << " r" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " read message middle failed" << dendl; - return _fault(); - } - - ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl; - - next_payload_len -= current_header.middle_len; + // middle + ceph_assert(current_header.middle_len == rx_segments_data[2].length()); + ceph_assert(!middle.length()); + middle = std::move(rx_segments_data[2]); + next_payload_len -= sizeof(ceph_msg_header2); + next_payload_len -= front.length(); + next_payload_len -= middle.length(); return read_message_data_prepare(); } CtPtr ProtocolV2::read_message_data_prepare() { ldout(cct, 20) << __func__ << dendl; - unsigned data_len = le32_to_cpu(current_header.data_len); unsigned data_off = le32_to_cpu(current_header.data_off); @@ -1967,9 +1815,11 @@ CtPtr ProtocolV2::handle_message_extra_bytes(char *buffer, int r) { CtPtr ProtocolV2::handle_message_complete() { ldout(cct, 20) << __func__ << dendl; - ldout(cct, 5) << __func__ << " got " << front.length() << " + " - << middle.length() << " + " << data.length() << " byte message" - << dendl; + ldout(cct, 5) << __func__ + << " got " << front.length() + << " + " << middle.length() + << " + " << data.length() + << " byte message" << dendl; ceph_msg_header header{current_header.seq, current_header.tid, @@ -2145,6 +1995,101 @@ CtPtr ProtocolV2::handle_message_complete() { return CONTINUE(read_frame); } + +CtPtr ProtocolV2::throttle_message() { + ldout(cct, 20) << __func__ << dendl; + + if (connection->policy.throttler_messages) { + ldout(cct, 10) << __func__ << " wants " << 1 + << " message from policy throttler " + << connection->policy.throttler_messages->get_current() + << "/" << connection->policy.throttler_messages->get_max() + << dendl; + if (!connection->policy.throttler_messages->get_or_fail()) { + ldout(cct, 10) << __func__ << " wants 1 message from policy throttle " + << connection->policy.throttler_messages->get_current() + << "/" << connection->policy.throttler_messages->get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event(1000, + connection->wakeup_handler)); + } + return nullptr; + } + } + + state = THROTTLE_BYTES; + return CONTINUE(throttle_bytes); +} + +CtPtr ProtocolV2::throttle_bytes() { + ldout(cct, 20) << __func__ << dendl; + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + if (cur_msg_size) { + if (connection->policy.throttler_bytes) { + ldout(cct, 10) << __func__ << " wants " << cur_msg_size + << " bytes from policy throttler " + << connection->policy.throttler_bytes->get_current() << "/" + << connection->policy.throttler_bytes->get_max() << dendl; + if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) { + ldout(cct, 10) << __func__ << " wants " << cur_msg_size + << " bytes from policy throttler " + << connection->policy.throttler_bytes->get_current() + << "/" << connection->policy.throttler_bytes->get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event( + 1000, connection->wakeup_handler)); + } + return nullptr; + } + } + } + + state = THROTTLE_DISPATCH_QUEUE; + return CONTINUE(throttle_dispatch_queue); +} + +CtPtr ProtocolV2::throttle_dispatch_queue() { + ldout(cct, 20) << __func__ << dendl; + + uint32_t cur_msg_size = current_header.front_len + current_header.middle_len + + current_header.data_len; + + if (cur_msg_size) { + if (!connection->dispatch_queue->dispatch_throttler.get_or_fail( + cur_msg_size)) { + ldout(cct, 10) + << __func__ << " wants " << cur_msg_size + << " bytes from dispatch throttle " + << connection->dispatch_queue->dispatch_throttler.get_current() << "/" + << connection->dispatch_queue->dispatch_throttler.get_max() + << " failed, just wait." << dendl; + // following thread pool deal with th full message queue isn't a + // short time, so we can wait a ms. + if (connection->register_time_events.empty()) { + connection->register_time_events.insert( + connection->center->create_time_event(1000, + connection->wakeup_handler)); + } + return nullptr; + } + } + + throttle_stamp = ceph_clock_now(); + + state = READ_MESSAGE_FRONT; + return read_message_data_prepare(); +} + CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 38c7cee04bd..8d556e08ab2 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -107,6 +107,7 @@ private: public: struct segment_t { + static constexpr __le16 DEFERRED_ALLOCATION { 0x0000 }; __le32 length; __le16 alignment; } __attribute__((packed)); @@ -168,13 +169,10 @@ private: CONTINUATION_DECL(ProtocolV2, read_frame); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_segment); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header); + READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_segment); CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); CONTINUATION_DECL(ProtocolV2, throttle_dispatch_queue); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_front); - READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_middle); CONTINUATION_DECL(ProtocolV2, read_message_data); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_data); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_extra_bytes); @@ -183,20 +181,15 @@ private: Ct *handle_read_frame_preamble_main(char *buffer, int r); Ct *handle_read_frame_dispatch(); Ct *read_frame_segment(); - Ct *handle_frame_segment(char *buffer, int r); - Ct *handle_frame_payload(char *buffer, int r); + Ct *handle_read_frame_segment(char *buffer, int r); + Ct *handle_frame_payload(); Ct *ready(); Ct *handle_message(); - Ct *handle_message_header(char *buffer, int r); Ct *throttle_message(); Ct *throttle_bytes(); Ct *throttle_dispatch_queue(); - Ct *read_message_front(); - Ct *handle_message_front(char *buffer, int r); - Ct *read_message_middle(); - Ct *handle_message_middle(char *buffer, int r); Ct *read_message_data_prepare(); Ct *read_message_data(); Ct *handle_message_data(char *buffer, int r);