SocketConnection::SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers)
- : core(messenger.get_shard_id()),
- messenger(messenger)
+ : msgr_sid{messenger.get_shard_id()}, messenger(messenger)
{
auto ret = create_handlers(dispatchers, *this);
io_handler = std::move(ret.io_handler);
bool SocketConnection::is_connected() const
{
- assert(seastar::this_shard_id() == shard_id());
return io_handler->is_connected();
}
#ifdef UNIT_TESTS_BUILT
bool SocketConnection::is_closed() const
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->is_closed();
}
bool SocketConnection::is_closed_clean() const
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->is_closed_clean();
}
#endif
bool SocketConnection::peer_wins() const
{
+ assert(seastar::this_shard_id() == msgr_sid);
return (messenger.get_myaddr() > peer_addr || policy.server);
}
seastar::future<> SocketConnection::send(MessageURef msg)
{
return seastar::smp::submit_to(
- shard_id(),
+ io_handler->get_shard_id(),
[this, msg=std::move(msg)]() mutable {
return io_handler->send(std::move(msg));
});
seastar::future<> SocketConnection::send_keepalive()
{
return seastar::smp::submit_to(
- shard_id(),
+ io_handler->get_shard_id(),
[this] {
return io_handler->send_keepalive();
});
void SocketConnection::mark_down()
{
- assert(seastar::this_shard_id() == shard_id());
io_handler->mark_down();
}
SocketConnection::start_connect(const entity_addr_t& _peer_addr,
const entity_name_t& _peer_name)
{
+ assert(seastar::this_shard_id() == msgr_sid);
protocol->start_connect(_peer_addr, _peer_name);
}
SocketConnection::start_accept(SocketRef&& sock,
const entity_addr_t& _peer_addr)
{
+ assert(seastar::this_shard_id() == msgr_sid);
protocol->start_accept(std::move(sock), _peer_addr);
}
seastar::future<>
SocketConnection::close_clean_yielded()
{
+ assert(seastar::this_shard_id() == msgr_sid);
return protocol->close_clean_yielded();
}
-seastar::shard_id SocketConnection::shard_id() const {
- return core;
-}
-
seastar::socket_address SocketConnection::get_local_address() const {
+ assert(seastar::this_shard_id() == msgr_sid);
return socket->get_local_address();
}
ConnectionRef
SocketConnection::get_local_shared_foreign_from_this()
{
- assert(seastar::this_shard_id() == shard_id());
+ assert(seastar::this_shard_id() == msgr_sid);
return make_local_shared_foreign(
seastar::make_foreign(shared_from_this()));
}
+SocketMessenger &
+SocketConnection::get_messenger() const
+{
+ assert(seastar::this_shard_id() == msgr_sid);
+ return messenger;
+}
+
+void SocketConnection::set_peer_type(entity_type_t peer_type) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+}
+
+void SocketConnection::set_peer_id(int64_t peer_id) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+}
+
+void SocketConnection::set_features(uint64_t f) {
+ assert(seastar::this_shard_id() == msgr_sid);
+ features = f;
+}
+
void SocketConnection::set_socket(Socket *s) {
+ assert(seastar::this_shard_id() == msgr_sid);
socket = s;
}
void SocketConnection::print(ostream& out) const {
- out << (void*)this << " ";
- messenger.print(out);
- if (!socket) {
- out << " >> " << get_peer_name() << " " << peer_addr;
- } else if (socket->get_side() == Socket::side_t::acceptor) {
- out << " >> " << get_peer_name() << " " << peer_addr
- << "@" << socket->get_ephemeral_port();
- } else { // socket->get_side() == Socket::side_t::connector
- out << "@" << socket->get_ephemeral_port()
- << " >> " << get_peer_name() << " " << peer_addr;
- }
+ out << (void*)this << " ";
+ messenger.print(out);
+ if (seastar::this_shard_id() != msgr_sid) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (!socket) {
+ out << " >> " << get_peer_name() << " " << peer_addr;
+ } else if (socket->get_side() == Socket::side_t::acceptor) {
+ out << " >> " << get_peer_name() << " " << peer_addr
+ << "@" << socket->get_ephemeral_port();
+ } else { // socket->get_side() == Socket::side_t::connector
+ out << "@" << socket->get_ephemeral_port()
+ << " >> " << get_peer_name() << " " << peer_addr;
+ }
}
} // namespace crimson::net
ConnectionHandler &operator=(const ConnectionHandler &) = delete;
ConnectionHandler &operator=(ConnectionHandler &&) = delete;
+ virtual seastar::shard_id get_shard_id() const = 0;
+
virtual bool is_connected() const = 0;
virtual seastar::future<> send(MessageURef) = 0;
};
class SocketConnection : public Connection {
- const seastar::shard_id core;
-
- SocketMessenger& messenger;
-
- std::unique_ptr<ConnectionHandler> io_handler;
-
- std::unique_ptr<ProtocolV2> protocol;
-
- Socket *socket = nullptr;
-
- entity_name_t peer_name = {0, entity_name_t::NEW};
-
- entity_addr_t peer_addr;
-
- // which of the peer_addrs we're connecting to (as client)
- // or should reconnect to (as peer)
- entity_addr_t target_addr;
-
- uint64_t features = 0;
-
- ceph::net::Policy<crimson::common::Throttle> policy;
-
- uint64_t peer_global_id = 0;
-
- std::unique_ptr<user_private_t> user_private;
-
// Connection interfaces, public to users
public:
SocketConnection(SocketMessenger& messenger,
void print(std::ostream& out) const override;
- // public to SocketMessenger
+ /*
+ * Public to SocketMessenger
+ * Working in SocketMessenger::get_shard_id();
+ */
public:
/// start a handshake from the client's perspective,
/// only call when SocketConnection first construct
seastar::socket_address get_local_address() const;
- SocketMessenger &get_messenger() const {
- return messenger;
- }
+ SocketMessenger &get_messenger() const;
ConnectionRef get_local_shared_foreign_from_this();
private:
- seastar::shard_id shard_id() const;
-
- void set_peer_type(entity_type_t peer_type) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_type == 0 &&
- peer_name.type() != 0));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_type != 0 &&
- peer_name.type() != 0 &&
- peer_type != peer_name.type()));
- peer_name._type = peer_type;
- }
+ void set_peer_type(entity_type_t peer_type);
- void set_peer_id(int64_t peer_id) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_id == entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_id != entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW &&
- peer_id != peer_name.num()));
- peer_name._num = peer_id;
- }
+ void set_peer_id(int64_t peer_id);
void set_peer_name(entity_name_t name) {
set_peer_type(name.type());
set_peer_id(name.num());
}
- void set_features(uint64_t f) {
- features = f;
- }
+ void set_features(uint64_t f);
void set_socket(Socket *s);
bool peer_wins() const;
#endif
+private:
+ const seastar::shard_id msgr_sid;
+
+ /*
+ * Core owner is messenger core, may allow to access from the I/O core.
+ */
+ SocketMessenger& messenger;
+
+ std::unique_ptr<ProtocolV2> protocol;
+
+ Socket *socket = nullptr;
+
+ entity_name_t peer_name = {0, entity_name_t::NEW};
+
+ entity_addr_t peer_addr;
+
+ // which of the peer_addrs we're connecting to (as client)
+ // or should reconnect to (as peer)
+ entity_addr_t target_addr;
+
+ uint64_t features = 0;
+
+ ceph::net::Policy<crimson::common::Throttle> policy;
+
+ uint64_t peer_global_id = 0;
+
+ /*
+ * Core owner is I/O core (mutable).
+ */
+ std::unique_ptr<ConnectionHandler> io_handler;
+
+ /*
+ * Core owner is up to the connection user.
+ */
+ std::unique_ptr<user_private_t> user_private;
+
friend class IOHandler;
friend class ProtocolV2;
friend class FrameAssemblerV2;
IOHandler::IOHandler(ChainedDispatchers &dispatchers,
SocketConnection &conn)
- : dispatchers(dispatchers),
+ : sid(seastar::this_shard_id()),
+ dispatchers(dispatchers),
conn(conn),
conn_ref(conn.get_local_shared_foreign_from_this())
{}
seastar::future<> IOHandler::send(MessageURef msg)
{
+ ceph_assert_always(seastar::this_shard_id() == sid);
if (io_state != io_state_t::drop) {
out_pending_msgs.push_back(std::move(msg));
notify_out_dispatch();
seastar::future<> IOHandler::send_keepalive()
{
+ ceph_assert_always(seastar::this_shard_id() == sid);
if (!need_keepalive) {
need_keepalive = true;
notify_out_dispatch();
void IOHandler::mark_down()
{
+ ceph_assert_always(seastar::this_shard_id() == sid);
ceph_assert_always(io_state != io_state_t::none);
need_dispatch_reset = false;
if (io_state == io_state_t::drop) {
* as ConnectionHandler
*/
private:
+ seastar::shard_id get_shard_id() const final {
+ return sid;
+ }
+
bool is_connected() const final {
+ ceph_assert_always(seastar::this_shard_id() == sid);
return protocol_is_connected;
}
seastar::future<> send_keepalive() final;
clock_t::time_point get_last_keepalive() const final {
+ ceph_assert_always(seastar::this_shard_id() == sid);
return last_keepalive;
}
clock_t::time_point get_last_keepalive_ack() const final {
+ ceph_assert_always(seastar::this_shard_id() == sid);
return last_keepalive_ack;
}
void set_last_keepalive_ack(clock_t::time_point when) final {
+ ceph_assert_always(seastar::this_shard_id() == sid);
last_keepalive_ack = when;
}
void do_in_dispatch();
private:
+ seastar::shard_id sid;
+
ChainedDispatchers &dispatchers;
SocketConnection &conn;