#include "auth/Auth.h"
#include "crimson/common/log.h"
+#include "Errors.h"
#include "Socket.h"
#include "SocketConnection.h"
Protocol::~Protocol()
{
ceph_assert(pending_dispatch.is_closed());
+ assert(!exit_open);
}
bool Protocol::is_connected() const
size_t num_msgs = conn.out_q.size();
// we must have something to write...
ceph_assert(is_queued());
+ assert(!open_write);
+ open_write = true;
+
MessageRef front_msg;
if (likely(num_msgs)) {
front_msg = conn.out_q.front();
// the dispatching can ONLY stop now
ceph_assert(write_dispatching);
write_dispatching = false;
+ assert(open_write);
+ open_write = false;
return seastar::make_ready_future<stop_t>(stop_t::yes);
} else {
// something is pending to send during flushing
+ assert(open_write);
+ open_write = false;
return seastar::make_ready_future<stop_t>(stop_t::no);
}
});
} else {
// messages were enqueued during socket write
+ assert(open_write);
+ open_write = false;
return seastar::make_ready_future<stop_t>(stop_t::no);
}
+ }).handle_exception_type([this] (const std::system_error& e) {
+ if (e.code() != error::broken_pipe &&
+ e.code() != error::connection_reset) {
+ logger().error("{} do_write_dispatch_sweep(): unexpected error {}",
+ conn, e);
+ ceph_abort();
+ }
+
+ logger().debug("{} do_write_dispatch_sweep() fault: {}", conn, e);
+ assert(open_write);
+ open_write = false;
+ if (exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ }
+ socket->shutdown();
+ if (write_state == write_state_t::open) {
+ write_state = write_state_t::delay;
+ }
+ return seastar::make_ready_future<stop_t>(stop_t::no);
}).handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} do_write_dispatch_sweep() fault: {}", conn, eptr);
- close();
+ logger().error("{} do_write_dispatch_sweep(): unexpected exception {}",
+ conn, eptr);
+ ceph_abort();
return seastar::make_ready_future<stop_t>(stop_t::no);
});
}
seastar::future<> send(MessageRef msg);
seastar::future<> keepalive();
+// TODO: encapsulate a SessionedSender class
protected:
// write_state is changed with state atomically, indicating the write
// behavior of the according state.
drop
};
void set_write_state(const write_state_t& state) {
+ if (write_state == write_state_t::open &&
+ state == write_state_t::delay) {
+ if (open_write) {
+ exit_open = seastar::shared_promise<>();
+ }
+ }
+ if (state == write_state_t::drop && exit_open) {
+ exit_open->set_value();
+ exit_open = std::nullopt;
+ }
write_state = state;
state_changed.set_value();
state_changed = seastar::shared_promise<>();
}
+ seastar::future<> wait_write_exit() {
+ if (exit_open) {
+ return exit_open->get_shared_future();
+ }
+ return seastar::now();
+ }
+
void notify_keepalive_ack(utime_t keepalive_ack);
bool is_queued() const {
bool need_keepalive = false;
std::optional<utime_t> keepalive_ack = std::nullopt;
bool write_dispatching = false;
+ // Indicate if we are in the middle of writing.
+ bool open_write = false;
+ // If another continuation is trying to close or replace socket when
+ // open_write is true, it needs to wait for exit_open until writing is
+ // stopped or failed.
+ std::optional<seastar::shared_promise<>> exit_open;
+
seastar::future<stop_t> do_write_dispatch_sweep();
void write_event();
};