#include "common/ceph_time.h"
#include "messages/MPing.h"
+#include "crimson/auth/DummyAuth.h"
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
static bool verbose = false;
static seastar::future<> test_echo(unsigned rounds,
-                                   double keepalive_ratio)
+                                   double keepalive_ratio,
+                                   bool v2)
{
+  // v2 selects the msgr2 wire protocol (TYPE_MSGR2 addresses) for both
+  // server endpoints; false keeps the legacy protocol.  The echo logic
+  // itself is protocol-agnostic.
struct test_state {
struct Server final
public seastar::peering_sharded_service<Server> {
ceph::net::Messenger *msgr = nullptr;
MessageRef msg_pong{new MPing(), false};
+    // No-op auth handler registered on every sharded messenger instance
+    // via set_auth_client()/set_auth_server() below — presumably required
+    // by the msgr2 handshake; TODO confirm against the auth framework.
+    ceph::auth::DummyAuthClientServer dummy_auth;
Dispatcher* get_local_shard() override {
return &(container().local());
return fut.then([this, addr](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& server) {
server.msgr = messenger->get_local_shard();
+          server.msgr->set_auth_client(&server.dummy_auth);
+          server.msgr->set_auth_server(&server.dummy_auth);
}).then([messenger, addr] {
return messenger->bind(entity_addrvec_t{addr});
}).then([this, messenger] {
std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
MessageRef msg_ping{new MPing(), false};
+    // Same no-op auth pair on the client side (wired up in init() below).
+    ceph::auth::DummyAuthClientServer dummy_auth;
Client(unsigned rounds, double keepalive_ratio)
: rounds(rounds),
.then([this](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& client) {
client.msgr = messenger->get_local_shard();
+          client.msgr->set_auth_client(&client.dummy_auth);
+          client.msgr->set_auth_server(&client.dummy_auth);
}).then([this, messenger] {
return messenger->start(this);
});
};
};
-  logger().info("test_echo():");
+  // Log the full parameter set so legacy and v2 runs are distinguishable.
+  logger().info("test_echo(rounds={}, keepalive_ratio={}, v2={}):",
+                rounds, keepalive_ratio, v2);
return seastar::when_all_succeed(
ceph::net::create_sharded<test_state::Server>(),
ceph::net::create_sharded<test_state::Server>(),
ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
-    .then([rounds, keepalive_ratio](test_state::Server *server1,
-                                    test_state::Server *server2,
-                                    test_state::Client *client1,
-                                    test_state::Client *client2) {
+    .then([rounds, keepalive_ratio, v2](test_state::Server *server1,
+                                        test_state::Server *server2,
+                                        test_state::Client *client1,
+                                        test_state::Client *client2) {
// start servers and clients
entity_addr_t addr1;
addr1.parse("127.0.0.1:9010", nullptr);
-      addr1.set_type(entity_addr_t::TYPE_LEGACY);
entity_addr_t addr2;
addr2.parse("127.0.0.1:9011", nullptr);
-      addr2.set_type(entity_addr_t::TYPE_LEGACY);
+      // Both listening addresses carry the same protocol type so the two
+      // server/client pairs exercise one protocol per test invocation.
+      if (v2) {
+        addr1.set_type(entity_addr_t::TYPE_MSGR2);
+        addr2.set_type(entity_addr_t::TYPE_MSGR2);
+      } else {
+        addr1.set_type(entity_addr_t::TYPE_LEGACY);
+        addr2.set_type(entity_addr_t::TYPE_LEGACY);
+      }
return seastar::when_all_succeed(
server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
}).finally([server2] {
logger().info("server2 shutdown...");
return server2->shutdown();
+  }).finally([] {
+    // Runs after all shutdowns, regardless of test outcome.
+    logger().info("test_echo() done!\n");
});
});
}
-static seastar::future<> test_concurrent_dispatch()
+static seastar::future<> test_concurrent_dispatch(bool v2)
{
+  // v2: exercise the msgr2 protocol instead of legacy (see the address
+  // type selection below); the dispatch-concurrency check is unchanged.
struct test_state {
struct Server final
int count = 0;
seastar::promise<> on_second; // satisfied on second dispatch
seastar::promise<> on_done; // satisfied when first dispatch unblocks
+    // No-op auth handler, registered on the messenger during init —
+    // presumably needed for the msgr2 handshake; TODO confirm.
+    ceph::auth::DummyAuthClientServer dummy_auth;
seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
MessageRef m) override {
.then([this, addr](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& server) {
server.msgr = messenger->get_local_shard();
+          server.msgr->set_auth_client(&server.dummy_auth);
+          server.msgr->set_auth_server(&server.dummy_auth);
}).then([messenger, addr] {
return messenger->bind(entity_addrvec_t{addr});
}).then([this, messenger] {
: public ceph::net::Dispatcher,
public seastar::peering_sharded_service<Client> {
ceph::net::Messenger *msgr = nullptr;
+    // Client-side no-op auth pair, mirroring the server wiring above.
+    ceph::auth::DummyAuthClientServer dummy_auth;
seastar::future<> init(const entity_name_t& name,
const std::string& lname,
.then([this](ceph::net::Messenger *messenger) {
return container().invoke_on_all([messenger](auto& client) {
client.msgr = messenger->get_local_shard();
+          client.msgr->set_auth_client(&client.dummy_auth);
+          client.msgr->set_auth_server(&client.dummy_auth);
}).then([this, messenger] {
return messenger->start(this);
});
};
};
-  logger().info("test_concurrent_dispatch():");
+  // Include v2 in the log line so the two protocol runs are telling apart.
+  logger().info("test_concurrent_dispatch(v2={}):", v2);
return seastar::when_all_succeed(
ceph::net::create_sharded<test_state::Server>(),
ceph::net::create_sharded<test_state::Client>())
-    .then([](test_state::Server *server,
+    .then([v2](test_state::Server *server,
test_state::Client *client) {
entity_addr_t addr;
addr.parse("127.0.0.1:9010", nullptr);
-  addr.set_type(entity_addr_t::TYPE_LEGACY);
+  if (v2) {
+    addr.set_type(entity_addr_t::TYPE_MSGR2);
+  } else {
+    addr.set_type(entity_addr_t::TYPE_LEGACY);
+  }
addr.set_family(AF_INET);
return seastar::when_all_succeed(
server->init(entity_name_t::OSD(4), "server3", 5, addr),
}).finally([server] {
logger().info("server shutdown...");
return server->msgr->shutdown();
+  }).finally([] {
+    // Runs after the server shutdown completes, success or failure.
+    logger().info("test_concurrent_dispatch() done!\n");
});
});
}
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();
auto keepalive_ratio = config["keepalive-ratio"].as<double>();
-  return test_echo(rounds, keepalive_ratio)
-    .then([] {
-      return test_concurrent_dispatch();
+  // Run each test twice, chained sequentially: first over the legacy
+  // protocol (v2=false), then over msgr2 (v2=true).
+  return test_echo(rounds, keepalive_ratio, false)
+    .then([rounds, keepalive_ratio] {
+      return test_echo(rounds, keepalive_ratio, true);
+    }).then([] {
+      return test_concurrent_dispatch(false);
+    }).then([] {
+      return test_concurrent_dispatch(true);
}).then([] {
std::cout << "All tests succeeded" << std::endl;
}).handle_exception([] (auto eptr) {