]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg/async, v2: message frames are pre-dispatched now.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 25 Feb 2019 21:07:27 +0000 (22:07 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 28 Feb 2019 20:42:34 +0000 (21:42 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc

index cf700b601d7015c8f9f4c2615056d869ea428fa3..9a29dcde3bf010c3df8468587da82ddfa349fa3d 100644 (file)
@@ -1161,7 +1161,8 @@ CtPtr ProtocolV2::handle_read_frame_dispatch() {
     case Tag::WAIT:
       return handle_wait();
     case Tag::MESSAGE:
-      return handle_message();
+      // see the comment in ::read_frame_segment().
+      return handle_message_complete();
     default: {
       lderr(cct) << __func__
                  << " received unknown tag=" << static_cast<uint32_t>(next_tag)
@@ -1185,7 +1186,27 @@ CtPtr ProtocolV2::read_frame_segment() {
     // used for Message's data field. It might be dangerous and will be
     // ultimately replaced by `allocation policies`.
     rx_segments_data.emplace_back(ceph::bufferlist{});
-    return handle_read_frame_dispatch();
+
+    // XXX: for the sake of unified epilogue handling this becomes even
+    // uglier. We are doing early dispatch of Messages now. The overall
+    // idea is to:
+    //   1. parse ceph_msg_header2 which let us know tid, and thus pick
+    //      up appropriate rx_buffer (early dispatch aka pre-dispatch).
+    //   2. Read data field into selected rx_buffer.
+    //   3. REUNIFY WITH THE MAIN FLOW: read and handle frame epilogue.
+    //   4. Do ::handle_read_frame_dispatch() as for any kind of frame.
+    //      For messages it wll call ::handle_message_complete().
+#if 0
+    ceph_assert_always(next_tag == Tag::MESSAGE);
+#else
+    if (next_tag != Tag::MESSAGE) {
+      ldout(cct, 20) << __func__
+                    << " only message can use DEFERRED_ALLOCATION"
+                    << dendl;
+      return _fault();
+    }
+#endif
+    return handle_message();
   }
 
   std::unique_ptr<buffer::ptr_node, buffer::ptr_node::disposer> rx_buffer;
@@ -1430,7 +1451,7 @@ CtPtr ProtocolV2::read_message_data() {
   }
 
   state = READ_MESSAGE_COMPLETE;
-  return handle_message_complete();
+  return handle_read_frame_dispatch();
 }
 
 CtPtr ProtocolV2::handle_message_data(char *buffer, int r) {
@@ -1460,7 +1481,7 @@ CtPtr ProtocolV2::handle_message_extra_bytes(char *buffer, int r) {
   }
 
   state = READ_MESSAGE_COMPLETE;
-  return handle_message_complete();
+  return handle_read_frame_dispatch();
 }
 
 CtPtr ProtocolV2::handle_message_complete() {