]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: add seastar SocketMessenger
authorCasey Bodley <cbodley@redhat.com>
Sat, 21 Oct 2017 20:58:41 +0000 (16:58 -0400)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 06:09:21 +0000 (14:09 +0800)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/crimson/net/CMakeLists.txt
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h
src/crimson/net/SocketMessenger.cc [new file with mode: 0644]
src/crimson/net/SocketMessenger.h [new file with mode: 0644]

index 9c4118252570cc808f14011e9b85d383583bedc9..c707adddf63dadf446c82ef06681978b0d2b50a0 100644 (file)
@@ -1,6 +1,7 @@
 set(crimson_net_srcs
   Errors.cc
-  SocketConnection.cc)
+  SocketConnection.cc
+  SocketMessenger.cc)
 add_library(crimson_net_objs OBJECT ${crimson_net_srcs})
 target_compile_definitions(crimson_net_objs
   PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_COMPILE_DEFINITIONS>)
index af89fa6f12b6f403ca9e13b1b0d342a1c7ad0384..8ecafa43bdb35a47c5057d003878fe47a6fd3841 100644 (file)
@@ -93,21 +93,19 @@ seastar::future<bufferlist> SocketConnection::read(size_t bytes)
     });
 }
 
-seastar::future<ceph_msg_header> SocketConnection::read_header()
-{
-  return read(sizeof(m.header))
-    .then([this] (bufferlist bl) {
-      auto p = bl.begin();
-      ::decode(m.header, p);
-      return m.header;
-    });
-}
-
 seastar::future<MessageRef> SocketConnection::read_message()
 {
-  // read front
-  return read(m.header.front_len)
-    .then([this] (bufferlist bl) {
+  return on_message.get_future()
+    .then([this] {
+      // read header
+      return read(sizeof(m.header));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.cbegin();
+      ::decode(m.header, p);
+    }).then([this] {
+      // read front
+      return read(m.header.front_len);
+    }).then([this] (bufferlist bl) {
       m.front = std::move(bl);
       // read middle
       return read(m.header.middle_len);
index 617d6dbbe86332bea8aa8cd5d72a1832a4088dfd..ff77f0dd3360d92c772d7390005404300fcac783 100644 (file)
@@ -65,8 +65,6 @@ class SocketConnection : public Connection {
 
   seastar::future<> server_handshake() override;
 
-  seastar::future<ceph_msg_header> read_header() override;
-
   seastar::future<MessageRef> read_message() override;
 
   seastar::future<> send(MessageRef msg) override;
diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc
new file mode 100644 (file)
index 0000000..62e0cb3
--- /dev/null
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "SocketMessenger.h"
+#include "SocketConnection.h"
+#include "Dispatcher.h"
+#include "msg/Message.h"
+
+using namespace ceph::net;
+
+SocketMessenger::SocketMessenger(const entity_name_t& myname)
+  : Messenger{myname}
+{}
+
+void SocketMessenger::bind(const entity_addr_t& addr)
+{
+  if (addr.get_family() != AF_INET) {
+    throw std::system_error(EAFNOSUPPORT, std::generic_category());
+  }
+
+  set_myaddr(addr);
+
+  seastar::socket_address address(addr.in4_addr());
+  seastar::listen_options lo;
+  lo.reuse_address = true;
+  listener = seastar::listen(address, lo);
+}
+
+seastar::future<> SocketMessenger::dispatch(ConnectionRef conn)
+{
+  connections.push_back(conn);
+
+  return seastar::repeat([=] {
+      return conn->read_message()
+        .then([=] (MessageRef msg) {
+          if (msg) {
+           return dispatcher->ms_dispatch(conn, std::move(msg));
+         } else {
+           return seastar::now();
+         }
+        }).then([] {
+          return seastar::stop_iteration::no;
+        });
+    }).handle_exception_type([=] (const std::system_error& e) {
+      if (e.code() == error::connection_aborted ||
+          e.code() == error::connection_reset) {
+        dispatcher->ms_handle_reset(conn);
+      } else if (e.code() == error::read_eof) {
+        dispatcher->ms_handle_remote_reset(conn);
+      } else {
+        throw e;
+      }
+    });
+}
+
+seastar::future<> SocketMessenger::accept(seastar::connected_socket socket,
+                                          seastar::socket_address paddr)
+{
+  // allocate the connection
+  entity_addr_t peer_addr;
+  peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
+  ConnectionRef conn = new SocketConnection(this, get_myaddr(),
+                                            peer_addr, std::move(socket));
+  // initiate the handshake
+  return conn->server_handshake()
+    .handle_exception([conn] (std::exception_ptr eptr) {
+      // close the connection before returning errors
+      return seastar::make_exception_future<>(eptr)
+        .finally([conn] { return conn->close(); });
+    }).then([this, conn] {
+      dispatcher->ms_handle_accept(conn);
+      // dispatch messages until the connection closes or the dispatch
+      // queue shuts down
+      return dispatch(std::move(conn));
+    });
+}
+
+seastar::future<> SocketMessenger::start(Dispatcher *disp)
+{
+  dispatcher = disp;
+
+  // start listening if bind() was called
+  if (listener) {
+    seastar::repeat([this] {
+        return listener->accept()
+          .then([this] (seastar::connected_socket socket,
+                        seastar::socket_address paddr) {
+            // start processing the connection
+            accept(std::move(socket), paddr)
+              .handle_exception([] (std::exception_ptr eptr) {});
+            // don't wait before accepting another
+            return seastar::stop_iteration::no;
+          });
+      }).handle_exception_type([this] (const std::system_error& e) {
+        // stop gracefully on connection_aborted
+        if (e.code() != error::connection_aborted) {
+          throw e;
+        }
+      });
+  }
+
+  return seastar::now();
+}
+
+seastar::future<ceph::net::ConnectionRef> SocketMessenger::connect(const entity_addr_t& addr,
+                                                        const entity_addr_t& myaddr)
+{
+  if (auto found = std::find_if(connections.begin(),
+                               connections.end(),
+                               [&addr](auto conn) {
+                                 return conn->get_peer_addr() == addr;
+                               });
+      found != connections.end()) {
+    return seastar::make_ready_future<ceph::net::ConnectionRef>(*found);
+  }
+  return seastar::connect(addr.in4_addr())
+    .then([=] (seastar::connected_socket socket) {
+      ConnectionRef conn = new SocketConnection(this, get_myaddr(), addr,
+                                                std::move(socket));
+      // complete the handshake before returning to the caller
+      return conn->client_handshake()
+        .handle_exception([conn] (std::exception_ptr eptr) {
+          // close the connection before returning errors
+          return seastar::make_exception_future<>(eptr)
+            .finally([conn] { return conn->close(); });
+        }).then([=] {
+          dispatcher->ms_handle_connect(conn);
+          // dispatch replies on this connection
+          dispatch(conn)
+            .handle_exception([] (std::exception_ptr eptr) {});
+          return conn;
+        });
+    });
+}
+
+seastar::future<> SocketMessenger::shutdown()
+{
+  if (listener) {
+    listener->abort_accept();
+  }
+  return seastar::parallel_for_each(connections.begin(), connections.end(),
+    [this] (ConnectionRef conn) {
+      return conn->close();
+    }).finally([this] { connections.clear(); });
+}
diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h
new file mode 100644 (file)
index 0000000..5cca096
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <list>
+#include <boost/optional.hpp>
+#include <core/reactor.hh>
+
+#include "Messenger.h"
+
+namespace ceph::net {
+
+class SocketMessenger : public Messenger {
+  boost::optional<seastar::server_socket> listener;
+  Dispatcher *dispatcher = nullptr;
+  std::list<ConnectionRef> connections;
+
+  seastar::future<> dispatch(ConnectionRef conn);
+
+  seastar::future<> accept(seastar::connected_socket socket,
+                           seastar::socket_address paddr);
+
+ public:
+  SocketMessenger(const entity_name_t& myname);
+
+  void bind(const entity_addr_t& addr) override;
+
+  seastar::future<> start(Dispatcher *dispatcher) override;
+
+  seastar::future<ConnectionRef> connect(const entity_addr_t& addr,
+                                         const entity_addr_t& myaddr) override;
+
+  seastar::future<> shutdown() override;
+};
+
+} // namespace ceph::net