From e563e8a84ee822fda413a96b1dcb422458abe57e Mon Sep 17 00:00:00 2001 From: Sridhar Seshasayee Date: Thu, 8 Oct 2020 19:33:02 +0530 Subject: [PATCH] osd: Handle possible future work items returned from mclock scheduler. It's quite possible for a scheduler like mClock to return no work items based on parameters like reservation, weight and limit. For e.g., if a client exceeds its limit for a given time period, the scheduler returns a response indicating that the operation in its queue will be scheduled at a later point in time. The osd in such a scenario must handle this case. mClockScheduler::dequeue() now either returns an item to be scheduled or a time (double) indicating when the op will be ready for dequeuing. The next dequeue is attempted in the subsequent cycle after waiting until the 'future_time' expires or when the worker thread is notified as part of OSD::ShardedOpWQ::_process() within the sharded threadpool worker loop. Signed-off-by: Sridhar Seshasayee --- src/osd/OSD.cc | 57 +++++++++++++++++++++++----- src/osd/OSD.h | 1 + src/osd/scheduler/OpScheduler.h | 6 ++- src/osd/scheduler/mClockScheduler.cc | 10 ++--- src/osd/scheduler/mClockScheduler.h | 2 +- src/test/osd/TestMClockScheduler.cc | 21 ++++++---- 6 files changed, 70 insertions(+), 27 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 932ce8539f9..cfbb82feb0e 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -10487,21 +10487,54 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) sdata->context_queue.move_to(oncommits); } - if (sdata->scheduler->empty()) { + WorkItem work_item; + while (!std::get_if(&work_item)) { + if (sdata->scheduler->empty()) { + if (osd->is_stopping()) { + sdata->shard_lock.unlock(); + for (auto c : oncommits) { + dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl; + delete c; + } + return; // OSD shutdown, discard. + } + sdata->shard_lock.unlock(); + handle_oncommits(oncommits); + return; + } + + work_item = sdata->scheduler->dequeue(); if (osd->is_stopping()) { sdata->shard_lock.unlock(); for (auto c : oncommits) { - dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl; - delete c; + dout(10) << __func__ << " discarding in-flight oncommit " << c << dendl; + delete c; } return; // OSD shutdown, discard. } - sdata->shard_lock.unlock(); - handle_oncommits(oncommits); - return; - } - OpSchedulerItem item = sdata->scheduler->dequeue(); + // If the work item is scheduled in the future, wait until + // the time returned in the dequeue response before retrying. + if (auto when_ready = std::get_if(&work_item)) { + if (is_smallest_thread_index) { + sdata->shard_lock.unlock(); + handle_oncommits(oncommits); + return; + } + std::unique_lock wait_lock{sdata->sdata_wait_lock}; + auto future_time = ceph::real_clock::from_double(*when_ready); + dout(10) << __func__ << " dequeue future request at " << future_time << dendl; + sdata->shard_lock.unlock(); + ++sdata->waiting_threads; + sdata->sdata_cond.wait_until(wait_lock, future_time); + --sdata->waiting_threads; + wait_lock.unlock(); + sdata->shard_lock.lock(); + } + } // while + + // Access the stored item + auto item = std::move(std::get(work_item)); if (osd->is_stopping()) { sdata->shard_lock.unlock(); for (auto c : oncommits) { @@ -10750,9 +10783,13 @@ void OSD::ShardedOpWQ::_enqueue(OpSchedulerItem&& item) { sdata->scheduler->enqueue(std::move(item)); } - if (empty) { + { std::lock_guard l{sdata->sdata_wait_lock}; - sdata->sdata_cond.notify_all(); + if (empty) { + sdata->sdata_cond.notify_all(); + } else if (sdata->waiting_threads) { + sdata->sdata_cond.notify_one(); + } } } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index dc25f604c35..c12ec84c75a 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -984,6 +984,7 @@ struct OSDShard { std::string sdata_wait_lock_name; ceph::mutex sdata_wait_lock; ceph::condition_variable sdata_cond; + int waiting_threads = 0; ceph::mutex osdmap_lock; ///< protect shard_osdmap updates vs users w/o shard_lock OSDMapRef shard_osdmap; diff --git a/src/osd/scheduler/OpScheduler.h b/src/osd/scheduler/OpScheduler.h index 4f95dfe1da2..0a17118f02f 100644 --- a/src/osd/scheduler/OpScheduler.h +++ b/src/osd/scheduler/OpScheduler.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include "common/ceph_context.h" #include "osd/scheduler/OpSchedulerItem.h" @@ -22,6 +23,7 @@ namespace ceph::osd::scheduler { using client = uint64_t; +using WorkItem = std::variant; /** * Base interface for classes responsible for choosing @@ -40,7 +42,7 @@ public: virtual bool empty() const = 0; // Return next op to be processed - virtual OpSchedulerItem dequeue() = 0; + virtual WorkItem dequeue() = 0; // Dump formatted representation for the queue virtual void dump(ceph::Formatter &f) const = 0; @@ -117,7 +119,7 @@ public: return queue.empty(); } - OpSchedulerItem dequeue() final { + WorkItem dequeue() final { return queue.dequeue(); } diff --git a/src/osd/scheduler/mClockScheduler.cc b/src/osd/scheduler/mClockScheduler.cc index 0e96fbfae0d..c075ed7d2d3 100644 --- a/src/osd/scheduler/mClockScheduler.cc +++ b/src/osd/scheduler/mClockScheduler.cc @@ -35,7 +35,7 @@ mClockScheduler::mClockScheduler(CephContext *cct) : std::bind(&mClockScheduler::ClientRegistry::get_info, &client_registry, _1), - dmc::AtLimit::Allow, + dmc::AtLimit::Wait, cct->_conf.get_val("osd_mclock_scheduler_anticipation_timeout")) { cct->_conf.add_observer(this); @@ -114,18 +114,16 @@ void mClockScheduler::enqueue_front(OpSchedulerItem&& item) // putting the item back in the queue } -OpSchedulerItem mClockScheduler::dequeue() +WorkItem mClockScheduler::dequeue() { if (!immediate.empty()) { auto ret = std::move(immediate.back()); immediate.pop_back(); - return ret; + return std::move(ret); } else { mclock_queue_t::PullReq result = scheduler.pull_request(); if (result.is_future()) { - ceph_assert( - 0 == "Not implemented, user would have to be able to be woken up"); - return std::move(*(OpSchedulerItem*)nullptr); + return result.getTime(); } else if (result.is_none()) { ceph_assert( 0 == "Impossible, must have checked empty() first"); diff --git a/src/osd/scheduler/mClockScheduler.h b/src/osd/scheduler/mClockScheduler.h index 1b469a1a12e..2573afead3b 100644 --- a/src/osd/scheduler/mClockScheduler.h +++ b/src/osd/scheduler/mClockScheduler.h @@ -110,7 +110,7 @@ public: void enqueue_front(OpSchedulerItem &&item) final; // Return an op to be dispatch - OpSchedulerItem dequeue() final; + WorkItem dequeue() final; // Returns if the queue is empty bool empty() const final { diff --git a/src/test/osd/TestMClockScheduler.cc b/src/test/osd/TestMClockScheduler.cc index 65e2a1ae91f..775dbb2f993 100644 --- a/src/test/osd/TestMClockScheduler.cc +++ b/src/test/osd/TestMClockScheduler.cc @@ -77,6 +77,11 @@ OpSchedulerItem create_item( utime_t(), owner, e); } +OpSchedulerItem get_item(WorkItem item) +{ + return std::move(std::get(item)); +} + TEST_F(mClockSchedulerTest, TestEmpty) { ASSERT_TRUE(q.empty()); @@ -88,8 +93,8 @@ TEST_F(mClockSchedulerTest, TestEmpty) { std::list reqs; - reqs.push_back(q.dequeue()); - reqs.push_back(q.dequeue()); + reqs.push_back(get_item(q.dequeue())); + reqs.push_back(get_item(q.dequeue())); ASSERT_FALSE(q.empty()); @@ -115,19 +120,19 @@ TEST_F(mClockSchedulerTest, TestSingleClientOrderedEnqueueDequeue) { q.enqueue(create_item(103, client1)); q.enqueue(create_item(104, client1)); - auto r = q.dequeue(); + auto r = get_item(q.dequeue()); ASSERT_EQ(100u, r.get_map_epoch()); - r = q.dequeue(); + r = get_item(q.dequeue()); ASSERT_EQ(101u, r.get_map_epoch()); - r = q.dequeue(); + r = get_item(q.dequeue()); ASSERT_EQ(102u, r.get_map_epoch()); - r = q.dequeue(); + r = get_item(q.dequeue()); ASSERT_EQ(103u, r.get_map_epoch()); - r = q.dequeue(); + r = get_item(q.dequeue()); ASSERT_EQ(104u, r.get_map_epoch()); } @@ -145,7 +150,7 @@ TEST_F(mClockSchedulerTest, TestMultiClientOrderedEnqueueDequeue) { } for (unsigned i = 0; i < NUM * 3; ++i) { ASSERT_FALSE(q.empty()); - auto r = q.dequeue(); + auto r = get_item(q.dequeue()); auto owner = r.get_owner(); auto niter = next.find(owner); ASSERT_FALSE(niter == next.end()); -- 2.39.5