#include "SocketConnection.h"
+#include "include/msgr.h"
#include "msg/Message.h"
using namespace ceph::net;
});
}
+void SocketConnection::read_tags_until_next_message()
+{
+ seastar::repeat([this] {
+ // read the next tag
+ return in.read_exactly(1)
+ .then([this] (auto buf) {
+ if (buf.empty()) {
+ throw std::system_error(make_error_code(error::read_eof));
+ }
+ switch (buf[0]) {
+ case CEPH_MSGR_TAG_MSG:
+ // stop looping and notify read_header()
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::yes);
+
+ case CEPH_MSGR_TAG_ACK:
+ return in.read_exactly(sizeof(ceph_le64))
+ .then([] (auto buf) {
+ auto seq = reinterpret_cast<const ceph_le64*>(buf.get());
+ std::cout << "ack " << *seq << std::endl;
+ return seastar::stop_iteration::no;
+ });
+
+ case CEPH_MSGR_TAG_KEEPALIVE:
+ break;
+
+ case CEPH_MSGR_TAG_KEEPALIVE2:
+ return in.read_exactly(sizeof(ceph_timespec))
+ .then([] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ std::cout << "keepalive2 " << t->tv_sec << std::endl;
+ // TODO: schedule ack
+ return seastar::stop_iteration::no;
+ });
+
+ case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+ return in.read_exactly(sizeof(ceph_timespec))
+ .then([] (auto buf) {
+ auto t = reinterpret_cast<const ceph_timespec*>(buf.get());
+ std::cout << "keepalive2 ack " << t->tv_sec << std::endl;
+ return seastar::stop_iteration::no;
+ });
+
+ case CEPH_MSGR_TAG_CLOSE:
+ std::cout << "close" << std::endl;
+ break;
+ }
+ return seastar::make_ready_future<seastar::stop_iteration>(
+ seastar::stop_iteration::no);
+ });
+ }).then_wrapped([this] (auto fut) {
+ // satisfy the message promise
+ fut.forward_to(std::move(on_message));
+ on_message = seastar::promise<>{};
+ });
+}
+
seastar::future<MessageRef> SocketConnection::read_message()
{
return on_message.get_future()
// read footer
return read(sizeof(m.footer));
}).then([this] (bufferlist bl) {
- auto p = bl.begin();
- ::decode(m.footer, p);
+ // resume background processing of tags
+ read_tags_until_next_message();
+ auto p = bl.cbegin();
+ ::decode(m.footer, p);
auto msg = ::decode_message(nullptr, 0, m.header, m.footer,
m.front, m.middle, m.data, nullptr);
constexpr bool add_ref = false; // Message starts with 1 ref
seastar::future<> SocketConnection::write_message(MessageRef msg)
{
bufferlist bl;
+ unsigned char tag = CEPH_MSGR_TAG_MSG;
+ encode(tag, bl);
encode_message(msg.get(), 0, bl);
// write as a seastar::net::packet
return out.write(std::move(bl))
// TODO: read authorizer
assert(p.end());
return handle_connect_reply();
+ }).then([this] {
+ // start background processing of tags
+ read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));
// TODO: read authorizer
return handle_connect();
+ }).then([this] {
+ // start background processing of tags
+ read_tags_until_next_message();
}).then_wrapped([this] (auto fut) {
// satisfy the handshake's promise
fut.forward_to(std::move(h.promise));