} // anonymous namespace
-OSD *osd = nullptr;
+OSD *osdptr = nullptr;
void handle_osd_signal(int signum)
{
- if (osd)
- osd->handle_signal(signum);
+ if (osdptr)
+ osdptr->handle_signal(signum);
}
static void usage()
forker.exit(1);
}
- osd = new OSD(g_ceph_context,
- store,
- whoami,
- ms_cluster,
- ms_public,
- ms_hb_front_client,
- ms_hb_back_client,
- ms_hb_front_server,
- ms_hb_back_server,
- ms_objecter,
- &mc,
- data_path,
- journal_path);
-
- int err = osd->pre_init();
+ osdptr = new OSD(g_ceph_context,
+ store,
+ whoami,
+ ms_cluster,
+ ms_public,
+ ms_hb_front_client,
+ ms_hb_back_client,
+ ms_hb_front_server,
+ ms_hb_back_server,
+ ms_objecter,
+ &mc,
+ data_path,
+ journal_path);
+
+ int err = osdptr->pre_init();
if (err < 0) {
derr << TEXT_RED << " ** ERROR: osd pre_init failed: " << cpp_strerror(-err)
<< TEXT_NORMAL << dendl;
ms_objecter->start();
// start osd
- err = osd->init();
+ err = osdptr->init();
if (err < 0) {
derr << TEXT_RED << " ** ERROR: osd init failed: " << cpp_strerror(-err)
<< TEXT_NORMAL << dendl;
register_async_signal_handler_oneshot(SIGINT, handle_osd_signal);
register_async_signal_handler_oneshot(SIGTERM, handle_osd_signal);
- osd->final_init();
+ osdptr->final_init();
if (g_conf().get_val<bool>("inject_early_sigterm"))
kill(getpid(), SIGTERM);
shutdown_async_signal_handler();
// done
- delete osd;
+ delete osdptr;
delete ms_public;
delete ms_hb_front_client;
delete ms_hb_back_client;
// Formatted output of the queue
virtual void dump(ceph::Formatter *f) const = 0;
+  // Human-readable brief description of the queue and relevant parameters
+ virtual void print(std::ostream &f) const = 0;
+
// Don't leak resources on destruction
virtual ~OpQueue() {};
};
}
f->close_section();
}
+
+ void print(std::ostream &ostream) const final {
+ ostream << "PrioritizedQueue";
+ }
};
#endif
normal.dump(f);
f->close_section();
}
+
+ void print(std::ostream &ostream) const final {
+ ostream << "WeightedPriorityQueue";
+ }
};
#endif
f->dump_int("size", queue.request_count());
f->close_section();
} // dump
+
+ void print(std::ostream &os) const final {
+ os << "mClockPriorityQueue";
+ }
};
} // namespace ceph
mClockOpClassSupport.cc
mClockOpClassQueue.cc
mClockClientQueue.cc
- OpQueueItem.cc
+ scheduler/OpScheduler.cc
+ scheduler/OpSchedulerItem.cc
PeeringState.cc
PGStateUtils.cc
MissingLoc.cc
#undef dout_prefix
#define dout_prefix _prefix(_dout, whoami, get_osdmap_epoch())
+using namespace ceph::osd::scheduler;
static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
return *_dout << "osd." << whoami << " " << epoch << " ";
<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch();
}
-void OSDService::enqueue_back(OpQueueItem&& qi)
+void OSDService::enqueue_back(OpSchedulerItem&& qi)
{
osd->op_shardedwq.queue(std::move(qi));
}
-void OSDService::enqueue_front(OpQueueItem&& qi)
+void OSDService::enqueue_front(OpSchedulerItem&& qi)
{
osd->op_shardedwq.queue_front(std::move(qi));
}
{
epoch_t e = get_osdmap_epoch();
enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(
new PGRecoveryContext(pg->get_pgid(), c, e)),
cct->_conf->osd_recovery_cost,
cct->_conf->osd_recovery_priority,
{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(
new PGSnapTrim(pg->get_pgid(), pg->get_osdmap_epoch())),
cct->_conf->osd_snap_trim_cost,
cct->_conf->osd_snap_trim_priority,
}
const auto epoch = pg->get_osdmap_epoch();
enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(pg->get_pgid(), epoch)),
cct->_conf->osd_scrub_cost,
scrub_queue_priority,
ceph_clock_now(),
{
dout(10) << __func__ << " on " << pgid << " e " << e << dendl;
enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(
new PGDelete(pgid, e)),
cct->_conf->osd_pg_delete_cost,
cct->_conf->osd_pg_delete_priority,
{
ceph_assert(ceph_mutex_is_locked_by_me(recovery_lock));
enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(
new PGRecovery(
p.second->get_pgid(), p.first, reserved_pushes)),
cct->_conf->osd_recovery_cost,
op_tracker(cct, cct->_conf->osd_enable_op_tracker,
cct->_conf->osd_num_op_tracker_shard),
test_ops_hook(NULL),
- op_queue(get_io_queue()),
- op_prio_cutoff(get_io_prio_cut()),
op_shardedwq(
this,
cct->_conf->osd_op_thread_timeout,
OSDShard *one_shard = new OSDShard(
i,
cct,
- this,
- cct->_conf->osd_op_pq_max_tokens_per_priority,
- cct->_conf->osd_op_pq_min_cost,
- op_queue);
+ this);
shards.push_back(one_shard);
}
}
load_pgs();
dout(2) << "superblock: I am osd." << superblock.whoami << dendl;
- dout(0) << "using " << op_queue << " op queue with priority op cut off at " <<
- op_prio_cutoff << "." << dendl;
create_logger();
op->mark_queued_for_pg();
logger->tinc(l_osd_op_before_queue_op_lat, latency);
op_shardedwq.queue(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(pg, std::move(op))),
cost, priority, stamp, owner, epoch));
}
{
dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl;
op_shardedwq.queue(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
10,
cct->_conf->osd_peering_op_priority,
utime_t(),
{
dout(15) << __func__ << " " << pgid << " " << evt->get_desc() << dendl;
op_shardedwq.queue_front(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGPeeringItem(pgid, evt)),
10,
cct->_conf->osd_peering_op_priority,
utime_t(),
for (auto i = slot->to_process.rbegin();
i != slot->to_process.rend();
++i) {
- _enqueue_front(std::move(*i), osd->op_prio_cutoff);
+ scheduler->enqueue_front(std::move(*i));
}
slot->to_process.clear();
for (auto i = slot->waiting.rbegin();
i != slot->waiting.rend();
++i) {
- _enqueue_front(std::move(*i), osd->op_prio_cutoff);
+ scheduler->enqueue_front(std::move(*i));
}
slot->waiting.clear();
for (auto i = slot->waiting_peering.rbegin();
// items are waiting for maps we don't have yet. FIXME, maybe,
// someday, if we decide this inefficiency matters
for (auto j = i->second.rbegin(); j != i->second.rend(); ++j) {
- _enqueue_front(std::move(*j), osd->op_prio_cutoff);
+ scheduler->enqueue_front(std::move(*j));
}
}
slot->waiting_peering.clear();
}
}
+OSDShard::OSDShard(
+ int id,
+ CephContext *cct,
+ OSD *osd)
+ : shard_id(id),
+ cct(cct),
+ osd(osd),
+ shard_name(string("OSDShard.") + stringify(id)),
+ sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
+ sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
+ osdmap_lock_name(shard_name + "::osdmap_lock"),
+ osdmap_lock{make_mutex(osdmap_lock_name)},
+ shard_lock_name(shard_name + "::shard_lock"),
+ shard_lock{make_mutex(shard_lock_name)},
+ scheduler(ceph::osd::scheduler::make_scheduler(cct)),
+ context_queue(sdata_wait_lock, sdata_cond)
+{
+ dout(0) << "using op scheduler " << *scheduler << dendl;
+}
+
// =============================================================
void OSD::ShardedOpWQ::_add_slot_waiter(
spg_t pgid,
OSDShardPGSlot *slot,
- OpQueueItem&& qi)
+ OpSchedulerItem&& qi)
{
if (qi.is_peering()) {
dout(20) << __func__ << " " << pgid
// peek at spg_t
sdata->shard_lock.lock();
- if (sdata->pqueue->empty() &&
+ if (sdata->scheduler->empty() &&
(!is_smallest_thread_index || sdata->context_queue.empty())) {
std::unique_lock wait_lock{sdata->sdata_wait_lock};
if (is_smallest_thread_index && !sdata->context_queue.empty()) {
sdata->sdata_cond.wait(wait_lock);
wait_lock.unlock();
sdata->shard_lock.lock();
- if (sdata->pqueue->empty() &&
+ if (sdata->scheduler->empty() &&
!(is_smallest_thread_index && !sdata->context_queue.empty())) {
sdata->shard_lock.unlock();
return;
sdata->context_queue.move_to(oncommits);
}
- if (sdata->pqueue->empty()) {
+ if (sdata->scheduler->empty()) {
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
return;
}
- OpQueueItem item = sdata->pqueue->dequeue();
+ OpSchedulerItem item = sdata->scheduler->dequeue();
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
handle_oncommits(oncommits);
}
-void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
+void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) {
uint32_t shard_index =
item.get_ordering_token().hash_to_shard(osd->shards.size());
+ dout(20) << __func__ << " " << item << dendl;
+
OSDShard* sdata = osd->shards[shard_index];
assert (NULL != sdata);
- unsigned priority = item.get_priority();
- unsigned cost = item.get_cost();
- sdata->shard_lock.lock();
- dout(20) << __func__ << " " << item << dendl;
- bool empty = sdata->pqueue->empty();
- if (priority >= osd->op_prio_cutoff)
- sdata->pqueue->enqueue_strict(
- item.get_owner(), priority, std::move(item));
- else
- sdata->pqueue->enqueue(
- item.get_owner(), priority, cost, std::move(item));
- sdata->shard_lock.unlock();
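+  // note whether the scheduler was empty before this enqueue; only the
+  // empty->non-empty transition needs to wake a sleeping worker below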
+ bool empty = true;
+ {
+ std::lock_guard l{sdata->shard_lock};
+ empty = sdata->scheduler->empty();
+ sdata->scheduler->enqueue(std::move(item));
+ }
  if (empty) {
    std::lock_guard l{sdata->sdata_wait_lock};
    sdata->sdata_cond.notify_one();
  }
}
-void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
+void OSD::ShardedOpWQ::_enqueue_front(OpSchedulerItem&& item)
{
auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
auto& sdata = osd->shards[shard_index];
if (p != sdata->pg_slots.end() &&
!p->second->to_process.empty()) {
// we may be racing with _process, which has dequeued a new item
- // from pqueue, put it on to_process, and is now busy taking the
+ // from scheduler, put it on to_process, and is now busy taking the
// pg lock. ensure this old requeued item is ordered before any
// such newer item in to_process.
p->second->to_process.push_front(std::move(item));
} else {
dout(20) << __func__ << " " << item << dendl;
}
- sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff);
+ sdata->scheduler->enqueue_front(std::move(item));
sdata->shard_lock.unlock();
std::lock_guard l{sdata->sdata_wait_lock};
sdata->sdata_cond.notify_one();
}
}} // namespace ceph::osd_cmds
-
-
-std::ostream& operator<<(std::ostream& out, const io_queue& q) {
- switch(q) {
- case io_queue::prioritized:
- out << "prioritized";
- break;
- case io_queue::weightedpriority:
- out << "weightedpriority";
- break;
- case io_queue::mclock_opclass:
- out << "mclock_opclass";
- break;
- case io_queue::mclock_client:
- out << "mclock_client";
- break;
- }
- return out;
-}
#include "OpRequest.h"
#include "Session.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpScheduler.h"
#include <atomic>
#include <map>
#include "common/sharedptr_registry.hpp"
#include "common/WeightedPriorityQueue.h"
#include "common/PrioritizedQueue.h"
-#include "osd/mClockOpClassQueue.h"
-#include "osd/mClockClientQueue.h"
#include "messages/MOSDOp.h"
#include "common/EventTrace.h"
#include "osd/osd_perf_counters.h"
class OSD;
class OSDService {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
public:
OSD *osd;
CephContext *cct;
md_config_cacher_t<Option::size_t> osd_max_object_size;
md_config_cacher_t<bool> osd_skip_data_digest;
- void enqueue_back(OpQueueItem&& qi);
- void enqueue_front(OpQueueItem&& qi);
+ void enqueue_back(OpSchedulerItem&& qi);
+ void enqueue_front(OpSchedulerItem&& qi);
void maybe_inject_dispatch_delay() {
if (g_conf()->osd_debug_inject_dispatch_delay_probability > 0) {
~OSDService();
};
-
-enum class io_queue {
- prioritized,
- weightedpriority,
- mclock_opclass,
- mclock_client,
-};
-
-
/*
Each PG slot includes queues for events that are processing and/or waiting
don't affect the given PG.)
- we maintain two separate wait lists, *waiting* and *waiting_peering*. The
- OpQueueItem has an is_peering() bool to determine which we use. Waiting
+ OpSchedulerItem has an is_peering() bool to determine which we use. Waiting
peering events are queued up by epoch required.
- when we wake a PG slot (e.g., we finished split, or got a newer osdmap, or
*/
struct OSDShardPGSlot {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
PGRef pg; ///< pg reference
- deque<OpQueueItem> to_process; ///< order items for this slot
+ deque<OpSchedulerItem> to_process; ///< order items for this slot
int num_running = 0; ///< _process threads doing pg lookup/lock
- deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
+ deque<OpSchedulerItem> waiting; ///< waiting for pg (or map + pg)
/// waiting for map (peering evt)
- map<epoch_t,deque<OpQueueItem>> waiting_peering;
+ map<epoch_t,deque<OpSchedulerItem>> waiting_peering;
/// incremented by wake_pg_waiters; indicates racing _process threads
/// should bail out (their op has been requeued)
ceph::mutex shard_lock; ///< protects remaining members below
/// map of slots for each spg_t. maintains ordering of items dequeued
- /// from pqueue while _process thread drops shard lock to acquire the
+ /// from scheduler while _process thread drops shard lock to acquire the
/// pg lock. stale slots are removed by consume_map.
unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
ceph::condition_variable min_pg_epoch_cond;
-  /// priority queue
-  std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+  /// scheduler that orders ops within this shard
+  ceph::osd::scheduler::OpSchedulerRef scheduler;
bool stop_waiting = false;
ContextQueue context_queue;
- void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
- unsigned priority = item.get_priority();
- unsigned cost = item.get_cost();
- if (priority >= cutoff)
- pqueue->enqueue_strict_front(
- item.get_owner(),
- priority, std::move(item));
- else
- pqueue->enqueue_front(
- item.get_owner(),
- priority, cost, std::move(item));
- }
-
void _attach_pg(OSDShardPGSlot *slot, PG *pg);
void _detach_pg(OSDShardPGSlot *slot);
OSDShard(
int id,
CephContext *cct,
- OSD *osd,
- uint64_t max_tok_per_prio, uint64_t min_cost,
- io_queue opqueue)
- : shard_id(id),
- cct(cct),
- osd(osd),
- shard_name(string("OSDShard.") + stringify(id)),
- sdata_wait_lock_name(shard_name + "::sdata_wait_lock"),
- sdata_wait_lock{make_mutex(sdata_wait_lock_name)},
- osdmap_lock_name(shard_name + "::osdmap_lock"),
- osdmap_lock{make_mutex(osdmap_lock_name)},
- shard_lock_name(shard_name + "::shard_lock"),
- shard_lock{make_mutex(shard_lock_name)},
- context_queue(sdata_wait_lock, sdata_cond) {
- if (opqueue == io_queue::weightedpriority) {
- pqueue = std::make_unique<
- WeightedPriorityQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::prioritized) {
- pqueue = std::make_unique<
- PrioritizedQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::mclock_opclass) {
- pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
- } else if (opqueue == io_queue::mclock_client) {
- pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
- }
- }
+ OSD *osd);
};
class OSD : public Dispatcher,
public md_config_obs_t {
+ using OpSchedulerItem = ceph::osd::scheduler::OpSchedulerItem;
+
/** OSD **/
// global lock
ceph::mutex osd_lock = ceph::make_mutex("OSD::osd_lock");
friend struct C_FinishSplits;
friend struct C_OpenPGs;
- // -- op queue --
- friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
- const io_queue op_queue;
-public:
- const unsigned int op_prio_cutoff;
protected:
/*
* The ordered op delivery chain is:
*
- * fast dispatch -> pqueue back
- * pqueue front <-> to_process back
+ * fast dispatch -> scheduler back
+ * scheduler front <-> to_process back
* to_process front -> RunVis(item)
* <- queue_front()
*
- * The pqueue is per-shard, and to_process is per pg_slot. Items can be
- * pushed back up into to_process and/or pqueue while order is preserved.
+ * The scheduler is per-shard, and to_process is per pg_slot. Items can be
+ * pushed back up into to_process and/or scheduler while order is preserved.
*
* Multiple worker threads can operate on each shard.
*
* wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg
* and already requeued the items.
*/
- friend class PGOpItem;
- friend class PGPeeringItem;
- friend class PGRecovery;
- friend class PGDelete;
+ friend class ceph::osd::scheduler::PGOpItem;
+ friend class ceph::osd::scheduler::PGPeeringItem;
+ friend class ceph::osd::scheduler::PGRecovery;
+ friend class ceph::osd::scheduler::PGDelete;
class ShardedOpWQ
- : public ShardedThreadPool::ShardedWQ<OpQueueItem>
+ : public ShardedThreadPool::ShardedWQ<OpSchedulerItem>
{
OSD *osd;
time_t ti,
time_t si,
ShardedThreadPool* tp)
- : ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
+ : ShardedThreadPool::ShardedWQ<OpSchedulerItem>(ti, si, tp),
osd(o) {
}
void _add_slot_waiter(
spg_t token,
OSDShardPGSlot *slot,
- OpQueueItem&& qi);
+ OpSchedulerItem&& qi);
/// try to do some work
void _process(uint32_t thread_index, heartbeat_handle_d *hb) override;
/// enqueue a new item
- void _enqueue(OpQueueItem&& item) override;
+ void _enqueue(OpSchedulerItem&& item) override;
/// requeue an old item (at the front of the line)
- void _enqueue_front(OpQueueItem&& item) override;
+ void _enqueue_front(OpSchedulerItem&& item) override;
void return_waiting_threads() override {
for(uint32_t i = 0; i < osd->num_shards; i++) {
std::scoped_lock l{sdata->shard_lock};
f->open_object_section(queue_name);
- sdata->pqueue->dump(f);
+ sdata->scheduler->dump(*f);
f->close_section();
}
}
ceph_assert(sdata);
std::lock_guard l(sdata->shard_lock);
if (thread_index < osd->num_shards) {
- return sdata->pqueue->empty() && sdata->context_queue.empty();
+ return sdata->scheduler->empty() && sdata->context_queue.empty();
} else {
- return sdata->pqueue->empty();
+ return sdata->scheduler->empty();
}
}
void ms_handle_remote_reset(Connection *con) override {}
bool ms_handle_refused(Connection *con) override;
- io_queue get_io_queue() const {
- if (cct->_conf->osd_op_queue == "debug_random") {
- static io_queue index_lookup[] = { io_queue::prioritized,
- io_queue::weightedpriority,
- io_queue::mclock_opclass,
- io_queue::mclock_client };
- srand(time(NULL));
- unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
- return index_lookup[which];
- } else if (cct->_conf->osd_op_queue == "prioritized") {
- return io_queue::prioritized;
- } else if (cct->_conf->osd_op_queue == "mclock_opclass") {
- return io_queue::mclock_opclass;
- } else if (cct->_conf->osd_op_queue == "mclock_client") {
- return io_queue::mclock_client;
- } else {
- // default / catch-all is 'wpq'
- return io_queue::weightedpriority;
- }
- }
-
- unsigned int get_io_prio_cut() const {
- if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
- srand(time(NULL));
- return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
- } else if (cct->_conf->osd_op_queue_cut_off == "high") {
- return CEPH_MSG_PRIO_HIGH;
- } else {
- // default / catch-all is 'low'
- return CEPH_MSG_PRIO_LOW;
- }
- }
-
public:
/* internal and external can point to the same messenger, they will still
* be cleaned up properly*/
};
-std::ostream& operator<<(std::ostream& out, const io_queue& q);
-
-
//compatibility of the executable
extern const CompatSet::Feature ceph_osd_feature_compat[];
extern const CompatSet::Feature ceph_osd_feature_ro_compat[];
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 Red Hat Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "OpQueueItem.h"
-#include "OSD.h"
-
-void PGOpItem::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- osd->dequeue_op(pg, op, handle);
- pg->unlock();
-}
-
-void PGPeeringItem::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
-}
-
-void PGSnapTrim::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- pg->snap_trimmer(epoch_queued);
- pg->unlock();
-}
-
-void PGScrub::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- pg->scrub(epoch_queued, handle);
- pg->unlock();
-}
-
-void PGRecovery::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
- pg->unlock();
-}
-
-void PGRecoveryContext::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- c.release()->complete(handle);
- pg->unlock();
-}
-
-void PGDelete::run(
- OSD *osd,
- OSDShard *sdata,
- PGRef& pg,
- ThreadPool::TPHandle &handle)
-{
- osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2016 Red Hat Inc.
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#pragma once
-
-#include <ostream>
-
-#include "include/types.h"
-#include "include/utime.h"
-#include "osd/OpRequest.h"
-#include "osd/PG.h"
-#include "PGPeeringEvent.h"
-
-class OSD;
-class OSDShard;
-
-class OpQueueItem {
-public:
- class OrderLocker {
- public:
- using Ref = unique_ptr<OrderLocker>;
- virtual void lock() = 0;
- virtual void unlock() = 0;
- virtual ~OrderLocker() {}
- };
- // Abstraction for operations queueable in the op queue
- class OpQueueable {
- public:
- enum class op_type_t {
- client_op,
- peering_event,
- bg_snaptrim,
- bg_recovery,
- bg_scrub,
- bg_pg_delete
- };
- using Ref = std::unique_ptr<OpQueueable>;
-
- /// Items with the same queue token will end up in the same shard
- virtual uint32_t get_queue_token() const = 0;
-
- /* Items will be dequeued and locked atomically w.r.t. other items with the
- * same ordering token */
- virtual const spg_t& get_ordering_token() const = 0;
- virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
- virtual op_type_t get_op_type() const = 0;
- virtual std::optional<OpRequestRef> maybe_get_op() const {
- return std::nullopt;
- }
-
- virtual uint64_t get_reserved_pushes() const {
- return 0;
- }
-
- virtual bool is_peering() const {
- return false;
- }
- virtual bool peering_requires_pg() const {
- ceph_abort();
- }
- virtual const PGCreateInfo *creates_pg() const {
- return nullptr;
- }
-
- virtual ostream &print(ostream &rhs) const = 0;
-
- virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
- virtual ~OpQueueable() {}
- friend ostream& operator<<(ostream& out, const OpQueueable& q) {
- return q.print(out);
- }
-
- };
-
-private:
- OpQueueable::Ref qitem;
- int cost;
- unsigned priority;
- utime_t start_time;
- uint64_t owner; ///< global id (e.g., client.XXX)
- epoch_t map_epoch; ///< an epoch we expect the PG to exist in
-
-public:
- OpQueueItem(
- OpQueueable::Ref &&item,
- int cost,
- unsigned priority,
- utime_t start_time,
- uint64_t owner,
- epoch_t e)
- : qitem(std::move(item)),
- cost(cost),
- priority(priority),
- start_time(start_time),
- owner(owner),
- map_epoch(e)
- {}
- OpQueueItem(OpQueueItem &&) = default;
- OpQueueItem(const OpQueueItem &) = delete;
- OpQueueItem &operator=(OpQueueItem &&) = default;
- OpQueueItem &operator=(const OpQueueItem &) = delete;
-
- OrderLocker::Ref get_order_locker(PGRef pg) {
- return qitem->get_order_locker(pg);
- }
- uint32_t get_queue_token() const {
- return qitem->get_queue_token();
- }
- const spg_t& get_ordering_token() const {
- return qitem->get_ordering_token();
- }
- using op_type_t = OpQueueable::op_type_t;
- OpQueueable::op_type_t get_op_type() const {
- return qitem->get_op_type();
- }
- std::optional<OpRequestRef> maybe_get_op() const {
- return qitem->maybe_get_op();
- }
- uint64_t get_reserved_pushes() const {
- return qitem->get_reserved_pushes();
- }
- void run(OSD *osd, OSDShard *sdata,PGRef& pg, ThreadPool::TPHandle &handle) {
- qitem->run(osd, sdata, pg, handle);
- }
- unsigned get_priority() const { return priority; }
- int get_cost() const { return cost; }
- utime_t get_start_time() const { return start_time; }
- uint64_t get_owner() const { return owner; }
- epoch_t get_map_epoch() const { return map_epoch; }
-
- bool is_peering() const {
- return qitem->is_peering();
- }
-
- const PGCreateInfo *creates_pg() const {
- return qitem->creates_pg();
- }
-
- bool peering_requires_pg() const {
- return qitem->peering_requires_pg();
- }
-
- friend ostream& operator<<(ostream& out, const OpQueueItem& item) {
- out << "OpQueueItem("
- << item.get_ordering_token() << " " << *item.qitem
- << " prio " << item.get_priority()
- << " cost " << item.get_cost()
- << " e" << item.get_map_epoch();
- if (item.get_reserved_pushes()) {
- out << " reserved_pushes " << item.get_reserved_pushes();
- }
- return out << ")";
- }
-}; // class OpQueueItem
-
-/// Implements boilerplate for operations queued for the pg lock
-class PGOpQueueable : public OpQueueItem::OpQueueable {
- spg_t pgid;
-protected:
- const spg_t& get_pgid() const {
- return pgid;
- }
-public:
- explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
- uint32_t get_queue_token() const override final {
- return get_pgid().ps();
- }
-
- const spg_t& get_ordering_token() const override final {
- return get_pgid();
- }
-
- OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final {
- class Locker : public OpQueueItem::OrderLocker {
- PGRef pg;
- public:
- explicit Locker(PGRef pg) : pg(pg) {}
- void lock() override final {
- pg->lock();
- }
- void unlock() override final {
- pg->unlock();
- }
- };
- return OpQueueItem::OrderLocker::Ref(
- new Locker(pg));
- }
-};
-
-class PGOpItem : public PGOpQueueable {
- OpRequestRef op;
-public:
- PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
- op_type_t get_op_type() const override final {
- return op_type_t::client_op;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
- }
- std::optional<OpRequestRef> maybe_get_op() const override final {
- return op;
- }
- void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGPeeringItem : public PGOpQueueable {
- PGPeeringEventRef evt;
-public:
- PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
- op_type_t get_op_type() const override final {
- return op_type_t::peering_event;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
- }
- void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
- bool is_peering() const override {
- return true;
- }
- bool peering_requires_pg() const override {
- return evt->requires_pg;
- }
- const PGCreateInfo *creates_pg() const override {
- return evt->create_info.get();
- }
-};
-
-class PGSnapTrim : public PGOpQueueable {
- epoch_t epoch_queued;
-public:
- PGSnapTrim(
- spg_t pg,
- epoch_t epoch_queued)
- : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
- op_type_t get_op_type() const override final {
- return op_type_t::bg_snaptrim;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGSnapTrim(pgid=" << get_pgid()
- << "epoch_queued=" << epoch_queued
- << ")";
- }
- void run(
- OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGScrub : public PGOpQueueable {
- epoch_t epoch_queued;
-public:
- PGScrub(
- spg_t pg,
- epoch_t epoch_queued)
- : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
- op_type_t get_op_type() const override final {
- return op_type_t::bg_scrub;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGScrub(pgid=" << get_pgid()
- << "epoch_queued=" << epoch_queued
- << ")";
- }
- void run(
- OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGRecovery : public PGOpQueueable {
- epoch_t epoch_queued;
- uint64_t reserved_pushes;
-public:
- PGRecovery(
- spg_t pg,
- epoch_t epoch_queued,
- uint64_t reserved_pushes)
- : PGOpQueueable(pg),
- epoch_queued(epoch_queued),
- reserved_pushes(reserved_pushes) {}
- op_type_t get_op_type() const override final {
- return op_type_t::bg_recovery;
- }
- virtual ostream &print(ostream &rhs) const override final {
- return rhs << "PGRecovery(pgid=" << get_pgid()
- << "epoch_queued=" << epoch_queued
- << "reserved_pushes=" << reserved_pushes
- << ")";
- }
- virtual uint64_t get_reserved_pushes() const override final {
- return reserved_pushes;
- }
- virtual void run(
- OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGRecoveryContext : public PGOpQueueable {
- unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
- epoch_t epoch;
-public:
- PGRecoveryContext(spg_t pgid,
- GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
- : PGOpQueueable(pgid),
- c(c), epoch(epoch) {}
- op_type_t get_op_type() const override final {
- return op_type_t::bg_recovery;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGRecoveryContext(pgid=" << get_pgid()
- << " c=" << c.get() << " epoch=" << epoch
- << ")";
- }
- void run(
- OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
-
-class PGDelete : public PGOpQueueable {
- epoch_t epoch_queued;
-public:
- PGDelete(
- spg_t pg,
- epoch_t epoch_queued)
- : PGOpQueueable(pg),
- epoch_queued(epoch_queued) {}
- op_type_t get_op_type() const override final {
- return op_type_t::bg_pg_delete;
- }
- ostream &print(ostream &rhs) const override final {
- return rhs << "PGDelete(" << get_pgid()
- << " e" << epoch_queued
- << ")";
- }
- void run(
- OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) override final;
-};
#include "OpRequest.h"
#include "ScrubStore.h"
#include "Session.h"
+#include "osd/scheduler/OpSchedulerItem.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#undef dout_prefix
#define dout_prefix _prefix(_dout, this)
+using namespace ceph::osd::scheduler;
+
template <class T>
static ostream& _prefix(std::ostream *_dout, T *t)
{
} else {
dout(20) << __func__ << " " << op << dendl;
osd->enqueue_front(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
op->get_req()->get_cost(),
op->get_req()->get_priority(),
op->get_req()->get_recv_stamp(),
dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
auto req = *q;
- osd->enqueue_front(OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, req)),
+ osd->enqueue_front(OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, req)),
req->get_req()->get_cost(),
req->get_req()->get_priority(),
req->get_req()->get_recv_stamp(),
MEMPOOL_DEFINE_OBJECT_FACTORY(PrimaryLogPG, replicatedpg, osd);
+using namespace ceph::osd::scheduler;
+
/**
* The CopyCallback class defines an interface for completions to the
* copy_start code. Users of the copy infrastructure must implement
scrubber.active_rep_scrub->get_req())->chunky) {
auto& op = scrubber.active_rep_scrub;
osd->enqueue_back(
- OpQueueItem(
- unique_ptr<OpQueueItem::OpQueueable>(new PGOpItem(info.pgid, op)),
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(new PGOpItem(info.pgid, op)),
op->get_req()->get_cost(),
op->get_req()->get_priority(),
op->get_req()->get_recv_stamp(),
#include "common/config.h"
#include "common/ceph_context.h"
#include "common/mClockPriorityQueue.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
#include "osd/mClockOpClassSupport.h"
namespace ceph {
- using Request = OpQueueItem;
+ using Request = ceph::osd::scheduler::OpSchedulerItem;
using Client = uint64_t;
// This class exists to bridge the ceph code, which treats the class
// Formatted output of the queue
void dump(ceph::Formatter *f) const override final;
+ void print(std::ostream &ostream) const final {
+ ostream << "mClockClientQueue";
+ }
+
protected:
InnerClient get_inner_client(const Client& cl, const Request& request);
- }; // class mClockClientAdapter
+ }; // class mClockClientQueue
} // namespace ceph
#include "common/config.h"
#include "common/ceph_context.h"
#include "common/mClockPriorityQueue.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
#include "osd/mClockOpClassSupport.h"
namespace ceph {
- using Request = OpQueueItem;
+ using Request = ceph::osd::scheduler::OpSchedulerItem;
using Client = uint64_t;
// This class exists to bridge the ceph code, which treats the class
// Formatted output of the queue
void dump(ceph::Formatter *f) const override final;
- }; // class mClockOpClassAdapter
+
+ void print(std::ostream &ostream) const final {
+ ostream << "mClockOpClassQueue";
+ }
+ }; // class mClockOpClassQueue
} // namespace ceph
*/
-#include "common/dout.h"
#include "osd/mClockOpClassSupport.h"
-#include "osd/OpQueueItem.h"
-
+#include "common/dout.h"
#include "include/ceph_assert.h"
namespace ceph {
}
osd_op_type_t
- OpClassClientInfoMgr::osd_op_type(const OpQueueItem& op) const {
+ OpClassClientInfoMgr::osd_op_type(
+ const ceph::osd::scheduler::OpSchedulerItem& op) const {
osd_op_type_t type = convert_op_type(op.get_op_type());
if (osd_op_type_t::client_op != type) {
return type;
#include "dmclock/src/dmclock_server.h"
#include "osd/OpRequest.h"
-#include "osd/OpQueueItem.h"
+#include "osd/scheduler/OpSchedulerItem.h"
namespace ceph {
namespace mclock {
- using op_item_type_t = OpQueueItem::OpQueueable::op_type_t;
+ using op_item_type_t =
+ ceph::osd::scheduler::OpSchedulerItem::OpQueueable::op_type_t;
enum class osd_op_type_t {
client_op, osd_rep_op, bg_snaptrim, bg_recovery, bg_scrub, bg_pg_delete,
}
}
- osd_op_type_t osd_op_type(const OpQueueItem&) const;
+ osd_op_type_t osd_op_type(
+ const ceph::osd::scheduler::OpSchedulerItem&) const;
// used for debugging since faster implementation can be done
// with rep_op_msg_bitmap
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <ostream>
+
+#include "osd/scheduler/OpScheduler.h"
+
+#include "common/PrioritizedQueue.h"
+#include "common/WeightedPriorityQueue.h"
+#include "osd/scheduler/mClockScheduler.h"
+#include "osd/mClockClientQueue.h"
+#include "osd/mClockOpClassQueue.h"
+
+namespace ceph::osd::scheduler {
+
+OpSchedulerRef make_scheduler(CephContext *cct)
+{
+ const std::string *type = &cct->_conf->osd_op_queue;
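+  // "debug_random" picks one of the known schedulers at random at startup;
+  // it is intended for debugging and testing only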
+ if (*type == "debug_random") {
+ static const std::string index_lookup[] = { "prioritized",
+ "mclock_opclass",
+ "mclock_client",
+ "wpq" };
+ srand(time(NULL));
+ unsigned which = rand() % (sizeof(index_lookup) / sizeof(index_lookup[0]));
+ type = &index_lookup[which];
+ }
+
+ if (*type == "prioritized") {
+ return std::make_unique<
+ ClassedOpQueueScheduler<PrioritizedQueue<OpSchedulerItem, client>>>(
+ cct,
+ cct->_conf->osd_op_pq_max_tokens_per_priority,
+ cct->_conf->osd_op_pq_min_cost
+ );
+ } else if (*type == "mclock_opclass") {
+ return std::make_unique<
+ ClassedOpQueueScheduler<mClockOpClassQueue>>(
+ cct,
+ cct
+ );
+ } else if (*type == "mclock_client") {
+ return std::make_unique<
+ ClassedOpQueueScheduler<mClockClientQueue>>(
+ cct,
+ cct
+ );
+ } else if (*type == "wpq" ) {
+ // default is 'wpq'
+ return std::make_unique<
+ ClassedOpQueueScheduler<WeightedPriorityQueue<OpSchedulerItem, client>>>(
+ cct,
+ cct->_conf->osd_op_pq_max_tokens_per_priority,
+ cct->_conf->osd_op_pq_min_cost
+ );
+ } else {
+    ceph_assert("Invalid choice of scheduler" == 0);
+ }
+}
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &rhs) {
+ rhs.print(lhs);
+ return lhs;
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "common/ceph_context.h"
+#include "osd/scheduler/OpSchedulerItem.h"
+
+namespace ceph::osd::scheduler {
+
+using client = uint64_t;
+
+/**
+ * Base interface for classes responsible for choosing
+ * op processing order in the OSD.
+ */
+class OpScheduler {
+public:
+ // Enqueue op for scheduling
+ virtual void enqueue(OpSchedulerItem &&item) = 0;
+
+ // Enqueue op for processing as though it were enqueued prior
+ // to other items already scheduled.
+ virtual void enqueue_front(OpSchedulerItem &&item) = 0;
+
+ // Returns true iff there are no ops scheduled
+ virtual bool empty() const = 0;
+
+ // Return next op to be processed
+ virtual OpSchedulerItem dequeue() = 0;
+
+  // Dump a formatted representation of the queue
+ virtual void dump(ceph::Formatter &f) const = 0;
+
+  // Print a brief human-readable description with relevant parameters
+ virtual void print(std::ostream &out) const = 0;
+
+ // Destructor
+ virtual ~OpScheduler() {};
+};
+
+std::ostream &operator<<(std::ostream &lhs, const OpScheduler &);
+using OpSchedulerRef = std::unique_ptr<OpScheduler>;
+
+OpSchedulerRef make_scheduler(CephContext *cct);
+
+/**
+ * Implements OpScheduler in terms of OpQueue
+ *
+ * Templated on the queue type to avoid dynamic dispatch; T should implement
+ * OpQueue<OpSchedulerItem, client>. This adapter mainly supplies the
+ * priority-cutoff boilerplate (strict vs. normal queueing) that OpQueue
+ * based implementations require.
+ */
+template <typename T>
+class ClassedOpQueueScheduler : public OpScheduler {
+ unsigned cutoff;
+ T queue;
+
+ static unsigned int get_io_prio_cut(CephContext *cct) {
+ if (cct->_conf->osd_op_queue_cut_off == "debug_random") {
+ srand(time(NULL));
+ return (rand() % 2 < 1) ? CEPH_MSG_PRIO_HIGH : CEPH_MSG_PRIO_LOW;
+ } else if (cct->_conf->osd_op_queue_cut_off == "high") {
+ return CEPH_MSG_PRIO_HIGH;
+ } else {
+ // default / catch-all is 'low'
+ return CEPH_MSG_PRIO_LOW;
+ }
+ }
+public:
+ template <typename... Args>
+ ClassedOpQueueScheduler(CephContext *cct, Args&&... args) :
+ cutoff(get_io_prio_cut(cct)),
+ queue(std::forward<Args>(args)...)
+ {}
+
+ void enqueue(OpSchedulerItem &&item) final {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
+
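+    // ops at or above the cutoff bypass fair-share queueing and are
+    // dequeued in strict priority order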
+ if (priority >= cutoff)
+ queue.enqueue_strict(
+ item.get_owner(), priority, std::move(item));
+ else
+ queue.enqueue(
+ item.get_owner(), priority, cost, std::move(item));
+ }
+
+ void enqueue_front(OpSchedulerItem &&item) final {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
+ if (priority >= cutoff)
+ queue.enqueue_strict_front(
+ item.get_owner(),
+ priority, std::move(item));
+ else
+ queue.enqueue_front(
+ item.get_owner(),
+ priority, cost, std::move(item));
+ }
+
+ bool empty() const final {
+ return queue.empty();
+ }
+
+ OpSchedulerItem dequeue() final {
+ return queue.dequeue();
+ }
+
+ void dump(ceph::Formatter &f) const final {
+ return queue.dump(&f);
+ }
+
+ void print(std::ostream &out) const final {
+ out << "ClassedOpQueueScheduler(queue=";
+ queue.print(out);
+ out << ", cutoff=" << cutoff << ")";
+ }
+
+ ~ClassedOpQueueScheduler() final {};
+};
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "osd/scheduler/OpSchedulerItem.h"
+#include "osd/OSD.h"
+
+namespace ceph::osd::scheduler {
+
+void PGOpItem::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_op(pg, op, handle);
+ pg->unlock();
+}
+
+void PGPeeringItem::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_peering_evt(sdata, pg.get(), evt, handle);
+}
+
+void PGSnapTrim::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ pg->snap_trimmer(epoch_queued);
+ pg->unlock();
+}
+
+void PGScrub::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ pg->scrub(epoch_queued, handle);
+ pg->unlock();
+}
+
+void PGRecovery::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle);
+ pg->unlock();
+}
+
+void PGRecoveryContext::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ c.release()->complete(handle);
+ pg->unlock();
+}
+
+void PGDelete::run(
+ OSD *osd,
+ OSDShard *sdata,
+ PGRef& pg,
+ ThreadPool::TPHandle &handle)
+{
+ osd->dequeue_delete(sdata, pg.get(), epoch_queued, handle);
+}
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 Red Hat Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <ostream>
+
+#include "include/types.h"
+#include "include/utime.h"
+#include "osd/OpRequest.h"
+#include "osd/PG.h"
+#include "osd/PGPeeringEvent.h"
+#include "common/mClockCommon.h"
+#include "messages/MOSDOp.h"
+
+
+class OSD;
+class OSDShard;
+
+namespace ceph::osd::scheduler {
+
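+// Service class used to differentiate op types: client I/O, background
+// work, and items that must be dispatched immediately.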
+enum class op_scheduler_class : uint8_t {
+ background_recovery = 0,
+ background_best_effort,
+ immediate,
+ client,
+};
+
+class OpSchedulerItem {
+public:
+ class OrderLocker {
+ public:
+ using Ref = unique_ptr<OrderLocker>;
+ virtual void lock() = 0;
+ virtual void unlock() = 0;
+ virtual ~OrderLocker() {}
+ };
+
+ // Abstraction for operations queueable in the op queue
+ class OpQueueable {
+ public:
+ enum class op_type_t {
+ client_op,
+ peering_event,
+ bg_snaptrim,
+ bg_recovery,
+ bg_scrub,
+ bg_pg_delete
+ };
+ using Ref = std::unique_ptr<OpQueueable>;
+
+ /// Items with the same queue token will end up in the same shard
+ virtual uint32_t get_queue_token() const = 0;
+
+ /* Items will be dequeued and locked atomically w.r.t. other items with the
+ * same ordering token */
+ virtual const spg_t& get_ordering_token() const = 0;
+ virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0;
+ virtual op_type_t get_op_type() const = 0;
+ virtual std::optional<OpRequestRef> maybe_get_op() const {
+ return std::nullopt;
+ }
+
+ virtual uint64_t get_reserved_pushes() const {
+ return 0;
+ }
+
+ virtual bool is_peering() const {
+ return false;
+ }
+ virtual bool peering_requires_pg() const {
+ ceph_abort();
+ }
+ virtual const PGCreateInfo *creates_pg() const {
+ return nullptr;
+ }
+
+ virtual ostream &print(ostream &rhs) const = 0;
+
+ virtual void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) = 0;
+ virtual op_scheduler_class get_scheduler_class() const = 0;
+
+ virtual std::optional<ceph::qos::mclock_profile_params_t>
+ get_mclock_profile_params() const {
+ return std::nullopt;
+ }
+
+ virtual std::optional<ceph::qos::dmclock_request_t>
+ get_dmclock_request_state() const {
+ return std::nullopt;
+ }
+
+ virtual ~OpQueueable() {}
+ friend ostream& operator<<(ostream& out, const OpQueueable& q) {
+ return q.print(out);
+ }
+
+ };
+
+private:
+ OpQueueable::Ref qitem;
+ int cost;
+ unsigned priority;
+ utime_t start_time;
+ uint64_t owner; ///< global id (e.g., client.XXX)
+ epoch_t map_epoch; ///< an epoch we expect the PG to exist in
+
+public:
+ OpSchedulerItem(
+ OpQueueable::Ref &&item,
+ int cost,
+ unsigned priority,
+ utime_t start_time,
+ uint64_t owner,
+ epoch_t e)
+ : qitem(std::move(item)),
+ cost(cost),
+ priority(priority),
+ start_time(start_time),
+ owner(owner),
+ map_epoch(e)
+ {}
+ OpSchedulerItem(OpSchedulerItem &&) = default;
+ OpSchedulerItem(const OpSchedulerItem &) = delete;
+ OpSchedulerItem &operator=(OpSchedulerItem &&) = default;
+ OpSchedulerItem &operator=(const OpSchedulerItem &) = delete;
+
+ OrderLocker::Ref get_order_locker(PGRef pg) {
+ return qitem->get_order_locker(pg);
+ }
+ uint32_t get_queue_token() const {
+ return qitem->get_queue_token();
+ }
+ const spg_t& get_ordering_token() const {
+ return qitem->get_ordering_token();
+ }
+ using op_type_t = OpQueueable::op_type_t;
+ OpQueueable::op_type_t get_op_type() const {
+ return qitem->get_op_type();
+ }
+ std::optional<OpRequestRef> maybe_get_op() const {
+ return qitem->maybe_get_op();
+ }
+ uint64_t get_reserved_pushes() const {
+ return qitem->get_reserved_pushes();
+ }
+  void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) {
+ qitem->run(osd, sdata, pg, handle);
+ }
+ unsigned get_priority() const { return priority; }
+ int get_cost() const { return cost; }
+ utime_t get_start_time() const { return start_time; }
+ uint64_t get_owner() const { return owner; }
+ epoch_t get_map_epoch() const { return map_epoch; }
+
+ auto get_mclock_profile_params() const {
+ return qitem->get_mclock_profile_params();
+ }
+ auto get_dmclock_request_state() const {
+ return qitem->get_dmclock_request_state();
+ }
+
+ bool is_peering() const {
+ return qitem->is_peering();
+ }
+
+ const PGCreateInfo *creates_pg() const {
+ return qitem->creates_pg();
+ }
+
+ bool peering_requires_pg() const {
+ return qitem->peering_requires_pg();
+ }
+
+ op_scheduler_class get_scheduler_class() const {
+ return qitem->get_scheduler_class();
+ }
+
+ friend ostream& operator<<(ostream& out, const OpSchedulerItem& item) {
+ out << "OpSchedulerItem("
+ << item.get_ordering_token() << " " << *item.qitem
+ << " prio " << item.get_priority()
+ << " cost " << item.get_cost()
+ << " e" << item.get_map_epoch();
+ if (item.get_reserved_pushes()) {
+ out << " reserved_pushes " << item.get_reserved_pushes();
+ }
+ return out << ")";
+ }
+}; // class OpSchedulerItem
+
+/// Implements boilerplate for operations queued for the pg lock
+class PGOpQueueable : public OpSchedulerItem::OpQueueable {
+ spg_t pgid;
+protected:
+ const spg_t& get_pgid() const {
+ return pgid;
+ }
+public:
+ explicit PGOpQueueable(spg_t pg) : pgid(pg) {}
+ uint32_t get_queue_token() const final {
+ return get_pgid().ps();
+ }
+
+ const spg_t& get_ordering_token() const final {
+ return get_pgid();
+ }
+
+ OpSchedulerItem::OrderLocker::Ref get_order_locker(PGRef pg) final {
+ class Locker : public OpSchedulerItem::OrderLocker {
+ PGRef pg;
+ public:
+ explicit Locker(PGRef pg) : pg(pg) {}
+ void lock() final {
+ pg->lock();
+ }
+ void unlock() final {
+ pg->unlock();
+ }
+ };
+ return OpSchedulerItem::OrderLocker::Ref(
+ new Locker(pg));
+ }
+};
+
+class PGOpItem : public PGOpQueueable {
+ OpRequestRef op;
+
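+  // returns the underlying message as an MOSDOp if this is client I/O,
+  // nullptr otherwise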
+ const MOSDOp *maybe_get_mosd_op() const {
+ auto req = op->get_req();
+ if (req->get_type() == CEPH_MSG_OSD_OP) {
+ return op->get_req<MOSDOp>();
+ } else {
+ return nullptr;
+ }
+ }
+
+public:
+ PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(std::move(op)) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::client_op;
+ }
+
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGOpItem(op=" << *(op->get_req()) << ")";
+ }
+
+ std::optional<OpRequestRef> maybe_get_op() const final {
+ return op;
+ }
+
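+  // only genuine client I/O (an MOSDOp) gets the client QoS class;
+  // everything else is dispatched with the immediate class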
+ op_scheduler_class get_scheduler_class() const final {
+ if (maybe_get_mosd_op()) {
+ return op_scheduler_class::client;
+ } else {
+ return op_scheduler_class::immediate;
+ }
+ }
+
+ std::optional<ceph::qos::mclock_profile_params_t>
+ get_mclock_profile_params() const final {
+ auto op = maybe_get_mosd_op();
+ if (!op)
+ return std::nullopt;
+
+ return op->get_mclock_profile_params();
+ }
+
+ std::optional<ceph::qos::dmclock_request_t>
+ get_dmclock_request_state() const final {
+ auto op = maybe_get_mosd_op();
+ if (!op)
+ return std::nullopt;
+
+ return op->get_dmclock_request_state();
+ }
+
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+};
+
+class PGPeeringItem : public PGOpQueueable {
+ PGPeeringEventRef evt;
+public:
+ PGPeeringItem(spg_t pg, PGPeeringEventRef e) : PGOpQueueable(pg), evt(e) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::peering_event;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGPeeringEvent(" << evt->get_desc() << ")";
+ }
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ bool is_peering() const override {
+ return true;
+ }
+ bool peering_requires_pg() const override {
+ return evt->requires_pg;
+ }
+ const PGCreateInfo *creates_pg() const override {
+ return evt->create_info.get();
+ }
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::immediate;
+ }
+};
+
+class PGSnapTrim : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGSnapTrim(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_snaptrim;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGSnapTrim(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGScrub : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGScrub(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg), epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_scrub;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGScrub(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+class PGRecovery : public PGOpQueueable {
+ epoch_t epoch_queued;
+ uint64_t reserved_pushes;
+public:
+ PGRecovery(
+ spg_t pg,
+ epoch_t epoch_queued,
+ uint64_t reserved_pushes)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued),
+ reserved_pushes(reserved_pushes) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_recovery;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGRecovery(pgid=" << get_pgid()
+               << " epoch_queued=" << epoch_queued
+               << " reserved_pushes=" << reserved_pushes
+ << ")";
+ }
+ uint64_t get_reserved_pushes() const final {
+ return reserved_pushes;
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_recovery;
+ }
+};
+
+class PGRecoveryContext : public PGOpQueueable {
+ unique_ptr<GenContext<ThreadPool::TPHandle&>> c;
+ epoch_t epoch;
+public:
+ PGRecoveryContext(spg_t pgid,
+ GenContext<ThreadPool::TPHandle&> *c, epoch_t epoch)
+ : PGOpQueueable(pgid),
+ c(c), epoch(epoch) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_recovery;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGRecoveryContext(pgid=" << get_pgid()
+ << " c=" << c.get() << " epoch=" << epoch
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_recovery;
+ }
+};
+
+class PGDelete : public PGOpQueueable {
+ epoch_t epoch_queued;
+public:
+ PGDelete(
+ spg_t pg,
+ epoch_t epoch_queued)
+ : PGOpQueueable(pg),
+ epoch_queued(epoch_queued) {}
+ op_type_t get_op_type() const final {
+ return op_type_t::bg_pg_delete;
+ }
+ ostream &print(ostream &rhs) const final {
+ return rhs << "PGDelete(" << get_pgid()
+ << " e" << epoch_queued
+ << ")";
+ }
+ void run(
+ OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final;
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::background_best_effort;
+ }
+};
+
+}
#include "osd/mClockClientQueue.h"
+using namespace ceph::osd::scheduler;
int main(int argc, char **argv) {
std::vector<const char*> args(argv, argv+argc);
client3(100000001)
{}
-#if 0 // more work needed here
- Request create_client_op(epoch_t e, uint64_t owner) {
- return Request(spg_t(), OpQueueItem(OpRequestRef(), e));
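+  // minimal OpQueueable stub that feeds dmclock request state into the
+  // queue under test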
+ struct MockDmclockItem : public PGOpQueueable {
+ ceph::qos::dmclock_request_t request;
+    MockDmclockItem(decltype(request) _request) :
+      PGOpQueueable(spg_t()), request(_request) {}
+
+ op_type_t get_op_type() const final {
+ return op_type_t::client_op;
+ }
+
+ ostream &print(ostream &rhs) const final { return rhs; }
+
+ std::optional<OpRequestRef> maybe_get_op() const final {
+ return std::nullopt;
+ }
+
+ op_scheduler_class get_scheduler_class() const final {
+ return op_scheduler_class::client;
+ }
+
+ std::optional<ceph::qos::dmclock_request_t>
+ get_dmclock_request_state() const final {
+ return request;
+ }
+
+ void run(OSD *osd, OSDShard *sdata, PGRef& pg, ThreadPool::TPHandle &handle) final {}
+ };
+
+ template <typename... Args>
+ Request create_dmclock(epoch_t e, uint64_t owner, Args... args) {
+ return Request(
+ OpSchedulerItem(
+ unique_ptr<OpSchedulerItem::OpQueueable>(
+ new MockDmclockItem(
+ std::forward<Args>(args)...)),
+ 12, 12,
+ utime_t(), owner, e));
}
-#endif
Request create_snaptrim(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
12, 12,
utime_t(), owner, e));
}
Request create_scrub(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(spg_t(), e)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(spg_t(), e)),
12, 12,
utime_t(), owner, e));
}
Request create_recovery(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
12, 12,
utime_t(), owner, e));
}
}
+TEST_F(MClockClientQueueTest, TestDistributedEnqueue) {
+ Request r1 = create_snaptrim(100, client1);
+ Request r2 = create_snaptrim(101, client2);
+ Request r3 = create_snaptrim(102, client3);
+ Request r4 = create_dmclock(103, client1, dmc::ReqParams(50,1));
+ Request r5 = create_dmclock(104, client2, dmc::ReqParams(30,1));
+ Request r6 = create_dmclock(105, client3, dmc::ReqParams(10,1));
+
+ q.enqueue(client1, 12, 0, std::move(r1));
+ q.enqueue(client2, 12, 0, std::move(r2));
+ q.enqueue(client3, 12, 0, std::move(r3));
+ q.enqueue(client1, 12, 0, std::move(r4));
+ q.enqueue(client2, 12, 0, std::move(r5));
+ q.enqueue(client3, 12, 0, std::move(r6));
+
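+  // the first three dequeues return the plain snaptrim items (epochs
+  // 100-102); the dmclock-tagged requests then come back in order of
+  // lightest request parameters first (105, 104, 103)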
+ Request r = q.dequeue();
+ r = q.dequeue();
+ r = q.dequeue();
+
+ r = q.dequeue();
+ ASSERT_EQ(105u, r.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.get_map_epoch());
+}
+
TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
q.enqueue_strict(client2, 13, create_snaptrim(101, client2));
#include "osd/mClockOpClassQueue.h"
+using namespace ceph::osd::scheduler;
int main(int argc, char **argv) {
std::vector<const char*> args(argv, argv+argc);
#if 0 // more work needed here
Request create_client_op(epoch_t e, uint64_t owner) {
- return Request(spg_t(), OpQueueItem(OpRequestRef(), e));
+ return Request(spg_t(), OpSchedulerItem(OpRequestRef(), e));
}
#endif
Request create_snaptrim(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGSnapTrim(spg_t(), e)),
12, 12,
utime_t(), owner, e));
}
Request create_scrub(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGScrub(spg_t(), e)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGScrub(spg_t(), e)),
12, 12,
utime_t(), owner, e));
}
Request create_recovery(epoch_t e, uint64_t owner) {
- return Request(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
+ return Request(OpSchedulerItem(unique_ptr<OpSchedulerItem::OpQueueable>(new PGRecovery(spg_t(), e, 64)),
12, 12,
utime_t(), owner, e));
}