static constexpr uint8_t CRYPTO_BLOCK_SIZE { 16 };
-struct segment_t {
- __le32 length;
- __le16 alignment;
-} __attribute__((packed));
+using segment_t = ProtocolV2::segment_t;
struct preamble_main_t {
static constexpr std::size_t MAX_NUM_SEGMENTS = 2;
__le16 crc;
__u8 tag;
__u8 num_segments;
- std::array<segment_t, MAX_NUM_SEGMENTS> segments;
+ std::array<ProtocolV2::segment_t, MAX_NUM_SEGMENTS> segments;
} __attribute__((packed));
static_assert(sizeof(preamble_main_t) == CRYPTO_BLOCK_SIZE);
static_assert(std::is_standard_layout<preamble_main_t>::value);
struct preamble_extra_t {
static constexpr std::size_t MAX_NUM_SEGMENTS = 2;
- std::array<segment_t, MAX_NUM_SEGMENTS> segments;
+ std::array<ProtocolV2::segment_t, MAX_NUM_SEGMENTS> segments;
std::array<__u8, 4> always_padding;
} __attribute__((packed));
static_assert(sizeof(preamble_extra_t) == CRYPTO_BLOCK_SIZE);
// everything through ::decode is unnecessary.
next_payload_len = main_preamble.segments[0].length;
+ rx_segments_todo_rev.clear();
+ for (std::uint8_t idx = main_preamble.num_segments;
+ idx <= rx_segments_todo_rev.capacity() && idx > 0;
+ /* NOP */)
+ {
+ --idx;
+ ldout(cct, 10) << __func__ << " got new segment:"
+ << " len=" << main_preamble.segments[idx].length
+ << " align=" << main_preamble.segments[idx].alignment
+ << dendl;
+ rx_segments_todo_rev.emplace_back(main_preamble.segments[idx]);
+ }
+
// TODO: move this ugliness into dedicated procedure
const auto rx_crc = ceph_crc16c(0,
reinterpret_cast<const unsigned char*>(&main_preamble) + sizeof(main_preamble.crc),
case Tag::KEEPALIVE2:
case Tag::KEEPALIVE2_ACK:
case Tag::ACK:
- return READ(next_payload_len, handle_frame_payload);
+ rx_segments_data.clear();
+ return read_frame_segment();
case Tag::WAIT:
return handle_wait();
case Tag::MESSAGE:
return nullptr;
}
+CtPtr ProtocolV2::read_frame_segment() {
+ ldout(cct, 20) << __func__ << dendl;
+ ceph_assert(!rx_segments_todo_rev.empty());
+
+#if 0
+ auto rx_buffer = ceph::buffer::ptr_node::create(
+ buffer::create_aligned(rx_segments_todo_rev.back().length,
+ rx_segments_todo_rev.back().alignment));
+#else
+ auto rx_buffer = ceph::buffer::ptr_node::create(
+ buffer::create(rx_segments_todo_rev.back().length));
+#endif
+
+ rx_segments_todo_rev.pop_back();
+ rx_segments_data.emplace_back();
+ rx_segments_data.back().push_back(std::move(rx_buffer));
+ return READB(rx_segments_data.back().length(),
+ rx_segments_data.back().c_str(),
+ handle_frame_segment);
+}
+
+CtPtr ProtocolV2::handle_frame_segment(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read frame segment failed r=" << r << " ("
+ << cpp_strerror(r) << ")" << dendl;
+ return _fault();
+ }
+
+ if (!rx_segments_todo_rev.empty()) {
+ return read_frame_segment();
+ } else {
+ // TODO: for makeshift only. This will be more generic and throttled
+ return handle_frame_payload(buffer, r);
+ }
+}
+
CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
return _fault();
}
+ ceph_assert(!rx_segments_data.empty());
+ const auto this_payload_len = rx_segments_data.back().length();
+
ldout(cct, 30) << __func__ << "\n";
bufferlist bl;
- bl.append(buffer, next_payload_len);
+ bl.append(buffer, this_payload_len);
bl.hexdump(*_dout);
*_dout << dendl;
switch (next_tag) {
case Tag::HELLO:
- return handle_hello(buffer, next_payload_len);
+ return handle_hello(buffer, this_payload_len);
case Tag::AUTH_REQUEST:
- return handle_auth_request(buffer, next_payload_len);
+ return handle_auth_request(buffer, this_payload_len);
case Tag::AUTH_BAD_METHOD:
- return handle_auth_bad_method(buffer, next_payload_len);
+ return handle_auth_bad_method(buffer, this_payload_len);
case Tag::AUTH_REPLY_MORE:
- return handle_auth_reply_more(buffer, next_payload_len);
+ return handle_auth_reply_more(buffer, this_payload_len);
case Tag::AUTH_REQUEST_MORE:
- return handle_auth_request_more(buffer, next_payload_len);
+ return handle_auth_request_more(buffer, this_payload_len);
case Tag::AUTH_DONE:
- return handle_auth_done(buffer, next_payload_len);
+ return handle_auth_done(buffer, this_payload_len);
case Tag::CLIENT_IDENT:
- return handle_client_ident(buffer, next_payload_len);
+ return handle_client_ident(buffer, this_payload_len);
case Tag::SERVER_IDENT:
- return handle_server_ident(buffer, next_payload_len);
+ return handle_server_ident(buffer, this_payload_len);
case Tag::IDENT_MISSING_FEATURES:
- return handle_ident_missing_features(buffer, next_payload_len);
+ return handle_ident_missing_features(buffer, this_payload_len);
case Tag::SESSION_RECONNECT:
- return handle_reconnect(buffer, next_payload_len);
+ return handle_reconnect(buffer, this_payload_len);
case Tag::SESSION_RESET:
- return handle_session_reset(buffer, next_payload_len);
+ return handle_session_reset(buffer, this_payload_len);
case Tag::SESSION_RETRY:
- return handle_session_retry(buffer, next_payload_len);
+ return handle_session_retry(buffer, this_payload_len);
case Tag::SESSION_RETRY_GLOBAL:
- return handle_session_retry_global(buffer, next_payload_len);
+ return handle_session_retry_global(buffer, this_payload_len);
case Tag::SESSION_RECONNECT_OK:
- return handle_reconnect_ok(buffer, next_payload_len);
+ return handle_reconnect_ok(buffer, this_payload_len);
case Tag::KEEPALIVE2:
- return handle_keepalive2(buffer, next_payload_len);
+ return handle_keepalive2(buffer, this_payload_len);
case Tag::KEEPALIVE2_ACK:
- return handle_keepalive2_ack(buffer, next_payload_len);
+ return handle_keepalive2_ack(buffer, this_payload_len);
case Tag::ACK:
- return handle_message_ack(buffer, next_payload_len);
+ return handle_message_ack(buffer, this_payload_len);
default:
ceph_abort();
}
#ifndef _MSG_ASYNC_PROTOCOL_V2_
#define _MSG_ASYNC_PROTOCOL_V2_
+#include <boost/container/static_vector.hpp>
+
#include "Protocol.h"
#include "crypto_onwire.h"
Ct<ProtocolV2> *bannerExchangeCallback;
uint32_t next_payload_len;
+
+public:
+ struct segment_t {
+ __le32 length;
+ __le16 alignment;
+ } __attribute__((packed));
+
+private:
+ static constexpr std::size_t MAX_NUM_SEGMENTS = 4;
+ // segment descriptors are stored in reversed order. This is because
+ // vectors don't support ::pop_front. We might want to exchange
+ // the container to slightly tuned one in the future.
+ boost::container::static_vector<segment_t,
+ MAX_NUM_SEGMENTS> rx_segments_todo_rev;
+ boost::container::static_vector<ceph::bufferlist,
+ MAX_NUM_SEGMENTS> rx_segments_data;
+
+
Tag next_tag;
ceph_msg_header2 current_header;
utime_t backoff; // backoff time
CONTINUATION_DECL(ProtocolV2, read_frame);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main);
- READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload);
+ READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_segment);
READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header);
CONTINUATION_DECL(ProtocolV2, throttle_message);
CONTINUATION_DECL(ProtocolV2, throttle_bytes);
Ct<ProtocolV2> *read_frame();
Ct<ProtocolV2> *handle_read_frame_preamble_main(char *buffer, int r);
Ct<ProtocolV2> *handle_read_frame_dispatch();
+ Ct<ProtocolV2> *read_frame_segment();
+ Ct<ProtocolV2> *handle_frame_segment(char *buffer, int r);
Ct<ProtocolV2> *handle_frame_payload(char *buffer, int r);
Ct<ProtocolV2> *ready();