From 69daf369ef0764070cc01db1de054cc90361f33c Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Sat, 21 Oct 2017 16:58:41 -0400 Subject: [PATCH] msg: add seastar SocketMessenger Signed-off-by: Casey Bodley --- src/crimson/net/CMakeLists.txt | 3 +- src/crimson/net/SocketConnection.cc | 24 ++--- src/crimson/net/SocketConnection.h | 2 - src/crimson/net/SocketMessenger.cc | 156 ++++++++++++++++++++++++++++ src/crimson/net/SocketMessenger.h | 48 +++++++++ 5 files changed, 217 insertions(+), 16 deletions(-) create mode 100644 src/crimson/net/SocketMessenger.cc create mode 100644 src/crimson/net/SocketMessenger.h diff --git a/src/crimson/net/CMakeLists.txt b/src/crimson/net/CMakeLists.txt index 9c4118252570c..c707adddf63da 100644 --- a/src/crimson/net/CMakeLists.txt +++ b/src/crimson/net/CMakeLists.txt @@ -1,6 +1,7 @@ set(crimson_net_srcs Errors.cc - SocketConnection.cc) + SocketConnection.cc + SocketMessenger.cc) add_library(crimson_net_objs OBJECT ${crimson_net_srcs}) target_compile_definitions(crimson_net_objs PUBLIC $) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index af89fa6f12b6f..8ecafa43bdb35 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -93,21 +93,19 @@ seastar::future SocketConnection::read(size_t bytes) }); } -seastar::future SocketConnection::read_header() -{ - return read(sizeof(m.header)) - .then([this] (bufferlist bl) { - auto p = bl.begin(); - ::decode(m.header, p); - return m.header; - }); -} - seastar::future SocketConnection::read_message() { - // read front - return read(m.header.front_len) - .then([this] (bufferlist bl) { + return on_message.get_future() + .then([this] { + // read header + return read(sizeof(m.header)); + }).then([this] (bufferlist bl) { + auto p = bl.cbegin(); + ::decode(m.header, p); + }).then([this] { + // read front + return read(m.header.front_len); + }).then([this] (bufferlist bl) { m.front = std::move(bl); // read middle return read(m.header.middle_len); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 617d6dbbe8633..ff77f0dd3360d 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -65,8 +65,6 @@ class SocketConnection : public Connection { seastar::future<> server_handshake() override; - seastar::future read_header() override; - seastar::future read_message() override; seastar::future<> send(MessageRef msg) override; diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc new file mode 100644 index 0000000000000..62e0cb3cdd7f9 --- /dev/null +++ b/src/crimson/net/SocketMessenger.cc @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "SocketMessenger.h" +#include "SocketConnection.h" +#include "Dispatcher.h" +#include "msg/Message.h" + +using namespace ceph::net; + +SocketMessenger::SocketMessenger(const entity_name_t& myname) + : Messenger{myname} +{} + +void SocketMessenger::bind(const entity_addr_t& addr) +{ + if (addr.get_family() != AF_INET) { + throw std::system_error(EAFNOSUPPORT, std::generic_category()); + } + + set_myaddr(addr); + + seastar::socket_address address(addr.in4_addr()); + seastar::listen_options lo; + lo.reuse_address = true; + listener = seastar::listen(address, lo); +} + +seastar::future<> SocketMessenger::dispatch(ConnectionRef conn) +{ + connections.push_back(conn); + + return seastar::repeat([=] { + return conn->read_message() + .then([=] (MessageRef msg) { + if (msg) { + return dispatcher->ms_dispatch(conn, std::move(msg)); + } else { + return seastar::now(); + } + }).then([] { + return seastar::stop_iteration::no; + }); + }).handle_exception_type([=] (const std::system_error& e) { + if (e.code() == error::connection_aborted || + e.code() == error::connection_reset) { + dispatcher->ms_handle_reset(conn); + } else if (e.code() == error::read_eof) { + dispatcher->ms_handle_remote_reset(conn); + } else { + throw e; + } + }); +} + +seastar::future<> SocketMessenger::accept(seastar::connected_socket socket, + seastar::socket_address paddr) +{ + // allocate the connection + entity_addr_t peer_addr; + peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); + ConnectionRef conn = new SocketConnection(this, get_myaddr(), + peer_addr, std::move(socket)); + // initiate the handshake + return conn->server_handshake() + .handle_exception([conn] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([conn] { return conn->close(); }); + }).then([this, conn] { + dispatcher->ms_handle_accept(conn); + // dispatch messages until the connection closes or the dispatch + // queue shuts down + return dispatch(std::move(conn)); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) +{ + dispatcher = disp; + + // start listening if bind() was called + if (listener) { + seastar::repeat([this] { + return listener->accept() + .then([this] (seastar::connected_socket socket, + seastar::socket_address paddr) { + // start processing the connection + accept(std::move(socket), paddr) + .handle_exception([] (std::exception_ptr eptr) {}); + // don't wait before accepting another + return seastar::stop_iteration::no; + }); + }).handle_exception_type([this] (const std::system_error& e) { + // stop gracefully on connection_aborted + if (e.code() != error::connection_aborted) { + throw e; + } + }); + } + + return seastar::now(); +} + +seastar::future SocketMessenger::connect(const entity_addr_t& addr, + const entity_addr_t& myaddr) +{ + if (auto found = std::find_if(connections.begin(), + connections.end(), + [&addr](auto conn) { + return conn->get_peer_addr() == addr; + }); + found != connections.end()) { + return seastar::make_ready_future(*found); + } + return seastar::connect(addr.in4_addr()) + .then([=] (seastar::connected_socket socket) { + ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr, + std::move(socket)); + // complete the handshake before returning to the caller + return conn->client_handshake() + .handle_exception([conn] (std::exception_ptr eptr) { + // close the connection before returning errors + return seastar::make_exception_future<>(eptr) + .finally([conn] { return conn->close(); }); + }).then([=] { + dispatcher->ms_handle_connect(conn); + // dispatch replies on this connection + dispatch(conn) + .handle_exception([] (std::exception_ptr eptr) {}); + return conn; + }); + }); +} + +seastar::future<> SocketMessenger::shutdown() +{ + if (listener) { + listener->abort_accept(); + } + return seastar::parallel_for_each(connections.begin(), connections.end(), + [this] (ConnectionRef conn) { + return conn->close(); + }).finally([this] { connections.clear(); }); +} diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h new file mode 100644 index 0000000000000..5cca096a65908 --- /dev/null +++ b/src/crimson/net/SocketMessenger.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2017 Red Hat, Inc + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include + +#include "Messenger.h" + +namespace ceph::net { + +class SocketMessenger : public Messenger { + boost::optional listener; + Dispatcher *dispatcher = nullptr; + std::list connections; + + seastar::future<> dispatch(ConnectionRef conn); + + seastar::future<> accept(seastar::connected_socket socket, + seastar::socket_address paddr); + + public: + SocketMessenger(const entity_name_t& myname); + + void bind(const entity_addr_t& addr) override; + + seastar::future<> start(Dispatcher *dispatcher) override; + + seastar::future connect(const entity_addr_t& addr, + const entity_addr_t& myaddr) override; + + seastar::future<> shutdown() override; +}; + +} // namespace ceph::net -- 2.39.5