enable the WorkQueue to support move-only template types.
Signed-off-by: Samuel Just <sjust@redhat.com>
Signed-off-by: Kefu Chai <kchai@redhat.com>
Signed-off-by: Myoungwon Oh <omwmw@sk.com>
// them into out.
virtual void remove_by_class(K k, std::list<T> *out) = 0;
// Enqueue op in the back of the strict queue
- virtual void enqueue_strict(K cl, unsigned priority, T item) = 0;
+ virtual void enqueue_strict(K cl, unsigned priority, T &&item) = 0;
// Enqueue op in the front of the strict queue
- virtual void enqueue_strict_front(K cl, unsigned priority, T item) = 0;
+ virtual void enqueue_strict_front(K cl, unsigned priority, T &&item) = 0;
// Enqueue op in the back of the regular queue
- virtual void enqueue(K cl, unsigned priority, unsigned cost, T item) = 0;
+ virtual void enqueue(K cl, unsigned priority, unsigned cost, T &&item) = 0;
// Enqueue the op in the front of the regular queue
- virtual void enqueue_front(K cl, unsigned priority, unsigned cost, T item) = 0;
+ virtual void enqueue_front(
+ K cl, unsigned priority, unsigned cost, T &&item) = 0;
// Returns if the queue is empty
virtual bool empty() const = 0;
// Return an op to be dispatch
tokens = 0;
}
}
- void enqueue(K cl, unsigned cost, T item) {
- q[cl].push_back(std::make_pair(cost, item));
+ void enqueue(K cl, unsigned cost, T &&item) {
+ q[cl].push_back(std::make_pair(cost, std::move(item)));
if (cur == q.end())
cur = q.begin();
size++;
}
- void enqueue_front(K cl, unsigned cost, T item) {
- q[cl].push_front(std::make_pair(cost, item));
+ void enqueue_front(K cl, unsigned cost, T &&item) {
+ q[cl].push_front(std::make_pair(cost, std::move(item)));
if (cur == q.end())
cur = q.begin();
size++;
}
- std::pair<unsigned, T> front() const {
+ std::pair<unsigned, T> &front() const {
assert(!(q.empty()));
assert(cur != q.end());
return cur->second.front();
}
- void pop_front() {
+ T pop_front() {
assert(!(q.empty()));
assert(cur != q.end());
+ T ret = std::move(cur->second.front().second);
cur->second.pop_front();
if (cur->second.empty()) {
q.erase(cur++);
cur = q.begin();
}
size--;
+ return ret;
}
unsigned length() const {
assert(size >= 0);
i->second.rbegin();
j != i->second.rend();
++j) {
- out->push_front(j->second);
+ out->push_front(std::move(j->second));
}
}
q.erase(i);
}
}
- void enqueue_strict(K cl, unsigned priority, T item) final {
- high_queue[priority].enqueue(cl, 0, item);
+ void enqueue_strict(K cl, unsigned priority, T&& item) final {
+ high_queue[priority].enqueue(cl, 0, std::move(item));
}
- void enqueue_strict_front(K cl, unsigned priority, T item) final {
- high_queue[priority].enqueue_front(cl, 0, item);
+ void enqueue_strict_front(K cl, unsigned priority, T&& item) final {
+ high_queue[priority].enqueue_front(cl, 0, std::move(item));
}
- void enqueue(K cl, unsigned priority, unsigned cost, T item) final {
+ void enqueue(K cl, unsigned priority, unsigned cost, T&& item) final {
if (cost < min_cost)
cost = min_cost;
if (cost > max_tokens_per_subqueue)
cost = max_tokens_per_subqueue;
- create_queue(priority)->enqueue(cl, cost, item);
+ create_queue(priority)->enqueue(cl, cost, std::move(item));
}
- void enqueue_front(K cl, unsigned priority, unsigned cost, T item) final {
+ void enqueue_front(K cl, unsigned priority, unsigned cost, T&& item) final {
if (cost < min_cost)
cost = min_cost;
if (cost > max_tokens_per_subqueue)
cost = max_tokens_per_subqueue;
- create_queue(priority)->enqueue_front(cl, cost, item);
+ create_queue(priority)->enqueue_front(cl, cost, std::move(item));
}
bool empty() const final {
assert(!empty());
if (!(high_queue.empty())) {
- T ret = high_queue.rbegin()->second.front().second;
+ T ret = std::move(high_queue.rbegin()->second.front().second);
high_queue.rbegin()->second.pop_front();
if (high_queue.rbegin()->second.empty()) {
high_queue.erase(high_queue.rbegin()->first);
++i) {
assert(!(i->second.empty()));
if (i->second.front().first < i->second.num_tokens()) {
- T ret = i->second.front().second;
unsigned cost = i->second.front().first;
i->second.take_tokens(cost);
+ T ret = std::move(i->second.front().second);
i->second.pop_front();
if (i->second.empty()) {
remove_queue(i->first);
// if no subqueues have sufficient tokens, we behave like a strict
// priority queue.
- T ret = queue.rbegin()->second.front().second;
unsigned cost = queue.rbegin()->second.front().first;
+ T ret = std::move(queue.rbegin()->second.front().second);
queue.rbegin()->second.pop_front();
if (queue.rbegin()->second.empty()) {
remove_queue(queue.rbegin()->first);
public:
unsigned cost;
T item;
- ListPair(unsigned c, T& i) :
+ ListPair(unsigned c, T&& i) :
cost(c),
- item(i)
- {}
+ item(std::move(i))
+ {}
};
class Klass : public bi::set_base_hook<>
{
{ return a.key > b.key; }
friend bool operator== (const Klass &a, const Klass &b)
{ return a.key == b.key; }
- void insert(unsigned cost, T& item, bool front) {
+ void insert(unsigned cost, T&& item, bool front) {
if (front) {
- lp.push_front(*new ListPair(cost, item));
+ lp.push_front(*new ListPair(cost, std::move(item)));
} else {
- lp.push_back(*new ListPair(cost, item));
+ lp.push_back(*new ListPair(cost, std::move(item)));
}
}
//Get the cost of the next item to dequeue
}
T pop() {
assert(!lp.empty());
- T ret = lp.begin()->item;
+ T ret = std::move(lp.begin()->item);
lp.erase_and_dispose(lp.begin(), DelItem<ListPair>());
return ret;
}
unsigned count = 0;
for (Lit i = --lp.end();; --i) {
if (out) {
- out->push_front(i->item);
+ out->push_front(std::move(i->item));
}
i = lp.erase_and_dispose(i, DelItem<ListPair>());
++count;
bool empty() const {
return klasses.empty();
}
- void insert(K cl, unsigned cost, T& item, bool front = false) {
+ void insert(K cl, unsigned cost, T&& item, bool front = false) {
typename Klasses::insert_commit_data insert_data;
std::pair<Kit, bool> ret =
klasses.insert_unique_check(cl, MapKey<Klass, K>(), insert_data);
ret.first = klasses.insert_unique_commit(*new Klass(cl), insert_data);
check_end();
}
- ret.first->insert(cost, item, front);
+ ret.first->insert(cost, std::move(item), front);
}
unsigned get_cost() const {
assert(!empty());
bool empty() const {
return !size;
}
- void insert(unsigned p, K cl, unsigned cost, T& item, bool front = false) {
+ void insert(unsigned p, K cl, unsigned cost, T&& item, bool front = false) {
typename SubQueues::insert_commit_data insert_data;
std::pair<typename SubQueues::iterator, bool> ret =
queues.insert_unique_check(p, MapKey<SubQueue, unsigned>(), insert_data);
ret.first = queues.insert_unique_commit(*new SubQueue(p), insert_data);
total_prio += p;
}
- ret.first->insert(cl, cost, item, front);
+ ret.first->insert(cl, cost, std::move(item), front);
if (cost > max_cost) {
max_cost = cost;
}
bool empty() const final {
return !(strict.size + normal.size);
}
- void enqueue_strict(K cl, unsigned p, T item) final {
- strict.insert(p, cl, 0, item);
+ void enqueue_strict(K cl, unsigned p, T&& item) final {
+ strict.insert(p, cl, 0, std::move(item));
}
- void enqueue_strict_front(K cl, unsigned p, T item) final {
- strict.insert(p, cl, 0, item, true);
+ void enqueue_strict_front(K cl, unsigned p, T&& item) final {
+ strict.insert(p, cl, 0, std::move(item), true);
}
- void enqueue(K cl, unsigned p, unsigned cost, T item) final {
- normal.insert(p, cl, cost, item);
+ void enqueue(K cl, unsigned p, unsigned cost, T&& item) final {
+ normal.insert(p, cl, cost, std::move(item));
}
- void enqueue_front(K cl, unsigned p, unsigned cost, T item) final {
- normal.insert(p, cl, cost, item, true);
+ void enqueue_front(K cl, unsigned p, unsigned cost, T&& item) final {
+ normal.insert(p, cl, cost, std::move(item), true);
}
T dequeue() override {
assert(strict.size + normal.size > 0);
ShardedThreadPool* sharded_pool;
protected:
- virtual void _enqueue(T) = 0;
- virtual void _enqueue_front(T) = 0;
+ virtual void _enqueue(T&&) = 0;
+ virtual void _enqueue_front(T&&) = 0;
public:
}
~ShardedWQ() override {}
- void queue(T item) {
- _enqueue(item);
+ void queue(T&& item) {
+ _enqueue(std::move(item));
}
- void queue_front(T item) {
- _enqueue_front(item);
+ void queue_front(T&& item) {
+ _enqueue_front(std::move(item));
}
void drain() {
sharded_pool->drain();
typedef std::list<std::pair<cost_t, T> > ListPairs;
static unsigned filter_list_pairs(ListPairs *l,
- std::function<bool (const T&)> f,
+ std::function<bool (T&&)> f,
std::list<T>* out = nullptr) {
unsigned ret = 0;
for (typename ListPairs::iterator i = l->end();
) {
auto next = i;
--next;
- if (f(next->second)) {
+ if (f(std::move(next->second))) {
++ret;
- if (out) out->push_back(next->second);
+ if (out) out->push_back(std::move(next->second));
l->erase(next);
} else {
i = next;
}
}
- void enqueue(K cl, cost_t cost, T item) {
- q[cl].push_back(std::make_pair(cost, item));
+ void enqueue(K cl, cost_t cost, T&& item) {
+ q[cl].emplace_back(cost, std::move(item));
if (cur == q.end())
cur = q.begin();
size++;
}
- void enqueue_front(K cl, cost_t cost, T item) {
- q[cl].push_front(std::make_pair(cost, item));
+ void enqueue_front(K cl, cost_t cost, T&& item) {
+ q[cl].emplace_front(cost, std::move(item));
if (cur == q.end())
cur = q.begin();
size++;
}
- std::pair<cost_t, T> front() const {
+ const std::pair<cost_t, T>& front() const {
+ assert(!(q.empty()));
+ assert(cur != q.end());
+ return cur->second.front();
+ }
+
+ std::pair<cost_t, T>& front() {
assert(!(q.empty()));
assert(cur != q.end());
return cur->second.front();
return q.empty();
}
- void remove_by_filter(std::function<bool (const T&)> f) {
+ void remove_by_filter(std::function<bool (T&&)> f) {
for (typename Classes::iterator i = q.begin();
i != q.end();
/* no-inc */) {
}
if (out) {
for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
- out->push_front(j->second);
+ out->push_front(std::move(j->second));
}
}
q.erase(i);
// be sure to do things in reverse priority order and push_front
// to the list so items end up on list in front-to-back priority
// order
- void remove_by_filter(std::function<bool (const T&)> filter_accum) {
+ void remove_by_filter(std::function<bool (T&&)> filter_accum) {
queue.remove_by_req_filter(filter_accum, true);
for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
- if (filter_accum(i->second)) {
+ if (filter_accum(std::move(i->second))) {
i = decltype(i){ queue_front.erase(std::next(i).base()) };
} else {
++i;
if (out) {
queue.remove_by_client(k,
true,
- [&out] (const T& t) { out->push_front(t); });
+ [&out] (T&& t) {
+ out->push_front(std::move(t));
+ });
} else {
queue.remove_by_client(k, true);
}
for (auto i = queue_front.rbegin(); i != queue_front.rend(); /* no-inc */) {
if (k == i->first) {
- if (nullptr != out) out->push_front(i->second);
+ if (nullptr != out) out->push_front(std::move(i->second));
i = decltype(i){ queue_front.erase(std::next(i).base()) };
} else {
++i;
}
}
- void enqueue_strict(K cl, unsigned priority, T item) override final {
- high_queue[priority].enqueue(cl, 0, item);
+ void enqueue_strict(K cl, unsigned priority, T&& item) override final {
+ high_queue[priority].enqueue(cl, 0, std::move(item));
}
- void enqueue_strict_front(K cl, unsigned priority, T item) override final {
- high_queue[priority].enqueue_front(cl, 0, item);
+ void enqueue_strict_front(K cl, unsigned priority, T&& item) override final {
+ high_queue[priority].enqueue_front(cl, 0, std::move(item));
}
- void enqueue(K cl, unsigned priority, unsigned cost, T item) override final {
+ void enqueue(K cl, unsigned priority, unsigned cost, T&& item) override final {
// priority is ignored
queue.add_request(std::move(item), cl, cost);
}
void enqueue_front(K cl,
unsigned priority,
unsigned cost,
- T item) override final {
- queue_front.emplace_front(std::pair<K,T>(cl, item));
+ T&& item) override final {
+ queue_front.emplace_front(std::pair<K,T>(cl, std::move(item)));
}
bool empty() const override final {
assert(!empty());
if (!(high_queue.empty())) {
- T ret = high_queue.rbegin()->second.front().second;
+ T ret = std::move(high_queue.rbegin()->second.front().second);
high_queue.rbegin()->second.pop_front();
if (high_queue.rbegin()->second.empty()) {
high_queue.erase(high_queue.rbegin()->first);
}
if (!queue_front.empty()) {
- T ret = queue_front.front().second;
+ T ret = std::move(queue_front.front().second);
queue_front.pop_front();
return ret;
}
auto pr = queue.pull_request();
assert(pr.is_retn());
auto& retn = pr.get_retn();
- return *(retn.request);
+ return std::move(*(retn.request));
}
void dump(ceph::Formatter *f) const override final {
<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
}
-void OSDService::enqueue_back(spg_t pgid, OpQueueItem qi)
+void OSDService::enqueue_back(spg_t pgid, OpQueueItem&& qi)
{
- osd->op_shardedwq.queue(make_pair(pgid, qi));
+ osd->op_shardedwq.queue(make_pair(pgid, std::move(qi)));
}
-void OSDService::enqueue_front(spg_t pgid, OpQueueItem qi)
+void OSDService::enqueue_front(spg_t pgid, OpQueueItem&& qi)
{
- osd->op_shardedwq.queue_front(make_pair(pgid, qi));
+ osd->op_shardedwq.queue_front(make_pair(pgid, std::move(qi)));
}
void OSDService::queue_for_peering(PG *pg)
pg->unlock();
}
-void OSD::ShardedOpWQ::_enqueue(pair<spg_t, OpQueueItem> item) {
+void OSD::ShardedOpWQ::_enqueue(pair<spg_t, OpQueueItem>&& item) {
uint32_t shard_index =
item.first.hash_to_shard(shard_list.size());
dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
if (priority >= osd->op_prio_cutoff)
sdata->pqueue->enqueue_strict(
- item.second.get_owner(), priority, item);
+ item.second.get_owner(), priority, std::move(item));
else
sdata->pqueue->enqueue(
item.second.get_owner(),
- priority, cost, item);
+ priority, cost, std::move(item));
sdata->sdata_op_ordering_lock.Unlock();
sdata->sdata_lock.Lock();
}
-void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, OpQueueItem> item)
+void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, OpQueueItem>&& item)
{
uint32_t shard_index = item.first.hash_to_shard(shard_list.size());
ShardData* sdata = shard_list[shard_index];
} else {
dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
}
- sdata->_enqueue_front(item, osd->op_prio_cutoff);
+ sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
sdata->sdata_op_ordering_lock.Unlock();
sdata->sdata_lock.Lock();
sdata->sdata_cond.SignalOne();
GenContextWQ recovery_gen_wq;
ClassHandler *&class_handler;
- void enqueue_back(spg_t pgid, OpQueueItem qi);
- void enqueue_front(spg_t pgid, OpQueueItem qi);
+ void enqueue_back(spg_t pgid, OpQueueItem&& qi);
+ void enqueue_front(spg_t pgid, OpQueueItem&& qi);
void maybe_inject_dispatch_delay() {
if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
/// priority queue
std::unique_ptr<OpQueue< pair<spg_t, OpQueueItem>, uint64_t>> pqueue;
- void _enqueue_front(pair<spg_t, OpQueueItem> item, unsigned cutoff) {
+ void _enqueue_front(pair<spg_t, OpQueueItem>&& item, unsigned cutoff) {
unsigned priority = item.second.get_priority();
unsigned cost = item.second.get_cost();
if (priority >= cutoff)
pqueue->enqueue_strict_front(
item.second.get_owner(),
- priority, item);
+ priority, std::move(item));
else
pqueue->enqueue_front(
item.second.get_owner(),
- priority, cost, item);
+ priority, cost, std::move(item));
}
ShardData(
void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
/// enqueue a new item
- void _enqueue(pair <spg_t, OpQueueItem> item) override;
+ void _enqueue(pair <spg_t, OpQueueItem>&& item) override;
/// requeue an old item (at the front of the line)
- void _enqueue_front(pair <spg_t, OpQueueItem> item) override;
+ void _enqueue_front(pair <spg_t, OpQueueItem>&& item) override;
void return_waiting_threads() override {
for(uint32_t i = 0; i < num_shards; i++) {
inline void mClockClientQueue::enqueue_strict(Client cl,
unsigned priority,
- Request item) {
- queue.enqueue_strict(get_inner_client(cl, item), priority, item);
+ Request&& item) {
+ queue.enqueue_strict(get_inner_client(cl, item), priority,
+ std::move(item));
}
// Enqueue op in the front of the strict queue
inline void mClockClientQueue::enqueue_strict_front(Client cl,
unsigned priority,
- Request item) {
- queue.enqueue_strict_front(get_inner_client(cl, item), priority, item);
+ Request&& item) {
+ queue.enqueue_strict_front(get_inner_client(cl, item), priority,
+ std::move(item));
}
// Enqueue op in the back of the regular queue
inline void mClockClientQueue::enqueue(Client cl,
unsigned priority,
unsigned cost,
- Request item) {
- queue.enqueue(get_inner_client(cl, item), priority, cost, item);
+ Request&& item) {
+ queue.enqueue(get_inner_client(cl, item), priority, cost,
+ std::move(item));
}
// Enqueue the op in the front of the regular queue
inline void mClockClientQueue::enqueue_front(Client cl,
unsigned priority,
unsigned cost,
- Request item) {
- queue.enqueue_front(get_inner_client(cl, item), priority, cost, item);
+ Request&& item) {
+ queue.enqueue_front(get_inner_client(cl, item), priority, cost,
+ std::move(item));
}
// Return an op to be dispatched
inline void remove_by_class(Client cl,
std::list<Request> *out) override final {
queue.remove_by_filter(
- [&cl, out] (const Request& r) -> bool {
+ [&cl, out] (Request&& r) -> bool {
if (cl == r.second.get_owner()) {
- out->push_front(r);
+ out->push_front(std::move(r));
return true;
} else {
return false;
void enqueue_strict(Client cl,
unsigned priority,
- Request item) override final;
+ Request&& item) override final;
// Enqueue op in the front of the strict queue
void enqueue_strict_front(Client cl,
unsigned priority,
- Request item) override final;
+ Request&& item) override final;
// Enqueue op in the back of the regular queue
void enqueue(Client cl,
unsigned priority,
unsigned cost,
- Request item) override final;
+ Request&& item) override final;
// Enqueue the op in the front of the regular queue
void enqueue_front(Client cl,
unsigned priority,
unsigned cost,
- Request item) override final;
+ Request&& item) override final;
// Return an op to be dispatch
Request dequeue() override final;
inline void remove_by_class(Client cl,
std::list<Request> *out) override final {
queue.remove_by_filter(
- [&cl, out] (const Request& r) -> bool {
+ [&cl, out] (Request&& r) -> bool {
if (cl == r.second.get_owner()) {
- out->push_front(r);
+ out->push_front(std::move(r));
return true;
} else {
return false;
inline void enqueue_strict(Client cl,
unsigned priority,
- Request item) override final {
- queue.enqueue_strict(get_osd_op_type(item), priority, item);
+ Request&& item) override final {
+ queue.enqueue_strict(get_osd_op_type(item), priority, std::move(item));
}
// Enqueue op in the front of the strict queue
inline void enqueue_strict_front(Client cl,
unsigned priority,
- Request item) override final {
- queue.enqueue_strict_front(get_osd_op_type(item), priority, item);
+ Request&& item) override final {
+ queue.enqueue_strict_front(get_osd_op_type(item), priority, std::move(item));
}
// Enqueue op in the back of the regular queue
inline void enqueue(Client cl,
unsigned priority,
unsigned cost,
- Request item) override final {
- queue.enqueue(get_osd_op_type(item), priority, cost, item);
+ Request&& item) override final {
+ queue.enqueue(get_osd_op_type(item), priority, cost, std::move(item));
}
// Enqueue the op in the front of the regular queue
inline void enqueue_front(Client cl,
unsigned priority,
unsigned cost,
- Request item) override final {
- queue.enqueue_front(get_osd_op_type(item), priority, cost, item);
+ Request&& item) override final {
+ queue.enqueue_front(get_osd_op_type(item), priority, cost, std::move(item));
}
// Returns if the queue is empty
// 0 .. item_size-1
for (unsigned i = 0; i < item_size; i++) {
unsigned priority = items[i];
- pq.enqueue_strict(Klass(0), priority, items[i]);
+ const Item& item = items[i];
+ pq.enqueue_strict(Klass(0), priority, Item(item));
}
// item_size-1 .. 0
for (unsigned i = item_size; i > 0; i--) {
} else {
num_high_cost++;
}
- pq.enqueue(Klass(0), priority, cost, item);
+ pq.enqueue(Klass(0), priority, cost, Item(item));
}
// the token in all buckets is 0 at the beginning, so dequeue() should pick
// the first one with the highest priority.
Klass k = ITEM_TO_CLASS(item);
unsigned priority = 0;
unsigned cost = 1;
- pq.enqueue(k, priority, cost, item);
+ pq.enqueue(k, priority, cost, Item(item));
}
// just sample first 1/2 of the items
// if i pick too small a dataset, the result won't be statisitcally
for (int i = 0; i < item_size; i++) {
const Item& item = items[i];
Klass k = ITEM_TO_CLASS(item);
- pq.enqueue(k, 0, 0, item);
+ pq.enqueue(k, 0, 0, Item(item));
if (k == class_to_remove) {
num_to_remove++;
}
ASSERT_FALSE(q.empty());
ASSERT_EQ(2u, q.length());
- q.enqueue_front(client2, 12, 0, reqs.back());
+ q.enqueue_front(client2, 12, 0, std::move(reqs.back()));
reqs.pop_back();
- q.enqueue_strict_front(client3, 12, reqs.back());
+ q.enqueue_strict_front(client3, 12, std::move(reqs.back()));
reqs.pop_back();
- q.enqueue_strict_front(client2, 12, reqs.back());
+ q.enqueue_strict_front(client2, 12, std::move(reqs.back()));
reqs.pop_back();
ASSERT_FALSE(q.empty());
ASSERT_FALSE(q.empty());
ASSERT_EQ(2u, q.length());
- q.enqueue_front(client2, 12, 0, reqs.back());
+ q.enqueue_front(client2, 12, 0, std::move(reqs.back()));
reqs.pop_back();
- q.enqueue_strict_front(client3, 12, reqs.back());
+ q.enqueue_strict_front(client3, 12, std::move(reqs.back()));
reqs.pop_back();
- q.enqueue_strict_front(client2, 12, reqs.back());
+ q.enqueue_strict_front(client2, 12, std::move(reqs.back()));
reqs.pop_back();
ASSERT_FALSE(q.empty());