namespace crimson::net {
-#ifdef UNIT_TESTS_BUILT
-class Interceptor;
-#endif
-
using seq_num_t = uint64_t;
/**
* the connection originates.
*/
class Connection : public seastar::enable_shared_from_this<Connection> {
- entity_name_t peer_name = {0, entity_name_t::NEW};
-
- protected:
- entity_addr_t peer_addr;
-
- // which of the peer_addrs we're connecting to (as client)
- // or should reconnect to (as peer)
- entity_addr_t target_addr;
-
+ public:
using clock_t = seastar::lowres_system_clock;
- clock_t::time_point last_keepalive;
- clock_t::time_point last_keepalive_ack;
-
- void set_peer_type(entity_type_t peer_type) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_type == 0 &&
- peer_name.type() != 0));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_type != 0 &&
- peer_name.type() != 0 &&
- peer_type != peer_name.type()));
- peer_name._type = peer_type;
- }
- void set_peer_id(int64_t peer_id) {
- // it is not allowed to assign an unknown value when the current
- // value is known
- assert(!(peer_id == entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW));
- // it is not allowed to assign a different known value when the
- // current value is also known.
- assert(!(peer_id != entity_name_t::NEW &&
- peer_name.num() != entity_name_t::NEW &&
- peer_id != peer_name.num()));
- peer_name._num = peer_id;
- }
- void set_peer_name(entity_name_t name) {
- set_peer_type(name.type());
- set_peer_id(name.num());
- }
- public:
Connection() {}
+
virtual ~Connection() {}
-#ifdef UNIT_TESTS_BUILT
- Interceptor *interceptor = nullptr;
-#endif
+ virtual const entity_name_t &get_peer_name() const = 0;
+
+ entity_type_t get_peer_type() const { return get_peer_name().type(); }
+ int64_t get_peer_id() const { return get_peer_name().num(); }
+ bool peer_is_mon() const { return get_peer_name().is_mon(); }
+ bool peer_is_mgr() const { return get_peer_name().is_mgr(); }
+ bool peer_is_mds() const { return get_peer_name().is_mds(); }
+ bool peer_is_osd() const { return get_peer_name().is_osd(); }
+ bool peer_is_client() const { return get_peer_name().is_client(); }
+
+ virtual const entity_addr_t &get_peer_addr() const = 0;
- const entity_addr_t& get_peer_addr() const { return peer_addr; }
const entity_addrvec_t get_peer_addrs() const {
- return entity_addrvec_t(peer_addr);
+ return entity_addrvec_t(get_peer_addr());
}
- const auto& get_peer_socket_addr() const {
- return target_addr;
- }
- const entity_name_t& get_peer_name() const { return peer_name; }
- entity_type_t get_peer_type() const { return peer_name.type(); }
- int64_t get_peer_id() const { return peer_name.num(); }
- bool peer_is_mon() const { return peer_name.is_mon(); }
- bool peer_is_mgr() const { return peer_name.is_mgr(); }
- bool peer_is_mds() const { return peer_name.is_mds(); }
- bool peer_is_osd() const { return peer_name.is_osd(); }
- bool peer_is_client() const { return peer_name.is_client(); }
+ virtual const entity_addr_t &get_peer_socket_addr() const = 0;
virtual uint64_t get_features() const = 0;
/// true if the handshake has completed and no errors have been encountered
virtual bool is_connected() const = 0;
-#ifdef UNIT_TESTS_BUILT
- virtual bool is_closed() const = 0;
-
- virtual bool is_closed_clean() const = 0;
-
- virtual bool peer_wins() const = 0;
-#endif
-
/**
* send
*
*/
virtual seastar::future<> keepalive() = 0;
+ virtual clock_t::time_point get_last_keepalive() const = 0;
+
+ virtual clock_t::time_point get_last_keepalive_ack() const = 0;
+
+ // workaround for the monitor client
+ virtual void set_last_keepalive_ack(clock_t::time_point when) = 0;
+
// close the connection and cancel any any pending futures from read/send,
// without dispatching any reset event
virtual void mark_down() = 0;
- virtual void print(std::ostream& out) const = 0;
-
- void set_last_keepalive(clock_t::time_point when) {
- last_keepalive = when;
- }
- void set_last_keepalive_ack(clock_t::time_point when) {
- last_keepalive_ack = when;
- }
- auto get_last_keepalive() const { return last_keepalive; }
- auto get_last_keepalive_ack() const { return last_keepalive_ack; }
-
struct user_private_t {
virtual ~user_private_t() = default;
};
-private:
- std::unique_ptr<user_private_t> user_private;
-public:
- bool has_user_private() const {
- return user_private != nullptr;
- }
- void set_user_private(std::unique_ptr<user_private_t> new_user_private) {
- assert(!has_user_private());
- user_private = std::move(new_user_private);
- }
- user_private_t &get_user_private() {
- assert(has_user_private());
- return *user_private;
- }
+
+ virtual bool has_user_private() const = 0;
+
+ virtual user_private_t &get_user_private() = 0;
+
+ virtual void set_user_private(std::unique_ptr<user_private_t>) = 0;
+
+ virtual void print(std::ostream& out) const = 0;
+
+#ifdef UNIT_TESTS_BUILT
+ virtual bool is_closed() const = 0;
+
+ virtual bool is_closed_clean() const = 0;
+
+ virtual bool peer_wins() const = 0;
+#endif
};
inline std::ostream& operator<<(std::ostream& out, const Connection& conn) {
class SocketConnection;
using SocketConnectionRef = seastar::shared_ptr<SocketConnection>;
+#ifdef UNIT_TESTS_BUILT
+class Interceptor;
+#endif
+
class SocketConnection : public Connection {
const seastar::shard_id core;
SocketMessenger& messenger;
std::unique_ptr<Protocol> protocol;
+ entity_name_t peer_name = {0, entity_name_t::NEW};
+
+ entity_addr_t peer_addr;
+
+ // which of the peer_addrs we're connecting to (as client)
+ // or should reconnect to (as peer)
+ entity_addr_t target_addr;
+
+ clock_t::time_point last_keepalive;
+
+ clock_t::time_point last_keepalive_ack;
+
uint64_t features = 0;
ceph::net::Policy<crimson::common::Throttle> policy;
uint64_t peer_global_id = 0;
+ std::unique_ptr<user_private_t> user_private;
+
seastar::shard_id shard_id() const;
public:
SocketConnection(SocketMessenger& messenger,
ChainedDispatchers& dispatchers);
+
~SocketConnection() override;
+ const entity_name_t &get_peer_name() const override {
+ return peer_name;
+ }
+
+ const entity_addr_t &get_peer_addr() const override {
+ return peer_addr;
+ }
+
+ const entity_addr_t &get_peer_socket_addr() const override {
+ return target_addr;
+ }
+
uint64_t get_features() const override {
return features;
}
bool is_connected() const override;
-#ifdef UNIT_TESTS_BUILT
- bool is_closed_clean() const override;
+ seastar::future<> send(MessageURef msg) override;
- bool is_closed() const override;
+ seastar::future<> keepalive() override;
- bool peer_wins() const override;
-#else
- bool peer_wins() const;
-#endif
+ clock_t::time_point get_last_keepalive() const override {
+ return last_keepalive;
+ }
- seastar::future<> send(MessageURef msg) override;
+ clock_t::time_point get_last_keepalive_ack() const override {
+ return last_keepalive_ack;
+ }
- seastar::future<> keepalive() override;
+ void set_last_keepalive_ack(clock_t::time_point when) override {
+ last_keepalive_ack = when;
+ }
void mark_down() override;
+ bool has_user_private() const override {
+ return user_private != nullptr;
+ }
+
+ user_private_t &get_user_private() override {
+ assert(has_user_private());
+ return *user_private;
+ }
+
+ void set_user_private(std::unique_ptr<user_private_t> new_user_private) override {
+ assert(!has_user_private());
+ user_private = std::move(new_user_private);
+ }
+
void print(std::ostream& out) const override;
+ public:
+ void set_peer_type(entity_type_t peer_type) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_type == 0 &&
+ peer_name.type() != 0));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_type != 0 &&
+ peer_name.type() != 0 &&
+ peer_type != peer_name.type()));
+ peer_name._type = peer_type;
+ }
+
+ void set_peer_id(int64_t peer_id) {
+ // it is not allowed to assign an unknown value when the current
+ // value is known
+ assert(!(peer_id == entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW));
+ // it is not allowed to assign a different known value when the
+ // current value is also known.
+ assert(!(peer_id != entity_name_t::NEW &&
+ peer_name.num() != entity_name_t::NEW &&
+ peer_id != peer_name.num()));
+ peer_name._num = peer_id;
+ }
+
+ void set_peer_name(entity_name_t name) {
+ set_peer_type(name.type());
+ set_peer_id(name.num());
+ }
+
+ void set_last_keepalive(clock_t::time_point when) {
+ last_keepalive = when;
+ }
+
void set_features(uint64_t f) {
features = f;
}
return messenger;
}
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean() const override;
+
+ bool is_closed() const override;
+
+ bool peer_wins() const override;
+
+ Interceptor *interceptor = nullptr;
+#else
+ bool peer_wins() const;
+#endif
+
friend class Protocol;
friend class ProtocolV2;
};