reset_throttle();
}
+size_t ProtocolV2::get_current_msg_size() const {
+ ceph_assert(!rx_segments_desc.empty());
+ size_t sum = 0;
+ // we don't include SegmentIndex::Msg::HEADER.
+ for (__u8 idx = 1; idx < rx_segments_desc.size(); idx++) {
+ sum += rx_segments_desc[idx].length;
+ }
+ return sum;
+}
+
void ProtocolV2::reset_throttle() {
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
}
if (state > THROTTLE_BYTES && state <= THROTTLE_DONE) {
if (connection->policy.throttler_bytes) {
- const uint32_t cur_msg_size = \
- rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+ const size_t cur_msg_size = get_current_msg_size();
ldout(cct, 10) << __func__ << " releasing " << cur_msg_size
<< " bytes to policy throttler "
<< connection->policy.throttler_bytes->get_current() << "/"
}
}
if (state > THROTTLE_DISPATCH_QUEUE && state <= THROTTLE_DONE) {
- const uint32_t cur_msg_size = \
- rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+ const size_t cur_msg_size = get_current_msg_size();
ldout(cct, 10)
<< __func__ << " releasing " << cur_msg_size
<< " bytes to dispatch_queue throttler "
return _fault();
}
- // currently we do support only 4 or 1 segments
- if (main_preamble.num_segments != 1 && main_preamble.num_segments != 4) {
+ // currently we do support between 1 and MAX_NUM_SEGMENTS segments
+ if (main_preamble.num_segments < 1 ||
+ main_preamble.num_segments > MAX_NUM_SEGMENTS) {
ldout(cct, 10) << __func__ << " unsupported num_segments="
<< " tx_crc=" << main_preamble.num_segments << dendl;
return _fault();
CtPtr ProtocolV2::handle_message() {
ldout(cct, 20) << __func__ << dendl;
-
- ceph_assert(rx_segments_data.size() == 4);
ceph_assert(state == THROTTLE_DONE);
#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
CtPtr ProtocolV2::throttle_bytes() {
ldout(cct, 20) << __func__ << dendl;
- ceph_assert(rx_segments_desc.size() == 4);
- uint32_t cur_msg_size = \
- rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
+ const size_t cur_msg_size = get_current_msg_size();
if (cur_msg_size) {
if (connection->policy.throttler_bytes) {
ldout(cct, 10) << __func__ << " wants " << cur_msg_size
CtPtr ProtocolV2::throttle_dispatch_queue() {
ldout(cct, 20) << __func__ << dendl;
- ceph_assert(rx_segments_desc.size() == 4);
- uint32_t cur_msg_size =
- rx_segments_desc[SegmentIndex::Msg::FRONT].length + \
- rx_segments_desc[SegmentIndex::Msg::MIDDLE].length + \
- rx_segments_desc[SegmentIndex::Msg::DATA].length;
-
+ const size_t cur_msg_size = get_current_msg_size();
if (cur_msg_size) {
if (!connection->dispatch_queue->dispatch_throttler.get_or_fail(
cur_msg_size)) {
};
ceph::bufferlist::contiguous_filler preamble_filler;
+ __u8 calc_num_segments(
+ const std::array<segment_t, MAX_NUM_SEGMENTS>& segments)
+ {
+ for (__u8 num = SegmentsNumV; num > 0; num--) {
+ if (segments[num-1].length) {
+ return num;
+ }
+ }
+ // frame always has at least one segment.
+ return 1;
+ }
+
// craft the main preamble. It's always present regardless of the number
// of segments message is composed from.
void fill_preamble() {
segments.front().length() - FRAME_PREAMBLE_SIZE;
main_preamble.segments.front().alignment = alignments.front();
+ // there is no business in issuing frame without at least one segment
+ // filled.
if constexpr(SegmentsNumV > 1) {
for (__u8 idx = 1; idx < SegmentsNumV; idx++) {
main_preamble.segments[idx].length = segments[idx].length();
main_preamble.segments[idx].alignment = alignments[idx];
}
}
- main_preamble.num_segments = SegmentsNumV;
+ // calculate the number of non-empty segments.
+ // TODO: reorder segments to get DATA first
+ main_preamble.num_segments = calc_num_segments(main_preamble.segments);
main_preamble.crc =
ceph_crc32c(0, reinterpret_cast<unsigned char *>(&main_preamble),
segment_t::DEFAULT_ALIGNMENT,
segment_t::DEFAULT_ALIGNMENT,
segment_t::PAGE_SIZE_ALIGNMENT> {
+ struct {
+ uint32_t front;
+ uint32_t middle;
+ uint32_t data;
+ } len;
+
static const Tag tag = Tag::MESSAGE;
static MessageFrame Encode(const ceph_msg_header2 &msg_header,