seastar::future<> SocketConnection::close()
{
+ if (state == state_t::closed) {
+ // already closing
+ assert(close_ready.valid());
+ return close_ready.get_future();
+ }
+
+ state = state_t::closed;
+
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn = ConnectionRef(this)] {};
get_messenger()->unregister_conn(this);
- return seastar::when_all(in.close(), out.close())
+ // close_ready become valid only after state is state_t::closed
+ assert(!close_ready.valid());
+ close_ready = seastar::when_all(in.close(), out.close())
.discard_result()
.finally(std::move(cleanup));
+ return close_ready.get_future();
}
// handshake
#pragma once
#include <seastar/core/reactor.hh>
+#include <seastar/core/shared_future.hh>
#include "msg/Policy.h"
#include "Connection.h"
state_t state = state_t::none;
+ /// become valid only when state is state_t::closed
+ seastar::shared_future<> close_ready;
+
/// buffer state for read()
struct Reader {
bufferlist buffer;