From c8011ed0c69ef68853b5f9f9a16a9783e062b3dd Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 26 Oct 2017 23:55:05 -0400 Subject: [PATCH] msg: SocketConnection reads message tags Signed-off-by: Casey Bodley --- src/crimson/net/SocketConnection.cc | 72 ++++++++++++++++++++++++++++- src/crimson/net/SocketConnection.h | 6 +++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index 26a3f1b168593..3fc03cb0e1b9f 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -17,6 +17,7 @@ #include "SocketConnection.h" +#include "include/msgr.h" #include "msg/Message.h" using namespace ceph::net; @@ -94,6 +95,63 @@ seastar::future SocketConnection::read(size_t bytes) }); } +void SocketConnection::read_tags_until_next_message() +{ + seastar::repeat([this] { + // read the next tag + return in.read_exactly(1) + .then([this] (auto buf) { + if (buf.empty()) { + throw std::system_error(make_error_code(error::read_eof)); + } + switch (buf[0]) { + case CEPH_MSGR_TAG_MSG: + // stop looping and notify read_header() + return seastar::make_ready_future( + seastar::stop_iteration::yes); + + case CEPH_MSGR_TAG_ACK: + return in.read_exactly(sizeof(ceph_le64)) + .then([] (auto buf) { + auto seq = reinterpret_cast(buf.get()); + std::cout << "ack " << *seq << std::endl; + return seastar::stop_iteration::no; + }); + + case CEPH_MSGR_TAG_KEEPALIVE: + break; + + case CEPH_MSGR_TAG_KEEPALIVE2: + return in.read_exactly(sizeof(ceph_timespec)) + .then([] (auto buf) { + auto t = reinterpret_cast(buf.get()); + std::cout << "keepalive2 " << t->tv_sec << std::endl; + // TODO: schedule ack + return seastar::stop_iteration::no; + }); + + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + return in.read_exactly(sizeof(ceph_timespec)) + .then([] (auto buf) { + auto t = reinterpret_cast(buf.get()); + std::cout << "keepalive2 ack " << t->tv_sec << std::endl; + return seastar::stop_iteration::no; + }); + + case CEPH_MSGR_TAG_CLOSE: + std::cout << "close" << std::endl; + break; + } + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + }).then_wrapped([this] (auto fut) { + // satisfy the message promise + fut.forward_to(std::move(on_message)); + on_message = seastar::promise<>{}; + }); +} + seastar::future SocketConnection::read_message() { return on_message.get_future() @@ -119,9 +177,11 @@ seastar::future SocketConnection::read_message() // read footer return read(sizeof(m.footer)); }).then([this] (bufferlist bl) { - auto p = bl.begin(); - ::decode(m.footer, p); + // resume background processing of tags + read_tags_until_next_message(); + auto p = bl.cbegin(); + ::decode(m.footer, p); auto msg = ::decode_message(nullptr, 0, m.header, m.footer, m.front, m.middle, m.data, nullptr); constexpr bool add_ref = false; // Message starts with 1 ref @@ -132,6 +192,8 @@ seastar::future SocketConnection::read_message() seastar::future<> SocketConnection::write_message(MessageRef msg) { bufferlist bl; + unsigned char tag = CEPH_MSGR_TAG_MSG; + encode(tag, bl); encode_message(msg.get(), 0, bl); // write as a seastar::net::packet return out.write(std::move(bl)) @@ -296,6 +358,9 @@ seastar::future<> SocketConnection::client_handshake() // TODO: read authorizer assert(p.end()); return handle_connect_reply(); + }).then([this] { + // start background processing of tags + read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); @@ -324,6 +389,9 @@ seastar::future<> SocketConnection::server_handshake() // TODO: read authorizer return handle_connect(); + }).then([this] { + // start background processing of tags + read_tags_until_next_message(); }).then_wrapped([this] (auto fut) { // satisfy the handshake's promise fut.forward_to(std::move(h.promise)); diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index de0062b7c3093..bf86803fe185f 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -56,6 +56,12 @@ class SocketConnection : public Connection { bufferlist data; } m; + /// satisfied when a CEPH_MSGR_TAG_MSG is read, indicating that a message + /// header will follow + seastar::promise<> on_message; + + void read_tags_until_next_message(); + /// becomes available when handshake completes, and when all previous messages /// have been sent to the output stream. send() chains new messages as /// continuations to this future to act as a queue -- 2.39.5