]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: expose message segmentation to ::write_message().
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 7 Feb 2019 14:11:27 +0000 (15:11 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 21 Feb 2019 20:54:18 +0000 (21:54 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/msg/async/ProtocolV2.cc
src/msg/async/ProtocolV2.h

index e19ec4deabb27006f40775dd2c6e724bf2675050..cdc1ef658515f74dec2a63ce155a691c1a1bb1fd 100644 (file)
@@ -58,8 +58,6 @@ void ProtocolV2::run_continuation(CtPtr continuation) {
   }
 }
 
-const int ASYNC_COALESCE_THRESHOLD = 256;
-
 #define WRITE(B, D, C) write(D, CONTINUATION(C), B)
 
 #define READ(L, C) read(CONTINUATION(C), L)
@@ -107,23 +105,23 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) {
 template <class T>
 struct Frame {
 protected:
-  bufferlist payload;
-  bufferlist frame_buffer;
+  ceph::bufferlist payload;
+  ceph::bufferlist::contiguous_filler preamble_filler;
 
 public:
-  Frame() {}
+  Frame() : preamble_filler(payload.append_hole(8)) {}
 
   bufferlist &get_buffer(const uint32_t extra_payload_len = 0) {
-    if (frame_buffer.length()) {
-      return frame_buffer;
-    }
-    encode(static_cast<uint32_t>(payload.length() + extra_payload_len + sizeof(uint32_t)),
-          frame_buffer, -1ll);
-    uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+    __le32 msg_len = payload.length() + extra_payload_len - sizeof(std::uint32_t);
+    preamble_filler.copy_in(sizeof(msg_len),
+                           reinterpret_cast<const char*>(&msg_len));
+
+    __le32 tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+    preamble_filler.copy_in(sizeof(tag),
+                           reinterpret_cast<const char*>(&tag));
     ceph_assert(tag != 0);
-    encode(tag, frame_buffer, -1ll);
-    frame_buffer.claim_append(payload);
-    return frame_buffer;
+
+    return payload;
   }
 
   void decode_frame(char *payload, uint32_t length) {
@@ -279,7 +277,10 @@ template <class T, typename... Args>
 struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
   SignedEncryptedFrame(ProtocolV2 &protocol, const Args &... args)
       : PayloadFrame<T, Args...>(args...) {
-    protocol.authencrypt_payload(this->payload);
+    ceph::bufferlist trans_bl;
+    this->payload.splice(8, this->payload.length() - 8, &trans_bl);
+    protocol.authencrypt_payload(trans_bl);
+    this->payload.claim_append(trans_bl);
   }
 
   SignedEncryptedFrame(ProtocolV2 &protocol, char *payload, uint32_t length)
@@ -421,9 +422,23 @@ struct MessageHeaderFrame
     : public PayloadFrame<MessageHeaderFrame, ceph_msg_header2> {
   const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE;
 
-  MessageHeaderFrame(ProtocolV2 &protocol, const ceph_msg_header2 &msghdr)
+  MessageHeaderFrame(ProtocolV2 &protocol,
+                    const ceph_msg_header2 &msghdr,
+                    ceph::bufferlist&& front_bl,
+                    ceph::bufferlist&& middle_bl,
+                    ceph::bufferlist&& data_bl)
       : PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(msghdr) {
-    protocol.authencrypt_payload(this->payload);
+    ceph::bufferlist trans_bl;
+    this->payload.splice(8, this->payload.length() - 8, &trans_bl);
+    protocol.authencrypt_payload(trans_bl);
+    this->payload.claim_append(trans_bl);
+
+    trans_bl.append(front_bl);
+    trans_bl.append(middle_bl);
+    trans_bl.append(data_bl);
+
+    protocol.authencrypt_payload(trans_bl);
+    this->payload.claim_append(trans_bl);
   }
 
   MessageHeaderFrame(ProtocolV2 &protocol, char *payload, uint32_t length)
@@ -483,13 +498,10 @@ void ProtocolV2::discard_out_queue() {
     (*p)->put();
   }
   sent.clear();
-  for (map<int, list<pair<bufferlist, Message *>>>::iterator p =
-           out_queue.begin();
-       p != out_queue.end(); ++p) {
-    for (list<pair<bufferlist, Message *>>::iterator r = p->second.begin();
-         r != p->second.end(); ++r) {
-      ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
-      r->second->put();
+  for (auto& [ prio, entries ] : out_queue) {
+    for (auto& entry : entries) {
+      ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
+      entry.m->put();
     }
   }
   out_queue.clear();
@@ -546,7 +558,7 @@ void ProtocolV2::requeue_sent() {
     return;
   }
 
-  list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
   out_seq -= sent.size();
   while (!sent.empty()) {
     Message *m = sent.back();
@@ -554,7 +566,7 @@ void ProtocolV2::requeue_sent() {
     ldout(cct, 5) << __func__ << " requeueing message m=" << m
                   << " seq=" << m->get_seq() << " type=" << m->get_type() << " "
                   << *m << dendl;
-    rq.push_front(make_pair(bufferlist(), m));
+    rq.emplace_front(out_queue_entry_t{false, m});
   }
 }
 
@@ -564,15 +576,15 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
     return seq;
   }
-  list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+  auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
   uint64_t count = out_seq;
   while (!rq.empty()) {
-    pair<bufferlist, Message *> p = rq.front();
-    if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
-    ldout(cct, 5) << __func__ << " discarding message m=" << p.second
-                  << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " "
-                  << *(p.second) << dendl;
-    p.second->put();
+    Message* const m = rq.front().m;
+    if (m->get_seq() == 0 || m->get_seq() > seq) break;
+    ldout(cct, 5) << __func__ << " discarding message m=" << m
+                  << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
+                  << *m << dendl;
+    m->put();
     rq.pop_front();
     count++;
   }
@@ -719,8 +731,8 @@ CtPtr ProtocolV2::_fault() {
   return nullptr;
 }
 
-void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
-                                      bufferlist &bl) {
+void ProtocolV2::prepare_send_message(uint64_t features,
+                                     Message *m) {
   ldout(cct, 20) << __func__ << " m=" << *m << dendl;
 
   // associate message with Connection (for benefit of encode_payload)
@@ -734,29 +746,25 @@ void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
 
   // encode and copy out of *m
   m->encode(features, messenger->crcflags);
-
-  bl.append(m->get_payload());
-  bl.append(m->get_middle());
-  bl.append(m->get_data());
 }
 
 void ProtocolV2::send_message(Message *m) {
-  bufferlist bl;
   uint64_t f = connection->get_features();
 
   // TODO: Currently not all messages supports reencode like MOSDMap, so here
   // only let fast dispatch support messages prepare message
-  bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+  const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
   if (can_fast_prepare) {
-    prepare_send_message(f, m, bl);
+    prepare_send_message(f, m);
   }
 
   std::lock_guard<std::mutex> l(connection->write_lock);
+  bool is_prepared = can_fast_prepare;
   // "features" changes will change the payload encoding
   if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
     // ensure the correctness of message encoding
-    bl.clear();
     m->clear_payload();
+    is_prepared = false;
     ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
                    << " != " << connection->get_features() << dendl;
   }
@@ -768,7 +776,8 @@ void ProtocolV2::send_message(Message *m) {
     ldout(cct, 5) << __func__ << " enqueueing message m=" << m
                   << " type=" << m->get_type() << " " << *m << dendl;
     m->trace.event("async enqueueing message");
-    out_queue[m->get_priority()].emplace_back(std::move(bl), m);
+    out_queue[m->get_priority()].emplace_back(
+      out_queue_entry_t{is_prepared, m});
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
     if ((!replacing && can_write) || state == STANDBY) {
@@ -813,26 +822,23 @@ void ProtocolV2::read_event() {
   }
 }
 
-Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
-  Message *m = 0;
+ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
+  out_queue_entry_t out_entry;
+
   if (!out_queue.empty()) {
-    map<int, list<pair<bufferlist, Message *>>>::reverse_iterator it =
-        out_queue.rbegin();
-    ceph_assert(!it->second.empty());
-    list<pair<bufferlist, Message *>>::iterator p = it->second.begin();
-    m = p->second;
-    if (bl) {
-      bl->swap(p->first);
-    }
-    it->second.erase(p);
-    if (it->second.empty()) {
+    auto it = out_queue.rbegin();
+    auto& entries = it->second;
+    ceph_assert(!entries.empty());
+    out_entry = entries.front();
+    entries.pop_front();
+    if (entries.empty()) {
       out_queue.erase(it->first);
     }
   }
-  return m;
+  return out_entry;
 }
 
-ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
+ssize_t ProtocolV2::write_message(Message *m, bool more) {
   FUNCTRACE(cct);
   ceph_assert(connection->center->in_thread());
   m->set_seq(++out_seq);
@@ -861,24 +867,16 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
                     sizeof(header2) - sizeof(header2.header_crc));
   }
 
-  bufferlist flat_bl;
-  if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
-    for (const auto &pb : bl.buffers()) {
-      flat_bl.append((char *)pb.c_str(), pb.length());
-    }
-  } else {
-    flat_bl.claim_append(bl);
-  }
 
-  MessageHeaderFrame message(*this, header2);
-  authencrypt_payload(flat_bl);
+  MessageHeaderFrame message(*this, header2,
+    ceph::bufferlist(m->get_payload()),
+    ceph::bufferlist(m->get_middle()),
+    ceph::bufferlist(m->get_data()));
 
   ldout(cct, 5) << __func__ << " sending message m=" << m
                 << " seq=" << m->get_seq() << " " << *m << dendl;
 
-  bufferlist &msg_bl = message.get_buffer(flat_bl.length());
-  connection->outcoming_bl.claim_append(msg_bl);
-  connection->outcoming_bl.claim_append(flat_bl);
+  connection->outcoming_bl.claim_append(message.get_buffer());
 
   m->trace.event("async writing message");
   ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
@@ -957,26 +955,25 @@ void ProtocolV2::write_event() {
     auto start = ceph::mono_clock::now();
     bool more;
     do {
-      bufferlist data;
-      Message *m = _get_next_outgoing(&data);
-      if (!m) {
+      const auto out_entry = _get_next_outgoing();
+      if (!out_entry.m) {
         break;
       }
 
       if (!connection->policy.lossy) {
         // put on sent list
-        sent.push_back(m);
-        m->get();
+        sent.push_back(out_entry.m);
+        out_entry.m->get();
       }
       more = !out_queue.empty();
       connection->write_lock.unlock();
 
       // send_message or requeue messages may not encode message
-      if (!data.length()) {
-        prepare_send_message(connection->get_features(), m, data);
+      if (!out_entry.is_prepared) {
+        prepare_send_message(connection->get_features(), out_entry.m);
       }
 
-      r = write_message(m, data, more);
+      r = write_message(out_entry.m, more);
 
       connection->write_lock.lock();
       if (r == 0) {
index d578d378bd8faf347bac924a082f82cc9dc91d60..059cc28e0850d22c2874ed1e204740fbdf95c7e4 100644 (file)
@@ -86,7 +86,11 @@ private:
   bool reconnecting;
   bool replacing;
   bool can_write;
-  std::map<int, std::list<std::pair<bufferlist, Message *>>> out_queue;
+  struct out_queue_entry_t {
+    bool is_prepared {false};
+    Message* m {nullptr};
+  };
+  std::map<int, std::list<out_queue_entry_t>> out_queue;
   std::list<Message *> sent;
   std::atomic<uint64_t> out_seq{0};
   std::atomic<uint64_t> in_seq{0};
@@ -123,9 +127,9 @@ private:
   Ct<ProtocolV2> *_fault();
   void discard_out_queue();
   void reset_session();
-  void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
-  Message *_get_next_outgoing(bufferlist *bl);
-  ssize_t write_message(Message *m, bufferlist &bl, bool more);
+  void prepare_send_message(uint64_t features, Message *m);
+  out_queue_entry_t _get_next_outgoing();
+  ssize_t write_message(Message *m, bool more);
   void append_keepalive();
   void append_keepalive_ack(utime_t &timestamp);
   void handle_message_ack(uint64_t seq);