virtual int flush() = 0;
virtual bool try_discard(interval_set<uint64_t> &to_release, bool async=true) { return false; }
virtual void discard_drain() { return; }
- virtual void swap_discard_queued(interval_set<uint64_t>& other) { other.clear(); }
+ virtual const interval_set<uint64_t>* get_discard_queued() { return nullptr; }
// for managing buffered readers/writers
virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
virtual int open(const std::string& path) = 0;
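Reviewer note on the new contract: the default implementation returns nullptr, which lets callers distinguish "this device has no discard queue" from "the queue is currently empty". A minimal caller-side sketch, not part of this patch (the helper name is invented; interval_set::size() is the total byte count, matching the dout further down):

static uint64_t queued_discard_bytes(BlockDevice* bdev)
{
  // nullptr: this device type doesn't queue discards at all
  const interval_set<uint64_t>* q = bdev->get_discard_queued();
  return q ? q->size() : 0;
}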
{
dout(10) << __func__ << dendl;
std::unique_lock l(discard_lock);
- while (!discard_queued.empty() || (discard_running > 0)) {
+ while (!discard_queued.empty() || discard_running) {
discard_cond.wait(l);
}
}
dout(10) << __func__ << " end" << dendl;
}
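Why discard_drain() waits on both conditions: the worker thread empties discard_queued by swapping it into a local set before the discards are actually issued, so an empty queue alone does not mean the device is quiet. A standalone sketch of the handshake in standard C++ (all names invented for illustration; a vector of extents stands in for interval_set):

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

std::mutex m;
std::condition_variable cv;
std::vector<std::pair<uint64_t, uint64_t>> queued;  // stand-in for interval_set
bool running = false;

void drain()
{
  std::unique_lock l(m);
  // Wait until nothing is queued *and* no swapped-out batch is in flight.
  cv.wait(l, [] { return queued.empty() && !running; });
}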
-void KernelDevice::swap_discard_queued(interval_set<uint64_t>& other)
-{
- std::unique_lock l(discard_lock);
- discard_queued.swap(other);
-}
-
void KernelDevice::_discard_thread(uint64_t tid)
{
dout(10) << __func__ << " thread " << tid << " start" << dendl;
discard_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
- // Limit local processing to MAX_LOCAL_DISCARD items.
- // This will allow threads to work in parallel
- // instead of a single thread taking over the whole discard_queued.
- // It will also allow threads to finish in a timely manner.
- constexpr unsigned MAX_LOCAL_DISCARD = 10;
- unsigned count = 0;
- for (auto p = discard_queued.begin();
- p != discard_queued.end() && count < MAX_LOCAL_DISCARD;
- ++p, ++count) {
- discard_processing.insert(p.get_start(), p.get_len());
- discard_queued.erase(p);
- }
-
- // there are multiple active threads -> must use a counter instead of a flag
- discard_running ++;
+ // Swap the queued discards for a local set we'll process here
+ // without worrying about thread fairness. While this thread is busy
+ // issuing the discards it swapped out, other threads can pick up
+ // whatever queues next and do the same, ultimately issuing more
+ // discards in parallel, which is the goal.
+ discard_processing.swap(discard_queued);
+ discard_running = true;
l.unlock();
dout(20) << __func__ << " finishing" << dendl;
for (auto p = discard_processing.begin(); p != discard_processing.end(); ++p) {
discard_callback(discard_callback_priv, static_cast<void*>(&discard_processing));
discard_processing.clear();
l.lock();
- discard_running --;
- ceph_assert(discard_running >= 0);
+ discard_running = false;
}
}
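Worker side of the same sketch (reusing the invented globals from the drain sketch above): take the whole batch under the lock, issue the discards with the lock dropped, then clear the flag and notify so drain() can return. This mirrors the swap/flag scheme the patch introduces:

void worker_once()
{
  std::vector<std::pair<uint64_t, uint64_t>> processing;
  {
    std::unique_lock l(m);
    cv.wait(l, [] { return !queued.empty(); });
    processing.swap(queued);  // queue is now empty, but work is in flight
    running = true;
  }
  for (auto& [off, len] : processing) {
    (void)off; (void)len;     // the real device discard would be issued here
  }
  {
    std::lock_guard l(m);
    running = false;
  }
  cv.notify_all();            // wakes drain() and any sleeping workers
}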
ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock");
ceph::condition_variable discard_cond;
- int discard_running = 0;
+ bool discard_running = false;
interval_set<uint64_t> discard_queued;
struct AioCompletionThread : public Thread {
void aio_submit(IOContext *ioc) override;
void discard_drain() override;
- void swap_discard_queued(interval_set<uint64_t>& other) override;
+ const interval_set<uint64_t>* get_discard_queued() override { return &discard_queued; }
int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override;
int get_devname(std::string *s) const override {
if (devname.empty()) {
bdev->discard_drain();
}
- interval_set<uint64_t> discard_queued;
- bdev->swap_discard_queued(discard_queued);
- if (discard_queued.num_intervals() > 0) {
- dout(10) << __func__ << "::discard_drain: size=" << discard_queued.size()
- << " num_intervals=" << discard_queued.num_intervals() << dendl;
+ auto discard_queued = bdev->get_discard_queued();
+ if (discard_queued && (discard_queued->num_intervals() > 0)) {
+ dout(10) << __func__ << "::discard_drain: size=" << discard_queued->size()
+ << " num_intervals=" << discard_queued->num_intervals() << dendl;
// copy discard_queued to the allocator before storing it
- for (auto p = discard_queued.begin(); p != discard_queued.end(); ++p) {
+ for (auto p = discard_queued->begin(); p != discard_queued->end(); ++p) {
dout(20) << __func__ << "::discarded-extent=[" << p.get_start() << ", " << p.get_len() << "]" << dendl;
alloc->init_add_free(p.get_start(), p.get_len());
}
}
- // drain the items in the threads local discard_processing queues
- // There are only a few items in those queues so it is fine to do so in fast shutdown
- bdev->discard_drain();
int ret = store_allocator(alloc);
if (unlikely(ret != 0)) {
derr << __func__ << "::NCB::store_allocator() failed (we will need to rebuild it on startup)" << dendl;