]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: add seastar SocketConnection
authorCasey Bodley <cbodley@redhat.com>
Sat, 21 Oct 2017 20:22:45 +0000 (16:22 -0400)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 06:09:21 +0000 (14:09 +0800)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/crimson/net/CMakeLists.txt
src/crimson/net/Errors.cc
src/crimson/net/Errors.h
src/crimson/net/SocketConnection.cc [new file with mode: 0644]
src/crimson/net/SocketConnection.h [new file with mode: 0644]

index a7f3c7162e7e24d72a2c30810c8fd533984556fd..9c4118252570cc808f14011e9b85d383583bedc9 100644 (file)
@@ -1,5 +1,6 @@
 set(crimson_net_srcs
-  Errors.cc)
+  Errors.cc
+  SocketConnection.cc)
 add_library(crimson_net_objs OBJECT ${crimson_net_srcs})
 target_compile_definitions(crimson_net_objs
   PUBLIC $<TARGET_PROPERTY:Seastar::seastar,INTERFACE_COMPILE_DEFINITIONS>)
index 1ac6fe8c48318a1dba5b3298f2b60173387630e7..fe182377dfcf46287f2f496a4b6e2f6c3a6d4be5 100644 (file)
@@ -25,6 +25,8 @@ const std::error_category& net_category()
 
     std::string message(int ev) const override {
       switch (static_cast<error>(ev)) {
+        case error::read_eof:
+          return "read eof";
         case error::connection_aborted:
           return "connection aborted";
         case error::connection_refused:
index e02e6cdd78dbdd9a02475ae0c37e8fd822c06b32..af3720ae469a66b3c5e9aa65a3844f303740ea35 100644 (file)
@@ -20,6 +20,7 @@ namespace ceph::net {
 
 /// net error codes
 enum class error {
+  read_eof,
   connection_aborted,
   connection_refused,
   connection_reset,
diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc
new file mode 100644 (file)
index 0000000..af89fa6
--- /dev/null
@@ -0,0 +1,169 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <core/shared_future.hh>
+
+#include "SocketConnection.h"
+
+#include "msg/Message.h"
+
+using namespace ceph::net;
+
+SocketConnection::SocketConnection(Messenger *messenger,
+                                   const entity_addr_t& my_addr,
+                                   const entity_addr_t& peer_addr,
+                                   seastar::connected_socket&& fd)
+  : Connection(messenger, my_addr, peer_addr),
+    socket(std::move(fd)),
+    in(socket.input()),
+    out(socket.output()),
+    send_ready(seastar::now())
+{
+}
+
+SocketConnection::~SocketConnection()
+{
+  // errors were reported to callers of send()
+  assert(send_ready.available());
+  send_ready.ignore_ready_future();
+}
+
+bool SocketConnection::is_connected()
+{
+  return !send_ready.failed();
+}
+
+// an input_stream consumer that reads buffer segments into a bufferlist up to
+// the given number of remaining bytes
+struct bufferlist_consumer {
+  bufferlist& bl;
+  size_t& remaining;
+
+  bufferlist_consumer(bufferlist& bl, size_t& remaining)
+    : bl(bl), remaining(remaining) {}
+
+  using tmp_buf = seastar::temporary_buffer<char>;
+  using unconsumed_remainder = std::experimental::optional<tmp_buf>;
+
+  // consume some or all of a buffer segment
+  seastar::future<unconsumed_remainder> operator()(tmp_buf&& data) {
+    if (remaining >= data.size()) {
+      // consume the whole buffer
+      remaining -= data.size();
+      bl.append(buffer::create_foreign(std::move(data)));
+      if (remaining > 0) {
+        // return none to request more segments
+        return seastar::make_ready_future<unconsumed_remainder>();
+      }
+      // return an empty buffer to singal that we're done
+      return seastar::make_ready_future<unconsumed_remainder>(tmp_buf{});
+    }
+    if (remaining > 0) {
+      // consume the front
+      bl.append(buffer::create_foreign(data.share(0, remaining)));
+      data.trim_front(remaining);
+      remaining = 0;
+    }
+    // give the rest back to signal that we're done
+    return seastar::make_ready_future<unconsumed_remainder>(std::move(data));
+  };
+};
+
+seastar::future<bufferlist> SocketConnection::read(size_t bytes)
+{
+  r.buffer.clear();
+  r.remaining = bytes;
+  return in.consume(bufferlist_consumer{r.buffer, r.remaining})
+    .then([this] {
+      if (r.remaining) { // throw on short reads
+        throw std::system_error(make_error_code(error::read_eof));
+      }
+      return seastar::make_ready_future<bufferlist>(std::move(r.buffer));
+    });
+}
+
+seastar::future<ceph_msg_header> SocketConnection::read_header()
+{
+  return read(sizeof(m.header))
+    .then([this] (bufferlist bl) {
+      auto p = bl.begin();
+      ::decode(m.header, p);
+      return m.header;
+    });
+}
+
+seastar::future<MessageRef> SocketConnection::read_message()
+{
+  // read front
+  return read(m.header.front_len)
+    .then([this] (bufferlist bl) {
+      m.front = std::move(bl);
+      // read middle
+      return read(m.header.middle_len);
+    }).then([this] (bufferlist bl) {
+      m.middle = std::move(bl);
+      // read data
+      return read(m.header.data_len);
+    }).then([this] (bufferlist bl) {
+      m.data = std::move(bl);
+      // read footer
+      return read(sizeof(m.footer));
+    }).then([this] (bufferlist bl) {
+      auto p = bl.begin();
+      ::decode(m.footer, p);
+
+      auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
+                                  m.front, m.middle, m.data, nullptr);
+      constexpr bool add_ref = false; // Message starts with 1 ref
+      return MessageRef{msg, add_ref};
+    });
+}
+
+seastar::future<> SocketConnection::write_message(MessageRef msg)
+{
+  bufferlist bl;
+  encode_message(msg.get(), 0, bl);
+  // write as a seastar::net::packet
+  return out.write(std::move(bl))
+    .then([this] { return out.flush(); });
+}
+
+seastar::future<> SocketConnection::send(MessageRef msg)
+{
+  // chain the message after the last message is sent
+  seastar::shared_future<> f = send_ready.then(
+    [this, msg = std::move(msg)] {
+      return write_message(std::move(msg));
+    });
+
+  // chain any later messages after this one completes
+  send_ready = f.get_future();
+  // allow the caller to wait on the same future
+  return f.get_future();
+}
+
+seastar::future<> SocketConnection::close()
+{
+  return seastar::when_all(in.close(), out.close()).discard_result();
+}
+
+seastar::future<> SocketConnection::client_handshake()
+{
+  return seastar::now(); // TODO
+}
+
+seastar::future<> SocketConnection::server_handshake()
+{
+  return seastar::now(); // TODO
+}
diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h
new file mode 100644 (file)
index 0000000..617d6db
--- /dev/null
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2017 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <core/reactor.hh>
+
+#include "Connection.h"
+
+namespace ceph {
+namespace net {
+
+class SocketConnection : public Connection {
+  seastar::connected_socket socket;
+  seastar::input_stream<char> in;
+  seastar::output_stream<char> out;
+
+  /// buffer state for read()
+  struct Reader {
+    bufferlist buffer;
+    size_t remaining;
+  } r;
+
+  /// read the requested number of bytes into a bufferlist
+  seastar::future<bufferlist> read(size_t bytes);
+
+  /// state for an incoming message
+  struct MessageReader {
+    ceph_msg_header header;
+    ceph_msg_footer footer;
+    bufferlist front;
+    bufferlist middle;
+    bufferlist data;
+  } m;
+
+  /// becomes available when handshake completes, and when all previous messages
+  /// have been sent to the output stream. send() chains new messages as
+  /// continuations to this future to act as a queue
+  seastar::future<> send_ready;
+
+  /// encode/write a message
+  seastar::future<> write_message(MessageRef msg);
+
+ public:
+  SocketConnection(Messenger *messenger,
+                   const entity_addr_t& my_addr,
+                   const entity_addr_t& peer_addr,
+                   seastar::connected_socket&& socket);
+  ~SocketConnection();
+
+  bool is_connected() override;
+
+  seastar::future<> client_handshake() override;
+
+  seastar::future<> server_handshake() override;
+
+  seastar::future<ceph_msg_header> read_header() override;
+
+  seastar::future<MessageRef> read_message() override;
+
+  seastar::future<> send(MessageRef msg) override;
+
+  seastar::future<> close() override;
+};
+
+} // namespace net
+} // namespace ceph