using SocketPolicy = ceph::net::Policy<Throttle>;
class Messenger {
- crimson::auth::AuthClient* auth_client = nullptr;
- crimson::auth::AuthServer* auth_server = nullptr;
-
-protected:
- entity_name_t my_name;
- entity_addrvec_t my_addrs;
-
public:
- Messenger(const entity_name_t& name)
- : my_name(name)
- {}
+ Messenger() {}
+
virtual ~Messenger() {}
-#ifdef UNIT_TESTS_BUILT
- Interceptor *interceptor = nullptr;
-#endif
+ virtual const entity_name_t& get_myname() const = 0;
+
+ entity_type_t get_mytype() const { return get_myname().type(); }
+
+ virtual const entity_addrvec_t &get_myaddrs() const = 0;
+
+ entity_addr_t get_myaddr() const { return get_myaddrs().front(); }
- entity_type_t get_mytype() const { return my_name.type(); }
- const entity_name_t& get_myname() const { return my_name; }
- const entity_addrvec_t& get_myaddrs() const { return my_addrs; }
- entity_addr_t get_myaddr() const { return my_addrs.front(); }
virtual void set_myaddrs(const entity_addrvec_t& addrs) = 0;
+
virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0;
+ virtual void set_auth_client(crimson::auth::AuthClient *) = 0;
+
+ virtual void set_auth_server(crimson::auth::AuthServer *) = 0;
+
using bind_ertr = crimson::errorator<
crimson::ct_error::address_in_use, // The address (range) is already bound
crimson::ct_error::address_not_available
// and must be called if is bound.
virtual seastar::future<> shutdown() = 0;
- crimson::auth::AuthClient* get_auth_client() const { return auth_client; }
- void set_auth_client(crimson::auth::AuthClient *ac) {
- auth_client = ac;
- }
- crimson::auth::AuthServer* get_auth_server() const { return auth_server; }
- void set_auth_server(crimson::auth::AuthServer *as) {
- auth_server = as;
- }
-
virtual void print(std::ostream& out) const = 0;
virtual SocketPolicy get_policy(entity_type_t peer_type) const = 0;
create(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce);
+
+#ifdef UNIT_TESTS_BUILT
+ virtual void set_interceptor(Interceptor *) = 0;
+#endif
};
inline std::ostream& operator<<(std::ostream& out, const Messenger& msgr) {
SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce)
- : Messenger{myname},
- master_sid{seastar::this_shard_id()},
+ : master_sid{seastar::this_shard_id()},
logic_name{logic_name},
- nonce{nonce}
+ nonce{nonce},
+ my_name{myname}
{}
SocketMessenger::~SocketMessenger()
class SocketMessenger final : public Messenger {
const seastar::shard_id master_sid;
- seastar::promise<> shutdown_promise;
+ // Distinguish messengers with meaningful names for debugging
+ const std::string logic_name;
+ const uint32_t nonce;
+
+ entity_name_t my_name;
+ entity_addrvec_t my_addrs;
+ crimson::auth::AuthClient* auth_client = nullptr;
+ crimson::auth::AuthServer* auth_server = nullptr;
FixedCPUServerSocket* listener = nullptr;
ChainedDispatchers dispatchers;
std::set<SocketConnectionRef> accepting_conns;
std::vector<SocketConnectionRef> closing_conns;
ceph::net::PolicySet<Throttle> policy_set;
- // Distinguish messengers with meaningful names for debugging
- const std::string logic_name;
- const uint32_t nonce;
// specifying we haven't learned our addr; set false when we find it.
bool need_addr = true;
uint32_t global_seq = 0;
bool started = false;
+ seastar::promise<> shutdown_promise;
listen_ertr::future<> do_listen(const entity_addrvec_t& addr);
/// try to bind to the first unused port of given address
uint32_t nonce);
~SocketMessenger() override;
+ const entity_name_t &get_myname() const override {
+ return my_name;
+ }
+
+ const entity_addrvec_t &get_myaddrs() const override {
+ return my_addrs;
+ }
+
void set_myaddrs(const entity_addrvec_t& addr) override;
+ void set_auth_client(crimson::auth::AuthClient *ac) override {
+ auth_client = ac;
+ }
+
+ void set_auth_server(crimson::auth::AuthServer *as) override {
+ auth_server = as;
+ }
+
+
bool set_addr_unknowns(const entity_addrvec_t &addr) override;
// Messenger interfaces are assumed to be called from its own shard, but its
// behavior should be symmetric when called from any shard.
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;
public:
+ crimson::auth::AuthClient* get_auth_client() const { return auth_client; }
+
+ crimson::auth::AuthServer* get_auth_server() const { return auth_server; }
+
uint32_t get_global_seq(uint32_t old=0);
void learned_addr(const entity_addr_t &peer_addr_for_me,
assert(seastar::this_shard_id() == master_sid);
return master_sid;
}
+
+#ifdef UNIT_TESTS_BUILT
+ void set_interceptor(Interceptor *i) override {
+ interceptor = i;
+ }
+
+ Interceptor *interceptor = nullptr;
+#endif
};
} // namespace crimson::net
test_msgr->set_default_policy(policy);
test_msgr->set_auth_client(&dummy_auth);
test_msgr->set_auth_server(&dummy_auth);
- test_msgr->interceptor = &interceptor;
+ test_msgr->set_interceptor(&interceptor);
return test_msgr->bind(entity_addrvec_t{addr}).safe_then([this] {
return test_msgr->start({this});
}, Messenger::bind_ertr::all_same_way([addr] (const std::error_code& e) {