if (marrival.empty())
return 0;
else
- return (now - marrival.begin()->first);
+ return (now - *marrival.begin());
}
uint64_t DispatchQueue::pre_dispatch(const ref_t<Message>& m)
return;
}
ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
- add_arrival(m);
+ QueueItem item{m};
+ add_arrival(item);
if (priority >= CEPH_MSG_PRIO_LOW) {
- mqueue.enqueue_strict(id, priority, QueueItem(m));
+ mqueue.enqueue_strict(id, priority, std::move(item));
} else {
- mqueue.enqueue(id, priority, m->get_cost(), QueueItem(m));
+ mqueue.enqueue(id, priority, m->get_cost(), std::move(item));
}
cond.notify_all();
}
while (!mqueue.empty()) {
QueueItem qitem = mqueue.dequeue();
if (!qitem.is_code())
- remove_arrival(qitem.get_message());
+ remove_arrival(qitem);
l.unlock();
if (qitem.is_code()) {
for (auto i = removed.begin(); i != removed.end(); ++i) {
ceph_assert(!(i->is_code())); // We don't discard id 0, ever!
const ref_t<Message>& m = i->get_message();
- remove_arrival(m);
+ remove_arrival(*i);
dispatch_throttle_release(m->get_dispatch_throttle_size());
}
}
#define CEPH_DISPATCHQUEUE_H
#include <atomic>
-#include <map>
+#include <set>
#include <queue>
#include <boost/intrusive_ptr.hpp>
#include "include/ceph_assert.h"
* See Messenger::dispatch_entry for details.
*/
class DispatchQueue {
+ using ArrivalSet = std::multiset<double>;
+ ArrivalSet marrival;
+
class QueueItem {
int type;
ConnectionRef con;
ceph_assert(is_code());
return con.get();
}
+
+ /**
+ * An iterator into #marrival. This field is only initialized if
+ * `!is_code()`. It is set by add_arrival() and used by
+ * remove_arrival().
+ */
+ ArrivalSet::iterator arrival;
};
CephContext *cct;
PrioritizedQueue<QueueItem, uint64_t> mqueue;
- std::set<std::pair<double, ceph::ref_t<Message>>> marrival;
- std::map<ceph::ref_t<Message>, decltype(marrival)::iterator> marrival_map;
- void add_arrival(const ceph::ref_t<Message>& m) {
- marrival_map.insert(
- make_pair(
- m,
- marrival.insert(std::make_pair(m->get_recv_stamp(), m)).first
- )
- );
+ void add_arrival(QueueItem &item) {
+ item.arrival = marrival.insert(item.get_message()->get_recv_stamp());
}
- void remove_arrival(const ceph::ref_t<Message>& m) {
- auto it = marrival_map.find(m);
- ceph_assert(it != marrival_map.end());
- marrival.erase(it->second);
- marrival_map.erase(it);
+ void remove_arrival(QueueItem &item) {
+ marrival.erase(item.arrival);
}
std::atomic<uint64_t> next_id;