On fast shutdown, take over the main discard queue, copying it to the allocator, and only wait for the threads to commit their small private discard queues
Signed-off-by: Gabriel BenHanokh <gbenhano@redhat.com>
virtual int flush() = 0;
virtual bool try_discard(interval_set<uint64_t> &to_release, bool async=true) { return false; }
virtual void discard_drain() { return; }
- virtual const interval_set<uint64_t>* get_discard_queued() { return nullptr;}
+ virtual void swap_discard_queued(interval_set<uint64_t>& other) { other.clear(); }
// for managing buffered readers/writers
virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
virtual int open(const std::string& path) = 0;
{
dout(10) << __func__ << dendl;
std::unique_lock l(discard_lock);
- while (!discard_queued.empty() || discard_running) {
+ while (!discard_queued.empty() || (discard_running > 0)) {
discard_cond.wait(l);
}
}
dout(10) << __func__ << " end" << dendl;
}
+// Atomically hand the pending-discard interval set over to the caller,
+// leaving discard_queued with the (normally empty) contents of 'other'.
+// Used on fast shutdown so the caller can return the queued-but-not-yet-
+// issued discard ranges to the allocator instead of waiting for the
+// discard threads to process them.
+void KernelDevice::swap_discard_queued(interval_set<uint64_t>& other)
+{
+  // discard_lock serializes against the discard threads, which also
+  // mutate discard_queued under this lock.
+  std::unique_lock l(discard_lock);
+  discard_queued.swap(other);
+}
+
void KernelDevice::_discard_thread(uint64_t tid)
{
dout(10) << __func__ << " thread " << tid << " start" << dendl;
discard_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
- // Swap the queued discards for a local list we'll process here
- // without caring about thread fairness. This allows the current
- // thread to wait on the discard running while other threads pick
- // up the next-in-queue, and do the same, ultimately issuing more
- // discards in parallel, which is the goal.
- discard_processing.swap(discard_queued);
- discard_running = true;
+ // Limit local processing to MAX_LOCAL_DISCARD items.
+ // This will allow threads to work in parallel
+ // instead of a single thread taking over the whole discard_queued.
+ // It will also allow threads to finish in a timely manner.
+      constexpr unsigned MAX_LOCAL_DISCARD = 10;
+      unsigned count = 0;
+      // NOTE(review): interval_set::erase() invalidates the iterator it is
+      // given, so the previous form (erase(p) followed by ++p in the for
+      // loop header) advanced a dangling iterator — undefined behavior.
+      // Instead, always re-fetch begin() and remove the interval by
+      // (start, len) after copying it into the local processing set.
+      while (!discard_queued.empty() && count < MAX_LOCAL_DISCARD) {
+        auto p = discard_queued.begin();
+        const auto start = p.get_start();
+        const auto len = p.get_len();
+        discard_processing.insert(start, len);
+        discard_queued.erase(start, len);
+        ++count;
+      }
+
+ // there are multiple active threads -> must use a counter instead of a flag
+ discard_running ++;
l.unlock();
dout(20) << __func__ << " finishing" << dendl;
for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
discard_processing.clear();
l.lock();
- discard_running = false;
+ discard_running --;
+ ceph_assert(discard_running >= 0);
}
}
ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
ceph::condition_variable discard_cond;
- bool discard_running = false;
+ int discard_running = 0;
interval_set<uint64_t> discard_queued;
struct AioCompletionThread : public Thread {
void aio_submit(IOContext *ioc) override;
void discard_drain() override;
- const interval_set<uint64_t>* get_discard_queued() override { return &discard_queued;}
+ void swap_discard_queued(interval_set<uint64_t>& other) override;
int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
int get_devname(std::string *s) const override {
if (devname.empty()) {
bdev->discard_drain();
}
- auto discard_queued = bdev->get_discard_queued();
- if (discard_queued && (discard_queued->num_intervals() > 0)) {
- dout(10) << __func__ << "::discard_drain: size=" << discard_queued->size()
- << " num_intervals=" << discard_queued->num_intervals() << dendl;
+ interval_set<uint64_t> discard_queued;
+ bdev->swap_discard_queued(discard_queued);
+ if (discard_queued.num_intervals() > 0) {
+ dout(10) << __func__ << "::discard_drain: size=" << discard_queued.size()
+ << " num_intervals=" << discard_queued.num_intervals() << dendl;
// copy discard_queued to the allocator before storing it
- for (auto p = discard_queued->begin(); p != discard_queued->end(); ++p) {
+ for (auto p = discard_queued.begin(); p != discard_queued.end(); ++p) {
dout(20) << __func__ << "::discarded-extent=[" << p.get_start() << ", " << p.get_len() << "]" << dendl;
alloc->init_add_free(p.get_start(), p.get_len());
}
}
+  // Drain the items remaining in the threads' local discard_processing queues.
+  // There are only a few items in those queues, so it is fine to wait for
+  // them even during fast shutdown.
+ bdev->discard_drain();
int ret = store_allocator(alloc);
if (unlikely(ret != 0)) {
derr << __func__ << "::NCB::store_allocator() failed (we will need to rebuild it on startup)" << dendl;