SocketConnection::SocketConnection(SocketMessenger& messenger,
Dispatcher& dispatcher)
: messenger(messenger),
- dispatcher(dispatcher),
- send_ready(h.promise.get_future())
+ dispatcher(dispatcher)
{
ceph_assert(&messenger.container().local() == &messenger);
}
seastar::future<bool> SocketConnection::is_connected()
{
return seastar::smp::submit_to(shard_id(), [this] {
- return !send_ready.failed();
+ return write_state == write_state_t::open;
});
}
+//TODO(performance): batch messages in out_q instead of chaining individual write events
+//TODO: should discard all the pending messages when reset
+seastar::future<> SocketConnection::write_event(MessageRef msg)
+{
+ switch (write_state) {
+ case write_state_t::open:
+ case write_state_t::delay:
+ return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
+ seastar::shared_future<> f = send_ready.then([this, msg = std::move(msg)] {
+ return seastar::repeat([this, msg=std::move(msg)] {
+ switch (write_state) {
+ case write_state_t::open:
+ return seastar::futurize_apply([this] {
+ if (m_keepalive) {
+ return do_keepalive()
+ .then([this] { m_keepalive = false; });
+ }
+ return seastar::now();
+ }).then([this] {
+ if (m_keepalive_ack) {
+ return do_keepalive_ack()
+ .then([this] { m_keepalive_ack = false; });
+ }
+ return seastar::now();
+ }).then([this, msg] {
+ if (msg) {
+ return write_message(msg);
+ }
+ return seastar::now();
+ }).then([this] {
+ return socket->flush();
+ }).then([] {
+ return stop_t::yes;
+ }).handle_exception([this] (std::exception_ptr eptr) {
+ logger().warn("{} write_event fault: {}", *this, eptr);
+ close();
+ return stop_t::no;
+ });
+ case write_state_t::delay:
+ // delay all the writes until open
+ return state_changed.get_shared_future()
+ .then([] { return stop_t::no; });
+ case write_state_t::drop:
+ return seastar::make_ready_future<stop_t>(stop_t::yes);
+ default:
+ ceph_assert(false);
+ }
+ });
+ });
+ send_ready = f.get_future();
+ return f.get_future();
+ });
+ case write_state_t::drop:
+ return seastar::now();
+ default:
+ ceph_assert(false);
+ }
+}
+
seastar::future<> SocketConnection::send(MessageRef msg)
{
logger().debug("{} --> {} === {}", messenger, get_peer_addr(), *msg);
return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] {
- if (state == state_t::closing)
- return seastar::now();
- return seastar::with_gate(pending_dispatch, [this, msg=std::move(msg)] {
- return do_send(std::move(msg))
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} send fault: {}", *this, eptr);
- close();
- });
- });
- });
+ return write_event(msg);
+ });
}
seastar::future<> SocketConnection::keepalive()
{
return seastar::smp::submit_to(shard_id(), [this] {
- if (state == state_t::closing)
- return seastar::now();
- return seastar::with_gate(pending_dispatch, [this] {
- return do_keepalive()
- .handle_exception([this] (std::exception_ptr eptr) {
- logger().warn("{} keepalive fault: {}", *this, eptr);
- close();
- });
- });
- });
+ if (!m_keepalive) {
+ m_keepalive = true;
+ write_event();
+ }
+ });
}
seastar::future<> SocketConnection::close()
bl.append((const char*)&old_footer, sizeof(old_footer));
}
// write as a seastar::net::packet
- return socket->write_flush(std::move(bl));
+ return socket->write(std::move(bl));
// TODO: lossless policy
// .then([this, msg = std::move(msg)] {
// if (!policy.lossy) {
// });
}
-seastar::future<> SocketConnection::do_send(MessageRef msg)
+seastar::future<> SocketConnection::do_keepalive()
{
- // chain the message after the last message is sent
- // TODO: retry send for lossless connection
- seastar::shared_future<> f = send_ready.then(
- [this, msg = std::move(msg)] {
- if (state == state_t::closing)
- return seastar::now();
- return write_message(std::move(msg));
- });
-
- // chain any later messages after this one completes
- send_ready = f.get_future();
- // allow the caller to wait on the same future
- return f.get_future();
+ k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
+ ceph::coarse_real_clock::now());
+ logger().debug("{} write keepalive2 {}", *this, k.req.stamp.tv_sec);
+ return socket->write(make_static_packet(k.req));
}
-seastar::future<> SocketConnection::do_keepalive()
+seastar::future<> SocketConnection::do_keepalive_ack()
{
- // TODO: retry keepalive for lossless connection
- seastar::shared_future<> f = send_ready.then([this] {
- if (state == state_t::closing)
- return seastar::now();
- k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
- ceph::coarse_real_clock::now());
- return socket->write_flush(make_static_packet(k.req));
- });
- send_ready = f.get_future();
- return f.get_future();
+ logger().debug("{} write keepalive2 ack {}", *this, k.ack.stamp.tv_sec);
+ return socket->write(make_static_packet(k.ack));
}
seastar::future<> SocketConnection::do_close()
ceph_assert(state == state_t::connecting);
close_ready = pending_dispatch.close().finally(std::move(cleanup));
}
+
logger().debug("{} trigger closing, was {}", *this, static_cast<int>(state));
state = state_t::closing;
+ write_state = write_state_t::drop;
+ state_changed.set_value();
+ state_changed = seastar::shared_promise<>();
+
return close_ready.get_future();
}
return socket->read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
- seastar::shared_future<> f = send_ready.then([this] {
- logger().debug("{} keepalive2 {}", *this, k.ack.stamp.tv_sec);
- return socket->write_flush(make_static_packet(k.ack));
- });
- send_ready = f.get_future();
- return f.get_future();
+ logger().debug("{} got keepalive2 {}", *this, k.ack.stamp.tv_sec);
+ if (!m_keepalive_ack) {
+ m_keepalive_ack = true;
+ write_event();
+ }
});
}
.then([this] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
k.ack_stamp = *t;
- logger().debug("{} keepalive2 ack {}", *this, t->tv_sec);
+ logger().debug("{} got keepalive2 ack {}", *this, t->tv_sec);
});
}
const entity_type_t& _peer_type)
{
ceph_assert(state == state_t::none);
+ logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
+ state = state_t::connecting;
+ write_state = write_state_t::delay;
+ state_changed.set_value();
+ state_changed = seastar::shared_promise<>();
+
ceph_assert(!socket);
peer_addr = _peer_addr;
peer_type = _peer_type;
messenger.register_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
- logger().debug("{} trigger connecting, was {}", *this, static_cast<int>(state));
- state = state_t::connecting;
seastar::with_gate(pending_dispatch, [this] {
return seastar::connect(peer_addr.in4_addr())
.then([this](seastar::connected_socket fd) {
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the connecting state
logger().warn("{} connecting fault: {}", *this, eptr);
- h.promise.set_value();
close();
});
});
const entity_addr_t& _peer_addr)
{
ceph_assert(state == state_t::none);
+ logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
+ state = state_t::accepting;
+ write_state = write_state_t::delay;
+ state_changed.set_value();
+ state_changed = seastar::shared_promise<>();
+
ceph_assert(!socket);
peer_addr.u = _peer_addr.u;
peer_addr.set_port(0);
socket_port = _peer_addr.get_port();
socket = std::move(sock);
messenger.accept_conn(seastar::static_pointer_cast<SocketConnection>(shared_from_this()));
- logger().debug("{} trigger accepting, was {}", *this, static_cast<int>(state));
- state = state_t::accepting;
seastar::with_gate(pending_dispatch, [this, _peer_addr] {
// encode/send server's handshake header
bufferlist bl;
}).handle_exception([this] (std::exception_ptr eptr) {
// TODO: handle fault in the accepting state
logger().warn("{} accepting fault: {}", *this, eptr);
- h.promise.set_value();
close();
});
});
{
logger().debug("{} trigger open, was {}", *this, static_cast<int>(state));
state = state_t::open;
- // satisfy the handshake's promise
- h.promise.set_value();
+ write_state = write_state_t::open;
+ state_changed.set_value();
+ state_changed = seastar::shared_promise<>();
+
seastar::with_gate(pending_dispatch, [this] {
// start background processing of tags
return handle_tags()