From: Samuel Just Date: Wed, 11 Sep 2019 20:42:44 +0000 (-0700) Subject: src/osd: replace OpQueue abstraction in osd with Scheduler X-Git-Tag: v15.1.0~1180^2~3 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=17a05cb15b7d69db744299efa230a95cc8efb6c8;p=ceph-ci.git src/osd: replace OpQueue abstraction in osd with Scheduler OpQueue is overkill for mclock based schedulers. The interface doesn't need to externalize the _strict modifiers, the scheduler can figure that out from the item itself. Introduce simpler Scheduler interface and add an adapter for the existing OpQueue based implementations. Signed-off-by: Samuel Just --- diff --git a/src/ceph_osd.cc b/src/ceph_osd.cc index b24ca32ab02..3e03e863239 100644 --- a/src/ceph_osd.cc +++ b/src/ceph_osd.cc @@ -68,12 +68,12 @@ TracepointProvider::Traits cyg_profile_traits("libcyg_profile_tp.so", } // anonymous namespace -OSD *osd = nullptr; +OSD *osdptr = nullptr; void handle_osd_signal(int signum) { - if (osd) - osd->handle_signal(signum); + if (osdptr) + osdptr->handle_signal(signum); } static void usage() @@ -673,21 +673,21 @@ flushjournal_out: forker.exit(1); } - osd = new OSD(g_ceph_context, - store, - whoami, - ms_cluster, - ms_public, - ms_hb_front_client, - ms_hb_back_client, - ms_hb_front_server, - ms_hb_back_server, - ms_objecter, - &mc, - data_path, - journal_path); - - int err = osd->pre_init(); + osdptr = new OSD(g_ceph_context, + store, + whoami, + ms_cluster, + ms_public, + ms_hb_front_client, + ms_hb_back_client, + ms_hb_front_server, + ms_hb_back_server, + ms_objecter, + &mc, + data_path, + journal_path); + + int err = osdptr->pre_init(); if (err < 0) { derr << TEXT_RED << " ** ERROR: osd pre_init failed: " << cpp_strerror(-err) << TEXT_NORMAL << dendl; @@ -703,7 +703,7 @@ flushjournal_out: ms_objecter->start(); // start osd - err = osd->init(); + err = osdptr->init(); if (err < 0) { derr << TEXT_RED << " ** ERROR: osd init failed: " << cpp_strerror(-err) << TEXT_NORMAL << dendl; @@ -721,7 +721,7 @@ flushjournal_out: register_async_signal_handler_oneshot(SIGINT, handle_osd_signal); register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal); - osd->final_init(); + osdptr->final_init(); if (g_conf().get_val("inject_early_sigterm")) kill(getpid(), SIGTERM); @@ -740,7 +740,7 @@ flushjournal_out: shutdown_async_signal_handler(); // done - delete osd; + delete osdptr; delete ms_public; delete ms_hb_front_client; delete ms_hb_back_client; diff --git a/src/common/OpQueue.h b/src/common/OpQueue.h index 451130e5389..0204f4b4403 100644 --- a/src/common/OpQueue.h +++ b/src/common/OpQueue.h @@ -63,6 +63,9 @@ public: // Formatted output of the queue virtual void dump(ceph::Formatter *f) const = 0; + // Human readable brief description of queue and relevant parameters + virtual void print(std::ostream &f) const = 0; + // Don't leak resources on destruction virtual ~OpQueue() {}; }; diff --git a/src/common/PrioritizedQueue.h b/src/common/PrioritizedQueue.h index 6d7de1291f6..9adf21aafe1 100644 --- a/src/common/PrioritizedQueue.h +++ b/src/common/PrioritizedQueue.h @@ -343,6 +343,10 @@ public: } f->close_section(); } + + void print(std::ostream &ostream) const final { + ostream << "PrioritizedQueue"; + } }; #endif diff --git a/src/common/WeightedPriorityQueue.h b/src/common/WeightedPriorityQueue.h index a05174e8185..cf34709b979 100644 --- a/src/common/WeightedPriorityQueue.h +++ b/src/common/WeightedPriorityQueue.h @@ -344,6 +344,10 @@ class WeightedPriorityQueue : public OpQueue normal.dump(f); f->close_section(); } + + void print(std::ostream &ostream) const final { + ostream << "WeightedPriorityQueue"; + } }; #endif diff --git a/src/common/mClockPriorityQueue.h b/src/common/mClockPriorityQueue.h index ae4259207a3..c1f9f3c2517 100644 --- a/src/common/mClockPriorityQueue.h +++ b/src/common/mClockPriorityQueue.h @@ -360,6 +360,10 @@ namespace ceph { f->dump_int("size", queue.request_count()); f->close_section(); } // dump + + void print(std::ostream &os) const final { + os << "mClockPriorityQueue"; + } }; } // namespace ceph diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 3d7950ce5cc..3cc60752999 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -32,7 +32,8 @@ set(osd_srcs mClockOpClassSupport.cc mClockOpClassQueue.cc mClockClientQueue.cc - OpQueueItem.cc + scheduler/OpScheduler.cc + scheduler/OpSchedulerItem.cc PeeringState.cc PGStateUtils.cc MissingLoc.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index dbf1e754f70..1de0cf47e1a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -180,6 +180,7 @@ #undef dout_prefix #define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch()) +using namespace ceph::osd::scheduler; static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) { return *_dout << "osd." << whoami << " " << epoch << " "; @@ -1673,12 +1674,12 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch(); } -void OSDService::enqueue_back(OpQueueItem&& qi) +void OSDService::enqueue_back(OpSchedulerItem&& qi) { osd->op_shardedwq.queue(std::move(qi)); } -void OSDService::enqueue_front(OpQueueItem&& qi) +void OSDService::enqueue_front(OpSchedulerItem&& qi) { osd->op_shardedwq.queue_front(std::move(qi)); } @@ -1689,8 +1690,8 @@ void OSDService::queue_recovery_context( { epoch_t e = get_osdmap_epoch(); enqueue_back( - OpQueueItem( - unique_ptr( + OpSchedulerItem( + unique_ptr( new PGRecoveryContext(pg->get_pgid(), c, e)), cct->_conf->osd_recovery_cost, cct->_conf->osd_recovery_priority, @@ -1703,8 +1704,8 @@ void OSDService::queue_for_snap_trim(PG *pg) { dout(10) << "queueing " << *pg << " for snaptrim" << dendl; enqueue_back( - OpQueueItem( - unique_ptr( + OpSchedulerItem( + unique_ptr( new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())), cct->_conf->osd_snap_trim_cost, cct->_conf->osd_snap_trim_priority, @@ -1721,8 +1722,8 @@ void OSDService::queue_for_scrub(PG *pg, bool with_high_priority) } const auto epoch = pg->get_osdmap_epoch(); enqueue_back( - OpQueueItem( - unique_ptr(new PGScrub(pg->get_pgid(), epoch)), + OpSchedulerItem( + unique_ptr(new PGScrub(pg->get_pgid(), epoch)), cct->_conf->osd_scrub_cost, scrub_queue_priority, ceph_clock_now(), @@ -1734,8 +1735,8 @@ void OSDService::queue_for_pg_delete(spg_t pgid, epoch_t e) { dout(10) << __func__ << " on " << pgid << " e " << e << dendl; enqueue_back( - OpQueueItem( - unique_ptr( + OpSchedulerItem( + unique_ptr( new PGDelete(pgid, e)), cct->_conf->osd_pg_delete_cost, cct->_conf->osd_pg_delete_priority, @@ -1888,8 +1889,8 @@ void OSDService::_queue_for_recovery( { ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock)); enqueue_back( - OpQueueItem( - unique_ptr( + OpSchedulerItem( + unique_ptr( new PGRecovery( p.second->get_pgid(), p.first, reserved_pushes)), cct->_conf->osd_recovery_cost, @@ -2148,8 +2149,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, op_tracker(cct, cct->_conf->osd_enable_op_tracker, cct->_conf->osd_num_op_tracker_shard), test_ops_hook(NULL), - op_queue(get_io_queue()), - op_prio_cutoff(get_io_prio_cut()), op_shardedwq( this, cct->_conf->osd_op_thread_timeout, @@ -2199,10 +2198,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, OSDShard *one_shard = new OSDShard( i, cct, - this, - cct->_conf->osd_op_pq_max_tokens_per_priority, - cct->_conf->osd_op_pq_min_cost, - op_queue); + this); shards.push_back(one_shard); } } @@ -3453,8 +3449,6 @@ int OSD::init() load_pgs(); dout(2) << "superblock: I am osd." << superblock.whoami << dendl; - dout(0) << "using " << op_queue << " op queue with priority op cut off at " << - op_prio_cutoff << "." << dendl; create_logger(); @@ -9533,8 +9527,8 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef&& op, epoch_t epoch) op->mark_queued_for_pg(); logger->tinc(l_osd_op_before_queue_op_lat, latency); op_shardedwq.queue( - OpQueueItem( - unique_ptr(new PGOpItem(pg, std::move(op))), + OpSchedulerItem( + unique_ptr(new PGOpItem(pg, std::move(op))), cost, priority, stamp, owner, epoch)); } @@ -9542,8 +9536,8 @@ void OSD::enqueue_peering_evt(spg_t pgid, PGPeeringEventRef evt) { dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl; op_shardedwq.queue( - OpQueueItem( - unique_ptr(new PGPeeringItem(pgid, evt)), + OpSchedulerItem( + unique_ptr(new PGPeeringItem(pgid, evt)), 10, cct->_conf->osd_peering_op_priority, utime_t(), @@ -9555,8 +9549,8 @@ void OSD::enqueue_peering_evt_front(spg_t pgid, PGPeeringEventRef evt) { dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl; op_shardedwq.queue_front( - OpQueueItem( - unique_ptr(new PGPeeringItem(pgid, evt)), + OpSchedulerItem( + unique_ptr(new PGPeeringItem(pgid, evt)), 10, cct->_conf->osd_peering_op_priority, utime_t(), @@ -10272,13 +10266,13 @@ void OSDShard::_wake_pg_slot( for (auto i = slot->to_process.rbegin(); i != slot->to_process.rend(); ++i) { - _enqueue_front(std::move(*i), osd->op_prio_cutoff); + scheduler->enqueue_front(std::move(*i)); } slot->to_process.clear(); for (auto i = slot->waiting.rbegin(); i != slot->waiting.rend(); ++i) { - _enqueue_front(std::move(*i), osd->op_prio_cutoff); + scheduler->enqueue_front(std::move(*i)); } slot->waiting.clear(); for (auto i = slot->waiting_peering.rbegin(); @@ -10288,7 +10282,7 @@ void OSDShard::_wake_pg_slot( // items are waiting for maps we don't have yet. FIXME, maybe, // someday, if we decide this inefficiency matters for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) { - _enqueue_front(std::move(*j), osd->op_prio_cutoff); + scheduler->enqueue_front(std::move(*j)); } } slot->waiting_peering.clear(); @@ -10480,6 +10474,26 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) } } +OSDShard::OSDShard( + int id, + CephContext *cct, + OSD *osd) + : shard_id(id), + cct(cct), + osd(osd), + shard_name(string("OSDShard.") + stringify(id)), + sdata_wait_lock_name(shard_name + "::sdata_wait_lock"), + sdata_wait_lock{make_mutex(sdata_wait_lock_name)}, + osdmap_lock_name(shard_name + "::osdmap_lock"), + osdmap_lock{make_mutex(osdmap_lock_name)}, + shard_lock_name(shard_name + "::shard_lock"), + shard_lock{make_mutex(shard_lock_name)}, + scheduler(ceph::osd::scheduler::make_scheduler(cct)), + context_queue(sdata_wait_lock, sdata_cond) +{ + dout(0) << "using op scheduler " << *scheduler << dendl; +} + // ============================================================= @@ -10491,7 +10505,7 @@ void OSDShard::unprime_split_children(spg_t parent, unsigned old_pg_num) void OSD::ShardedOpWQ::_add_slot_waiter( spg_t pgid, OSDShardPGSlot *slot, - OpQueueItem&& qi) + OpSchedulerItem&& qi) { if (qi.is_peering()) { dout(20) << __func__ << " " << pgid @@ -10525,7 +10539,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) // peek at spg_t sdata->shard_lock.lock(); - if (sdata->pqueue->empty() && + if (sdata->scheduler->empty() && (!is_smallest_thread_index || sdata->context_queue.empty())) { std::unique_lock wait_lock{sdata->sdata_wait_lock}; if (is_smallest_thread_index && !sdata->context_queue.empty()) { @@ -10538,7 +10552,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->sdata_cond.wait(wait_lock); wait_lock.unlock(); sdata->shard_lock.lock(); - if (sdata->pqueue->empty() && + if (sdata->scheduler->empty() && !(is_smallest_thread_index && !sdata->context_queue.empty())) { sdata->shard_lock.unlock(); return; @@ -10558,7 +10572,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->context_queue.move_to(oncommits); } - if (sdata->pqueue->empty()) { + if (sdata->scheduler->empty()) { if (osd->is_stopping()) { sdata->shard_lock.unlock(); for (auto c : oncommits) { @@ -10572,7 +10586,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } - OpQueueItem item = sdata->pqueue->dequeue(); + OpSchedulerItem item = sdata->scheduler->dequeue(); if (osd->is_stopping()) { sdata->shard_lock.unlock(); for (auto c : oncommits) { @@ -10805,25 +10819,21 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) handle_oncommits(oncommits); } -void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { +void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) { uint32_t shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size()); + dout(20) << __func__ << " " << item << dendl; + OSDShard* sdata = osd->shards[shard_index]; assert (NULL != sdata); - unsigned priority = item.get_priority(); - unsigned cost = item.get_cost(); - sdata->shard_lock.lock(); - dout(20) << __func__ << " " << item << dendl; - bool empty = sdata->pqueue->empty(); - if (priority >= osd->op_prio_cutoff) - sdata->pqueue->enqueue_strict( - item.get_owner(), priority, std::move(item)); - else - sdata->pqueue->enqueue( - item.get_owner(), priority, cost, std::move(item)); - sdata->shard_lock.unlock(); + bool empty = true; + { + std::lock_guard l{sdata->shard_lock}; + empty = sdata->scheduler->empty(); + sdata->scheduler->enqueue(std::move(item)); + } if (empty) { std::lock_guard l{sdata->sdata_wait_lock}; @@ -10831,7 +10841,7 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { } } -void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) +void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item) { auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size()); auto& sdata = osd->shards[shard_index]; @@ -10841,7 +10851,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) if (p != sdata->pg_slots.end() && !p->second->to_process.empty()) { // we may be racing with _process, which has dequeued a new item - // from pqueue, put it on to_process, and is now busy taking the + // from scheduler, put it on to_process, and is now busy taking the // pg lock. ensure this old requeued item is ordered before any // such newer item in to_process. p->second->to_process.push_front(std::move(item)); @@ -10853,7 +10863,7 @@ void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) } else { dout(20) << __func__ << " " << item << dendl; } - sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff); + sdata->scheduler->enqueue_front(std::move(item)); sdata->shard_lock.unlock(); std::lock_guard l{sdata->sdata_wait_lock}; sdata->sdata_cond.notify_one(); @@ -10890,22 +10900,3 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, } }} // namespace ceph::osd_cmds - - -std::ostream& operator<<(std::ostream& out, const io_queue& q) { - switch(q) { - case io_queue::prioritized: - out << "prioritized"; - break; - case io_queue::weightedpriority: - out << "weightedpriority"; - break; - case io_queue::mclock_opclass: - out << "mclock_opclass"; - break; - case io_queue::mclock_client: - out << "mclock_client"; - break; - } - return out; -} diff --git a/src/osd/OSD.h b/src/osd/OSD.h index eeef8c5d206..7713c208f8a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -41,7 +41,7 @@ #include "OpRequest.h" #include "Session.h" -#include "osd/OpQueueItem.h" +#include "osd/scheduler/OpScheduler.h" #include #include @@ -55,8 +55,6 @@ #include "common/sharedptr_registry.hpp" #include "common/WeightedPriorityQueue.h" #include "common/PrioritizedQueue.h" -#include "osd/mClockOpClassQueue.h" -#include "osd/mClockClientQueue.h" #include "messages/MOSDOp.h" #include "common/EventTrace.h" #include "osd/osd_perf_counters.h" @@ -105,6 +103,7 @@ class MMonGetPurgedSnapsReply; class OSD; class OSDService { + using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem; public: OSD *osd; CephContext *cct; @@ -125,8 +124,8 @@ public: md_config_cacher_t osd_max_object_size; md_config_cacher_t osd_skip_data_digest; - void enqueue_back(OpQueueItem&& qi); - void enqueue_front(OpQueueItem&& qi); + void enqueue_back(OpSchedulerItem&& qi); + void enqueue_front(OpSchedulerItem&& qi); void maybe_inject_dispatch_delay() { if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) { @@ -903,15 +902,6 @@ public: ~OSDService(); }; - -enum class io_queue { - prioritized, - weightedpriority, - mclock_opclass, - mclock_client, -}; - - /* Each PG slot includes queues for events that are processing and/or waiting @@ -941,7 +931,7 @@ enum class io_queue { don't affect the given PG.) - we maintain two separate wait lists, *waiting* and *waiting_peering*. The - OpQueueItem has an is_peering() bool to determine which we use. Waiting + OpSchedulerItem has an is_peering() bool to determine which we use. Waiting peering events are queued up by epoch required. - when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or @@ -963,14 +953,15 @@ enum class io_queue { */ struct OSDShardPGSlot { + using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem; PGRef pg; ///< pg reference - deque to_process; ///< order items for this slot + deque to_process; ///< order items for this slot int num_running = 0; ///< _process threads doing pg lookup/lock - deque waiting; ///< waiting for pg (or map + pg) + deque waiting; ///< waiting for pg (or map + pg) /// waiting for map (peering evt) - map> waiting_peering; + map> waiting_peering; /// incremented by wake_pg_waiters; indicates racing _process threads /// should bail out (their op has been requeued) @@ -1010,7 +1001,7 @@ struct OSDShard { ceph::mutex shard_lock; ///< protects remaining members below /// map of slots for each spg_t. maintains ordering of items dequeued - /// from pqueue while _process thread drops shard lock to acquire the + /// from scheduler while _process thread drops shard lock to acquire the /// pg lock. stale slots are removed by consume_map. unordered_map> pg_slots; @@ -1032,25 +1023,12 @@ struct OSDShard { ceph::condition_variable min_pg_epoch_cond; /// priority queue - std::unique_ptr> pqueue; + ceph::osd::scheduler::OpSchedulerRef scheduler; bool stop_waiting = false; ContextQueue context_queue; - void _enqueue_front(OpQueueItem&& item, unsigned cutoff) { - unsigned priority = item.get_priority(); - unsigned cost = item.get_cost(); - if (priority >= cutoff) - pqueue->enqueue_strict_front( - item.get_owner(), - priority, std::move(item)); - else - pqueue->enqueue_front( - item.get_owner(), - priority, cost, std::move(item)); - } - void _attach_pg(OSDShardPGSlot *slot, PG *pg); void _detach_pg(OSDShardPGSlot *slot); @@ -1083,38 +1061,13 @@ struct OSDShard { OSDShard( int id, CephContext *cct, - OSD *osd, - uint64_t max_tok_per_prio, uint64_t min_cost, - io_queue opqueue) - : shard_id(id), - cct(cct), - osd(osd), - shard_name(string("OSDShard.") + stringify(id)), - sdata_wait_lock_name(shard_name + "::sdata_wait_lock"), - sdata_wait_lock{make_mutex(sdata_wait_lock_name)}, - osdmap_lock_name(shard_name + "::osdmap_lock"), - osdmap_lock{make_mutex(osdmap_lock_name)}, - shard_lock_name(shard_name + "::shard_lock"), - shard_lock{make_mutex(shard_lock_name)}, - context_queue(sdata_wait_lock, sdata_cond) { - if (opqueue == io_queue::weightedpriority) { - pqueue = std::make_unique< - WeightedPriorityQueue>( - max_tok_per_prio, min_cost); - } else if (opqueue == io_queue::prioritized) { - pqueue = std::make_unique< - PrioritizedQueue>( - max_tok_per_prio, min_cost); - } else if (opqueue == io_queue::mclock_opclass) { - pqueue = std::make_unique(cct); - } else if (opqueue == io_queue::mclock_client) { - pqueue = std::make_unique(cct); - } - } + OSD *osd); }; class OSD : public Dispatcher, public md_config_obs_t { + using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem; + /** OSD **/ // global lock ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock"); @@ -1591,24 +1544,18 @@ private: friend struct C_FinishSplits; friend struct C_OpenPGs; - // -- op queue -- - friend std::ostream& operator<<(std::ostream& out, const io_queue& q); - - const io_queue op_queue; -public: - const unsigned int op_prio_cutoff; protected: /* * The ordered op delivery chain is: * - * fast dispatch -> pqueue back - * pqueue front <-> to_process back + * fast dispatch -> scheduler back + * scheduler front <-> to_process back * to_process front -> RunVis(item) * <- queue_front() * - * The pqueue is per-shard, and to_process is per pg_slot. Items can be - * pushed back up into to_process and/or pqueue while order is preserved. + * The scheduler is per-shard, and to_process is per pg_slot. Items can be + * pushed back up into to_process and/or scheduler while order is preserved. * * Multiple worker threads can operate on each shard. * @@ -1619,13 +1566,13 @@ protected: * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg * and already requeued the items. */ - friend class PGOpItem; - friend class PGPeeringItem; - friend class PGRecovery; - friend class PGDelete; + friend class ceph::osd::scheduler::PGOpItem; + friend class ceph::osd::scheduler::PGPeeringItem; + friend class ceph::osd::scheduler::PGRecovery; + friend class ceph::osd::scheduler::PGDelete; class ShardedOpWQ - : public ShardedThreadPool::ShardedWQ + : public ShardedThreadPool::ShardedWQ { OSD *osd; @@ -1634,23 +1581,23 @@ protected: time_t ti, time_t si, ShardedThreadPool* tp) - : ShardedThreadPool::ShardedWQ(ti, si, tp), + : ShardedThreadPool::ShardedWQ(ti, si, tp), osd(o) { } void _add_slot_waiter( spg_t token, OSDShardPGSlot *slot, - OpQueueItem&& qi); + OpSchedulerItem&& qi); /// try to do some work void _process(uint32_t thread_index, heartbeat_handle_d *hb) override; /// enqueue a new item - void _enqueue(OpQueueItem&& item) override; + void _enqueue(OpSchedulerItem&& item) override; /// requeue an old item (at the front of the line) - void _enqueue_front(OpQueueItem&& item) override; + void _enqueue_front(OpSchedulerItem&& item) override; void return_waiting_threads() override { for(uint32_t i = 0; i < osd->num_shards; i++) { @@ -1681,7 +1628,7 @@ protected: std::scoped_lock l{sdata->shard_lock}; f->open_object_section(queue_name); - sdata->pqueue->dump(f); + sdata->scheduler->dump(*f); f->close_section(); } } @@ -1692,9 +1639,9 @@ protected: ceph_assert(sdata); std::lock_guard l(sdata->shard_lock); if (thread_index < osd->num_shards) { - return sdata->pqueue->empty() && sdata->context_queue.empty(); + return sdata->scheduler->empty() && sdata->context_queue.empty(); } else { - return sdata->pqueue->empty(); + return sdata->scheduler->empty(); } } @@ -2041,39 +1988,6 @@ private: void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override; - io_queue get_io_queue() const { - if (cct->_conf->osd_op_queue == "debug_random") { - static io_queue index_lookup[] = { io_queue::prioritized, - io_queue::weightedpriority, - io_queue::mclock_opclass, - io_queue::mclock_client }; - srand(time(NULL)); - unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); - return index_lookup[which]; - } else if (cct->_conf->osd_op_queue == "prioritized") { - return io_queue::prioritized; - } else if (cct->_conf->osd_op_queue == "mclock_opclass") { - return io_queue::mclock_opclass; - } else if (cct->_conf->osd_op_queue == "mclock_client") { - return io_queue::mclock_client; - } else { - // default / catch-all is 'wpq' - return io_queue::weightedpriority; - } - } - - unsigned int get_io_prio_cut() const { - if (cct->_conf->osd_op_queue_cut_off == "debug_random") { - srand(time(NULL)); - return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; - } else if (cct->_conf->osd_op_queue_cut_off == "high") { - return CEPH_MSG_PRIO_HIGH; - } else { - // default / catch-all is 'low' - return CEPH_MSG_PRIO_LOW; - } - } - public: /* internal and external can point to the same messenger, they will still * be cleaned up properly*/ @@ -2172,9 +2086,6 @@ private: }; -std::ostream& operator<<(std::ostream& out, const io_queue& q); - - //compatibility of the executable extern const CompatSet::Feature ceph_osd_feature_compat[]; extern const CompatSet::Feature ceph_osd_feature_ro_compat[]; diff --git a/src/osd/OpQueueItem.cc b/src/osd/OpQueueItem.cc deleted file mode 100644 index 1deb1e7a03e..00000000000 --- a/src/osd/OpQueueItem.cc +++ /dev/null @@ -1,84 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 Red Hat Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "OpQueueItem.h" -#include "OSD.h" - -void PGOpItem::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - osd->dequeue_op(pg, op, handle); - pg->unlock(); -} - -void PGPeeringItem::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - osd->dequeue_peering_evt(sdata, pg.get(), evt, handle); -} - -void PGSnapTrim::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - pg->snap_trimmer(epoch_queued); - pg->unlock(); -} - -void PGScrub::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - pg->scrub(epoch_queued, handle); - pg->unlock(); -} - -void PGRecovery::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle); - pg->unlock(); -} - -void PGRecoveryContext::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - c.release()->complete(handle); - pg->unlock(); -} - -void PGDelete::run( - OSD *osd, - OSDShard *sdata, - PGRef& pg, - ThreadPool::TPHandle &handle) -{ - osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle); -} diff --git a/src/osd/OpQueueItem.h b/src/osd/OpQueueItem.h deleted file mode 100644 index 0680e0bd7e7..00000000000 --- a/src/osd/OpQueueItem.h +++ /dev/null @@ -1,342 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2016 Red Hat Inc. - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#pragma once - -#include - -#include "include/types.h" -#include "include/utime.h" -#include "osd/OpRequest.h" -#include "osd/PG.h" -#include "PGPeeringEvent.h" - -class OSD; -class OSDShard; - -class OpQueueItem { -public: - class OrderLocker { - public: - using Ref = unique_ptr; - virtual void lock() = 0; - virtual void unlock() = 0; - virtual ~OrderLocker() {} - }; - // Abstraction for operations queueable in the op queue - class OpQueueable { - public: - enum class op_type_t { - client_op, - peering_event, - bg_snaptrim, - bg_recovery, - bg_scrub, - bg_pg_delete - }; - using Ref = std::unique_ptr; - - /// Items with the same queue token will end up in the same shard - virtual uint32_t get_queue_token() const = 0; - - /* Items will be dequeued and locked atomically w.r.t. other items with the - * same ordering token */ - virtual const spg_t& get_ordering_token() const = 0; - virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0; - virtual op_type_t get_op_type() const = 0; - virtual std::optional maybe_get_op() const { - return std::nullopt; - } - - virtual uint64_t get_reserved_pushes() const { - return 0; - } - - virtual bool is_peering() const { - return false; - } - virtual bool peering_requires_pg() const { - ceph_abort(); - } - virtual const PGCreateInfo *creates_pg() const { - return nullptr; - } - - virtual ostream &print(ostream &rhs) const = 0; - - virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0; - virtual ~OpQueueable() {} - friend ostream& operator<<(ostream& out, const OpQueueable& q) { - return q.print(out); - } - - }; - -private: - OpQueueable::Ref qitem; - int cost; - unsigned priority; - utime_t start_time; - uint64_t owner; ///< global id (e.g., client.XXX) - epoch_t map_epoch; ///< an epoch we expect the PG to exist in - -public: - OpQueueItem( - OpQueueable::Ref &&item, - int cost, - unsigned priority, - utime_t start_time, - uint64_t owner, - epoch_t e) - : qitem(std::move(item)), - cost(cost), - priority(priority), - start_time(start_time), - owner(owner), - map_epoch(e) - {} - OpQueueItem(OpQueueItem &&) = default; - OpQueueItem(const OpQueueItem &) = delete; - OpQueueItem &operator=(OpQueueItem &&) = default; - OpQueueItem &operator=(const OpQueueItem &) = delete; - - OrderLocker::Ref get_order_locker(PGRef pg) { - return qitem->get_order_locker(pg); - } - uint32_t get_queue_token() const { - return qitem->get_queue_token(); - } - const spg_t& get_ordering_token() const { - return qitem->get_ordering_token(); - } - using op_type_t = OpQueueable::op_type_t; - OpQueueable::op_type_t get_op_type() const { - return qitem->get_op_type(); - } - std::optional maybe_get_op() const { - return qitem->maybe_get_op(); - } - uint64_t get_reserved_pushes() const { - return qitem->get_reserved_pushes(); - } - void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) { - qitem->run(osd, sdata, pg, handle); - } - unsigned get_priority() const { return priority; } - int get_cost() const { return cost; } - utime_t get_start_time() const { return start_time; } - uint64_t get_owner() const { return owner; } - epoch_t get_map_epoch() const { return map_epoch; } - - bool is_peering() const { - return qitem->is_peering(); - } - - const PGCreateInfo *creates_pg() const { - return qitem->creates_pg(); - } - - bool peering_requires_pg() const { - return qitem->peering_requires_pg(); - } - - friend ostream& operator<<(ostream& out, const OpQueueItem& item) { - out << "OpQueueItem(" - << item.get_ordering_token() << " " << *item.qitem - << " prio " << item.get_priority() - << " cost " << item.get_cost() - << " e" << item.get_map_epoch(); - if (item.get_reserved_pushes()) { - out << " reserved_pushes " << item.get_reserved_pushes(); - } - return out << ")"; - } -}; // class OpQueueItem - -/// Implements boilerplate for operations queued for the pg lock -class PGOpQueueable : public OpQueueItem::OpQueueable { - spg_t pgid; -protected: - const spg_t& get_pgid() const { - return pgid; - } -public: - explicit PGOpQueueable(spg_t pg) : pgid(pg) {} - uint32_t get_queue_token() const override final { - return get_pgid().ps(); - } - - const spg_t& get_ordering_token() const override final { - return get_pgid(); - } - - OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final { - class Locker : public OpQueueItem::OrderLocker { - PGRef pg; - public: - explicit Locker(PGRef pg) : pg(pg) {} - void lock() override final { - pg->lock(); - } - void unlock() override final { - pg->unlock(); - } - }; - return OpQueueItem::OrderLocker::Ref( - new Locker(pg)); - } -}; - -class PGOpItem : public PGOpQueueable { - OpRequestRef op; -public: - PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {} - op_type_t get_op_type() const override final { - return op_type_t::client_op; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGOpItem(op=" << *(op->get_req()) << ")"; - } - std::optional maybe_get_op() const override final { - return op; - } - void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; - -class PGPeeringItem : public PGOpQueueable { - PGPeeringEventRef evt; -public: - PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {} - op_type_t get_op_type() const override final { - return op_type_t::peering_event; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGPeeringEvent(" << evt->get_desc() << ")"; - } - void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; - bool is_peering() const override { - return true; - } - bool peering_requires_pg() const override { - return evt->requires_pg; - } - const PGCreateInfo *creates_pg() const override { - return evt->create_info.get(); - } -}; - -class PGSnapTrim : public PGOpQueueable { - epoch_t epoch_queued; -public: - PGSnapTrim( - spg_t pg, - epoch_t epoch_queued) - : PGOpQueueable(pg), epoch_queued(epoch_queued) {} - op_type_t get_op_type() const override final { - return op_type_t::bg_snaptrim; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGSnapTrim(pgid=" << get_pgid() - << "epoch_queued=" << epoch_queued - << ")"; - } - void run( - OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; - -class PGScrub : public PGOpQueueable { - epoch_t epoch_queued; -public: - PGScrub( - spg_t pg, - epoch_t epoch_queued) - : PGOpQueueable(pg), epoch_queued(epoch_queued) {} - op_type_t get_op_type() const override final { - return op_type_t::bg_scrub; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGScrub(pgid=" << get_pgid() - << "epoch_queued=" << epoch_queued - << ")"; - } - void run( - OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; - -class PGRecovery : public PGOpQueueable { - epoch_t epoch_queued; - uint64_t reserved_pushes; -public: - PGRecovery( - spg_t pg, - epoch_t epoch_queued, - uint64_t reserved_pushes) - : PGOpQueueable(pg), - epoch_queued(epoch_queued), - reserved_pushes(reserved_pushes) {} - op_type_t get_op_type() const override final { - return op_type_t::bg_recovery; - } - virtual ostream &print(ostream &rhs) const override final { - return rhs << "PGRecovery(pgid=" << get_pgid() - << "epoch_queued=" << epoch_queued - << "reserved_pushes=" << reserved_pushes - << ")"; - } - virtual uint64_t get_reserved_pushes() const override final { - return reserved_pushes; - } - virtual void run( - OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; - -class PGRecoveryContext : public PGOpQueueable { - unique_ptr> c; - epoch_t epoch; -public: - PGRecoveryContext(spg_t pgid, - GenContext *c, epoch_t epoch) - : PGOpQueueable(pgid), - c(c), epoch(epoch) {} - op_type_t get_op_type() const override final { - return op_type_t::bg_recovery; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGRecoveryContext(pgid=" << get_pgid() - << " c=" << c.get() << " epoch=" << epoch - << ")"; - } - void run( - OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; - -class PGDelete : public PGOpQueueable { - epoch_t epoch_queued; -public: - PGDelete( - spg_t pg, - epoch_t epoch_queued) - : PGOpQueueable(pg), - epoch_queued(epoch_queued) {} - op_type_t get_op_type() const override final { - return op_type_t::bg_pg_delete; - } - ostream &print(ostream &rhs) const override final { - return rhs << "PGDelete(" << get_pgid() - << " e" << epoch_queued - << ")"; - } - void run( - OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final; -}; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 949e8507d66..e4e82891c5d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -25,6 +25,7 @@ #include "OpRequest.h" #include "ScrubStore.h" #include "Session.h" +#include "osd/scheduler/OpSchedulerItem.h" #include "common/Timer.h" #include "common/perf_counters.h" @@ -76,6 +77,8 @@ #undef dout_prefix #define dout_prefix _prefix(_dout, this) +using namespace ceph::osd::scheduler; + template static ostream& _prefix(std::ostream *_dout, T *t) { @@ -1270,8 +1273,8 @@ void PG::requeue_op(OpRequestRef op) } else { dout(20) << __func__ << " " << op << dendl; osd->enqueue_front( - OpQueueItem( - unique_ptr(new PGOpItem(info.pgid, op)), + OpSchedulerItem( + unique_ptr(new PGOpItem(info.pgid, op)), op->get_req()->get_cost(), op->get_req()->get_priority(), op->get_req()->get_recv_stamp(), @@ -1304,8 +1307,8 @@ void PG::requeue_map_waiters() dout(20) << __func__ << " " << p->first << " " << p->second << dendl; for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) { auto req = *q; - osd->enqueue_front(OpQueueItem( - unique_ptr(new PGOpItem(info.pgid, req)), + osd->enqueue_front(OpSchedulerItem( + unique_ptr(new PGOpItem(info.pgid, req)), req->get_req()->get_cost(), req->get_req()->get_priority(), req->get_req()->get_recv_stamp(), diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index ec013eb085a..2ef4ebf8b6c 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -76,6 +76,8 @@ static ostream& _prefix(std::ostream *_dout, T *pg) { MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd); +using namespace ceph::osd::scheduler; + /** * The CopyCallback class defines an interface for completions to the * copy_start code. Users of the copy infrastructure must implement @@ -11606,8 +11608,8 @@ void PrimaryLogPG::_applied_recovered_object_replica() scrubber.active_rep_scrub->get_req())->chunky) { auto& op = scrubber.active_rep_scrub; osd->enqueue_back( - OpQueueItem( - unique_ptr(new PGOpItem(info.pgid, op)), + OpSchedulerItem( + unique_ptr(new PGOpItem(info.pgid, op)), op->get_req()->get_cost(), op->get_req()->get_priority(), op->get_req()->get_recv_stamp(), diff --git a/src/osd/mClockClientQueue.h b/src/osd/mClockClientQueue.h index 84454ff6bd0..ea0866e3031 100644 --- a/src/osd/mClockClientQueue.h +++ b/src/osd/mClockClientQueue.h @@ -22,13 +22,13 @@ #include "common/config.h" #include "common/ceph_context.h" #include "common/mClockPriorityQueue.h" -#include "osd/OpQueueItem.h" +#include "osd/scheduler/OpSchedulerItem.h" #include "osd/mClockOpClassSupport.h" namespace ceph { - using Request = OpQueueItem; + using Request = ceph::osd::scheduler::OpSchedulerItem; using Client = uint64_t; // This class exists to bridge the ceph code, which treats the class @@ -103,9 +103,13 @@ namespace ceph { // Formatted output of the queue void dump(ceph::Formatter *f) const override final; + void print(std::ostream &ostream) const final { + ostream << "mClockClientQueue"; + } + protected: InnerClient get_inner_client(const Client& cl, const Request& request); - }; // class mClockClientAdapter + }; // class mClockClientQueue } // namespace ceph diff --git a/src/osd/mClockOpClassQueue.h b/src/osd/mClockOpClassQueue.h index 3ad7f71972d..0e069ee3f26 100644 --- a/src/osd/mClockOpClassQueue.h +++ b/src/osd/mClockOpClassQueue.h @@ -23,13 +23,13 @@ #include "common/config.h" #include "common/ceph_context.h" #include "common/mClockPriorityQueue.h" -#include "osd/OpQueueItem.h" +#include "osd/scheduler/OpSchedulerItem.h" #include "osd/mClockOpClassSupport.h" namespace ceph { - using Request = OpQueueItem; + using Request = ceph::osd::scheduler::OpSchedulerItem; using Client = uint64_t; // This class exists to bridge the ceph code, which treats the class @@ -121,5 +121,9 @@ namespace ceph { // Formatted output of the queue void dump(ceph::Formatter *f) const override final; - }; // class mClockOpClassAdapter + + void print(std::ostream &ostream) const final { + ostream << "mClockOpClassQueue"; + } + }; // class mClockOpClassQueue } // namespace ceph diff --git a/src/osd/mClockOpClassSupport.cc b/src/osd/mClockOpClassSupport.cc index 49cad6ced58..c8501eb90fd 100644 --- a/src/osd/mClockOpClassSupport.cc +++ b/src/osd/mClockOpClassSupport.cc @@ -13,10 +13,8 @@ */ -#include "common/dout.h" #include "osd/mClockOpClassSupport.h" -#include "osd/OpQueueItem.h" - +#include "common/dout.h" #include "include/ceph_assert.h" namespace ceph { @@ -80,7 +78,8 @@ namespace ceph { } osd_op_type_t - OpClassClientInfoMgr::osd_op_type(const OpQueueItem& op) const { + OpClassClientInfoMgr::osd_op_type( + const ceph::osd::scheduler::OpSchedulerItem& op) const { osd_op_type_t type = convert_op_type(op.get_op_type()); if (osd_op_type_t::client_op != type) { return type; diff --git a/src/osd/mClockOpClassSupport.h b/src/osd/mClockOpClassSupport.h index 1ea1043eb08..f99c6417699 100644 --- a/src/osd/mClockOpClassSupport.h +++ b/src/osd/mClockOpClassSupport.h @@ -19,13 +19,14 @@ #include "dmclock/src/dmclock_server.h" #include "osd/OpRequest.h" -#include "osd/OpQueueItem.h" +#include "osd/scheduler/OpSchedulerItem.h" namespace ceph { namespace mclock { - using op_item_type_t = OpQueueItem::OpQueueable::op_type_t; + using op_item_type_t = + ceph::osd::scheduler::OpSchedulerItem::OpQueueable::op_type_t; enum class osd_op_type_t { client_op, osd_rep_op, bg_snaptrim, bg_recovery, bg_scrub, bg_pg_delete, @@ -93,7 +94,8 @@ namespace ceph { } } - osd_op_type_t osd_op_type(const OpQueueItem&) const; + osd_op_type_t osd_op_type( + const ceph::osd::scheduler::OpSchedulerItem&) const; // used for debugging since faster implementation can be done // with rep_op_msg_bitmap diff --git a/src/osd/scheduler/OpScheduler.cc b/src/osd/scheduler/OpScheduler.cc new file mode 100644 index 00000000000..f8886270394 --- /dev/null +++ b/src/osd/scheduler/OpScheduler.cc @@ -0,0 +1,77 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include + +#include "osd/scheduler/OpScheduler.h" + +#include "common/PrioritizedQueue.h" +#include "common/WeightedPriorityQueue.h" +#include "osd/scheduler/mClockScheduler.h" +#include "osd/mClockClientQueue.h" +#include "osd/mClockOpClassQueue.h" + +namespace ceph::osd::scheduler { + +OpSchedulerRef make_scheduler(CephContext *cct) +{ + const std::string *type = &cct->_conf->osd_op_queue; + if (*type == "debug_random") { + static const std::string index_lookup[] = { "prioritized", + "mclock_opclass", + "mclock_client", + "wpq" }; + srand(time(NULL)); + unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0])); + type = &index_lookup[which]; + } + + if (*type == "prioritized") { + return std::make_unique< + ClassedOpQueueScheduler>>( + cct, + cct->_conf->osd_op_pq_max_tokens_per_priority, + cct->_conf->osd_op_pq_min_cost + ); + } else if (*type == "mclock_opclass") { + return std::make_unique< + ClassedOpQueueScheduler>( + cct, + cct + ); + } else if (*type == "mclock_client") { + return std::make_unique< + ClassedOpQueueScheduler>( + cct, + cct + ); + } else if (*type == "wpq" ) { + // default is 'wpq' + return std::make_unique< + ClassedOpQueueScheduler>>( + cct, + cct->_conf->osd_op_pq_max_tokens_per_priority, + cct->_conf->osd_op_pq_min_cost + ); + } else { + ceph_assert("Invalid choice of wq" == 0); + } +} + +std::ostream &operator<<(std::ostream &lhs, const OpScheduler &rhs) { + rhs.print(lhs); + return lhs; +} + +} diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h new file mode 100644 index 00000000000..ae770a72b27 --- /dev/null +++ b/src/osd/scheduler/OpScheduler.h @@ -0,0 +1,137 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +#include "common/ceph_context.h" +#include "osd/scheduler/OpSchedulerItem.h" + +namespace ceph::osd::scheduler { + +using client = uint64_t; + +/** + * Base interface for classes responsible for choosing + * op processing order in the OSD. + */ +class OpScheduler { +public: + // Enqueue op for scheduling + virtual void enqueue(OpSchedulerItem &&item) = 0; + + // Enqueue op for processing as though it were enqueued prior + // to other items already scheduled. + virtual void enqueue_front(OpSchedulerItem &&item) = 0; + + // Returns true iff there are no ops scheduled + virtual bool empty() const = 0; + + // Return next op to be processed + virtual OpSchedulerItem dequeue() = 0; + + // Dump formatted representation for the queue + virtual void dump(ceph::Formatter &f) const = 0; + + // Print human readable brief description with relevant parameters + virtual void print(std::ostream &out) const = 0; + + // Destructor + virtual ~OpScheduler() {}; +}; + +std::ostream &operator<<(std::ostream &lhs, const OpScheduler &); +using OpSchedulerRef = std::unique_ptr; + +OpSchedulerRef make_scheduler(CephContext *cct); + +/** + * Implements OpScheduler in terms of OpQueue + * + * Templated on queue type to avoid dynamic dispatch, T should implement + * OpQueue. This adapter is mainly responsible for + * the boilerplate priority cutoff/strict concept which is needed for + * OpQueue based implementations. + */ +template +class ClassedOpQueueScheduler : public OpScheduler { + unsigned cutoff; + T queue; + + static unsigned int get_io_prio_cut(CephContext *cct) { + if (cct->_conf->osd_op_queue_cut_off == "debug_random") { + srand(time(NULL)); + return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW; + } else if (cct->_conf->osd_op_queue_cut_off == "high") { + return CEPH_MSG_PRIO_HIGH; + } else { + // default / catch-all is 'low' + return CEPH_MSG_PRIO_LOW; + } + } +public: + template + ClassedOpQueueScheduler(CephContext *cct, Args&&... args) : + cutoff(get_io_prio_cut(cct)), + queue(std::forward(args)...) + {} + + void enqueue(OpSchedulerItem &&item) final { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); + + if (priority >= cutoff) + queue.enqueue_strict( + item.get_owner(), priority, std::move(item)); + else + queue.enqueue( + item.get_owner(), priority, cost, std::move(item)); + } + + void enqueue_front(OpSchedulerItem &&item) final { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); + if (priority >= cutoff) + queue.enqueue_strict_front( + item.get_owner(), + priority, std::move(item)); + else + queue.enqueue_front( + item.get_owner(), + priority, cost, std::move(item)); + } + + bool empty() const final { + return queue.empty(); + } + + OpSchedulerItem dequeue() final { + return queue.dequeue(); + } + + void dump(ceph::Formatter &f) const final { + return queue.dump(&f); + } + + void print(std::ostream &out) const final { + out << "ClassedOpQueueScheduler(queue="; + queue.print(out); + out << ", cutoff=" << cutoff << ")"; + } + + ~ClassedOpQueueScheduler() final {}; +}; + +} diff --git a/src/osd/scheduler/OpSchedulerItem.cc b/src/osd/scheduler/OpSchedulerItem.cc new file mode 100644 index 00000000000..28069dcef3a --- /dev/null +++ b/src/osd/scheduler/OpSchedulerItem.cc @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "osd/scheduler/OpSchedulerItem.h" +#include "osd/OSD.h" + +namespace ceph::osd::scheduler { + +void PGOpItem::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_op(pg, op, handle); + pg->unlock(); +} + +void PGPeeringItem::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_peering_evt(sdata, pg.get(), evt, handle); +} + +void PGSnapTrim::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + pg->snap_trimmer(epoch_queued); + pg->unlock(); +} + +void PGScrub::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + pg->scrub(epoch_queued, handle); + pg->unlock(); +} + +void PGRecovery::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle); + pg->unlock(); +} + +void PGRecoveryContext::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + c.release()->complete(handle); + pg->unlock(); +} + +void PGDelete::run( + OSD *osd, + OSDShard *sdata, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ + osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle); +} + +} diff --git a/src/osd/scheduler/OpSchedulerItem.h b/src/osd/scheduler/OpSchedulerItem.h new file mode 100644 index 00000000000..5636221532f --- /dev/null +++ b/src/osd/scheduler/OpSchedulerItem.h @@ -0,0 +1,438 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 Red Hat Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +#include "include/types.h" +#include "include/utime.h" +#include "osd/OpRequest.h" +#include "osd/PG.h" +#include "osd/PGPeeringEvent.h" +#include "common/mClockCommon.h" +#include "messages/MOSDOp.h" + + +class OSD; +class OSDShard; + +namespace ceph::osd::scheduler { + +enum class op_scheduler_class : uint8_t { + background_recovery = 0, + background_best_effort, + immediate, + client, +}; + +class OpSchedulerItem { +public: + class OrderLocker { + public: + using Ref = unique_ptr; + virtual void lock() = 0; + virtual void unlock() = 0; + virtual ~OrderLocker() {} + }; + + // Abstraction for operations queueable in the op queue + class OpQueueable { + public: + enum class op_type_t { + client_op, + peering_event, + bg_snaptrim, + bg_recovery, + bg_scrub, + bg_pg_delete + }; + using Ref = std::unique_ptr; + + /// Items with the same queue token will end up in the same shard + virtual uint32_t get_queue_token() const = 0; + + /* Items will be dequeued and locked atomically w.r.t. other items with the + * same ordering token */ + virtual const spg_t& get_ordering_token() const = 0; + virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0; + virtual op_type_t get_op_type() const = 0; + virtual std::optional maybe_get_op() const { + return std::nullopt; + } + + virtual uint64_t get_reserved_pushes() const { + return 0; + } + + virtual bool is_peering() const { + return false; + } + virtual bool peering_requires_pg() const { + ceph_abort(); + } + virtual const PGCreateInfo *creates_pg() const { + return nullptr; + } + + virtual ostream &print(ostream &rhs) const = 0; + + virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0; + virtual op_scheduler_class get_scheduler_class() const = 0; + + virtual std::optional + get_mclock_profile_params() const { + return std::nullopt; + } + + virtual std::optional + get_dmclock_request_state() const { + return std::nullopt; + } + + virtual ~OpQueueable() {} + friend ostream& operator<<(ostream& out, const OpQueueable& q) { + return q.print(out); + } + + }; + +private: + OpQueueable::Ref qitem; + int cost; + unsigned priority; + utime_t start_time; + uint64_t owner; ///< global id (e.g., client.XXX) + epoch_t map_epoch; ///< an epoch we expect the PG to exist in + +public: + OpSchedulerItem( + OpQueueable::Ref &&item, + int cost, + unsigned priority, + utime_t start_time, + uint64_t owner, + epoch_t e) + : qitem(std::move(item)), + cost(cost), + priority(priority), + start_time(start_time), + owner(owner), + map_epoch(e) + {} + OpSchedulerItem(OpSchedulerItem &&) = default; + OpSchedulerItem(const OpSchedulerItem &) = delete; + OpSchedulerItem &operator=(OpSchedulerItem &&) = default; + OpSchedulerItem &operator=(const OpSchedulerItem &) = delete; + + OrderLocker::Ref get_order_locker(PGRef pg) { + return qitem->get_order_locker(pg); + } + uint32_t get_queue_token() const { + return qitem->get_queue_token(); + } + const spg_t& get_ordering_token() const { + return qitem->get_ordering_token(); + } + using op_type_t = OpQueueable::op_type_t; + OpQueueable::op_type_t get_op_type() const { + return qitem->get_op_type(); + } + std::optional maybe_get_op() const { + return qitem->maybe_get_op(); + } + uint64_t get_reserved_pushes() const { + return qitem->get_reserved_pushes(); + } + void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) { + qitem->run(osd, sdata, pg, handle); + } + unsigned get_priority() const { return priority; } + int get_cost() const { return cost; } + utime_t get_start_time() const { return start_time; } + uint64_t get_owner() const { return owner; } + epoch_t get_map_epoch() const { return map_epoch; } + + auto get_mclock_profile_params() const { + return qitem->get_mclock_profile_params(); + } + auto get_dmclock_request_state() const { + return qitem->get_dmclock_request_state(); + } + + bool is_peering() const { + return qitem->is_peering(); + } + + const PGCreateInfo *creates_pg() const { + return qitem->creates_pg(); + } + + bool peering_requires_pg() const { + return qitem->peering_requires_pg(); + } + + op_scheduler_class get_scheduler_class() const { + return qitem->get_scheduler_class(); + } + + friend ostream& operator<<(ostream& out, const OpSchedulerItem& item) { + out << "OpSchedulerItem(" + << item.get_ordering_token() << " " << *item.qitem + << " prio " << item.get_priority() + << " cost " << item.get_cost() + << " e" << item.get_map_epoch(); + if (item.get_reserved_pushes()) { + out << " reserved_pushes " << item.get_reserved_pushes(); + } + return out << ")"; + } +}; // class OpSchedulerItem + +/// Implements boilerplate for operations queued for the pg lock +class PGOpQueueable : public OpSchedulerItem::OpQueueable { + spg_t pgid; +protected: + const spg_t& get_pgid() const { + return pgid; + } +public: + explicit PGOpQueueable(spg_t pg) : pgid(pg) {} + uint32_t get_queue_token() const final { + return get_pgid().ps(); + } + + const spg_t& get_ordering_token() const final { + return get_pgid(); + } + + OpSchedulerItem::OrderLocker::Ref get_order_locker(PGRef pg) final { + class Locker : public OpSchedulerItem::OrderLocker { + PGRef pg; + public: + explicit Locker(PGRef pg) : pg(pg) {} + void lock() final { + pg->lock(); + } + void unlock() final { + pg->unlock(); + } + }; + return OpSchedulerItem::OrderLocker::Ref( + new Locker(pg)); + } +}; + +class PGOpItem : public PGOpQueueable { + OpRequestRef op; + + const MOSDOp *maybe_get_mosd_op() const { + auto req = op->get_req(); + if (req->get_type() == CEPH_MSG_OSD_OP) { + return op->get_req(); + } else { + return nullptr; + } + } + +public: + PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {} + op_type_t get_op_type() const final { + + return op_type_t::client_op; + } + + ostream &print(ostream &rhs) const final { + return rhs << "PGOpItem(op=" << *(op->get_req()) << ")"; + } + + std::optional maybe_get_op() const final { + return op; + } + + op_scheduler_class get_scheduler_class() const final { + if (maybe_get_mosd_op()) { + return op_scheduler_class::client; + } else { + return op_scheduler_class::immediate; + } + } + + std::optional + get_mclock_profile_params() const final { + auto op = maybe_get_mosd_op(); + if (!op) + return std::nullopt; + + return op->get_mclock_profile_params(); + } + + std::optional + get_dmclock_request_state() const final { + auto op = maybe_get_mosd_op(); + if (!op) + return std::nullopt; + + return op->get_dmclock_request_state(); + } + + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; +}; + +class PGPeeringItem : public PGOpQueueable { + PGPeeringEventRef evt; +public: + PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {} + op_type_t get_op_type() const final { + return op_type_t::peering_event; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGPeeringEvent(" << evt->get_desc() << ")"; + } + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + bool is_peering() const override { + return true; + } + bool peering_requires_pg() const override { + return evt->requires_pg; + } + const PGCreateInfo *creates_pg() const override { + return evt->create_info.get(); + } + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::immediate; + } +}; + +class PGSnapTrim : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGSnapTrim( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_snaptrim; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGSnapTrim(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +class PGScrub : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGScrub( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_scrub; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGScrub(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +class PGRecovery : public PGOpQueueable { + epoch_t epoch_queued; + uint64_t reserved_pushes; +public: + PGRecovery( + spg_t pg, + epoch_t epoch_queued, + uint64_t reserved_pushes) + : PGOpQueueable(pg), + epoch_queued(epoch_queued), + reserved_pushes(reserved_pushes) {} + op_type_t get_op_type() const final { + return op_type_t::bg_recovery; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGRecovery(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << "reserved_pushes=" << reserved_pushes + << ")"; + } + uint64_t get_reserved_pushes() const final { + return reserved_pushes; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_recovery; + } +}; + +class PGRecoveryContext : public PGOpQueueable { + unique_ptr> c; + epoch_t epoch; +public: + PGRecoveryContext(spg_t pgid, + GenContext *c, epoch_t epoch) + : PGOpQueueable(pgid), + c(c), epoch(epoch) {} + op_type_t get_op_type() const final { + return op_type_t::bg_recovery; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGRecoveryContext(pgid=" << get_pgid() + << " c=" << c.get() << " epoch=" << epoch + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_recovery; + } +}; + +class PGDelete : public PGOpQueueable { + epoch_t epoch_queued; +public: + PGDelete( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), + epoch_queued(epoch_queued) {} + op_type_t get_op_type() const final { + return op_type_t::bg_pg_delete; + } + ostream &print(ostream &rhs) const final { + return rhs << "PGDelete(" << get_pgid() + << " e" << epoch_queued + << ")"; + } + void run( + OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final; + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::background_best_effort; + } +}; + +} diff --git a/src/test/osd/TestMClockClientQueue.cc b/src/test/osd/TestMClockClientQueue.cc index 70e054c7b93..157f32fb708 100644 --- a/src/test/osd/TestMClockClientQueue.cc +++ b/src/test/osd/TestMClockClientQueue.cc @@ -8,6 +8,7 @@ #include "osd/mClockClientQueue.h" +using namespace ceph::osd::scheduler; int main(int argc, char **argv) { std::vector args(argv, argv+argc); @@ -36,26 +37,59 @@ public: client3(100000001) {} -#if 0 // more work needed here - Request create_client_op(epoch_t e, uint64_t owner) { - return Request(spg_t(), OpQueueItem(OpRequestRef(), e)); + struct MockDmclockItem : public PGOpQueueable { + ceph::qos::dmclock_request_t request; + MockDmclockItem(decltype(request) _request) : + PGOpQueueable(spg_t()), request(_request) {} + +public: + op_type_t get_op_type() const final { + return op_type_t::client_op; + } + + ostream &print(ostream &rhs) const final { return rhs; } + + std::optional maybe_get_op() const final { + return std::nullopt; + } + + op_scheduler_class get_scheduler_class() const final { + return op_scheduler_class::client; + } + + std::optional + get_dmclock_request_state() const final { + return request; + } + + void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final {} + }; + + template + Request create_dmclock(epoch_t e, uint64_t owner, Args... args) { + return Request( + OpSchedulerItem( + unique_ptr( + new MockDmclockItem( + std::forward(args)...)), + 12, 12, + utime_t(), owner, e)); } -#endif Request create_snaptrim(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGSnapTrim(spg_t(), e)), + return Request(OpSchedulerItem(unique_ptr(new PGSnapTrim(spg_t(), e)), 12, 12, utime_t(), owner, e)); } Request create_scrub(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGScrub(spg_t(), e)), + return Request(OpSchedulerItem(unique_ptr(new PGScrub(spg_t(), e)), 12, 12, utime_t(), owner, e)); } Request create_recovery(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGRecovery(spg_t(), e, 64)), + return Request(OpSchedulerItem(unique_ptr(new PGRecovery(spg_t(), e, 64)), 12, 12, utime_t(), owner, e)); } @@ -131,6 +165,36 @@ TEST_F(MClockClientQueueTest, TestEnqueue) { } +TEST_F(MClockClientQueueTest, TestDistributedEnqueue) { + Request r1 = create_snaptrim(100, client1); + Request r2 = create_snaptrim(101, client2); + Request r3 = create_snaptrim(102, client3); + Request r4 = create_dmclock(103, client1, dmc::ReqParams(50,1)); + Request r5 = create_dmclock(104, client2, dmc::ReqParams(30,1)); + Request r6 = create_dmclock(105, client3, dmc::ReqParams(10,1)); + + q.enqueue(client1, 12, 0, std::move(r1)); + q.enqueue(client2, 12, 0, std::move(r2)); + q.enqueue(client3, 12, 0, std::move(r3)); + q.enqueue(client1, 12, 0, std::move(r4)); + q.enqueue(client2, 12, 0, std::move(r5)); + q.enqueue(client3, 12, 0, std::move(r6)); + + Request r = q.dequeue(); + r = q.dequeue(); + r = q.dequeue(); + + r = q.dequeue(); + ASSERT_EQ(105u, r.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(104u, r.get_map_epoch()); + + r = q.dequeue(); + ASSERT_EQ(103u, r.get_map_epoch()); +} + + TEST_F(MClockClientQueueTest, TestEnqueueStrict) { q.enqueue_strict(client1, 12, create_snaptrim(100, client1)); q.enqueue_strict(client2, 13, create_snaptrim(101, client2)); diff --git a/src/test/osd/TestMClockOpClassQueue.cc b/src/test/osd/TestMClockOpClassQueue.cc index 0f6b564ab50..b9da8379fb5 100644 --- a/src/test/osd/TestMClockOpClassQueue.cc +++ b/src/test/osd/TestMClockOpClassQueue.cc @@ -10,6 +10,7 @@ #include "osd/mClockOpClassQueue.h" +using namespace ceph::osd::scheduler; int main(int argc, char **argv) { std::vector args(argv, argv+argc); @@ -40,24 +41,24 @@ public: #if 0 // more work needed here Request create_client_op(epoch_t e, uint64_t owner) { - return Request(spg_t(), OpQueueItem(OpRequestRef(), e)); + return Request(spg_t(), OpSchedulerItem(OpRequestRef(), e)); } #endif Request create_snaptrim(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGSnapTrim(spg_t(), e)), + return Request(OpSchedulerItem(unique_ptr(new PGSnapTrim(spg_t(), e)), 12, 12, utime_t(), owner, e)); } Request create_scrub(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGScrub(spg_t(), e)), + return Request(OpSchedulerItem(unique_ptr(new PGScrub(spg_t(), e)), 12, 12, utime_t(), owner, e)); } Request create_recovery(epoch_t e, uint64_t owner) { - return Request(OpQueueItem(unique_ptr(new PGRecovery(spg_t(), e, 64)), + return Request(OpSchedulerItem(unique_ptr(new PGRecovery(spg_t(), e, 64)), 12, 12, utime_t(), owner, e)); }