]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: msgr2: template specialized write function for frames
authorRicardo Dias <rdias@suse.com>
Fri, 22 Feb 2019 11:05:38 +0000 (11:05 +0000)
committerRicardo Dias <rdias@suse.com>
Mon, 25 Feb 2019 16:43:38 +0000 (16:43 +0000)
Signed-off-by: Ricardo Dias <rdias@suse.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index d4e9ed7fb9b19c8caad45f1e1950e2dac4f601d0..d76feae7384fbfa5e70c530a4cea0c21ea79c3ed 100644 (file)
@@ -747,6 +747,13 @@ CtPtr ProtocolV2::read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
   return nullptr;
 }
 
+template <class F>
+CtPtr ProtocolV2::write(const std::string &desc,
+                        CONTINUATION_PARAM(next, ProtocolV2),
+                        F &frame) {
+  return write(desc, CONTINUATION(next), frame.get_buffer());
+}
+
 CtPtr ProtocolV2::write(const std::string &desc,
                         CONTINUATION_PARAM(next, ProtocolV2),
                         bufferlist &buffer) {
@@ -898,7 +905,7 @@ CtPtr ProtocolV2::_handle_peer_banner_payload(char *buffer, int r) {
 
   INTERCEPT(state == CONNECTING ? 7 : 8);
 
-  return WRITE(hello.get_buffer(), "hello frame", read_frame);
+  return WRITE(hello, "hello frame", read_frame);
 }
 
 CtPtr ProtocolV2::handle_hello(ceph::bufferlist &payload)
@@ -1767,7 +1774,7 @@ CtPtr ProtocolV2::send_auth_request(std::vector<uint32_t> &allowed_methods) {
   INTERCEPT(9);
 
   AuthRequestFrame frame(auth_meta->auth_method, preferred_modes, bl);
-  return WRITE(frame.get_buffer(), "auth request", read_frame);
+  return WRITE(frame, "auth request", read_frame);
 }
 
 CtPtr ProtocolV2::handle_auth_bad_method(ceph::bufferlist &payload) {
@@ -1821,7 +1828,7 @@ CtPtr ProtocolV2::handle_auth_reply_more(ceph::bufferlist &payload)
     return _fault();
   }
   AuthRequestMoreFrame more_reply(dummy_ctor_conflict_helper{}, reply);
-  return WRITE(more_reply.get_buffer(), "auth request more", read_frame);
+  return WRITE(more_reply, "auth request more", read_frame);
 }
 
 CtPtr ProtocolV2::handle_auth_done(ceph::bufferlist &payload)
@@ -1927,7 +1934,7 @@ CtPtr ProtocolV2::send_client_ident() {
 
   INTERCEPT(11);
 
-  return WRITE(client_ident.get_buffer(), "client ident", read_frame);
+  return WRITE(client_ident, "client ident", read_frame);
 }
 
 CtPtr ProtocolV2::send_reconnect() {
@@ -1948,7 +1955,7 @@ CtPtr ProtocolV2::send_reconnect() {
 
   INTERCEPT(13);
 
-  return WRITE(reconnect.get_buffer(), "reconnect", read_frame);
+  return WRITE(reconnect, "reconnect", read_frame);
 }
 
 CtPtr ProtocolV2::handle_ident_missing_features(ceph::bufferlist &payload)
@@ -2152,7 +2159,7 @@ CtPtr ProtocolV2::_auth_bad_method(int r)
                << dendl;
   AuthBadMethodFrame bad_method(auth_meta->auth_method, r, allowed_methods,
                                allowed_modes);
-  return WRITE(bad_method.get_buffer(), "bad auth method", read_frame);
+  return WRITE(bad_method, "bad auth method", read_frame);
 }
 
 CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
@@ -2183,10 +2190,10 @@ CtPtr ProtocolV2::_handle_auth_request(bufferlist& auth_payload, bool more)
       ceph::crypto::onwire::rxtx_t::create_handler_pair(cct, *auth_meta, true);
     AuthDoneFrame auth_done(connection->peer_global_id, auth_meta->con_mode,
                            reply);
-    return WRITE(auth_done.get_buffer(), "auth done", read_frame);
+    return WRITE(auth_done, "auth done", read_frame);
   } else if (r == 0) {
     AuthReplyMoreFrame more(dummy_ctor_conflict_helper{}, reply);
-    return WRITE(more.get_buffer(), "auth reply more", read_frame);
+    return WRITE(more, "auth reply more", read_frame);
   } else if (r == -EBUSY) {
     // kick the client and maybe they'll come back later
     return _fault();
@@ -2250,8 +2257,7 @@ CtPtr ProtocolV2::handle_client_ident(ceph::bufferlist &payload)
                   << feat_missing << std::dec << dendl;
     IdentMissingFeaturesFrame ident_missing_features(session_stream_handlers, feat_missing);
 
-    bufferlist &bl = ident_missing_features.get_buffer();
-    return WRITE(bl, "ident missing features", read_frame);
+    return WRITE(ident_missing_features, "ident missing features", read_frame);
   }
 
   connection_features =
@@ -2345,7 +2351,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
     ldout(cct, 0) << __func__
                   << " no existing connection exists, reseting client" << dendl;
     ResetFrame reset(session_stream_handlers, true);
-    return WRITE(reset.get_buffer(), "session reset", read_frame);
+    return WRITE(reset, "session reset", read_frame);
   }
 
   std::lock_guard<std::mutex> l(existing->lock);
@@ -2360,7 +2366,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
     ldout(cct, 5) << __func__ << " existing " << existing
                   << " already closed. Reseting client" << dendl;
     ResetFrame reset(session_stream_handlers, true);
-    return WRITE(reset.get_buffer(), "session reset", read_frame);
+    return WRITE(reset, "session reset", read_frame);
   }
 
   if (exproto->replacing) {
@@ -2368,8 +2374,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " existing racing replace happened while replacing."
                   << " existing=" << existing << dendl;
     RetryGlobalFrame retry(session_stream_handlers, exproto->peer_global_seq);
-    bufferlist &bl = retry.get_buffer();
-    return WRITE(bl, "session retry", read_frame);
+    return WRITE(retry, "session retry", read_frame);
   }
 
   if (exproto->client_cookie != reconnect.client_cookie()) {
@@ -2380,7 +2385,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << ", reseting client."
                   << dendl;
     ResetFrame reset(session_stream_handlers, connection->policy.resetcheck);
-    return WRITE(reset.get_buffer(), "session reset", read_frame);
+    return WRITE(reset, "session reset", read_frame);
   } else if (exproto->server_cookie == 0) {
     // this happens when:
     //   - a connects to b
@@ -2393,7 +2398,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " server_ident. Asking peer to resume session"
                   << " establishment" << dendl;
     ResetFrame reset(session_stream_handlers, false);
-    return WRITE(reset.get_buffer(), "session reset", read_frame);
+    return WRITE(reset, "session reset", read_frame);
   }
 
   if (exproto->peer_global_seq > reconnect.global_seq()) {
@@ -2405,7 +2410,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
 
     INTERCEPT(18);
 
-    return WRITE(retry.get_buffer(), "session retry", read_frame);
+    return WRITE(retry, "session retry", read_frame);
   }
 
   if (exproto->connect_seq > reconnect.connect_seq()) {
@@ -2414,7 +2419,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
                   << " ccs=" << reconnect.connect_seq()
                   << " , ask client to retry" << dendl;
     RetryFrame retry(session_stream_handlers, exproto->connect_seq);
-    return WRITE(retry.get_buffer(), "session retry", read_frame);
+    return WRITE(retry, "session retry", read_frame);
   }
 
   if (exproto->connect_seq == reconnect.connect_seq()) {
@@ -2429,7 +2434,7 @@ CtPtr ProtocolV2::handle_reconnect(ceph::bufferlist &payload)
           << existing << dendl;
 
       WaitFrame wait(session_stream_handlers);
-      return WRITE(wait.get_buffer(), "wait", read_frame);
+      return WRITE(wait, "wait", read_frame);
     } else {
       // this connection wins
       ldout(cct, 1) << __func__
@@ -2472,7 +2477,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
                   << " existing racing replace happened while replacing."
                   << " existing=" << existing << dendl;
     WaitFrame wait(session_stream_handlers);
-    return WRITE(wait.get_buffer(), "wait", read_frame);
+    return WRITE(wait, "wait", read_frame);
   }
 
   if (exproto->peer_global_seq > peer_global_seq) {
@@ -2549,8 +2554,7 @@ CtPtr ProtocolV2::handle_existing_connection(AsyncConnectionRef existing) {
     existing->send_keepalive();
     existing->lock.lock();
     WaitFrame wait(session_stream_handlers);
-    bufferlist &bl = wait.get_buffer();
-    return WRITE(bl, "wait", read_frame);
+    return WRITE(wait, "wait", read_frame);
   }
 }
 
@@ -2737,7 +2741,7 @@ CtPtr ProtocolV2::send_server_ident() {
 
   INTERCEPT(12);
 
-  return WRITE(server_ident.get_buffer(), "server ident", server_ready);
+  return WRITE(server_ident, "server ident", server_ready);
 }
 
 CtPtr ProtocolV2::server_ready() {
@@ -2792,5 +2796,5 @@ CtPtr ProtocolV2::send_reconnect_ok() {
 
   INTERCEPT(14);
 
-  return WRITE(reconnect_ok.get_buffer(), "reconnect ok", server_ready);
+  return WRITE(reconnect_ok, "reconnect ok", server_ready);
 }
index 50dbbeecf975007802a9795343bd285a5218d838..045be4623ce88e95a502aefc5e1d356a9e7a6253 100644 (file)
@@ -133,6 +133,9 @@ private:
 
   Ct<ProtocolV2> *read(CONTINUATION_PARAM(next, ProtocolV2, char *, int),
                        int len, char *buffer = nullptr);
+  template <class F>
+  Ct<ProtocolV2> *write(const std::string &desc,
+                        CONTINUATION_PARAM(next, ProtocolV2), F &frame);
   Ct<ProtocolV2> *write(const std::string &desc,
                         CONTINUATION_PARAM(next, ProtocolV2),
                         bufferlist &buffer);