From 774bd9d99ed9d9f0630dba9658d756812b5e3253 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Thu, 21 Feb 2019 21:35:13 +0100 Subject: [PATCH] msg/async, v2: frame decoding operates on bufferlist. This change is driven by buggy buffer's life time management polluting AuthAuthorizer::bl with dangling raw_static instances. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/ProtocolV2.cc | 219 ++++++++++++++++++++---------------- src/msg/async/ProtocolV2.h | 34 +++--- 2 files changed, 140 insertions(+), 113 deletions(-) diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index c5dc81a5431..9f766c1f3c1 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -189,9 +189,7 @@ public: return payload; } - void decode_frame(char *payload, uint32_t length) { - bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); + void decode_frame(const ceph::bufferlist& bl) { auto ti = bl.cbegin(); static_cast(this)->decode_payload(ti); } @@ -199,7 +197,9 @@ public: void decode_payload(bufferlist::const_iterator &ti) {} }; +// TODO, FIXME: fix this altogether with the Frame hierarchy rework struct do_not_encode_tag_t {}; +struct dummy_ctor_conflict_helper {}; template struct PayloadFrame : public Frame { @@ -222,6 +222,8 @@ protected: } } else if constexpr (std::is_same()) { this->payload.append((char *)&t, sizeof(t)); + } else if constexpr (std::is_same()) { + /* NOP, only to discriminate ctors for decode/encode. FIXME. */ } else { encode(t, this->payload, features); } @@ -244,6 +246,8 @@ protected: auto ptr = ti.get_current_ptr(); ti.advance(sizeof(T)); t = *(T *)ptr.raw_c_str(); + } else if constexpr (std::is_same()) { + /* NOP, only to discriminate ctors for decode/encode. FIXME. */ } else { decode(t, ti); } @@ -267,8 +271,8 @@ public: PayloadFrame(do_not_encode_tag_t) {} - PayloadFrame(char *payload, uint32_t length) { - this->decode_frame(payload, length); + PayloadFrame(const ceph::bufferlist &payload) { + this->decode_frame(payload); } void decode_payload(bufferlist::const_iterator &ti) { @@ -313,19 +317,21 @@ struct AuthBadMethodFrame }; struct AuthReplyMoreFrame - : public PayloadFrame { + : public PayloadFrame { static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REPLY_MORE; using PayloadFrame::PayloadFrame; - inline bufferlist &auth_payload() { return get_val<0>(); } + inline bufferlist &auth_payload() { return get_val<1>(); } }; struct AuthRequestMoreFrame - : public PayloadFrame { + : public PayloadFrame { static const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST_MORE; using PayloadFrame::PayloadFrame; - inline bufferlist &auth_payload() { return get_val<0>(); } + inline bufferlist &auth_payload() { return get_val<1>(); } }; struct AuthDoneFrame @@ -371,23 +377,21 @@ struct SignedEncryptedFrame : public PayloadFrame { } } - SignedEncryptedFrame(ProtocolV2 &protocol, char *payload, uint32_t length) + SignedEncryptedFrame(ProtocolV2 &protocol, ceph::bufferlist& bl) : PayloadFrame(do_not_encode_tag_t{}) { if (!protocol.session_stream_handlers.rx) { - this->decode_frame(payload, length); + this->decode_frame(bl); return; } - ceph::bufferlist bl; - bl.push_back(buffer::create_static(length, payload)); - + const auto length = bl.length(); ceph::bufferlist plain_bl = \ protocol.session_stream_handlers.rx->authenticated_decrypt_update_final( std::move(bl), ProtocolV2::segment_t::DEFAULT_ALIGNMENT); ceph_assert(plain_bl.length() == length - \ protocol.session_stream_handlers.rx->get_extra_size_at_final()); - this->decode_frame(plain_bl.c_str(), plain_bl.length()); + this->decode_frame(plain_bl); } }; @@ -548,7 +552,7 @@ struct MessageHeaderFrame MessageHeaderFrame(ceph::bufferlist&& text) : PayloadFrame(do_not_encode_tag_t{}) { - this->decode_frame(text.c_str(), text.length()); + this->decode_frame(text); } inline ceph_msg_header2 &header() { return get_val<0>(); } @@ -1366,10 +1370,12 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) { return WRITE(hello.get_buffer(), "hello frame", read_frame); } -CtPtr ProtocolV2::handle_hello(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; +CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - HelloFrame hello(payload, length); + HelloFrame hello(payload); ldout(cct, 5) << __func__ << " received hello:" << " peer_type=" << (int)hello.entity_type() @@ -1603,50 +1609,47 @@ CtPtr ProtocolV2::handle_read_frame_segment(char *buffer, int r) { CtPtr ProtocolV2::handle_frame_payload() { ceph_assert(!rx_segments_data.empty()); - auto* buffer = rx_segments_data.back().c_str(); - const auto this_payload_len = rx_segments_data.back().length(); + auto& payload = rx_segments_data.back(); ldout(cct, 30) << __func__ << "\n"; - bufferlist bl; - bl.append(buffer, this_payload_len); - bl.hexdump(*_dout); + payload.hexdump(*_dout); *_dout << dendl; switch (next_tag) { case Tag::HELLO: - return handle_hello(buffer, this_payload_len); + return handle_hello(payload); case Tag::AUTH_REQUEST: - return handle_auth_request(buffer, this_payload_len); + return handle_auth_request(payload); case Tag::AUTH_BAD_METHOD: - return handle_auth_bad_method(buffer, this_payload_len); + return handle_auth_bad_method(payload); case Tag::AUTH_REPLY_MORE: - return handle_auth_reply_more(buffer, this_payload_len); + return handle_auth_reply_more(payload); case Tag::AUTH_REQUEST_MORE: - return handle_auth_request_more(buffer, this_payload_len); + return handle_auth_request_more(payload); case Tag::AUTH_DONE: - return handle_auth_done(buffer, this_payload_len); + return handle_auth_done(payload); case Tag::CLIENT_IDENT: - return handle_client_ident(buffer, this_payload_len); + return handle_client_ident(payload); case Tag::SERVER_IDENT: - return handle_server_ident(buffer, this_payload_len); + return handle_server_ident(payload); case Tag::IDENT_MISSING_FEATURES: - return handle_ident_missing_features(buffer, this_payload_len); + return handle_ident_missing_features(payload); case Tag::SESSION_RECONNECT: - return handle_reconnect(buffer, this_payload_len); + return handle_reconnect(payload); case Tag::SESSION_RESET: - return handle_session_reset(buffer, this_payload_len); + return handle_session_reset(payload); case Tag::SESSION_RETRY: - return handle_session_retry(buffer, this_payload_len); + return handle_session_retry(payload); case Tag::SESSION_RETRY_GLOBAL: - return handle_session_retry_global(buffer, this_payload_len); + return handle_session_retry_global(payload); case Tag::SESSION_RECONNECT_OK: - return handle_reconnect_ok(buffer, this_payload_len); + return handle_reconnect_ok(payload); case Tag::KEEPALIVE2: - return handle_keepalive2(buffer, this_payload_len); + return handle_keepalive2(payload); case Tag::KEEPALIVE2_ACK: - return handle_keepalive2_ack(buffer, this_payload_len); + return handle_keepalive2_ack(payload); case Tag::ACK: - return handle_message_ack(buffer, this_payload_len); + return handle_message_ack(payload); default: ceph_abort(); } @@ -2137,10 +2140,12 @@ CtPtr ProtocolV2::throttle_dispatch_queue() { return read_frame_segment(); } -CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_keepalive2(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - KeepAliveFrame keepalive_frame(*this, payload, length); + KeepAliveFrame keepalive_frame(*this, payload); ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl; @@ -2159,20 +2164,24 @@ CtPtr ProtocolV2::handle_keepalive2(char *payload, uint32_t length) { return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_keepalive2_ack(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_keepalive2_ack(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - KeepAliveFrameAck keepalive_ack_frame(*this, payload, length); + KeepAliveFrameAck keepalive_ack_frame(*this, payload); connection->set_last_keepalive_ack(keepalive_ack_frame.timestamp()); ldout(cct, 20) << __func__ << " got KEEPALIVE_ACK" << dendl; return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_message_ack(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_message_ack(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - AckFrame ack(*this, payload, length); + AckFrame ack(*this, payload); handle_message_ack(ack.seq()); return CONTINUE(read_frame); } @@ -2230,10 +2239,11 @@ CtPtr ProtocolV2::send_auth_request(std::vector &allowed_methods) { return WRITE(frame.get_buffer(), "auth request", read_frame); } -CtPtr ProtocolV2::handle_auth_bad_method(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) { + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - AuthBadMethodFrame bad_method(payload, length); + AuthBadMethodFrame bad_method(payload); ldout(cct, 1) << __func__ << " method=" << bad_method.method() << " result " << cpp_strerror(bad_method.result()) << ", allowed methods=" << bad_method.allowed_methods() @@ -2255,17 +2265,17 @@ CtPtr ProtocolV2::handle_auth_bad_method(char *payload, uint32_t length) { return send_auth_request(bad_method.allowed_methods()); } -CtPtr ProtocolV2::handle_auth_reply_more(char *payload, uint32_t length) +CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - AuthReplyMoreFrame auth_more(payload, length); + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; + + AuthReplyMoreFrame auth_more(payload); ldout(cct, 5) << __func__ << " auth reply more len=" << auth_more.auth_payload().length() << dendl; ceph_assert(messenger->auth_client); - bufferlist bl; - bl.append(payload, length); - bufferlist reply; + ceph::bufferlist reply; auto am = auth_meta; connection->lock.unlock(); int r = messenger->auth_client->handle_auth_reply_more( @@ -2279,14 +2289,16 @@ CtPtr ProtocolV2::handle_auth_reply_more(char *payload, uint32_t length) << r << dendl; return _fault(); } - AuthRequestMoreFrame more_reply(reply); + AuthRequestMoreFrame more_reply(dummy_ctor_conflict_helper{}, reply); return WRITE(more_reply.get_buffer(), "auth request more", read_frame); } -CtPtr ProtocolV2::handle_auth_done(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - AuthDoneFrame auth_done(payload, length); + AuthDoneFrame auth_done(payload); ceph_assert(messenger->auth_client); auto am = auth_meta; @@ -2408,11 +2420,12 @@ CtPtr ProtocolV2::send_reconnect() { return WRITE(reconnect.get_buffer(), "reconnect", read_frame); } -CtPtr ProtocolV2::handle_ident_missing_features(char *payload, - uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - IdentMissingFeaturesFrame ident_missing(*this, payload, length); + IdentMissingFeaturesFrame ident_missing(*this, payload); lderr(cct) << __func__ << " client does not support all server features: " << std::hex << ident_missing.features() << std::dec << dendl; @@ -2420,10 +2433,12 @@ CtPtr ProtocolV2::handle_ident_missing_features(char *payload, return _fault(); } -CtPtr ProtocolV2::handle_session_reset(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << dendl; +CtPtr ProtocolV2::handle_session_reset(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - ResetFrame reset(*this, payload, length); + ResetFrame reset(*this, payload); ldout(cct, 1) << __func__ << " received session reset full=" << reset.full() << dendl; @@ -2438,10 +2453,12 @@ CtPtr ProtocolV2::handle_session_reset(char *payload, uint32_t length) { return send_client_ident(); } -CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_session_retry(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - RetryFrame retry(*this, payload, length); + RetryFrame retry(*this, payload); connect_seq = retry.connect_seq() + 1; ldout(cct, 1) << __func__ @@ -2451,10 +2468,12 @@ CtPtr ProtocolV2::handle_session_retry(char *payload, uint32_t length) { return send_reconnect(); } -CtPtr ProtocolV2::handle_session_retry_global(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_session_retry_global(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - RetryGlobalFrame retry(*this, payload, length); + RetryGlobalFrame retry(*this, payload); global_seq = messenger->get_global_seq(retry.global_seq()); ldout(cct, 1) << __func__ << " received session retry global global_seq=" @@ -2470,15 +2489,16 @@ CtPtr ProtocolV2::handle_wait() { state = WAIT; ceph_assert(rx_segments_data.size() == 1); ceph_assert(rx_segments_desc.size() == 1); - WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD].c_str(), - rx_segments_data[SegmentIndex::Frame::PAYLOAD].length()); + WaitFrame(*this, rx_segments_data[SegmentIndex::Frame::PAYLOAD]); return _fault(); } -CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_reconnect_ok(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - ReconnectOkFrame reconnect_ok(*this, payload, length); + ReconnectOkFrame reconnect_ok(*this, payload); ldout(cct, 5) << __func__ << " reconnect accepted: sms=" << reconnect_ok.msg_seq() << dendl; @@ -2500,10 +2520,12 @@ CtPtr ProtocolV2::handle_reconnect_ok(char *payload, uint32_t length) { return ready(); } -CtPtr ProtocolV2::handle_server_ident(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; +CtPtr ProtocolV2::handle_server_ident(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - ServerIdentFrame server_ident(*this, payload, length); + ServerIdentFrame server_ident(*this, payload); ldout(cct, 5) << __func__ << " received server identification:" << " addrs=" << server_ident.addrs() << " gid=" << server_ident.gid() @@ -2569,8 +2591,8 @@ CtPtr ProtocolV2::post_server_banner_exchange() { return CONTINUE(read_frame); } -CtPtr ProtocolV2::handle_auth_request(char *payload, uint32_t length) { - AuthRequestFrame request(payload, length); +CtPtr ProtocolV2::handle_auth_request(ceph::bufferlist &payload) { + AuthRequestFrame request(payload); ldout(cct, 10) << __func__ << " AuthRequest(method=" << request.method() << ", preferred_modes=" << request.preferred_modes() << ", payload_len=" << request.auth_payload().length() << ")" @@ -2644,7 +2666,7 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) reply); return WRITE(auth_done.get_buffer(), "auth done", read_frame); } else if (r == 0) { - AuthReplyMoreFrame more(reply); + AuthReplyMoreFrame more(dummy_ctor_conflict_helper{}, reply); return WRITE(more.get_buffer(), "auth reply more", read_frame); } else if (r == -EBUSY) { // kick the client and maybe they'll come back later @@ -2654,17 +2676,20 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more) } } -CtPtr ProtocolV2::handle_auth_request_more(char *payload, uint32_t length) +CtPtr ProtocolV2::handle_auth_request_more(ceph::bufferlist &payload) { - ldout(cct, 20) << __func__ << " payload_len=" << length << dendl; - AuthRequestMoreFrame auth_more(payload, length); + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; + AuthRequestMoreFrame auth_more(payload); return _handle_auth_request(auth_more.auth_payload(), true); } -CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; +CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - ClientIdentFrame client_ident(*this, payload, length); + ClientIdentFrame client_ident(*this, payload); ldout(cct, 5) << __func__ << " received client identification:" << " addrs=" << client_ident.addrs() @@ -2750,10 +2775,12 @@ CtPtr ProtocolV2::handle_client_ident(char *payload, uint32_t length) { return send_server_ident(); } -CtPtr ProtocolV2::handle_reconnect(char *payload, uint32_t length) { - ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl; +CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload) +{ + ldout(cct, 20) << __func__ + << " payload.length()=" << payload.length() << dendl; - ReconnectFrame reconnect(*this, payload, length); + ReconnectFrame reconnect(*this, payload); ldout(cct, 5) << __func__ << " received reconnect:" diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 80292c5793c..66e4e9752a7 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -191,7 +191,7 @@ private: Ct *_wait_for_peer_banner(); Ct *_handle_peer_banner(char *buffer, int r); Ct *_handle_peer_banner_payload(char *buffer, int r); - Ct *handle_hello(char *payload, uint32_t length); + Ct *handle_hello(ceph::bufferlist &payload); CONTINUATION_DECL(ProtocolV2, read_frame); READ_HANDLER_CONTINUATION_DECL(ProtocolV2, handle_read_frame_preamble_main); @@ -222,10 +222,10 @@ private: Ct *handle_message_extra_bytes(char *buffer, int r); Ct *handle_message_complete(); - Ct *handle_keepalive2(char *payload, uint32_t length); - Ct *handle_keepalive2_ack(char *payload, uint32_t length); + Ct *handle_keepalive2(ceph::bufferlist &payload); + Ct *handle_keepalive2_ack(ceph::bufferlist &payload); - Ct *handle_message_ack(char *payload, uint32_t length); + Ct *handle_message_ack(ceph::bufferlist &payload); public: uint64_t connection_features; @@ -257,18 +257,18 @@ private: return send_auth_request(empty); } Ct *send_auth_request(std::vector &allowed_methods); - Ct *handle_auth_bad_method(char *payload, uint32_t length); - Ct *handle_auth_reply_more(char *payload, uint32_t length); - Ct *handle_auth_done(char *payload, uint32_t length); + Ct *handle_auth_bad_method(ceph::bufferlist &payload); + Ct *handle_auth_reply_more(ceph::bufferlist &payload); + Ct *handle_auth_done(ceph::bufferlist &payload); Ct *send_client_ident(); Ct *send_reconnect(); - Ct *handle_ident_missing_features(char *payload, uint32_t length); - Ct *handle_session_reset(char *payload, uint32_t length); - Ct *handle_session_retry(char *payload, uint32_t length); - Ct *handle_session_retry_global(char *payload, uint32_t length); + Ct *handle_ident_missing_features(ceph::bufferlist &payload); + Ct *handle_session_reset(ceph::bufferlist &payload); + Ct *handle_session_retry(ceph::bufferlist &payload); + Ct *handle_session_retry_global(ceph::bufferlist &payload); Ct *handle_wait(); - Ct *handle_reconnect_ok(char *payload, uint32_t length); - Ct *handle_server_ident(char *payload, uint32_t length); + Ct *handle_reconnect_ok(ceph::bufferlist &payload); + Ct *handle_server_ident(ceph::bufferlist &payload); // Server Protocol CONTINUATION_DECL(ProtocolV2, start_server_banner_exchange); @@ -277,13 +277,13 @@ private: Ct *start_server_banner_exchange(); Ct *post_server_banner_exchange(); - Ct *handle_auth_request(char *payload, uint32_t length); - Ct *handle_auth_request_more(char *payload, uint32_t length); + Ct *handle_auth_request(ceph::bufferlist &payload); + Ct *handle_auth_request_more(ceph::bufferlist &payload); Ct *_handle_auth_request(bufferlist& auth_payload, bool more); Ct *_auth_bad_method(int r); - Ct *handle_client_ident(char *payload, uint32_t length); + Ct *handle_client_ident(ceph::bufferlist &payload); Ct *handle_ident_missing_features_write(int r); - Ct *handle_reconnect(char *payload, uint32_t length); + Ct *handle_reconnect(ceph::bufferlist &payload); Ct *handle_existing_connection(AsyncConnectionRef existing); Ct *reuse_connection(AsyncConnectionRef existing, ProtocolV2 *exproto); -- 2.39.5