return policy.throttler_bytes->get(to_read);
}
-seastar::future<MessageRef> SocketConnection::read_message()
+seastar::future<MessageRef> SocketConnection::do_read_message()
{
return on_message.get_future()
.then([this] {
::decode(m.footer, p);
auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
m.front, m.middle, m.data, nullptr);
+ // TODO: set time stamps
+ msg->set_byte_throttler(policy.throttler_bytes);
constexpr bool add_ref = false; // Message starts with 1 ref
return MessageRef{msg, add_ref};
- }).then([this] (MessageRef msg) {
- if (msg) {
- // TODO: set time stamps
- msg->set_byte_throttler(policy.throttler_bytes);
- if (!update_rx_seq(msg->get_seq())) {
- msg.reset();
- }
- }
- return msg;
+ });
+}
+
+seastar::future<MessageRef> SocketConnection::read_message()
+{
+ namespace stdx = std::experimental;
+
+ return seastar::repeat_until_value([this] {
+ return do_read_message()
+ .then([this] (MessageRef msg) -> stdx::optional<MessageRef> {
+ if (!update_rx_seq(msg->get_seq())) {
+ // skip this request and read the next
+ return stdx::nullopt;
+ }
+ return msg;
+ });
});
}