});
}
-//TODO(performance): batch messages in out_q instead of chaining individual write events
-//TODO: should discard all the pending messages when reset
-seastar::future<> SocketConnection::write_event(MessageRef msg)
+void SocketConnection::write_event()
{
+ if (write_dispatching) {
+ // already dispatching
+ return;
+ }
+ write_dispatching = true;
switch (write_state) {
case write_state_t::open:
case write_state_t::delay:
- return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
- seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] {
- return seastar::repeat([this, msg=std::move(msg)] {
- switch (write_state) {
- case write_state_t::open:
- return seastar::futurize_apply([this] {
- if (m_keepalive) {
- return do_keepalive()
- .then([this] { m_keepalive = false; });
- }
- return seastar::now();
- }).then([this] {
- if (m_keepalive_ack) {
- return do_keepalive_ack()
- .then([this] { m_keepalive_ack = false; });
- }
- return seastar::now();
- }).then([this, msg] {
- if (msg) {
- return write_message(msg);
+ seastar::with_gate(pending_dispatch, [this] {
+ return seastar::repeat([this] {
+ switch (write_state) {
+ case write_state_t::open:
+ return seastar::futurize_apply([this] {
+ if (m_keepalive) {
+ return do_keepalive()
+ .then([this] { m_keepalive = false; });
+ }
+ return seastar::now();
+ }).then([this] {
+ if (m_keepalive_ack) {
+ return do_keepalive_ack()
+ .then([this] { m_keepalive_ack = false; });
+ }
+ return seastar::now();
+ }).then([this] {
+ if (!out_q.empty()){
+ MessageRef msg = out_q.front();
+ return write_message(msg)
+ .then([this, msg] {
+ if (msg == out_q.front()) {
+ out_q.pop();
}
- return seastar::now();
- }).then([this] {
- return socket->flush();
- }).then([] {
- return stop_t::yes;
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} write_event fault: {}", *this, eptr);
- close();
return stop_t::no;
});
- case write_state_t::delay:
- // delay all the writes until open
- return state_changed.get_shared_future()
- .then([] { return stop_t::no; });
- case write_state_t::drop:
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- default:
- ceph_assert(false);
- }
- });
+ } else {
+ return socket->flush()
+ .then([this] {
+ if (!out_q.empty()) {
+ return stop_t::no;
+ } else {
+ write_dispatching = false;
+ return stop_t::yes;
+ }
+ });
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} write_event fault: {}", *this, eptr);
+ close();
+ return stop_t::no;
+ });
+ case write_state_t::delay:
+ // delay dispatching writes until open
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ case write_state_t::drop:
+ write_dispatching = false;
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
});
- send_ready = f.get_future();
- return f.get_future();
});
+ return;
case write_state_t::drop:
- return seastar::now();
+ write_dispatching = false;
default:
ceph_assert(false);
}
{
logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
- return write_event(msg);
+ if (write_state != write_state_t::drop) {
+ out_q.push(std::move(msg));
+ write_event();
+ }
});
}