using seq_num_t = uint64_t;
class Connection : public seastar::enable_shared_from_this<Connection> {
- entity_name_t peer_name = {0, -1};
+ entity_name_t peer_name = {0, entity_name_t::NEW};
protected:
entity_addr_t peer_addr;
clock_t::time_point last_keepalive;
clock_t::time_point last_keepalive_ack;
- void set_peer_type(entity_type_t peer_type) { peer_name._type = peer_type; }
- void set_peer_id(int64_t peer_id) { peer_name._num = peer_id; }
- void set_peer_name(entity_name_t name) { peer_name = name; }
+ void set_peer_type(entity_type_t peer_type) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+ }
+ void set_peer_id(int64_t peer_id) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+ }
+ void set_peer_name(entity_name_t name) {
+ set_peer_type(name.type());
+ set_peer_id(name.num());
+ }
public:
uint64_t peer_global_id = 0;
/// or a new pending connection
virtual ConnectionRef
connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) = 0;
+ const entity_name_t& peer_name) = 0;
+
+ ConnectionRef
+ connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) {
+ return connect(peer_addr, entity_name_t(peer_type, -1));
+ }
// wait for messenger shutdown
virtual seastar::future<> wait() = 0;
}
virtual void start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) = 0;
+ const entity_name_t& peer_name) = 0;
virtual void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) = 0;
}
void ProtocolV1::start_connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+ const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::none);
logger().trace("{} trigger connecting, was {}", conn, static_cast<int>(state));
ceph_assert(!socket);
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
- conn.set_peer_type(_peer_type);
- conn.policy = messenger.get_policy(_peer_type);
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
messenger.register_conn(
seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()));
gated_dispatch("start_connect", [this] {
.then([this](bufferlist bl) {
auto p = bl.cbegin();
::decode(h.connect, p);
+ if (conn.get_peer_type() != 0 &&
+ conn.get_peer_type() != h.connect.host_type) {
+ logger().error("{} repeat_handle_connect(): my peer type does not match"
+ " what peer advertises {} != {}",
+ conn, conn.get_peer_type(), h.connect.host_type);
+ throw std::system_error(make_error_code(error::protocol_aborted));
+ }
conn.set_peer_type(h.connect.host_type);
conn.policy = messenger.get_policy(h.connect.host_type);
if (!conn.policy.lossy && !conn.policy.server && conn.target_addr.get_port() <= 0) {
private:
void start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ const entity_name_t& peer_name) override;
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
ProtocolV2::~ProtocolV2() {}
void ProtocolV2::start_connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+ const entity_name_t& _peer_name)
{
ceph_assert(state == state_t::NONE);
ceph_assert(!socket);
conn.peer_addr = _peer_addr;
conn.target_addr = _peer_addr;
- conn.set_peer_type(_peer_type);
- conn.policy = messenger.get_policy(_peer_type);
+ conn.set_peer_name(_peer_name);
+ conn.policy = messenger.get_policy(_peer_name.type());
client_cookie = generate_client_cookie();
- logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_type={}, cc={}"
+ logger().info("{} ProtocolV2::start_connect(): peer_addr={}, peer_name={}, cc={}"
" policy(lossy={}, server={}, standby={}, resetcheck={})",
- conn, _peer_addr, ceph_entity_type_name(_peer_type), client_cookie,
+ conn, _peer_addr, _peer_name, client_cookie,
conn.policy.lossy, conn.policy.server,
conn.policy.standby, conn.policy.resetcheck);
messenger.register_conn(
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != server_ident.gid()) {
+ logger().error("{} connection peer id ({}) does not match "
+ "what it should be ({}) during connecting, close",
+ conn, server_ident.gid(), conn.get_peer_id());
+ abort_in_close(*this, true);
+ }
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
conn.policy.features_supported);
});
}
+bool ProtocolV2::validate_peer_name(const entity_name_t& peer_name) const
+{
+ auto my_peer_name = conn.get_peer_name();
+ if (my_peer_name.type() != peer_name.type()) {
+ return false;
+ }
+ if (my_peer_name.num() != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ my_peer_name.num() != peer_name.num()) {
+ return false;
+ }
+ return true;
+}
+
seastar::future<ProtocolV2::next_step_t>
ProtocolV2::send_wait()
{
existing_proto->client_cookie,
existing_proto->server_cookie);
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_connect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_connect: racing replace happened while"
" replacing existing connection {}, send wait.",
make_error_code(crimson::net::error::bad_peer_address));
}
+ if (conn.get_peer_id() != entity_name_t::NEW &&
+ conn.get_peer_id() != client_ident.gid()) {
+ logger().error("{} client_ident peer_id ({}) does not match"
+ " what it should be ({}) during accepting, abort",
+ conn, client_ident.gid(), conn.get_peer_id());
+ abort_in_fault();
+ }
conn.set_peer_id(client_ident.gid());
client_cookie = client_ident.cookie();
existing_proto->client_cookie,
existing_proto->server_cookie);
+ if (!validate_peer_name(existing_conn->get_peer_name())) {
+ logger().error("{} server_reconnect: my peer_name doesn't match"
+ " the existing connection {}, abort", conn, existing_conn);
+ abort_in_fault();
+ }
+
if (existing_proto->state == state_t::REPLACING) {
logger().warn("{} server_reconnect: racing replace happened while "
" replacing existing connection {}, retry global.",
return write_frame(reconnect_ok);
} else {
client_cookie = new_client_cookie;
- conn.set_peer_name(new_peer_name);
+ assert(conn.get_peer_type() == new_peer_name.type());
+ if (conn.get_peer_id() == entity_name_t::NEW) {
+ conn.set_peer_id(new_peer_name.num());
+ }
connection_features = new_conn_features;
return send_server_ident();
}
private:
void start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ const entity_name_t& peer_name) override;
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
seastar::future<> _handle_auth_request(bufferlist& auth_payload, bool more);
seastar::future<> server_auth();
+ bool validate_peer_name(const entity_name_t& peer_name) const;
seastar::future<next_step_t> send_wait();
seastar::future<next_step_t> reuse_connection(ProtocolV2* existing_proto,
bool do_reset=false,
void
SocketConnection::start_connect(const entity_addr_t& _peer_addr,
- const entity_type_t& _peer_type)
+ const entity_name_t& _peer_name)
{
- protocol->start_connect(_peer_addr, _peer_type);
+ protocol->start_connect(_peer_addr, _peer_name);
}
void
/// start a handshake from the client's perspective,
/// only call when SocketConnection first construct
void start_connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type);
+ const entity_name_t& peer_name);
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
void start_accept(SocketRef&& socket,
}
crimson::net::ConnectionRef
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
{
assert(seastar::engine().cpu_id() == master_sid);
}
SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
*this, *dispatcher, peer_addr.is_msgr2());
- conn->start_connect(peer_addr, peer_type);
+ conn->start_connect(peer_addr, peer_name);
return conn->shared_from_this();
}
seastar::future<> start(Dispatcher *dispatcher) override;
ConnectionRef connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ const entity_name_t& peer_name) override;
// can only wait once
seastar::future<> wait() override {
assert(seastar::engine().cpu_id() == master_sid);