// session security structure. PLR
ldout(cct, 10) << __func__ << " accept setting up session_security." << dendl;
- // existing?
+ if (connection->policy.server &&
+ connection->policy.lossy) {
+ // incoming lossy client, no need to register this connection
+ // new session
+ ldout(cct, 10) << __func__ << " accept new session" << dendl;
+ return open(reply, authorizer_reply);
+ }
+
AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
connection->inject_delay();
peer_global_seq = client_ident.global_seq();
- // Looks good so far, let's check if there is already an existing connection
- // to this peer.
-
- connection->lock.unlock();
- AsyncConnectionRef existing = messenger->lookup_conn(*connection->peer_addrs);
-
- if (existing &&
- existing->protocol->proto_type != 2) {
- ldout(cct,1) << __func__ << " existing " << existing << " proto "
- << existing->protocol.get() << " version is "
- << existing->protocol->proto_type << ", marking down" << dendl;
- existing->mark_down();
- existing = nullptr;
- }
+ if (connection->policy.server &&
+ connection->policy.lossy) {
+ // incoming lossy client, no need to register this connection
+ } else {
+ // Looks good so far, let's check if there is already an existing connection
+ // to this peer.
+ connection->lock.unlock();
+ AsyncConnectionRef existing = messenger->lookup_conn(
+ *connection->peer_addrs);
+
+ if (existing &&
+ existing->protocol->proto_type != 2) {
+ ldout(cct,1) << __func__ << " existing " << existing << " proto "
+ << existing->protocol.get() << " version is "
+ << existing->protocol->proto_type << ", marking down"
+ << dendl;
+ existing->mark_down();
+ existing = nullptr;
+ }
- connection->inject_delay();
+ connection->inject_delay();
- connection->lock.lock();
- if (state != SESSION_ACCEPTING) {
- ldout(cct, 1) << __func__
- << " state changed while accept, it must be mark_down"
- << dendl;
- ceph_assert(state == CLOSED);
- return _fault();
- }
+ connection->lock.lock();
+ if (state != SESSION_ACCEPTING) {
+ ldout(cct, 1) << __func__
+ << " state changed while accept, it must be mark_down"
+ << dendl;
+ ceph_assert(state == CLOSED);
+ return _fault();
+ }
- if (existing) {
- return handle_existing_connection(existing);
+ if (existing) {
+ return handle_existing_connection(existing);
+ }
}
// if everything is OK reply with server identification
bool got_connect;
bool loopback;
entity_addrvec_t last_accept;
+ ConnectionRef *last_accept_con_ptr = nullptr;
explicit FakeDispatcher(bool s): Dispatcher(g_ceph_context),
is_server(s), got_new(false), got_remote_reset(false),
}
void ms_handle_fast_accept(Connection *con) override {
last_accept = con->get_peer_addrs();
+ if (last_accept_con_ptr) {
+ *last_accept_con_ptr = con;
+ }
if (!con->get_priv()) {
con->set_priv(RefCountedPtr{new Session(con), false});
}
server_msgr->wait();
}
-TEST_P(MessengerTest, NameAddrTest) {
- FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
- entity_addr_t bind_addr;
- bind_addr.parse("v2:127.0.0.1");
- server_msgr->bind(bind_addr);
- server_msgr->add_dispatcher_head(&srv_dispatcher);
- server_msgr->start();
-
- client_msgr->add_dispatcher_head(&cli_dispatcher);
- client_msgr->start();
-
- MPing *m = new MPing();
- ConnectionRef conn = client_msgr->connect_to(server_msgr->get_mytype(),
- server_msgr->get_myaddrs());
- {
- ASSERT_EQ(conn->send_message(m), 0);
- std::unique_lock l{cli_dispatcher.lock};
- cli_dispatcher.cond.wait(l, [&] { return cli_dispatcher.got_new; });
- cli_dispatcher.got_new = false;
- }
- ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- ASSERT_TRUE(conn->get_peer_addrs() == server_msgr->get_myaddrs());
- ConnectionRef server_conn = server_msgr->connect_to(
- client_msgr->get_mytype(), srv_dispatcher.last_accept);
- // Verify that server_conn is the one we already accepted from client,
- // so it means the session counter in server_conn is also incremented.
- ASSERT_EQ(1U, static_cast<Session*>(server_conn->get_priv().get())->get_count());
- server_msgr->shutdown();
- client_msgr->shutdown();
- server_msgr->wait();
- client_msgr->wait();
-}
-
TEST_P(MessengerTest, FeatureTest) {
FakeDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;
ASSERT_FALSE(conn->is_connected());
srv_dispatcher.got_new = false;
+ ConnectionRef server_conn;
+ srv_dispatcher.last_accept_con_ptr = &server_conn;
conn = client_msgr->connect_to(server_msgr->get_mytype(),
server_msgr->get_myaddrs());
{
cli_dispatcher.got_new = false;
}
ASSERT_EQ(1U, static_cast<Session*>(conn->get_priv().get())->get_count());
- ConnectionRef server_conn = server_msgr->connect_to(client_msgr->get_mytype(),
- srv_dispatcher.last_accept);
+ ASSERT_TRUE(server_conn);
+
// server lose state
{
std::unique_lock l{srv_dispatcher.lock};