#define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */
#define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */
#define CEPH_MSG_FOOTER_SIGNED (1<<2) /* msg was signed */
+#define CEPH_MSG_FOOTER_LATEABRT (1<<3) /* msg was aborted after txing data */
#endif
sent_tag = static_cast<Tag>(0);
next_tag = static_cast<Tag>(0);
+ reset_throttle();
+}
+
+void ProtocolV2::reset_throttle() {
if (state > THROTTLE_MESSAGE && state <= THROTTLE_DONE &&
connection->policy.throttler_messages) {
ldout(cct, 10) << __func__ << " releasing " << 1
0};
ceph_msg_footer footer{0, 0, 0, 0, current_header.flags};
+ // we do have a mechanism that allows transmitter to start sending message
+ // and abort after putting entire data field on wire. This will be used by
+ // the kernel client to avoid unnecessary buffering.
+ if (current_header.flags & CEPH_MSG_FOOTER_LATEABRT) {
+ ceph_assert(state == THROTTLE_DONE);
+
+ reset_throttle();
+ state = READY;
+ return CONTINUE(read_frame);
+ }
+
Message *message = decode_message(cct, 0, header, footer,
rx_segments_data[SegmentIndex::Msg::FRONT],
rx_segments_data[SegmentIndex::Msg::MIDDLE],
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
void reset_recv_state();
+ void reset_throttle();
Ct<ProtocolV2> *_fault();
void discard_out_queue();
void reset_session();