assert(!out_exit_dispatching);
}
-void Protocol::close(bool dispatch_reset,
- std::optional<std::function<void()>> f_accept_new)
-{
- if (closed) {
- // already closing
- return;
- }
-
- bool is_replace = f_accept_new ? true : false;
- logger().info("{} closing: reset {}, replace {}", conn,
- dispatch_reset ? "yes" : "no",
- is_replace ? "yes" : "no");
-
- // atomic operations
- closed = true;
- trigger_close();
- if (f_accept_new) {
- (*f_accept_new)();
- }
- if (conn.socket) {
- conn.socket->shutdown();
- }
- set_out_state(out_state_t::drop);
- assert(!gate.is_closed());
- auto gate_closed = gate.close();
-
- if (dispatch_reset) {
- dispatchers.ms_handle_reset(
- seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
- is_replace);
- }
-
- // asynchronous operations
- assert(!close_ready.valid());
- close_ready = std::move(gate_closed).then([this] {
- if (conn.socket) {
- return conn.socket->close();
- } else {
- return seastar::now();
- }
- }).then([this] {
- logger().debug("{} closed!", conn);
- on_closed();
-#ifdef UNIT_TESTS_BUILT
- is_closed_clean = true;
- if (conn.interceptor) {
- conn.interceptor->register_conn_closed(conn);
- }
-#endif
- }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
- logger().error("{} closing: close_ready got unexpected exception {}", conn, eptr);
- ceph_abort();
- });
-}
-
ceph::bufferlist Protocol::sweep_out_pending_msgs_to_sent(
size_t num_msgs,
bool require_keepalive,
namespace crimson::net {
class Protocol {
+// public to SocketConnection
public:
Protocol(Protocol&&) = delete;
virtual ~Protocol();
virtual bool is_connected() const = 0;
+ virtual void close() = 0;
+
+ virtual seastar::future<> close_clean_yielded() = 0;
+
#ifdef UNIT_TESTS_BUILT
- bool is_closed_clean = false;
- bool is_closed() const { return closed; }
-#endif
+ virtual bool is_closed_clean() const = 0;
- // Reentrant closing
- void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
- seastar::future<> close_clean(bool dispatch_reset) {
- // yield() so that close(dispatch_reset) can be called *after*
- // close_clean() is applied to all connections in a container using
- // seastar::parallel_for_each(). otherwise, we could erase a connection in
- // the container when seastar::parallel_for_each() is still iterating in
- // it. that'd lead to a segfault.
- return seastar::yield(
- ).then([this, dispatch_reset, conn_ref = conn.shared_from_this()] {
- close(dispatch_reset);
- // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
- // which will otherwise result in deadlock
- assert(close_ready.valid());
- return close_ready.get_future();
- });
- }
+ virtual bool is_closed() const = 0;
+#endif
virtual void start_connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) = 0;
Protocol(ChainedDispatchers& dispatchers,
SocketConnection& conn);
- virtual void trigger_close() = 0;
-
virtual ceph::bufferlist do_sweep_messages(
const std::deque<MessageURef>& msgs,
size_t num_msgs,
virtual void notify_out() = 0;
- virtual void on_closed() = 0;
-
- private:
- bool closed = false;
- // become valid only after closed == true
- seastar::shared_future<> close_ready;
-
// the write state-machine
public:
using clock_t = seastar::lowres_system_clock;
throw std::system_error(make_error_code(crimson::net::error::protocol_aborted));
}
-[[noreturn]] void abort_in_close(crimson::net::ProtocolV2& proto, bool dispatch_reset) {
- proto.close(dispatch_reset);
- abort_protocol();
+#define ABORT_IN_CLOSE(dispatch_reset) { \
+ do_close(dispatch_reset); \
+ abort_protocol(); \
}
inline void expect_tag(const Tag& expected,
if (conn.policy.lossy) {
logger().info("{} {}: fault at {} on lossy channel, going to CLOSING -- {}",
conn, func_name, get_state_name(state), eptr);
- close(true);
+ do_close(true);
} else if (conn.policy.server ||
(conn.policy.standby && !is_out_queued_or_sent())) {
logger().info("{} {}: fault at {} with nothing to send, going to STANDBY -- {}",
logger().error("{} peer does not support all required features"
" required={} peer_supported={}",
conn, required_features, _peer_supported_features);
- abort_in_close(*this, is_connect);
+ ABORT_IN_CLOSE(is_connect);
}
if ((supported_features & _peer_required_features) != _peer_required_features) {
logger().error("{} we do not support all peer required features"
" peer_required={} supported={}",
conn, _peer_required_features, supported_features);
- abort_in_close(*this, is_connect);
+ ABORT_IN_CLOSE(is_connect);
}
peer_supported_features = _peer_supported_features;
bool is_rev1 = HAVE_MSGR2_FEATURE(peer_supported_features, REVISION_1);
});
} catch (const crimson::auth::error& e) {
logger().error("{} get_initial_auth_request returned {}", conn, e.what());
- abort_in_close(*this, true);
+ ABORT_IN_CLOSE(true);
return seastar::now();
}
}
logger().error("{} connection peer id ({}) does not match "
"what it should be ({}) during connecting, close",
conn, server_ident.gid(), conn.get_peer_id());
- abort_in_close(*this, true);
+ ABORT_IN_CLOSE(true);
}
conn.set_peer_id(server_ident.gid());
conn.set_features(server_ident.supported_features() &
logger().warn("{} connection peer type does not match what peer advertises {} != {}",
conn, ceph_entity_type_name(conn.get_peer_type()),
ceph_entity_type_name(_peer_type));
- abort_in_close(*this, true);
+ ABORT_IN_CLOSE(true);
}
if (unlikely(state != state_t::CONNECTING)) {
logger().debug("{} triggered {} during banner_exchange(), abort",
// close this connection because all the necessary information is delivered
// to the exisiting connection, and jump to error handling code to abort the
// current state.
- abort_in_close(*this, false);
+ ABORT_IN_CLOSE(false);
return seastar::make_ready_future<next_step_t>(next_step_t::none);
}
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_accepting(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- close(false);
+ do_close(false);
});
});
}
trigger_state(state_t::ESTABLISHING, out_state_t::delay, false);
if (existing_conn) {
- existing_conn->protocol->close(
+ static_cast<ProtocolV2*>(existing_conn->protocol.get())->do_close(
true /* dispatch_reset */, std::move(accept_me));
if (unlikely(state != state_t::ESTABLISHING)) {
logger().warn("{} triggered {} during execute_establishing(), "
}).handle_exception([this] (std::exception_ptr eptr) {
logger().info("{} execute_server_wait(): fault at {}, going to CLOSING -- {}",
conn, get_state_name(state), eptr);
- close(false);
+ do_close(false);
});
});
}
// CLOSING state
-void ProtocolV2::trigger_close()
+void ProtocolV2::close()
{
+ do_close(false);
+}
+
+seastar::future<> ProtocolV2::close_clean_yielded()
+{
+ // yield() so that do_close() can be called *after* close_clean_yielded() is
+ // applied to all connections in a container using
+ // seastar::parallel_for_each(). otherwise, we could erase a connection in
+ // the container when seastar::parallel_for_each() is still iterating in it.
+ // that'd lead to a segfault.
+ return seastar::yield(
+ ).then([this, conn_ref = conn.shared_from_this()] {
+ do_close(false);
+ // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
+ // which will otherwise result in deadlock
+ assert(closed_clean_fut.valid());
+ return closed_clean_fut.get_future();
+ });
+}
+
+void ProtocolV2::do_close(
+ bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new)
+{
+ if (closed) {
+ // already closing
+ return;
+ }
+
+ bool is_replace = f_accept_new ? true : false;
+ logger().info("{} closing: reset {}, replace {}", conn,
+ dispatch_reset ? "yes" : "no",
+ is_replace ? "yes" : "no");
+
+ /*
+ * atomic operations
+ */
+
+ closed = true;
+
+ // trigger close
messenger.closing_conn(
seastar::static_pointer_cast<SocketConnection>(
conn.shared_from_this()));
-
if (state == state_t::ACCEPTING || state == state_t::SERVER_WAIT) {
messenger.unaccept_conn(
seastar::static_pointer_cast<SocketConnection>(
// cannot happen
ceph_assert(false);
}
-
protocol_timer.cancel();
trigger_state(state_t::CLOSING, out_state_t::drop, false);
-}
-void ProtocolV2::on_closed()
-{
- messenger.closed_conn(
- seastar::static_pointer_cast<SocketConnection>(
- conn.shared_from_this()));
+ if (f_accept_new) {
+ (*f_accept_new)();
+ }
+ if (conn.socket) {
+ conn.socket->shutdown();
+ }
+ assert(!gate.is_closed());
+ auto gate_closed = gate.close();
+
+ if (dispatch_reset) {
+ dispatchers.ms_handle_reset(
+ seastar::static_pointer_cast<SocketConnection>(conn.shared_from_this()),
+ is_replace);
+ }
+
+ // asynchronous operations
+ assert(!closed_clean_fut.valid());
+ closed_clean_fut = std::move(gate_closed).then([this] {
+ if (conn.socket) {
+ return conn.socket->close();
+ } else {
+ return seastar::now();
+ }
+ }).then([this] {
+ logger().debug("{} closed!", conn);
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+#ifdef UNIT_TESTS_BUILT
+ closed_clean = true;
+ if (conn.interceptor) {
+ conn.interceptor->register_conn_closed(conn);
+ }
+#endif
+ }).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
+ logger().error("{} closing: closed_clean_fut got unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
+ });
}
void ProtocolV2::print_conn(std::ostream& out) const
SocketConnection& conn,
SocketMessenger& messenger);
~ProtocolV2() override;
- void print_conn(std::ostream&) const final;
+// public to SocketConnection, but private to the others
private:
- void on_closed() override;
bool is_connected() const override;
+ void close() override;
+
+ seastar::future<> close_clean_yielded() override;
+
+#ifdef UNIT_TESTS_BUILT
+ bool is_closed_clean() const override {
+ return closed_clean;
+ }
+
+ bool is_closed() const override {
+ return closed;
+ }
+
+#endif
void start_connect(const entity_addr_t& peer_addr,
const entity_name_t& peer_name) override;
void start_accept(SocketRef&& socket,
const entity_addr_t& peer_addr) override;
- void trigger_close() override;
+ void print_conn(std::ostream&) const final;
+ private:
ceph::bufferlist do_sweep_messages(
const std::deque<MessageURef>& msgs,
size_t num_msgs,
AuthConnectionMetaRef auth_meta;
+ bool closed = false;
+
+ // become valid only after closed == true
+ seastar::shared_future<> closed_clean_fut;
+
+#ifdef UNIT_TESTS_BUILT
+ bool closed_clean = false;
+
+#endif
enum class state_t {
NONE = 0,
ACCEPTING,
// SERVER_WAIT
void execute_server_wait();
+
+ // CLOSING
+ // reentrant
+ void do_close(bool dispatch_reset,
+ std::optional<std::function<void()>> f_accept_new=std::nullopt);
};
} // namespace crimson::net