seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
{
dispatcher = disp;
+ started = true;
// start listening if bind() was called
if (listener) {
if (e.code() != error::connection_aborted) {
logger().error("{} unexpected error during accept: {}", *this, e);
}
- });
+ }).then([this] () { return accepting_complete.set_value(); });
+ } else {
+ accepting_complete.set_value();
}
-
return seastar::now();
}
seastar::future<> SocketMessenger::do_shutdown()
{
+ if (!started) {
+ return seastar::now();
+ }
+
if (listener) {
listener->abort_accept();
}
return seastar::parallel_for_each(connections, [] (auto conn) {
return conn.second->close();
});
+ }).then([this] {
+ return accepting_complete.get_shared_future();
}).finally([this] {
ceph_assert(connections.empty());
});
#include <seastar/core/gate.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>
+#include <seastar/core/shared_future.hh>
#include "Messenger.h"
#include "SocketConnection.h"
seastar::socket_address paddr);
void do_bind(const entity_addrvec_t& addr);
+
+ bool started = false;
+ seastar::shared_promise<> accepting_complete;
seastar::future<> do_start(Dispatcher *disp);
seastar::foreign_ptr<ConnectionRef> do_connect(const entity_addr_t& peer_addr,
const entity_type_t& peer_type);