sdata->context_queue.move_to(oncommits);
}
- if (sdata->scheduler->empty()) {
+ WorkItem work_item;
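+ // Keep dequeuing until we obtain an item that is ready to run; dequeue()
+ // may instead return the time at which a deferred item becomes eligible.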
+ while (!std::get_if<OpSchedulerItem>(&work_item)) {
+ if (sdata->scheduler->empty()) {
+ if (osd->is_stopping()) {
+ sdata->shard_lock.unlock();
+ for (auto c : oncommits) {
+ dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+ delete c;
+ }
+ return; // OSD shutdown, discard.
+ }
+ sdata->shard_lock.unlock();
+ handle_oncommits(oncommits);
+ return;
+ }
+
+ work_item = sdata->scheduler->dequeue();
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
- dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
- delete c;
+ dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl;
+ delete c;
}
return; // OSD shutdown, discard.
}
- sdata->shard_lock.unlock();
- handle_oncommits(oncommits);
- return;
- }
- OpSchedulerItem item = sdata->scheduler->dequeue();
+ // If the work item is scheduled in the future, wait until
+ // the time returned in the dequeue response before retrying.
+ if (auto when_ready = std::get_if<double>(&work_item)) {
+ if (is_smallest_thread_index) {
+ sdata->shard_lock.unlock();
+ handle_oncommits(oncommits);
+ return;
+ }
+ std::unique_lock wait_lock{sdata->sdata_wait_lock};
+ auto future_time = ceph::real_clock::from_double(*when_ready);
+ dout(10) << __func__ << " dequeue future request at " << future_time << dendl;
+ sdata->shard_lock.unlock();
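+ // Register as a waiting thread so a concurrent enqueue can wake us before future_time.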
+ ++sdata->waiting_threads;
+ sdata->sdata_cond.wait_until(wait_lock, future_time);
+ --sdata->waiting_threads;
+ wait_lock.unlock();
+ sdata->shard_lock.lock();
+ }
+ } // while
+
+ // Access the stored item; the loop above only exits once work_item holds a ready OpSchedulerItem.
+ auto item = std::move(std::get<OpSchedulerItem>(work_item));
if (osd->is_stopping()) {
sdata->shard_lock.unlock();
for (auto c : oncommits) {
sdata->scheduler->enqueue(std::move(item));
}
- if (empty) {
+ {
std::lock_guard l{sdata->sdata_wait_lock};
- sdata->sdata_cond.notify_all();
+ if (empty) {
+ sdata->sdata_cond.notify_all();
+ } else if (sdata->waiting_threads) {
+ sdata->sdata_cond.notify_one();
+ }
}
}
std::string sdata_wait_lock_name;
ceph::mutex sdata_wait_lock;
ceph::condition_variable sdata_cond;
+ int waiting_threads = 0; ///< worker threads blocked on sdata_cond, waiting for a future work item
ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock
OSDMapRef shard_osdmap;
#pragma once
#include <ostream>
+#include <variant>
#include "common/ceph_context.h"
#include "osd/scheduler/OpSchedulerItem.h"
namespace ceph::osd::scheduler {
using client = uint64_t;
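+// Result of a dequeue(): an OpSchedulerItem that is ready to run, or a double
+// giving the time at which the next deferred item becomes eligible.
+// std::monostate keeps WorkItem default-constructible for callers.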
+using WorkItem = std::variant<std::monostate, OpSchedulerItem, double>;
/**
* Base interface for classes responsible for choosing
virtual bool empty() const = 0;
// Return next op to be processed
- virtual OpSchedulerItem dequeue() = 0;
+ virtual WorkItem dequeue() = 0;
// Dump formatted representation for the queue
virtual void dump(ceph::Formatter &f) const = 0;
return queue.empty();
}
- OpSchedulerItem dequeue() final {
+ WorkItem dequeue() final {
return queue.dequeue();
}
std::bind(&mClockScheduler::ClientRegistry::get_info,
&client_registry,
_1),
- dmc::AtLimit::Allow,
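+ // With AtLimit::Wait, requests that exceed their limit are deferred rather
+ // than allowed through, so pull_request() can now report a future ready time.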
+ dmc::AtLimit::Wait,
cct->_conf.get_val<double>("osd_mclock_scheduler_anticipation_timeout"))
{
cct->_conf.add_observer(this);
// putting the item back in the queue
}
-OpSchedulerItem mClockScheduler::dequeue()
+WorkItem mClockScheduler::dequeue()
{
if (!immediate.empty()) {
auto ret = std::move(immediate.back());
immediate.pop_back();
- return ret;
+ return std::move(ret);
} else {
mclock_queue_t::PullReq result = scheduler.pull_request();
if (result.is_future()) {
- ceph_assert(
- 0 == "Not implemented, user would have to be able to be woken up");
- return std::move(*(OpSchedulerItem*)nullptr);
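+ // Nothing can run yet; return the time at which the next request becomes eligible.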
+ return result.getTime();
} else if (result.is_none()) {
ceph_assert(
0 == "Impossible, must have checked empty() first");
void enqueue_front(OpSchedulerItem &&item) final;
// Return an op to be dispatched
- OpSchedulerItem dequeue() final;
+ WorkItem dequeue() final;
// Returns if the queue is empty
bool empty() const final {
utime_t(), owner, e);
}
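+// Helper: unwrap the ready OpSchedulerItem from a dequeue() result.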
+OpSchedulerItem get_item(WorkItem item)
+{
+ return std::move(std::get<OpSchedulerItem>(item));
+}
+
TEST_F(mClockSchedulerTest, TestEmpty) {
ASSERT_TRUE(q.empty());
std::list<OpSchedulerItem> reqs;
- reqs.push_back(q.dequeue());
- reqs.push_back(q.dequeue());
+ reqs.push_back(get_item(q.dequeue()));
+ reqs.push_back(get_item(q.dequeue()));
ASSERT_FALSE(q.empty());
q.enqueue(create_item(103, client1));
q.enqueue(create_item(104, client1));
- auto r = q.dequeue();
+ auto r = get_item(q.dequeue());
ASSERT_EQ(100u, r.get_map_epoch());
- r = q.dequeue();
+ r = get_item(q.dequeue());
ASSERT_EQ(101u, r.get_map_epoch());
- r = q.dequeue();
+ r = get_item(q.dequeue());
ASSERT_EQ(102u, r.get_map_epoch());
- r = q.dequeue();
+ r = get_item(q.dequeue());
ASSERT_EQ(103u, r.get_map_epoch());
- r = q.dequeue();
+ r = get_item(q.dequeue());
ASSERT_EQ(104u, r.get_map_epoch());
}
}
for (unsigned i = 0; i < NUM * 3; ++i) {
ASSERT_FALSE(q.empty());
- auto r = q.dequeue();
+ auto r = get_item(q.dequeue());
auto owner = r.get_owner();
auto niter = next.find(owner);
ASSERT_FALSE(niter == next.end());