// TODO: state wait
throw std::system_error(make_error_code(error::negotiation_failure));
case CEPH_MSGR_TAG_SEQ:
- break;
case CEPH_MSGR_TAG_READY:
- break;
- }
- if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
- missing) {
- logger().error("{} missing required features", __func__);
- throw std::system_error(make_error_code(error::negotiation_failure));
- }
- if (tag == CEPH_MSGR_TAG_SEQ) {
- return socket->read_exactly(sizeof(seq_num_t))
- .then([this] (auto buf) {
- auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
- discard_up_to(&out_q, *acked_seq);
- return socket->write_flush(make_static_packet(in_seq));
+ if (auto missing = (policy.features_required & ~(uint64_t)h.reply.features);
+ missing) {
+ logger().error("{} missing required features", __func__);
+ throw std::system_error(make_error_code(error::negotiation_failure));
+ }
+ return seastar::futurize_apply([this, tag] {
+ if (tag == CEPH_MSGR_TAG_SEQ) {
+ return socket->read_exactly(sizeof(seq_num_t))
+ .then([this] (auto buf) {
+ auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
+ discard_up_to(&out_q, *acked_seq);
+ return socket->write_flush(make_static_packet(in_seq));
+ });
+ }
+ // tag CEPH_MSGR_TAG_READY
+ return seastar::now();
}).then([this] {
- return handle_connect_reply(CEPH_MSGR_TAG_READY);
+ // hooray!
+ h.peer_global_seq = h.reply.global_seq;
+ policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ h.connect_seq++;
+ h.backoff = 0ms;
+ set_features(h.reply.features & h.connect.features);
+ if (h.authorizer) {
+ session_security.reset(
+ get_auth_session_handler(nullptr,
+ h.authorizer->protocol,
+ h.authorizer->session_key,
+ features));
+ }
+ h.authorizer.reset();
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
});
- }
- if (tag == CEPH_MSGR_TAG_READY) {
- // hooray!
- h.peer_global_seq = h.reply.global_seq;
- policy.lossy = h.reply.flags & CEPH_MSG_CONNECT_LOSSY;
- h.connect_seq++;
- h.backoff = 0ms;
- set_features(h.reply.features & h.connect.features);
- if (h.authorizer) {
- session_security.reset(
- get_auth_session_handler(nullptr,
- h.authorizer->protocol,
- h.authorizer->session_key,
- features));
- }
- h.authorizer.reset();
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- } else {
+ break;
+ default:
// unknown tag
logger().error("{} got unknown tag", __func__, int(tag));
throw std::system_error(make_error_code(error::negotiation_failure));