]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg: SocketConnection reads message tags
authorCasey Bodley <cbodley@redhat.com>
Fri, 27 Oct 2017 03:55:05 +0000 (23:55 -0400)
committerKefu Chai <kchai@redhat.com>
Wed, 13 Jun 2018 06:09:22 +0000 (14:09 +0800)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/crimson/net/SocketConnection.cc
src/crimson/net/SocketConnection.h

index 26a3f1b16859315fb70e60ba513e733351f0562b..3fc03cb0e1b9fac4f8afc49679527081a15f3209 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "SocketConnection.h"
 
+#include "include/msgr.h"
 #include "msg/Message.h"
 
 using namespace ceph::net;
@@ -94,6 +95,63 @@ seastar::future<bufferlist> SocketConnection::read(size_t bytes)
     });
 }
 
+void SocketConnection::read_tags_until_next_message()
+{
+  seastar::repeat([this] {
+      // read the next tag
+      return in.read_exactly(1)
+        .then([this] (auto buf) {
+          if (buf.empty()) {
+            throw std::system_error(make_error_code(error::read_eof));
+          }
+          switch (buf[0]) {
+          case CEPH_MSGR_TAG_MSG:
+            // stop looping and notify read_header()
+            return seastar::make_ready_future<seastar::stop_iteration>(
+                seastar::stop_iteration::yes);
+
+          case CEPH_MSGR_TAG_ACK:
+            return in.read_exactly(sizeof(ceph_le64))
+              .then([] (auto buf) {
+                auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+                std::cout << "ack " << *seq << std::endl;
+                return seastar::stop_iteration::no;
+              });
+
+          case CEPH_MSGR_TAG_KEEPALIVE:
+            break;
+
+          case CEPH_MSGR_TAG_KEEPALIVE2:
+            return in.read_exactly(sizeof(ceph_timespec))
+              .then([] (auto buf) {
+                auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+                std::cout << "keepalive2 " << t->tv_sec << std::endl;
+                // TODO: schedule ack
+                return seastar::stop_iteration::no;
+              });
+
+          case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+            return in.read_exactly(sizeof(ceph_timespec))
+              .then([] (auto buf) {
+                auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+                std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
+                return seastar::stop_iteration::no;
+              });
+
+          case CEPH_MSGR_TAG_CLOSE:
+            std::cout << "close" << std::endl;
+            break;
+          }
+          return seastar::make_ready_future<seastar::stop_iteration>(
+              seastar::stop_iteration::no);
+        });
+    }).then_wrapped([this] (auto fut) {
+      // satisfy the message promise
+      fut.forward_to(std::move(on_message));
+      on_message = seastar::promise<>{};
+    });
+}
+
 seastar::future<MessageRef> SocketConnection::read_message()
 {
   return on_message.get_future()
@@ -119,9 +177,11 @@ seastar::future<MessageRef> SocketConnection::read_message()
       // read footer
       return read(sizeof(m.footer));
     }).then([this] (bufferlist bl) {
-      auto p = bl.begin();
-      ::decode(m.footer, p);
+      // resume background processing of tags
+      read_tags_until_next_message();
 
+      auto p = bl.cbegin();
+      ::decode(m.footer, p);
       auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
                                   m.front, m.middle, m.data, nullptr);
       constexpr bool add_ref = false; // Message starts with 1 ref
@@ -132,6 +192,8 @@ seastar::future<MessageRef> SocketConnection::read_message()
 seastar::future<> SocketConnection::write_message(MessageRef msg)
 {
   bufferlist bl;
+  unsigned char tag = CEPH_MSGR_TAG_MSG;
+  encode(tag, bl);
   encode_message(msg.get(), 0, bl);
   // write as a seastar::net::packet
   return out.write(std::move(bl))
@@ -296,6 +358,9 @@ seastar::future<> SocketConnection::client_handshake()
       // TODO: read authorizer
       assert(p.end());
       return handle_connect_reply();
+    }).then([this] {
+      // start background processing of tags
+      read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
       // satisfy the handshake's promise
       fut.forward_to(std::move(h.promise));
@@ -324,6 +389,9 @@ seastar::future<> SocketConnection::server_handshake()
       // TODO: read authorizer
 
       return handle_connect();
+    }).then([this] {
+      // start background processing of tags
+      read_tags_until_next_message();
     }).then_wrapped([this] (auto fut) {
       // satisfy the handshake's promise
       fut.forward_to(std::move(h.promise));
index de0062b7c309320bb48b2343961d0c21ac4077a9..bf86803fe185f753ecf884ee659936f275403199 100644 (file)
@@ -56,6 +56,12 @@ class SocketConnection : public Connection {
     bufferlist data;
   } m;
 
+  /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message
+  /// header will follow
+  seastar::promise<> on_message;
+
+  void read_tags_until_next_message();
+
   /// becomes available when handshake completes, and when all previous messages
   /// have been sent to the output stream. send() chains new messages as
   /// continuations to this future to act as a queue