{
// FIXME: plainsize -> ciphersize; for AES-GCM they are equall apart from auth tag size
fill_preamble({
- segment_t { this->payload.length() + front_len + middle_len + data_len + 16 - FRAME_PREAMBLE_SIZE, 16 },
+ segment_t{ this->payload.length() - FRAME_PREAMBLE_SIZE, 8 },
+ segment_t{ front_len, 8 },
+ segment_t{ middle_len, 8 },
+ segment_t{ data_len + 16, segment_t::DEFERRED_ALLOCATION },
}, {});
}
next_tag = static_cast<Tag>(main_preamble.tag);
// FIXME: makeshift solution
- ceph_assert_always(main_preamble.num_segments == 1);
+ ceph_assert_always(main_preamble.num_segments == 1 ||
+ main_preamble.num_segments == 4);
// I expect ceph_le32 will make the endian conversion for me. Passing
// everything through ::decode is unnecessary.
- next_payload_len = main_preamble.segments[0].length;
-
rx_segments_todo_rev.clear();
+ next_payload_len = 0;
for (std::uint8_t idx = main_preamble.num_segments;
idx <= rx_segments_todo_rev.capacity() && idx > 0;
/* NOP */)
<< " align=" << main_preamble.segments[idx].alignment
<< dendl;
rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
+ next_payload_len += main_preamble.segments[idx].length;
}
// TODO: move this ugliness into dedicated procedure
}
}
- return handle_read_frame_dispatch();
+ rx_segments_data.clear();
+ return read_frame_segment();
}
CtPtr ProtocolV2::handle_read_frame_dispatch() {
case Tag::KEEPALIVE2:
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
- rx_segments_data.clear();
- return read_frame_segment();
+ return handle_frame_payload();
case Tag::WAIT:
return handle_wait();
case Tag::MESSAGE:
buffer::create(rx_segments_todo_rev.back().length));
#endif
+ if (rx_segments_todo_rev.back().alignment == 0) {
+ rx_segments_data.emplace_back();
+ return handle_read_frame_dispatch();
+ }
+
rx_segments_todo_rev.pop_back();
rx_segments_data.emplace_back();
rx_segments_data.back().push_back(std::move(rx_buffer));
return READB(rx_segments_data.back().length(),
rx_segments_data.back().c_str(),
- handle_frame_segment);
+ handle_read_frame_segment);
}
-CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) {
+CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
return read_frame_segment();
} else {
// TODO: for makeshift only. This will be more generic and throttled
- return handle_frame_payload(buffer, r);
+ return handle_read_frame_dispatch();
}
}
-CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read frame payload failed r=" << r << " ("
- << cpp_strerror(r) << ")" << dendl;
- return _fault();
- }
-
+CtPtr ProtocolV2::handle_frame_payload() {
ceph_assert(!rx_segments_data.empty());
+ auto* buffer = rx_segments_data.back().c_str();
const auto this_payload_len = rx_segments_data.back().length();
ldout(cct, 30) << __func__ << "\n";
CtPtr ProtocolV2::handle_message() {
ldout(cct, 20) << __func__ << dendl;
+ ceph_assert(rx_segments_data.size() == 4);
ceph_assert(state == READY);
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
#endif
recv_stamp = ceph_clock_now();
- const uint32_t header_len = calculate_payload_size(
- session_security.rx.get(), sizeof(ceph_msg_header2));
- return READ(header_len, handle_message_header);
-}
-
-CtPtr ProtocolV2::handle_message_header(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read message header failed" << dendl;
- return _fault();
- }
-
- const uint32_t header_len = calculate_payload_size(
- session_security.rx.get(), sizeof(ceph_msg_header2));
-
- ceph::bufferlist text;
- text.push_back(buffer::create_static(header_len, buffer));
+ // TODO: move crypto processing to segment reader
if (auth_meta->is_mode_secure()) {
ceph_assert(session_stream_handlers.rx);
- text = session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(text), 8);
+ rx_segments_data[0] = \
+ session_stream_handlers.rx->authenticated_decrypt_update(
+ std::move(rx_segments_data[0]), 8);
}
- MessageHeaderFrame header_frame(std::move(text));
+ MessageHeaderFrame header_frame(std::move(rx_segments_data[0]));
ceph_msg_header2 &header = header_frame.header();
- ldout(cct, 20) << __func__ << " got envelope type=" << header.type << " src "
- << peer_name << " front=" << header.front_len
- << " data=" << header.data_len << " off " << header.data_off
+ ldout(cct, 20) << __func__
+ << " got envelope type=" << header.type
+ << " src " << peer_name
+ << " front=" << header.front_len
+ << " data=" << header.data_len
+ << " off " << header.data_off
<< dendl;
if (messenger->crcflags & MSG_CRC_HEADER) {
extra.clear();
current_header = header;
- next_payload_len -= header_len;
-
#if 0
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
-#else
- state = READ_MESSAGE_FRONT;
- return read_message_front();
#endif
-}
-
-CtPtr ProtocolV2::throttle_message() {
- ldout(cct, 20) << __func__ << dendl;
-
- if (connection->policy.throttler_messages) {
- ldout(cct, 10) << __func__ << " wants " << 1
- << " message from policy throttler "
- << connection->policy.throttler_messages->get_current()
- << "/" << connection->policy.throttler_messages->get_max()
- << dendl;
- if (!connection->policy.throttler_messages->get_or_fail()) {
- ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
- << connection->policy.throttler_messages->get_current()
- << "/" << connection->policy.throttler_messages->get_max()
- << " failed, just wait." << dendl;
- // following thread pool deal with th full message queue isn't a
- // short time, so we can wait a ms.
- if (connection->register_time_events.empty()) {
- connection->register_time_events.insert(
- connection->center->create_time_event(1000,
- connection->wakeup_handler));
- }
- return nullptr;
- }
- }
-
- state = THROTTLE_BYTES;
- return CONTINUE(throttle_bytes);
-}
-
-CtPtr ProtocolV2::throttle_bytes() {
- ldout(cct, 20) << __func__ << dendl;
-
- uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
- current_header.data_len;
- if (cur_msg_size) {
- if (connection->policy.throttler_bytes) {
- ldout(cct, 10) << __func__ << " wants " << cur_msg_size
- << " bytes from policy throttler "
- << connection->policy.throttler_bytes->get_current() << "/"
- << connection->policy.throttler_bytes->get_max() << dendl;
- if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
- ldout(cct, 10) << __func__ << " wants " << cur_msg_size
- << " bytes from policy throttler "
- << connection->policy.throttler_bytes->get_current()
- << "/" << connection->policy.throttler_bytes->get_max()
- << " failed, just wait." << dendl;
- // following thread pool deal with th full message queue isn't a
- // short time, so we can wait a ms.
- if (connection->register_time_events.empty()) {
- connection->register_time_events.insert(
- connection->center->create_time_event(
- 1000, connection->wakeup_handler));
- }
- return nullptr;
- }
- }
- }
- state = THROTTLE_DISPATCH_QUEUE;
- return CONTINUE(throttle_dispatch_queue);
-}
+ // front
+ ceph_assert(current_header.front_len == rx_segments_data[1].length());
+ ceph_assert(!front.length());
+ front = std::move(rx_segments_data[1]);
-CtPtr ProtocolV2::throttle_dispatch_queue() {
- ldout(cct, 20) << __func__ << dendl;
-
- uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
- current_header.data_len;
-
- if (cur_msg_size) {
- if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
- cur_msg_size)) {
- ldout(cct, 10)
- << __func__ << " wants " << cur_msg_size
- << " bytes from dispatch throttle "
- << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
- << connection->dispatch_queue->dispatch_throttler.get_max()
- << " failed, just wait." << dendl;
- // following thread pool deal with th full message queue isn't a
- // short time, so we can wait a ms.
- if (connection->register_time_events.empty()) {
- connection->register_time_events.insert(
- connection->center->create_time_event(1000,
- connection->wakeup_handler));
- }
- return nullptr;
- }
- }
-
- throttle_stamp = ceph_clock_now();
-
- state = READ_MESSAGE_FRONT;
- return read_message_front();
-}
-
-CtPtr ProtocolV2::read_message_front() {
- ldout(cct, 20) << __func__ << dendl;
-
- unsigned front_len = current_header.front_len;
- if (front_len) {
- if (!front.length()) {
- front.push_back(buffer::create(front_len));
- }
- return READB(front_len, front.c_str(), handle_message_front);
- }
- return read_message_middle();
-}
-
-CtPtr ProtocolV2::handle_message_front(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r=" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read message front failed" << dendl;
- return _fault();
- }
-
- ldout(cct, 20) << __func__ << " got front " << front.length() << dendl;
-
- next_payload_len -= current_header.front_len;
-
- return read_message_middle();
-}
-
-CtPtr ProtocolV2::read_message_middle() {
- ldout(cct, 20) << __func__ << dendl;
-
- if (current_header.middle_len) {
- if (!middle.length()) {
- middle.push_back(buffer::create(current_header.middle_len));
- }
- return READB(current_header.middle_len, middle.c_str(),
- handle_message_middle);
- }
-
- return read_message_data_prepare();
-}
-
-CtPtr ProtocolV2::handle_message_middle(char *buffer, int r) {
- ldout(cct, 20) << __func__ << " r" << r << dendl;
-
- if (r < 0) {
- ldout(cct, 1) << __func__ << " read message middle failed" << dendl;
- return _fault();
- }
-
- ldout(cct, 20) << __func__ << " got middle " << middle.length() << dendl;
-
- next_payload_len -= current_header.middle_len;
+ // middle
+ ceph_assert(current_header.middle_len == rx_segments_data[2].length());
+ ceph_assert(!middle.length());
+ middle = std::move(rx_segments_data[2]);
+ next_payload_len -= sizeof(ceph_msg_header2);
+ next_payload_len -= front.length();
+ next_payload_len -= middle.length();
return read_message_data_prepare();
}
CtPtr ProtocolV2::read_message_data_prepare() {
ldout(cct, 20) << __func__ << dendl;
-
unsigned data_len = le32_to_cpu(current_header.data_len);
unsigned data_off = le32_to_cpu(current_header.data_off);
CtPtr ProtocolV2::handle_message_complete() {
ldout(cct, 20) << __func__ << dendl;
- ldout(cct, 5) << __func__ << " got " << front.length() << " + "
- << middle.length() << " + " << data.length() << " byte message"
- << dendl;
+ ldout(cct, 5) << __func__
+ << " got " << front.length()
+ << " + " << middle.length()
+ << " + " << data.length()
+ << " byte message" << dendl;
ceph_msg_header header{current_header.seq,
current_header.tid,
return CONTINUE(read_frame);
}
+
+CtPtr ProtocolV2::throttle_message() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ if (connection->policy.throttler_messages) {
+ ldout(cct, 10) << __func__ << " wants " << 1
+ << " message from policy throttler "
+ << connection->policy.throttler_messages->get_current()
+ << "/" << connection->policy.throttler_messages->get_max()
+ << dendl;
+ if (!connection->policy.throttler_messages->get_or_fail()) {
+ ldout(cct, 10) << __func__ << " wants 1 message from policy throttle "
+ << connection->policy.throttler_messages->get_current()
+ << "/" << connection->policy.throttler_messages->get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(1000,
+ connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+
+ state = THROTTLE_BYTES;
+ return CONTINUE(throttle_bytes);
+}
+
+CtPtr ProtocolV2::throttle_bytes() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+ if (cur_msg_size) {
+ if (connection->policy.throttler_bytes) {
+ ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+ << " bytes from policy throttler "
+ << connection->policy.throttler_bytes->get_current() << "/"
+ << connection->policy.throttler_bytes->get_max() << dendl;
+ if (!connection->policy.throttler_bytes->get_or_fail(cur_msg_size)) {
+ ldout(cct, 10) << __func__ << " wants " << cur_msg_size
+ << " bytes from policy throttler "
+ << connection->policy.throttler_bytes->get_current()
+ << "/" << connection->policy.throttler_bytes->get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(
+ 1000, connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+ }
+
+ state = THROTTLE_DISPATCH_QUEUE;
+ return CONTINUE(throttle_dispatch_queue);
+}
+
+CtPtr ProtocolV2::throttle_dispatch_queue() {
+ ldout(cct, 20) << __func__ << dendl;
+
+ uint32_t cur_msg_size = current_header.front_len + current_header.middle_len +
+ current_header.data_len;
+
+ if (cur_msg_size) {
+ if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
+ cur_msg_size)) {
+ ldout(cct, 10)
+ << __func__ << " wants " << cur_msg_size
+ << " bytes from dispatch throttle "
+ << connection->dispatch_queue->dispatch_throttler.get_current() << "/"
+ << connection->dispatch_queue->dispatch_throttler.get_max()
+ << " failed, just wait." << dendl;
+ // following thread pool deal with th full message queue isn't a
+ // short time, so we can wait a ms.
+ if (connection->register_time_events.empty()) {
+ connection->register_time_events.insert(
+ connection->center->create_time_event(1000,
+ connection->wakeup_handler));
+ }
+ return nullptr;
+ }
+ }
+
+ throttle_stamp = ceph_clock_now();
+
+ state = READ_MESSAGE_FRONT;
+ return read_message_data_prepare();
+}
+
CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) {
ldout(cct, 20) << __func__ << " payload_len=" << length << dendl;