git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: Spillover Cleaner Thread implementation in BlueFS
author Jaya Prakash <jayaprakash@ibm.com>
Thu, 16 Apr 2026 15:30:28 +0000 (15:30 +0000)
committer Jaya Prakash <jayaprakash@ibm.com>
Mon, 1 Jun 2026 18:37:27 +0000 (18:37 +0000)
Fixes: https://tracker.ceph.com/issues/74319
Signed-off-by: Jaya Prakash <jayaprakash@ibm.com>
src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h

index 875269f04a4f743dd3d3b152028980526dc9a359..9e6e768b791fe1cb411959b3303c3e94f61486c9 100644 (file)
@@ -114,6 +114,9 @@ public:
        r = admin_socket->register_command("bluefs debug_inject_read_zeros", hook,
                                           "Injects 8K zeros into next BlueFS read. Debug only.");
        ceph_assert(r == 0);
+        r = admin_socket->register_command("bluefs spillover cleaner stats", hook,
+              "Show spillover cleaner thread stats");
+        ceph_assert(r == 0);
       }
     }
     return hook;
@@ -205,6 +208,8 @@ private:
       f->flush(out);
     } else if (command == "bluefs debug_inject_read_zeros") {
       bluefs->inject_read_zeros++;
+    } else if (command == "bluefs spillover cleaner stats") {
+      bluefs->dump_spillover_cleaner_stats(f);
     } else {
       errss << "Invalid command" << std::endl;
       return -ENOSYS;
@@ -219,7 +224,8 @@ BlueFS::BlueFS(CephContext* cct)
     ioc(MAX_BDEV),
     alloc(MAX_BDEV),
     alloc_size(MAX_BDEV, 0),
-    locked_alloc(MAX_BDEV)
+    locked_alloc(MAX_BDEV),
+    spillover_cleaner_thread(this)
 {
   dirty.pending_release.resize(MAX_BDEV);
   discard_cb[BDEV_WAL] = wal_discard_cb;
@@ -1986,6 +1992,99 @@ int BlueFS::log_dump()
   return r;
 }
 
+/**
+ * Rewrite all extents of a file onto device `to_bdev`.
+ *
+ * Steps, in order: read every current extent into memory, allocate
+ * replacement space on `to_bdev`, write the data there, swap the new extents
+ * into the file's fnode, queue the fnode update in the log transaction, sync
+ * metadata, and finally release the old extents back to their allocators.
+ *
+ * @param cct       context; read here for the bluefs_buffered_io option
+ * @param file_ref  file whose extents are to be moved
+ * @param from_bdev not referenced anywhere in the body
+ * @param to_bdev   index of the destination device
+ * @return 0 on success, or when there is nothing to do (file is deleted or
+ *         every extent is already on `to_bdev`); -EIO if reading an old
+ *         extent fails; -ENOSPC if allocation on `to_bdev` fails.
+ */
+int BlueFS::migrate_file(
+  CephContext* cct,
+  FileRef file_ref,
+  int from_bdev,
+  int to_bdev)
+{
+  vector<byte> buf;
+  bool buffered = cct->_conf->bluefs_buffered_io;
+  bufferlist bl;
+  mempool::bluefs::vector<bluefs_extent_t> old_fnode_extents;
+  bluefs_fnode_t new_fnode;
+
+  // The file lock is held from here until the new extents have been swapped in.
+  std::unique_lock l(file_ref->lock);
+
+  if (file_ref->deleted)
+    return 0;
+
+  // A rewrite is needed as soon as any single extent lives outside to_bdev;
+  // in that case the whole file is rewritten, not only the stray extents.
+  bool rewrite = std::any_of(
+    file_ref->fnode.extents.begin(),
+    file_ref->fnode.extents.end(),
+    [=](auto& ext) {
+      return ext.bdev != to_bdev;
+    });
+
+  if (!rewrite)
+    return 0;
+
+  // Private copy of the old layout: used for reading now and for releasing
+  // the space at the end, after the fnode no longer references it.
+  old_fnode_extents = file_ref->fnode.extents;
+
+  // Read each old extent in full and accumulate the whole file content in bl.
+  for (const auto &old_ext : old_fnode_extents) {
+    buf.resize(old_ext.length);
+    int r = _bdev_read_random(old_ext.bdev,
+      old_ext.offset,
+      old_ext.length,
+      (char*)&buf.at(0),
+      buffered);
+    if (r != 0) {
+      derr << __func__ << " failed to read 0x" << std::hex
+           << old_ext.offset << "~" << old_ext.length << std::dec
+           << " from " << (int)old_ext.bdev << dendl;
+      return -EIO;
+    }
+    bl.append((char*)&buf[0], old_ext.length);
+  }
+
+  // Any allocation failure is reported to the caller as -ENOSPC; the actual
+  // code from _allocate only appears in the log line.
+  auto r = _allocate(to_bdev, bl.length(), 0,
+    &new_fnode, nullptr, 0, false);
+  if (r < 0) {
+    dout(10) << __func__ << " unable to allocate len 0x" << std::hex
+        << bl.length() << std::dec << " from " << (int)to_bdev
+        << ": " << cpp_strerror(r) << dendl;
+    return -ENOSPC;
+  }
+
+  // Spread the buffered content over the newly allocated extents, in order.
+  // A write error is treated as fatal (assert), not returned.
+  uint64_t off = 0;
+  for (auto& i : new_fnode.extents) {
+    bufferlist cur;
+    uint64_t cur_len = std::min<uint64_t>(i.length, bl.length() - off);
+    ceph_assert(cur_len > 0);
+    cur.substr_of(bl, off, cur_len);
+    int w = bdev[to_bdev]->write(i.offset, cur, buffered);
+    ceph_assert(w == 0);
+    off += cur_len;
+    vselector->add_usage(file_ref->vselector_hint, i);
+  }
+
+  // Install the new layout in the live fnode, then drop the file lock.
+  file_ref->fnode.swap_extents(new_fnode);
+  l.unlock();
+
+  // NOTE(review): file_ref->fnode is read below after file_ref->lock has been
+  // released — confirm this cannot race with a concurrent writer of the file.
+  {
+    std::lock_guard ll(log.lock);
+    log.t.op_file_update(file_ref->fnode);
+  }
+
+  // The old space is only released after the metadata sync below.
+  sync_metadata(false);
+
+  // Undo the usage accounting of the old extents and hand them back to the
+  // allocator of the device each one came from, grouped per device.
+  std::map<int, PExtentVector> releases;
+  for (const auto& old_ext : old_fnode_extents) {
+    vselector->sub_usage(file_ref->vselector_hint, old_ext);
+    releases[old_ext.bdev].emplace_back(
+      old_ext.offset,
+      old_ext.length);
+    if (is_shared_alloc(old_ext.bdev)) {
+      shared_alloc->bluefs_used -= old_ext.length;
+    }
+  }
+  for (auto& [bdev, vec] : releases) {
+    alloc[bdev]->release(vec);
+  }
+
+  return 0;
+}
+
 int BlueFS::device_migrate_to_existing(
   CephContext *cct,
   const set<int>& devs_source,
@@ -5377,6 +5476,170 @@ void BlueFS::trim_free_space(const string& type, std::ostream& outss)
     outss << "device " << type << " trim done";
   }
 }
+
+/**
+ * Body of the spillover cleaner thread.
+ *
+ * Signals `start`, then repeatedly asks the installed logic to advance() one
+ * step and acts on the returned action:
+ *   - DONE:     leave the loop and let the thread exit;
+ *   - SLEEP:    wait bluefs_spillover_idle_time seconds;
+ *   - CONTINUE: wait long enough that the time spent working is roughly
+ *               bluefs_spillover_cleaner_work_ratio of the total.
+ * The loop also ends when `stop` is observed. On exit `start` and `created`
+ * are cleared under the lock.
+ *
+ * @return always nullptr
+ */
+void *BlueFS::SpilloverCleanerThread::entry() {
+  auto cct = bluefs->cct;
+  dout(10) << __func__ << " starting" << dendl;
+  {
+    std::lock_guard l(lock);
+    start = true;
+    cond.notify_all();
+  }
+
+  while (true) {
+    // Take our own reference to the logic while holding the lock:
+    // update_logic() may replace `logic` at any moment, and this reference
+    // keeps the object alive for the whole advance() call.
+    std::shared_ptr<SpilloverCleanerLogic> cur_logic;
+    {
+      std::lock_guard l(lock);
+      if (stop) {
+        dout(10) << __func__ << " stopping" << dendl;
+        break;
+      }
+      cur_logic = logic;
+    }
+
+    // With no logic installed there is nothing to do: behave as if idle.
+    SpillOverCleanerAction action = SpillOverCleanerAction::SLEEP;
+    const auto t_begin = ceph::mono_clock::now();
+    if (cur_logic) {
+      action = cur_logic->advance(bluefs);
+    }
+    const auto t_end = ceph::mono_clock::now();
+
+    auto run_time =
+      std::chrono::duration_cast<std::chrono::milliseconds>(t_end - t_begin);
+
+    dout(20) << __func__
+         << " logic=" << (cur_logic ? cur_logic->name : "none")
+         << " action=" << static_cast<int>(action)
+         << " runtime=" << run_time.count() << "ms"
+         << dendl;
+
+    if (action == SpillOverCleanerAction::DONE) {
+      break;
+    } else if (action == SpillOverCleanerAction::SLEEP) {
+      auto dur = std::chrono::seconds(
+        cct->_conf->bluefs_spillover_idle_time);
+      dout(20) << __func__
+              << " entering sleep"
+              << " interval s=" << dur.count()
+              << dendl;
+      std::unique_lock l(lock);
+      // shutdown() may have set `stop` and notified while advance() was
+      // running; that wakeup is gone, so re-check before blocking.
+      if (!stop) {
+        cond.wait_for(l, dur);
+      }
+    } else if (action == SpillOverCleanerAction::CONTINUE) {
+      // Floor the ratio at 1% so the division below stays bounded.
+      double work_ratio =
+        std::max(cct->_conf->bluefs_spillover_cleaner_work_ratio, 0.01);
+      auto dur = std::chrono::duration_cast<std::chrono::milliseconds>(
+        run_time * (1.0 - work_ratio) / work_ratio);
+      dout(20) << __func__
+              << " entering wait"
+              << " work_ratio=" << work_ratio
+              << " runtime ms=" << run_time.count()
+              << " sleep ms=" << dur.count()
+              << dendl;
+      std::unique_lock l(lock);
+      // Same lost-wakeup guard as in the SLEEP branch.
+      if (!stop) {
+        cond.wait_for(l, dur);
+      }
+    }
+  }
+
+  // NOTE(review): when the loop ends via DONE nobody has called join() on
+  // this thread, yet `created` is cleared here — confirm how such a thread
+  // is reaped before init() may create a new one.
+  {
+    std::lock_guard l(lock);
+    start = false;
+    created = false;
+  }
+
+  dout(10) << __func__ << " exiting" << dendl;
+
+  return nullptr;
+}
+
+/**
+ * Do one unit of rebalancing work: migrate at most one file to BDEV_DB.
+ *
+ * When the current plan is exhausted it is rebuilt by scanning every file in
+ * every directory for extents on BDEV_SLOW. Each call then hands the next
+ * planned file to BlueFS::migrate_file() and, if that file was migrated,
+ * appends a record to the history.
+ *
+ * @param fs the BlueFS instance to operate on
+ * @return SLEEP if a fresh scan found no file with slow extents, otherwise
+ *         CONTINUE (also when migrating the file failed)
+ */
+BlueFS::SpillOverCleanerAction BlueFS::RebalanceToDB::advance(BlueFS *fs)
+{
+  if (idx >= pending.size()) {
+    // Plan consumed (or never built): rescan under the nodes lock.
+    pending.clear();
+    idx = 0;
+    std::unique_lock nl(fs->nodes.lock);
+    for (auto& [dir, dir_ref] : fs->nodes.dir_map) {
+      for (auto& [fname, file_ref] : dir_ref->file_map) {
+        // ino 1 is never migrated (presumably the BlueFS log file — confirm).
+        if (file_ref->fnode.ino == 1) continue;
+
+        std::lock_guard fl(file_ref->lock);
+
+        const auto& extents = file_ref->fnode.extents;
+        const bool has_slow = std::any_of(
+          extents.begin(),
+          extents.end(),
+          [](const auto& e) {
+            return e.bdev == BlueFS::BDEV_SLOW;
+          });
+
+        if (has_slow)
+          pending.emplace_back(dir + "/" + fname, file_ref);
+      }
+    }
+    if (pending.empty()) {
+      return SpillOverCleanerAction::SLEEP;
+    }
+  }
+  auto& [path, file] = pending[idx++];
+  int r = fs->migrate_file(
+    fs->cct,
+    file,
+    BlueFS::BDEV_SLOW,
+    BlueFS::BDEV_DB);
+
+  if (r == 0) {
+    bool deleted;
+    uint64_t migrated;
+    uint64_t size;
+    {
+      std::lock_guard fl(file->lock);
+      deleted = file->deleted;
+      migrated = file->fnode.allocated;
+      size = file->fnode.size;
+    }
+    // migrate_file() also returns 0 when it skipped a deleted file; do not
+    // report such a file as migrated.
+    if (!deleted) {
+      append_entry(path,
+        size,
+        migrated,
+        BlueFS::BDEV_SLOW,
+        BlueFS::BDEV_DB);
+    }
+  }
+  return SpillOverCleanerAction::CONTINUE;
+}
+
+/**
+ * Append one human-readable migration record to the bounded history.
+ *
+ * The record holds the path, the size and migrated byte counts (hex), the
+ * source and destination device ids (decimal) and the current time. Once
+ * the history exceeds max_history entries the oldest ones are dropped.
+ */
+void BlueFS::RebalanceToDB::append_entry(
+    const std::string& path,
+    uint64_t size,
+    uint64_t migrated,
+    int from_bdev,
+    int to_bdev)
+{
+  std::ostringstream record;
+  record << path;
+  record << " size=0x" << std::hex << size;
+  record << " migrated=0x" << migrated << std::dec;
+  record << " from dev=" << from_bdev << "->" << to_bdev;
+  record << " ts=" << ceph_clock_now();
+
+  history.push_back(record.str());
+
+  // Trim from the front so only the newest max_history records remain.
+  if (history.size() > max_history) {
+    const size_t excess = history.size() - max_history;
+    history.erase(history.begin(), history.begin() + excess);
+  }
+}
+
+/// Emit every stored history record, oldest first, as a string entry named
+/// "File" inside an array section named "Files Migrated".
+void BlueFS::RebalanceToDB::dump_history(Formatter* f)
+{
+  f->open_array_section("Files Migrated");
+  std::for_each(history.cbegin(), history.cend(),
+    [f](const std::string& record) {
+      f->dump_string("File", record);
+    });
+  f->close_section();
+}
+
+/// Emit the paths of the files still waiting in the current plan (those at
+/// position idx and later) as "file" entries of an array section named
+/// "pending_files".
+void BlueFS::RebalanceToDB::dump_plan(Formatter* f)
+{
+  f->open_array_section("pending_files");
+  size_t pos = idx;
+  while (pos < pending.size()) {
+    f->dump_string("file", pending[pos].first);
+    ++pos;
+  }
+  f->close_section();
+}
+
 // ===============================================
 // OriginalVolumeSelector
 
index 7295fd47869b9bbcf1575e52fbcca870b9a172a0..8226867797b9b50f2ac19006a2da6470a128a7a4 100644 (file)
@@ -810,6 +810,12 @@ public:
   }
   int fsck();
 
+  /// Rewrite all extents of file_ref onto device to_bdev.
+  /// Returns 0 on success or when nothing had to be done, -EIO when reading
+  /// the existing data fails, -ENOSPC when space on to_bdev cannot be
+  /// allocated. from_bdev is not used by the implementation.
+  int migrate_file(
+    CephContext *cct,
+    FileRef file_ref,
+    int from_bdev,
+    int to_bdev
+  );
   int device_migrate_to_new(
     CephContext *cct,
     const std::set<int>& devs_source,
@@ -965,6 +971,119 @@ private:
     const std::string& dir,
     const std::string& name
   );
+
+  /// What the cleaner thread should do after one SpilloverCleanerLogic::advance() step.
+  enum class SpillOverCleanerAction {
+    CONTINUE,  ///< more work pending: throttle by the work ratio, then advance again
+    SLEEP,     ///< nothing to do: wait for the idle interval before advancing again
+    DONE       ///< finished: the cleaner thread leaves its loop and exits
+  };
+
+  /// Abstract strategy executed step by step by the spillover cleaner thread.
+  struct SpilloverCleanerLogic {
+    /// Strategy name, printed in the cleaner thread's debug log.
+    const char* const name;
+
+    explicit SpilloverCleanerLogic(const char* n) : name(n) {}
+    virtual ~SpilloverCleanerLogic() = default;
+
+    /// Perform one unit of work and tell the thread what to do next.
+    virtual SpillOverCleanerAction advance(BlueFS* fs) = 0;
+    /// Dump a record of the work already done.
+    virtual void dump_history(Formatter* f) = 0;
+    /// Dump the work that is still planned.
+    virtual void dump_plan(Formatter* f) = 0;
+  };
+
+  /// Background thread that drives a SpilloverCleanerLogic. All state below
+  /// (flags and the logic pointer) is modified under `lock`.
+  struct SpilloverCleanerThread : public Thread {
+  public:
+    explicit SpilloverCleanerThread(BlueFS* fs)
+      : bluefs(fs) {}
+    /// Thread body; defined in BlueFS.cc.
+    void* entry() override;
+    /// Start the thread with a fresh RebalanceToDB logic. No-op if `created`
+    /// is already set.
+    void init() {
+      std::lock_guard l(lock);
+      if (created) {
+        return;
+      }
+      stop = false;
+      logic = std::make_shared<RebalanceToDB>();
+      create("bluefs_splctr");
+      created = true;
+    }
+    /// Ask the thread to stop and join it. No-op if `created` is not set.
+    void shutdown() {
+      {
+        std::unique_lock l(lock);
+        if (!created) {
+          return;
+        }
+        // Wait until entry() has announced itself before requesting stop.
+        while (!start) {
+          cond.wait(l);
+        }
+        stop = true;
+        cond.notify_all();
+      }
+      // Joined outside the lock: entry() takes `lock` on its way out.
+      join();
+    }
+
+    /// Replace the active logic and wake the thread if it is waiting.
+    void update_logic(std::shared_ptr<SpilloverCleanerLogic> logic_)
+    {
+      std::lock_guard l(lock);
+      logic = std::move(logic_);
+      cond.notify_all();
+    }
+    /// Dump the logic's history and remaining plan, if a logic is installed.
+    /// NOTE(review): entry() calls logic->advance() without holding `lock`,
+    /// so this may run concurrently with advance() modifying the logic's
+    /// state — confirm whether extra synchronization is needed.
+    void dump(Formatter* f) {
+      std::lock_guard l(lock);
+      if (logic) {
+        logic->dump_history(f);
+        logic->dump_plan(f);
+      }
+    }
+  private:
+    BlueFS* bluefs;
+    ceph::mutex lock = ceph::make_mutex("SpilloverCleanerThread::lock");
+    ceph::condition_variable cond;
+
+    bool stop = false;     // set by shutdown() to request exit
+    bool start = false;    // set by entry() once running, cleared on exit
+    bool created = false;  // set by init() after create(), cleared by entry() on exit
+
+    std::shared_ptr<SpilloverCleanerLogic> logic;
+  } spillover_cleaner_thread;
+
+  /// Cleaner logic that moves files having extents on BDEV_SLOW to BDEV_DB,
+  /// one file per advance() call.
+  struct RebalanceToDB : public SpilloverCleanerLogic {
+    /// Current plan: (path, file) pairs found by the last scan.
+    std::vector<std::pair<std::string, FileRef>> pending;
+    /// Textual records of past migrations, oldest first.
+    std::deque<std::string> history;
+    /// Upper bound on the number of records kept in `history`.
+    static constexpr size_t max_history = 100;
+    /// Position in `pending` of the next file to process.
+    size_t idx = 0;
+
+    RebalanceToDB() : SpilloverCleanerLogic("RebalanceToDB") {}
+
+    SpillOverCleanerAction advance(BlueFS* fs) override;
+    void dump_history(Formatter* f) override;
+    void dump_plan(Formatter* f) override;
+
+    /// Format one migration record and append it to `history`.
+    void append_entry(
+      const std::string& path,
+      uint64_t size,
+      uint64_t migrated,
+      int from_bdev,
+      int to_bdev);
+  };
+public:
+  /// Start the spillover cleaner thread; does nothing if it was already created.
+  void spillover_cleaner_start()
+  {
+    spillover_cleaner_thread.init();
+  }
+  /// Stop and join the spillover cleaner thread; does nothing if it was not created.
+  void spillover_cleaner_stop()
+  {
+    spillover_cleaner_thread.shutdown();
+  }
+  /// Bring the cleaner thread in line with the bluefs_spillover_cleaner
+  /// option: running when it is set, stopped otherwise.
+  void update_spillover_cleaner_from_config()
+  {
+    const bool enabled = cct->_conf->bluefs_spillover_cleaner;
+    if (!enabled) {
+      spillover_cleaner_stop();
+      return;
+    }
+    spillover_cleaner_start();
+  }
+  /// Dump the cleaner's history and pending plan inside an object section
+  /// named "spillover_cleaner_stats".
+  void dump_spillover_cleaner_stats(Formatter* f) {
+    f->open_object_section("spillover_cleaner_stats");
+    spillover_cleaner_thread.dump(f);
+    f->close_section();
+  }
 };
 
 class OriginalVolumeSelector : public BlueFSVolumeSelector {