// this tuple is only used when decoding values from a payload buffer
std::tuple<Args...> _values;
- // required only when signing and encryting payload, otherwise is null
+ // required for using econding/decoding features or when signing and
+ // encryting payload, otherwise is null
ProtocolV2 *protocol;
+ uint64_t features;
+
template <typename T>
inline void _encode_payload_each(T &t) {
if constexpr (std::is_same<T, bufferlist const>()) {
this->payload.claim_append((bufferlist &)t);
} else if constexpr (std::is_same<T, std::vector<uint32_t> const>()) {
- encode((uint32_t)t.size(), this->payload, -1ll);
+ encode((uint32_t)t.size(), this->payload, features);
for (const auto &elem : t) {
- encode(elem, this->payload, 0);
+ encode(elem, this->payload, features);
}
} else if constexpr (std::is_same<T, ceph_msg_header2 const>()) {
this->payload.append((char *)&t, sizeof(t));
protocol->sign_payload(this->payload);
protocol->encrypt_payload(this->payload);
} else {
- encode(t, this->payload, -1ll);
+ encode(t, this->payload, features);
}
}
}
public:
- PayloadFrame(const Args &... args) : protocol(nullptr) {
+ PayloadFrame(const Args &... args) : protocol(nullptr), features(0) {
(_encode_payload_each(args), ...);
}
PayloadFrame(ProtocolV2 *protocol, const Args &... args)
- : protocol(protocol) {
+ : protocol(protocol), features(protocol->connection_features) {
(_encode_payload_each(args), ...);
}
}
};
+struct HelloFrame : public PayloadFrame<HelloFrame,
+ uint8_t, // entity type
+ entity_addr_t> { // peer_addr
+ const ProtocolV2::Tag tag = ProtocolV2::Tag::HELLO;
+ using PayloadFrame::PayloadFrame;
+
+ inline uint8_t &entity_type() { return get_val<0>(); }
+ inline entity_addr_t &peer_addr() { return get_val<1>(); }
+};
+
struct AuthRequestFrame
: public PayloadFrame<AuthRequestFrame, uint32_t, uint32_t, bufferlist> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::AUTH_REQUEST;
struct ServerIdentFrame
: public SignedEncryptedFrame<ServerIdentFrame, entity_addrvec_t,
- entity_addr_t, int64_t, uint64_t, uint64_t,
+ int64_t, uint64_t, uint64_t,
uint64_t, uint64_t, uint64_t> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::IDENT;
using SignedEncryptedFrame::SignedEncryptedFrame;
inline entity_addrvec_t &addrs() { return get_val<0>(); }
- inline entity_addr_t &peer_addr() { return get_val<1>(); }
- inline int64_t &gid() { return get_val<2>(); }
- inline uint64_t &global_seq() { return get_val<3>(); }
- inline uint64_t &supported_features() { return get_val<4>(); }
- inline uint64_t &required_features() { return get_val<5>(); }
- inline uint64_t &flags() { return get_val<6>(); }
- inline uint64_t &cookie() { return get_val<7>(); }
+ inline int64_t &gid() { return get_val<1>(); }
+ inline uint64_t &global_seq() { return get_val<2>(); }
+ inline uint64_t &supported_features() { return get_val<3>(); }
+ inline uint64_t &required_features() { return get_val<4>(); }
+ inline uint64_t &flags() { return get_val<5>(); }
+ inline uint64_t &cookie() { return get_val<6>(); }
};
struct ReconnectFrame
ldout(cct, 20) << __func__ << dendl;
bannerExchangeCallback = callback;
- uint8_t type = messenger->get_mytype();
- __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
- __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
-
- size_t banner_prefix_len = strlen(CEPH_BANNER_V2_PREFIX);
- __le16 banner_payload_len = sizeof(uint8_t) + 2 * sizeof(__le64);
- size_t banner_len = banner_prefix_len + sizeof(__le16) + banner_payload_len;
- char banner[banner_len];
- uint8_t offset = 0;
- memcpy(banner, CEPH_BANNER_V2_PREFIX, banner_prefix_len);
- offset += banner_prefix_len;
- memcpy(banner + offset, (void *)&banner_payload_len, sizeof(__le16));
- offset += sizeof(__le16);
- memcpy(banner + offset, (void *)&type, sizeof(uint8_t));
- offset += sizeof(uint8_t);
- memcpy(banner + offset, (void *)&supported_features, sizeof(__le64));
- offset += sizeof(__le64);
- memcpy(banner + offset, (void *)&required_features, sizeof(__le64));
+ bufferlist banner_payload;
+ encode((uint64_t)CEPH_MSGR2_SUPPORTED_FEATURES, banner_payload, 0);
+ encode((uint64_t)CEPH_MSGR2_REQUIRED_FEATURES, banner_payload, 0);
bufferlist bl;
- bl.append(banner, banner_len);
+ bl.append(CEPH_BANNER_V2_PREFIX, strlen(CEPH_BANNER_V2_PREFIX));
+ encode((uint16_t)banner_payload.length(), bl, 0);
+ bl.claim_append(banner_payload);
return WRITE(bl, "banner", _wait_for_peer_banner);
}
CtPtr ProtocolV2::_wait_for_peer_banner() {
- unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16) +
- sizeof(uint8_t) + 2 * sizeof(__le64);
- return READ(banner_len, _banner_exchange_handle_peer_banner);
+ unsigned banner_len = strlen(CEPH_BANNER_V2_PREFIX) + sizeof(__le16);
+ return READ(banner_len, _handle_peer_banner);
}
-CtPtr ProtocolV2::_banner_exchange_handle_peer_banner(char *buffer, int r) {
+CtPtr ProtocolV2::_handle_peer_banner(char *buffer, int r) {
ldout(cct, 20) << __func__ << " r=" << r << dendl;
if (r < 0) {
return _fault();
}
- uint8_t peer_type = 0;
- __le64 peer_supported_features;
- __le64 peer_required_features;
+ uint16_t payload_len;
+ bufferlist bl;
+ bl.push_back(
+ buffer::create_static(sizeof(__le16), buffer + banner_prefix_len));
+ auto ti = bl.cbegin();
+ try {
+ decode(payload_len, ti);
+ } catch (const buffer::error &e) {
+ lderr(cct) << __func__ << " decode banner payload len failed " << dendl;
+ return _fault();
+ }
- uint8_t offset = banner_prefix_len;
- __le16 banner_payload_len = *(__le16 *)(buffer + offset);
+ ceph_assert(payload_len <= 4096); // if we need more then we need to increase
+ // temp_buffer size as well
- // V2 banner len check
- if (banner_payload_len != (sizeof(uint8_t) + 2 * sizeof(__le64))) {
- lderr(cct) << __func__ << " bad banner length: " << banner_payload_len
- << dendl;
+ next_payload_len = payload_len;
+ return READ(next_payload_len, _handle_peer_banner_payload);
+}
+
+CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
+ ldout(cct, 20) << __func__ << " r=" << r << dendl;
+
+ if (r < 0) {
+ ldout(cct, 1) << __func__ << " read peer banner payload failed r=" << r
+ << " (" << cpp_strerror(r) << ")" << dendl;
return _fault();
}
- offset += sizeof(__le16);
-
- peer_type = *(uint8_t *)(buffer + offset);
- offset += sizeof(uint8_t);
- peer_supported_features = *(__le64 *)(buffer + offset);
- offset += sizeof(__le64);
- peer_required_features = *(__le64 *)(buffer + offset);
- if (connection->get_peer_type() == -1) {
- connection->set_peer_type(peer_type);
+ uint64_t peer_supported_features;
+ uint64_t peer_required_features;
- ceph_assert(state == ACCEPTING);
- connection->policy = messenger->get_policy(peer_type);
- ldout(cct, 10) << __func__ << " accept of host_type " << (int)peer_type
- << ", policy.lossy=" << connection->policy.lossy
- << " policy.server=" << connection->policy.server
- << " policy.standby=" << connection->policy.standby
- << " policy.resetcheck=" << connection->policy.resetcheck
- << dendl;
- } else {
- if (connection->get_peer_type() != peer_type) {
- ldout(cct, 1) << __func__ << " connection peer type does not match what"
- << " peer advertises " << connection->get_peer_type()
- << " != " << (int)peer_type << dendl;
- stop();
- connection->dispatch_queue->queue_reset(connection);
- return nullptr;
- }
+ bufferlist bl;
+ bl.push_back(buffer::create_static(next_payload_len, buffer));
+ auto ti = bl.cbegin();
+ try {
+ decode(peer_supported_features, ti);
+ decode(peer_required_features, ti);
+ } catch (const buffer::error &e) {
+ lderr(cct) << __func__ << " decode banner payload failed " << dendl;
+ return _fault();
}
- ldout(cct, 1) << __func__ << " peer_type=" << (int)peer_type
- << " supported=" << std::hex << peer_supported_features
- << " required=" << std::hex << peer_required_features
- << std::dec << dendl;
+ ldout(cct, 1) << __func__ << " supported=" << std::hex
+ << peer_supported_features << " required=" << std::hex
+ << peer_required_features << std::dec << dendl;
// Check feature bit compatibility
- __le64 supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
- __le64 required_features = CEPH_MSGR2_REQUIRED_FEATURES;
+ uint64_t supported_features = CEPH_MSGR2_SUPPORTED_FEATURES;
+ uint64_t required_features = CEPH_MSGR2_REQUIRED_FEATURES;
if ((required_features & peer_supported_features) != required_features) {
ldout(cct, 1) << __func__ << " peer does not support all required features"
}
this->peer_required_features = peer_required_features;
+ if (this->peer_required_features == 0) {
+ this->connection_features = CEPH_FEATURE_MSG_ADDR2;
+ }
+
+ HelloFrame hello(this, messenger->get_mytype(), connection->target_addr);
+ return WRITE(hello.get_buffer(), "hello frame", read_frame);
+}
+
+CtPtr ProtocolV2::handle_hello(char *payload, uint32_t length) {
+ ldout(cct, 20) << __func__ << " payload_len=" << std::dec << length << dendl;
+
+ HelloFrame hello(payload, length);
- if (cct->_conf->ms_inject_internal_delays &&
- cct->_conf->ms_inject_socket_failures) {
- if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 10) << __func__ << " sleep for "
- << cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(cct->_conf->ms_inject_internal_delays);
- t.sleep();
+ ldout(cct, 5) << __func__ << " received hello:"
+ << " peer_type=" << (int)hello.entity_type()
+ << " peer_addr_for_me=" << hello.peer_addr() << dendl;
+
+ if (connection->get_peer_type() == -1) {
+ connection->set_peer_type(hello.entity_type());
+
+ ceph_assert(state == ACCEPTING);
+ connection->policy = messenger->get_policy(hello.entity_type());
+ ldout(cct, 10) << __func__ << " accept of host_type "
+ << (int)hello.entity_type()
+ << ", policy.lossy=" << connection->policy.lossy
+ << " policy.server=" << connection->policy.server
+ << " policy.standby=" << connection->policy.standby
+ << " policy.resetcheck=" << connection->policy.resetcheck
+ << dendl;
+ } else {
+ if (connection->get_peer_type() != hello.entity_type()) {
+ ldout(cct, 1) << __func__ << " connection peer type does not match what"
+ << " peer advertises " << connection->get_peer_type()
+ << " != " << (int)hello.entity_type() << dendl;
+ stop();
+ connection->dispatch_queue->queue_reset(connection);
+ return nullptr;
}
}
<< " tag=" << static_cast<uint32_t>(next_tag) << dendl;
switch (next_tag) {
+ case Tag::HELLO:
case Tag::AUTH_REQUEST:
case Tag::AUTH_BAD_METHOD:
case Tag::AUTH_BAD_AUTH:
}
switch (next_tag) {
+ case Tag::HELLO:
+ return handle_hello(buffer, next_payload_len);
case Tag::AUTH_REQUEST:
return handle_auth_request(buffer, next_payload_len);
case Tag::AUTH_BAD_METHOD:
messenger->get_myaddrs().front().is_blank_ip()) {
sockaddr_storage ss;
socklen_t len = sizeof(ss);
- getsockname(connection->cs.fd(), (sockaddr*)&ss, &len);
- ldout(cct,1) << __func__ << " getsockname reveals I am " << (sockaddr*)&ss
- << " when talking to " << connection->target_addr << dendl;
+ getsockname(connection->cs.fd(), (sockaddr *)&ss, &len);
+ ldout(cct, 1) << __func__ << " getsockname reveals I am " << (sockaddr *)&ss
+ << " when talking to " << connection->target_addr << dendl;
entity_addr_t a;
a.set_type(entity_addr_t::TYPE_MSGR2); // anything but NONE; learned_addr ignores this
- a.set_sockaddr((sockaddr*)&ss);
+ a.set_sockaddr((sockaddr *)&ss);
a.set_port(0);
connection->lock.unlock();
messenger->learned_addr(a);
if (cct->_conf->ms_inject_internal_delays &&
- cct->_conf->ms_inject_socket_failures) {
+ cct->_conf->ms_inject_socket_failures) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
- ldout(cct, 10) << __func__ << " sleep for "
- << cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(cct->_conf->ms_inject_internal_delays);
- t.sleep();
+ ldout(cct, 10) << __func__ << " sleep for "
+ << cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(cct->_conf->ms_inject_internal_delays);
+ t.sleep();
}
}
connection->lock.lock();
if (state != CONNECTING) {
ldout(cct, 1) << __func__
- << " state changed while learned_addr, mark_down or "
- << " replacing must be happened just now" << dendl;
+ << " state changed while learned_addr, mark_down or "
+ << " replacing must be happened just now" << dendl;
return nullptr;
}
}
ServerIdentFrame server_ident(this, payload, length);
ldout(cct, 5) << __func__ << " received server identification:"
<< " addrs=" << server_ident.addrs()
- << " my_addr=" << server_ident.peer_addr()
<< " gid=" << server_ident.gid()
<< " global_seq=" << server_ident.global_seq()
<< " features_supported=" << std::hex
if (client_ident.addrs().empty()) {
return _fault(); // a v2 peer should never do this
}
+
connection->set_peer_addrs(client_ident.addrs());
connection->target_addr = connection->_infer_target_addr(client_ident.addrs());
uint64_t gs = messenger->get_global_seq();
ServerIdentFrame server_ident(
- this, messenger->get_myaddrs(), connection->target_addr,
- messenger->get_myname().num(), gs, connection->policy.features_supported,
+ this, messenger->get_myaddrs(), messenger->get_myname().num(), gs,
+ connection->policy.features_supported,
connection->policy.features_required, flags, cookie);
ldout(cct, 5) << __func__ << " sending identification:"
<< " addrs=" << messenger->get_myaddrs()
- << " target_addr=" << connection->target_addr
<< " gid=" << messenger->get_myname().num()
<< " global_seq=" << gs << " features_supported=" << std::hex
<< connection->policy.features_supported