From 23a92878f7b569af50af64093951d96a56198fbd Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 8 Mar 2019 03:54:49 +0100 Subject: [PATCH] msg/async: introduce bptr-carrying continuations. Signed-off-by: Radoslaw Zarzynski --- src/msg/async/Protocol.h | 32 ++++++++++++++++++++++++++++++++ src/msg/async/ProtocolV2.cc | 18 ++++++++++++++++++ src/msg/async/ProtocolV2.h | 2 ++ 3 files changed, 52 insertions(+) diff --git a/src/msg/async/Protocol.h b/src/msg/async/Protocol.h index 08a6481240f..556c9ef1470 100644 --- a/src/msg/async/Protocol.h +++ b/src/msg/async/Protocol.h @@ -46,10 +46,38 @@ public: } }; +using rx_buffer_t = ceph::bufferptr; +// FIXME: std::function in AsyncConnection requires us to be copy- +// constructible just in the case - even if nobody actually copies +// the std::functions there. This inhibits usage of unique_ptr of +// ptr_node inside. Reworking the callback mechanism might help but +// for now we can go with regular bptr. +// +// std::unique_ptr; + +template +class CtRxNode : public Ct { + using fn_t = Ct *(C::*)(rx_buffer_t&&, int r); + fn_t _f; + + mutable rx_buffer_t node; + int r; + +public: + CtRxNode(fn_t f) : _f(f) {} + void setParams(rx_buffer_t &&node, int r) { + this->node = std::move(node); + this->r = r; + } + inline Ct *call(C *foo) const override { + return (foo->*_f)(std::move(node), r); + } +}; template using CONTINUATION_TYPE = CtFun; template using CONTINUATION_TX_TYPE = CtFun; template using CONTINUATION_RX_TYPE = CtFun; +template using CONTINUATION_RXBPTR_TYPE = CtRxNode; #define CONTINUATION_DECL(C, F, ...) \ CtFun F##_cont { (&C::F) }; @@ -67,6 +95,10 @@ template using CONTINUATION_RX_TYPE = CtFun; #define READ_HANDLER_CONTINUATION_DECL(C, F) \ CONTINUATION_DECL(C, F, char *, int) + +#define READ_BPTR_HANDLER_CONTINUATION_DECL(C, F) \ + CtRxNode F##_cont { (&C::F) }; + #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int) ////////////////////////////////////////////////////////////////////// diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 256c835bd48..49189c17004 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -715,6 +715,24 @@ CtPtr ProtocolV2::read(CONTINUATION_RX_TYPE &next, return nullptr; } +CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE &next, + rx_buffer_t &&buffer) { + const auto len = buffer.length(); + const auto buf = buffer.c_str(); + ssize_t r = connection->read(len, buf, + [&next, this, bufnode = std::move(buffer)](char *buffer, int r) mutable { + next.setParams(std::move(bufnode), r); + run_continuation(next); + }); + if (r <= 0) { + // error or done synchronously + next.setParams(rx_buffer_t(), r); + return &next; + } + + return nullptr; +} + template CtPtr ProtocolV2::write(const std::string &desc, CONTINUATION_TYPE &next, diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index 3e8e84a4e0e..242a2bea355 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -117,6 +117,8 @@ private: Ct *read(CONTINUATION_RX_TYPE &next, int len, char *buffer = nullptr); + Ct *read(CONTINUATION_RXBPTR_TYPE &next, + rx_buffer_t&& buffer); template Ct *write(const std::string &desc, CONTINUATION_TYPE &next, -- 2.39.5