}
}
+seastar::future<stop_t> Protocol::do_write_dispatch_sweep()
+{
+ switch (write_state) {
+ case write_state_t::open:
+ return seastar::futurize_apply([this] {
+ if (need_keepalive) {
+ return do_keepalive()
+ .then([this] { need_keepalive = false; });
+ }
+ return seastar::now();
+ }).then([this] {
+ if (need_keepalive_ack) {
+ return do_keepalive_ack()
+ .then([this] { need_keepalive_ack = false; });
+ }
+ return seastar::now();
+ }).then([this] {
+ if (!conn.out_q.empty()){
+ MessageRef msg = conn.out_q.front();
+ return write_message(msg)
+ .then([this, msg] {
+ if (msg == conn.out_q.front()) {
+ conn.out_q.pop();
+ }
+ return stop_t::no;
+ });
+ } else {
+ return socket->flush().then([this] {
+ if (!conn.out_q.empty()) {
+ return stop_t::no;
+ } else {
+ // the dispatching can only stop when out_q is empty
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ return stop_t::yes;
+ }
+ });
+ }
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
+ close();
+ return stop_t::no;
+ });
+ case write_state_t::delay: {
+ // delay dispatching writes until open
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ }
+ case write_state_t::drop:
+ ceph_assert(write_dispatching);
+ write_dispatching = false;
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+}
+
void Protocol::write_event()
{
if (write_dispatching) {
case write_state_t::delay:
seastar::with_gate(pending_dispatch, [this] {
return seastar::repeat([this] {
- switch (write_state) {
- case write_state_t::open:
- return seastar::futurize_apply([this] {
- if (need_keepalive) {
- return do_keepalive()
- .then([this] { need_keepalive = false; });
- }
- return seastar::now();
- }).then([this] {
- if (need_keepalive_ack) {
- return do_keepalive_ack()
- .then([this] { need_keepalive_ack = false; });
- }
- return seastar::now();
- }).then([this] {
- if (!conn.out_q.empty()){
- MessageRef msg = conn.out_q.front();
- return write_message(msg)
- .then([this, msg] {
- if (msg == conn.out_q.front()) {
- conn.out_q.pop();
- }
- return stop_t::no;
- });
- } else {
- return socket->flush()
- .then([this] {
- if (!conn.out_q.empty()) {
- return stop_t::no;
- } else {
- write_dispatching = false;
- return stop_t::yes;
- }
- });
- }
- }).handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} write_event fault: {}", conn, eptr);
- close();
- return stop_t::no;
- });
- case write_state_t::delay:
- // delay dispatching writes until open
- return state_changed.get_shared_future()
- .then([] { return stop_t::no; });
- case write_state_t::drop:
- write_dispatching = false;
- return seastar::make_ready_future<stop_t>(stop_t::yes);
- default:
- ceph_assert(false);
- }
+ return do_write_dispatch_sweep();
});
});
return;