Otherwise the messenger implementation needs to be templated.
Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
}
#endif
-#define SERVER_SOCKET ShardedServerSocket<IS_FIXED_CPU>
-
-template <bool IS_FIXED_CPU>
-SERVER_SOCKET::ShardedServerSocket(
+ShardedServerSocket::ShardedServerSocket(
seastar::shard_id sid,
+ bool is_fixed_cpu,
construct_tag)
- : primary_sid{sid}
+ : primary_sid{sid}, is_fixed_cpu{is_fixed_cpu}
{
}
-template <bool IS_FIXED_CPU>
-SERVER_SOCKET::~ShardedServerSocket()
+ShardedServerSocket::~ShardedServerSocket()
{
assert(!listener);
// detect whether user have called destroy() properly
ceph_assert_always(!service);
}
-template <bool IS_FIXED_CPU>
listen_ertr::future<>
-SERVER_SOCKET::listen(entity_addr_t addr)
+ShardedServerSocket::listen(entity_addr_t addr)
{
ceph_assert_always(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::listen()...", addr);
seastar::socket_address s_addr(addr.in4_addr());
seastar::listen_options lo;
lo.reuse_address = true;
- if constexpr (IS_FIXED_CPU) {
+ if (ss.is_fixed_cpu) {
lo.set_fixed_cpu(ss.primary_sid);
}
ss.listener = seastar::listen(s_addr, lo);
});
}
-template <bool IS_FIXED_CPU>
seastar::future<>
-SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
+ShardedServerSocket::accept(accept_func_t &&_fn_accept)
{
ceph_assert_always(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
return seastar::keep_doing([&ss] {
return ss.listener->accept(
).then([&ss](seastar::accept_result accept_result) {
- if constexpr (IS_FIXED_CPU) {
+#ifndef NDEBUG
+ if (ss.is_fixed_cpu) {
// see seastar::listen_options::set_fixed_cpu()
ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
}
+#endif
auto [socket, paddr] = std::move(accept_result);
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
std::move(socket), Socket::side_t::acceptor,
peer_addr.get_port(), Socket::construct_tag{});
logger().debug("ShardedServerSocket({})::accept(): "
- "accepted peer {}, socket {}",
- ss.listen_addr, peer_addr, fmt::ptr(_socket));
+ "accepted peer {}, socket {}, is_fixed = {}",
+ ss.listen_addr, peer_addr, fmt::ptr(_socket), ss.is_fixed_cpu);
std::ignore = seastar::with_gate(
ss.shutdown_gate,
[socket=std::move(_socket), peer_addr, &ss]() mutable {
});
}
-template <bool IS_FIXED_CPU>
seastar::future<>
-SERVER_SOCKET::shutdown_destroy()
+ShardedServerSocket::shutdown_destroy()
{
assert(seastar::this_shard_id() == primary_sid);
logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
});
}
-template <bool IS_FIXED_CPU>
-seastar::future<SERVER_SOCKET*>
-SERVER_SOCKET::create()
+seastar::future<ShardedServerSocket*>
+ShardedServerSocket::create(bool is_fixed_cpu)
{
auto primary_sid = seastar::this_shard_id();
// start the sharded service: we should only construct/stop shards on #0
- return seastar::smp::submit_to(0, [primary_sid] {
+ return seastar::smp::submit_to(0, [primary_sid, is_fixed_cpu] {
auto service = std::make_unique<sharded_service_t>();
- return service->start(primary_sid, construct_tag{}
+ return service->start(primary_sid, is_fixed_cpu, construct_tag{}
).then([service = std::move(service)]() mutable {
auto p_shard = service.get();
p_shard->local().service = std::move(service);
});
}
-template class ShardedServerSocket<true>;
-template class ShardedServerSocket<false>;
-
} // namespace crimson::net
crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
>;
-template <bool IS_FIXED_CPU>
class ShardedServerSocket
- : public seastar::peering_sharded_service<ShardedServerSocket<IS_FIXED_CPU> > {
+ : public seastar::peering_sharded_service<ShardedServerSocket> {
struct construct_tag {};
public:
- ShardedServerSocket(seastar::shard_id sid, construct_tag);
+ ShardedServerSocket(seastar::shard_id sid, bool is_fixed_cpu, construct_tag);
~ShardedServerSocket();
ShardedServerSocket(ShardedServerSocket&&) = delete;
ShardedServerSocket(const ShardedServerSocket&) = delete;
+ ShardedServerSocket& operator=(ShardedServerSocket&&) = delete;
ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
+ bool is_fixed() const { return is_fixed_cpu; }
+
listen_ertr::future<> listen(entity_addr_t addr);
using accept_func_t =
seastar::future<> shutdown_destroy();
- static seastar::future<ShardedServerSocket*> create();
+ static seastar::future<ShardedServerSocket*> create(bool is_fixed_cpu);
private:
- // the fixed CPU if IS_FIXED_CPU is true
+ // the fixed CPU if is_fixed_cpu is true
const seastar::shard_id primary_sid;
+ const bool is_fixed_cpu;
entity_addr_t listen_addr;
std::optional<seastar::server_socket> listener;
seastar::gate shutdown_gate;
set_myaddrs(addrs);
return seastar::futurize_invoke([this] {
if (!listener) {
- return ShardedServerSocket<true>::create(
+ return ShardedServerSocket::create(true
).then([this] (auto _listener) {
listener = _listener;
});
ceph_assert(get_myaddr().get_port() > 0);
return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
+ assert(listener->is_fixed());
assert(seastar::this_shard_id() == sid);
assert(get_myaddr().is_msgr2());
SocketConnectionRef conn =
namespace crimson::net {
-template <bool IS_FIXED_CPU>
class ShardedServerSocket;
class SocketMessenger final : public Messenger {
crimson::auth::AuthClient* auth_client = nullptr;
crimson::auth::AuthServer* auth_server = nullptr;
- ShardedServerSocket<true> *listener = nullptr;
+ ShardedServerSocket *listener = nullptr;
ChainedDispatchers dispatchers;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;
});
}
-template <bool IS_FIXED_CPU>
-future<> test_bind_same() {
+future<> test_bind_same(bool is_fixed_cpu) {
logger().info("test_bind_same()...");
- return ShardedServerSocket<IS_FIXED_CPU>::create().then([](auto pss1) {
+ return ShardedServerSocket::create(is_fixed_cpu
+ ).then([is_fixed_cpu](auto pss1) {
auto saddr = get_server_addr();
- return pss1->listen(saddr).safe_then([saddr] {
+ return pss1->listen(saddr).safe_then([saddr, is_fixed_cpu] {
// try to bind the same address
- return ShardedServerSocket<IS_FIXED_CPU>::create(
+ return ShardedServerSocket::create(is_fixed_cpu
).then([saddr](auto pss2) {
return pss2->listen(saddr).safe_then([] {
logger().error("test_bind_same() should raise address_in_use");
});
}
-template <bool IS_FIXED_CPU>
-future<> test_accept() {
+future<> test_accept(bool is_fixed_cpu) {
logger().info("test_accept()");
- return ShardedServerSocket<IS_FIXED_CPU>::create(
+ return ShardedServerSocket::create(is_fixed_cpu
).then([](auto pss) {
auto saddr = get_server_addr();
return pss->listen(saddr
});
}
-template <bool IS_FIXED_CPU>
class SocketFactory {
static constexpr seastar::shard_id CLIENT_CPU = 0u;
SocketRef client_socket;
seastar::promise<> server_connected;
static constexpr seastar::shard_id SERVER_CPU = 1u;
- ShardedServerSocket<IS_FIXED_CPU> *pss = nullptr;
+ ShardedServerSocket *pss = nullptr;
seastar::shard_id server_socket_CPU;
SocketRef server_socket;
public:
template <typename FuncC, typename FuncS>
- static future<> dispatch_sockets(FuncC&& cb_client, FuncS&& cb_server) {
+ static future<> dispatch_sockets(
+ bool is_fixed_cpu,
+ FuncC&& cb_client,
+ FuncS&& cb_server) {
ceph_assert_always(seastar::this_shard_id() == CLIENT_CPU);
auto owner = std::make_unique<SocketFactory>();
auto psf = owner.get();
auto saddr = get_server_addr();
- return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] {
- return ShardedServerSocket<IS_FIXED_CPU>::create(
+ return seastar::smp::submit_to(SERVER_CPU, [psf, saddr, is_fixed_cpu] {
+ return ShardedServerSocket::create(is_fixed_cpu
).then([psf, saddr](auto pss) {
psf->pss = pss;
return pss->listen(saddr
logger().info("dispatch_sockets(): accepted at shard {}",
seastar::this_shard_id());
psf->server_socket_CPU = seastar::this_shard_id();
- if constexpr (IS_FIXED_CPU) {
+ if (psf->pss->is_fixed()) {
ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
}
psf->server_socket = std::move(socket);
}
};
-template <bool IS_FIXED_CPU>
-future<> test_read_write() {
+future<> test_read_write(bool is_fixed_cpu) {
logger().info("test_read_write()...");
- return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+ return SocketFactory::dispatch_sockets(
+ is_fixed_cpu,
[](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
[](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
).then([] {
});
}
-template <bool IS_FIXED_CPU>
-future<> test_unexpected_down() {
+future<> test_unexpected_down(bool is_fixed_cpu) {
logger().info("test_unexpected_down()...");
- return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+ return SocketFactory::dispatch_sockets(
+ is_fixed_cpu,
[](auto cs) {
return Connection::dispatch_rw_bounded(cs, 128, true
).handle_exception_type([](const std::system_error& e) {
});
}
-template <bool IS_FIXED_CPU>
-future<> test_shutdown_propagated() {
+future<> test_shutdown_propagated(bool is_fixed_cpu) {
logger().info("test_shutdown_propagated()...");
- return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+ return SocketFactory::dispatch_sockets(
+ is_fixed_cpu,
[](auto cs) {
logger().debug("test_shutdown_propagated() shutdown client socket");
cs->shutdown();
});
}
-template <bool IS_FIXED_CPU>
-future<> test_preemptive_down() {
+future<> test_preemptive_down(bool is_fixed_cpu) {
logger().info("test_preemptive_down()...");
- return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
+ return SocketFactory::dispatch_sockets(
+ is_fixed_cpu,
[](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
[](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
).then([] {
});
}
-template <bool IS_FIXED_CPU>
-future<> do_test_with_type() {
- return test_bind_same<IS_FIXED_CPU>(
- ).then([] {
- return test_accept<IS_FIXED_CPU>();
- }).then([] {
- return test_read_write<IS_FIXED_CPU>();
- }).then([] {
- return test_unexpected_down<IS_FIXED_CPU>();
- }).then([] {
- return test_shutdown_propagated<IS_FIXED_CPU>();
- }).then([] {
- return test_preemptive_down<IS_FIXED_CPU>();
+future<> do_test_with_type(bool is_fixed_cpu) {
+ return test_bind_same(is_fixed_cpu
+ ).then([is_fixed_cpu] {
+ return test_accept(is_fixed_cpu);
+ }).then([is_fixed_cpu] {
+ return test_read_write(is_fixed_cpu);
+ }).then([is_fixed_cpu] {
+ return test_unexpected_down(is_fixed_cpu);
+ }).then([is_fixed_cpu] {
+ return test_shutdown_propagated(is_fixed_cpu);
+ }).then([is_fixed_cpu] {
+ return test_preemptive_down(is_fixed_cpu);
});
}
}).then([] {
return test_refused();
}).then([] {
- return do_test_with_type<true>();
+ return do_test_with_type(true);
}).then([] {
- return do_test_with_type<false>();
+ return do_test_with_type(false);
}).then([] {
logger().info("All tests succeeded");
// Seastar has bugs to have events undispatched during shutdown,