set(crimson_net_srcs
net/Dispatcher.cc
net/Errors.cc
+ net/Messenger.cc
net/SocketConnection.cc
net/SocketMessenger.cc
net/Socket.cc)
thread/Throttle.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
- ${crimson_mon_srcs}
+ # TODO: fix crimson_mon_client with the new design
+ # ${crimson_mon_srcs}
${crimson_net_srcs}
${crimson_thread_srcs}
${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
#pragma once
#include <queue>
-#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/future.hh>
+#include <seastar/core/shared_ptr.hh>
#include "Fwd.h"
using seq_num_t = uint64_t;
-class Connection : public boost::intrusive_ref_counter<Connection,
- boost::thread_unsafe_counter> {
+class Connection : public seastar::enable_shared_from_this<Connection> {
protected:
entity_addr_t peer_addr;
peer_type_t peer_type = -1;
virtual int get_peer_type() const = 0;
/// true if the handshake has completed and no errors have been encountered
- virtual bool is_connected() = 0;
+ virtual seastar::future<bool> is_connected() = 0;
/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageRef msg) = 0;
/// close the connection and cancel any pending futures from read/send
virtual seastar::future<> close() = 0;
+ /// which shard id the connection lives
+ virtual seastar::shard_id shard_id() const = 0;
+
virtual void print(ostream& out) const = 0;
};
#pragma once
#include <seastar/core/future.hh>
+#include <seastar/core/sharded.hh>
#include "Fwd.h"
}
virtual seastar::future<std::unique_ptr<AuthAuthorizer>>
ms_get_authorizer(peer_type_t);
+
+ // get the local dispatcher shard if it is accessed by another core
+ virtual Dispatcher* get_local_shard() {
+ return this;
+ }
};
} // namespace ceph::net
#pragma once
-#include <boost/intrusive_ptr.hpp>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
#include "msg/msg_types.h"
#include "msg/Message.h"
using msgr_tag_t = uint8_t;
class Connection;
-using ConnectionRef = boost::intrusive_ptr<Connection>;
+using ConnectionRef = seastar::shared_ptr<Connection>;
+// NOTE: ConnectionXRef should only be used in seastar world, because
+// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads.
+using ConnectionXRef = seastar::lw_shared_ptr<seastar::foreign_ptr<ConnectionRef>>;
class Dispatcher;
class Messenger;
+template <typename T, typename... Args>
+seastar::future<T*> create_sharded(Args... args) {
+ auto sharded_obj = seastar::make_lw_shared<seastar::sharded<T>>();
+ return sharded_obj->start(args...).then([sharded_obj]() {
+ auto ret = &sharded_obj->local();
+ seastar::engine().at_exit([sharded_obj]() {
+ return sharded_obj->stop().finally([sharded_obj] {});
+ });
+ return ret;
+ });
+}
+
} // namespace ceph::net
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "Messenger.h"
+#include "SocketMessenger.h"
+
+namespace ceph::net {
+
+seastar::future<Messenger*>
+Messenger::create(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce)
+{
+ return create_sharded<SocketMessenger>(name, lname, nonce)
+ .then([](Messenger *msgr) {
+ return msgr;
+ });
+}
+
+} // namespace ceph::net
const entity_name_t& get_myname() const { return my_name; }
const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
entity_addr_t get_myaddr() const { return my_addrs.front(); }
- virtual void set_myaddrs(const entity_addrvec_t& addrs) {
+ virtual seastar::future<> set_myaddrs(const entity_addrvec_t& addrs) {
my_addrs = addrs;
+ return seastar::now();
}
/// bind to the given address
- virtual void bind(const entity_addrvec_t& addr) = 0;
+ virtual seastar::future<> bind(const entity_addrvec_t& addr) = 0;
/// try to bind to the first unused port of given address
- virtual void try_bind(const entity_addrvec_t& addr,
- uint32_t min_port, uint32_t max_port) = 0;
+ virtual seastar::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) = 0;
/// start the messenger
virtual seastar::future<> start(Dispatcher *dispatcher) = 0;
/// either return an existing connection to the peer,
/// or a new pending connection
- virtual ConnectionRef connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) = 0;
+ virtual seastar::future<ConnectionXRef>
+ connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) = 0;
+
+ // wait for messenger shutdown
+ virtual seastar::future<> wait() = 0;
/// stop listening and wait for all connections to close. safe to destruct
/// after this future becomes available
crc_flags |= MSG_CRC_HEADER;
}
+ // get the local messenger shard if it is accessed by another core
+ virtual Messenger* get_local_shard() {
+ return this;
+ }
+
virtual void print(ostream& out) const = 0;
+
+ static seastar::future<Messenger*>
+ create(const entity_name_t& name, const std::string& lname, const uint64_t nonce);
};
inline ostream& operator<<(ostream& out, const Messenger& msgr) {
class Socket
{
+ const seastar::shard_id sid;
seastar::connected_socket socket;
seastar::input_stream<char> in;
seastar::output_stream<char> out;
public:
explicit Socket(seastar::connected_socket&& _socket)
- : socket(std::move(_socket)),
+ : sid{seastar::engine().cpu_id()},
+ socket(std::move(_socket)),
in(socket.input()),
out(socket.output()) {}
- Socket(Socket&& o) = default;
+ Socket(Socket&& o) = delete;
/// read the requested number of bytes into a bufferlist
seastar::future<bufferlist> read(size_t bytes);
/// Socket can only be closed once.
seastar::future<> close() {
- return seastar::when_all(in.close(), out.close()).discard_result();
+ return seastar::smp::submit_to(sid, [this] {
+ return seastar::when_all(
+ in.close(), out.close()).discard_result();
+ });
}
};
dispatcher(dispatcher),
send_ready(h.promise.get_future())
{
+ ceph_assert(&messenger.container().local() == &messenger);
}
SocketConnection::~SocketConnection()
return &messenger;
}
-bool SocketConnection::is_connected()
+seastar::future<bool> SocketConnection::is_connected()
{
- return !send_ready.failed();
+ return seastar::smp::submit_to(shard_id(), [this] {
+ return !send_ready.failed();
+ });
}
seastar::future<> SocketConnection::send(MessageRef msg)
{
- if (state == state_t::closing)
- return seastar::now();
- return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
- return do_send(std::move(msg))
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} send fault: {}", *this, eptr);
- close();
+ return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
+ if (state == state_t::closing)
+ return seastar::now();
+ return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+ return do_send(std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} send fault: {}", *this, eptr);
+ close();
+ });
});
});
}
seastar::future<> SocketConnection::keepalive()
{
- if (state == state_t::closing)
- return seastar::now();
- return seastar::with_gate(pending_dispatch, [this] {
- return do_keepalive()
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} keepalive fault: {}", *this, eptr);
- close();
+ return seastar::smp::submit_to(shard_id(), [this] {
+ if (state == state_t::closing)
+ return seastar::now();
+ return seastar::with_gate(pending_dispatch, [this] {
+ return do_keepalive()
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} keepalive fault: {}", *this, eptr);
+ close();
+ });
});
});
}
+seastar::future<> SocketConnection::close()
+{
+ return seastar::smp::submit_to(shard_id(), [this] {
+ return do_close();
+ });
+}
+
seastar::future<> SocketConnection::handle_tags()
{
return seastar::keep_doing([this] {
}
constexpr bool add_ref = false; // Message starts with 1 ref
+ // TODO: change MessageRef with foreign_ptr
auto msg_ref = MessageRef{msg, add_ref};
// start dispatch, ignoring exceptions from the application layer
seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
- return dispatcher.ms_dispatch(this, std::move(msg))
+ return dispatcher.ms_dispatch(
+ seastar::static_pointer_cast<SocketConnection>(shared_from_this()),
+ std::move(msg))
.handle_exception([this] (std::exception_ptr eptr) {
logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
ceph_assert(false);
return f.get_future();
}
-seastar::future<> SocketConnection::close()
+seastar::future<> SocketConnection::do_close()
{
if (state == state_t::closing) {
// already closing
}
// unregister_conn() drops a reference, so hold another until completion
- auto cleanup = [conn = SocketConnectionRef(this)] {};
+ auto cleanup = [conn_ref = shared_from_this(), this] {
+ logger().debug("{} closed!", *this);
+ };
if (state == state_t::accepting) {
- messenger.unaccept_conn(this);
+ messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
} else if (state >= state_t::connecting && state < state_t::closing) {
- messenger.unregister_conn(this);
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
} else {
// cannot happen
ceph_assert(false);
ceph_assert(!socket);
peer_addr = _peer_addr;
peer_type = _peer_type;
- messenger.register_conn(this);
+ messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
state = state_t::connecting;
seastar::with_gate(pending_dispatch, [this] {
fd.shutdown_output();
throw std::system_error(make_error_code(error::connection_aborted));
}
- socket.emplace(std::move(fd));
+ socket = seastar::make_foreign(std::make_unique<Socket>(std::move(fd)));
// read server's handshake header
return socket->read(server_header_size);
}).then([this] (bufferlist headerbl) {
side = side_t::connector;
socket_port = caddr.get_port();
- messenger.learned_addr(caddr);
-
+ return messenger.learned_addr(caddr);
+ }).then([this] {
// encode/send client's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
});
}).then([this] {
// notify the dispatcher and allow them to reject the connection
- return dispatcher.ms_handle_connect(this);
+ return dispatcher.ms_handle_connect(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
}).then([this] {
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
}
void
-SocketConnection::start_accept(seastar::connected_socket&& fd,
+SocketConnection::start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& sock,
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::none);
peer_addr.set_port(0);
side = side_t::acceptor;
socket_port = _peer_addr.get_port();
- socket.emplace(std::move(fd));
- messenger.accept_conn(this);
+ socket = std::move(sock);
+ messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
state = state_t::accepting;
seastar::with_gate(pending_dispatch, [this, _peer_addr] {
});
}).then([this] {
// notify the dispatcher and allow them to reject the connection
- return dispatcher.ms_handle_accept(this);
+ return dispatcher.ms_handle_accept(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
}).then([this] {
- messenger.register_conn(this);
- messenger.unaccept_conn(this);
+ messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
+ messenger.unaccept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
execute_open();
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} open fault: {}", *this, e);
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
- return dispatcher.ms_handle_reset(this)
+ return dispatcher.ms_handle_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
.then([this] {
close();
});
} else if (e.code() == error::read_eof) {
- return dispatcher.ms_handle_remote_reset(this)
+ return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast<SocketConnection>(shared_from_this()))
.then([this] {
close();
});
seastar::future<> SocketConnection::fault()
{
if (policy.lossy) {
- messenger.unregister_conn(this);
+ messenger.unregister_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
}
if (h.backoff.count()) {
h.backoff += h.backoff;
return seastar::sleep(h.backoff);
}
+seastar::shard_id SocketConnection::shard_id() const {
+ return messenger.shard_id();
+}
+
void SocketConnection::print(ostream& out) const {
messenger.print(out);
if (side == side_t::none) {
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/shared_future.hh>
+#include <seastar/core/sharded.hh>
#include "msg/Policy.h"
#include "Connection.h"
class SocketMessenger;
class SocketConnection;
-using SocketConnectionRef = boost::intrusive_ptr<SocketConnection>;
+using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
class SocketConnection : public Connection {
SocketMessenger& messenger;
- std::optional<Socket> socket;
+ seastar::foreign_ptr<std::unique_ptr<Socket>> socket;
Dispatcher& dispatcher;
seastar::gate pending_dispatch;
seastar::future<> do_send(MessageRef msg);
seastar::future<> do_keepalive();
+ seastar::future<> do_close();
public:
SocketConnection(SocketMessenger& messenger,
return peer_type;
}
- bool is_connected() override;
+ seastar::future<bool> is_connected() override;
seastar::future<> send(MessageRef msg) override;
seastar::future<> close() override;
+ seastar::shard_id shard_id() const override;
+
void print(ostream& out) const override;
public:
const entity_type_t& peer_type);
/// start a handshake from the server's perspective,
/// only call when SocketConnection first construct
- void start_accept(seastar::connected_socket&& socket,
+ void start_accept(seastar::foreign_ptr<std::unique_ptr<Socket>>&& socket,
const entity_addr_t& peer_addr);
/// the number of connections initiated in this session, increment when a
#include "SocketMessenger.h"
#include <tuple>
+#include <boost/functional/hash.hpp>
#include "auth/Auth.h"
#include "Errors.h"
#include "Dispatcher.h"
+#include "Socket.h"
using namespace ceph::net;
SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce)
- : Messenger{myname}, logic_name{logic_name}, nonce{nonce}
+ : Messenger{myname},
+ sid{seastar::engine().cpu_id()},
+ logic_name{logic_name},
+ nonce{nonce}
{}
-void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
auto my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
}
- // TODO: propagate to all the cores of the Messenger
- Messenger::set_myaddrs(my_addrs);
+ return container().invoke_on_all([my_addrs](auto& msgr) {
+ return msgr.Messenger::set_myaddrs(my_addrs);
+ });
}
-void SocketMessenger::bind(const entity_addrvec_t& addrs)
+seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
{
- // TODO: v2: listen on multiple addresses
- auto addr = addrs.legacy_addr();
-
- if (addr.get_family() != AF_INET) {
- throw std::system_error(EAFNOSUPPORT, std::generic_category());
+ ceph_assert(addrs.legacy_addr().get_family() == AF_INET);
+ auto my_addrs = addrs;
+ for (auto& addr : my_addrs.v) {
+ addr.nonce = nonce;
}
-
- set_myaddrs(addrs);
- logger().info("listening on {}", addr);
- seastar::socket_address address(addr.in4_addr());
- seastar::listen_options lo;
- lo.reuse_address = true;
- listener = seastar::listen(address, lo);
+ logger().info("listening on {}", my_addrs.legacy_addr().in4_addr());
+ return container().invoke_on_all([my_addrs](auto& msgr) {
+ msgr.do_bind(my_addrs);
+ });
}
-void SocketMessenger::try_bind(const entity_addrvec_t& addrs,
- uint32_t min_port, uint32_t max_port)
+seastar::future<>
+SocketMessenger::try_bind(const entity_addrvec_t& addrs,
+ uint32_t min_port, uint32_t max_port)
{
auto addr = addrs.legacy_or_front_addr();
if (addr.get_port() != 0) {
return bind(addrs);
}
- for (auto port = min_port; port <= max_port; port++) {
- try {
- addr.set_port(port);
- bind(entity_addrvec_t{addr});
- logger().info("{}: try_bind: done", *this);
- return;
- } catch (const std::system_error& e) {
- logger().debug("{}: try_bind: {} already used", *this, port);
- if (port == max_port) {
- throw;
- }
- }
- }
+ ceph_assert(min_port <= max_port);
+ return seastar::do_with(uint32_t(min_port),
+ [this, max_port, addr] (auto& port) {
+ return seastar::repeat([this, max_port, addr, &port] {
+ auto to_bind = addr;
+ to_bind.set_port(port);
+ return bind(entity_addrvec_t{to_bind})
+ .then([this] {
+ logger().info("{}: try_bind: done", *this);
+ return stop_t::yes;
+ }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
+ logger().debug("{}: try_bind: {} already used", *this, port);
+ if (port == max_port) {
+ throw e;
+ }
+ ++port;
+ return stop_t::no;
+ });
+ });
+ });
+}
+
+seastar::future<> SocketMessenger::start(Dispatcher *disp) {
+ return container().invoke_on_all([disp](auto& msgr) {
+ return msgr.do_start(disp->get_local_shard());
+ });
+}
+
+seastar::future<ceph::net::ConnectionXRef>
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+{
+ auto shard = locate_shard(peer_addr);
+ return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
+ return msgr.do_connect(peer_addr, peer_type);
+ }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) {
+ return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn));
+ });
}
-seastar::future<> SocketMessenger::start(Dispatcher *disp)
+seastar::future<> SocketMessenger::shutdown()
+{
+ return container().invoke_on_all([](auto& msgr) {
+ return msgr.do_shutdown();
+ }).finally([this] {
+ return container().invoke_on_all([](auto& msgr) {
+ msgr.shutdown_promise.set_value();
+ });
+ });
+}
+
+void SocketMessenger::do_bind(const entity_addrvec_t& addrs)
+{
+ Messenger::set_myaddrs(addrs);
+
+ // TODO: v2: listen on multiple addresses
+ seastar::socket_address address(addrs.legacy_addr().in4_addr());
+ seastar::listen_options lo;
+ lo.reuse_address = true;
+ listener = seastar::listen(address, lo);
+}
+
+seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
{
dispatcher = disp;
// allocate the connection
entity_addr_t peer_addr;
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
+ auto shard = locate_shard(peer_addr);
+#warning fixme
+ // we currently do dangerous i/o from a Connection core, different from the Socket core.
+ auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
// don't wait before accepting another
- conn->start_accept(std::move(socket), peer_addr);
+ container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
+ conn->start_accept(std::move(sock), peer_addr);
+ });
});
}).handle_exception_type([this] (const std::system_error& e) {
// stop gracefully on connection_aborted
return seastar::now();
}
-ceph::net::ConnectionRef
-SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
+seastar::foreign_ptr<ceph::net::ConnectionRef>
+SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
if (auto found = lookup_conn(peer_addr); found) {
- return found;
+ return seastar::make_foreign(found->shared_from_this());
}
- SocketConnectionRef conn = new SocketConnection(*this, *dispatcher);
+ SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher);
conn->start_connect(peer_addr, peer_type);
- return conn;
+ return seastar::make_foreign(conn->shared_from_this());
}
-seastar::future<> SocketMessenger::shutdown()
+seastar::future<> SocketMessenger::do_shutdown()
{
if (listener) {
listener->abort_accept();
});
}
-void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
+seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
{
if (!get_myaddr().is_blank_ip()) {
    // already learned or bound
- return;
+ return seastar::now();
}
// Only learn IP address if blank.
addr.u = peer_addr_for_me.u;
addr.set_type(peer_addr_for_me.get_type());
addr.set_port(get_myaddr().get_port());
- set_myaddrs(entity_addrvec_t{addr});
+ return set_myaddrs(entity_addrvec_t{addr});
}
void SocketMessenger::set_default_policy(const SocketPolicy& p)
policy_set.set_throttlers(peer_type, throttle, nullptr);
}
+seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
+{
+ ceph_assert(addr.get_family() == AF_INET);
+ std::size_t seed = 0;
+ boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
+ //boost::hash_combine(seed, addr.u.sin.sin_port);
+ //boost::hash_combine(seed, addr.nonce);
+ return seed % seastar::smp::count;
+}
+
ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
{
if (auto found = connections.find(addr);
#include <set>
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
+#include <seastar/core/sharded.hh>
#include "msg/Policy.h"
#include "Messenger.h"
using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
-class SocketMessenger final : public Messenger {
+class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
+ const seastar::shard_id sid;
+ seastar::promise<> shutdown_promise;
+
std::optional<seastar::server_socket> listener;
Dispatcher *dispatcher = nullptr;
std::map<entity_addr_t, SocketConnectionRef> connections;
seastar::future<> accept(seastar::connected_socket socket,
seastar::socket_address paddr);
+ void do_bind(const entity_addrvec_t& addr);
+ seastar::future<> do_start(Dispatcher *disp);
+ seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
+ seastar::future<> do_shutdown();
+ // conn sharding options:
+ // 1. Simplest: sharded by ip only
+ // 2. Balanced: sharded by ip + port + nonce,
+ // but, need to move SocketConnection between cores.
+ seastar::shard_id locate_shard(const entity_addr_t& addr);
+
public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce);
- void set_myaddrs(const entity_addrvec_t& addr) override;
+ seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;
- void bind(const entity_addrvec_t& addr) override;
+ // Messenger interfaces are assumed to be called from its own shard, but its
+ // behavior should be symmetric when called from any shard.
+ seastar::future<> bind(const entity_addrvec_t& addr) override;
- void try_bind(const entity_addrvec_t& addr,
- uint32_t min_port, uint32_t max_port) override;
+ seastar::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port) override;
seastar::future<> start(Dispatcher *dispatcher) override;
- ConnectionRef connect(const entity_addr_t& peer_addr,
- const entity_type_t& peer_type) override;
+ seastar::future<ConnectionXRef> connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type) override;
+ // can only wait once
+ seastar::future<> wait() override {
+ return shutdown_promise.get_future();
+ }
seastar::future<> shutdown() override;
+ Messenger* get_local_shard() override {
+ return &container().local();
+ }
+
void print(ostream& out) const override {
out << get_myname()
<< "(" << logic_name
}
public:
- void learned_addr(const entity_addr_t &peer_addr_for_me);
+ seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);
void unaccept_conn(SocketConnectionRef);
void register_conn(SocketConnectionRef);
void unregister_conn(SocketConnectionRef);
+
+ // required by sharded<>
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+
+ seastar::shard_id shard_id() const {
+ return sid;
+ }
};
} // namespace ceph::net
target_link_libraries(unittest_seastar_denc crimson GTest::Main)
add_executable(unittest_seastar_messenger test_messenger.cc)
-#add_ceph_unittest(unittest_seastar_messenger)
+add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common crimson)
-add_executable(unittest_seastar_echo
- test_alien_echo.cc)
-target_link_libraries(unittest_seastar_echo ceph-common global crimson)
+# TODO: fix unittest_seastar_echo with the new design
+#add_executable(unittest_seastar_echo
+# test_alien_echo.cc)
+#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
add_executable(unittest_seastar_thread_pool
test_thread_pool.cc)
test_config.cc)
target_link_libraries(unittest_seastar_config crimson)
-add_executable(unittest_seastar_monc
- test_monc.cc)
-target_link_libraries(unittest_seastar_monc crimson)
+# TODO: fix unittest_seastar_monc with the new design
+#add_executable(unittest_seastar_monc
+# test_monc.cc)
+#target_link_libraries(unittest_seastar_monc crimson)
add_executable(unittest_seastar_perfcounters
test_perfcounters.cc)
#include "messages/MPing.h"
+#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
-#include "crimson/net/SocketMessenger.h"
+#include "crimson/net/Messenger.h"
+#include <map>
#include <random>
#include <boost/program_options.hpp>
#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/reactor.hh>
+#include <seastar/core/sleep.hh>
namespace bpo = boost::program_options;
+namespace {
+
+seastar::logger& logger() {
+ return ceph::get_logger(ceph_subsys_ms);
+}
+
static std::random_device rd;
static std::default_random_engine rng{rd()};
static bool verbose = false;
double keepalive_ratio)
{
struct test_state {
- entity_addr_t addr;
-
- struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1};
- struct ServerDispatcher : ceph::net::Dispatcher {
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
- MessageRef m) override {
- if (verbose) {
- std::cout << "server got " << *m << std::endl;
- }
- // reply with a pong
- return c->send(MessageRef{new MPing(), false});
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ if (verbose) {
+ logger().info("server got {}", *m);
}
- } dispatcher;
- } server;
+ // reply with a pong
+ return c->send(MessageRef{new MPing(), false});
+ }
- struct {
- unsigned rounds;
- std::bernoulli_distribution keepalive_dist{};
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2};
- struct ClientDispatcher : ceph::net::Dispatcher {
- seastar::promise<MessageRef> reply;
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ auto&& fut = ceph::net::Messenger::create(name, lname, nonce);
+ return fut.then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+
+ struct PingSession : public seastar::enable_shared_from_this<PingSession> {
unsigned count = 0u;
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
- MessageRef m) override {
- ++count;
- if (verbose) {
- std::cout << "client ms_dispatch " << count << std::endl;
- }
- reply.set_value(std::move(m));
+ };
+ using PingSessionRef = seastar::shared_ptr<PingSession>;
+
+ unsigned rounds;
+ std::bernoulli_distribution keepalive_dist;
+ ceph::net::Messenger *msgr = nullptr;
+ std::map<ceph::net::Connection*, seastar::promise<>> pending_conns;
+ std::map<ceph::net::ConnectionRef, PingSessionRef> sessions;
+
+ Client(unsigned rounds, double keepalive_ratio)
+ : rounds(rounds),
+ keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {}
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::now();
+ }
+ seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override {
+ logger().info("{}: connected to {}", *conn, conn->get_peer_addr());
+ auto session = seastar::make_shared<PingSession>();
+ auto [i, added] = sessions.emplace(conn, session);
+ std::ignore = i;
+ ceph_assert(added);
+ return container().invoke_on_all([conn = conn.get()](auto& client) {
+ auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>());
+ std::ignore = i;
+ ceph_assert(added);
+ });
+ }
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ auto found = sessions.find(c);
+ if (found == sessions.end()) {
+ ceph_assert(false);
+ }
+ auto session = found->second;
+ ++(session->count);
+ if (verbose) {
+ logger().info("client ms_dispatch {}", session->count);
+ }
+
+ if (session->count == rounds) {
+ logger().info("{}: finished receiving {} pongs", *c.get(), session->count);
+ return container().invoke_on_all([conn = c.get()](auto &client) {
+ auto found = client.pending_conns.find(conn);
+ ceph_assert(found != client.pending_conns.end());
+ found->second.set_value();
+ });
+ } else {
return seastar::now();
}
- } dispatcher;
- seastar::future<> pingpong(ceph::net::ConnectionRef c) {
- return seastar::repeat([conn=std::move(c), this] {
- if (keepalive_dist(rng)) {
- return conn->keepalive().then([] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::no);
+ }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
});
- } else {
- return conn->send(MessageRef{new MPing(), false}).then([&] {
- return dispatcher.reply.get_future();
- }).then([&] (MessageRef msg) {
- dispatcher.reply = seastar::promise<MessageRef>{};
- if (verbose) {
- std::cout << "client got reply " << *msg << std::endl;
- }
- return seastar::make_ready_future<seastar::stop_iteration>(
- seastar::stop_iteration::yes);
- });
- };
- });
+ });
}
- bool done() const {
- return dispatcher.count >= rounds;
+
+ seastar::future<> shutdown() {
+ ceph_assert(msgr);
+ return msgr->shutdown();
}
- } client;
- };
- return seastar::do_with(test_state{},
- [rounds, keepalive_ratio] (test_state& t) {
- // bind the server
- t.addr.set_type(entity_addr_t::TYPE_LEGACY);
- t.addr.set_family(AF_INET);
- t.addr.set_port(9010);
- t.addr.set_nonce(1);
- t.server.messenger.bind(entity_addrvec_t{t.addr});
-
- t.client.rounds = rounds;
- t.client.keepalive_dist = std::bernoulli_distribution{keepalive_ratio};
-
- return t.server.messenger.start(&t.server.dispatcher)
- .then([&] {
- return t.client.messenger.start(&t.client.dispatcher)
- .then([&] {
- return t.client.messenger.connect(t.addr,
- entity_name_t::TYPE_OSD);
- }).then([&client=t.client] (ceph::net::ConnectionRef conn) {
- if (verbose) {
- std::cout << "client connected" << std::endl;
- }
- return seastar::repeat([&client,conn=std::move(conn)] {
- return client.pingpong(conn).then([&client] {
- return seastar::make_ready_future<seastar::stop_iteration>(
- client.done() ?
- seastar::stop_iteration::yes :
- seastar::stop_iteration::no);
- });
+
+ seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) {
+ return msgr->connect(peer_addr, entity_name_t::TYPE_OSD)
+ .then([this, foreign_dispatch](auto conn) {
+ if (foreign_dispatch) {
+ return do_dispatch_pingpong(&**conn)
+ .finally([this, conn] {});
+ } else {
+ // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong().
+ return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) {
+ return client.do_dispatch_pingpong(conn);
+ }).finally([this, conn] {});
+ }
+ });
+ }
+
+ private:
+ seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) {
+ return seastar::do_with(0u, 0u,
+ [this, conn](auto &count_ping, auto &count_keepalive) {
+ return seastar::do_until(
+ [this, conn, &count_ping, &count_keepalive] {
+ bool stop = (count_ping == rounds);
+ if (stop) {
+ logger().info("{}: finished sending {} pings with {} keepalives",
+ *conn, count_ping, count_keepalive);
+ }
+ return stop;
+ },
+ [this, conn, &count_ping, &count_keepalive] {
+ return seastar::repeat([this, conn, &count_ping, &count_keepalive] {
+ if (keepalive_dist(rng)) {
+ count_keepalive += 1;
+ return conn->keepalive()
+ .then([&count_keepalive] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ } else {
+ count_ping += 1;
+ return conn->send(MessageRef{new MPing(), false})
+ .then([] {
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+ });
+ }
+ });
+ }).then([this, conn] {
+ auto found = pending_conns.find(conn);
+ if (found == pending_conns.end())
+ throw std::runtime_error{"Not connected."};
+ return found->second.get_future();
});
- }).finally([&] {
- if (verbose) {
- std::cout << "client shutting down" << std::endl;
- }
- return t.client.messenger.shutdown();
- });
- }).finally([&] {
- if (verbose) {
- std::cout << "server shutting down" << std::endl;
- }
- return t.server.messenger.shutdown();
+ });
+ }
+ };
+ };
+
+ logger().info("test_echo():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio),
+ ceph::net::create_sharded<test_state::Client>(rounds, keepalive_ratio))
+ .then([rounds, keepalive_ratio](test_state::Server *server1,
+ test_state::Server *server2,
+ test_state::Client *client1,
+ test_state::Client *client2) {
+ // start servers and clients
+ entity_addr_t addr1;
+ addr1.parse("127.0.0.1:9010", nullptr);
+ addr1.set_type(entity_addr_t::TYPE_LEGACY);
+ entity_addr_t addr2;
+ addr2.parse("127.0.0.1:9011", nullptr);
+ addr2.set_type(entity_addr_t::TYPE_LEGACY);
+ return seastar::when_all_succeed(
+ server1->init(entity_name_t::OSD(0), "server1", 1, addr1),
+ server2->init(entity_name_t::OSD(1), "server2", 2, addr2),
+ client1->init(entity_name_t::OSD(2), "client1", 3),
+ client2->init(entity_name_t::OSD(3), "client2", 4))
+ // dispatch pingpong
+ .then([client1, client2, server1, server2] {
+ return seastar::when_all_succeed(
+ // test connecting in parallel, accepting in parallel,
+ // and operating the connection reference from a foreign/local core
+ client1->dispatch_pingpong(server1->msgr->get_myaddr(), true),
+ client1->dispatch_pingpong(server2->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server1->msgr->get_myaddr(), false),
+ client2->dispatch_pingpong(server2->msgr->get_myaddr(), true));
+ // shutdown
+ }).finally([client1] {
+ logger().info("client1 shutdown...");
+ return client1->shutdown();
+ }).finally([client2] {
+ logger().info("client2 shutdown...");
+ return client2->shutdown();
+ }).finally([server1] {
+ logger().info("server1 shutdown...");
+ return server1->shutdown();
+ }).finally([server2] {
+ logger().info("server2 shutdown...");
+ return server2->shutdown();
});
});
}
static seastar::future<> test_concurrent_dispatch()
{
struct test_state {
- entity_addr_t addr;
-
- struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3};
- class ServerDispatcher : public ceph::net::Dispatcher {
- int count = 0;
- seastar::promise<> on_second; // satisfied on second dispatch
- seastar::promise<> on_done; // satisfied when first dispatch unblocks
- public:
- seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
- MessageRef m) override {
- switch (++count) {
- case 1:
- // block on the first request until we reenter with the second
- return on_second.get_future().then([=] { on_done.set_value(); });
- case 2:
- on_second.set_value();
- return seastar::now();
- default:
- throw std::runtime_error("unexpected count");
- }
+ struct Server final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Server> {
+ ceph::net::Messenger *msgr = nullptr;
+ int count = 0;
+ seastar::promise<> on_second; // satisfied on second dispatch
+ seastar::promise<> on_done; // satisfied when first dispatch unblocks
+
+ seastar::future<> ms_dispatch(ceph::net::ConnectionRef c,
+ MessageRef m) override {
+ switch (++count) {
+ case 1:
+ // block on the first request until we reenter with the second
+ return on_second.get_future()
+ .then([this] {
+ return container().invoke_on_all([](Server& server) {
+ server.on_done.set_value();
+ });
+ });
+ case 2:
+ on_second.set_value();
+ return seastar::now();
+ default:
+ throw std::runtime_error("unexpected count");
}
- seastar::future<> wait() { return on_done.get_future(); }
- } dispatcher;
- } server;
-
- struct {
- ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4};
- ceph::net::Dispatcher dispatcher;
- } client;
+ }
+
+ seastar::future<> wait() { return on_done.get_future(); }
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce,
+ const entity_addr_t& addr) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this, addr](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& server) {
+ server.msgr = messenger->get_local_shard();
+ }).then([messenger, addr] {
+ return messenger->bind(entity_addrvec_t{addr});
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
+
+ struct Client final
+ : public ceph::net::Dispatcher,
+ public seastar::peering_sharded_service<Client> {
+ ceph::net::Messenger *msgr = nullptr;
+
+ seastar::future<> init(const entity_name_t& name,
+ const std::string& lname,
+ const uint64_t nonce) {
+ return ceph::net::Messenger::create(name, lname, nonce)
+ .then([this](ceph::net::Messenger *messenger) {
+ return container().invoke_on_all([messenger](auto& client) {
+ client.msgr = messenger->get_local_shard();
+ }).then([this, messenger] {
+ return messenger->start(this);
+ });
+ });
+ }
+
+ Dispatcher* get_local_shard() override {
+ return &(container().local());
+ }
+ seastar::future<> stop() {
+ return seastar::make_ready_future<>();
+ }
+ };
};
- return seastar::do_with(test_state{},
- [] (test_state& t) {
- // bind the server
- t.addr.set_type(entity_addr_t::TYPE_LEGACY);
- t.addr.set_family(AF_INET);
- t.addr.set_port(9010);
- t.addr.set_nonce(3);
- t.server.messenger.bind(entity_addrvec_t{t.addr});
-
- return t.server.messenger.start(&t.server.dispatcher)
- .then([&] {
- return t.client.messenger.start(&t.client.dispatcher)
- .then([&] {
- return t.client.messenger.connect(t.addr,
- entity_name_t::TYPE_OSD);
- }).then([] (ceph::net::ConnectionRef conn) {
- // send two messages
- conn->send(MessageRef{new MPing, false});
- conn->send(MessageRef{new MPing, false});
- }).then([&] {
- // wait for the server to get both
- return t.server.dispatcher.wait();
- }).finally([&] {
- return t.client.messenger.shutdown();
- });
- }).finally([&] {
- return t.server.messenger.shutdown();
+
+ logger().info("test_concurrent_dispatch():");
+ return seastar::when_all_succeed(
+ ceph::net::create_sharded<test_state::Server>(),
+ ceph::net::create_sharded<test_state::Client>())
+ .then([](test_state::Server *server,
+ test_state::Client *client) {
+ entity_addr_t addr;
+ addr.parse("127.0.0.1:9010", nullptr);
+ addr.set_type(entity_addr_t::TYPE_LEGACY);
+ addr.set_family(AF_INET);
+ return seastar::when_all_succeed(
+ server->init(entity_name_t::OSD(4), "server3", 5, addr),
+ client->init(entity_name_t::OSD(5), "client3", 6))
+ .then([server, client] {
+ return client->msgr->connect(server->msgr->get_myaddr(),
+ entity_name_t::TYPE_OSD);
+ }).then([](ceph::net::ConnectionXRef conn) {
+ // send two messages
+ (*conn)->send(MessageRef{new MPing, false});
+ (*conn)->send(MessageRef{new MPing, false});
+ }).then([server] {
+ return server->wait();
+ }).finally([client] {
+ logger().info("client shutdown...");
+ return client->msgr->shutdown();
+ }).finally([server] {
+ logger().info("server shutdown...");
+ return server->msgr->shutdown();
});
});
}
+}
+
int main(int argc, char** argv)
{
seastar::app_template app;
"number of pingpong rounds")
("keepalive-ratio", bpo::value<double>()->default_value(0.1),
"ratio of keepalive in ping messages");
- return app.run(argc, argv, [&] {
+ return app.run(argc, argv, [&app] {
auto&& config = app.configuration();
verbose = config["verbose"].as<bool>();
auto rounds = config["rounds"].as<unsigned>();