}
SocketConnection::SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr,
- const entity_addr_t& peer_addr,
- seastar::connected_socket&& fd)
- : Connection(my_addr, peer_addr),
+ const entity_addr_t& my_addr) // peer address and socket are supplied later by start_connect()/start_accept()
+ : Connection(my_addr), // only the local address is passed to the base class here
messenger(messenger),
- socket(std::move(fd)),
- in(socket.input()),
- out(socket.output()),
send_ready(h.promise.get_future())
{
}
return !send_ready.failed();
}
-// an input_stream consumer that reads buffer segments into a bufferlist up to
-// the given number of remaining bytes
-struct bufferlist_consumer {
- bufferlist& bl;
- size_t& remaining;
-
- bufferlist_consumer(bufferlist& bl, size_t& remaining)
- : bl(bl), remaining(remaining) {}
-
- using tmp_buf = seastar::temporary_buffer<char>;
- using consumption_result_type = typename seastar::input_stream<char>::consumption_result_type;
-
- // consume some or all of a buffer segment
- seastar::future<consumption_result_type> operator()(tmp_buf&& data) {
- if (remaining >= data.size()) {
- // consume the whole buffer
- remaining -= data.size();
- bl.append(buffer::create_foreign(std::move(data)));
- if (remaining > 0) {
- // return none to request more segments
- return seastar::make_ready_future<consumption_result_type>(
- seastar::continue_consuming{});
- } else {
- // return an empty buffer to singal that we're done
- return seastar::make_ready_future<consumption_result_type>(
- consumption_result_type::stop_consuming_type({}));
- }
- }
- if (remaining > 0) {
- // consume the front
- bl.append(buffer::create_foreign(data.share(0, remaining)));
- data.trim_front(remaining);
- remaining = 0;
- }
- // give the rest back to signal that we're done
- return seastar::make_ready_future<consumption_result_type>(
- consumption_result_type::stop_consuming_type{std::move(data)});
- };
-};
-
-seastar::future<bufferlist> SocketConnection::read(size_t bytes)
-{
- if (bytes == 0) {
- return seastar::make_ready_future<bufferlist>();
- }
- r.buffer.clear();
- r.remaining = bytes;
- return in.consume(bufferlist_consumer{r.buffer, r.remaining})
- .then([this] {
- if (r.remaining) { // throw on short reads
- throw std::system_error(make_error_code(error::read_eof));
- }
- return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
- });
-}
-
void SocketConnection::read_tags_until_next_message()
{
seastar::repeat([this] {
// read the next tag
- return in.read_exactly(1)
+ return socket->read_exactly(1)
.then([this] (auto buf) {
if (buf.empty()) {
throw std::system_error(make_error_code(error::read_eof));
seastar::future<seastar::stop_iteration> SocketConnection::handle_ack()
{
- return in.read_exactly(sizeof(ceph_le64))
+ return socket->read_exactly(sizeof(ceph_le64))
.then([this] (auto buf) {
auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
discard_up_to(&sent, *seq);
.then([this] {
on_message = seastar::promise<>{};
// read header
- return read(sizeof(m.header));
+ return socket->read(sizeof(m.header));
}).then([this] (bufferlist bl) {
// throttle the traffic, maybe
auto p = bl.cbegin();
return maybe_throttle();
}).then([this] {
// read front
- return read(m.header.front_len);
+ return socket->read(m.header.front_len);
}).then([this] (bufferlist bl) {
m.front = std::move(bl);
// read middle
- return read(m.header.middle_len);
+ return socket->read(m.header.middle_len);
}).then([this] (bufferlist bl) {
m.middle = std::move(bl);
// read data
- return read(m.header.data_len);
+ return socket->read(m.header.data_len);
}).then([this] (bufferlist bl) {
m.data = std::move(bl);
// read footer
- return read(sizeof(m.footer));
+ return socket->read(sizeof(m.footer));
}).then([this] (bufferlist bl) {
// resume background processing of tags
read_tags_until_next_message();
bl.append((const char*)&old_footer, sizeof(old_footer));
}
// write as a seastar::net::packet
- return out.write(std::move(bl))
- .then([this] { return out.flush(); })
+ return socket->write_flush(std::move(bl))
.then([this, msg = std::move(msg)] {
if (!policy.lossy) {
sent.push(std::move(msg));
seastar::shared_future<> f = send_ready.then([this] {
k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec(
ceph::coarse_real_clock::now());
- return out.write(make_static_packet(k.req));
- }).then([this] {
- return out.flush();
+ return socket->write_flush(make_static_packet(k.req));
});
send_ready = f.get_future();
return f.get_future();
seastar::future<> SocketConnection::close()
{
- if (state == state_t::closed) {
+ if (state == state_t::closing) {
// already closing
assert(close_ready.valid());
return close_ready.get_future();
}
- state = state_t::closed;
-
// unregister_conn() drops a reference, so hold another until completion
auto cleanup = [conn = SocketConnectionRef(this)] {};
- messenger.unregister_conn(this);
+ if (state == state_t::accepting) {
+ messenger.unaccept_conn(this);
+ } else if (state >= state_t::connecting && state < state_t::closing) {
+ messenger.unregister_conn(this);
+ } else {
+ // cannot happen
+ ceph_assert(false);
+ }
+ state = state_t::closing;
- // close_ready become valid only after state is state_t::closed
+ // close_ready become valid only after state is state_t::closing
assert(!close_ready.valid());
- close_ready = seastar::when_all(in.close(), out.close())
- .discard_result()
- .finally(std::move(cleanup));
+ close_ready = socket->close().finally(std::move(cleanup));
return close_ready.get_future();
}
}
}
-seastar::future<> SocketConnection::handle_connect()
+seastar::future<>
+SocketConnection::repeat_handle_connect()
{
- return read(sizeof(h.connect))
+ return socket->read(sizeof(h.connect))
.then([this](bufferlist bl) {
auto p = bl.cbegin();
::decode(h.connect, p);
- return read(h.connect.authorizer_len);
+ peer_type = h.connect.host_type;
+ return socket->read(h.connect.authorizer_len);
}).then([this] (bufferlist authorizer) {
if (h.connect.protocol_version != get_proto_version(h.connect.host_type, false)) {
return seastar::make_ready_future<msgr_tag_t, bufferlist>(
return seastar::make_ready_future<msgr_tag_t, bufferlist>(
CEPH_MSGR_TAG_FEATURES, bufferlist{});
}
- return messenger.verify_authorizer(get_peer_type(),
- h.connect.authorizer_protocol,
- authorizer);
+ return messenger.verify_authorizer(peer_type,
+ h.connect.authorizer_protocol,
+ authorizer);
}).then([this] (ceph::net::msgr_tag_t tag, bufferlist&& authorizer_reply) {
memset(&h.reply, 0, sizeof(h.reply));
if (tag) {
policy.features_supported) |
policy.features_required);
h.reply.authorizer_len = authorizer_reply.length();
- return out.write(make_static_packet(h.reply))
+ return socket->write(make_static_packet(h.reply))
.then([this, reply=std::move(authorizer_reply)]() mutable {
- return out.write(std::move(reply));
- }).then([this] {
- return out.flush();
+ return socket->write_flush(std::move(reply));
});
}
h.reply.flags = h.reply.flags | CEPH_MSG_CONNECT_LOSSY;
}
h.reply.authorizer_len = authorizer_reply.length();
- return out.write(make_static_packet(h.reply))
+ return socket->write(make_static_packet(h.reply))
.then([this, reply=std::move(authorizer_reply)]() mutable {
if (reply.length()) {
- return out.write(std::move(reply));
+ return socket->write(std::move(reply));
} else {
return seastar::now();
}
}).then([this] {
if (h.reply.tag == CEPH_MSGR_TAG_SEQ) {
- return out.write(make_static_packet(in_seq))
+ return socket->write_flush(make_static_packet(in_seq))
.then([this] {
- return out.flush();
- }).then([this] {
- return in.read_exactly(sizeof(seq_num_t));
+ return socket->read_exactly(sizeof(seq_num_t));
}).then([this] (auto buf) {
auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
discard_up_to(&out_q, *acked_seq);
});
} else {
- return out.flush();
+ return socket->flush();
}
}).then([this] {
+ messenger.register_conn(this);
+ messenger.unaccept_conn(this);
state = state_t::open;
});
}
seastar::future<>
SocketConnection::handle_keepalive2()
{
- return in.read_exactly(sizeof(ceph_timespec))
+ return socket->read_exactly(sizeof(ceph_timespec)) // read the timestamp sent with the keepalive
.then([this] (auto buf) {
k.ack.stamp = *reinterpret_cast<const ceph_timespec*>(buf.get());
std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl;
- return out.write(make_static_packet(k.ack));
- }).then([this] {
- return out.flush();
+ return socket->write_flush(make_static_packet(k.ack)); // send k.ack, which now carries that timestamp, back to the peer; takes the place of the former write() + flush() pair
});
}
seastar::future<>
SocketConnection::handle_keepalive2_ack()
{
- return in.read_exactly(sizeof(ceph_timespec))
+ return socket->read_exactly(sizeof(ceph_timespec))
.then([this] (auto buf) {
auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
k.ack_stamp = *t;
}
h.got_bad_auth = true;
// try harder
- return messenger.get_authorizer(h.peer_type, true)
+ return messenger.get_authorizer(peer_type, true)
.then([this](auto&& auth) {
h.authorizer = std::move(auth);
return seastar::now();
return fault();
}
if (tag == CEPH_MSGR_TAG_SEQ) {
- return in.read_exactly(sizeof(seq_num_t))
+ return socket->read_exactly(sizeof(seq_num_t))
.then([this] (auto buf) {
auto acked_seq = reinterpret_cast<const seq_num_t*>(buf.get());
discard_up_to(&out_q, *acked_seq);
- return out.write(make_static_packet(in_seq));
- }).then([this] {
- return out.flush();
+ return socket->write_flush(make_static_packet(in_seq));
}).then([this] {
return handle_connect_reply(CEPH_MSGR_TAG_READY);
});
}
}
-seastar::future<> SocketConnection::connect(entity_type_t peer_type,
- entity_type_t host_type)
+seastar::future<> SocketConnection::repeat_connect()
{
// encode ceph_msg_connect
- h.peer_type = peer_type;
memset(&h.connect, 0, sizeof(h.connect));
h.connect.features = policy.features_supported;
- h.connect.host_type = host_type;
+ h.connect.host_type = messenger.get_myname().type();
h.connect.global_seq = h.global_seq;
h.connect.connect_seq = h.connect_seq;
h.connect.protocol_version = get_proto_version(peer_type, true);
h.connect.authorizer_len = 0;
bl.append(create_static(h.connect));
};
- return out.write(std::move(bl));
- }).then([this] {
- return out.flush();
+ return socket->write_flush(std::move(bl));
}).then([this] {
// read the reply
- return read(sizeof(h.reply));
+ return socket->read(sizeof(h.reply));
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
::decode(h.reply, p);
ceph_assert(p.end());
- return read(h.reply.authorizer_len);
+ return socket->read(h.reply.authorizer_len);
}).then([this] (bufferlist bl) {
if (h.authorizer) {
auto reply = bl.cbegin();
});
}
-seastar::future<> SocketConnection::client_handshake(entity_type_t peer_type,
- entity_type_t host_type)
-{
- // read server's handshake header
- return read(server_header_size)
- .then([this] (bufferlist headerbl) {
+seastar::future<>
+SocketConnection::start_connect(const entity_addr_t& _peer_addr,
+ const entity_type_t& _peer_type)
+{
+ ceph_assert(state == state_t::none);
+ ceph_assert(!socket);
+ peer_addr = _peer_addr;
+ peer_type = _peer_type;
+ messenger.register_conn(this);
+ state = state_t::connecting;
+ return seastar::connect(peer_addr.in4_addr())
+ .then([this](seastar::connected_socket fd) {
+ socket.emplace(std::move(fd));
+ // read server's handshake header
+ return socket->read(server_header_size);
+ }).then([this] (bufferlist headerbl) {
auto p = headerbl.cbegin();
validate_banner(p);
entity_addr_t saddr, caddr;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
h.global_seq = messenger.get_global_seq();
- return out.write(std::move(bl)).then([this] { return out.flush(); });
+ return socket->write_flush(std::move(bl));
}).then([=] {
return seastar::do_until([=] { return state == state_t::open; },
- [=] { return connect(peer_type, host_type); });
+ [=] { return repeat_connect(); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
});
}
-seastar::future<> SocketConnection::server_handshake()
-{
+seastar::future<>
+SocketConnection::start_accept(seastar::connected_socket&& fd,
+ const entity_addr_t& _peer_addr)
+{
+ ceph_assert(state == state_t::none);
+ ceph_assert(!socket);
+ peer_addr = _peer_addr;
+ socket.emplace(std::move(fd));
+ messenger.accept_conn(this);
+ state = state_t::accepting;
// encode/send server's handshake header
bufferlist bl;
bl.append(buffer::create_static(banner_size, banner));
::encode(my_addr, bl, 0);
::encode(peer_addr, bl, 0);
- return out.write(std::move(bl))
- .then([this] { return out.flush(); })
+ return socket->write_flush(std::move(bl))
.then([this] {
// read client's handshake header and connect request
- return read(client_header_size);
+ return socket->read(client_header_size);
}).then([this] (bufferlist bl) {
auto p = bl.cbegin();
validate_banner(p);
}
}).then([this] {
return seastar::do_until([this] { return state == state_t::open; },
- [this] { return handle_connect(); });
+ [this] { return repeat_handle_connect(); });
}).then([this] {
// start background processing of tags
read_tags_until_next_message();
#include "msg/Policy.h"
#include "Connection.h"
+#include "Socket.h"
#include "crimson/thread/Throttle.h"
class AuthAuthorizer;
class SocketConnection : public Connection {
SocketMessenger& messenger;
- seastar::connected_socket socket;
- seastar::input_stream<char> in;
- seastar::output_stream<char> out;
+ std::optional<Socket> socket;
enum class state_t {
none,
+ accepting,
+ connecting,
open,
standby,
- closed,
- wait
+ wait,
+ closing
};
state_t state = state_t::none;
- /// become valid only when state is state_t::closed
+ /// becomes valid only when state is state_t::closing
seastar::shared_future<> close_ready;
- /// buffer state for read()
- struct Reader {
- bufferlist buffer;
- size_t remaining;
- } r;
-
- /// read the requested number of bytes into a bufferlist
- seastar::future<bufferlist> read(size_t bytes);
-
/// state for handshake
struct Handshake {
ceph_msg_connect connect;
ceph_msg_connect_reply reply;
bool got_bad_auth = false;
std::unique_ptr<AuthAuthorizer> authorizer;
- peer_type_t peer_type;
std::chrono::milliseconds backoff;
uint32_t connect_seq = 0;
uint32_t peer_global_seq = 0;
} h;
/// server side of handshake negotiation
- seastar::future<> handle_connect();
+ seastar::future<> repeat_handle_connect();
seastar::future<> handle_connect_with_existing(SocketConnectionRef existing,
bufferlist&& authorizer_reply);
seastar::future<> replace_existing(SocketConnectionRef existing,
bool require_auth_feature() const;
uint32_t get_proto_version(entity_type_t peer_type, bool connec) const;
/// client side of handshake negotiation
- seastar::future<> connect(entity_type_t peer_type, entity_type_t host_type);
+ seastar::future<> repeat_connect();
seastar::future<> handle_connect_reply(ceph::net::msgr_tag_t tag);
void reset_session();
public:
SocketConnection(SocketMessenger& messenger,
- const entity_addr_t& my_addr,
- const entity_addr_t& peer_addr,
- seastar::connected_socket&& socket);
+ const entity_addr_t& my_addr);
~SocketConnection();
Messenger* get_messenger() const override;
int get_peer_type() const override {
- return h.connect.host_type;
+ return peer_type;
}
bool is_connected() override;
public:
/// complete a handshake from the client's perspective
- seastar::future<> client_handshake(entity_type_t peer_type,
- entity_type_t host_type);
+ seastar::future<> start_connect(const entity_addr_t& peer_addr,
+ const entity_type_t& peer_type);
/// complete a handshake from the server's perspective
- seastar::future<> server_handshake();
+ seastar::future<> start_accept(seastar::connected_socket&& socket,
+ const entity_addr_t& peer_addr);
/// read a message from a connection that has completed its handshake
seastar::future<MessageRef> read_message();
seastar::future<> SocketMessenger::dispatch(SocketConnectionRef conn)
{
- auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
- std::ignore = i;
- ceph_assert(added);
-
return seastar::keep_doing([=] {
return conn->read_message()
.then([=] (MessageRef msg) {
entity_addr_t peer_addr;
peer_addr.set_type(entity_addr_t::TYPE_DEFAULT);
peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(),
- peer_addr, std::move(socket));
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
// initiate the handshake
- return conn->server_handshake()
- .then([=] {
+ return conn->start_accept(std::move(socket), peer_addr)
+ .then([this, conn] {
// notify the dispatcher and allow them to reject the connection
return seastar::with_gate(pending_dispatch, [=] {
return dispatcher->ms_handle_accept(conn);
return seastar::now();
}
-seastar::future<ceph::net::ConnectionRef>
-SocketMessenger::connect(const entity_addr_t& addr, entity_type_t peer_type)
+ceph::net::ConnectionRef // returned without waiting for the handshake started below
+SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
{
- if (auto found = lookup_conn(addr); found) {
- return seastar::make_ready_future<ceph::net::ConnectionRef>(found);
+ if (auto found = lookup_conn(peer_addr); found) { // presumably an existing connection to peer_addr — confirm against lookup_conn()
+ return found;
}
- return seastar::connect(addr.in4_addr())
- .then([=] (seastar::connected_socket socket) {
- SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), addr,
- std::move(socket));
- // complete the handshake before returning to the caller
- return conn->client_handshake(peer_type, get_myname().type())
- .then([=] {
- // notify the dispatcher and allow them to reject the connection
- return seastar::with_gate(pending_dispatch, [=] {
- return dispatcher->ms_handle_connect(conn);
- });
- }).handle_exception([conn] (std::exception_ptr eptr) {
- // close the connection before returning errors
- return seastar::make_exception_future<>(eptr)
- .finally([conn] { return conn->close(); });
- // TODO: retry on fault
- }).then([=] {
- // dispatch replies on this connection
- dispatch(conn)
- .handle_exception([] (std::exception_ptr eptr) {});
- return ConnectionRef(conn);
- });
+ SocketConnectionRef conn = new SocketConnection(*this, get_myaddr());
+ conn->start_connect(peer_addr, peer_type) // opens the socket and runs the client side of the handshake
+ .then([this, conn] {
+ // notify the dispatcher and allow them to reject the connection
+ return seastar::with_gate(pending_dispatch, [this, conn] {
+ return dispatcher->ms_handle_connect(conn);
+ });
+ }).handle_exception([conn] (std::exception_ptr eptr) {
+ // close the connection, then re-raise the original error
+ return seastar::make_exception_future<>(eptr)
+ .finally([conn] { return conn->close(); });
+ // TODO: retry on fault
+ }).then([this, conn] {
+ // dispatch replies on this connection
+ dispatch(conn)
+ .handle_exception([] (std::exception_ptr eptr) {});
});
+ return conn; // NOTE(review): the future chain above is dropped, so a failed handshake leaves its re-raised exception unobserved — confirm this is intended
}
seastar::future<> SocketMessenger::shutdown()
listener->abort_accept();
}
// close all connections
- return seastar::parallel_for_each(connections.begin(), connections.end(),
- [this] (auto conn) {
- return conn.second->close();
+ return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
+ return conn->close();
+ }).then([this] {
+ ceph_assert(accepting_conns.empty());
+ return seastar::parallel_for_each(connections, [] (auto conn) {
+ return conn.second->close();
+ });
}).finally([this] {
- connections.clear();
+ ceph_assert(connections.empty());
// closing connections will unblock any dispatchers that were waiting to
// send(). wait for any pending calls to finish
return pending_dispatch.close();
}
}
+void SocketMessenger::accept_conn(SocketConnectionRef conn) // called by start_accept() before the server-side handshake begins
+{
+ accepting_conns.insert(conn); // kept separately from `connections` until the handshake completes
+}
+
+void SocketMessenger::unaccept_conn(SocketConnectionRef conn) // called once the accept handshake has finished, or from close() while still accepting
+{
+ accepting_conns.erase(conn); // drops the reference that accept_conn() stored
+}
+
+void SocketMessenger::register_conn(SocketConnectionRef conn)
+{
+ // connections are keyed by peer address; a second entry for the same
+ // address trips the assertion below
+ const bool inserted =
+ connections.emplace(conn->get_peer_addr(), conn).second;
+ ceph_assert(inserted);
+}
+
void SocketMessenger::unregister_conn(SocketConnectionRef conn)
{
ceph_assert(conn);