connection->pendingReadLen.reset();
connection->writeCallback.reset();
- uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
- rx_segments_desc[2].logical.length + \
- rx_segments_desc[3].logical.length;
+ uint32_t cur_msg_size = \
+ rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
if (auth_meta->is_mode_secure()) {
ceph_assert(session_stream_handlers.rx);
- rx_segments_data[0] = \
+ rx_segments_data[SegmentIndex::Msg::HEADER] = \
session_stream_handlers.rx->authenticated_decrypt_update(
- std::move(rx_segments_data[0]),
+ std::move(rx_segments_data[SegmentIndex::Msg::HEADER]),
segment_t::DEFAULT_ALIGNMENT);
}
- MessageHeaderFrame header_frame(std::move(rx_segments_data[0]));
+ MessageHeaderFrame header_frame(
+ std::move(rx_segments_data[SegmentIndex::Msg::HEADER]));
ceph_msg_header2 &header = header_frame.header();
ldout(cct, 20) << __func__
current_header = header;
// front
- ceph_assert(current_header.front_len == rx_segments_data[1].length());
+ ceph_assert(current_header.front_len == \
+ rx_segments_data[SegmentIndex::Msg::FRONT].length());
ceph_assert(!front.length());
- front = std::move(rx_segments_data[1]);
+ front = std::move(rx_segments_data[SegmentIndex::Msg::FRONT]);
// middle
- ceph_assert(current_header.middle_len == rx_segments_data[2].length());
+ ceph_assert(current_header.middle_len == \
+ rx_segments_data[SegmentIndex::Msg::MIDDLE].length());
ceph_assert(!middle.length());
- middle = std::move(rx_segments_data[2]);
+ middle = std::move(rx_segments_data[SegmentIndex::Msg::MIDDLE]);
next_payload_len -= sizeof(ceph_msg_header2);
next_payload_len -= front.length();
ldout(cct, 20) << __func__ << dendl;
ceph_assert(rx_segments_desc.size() == 4);
- uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
- rx_segments_desc[2].logical.length + \
- rx_segments_desc[3].logical.length;
+ uint32_t cur_msg_size = \
+ rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
if (cur_msg_size) {
if (connection->policy.throttler_bytes) {
ldout(cct, 10) << __func__ << " wants " << cur_msg_size
ldout(cct, 20) << __func__ << dendl;
ceph_assert(rx_segments_desc.size() == 4);
- uint32_t cur_msg_size = rx_segments_desc[1].logical.length + \
- rx_segments_desc[2].logical.length + \
- rx_segments_desc[3].logical.length;
+ uint32_t cur_msg_size =
+ rx_segments_desc[SegmentIndex::Msg::FRONT].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::MIDDLE].logical.length + \
+ rx_segments_desc[SegmentIndex::Msg::DATA].logical.length;
if (cur_msg_size) {
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
ldout(cct, 1) << __func__ << " received WAIT (connection race)" << dendl;
state = WAIT;
ceph_assert(rx_segments_data.size() == 1);
- WaitFrame(*this, rx_segments_data[0].c_str(), rx_segments_data[0].length());
+ ceph_assert(rx_segments_desc.size() == 1);
+ WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD].c_str(),
+ rx_segments_data[SegmentIndex::Frame::PAYLOAD].length());
return _fault();
}
struct segment_t logical;
} __attribute__((packed));
-private:
static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
+ struct SegmentIndex {
+ struct Msg {
+ static constexpr std::size_t HEADER = 0;
+ static constexpr std::size_t FRONT = 1;
+ static constexpr std::size_t MIDDLE = 2;
+ static constexpr std::size_t DATA = 3;
+ };
+
+ struct Frame {
+ static constexpr std::size_t PAYLOAD = 0;
+ };
+ };
+
boost::container::static_vector<onwire_segment_t,
MAX_NUM_SEGMENTS> rx_segments_desc;
boost::container::static_vector<ceph::bufferlist,
MAX_NUM_SEGMENTS> rx_segments_data;
+private:
Tag next_tag;
ceph_msg_header2 current_header;