From 7d949170f845d7fd14f93d2c0cd8b31cb3264b71 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sat, 21 Oct 2017 16:22:45 -0400 Subject: [PATCH] msg: add seastar SocketConnection Signed-off-by: Casey Bodley --- src/crimson/net/CMakeLists.txt | 3 +- src/crimson/net/Errors.cc | 2 + src/crimson/net/Errors.h | 1 + src/crimson/net/SocketConnection.cc | 169 ++++++++++++++++++++++++++++ src/crimson/net/SocketConnection.h | 78 +++++++++++++ 5 files changed, 252 insertions(+), 1 deletion(-) create mode 100644 src/crimson/net/SocketConnection.cc create mode 100644 src/crimson/net/SocketConnection.h diff --git a/src/crimson/net/CMakeLists.txt b/src/crimson/net/CMakeLists.txt index a7f3c7162e7e2..9c4118252570c 100644 --- a/src/crimson/net/CMakeLists.txt +++ b/src/crimson/net/CMakeLists.txt @@ -1,5 +1,6 @@ set(crimson_net_srcs - Errors.cc) + Errors.cc + SocketConnection.cc) add_library(crimson_net_objs OBJECT ${crimson_net_srcs}) target_compile_definitions(crimson_net_objs PUBLIC $) diff --git a/src/crimson/net/Errors.cc b/src/crimson/net/Errors.cc index 1ac6fe8c48318..fe182377dfcf4 100644 --- a/src/crimson/net/Errors.cc +++ b/src/crimson/net/Errors.cc @@ -25,6 +25,8 @@ const std::error_category& net_category() std::string message(int ev) const override { switch (static_cast(ev)) { + case error::read_eof: + return "read eof"; case error::connection_aborted: return "connection aborted"; case error::connection_refused: diff --git a/src/crimson/net/Errors.h b/src/crimson/net/Errors.h index e02e6cdd78dbd..af3720ae469a6 100644 --- a/src/crimson/net/Errors.h +++ b/src/crimson/net/Errors.h @@ -20,6 +20,7 @@ namespace ceph::net { /// net error codes enum class error { + read_eof, connection_aborted, connection_refused, connection_reset, diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc new file mode 100644 index 0000000000000..af89fa6f12b6f --- /dev/null +++ b/src/crimson/net/SocketConnection.cc @@ -0,0 +1,169 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "SocketConnection.h" + +#include "msg/Message.h" + +using namespace ceph::net; + +SocketConnection::SocketConnection(Messenger *messenger, + const entity_addr_t& my_addr, + const entity_addr_t& peer_addr, + seastar::connected_socket&& fd) + : Connection(messenger, my_addr, peer_addr), + socket(std::move(fd)), + in(socket.input()), + out(socket.output()), + send_ready(seastar::now()) +{ +} + +SocketConnection::~SocketConnection() +{ + // errors were reported to callers of send() + assert(send_ready.available()); + send_ready.ignore_ready_future(); +} + +bool SocketConnection::is_connected() +{ + return !send_ready.failed(); +} + +// an input_stream consumer that reads buffer segments into a bufferlist up to +// the given number of remaining bytes +struct bufferlist_consumer { + bufferlist& bl; + size_t& remaining; + + bufferlist_consumer(bufferlist& bl, size_t& remaining) + : bl(bl), remaining(remaining) {} + + using tmp_buf = seastar::temporary_buffer; + using unconsumed_remainder = std::experimental::optional; + + // consume some or all of a buffer segment + seastar::future operator()(tmp_buf&& data) { + if (remaining >= data.size()) { + // consume the whole buffer + remaining -= data.size(); + bl.append(buffer::create_foreign(std::move(data))); + if (remaining > 0) { + // return none to request more segments + return seastar::make_ready_future(); + } + // return an empty buffer to singal that we're done + return seastar::make_ready_future(tmp_buf{}); + } + if (remaining > 0) { + // consume the front + bl.append(buffer::create_foreign(data.share(0, remaining))); + data.trim_front(remaining); + remaining = 0; + } + // give the rest back to signal that we're done + return seastar::make_ready_future(std::move(data)); + }; +}; + +seastar::future SocketConnection::read(size_t bytes) +{ + r.buffer.clear(); + r.remaining = bytes; + return in.consume(bufferlist_consumer{r.buffer, r.remaining}) + .then([this] { + if (r.remaining) { // throw on short reads + throw std::system_error(make_error_code(error::read_eof)); + } + return seastar::make_ready_future(std::move(r.buffer)); + }); +} + +seastar::future SocketConnection::read_header() +{ + return read(sizeof(m.header)) + .then([this] (bufferlist bl) { + auto p = bl.begin(); + ::decode(m.header, p); + return m.header; + }); +} + +seastar::future SocketConnection::read_message() +{ + // read front + return read(m.header.front_len) + .then([this] (bufferlist bl) { + m.front = std::move(bl); + // read middle + return read(m.header.middle_len); + }).then([this] (bufferlist bl) { + m.middle = std::move(bl); + // read data + return read(m.header.data_len); + }).then([this] (bufferlist bl) { + m.data = std::move(bl); + // read footer + return read(sizeof(m.footer)); + }).then([this] (bufferlist bl) { + auto p = bl.begin(); + ::decode(m.footer, p); + + auto msg = ::decode_message(nullptr, 0, m.header, m.footer, + m.front, m.middle, m.data, nullptr); + constexpr bool add_ref = false; // Message starts with 1 ref + return MessageRef{msg, add_ref}; + }); +} + +seastar::future<> SocketConnection::write_message(MessageRef msg) +{ + bufferlist bl; + encode_message(msg.get(), 0, bl); + // write as a seastar::net::packet + return out.write(std::move(bl)) + .then([this] { return out.flush(); }); +} + +seastar::future<> SocketConnection::send(MessageRef msg) +{ + // chain the message after the last message is sent + seastar::shared_future<> f = send_ready.then( + [this, msg = std::move(msg)] { + return write_message(std::move(msg)); + }); + + // chain any later messages after this one completes + send_ready = f.get_future(); + // allow the caller to wait on the same future + return f.get_future(); +} + +seastar::future<> SocketConnection::close() +{ + return seastar::when_all(in.close(), out.close()).discard_result(); +} + +seastar::future<> SocketConnection::client_handshake() +{ + return seastar::now(); // TODO +} + +seastar::future<> SocketConnection::server_handshake() +{ + return seastar::now(); // TODO +} diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h new file mode 100644 index 0000000000000..617d6dbbe8633 --- /dev/null +++ b/src/crimson/net/SocketConnection.h @@ -0,0 +1,78 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +#include "Connection.h" + +namespace ceph { +namespace net { + +class SocketConnection : public Connection { + seastar::connected_socket socket; + seastar::input_stream in; + seastar::output_stream out; + + /// buffer state for read() + struct Reader { + bufferlist buffer; + size_t remaining; + } r; + + /// read the requested number of bytes into a bufferlist + seastar::future read(size_t bytes); + + /// state for an incoming message + struct MessageReader { + ceph_msg_header header; + ceph_msg_footer footer; + bufferlist front; + bufferlist middle; + bufferlist data; + } m; + + /// becomes available when handshake completes, and when all previous messages + /// have been sent to the output stream. send() chains new messages as + /// continuations to this future to act as a queue + seastar::future<> send_ready; + + /// encode/write a message + seastar::future<> write_message(MessageRef msg); + + public: + SocketConnection(Messenger *messenger, + const entity_addr_t& my_addr, + const entity_addr_t& peer_addr, + seastar::connected_socket&& socket); + ~SocketConnection(); + + bool is_connected() override; + + seastar::future<> client_handshake() override; + + seastar::future<> server_handshake() override; + + seastar::future read_header() override; + + seastar::future read_message() override; + + seastar::future<> send(MessageRef msg) override; + + seastar::future<> close() override; +}; + +} // namespace net +} // namespace ceph -- 2.39.5