From: Aishwarya Mathuria Date: Thu, 5 May 2022 03:02:51 +0000 (+0530) Subject: msg/async: Encode message once features are set X-Git-Tag: testing/wip-yuriw-testing-20240417.205635~1^2 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=7268211161ba5d2c47464c19fb25555ae194841d;p=ceph-ci.git msg/async: Encode message once features are set Modify send_message to check if features are set before trying to encode a message. If features are not set at this point, we will encode the message at a later stage (in write_event) when the connection will be in ready state which implies that the features will definitely be set. Fixes: https://tracker.ceph.com/issues/52657 Signed-off-by: Aishwarya Mathuria --- diff --git a/src/msg/async/ProtocolV1.cc b/src/msg/async/ProtocolV1.cc index b45ad8ca515..041942fd906 100644 --- a/src/msg/async/ProtocolV1.cc +++ b/src/msg/async/ProtocolV1.cc @@ -217,8 +217,10 @@ void ProtocolV1::send_message(Message *m) { // TODO: Currently not all messages supports reencode like MOSDMap, so here // only let fast dispatch support messages prepare message bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); - if (can_fast_prepare) { + bool is_prepared = false; + if (can_fast_prepare && f) { prepare_send_message(f, m, bl); + is_prepared = true; } std::lock_guard l(connection->write_lock); @@ -238,7 +240,8 @@ void ProtocolV1::send_message(Message *m) { } else { m->queue_start = ceph::mono_clock::now(); m->trace.event("async enqueueing message"); - out_q[m->get_priority()].emplace_back(std::move(bl), m); + out_q[m->get_priority()].emplace_back(out_q_entry_t{ + std::move(bl), m, is_prepared}); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; if (can_write != WriteStatus::REPLACING && !write_in_progress) { @@ -322,8 +325,10 @@ void ProtocolV1::write_event() { } } - ceph::buffer::list data; - Message *m = _get_next_outgoing(&data); + const out_q_entry_t out_entry = _get_next_outgoing(); + Message *m = out_entry.m; + ceph::buffer::list data = out_entry.bl; + if (!m) { break; } @@ -337,7 +342,7 @@ void ProtocolV1::write_event() { connection->write_lock.unlock(); // send_message or requeue messages may not encode message - if (!data.length()) { + if (!data.length() || !out_entry.is_prepared) { prepare_send_message(connection->get_features(), m, data); } @@ -1199,7 +1204,7 @@ void ProtocolV1::requeue_sent() { return; } - list > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); while (!sent.empty()) { Message *m = sent.back(); @@ -1207,7 +1212,7 @@ void ProtocolV1::requeue_sent() { ldout(cct, 10) << __func__ << " " << *m << " for resend " << " (" << m->get_seq() << ")" << dendl; m->clear_payload(); - rq.push_front(make_pair(ceph::buffer::list(), m)); + rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false}); } } @@ -1217,15 +1222,15 @@ uint64_t ProtocolV1::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) { return seq; } - list > &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; + list &rq = out_q[CEPH_MSG_PRIO_HIGHEST]; uint64_t count = out_seq; while (!rq.empty()) { - pair p = rq.front(); - if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; - ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq " - << p.second->get_seq() << " <= " << seq << ", discarding" + Message* const m = rq.front().m; + if (m->get_seq() == 0 || m->get_seq() > seq) break; + ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq " + << m->get_seq() << " <= " << seq << ", discarding" << dendl; - p.second->put(); + m->put(); rq.pop_front(); count++; } @@ -1245,13 +1250,13 @@ void ProtocolV1::discard_out_queue() { (*p)->put(); } sent.clear(); - for (map > >::iterator p = + for (map>::iterator p = out_q.begin(); p != out_q.end(); ++p) { - for (list >::iterator r = p->second.begin(); + for (list::iterator r = p->second.begin(); r != p->second.end(); ++r) { - ldout(cct, 20) << __func__ << " discard " << r->second << dendl; - r->second->put(); + ldout(cct, 20) << __func__ << " discard " << r->m << dendl; + r->m->put(); } } out_q.clear(); @@ -1320,22 +1325,18 @@ void ProtocolV1::reset_recv_state() } } -Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) { - Message *m = 0; +ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() { + out_q_entry_t out_entry; if (!out_q.empty()) { - map > >::reverse_iterator it = + map>::reverse_iterator it = out_q.rbegin(); ceph_assert(!it->second.empty()); - list >::iterator p = it->second.begin(); - m = p->second; - if (p->first.length() && bl) { - assert(bl->length() == 0); - bl->swap(p->first); - } + list::iterator p = it->second.begin(); + out_entry = *p; it->second.erase(p); if (it->second.empty()) out_q.erase(it->first); } - return m; + return out_entry; } /** diff --git a/src/msg/async/ProtocolV1.h b/src/msg/async/ProtocolV1.h index b23860e8a01..1b7c1d2b5f8 100644 --- a/src/msg/async/ProtocolV1.h +++ b/src/msg/async/ProtocolV1.h @@ -105,8 +105,14 @@ protected: enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED }; std::atomic can_write; std::list sent; // the first ceph::buffer::list need to inject seq + //struct for outbound msgs + struct out_q_entry_t { + ceph::buffer::list bl; + Message* m {nullptr}; + bool is_prepared {false}; + }; // priority queue for outbound msgs - std::map>> out_q; + std::map> out_q; bool keepalive; bool write_in_progress = false; @@ -194,7 +200,7 @@ protected: void session_reset(); void randomize_out_seq(); - Message *_get_next_outgoing(ceph::buffer::list *bl); + out_q_entry_t _get_next_outgoing(); void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl); ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more); diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index 08426b796b8..7c4a4d0fe94 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -431,12 +431,15 @@ void ProtocolV2::send_message(Message *m) { // TODO: Currently not all messages supports reencode like MOSDMap, so here // only let fast dispatch support messages prepare message const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); - if (can_fast_prepare) { + bool is_prepared; + if (can_fast_prepare && f) { prepare_send_message(f, m); + is_prepared = can_fast_prepare; + } else { + is_prepared = false; } std::lock_guard l(connection->write_lock); - bool is_prepared = can_fast_prepare; // "features" changes will change the payload encoding if (can_fast_prepare && (!can_write || connection->get_features() != f)) { // ensure the correctness of message encoding