From: Ricardo Dias Date: Fri, 16 Nov 2018 17:17:34 +0000 (+0000) Subject: msg/async: msgr2: refactored the frame structures X-Git-Tag: v14.1.0~271^2~45 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=68fe3ff7349539d4e212d9d8cb5a84da98fedbbf;p=ceph.git msg/async: msgr2: refactored the frame structures Signed-off-by: Ricardo Dias --- diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index ddf8bafe363..5a115cb57c5 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -64,14 +64,6 @@ public: } \ } -#define CONTINUATION_RUN2(I, CT) \ - { \ - Ct::type> *_cont = CT; \ - while (_cont) { \ - _cont = _cont->call(I); \ - } \ - } - #define READ_HANDLER_CONTINUATION_DECL(C, F) \ CONTINUATION_DECL(C, F, char *, int) #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index b5b36f025d3..4e0c66cac54 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -21,16 +21,31 @@ ostream &ProtocolV2::_conn_prefix(std::ostream *_dout) { << " l=" << connection->policy.lossy << ")."; } +// TODO: REMOVE THIS +void ProtocolV2::log(const std::string message, uint64_t val, uint64_t val2) { + ldout(cct, 1) << __func__ << " " << message << val << ":" << val2 << dendl; +} + +using CtPtr = Ct *; + +void ProtocolV2::run_continuation(CtPtr continuation) { + try { + CONTINUATION_RUN(continuation) + } catch (const buffer::error &e) { + lderr(cct) << __func__ << " failed decoding of frame header: " << e + << dendl; + _fault(); + } +} + const int ASYNC_COALESCE_THRESHOLD = 256; -#define WRITE(B, C) write(CONTINUATION(C), B) +#define WRITE(B, D, C) write(D, CONTINUATION(C), B) #define READ(L, C) read(CONTINUATION(C), L) #define READB(L, B, C) read(CONTINUATION(C), L, B) -using CtPtr = Ct *; - static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment unsigned alloc_len = 0; @@ -48,6 +63,318 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { data.push_back(std::move(ptr)); } +/** + * Protocol V2 Frame Structures + **/ + +template +struct Frame { +protected: + bufferlist payload; + bufferlist frame_buffer; + +public: + Frame() {} + + bufferlist &get_buffer() { + if (frame_buffer.length()) { + return frame_buffer; + } + encode((uint32_t)(payload.length() + sizeof(uint32_t)), frame_buffer, -1ll); + uint32_t tag = static_cast(static_cast(this)->tag); + ceph_assert(tag != 0); + encode(tag, frame_buffer, -1ll); + frame_buffer.claim_append(payload); + return frame_buffer; + } + + void decode_frame(char *payload, uint32_t length) { + bufferlist bl; + bl.push_back(buffer::create_static(length, payload)); + auto ti = bl.cbegin(); + static_cast(this)->decode_payload(ti); + } + + void decode_payload(bufferlist::const_iterator &ti) {} +}; + +template +struct PayloadFrame : public Frame { + std::tuple _values; + + template + inline void _encode_payload_each(T &t) { + if constexpr (std::is_same()) { + encode((uint32_t)t.length(), this->payload, -1ll); + this->payload.claim_append((bufferlist &)t); + } else if constexpr (std::is_same const>()) { + encode((uint32_t)t.size(), this->payload, -1ll); + for (const auto &elem : t) { + encode(elem, this->payload, 0); + } + } else { + encode(t, this->payload, -1ll); + } + } + + PayloadFrame(const Args &... args) { (_encode_payload_each(args), ...); } + + PayloadFrame(char *payload, uint32_t length) { + this->decode_frame(payload, length); + } + + template + inline void _decode_payload_each(T &t, bufferlist::const_iterator &ti) const { + if constexpr (std::is_same()) { + uint32_t len; + decode(len, ti); + ceph_assert(len == ti.get_remaining()); + if (len) { + t.append(ti.get_current_ptr()); + } + } else if constexpr (std::is_same>()) { + uint32_t size; + decode(size, ti); + for (uint32_t i = 0; i < size; ++i) { + decode(t[i], ti); + } + } else { + decode(t, ti); + } + } + + template + inline void _decode_payload(bufferlist::const_iterator &ti, + std::index_sequence) const { + (_decode_payload_each((Args &)std::get(_values), ti), ...); + } + + void decode_payload(bufferlist::const_iterator &ti) { + _decode_payload(ti, std::index_sequence_for()); + } + + template + inline decltype(auto) get_val() { + return std::get(_values); + } +}; + +struct AuthRequestFrame + : public PayloadFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST; + using PayloadFrame::PayloadFrame; + + AuthRequestFrame(uint32_t method) : AuthRequestFrame(method, bufferlist()) {} + + inline uint32_t &method() { return get_val<0>(); } + inline bufferlist &auth_payload() { return get_val<1>(); } +}; + +struct AuthBadMethodFrame + : public PayloadFrame> { + const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_METHOD; + using PayloadFrame::PayloadFrame; + + inline uint32_t &method() { return get_val<0>(); } + inline std::vector &allowed_methods() { return get_val<1>(); } +}; + +struct AuthBadAuthFrame + : public PayloadFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_BAD_AUTH; + using PayloadFrame::PayloadFrame; + + inline uint32_t &error_code() { return get_val<0>(); } + inline std::string &error_msg() { return get_val<1>(); } +}; + +struct AuthMoreFrame : public PayloadFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_MORE; + using PayloadFrame::PayloadFrame; + + inline bufferlist &auth_payload() { return get_val<0>(); } +}; + +struct AuthDoneFrame + : public PayloadFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_DONE; + using PayloadFrame::PayloadFrame; + + inline uint64_t &flags() { return get_val<0>(); } + inline bufferlist &auth_payload() { return get_val<1>(); } +}; + +template +struct SignedEncryptedFrame : public PayloadFrame { +protected: + ProtocolV2 *protocol; + +public: + SignedEncryptedFrame(ProtocolV2 *protocol, const Args &... args) + : PayloadFrame(args...), protocol(protocol) {} + + SignedEncryptedFrame(ProtocolV2 *protocol, char *payload, uint32_t length) + : PayloadFrame(payload, length), protocol(protocol) {} + + bufferlist &get_buffer() { + if (this->frame_buffer.length()) { + return this->frame_buffer; + } + + bufferlist signature; + if (protocol->session_security) { + protocol->session_security->sign_bufferlist(this->payload, signature); + protocol->log("payload signature=", signature.length(), this->payload.length()); + } + + encode((uint32_t)(this->payload.length() + sizeof(uint32_t)), + this->frame_buffer, -1ll); + uint32_t tag = static_cast(static_cast(this)->tag); + ceph_assert(tag != 0); + encode(tag, this->frame_buffer, -1ll); + this->frame_buffer.claim_append(this->payload); + return this->frame_buffer; + } +}; + +struct ClientIdentFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline int64_t &gid() { return get_val<1>(); } + inline uint64_t &global_seq() { return get_val<2>(); } + inline uint64_t &supported_features() { return get_val<3>(); } + inline uint64_t &required_features() { return get_val<4>(); } + inline uint64_t &flags() { return get_val<5>(); } +}; + +struct ServerIdentFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline int64_t &gid() { return get_val<1>(); } + inline uint64_t &global_seq() { return get_val<2>(); } + inline uint64_t &supported_features() { return get_val<3>(); } + inline uint64_t &required_features() { return get_val<4>(); } + inline uint64_t &flags() { return get_val<5>(); } + inline uint64_t &cookie() { return get_val<6>(); } +}; + +struct ReconnectFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline entity_addrvec_t &addrs() { return get_val<0>(); } + inline uint64_t &cookie() { return get_val<1>(); } + inline uint64_t &global_seq() { return get_val<2>(); } + inline uint64_t &connect_seq() { return get_val<3>(); } + inline uint64_t &msg_seq() { return get_val<4>(); } +}; + +struct ResetFrame : public Frame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RESET; +}; + +struct RetryFrame : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY; + using SignedEncryptedFrame::SignedEncryptedFrame; + + uint64_t connect_seq() { return get_val<0>(); } +}; + +struct RetryGlobalFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RETRY_GLOBAL; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &global_seq() { return get_val<0>(); } +}; + +struct WaitFrame : public Frame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::WAIT; +}; + +struct ReconnectOkFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::SESSION_RECONNECT_OK; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &msg_seq() { return get_val<0>(); } +}; + +struct IdentMissingFeaturesFrame + : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT_MISSING_FEATURES; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &features() { return get_val<0>(); } +}; + +struct MessageFrame : public Frame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE; + const unsigned int ASYNC_COALESCE_THRESHOLD = 256; + + ceph_msg_header2 header2; + + MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq, + bool calc_crc) { + ceph_msg_header &header = msg->get_header(); + ceph_msg_footer &footer = msg->get_footer(); + + header2 = ceph_msg_header2{header.seq, header.tid, + header.type, header.priority, + header.version, header.front_len, + header.middle_len, 0, + header.data_len, header.data_off, + ack_seq, footer.front_crc, + footer.middle_crc, footer.data_crc, + footer.flags, header.compat_version, + header.reserved, 0}; + + if (calc_crc) { + header2.header_crc = + ceph_crc32c(0, (unsigned char *)&header2, + sizeof(header2) - sizeof(header2.header_crc)); + } + + payload.append((char *)&header2, sizeof(header2)); + if ((data.length() <= ASYNC_COALESCE_THRESHOLD) && + (data.buffers().size() > 1)) { + for (const auto &pb : data.buffers()) { + payload.append((char *)pb.c_str(), pb.length()); + } + } else { + payload.claim_append(data); + } + } +}; + +struct KeepAliveFrame : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::KEEPALIVE2; + using SignedEncryptedFrame::SignedEncryptedFrame; + + KeepAliveFrame(ProtocolV2 *protocol) + : KeepAliveFrame(protocol, ceph_clock_now()) {} + + inline utime_t ×tamp() { return get_val<0>(); } +}; + +struct AckFrame : public SignedEncryptedFrame { + const ProtocolV2::Tag tag = ProtocolV2::Tag::ACK; + using SignedEncryptedFrame::SignedEncryptedFrame; + + inline uint64_t &seq() { return get_val<0>(); } +}; + ProtocolV2::ProtocolV2(AsyncConnection *connection) : Protocol(2, connection), temp_buffer(nullptr), @@ -55,14 +382,17 @@ ProtocolV2::ProtocolV2(AsyncConnection *connection) peer_required_features(0), authorizer(nullptr), got_bad_auth(false), + got_bad_method(0), + auth_flags(0), cookie(0), + global_seq(0), connect_seq(0), peer_global_seq(0), message_seq(0), replacing(false), can_write(false), bannerExchangeCallback(nullptr), - next_frame_len(0), + next_payload_len(0), keepalive(false) { temp_buffer = new char[4096]; } @@ -78,11 +408,11 @@ void ProtocolV2::connect() { state = START_CONNECT; got_bad_auth = false; + got_bad_method = 0; if (authorizer) { delete authorizer; authorizer = nullptr; } - global_seq = messenger->get_global_seq(); } void ProtocolV2::accept() { state = START_ACCEPT; } @@ -101,10 +431,10 @@ void ProtocolV2::discard_out_queue() { (*p)->put(); } sent.clear(); - for (map > >::iterator p = + for (map>>::iterator p = out_queue.begin(); p != out_queue.end(); ++p) { - for (list >::iterator r = p->second.begin(); + for (list>::iterator r = p->second.begin(); r != p->second.end(); ++r) { ldout(cct, 20) << __func__ << " discard " << r->second << dendl; r->second->put(); @@ -162,7 +492,7 @@ void ProtocolV2::requeue_sent() { return; } - list > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + list> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); while (!sent.empty()) { Message *m = sent.back(); @@ -179,7 +509,7 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) { return seq; } - list > &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + list> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; uint64_t count = out_seq; while (!rq.empty()) { pair p = rq.front(); @@ -202,6 +532,7 @@ void ProtocolV2::reset_recv_state() { } authorizer = nullptr; got_bad_auth = false; + got_bad_method = 0; } // clean read and write callbacks @@ -390,22 +721,22 @@ void ProtocolV2::read_event() { switch (state) { case START_CONNECT: - CONTINUATION_RUN(CONTINUATION(start_client_banner_exchange)); + run_continuation(CONTINUATION(start_client_banner_exchange)); break; case START_ACCEPT: - CONTINUATION_RUN(CONTINUATION(start_server_banner_exchange)); + run_continuation(CONTINUATION(start_server_banner_exchange)); break; case READY: - CONTINUATION_RUN(CONTINUATION(read_frame)); + run_continuation(CONTINUATION(read_frame)); break; case THROTTLE_MESSAGE: - CONTINUATION_RUN(CONTINUATION(throttle_message)); + run_continuation(CONTINUATION(throttle_message)); break; case THROTTLE_BYTES: - CONTINUATION_RUN(CONTINUATION(throttle_bytes)); + run_continuation(CONTINUATION(throttle_bytes)); break; case THROTTLE_DISPATCH_QUEUE: - CONTINUATION_RUN(CONTINUATION(throttle_dispatch_queue)); + run_continuation(CONTINUATION(throttle_dispatch_queue)); break; default: break; @@ -415,10 +746,10 @@ void ProtocolV2::read_event() { Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) { Message *m = 0; if (!out_queue.empty()) { - map > >::reverse_iterator it = + map>>::reverse_iterator it = out_queue.rbegin(); ceph_assert(!it->second.empty()); - list >::iterator p = it->second.begin(); + list>::iterator p = it->second.begin(); m = p->second; if (bl) { bl->swap(p->first); @@ -477,14 +808,12 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) { void ProtocolV2::append_keepalive() { ldout(cct, 10) << __func__ << dendl; - KeepAliveFrame keepalive_frame; + KeepAliveFrame keepalive_frame(this); connection->outcoming_bl.claim_append(keepalive_frame.get_buffer()); } void ProtocolV2::append_keepalive_ack(utime_t ×tamp) { - struct ceph_timespec ts; - timestamp.encode_timeval(&ts); - KeepAliveFrame keepalive_ack_frame(ts); + KeepAliveFrame keepalive_ack_frame(this, timestamp); connection->outcoming_bl.claim_append(keepalive_ack_frame.get_buffer()); } @@ -566,7 +895,7 @@ void ProtocolV2::write_event() { if (left) { ceph_le64 s; s = in_seq; - AckFrame ack(in_seq); + AckFrame ack(this, in_seq); connection->outcoming_bl.claim_append(ack.get_buffer()); ldout(cct, 10) << __func__ << " try send msg ack, acked " << left << " messages" << dendl; @@ -625,7 +954,7 @@ CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), ssize_t r = connection->read(len, buffer, [CONTINUATION(next), this](char *buffer, int r) { CONTINUATION(next)->setParams(buffer, r); - CONTINUATION_RUN(CONTINUATION(next)); + run_continuation(CONTINUATION(next)); }); if (r <= 0) { return CONTINUE(next, buffer, r); @@ -634,14 +963,27 @@ CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), return nullptr; } -CtPtr ProtocolV2::write(CONTINUATION_PARAM(next, ProtocolV2, int), +CtPtr ProtocolV2::write(const std::string &desc, + CONTINUATION_PARAM(next, ProtocolV2), bufferlist &buffer) { - ssize_t r = connection->write(buffer, [CONTINUATION(next), this](int r) { - CONTINUATION(next)->setParams(r); - CONTINUATION_RUN(CONTINUATION(next)); - }); + ssize_t r = + connection->write(buffer, [CONTINUATION(next), desc, this](int r) { + if (r < 0) { + ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r + << " (" << cpp_strerror(r) << ")" << dendl; + connection->inject_delay(); + _fault(); + } + run_continuation(CONTINUATION(next)); + }); + if (r <= 0) { - return CONTINUE(next, r); + if (r < 0) { + ldout(cct, 1) << __func__ << " " << desc << " write failed r=" << r + << " (" << cpp_strerror(r) << ")" << dendl; + return _fault(); + } + return CONTINUE(next); } return nullptr; @@ -670,17 +1012,10 @@ CtPtr ProtocolV2::_banner_exchange(CtPtr callback) { bufferlist bl; bl.append(banner, banner_len); - return WRITE(bl, _banner_exchange_handle_write); + return WRITE(bl, "banner", _wait_for_peer_banner); } -CtPtr ProtocolV2::_banner_exchange_handle_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - if (r < 0) { - ldout(cct, 1) << __func__ << " write banner failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - +CtPtr ProtocolV2::_wait_for_peer_banner() { unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(uint8_t) + 2 * sizeof(__le64); return READ(banner_len, _banner_exchange_handle_peer_banner); @@ -738,7 +1073,7 @@ unsigned banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX); if (connection->get_peer_type() != peer_type) { ldout(cct, 1) << __func__ << " connection peer type does not match what" << " peer advertises " << connection->get_peer_type() - << " != " << peer_type << dendl; + << " != " << (int)peer_type << dendl; stop(); connection->dispatch_queue->queue_reset(connection); return nullptr; @@ -806,10 +1141,23 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { return _fault(); } - next_frame_len = *(uint32_t *)buffer - sizeof(uint32_t); - next_tag = static_cast(*(uint32_t *)(buffer + sizeof(uint32_t))); + bufferlist bl; + bl.push_back(buffer::create_static(sizeof(uint32_t) * 2, buffer)); + try { + auto ti = bl.cbegin(); + uint32_t frame_len; + decode(frame_len, ti); + next_payload_len = frame_len - sizeof(uint32_t); + uint32_t tag; + decode(tag, ti); + next_tag = static_cast(tag); + } catch (const buffer::error &e) { + lderr(cct) << __func__ << " failed decoding of frame header: " << e + << dendl; + return _fault(); + } - ldout(cct, 10) << __func__ << " next frame_len=" << next_frame_len + ldout(cct, 10) << __func__ << " next payload_len=" << next_payload_len << " tag=" << static_cast(next_tag) << dendl; switch (next_tag) { @@ -827,13 +1175,19 @@ CtPtr ProtocolV2::handle_read_frame_length_and_tag(char *buffer, int r) { case Tag::KEEPALIVE2: case Tag::KEEPALIVE2_ACK: case Tag::ACK: - return READ(next_frame_len, handle_frame_payload); + return READ(next_payload_len, handle_frame_payload); case Tag::SESSION_RESET: return handle_session_reset(); case Tag::WAIT: return handle_wait(); case Tag::MESSAGE: return handle_message(); + default: { + lderr(cct) << __func__ + << " received unknown tag=" << static_cast(next_tag) + << dendl; + ceph_abort(); + } } return nullptr; @@ -850,33 +1204,33 @@ CtPtr ProtocolV2::handle_frame_payload(char *buffer, int r) { switch (next_tag) { case Tag::AUTH_REQUEST: - return handle_auth_request(buffer, next_frame_len); + return handle_auth_request(buffer, next_payload_len); case Tag::AUTH_BAD_METHOD: - return handle_auth_bad_method(buffer, next_frame_len); + return handle_auth_bad_method(buffer, next_payload_len); case Tag::AUTH_BAD_AUTH: - return handle_auth_bad_auth(buffer, next_frame_len); + return handle_auth_bad_auth(buffer, next_payload_len); case Tag::AUTH_MORE: - return handle_auth_more(buffer, next_frame_len); + return handle_auth_more(buffer, next_payload_len); case Tag::AUTH_DONE: - return handle_auth_done(buffer, next_frame_len); + return handle_auth_done(buffer, next_payload_len); case Tag::IDENT: - return handle_ident(buffer, next_frame_len); + return handle_ident(buffer, next_payload_len); case Tag::IDENT_MISSING_FEATURES: - return handle_ident_missing_features(buffer, next_frame_len); + return handle_ident_missing_features(buffer, next_payload_len); case Tag::SESSION_RECONNECT: - return handle_reconnect(buffer, next_frame_len); + return handle_reconnect(buffer, next_payload_len); case Tag::SESSION_RETRY: - return handle_session_retry(buffer, next_frame_len); + return handle_session_retry(buffer, next_payload_len); case Tag::SESSION_RETRY_GLOBAL: - return handle_session_retry_global(buffer, next_frame_len); + return handle_session_retry_global(buffer, next_payload_len); case Tag::SESSION_RECONNECT_OK: - return handle_reconnect_ok(buffer, next_frame_len); + return handle_reconnect_ok(buffer, next_payload_len); case Tag::KEEPALIVE2: - return handle_keepalive2(buffer, next_frame_len); + return handle_keepalive2(buffer, next_payload_len); case Tag::KEEPALIVE2_ACK: - return handle_keepalive2_ack(buffer, next_frame_len); + return handle_keepalive2_ack(buffer, next_payload_len); case Tag::ACK: - return handle_message_ack(buffer, next_frame_len); + return handle_message_ack(buffer, next_payload_len); default: ceph_abort(); } @@ -888,46 +1242,25 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) { AuthMoreFrame auth_more(payload, length); ldout(cct, 1) << __func__ - << " auth more len=" << auth_more.auth_payload.length() + << " auth more len=" << auth_more.auth_payload().length() << dendl; if (state == CONNECTING) { ldout(cct, 10) << __func__ << " connect got auth challenge" << dendl; - - ceph_assert(authorizer); - authorizer->add_challenge(cct, auth_more.auth_payload); - AuthMoreFrame more_reply(authorizer->bl); - return WRITE(more_reply.get_buffer(), handle_auth_more_write); - + if (auth_method == CEPH_AUTH_CEPHX) { + ceph_assert(authorizer); + authorizer->add_challenge(cct, auth_more.auth_payload()); + AuthMoreFrame more_reply(authorizer->bl); + return WRITE(more_reply.get_buffer(), "auth more", read_frame); + } else { + ceph_abort("Auth method %d not implemented", auth_method); + } } else if (state == ACCEPTING) { - connection->lock.unlock(); - bool authorizer_valid; - bufferlist authorizer_reply; - ceph_assert((bool)authorizer_challenge); - if (!messenger->ms_deliver_verify_authorizer( - connection, connection->peer_type, auth_method, - auth_more.auth_payload, authorizer_reply, authorizer_valid, - session_key, &authorizer_challenge) || - !authorizer_valid) { - connection->lock.lock(); - - ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len=" - << authorizer_reply.length() << dendl; - session_security.reset(); - AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer"); - bufferlist &bl = bad_auth.get_buffer(); - return WRITE(bl, handle_auth_bad_auth_write); + if (auth_method == CEPH_AUTH_CEPHX) { + return handle_cephx_auth(auth_more.auth_payload()); + } else { + ceph_abort("Auth method %d not implemented", auth_method); } - - connection->lock.lock(); - - session_security.reset(get_auth_session_handler( - cct, auth_method, session_key, - CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2)); - - AuthDoneFrame auth_done(0, authorizer_reply); - bufferlist &bl = auth_done.get_buffer(); - return WRITE(bl, handle_auth_done_write); } else { ceph_abort(); } @@ -935,18 +1268,6 @@ CtPtr ProtocolV2::handle_auth_more(char *payload, uint32_t length) { return nullptr; } -CtPtr ProtocolV2::handle_auth_more_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " auth more write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); -} - CtPtr ProtocolV2::handle_ident(char *payload, uint32_t length) { if (state == CONNECTING) { return handle_server_ident(payload, length); @@ -1185,7 +1506,7 @@ CtPtr ProtocolV2::read_message_data_prepare() { if (data_len) { // get a buffer - map >::iterator p = + map>::iterator p = connection->rx_buffers.find(current_header.tid); if (p != connection->rx_buffers.end()) { ldout(cct, 10) << __func__ << " seleting rx buffer v " << p->second.second @@ -1406,16 +1727,16 @@ CtPtr ProtocolV2::handle_message_complete() { CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - KeepAliveFrame keepalive_frame(payload, length); + KeepAliveFrame keepalive_frame(this, payload, length); ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; - utime_t kp_t = utime_t(keepalive_frame.timestamp); connection->write_lock.lock(); - append_keepalive_ack(kp_t); + append_keepalive_ack(keepalive_frame.timestamp()); connection->write_lock.unlock(); - ldout(cct, 20) << __func__ << " got KEEPALIVE2 " << kp_t << dendl; + ldout(cct, 20) << __func__ << " got KEEPALIVE2 " + << keepalive_frame.timestamp() << dendl; connection->set_last_keepalive(ceph_clock_now()); if (is_connected()) { @@ -1428,8 +1749,8 @@ CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - KeepAliveFrame keepalive_ack_frame(payload, length); - connection->set_last_keepalive_ack(utime_t(keepalive_ack_frame.timestamp)); + KeepAliveFrame keepalive_ack_frame(this, payload, length); + connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp()); ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; return CONTINUE(read_frame); @@ -1438,8 +1759,8 @@ CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) { CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - AckFrame ack(payload, length); - handle_message_ack(ack.seq); + AckFrame ack(this, payload, length); + handle_message_ack(ack.seq()); return CONTINUE(read_frame); } @@ -1449,6 +1770,8 @@ CtPtr ProtocolV2::start_client_banner_exchange() { ldout(cct, 20) << __func__ << dendl; state = CONNECTING; + global_seq = messenger->get_global_seq(); + return _banner_exchange(CONTINUATION(post_client_banner_exchange)); } @@ -1461,7 +1784,7 @@ CtPtr ProtocolV2::post_client_banner_exchange() { return send_auth_request(); } -CtPtr ProtocolV2::send_auth_request(std::vector allowed_methods) { +CtPtr ProtocolV2::send_auth_request(std::vector &allowed_methods) { ldout(cct, 20) << __func__ << dendl; if (!authorizer) { @@ -1485,7 +1808,7 @@ CtPtr ProtocolV2::send_auth_request(std::vector allowed_methods) { << connection->peer_type << dendl; AuthRequestFrame authFrame(auth_method); bufferlist &bl = authFrame.get_buffer(); - return WRITE(bl, handle_auth_request_write); + return WRITE(bl, "auth request", read_frame); } auth_method = authorizer->protocol; @@ -1505,30 +1828,25 @@ CtPtr ProtocolV2::send_auth_request(std::vector allowed_methods) { AuthRequestFrame authFrame(auth_method, authorizer->bl); bufferlist &bl = authFrame.get_buffer(); - return WRITE(bl, handle_auth_request_write); -} - -CtPtr ProtocolV2::handle_auth_request_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " auth request write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); + return WRITE(bl, "auth request", read_frame); } CtPtr ProtocolV2::handle_auth_bad_method(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; AuthBadMethodFrame bad_method(payload, length); - ldout(cct, 1) << __func__ << " auth method=" << bad_method.method - << " rejected, allowed methods=" << bad_method.allowed_methods + ldout(cct, 1) << __func__ << " auth method=" << bad_method.method() + << " rejected, allowed methods=" << bad_method.allowed_methods() << dendl; - return send_auth_request(bad_method.allowed_methods); + if (got_bad_method == bad_method.allowed_methods().size()) { + ldout(cct, 1) << __func__ << " too many attempts, closing connection" + << dendl; + return _fault(); + } + got_bad_method++; + + return send_auth_request(bad_method.allowed_methods()); } CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) { @@ -1536,10 +1854,12 @@ CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) { AuthBadAuthFrame bad_auth(payload, length); ldout(cct, 1) << __func__ << " authentication failed" - << " error code=" << bad_auth.error_code - << " error message=" << bad_auth.error_msg << dendl; + << " error code=" << bad_auth.error_code() + << " error message=" << bad_auth.error_msg() << dendl; if (got_bad_auth) { + ldout(cct, 1) << __func__ << " too many attempts, closing connection" + << dendl; return _fault(); } @@ -1553,7 +1873,7 @@ CtPtr ProtocolV2::handle_auth_bad_auth(char *payload, uint32_t length) { AuthRequestFrame authFrame(auth_method, authorizer->bl); bufferlist &bl = authFrame.get_buffer(); - return WRITE(bl, handle_auth_request_write); + return WRITE(bl, "auth request", read_frame); } CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { @@ -1562,7 +1882,7 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { AuthDoneFrame auth_done(payload, length); if (authorizer) { - auto iter = auth_done.auth_payload.cbegin(); + auto iter = auth_done.auth_payload().cbegin(); if (!authorizer->verify_reply(iter)) { ldout(cct, 0) << __func__ << " failed verifying authorize reply" << dendl; return _fault(); @@ -1570,7 +1890,8 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { } ldout(cct, 1) << __func__ << " authentication done," - << " flags=" << auth_done.flags << dendl; + << " flags=" << std::hex << auth_done.flags() << std::dec + << dendl; if (authorizer) { ldout(cct, 10) << __func__ << " setting up session_security with auth " @@ -1586,6 +1907,8 @@ CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { session_security.reset(); } + auth_flags = auth_done.flags(); + if (!cookie) { ceph_assert(connect_seq == 0); return send_client_ident(); @@ -1603,7 +1926,7 @@ CtPtr ProtocolV2::send_client_ident() { flags |= CEPH_MSG_CONNECT_LOSSY; } - ClientIdentFrame client_ident(messenger->get_myaddrs(), + ClientIdentFrame client_ident(this, messenger->get_myaddrs(), messenger->get_myname().num(), global_seq, connection->policy.features_supported, connection->policy.features_required, flags); @@ -1618,54 +1941,30 @@ CtPtr ProtocolV2::send_client_ident() { << " flags=" << flags << std::dec << dendl; bufferlist &bl = client_ident.get_buffer(); - return WRITE(bl, handle_client_ident_write); -} - -CtPtr ProtocolV2::handle_client_ident_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " client ident write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); + return WRITE(bl, "client ident", read_frame); } CtPtr ProtocolV2::send_reconnect() { ldout(cct, 20) << __func__ << dendl; - ReconnectFrame reconnect(messenger->get_myaddrs(), cookie, global_seq, + ReconnectFrame reconnect(this, messenger->get_myaddrs(), cookie, global_seq, connect_seq, in_seq); ldout(cct, 5) << __func__ << " reconnect to session: cookie=" << cookie << " gs=" << global_seq << " cs=" << connect_seq << " ms=" << in_seq << dendl; bufferlist &bl = reconnect.get_buffer(); - return WRITE(bl, handle_reconnect_write); -} - -CtPtr ProtocolV2::handle_reconnect_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " reconnect write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); + return WRITE(bl, "reconnect", read_frame); } CtPtr ProtocolV2::handle_ident_missing_features(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - IdentMissingFeaturesFrame ident_missing(payload, length); + IdentMissingFeaturesFrame ident_missing(this, payload, length); lderr(cct) << __func__ << " client does not support all server features: " << std::hex - << ident_missing.features << std::dec << dendl; + << ident_missing.features() << std::dec << dendl; return _fault(); } @@ -1682,11 +1981,11 @@ CtPtr ProtocolV2::handle_session_reset() { CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - RetryFrame retry(payload, length); - connect_seq = retry.connect_seq + 1; + RetryFrame retry(this, payload, length); + connect_seq = retry.connect_seq() + 1; ldout(cct, 1) << __func__ - << " received session retry connect_seq=" << retry.connect_seq + << " received session retry connect_seq=" << retry.connect_seq() << ", inc to cs=" << connect_seq << dendl; return send_reconnect(); @@ -1695,11 +1994,11 @@ CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) { CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - RetryGlobalFrame retry(payload, length); - global_seq = messenger->get_global_seq(retry.global_seq); + RetryGlobalFrame retry(this, payload, length); + global_seq = messenger->get_global_seq(retry.global_seq()); ldout(cct, 1) << __func__ << " received session retry global global_seq=" - << retry.global_seq << ", choose new gs=" << global_seq + << retry.global_seq() << ", choose new gs=" << global_seq << dendl; return send_reconnect(); @@ -1715,11 +2014,12 @@ CtPtr ProtocolV2::handle_wait() { CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - ReconnectOkFrame reconnect_ok(payload, length); + ReconnectOkFrame reconnect_ok(this, payload, length); ldout(cct, 5) << __func__ - << " reconnect accepted: sms=" << reconnect_ok.msg_seq << dendl; + << " reconnect accepted: sms=" << reconnect_ok.msg_seq() + << dendl; - out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq); + out_seq = discard_requeued_up_to(out_seq, reconnect_ok.msg_seq()); backoff = utime_t(); ldout(cct, 10) << __func__ << " reconnect success " << connect_seq @@ -1739,25 +2039,26 @@ CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) { CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - ServerIdentFrame server_ident(payload, length); + ServerIdentFrame server_ident(this, payload, length); ldout(cct, 5) << __func__ << " received server identification: " - << "addrs=" << server_ident.addrs << " gid=" << server_ident.gid - << " global_seq=" << server_ident.global_seq + << "addrs=" << server_ident.addrs() + << " gid=" << server_ident.gid() + << " global_seq=" << server_ident.global_seq() << " features_supported=" << std::hex - << server_ident.supported_features - << " features_required=" << server_ident.required_features - << " flags=" << server_ident.flags << " cookie=" << std::dec - << server_ident.cookie << dendl; + << server_ident.supported_features() + << " features_required=" << server_ident.required_features() + << " flags=" << server_ident.flags() << " cookie=" << std::dec + << server_ident.cookie() << dendl; - cookie = server_ident.cookie; + cookie = server_ident.cookie(); - connection->set_peer_addrs(server_ident.addrs); - connection->peer_global_id = server_ident.gid; - connection->set_features(server_ident.supported_features & + connection->set_peer_addrs(server_ident.addrs()); + connection->peer_global_id = server_ident.gid(); + connection->set_features(server_ident.supported_features() & connection->policy.features_supported); - peer_global_seq = server_ident.global_seq; + peer_global_seq = server_ident.global_seq(); - connection->policy.lossy = server_ident.flags & CEPH_MSG_CONNECT_LOSSY; + connection->policy.lossy = server_ident.flags() & CEPH_MSG_CONNECT_LOSSY; backoff = utime_t(); ldout(cct, 10) << __func__ << " connect success " << connect_seq @@ -1792,51 +2093,24 @@ CtPtr ProtocolV2::post_server_banner_exchange() { return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - - AuthRequestFrame auth_request(payload, length); - - ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method - << ", auth_len=" << auth_request.len << ")" << dendl; - - std::vector allowed_methods; - messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type, - allowed_methods); - - bool found = std::find(allowed_methods.begin(), allowed_methods.end(), - auth_request.method) != allowed_methods.end(); - if (!found) { - ldout(cct, 1) << __func__ << " auth method=" << auth_request.method - << " not allowed" << dendl; - AuthBadMethodFrame bad_method(auth_request.method, allowed_methods); - bufferlist &bl = bad_method.get_buffer(); - return WRITE(bl, handle_auth_bad_method_write); - } - - ldout(cct, 10) << __func__ << " auth method=" << auth_request.method - << " accepted" << dendl; - - auth_method = auth_request.method; +CtPtr ProtocolV2::handle_cephx_auth(bufferlist &auth_payload) { + ldout(cct, 20) << __func__ << dendl; - if (auth_request.method == CEPH_AUTH_NONE) { - ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl; + ceph_assert(auth_method == CEPH_AUTH_CEPHX); - session_security.reset(); - bufferlist empty_bl; - AuthDoneFrame auth_done(0, empty_bl); - bufferlist &bl = auth_done.get_buffer(); - return WRITE(bl, handle_auth_done_write); - } + ldout(cct, 15) << __func__ + << " authorizer payload len=" << auth_payload.length() + << dendl; - connection->lock.unlock(); bool authorizer_valid; bufferlist authorizer_reply; bool had_challenge = (bool)authorizer_challenge; + + connection->lock.unlock(); if (!messenger->ms_deliver_verify_authorizer( - connection, connection->peer_type, auth_request.method, - auth_request.auth_payload, authorizer_reply, authorizer_valid, - session_key, &authorizer_challenge) || + connection, connection->peer_type, auth_method, auth_payload, + authorizer_reply, authorizer_valid, session_key, + &authorizer_challenge) || !authorizer_valid) { connection->lock.lock(); @@ -1844,14 +2118,13 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { ldout(cct, 10) << __func__ << " challenging authorizer" << dendl; ceph_assert(authorizer_reply.length()); AuthMoreFrame more(authorizer_reply); - return WRITE(more.get_buffer(), handle_auth_more_write); + return WRITE(more.get_buffer(), "auth more", read_frame); } else { ldout(cct, 0) << __func__ << " got bad authorizer, auth_reply_len=" << authorizer_reply.length() << dendl; session_security.reset(); AuthBadAuthFrame bad_auth(EPERM, "Bad Authorizer"); - bufferlist &bl = bad_auth.get_buffer(); - return WRITE(bl, handle_auth_bad_auth_write); + return WRITE(bad_auth.get_buffer(), "bad auth", read_frame); } } @@ -1861,85 +2134,106 @@ CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { get_auth_session_handler(cct, auth_method, session_key, CEPH_FEATURE_MSG_AUTH | CEPH_FEATURE_CEPHX_V2)); - AuthDoneFrame auth_done(0, authorizer_reply); - bufferlist &bl = auth_done.get_buffer(); - return WRITE(bl, handle_auth_done_write); + if (cct->_conf.get_val("ms_msgr2_sign_messages")) { + auth_flags |= static_cast(AuthFlag::SIGNED); + } + if (cct->_conf.get_val("ms_msgr2_encrypt_messages")) { + auth_flags |= static_cast(AuthFlag::ENCRYPTED); + } + + ldout(cct, 1) << __func__ << " authentication done," + << " flags=" << std::hex << auth_flags << std::dec << dendl; + + AuthDoneFrame auth_done(auth_flags, authorizer_reply); + return WRITE(auth_done.get_buffer(), "auth done", read_frame); } -CtPtr ProtocolV2::handle_auth_bad_method_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; +CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { + ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - if (r < 0) { - ldout(cct, 1) << __func__ << " auth bad method write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } + AuthRequestFrame auth_request(payload, length); - return CONTINUE(read_frame); -} + ldout(cct, 10) << __func__ << " AuthRequest(method=" << auth_request.method() + << ", auth_len=" << auth_request.auth_payload().length() << ")" + << dendl; -CtPtr ProtocolV2::handle_auth_bad_auth_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; + std::vector allowed_methods; + messenger->ms_deliver_get_auth_allowed_methods(connection->peer_type, + allowed_methods); - if (r < 0) { - ldout(cct, 1) << __func__ << " auth bad auth write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); + bool found = std::find(allowed_methods.begin(), allowed_methods.end(), + auth_request.method()) != allowed_methods.end(); + if (!found) { + ldout(cct, 1) << __func__ << " auth method=" << auth_request.method() + << " not allowed" << dendl; + AuthBadMethodFrame bad_method(auth_request.method(), allowed_methods); + bufferlist &bl = bad_method.get_buffer(); + return WRITE(bl, "bad auth method", read_frame); } - return CONTINUE(read_frame); -} + ldout(cct, 10) << __func__ << " auth method=" << auth_request.method() + << " accepted" << dendl; -CtPtr ProtocolV2::handle_auth_done_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; + auth_method = auth_request.method(); - if (r < 0) { - ldout(cct, 1) << __func__ << " auth done write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); + if (auth_method == CEPH_AUTH_NONE) { + ldout(cct, 1) << __func__ << " proceeding without authentication" << dendl; + + session_security.reset(); + bufferlist empty_bl; + AuthDoneFrame auth_done(0, empty_bl); + return WRITE(auth_done.get_buffer(), "auth done", read_frame); } - return CONTINUE(read_frame); + if (auth_method == CEPH_AUTH_CEPHX) { + return handle_cephx_auth(auth_request.auth_payload()); + } + + lderr(cct) << __func__ << " auth method " << auth_method << " not implemented" + << dendl; + ceph_abort(); + return nullptr; } CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; - ClientIdentFrame client_ident(payload, length); + ClientIdentFrame client_ident(this, payload, length); ldout(cct, 5) << __func__ << " received client identification: " - << "addrs=" << client_ident.addrs << " gid=" << client_ident.gid - << " global_seq=" << client_ident.global_seq + << "addrs=" << client_ident.addrs() + << " gid=" << client_ident.gid() + << " global_seq=" << client_ident.global_seq() << " features_supported=" << std::hex - << client_ident.supported_features - << " features_required=" << client_ident.required_features - << " flags=" << client_ident.flags << std::dec << dendl; + << client_ident.supported_features() + << " features_required=" << client_ident.required_features() + << " flags=" << client_ident.flags() << std::dec << dendl; - if (client_ident.addrs.empty()) { + if (client_ident.addrs().empty()) { connection->set_peer_addr(connection->target_addr); } else { // Should we check if one of the ident.addrs match connection->target_addr // as we do in ProtocolV1? - connection->set_peer_addrs(client_ident.addrs); - connection->target_addr = client_ident.addrs.msgr2_addr(); + connection->set_peer_addrs(client_ident.addrs()); + connection->target_addr = client_ident.addrs().msgr2_addr(); } uint64_t feat_missing = connection->policy.features_required & - ~(uint64_t)client_ident.supported_features; + ~(uint64_t)client_ident.supported_features(); if (feat_missing) { ldout(cct, 1) << __func__ << " peer missing required features " << std::hex << feat_missing << std::dec << dendl; - IdentMissingFeaturesFrame ident_missing_features(feat_missing); + IdentMissingFeaturesFrame ident_missing_features(this, feat_missing); bufferlist &bl = ident_missing_features.get_buffer(); - return WRITE(bl, handle_ident_missing_features_write); + return WRITE(bl, "ident missing features", read_frame); } connection_features = - client_ident.supported_features & connection->policy.features_supported; + client_ident.supported_features() & connection->policy.features_supported; state = ACCEPTING_SESSION; - peer_global_seq = client_ident.global_seq; + peer_global_seq = client_ident.global_seq(); // Looks good so far, let's check if there is already an existing connection // to this peer. @@ -1966,36 +2260,24 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { return send_server_ident(); } -CtPtr ProtocolV2::handle_ident_missing_features_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " ident missing features write failed r=" << r - << " (" << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); -} - CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; - ReconnectFrame reconnect(payload, length); + ReconnectFrame reconnect(this, payload, length); ldout(cct, 5) << __func__ - << " received reconnect: cookie=" << reconnect.cookie - << " gs=" << reconnect.global_seq - << " cs=" << reconnect.connect_seq - << " ms=" << reconnect.msg_seq << dendl; + << " received reconnect: cookie=" << reconnect.cookie() + << " gs=" << reconnect.global_seq() + << " cs=" << reconnect.connect_seq() + << " ms=" << reconnect.msg_seq() << dendl; - if (reconnect.addrs.empty()) { + if (reconnect.addrs().empty()) { connection->set_peer_addr(connection->target_addr); } else { // Should we check if one of the ident.addrs match connection->target_addr // as we do in ProtocolV1? - connection->set_peer_addrs(reconnect.addrs); - connection->target_addr = reconnect.addrs.msgr2_addr(); + connection->set_peer_addrs(reconnect.addrs()); + connection->target_addr = reconnect.addrs().msgr2_addr(); } connection->lock.unlock(); @@ -2020,7 +2302,7 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { // session ldout(cct, 0) << __func__ << " no existing connection exists, reseting client" << dendl; - return WRITE(bl, handle_session_reset_write); + return WRITE(bl, "session reset", read_frame); } std::lock_guard l(existing->lock); @@ -2034,79 +2316,56 @@ CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { if (exproto->state == CLOSED) { ldout(cct, 5) << __func__ << " existing " << existing << " already closed. Reseting client" << dendl; - return WRITE(bl, handle_session_reset_write); + return WRITE(bl, "session reset", read_frame); } if (exproto->replacing) { ldout(cct, 1) << __func__ << " existing racing replace happened while replacing." << " existing=" << existing << dendl; - RetryGlobalFrame retry(exproto->peer_global_seq); + RetryGlobalFrame retry(this, exproto->peer_global_seq); bufferlist &bl = retry.get_buffer(); - return WRITE(bl, handle_session_retry_write); + return WRITE(bl, "session retry", read_frame); } if (!exproto->cookie) { // server connection was reseted, reset client ldout(cct, 5) << __func__ << " no cookie set, reseting client" << dendl; - return WRITE(bl, handle_session_reset_write); - } else if (exproto->cookie != reconnect.cookie) { + return WRITE(bl, "session reset", read_frame); + } else if (exproto->cookie != reconnect.cookie()) { ldout(cct, 5) << __func__ << " cookie mismatch sc=" << exproto->cookie - << " cc=" << reconnect.cookie << ", reseting client" << dendl; - return WRITE(bl, handle_session_reset_write); + << " cc=" << reconnect.cookie() << ", reseting client" + << dendl; + return WRITE(bl, "session reset", read_frame); } - if (exproto->peer_global_seq > reconnect.global_seq) { + if (exproto->peer_global_seq > reconnect.global_seq()) { ldout(cct, 5) << __func__ << " stale global_seq: sgs=" << exproto->peer_global_seq - << " cgs=" << reconnect.global_seq + << " cgs=" << reconnect.global_seq() << ", ask client to retry global" << dendl; - RetryGlobalFrame retry(exproto->peer_global_seq); + RetryGlobalFrame retry(this, exproto->peer_global_seq); bufferlist &bl = retry.get_buffer(); - return WRITE(bl, handle_session_retry_write); + return WRITE(bl, "session retry", read_frame); } - if (exproto->connect_seq >= reconnect.connect_seq) { + if (exproto->connect_seq >= reconnect.connect_seq()) { ldout(cct, 5) << __func__ << " stale connect_seq scs=" << exproto->connect_seq - << " ccs=" << reconnect.connect_seq + << " ccs=" << reconnect.connect_seq() << " , ask client to retry" << dendl; - RetryFrame retry(exproto->connect_seq); + RetryFrame retry(this, exproto->connect_seq); bufferlist &bl = retry.get_buffer(); - return WRITE(bl, handle_session_retry_write); + return WRITE(bl, "session retry", read_frame); } // everything looks good - exproto->connect_seq = reconnect.connect_seq; - exproto->message_seq = reconnect.msg_seq; + exproto->connect_seq = reconnect.connect_seq(); + exproto->message_seq = reconnect.msg_seq(); return reuse_connection(existing, exproto, true); } -CtPtr ProtocolV2::handle_session_reset_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " session reset write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); -} - -CtPtr ProtocolV2::handle_session_retry_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " session retry write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); - } - - return CONTINUE(read_frame); -} - CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { ldout(cct, 20) << __func__ << " existing=" << existing << dendl; @@ -2130,7 +2389,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { << " existing=" << existing << dendl; WaitFrame wait; bufferlist &bl = wait.get_buffer(); - return WRITE(bl, handle_wait_write); + return WRITE(bl, "wait", read_frame); } if (existing->policy.lossy) { @@ -2180,20 +2439,8 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) { messenger->get_myaddrs().msgr2_addr()); WaitFrame wait; bufferlist &bl = wait.get_buffer(); - return WRITE(bl, handle_wait_write); - } -} - -CtPtr ProtocolV2::handle_wait_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " wait write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - return _fault(); + return WRITE(bl, "wait", read_frame); } - - return CONTINUE(read_frame); } CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, @@ -2230,6 +2477,10 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, << dendl; exproto->can_write = false; exproto->replacing = true; + exproto->session_security = session_security; + exproto->auth_method = auth_method; + exproto->session_key = session_key; + exproto->authorizer_challenge = std::move(authorizer_challenge); existing->state_offset = 0; // avoid previous thread modify event exproto->state = NONE; @@ -2285,9 +2536,9 @@ CtPtr ProtocolV2::reuse_connection(AsyncConnectionRef existing, existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); if (!reconnect) { - CONTINUATION_RUN2(exproto, exproto->send_server_ident()) + exproto->run_continuation(exproto->send_server_ident()); } else { - CONTINUATION_RUN2(exproto, exproto->send_reconnect_ok()) + exproto->run_continuation(exproto->send_reconnect_ok()); } }; if (existing->center->in_thread()) @@ -2320,7 +2571,7 @@ CtPtr ProtocolV2::send_server_ident() { } uint64_t gs = messenger->get_global_seq(); - ServerIdentFrame server_ident( + ServerIdentFrame server_ident(this, messenger->get_myaddrs(), messenger->get_myname().num(), gs, connection->policy.features_supported, connection->policy.features_required, flags, cookie); @@ -2367,18 +2618,11 @@ CtPtr ProtocolV2::send_server_ident() { messenger->ms_deliver_handle_fast_accept(connection); bufferlist &bl = server_ident.get_buffer(); - return WRITE(bl, handle_server_ident_write); + return WRITE(bl, "server ident", server_ready); } -CtPtr ProtocolV2::handle_server_ident_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " server ident write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - connection->inject_delay(); - return _fault(); - } +CtPtr ProtocolV2::server_ready() { + ldout(cct, 20) << __func__ << dendl; if (connection->delay_state) { ceph_assert(connection->delay_state->ready()); @@ -2393,7 +2637,7 @@ CtPtr ProtocolV2::send_reconnect_ok() { out_seq = discard_requeued_up_to(out_seq, message_seq); uint64_t ms = in_seq; - ReconnectOkFrame reconnect_ok(ms); + ReconnectOkFrame reconnect_ok(this, ms); ldout(cct, 5) << __func__ << " sending reconnect_ok: msg_seq=" << ms << dendl; @@ -2428,22 +2672,5 @@ CtPtr ProtocolV2::send_reconnect_ok() { messenger->ms_deliver_handle_fast_accept(connection); bufferlist &bl = reconnect_ok.get_buffer(); - return WRITE(bl, handle_reconnect_ok_write); -} - -CtPtr ProtocolV2::handle_reconnect_ok_write(int r) { - ldout(cct, 20) << __func__ << " r=" << r << dendl; - - if (r < 0) { - ldout(cct, 1) << __func__ << " reconnect ok write failed r=" << r << " (" - << cpp_strerror(r) << ")" << dendl; - connection->inject_delay(); - return _fault(); - } - - if (connection->delay_state) { - ceph_assert(connection->delay_state->ready()); - } - - return ready(); + return WRITE(bl, "reconnect ok", server_ready); } diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index be44f379bf4..e46323645f8 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -45,8 +45,9 @@ private: return statenames[state]; } +public: enum class Tag : uint32_t { - AUTH_REQUEST, + AUTH_REQUEST = 1, AUTH_BAD_METHOD, AUTH_BAD_AUTH, AUTH_MORE, @@ -65,384 +66,8 @@ private: ACK }; - struct Frame { - uint32_t tag; - bufferlist payload; - bufferlist frame_buffer; - - Frame(Tag tag) : tag(static_cast(tag)) { - encode(this->tag, payload, 0); - } - - Frame() {} - - bufferlist &get_buffer() { - if (frame_buffer.length()) { - return frame_buffer; - } - encode((uint32_t)payload.length(), frame_buffer, 0); - frame_buffer.claim_append(payload); - return frame_buffer; - } - }; - - struct SignedEncryptedFrame : public Frame { - SignedEncryptedFrame(Tag tag) : Frame(tag) {} - SignedEncryptedFrame() : Frame() {} - bufferlist &get_buffer() { return Frame::get_buffer(); } - }; - - struct AuthRequestFrame : public Frame { - uint32_t method; - uint32_t len; - bufferlist auth_payload; - - AuthRequestFrame(uint32_t method, bufferlist &auth_payload) - : Frame(Tag::AUTH_REQUEST) { - encode(method, payload, 0); - encode(auth_payload.length(), payload, 0); - payload.claim_append(auth_payload); - } - - AuthRequestFrame(uint32_t method) : Frame(Tag::AUTH_REQUEST) { - encode(method, payload, 0); - encode((uint32_t)0, payload, 0); - } - - AuthRequestFrame(char *payload, uint32_t length) : Frame() { - method = *(uint32_t *)payload; - len = *(uint32_t *)(payload + sizeof(uint32_t)); - ceph_assert((length - (sizeof(uint32_t) * 2)) == len); - auth_payload.append((payload + (sizeof(uint32_t) * 2)), len); - } - }; - - struct AuthBadMethodFrame : public Frame { - uint32_t method; - std::vector allowed_methods; - - AuthBadMethodFrame(uint32_t method, std::vector methods) - : Frame(Tag::AUTH_BAD_METHOD) { - encode(method, payload, 0); - encode((uint32_t)methods.size(), payload, 0); - for (const auto &a_meth : methods) { - encode(a_meth, payload, 0); - } - } - - AuthBadMethodFrame(char *payload, uint32_t length) : Frame() { - method = *(uint32_t *)payload; - uint32_t num_methods = *(uint32_t *)(payload + sizeof(uint32_t)); - payload += sizeof(uint32_t) * 2; - for (unsigned i = 0; i < num_methods; ++i) { - allowed_methods.push_back( - *(uint32_t *)(payload + sizeof(uint32_t) * i)); - } - } - }; - - struct AuthBadAuthFrame : public Frame { - uint32_t error_code; - std::string error_msg; - - AuthBadAuthFrame(uint32_t error_code, std::string error_msg) - : Frame(Tag::AUTH_BAD_AUTH) { - encode(error_code, payload, 0); - encode(error_msg, payload, 0); - } - - AuthBadAuthFrame(char *payload, uint32_t length) : Frame() { - error_code = *(uint32_t *)payload; - uint32_t len = *(uint32_t *)(payload + sizeof(uint32_t)); - error_msg = std::string(payload + sizeof(uint32_t) * 2, len); - } - }; - - struct AuthMoreFrame : public Frame { - bufferlist auth_payload; - - AuthMoreFrame(bufferlist &auth_payload) : Frame(Tag::AUTH_MORE) { - encode(auth_payload.length(), payload, 0); - payload.claim_append(auth_payload); - } - - AuthMoreFrame(char *payload, uint32_t length) : Frame() { - uint32_t len = *(uint32_t *)payload; - ceph_assert((length - sizeof(uint32_t)) == len); - auth_payload.append(payload + sizeof(uint32_t), len); - } - }; - - struct AuthDoneFrame : public Frame { - uint64_t flags; - bufferlist auth_payload; - - AuthDoneFrame(uint64_t flags, bufferlist &auth_payload) - : Frame(Tag::AUTH_DONE) { - encode(flags, payload, 0); - encode(auth_payload.length(), payload, 0); - payload.claim_append(auth_payload); - } - - AuthDoneFrame(char *payload, uint32_t length) : Frame() { - flags = *(uint64_t *)payload; - payload += sizeof(uint64_t); - uint32_t len = *(uint32_t *)payload; - ceph_assert((length - sizeof(uint32_t) - sizeof(uint64_t)) == len); - auth_payload.append(payload + sizeof(uint32_t), len); - } - }; - - struct ClientIdentFrame : public SignedEncryptedFrame { - entity_addrvec_t addrs; - int64_t gid; - uint64_t global_seq; - uint64_t supported_features; // CEPH_FEATURE_* - uint64_t required_features; // CEPH_FEATURE_* - uint64_t flags; // CEPH_MSG_CONNECT_* - - ClientIdentFrame(const entity_addrvec_t &addrs, int64_t gid, - uint64_t global_seq, uint64_t supported_features, - uint64_t required_features, uint64_t flags) - : SignedEncryptedFrame(Tag::IDENT) { - encode(addrs, payload, -1ll); - encode(gid, payload, -1ll); - encode(global_seq, payload, -1ll); - encode(supported_features, payload, -1ll); - encode(required_features, payload, -1ll); - encode(flags, payload, -1ll); - } - - ClientIdentFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - decode_frame(ti); - } catch (const buffer::error &e) { - } - } - - ClientIdentFrame() : SignedEncryptedFrame() {} - - protected: - void decode_frame(ceph::buffer::list::const_iterator &ti) { - decode(addrs, ti); - decode(gid, ti); - decode(global_seq, ti); - decode(supported_features, ti); - decode(required_features, ti); - decode(flags, ti); - } - }; - - struct ServerIdentFrame : public ClientIdentFrame { - uint64_t cookie; - - ServerIdentFrame(const entity_addrvec_t &addrs, int64_t gid, - uint64_t global_seq, uint64_t supported_features, - uint64_t required_features, uint64_t flags, - uint64_t cookie) - : ClientIdentFrame(addrs, gid, global_seq, supported_features, - required_features, flags) { - encode(cookie, payload, -1ll); - } - - ServerIdentFrame(char *payload, uint32_t length) : ClientIdentFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - ClientIdentFrame::decode_frame(ti); - decode(cookie, ti); - } catch (const buffer::error &e) { - } - } - }; - - struct ReconnectFrame : public SignedEncryptedFrame { - entity_addrvec_t addrs; - uint64_t cookie; - uint64_t global_seq; - uint64_t connect_seq; - uint64_t msg_seq; - - ReconnectFrame(const entity_addrvec_t &addrs, uint64_t cookie, - uint64_t global_seq, uint64_t connect_seq, uint64_t msg_seq) - : SignedEncryptedFrame(Tag::SESSION_RECONNECT) { - encode(addrs, payload, -1ll); - encode(cookie, payload, 0); - encode(global_seq, payload, 0); - encode(connect_seq, payload, 0); - encode(msg_seq, payload, 0); - } - - ReconnectFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - decode(addrs, ti); - decode(cookie, ti); - decode(global_seq, ti); - decode(connect_seq, ti); - decode(msg_seq, ti); - } catch (const buffer::error &e) { - } - } - }; - - struct ResetFrame : public SignedEncryptedFrame { - ResetFrame() : SignedEncryptedFrame(Tag::SESSION_RESET) {} - }; - - struct RetryFrame : public SignedEncryptedFrame { - uint64_t connect_seq; - - RetryFrame(uint64_t connect_seq) - : SignedEncryptedFrame(Tag::SESSION_RETRY) { - encode(connect_seq, payload); - } - - RetryFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - decode(connect_seq, ti); - } catch (const buffer::error &e) { - } - } - }; - - struct RetryGlobalFrame : public SignedEncryptedFrame { - uint64_t global_seq; - - RetryGlobalFrame(uint64_t global_seq) - : SignedEncryptedFrame(Tag::SESSION_RETRY_GLOBAL) { - encode(global_seq, payload); - } - - RetryGlobalFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - decode(global_seq, ti); - } catch (const buffer::error &e) { - } - } - }; - - struct WaitFrame : public SignedEncryptedFrame { - WaitFrame() : SignedEncryptedFrame(Tag::WAIT) {} - }; - - struct ReconnectOkFrame : public SignedEncryptedFrame { - uint64_t msg_seq; - - ReconnectOkFrame(uint64_t msg_seq) - : SignedEncryptedFrame(Tag::SESSION_RECONNECT_OK) { - encode(msg_seq, payload, 0); - } - - ReconnectOkFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - try { - auto ti = bl.cbegin(); - decode(msg_seq, ti); - } catch (const buffer::error &e) { - } - } - }; - - struct IdentMissingFeaturesFrame : public SignedEncryptedFrame { - __le64 features; - - IdentMissingFeaturesFrame(uint64_t features) - : SignedEncryptedFrame(Tag::IDENT_MISSING_FEATURES), - features(features) { - encode(features, payload, -1ll); - } - - IdentMissingFeaturesFrame(char *payload, uint32_t length) - : SignedEncryptedFrame() { - features = *(uint64_t *)payload; - } - }; - - struct MessageFrame : public SignedEncryptedFrame { - const unsigned int ASYNC_COALESCE_THRESHOLD = 256; - - ceph_msg_header2 header2; - - MessageFrame(Message *msg, bufferlist &data, uint64_t ack_seq, - bool calc_crc) - : SignedEncryptedFrame(Tag::MESSAGE) { - ceph_msg_header &header = msg->get_header(); - ceph_msg_footer &footer = msg->get_footer(); - - header2 = ceph_msg_header2{header.seq, header.tid, - header.type, header.priority, - header.version, header.front_len, - header.middle_len, 0, - header.data_len, header.data_off, - ack_seq, footer.front_crc, - footer.middle_crc, footer.data_crc, - footer.flags, header.compat_version, - header.reserved, 0}; - - if (calc_crc) { - header2.header_crc = - ceph_crc32c(0, (unsigned char *)&header2, - sizeof(header2) - sizeof(header2.header_crc)); - } - - payload.append((char *)&header2, sizeof(header2)); - if ((data.length() <= ASYNC_COALESCE_THRESHOLD) && - (data.buffers().size() > 1)) { - for (const auto &pb : data.buffers()) { - payload.append((char *)pb.c_str(), pb.length()); - } - } else { - payload.claim_append(data); - } - } - }; - - struct KeepAliveFrame : public SignedEncryptedFrame { - struct ceph_timespec timestamp; - - KeepAliveFrame() : SignedEncryptedFrame(Tag::KEEPALIVE2) { - struct ceph_timespec ts; - utime_t t = ceph_clock_now(); - t.encode_timeval(&ts); - payload.append((char *)&ts, sizeof(ts)); - } - - KeepAliveFrame(struct ceph_timespec ×tamp) - : SignedEncryptedFrame(Tag::KEEPALIVE2_ACK) { - payload.append((char *)×tamp, sizeof(timestamp)); - } - - KeepAliveFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - ceph_assert(length == sizeof(timestamp)); - timestamp = *(struct ceph_timespec *)payload; - } - }; - - struct AckFrame : public SignedEncryptedFrame { - uint64_t seq; - - AckFrame(uint64_t seq) : SignedEncryptedFrame(Tag::ACK) { - encode(seq, payload, 0); - } - - AckFrame(char *payload, uint32_t length) : SignedEncryptedFrame() { - seq = *(uint64_t *)payload; - } - }; +private: + enum class AuthFlag : uint64_t { ENCRYPTED = 1, SIGNED = 2 }; char *temp_buffer; State state; @@ -450,9 +75,11 @@ private: AuthAuthorizer *authorizer; uint32_t auth_method; bool got_bad_auth; + uint32_t got_bad_method; CryptoKey session_key; std::shared_ptr session_security; std::unique_ptr authorizer_challenge; + uint64_t auth_flags; uint64_t connection_features; uint64_t cookie; uint64_t global_seq; @@ -470,7 +97,7 @@ private: using ProtFuncPtr = void (ProtocolV2::*)(); Ct *bannerExchangeCallback; - uint32_t next_frame_len; + uint32_t next_payload_len; Tag next_tag; ceph_msg_header2 current_header; utime_t backoff; // backoff time @@ -484,11 +111,13 @@ private: bool keepalive; ostream &_conn_prefix(std::ostream *_dout); + inline void run_continuation(Ct *continuation); Ct *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int), int len, char *buffer = nullptr); - Ct *write(CONTINUATION_PARAM(next, ProtocolV2, int), - bufferlist &bl); + Ct *write(const std::string &desc, + CONTINUATION_PARAM(next, ProtocolV2), + bufferlist &buffer); void requeue_sent(); uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq); @@ -503,18 +132,17 @@ private: void append_keepalive_ack(utime_t ×tamp); void handle_message_ack(uint64_t seq); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_write); + CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, _banner_exchange_handle_peer_banner); Ct *_banner_exchange(Ct *callback); - Ct *_banner_exchange_handle_write(int r); + Ct *_wait_for_peer_banner(); Ct *_banner_exchange_handle_peer_banner(char *buffer, int r); CONTINUATION_DECL(ProtocolV2, read_frame); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_length_and_tag); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_frame_payload); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_more_write); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_message_header); CONTINUATION_DECL(ProtocolV2, throttle_message); CONTINUATION_DECL(ProtocolV2, throttle_bytes); @@ -527,10 +155,7 @@ private: Ct *read_frame(); Ct *handle_read_frame_length_and_tag(char *buffer, int r); Ct *handle_frame_payload(char *buffer, int r); - Ct *handle_auth_more(char *payload, uint32_t length); - Ct *handle_auth_more_write(int r); - Ct *handle_ident(char *payload, uint32_t length); Ct *ready(); @@ -574,21 +199,19 @@ private: // Client Protocol CONTINUATION_DECL(ProtocolV2, start_client_banner_exchange); CONTINUATION_DECL(ProtocolV2, post_client_banner_exchange); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_request_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_client_ident_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_write); Ct *start_client_banner_exchange(); Ct *post_client_banner_exchange(); - Ct *send_auth_request(std::vector allowed_methods = {}); - Ct *handle_auth_request_write(int r); + inline Ct *send_auth_request() { + std::vector empty; + return send_auth_request(empty); + } + Ct *send_auth_request(std::vector &allowed_methods); Ct *handle_auth_bad_method(char *payload, uint32_t length); Ct *handle_auth_bad_auth(char *payload, uint32_t length); Ct *handle_auth_done(char *payload, uint32_t length); Ct *send_client_ident(); - Ct *handle_client_ident_write(int r); Ct *send_reconnect(); - Ct *handle_reconnect_write(int r); Ct *handle_ident_missing_features(char *payload, uint32_t length); Ct *handle_session_reset(); Ct *handle_session_retry(char *payload, uint32_t length); @@ -600,36 +223,28 @@ private: // Server Protocol CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange); CONTINUATION_DECL(ProtocolV2, post_server_banner_exchange); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_method_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_bad_auth_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_auth_done_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, - handle_ident_missing_features_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_reset_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_session_retry_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_wait_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_server_ident_write); - WRITE_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_reconnect_ok_write); + CONTINUATION_DECL(ProtocolV2, server_ready); Ct *start_server_banner_exchange(); Ct *post_server_banner_exchange(); + Ct *handle_cephx_auth(bufferlist &auth_payload); Ct *handle_auth_request(char *payload, uint32_t length); - Ct *handle_auth_bad_method_write(int r); - Ct *handle_auth_bad_auth_write(int r); - Ct *handle_auth_done_write(int r); Ct *handle_client_ident(char *payload, uint32_t length); Ct *handle_ident_missing_features_write(int r); Ct *handle_reconnect(char *payload, uint32_t length); - Ct *handle_session_reset_write(int r); - Ct *handle_session_retry_write(int r); Ct *handle_existing_connection(AsyncConnectionRef existing); - Ct *handle_wait_write(int r); Ct *reuse_connection(AsyncConnectionRef existing, ProtocolV2 *exproto, bool reconnect); Ct *send_server_ident(); - Ct *handle_server_ident_write(int r); Ct *send_reconnect_ok(); - Ct *handle_reconnect_ok_write(int r); + Ct *server_ready(); + +public: + template + friend struct SignedEncryptedFrame; + + // TODO: REMOVE THIS + void log(const std::string message, uint64_t val, uint64_t val2); }; #endif /* _MSG_ASYNC_PROTOCOL_V2_ */