seastar::future<> SocketConnection::close()
{
+ // unregister_conn() drops a reference, so hold another until completion
+ auto cleanup = [conn = ConnectionRef(this)] {};
+
get_messenger()->unregister_conn(this);
- return seastar::when_all(in.close(), out.close()).discard_result();
+
+ return seastar::when_all(in.close(), out.close())
+ .discard_result()
+ .finally(std::move(cleanup));
}
// handshake
h.connect.authorizer_protocol,
authorizer);
}).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
+ memset(&h.reply, 0, sizeof(h.reply));
if (tag) {
return send_connect_reply(tag, std::move(authorizer_reply));
}
}
h.connect_seq = h.connect.connect_seq + 1;
h.peer_global_seq = h.connect.global_seq;
- set_features((uint64_t)h.reply.features & (uint64_t)h.connect.features);
+ set_features((uint64_t)policy.features_supported & (uint64_t)h.connect.features);
// TODO: cct
return send_connect_reply_ready(CEPH_MSGR_TAG_READY, std::move(authorizer_reply));
});
h.reply.authorizer_len = authorizer_reply.length();
return out.write(reinterpret_cast<const char*>(&h.reply), sizeof(h.reply))
.then([this, reply=std::move(authorizer_reply)]() mutable {
- out.write(std::move(reply));
+ return out.write(std::move(reply));
}).then([this] {
return out.flush();
});
::encode(my_addr, bl, 0);
return out.write(std::move(bl)).then([this] { return out.flush(); });
}).then([=] {
- }).then([=] {
- return seastar::do_until([=] { return state == state_t::open; },
- [=] { return connect(peer_type, host_type); });
+ return seastar::do_until([=] { return state == state_t::open; },
+ [=] { return connect(peer_type, host_type); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();