<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
}
-void OSDService::enqueue_back(spg_t pgid, OpQueueItem&& qi)
+void OSDService::enqueue_back(OpQueueItem&& qi)
{
- osd->op_shardedwq.queue(make_pair(pgid, std::move(qi)));
+ osd->op_shardedwq.queue(std::move(qi));
}
-void OSDService::enqueue_front(spg_t pgid, OpQueueItem&& qi)
+void OSDService::enqueue_front(OpQueueItem&& qi)
{
- osd->op_shardedwq.queue_front(make_pair(pgid, std::move(qi)));
+ osd->op_shardedwq.queue_front(std::move(qi));
}
void OSDService::queue_for_peering(PG *pg)
void OSDService::queue_for_snap_trim(PG *pg)
{
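+  // Queueable wrapper: captures the queueing epoch and runs
+  // PG::snap_trimmer() under the PG lock when dequeued.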
+ class PGSnapTrim : public PGOpQueueable {
+ epoch_t epoch_queued;
+ public:
+ PGSnapTrim(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_snaptrim;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGSnapTrim(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+ pg->snap_trimmer(epoch_queued);
+ }
+ };
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
- osd->op_shardedwq.queue(
- make_pair(
- pg->pg_id,
- OpQueueItem(
- PGSnapTrim(pg->get_osdmap()->get_epoch()),
- cct->_conf->osd_snap_trim_cost,
- cct->_conf->osd_snap_trim_priority,
- ceph_clock_now(),
- 0,
- pg->get_osdmap()->get_epoch())));
+ enqueue_back(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(
+ new PGSnapTrim(pg->pg_id, pg->get_osdmap()->get_epoch())),
+ cct->_conf->osd_snap_trim_cost,
+ cct->_conf->osd_snap_trim_priority,
+ ceph_clock_now(),
+ 0,
+ pg->get_osdmap()->get_epoch()));
}
+void OSDService::queue_for_scrub(PG *pg, bool with_high_priority)
+{
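+  // Queueable wrapper: runs PG::scrub() for the recorded epoch when dequeued.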
+ class PGScrub : public PGOpQueueable {
+ epoch_t epoch_queued;
+ public:
+ PGScrub(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_scrub;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGScrub(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+ pg->scrub(epoch_queued, handle);
+ }
+ };
+ unsigned scrub_queue_priority = pg->scrubber.priority;
+ if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
+ scrub_queue_priority = cct->_conf->osd_client_op_priority;
+ }
+ const auto epoch = pg->get_osdmap()->get_epoch();
+ enqueue_back(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(pg->info.pgid, epoch)),
+      cct->_conf->osd_scrub_cost,
+ scrub_queue_priority,
+ ceph_clock_now(),
+ 0,
+ epoch));
+}
+
+void OSDService::_queue_for_recovery(
+ std::pair<epoch_t, PGRef> p,
+ uint64_t reserved_pushes)
+{
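+  // Queueable wrapper: carries the reserved push count (so it can be
+  // released if the item is dropped) and runs OSD::do_recovery() when dequeued.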
+ class PGRecovery : public PGOpQueueable {
+ epoch_t epoch_queued;
+ uint64_t reserved_pushes;
+ public:
+ PGRecovery(
+ spg_t pg,
+ epoch_t epoch_queued,
+ uint64_t reserved_pushes)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued),
+ reserved_pushes(reserved_pushes) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::bg_recovery;
+ }
+ virtual ostream &print(ostream &rhs) const override final {
+ return rhs << "PGRecovery(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+               << " reserved_pushes=" << reserved_pushes
+ << ")";
+ }
+ virtual uint64_t get_reserved_pushes() const override final {
+ return reserved_pushes;
+ }
+ virtual void run(
+ OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final {
+ osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
+ }
+ };
+ assert(recovery_lock.is_locked_by_me());
+ enqueue_back(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(
+ new PGRecovery(
+ p.second->info.pgid, p.first, reserved_pushes)),
+ cct->_conf->osd_recovery_cost,
+ cct->_conf->osd_recovery_priority,
+ ceph_clock_now(),
+ 0,
+ p.first));
+}
// ====================================================================
// OSD
op->osd_trace.keyval("cost", op->get_req()->get_cost());
op->mark_queued_for_pg();
logger->tinc(l_osd_op_before_queue_op_lat, latency);
- op_shardedwq.queue(make_pair(pg, OpQueueItem(op, epoch)));
+ op_shardedwq.queue(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, op)),
+ op->get_req()->get_cost(),
+ op->get_req()->get_priority(),
+ op->get_req()->get_recv_stamp(),
+ op->get_req()->get_source().num(),
+ epoch));
}
-
-
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
for (auto i = p->second.to_process.rbegin();
i != p->second.to_process.rend();
++i) {
- sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
+ sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
}
for (auto& q : p->second.to_process) {
pushes_to_free += q.get_reserved_pushes();
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
{
uint32_t shard_index = thread_index % num_shards;
- ShardData *sdata = shard_list[shard_index];
- assert(NULL != sdata);
-
+ auto& sdata = shard_list[shard_index];
+ assert(sdata);
// peek at spg_t
sdata->sdata_op_ordering_lock.Lock();
if (sdata->pqueue->empty()) {
return;
}
}
- pair<spg_t, OpQueueItem> item = sdata->pqueue->dequeue();
+ OpQueueItem item = sdata->pqueue->dequeue();
if (osd->is_stopping()) {
sdata->sdata_op_ordering_lock.Unlock();
return; // OSD shutdown, discard.
}
PGRef pg;
uint64_t requeue_seq;
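+  // All items with the same ordering token (the spg_t) funnel through the
+  // same pg_slot, which preserves per-PG ordering across worker threads.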
+ const auto token = item.get_ordering_token();
{
- auto& slot = sdata->pg_slots[item.first];
- dout(30) << __func__ << " " << item.first
+ auto& slot = sdata->pg_slots[token];
+ dout(30) << __func__ << " " << token
<< " to_process " << slot.to_process
<< " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
- slot.to_process.push_back(item.second);
+ slot.to_process.push_back(std::move(item));
// note the requeue seq now...
requeue_seq = slot.requeue_seq;
if (slot.waiting_for_pg) {
// save ourselves a bit of effort
- dout(20) << __func__ << " " << item.first << " item " << item.second
+      dout(20) << __func__ << " " << slot.to_process.back()
<< " queued, waiting_for_pg" << dendl;
sdata->sdata_op_ordering_lock.Unlock();
return;
}
pg = slot.pg;
- dout(20) << __func__ << " " << item.first << " item " << item.second
+ dout(20) << __func__ << " " << slot.to_process.back()
<< " queued" << dendl;
++slot.num_running;
}
// [lookup +] lock pg (if we have it)
if (!pg) {
- pg = osd->_lookup_lock_pg(item.first);
+ pg = osd->_lookup_lock_pg(token);
} else {
pg->lock();
}
osd->service.maybe_inject_dispatch_delay();
- boost::optional<OpQueueItem> qi;
-
// we don't use a Mutex::Locker here because of the
// osd->service.release_reserved_pushes() call below
sdata->sdata_op_ordering_lock.Lock();
- auto q = sdata->pg_slots.find(item.first);
+ auto q = sdata->pg_slots.find(token);
assert(q != sdata->pg_slots.end());
auto& slot = q->second;
--slot.num_running;
if (slot.to_process.empty()) {
// raced with wake_pg_waiters or prune_pg_waiters
- dout(20) << __func__ << " " << item.first << " nothing queued" << dendl;
+ dout(20) << __func__ << " " << token
+ << " nothing queued" << dendl;
if (pg) {
pg->unlock();
}
return;
}
if (requeue_seq != slot.requeue_seq) {
- dout(20) << __func__ << " " << item.first
+ dout(20) << __func__ << " " << token
<< " requeue_seq " << slot.requeue_seq << " > our "
<< requeue_seq << ", we raced with wake_pg_waiters"
<< dendl;
-      dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl;
+      dout(20) << __func__ << " " << token << " set pg to " << pg << dendl;
slot.pg = pg;
}
- dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process
+ dout(30) << __func__ << " " << token
+ << " to_process " << slot.to_process
<< " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
// make sure we're not already waiting for this pg
if (slot.waiting_for_pg) {
- dout(20) << __func__ << " " << item.first << " item " << item.second
+ dout(20) << __func__ << " " << token
<< " slot is waiting_for_pg" << dendl;
if (pg) {
pg->unlock();
}
// take next item
- qi = slot.to_process.front();
+ auto qi = std::move(slot.to_process.front());
slot.to_process.pop_front();
- dout(20) << __func__ << " " << item.first << " item " << *qi
- << " pg " << pg << dendl;
+ dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
if (!pg) {
// should this pg shard exist on this osd in this (or a later) epoch?
OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
- if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) {
- dout(20) << __func__ << " " << item.first
- << " no pg, should exist, will wait on " << *qi << dendl;
- slot.to_process.push_front(*qi);
+    if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
+      dout(20) << __func__ << " " << token
+               << " no pg, should exist, will wait on " << qi << dendl;
+ slot.to_process.push_front(std::move(qi));
slot.waiting_for_pg = true;
- } else if (qi->get_map_epoch() > osdmap->get_epoch()) {
- dout(20) << __func__ << " " << item.first << " no pg, item epoch is "
- << qi->get_map_epoch() << " > " << osdmap->get_epoch()
- << ", will wait on " << *qi << dendl;
- slot.to_process.push_front(*qi);
+ } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
+ dout(20) << __func__ << " " << token
+ << " no pg, item epoch is "
+ << qi.get_map_epoch() << " > " << osdmap->get_epoch()
+ << ", will wait on " << qi << dendl;
+ slot.to_process.push_front(std::move(qi));
slot.waiting_for_pg = true;
} else {
- dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist,"
- << " dropping " << *qi << dendl;
+ dout(20) << __func__ << " " << token
+ << " no pg, shouldn't exist,"
+ << " dropping " << qi << dendl;
// share map with client?
- if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
Session *session = static_cast<Session *>(
(*_op)->get_req()->get_connection()->get_priv());
if (session) {
session->put();
}
}
- unsigned pushes_to_free = qi->get_reserved_pushes();
+ unsigned pushes_to_free = qi.get_reserved_pushes();
if (pushes_to_free > 0) {
sdata->sdata_op_ordering_lock.Unlock();
osd->service.release_reserved_pushes(pushes_to_free);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid;
- if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
reqid = (*_op)->get_reqid();
}
#endif
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
suicide_interval);
- qi->run(osd, pg, tp_handle);
+ qi.run(osd, pg, tp_handle);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid;
- if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+ if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
reqid = (*_op)->get_reqid();
}
#endif
pg->unlock();
}
-void OSD::ShardedOpWQ::_enqueue(pair<spg_t, OpQueueItem>&& item) {
+void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
uint32_t shard_index =
- item.first.hash_to_shard(shard_list.size());
+ item.get_ordering_token().hash_to_shard(shard_list.size());
ShardData* sdata = shard_list[shard_index];
assert (NULL != sdata);
- unsigned priority = item.second.get_priority();
- unsigned cost = item.second.get_cost();
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
sdata->sdata_op_ordering_lock.Lock();
- dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+ dout(20) << __func__ << " " << item << dendl;
if (priority >= osd->op_prio_cutoff)
sdata->pqueue->enqueue_strict(
- item.second.get_owner(), priority, std::move(item));
+ item.get_owner(), priority, std::move(item));
else
sdata->pqueue->enqueue(
- item.second.get_owner(),
- priority, cost, std::move(item));
+ item.get_owner(), priority, cost, std::move(item));
sdata->sdata_op_ordering_lock.Unlock();
sdata->sdata_lock.Lock();
}
-void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, OpQueueItem>&& item)
+void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
{
- uint32_t shard_index = item.first.hash_to_shard(shard_list.size());
- ShardData* sdata = shard_list[shard_index];
- assert (NULL != sdata);
+ auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size());
+ auto& sdata = shard_list[shard_index];
+ assert(sdata);
sdata->sdata_op_ordering_lock.Lock();
- auto p = sdata->pg_slots.find(item.first);
+ auto p = sdata->pg_slots.find(item.get_ordering_token());
if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
// we may be racing with _process, which has dequeued a new item
// from pqueue, put it on to_process, and is now busy taking the
// pg lock. ensure this old requeued item is ordered before any
// such newer item in to_process.
- p->second.to_process.push_front(item.second);
- item.second = p->second.to_process.back();
+ p->second.to_process.push_front(std::move(item));
+ item = std::move(p->second.to_process.back());
p->second.to_process.pop_back();
- dout(20) << __func__ << " " << item.first
+ dout(20) << __func__
<< " " << p->second.to_process.front()
- << " shuffled w/ " << item.second << dendl;
+ << " shuffled w/ " << item << dendl;
} else {
- dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+ dout(20) << __func__ << " " << item << dendl;
}
sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
sdata->sdata_op_ordering_lock.Unlock();
GenContextWQ recovery_gen_wq;
ClassHandler *&class_handler;
- void enqueue_back(spg_t pgid, OpQueueItem&& qi);
- void enqueue_front(spg_t pgid, OpQueueItem&& qi);
+ void enqueue_back(OpQueueItem&& qi);
+ void enqueue_front(OpQueueItem&& qi);
void maybe_inject_dispatch_delay() {
if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
AsyncReserver<spg_t> snap_reserver;
void queue_for_snap_trim(PG *pg);
-
- void queue_for_scrub(PG *pg, bool with_high_priority) {
- unsigned scrub_queue_priority = pg->scrubber.priority;
- if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) {
- scrub_queue_priority = cct->_conf->osd_client_op_priority;
- }
- enqueue_back(
- pg->get_pgid(),
- OpQueueItem(
- PGScrub(pg->get_osdmap()->get_epoch()),
- cct->_conf->osd_scrub_cost,
- scrub_queue_priority,
- ceph_clock_now(),
- 0,
- pg->get_osdmap()->get_epoch()));
- }
+ void queue_for_scrub(PG *pg, bool with_high_priority);
private:
// -- pg recovery and associated throttling --
bool _recover_now(uint64_t *available_pushes);
void _maybe_queue_recovery();
void _queue_for_recovery(
- pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
- assert(recovery_lock.is_locked_by_me());
- enqueue_back(
- p.second->get_pgid(),
- OpQueueItem(
- PGRecovery(p.first, reserved_pushes),
- cct->_conf->osd_recovery_cost,
- cct->_conf->osd_recovery_priority,
- ceph_clock_now(),
- 0,
- p.first));
- }
+ pair<epoch_t, PGRef> p, uint64_t reserved_pushes);
public:
void start_recovery_op(PG *pg, const hobject_t& soid);
void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue);
* wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
* and already requeued the items.
*/
- friend class OpQueueItem;
+ friend class PGOpItem;
class ShardedOpWQ
- : public ShardedThreadPool::ShardedWQ<pair<spg_t,OpQueueItem>>
+ : public ShardedThreadPool::ShardedWQ<OpQueueItem>
{
struct ShardData {
Mutex sdata_lock;
unordered_map<spg_t,pg_slot> pg_slots;
/// priority queue
- std::unique_ptr<OpQueue< pair<spg_t, OpQueueItem>, uint64_t>> pqueue;
+ std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
- void _enqueue_front(pair<spg_t, OpQueueItem>&& item, unsigned cutoff) {
- unsigned priority = item.second.get_priority();
- unsigned cost = item.second.get_cost();
+ void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
if (priority >= cutoff)
pqueue->enqueue_strict_front(
- item.second.get_owner(),
+ item.get_owner(),
priority, std::move(item));
else
pqueue->enqueue_front(
- item.second.get_owner(),
+ item.get_owner(),
priority, cost, std::move(item));
}
sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
false, cct) {
if (opqueue == io_queue::weightedpriority) {
- pqueue = std::unique_ptr
- <WeightedPriorityQueue<pair<spg_t,OpQueueItem>,uint64_t>>(
- new WeightedPriorityQueue<pair<spg_t,OpQueueItem>,uint64_t>(
- max_tok_per_prio, min_cost));
+ pqueue = ceph::make_unique<
+ WeightedPriorityQueue<OpQueueItem,uint64_t>>(
+ max_tok_per_prio, min_cost);
} else if (opqueue == io_queue::prioritized) {
- pqueue = std::unique_ptr
- <PrioritizedQueue<pair<spg_t,OpQueueItem>,uint64_t>>(
- new PrioritizedQueue<pair<spg_t,OpQueueItem>,uint64_t>(
- max_tok_per_prio, min_cost));
+ pqueue = ceph::make_unique<
+ PrioritizedQueue<OpQueueItem,uint64_t>>(
+ max_tok_per_prio, min_cost);
} else if (opqueue == io_queue::mclock_opclass) {
pqueue = ceph::make_unique<ceph::mClockOpClassQueue>(cct);
} else if (opqueue == io_queue::mclock_client) {
time_t ti,
time_t si,
ShardedThreadPool* tp)
- : ShardedThreadPool::ShardedWQ<pair<spg_t,OpQueueItem>>(ti, si, tp),
+ : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
osd(o),
num_shards(pnum_shards) {
for (uint32_t i = 0; i < num_shards; i++) {
void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
/// enqueue a new item
- void _enqueue(pair <spg_t, OpQueueItem>&& item) override;
+ void _enqueue(OpQueueItem&& item) override;
/// requeue an old item (at the front of the line)
- void _enqueue_front(pair <spg_t, OpQueueItem>&& item) override;
+ void _enqueue_front(OpQueueItem&& item) override;
void return_waiting_threads() override {
for(uint32_t i = 0; i < num_shards; i++) {
void dump(Formatter *f) {
for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
- assert (NULL != sdata);
+ auto &&sdata = shard_list[i];
+
+ char queue_name[32] = {0};
+ snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i);
+	assert(sdata);
+
sdata->sdata_op_ordering_lock.Lock();
- f->open_object_section(lock_name);
+ f->open_object_section(queue_name);
sdata->pqueue->dump(f);
f->close_section();
sdata->sdata_op_ordering_lock.Unlock();
out_ops->push_front(*mop);
}
}
- bool operator()(const pair<spg_t, OpQueueItem> &op) {
- if (op.first == pgid) {
- accumulate(op.second);
+ bool operator()(const OpQueueItem &op) {
+ if (op.get_ordering_token() == pgid) {
+ accumulate(op);
return true;
} else {
return false;
bool is_shard_empty(uint32_t thread_index) override {
uint32_t shard_index = thread_index % num_shards;
- ShardData* sdata = shard_list[shard_index];
- assert(NULL != sdata);
+ auto &&sdata = shard_list[shard_index];
+ assert(sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
return sdata->pqueue->empty();
}
*
*/
-
-#include "PG.h"
#include "OpQueueItem.h"
#include "OSD.h"
-
-void OpQueueItem::RunVis::operator()(const OpRequestRef &op) {
+void PGOpItem::run(OSD *osd,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
osd->dequeue_op(pg, op, handle);
}
-
-void OpQueueItem::RunVis::operator()(const PGSnapTrim &op) {
- pg->snap_trimmer(op.epoch_queued);
-}
-
-void OpQueueItem::RunVis::operator()(const PGScrub &op) {
- pg->scrub(op.epoch_queued, handle);
-}
-
-void OpQueueItem::RunVis::operator()(const PGRecovery &op) {
- osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle);
-}
}
};
-
class OpQueueItem {
- typedef boost::variant<
- OpRequestRef,
- PGSnapTrim,
- PGScrub,
- PGRecovery
- > QVariant;
- QVariant qvariant;
- int cost;
- unsigned priority;
- utime_t start_time;
- uint64_t owner; ///< global id (e.g., client.XXX)
- epoch_t map_epoch; ///< an epoch we expect the PG to exist in
-
- struct RunVis : public boost::static_visitor<> {
- OSD *osd;
- PGRef &pg;
- ThreadPool::TPHandle &handle;
- RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
- : osd(osd), pg(pg), handle(handle) {}
- void operator()(const OpRequestRef &op);
- void operator()(const PGSnapTrim &op);
- void operator()(const PGScrub &op);
- void operator()(const PGRecovery &op);
- }; // struct RunVis
-
- struct StringifyVis : public boost::static_visitor<std::string> {
- std::string operator()(const OpRequestRef &op) {
- return stringify(op);
- }
- std::string operator()(const PGSnapTrim &op) {
- return "PGSnapTrim";
+public:
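+  // Abstraction for taking/releasing an item's ordering lock (the PG lock
+  // for PG-based items) without the queue knowing the concrete type.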
+ class OrderLocker {
+ public:
+    using Ref = std::unique_ptr<OrderLocker>;
+ virtual void lock() = 0;
+ virtual void unlock() = 0;
+ virtual ~OrderLocker() {}
+ };
+ // Abstraction for operations queueable in the op queue
+ class OpQueueable {
+ public:
+ enum class op_type_t {
+ client_op,
+ osd_subop,
+ bg_snaptrim,
+ bg_recovery,
+ bg_scrub
+ };
+ using Ref = std::unique_ptr<OpQueueable>;
+
+ /// Items with the same queue token will end up in the same shard
+ virtual uint32_t get_queue_token() const = 0;
+
+ /* Items will be dequeued and locked atomically w.r.t. other items with the
+ * same ordering token */
+ virtual const spg_t& get_ordering_token() const = 0;
+ virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+ virtual op_type_t get_op_type() const = 0;
+ virtual boost::optional<OpRequestRef> maybe_get_op() const {
+ return boost::none;
}
- std::string operator()(const PGScrub &op) {
- return "PGScrub";
+
+ virtual uint64_t get_reserved_pushes() const {
+ return 0;
}
- std::string operator()(const PGRecovery &op) {
- return "PGRecovery";
+
+ virtual ostream &print(ostream &rhs) const = 0;
+
+ virtual void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+ virtual ~OpQueueable() {}
+ friend ostream& operator<<(ostream& out, const OpQueueable& q) {
+ return q.print(out);
}
};
- friend ostream& operator<<(ostream& out, const OpQueueItem& q) {
- StringifyVis v;
- return out << "OpQueueItem(" << boost::apply_visitor(v, q.qvariant)
- << " prio " << q.priority << " cost " << q.cost
- << " e" << q.map_epoch << ")";
- }
+private:
+ OpQueueable::Ref qitem;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ uint64_t owner; ///< global id (e.g., client.XXX)
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
public:
-
- OpQueueItem(OpRequestRef op, epoch_t e)
- : qvariant(op), cost(op->get_req()->get_cost()),
- priority(op->get_req()->get_priority()),
- start_time(op->get_req()->get_recv_stamp()),
- owner(op->get_req()->get_source().num()),
- map_epoch(e)
- {}
OpQueueItem(
- const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
- uint64_t owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- OpQueueItem(
- const PGScrub &op, int cost, unsigned priority, utime_t start_time,
- uint64_t owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
- OpQueueItem(
- const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
- uint64_t owner, epoch_t e)
- : qvariant(op), cost(cost), priority(priority), start_time(start_time),
- owner(owner), map_epoch(e) {}
-
- const boost::optional<OpRequestRef> maybe_get_op() const {
- const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
- return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
+ OpQueueable::Ref &&item,
+ int cost,
+ unsigned priority,
+ utime_t start_time,
+ uint64_t owner,
+ epoch_t e)
+ : qitem(std::move(item)),
+ cost(cost),
+ priority(priority),
+ start_time(start_time),
+ owner(owner),
+ map_epoch(e)
+ {}
+ OpQueueItem(OpQueueItem &&) = default;
+ OpQueueItem(const OpQueueItem &) = delete;
+ OpQueueItem &operator=(OpQueueItem &&) = default;
+ OpQueueItem &operator=(const OpQueueItem &) = delete;
+
+ OrderLocker::Ref get_order_locker(PGRef pg) {
+ return qitem->get_order_locker(pg);
+ }
+ uint32_t get_queue_token() const {
+ return qitem->get_queue_token();
+ }
+ const spg_t& get_ordering_token() const {
+ return qitem->get_ordering_token();
+ }
+ using op_type_t = OpQueueable::op_type_t;
+ OpQueueable::op_type_t get_op_type() const {
+ return qitem->get_op_type();
+ }
+ boost::optional<OpRequestRef> maybe_get_op() const {
+ return qitem->maybe_get_op();
}
uint64_t get_reserved_pushes() const {
- const PGRecovery *op = boost::get<PGRecovery>(&qvariant);
- return op ? op->reserved_pushes : 0;
+ return qitem->get_reserved_pushes();
}
- void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
- RunVis v(osd, pg, handle);
- boost::apply_visitor(v, qvariant);
+ void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) {
+ qitem->run(osd, pg, handle);
}
unsigned get_priority() const { return priority; }
int get_cost() const { return cost; }
utime_t get_start_time() const { return start_time; }
uint64_t get_owner() const { return owner; }
epoch_t get_map_epoch() const { return map_epoch; }
- const QVariant& get_variant() const { return qvariant; }
-}; // struct OpQueueItem
+
+ friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
+ return out << "OpQueueItem("
+ << item.get_ordering_token() << " " << *item.qitem
+ << " prio " << item.get_priority()
+ << " cost " << item.get_cost()
+ << " e" << item.get_map_epoch() << ")";
+ }
+};
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpQueueItem::OpQueueable {
+ spg_t pgid;
+protected:
+ const spg_t& get_pgid() const {
+ return pgid;
+ }
+public:
+ PGOpQueueable(spg_t pg) : pgid(pg) {}
+ uint32_t get_queue_token() const override final {
+ return get_pgid().ps();
+ }
+
+ const spg_t& get_ordering_token() const override final {
+ return get_pgid();
+ }
+
+ OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final {
+ class Locker : public OpQueueItem::OrderLocker {
+ PGRef pg;
+ public:
+ Locker(PGRef pg) : pg(pg) {}
+ void lock() override final {
+ pg->lock();
+ }
+ void unlock() override final {
+ pg->unlock();
+ }
+ };
+ return OpQueueItem::OrderLocker::Ref(
+ new Locker(pg));
+ }
+};
+
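+/// A client op wrapped for the op queue; dispatched via OSD::dequeue_op()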
+class PGOpItem : public PGOpQueueable {
+ OpRequestRef op;
+public:
+ PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(op) {}
+ op_type_t get_op_type() const override final {
+ return op_type_t::client_op;
+ }
+ ostream &print(ostream &rhs) const override final {
+ return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+ }
+ boost::optional<OpRequestRef> maybe_get_op() const override final {
+ return op;
+ }
+ void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final;
+};
p->second.push_front(op);
} else {
dout(20) << __func__ << " " << op << dendl;
- osd->enqueue_front(info.pgid, OpQueueItem(op, get_osdmap()->get_epoch()));
+ osd->enqueue_front(
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ op->get_req()->get_cost(),
+ op->get_req()->get_priority(),
+ op->get_req()->get_recv_stamp(),
+      op->get_req()->get_source().num(),
+ get_osdmap()->get_epoch()));
}
}
for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
i != ls.rend();
++i) {
- auto p = waiting_for_map.find((*i)->get_source());
- if (p != waiting_for_map.end()) {
- dout(20) << __func__ << " " << *i << " (waiting_for_map " << p->first
- << ")" << dendl;
- p->second.push_front(*i);
- } else {
- dout(20) << __func__ << " " << *i << dendl;
- osd->enqueue_front(info.pgid, OpQueueItem(*i, get_osdmap()->get_epoch()));
- }
+ requeue_op(*i);
}
ls.clear();
}
} else {
dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
- osd->enqueue_front(info.pgid, OpQueueItem(*q, epoch));
+ auto req = *q;
+ osd->enqueue_front(OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, req)),
+ req->get_req()->get_cost(),
+ req->get_req()->get_priority(),
+ req->get_req()->get_recv_stamp(),
+        req->get_req()->get_source().num(),
+ epoch));
}
p = waiting_for_map.erase(p);
}
if (scrubber.active_rep_scrub) {
if (last_update_applied >= static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->scrub_to) {
+ auto& op = scrubber.active_rep_scrub;
osd->enqueue_back(
- info.pgid,
- OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ op->get_req()->get_cost(),
+ op->get_req()->get_priority(),
+ op->get_req()->get_recv_stamp(),
+          op->get_req()->get_source().num(),
+ get_osdmap()->get_epoch()));
scrubber.active_rep_scrub = OpRequestRef();
}
}
if (!deleting && active_pushes == 0 &&
scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->chunky) {
+ auto& op = scrubber.active_rep_scrub;
osd->enqueue_back(
- info.pgid,
- OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
- scrubber.active_rep_scrub = OpRequestRef();
+ OpQueueItem(
+ unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ op->get_req()->get_cost(),
+ op->get_req()->get_priority(),
+ op->get_req()->get_recv_stamp(),
+          op->get_req()->get_source().num(),
+ get_osdmap()->get_epoch()));
+ scrubber.active_rep_scrub.reset();
}
unlock();
}
mClockClientQueue::osd_op_type_t
mClockClientQueue::get_osd_op_type(const Request& request) {
- osd_op_type_t type =
- boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
-
+ switch (request.get_op_type()) {
-  // if we got client_op back then we need to distinguish between
-  // a client op and an osd subop.
-
- if (osd_op_type_t::client_op != type) {
- return type;
- /* fixme: this should match REPOP and probably others
- } else if (MSG_OSD_SUBOP ==
- boost::get<OpRequestRef>(
- request.second.get_variant())->get_req()->get_header().type) {
- return osd_op_type_t::osd_subop;
- */
- } else {
+ case OpQueueItem::op_type_t::client_op:
return osd_op_type_t::client_op;
+ case OpQueueItem::op_type_t::osd_subop:
+ return osd_op_type_t::osd_subop;
+ case OpQueueItem::op_type_t::bg_snaptrim:
+ return osd_op_type_t::bg_snaptrim;
+ case OpQueueItem::op_type_t::bg_recovery:
+ return osd_op_type_t::bg_recovery;
+ case OpQueueItem::op_type_t::bg_scrub:
+ return osd_op_type_t::bg_scrub;
+ default:
+ assert(0);
}
}
namespace ceph {
- using Request = std::pair<spg_t, OpQueueItem>;
+ using Request = OpQueueItem;
using Client = uint64_t;
// This class exists to bridge the ceph code, which treats the class
std::list<Request> *out) override final {
queue.remove_by_filter(
[&cl, out] (Request&& r) -> bool {
- if (cl == r.second.get_owner()) {
+ if (cl == r.get_owner()) {
out->push_front(std::move(r));
return true;
} else {
mClockOpClassQueue::osd_op_type_t
mClockOpClassQueue::get_osd_op_type(const Request& request) {
- osd_op_type_t type =
- boost::apply_visitor(pg_queueable_visitor, request.second.get_variant());
-
+ switch (request.get_op_type()) {
-  // if we got client_op back then we need to distinguish between
-  // a client op and an osd subop.
-
- if (osd_op_type_t::client_op != type) {
- return type;
- /* fixme: this should match REPOP and probably others
- } else if (MSG_OSD_SUBOP ==
- boost::get<OpRequestRef>(
- request.second.get_variant())->get_req()->get_header().type) {
- return osd_op_type_t::osd_subop;
- */
- } else {
+ case OpQueueItem::op_type_t::client_op:
return osd_op_type_t::client_op;
+ case OpQueueItem::op_type_t::osd_subop:
+ return osd_op_type_t::osd_subop;
+ case OpQueueItem::op_type_t::bg_snaptrim:
+ return osd_op_type_t::bg_snaptrim;
+ case OpQueueItem::op_type_t::bg_recovery:
+ return osd_op_type_t::bg_recovery;
+ case OpQueueItem::op_type_t::bg_scrub:
+ return osd_op_type_t::bg_scrub;
+ default:
+ assert(0);
}
}
namespace ceph {
- using Request = std::pair<spg_t, OpQueueItem>;
+ using Request = OpQueueItem;
using Client = uint64_t;
// This class exists to bridge the ceph code, which treats the class
std::list<Request> *out) override final {
queue.remove_by_filter(
[&cl, out] (Request&& r) -> bool {
- if (cl == r.second.get_owner()) {
+ if (cl == r.get_owner()) {
out->push_front(std::move(r));
return true;
} else {