return *_dout << "osd." << whoami << " " << epoch << " ";
}
+// Visitor for the OpRequestRef alternative of PGQueueable's variant:
+// hand the client op to the OSD's regular dequeue path for this PG.
+void PGQueueable::RunVis::operator()(OpRequestRef &op) {
+ return osd->dequeue_op(pg, op, handle);
+}
+
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
{
- osd->op_shardedwq.dequeue(pg, dequeued);
+ // Callers passing a list want the still-queued client ops back
+ // (e.g. to requeue them); callers passing NULL just want every
+ // queued item for this PG dropped.
+ if (dequeued)
+ osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
+ else
+ osd->op_shardedwq.dequeue(pg);
}
void OSDService::queue_for_peering(PG *pg)
return;
}
}
- pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+ pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
(item.first)->lock_suspend_timeout(tp_handle);
- OpRequestRef op;
+ boost::optional<PGQueueable> op;
{
Mutex::Locker l(sdata->sdata_op_ordering_lock);
if (!sdata->pg_for_processing.count(&*(item.first))) {
// and will begin to be handled by a worker thread.
{
#ifdef WITH_LTTNG
- osd_reqid_t reqid = op->get_reqid();
+ osd_reqid_t reqid;
+ if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+ reqid = (*_op)->get_reqid();
+ }
#endif
tracepoint(osd, opwq_process_start, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
delete f;
*_dout << dendl;
- osd->dequeue_op(item.first, op, tp_handle);
+ op->run(osd, item.first, tp_handle);
{
#ifdef WITH_LTTNG
- osd_reqid_t reqid = op->get_reqid();
+ osd_reqid_t reqid;
+ if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+ reqid = (*_op)->get_reqid();
+ }
#endif
tracepoint(osd, opwq_process_finish, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
(item.first)->unlock();
}
-void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
+// Enqueue one work item on the shard that owns item.first's pgid.
+// Priority, cost and owner are now read from the PGQueueable wrapper
+// (captured at construction) instead of the underlying Message, so
+// non-message work items can share this queue.
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
ShardData* sdata = shard_list[shard_index];
assert (NULL != sdata);
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
+ unsigned priority = item.second.get_priority();
+ unsigned cost = item.second.get_cost();
sdata->sdata_op_ordering_lock.Lock();
if (priority >= CEPH_MSG_PRIO_LOW)
sdata->pqueue.enqueue_strict(
- item.second->get_req()->get_source_inst(), priority, item);
+ item.second.get_owner(), priority, item);
else
- sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+ sdata->pqueue.enqueue(
+ item.second.get_owner(),
priority, cost, item);
sdata->sdata_op_ordering_lock.Unlock();
}
-void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
item.second = sdata->pg_for_processing[&*(item.first)].back();
sdata->pg_for_processing[&*(item.first)].pop_back();
}
- unsigned priority = item.second->get_req()->get_priority();
- unsigned cost = item.second->get_req()->get_cost();
+ unsigned priority = item.second.get_priority();
+ unsigned cost = item.second.get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
sdata->pqueue.enqueue_strict_front(
- item.second->get_req()->get_source_inst(),priority, item);
+ item.second.get_owner(),
+ priority, item);
else
- sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+ sdata->pqueue.enqueue_front(
+ item.second.get_owner(),
priority, cost, item);
sdata->sdata_op_ordering_lock.Unlock();
typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
class OSD;
+// PGQueueable: a queueable work item bound to a PG. It wraps the
+// payload in a boost::variant (currently only OpRequestRef, but the
+// variant leaves room for other work types) and snapshots the
+// priority/cost/start-time/owner metadata the sharded queue needs,
+// so the queue no longer has to reach into the underlying Message.
+class PGQueueable {
+ typedef boost::variant<
+ OpRequestRef
+ > QVariant;
+ QVariant qvariant;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ entity_inst_t owner;
+ // Dispatches run() to the per-alternative handler (defined in OSD.cc).
+ struct RunVis : public boost::static_visitor<> {
+ OSD *osd;
+ PGRef &pg;
+ ThreadPool::TPHandle &handle;
+ RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
+ : osd(osd), pg(pg), handle(handle) {}
+ void operator()(OpRequestRef &op);
+ };
+public:
+ // Capture queueing metadata from the op's Message at construction.
+ PGQueueable(OpRequestRef op)
+ : qvariant(op), cost(op->get_req()->get_cost()),
+ priority(op->get_req()->get_priority()),
+ start_time(op->get_req()->get_recv_stamp()),
+ owner(op->get_req()->get_source_inst())
+ {}
+ // Returns the wrapped op, or an empty optional if this item holds a
+ // different (future) variant alternative.
+ boost::optional<OpRequestRef> maybe_get_op() {
+ OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+ return op ? *op : boost::optional<OpRequestRef>();
+ }
+ // Execute the item against its PG via the visitor.
+ void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
+ RunVis v(osd, pg, handle);
+ boost::apply_visitor(v, qvariant);
+ }
+ unsigned get_priority() const { return priority; }
+ int get_cost() const { return cost; }
+ utime_t get_start_time() const { return start_time; }
+ entity_inst_t get_owner() const { return owner; }
+};
+
class OSDService {
public:
OSD *osd;
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > &op_wq;
+ ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
// -- op queue --
-
- class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > {
+ friend class PGQueueable;
+ class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
struct ShardData {
Mutex sdata_lock;
Cond sdata_cond;
Mutex sdata_op_ordering_lock;
- map<PG*, list<OpRequestRef> > pg_for_processing;
- PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+ map<PG*, list<PGQueueable> > pg_for_processing;
+ PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
ShardData(
string lock_name, string ordering_lock,
uint64_t max_tok_per_prio, uint64_t min_cost)
public:
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
- ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
+ ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
osd(o), num_shards(pnum_shards) {
for(uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
}
void _process(uint32_t thread_index, heartbeat_handle_d *hb);
- void _enqueue(pair <PGRef, OpRequestRef> item);
- void _enqueue_front(pair <PGRef, OpRequestRef> item);
+ void _enqueue(pair <PGRef, PGQueueable> item);
+ void _enqueue_front(pair <PGRef, PGQueueable> item);
void return_waiting_threads() {
for(uint32_t i = 0; i < num_shards; i++) {
+ // Filter predicate matching queue entries that belong to one PG;
+ // used with pqueue.remove_by_filter() when draining a PG's queue.
struct Pred {
PG *pg;
Pred(PG *pg) : pg(pg) {}
- bool operator()(const pair<PGRef, OpRequestRef> &op) {
+ bool operator()(const pair<PGRef, PGQueueable> &op) {
return op.first == pg;
}
};
- void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+ // Drop every queued item for pg — both entries still in the shard's
+ // pqueue and entries already pulled into pg_for_processing. Callers
+ // that need the removed client ops back should call
+ // dequeue_and_get_ops() instead (the old optional out-param is gone).
+ void dequeue(PG *pg) {
ShardData* sdata = NULL;
assert(pg != NULL);
uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
sdata = shard_list[shard_index];
assert(sdata != NULL);
- if (!dequeued) {
- sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg));
- sdata->pg_for_processing.erase(pg);
- sdata->sdata_op_ordering_lock.Unlock();
- } else {
- list<pair<PGRef, OpRequestRef> > _dequeued;
- sdata->sdata_op_ordering_lock.Lock();
- sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
- for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
- i != _dequeued.end(); ++i) {
- dequeued->push_back(i->second);
- }
- if (sdata->pg_for_processing.count(pg)) {
- dequeued->splice(
- dequeued->begin(),
- sdata->pg_for_processing[pg]);
- sdata->pg_for_processing.erase(pg);
+ sdata->sdata_op_ordering_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg));
+ sdata->pg_for_processing.erase(pg);
+ sdata->sdata_op_ordering_lock.Unlock();
+ }
+
+ // Remove everything queued for pg and return, in queue order, the
+ // client ops among the removed items (non-op variant alternatives
+ // are silently discarded). Items already moved to pg_for_processing
+ // are prepended via push_front so overall ordering is preserved.
+ void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
+ ShardData* sdata = NULL;
+ assert(pg != NULL);
+ uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+ sdata = shard_list[shard_index];
+ assert(sdata != NULL);
+ assert(dequeued);
+ list<pair<PGRef, PGQueueable> > _dequeued;
+ sdata->sdata_op_ordering_lock.Lock();
+ sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+ for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
+ i != _dequeued.end(); ++i) {
+ boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
+ if (mop)
+ dequeued->push_back(*mop);
+ }
+ // Unprocessed items for this PG may also sit in pg_for_processing;
+ // walk them in reverse so push_front keeps their relative order.
+ map<PG *, list<PGQueueable> >::iterator iter =
+ sdata->pg_for_processing.find(pg);
+ if (iter != sdata->pg_for_processing.end()) {
+ for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
+ i != iter->second.rend();
+ ++i) {
+ boost::optional<OpRequestRef> mop = i->maybe_get_op();
+ if (mop)
+ dequeued->push_front(*mop);
}
- sdata->sdata_op_ordering_lock.Unlock();
+ sdata->pg_for_processing.erase(iter);
}
+ sdata->sdata_op_ordering_lock.Unlock();
}
bool is_shard_empty(uint32_t thread_index) {