From: Jaya Prakash Date: Thu, 16 Apr 2026 15:30:28 +0000 (+0000) Subject: os/bluestore: Spillover Cleaner Thread implementation in BlueFS X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9110805c75758a304e786c4742510469a7d2b20b;p=ceph.git os/bluestore: Spillover Cleaner Thread implementation in BlueFS Fixes: https://tracker.ceph.com/issues/74319 Signed-off-by: Jaya Prakash --- diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc index 875269f04a4..9e6e768b791 100644 --- a/src/os/bluestore/BlueFS.cc +++ b/src/os/bluestore/BlueFS.cc @@ -114,6 +114,9 @@ public: r = admin_socket->register_command("bluefs debug_inject_read_zeros", hook, "Injects 8K zeros into next BlueFS read. Debug only."); ceph_assert(r == 0); + r = admin_socket->register_command("bluefs spillover cleaner stats", hook, + "Show spillover cleaner thread stats"); + ceph_assert(r == 0); } } return hook; @@ -205,6 +208,8 @@ private: f->flush(out); } else if (command == "bluefs debug_inject_read_zeros") { bluefs->inject_read_zeros++; + } else if (command == "bluefs spillover cleaner stats") { + bluefs->dump_spillover_cleaner_stats(f); } else { errss << "Invalid command" << std::endl; return -ENOSYS; @@ -219,7 +224,8 @@ BlueFS::BlueFS(CephContext* cct) ioc(MAX_BDEV), alloc(MAX_BDEV), alloc_size(MAX_BDEV, 0), - locked_alloc(MAX_BDEV) + locked_alloc(MAX_BDEV), + spillover_cleaner_thread(this) { dirty.pending_release.resize(MAX_BDEV); discard_cb[BDEV_WAL] = wal_discard_cb; @@ -1986,6 +1992,99 @@ int BlueFS::log_dump() return r; } +/* Moves the data of file_ref onto to_bdev. Returns 0 without doing anything when the file is marked deleted or no extent lives off to_bdev. Otherwise: reads every current extent into one bufferlist, allocates replacement space on to_bdev, writes the data there, swaps the new extents into the fnode, records op_file_update in the log, calls sync_metadata(false), and finally releases the old extents. Returns -EIO when a read fails and -ENOSPC when the allocation fails. NOTE(review): from_bdev is never read in this body. NOTE(review): a failed device write trips ceph_assert instead of returning an error. NOTE(review): buf.at(0) is evaluated for every extent - presumably buf is a std::vector, which would throw on a zero-length extent; confirm such extents cannot occur. */ int BlueFS::migrate_file( + CephContext* cct, + FileRef file_ref, + int from_bdev, + int to_bdev) +{ + vector buf; + bool buffered = cct->_conf->bluefs_buffered_io; + bufferlist bl; + mempool::bluefs::vector old_fnode_extents; + bluefs_fnode_t new_fnode; + + std::unique_lock l(file_ref->lock); + + if (file_ref->deleted) + return 0; + + bool rewrite = std::any_of( + file_ref->fnode.extents.begin(), + file_ref->fnode.extents.end(), + 
[=](auto& ext) { + return ext.bdev != to_bdev; + }); + + if (!rewrite) + return 0; + + old_fnode_extents = file_ref->fnode.extents; + + for (const auto &old_ext : old_fnode_extents) { + buf.resize(old_ext.length); + int r = _bdev_read_random(old_ext.bdev, + old_ext.offset, + old_ext.length, + (char*)&buf.at(0), + buffered); + if (r != 0) { + derr << __func__ << " failed to read 0x" << std::hex + << old_ext.offset << "~" << old_ext.length << std::dec + << " from " << (int)old_ext.bdev << dendl; + return -EIO; + } + bl.append((char*)&buf[0], old_ext.length); + } + + auto r = _allocate(to_bdev, bl.length(), 0, + &new_fnode, nullptr, 0, false); + if (r < 0) { + dout(10) << __func__ << " unable to allocate len 0x" << std::hex + << bl.length() << std::dec << " from " << (int)to_bdev + << ": " << cpp_strerror(r) << dendl; + return -ENOSPC; + } + + uint64_t off = 0; + for (auto& i : new_fnode.extents) { + bufferlist cur; + uint64_t cur_len = std::min(i.length, bl.length() - off); + ceph_assert(cur_len > 0); + cur.substr_of(bl, off, cur_len); + int w = bdev[to_bdev]->write(i.offset, cur, buffered); + ceph_assert(w == 0); + off += cur_len; + vselector->add_usage(file_ref->vselector_hint, i); + } + + file_ref->fnode.swap_extents(new_fnode); + l.unlock(); /* the file lock is released here; the fnode is read again below under log.lock only */ + + { + std::lock_guard ll(log.lock); + log.t.op_file_update(file_ref->fnode); + } + + sync_metadata(false); + + /* old extents are accounted and released only after sync_metadata(false) has returned */ std::map releases; + for (const auto& old_ext : old_fnode_extents) { + vselector->sub_usage(file_ref->vselector_hint, old_ext); + releases[old_ext.bdev].emplace_back( + old_ext.offset, + old_ext.length); + if (is_shared_alloc(old_ext.bdev)) { + shared_alloc->bluefs_used -= old_ext.length; + } + } + for (auto& [bdev, vec] : releases) { + alloc[bdev]->release(vec); + } + + return 0; +} + int BlueFS::device_migrate_to_existing( CephContext *cct, const set& devs_source, @@ -5377,6 +5476,170 @@ void BlueFS::trim_free_space(const string& type, std::ostream& outss) outss << "device " << type << " trim done"; } } + +void 
*BlueFS::SpilloverCleanerThread::entry() { /* Thread body. Sets the start flag under lock and notifies waiters, then loops: leave when stop is set, otherwise time one logic->advance(bluefs) call and act on its result. DONE leaves the loop; SLEEP waits up to bluefs_spillover_idle_time seconds; CONTINUE waits run_time * (1 - work_ratio) / work_ratio with work_ratio floored at 0.01. On exit the start and created flags are cleared under lock. NOTE(review): the local timestamp named start shadows the member flag start inside the loop. NOTE(review): stop is not re-checked between advance() and cond.wait_for(), so a stop raised while advance() runs is only noticed when that wait ends. NOTE(review): logic is dereferenced here without lock while update_logic() reassigns it under lock. */ + auto cct = bluefs->cct; + dout(10) << __func__ << " starting" << dendl; + { + std::lock_guard l(lock); + start = true; + cond.notify_all(); + } + + while (true) { + { + std::lock_guard l(lock); + if (stop) { + dout(10) << __func__ << " stopping" << dendl; + break; + } + } + + auto start = ceph::mono_clock::now(); + SpillOverCleanerAction action = logic->advance(bluefs); + auto end = ceph::mono_clock::now(); + + auto run_time = + std::chrono::duration_cast(end - start); + + dout(20) << __func__ + << " logic=" << logic->name + << " action=" << static_cast(action) + << " runtime=" << run_time.count() << "ms" + << dendl; + + if (action == SpillOverCleanerAction::DONE) { + break; + } else if (action == SpillOverCleanerAction::SLEEP) { + auto dur = std::chrono::seconds( + bluefs->cct->_conf->bluefs_spillover_idle_time); + dout(20) << __func__ + << " entering sleep" + << " interval s=" << dur.count() + << dendl; + std::unique_lock l(lock); + cond.wait_for(l, dur); + } else if (action == SpillOverCleanerAction::CONTINUE) { + double work_ratio = + std::max(bluefs->cct->_conf->bluefs_spillover_cleaner_work_ratio, 0.01); + auto dur = std::chrono::duration_cast( + run_time * (1.0 - work_ratio) / work_ratio); + dout(20) << __func__ + << " entering wait" + << " work_ratio=" << work_ratio + << " runtime ms=" << run_time.count() + << " sleep ms=" << dur.count() + << dendl; + std::unique_lock l(lock); + cond.wait_for(l, dur); + } + } + + { + std::lock_guard l(lock); + start = false; + created = false; + } + + dout(10) << __func__ << " exiting" << dendl; + + return nullptr; +} + +/* One scheduling step. When idx has run past pending, the list is rebuilt under nodes.lock from every file except ino 1 that has at least one extent on BDEV_SLOW, and SLEEP is returned if the rebuilt list is empty. Then the next pending file is handed to migrate_file(BDEV_SLOW, BDEV_DB); on a 0 result its size and allocated bytes are appended to the history. CONTINUE is returned whether or not the migration succeeded. NOTE(review): a non-zero migrate_file result is dropped silently. */ BlueFS::SpillOverCleanerAction BlueFS::RebalanceToDB::advance(BlueFS *fs) +{ + if (idx >= pending.size()) { + pending.clear(); + idx = 0; + std::unique_lock nl(fs->nodes.lock); + for (auto& [dir, dir_ref] : fs->nodes.dir_map) { + for (auto& [fname, file_ref] : dir_ref->file_map) { + if (file_ref->fnode.ino == 1) continue; + + std::lock_guard fl(file_ref->lock); + 
+ bool has_slow = std::any_of( + file_ref->fnode.extents.begin(), + file_ref->fnode.extents.end(), + [](const auto& e) { + return e.bdev == BlueFS::BDEV_SLOW; + }); + + if (has_slow) + pending.push_back({dir + "/" + fname,file_ref}); + } + } + if (pending.empty()) { + return SpillOverCleanerAction::SLEEP; + } + } + auto& [path, file] = pending[idx++]; + int r = fs->migrate_file( + fs->cct, + file, + BlueFS::BDEV_SLOW, + BlueFS::BDEV_DB); + + if (r == 0) { + uint64_t migrated; + uint64_t size; + { + std::lock_guard fl(file->lock); + migrated = file->fnode.allocated; + size = file->fnode.size; + } + append_entry(path, + size, + migrated, + BlueFS::BDEV_SLOW, + BlueFS::BDEV_DB); + } + return SpillOverCleanerAction::CONTINUE; +} + +/* Formats one history line (path, size and migrated bytes in hex, source and target device, ceph_clock_now() timestamp), appends it to history and drops the oldest entries while history holds more than max_history. */ void BlueFS::RebalanceToDB::append_entry( + const std::string& path, + uint64_t size, + uint64_t migrated, + int from_bdev, + int to_bdev) +{ + std::ostringstream ss; + ss << path + << " size=0x" << std::hex << size + << " migrated=0x" << migrated + << std::dec + << " from dev=" << from_bdev + << "->" << to_bdev + << " ts=" + << ceph_clock_now(); + + history.push_back(ss.str()); + + while (history.size() > max_history) { + history.pop_front(); + } +} + +/* Emits every history entry as a File string inside the Files Migrated array section of f. */ void BlueFS::RebalanceToDB::dump_history(Formatter* f) +{ + f->open_array_section("Files Migrated"); + for (const auto& entry : history) { + f->dump_string("File", entry); + } + f->close_section(); +} + +/* Emits the paths of the pending entries from idx onward as file strings inside the pending_files array section of f. */ void BlueFS::RebalanceToDB::dump_plan(Formatter* f) +{ + f->open_array_section("pending_files"); + for (size_t i = idx; i < pending.size(); i++) { + auto& [path, fileref] = pending[i]; + f->dump_string("file", path); + } + f->close_section(); +} + // =============================================== // OriginalVolumeSelector diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 7295fd47869..8226867797b 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -810,6 +810,12 @@ public: } int fsck(); + int migrate_file( + CephContext *cct, + FileRef file_ref, + 
int from_bdev, + int to_bdev + ); int device_migrate_to_new( CephContext *cct, const std::set& devs_source, @@ -965,6 +971,119 @@ private: const std::string& dir, const std::string& name ); + + /* Result of SpilloverCleanerLogic::advance() as handled by SpilloverCleanerThread::entry(): CONTINUE = throttled wait then call again, SLEEP = idle wait then call again, DONE = leave the thread loop. */ enum class SpillOverCleanerAction { + CONTINUE, + SLEEP, + DONE + }; + + /* Interface of a cleaner strategy: advance() performs one step, dump_history() and dump_plan() report state through a Formatter. */ struct SpilloverCleanerLogic { + virtual ~SpilloverCleanerLogic() = default; + const char* const name; + explicit SpilloverCleanerLogic(const char* n) : name(n) {} + virtual SpillOverCleanerAction advance( + BlueFS* fs + ) = 0; + virtual void dump_history(Formatter* f) = 0; + virtual void dump_plan(Formatter* f) = 0; + }; + + /* Thread that repeatedly drives the current logic object; the flags stop, start and created are guarded by lock. */ struct SpilloverCleanerThread : public Thread { + public: + explicit SpilloverCleanerThread(BlueFS* fs) + : bluefs(fs) {} + void* entry() override; + /* Does nothing when created is already set; otherwise clears stop, installs a new logic object, creates the thread and sets created. */ void init() { + std::lock_guard l(lock); + if (created) { + return; + } + stop = false; + logic = std::make_shared(); + create("bluefs_splctr"); + created = true; + } + /* Returns at once when created is not set; otherwise waits until entry() has set start, sets stop, notifies and joins. NOTE(review): entry() clears created when it leaves its loop, including on DONE, so a later shutdown() returns without join() - confirm that is intended. */ void shutdown() { + { + std::unique_lock l(lock); + if (!created) { + return; + } + while (!start) { + cond.wait(l); + } + stop = true; + cond.notify_all(); + } + join(); + } + + /* Replaces the logic object under lock and notifies cond. */ void update_logic(std::shared_ptr logic_) + { + std::lock_guard l(lock); + logic = std::move(logic_); + cond.notify_all(); + } + /* Dumps history and plan of the current logic, if any, under lock. NOTE(review): entry() calls advance() without holding lock, and RebalanceToDB::advance() mutates pending and history - confirm a concurrent dump is safe. */ void dump(Formatter* f) { + std::lock_guard l(lock); + if (logic) { + logic->dump_history(f); + logic->dump_plan(f); + } + } + private: + BlueFS* bluefs; + ceph::mutex lock = ceph::make_mutex("SpilloverCleanerThread::lock"); + ceph::condition_variable cond; + + bool stop = false; + bool start = false; + bool created = false; + + std::shared_ptr logic; + } spillover_cleaner_thread; + + /* Logic that migrates files having extents on BDEV_SLOW to BDEV_DB one file per advance() call; pending is the current work list, idx the next position in it, history keeps at most max_history text records. */ struct RebalanceToDB : public SpilloverCleanerLogic { + std::vector> pending; + std::deque history; + static constexpr size_t max_history = 100; + size_t idx = 0; + RebalanceToDB() + : SpilloverCleanerLogic("RebalanceToDB") {} + SpillOverCleanerAction advance( + BlueFS* fs + ) override; + void dump_history(Formatter* f) override; + void dump_plan(Formatter* f) override; + void 
append_entry(const std::string& path, + uint64_t size, + uint64_t migrated, + int from_bdev, + int to_bdev); + }; +public: + void spillover_cleaner_start() + { + spillover_cleaner_thread.init(); + } + void spillover_cleaner_stop() + { + spillover_cleaner_thread.shutdown(); + } + /* Starts the cleaner thread when bluefs_spillover_cleaner is set in the config, stops it otherwise. */ void update_spillover_cleaner_from_config() + { + if(cct->_conf->bluefs_spillover_cleaner) { + spillover_cleaner_start(); + } else { + spillover_cleaner_stop(); + } + } + /* Wraps the cleaner thread dump in a spillover_cleaner_stats object section of f. */ void dump_spillover_cleaner_stats(Formatter* f) { + f->open_object_section("spillover_cleaner_stats"); + spillover_cleaner_thread.dump(f); + f->close_section(); + } }; class OriginalVolumeSelector : public BlueFSVolumeSelector {