// TODO: Currently not all messages supports reencode like MOSDMap, so here
// only let fast dispatch support messages prepare message
bool can_fast_prepare = messenger->ms_can_fast_dispatch(m);
- if (can_fast_prepare) {
+ bool is_prepared = false;
+ if (can_fast_prepare && f) {
prepare_send_message(f, m, bl);
+ is_prepared = true;
}
std::lock_guard<std::mutex> l(connection->write_lock);
} else {
m->queue_start = ceph::mono_clock::now();
m->trace.event("async enqueueing message");
- out_q[m->get_priority()].emplace_back(std::move(bl), m);
+ out_q[m->get_priority()].emplace_back(out_q_entry_t{
+ std::move(bl), m, is_prepared});
ldout(cct, 15) << __func__ << " inline write is denied, reschedule m=" << m
<< dendl;
if (can_write != WriteStatus::REPLACING && !write_in_progress) {
}
}
- ceph::buffer::list data;
- Message *m = _get_next_outgoing(&data);
+ const out_q_entry_t out_entry = _get_next_outgoing();
+ Message *m = out_entry.m;
+ ceph::buffer::list data = out_entry.bl;
+
if (!m) {
break;
}
connection->write_lock.unlock();
// send_message or requeue messages may not encode message
- if (!data.length()) {
+ if (!data.length() || !out_entry.is_prepared) {
prepare_send_message(connection->get_features(), m, data);
}
return;
}
- list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
out_seq -= sent.size();
while (!sent.empty()) {
Message *m = sent.back();
ldout(cct, 10) << __func__ << " " << *m << " for resend "
<< " (" << m->get_seq() << ")" << dendl;
m->clear_payload();
- rq.push_front(make_pair(ceph::buffer::list(), m));
+ rq.push_front(out_q_entry_t{ceph::buffer::list(), m, false});
}
}
if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) {
return seq;
}
- list<pair<ceph::buffer::list, Message *> > &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
+ list<out_q_entry_t> &rq = out_q[CEPH_MSG_PRIO_HIGHEST];
uint64_t count = out_seq;
while (!rq.empty()) {
- pair<ceph::buffer::list, Message *> p = rq.front();
- if (p.second->get_seq() == 0 || p.second->get_seq() > seq) break;
- ldout(cct, 10) << __func__ << " " << *(p.second) << " for resend seq "
- << p.second->get_seq() << " <= " << seq << ", discarding"
+ Message* const m = rq.front().m;
+ if (m->get_seq() == 0 || m->get_seq() > seq) break;
+ ldout(cct, 10) << __func__ << " " << *(m) << " for resend seq "
+ << m->get_seq() << " <= " << seq << ", discarding"
<< dendl;
- p.second->put();
+ m->put();
rq.pop_front();
count++;
}
(*p)->put();
}
sent.clear();
- for (map<int, list<pair<ceph::buffer::list, Message *> > >::iterator p =
+ for (map<int, list<out_q_entry_t>>::iterator p =
out_q.begin();
p != out_q.end(); ++p) {
- for (list<pair<ceph::buffer::list, Message *> >::iterator r = p->second.begin();
+ for (list<out_q_entry_t>::iterator r = p->second.begin();
r != p->second.end(); ++r) {
- ldout(cct, 20) << __func__ << " discard " << r->second << dendl;
- r->second->put();
+ ldout(cct, 20) << __func__ << " discard " << r->m << dendl;
+ r->m->put();
}
}
out_q.clear();
}
}
-Message *ProtocolV1::_get_next_outgoing(ceph::buffer::list *bl) {
- Message *m = 0;
+ProtocolV1::out_q_entry_t ProtocolV1::_get_next_outgoing() {
+ out_q_entry_t out_entry;
if (!out_q.empty()) {
- map<int, list<pair<ceph::buffer::list, Message *> > >::reverse_iterator it =
+ map<int, list<out_q_entry_t>>::reverse_iterator it =
out_q.rbegin();
ceph_assert(!it->second.empty());
- list<pair<ceph::buffer::list, Message *> >::iterator p = it->second.begin();
- m = p->second;
- if (p->first.length() && bl) {
- assert(bl->length() == 0);
- bl->swap(p->first);
- }
+ list<out_q_entry_t>::iterator p = it->second.begin();
+ out_entry = *p;
it->second.erase(p);
if (it->second.empty()) out_q.erase(it->first);
}
- return m;
+ return out_entry;
}
/**
enum class WriteStatus { NOWRITE, REPLACING, CANWRITE, CLOSED };
std::atomic<WriteStatus> can_write;
std::list<Message *> sent; // the first ceph::buffer::list need to inject seq
+ //struct for outbound msgs
+ struct out_q_entry_t {
+ ceph::buffer::list bl;
+ Message* m {nullptr};
+ bool is_prepared {false};
+ };
// priority queue for outbound msgs
- std::map<int, std::list<std::pair<ceph::buffer::list, Message *>>> out_q;
+ std::map<int, std::list<out_q_entry_t>> out_q;
bool keepalive;
bool write_in_progress = false;
void session_reset();
void randomize_out_seq();
- Message *_get_next_outgoing(ceph::buffer::list *bl);
+ out_q_entry_t _get_next_outgoing();
void prepare_send_message(uint64_t features, Message *m, ceph::buffer::list &bl);
ssize_t write_message(Message *m, ceph::buffer::list &bl, bool more);