SocketConnection::~SocketConnection()
{
ceph_assert(pending_dispatch.is_closed());
- // errors were reported to callers of send()
- ceph_assert(send_ready.available());
- send_ready.ignore_ready_future();
}
ceph::net::Messenger*
return !send_ready.failed();
}
+seastar::future<> SocketConnection::send(MessageRef msg)
+{
+ return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+ return do_send(std::move(msg))
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} send fault: {}", *this, eptr);
+ close();
+ });
+ });
+}
+
+seastar::future<> SocketConnection::keepalive()
+{
+ return seastar::with_gate(pending_dispatch, [this] {
+ return do_keepalive()
+ .handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} keepalive fault: {}", *this, eptr);
+ close();
+ });
+ });
+}
+
void SocketConnection::read_tags_until_next_message()
{
seastar::repeat([this] {
});
}
-seastar::future<> SocketConnection::send(MessageRef msg)
+seastar::future<> SocketConnection::do_send(MessageRef msg)
{
// chain the message after the last message is sent
+ // TODO: retry send for lossless connection
seastar::shared_future<> f = send_ready.then(
[this, msg = std::move(msg)] {
return write_message(std::move(msg));
return f.get_future();
}
-seastar::future<> SocketConnection::keepalive()
+seastar::future<> SocketConnection::do_keepalive()
{
+ // TODO: retry keepalive for lossless connection
seastar::shared_future<> f = send_ready.then([this] {
k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
ceph::coarse_real_clock::now());