SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce)
- : master_sid{seastar::this_shard_id()},
+ : sid{seastar::this_shard_id()},
logic_name{logic_name},
nonce{nonce},
my_name{myname}
SocketMessenger::~SocketMessenger()
{
logger().debug("~SocketMessenger: {}", logic_name);
+ ceph_assert_always(seastar::this_shard_id() == sid);
ceph_assert(!listener);
}
bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
{
+ assert(seastar::this_shard_id() == sid);
bool ret = false;
entity_addrvec_t newaddrs = my_addrs;
void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
crimson::net::listen_ertr::future<>
SocketMessenger::do_listen(const entity_addrvec_t& addrs)
{
- assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
set_myaddrs(addrs);
return seastar::futurize_invoke([this] {
SocketMessenger::bind_ertr::future<>
SocketMessenger::bind(const entity_addrvec_t& addrs)
{
+ assert(seastar::this_shard_id() == sid);
using crimson::common::local_conf;
return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
[this, addrs] (auto& count) {
seastar::future<> SocketMessenger::start(
const dispatchers_t& _dispatchers) {
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
dispatchers.assign(_dispatchers);
if (listener) {
ceph_assert(get_myaddr().get_port() > 0);
return listener->accept([this](SocketRef socket, entity_addr_t peer_addr) {
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
assert(get_myaddr().is_msgr2());
SocketConnectionRef conn =
seastar::make_shared<SocketConnection>(*this, dispatchers);
crimson::net::ConnectionRef
SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
// make sure we connect to a valid peer_addr
if (!peer_addr.is_msgr2()) {
seastar::future<> SocketMessenger::shutdown()
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
return seastar::futurize_invoke([this] {
assert(dispatchers.empty());
if (listener) {
const entity_addr_t &peer_addr_for_me,
const SocketConnection& conn)
{
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
if (!need_addr) {
if ((!get_myaddr().is_any() &&
get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
{
+ assert(seastar::this_shard_id() == sid);
return policy_set.get(peer_type);
}
SocketPolicy SocketMessenger::get_default_policy() const
{
+ assert(seastar::this_shard_id() == sid);
return policy_set.get_default();
}
void SocketMessenger::set_default_policy(const SocketPolicy& p)
{
+ assert(seastar::this_shard_id() == sid);
policy_set.set_default(p);
}
void SocketMessenger::set_policy(entity_type_t peer_type,
const SocketPolicy& p)
{
+ assert(seastar::this_shard_id() == sid);
policy_set.set(peer_type, p);
}
void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
Throttle* throttle)
{
+ assert(seastar::this_shard_id() == sid);
// only byte throttler is used in OSD
policy_set.set_throttlers(peer_type, throttle, nullptr);
}
crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
{
+ assert(seastar::this_shard_id() == sid);
if (auto found = connections.find(addr);
found != connections.end()) {
return found->second;
void SocketMessenger::accept_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
accepting_conns.insert(conn);
}
void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
accepting_conns.erase(conn);
}
void SocketMessenger::register_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
std::ignore = i;
ceph_assert(added);
void SocketMessenger::unregister_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
ceph_assert(conn);
auto found = connections.find(conn->get_peer_addr());
ceph_assert(found != connections.end());
void SocketMessenger::closing_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
closing_conns.push_back(conn);
}
void SocketMessenger::closed_conn(SocketConnectionRef conn)
{
+ assert(seastar::this_shard_id() == sid);
for (auto it = closing_conns.begin();
it != closing_conns.end();) {
if (*it == conn) {
uint32_t SocketMessenger::get_global_seq(uint32_t old)
{
+ assert(seastar::this_shard_id() == sid);
if (old > global_seq) {
global_seq = old;
}
class ShardedServerSocket;
class SocketMessenger final : public Messenger {
- const seastar::shard_id master_sid;
- // Distinguish messengers with meaningful names for debugging
- const std::string logic_name;
- const uint32_t nonce;
-
- entity_name_t my_name;
- entity_addrvec_t my_addrs;
- crimson::auth::AuthClient* auth_client = nullptr;
- crimson::auth::AuthServer* auth_server = nullptr;
-
- ShardedServerSocket<true> *listener = nullptr;
- ChainedDispatchers dispatchers;
- std::map<entity_addr_t, SocketConnectionRef> connections;
- std::set<SocketConnectionRef> accepting_conns;
- std::vector<SocketConnectionRef> closing_conns;
- ceph::net::PolicySet<Throttle> policy_set;
- // specifying we haven't learned our addr; set false when we find it.
- bool need_addr = true;
- uint32_t global_seq = 0;
- bool started = false;
- seastar::promise<> shutdown_promise;
-
- listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
- /// try to bind to the first unused port of given address
- bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
- uint32_t min_port, uint32_t max_port);
-
-
- public:
+// Messenger public interfaces
+public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce);
+
~SocketMessenger() override;
const entity_name_t &get_myname() const override {
void set_myaddrs(const entity_addrvec_t& addr) override;
+ bool set_addr_unknowns(const entity_addrvec_t &addr) override;
+
void set_auth_client(crimson::auth::AuthClient *ac) override {
+ assert(seastar::this_shard_id() == sid);
auth_client = ac;
}
void set_auth_server(crimson::auth::AuthServer *as) override {
+ assert(seastar::this_shard_id() == sid);
auth_server = as;
}
-
- bool set_addr_unknowns(const entity_addrvec_t &addr) override;
- // Messenger interfaces are assumed to be called from its own shard, but its
- // behavior should be symmetric when called from any shard.
bind_ertr::future<> bind(const entity_addrvec_t& addr) override;
seastar::future<> start(const dispatchers_t& dispatchers) override;
const entity_name_t& peer_name) override;
bool owns_connection(Connection &conn) const override {
+ assert(seastar::this_shard_id() == sid);
return this == &static_cast<SocketConnection&>(conn).get_messenger();
}
// can only wait once
seastar::future<> wait() override {
- assert(seastar::this_shard_id() == master_sid);
+ assert(seastar::this_shard_id() == sid);
return shutdown_promise.get_future();
}
void stop() override {
+ assert(seastar::this_shard_id() == sid);
dispatchers.clear();
}
bool is_started() const override {
+ assert(seastar::this_shard_id() == sid);
return !dispatchers.empty();
}
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
- public:
- crimson::auth::AuthClient* get_auth_client() const { return auth_client; }
+// SocketMessenger public interfaces
+public:
+ crimson::auth::AuthClient* get_auth_client() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_client;
+ }
- crimson::auth::AuthServer* get_auth_server() const { return auth_server; }
+ crimson::auth::AuthServer* get_auth_server() const {
+ assert(seastar::this_shard_id() == sid);
+ return auth_server;
+ }
uint32_t get_global_seq(uint32_t old=0);
const SocketConnection& conn);
SocketConnectionRef lookup_conn(const entity_addr_t& addr);
+
void accept_conn(SocketConnectionRef);
+
void unaccept_conn(SocketConnectionRef);
+
void register_conn(SocketConnectionRef);
+
void unregister_conn(SocketConnectionRef);
+
void closing_conn(SocketConnectionRef);
+
void closed_conn(SocketConnectionRef);
- seastar::shard_id shard_id() const {
- assert(seastar::this_shard_id() == master_sid);
- return master_sid;
+ seastar::shard_id get_shard_id() const {
+ return sid;
}
#ifdef UNIT_TESTS_BUILT
Interceptor *interceptor = nullptr;
#endif
+
+private:
+ listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
+
+ /// try to bind to the first unused port of given address
+ bind_ertr::future<> try_bind(const entity_addrvec_t& addr,
+ uint32_t min_port, uint32_t max_port);
+
+ const seastar::shard_id sid;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ crimson::auth::AuthClient* auth_client = nullptr;
+ crimson::auth::AuthServer* auth_server = nullptr;
+
+ ShardedServerSocket<true> *listener = nullptr;
+ ChainedDispatchers dispatchers;
+ std::map<entity_addr_t, SocketConnectionRef> connections;
+ std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
+ ceph::net::PolicySet<Throttle> policy_set;
+ // specifying we haven't learned our addr; set false when we find it.
+ bool need_addr = true;
+ uint32_t global_seq = 0;
+ bool started = false;
+ seastar::promise<> shutdown_promise;
};
} // namespace crimson::net