]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async: introduce bptr-carrying continuations.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 8 Mar 2019 02:54:49 +0000 (03:54 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Sun, 10 Mar 2019 00:11:59 +0000 (01:11 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/Protocol.h
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index 08a6481240f1dafeb8330caab6f667cf83d0c9c8..556c9ef1470f92fadeb96f2fa7a7ea6618e0af2b 100644 (file)
@@ -46,10 +46,38 @@ public:
   }
 };
 
+using rx_buffer_t = ceph::bufferptr;
+// FIXME: std::function in AsyncConnection requires us to be copy-
+// constructible just in the case - even if nobody actually copies
+// the std::functions there. This inhibits usage of unique_ptr of
+// ptr_node inside. Reworking the callback mechanism might help but
+// for now we can go with regular bptr.
+//
+//    std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer>;
+
+template <class C>
+class CtRxNode : public Ct<C> {
+  using fn_t = Ct<C> *(C::*)(rx_buffer_t&&, int r);
+  fn_t _f;
+
+  mutable rx_buffer_t node;
+  int r;
+
+public:
+  CtRxNode(fn_t f) : _f(f) {}
+  void setParams(rx_buffer_t &&node, int r) {
+    this->node = std::move(node);
+    this->r = r;
+  }
+  inline Ct<C> *call(C *foo) const override {
+    return (foo->*_f)(std::move(node), r);
+  }
+};
 
 template <class C> using CONTINUATION_TYPE = CtFun<C>;
 template <class C> using CONTINUATION_TX_TYPE = CtFun<C, int>;
 template <class C> using CONTINUATION_RX_TYPE = CtFun<C, char*, int>;
+template <class C> using CONTINUATION_RXBPTR_TYPE = CtRxNode<C>;
 
 #define CONTINUATION_DECL(C, F, ...)                    \
   CtFun<C, ##__VA_ARGS__> F##_cont { (&C::F) };
@@ -67,6 +95,10 @@ template <class C> using CONTINUATION_RX_TYPE = CtFun<C, char*, int>;
 
 #define READ_HANDLER_CONTINUATION_DECL(C, F) \
   CONTINUATION_DECL(C, F, char *, int)
+
+#define READ_BPTR_HANDLER_CONTINUATION_DECL(C, F) \
+  CtRxNode<C> F##_cont { (&C::F) };
+
 #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
 
 //////////////////////////////////////////////////////////////////////
index 256c835bd487393125417fb5581d2ba415ce623b..49189c170043f79548f5db08d443f9e399948e33 100644 (file)
@@ -715,6 +715,24 @@ CtPtr ProtocolV2::read(CONTINUATION_RX_TYPE<ProtocolV2> &next,
   return nullptr;
 }
 
+CtPtr ProtocolV2::read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
+                       rx_buffer_t &&buffer) {
+  const auto len = buffer.length();
+  const auto buf = buffer.c_str();
+  ssize_t r = connection->read(len, buf,
+    [&next, this, bufnode = std::move(buffer)](char *buffer, int r) mutable {
+      next.setParams(std::move(bufnode), r);
+      run_continuation(next);
+    });
+  if (r <= 0) {
+    // error or done synchronously
+    next.setParams(rx_buffer_t(), r);
+    return &next;
+  }
+
+  return nullptr;
+}
+
 template <class F>
 CtPtr ProtocolV2::write(const std::string &desc,
                         CONTINUATION_TYPE<ProtocolV2> &next,
index 3e8e84a4e0efaf25f09f24c8f70003845aed9ab5..242a2bea3557e8c38f833d74ba469fb15980ea4e 100644 (file)
@@ -117,6 +117,8 @@ private:
 
   Ct<ProtocolV2> *read(CONTINUATION_RX_TYPE<ProtocolV2> &next,
                        int len, char *buffer = nullptr);
+  Ct<ProtocolV2> *read(CONTINUATION_RXBPTR_TYPE<ProtocolV2> &next,
+                       rx_buffer_t&& buffer);
   template <class F>
   Ct<ProtocolV2> *write(const std::string &desc,
                         CONTINUATION_TYPE<ProtocolV2> &next,