r = admin_socket->register_command("bluefs debug_inject_read_zeros", hook,
"Injects 8K zeros into next BlueFS read. Debug only.");
ceph_assert(r == 0);
+ r = admin_socket->register_command("bluefs spillover cleaner stats", hook,
+ "Show spillover cleaner thread stats");
+ ceph_assert(r == 0);
}
}
return hook;
f->flush(out);
} else if (command == "bluefs debug_inject_read_zeros") {
bluefs->inject_read_zeros++;
+ } else if (command == "bluefs spillover cleaner stats") {
+ bluefs->dump_spillover_cleaner_stats(f);
} else {
errss << "Invalid command" << std::endl;
return -ENOSYS;
ioc(MAX_BDEV),
alloc(MAX_BDEV),
alloc_size(MAX_BDEV, 0),
- locked_alloc(MAX_BDEV)
+ locked_alloc(MAX_BDEV),
+ spillover_cleaner_thread(this)
{
dirty.pending_release.resize(MAX_BDEV);
discard_cb[BDEV_WAL] = wal_discard_cb;
return r;
}
+/**
+ * Relocate all allocated extents of a file onto a single target device.
+ *
+ * Every extent currently referenced by the fnode is read into memory, space
+ * for the combined length is allocated on @p to_bdev, the data is written
+ * there, the fnode's extent list is swapped for the new one and the change
+ * is pushed through the BlueFS log.  Only after sync_metadata() returns are
+ * the previous extents handed back to their allocators.
+ *
+ * @param cct       context; supplies bluefs_buffered_io and the log sink
+ * @param file_ref  file to relocate
+ * @param from_bdev not consulted by this implementation
+ * @param to_bdev   device that should hold every extent afterwards
+ * @return 0 when the file was migrated or needed no work (it is deleted, or
+ *         all extents already live on @p to_bdev); -EIO when a source
+ *         extent cannot be read; -ENOSPC when the target allocation fails
+ */
+int BlueFS::migrate_file(
+  CephContext* cct,
+  FileRef file_ref,
+  int from_bdev,
+  int to_bdev)
+{
+  const bool buffered = cct->_conf->bluefs_buffered_io;
+  vector<byte> scratch;
+  bufferlist data;
+  mempool::bluefs::vector<bluefs_extent_t> stale_extents;
+  bluefs_fnode_t fresh_fnode;
+
+  std::unique_lock file_lock(file_ref->lock);
+
+  if (file_ref->deleted) {
+    return 0;
+  }
+
+  // Nothing to do when every extent already sits on the target device.
+  const bool all_on_target = std::all_of(
+    file_ref->fnode.extents.begin(),
+    file_ref->fnode.extents.end(),
+    [to_bdev](auto& ext) {
+      return ext.bdev == to_bdev;
+    });
+  if (all_on_target) {
+    return 0;
+  }
+
+  // Work from a private copy of the extent list; it is needed again at the
+  // end to release the space the file used to occupy.
+  stale_extents = file_ref->fnode.extents;
+
+  for (const auto& src : stale_extents) {
+    scratch.resize(src.length);
+    char* const dst = reinterpret_cast<char*>(&scratch.at(0));
+    const int rd = _bdev_read_random(src.bdev,
+                                     src.offset,
+                                     src.length,
+                                     dst,
+                                     buffered);
+    if (rd != 0) {
+      derr << __func__ << " failed to read 0x" << std::hex
+           << src.offset << "~" << src.length << std::dec
+           << " from " << static_cast<int>(src.bdev) << dendl;
+      return -EIO;
+    }
+    data.append(dst, src.length);
+  }
+
+  const auto alloc_r = _allocate(to_bdev, data.length(), 0,
+                                 &fresh_fnode, nullptr, 0, false);
+  if (alloc_r < 0) {
+    dout(10) << __func__ << " unable to allocate len 0x" << std::hex
+             << data.length() << std::dec << " from " << static_cast<int>(to_bdev)
+             << ": " << cpp_strerror(alloc_r) << dendl;
+    return -ENOSPC;
+  }
+
+  // Spread the collected data over the freshly allocated extents, in order.
+  uint64_t consumed = 0;
+  for (auto& dst_ext : fresh_fnode.extents) {
+    const uint64_t chunk_len =
+      std::min<uint64_t>(dst_ext.length, data.length() - consumed);
+    ceph_assert(chunk_len > 0);
+    bufferlist chunk;
+    chunk.substr_of(data, consumed, chunk_len);
+    const int wr = bdev[to_bdev]->write(dst_ext.offset, chunk, buffered);
+    ceph_assert(wr == 0);
+    consumed += chunk_len;
+    vselector->add_usage(file_ref->vselector_hint, dst_ext);
+  }
+
+  file_ref->fnode.swap_extents(fresh_fnode);
+  file_lock.unlock();
+
+  // NOTE(review): the fnode is read here after the file lock was dropped;
+  // confirm this is safe against concurrent writers of the same file.
+  {
+    std::lock_guard log_lock(log.lock);
+    log.t.op_file_update(file_ref->fnode);
+  }
+
+  sync_metadata(false);
+
+  // The new layout has been synced; give the old space back, grouped per
+  // device.
+  std::map<int, PExtentVector> to_release;
+  for (const auto& gone : stale_extents) {
+    vselector->sub_usage(file_ref->vselector_hint, gone);
+    to_release[gone.bdev].emplace_back(gone.offset, gone.length);
+    if (is_shared_alloc(gone.bdev)) {
+      shared_alloc->bluefs_used -= gone.length;
+    }
+  }
+  for (auto& [dev_id, extents] : to_release) {
+    alloc[dev_id]->release(extents);
+  }
+
+  return 0;
+}
+
int BlueFS::device_migrate_to_existing(
CephContext *cct,
const set<int>& devs_source,
outss << "device " << type << " trim done";
}
}
+
+/**
+ * Main loop of the spillover cleaner thread.
+ *
+ * Announces itself through `start`, then repeatedly asks the current logic
+ * object to advance() and reacts to the returned action:
+ *  - DONE:     leave the loop
+ *  - SLEEP:    wait bluefs_spillover_idle_time seconds (or until notified)
+ *  - CONTINUE: wait long enough that the time spent working stays at
+ *              roughly bluefs_spillover_cleaner_work_ratio of wall time
+ *
+ * On exit `start` and `created` are cleared under the lock.
+ *
+ * NOTE(review): advance() runs without `lock`, while dump() reads the
+ * logic's state under `lock`; the two can still race on that state.
+ *
+ * @return always nullptr
+ */
+void *BlueFS::SpilloverCleanerThread::entry() {
+  auto cct = bluefs->cct;
+  dout(10) << __func__ << " starting" << dendl;
+  {
+    std::lock_guard l(lock);
+    start = true;
+    cond.notify_all();
+  }
+
+  while (true) {
+    // Take our own reference to the logic while holding the lock, so that a
+    // concurrent update_logic() cannot free it while advance() is running.
+    std::shared_ptr<SpilloverCleanerLogic> cur_logic;
+    {
+      std::lock_guard l(lock);
+      if (stop) {
+        dout(10) << __func__ << " stopping" << dendl;
+        break;
+      }
+      cur_logic = logic;
+    }
+
+    // Without a logic object there is nothing to do; behave as if idle.
+    SpillOverCleanerAction action = SpillOverCleanerAction::SLEEP;
+    std::chrono::milliseconds run_time{0};
+    if (cur_logic) {
+      const auto t_begin = ceph::mono_clock::now();
+      action = cur_logic->advance(bluefs);
+      const auto t_end = ceph::mono_clock::now();
+
+      run_time =
+        std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_begin);
+
+      dout(20) << __func__
+               << " logic=" << cur_logic->name
+               << " action=" << static_cast<int>(action)
+               << " runtime=" << run_time.count() << "ms"
+               << dendl;
+    }
+
+    if (action == SpillOverCleanerAction::DONE) {
+      break;
+    } else if (action == SpillOverCleanerAction::SLEEP) {
+      auto dur = std::chrono::seconds(
+        bluefs->cct->_conf->bluefs_spillover_idle_time);
+      dout(20) << __func__
+               << " entering sleep"
+               << " interval s=" << dur.count()
+               << dendl;
+      std::unique_lock l(lock);
+      // A stop request may have arrived (and its notification been missed)
+      // while advance() was running; do not sleep through it.
+      if (!stop) {
+        cond.wait_for(l, dur);
+      }
+    } else if (action == SpillOverCleanerAction::CONTINUE) {
+      // Keep the ratio in (0, 1]: below 0.01 the pause would explode, above
+      // 1.0 it would become negative.
+      const double work_ratio = std::clamp(
+        bluefs->cct->_conf->bluefs_spillover_cleaner_work_ratio, 0.01, 1.0);
+      auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(
+        run_time * (1.0 - work_ratio) / work_ratio);
+      dout(20) << __func__
+               << " entering wait"
+               << " work_ratio=" << work_ratio
+               << " runtime ms=" << run_time.count()
+               << " sleep ms=" << dur.count()
+               << dendl;
+      std::unique_lock l(lock);
+      if (!stop) {
+        cond.wait_for(l, dur);
+      }
+    }
+  }
+
+  {
+    std::lock_guard l(lock);
+    start = false;
+    created = false;
+  }
+
+  dout(10) << __func__ << " exiting" << dendl;
+
+  return nullptr;
+}
+
+/**
+ * Perform one unit of rebalancing work: migrate a single file from the slow
+ * device to the DB device.
+ *
+ * When the list of pending files is exhausted it is rebuilt by scanning all
+ * directories (under nodes.lock) for files, other than ino 1, that have at
+ * least one extent on BDEV_SLOW.  The next pending file is then passed to
+ * BlueFS::migrate_file() and, on success, a history entry is recorded.
+ *
+ * @param fs the BlueFS instance to operate on
+ * @return SLEEP when a fresh scan found no candidate files, otherwise
+ *         CONTINUE (regardless of whether the migration succeeded)
+ */
+BlueFS::SpillOverCleanerAction BlueFS::RebalanceToDB::advance(BlueFS *fs)
+{
+  if (idx >= pending.size()) {
+    // Previous plan fully consumed: build a new one.
+    pending.clear();
+    idx = 0;
+    std::unique_lock nl(fs->nodes.lock);
+    for (auto& [dir, dir_ref] : fs->nodes.dir_map) {
+      for (auto& [fname, file_ref] : dir_ref->file_map) {
+        if (file_ref->fnode.ino == 1) continue;
+
+        std::lock_guard fl(file_ref->lock);
+
+        const bool has_slow = std::any_of(
+          file_ref->fnode.extents.begin(),
+          file_ref->fnode.extents.end(),
+          [](const auto& e) {
+            return e.bdev == BlueFS::BDEV_SLOW;
+          });
+
+        if (has_slow) {
+          pending.emplace_back(dir + "/" + fname, file_ref);
+        }
+      }
+    }
+    if (pending.empty()) {
+      return SpillOverCleanerAction::SLEEP;
+    }
+  }
+
+  auto& [path, file] = pending[idx++];
+  const int r = fs->migrate_file(
+    fs->cct,
+    file,
+    BlueFS::BDEV_SLOW,
+    BlueFS::BDEV_DB);
+
+  if (r == 0) {
+    bool gone;
+    uint64_t migrated;
+    uint64_t size;
+    {
+      std::lock_guard fl(file->lock);
+      gone = file->deleted;
+      migrated = file->fnode.allocated;
+      size = file->fnode.size;
+    }
+    // migrate_file() also returns 0 for a deleted file without moving any
+    // data; do not report such a file as migrated.
+    if (!gone) {
+      append_entry(path,
+                   size,
+                   migrated,
+                   BlueFS::BDEV_SLOW,
+                   BlueFS::BDEV_DB);
+    }
+  }
+  return SpillOverCleanerAction::CONTINUE;
+}
+
+/**
+ * Record one migration in the bounded history.
+ *
+ * Formats a single text line describing the file (path, size and migrated
+ * byte count in hex, source and target device, current timestamp), appends
+ * it to `history` and drops the oldest lines so that at most `max_history`
+ * remain.
+ */
+void BlueFS::RebalanceToDB::append_entry(
+  const std::string& path,
+  uint64_t size,
+  uint64_t migrated,
+  int from_bdev,
+  int to_bdev)
+{
+  std::ostringstream line;
+  line << path;
+  line << " size=0x" << std::hex << size;
+  line << " migrated=0x" << migrated << std::dec;
+  line << " from dev=" << from_bdev << "->" << to_bdev;
+  line << " ts=" << ceph_clock_now();
+
+  history.push_back(line.str());
+
+  // Keep only the newest max_history lines.
+  while (history.size() > max_history) {
+    history.pop_front();
+  }
+}
+
+/// Emit every recorded history line, oldest first, as a "Files Migrated"
+/// array of "File" strings.
+void BlueFS::RebalanceToDB::dump_history(Formatter* f)
+{
+  f->open_array_section("Files Migrated");
+  for (auto it = history.cbegin(); it != history.cend(); ++it) {
+    f->dump_string("File", *it);
+  }
+  f->close_section();
+}
+
+/// Emit the paths of the files that are still waiting to be migrated
+/// (entries from `idx` onwards) as a "pending_files" array.
+void BlueFS::RebalanceToDB::dump_plan(Formatter* f)
+{
+  f->open_array_section("pending_files");
+  size_t pos = idx;
+  while (pos < pending.size()) {
+    f->dump_string("file", pending[pos].first);
+    ++pos;
+  }
+  f->close_section();
+}
+
// ===============================================
// OriginalVolumeSelector
}
int fsck();
+ /// Move all extents of @p file_ref onto device @p to_bdev.
+ /// @p from_bdev is accepted but not consulted by the implementation.
+ /// Returns 0 on success or when there is nothing to do, -EIO on a read
+ /// failure and -ENOSPC when space on @p to_bdev cannot be allocated.
+ int migrate_file(CephContext *cct,
+                  FileRef file_ref,
+                  int from_bdev,
+                  int to_bdev);
int device_migrate_to_new(
CephContext *cct,
const std::set<int>& devs_source,
const std::string& dir,
const std::string& name
);
+
+ /// What the cleaner thread should do after one advance() step.
+ enum class SpillOverCleanerAction {
+   CONTINUE,  ///< throttle according to the work ratio, then step again
+   SLEEP,     ///< wait for the idle interval (or a notification)
+   DONE       ///< leave the thread loop
+ };
+
+ /// Strategy interface driven by SpilloverCleanerThread: each call to
+ /// advance() performs one step of work and tells the thread what to do
+ /// next; the dump_* hooks expose state through a Formatter.
+ struct SpilloverCleanerLogic {
+   /// Name of the strategy; printed in the thread's debug log.
+   const char* const name;
+
+   explicit SpilloverCleanerLogic(const char* n) : name(n) {}
+   virtual ~SpilloverCleanerLogic() = default;
+
+   /// Perform one step of work against @p fs.
+   virtual SpillOverCleanerAction advance(BlueFS* fs) = 0;
+   /// Dump what has been done so far.
+   virtual void dump_history(Formatter* f) = 0;
+   /// Dump what is still planned.
+   virtual void dump_plan(Formatter* f) = 0;
+ };
+
+ /// Background thread that repeatedly drives a SpilloverCleanerLogic.
+ ///
+ /// `lock` guards `stop`, `start`, `created` and the `logic` pointer.
+ /// NOTE(review): dump() reads the logic's state under `lock`, but entry()
+ /// calls advance() on it without the lock - confirm this cannot race.
+ struct SpilloverCleanerThread : public Thread {
+   explicit SpilloverCleanerThread(BlueFS* fs)
+     : bluefs(fs) {}
+
+   void* entry() override;
+
+   /// Install a fresh RebalanceToDB logic and launch the thread; does
+   /// nothing when the thread has already been created.
+   void init() {
+     std::lock_guard guard(lock);
+     if (!created) {
+       stop = false;
+       logic = std::make_shared<RebalanceToDB>();
+       create("bluefs_splctr");
+       created = true;
+     }
+   }
+
+   /// Ask the thread to stop and join it; does nothing when the thread was
+   /// never created.  Waits for the thread to report that it has started
+   /// before requesting the stop.
+   void shutdown() {
+     std::unique_lock guard(lock);
+     if (!created) {
+       return;
+     }
+     while (!start) {
+       cond.wait(guard);
+     }
+     stop = true;
+     cond.notify_all();
+     guard.unlock();
+     join();
+   }
+
+   /// Replace the logic object and wake the thread if it is waiting.
+   void update_logic(std::shared_ptr<SpilloverCleanerLogic> logic_)
+   {
+     std::lock_guard guard(lock);
+     logic = std::move(logic_);
+     cond.notify_all();
+   }
+
+   /// Dump history and plan of the current logic, if one is installed.
+   void dump(Formatter* f) {
+     std::lock_guard guard(lock);
+     if (!logic) {
+       return;
+     }
+     logic->dump_history(f);
+     logic->dump_plan(f);
+   }
+
+ private:
+   BlueFS* bluefs;
+   ceph::mutex lock = ceph::make_mutex("SpilloverCleanerThread::lock");
+   ceph::condition_variable cond;
+
+   bool stop = false;     ///< set by shutdown() to request loop exit
+   bool start = false;    ///< set by entry() once the thread is running
+   bool created = false;  ///< set by init(), cleared by entry() on exit
+
+   std::shared_ptr<SpilloverCleanerLogic> logic;
+ } spillover_cleaner_thread;
+
+ /// Cleaner logic that moves files with extents on BDEV_SLOW to BDEV_DB,
+ /// one file per advance() call.
+ struct RebalanceToDB : public SpilloverCleanerLogic {
+   /// Files found by the last scan, paired with their "dir/name" path.
+   std::vector<std::pair<std::string, FileRef>> pending;
+   /// Text lines describing recent migrations, oldest first.
+   std::deque<std::string> history;
+   /// Upper bound on the number of lines kept in `history`.
+   static constexpr size_t max_history = 100;
+   /// Index into `pending` of the next file to process.
+   size_t idx = 0;
+
+   RebalanceToDB() : SpilloverCleanerLogic("RebalanceToDB") {}
+
+   SpillOverCleanerAction advance(BlueFS* fs) override;
+   void dump_history(Formatter* f) override;
+   void dump_plan(Formatter* f) override;
+
+   /// Append one formatted line to `history`, trimming it to max_history.
+   void append_entry(const std::string& path,
+                     uint64_t size,
+                     uint64_t migrated,
+                     int from_bdev,
+                     int to_bdev);
+ };
+public:
+ /// Launch the spillover cleaner thread (no-op if it was already created).
+ void spillover_cleaner_start() { spillover_cleaner_thread.init(); }
+ /// Stop and join the spillover cleaner thread (no-op if it is not created).
+ void spillover_cleaner_stop() { spillover_cleaner_thread.shutdown(); }
+ /// Bring the cleaner thread in line with the bluefs_spillover_cleaner
+ /// option: start it when the option is set, stop it otherwise.
+ void update_spillover_cleaner_from_config()
+ {
+   const bool enabled = cct->_conf->bluefs_spillover_cleaner;
+   if (!enabled) {
+     spillover_cleaner_stop();
+     return;
+   }
+   spillover_cleaner_start();
+ }
+ /// Dump the cleaner thread's history and plan inside a
+ /// "spillover_cleaner_stats" object section.
+ void dump_spillover_cleaner_stats(Formatter* f)
+ {
+   f->open_object_section("spillover_cleaner_stats");
+   spillover_cleaner_thread.dump(f);
+   f->close_section();
+ }
};
class OriginalVolumeSelector : public BlueFSVolumeSelector {