]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/bluefs: Split single bluefs lock into multiple smaller locks
authorAdam Kupczyk <akupczyk@redhat.com>
Tue, 29 Jun 2021 11:03:56 +0000 (13:03 +0200)
committerIgor Fedotov <igor.fedotov@croit.io>
Tue, 27 Jun 2023 10:52:06 +0000 (13:52 +0300)
Splits bluefs lock into log, dirty, dirs, file and writer locks.
This breaks severe locking issues, and makes bluefs more multithreaded.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
(cherry picked from commit e74474d1fd29164555e1ffab79bde991f621130d)

 Conflicts:
src/os/bluestore/BlueFS.cc
 (misordered backports, lacking https://github.com/ceph/ceph/pull/48171
  in the source commit)

src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h

index 5c994dda05780119608f2779936bf5ff92c88f67..b293a921654566ad85a0efd3e9b34ec2f5bfb019 100644 (file)
@@ -148,7 +148,7 @@ private:
       out.append(ss);
     } else if (command == "bluefs files list") {
       const char* devnames[3] = {"wal","db","slow"};
-      std::lock_guard l(bluefs->lock);
+      std::lock_guard l(bluefs->dirs_lock);
       f->open_array_section("files");
       for (auto &d : bluefs->dir_map) {
         std::string dir = d.first;
@@ -310,6 +310,7 @@ void BlueFS::_shutdown_logger()
   delete logger;
 }
 
+//AK - TODO - locking needed but not certain
 void BlueFS::_update_logger_stats()
 {
   // we must be holding the lock
@@ -392,7 +393,6 @@ void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
 
 uint64_t BlueFS::get_used()
 {
-  std::lock_guard l(lock);
   uint64_t used = 0;
   for (unsigned id = 0; id < MAX_BDEV; ++id) {
     used += _get_used(id);
@@ -418,7 +418,6 @@ uint64_t BlueFS::get_used(unsigned id)
 {
   ceph_assert(id < alloc.size());
   ceph_assert(alloc[id]);
-  std::lock_guard l(lock);
   return _get_used(id);
 }
 
@@ -431,13 +430,11 @@ uint64_t BlueFS::_get_total(unsigned id) const
 
 uint64_t BlueFS::get_total(unsigned id)
 {
-  std::lock_guard l(lock);
   return _get_total(id);
 }
 
 uint64_t BlueFS::get_free(unsigned id)
 {
-  std::lock_guard l(lock);
   ceph_assert(id < alloc.size());
   return alloc[id]->get_free();
 }
@@ -467,7 +464,7 @@ void BlueFS::dump_block_extents(ostream& out)
 
 int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " bdev " << id << dendl;
   ceph_assert(id < alloc.size());
   for (auto& p : file_map) {
@@ -482,7 +479,6 @@ int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 
 int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
 {
-  std::unique_lock l(lock);
   dout(1) << __func__
          << " osd_uuid " << osd_uuid
          << dendl;
@@ -519,7 +515,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
 
   // initial txn
   log_t.op_init();
-  _flush_and_sync_log(l);
+  flush_and_sync_log();
 
   // write supers
   super.log_fnode = log_file->fnode;
@@ -869,7 +865,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       new_log_dev_cur = BDEV_NEWDB;
       new_log_dev_next = BDEV_DB;
     }
-    _rewrite_log_and_layout_sync(false,
+    rewrite_log_and_layout_sync(false,
       BDEV_NEWDB,
       new_log_dev_cur,
       new_log_dev_next,
@@ -877,7 +873,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       layout);
     //}
   } else if(id == BDEV_NEWWAL) {
-    _rewrite_log_and_layout_sync(false,
+    rewrite_log_and_layout_sync(false,
       BDEV_DB,
       BDEV_NEWWAL,
       BDEV_WAL,
@@ -908,7 +904,6 @@ void BlueFS::get_devices(set<string> *ls)
 
 int BlueFS::fsck()
 {
-  std::lock_guard l(lock);
   dout(1) << __func__ << dendl;
   // hrm, i think we check everything on mount...
   return 0;
@@ -1712,7 +1707,7 @@ int BlueFS::device_migrate_to_existing(
         new_log_dev_next;
   }
 
-  _rewrite_log_and_layout_sync(
+  rewrite_log_and_layout_sync(
     false,
     (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
     new_log_dev_cur,
@@ -1847,7 +1842,7 @@ int BlueFS::device_migrate_to_new(
         BDEV_DB :
        BDEV_SLOW;
 
-  _rewrite_log_and_layout_sync(
+  rewrite_log_and_layout_sync(
     false,
     super_dev,
     new_log_dev_cur,
@@ -1877,12 +1872,16 @@ void BlueFS::_drop_link(FileRef file)
   dout(20) << __func__ << " had refs " << file->refs
           << " on " << file->fnode << dendl;
   ceph_assert(file->refs > 0);
+  ceph_assert(ceph_mutex_is_locked(log_lock));
+
   --file->refs;
   if (file->refs == 0) {
     dout(20) << __func__ << " destroying " << file->fnode << dendl;
     ceph_assert(file->num_reading.load() == 0);
     vselector->sub_usage(file->vselector_hint, file->fnode);
     log_t.op_file_remove(file->fnode.ino);
+
+    std::lock_guard dl(dirty_lock);
     for (auto& r : file->fnode.extents) {
       pending_release[r.bdev].insert(r.offset, r.length);
     }
@@ -1890,6 +1889,7 @@ void BlueFS::_drop_link(FileRef file)
     file->deleted = true;
 
     if (file->dirty_seq) {
+      // retract request to serialize changes
       ceph_assert(file->dirty_seq > log_seq_stable);
       ceph_assert(dirty_files.count(file->dirty_seq));
       auto it = dirty_files[file->dirty_seq].iterator_to(*file);
@@ -2116,8 +2116,9 @@ int64_t BlueFS::_read(
   return ret;
 }
 
-void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
 {
+  std::lock_guard l(f->lock);
   dout(10) << __func__ << " file " << f->fnode
           << " 0x" << std::hex << offset << "~" << length << std::dec
            << dendl;
@@ -2137,8 +2138,9 @@ void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
   }
 }
 
-uint64_t BlueFS::_estimate_log_size()
+uint64_t BlueFS::estimate_log_size()
 {
+  std::lock_guard dirl(dirs_lock);
   int avg_dir_size = 40;  // fixme
   int avg_file_size = 12;
   uint64_t size = 4096 * 2;
@@ -2150,37 +2152,44 @@ uint64_t BlueFS::_estimate_log_size()
 
 void BlueFS::compact_log()
 {
-  std::unique_lock<ceph::mutex> l(lock);
   if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync();
+      compact_log_sync();
     } else {
-      _compact_log_async(l);
+      compact_log_async();
     }
   }
 }
 
-bool BlueFS::_should_compact_log()
+bool BlueFS::should_start_compact_log()
 {
-  uint64_t current = log_writer->file->fnode.size;
-  uint64_t expected = _estimate_log_size();
+  if (log_is_compacting.load() == true) {
+    // compaction is already running
+    return false;
+  }
+  uint64_t current;
+  {
+    std::lock_guard dirl(log_lock);
+    current = log_writer->file->fnode.size;
+  }
+  uint64_t expected = estimate_log_size();
   float ratio = (float)current / (float)expected;
   dout(10) << __func__ << " current 0x" << std::hex << current
           << " expected " << expected << std::dec
           << " ratio " << ratio
-          << (new_log ? " (async compaction in progress)" : "")
           << dendl;
-  if (new_log ||
-      current < cct->_conf->bluefs_log_compact_min_size ||
+  if (current < cct->_conf->bluefs_log_compact_min_size ||
       ratio < cct->_conf->bluefs_log_compact_min_ratio) {
     return false;
   }
   return true;
 }
 
-void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
+void BlueFS::compact_log_dump_metadata(bluefs_transaction_t *t,
                                        int flags)
 {
+  std::lock_guard dirl(dirs_lock);
+
   t->seq = 1;
   t->uuid = super.uuid;
   dout(20) << __func__ << " op_init" << dendl;
@@ -2190,7 +2199,7 @@ void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
     if (ino == 1)
       continue;
     ceph_assert(ino > 1);
-
+    //AK - TODO - touching fnode - need to lock
     for(auto& e : file_ref->fnode.extents) {
       auto bdev = e.bdev;
       auto bdev_new = bdev;
@@ -2227,12 +2236,12 @@ void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
   }
 }
 
-void BlueFS::_compact_log_sync()
+void BlueFS::compact_log_sync()
 {
   dout(10) << __func__ << dendl;
   auto prefer_bdev =
     vselector->select_prefer_bdev(log_writer->file->vselector_hint);
-  _rewrite_log_and_layout_sync(true,
+  rewrite_log_and_layout_sync(true,
     BDEV_DB,
     prefer_bdev,
     prefer_bdev,
@@ -2241,13 +2250,16 @@ void BlueFS::_compact_log_sync()
   logger->inc(l_bluefs_log_compactions);
 }
 
-void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
-                                         int super_dev,
-                                         int log_dev,
-                                         int log_dev_new,
-                                         int flags,
-                                         std::optional<bluefs_layout_t> layout)
+void BlueFS::rewrite_log_and_layout_sync(bool allocate_with_fallback,
+                                        int super_dev,
+                                        int log_dev,
+                                        int log_dev_new,
+                                        int flags,
+                                        std::optional<bluefs_layout_t> layout)
 {
+  //ceph_assert(ceph_mutex_is_notlocked(log_lock));
+  std::lock_guard ll(log_lock);
+
   File *log_file = log_writer->file.get();
 
   // clear out log (be careful who calls us!!!)
@@ -2259,7 +2271,7 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
                       << " flags:" << flags
                       << dendl;
   bluefs_transaction_t t;
-  _compact_log_dump_metadata(&t, flags);
+  compact_log_dump_metadata(&t, flags);
 
   dout(20) << __func__ << " op_jump_seq " << log_seq << dendl;
   t.op_jump_seq(log_seq);
@@ -2327,6 +2339,7 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
   flush_bdev();
 
   dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
+  std::lock_guard dl(dirty_lock);
   for (auto& r : old_fnode.extents) {
     pending_release[r.bdev].insert(r.offset, r.length);
   }
@@ -2354,29 +2367,42 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
  *
  * 8. Release the old log space.  Clean up.
  */
-void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
+
+void BlueFS::compact_log_async()
 {
   dout(10) << __func__ << dendl;
+  // only one compaction allowed at one time
+  bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
+  if (old_is_comp) {
+    dout(10) << __func__ << " ongoing" <<dendl;
+    return;
+  }
+
+  log_lock.lock();
   File *log_file = log_writer->file.get();
-  ceph_assert(!new_log);
-  ceph_assert(!new_log_writer);
+  FileWriter *new_log_writer = nullptr;
+  FileRef new_log = nullptr;
+  uint64_t new_log_jump_to = 0;
+  uint64_t old_log_jump_to = 0;
 
-  // create a new log [writer] so that we know compaction is in progress
-  // (see _should_compact_log)
   new_log = ceph::make_ref<File>();
-  new_log->fnode.ino = 0;   // so that _flush_range won't try to log the fnode
+  new_log->fnode.ino = 0;   // we use _flush_special to avoid log of the fnode
 
-  // 0. wait for any racing flushes to complete.  (We do not want to block
-  // in _flush_sync_log with jump_to set or else a racing thread might flush
-  // our entries and our jump_to update won't be correct.)
-  while (log_flushing) {
-    dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
-    log_cond.wait(l);
-  }
+  // Part 1.
+  // Prepare current log for jumping into it.
+  // 1. Allocate extent
+  // 2. Update op to log
+  // 3. Jump op to log
+  // During that, no one else can write to log, otherwise we risk jumping backwards.
+  // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
+
+  //signal _maybe_extend_log that expansion of log is temporary inacceptable
+  bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
+  ceph_assert(old_forbidden == false);
 
   vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
 
-  // 1. allocate new log space and jump to it.
+  // 1.1 allocate new log space and jump to it.
   old_log_jump_to = log_file->fnode.get_allocated();
   uint64_t runway = log_file->fnode.get_allocated() - log_writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
@@ -2391,21 +2417,24 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
 
   // update the log file change and log a jump to the offset where we want to
   // write the new entries
-  log_t.op_file_update(log_file->fnode);
-  log_t.op_jump(log_seq, old_log_jump_to);
+  log_t.op_file_update(log_file->fnode); // 1.2
+  log_t.op_jump(log_seq, old_log_jump_to); // 1.3
 
   // we need to flush all bdev because we will be streaming all dirty files to log
   // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
   // then flush_bdev() will not be necessary
   flush_bdev();
-
   _flush_and_sync_log_jump(old_log_jump_to, runway);
 
+  log_lock.unlock();
+  // out of jump section - now log can be used to write to
+
   // 2. prepare compacted log
   bluefs_transaction_t t;
-  //avoid record two times in log_t and _compact_log_dump_metadata.
-  log_t.clear();
-  _compact_log_dump_metadata(&t, 0);
+  //this needs files lock
+  //what will happen, if a file is modified *twice* before we stream it to log?
+  //the later state that we capture will be seen earlier and replay will see a temporary retraction (!)
+  compact_log_dump_metadata(&t, 0);
 
   uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
                                     std::max(alloc_size[BDEV_DB],
@@ -2423,7 +2452,7 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
   ceph_assert(r == 0);
 
   // we might have some more ops in log_t due to _allocate call
-  t.claim_ops(log_t);
+  // t.claim_ops(log_t); no we no longer track allocations in log
 
   bufferlist bl;
   encode(t, bl);
@@ -2433,15 +2462,16 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
           << std::dec << dendl;
 
   new_log_writer = _create_writer(new_log);
-  new_log_writer->append(bl);
 
+  new_log_writer->append(bl);
+  new_log_writer->lock.lock();
   // 3. flush
   r = _flush_special(new_log_writer);
   ceph_assert(r == 0);
 
   // 4. wait
-  _flush_bdev_safely(new_log_writer);
-
+  _flush_bdev(new_log_writer);
+  new_log_writer->lock.unlock();
   // 5. update our log fnode
   // discard first old_log_jump_to extents
 
@@ -2496,29 +2526,42 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
   ++super.version;
   _write_super(BDEV_DB);
 
-  lock.unlock();
   flush_bdev();
-  lock.lock();
+
+  old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
+  ceph_assert(old_forbidden == true);
+  //to wake up if someone was in need of expanding log
+  log_cond.notify_all();
 
   // 7. release old space
   dout(10) << __func__ << " release old log extents " << old_extents << dendl;
-  for (auto& r : old_extents) {
-    pending_release[r.bdev].insert(r.offset, r.length);
+  {
+    std::lock_guard dl(dirty_lock);
+    for (auto& r : old_extents) {
+      pending_release[r.bdev].insert(r.offset, r.length);
+    }
   }
 
   // delete the new log, remove from the dirty files list
   _close_writer(new_log_writer);
+  // flush_special does not dirty files
+  /*
   if (new_log->dirty_seq) {
+    std::lock_guard dl(dirty_lock);
     ceph_assert(dirty_files.count(new_log->dirty_seq));
     auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
     dirty_files[new_log->dirty_seq].erase(it);
   }
+  */
   new_log_writer = nullptr;
   new_log = nullptr;
   log_cond.notify_all();
 
   dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
   logger->inc(l_bluefs_log_compactions);
+
+  old_is_comp = atomic_exchange(&log_is_compacting, false);
+  ceph_assert(old_is_comp);
 }
 
 void BlueFS::_pad_bl(bufferlist& bl)
@@ -2531,10 +2574,26 @@ void BlueFS::_pad_bl(bufferlist& bl)
   }
 }
 
+/**
+ * Adds file modifications from `dirty_files` to bluefs transactions
+ * already stored in `log_t`. Writes them to disk and waits until are stable.
+ * Guarantees that file modifications with `want_seq` are already stable on disk.
+ * In addition may insert jump forward transaction to log write position `jump_to`.
+ *
+ * it should lock ability to:
+ * 1) add to log_t
+ * 2) modify dirty_files
+ * 3) add to pending_release
+ *
+ * pending_release and log_t go with same lock
+ */
+
 // Adds to log_t file modifications mentioned in `dirty_files`.
 // Note: some bluefs ops may have already been stored in log_t transaction.
 uint64_t BlueFS::_consume_dirty()
 {
+  ceph_assert(ceph_mutex_is_locked(dirty_lock));
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   //acquire new seq
   // this will became log_seq_stable once we write
   uint64_t seq = log_t.seq = ++log_seq;
@@ -2557,6 +2616,7 @@ uint64_t BlueFS::_consume_dirty()
 // Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
 int64_t BlueFS::_maybe_extend_log()
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   // allocate some more space (before we run out)?
   // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
   int64_t runway = log_writer->file->fnode.get_allocated() -
@@ -2579,7 +2639,7 @@ int64_t BlueFS::_maybe_extend_log()
      * - re-run compaction with more runway for old log
      * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
      */
-    if (new_log_writer) {
+    if (log_forbidden_to_expand.load() == true) {
       return -EWOULDBLOCK;
     }
     vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
@@ -2596,6 +2656,7 @@ int64_t BlueFS::_maybe_extend_log()
 
 void BlueFS::_flush_and_sync_log_core(int64_t runway)
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   dout(10) << __func__ << " " << log_t << dendl;
 
   bufferlist bl;
@@ -2625,6 +2686,8 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway)
 // Clears dirty_files up to (including) seq_stable.
 void BlueFS::_clear_dirty_set_stable(uint64_t seq)
 {
+  std::lock_guard lg(dirty_lock);
+
   // clean dirty files
   if (seq > log_seq_stable) {
     log_seq_stable = seq;
@@ -2681,33 +2744,46 @@ void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_rel
   }
 }
 
-int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                              uint64_t want_seq)
+int BlueFS::flush_and_sync_log(uint64_t want_seq)
 {
-  if (want_seq && want_seq <= log_seq_stable) {
-    dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
-            << log_seq_stable << ", done" << dendl;
-    return 0;
-  }
+  // we synchronize writing to log, by lock to log_lock
   int64_t available_runway;
   do {
+    log_lock.lock();
+    dirty_lock.lock();
+    if (want_seq && want_seq <= log_seq_stable) {
+      dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+              << log_seq_stable << ", done" << dendl;
+      dirty_lock.unlock();
+      log_lock.unlock();
+      return 0;
+    }
+
     available_runway = _maybe_extend_log();
     if (available_runway == -EWOULDBLOCK) {
-      while (new_log_writer) {
-       dout(10) << __func__ << " waiting for async compaction" << dendl;
-       log_cond.wait(l);
+      // we are in need of adding runway, but we are during log-switch from compaction
+      dirty_lock.unlock();
+      //instead log_lock_unlock() do move ownership
+      std::unique_lock<ceph::mutex> ll(log_lock, std::adopt_lock);
+      while (log_forbidden_to_expand.load()) {
+       log_cond.wait(ll);
       }
+    } else {
+      ceph_assert(available_runway >= 0);
     }
-  } while (available_runway == -EWOULDBLOCK);
+  } while (available_runway < 0);
 
   ceph_assert(want_seq == 0 || want_seq <= log_seq + 1); // illegal to request seq that was not created yet
 
   uint64_t seq = _consume_dirty();
   vector<interval_set<uint64_t>> to_release(pending_release.size());
   to_release.swap(pending_release);
+  dirty_lock.unlock();
 
   _flush_and_sync_log_core(available_runway);
-  _flush_bdev_safely(log_writer);
+  _flush_bdev(log_writer);
+  //now log_lock is no longer needed
+  log_lock.unlock();
 
   _clear_dirty_set_stable(seq);
   _release_pending_allocations(to_release);
@@ -2720,11 +2796,16 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
 int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
                                     int64_t available_runway)
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
+
   ceph_assert(jump_to);
+  // we synchronize writing to log, by lock to log_lock
+
+  dirty_lock.lock();
   uint64_t seq = _consume_dirty();
   vector<interval_set<uint64_t>> to_release(pending_release.size());
   to_release.swap(pending_release);
-
+  dirty_lock.unlock();
   _flush_and_sync_log_core(available_runway);
 
   dout(10) << __func__ << " jumping log offset from 0x" << std::hex
@@ -2734,7 +2815,7 @@ int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
   log_writer->file->fnode.size = jump_to;
   vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
 
-  _flush_bdev_safely(log_writer);
+  _flush_bdev(log_writer);
 
   _clear_dirty_set_stable(seq);
   _release_pending_allocations(to_release);
@@ -2749,6 +2830,7 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
   const unsigned length,
   const bluefs_super_t& super)
 {
+  ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1);
   ceph::bufferlist bl;
   if (partial) {
     tail_block.splice(0, tail_block.length(), &bl);
@@ -2787,6 +2869,8 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
 
 int BlueFS::_signal_dirty_to_log(FileWriter *h)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
+  std::lock_guard dl(dirty_lock);
   if (h->file->deleted) {
     dout(10) << __func__ << "  deleted, no-op" << dendl;
     return 0;
@@ -2817,8 +2901,14 @@ int BlueFS::_signal_dirty_to_log(FileWriter *h)
   return 0;
 }
 
+void BlueFS::flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
+  std::unique_lock hl(h->lock);
+  _flush_range(h, offset, length);
+}
+
 int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
   dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
           << " 0x" << offset << "~" << length << std::dec
           << " to " << h->file->fnode << dendl;
@@ -2841,13 +2931,13 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
              << std::hex << offset << "~" << length << std::dec
              << dendl;
   }
+  std::lock_guard file_lock(h->file->lock);
   ceph_assert(offset <= h->file->fnode.size);
 
   uint64_t allocated = h->file->fnode.get_allocated();
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
   // do not bother to dirty the file if we are overwriting
   // previously allocated extents.
-
   if (allocated < offset + length) {
     // we should never run out of log space here; see the min runway check
     // in _flush_and_sync_log.
@@ -2868,12 +2958,15 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
     h->file->fnode.size = offset + length;
     h->file->is_dirty = true;
   }
+
   dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl;
   return _flush_data(h, offset, length, buffered);
 }
 
 int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered)
 {
+  //ceph_assert(ceph_mutex_is_locked(h->lock));
+  //ceph_assert(ceph_mutex_is_locked(h->file->lock));
   uint64_t x_off = 0;
   auto p = h->file->fnode.seek(offset, &x_off);
   ceph_assert(p != h->file->fnode.extents.end());
@@ -2980,18 +3073,49 @@ void BlueFS::wait_for_aio(FileWriter *h)
 }
 #endif
 
-int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
+
+void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)
+{
+  std::unique_lock hl(h->lock);
+  size_t max_size = 1ull << 30; // cap to 1GB
+  while (len > 0) {
+    bool need_flush = true;
+    auto l0 = h->get_buffer_length();
+    if (l0 < max_size) {
+      size_t l = std::min(len, max_size - l0);
+      h->append(buf, l);
+      buf += l;
+      len -= l;
+      need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
+    }
+    if (need_flush) {
+      bool flushed = false;
+      int r = _flush(h, true, &flushed);
+      ceph_assert(r == 0);
+      if (r == 0 && flushed) {
+       maybe_compact_log();
+      }
+      // make sure we've made any progress with flush hence the
+      // loop doesn't iterate forever
+      ceph_assert(h->get_buffer_length() < max_size);
+    }
+  }
+}
+
+void BlueFS::flush(FileWriter *h, bool force)
 {
+  std::unique_lock hl(h->lock);
   bool flushed = false;
   int r = _flush(h, force, &flushed);
+  ceph_assert(r == 0);
   if (r == 0 && flushed) {
-    _maybe_compact_log(l);
+    maybe_compact_log();
   }
-  return r;
 }
 
 int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
   uint64_t length = h->get_buffer_length();
   uint64_t offset = h->pos;
   if (flushed) {
@@ -3028,6 +3152,8 @@ int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
 // smart enough to discover it on its own.
 int BlueFS::_flush_special(FileWriter *h)
 {
+  //ceph_assert(ceph_mutex_is_locked(h->lock));
+  //ceph_assert(ceph_mutex_is_locked(h->file->lock));
   uint64_t length = h->get_buffer_length();
   uint64_t offset = h->pos;
   ceph_assert(length + offset <= h->file->fnode.get_allocated());
@@ -3037,8 +3163,9 @@ int BlueFS::_flush_special(FileWriter *h)
   return _flush_data(h, offset, length, false);
 }
 
-int BlueFS::_truncate(FileWriter *h, uint64_t offset)
+int BlueFS::truncate(FileWriter *h, uint64_t offset)
 {
+  std::lock_guard hl(h->lock);
   dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
            << " file " << h->file->fnode << dendl;
   if (h->file->deleted) {
@@ -3068,17 +3195,19 @@ int BlueFS::_truncate(FileWriter *h, uint64_t offset)
     ceph_abort_msg("truncate up not supported");
   }
   ceph_assert(h->file->fnode.size >= offset);
-  _flush_bdev_safely(h);
+  _flush_bdev(h);
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
   h->file->fnode.size = offset;
   vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
 
+  std::lock_guard ll(log_lock);
   log_t.op_file_update_inc(h->file->fnode);
   return 0;
 }
 
-int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
+int BlueFS::fsync(FileWriter *h)
 {
+  std::unique_lock hl(h->lock);
   dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
   int r = _flush(h, true);
   if (r < 0)
@@ -3089,39 +3218,40 @@ int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
   }
   uint64_t old_dirty_seq = h->file->dirty_seq;
 
-  _flush_bdev_safely(h);
+  _flush_bdev(h);
 
   if (old_dirty_seq) {
     uint64_t s = log_seq;
     dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
             << ") on " << h->file->fnode << ", flushing log" << dendl;
-    _flush_and_sync_log(l, old_dirty_seq);
+    flush_and_sync_log(old_dirty_seq);
+    // AK - TODO - think - how can dirty_seq change if we are under h lock?
     ceph_assert(h->file->dirty_seq == 0 ||  // cleaned
-          h->file->dirty_seq > s);    // or redirtied by someone else
+               h->file->dirty_seq > s);    // or redirtied by someone else
   }
+  maybe_compact_log();
   return 0;
 }
 
-void BlueFS::_flush_bdev_safely(FileWriter *h)
+// be careful - either h->file->lock or log_lock must be taken
+void BlueFS::_flush_bdev(FileWriter *h)
 {
+  if (h->file->fnode.ino != 1) {
+    ceph_assert(ceph_mutex_is_locked(h->lock));
+  } else {
+    ceph_assert(ceph_mutex_is_locked(log_lock));
+  }
   std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
   h->dirty_devs.fill(false);
 #ifdef HAVE_LIBAIO
   if (!cct->_conf->bluefs_sync_write) {
     list<aio_t> completed_ios;
     _claim_completed_aios(h, &completed_ios);
-    lock.unlock();
     wait_for_aio(h);
     completed_ios.clear();
-    flush_bdev(flush_devs);
-    lock.lock();
-  } else
-#endif
-  {
-    lock.unlock();
-    flush_bdev(flush_devs);
-    lock.lock();
   }
+#endif
+  flush_bdev(flush_devs);
 }
 
 void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
@@ -3253,8 +3383,9 @@ int BlueFS::_allocate(uint8_t id, uint64_t len,
   return 0;
 }
 
-int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
+int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
 {
+  std::lock_guard fl(f->lock);
   dout(10) << __func__ << " file " << f->fnode << " 0x"
           << std::hex << off << "~" << len << std::dec << dendl;
   if (f->deleted) {
@@ -3273,6 +3404,7 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
     vselector->add_usage(f->vselector_hint, f->fnode);
     if (r < 0)
       return r;
+    std::lock_guard ll(log_lock);
     log_t.op_file_update_inc(f->fnode);
   }
   return 0;
@@ -3280,8 +3412,13 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
 
 void BlueFS::sync_metadata(bool avoid_compact)
 {
-  std::unique_lock l(lock);
-  if (log_t.empty() && dirty_files.empty()) {
+  bool can_skip_flush;
+  {
+    std::lock_guard ll(log_lock);
+    std::lock_guard dl(dirty_lock);
+    can_skip_flush = log_t.empty() && dirty_files.empty();
+  }
+  if (can_skip_flush) {
     dout(10) << __func__ << " - no pending log events" << dendl;
   } else {
     utime_t start;
@@ -3289,23 +3426,23 @@ void BlueFS::sync_metadata(bool avoid_compact)
     start = ceph_clock_now();
     *_dout <<  dendl;
     flush_bdev(); // FIXME?
-    _flush_and_sync_log(l);
+    flush_and_sync_log();
     dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
   }
 
   if (!avoid_compact) {
-    _maybe_compact_log(l);
+    maybe_compact_log();
   }
 }
 
-void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
+void BlueFS::maybe_compact_log()
 {
   if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
-      _should_compact_log()) {
+      should_start_compact_log()) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync();
+      compact_log_sync();
     } else {
-      _compact_log_async(l);
+      compact_log_async();
     }
   }
 }
@@ -3316,7 +3453,7 @@ int BlueFS::open_for_write(
   FileWriter **h,
   bool overwrite)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   DirRef dir;
@@ -3378,11 +3515,12 @@ int BlueFS::open_for_write(
   dout(20) << __func__ << " mapping " << dirname << "/" << filename
           << " vsel_hint " << file->vselector_hint
           << dendl;
-
-  log_t.op_file_update(file->fnode);
-  if (create)
-    log_t.op_dir_link(dirname, filename, file->fnode.ino);
-
+  {
+    std::lock_guard ll(log_lock);
+    log_t.op_file_update(file->fnode);
+    if (create)
+      log_t.op_dir_link(dirname, filename, file->fnode.ino);
+  }
   *h = _create_writer(file);
 
   if (boost::algorithm::ends_with(filename, ".log")) {
@@ -3412,7 +3550,7 @@ BlueFS::FileWriter *BlueFS::_create_writer(FileRef f)
   return w;
 }
 
-void BlueFS::_close_writer(FileWriter *h)
+void BlueFS::_drain_writer(FileWriter *h)
 {
   dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
   //h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
@@ -3428,18 +3566,31 @@ void BlueFS::_close_writer(FileWriter *h)
   if (h->file->fnode.size >= (1ull << 30)) {
     dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl;
   }
+}
+
+void BlueFS::_close_writer(FileWriter *h)
+{
+  _drain_writer(h);
+  delete h;
+}
+void BlueFS::close_writer(FileWriter *h)
+{
+  {
+    std::lock_guard l(h->lock);
+    _drain_writer(h);
+  }
   delete h;
 }
 
 uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
 {
-  std::lock_guard l(lock);
+  std::lock_guard l(h->lock);
   return h->file->dirty_seq;
 }
 
 bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
 {
-  std::lock_guard l(lock);
+  std::lock_guard l(h->lock);
   return h->dirty_devs[dev];
 }
 
@@ -3449,7 +3600,7 @@ int BlueFS::open_for_read(
   FileReader **h,
   bool random)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename
           << (random ? " (random)":" (sequential)") << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
@@ -3478,7 +3629,8 @@ int BlueFS::rename(
   std::string_view old_dirname, std::string_view old_filename,
   std::string_view new_dirname, std::string_view new_filename)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
+  std::lock_guard ll(log_lock);
   dout(10) << __func__ << " " << old_dirname << "/" << old_filename
           << " -> " << new_dirname << "/" << new_filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(old_dirname);
@@ -3525,7 +3677,8 @@ int BlueFS::rename(
 
 int BlueFS::mkdir(std::string_view dirname)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
+  std::lock_guard ll(log_lock);
   dout(10) << __func__ << " " << dirname << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   if (p != dir_map.end()) {
@@ -3539,7 +3692,8 @@ int BlueFS::mkdir(std::string_view dirname)
 
 int BlueFS::rmdir(std::string_view dirname)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
+  std::lock_guard ll(log_lock);
   dout(10) << __func__ << " " << dirname << dendl;
   auto p = dir_map.find(dirname);
   if (p == dir_map.end()) {
@@ -3558,7 +3712,7 @@ int BlueFS::rmdir(std::string_view dirname)
 
 bool BlueFS::dir_exists(std::string_view dirname)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   bool exists = p != dir_map.end();
   dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl;
@@ -3568,7 +3722,7 @@ bool BlueFS::dir_exists(std::string_view dirname)
 int BlueFS::stat(std::string_view dirname, std::string_view filename,
                 uint64_t *size, utime_t *mtime)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   if (p == dir_map.end()) {
@@ -3596,7 +3750,7 @@ int BlueFS::stat(std::string_view dirname, std::string_view filename,
 int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
                      FileLock **plock)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   if (p == dir_map.end()) {
@@ -3616,6 +3770,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
     file_map[ino_last] = file;
     dir->file_map[string{filename}] = file;
     ++file->refs;
+    std::lock_guard ll(log_lock);
     log_t.op_file_update(file->fnode);
     log_t.op_dir_link(dirname, filename, file->fnode.ino);
   } else {
@@ -3634,7 +3789,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
 
 int BlueFS::unlock_file(FileLock *fl)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl;
   ceph_assert(fl->file->locked);
   fl->file->locked = false;
@@ -3648,7 +3803,7 @@ int BlueFS::readdir(std::string_view dirname, vector<string> *ls)
   if (!dirname.empty() && dirname.back() == '/') {
     dirname.remove_suffix(1);
   }
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << dendl;
   if (dirname.empty()) {
     // list dirs
@@ -3676,7 +3831,8 @@ int BlueFS::readdir(std::string_view dirname, vector<string> *ls)
 
 int BlueFS::unlink(std::string_view dirname, std::string_view filename)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
+  std::lock_guard ll(log_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   if (p == dir_map.end()) {
index 801d4bf92c0ba064ded6ef31643985696e2c2d74..79a7464e10c7095feac4cc1afdde896298666c44 100644 (file)
@@ -121,6 +121,11 @@ public:
     std::atomic_int num_reading;
 
     void* vselector_hint = nullptr;
+    /* lock protects fnode and other the parts that can be modified during read & write operations.
+       Does not protect values that are fixed
+       Does not need to be taken when doing one-time operations:
+       _replay, device_migrate_to_existing, device_migrate_to_new */
+    ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock");
 
   private:
     FRIEND_MAKE_REF(File);
@@ -299,8 +304,9 @@ public:
   };
 
 private:
-  ceph::mutex lock = ceph::make_mutex("BlueFS::lock");
-
+  ceph::mutex log_lock = ceph::make_mutex("BlueFS::log_lock");
+  ceph::mutex dirs_lock = ceph::make_mutex("BlueFS::dirs_lock");
+  ceph::mutex dirty_lock = ceph::make_mutex("BlueFS::dirty_lock");
   PerfCounters *logger = nullptr;
 
   uint64_t max_bytes[MAX_BDEV] = {0};
@@ -324,13 +330,12 @@ private:
   FileWriter *log_writer = 0;  ///< writer for the log
   bluefs_transaction_t log_t;  ///< pending, unwritten log transaction
   bool log_flushing = false;   ///< true while flushing the log
-  ceph::condition_variable log_cond;
-
-  uint64_t new_log_jump_to = 0;
-  uint64_t old_log_jump_to = 0;
-  FileRef new_log = nullptr;
-  FileWriter *new_log_writer = nullptr;
 
+  ceph::condition_variable log_cond;                             ///< used for state control between log flush / log compaction
+  ceph::mutex cond_lock = ceph::make_mutex("BlueFS::cond_lock"); ///< ....
+  std::atomic<bool> log_is_compacting{false};                    ///< signals that bluefs log is already ongoing compaction
+  std::atomic<bool> log_forbidden_to_expand{false};              ///< used to signal that async compaction is in state
+                                                                 ///  that prohibits expansion of bluefs log
   /*
    * There are up to 3 block devices:
    *
@@ -344,6 +349,11 @@ private:
   std::vector<Allocator*> alloc;                   ///< allocators for bdevs
   std::vector<uint64_t> alloc_size;                ///< alloc size for each device
   std::vector<interval_set<uint64_t>> pending_release; ///< extents to release
+  // TODO: it should be examined what makes pending_release immune to
+  // eras in a way similar to dirty_files. Hints:
+  // 1) we have actually only 2 eras: log_seq and log_seq+1
+  // 2) we usually not remove extents from files. And when we do, we force log-syncing.
+
   //std::vector<interval_set<uint64_t>> block_unused_too_granular;
 
   BlockDevice::aio_callback_t discard_cb[3]; //discard callbacks for each dev
@@ -390,10 +400,9 @@ private:
   int _signal_dirty_to_log(FileWriter *h);
   int _flush_range(FileWriter *h, uint64_t offset, uint64_t length);
   int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered);
-  int _flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l);
   int _flush(FileWriter *h, bool force, bool *flushed = nullptr);
   int _flush_special(FileWriter *h);
-  int _fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l);
+  int _fsync(FileWriter *h);
 
 #ifdef HAVE_LIBAIO
   void _claim_completed_aios(FileWriter *h, std::list<aio_t> *ls);
@@ -409,11 +418,10 @@ private:
   void _flush_and_sync_log_core(int64_t available_runway);
   int _flush_and_sync_log_jump(uint64_t jump_to,
                               int64_t available_runway);
-  int _flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                         uint64_t want_seq = 0);
+  int flush_and_sync_log(uint64_t want_seq = 0);
 
-  uint64_t _estimate_log_size();
-  bool _should_compact_log();
+  uint64_t estimate_log_size();
+  bool should_start_compact_log();
 
   enum {
     REMOVE_DB = 1,
@@ -421,12 +429,12 @@ private:
     RENAME_SLOW2DB = 4,
     RENAME_DB2SLOW = 8,
   };
-  void _compact_log_dump_metadata(bluefs_transaction_t *t,
-                                 int flags);
-  void _compact_log_sync();
-  void _compact_log_async(std::unique_lock<ceph::mutex>& l);
+  void compact_log_dump_metadata(bluefs_transaction_t *t,
+                                int flags);
+  void compact_log_sync();
+  void compact_log_async();
 
-  void _rewrite_log_and_layout_sync(bool allocate_with_fallback,
+  void rewrite_log_and_layout_sync(bool allocate_with_fallback,
                                    int super_dev,
                                    int log_dev,
                                    int new_log_dev,
@@ -435,7 +443,7 @@ private:
 
   //void _aio_finish(void *priv);
 
-  void _flush_bdev_safely(FileWriter *h);
+  void _flush_bdev(FileWriter *h);
   void flush_bdev();  // this is safe to call without a lock
   void flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs);  // this is safe to call without a lock
 
@@ -454,8 +462,6 @@ private:
     uint64_t len,    ///< [in] this many bytes
     char *out);      ///< [out] optional: or copy it here
 
-  void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length);
-
   int _open_super();
   int _write_super(int dev);
   int _check_allocations(const bluefs_fnode_t& fnode,
@@ -468,6 +474,7 @@ private:
   int _replay(bool noop, bool to_stdout = false); ///< replay journal
 
   FileWriter *_create_writer(FileRef f);
+  void _drain_writer(FileWriter *h);
   void _close_writer(FileWriter *h);
 
   // always put the super in the second 4k block.  FIXME should this be
@@ -533,10 +540,8 @@ public:
     FileReader **h,
     bool random = false);
 
-  void close_writer(FileWriter *h) {
-    std::lock_guard l(lock);
-    _close_writer(h);
-  }
+  // data added after last fsync() is lost
+  void close_writer(FileWriter *h);
 
   int rename(std::string_view old_dir, std::string_view old_file,
             std::string_view new_dir, std::string_view new_file);
@@ -560,7 +565,7 @@ public:
   /// sync any uncommitted state to disk
   void sync_metadata(bool avoid_compact);
   /// test and compact log, if necessary
-  void _maybe_compact_log(std::unique_lock<ceph::mutex>& l);
+  void maybe_compact_log();
 
   void set_volume_selector(BlueFSVolumeSelector* s) {
     vselector.reset(s);
@@ -582,42 +587,11 @@ public:
   // handler for discard event
   void handle_discard(unsigned dev, interval_set<uint64_t>& to_release);
 
-  void flush(FileWriter *h, bool force = false) {
-    std::unique_lock l(lock);
-    int r = _flush(h, force, l);
-    ceph_assert(r == 0);
-  }
+  void flush(FileWriter *h, bool force = false);
 
-  void append_try_flush(FileWriter *h, const char* buf, size_t len) {
-    size_t max_size = 1ull << 30; // cap to 1GB
-    while (len > 0) {
-      bool need_flush = true;
-      auto l0 = h->get_buffer_length();
-      if (l0 < max_size) {
-       size_t l = std::min(len, max_size - l0);
-       h->append(buf, l);
-       buf += l;
-       len -= l;
-       need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
-      }
-      if (need_flush) {
-       flush(h, true);
-       // make sure we've made any progress with flush hence the
-       // loop doesn't iterate forever
-       ceph_assert(h->get_buffer_length() < max_size);
-      }
-    }
-  }
-  void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
-    std::lock_guard l(lock);
-    _flush_range(h, offset, length);
-  }
-  int fsync(FileWriter *h) {
-    std::unique_lock l(lock);
-    int r = _fsync(h, l);
-    _maybe_compact_log(l);
-    return r;
-  }
+  void append_try_flush(FileWriter *h, const char* buf, size_t len);
+  void flush_range(FileWriter *h, uint64_t offset, uint64_t length);
+  int fsync(FileWriter *h);
   int64_t read(FileReader *h, uint64_t offset, size_t len,
           ceph::buffer::list *outbl, char *out) {
     // no need to hold the global lock here; we only touch h and
@@ -632,18 +606,9 @@ public:
     // atomics and asserts).
     return _read_random(h, offset, len, out);
   }
-  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
-    std::lock_guard l(lock);
-    _invalidate_cache(f, offset, len);
-  }
-  int preallocate(FileRef f, uint64_t offset, uint64_t len) {
-    std::lock_guard l(lock);
-    return _preallocate(f, offset, len);
-  }
-  int truncate(FileWriter *h, uint64_t offset) {
-    std::lock_guard l(lock);
-    return _truncate(h, offset);
-  }
+  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len);
+  int preallocate(FileRef f, uint64_t offset, uint64_t len);
+  int truncate(FileWriter *h, uint64_t offset);
   int do_replay_recovery_read(FileReader *log,
                              size_t log_pos,
                              size_t read_offset,