seastar::future<>
Socket::close() {
#ifndef NDEBUG
- ceph_assert(!closed);
+ ceph_assert_always(!closed);
closed = true;
#endif
return seastar::when_all_succeed(
void Socket::set_trap(bp_type_t type, bp_action_t action, socket_blocker* blocker_) {
blocker = blocker_;
if (type == bp_type_t::READ) {
- ceph_assert(next_trap_read == bp_action_t::CONTINUE);
+ ceph_assert_always(next_trap_read == bp_action_t::CONTINUE);
next_trap_read = action;
} else { // type == bp_type_t::WRITE
if (next_trap_write == bp_action_t::CONTINUE) {
next_trap_write = action;
} else if (next_trap_write == bp_action_t::FAULT) {
// do_sweep_messages() may combine multiple write events into one socket write
- ceph_assert(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
+ ceph_assert_always(action == bp_action_t::FAULT || action == bp_action_t::CONTINUE);
} else {
ceph_abort();
}
}
#endif
-FixedCPUServerSocket::FixedCPUServerSocket(
- seastar::shard_id cpu,
+#define SERVER_SOCKET ShardedServerSocket<IS_FIXED_CPU>
+
+template <bool IS_FIXED_CPU>
+SERVER_SOCKET::ShardedServerSocket(
+ seastar::shard_id sid,
construct_tag)
- : fixed_cpu{cpu}
+ : primary_sid{sid}
{
}
-FixedCPUServerSocket::~FixedCPUServerSocket()
+template <bool IS_FIXED_CPU>
+SERVER_SOCKET::~ShardedServerSocket()
{
assert(!listener);
// detect whether user have called destroy() properly
- ceph_assert(!service);
+ ceph_assert_always(!service);
}
+template <bool IS_FIXED_CPU>
listen_ertr::future<>
-FixedCPUServerSocket::listen(entity_addr_t addr)
+SERVER_SOCKET::listen(entity_addr_t addr)
{
- assert(seastar::this_shard_id() == fixed_cpu);
- logger().debug("FixedCPUServerSocket({})::listen()...", addr);
- return container().invoke_on_all([addr](auto& ss) {
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::listen()...", addr);
+ return this->container().invoke_on_all([addr](auto& ss) {
ss.listen_addr = addr;
seastar::socket_address s_addr(addr.in4_addr());
seastar::listen_options lo;
lo.reuse_address = true;
- lo.set_fixed_cpu(ss.fixed_cpu);
+ if constexpr (IS_FIXED_CPU) {
+ lo.set_fixed_cpu(ss.primary_sid);
+ }
ss.listener = seastar::listen(s_addr, lo);
}).then([] {
return listen_ertr::now();
}).handle_exception_type(
[addr](const std::system_error& e) -> listen_ertr::future<> {
if (e.code() == std::errc::address_in_use) {
- logger().debug("FixedCPUServerSocket({})::listen(): address in use", addr);
+ logger().debug("ShardedServerSocket({})::listen(): address in use", addr);
return crimson::ct_error::address_in_use::make();
} else if (e.code() == std::errc::address_not_available) {
- logger().debug("FixedCPUServerSocket({})::listen(): address not available",
+ logger().debug("ShardedServerSocket({})::listen(): address not available",
addr);
return crimson::ct_error::address_not_available::make();
}
- logger().error("FixedCPUServerSocket({})::listen(): "
+ logger().error("ShardedServerSocket({})::listen(): "
"got unexpeted error {}", addr, e.what());
ceph_abort();
});
}
+template <bool IS_FIXED_CPU>
seastar::future<>
-FixedCPUServerSocket::accept(accept_func_t &&_fn_accept)
+SERVER_SOCKET::accept(accept_func_t &&_fn_accept)
{
- assert(seastar::this_shard_id() == fixed_cpu);
- logger().debug("FixedCPUServerSocket({})::accept()...", listen_addr);
- return container().invoke_on_all([_fn_accept](auto &ss) {
+ ceph_assert_always(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::accept()...", listen_addr);
+ return this->container().invoke_on_all([_fn_accept](auto &ss) {
assert(ss.listener);
ss.fn_accept = _fn_accept;
// gate accepting
- // FixedCPUServerSocket::shutdown() will drain the continuations in the gate
+ // ShardedServerSocket::shutdown() will drain the continuations in the gate
// so ignore the returned future
std::ignore = seastar::with_gate(ss.shutdown_gate, [&ss] {
return seastar::keep_doing([&ss] {
return ss.listener->accept(
).then([&ss](seastar::accept_result accept_result) {
- // assert seastar::listen_options::set_fixed_cpu() works
- assert(seastar::this_shard_id() == ss.fixed_cpu);
+ if constexpr (IS_FIXED_CPU) {
+ // see seastar::listen_options::set_fixed_cpu()
+ ceph_assert_always(seastar::this_shard_id() == ss.primary_sid);
+ }
auto [socket, paddr] = std::move(accept_result);
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
SocketRef _socket = std::make_unique<Socket>(
std::move(socket), Socket::side_t::acceptor,
peer_addr.get_port(), Socket::construct_tag{});
- logger().debug("FixedCPUServerSocket({})::accept(): "
+ logger().debug("ShardedServerSocket({})::accept(): "
"accepted peer {}, socket {}",
ss.listen_addr, peer_addr, fmt::ptr(_socket));
std::ignore = seastar::with_gate(
} catch (std::exception &e) {
e_what = e.what();
}
- logger().error("FixedCPUServerSocket({})::accept(): "
+ logger().error("ShardedServerSocket({})::accept(): "
"fn_accept(s, {}) got unexpected exception {}",
ss.listen_addr, peer_addr, e_what);
ceph_abort();
}).handle_exception_type([&ss](const std::system_error& e) {
if (e.code() == std::errc::connection_aborted ||
e.code() == std::errc::invalid_argument) {
- logger().debug("FixedCPUServerSocket({})::accept(): stopped ({})",
+ logger().debug("ShardedServerSocket({})::accept(): stopped ({})",
ss.listen_addr, e.what());
} else {
throw;
} catch (std::exception &e) {
e_what = e.what();
}
- logger().error("FixedCPUServerSocket({})::accept(): "
+ logger().error("ShardedServerSocket({})::accept(): "
"got unexpected exception {}", ss.listen_addr, e_what);
ceph_abort();
});
});
}
+template <bool IS_FIXED_CPU>
seastar::future<>
-FixedCPUServerSocket::shutdown_destroy()
+SERVER_SOCKET::shutdown_destroy()
{
- assert(seastar::this_shard_id() == fixed_cpu);
- logger().debug("FixedCPUServerSocket({})::shutdown_destroy()...", listen_addr);
+ assert(seastar::this_shard_id() == primary_sid);
+ logger().debug("ShardedServerSocket({})::shutdown_destroy()...", listen_addr);
// shutdown shards
- return container().invoke_on_all([](auto& ss) {
+ return this->container().invoke_on_all([](auto& ss) {
if (ss.listener) {
ss.listener->abort_accept();
}
return ss.shutdown_gate.close();
}).then([this] {
// destroy shards
- return container().invoke_on_all([](auto& ss) {
+ return this->container().invoke_on_all([](auto& ss) {
assert(ss.shutdown_gate.is_closed());
ss.listen_addr = entity_addr_t();
ss.listener.reset();
});
}).then([this] {
// stop the sharded service: we should only construct/stop shards on #0
- return container().invoke_on(0, [](auto& ss) {
+ return this->container().invoke_on(0, [](auto& ss) {
assert(ss.service);
return ss.service->stop().finally([cleanup = std::move(ss.service)] {});
});
});
}
-seastar::future<FixedCPUServerSocket*>
-FixedCPUServerSocket::create()
+template <bool IS_FIXED_CPU>
+seastar::future<SERVER_SOCKET*>
+SERVER_SOCKET::create()
{
- auto fixed_cpu = seastar::this_shard_id();
+ auto primary_sid = seastar::this_shard_id();
// start the sharded service: we should only construct/stop shards on #0
- return seastar::smp::submit_to(0, [fixed_cpu] {
+ return seastar::smp::submit_to(0, [primary_sid] {
auto service = std::make_unique<sharded_service_t>();
- return service->start(fixed_cpu, construct_tag{}
+ return service->start(primary_sid, construct_tag{}
).then([service = std::move(service)]() mutable {
auto p_shard = service.get();
p_shard->local().service = std::move(service);
});
}
+template class ShardedServerSocket<true>;
+template class ShardedServerSocket<false>;
+
} // namespace crimson::net
socket_blocker* blocker = nullptr;
#endif
- friend class FixedCPUServerSocket;
+ friend class ShardedServerSocket;
};
using listen_ertr = crimson::errorator<
crimson::ct_error::address_not_available // https://techoverflow.net/2021/08/06/how-i-fixed-python-oserror-errno-99-cannot-assign-requested-address/
>;
-class FixedCPUServerSocket
- : public seastar::peering_sharded_service<FixedCPUServerSocket> {
+template <bool IS_FIXED_CPU>
+class ShardedServerSocket
+ : public seastar::peering_sharded_service<ShardedServerSocket<IS_FIXED_CPU> > {
struct construct_tag {};
public:
- FixedCPUServerSocket(seastar::shard_id cpu, construct_tag);
+ ShardedServerSocket(seastar::shard_id sid, construct_tag);
- ~FixedCPUServerSocket();
+ ~ShardedServerSocket();
- FixedCPUServerSocket(FixedCPUServerSocket&&) = delete;
- FixedCPUServerSocket(const FixedCPUServerSocket&) = delete;
- FixedCPUServerSocket& operator=(const FixedCPUServerSocket&) = delete;
+ ShardedServerSocket(ShardedServerSocket&&) = delete;
+ ShardedServerSocket(const ShardedServerSocket&) = delete;
+ ShardedServerSocket& operator=(const ShardedServerSocket&) = delete;
listen_ertr::future<> listen(entity_addr_t addr);
seastar::future<> shutdown_destroy();
- static seastar::future<FixedCPUServerSocket*> create();
+ static seastar::future<ShardedServerSocket*> create();
private:
- const seastar::shard_id fixed_cpu;
+ // the fixed CPU if IS_FIXED_CPU is true
+ const seastar::shard_id primary_sid;
entity_addr_t listen_addr;
std::optional<seastar::server_socket> listener;
seastar::gate shutdown_gate;
accept_func_t fn_accept;
- using sharded_service_t = seastar::sharded<FixedCPUServerSocket>;
+ using sharded_service_t = seastar::sharded<ShardedServerSocket>;
std::unique_ptr<sharded_service_t> service;
};
using seastar::engine;
using seastar::future;
using crimson::net::error;
-using crimson::net::FixedCPUServerSocket;
using crimson::net::listen_ertr;
+using crimson::net::ShardedServerSocket;
using crimson::net::Socket;
using crimson::net::SocketRef;
using crimson::net::stop_t;
});
}
+template <bool IS_FIXED_CPU>
future<> test_bind_same() {
logger().info("test_bind_same()...");
- return FixedCPUServerSocket::create().then([](auto pss1) {
+ return ShardedServerSocket<IS_FIXED_CPU>::create().then([](auto pss1) {
auto saddr = get_server_addr();
return pss1->listen(saddr).safe_then([saddr] {
// try to bind the same address
- return FixedCPUServerSocket::create(
+ return ShardedServerSocket<IS_FIXED_CPU>::create(
).then([saddr](auto pss2) {
return pss2->listen(saddr).safe_then([] {
logger().error("test_bind_same() should raise address_in_use");
});
}
+template <bool IS_FIXED_CPU>
future<> test_accept() {
logger().info("test_accept()");
- return FixedCPUServerSocket::create(
+ return ShardedServerSocket<IS_FIXED_CPU>::create(
).then([](auto pss) {
auto saddr = get_server_addr();
return pss->listen(saddr
});
}
+template <bool IS_FIXED_CPU>
class SocketFactory {
static constexpr seastar::shard_id CLIENT_CPU = 0u;
SocketRef client_socket;
seastar::promise<> server_connected;
static constexpr seastar::shard_id SERVER_CPU = 1u;
- FixedCPUServerSocket *pss = nullptr;
+ ShardedServerSocket<IS_FIXED_CPU> *pss = nullptr;
+
+ seastar::shard_id server_socket_CPU;
SocketRef server_socket;
public:
auto psf = owner.get();
auto saddr = get_server_addr();
return seastar::smp::submit_to(SERVER_CPU, [psf, saddr] {
- return FixedCPUServerSocket::create(
+ return ShardedServerSocket<IS_FIXED_CPU>::create(
).then([psf, saddr](auto pss) {
psf->pss = pss;
return pss->listen(saddr
return psf->pss->accept([psf](auto socket, auto paddr) {
logger().info("dispatch_sockets(): accepted at shard {}",
seastar::this_shard_id());
- ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
+ psf->server_socket_CPU = seastar::this_shard_id();
+ if constexpr (IS_FIXED_CPU) {
+ ceph_assert_always(SERVER_CPU == seastar::this_shard_id());
+ }
psf->server_socket = std::move(socket);
return seastar::smp::submit_to(CLIENT_CPU, [psf] {
psf->server_connected.set_value();
ceph_abort();
});
}),
- seastar::smp::submit_to(SERVER_CPU,
+ seastar::smp::submit_to(psf->server_socket_CPU,
[socket = psf->server_socket.get(), cb_server = std::move(cb_server)] {
return cb_server(socket).then([socket] {
logger().debug("closing server socket...");
}
};
+template <bool IS_FIXED_CPU>
future<> test_read_write() {
logger().info("test_read_write()...");
- return SocketFactory::dispatch_sockets(
+ return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
[](auto cs) { return Connection::dispatch_rw_bounded(cs, 128); },
[](auto ss) { return Connection::dispatch_rw_bounded(ss, 128); }
).then([] {
});
}
+template <bool IS_FIXED_CPU>
future<> test_unexpected_down() {
logger().info("test_unexpected_down()...");
- return SocketFactory::dispatch_sockets(
+ return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
[](auto cs) {
return Connection::dispatch_rw_bounded(cs, 128, true
).handle_exception_type([](const std::system_error& e) {
});
}
+template <bool IS_FIXED_CPU>
future<> test_shutdown_propagated() {
logger().info("test_shutdown_propagated()...");
- return SocketFactory::dispatch_sockets(
+ return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
[](auto cs) {
logger().debug("test_shutdown_propagated() shutdown client socket");
cs->shutdown();
});
}
+template <bool IS_FIXED_CPU>
future<> test_preemptive_down() {
logger().info("test_preemptive_down()...");
- return SocketFactory::dispatch_sockets(
+ return SocketFactory<IS_FIXED_CPU>::dispatch_sockets(
[](auto cs) { return Connection::dispatch_rw_unbounded(cs, true); },
[](auto ss) { return Connection::dispatch_rw_unbounded(ss); }
).then([] {
});
}
+template <bool IS_FIXED_CPU>
+future<> do_test_with_type() {
+ return test_bind_same<IS_FIXED_CPU>(
+ ).then([] {
+ return test_accept<IS_FIXED_CPU>();
+ }).then([] {
+ return test_read_write<IS_FIXED_CPU>();
+ }).then([] {
+ return test_unexpected_down<IS_FIXED_CPU>();
+ }).then([] {
+ return test_shutdown_propagated<IS_FIXED_CPU>();
+ }).then([] {
+ return test_preemptive_down<IS_FIXED_CPU>();
+ });
+}
+
}
seastar::future<int> do_test(seastar::app_template& app)
}).then([] {
return test_refused();
}).then([] {
- return test_bind_same();
- }).then([] {
- return test_accept();
- }).then([] {
- return test_read_write();
- }).then([] {
- return test_unexpected_down();
- }).then([] {
- return test_shutdown_propagated();
+ return do_test_with_type<true>();
}).then([] {
- return test_preemptive_down();
+ return do_test_with_type<false>();
}).then([] {
logger().info("All tests succeeded");
// Seastar has bugs to have events undispatched during shutdown,