using seq_num_t = uint64_t;
+/**
+ * Connection
+ *
+ * Abstraction for messenger connections.
+ *
+ * Except when otherwise specified, methods must be invoked from the core on which
+ * the connection originates.
+ */
class Connection : public seastar::enable_shared_from_this<Connection> {
entity_name_t peer_name = {0, entity_name_t::NEW};
virtual bool peer_wins() const = 0;
#endif
- /// send a message over a connection that has completed its handshake
+ /**
+ * send
+ *
+ * Send a message over a connection that has completed its handshake.
+ * May be invoked from any core.
+ */
virtual seastar::future<> send(MessageURef msg) = 0;
- /// send a keepalive message over a connection that has completed its
- /// handshake
+ /**
+ * keepalive
+ *
+ * Send a keepalive message over a connection that has completed its
+ * handshake.
+ *
+ * May be invoked from any core.
+ */
virtual seastar::future<> keepalive() = 0;
// close the connection and cancel any any pending futures from read/send,
SocketConnection::SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers)
- : messenger(messenger),
+ : core(messenger.shard_id()),
+ messenger(messenger),
protocol(std::make_unique<ProtocolV2>(dispatchers, *this, messenger))
{
#ifdef UNIT_TESTS_BUILT
seastar::future<> SocketConnection::send(MessageURef msg)
{
- assert(seastar::this_shard_id() == shard_id());
- return protocol->send(std::move(msg));
+ return seastar::smp::submit_to(
+ shard_id(),
+ [this, msg=std::move(msg)]() mutable {
+ return protocol->send(std::move(msg));
+ });
}
seastar::future<> SocketConnection::keepalive()
{
- assert(seastar::this_shard_id() == shard_id());
- return protocol->keepalive();
+ return seastar::smp::submit_to(
+ shard_id(),
+ [this] {
+ return protocol->keepalive();
+ });
}
void SocketConnection::mark_down()
}
seastar::shard_id SocketConnection::shard_id() const {
- return messenger.shard_id();
+ return core;
}
seastar::socket_address SocketConnection::get_local_address() const {
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
class SocketConnection : public Connection {
+ const seastar::shard_id core;
SocketMessenger& messenger;
std::unique_ptr<Protocol> protocol;