]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async: Encode message once features are set 59286/head
authorAishwarya Mathuria <amathuri@redhat.com>
Thu, 5 May 2022 03:02:51 +0000 (08:32 +0530)
committerKonstantin Shalygin <k0ste@k0ste.ru>
Sat, 11 Jan 2025 07:30:15 +0000 (14:30 +0700)
Modify send_message to check if features are set before trying to encode a
message.
If features are not set at this point, we will encode the message at a
later stage (in write_event) when the connection will be in ready state
which implies that the features will definitely be set.

Fixes: https://tracker.ceph.com/issues/52657
Signed-off-by: Aishwarya Mathuria <amathuri@redhat.com>
(cherry picked from commit 7268211161ba5d2c47464c19fb25555ae194841d)

src/msg/async/ProtocolV1.cc
src/msg/async/ProtocolV1.h
src/msg/async/ProtocolV2.cc

index b45ad8ca5155f37dcadbe2ef7b1dffa4b5122deb..041942fd906ac3ba34117a1efe2d6c157149ce2c 100644 (file)
@@ -217,8 +217,10 @@ void ProtocolV1::send_message(Message *m) {
   // TODO: Currently not all messages supports reencode like MOSDMap, so here
   // only let fast dispatch support messages prepare message
   bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
-  if (can_fast_prepare) {
+  bool is_prepared = false;
+  if (can_fast_prepare && f) {
     prepare_send_message(f, m, bl);
+    is_prepared = true;
   }
 
   std::lock_guard<std::mutex> l(connection->write_lock);
@@ -238,7 +240,8 @@ void ProtocolV1::send_message(Message *m) {
   } else {
     m->queue_start = ceph::mono_clock::now();
     m->trace.event("async enqueueing message");
-    out_q[m->get_priority()].emplace_back(std::move(bl), m);
+    out_q[m->get_priority()].emplace_back(out_q_entry_t{
+      std::move(bl), m, is_prepared});
     ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
                    << dendl;
     if (can_write != WriteStatus::REPLACING && !write_in_progress) {
@@ -322,8 +325,10 @@ void ProtocolV1::write_event() {
        }
       }
 
-      ceph::buffer::list data;
-      Message *m = _get_next_outgoing(&data);
+      const out_q_entry_t out_entry = _get_next_outgoing();
+      Message *m = out_entry.m;
+      ceph::buffer::list data = out_entry.bl;
+
       if (!m) {
         break;
       }
@@ -337,7 +342,7 @@ void ProtocolV1::write_event() {
       connection->write_lock.unlock();
 
       // send_message or requeue messages may not encode message
-      if (!data.length()) {
+      if (!data.length() || !out_entry.is_prepared) {
         prepare_send_message(connection->get_features(), m, data);
       }
 
@@ -1199,7 +1204,7 @@ void ProtocolV1::requeue_sent() {
     return;
   }
 
-  list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   out_seq -= sent.size();
   while (!sent.empty()) {
     Message *m = sent.back();
@@ -1207,7 +1212,7 @@ void ProtocolV1::requeue_sent() {
     ldout(cct, 10) << __func__ << " " << *m << " for resend "
                    << " (" << m->get_seq() << ")" << dendl;
     m->clear_payload();
-    rq.push_front(make_pair(ceph::buffer::list(), m));
+    rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false});
   }
 }
 
@@ -1217,15 +1222,15 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) {
   if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
     return seq;
   }
-  list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+  list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
   uint64_t count = out_seq;
   while (!rq.empty()) {
-    pair<ceph::buffer::list, Message *> p = rq.front();
-    if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
-    ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
-                   << p.second->get_seq() << " <= " << seq << ", discarding"
+    Message* const m = rq.front().m;
+    if (m->get_seq() == 0 || m->get_seq() > seq) break;
+    ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq "
+                   << m->get_seq() << " <= " << seq << ", discarding"
                    << dendl;
-    p.second->put();
+    m->put();
     rq.pop_front();
     count++;
   }
@@ -1245,13 +1250,13 @@ void ProtocolV1::discard_out_queue() {
     (*p)->put();
   }
   sent.clear();
-  for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
+  for (map<int, list<out_q_entry_t>>::iterator p =
            out_q.begin();
        p != out_q.end(); ++p) {
-    for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
+    for (list<out_q_entry_t>::iterator r = p->second.begin();
          r != p->second.end(); ++r) {
-      ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
-      r->second->put();
+      ldout(cct, 20) << __func__ << " discard " << r->m << dendl;
+      r->m->put();
     }
   }
   out_q.clear();
@@ -1320,22 +1325,18 @@ void ProtocolV1::reset_recv_state()
   }
 }
 
-Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
-  Message *m = 0;
+ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() {
+  out_q_entry_t out_entry;
   if (!out_q.empty()) {
-    map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
+    map<int, list<out_q_entry_t>>::reverse_iterator it =
         out_q.rbegin();
     ceph_assert(!it->second.empty());
-    list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
-    m = p->second;
-    if (p->first.length() && bl) {
-      assert(bl->length() == 0);
-      bl->swap(p->first);
-    }
+    list<out_q_entry_t>::iterator p = it->second.begin();
+    out_entry = *p;
     it->second.erase(p);
     if (it->second.empty()) out_q.erase(it->first);
   }
-  return m;
+  return out_entry;
 }
 
 /**
index b23860e8a015cf52b9296ded19385e33a9b5c3a1..1b7c1d2b5f8cce8707ce4783fac591df11bbf108 100644 (file)
@@ -105,8 +105,14 @@ protected:
   enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
   std::atomic<WriteStatus> can_write;
   std::list<Message *> sent;  // the first ceph::buffer::list need to inject seq
+  //struct for outbound msgs
+  struct out_q_entry_t {
+    ceph::buffer::list bl;
+    Message* m {nullptr};
+    bool is_prepared {false};
+  };
   // priority queue for outbound msgs
-  std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q;
+  std::map<int, std::list<out_q_entry_t>> out_q;
   bool keepalive;
   bool write_in_progress = false;
 
@@ -194,7 +200,7 @@ protected:
   void session_reset();
   void randomize_out_seq();
 
-  Message *_get_next_outgoing(ceph::buffer::list *bl);
+  out_q_entry_t _get_next_outgoing();
 
   void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
   ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);
index 08426b796b88b16c9e0142a7dc7d58d2d8d071f7..7c4a4d0fe94137df39e12234ecfd986e1dbb3979 100644 (file)
@@ -431,12 +431,15 @@ void ProtocolV2::send_message(Message *m) {
   // TODO: Currently not all messages supports reencode like MOSDMap, so here
   // only let fast dispatch support messages prepare message
   const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
-  if (can_fast_prepare) {
+  bool is_prepared;
+  if (can_fast_prepare && f) {
     prepare_send_message(f, m);
+    is_prepared = can_fast_prepare;
+  } else {
+    is_prepared = false;
   }
 
   std::lock_guard<std::mutex> l(connection->write_lock);
-  bool is_prepared = can_fast_prepare;
   // "features" changes will change the payload encoding
   if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
     // ensure the correctness of message encoding