// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn_ref = conn.shared_from_this(), this] {
logger().debug("{} closed!", conn);
+ on_closed();
#ifdef UNIT_TESTS_BUILT
is_closed_clean = true;
if (conn.interceptor) {
virtual void notify_write() {};
+ virtual void on_closed() {}
+
public:
const proto_t proto_type;
SocketRef socket;
}
protocol_timer.cancel();
-
+ messenger.closing_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
trigger_state(state_t::CLOSING, write_state_t::drop, false);
}
+void ProtocolV2::on_closed()
+{
+ messenger.closed_conn(
+ seastar::static_pointer_cast<SocketConnection>(
+ conn.shared_from_this()));
+}
+
void ProtocolV2::print(std::ostream& out) const
{
out << conn;
~ProtocolV2() override;
void print(std::ostream&) const final;
private:
+ void on_closed() override;
bool is_connected() const override;
void start_connect(const entity_addr_t& peer_addr,
return seastar::parallel_for_each(connections, [] (auto conn) {
return conn.second->close_clean(false);
});
+ }).then([this] {
+ return seastar::parallel_for_each(closing_conns, [] (auto conn) {
+ return conn->close_clean(false);
+ });
}).then([this] {
ceph_assert(connections.empty());
shutdown_promise.set_value();
connections.erase(found);
}
+void SocketMessenger::closing_conn(SocketConnectionRef conn)
+{
+ closing_conns.push_back(conn);
+}
+
+void SocketMessenger::closed_conn(SocketConnectionRef conn)
+{
+ for (auto it = closing_conns.begin();
+ it != closing_conns.end();) {
+ if (*it == conn) {
+ it = closing_conns.erase(it);
+ } else {
+ it++;
+ }
+ }
+}
+
seastar::future<uint32_t>
SocketMessenger::get_global_seq(uint32_t old)
{
ChainedDispatchersRef dispatchers;
std::map<entity_addr_t, SocketConnectionRef> connections;
std::set<SocketConnectionRef> accepting_conns;
+ std::vector<SocketConnectionRef> closing_conns;
ceph::net::PolicySet<Throttle> policy_set;
// Distinguish messengers with meaningful names for debugging
const std::string logic_name;
void unaccept_conn(SocketConnectionRef);
void register_conn(SocketConnectionRef);
void unregister_conn(SocketConnectionRef);
+ void closing_conn(SocketConnectionRef);
+ void closed_conn(SocketConnectionRef);
seastar::shard_id shard_id() const {
assert(seastar::this_shard_id() == master_sid);
return master_sid;