op_tracker(cct, cct->_conf->osd_enable_op_tracker,
cct->_conf->osd_num_op_tracker_shard),
test_ops_hook(NULL),
+ op_queue(get_io_queue()),
+ op_prio_cutoff(get_io_prio_cut()),
op_shardedwq(
cct->_conf->osd_op_num_shards,
this,
load_pgs();
dout(2) << "superblock: i am osd." << superblock.whoami << dendl;
+ dout(0) << "using " << op_queue << " op queue with priority op cut off at " <<
+ op_prio_cutoff << "." << dendl;
create_logger();
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
sdata->sdata_op_ordering_lock.Lock();
- if (sdata->pqueue.empty()) {
+ if (sdata->pqueue->empty()) {
sdata->sdata_op_ordering_lock.Unlock();
osd->cct->get_heartbeat_map()->reset_timeout(hb, 4, 0);
sdata->sdata_lock.Lock();
sdata->sdata_cond.WaitInterval(osd->cct, sdata->sdata_lock, utime_t(2, 0));
sdata->sdata_lock.Unlock();
sdata->sdata_op_ordering_lock.Lock();
- if(sdata->pqueue.empty()) {
+ if (sdata->pqueue->empty()) {
sdata->sdata_op_ordering_lock.Unlock();
return;
}
}
- pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
+ pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
unsigned cost = item.second.get_cost();
sdata->sdata_op_ordering_lock.Lock();
- if (priority >= CEPH_MSG_PRIO_LOW)
- sdata->pqueue.enqueue_strict(
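+ // Ops at or above the priority cutoff go onto the strict queue (no cost
+ // accounting); everything else is queued by owner, priority, and cost.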
+ if (priority >= osd->op_prio_cutoff)
+ sdata->pqueue->enqueue_strict(
item.second.get_owner(), priority, item);
else
- sdata->pqueue.enqueue(
+ sdata->pqueue->enqueue(
item.second.get_owner(),
priority, cost, item);
sdata->sdata_op_ordering_lock.Unlock();
}
unsigned priority = item.second.get_priority();
unsigned cost = item.second.get_cost();
- if (priority >= CEPH_MSG_PRIO_LOW)
- sdata->pqueue.enqueue_strict_front(
+ if (priority >= osd->op_prio_cutoff)
+ sdata->pqueue->enqueue_strict_front(
item.second.get_owner(),
priority, item);
else
- sdata->pqueue.enqueue_front(
+ sdata->pqueue->enqueue_front(
item.second.get_owner(),
priority, cost, item);
#include "common/shared_cache.hpp"
#include "common/simple_cache.hpp"
#include "common/sharedptr_registry.hpp"
+#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
+#include "common/OpQueue.h"
#include "messages/MOSDOp.h"
#include "include/Spinlock.h"
friend struct C_CompleteSplits;
// -- op queue --
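+ // The queue implementation backing each shard: PrioritizedQueue or
+ // WeightedPriorityQueue, selected once at startup from osd_op_queue
+ // (see get_io_queue() below).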
+ enum io_queue {
+   prioritized,
+   weightedpriority
+ };
+ const io_queue op_queue;
+ const unsigned int op_prio_cutoff;
friend class PGQueueable;
class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
Cond sdata_cond;
Mutex sdata_op_ordering_lock;
map<PG*, list<PGQueueable> > pg_for_processing;
- PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
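+ // Owning pointer to this shard's op queue; the concrete OpQueue
+ // implementation is chosen in the ShardData constructor.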
+ std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
ShardData(
string lock_name, string ordering_lock,
- uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct)
+ uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+ io_queue opqueue)
: sdata_lock(lock_name.c_str(), false, true, false, cct),
- sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct),
- pqueue(max_tok_per_prio, min_cost) {}
+ sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
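+ // Instantiate the queue implementation selected by the OSD's io_queue
+ // setting.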
+ if (opqueue == weightedpriority) {
+ pqueue = std::unique_ptr
+ <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+ new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ } else if (opqueue == prioritized) {
+ pqueue = std::unique_ptr
+ <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
+ new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
+ max_tok_per_prio, min_cost));
+ }
+ }
};
vector<ShardData*> shard_list;
ShardData* one_shard = new ShardData(
lock_name, order_lock,
osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
- osd->cct->_conf->osd_op_pq_min_cost, osd->cct);
+ osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
shard_list.push_back(one_shard);
}
}
assert (NULL != sdata);
sdata->sdata_op_ordering_lock.Lock();
f->open_object_section(lock_name);
- sdata->pqueue.dump(f);
+ sdata->pqueue->dump(f);
f->close_section();
sdata->sdata_op_ordering_lock.Unlock();
}
sdata = shard_list[shard_index];
assert(sdata != NULL);
sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg));
+ sdata->pqueue->remove_by_filter(Pred(pg), 0);
sdata->pg_for_processing.erase(pg);
sdata->sdata_op_ordering_lock.Unlock();
}
assert(dequeued);
list<pair<PGRef, PGQueueable> > _dequeued;
sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ sdata->pqueue->remove_by_filter(Pred(pg), &_dequeued);
for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
i != _dequeued.end(); ++i) {
boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
- return sdata->pqueue.empty();
+ return sdata->pqueue->empty();
}
} op_shardedwq;
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}
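+ // Map the osd_op_queue config option to a queue implementation: "wpq"
+ // selects the weighted priority queue, "debug_random" picks one of the
+ // two at random (for testing), and anything else falls back to the
+ // prioritized queue.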
+ io_queue get_io_queue() const {
+ if (cct->_conf->osd_op_queue == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ? prioritized : weightedpriority;
+ } else if (cct->_conf->osd_op_queue == "wpq") {
+ return weightedpriority;
+ } else {
+ return prioritized;
+ }
+ }
+
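+ // Map osd_op_queue_cut_off to a message priority threshold: ops with
+ // priority >= the cutoff are placed on the strict queue via
+ // enqueue_strict(); "debug_random" again picks randomly for testing,
+ // and anything other than "low" or "debug_random" maps to
+ // CEPH_MSG_PRIO_HIGH.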
+ unsigned int get_io_prio_cut() const {
+ if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (cct->_conf->osd_op_queue_cut_off == "low") {
+ return CEPH_MSG_PRIO_LOW;
+ } else {
+ return CEPH_MSG_PRIO_HIGH;
+ }
+ }
+
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/