// TODO: encapsulate a SessionedSender class
protected:
- seastar::future<> close_io() {
+ seastar::future<> close_io(
+ bool is_dispatch_reset,
+ bool is_replace) {
ceph_assert_always(io_state == io_state_t::drop);
+
+ if (is_dispatch_reset) {
+ dispatch_reset(is_replace);
+ }
assert(!gate.is_closed());
return gate.close();
}
seastar::future<FrameAssemblerV2Ref> wait_io_exit_dispatching();
+ void reset_session(bool full);
+
void requeue_out_sent_up_to(seq_num_t seq);
void requeue_out_sent();
- void reset_out();
-
- void reset_in() {
- in_seq = 0;
- }
-
bool is_out_queued_or_sent() const {
return is_out_queued() || !out_sent_msgs.empty();
}
void dispatch_connect();
+ private:
void dispatch_reset(bool is_replace);
void dispatch_remote_reset();
- private:
bool is_out_queued() const {
return (!out_pending_msgs.empty() ||
ack_left > 0 ||
next_keepalive_ack.has_value());
}
+ void reset_out();
+
seastar::future<stop_t> try_exit_out_dispatch();
seastar::future<> do_out_dispatch();
void do_in_dispatch();
+private:
ChainedDispatchers &dispatchers;
SocketConnection &conn;
{
server_cookie = 0;
connect_seq = 0;
- reset_in();
if (full) {
client_cookie = generate_client_cookie();
peer_global_seq = 0;
- reset_out();
- dispatch_remote_reset();
}
+ do_reset_session(full);
}
seastar::future<std::tuple<entity_type_t, entity_addr_t>>
// this is required for the case when this connection is being replaced
requeue_out_sent_up_to(0);
- reset_in();
+ do_reset_session(false);
if (!conn.policy.lossy) {
server_cookie = ceph::util::generate_random_number<uint64_t>(1, -1ll);
}
assert(!gate.is_closed());
auto handshake_closed = gate.close();
- auto io_closed = close_io();
-
- if (is_dispatch_reset) {
- dispatch_reset(is_replace);
- }
+ auto io_closed = close_io(
+ is_dispatch_reset, is_replace);
// asynchronous operations
assert(!closed_clean_fut.valid());
void notify_mark_down() override;
+ private:
seastar::future<> wait_exit_io() {
if (exit_io.has_value()) {
return exit_io->get_shared_future();
}
}
- private:
- SocketConnection &conn;
-
- SocketMessenger &messenger;
-
- bool has_socket = false;
-
- // the socket exists and it is not shutdown
- bool is_socket_valid = false;
-
- FrameAssemblerV2Ref frame_assembler;
-
- std::optional<seastar::shared_promise<>> exit_io;
-
- AuthConnectionMetaRef auth_meta;
-
- crimson::common::Gated gate;
-
- bool closed = false;
-
- // become valid only after closed == true
- seastar::shared_future<> closed_clean_fut;
-
-#ifdef UNIT_TESTS_BUILT
- bool closed_clean = false;
-
-#endif
enum class state_t {
NONE = 0,
ACCEPTING,
REPLACING,
CLOSING
};
- state_t state = state_t::NONE;
static const char *get_state_name(state_t state) {
const char *const statenames[] = {"NONE",
void trigger_state(state_t state, io_state_t io_state, bool reentrant);
- uint64_t peer_supported_features = 0;
-
- uint64_t client_cookie = 0;
- uint64_t server_cookie = 0;
- uint64_t global_seq = 0;
- uint64_t peer_global_seq = 0;
- uint64_t connect_seq = 0;
-
- seastar::future<> execution_done = seastar::now();
-
template <typename Func, typename T>
void gated_execute(const char *what, T &who, Func &&func) {
gate.dispatch_in_background(what, who, [this, &who, &func] {
});
}
- class Timer {
- double last_dur_ = 0.0;
- const SocketConnection& conn;
- std::optional<seastar::abort_source> as;
- public:
- Timer(SocketConnection& conn) : conn(conn) {}
- double last_dur() const { return last_dur_; }
- seastar::future<> backoff(double seconds);
- void cancel() {
- last_dur_ = 0.0;
- if (as) {
- as->request_abort();
- as = std::nullopt;
- }
- }
- };
- Timer protocol_timer;
-
- private:
void fault(state_t expected_state,
const char *where,
std::exception_ptr eptr);
// reentrant
void do_close(bool is_dispatch_reset,
std::optional<std::function<void()>> f_accept_new=std::nullopt);
+
+ private:
+ SocketConnection &conn;
+
+ SocketMessenger &messenger;
+
+ bool has_socket = false;
+
+ // the socket exists and it is not shutdown
+ bool is_socket_valid = false;
+
+ FrameAssemblerV2Ref frame_assembler;
+
+ std::optional<seastar::shared_promise<>> exit_io;
+
+ AuthConnectionMetaRef auth_meta;
+
+ crimson::common::Gated gate;
+
+ bool closed = false;
+
+ // become valid only after closed == true
+ seastar::shared_future<> closed_clean_fut;
+
+#ifdef UNIT_TESTS_BUILT
+ bool closed_clean = false;
+
+#endif
+ state_t state = state_t::NONE;
+
+ uint64_t peer_supported_features = 0;
+
+ uint64_t client_cookie = 0;
+ uint64_t server_cookie = 0;
+ uint64_t global_seq = 0;
+ uint64_t peer_global_seq = 0;
+ uint64_t connect_seq = 0;
+
+ seastar::future<> execution_done = seastar::now();
+
+ class Timer {
+ double last_dur_ = 0.0;
+ const SocketConnection& conn;
+ std::optional<seastar::abort_source> as;
+ public:
+ Timer(SocketConnection& conn) : conn(conn) {}
+ double last_dur() const { return last_dur_; }
+ seastar::future<> backoff(double seconds);
+ void cancel() {
+ last_dur_ = 0.0;
+ if (as) {
+ as->request_abort();
+ as = std::nullopt;
+ }
+ }
+ };
+ Timer protocol_timer;
};
} // namespace crimson::net