});
}
-void SocketConnection::read_tags_until_next_message()
+seastar::future<> SocketConnection::handle_tags()
{
- seastar::repeat([this] {
+ return seastar::keep_doing([this] {
// read the next tag
return socket->read_exactly(1)
.then([this] (auto buf) {
switch (buf[0]) {
case CEPH_MSGR_TAG_MSG:
- // stop looping and notify read_header()
- return seastar::make_ready_future<stop_t>(stop_t::yes);
+ return read_message();
case CEPH_MSGR_TAG_ACK:
return handle_ack();
case CEPH_MSGR_TAG_KEEPALIVE:
- break;
+ return seastar::now();
case CEPH_MSGR_TAG_KEEPALIVE2:
- return handle_keepalive2()
- .then([this] { return stop_t::no; });
+ return handle_keepalive2();
case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
- return handle_keepalive2_ack()
- .then([this] { return stop_t::no; });
+ return handle_keepalive2_ack();
case CEPH_MSGR_TAG_CLOSE:
logger().info("{} got tag close", *this);
- break;
+ throw std::system_error(make_error_code(error::connection_aborted));
+ default:
+ logger().error("{} got unknown msgr tag {}", *this, static_cast<int>(buf[0]));
+ throw std::system_error(make_error_code(error::read_eof));
}
- return seastar::make_ready_future<stop_t>(stop_t::no);
});
- }).handle_exception_type([this] (const std::system_error& e) {
- if (e.code() == error::read_eof) {
- close();
- }
- throw e;
- }).then_wrapped([this] (auto fut) {
- // satisfy the message promise
- fut.forward_to(std::move(on_message));
});
}
-seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
+seastar::future<> SocketConnection::handle_ack()
{
return socket->read_exactly(sizeof(ceph_le64))
.then([this] (auto buf) {
auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
discard_up_to(&sent, *seq);
- return stop_t::no;
});
}
return policy.throttler_bytes->get(to_read);
}
-seastar::future<MessageRef> SocketConnection::do_read_message()
+seastar::future<> SocketConnection::read_message()
{
- return on_message.get_future()
- .then([this] {
- on_message = seastar::promise<>{};
- // read header
- return socket->read(sizeof(m.header));
- }).then([this] (bufferlist bl) {
+ return socket->read(sizeof(m.header))
+ .then([this] (bufferlist bl) {
// throttle the traffic, maybe
auto p = bl.cbegin();
::decode(m.header, p);
// read footer
return socket->read(sizeof(m.footer));
}).then([this] (bufferlist bl) {
- // resume background processing of tags
- read_tags_until_next_message();
-
auto p = bl.cbegin();
::decode(m.footer, p);
auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
m.front, m.middle, m.data, nullptr);
// TODO: set time stamps
msg->set_byte_throttler(policy.throttler_bytes);
- constexpr bool add_ref = false; // Message starts with 1 ref
- return MessageRef{msg, add_ref};
- });
-}
-seastar::future<MessageRef> SocketConnection::read_message()
-{
- return seastar::repeat_until_value([this] {
- return do_read_message()
- .then([this] (MessageRef msg) -> std::optional<MessageRef> {
- if (!update_rx_seq(msg->get_seq())) {
- // skip this request and read the next
- return {};
- }
- return msg;
+ if (!update_rx_seq(msg->get_seq())) {
+ // skip this message
+ return;
+ }
+
+ constexpr bool add_ref = false; // Message starts with 1 ref
+ auto msg_ref = MessageRef{msg, add_ref};
+ // start dispatch, ignoring exceptions from the application layer
+ seastar::with_gate(pending_dispatch, [this, msg = std::move(msg_ref)] {
+ return dispatcher.ms_dispatch(this, std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().error("{} ms_dispatch caught exception: {}", *this, eptr);
+ ceph_assert(false);
+ });
});
});
}
h.promise.set_value();
seastar::with_gate(pending_dispatch, [this] {
// start background processing of tags
- read_tags_until_next_message();
- return seastar::keep_doing([this] {
- return read_message()
- .then([this] (MessageRef msg) {
- // start dispatch, ignoring exceptions from the application layer
- seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] {
- return dispatcher.ms_dispatch(this, std::move(msg))
- .handle_exception([] (std::exception_ptr eptr) {});
- });
- // return immediately to start on the next message
- return seastar::now();
- });
- }).handle_exception_type([this] (const std::system_error& e) {
+ return handle_tags()
+ .handle_exception_type([this] (const std::system_error& e) {
+ logger().warn("{} open fault: {}", *this, e);
if (e.code() == error::connection_aborted ||
e.code() == error::connection_reset) {
- return dispatcher.ms_handle_reset(this);
+ return dispatcher.ms_handle_reset(this)
+ .then([this] {
+ close();
+ });
} else if (e.code() == error::read_eof) {
- return dispatcher.ms_handle_remote_reset(this);
+ return dispatcher.ms_handle_remote_reset(this)
+ .then([this] {
+ close();
+ });
} else {
throw e;
}
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the open state
logger().warn("{} open fault: {}", *this, eptr);
+ close();
});
});
}
bufferlist data;
} m;
- /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
- /// header will follow
- seastar::promise<> on_message;
-
seastar::future<> maybe_throttle();
- void read_tags_until_next_message();
- seastar::future<stop_t> handle_ack();
+ seastar::future<> handle_tags();
+ seastar::future<> handle_ack();
/// becomes available when handshake completes, and when all previous messages
/// have been sent to the output stream. send() chains new messages as
/// false otherwise.
bool update_rx_seq(seq_num_t seq);
- seastar::future<MessageRef> do_read_message();
+ seastar::future<> read_message();
std::unique_ptr<AuthSessionHandler> session_security;
void start_accept(seastar::connected_socket&& socket,
const entity_addr_t& peer_addr);
- /// read a message from a connection that has completed its handshake
- seastar::future<MessageRef> read_message();
-
/// the number of connections initiated in this session, increment when a
/// new connection is established
uint32_t connect_seq() const {