Each OSDService now has two AsyncReserver instances: one for backfills going
from the osd (local_reserver) and one for backfills going to the osd
(remote_reserver). An AsyncReserver (common/AsyncReserver.h) manages a queue
-of waiting items and a set of current reservation holders. When a slot frees
-up, the AsyncReserver queues the Context* associated with the next item in the
-finisher provided to the constructor.
+of waiting items, ordered by priority, and a set of current reservation
+holders. When a slot frees up, the AsyncReserver queues the Context*
+associated with the next item in the highest-priority queue on the finisher
+provided to the constructor.
For a primary to initiate a backfill, it must first obtain a reservation from
its own local_reserver. Then, it must obtain a reservation from the backfill
target's remote_reserver.
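As a sketch of that two-step dance, the toy below preserves only the calling
order; ToyReserver, request(), and the pg name are illustrative stand-ins,
since the real OSD uses Context* callbacks delivered through a Finisher and
sends the remote request to the backfill target as a message:

  // Illustrative stand-in for AsyncReserver::request_reservation.
  #include <functional>
  #include <iostream>
  #include <string>

  struct ToyReserver {
    std::string name;
    void request(const std::string &pg, std::function<void()> on_reserved) {
      std::cout << name << " granted a slot to " << pg << "\n";
      on_reserved();  // the real reserver queues this on a finisher thread
    }
  };

  int main() {
    ToyReserver local{"local_reserver"}, remote{"remote_reserver"};
    // Reserve locally first; only once that callback fires does the
    // primary ask the backfill target's remote_reserver for a slot.
    local.request("pg 1.0", [&] {
      remote.request("pg 1.0", [] {
        std::cout << "both reservations held; backfill can start\n";
      });
    });
  }

The relevant parts of common/AsyncReserver.h, with the priority changes,
look like this: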
/**
* Manages a configurable number of asynchronous reservations.
+ *
+ * Memory usage is linear with the number of items queued and
+ * linear with respect to the total number of priorities used
+ * over all time.
*/
template <typename T>
class AsyncReserver {
unsigned max_allowed;
Mutex lock;
- list<pair<T, Context*> > queue;
- map<T, typename list<pair<T, Context*> >::iterator > queue_pointers;
+ map<unsigned, list<pair<T, Context*> > > queues;
+ map<T, pair<unsigned, typename list<pair<T, Context*> >::iterator > > queue_pointers;
set<T> in_progress;
void do_queues() {
- while (in_progress.size() < max_allowed &&
- !queue.empty()) {
- pair<T, Context*> p = queue.front();
- queue_pointers.erase(p.first);
- queue.pop_front();
- f->queue(p.second);
- in_progress.insert(p.first);
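+ // Grant slots starting from the highest priority: map keys sort
+ // ascending, so walk the priority map in reverse.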
+ typename map<unsigned, list<pair<T, Context*> > >::reverse_iterator it;
+ for (it = queues.rbegin();
+ it != queues.rend() && in_progress.size() < max_allowed;
+ ++it) {
+ while (in_progress.size() < max_allowed &&
+ !it->second.empty()) {
+ pair<T, Context*> p = it->second.front();
+ queue_pointers.erase(p.first);
+ it->second.pop_front();
+ f->queue(p.second);
+ in_progress.insert(p.first);
+ }
}
}
public:
/**
 * Requests a reservation; on_reserved will be queued on the
 * finisher once a slot is available.
 */
void request_reservation(
T item, ///< [in] reservation key
- Context *on_reserved ///< [in] callback to be called on reservation
+ Context *on_reserved, ///< [in] callback to be called on reservation
+ unsigned prio ///< [in] priority
) {
Mutex::Locker l(lock);
assert(!queue_pointers.count(item) &&
!in_progress.count(item));
- queue.push_back(make_pair(item, on_reserved));
- queue_pointers.insert(make_pair(item, --queue.end()));
+ queues[prio].push_back(make_pair(item, on_reserved));
+ queue_pointers.insert(make_pair(item, make_pair(prio, --(queues[prio]).end())));
do_queues();
}
void cancel_reservation(
T item ///< [in] key for reservation to cancel
) {
Mutex::Locker l(lock);
if (queue_pointers.count(item)) {
- delete queue_pointers[item]->second;
- queue.erase(queue_pointers[item]);
+ unsigned prio = queue_pointers[item].first;
+ delete queue_pointers[item].second->second;
+ queues[prio].erase(queue_pointers[item].second);
queue_pointers.erase(item);
} else {
in_progress.erase(item);
}
do_queues();
}
+ static const unsigned MAX_PRIORITY = (unsigned)-1;
};
#endif
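To make the priority behavior concrete, here is a self-contained toy with
the same drain logic; ToyPrioReserver is a hypothetical stand-in that uses
std::function instead of Context*/Finisher and omits queue_pointers (so
only held reservations can be cancelled):

  #include <functional>
  #include <iostream>
  #include <list>
  #include <map>
  #include <set>
  #include <string>
  #include <utility>

  class ToyPrioReserver {
    unsigned max_allowed;
    std::map<unsigned,
             std::list<std::pair<std::string, std::function<void()>>>> queues;
    std::set<std::string> in_progress;

    void do_queues() {
      // Same shape as AsyncReserver::do_queues above: highest priority
      // first, since map keys sort ascending and we iterate in reverse.
      for (auto it = queues.rbegin();
           it != queues.rend() && in_progress.size() < max_allowed; ++it) {
        while (in_progress.size() < max_allowed && !it->second.empty()) {
          auto p = it->second.front();
          it->second.pop_front();
          in_progress.insert(p.first);
          p.second();  // the real reserver queues this on a finisher
        }
      }
    }

  public:
    explicit ToyPrioReserver(unsigned max) : max_allowed(max) {}
    void request(std::string item, unsigned prio, std::function<void()> cb) {
      queues[prio].emplace_back(std::move(item), std::move(cb));
      do_queues();
    }
    void cancel(const std::string &item) {
      in_progress.erase(item);
      do_queues();
    }
  };

  int main() {
    ToyPrioReserver r(1);  // a single slot, so later requests must wait
    r.request("pg 1.0", 10, [] { std::cout << "pg 1.0 reserved\n"; });
    r.request("pg 1.1", 20, [] { std::cout << "pg 1.1 reserved\n"; });
    r.request("pg 1.2", 30, [] { std::cout << "pg 1.2 reserved\n"; });
    r.cancel("pg 1.0");  // frees the slot; pg 1.2 (priority 30) wins
    r.cancel("pg 1.2");  // then pg 1.1 is granted
  }

With max_allowed = 1 the grant order is pg 1.0, then pg 1.2 (priority 30
beats 20), then pg 1.1, matching the reverse iteration in do_queues.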