}
}
-seastar::future<>
+seastar::future<seastar::stop_iteration>
SocketConnection::repeat_handle_connect()
{
return socket->read(sizeof(h.connect))
});
}
-seastar::future<>
+seastar::future<seastar::stop_iteration>
SocketConnection::send_connect_reply(msgr_tag_t tag,
bufferlist&& authorizer_reply)
{
return socket->write(make_static_packet(h.reply))
.then([this, reply=std::move(authorizer_reply)]() mutable {
return socket->write_flush(std::move(reply));
+ }).then([] {
+ return stop_t::no;
});
}
-seastar::future<>
+seastar::future<seastar::stop_iteration>
SocketConnection::send_connect_reply_ready(msgr_tag_t tag,
bufferlist&& authorizer_reply)
{
}).then([this] {
messenger.register_conn(this);
messenger.unaccept_conn(this);
- state = state_t::open;
+ return stop_t::yes;
});
}
});
}
-seastar::future<>
+seastar::future<seastar::stop_iteration>
SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, bufferlist&& authorizer_reply)
{
if (h.connect.global_seq < existing->peer_global_seq()) {
}
}
-seastar::future<> SocketConnection::replace_existing(SocketConnectionRef existing,
- bufferlist&& authorizer_reply,
- bool is_reset_from_peer)
+seastar::future<seastar::stop_iteration>
+SocketConnection::replace_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer)
{
msgr_tag_t reply_tag;
if (HAVE_FEATURE(h.connect.features, RECONNECT_SEQ) &&
return send_connect_reply_ready(reply_tag, std::move(authorizer_reply));
}
-seastar::future<> SocketConnection::handle_connect_reply(msgr_tag_t tag)
+seastar::future<seastar::stop_iteration>
+SocketConnection::handle_connect_reply(msgr_tag_t tag)
{
switch (tag) {
case CEPH_MSGR_TAG_FEATURES:
- return fault();
+ logger().error("{} connect protocol feature mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
case CEPH_MSGR_TAG_BADPROTOVER:
- return fault();
+ logger().error("{} connect protocol version mispatch", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
case CEPH_MSGR_TAG_BADAUTHORIZER:
if (h.got_bad_auth) {
logger().error("{} got bad authorizer", __func__);
return messenger.get_authorizer(peer_type, true)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
- return seastar::now();
+ return stop_t::no;
});
case CEPH_MSGR_TAG_RESETSESSION:
reset_session();
- return seastar::now();
+ return seastar::make_ready_future<stop_t>(stop_t::no);
case CEPH_MSGR_TAG_RETRY_GLOBAL:
h.global_seq = messenger.get_global_seq(h.reply.global_seq);
- return seastar::now();
+ return seastar::make_ready_future<stop_t>(stop_t::no);
case CEPH_MSGR_TAG_RETRY_SESSION:
ceph_assert(h.reply.connect_seq > h.connect_seq);
h.connect_seq = h.reply.connect_seq;
- return seastar::now();
+ return seastar::make_ready_future<stop_t>(stop_t::no);
case CEPH_MSGR_TAG_WAIT:
- return fault();
+ // TODO: state wait
+ throw std::system_error(make_error_code(error::negotiation_failure));
case CEPH_MSGR_TAG_SEQ:
break;
case CEPH_MSGR_TAG_READY:
}
if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
missing) {
- return fault();
+ logger().error("{} missing required features", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
}
if (tag == CEPH_MSGR_TAG_SEQ) {
return socket->read_exactly(sizeof(seq_num_t))
features));
}
h.authorizer.reset();
- return seastar::now();
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
} else {
// unknown tag
logger().error("{} got unknown tag", __func__, int(tag));
}
}
-seastar::future<> SocketConnection::repeat_connect()
+seastar::future<seastar::stop_iteration>
+SocketConnection::repeat_connect()
{
// encode ceph_msg_connect
memset(&h.connect, 0, sizeof(h.connect));
h.global_seq = messenger.get_global_seq();
return socket->write_flush(std::move(bl));
}).then([=] {
- return seastar::do_until([=] { return state == state_t::open; },
- [=] { return repeat_connect(); });
+ return seastar::repeat([this] {
+ return repeat_connect();
+ });
}).then([this] {
+ state = state_t::open;
// start background processing of tags
read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
peer_addr = addr;
}
}).then([this] {
- return seastar::do_until([this] { return state == state_t::open; },
- [this] { return repeat_handle_connect(); });
+ return seastar::repeat([this] {
+ return repeat_handle_connect();
+ });
}).then([this] {
+ state = state_t::open;
// start background processing of tags
read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
} h;
/// server side of handshake negotiation
- seastar::future<> repeat_handle_connect();
- seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
- bufferlist&& authorizer_reply);
- seastar::future<> replace_existing(SocketConnectionRef existing,
- bufferlist&& authorizer_reply,
- bool is_reset_from_peer = false);
- seastar::future<> send_connect_reply(ceph::net::msgr_tag_t tag,
- bufferlist&& authorizer_reply = {});
- seastar::future<> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
- bufferlist&& authorizer_reply);
+ seastar::future<stop_t> repeat_handle_connect();
+ seastar::future<stop_t> handle_connect_with_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply);
+ seastar::future<stop_t> replace_existing(SocketConnectionRef existing,
+ bufferlist&& authorizer_reply,
+ bool is_reset_from_peer = false);
+ seastar::future<stop_t> send_connect_reply(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply = {});
+ seastar::future<stop_t> send_connect_reply_ready(ceph::net::msgr_tag_t tag,
+ bufferlist&& authorizer_reply);
seastar::future<> handle_keepalive2();
seastar::future<> handle_keepalive2_ack();
bool require_auth_feature() const;
uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
/// client side of handshake negotiation
- seastar::future<> repeat_connect();
- seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
+ seastar::future<stop_t> repeat_connect();
+ seastar::future<stop_t> handle_connect_reply(ceph::net::msgr_tag_t tag);
void reset_session();
/// state for an incoming message