From: Radoslaw Zarzynski Date: Thu, 7 Feb 2019 14:11:27 +0000 (+0100) Subject: msg/async: expose message segmentation to ::write_message(). X-Git-Tag: v14.1.1~157^2~45 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=78b68f8393977fea6f9fc1a486b0d77d86660e7c;p=ceph.git msg/async: expose message segmentation to ::write_message(). Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/msg/async/ProtocolV2.cc b/src/msg/async/ProtocolV2.cc index e19ec4deabb2..cdc1ef658515 100644 --- a/src/msg/async/ProtocolV2.cc +++ b/src/msg/async/ProtocolV2.cc @@ -58,8 +58,6 @@ void ProtocolV2::run_continuation(CtPtr continuation) { } } -const int ASYNC_COALESCE_THRESHOLD = 256; - #define WRITE(B, D, C) write(D, CONTINUATION(C), B) #define READ(L, C) read(CONTINUATION(C), L) @@ -107,23 +105,23 @@ static void alloc_aligned_buffer(bufferlist &data, unsigned len, unsigned off) { template struct Frame { protected: - bufferlist payload; - bufferlist frame_buffer; + ceph::bufferlist payload; + ceph::bufferlist::contiguous_filler preamble_filler; public: - Frame() {} + Frame() : preamble_filler(payload.append_hole(8)) {} bufferlist &get_buffer(const uint32_t extra_payload_len = 0) { - if (frame_buffer.length()) { - return frame_buffer; - } - encode(static_cast(payload.length() + extra_payload_len + sizeof(uint32_t)), - frame_buffer, -1ll); - uint32_t tag = static_cast(static_cast(this)->tag); + __le32 msg_len = payload.length() + extra_payload_len - sizeof(std::uint32_t); + preamble_filler.copy_in(sizeof(msg_len), + reinterpret_cast(&msg_len)); + + __le32 tag = static_cast(static_cast(this)->tag); + preamble_filler.copy_in(sizeof(tag), + reinterpret_cast(&tag)); ceph_assert(tag != 0); - encode(tag, frame_buffer, -1ll); - frame_buffer.claim_append(payload); - return frame_buffer; + + return payload; } void decode_frame(char *payload, uint32_t length) { @@ -279,7 +277,10 @@ template struct SignedEncryptedFrame : public PayloadFrame { 
SignedEncryptedFrame(ProtocolV2 &protocol, const Args &... args) : PayloadFrame(args...) { - protocol.authencrypt_payload(this->payload); + ceph::bufferlist trans_bl; + this->payload.splice(8, this->payload.length() - 8, &trans_bl); + protocol.authencrypt_payload(trans_bl); + this->payload.claim_append(trans_bl); } SignedEncryptedFrame(ProtocolV2 &protocol, char *payload, uint32_t length) @@ -421,9 +422,23 @@ struct MessageHeaderFrame : public PayloadFrame { const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE; - MessageHeaderFrame(ProtocolV2 &protocol, const ceph_msg_header2 &msghdr) + MessageHeaderFrame(ProtocolV2 &protocol, + const ceph_msg_header2 &msghdr, + ceph::bufferlist&& front_bl, + ceph::bufferlist&& middle_bl, + ceph::bufferlist&& data_bl) : PayloadFrame(msghdr) { - protocol.authencrypt_payload(this->payload); + ceph::bufferlist trans_bl; + this->payload.splice(8, this->payload.length() - 8, &trans_bl); + protocol.authencrypt_payload(trans_bl); + this->payload.claim_append(trans_bl); + + trans_bl.append(front_bl); + trans_bl.append(middle_bl); + trans_bl.append(data_bl); + + protocol.authencrypt_payload(trans_bl); + this->payload.claim_append(trans_bl); } MessageHeaderFrame(ProtocolV2 &protocol, char *payload, uint32_t length) @@ -483,13 +498,10 @@ void ProtocolV2::discard_out_queue() { (*p)->put(); } sent.clear(); - for (map>>::iterator p = - out_queue.begin(); - p != out_queue.end(); ++p) { - for (list>::iterator r = p->second.begin(); - r != p->second.end(); ++r) { - ldout(cct, 20) << __func__ << " discard " << r->second << dendl; - r->second->put(); + for (auto& [ prio, entries ] : out_queue) { + for (auto& entry : entries) { + ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl; + entry.m->put(); } } out_queue.clear(); @@ -546,7 +558,7 @@ void ProtocolV2::requeue_sent() { return; } - list> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; out_seq -= sent.size(); while (!sent.empty()) { Message *m = 
sent.back(); @@ -554,7 +566,7 @@ void ProtocolV2::requeue_sent() { ldout(cct, 5) << __func__ << " requeueing message m=" << m << " seq=" << m->get_seq() << " type=" << m->get_type() << " " << *m << dendl; - rq.push_front(make_pair(bufferlist(), m)); + rq.emplace_front(out_queue_entry_t{false, m}); } } @@ -564,15 +576,15 @@ uint64_t ProtocolV2::discard_requeued_up_to(uint64_t out_seq, uint64_t seq) { if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) { return seq; } - list> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; + auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST]; uint64_t count = out_seq; while (!rq.empty()) { - pair p = rq.front(); - if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break; - ldout(cct, 5) << __func__ << " discarding message m=" << p.second - << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " " - << *(p.second) << dendl; - p.second->put(); + Message* const m = rq.front().m; + if (m->get_seq() == 0 || m->get_seq() > seq) break; + ldout(cct, 5) << __func__ << " discarding message m=" << m + << " seq=" << m->get_seq() << " ack_seq=" << seq << " " + << *m << dendl; + m->put(); rq.pop_front(); count++; } @@ -719,8 +731,8 @@ CtPtr ProtocolV2::_fault() { return nullptr; } -void ProtocolV2::prepare_send_message(uint64_t features, Message *m, - bufferlist &bl) { +void ProtocolV2::prepare_send_message(uint64_t features, + Message *m) { ldout(cct, 20) << __func__ << " m=" << *m << dendl; // associate message with Connection (for benefit of encode_payload) @@ -734,29 +746,25 @@ void ProtocolV2::prepare_send_message(uint64_t features, Message *m, // encode and copy out of *m m->encode(features, messenger->crcflags); - - bl.append(m->get_payload()); - bl.append(m->get_middle()); - bl.append(m->get_data()); } void ProtocolV2::send_message(Message *m) { - bufferlist bl; uint64_t f = connection->get_features(); // TODO: Currently not all messages supports reencode like MOSDMap, so here // only let fast dispatch support messages prepare message - bool 
can_fast_prepare = messenger->ms_can_fast_dispatch(m); + const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m); if (can_fast_prepare) { - prepare_send_message(f, m, bl); + prepare_send_message(f, m); } std::lock_guard l(connection->write_lock); + bool is_prepared = can_fast_prepare; // "features" changes will change the payload encoding if (can_fast_prepare && (!can_write || connection->get_features() != f)) { // ensure the correctness of message encoding - bl.clear(); m->clear_payload(); + is_prepared = false; ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f << " != " << connection->get_features() << dendl; } @@ -768,7 +776,8 @@ void ProtocolV2::send_message(Message *m) { ldout(cct, 5) << __func__ << " enqueueing message m=" << m << " type=" << m->get_type() << " " << *m << dendl; m->trace.event("async enqueueing message"); - out_queue[m->get_priority()].emplace_back(std::move(bl), m); + out_queue[m->get_priority()].emplace_back( + out_queue_entry_t{is_prepared, m}); ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m << dendl; if ((!replacing && can_write) || state == STANDBY) { @@ -813,26 +822,23 @@ void ProtocolV2::read_event() { } } -Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) { - Message *m = 0; +ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() { + out_queue_entry_t out_entry; + if (!out_queue.empty()) { - map>>::reverse_iterator it = - out_queue.rbegin(); - ceph_assert(!it->second.empty()); - list>::iterator p = it->second.begin(); - m = p->second; - if (bl) { - bl->swap(p->first); - } - it->second.erase(p); - if (it->second.empty()) { + auto it = out_queue.rbegin(); + auto& entries = it->second; + ceph_assert(!entries.empty()); + out_entry = entries.front(); + entries.pop_front(); + if (entries.empty()) { out_queue.erase(it->first); } } - return m; + return out_entry; } -ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) { +ssize_t 
ProtocolV2::write_message(Message *m, bool more) { FUNCTRACE(cct); ceph_assert(connection->center->in_thread()); m->set_seq(++out_seq); @@ -861,24 +867,16 @@ ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) { sizeof(header2) - sizeof(header2.header_crc)); } - bufferlist flat_bl; - if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) { - for (const auto &pb : bl.buffers()) { - flat_bl.append((char *)pb.c_str(), pb.length()); - } - } else { - flat_bl.claim_append(bl); - } - MessageHeaderFrame message(*this, header2); - authencrypt_payload(flat_bl); + MessageHeaderFrame message(*this, header2, + ceph::bufferlist(m->get_payload()), + ceph::bufferlist(m->get_middle()), + ceph::bufferlist(m->get_data())); ldout(cct, 5) << __func__ << " sending message m=" << m << " seq=" << m->get_seq() << " " << *m << dendl; - bufferlist &msg_bl = message.get_buffer(flat_bl.length()); - connection->outcoming_bl.claim_append(msg_bl); - connection->outcoming_bl.claim_append(flat_bl); + connection->outcoming_bl.claim_append(message.get_buffer()); m->trace.event("async writing message"); ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq() @@ -957,26 +955,25 @@ void ProtocolV2::write_event() { auto start = ceph::mono_clock::now(); bool more; do { - bufferlist data; - Message *m = _get_next_outgoing(&data); - if (!m) { + const auto out_entry = _get_next_outgoing(); + if (!out_entry.m) { break; } if (!connection->policy.lossy) { // put on sent list - sent.push_back(m); - m->get(); + sent.push_back(out_entry.m); + out_entry.m->get(); } more = !out_queue.empty(); connection->write_lock.unlock(); // send_message or requeue messages may not encode message - if (!data.length()) { - prepare_send_message(connection->get_features(), m, data); + if (!out_entry.is_prepared) { + prepare_send_message(connection->get_features(), out_entry.m); } - r = write_message(m, data, more); + r = write_message(out_entry.m, more); 
connection->write_lock.lock(); if (r == 0) { diff --git a/src/msg/async/ProtocolV2.h b/src/msg/async/ProtocolV2.h index d578d378bd8f..059cc28e0850 100644 --- a/src/msg/async/ProtocolV2.h +++ b/src/msg/async/ProtocolV2.h @@ -86,7 +86,11 @@ private: bool reconnecting; bool replacing; bool can_write; - std::map>> out_queue; + struct out_queue_entry_t { + bool is_prepared {false}; + Message* m {nullptr}; + }; + std::map> out_queue; std::list sent; std::atomic out_seq{0}; std::atomic in_seq{0}; @@ -123,9 +127,9 @@ private: Ct *_fault(); void discard_out_queue(); void reset_session(); - void prepare_send_message(uint64_t features, Message *m, bufferlist &bl); - Message *_get_next_outgoing(bufferlist *bl); - ssize_t write_message(Message *m, bufferlist &bl, bool more); + void prepare_send_message(uint64_t features, Message *m); + out_queue_entry_t _get_next_outgoing(); + ssize_t write_message(Message *m, bool more); void append_keepalive(); void append_keepalive_ack(utime_t &timestamp); void handle_message_ack(uint64_t seq);