}
}
-const int ASYNC_COALESCE_THRESHOLD = 256;
-
#define WRITE(B, D, C) write(D, CONTINUATION(C), B)
#define READ(L, C) read(CONTINUATION(C), L)
template <class T>
struct Frame {
protected:
- bufferlist payload;
- bufferlist frame_buffer;
+ ceph::bufferlist payload;
+ ceph::bufferlist::contiguous_filler preamble_filler;
public:
- Frame() {}
+ Frame() : preamble_filler(payload.append_hole(8)) {}
bufferlist &get_buffer(const uint32_t extra_payload_len = 0) {
- if (frame_buffer.length()) {
- return frame_buffer;
- }
- encode(static_cast<uint32_t>(payload.length() + extra_payload_len + sizeof(uint32_t)),
- frame_buffer, -1ll);
- uint32_t tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+ __le32 msg_len = payload.length() + extra_payload_len - sizeof(std::uint32_t);
+ preamble_filler.copy_in(sizeof(msg_len),
+ reinterpret_cast<const char*>(&msg_len));
+
+ __le32 tag = static_cast<uint32_t>(static_cast<T *>(this)->tag);
+ preamble_filler.copy_in(sizeof(tag),
+ reinterpret_cast<const char*>(&tag));
ceph_assert(tag != 0);
- encode(tag, frame_buffer, -1ll);
- frame_buffer.claim_append(payload);
- return frame_buffer;
+
+ return payload;
}
void decode_frame(char *payload, uint32_t length) {
struct SignedEncryptedFrame : public PayloadFrame<T, Args...> {
SignedEncryptedFrame(ProtocolV2 &protocol, const Args &... args)
: PayloadFrame<T, Args...>(args...) {
- protocol.authencrypt_payload(this->payload);
+ ceph::bufferlist trans_bl;
+ this->payload.splice(8, this->payload.length() - 8, &trans_bl);
+ protocol.authencrypt_payload(trans_bl);
+ this->payload.claim_append(trans_bl);
}
SignedEncryptedFrame(ProtocolV2 &protocol, char *payload, uint32_t length)
: public PayloadFrame<MessageHeaderFrame, ceph_msg_header2> {
const ProtocolV2::Tag tag = ProtocolV2::Tag::MESSAGE;
- MessageHeaderFrame(ProtocolV2 &protocol, const ceph_msg_header2 &msghdr)
+ MessageHeaderFrame(ProtocolV2 &protocol,
+ const ceph_msg_header2 &msghdr,
+ ceph::bufferlist&& front_bl,
+ ceph::bufferlist&& middle_bl,
+ ceph::bufferlist&& data_bl)
: PayloadFrame<MessageHeaderFrame, ceph_msg_header2>(msghdr) {
- protocol.authencrypt_payload(this->payload);
+ ceph::bufferlist trans_bl;
+ this->payload.splice(8, this->payload.length() - 8, &trans_bl);
+ protocol.authencrypt_payload(trans_bl);
+ this->payload.claim_append(trans_bl);
+
+ trans_bl.append(front_bl);
+ trans_bl.append(middle_bl);
+ trans_bl.append(data_bl);
+
+ protocol.authencrypt_payload(trans_bl);
+ this->payload.claim_append(trans_bl);
}
MessageHeaderFrame(ProtocolV2 &protocol, char *payload, uint32_t length)
(*p)->put();
}
sent.clear();
- for (map<int, list<pair<bufferlist, Message *>>>::iterator p =
- out_queue.begin();
- p != out_queue.end(); ++p) {
- for (list<pair<bufferlist, Message *>>::iterator r = p->second.begin();
- r != p->second.end(); ++r) {
- ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
- r->second->put();
+ for (auto& [ prio, entries ] : out_queue) {
+ for (auto& entry : entries) {
+ ldout(cct, 20) << __func__ << " discard " << *entry.m << dendl;
+ entry.m->put();
}
}
out_queue.clear();
return;
}
- list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+ auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
while (!sent.empty()) {
Message *m = sent.back();
ldout(cct, 5) << __func__ << " requeueing message m=" << m
<< " seq=" << m->get_seq() << " type=" << m->get_type() << " "
<< *m << dendl;
- rq.push_front(make_pair(bufferlist(), m));
+ rq.emplace_front(out_queue_entry_t{false, m});
}
}
if (out_queue.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
return seq;
}
- list<pair<bufferlist, Message *>> &rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
+ auto& rq = out_queue[CEPH_MSG_PRIO_HIGHEST];
uint64_t count = out_seq;
while (!rq.empty()) {
- pair<bufferlist, Message *> p = rq.front();
- if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
- ldout(cct, 5) << __func__ << " discarding message m=" << p.second
- << " seq=" << p.second->get_seq() << " ack_seq=" << seq << " "
- << *(p.second) << dendl;
- p.second->put();
+ Message* const m = rq.front().m;
+ if (m->get_seq() == 0 || m->get_seq() > seq) break;
+ ldout(cct, 5) << __func__ << " discarding message m=" << m
+ << " seq=" << m->get_seq() << " ack_seq=" << seq << " "
+ << *m << dendl;
+ m->put();
rq.pop_front();
count++;
}
return nullptr;
}
-void ProtocolV2::prepare_send_message(uint64_t features, Message *m,
- bufferlist &bl) {
+void ProtocolV2::prepare_send_message(uint64_t features,
+ Message *m) {
ldout(cct, 20) << __func__ << " m=" << *m << dendl;
// associate message with Connection (for benefit of encode_payload)
// encode and copy out of *m
m->encode(features, messenger->crcflags);
-
- bl.append(m->get_payload());
- bl.append(m->get_middle());
- bl.append(m->get_data());
}
void ProtocolV2::send_message(Message *m) {
- bufferlist bl;
uint64_t f = connection->get_features();
// TODO: Currently not all messages supports reencode like MOSDMap, so here
// only let fast dispatch support messages prepare message
- bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
+ const bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
if (can_fast_prepare) {
- prepare_send_message(f, m, bl);
+ prepare_send_message(f, m);
}
std::lock_guard<std::mutex> l(connection->write_lock);
+ bool is_prepared = can_fast_prepare;
// "features" changes will change the payload encoding
if (can_fast_prepare && (!can_write || connection->get_features() != f)) {
// ensure the correctness of message encoding
- bl.clear();
m->clear_payload();
+ is_prepared = false;
ldout(cct, 10) << __func__ << " clear encoded buffer previous " << f
<< " != " << connection->get_features() << dendl;
}
ldout(cct, 5) << __func__ << " enqueueing message m=" << m
<< " type=" << m->get_type() << " " << *m << dendl;
m->trace.event("async enqueueing message");
- out_queue[m->get_priority()].emplace_back(std::move(bl), m);
+ out_queue[m->get_priority()].emplace_back(
+ out_queue_entry_t{is_prepared, m});
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
if ((!replacing && can_write) || state == STANDBY) {
}
}
-Message *ProtocolV2::_get_next_outgoing(bufferlist *bl) {
- Message *m = 0;
+ProtocolV2::out_queue_entry_t ProtocolV2::_get_next_outgoing() {
+ out_queue_entry_t out_entry;
+
if (!out_queue.empty()) {
- map<int, list<pair<bufferlist, Message *>>>::reverse_iterator it =
- out_queue.rbegin();
- ceph_assert(!it->second.empty());
- list<pair<bufferlist, Message *>>::iterator p = it->second.begin();
- m = p->second;
- if (bl) {
- bl->swap(p->first);
- }
- it->second.erase(p);
- if (it->second.empty()) {
+ auto it = out_queue.rbegin();
+ auto& entries = it->second;
+ ceph_assert(!entries.empty());
+ out_entry = entries.front();
+ entries.pop_front();
+ if (entries.empty()) {
out_queue.erase(it->first);
}
}
- return m;
+ return out_entry;
}
-ssize_t ProtocolV2::write_message(Message *m, bufferlist &bl, bool more) {
+ssize_t ProtocolV2::write_message(Message *m, bool more) {
FUNCTRACE(cct);
ceph_assert(connection->center->in_thread());
m->set_seq(++out_seq);
sizeof(header2) - sizeof(header2.header_crc));
}
- bufferlist flat_bl;
- if ((bl.length() <= ASYNC_COALESCE_THRESHOLD) && (bl.buffers().size() > 1)) {
- for (const auto &pb : bl.buffers()) {
- flat_bl.append((char *)pb.c_str(), pb.length());
- }
- } else {
- flat_bl.claim_append(bl);
- }
- MessageHeaderFrame message(*this, header2);
- authencrypt_payload(flat_bl);
+ MessageHeaderFrame message(*this, header2,
+ ceph::bufferlist(m->get_payload()),
+ ceph::bufferlist(m->get_middle()),
+ ceph::bufferlist(m->get_data()));
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
- bufferlist &msg_bl = message.get_buffer(flat_bl.length());
- connection->outcoming_bl.claim_append(msg_bl);
- connection->outcoming_bl.claim_append(flat_bl);
+ connection->outcoming_bl.claim_append(message.get_buffer());
m->trace.event("async writing message");
ldout(cct, 20) << __func__ << " sending m=" << m << " seq=" << m->get_seq()
auto start = ceph::mono_clock::now();
bool more;
do {
- bufferlist data;
- Message *m = _get_next_outgoing(&data);
- if (!m) {
+ const auto out_entry = _get_next_outgoing();
+ if (!out_entry.m) {
break;
}
if (!connection->policy.lossy) {
// put on sent list
- sent.push_back(m);
- m->get();
+ sent.push_back(out_entry.m);
+ out_entry.m->get();
}
more = !out_queue.empty();
connection->write_lock.unlock();
// send_message or requeue messages may not encode message
- if (!data.length()) {
- prepare_send_message(connection->get_features(), m, data);
+ if (!out_entry.is_prepared) {
+ prepare_send_message(connection->get_features(), out_entry.m);
}
- r = write_message(m, data, more);
+ r = write_message(out_entry.m, more);
connection->write_lock.lock();
if (r == 0) {