connection->pendingReadLen.reset();
connection->writeCallback.reset();
- uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
- rx_segments_todo_rev[1].length + \
- rx_segments_todo_rev[0].length;
+ uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+ rx_segments_desc[2].logical.length + \
+ rx_segments_desc[3].logical.length;
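+ // Assumption (not established by this hunk): in the msgr2 message frame
+ // segment 0 is the message header while segments 1-3 carry front,
+ // middle and data, which are the only parts counted against throttling;
+ // hence the switch from reversed indices [2..0] to natural [1..3].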
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
return !out_queue.empty() || connection->is_queued();
}
+uint32_t ProtocolV2::get_onwire_size(uint32_t logical_size) const {
+ return logical_size;
+}
+
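+// A hedged sketch of the descriptor type this patch relies on; the real
+// definition lives in the frame header and is assumed here, not shown:
+//
+//   struct onwire_segment_t {
+//     uint32_t onwire_length; // bytes the segment occupies on the wire
+//     segment_t logical;      // length/alignment advertised in preamble
+//   };
+//
+// get_onwire_size() is the identity in plain-text mode; a ciphered mode
+// could, for instance, round logical_size up to the cipher's block size.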
CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
int len, char *buffer) {
if (!buffer) {
}
{
+ // I expect ceph_le32 will make the endian conversion for me. Passing
+ // everything through ::decode is unnecessary.
const auto& main_preamble = \
reinterpret_cast<preamble_block_t&>(*preamble.c_str());
next_tag = static_cast<Tag>(main_preamble.tag);
- ceph_assert_always(main_preamble.num_segments == 1 ||
- main_preamble.num_segments == 4);
- // I expect ceph_le32 will make the endian conversion for me. Passing
- // everything through ::decode is unnecessary.
- rx_segments_todo_rev.clear();
+ rx_segments_desc.clear();
+ rx_segments_data.clear();
next_payload_len = 0;
- for (std::uint8_t idx = main_preamble.num_segments;
- idx <= rx_segments_todo_rev.capacity() && idx > 0;
- /* NOP */)
- {
- --idx;
+
+ if (main_preamble.num_segments > MAX_NUM_SEGMENTS) {
+ ldout(cct, 30) << __func__
+ << " num_segments=" << main_preamble.num_segments
+ << " exceeds MAX_NUM_SEGMENTS" << dendl;
+ return _fault();
+ }
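+ // Assumption: MAX_NUM_SEGMENTS matches the capacity of the fixed
+ // segments[] array in preamble_block_t (4 in msgr2), so any larger
+ // value cannot describe a valid frame; it is now handled as a protocol
+ // fault rather than by the former hard assert on num_segments.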
+ for (std::uint8_t idx = 0; idx < main_preamble.num_segments; idx++) {
ldout(cct, 10) << __func__ << " got new segment:"
<< " len=" << main_preamble.segments[idx].length
<< " align=" << main_preamble.segments[idx].alignment
<< dendl;
- rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
- next_payload_len += main_preamble.segments[idx].length;
+ rx_segments_desc.emplace_back(onwire_segment_t{
+ get_onwire_size(main_preamble.segments[idx].length),
+ main_preamble.segments[idx]
+ });
+ next_payload_len += rx_segments_desc.back().onwire_length;
}
if (session_stream_handlers.rx) {
- rx_segments_todo_rev.front().length += \
+ rx_segments_desc.back().onwire_length += \
session_stream_handlers.rx->get_extra_size_at_final();
next_payload_len += \
session_stream_handlers.rx->get_extra_size_at_final();
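+ // Worked example, assuming an AEAD stream handler such as AES-GCM: the
+ // final segment is followed on the wire by a 16-byte auth tag, so the
+ // last onwire_length and next_payload_len both grow by 16 here.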
state = THROTTLE_MESSAGE;
return CONTINUE(throttle_message);
} else {
- rx_segments_data.clear();
return read_frame_segment();
}
}
CtPtr ProtocolV2::read_frame_segment() {
ldout(cct, 20) << __func__ << dendl;
- ceph_assert(!rx_segments_todo_rev.empty());
+ ceph_assert(!rx_segments_desc.empty());
-#if 0
- auto rx_buffer = ceph::buffer::ptr_node::create(
- buffer::create_aligned(rx_segments_todo_rev.back().length,
- rx_segments_todo_rev.back().alignment));
-#else
- auto rx_buffer = ceph::buffer::ptr_node::create(
- buffer::create(rx_segments_todo_rev.back().length));
-#endif
+ // description of the current segment to read
+ const auto& cur_rx_desc = rx_segments_desc.at(rx_segments_data.size());
- if (rx_segments_todo_rev.back().alignment == 0) {
- rx_segments_data.emplace_back();
+ if (cur_rx_desc.logical.alignment == segment_t::DEFERRED_ALLOCATION) {
+ // This is a special case for the rx_buffers zero-copy optimization
+ // used for Message's data field. It might be dangerous and will
+ // ultimately be replaced by `allocation policies`.
+ rx_segments_data.emplace_back(ceph::bufferlist{});
return handle_read_frame_dispatch();
}
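+ // Hedged sketch of the sentinel checked above; segment_t is defined
+ // elsewhere and its exact field types are assumed:
+ //
+ //   struct segment_t {
+ //     // 0 is never a meaningful alignment, so it doubles as a marker
+ //     // for "allocate later" (e.g. from pre-registered rx_buffers).
+ //     static constexpr uint16_t DEFERRED_ALLOCATION = 0;
+ //     ceph_le32 length;
+ //     ceph_le16 alignment;
+ //   } __attribute__((packed));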
- rx_segments_todo_rev.pop_back();
+ std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> rx_buffer;
+ try {
+ rx_buffer = ceph::buffer::ptr_node::create(buffer::create_aligned(
+ cur_rx_desc.onwire_length, cur_rx_desc.logical.alignment));
+ } catch (std::bad_alloc&) {
+ // The requested alignment is peer-controlled (it comes straight from
+ // the preamble), so an absurd value may be impossible to satisfy;
+ // fault the connection instead of crashing.
+ ldout(cct, 20) << __func__ << " can't allocate aligned rx_buffer"
+ << " len=" << cur_rx_desc.onwire_length
+ << " align=" << cur_rx_desc.logical.alignment
+ << dendl;
+ return _fault();
+ }
+
rx_segments_data.emplace_back();
rx_segments_data.back().push_back(std::move(rx_buffer));
return READB(rx_segments_data.back().length(),
             rx_segments_data.back().c_str(),
             handle_read_frame_segment);
}
CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) {
return _fault();
}
- if (!rx_segments_todo_rev.empty()) {
- return read_frame_segment();
+ if (rx_segments_desc.size() == rx_segments_data.size()) {
+ // all segments we planned to read have been read; proceed to dispatch
+ return handle_read_frame_dispatch();
} else {
// TODO: makeshift only; segment reading will become more generic and throttled
- return handle_read_frame_dispatch();
+ return read_frame_segment();
}
}
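+// Reader's note: a sketch of the control flow this patch assumes.
+//
+//   preamble handling         -> fills rx_segments_desc, one entry per
+//                                advertised segment
+//   read_frame_segment        -> reads segment #rx_segments_data.size()
+//                                into a fresh bufferlist
+//   handle_read_frame_segment -> loops back until rx_segments_data.size()
+//                                equals rx_segments_desc.size(), then the
+//                                complete frame is dispatched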
CtPtr ProtocolV2::throttle_bytes() {
ldout(cct, 20) << __func__ << dendl;
- ceph_assert(rx_segments_todo_rev.size() == 4);
- uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
- rx_segments_todo_rev[1].length + \
- rx_segments_todo_rev[0].length;
+ ceph_assert(rx_segments_desc.size() == 4);
+ uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+ rx_segments_desc[2].logical.length + \
+ rx_segments_desc[3].logical.length;
if (cur_msg_size) {
if (connection->policy.throttler_bytes) {
ldout(cct, 10) << __func__ << " wants " << cur_msg_size
CtPtr ProtocolV2::throttle_dispatch_queue() {
ldout(cct, 20) << __func__ << dendl;
- ceph_assert(rx_segments_todo_rev.size() == 4);
- uint32_t cur_msg_size = rx_segments_todo_rev[2].length + \
- rx_segments_todo_rev[1].length + \
- rx_segments_todo_rev[0].length;
+ ceph_assert(rx_segments_desc.size() == 4);
+ uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
+ rx_segments_desc[2].logical.length + \
+ rx_segments_desc[3].logical.length;
if (cur_msg_size) {
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
throttle_stamp = ceph_clock_now();
state = THROTTLE_DONE;
- rx_segments_data.clear();
return read_frame_segment();
}