]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "Merge pull request #42099 from aclamk/wip-bluefs-fine-grain-locking-2" 43627/head
authorSage Weil <sage@newdream.net>
Fri, 22 Oct 2021 02:18:12 +0000 (22:18 -0400)
committerSage Weil <sage@newdream.net>
Fri, 22 Oct 2021 02:18:12 +0000 (22:18 -0400)
This reverts commit 44e89c613c6b882f3436714fdaa6c9c04b94b62d, reversing
changes made to 552ac6657753f00f039f267e20e5a4c399c2bb10.

This is causing failures in
ObjectStore/StoreTestSpecificAUSize.SyntheticMatrixCsumVsCompression/2
and ObjectStore/StoreTestSpecificAUSize.SpilloverFixedTest/2

Signed-off-by: Sage Weil <sage@newdream.net>
src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h

index 7add76833705f0ef52e544e357a8ee803b536512..f34996be3e000f9d685d05f73133f4bba3aa6018 100644 (file)
@@ -148,9 +148,9 @@ private:
       out.append(ss);
     } else if (command == "bluefs files list") {
       const char* devnames[3] = {"wal","db","slow"};
-      std::lock_guard l(bluefs->nodes.lock);
+      std::lock_guard l(bluefs->lock);
       f->open_array_section("files");
-      for (auto &d : bluefs->nodes.dir_map) {
+      for (auto &d : bluefs->dir_map) {
         std::string dir = d.first;
         for (auto &r : d.second->file_map) {
           f->open_object_section("file");
@@ -310,12 +310,11 @@ void BlueFS::_shutdown_logger()
   delete logger;
 }
 
-//AK - TODO - locking needed but not certain
 void BlueFS::_update_logger_stats()
 {
   // we must be holding the lock
-  logger->set(l_bluefs_num_files, nodes.file_map.size());
-  logger->set(l_bluefs_log_bytes, log.writer->file->fnode.size);
+  logger->set(l_bluefs_num_files, file_map.size());
+  logger->set(l_bluefs_log_bytes, log_writer->file->fnode.size);
 
   if (alloc[BDEV_WAL]) {
     logger->set(l_bluefs_wal_total_bytes, _get_total(BDEV_WAL));
@@ -393,6 +392,7 @@ void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
 
 uint64_t BlueFS::get_used()
 {
+  std::lock_guard l(lock);
   uint64_t used = 0;
   for (unsigned id = 0; id < MAX_BDEV; ++id) {
     used += _get_used(id);
@@ -418,6 +418,7 @@ uint64_t BlueFS::get_used(unsigned id)
 {
   ceph_assert(id < alloc.size());
   ceph_assert(alloc[id]);
+  std::lock_guard l(lock);
   return _get_used(id);
 }
 
@@ -430,11 +431,13 @@ uint64_t BlueFS::_get_total(unsigned id) const
 
 uint64_t BlueFS::get_total(unsigned id)
 {
+  std::lock_guard l(lock);
   return _get_total(id);
 }
 
 uint64_t BlueFS::get_free(unsigned id)
 {
+  std::lock_guard l(lock);
   ceph_assert(id < alloc.size());
   return alloc[id]->get_free();
 }
@@ -464,10 +467,10 @@ void BlueFS::dump_block_extents(ostream& out)
 
 int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 {
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " bdev " << id << dendl;
   ceph_assert(id < alloc.size());
-  for (auto& p : nodes.file_map) {
+  for (auto& p : file_map) {
     for (auto& q : p.second->fnode.extents) {
       if (q.bdev == id) {
         extents->insert(q.offset, q.length);
@@ -479,6 +482,7 @@ int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 
 int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
 {
+  std::unique_lock l(lock);
   dout(1) << __func__
          << " osd_uuid " << osd_uuid
          << dendl;
@@ -511,24 +515,22 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
     &log_file->fnode);
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
   ceph_assert(r == 0);
-  log.writer = _create_writer(log_file);
+  log_writer = _create_writer(log_file);
 
   // initial txn
-  ceph_assert(log.seq_live == 1);
-  log.t.seq = 1;
-  log.t.op_init();
-  _flush_and_sync_log_LD();
+  log_t.op_init();
+  _flush_and_sync_log(l);
 
   // write supers
   super.log_fnode = log_file->fnode;
   super.memorized_layout = layout;
   _write_super(BDEV_DB);
-  _flush_bdev();
+  flush_bdev();
 
   // clean up
   super = bluefs_super_t();
-  _close_writer(log.writer);
-  log.writer = NULL;
+  _close_writer(log_writer);
+  log_writer = NULL;
   vselector.reset(nullptr);
   _stop_alloc();
   _shutdown_logger();
@@ -783,7 +785,7 @@ int BlueFS::mount()
   }
 
   // init freelist
-  for (auto& p : nodes.file_map) {
+  for (auto& p : file_map) {
     dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
     for (auto& q : p.second->fnode.extents) {
       bool is_shared = is_shared_alloc(q.bdev);
@@ -806,11 +808,11 @@ int BlueFS::mount()
   }
 
   // set up the log for future writes
-  log.writer = _create_writer(_get_file(1));
-  ceph_assert(log.writer->file->fnode.ino == 1);
-  log.writer->pos = log.writer->file->fnode.size;
+  log_writer = _create_writer(_get_file(1));
+  ceph_assert(log_writer->file->fnode.ino == 1);
+  log_writer->pos = log_writer->file->fnode.size;
   dout(10) << __func__ << " log write pos set to 0x"
-           << std::hex << log.writer->pos << std::dec
+           << std::hex << log_writer->pos << std::dec
            << dendl;
 
   return 0;
@@ -843,15 +845,15 @@ void BlueFS::umount(bool avoid_compact)
 
   sync_metadata(avoid_compact);
 
-  _close_writer(log.writer);
-  log.writer = NULL;
+  _close_writer(log_writer);
+  log_writer = NULL;
 
   vselector.reset(nullptr);
   _stop_alloc();
-  nodes.file_map.clear();
-  nodes.dir_map.clear();
+  file_map.clear();
+  dir_map.clear();
   super = bluefs_super_t();
-  log.t.clear();
+  log_t.clear();
   _shutdown_logger();
 }
 
@@ -866,7 +868,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       new_log_dev_cur = BDEV_NEWDB;
       new_log_dev_next = BDEV_DB;
     }
-    _rewrite_log_and_layout_sync_LN_LD(false,
+    _rewrite_log_and_layout_sync(false,
       BDEV_NEWDB,
       new_log_dev_cur,
       new_log_dev_next,
@@ -874,7 +876,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       layout);
     //}
   } else if(id == BDEV_NEWWAL) {
-    _rewrite_log_and_layout_sync_LN_LD(false,
+    _rewrite_log_and_layout_sync(false,
       BDEV_DB,
       BDEV_NEWWAL,
       BDEV_WAL,
@@ -905,6 +907,7 @@ void BlueFS::get_devices(set<string> *ls)
 
 int BlueFS::fsck()
 {
+  std::lock_guard l(lock);
   dout(1) << __func__ << dendl;
   // hrm, i think we check everything on mount...
   return 0;
@@ -1035,7 +1038,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
 {
   dout(10) << __func__ << (noop ? " NO-OP" : "") << dendl;
   ino_last = 1;  // by the log
-  uint64_t log_seq = 0;
+  log_seq = 0;
 
   FileRef log_file;
   log_file = _get_file(1);
@@ -1089,7 +1092,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
       int r = _read(log_reader, read_pos, super.block_size,
                    &bl, NULL);
       if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) {
-       r += _do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
+       r += do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
       }
       assert(r == (int)super.block_size);
       read_pos += r;
@@ -1149,7 +1152,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
                  << ", which is past eof" << dendl;
        if (cct->_conf->bluefs_replay_recovery) {
          //try to search for more data
-         r += _do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
+         r += do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
          if (r < (int)more) {
            //in normal mode we must read r==more, for recovery it is too strict
            break;
@@ -1223,7 +1226,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
                       << std::endl;
           }
 
-         ceph_assert(next_seq > log_seq);
+         ceph_assert(next_seq >= log_seq);
          log_seq = next_seq - 1; // we will increment it below
          uint64_t skip = offset - read_pos;
          if (skip) {
@@ -1251,7 +1254,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
                       << ":  op_jump_seq " << next_seq << std::endl;
           }
 
-         ceph_assert(next_seq > log_seq);
+         ceph_assert(next_seq >= log_seq);
          log_seq = next_seq - 1; // we will increment it below
        }
        break;
@@ -1299,8 +1302,8 @@ int BlueFS::_replay(bool noop, bool to_stdout)
          if (!noop) {
            FileRef file = _get_file(ino);
            ceph_assert(file->fnode.ino);
-           map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
-           ceph_assert(q != nodes.dir_map.end());
+           map<string,DirRef>::iterator q = dir_map.find(dirname);
+           ceph_assert(q != dir_map.end());
            map<string,FileRef>::iterator r = q->second->file_map.find(filename);
            ceph_assert(r == q->second->file_map.end());
 
@@ -1330,8 +1333,8 @@ int BlueFS::_replay(bool noop, bool to_stdout)
           }
  
          if (!noop) {
-           map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
-           ceph_assert(q != nodes.dir_map.end());
+           map<string,DirRef>::iterator q = dir_map.find(dirname);
+           ceph_assert(q != dir_map.end());
            map<string,FileRef>::iterator r = q->second->file_map.find(filename);
            ceph_assert(r != q->second->file_map.end());
             ceph_assert(r->second->refs > 0); 
@@ -1353,9 +1356,9 @@ int BlueFS::_replay(bool noop, bool to_stdout)
           }
 
          if (!noop) {
-           map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
-           ceph_assert(q == nodes.dir_map.end());
-           nodes.dir_map[dirname] = ceph::make_ref<Dir>();
+           map<string,DirRef>::iterator q = dir_map.find(dirname);
+           ceph_assert(q == dir_map.end());
+           dir_map[dirname] = ceph::make_ref<Dir>();
          }
        }
        break;
@@ -1372,10 +1375,10 @@ int BlueFS::_replay(bool noop, bool to_stdout)
           }
 
          if (!noop) {
-           map<string,DirRef>::iterator q = nodes.dir_map.find(dirname);
-           ceph_assert(q != nodes.dir_map.end());
+           map<string,DirRef>::iterator q = dir_map.find(dirname);
+           ceph_assert(q != dir_map.end());
            ceph_assert(q->second->file_map.empty());
-           nodes.dir_map.erase(q);
+           dir_map.erase(q);
          }
        }
        break;
@@ -1436,8 +1439,8 @@ int BlueFS::_replay(bool noop, bool to_stdout)
           }
 
           if (!noop) {
-            auto p = nodes.file_map.find(ino);
-            ceph_assert(p != nodes.file_map.end());
+            auto p = file_map.find(ino);
+            ceph_assert(p != file_map.end());
             vselector->sub_usage(p->second->vselector_hint, p->second->fnode);
             if (cct->_conf->bluefs_log_replay_check_allocations) {
              int r = _check_allocations(p->second->fnode,
@@ -1446,7 +1449,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
                return r;
               }
             }
-            nodes.file_map.erase(p);
+            file_map.erase(p);
           }
         }
        break;
@@ -1466,11 +1469,6 @@ int BlueFS::_replay(bool noop, bool to_stdout)
   }
   if (!noop) {
     vselector->add_usage(log_file->vselector_hint, log_file->fnode);
-    log.seq_live = log_seq + 1;
-    dirty.seq_live = log_seq + 1;
-    log.t.seq = log.seq_live;
-    //log.seq_stable = log_seq;
-    dirty.seq_stable = log_seq;
   }
 
   dout(10) << __func__ << " log file size was 0x"
@@ -1484,7 +1482,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
 
   if (!noop) {
     // verify file link counts are all >0
-    for (auto& p : nodes.file_map) {
+    for (auto& p : file_map) {
       if (p.second->refs == 0 &&
          p.second->fnode.ino > 1) {
        derr << __func__ << " file with link count 0: " << p.second->fnode
@@ -1501,7 +1499,7 @@ int BlueFS::_replay(bool noop, bool to_stdout)
 int BlueFS::log_dump()
 {
   // only dump log file's content
-  ceph_assert(log.writer == nullptr && "cannot log_dump on mounted BlueFS");
+  ceph_assert(log_writer == nullptr && "cannot log_dump on mounted BlueFS");
   int r = _open_super();
   if (r < 0) {
     derr << __func__ << " failed to open super: " << cpp_strerror(r) << dendl;
@@ -1543,7 +1541,7 @@ int BlueFS::device_migrate_to_existing(
     dout(0) << __func__ << " super to be written to " << dev_target << dendl;
   }
 
-  for (auto& [ino, file_ref] : nodes.file_map) {
+  for (auto& [ino, file_ref] : file_map) {
     //do not copy log
     if (file_ref->fnode.ino == 1) {
       continue;
@@ -1650,7 +1648,7 @@ int BlueFS::device_migrate_to_existing(
         new_log_dev_next;
   }
 
-  _rewrite_log_and_layout_sync_LN_LD(
+  _rewrite_log_and_layout_sync(
     false,
     (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
     new_log_dev_cur,
@@ -1681,7 +1679,7 @@ int BlueFS::device_migrate_to_new(
   flags |= devs_source.count(BDEV_WAL) ? REMOVE_WAL : 0;
   int dev_target_new = dev_target; //FIXME: remove, makes no sense
 
-  for (auto& p : nodes.file_map) {
+  for (auto& p : file_map) {
     //do not copy log
     if (p.second->fnode.ino == 1) {
       continue;
@@ -1785,7 +1783,7 @@ int BlueFS::device_migrate_to_new(
         BDEV_DB :
        BDEV_SLOW;
 
-  _rewrite_log_and_layout_sync_LN_LD(
+  _rewrite_log_and_layout_sync(
     false,
     super_dev,
     new_log_dev_cur,
@@ -1797,10 +1795,10 @@ int BlueFS::device_migrate_to_new(
 
 BlueFS::FileRef BlueFS::_get_file(uint64_t ino)
 {
-  auto p = nodes.file_map.find(ino);
-  if (p == nodes.file_map.end()) {
+  auto p = file_map.find(ino);
+  if (p == file_map.end()) {
     FileRef f = ceph::make_ref<File>();
-    nodes.file_map[ino] = f;
+    file_map[ino] = f;
     dout(30) << __func__ << " ino " << ino << " = " << f
             << " (new)" << dendl;
     return f;
@@ -1810,33 +1808,29 @@ BlueFS::FileRef BlueFS::_get_file(uint64_t ino)
   }
 }
 
-void BlueFS::_drop_link_D(FileRef file)
+void BlueFS::_drop_link(FileRef file)
 {
   dout(20) << __func__ << " had refs " << file->refs
           << " on " << file->fnode << dendl;
   ceph_assert(file->refs > 0);
-  ceph_assert(ceph_mutex_is_locked(log.lock));
-
   --file->refs;
   if (file->refs == 0) {
     dout(20) << __func__ << " destroying " << file->fnode << dendl;
     ceph_assert(file->num_reading.load() == 0);
     vselector->sub_usage(file->vselector_hint, file->fnode);
-    log.t.op_file_remove(file->fnode.ino);
-
-    std::lock_guard dl(dirty.lock);
+    log_t.op_file_remove(file->fnode.ino);
     for (auto& r : file->fnode.extents) {
       pending_release[r.bdev].insert(r.offset, r.length);
     }
-    nodes.file_map.erase(file->fnode.ino);
+    file_map.erase(file->fnode.ino);
     file->deleted = true;
 
-    if (file->dirty_seq > dirty.seq_stable) {
-      // retract request to serialize changes
-      ceph_assert(dirty.files.count(file->dirty_seq));
-      auto it = dirty.files[file->dirty_seq].iterator_to(*file);
-      dirty.files[file->dirty_seq].erase(it);
-      file->dirty_seq = dirty.seq_stable;
+    if (file->dirty_seq) {
+      ceph_assert(file->dirty_seq > log_seq_stable);
+      ceph_assert(dirty_files.count(file->dirty_seq));
+      auto it = dirty_files[file->dirty_seq].iterator_to(*file);
+      dirty_files[file->dirty_seq].erase(it);
+      file->dirty_seq = 0;
     }
   }
 }
@@ -2055,9 +2049,8 @@ int64_t BlueFS::_read(
   return ret;
 }
 
-void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
 {
-  std::lock_guard l(f->lock);
   dout(10) << __func__ << " file " << f->fnode
           << " 0x" << std::hex << offset << "~" << length << std::dec
            << dendl;
@@ -2077,68 +2070,60 @@ void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
   }
 }
 
-uint64_t BlueFS::_estimate_log_size_N()
+uint64_t BlueFS::_estimate_log_size()
 {
-  std::lock_guard nl(nodes.lock);
   int avg_dir_size = 40;  // fixme
   int avg_file_size = 12;
   uint64_t size = 4096 * 2;
-  size += nodes.file_map.size() * (1 + sizeof(bluefs_fnode_t));
-  size += nodes.dir_map.size() + (1 + avg_dir_size);
-  size += nodes.file_map.size() * (1 + avg_dir_size + avg_file_size);
+  size += file_map.size() * (1 + sizeof(bluefs_fnode_t));
+  size += dir_map.size() + (1 + avg_dir_size);
+  size += file_map.size() * (1 + avg_dir_size + avg_file_size);
   return round_up_to(size, super.block_size);
 }
 
 void BlueFS::compact_log()
 {
+  std::unique_lock<ceph::mutex> l(lock);
   if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync_LN_LD();
+      _compact_log_sync();
     } else {
-      _compact_log_async_LD_NF_D();
+      _compact_log_async(l);
     }
   }
 }
 
-bool BlueFS::_should_start_compact_log_L_N()
+bool BlueFS::_should_compact_log()
 {
-  if (log_is_compacting.load() == true) {
-    // compaction is already running
-    return false;
-  }
-  uint64_t current;
-  {
-    std::lock_guard ll(log.lock);
-    current = log.writer->file->fnode.size;
-  }
-  uint64_t expected = _estimate_log_size_N();
+  uint64_t current = log_writer->file->fnode.size;
+  uint64_t expected = _estimate_log_size();
   float ratio = (float)current / (float)expected;
   dout(10) << __func__ << " current 0x" << std::hex << current
           << " expected " << expected << std::dec
           << " ratio " << ratio
+          << (new_log ? " (async compaction in progress)" : "")
           << dendl;
-  if (current < cct->_conf->bluefs_log_compact_min_size ||
+  if (new_log ||
+      current < cct->_conf->bluefs_log_compact_min_size ||
       ratio < cct->_conf->bluefs_log_compact_min_ratio) {
     return false;
   }
   return true;
 }
 
-void BlueFS::_compact_log_dump_metadata_N(bluefs_transaction_t *t,
+void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t,
                                        int flags)
 {
-  std::lock_guard nl(nodes.lock);
-
   t->seq = 1;
   t->uuid = super.uuid;
   dout(20) << __func__ << " op_init" << dendl;
 
   t->op_init();
-  for (auto& [ino, file_ref] : nodes.file_map) {
+  for (auto& [ino, file_ref] : file_map) {
     if (ino == 1)
       continue;
     ceph_assert(ino > 1);
-    //AK - TODO - touching fnode - need to lock
+
     for(auto& e : file_ref->fnode.extents) {
       auto bdev = e.bdev;
       auto bdev_new = bdev;
@@ -2164,41 +2149,7 @@ void BlueFS::_compact_log_dump_metadata_N(bluefs_transaction_t *t,
     dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
     t->op_file_update(file_ref->fnode);
   }
-  for (auto& [path, dir_ref] : nodes.dir_map) {
-    dout(20) << __func__ << " op_dir_create " << path << dendl;
-    t->op_dir_create(path);
-    for (auto& [fname, file_ref] : dir_ref->file_map) {
-      dout(20) << __func__ << " op_dir_link " << path << "/" << fname
-              << " to " << file_ref->fnode.ino << dendl;
-      t->op_dir_link(path, fname, file_ref->fnode.ino);
-    }
-  }
-}
-/* Streams to t files modified before *capture_before_seq* and all dirs */
-void BlueFS::compact_log_async_dump_metadata_NF(bluefs_transaction_t *t,
-                                            uint64_t capture_before_seq)
-{
-  std::lock_guard nl(nodes.lock);
-
-  t->seq = 1;
-  t->uuid = super.uuid;
-  dout(20) << __func__ << " op_init" << dendl;
-
-  t->op_init();
-  for (auto& [ino, file_ref] : nodes.file_map) {
-    if (ino == 1)
-      continue;
-    ceph_assert(ino > 1);
-    std::lock_guard fl(file_ref->lock);
-    if (file_ref->dirty_seq < capture_before_seq) {
-      dout(20) << __func__ << " op_file_update " << file_ref->fnode << dendl;
-      t->op_file_update(file_ref->fnode);
-    } else {
-      dout(20) << __func__ << " skipping just modified, dirty_seq="
-              << file_ref->dirty_seq << " " << file_ref->fnode << dendl;
-    }
-  }
-  for (auto& [path, dir_ref] : nodes.dir_map) {
+  for (auto& [path, dir_ref] : dir_map) {
     dout(20) << __func__ << " op_dir_create " << path << dendl;
     t->op_dir_create(path);
     for (auto& [fname, file_ref] : dir_ref->file_map) {
@@ -2209,12 +2160,12 @@ void BlueFS::compact_log_async_dump_metadata_NF(bluefs_transaction_t *t,
   }
 }
 
-void BlueFS::_compact_log_sync_LN_LD()
+void BlueFS::_compact_log_sync()
 {
   dout(10) << __func__ << dendl;
   auto prefer_bdev =
-    vselector->select_prefer_bdev(log.writer->file->vselector_hint);
-  _rewrite_log_and_layout_sync_LN_LD(true,
+    vselector->select_prefer_bdev(log_writer->file->vselector_hint);
+  _rewrite_log_and_layout_sync(true,
     BDEV_DB,
     prefer_bdev,
     prefer_bdev,
@@ -2223,24 +2174,17 @@ void BlueFS::_compact_log_sync_LN_LD()
   logger->inc(l_bluefs_log_compactions);
 }
 
-void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
-                                        int super_dev,
-                                        int log_dev,
-                                        int log_dev_new,
-                                        int flags,
-                                        std::optional<bluefs_layout_t> layout)
+void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
+                                         int super_dev,
+                                         int log_dev,
+                                         int log_dev_new,
+                                         int flags,
+                                         std::optional<bluefs_layout_t> layout)
 {
-  std::lock_guard ll(log.lock);
+  File *log_file = log_writer->file.get();
 
-  File *log_file = log.writer->file.get();
-
-  // log.t.seq is always set to current live seq
-  ceph_assert(log.t.seq == log.seq_live);
-  // Capturing entire state. Dump anything that has been stored there.
-  log.t.clear();
-  log.t.seq = log.seq_live;
-  // From now on, no changes to log.t are permitted until we finish rewriting log.
-  // Can allow dirty to remain dirty - log.seq_live will not change.
+  // clear out log (be careful who calls us!!!)
+  log_t.clear();
 
   dout(20) << __func__ << " super_dev:" << super_dev
                        << " log_dev:" << log_dev
@@ -2248,10 +2192,10 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
                       << " flags:" << flags
                       << dendl;
   bluefs_transaction_t t;
-  _compact_log_dump_metadata_N(&t, flags);
+  _compact_log_dump_metadata(&t, flags);
 
-  dout(20) << __func__ << " op_jump_seq " << log.seq_live << dendl;
-  t.op_jump_seq(log.seq_live);
+  dout(20) << __func__ << " op_jump_seq " << log_seq << dendl;
+  t.op_jump_seq(log_seq);
 
   bufferlist bl;
   encode(t, bl);
@@ -2278,25 +2222,25 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
     }
   }
 
-  _close_writer(log.writer);
+  _close_writer(log_writer);
 
   log_file->fnode.size = bl.length();
   vselector->sub_usage(log_file->vselector_hint, old_fnode);
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
 
-  log.writer = _create_writer(log_file);
-  log.writer->append(bl);
-  r = _flush_special(log.writer);
+  log_writer = _create_writer(log_file);
+  log_writer->append(bl);
+  r = _flush(log_writer, true);
   ceph_assert(r == 0);
 #ifdef HAVE_LIBAIO
   if (!cct->_conf->bluefs_sync_write) {
     list<aio_t> completed_ios;
-    _claim_completed_aios(log.writer, &completed_ios);
-    _wait_for_aio(log.writer);
+    _claim_completed_aios(log_writer, &completed_ios);
+    wait_for_aio(log_writer);
     completed_ios.clear();
   }
 #endif
-  _flush_bdev();
+  flush_bdev();
 
   super.memorized_layout = layout;
   super.log_fnode = log_file->fnode;
@@ -2311,10 +2255,9 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
 
   ++super.version;
   _write_super(super_dev);
-  _flush_bdev();
+  flush_bdev();
 
   dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl;
-  std::lock_guard dl(dirty.lock);
   for (auto& r : old_fnode.extents) {
     pending_release[r.bdev].insert(r.offset, r.length);
   }
@@ -2342,44 +2285,30 @@ void BlueFS::_rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
  *
  * 8. Release the old log space.  Clean up.
  */
-
-void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
+void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
 {
   dout(10) << __func__ << dendl;
-  // only one compaction allowed at one time
-  bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
-  if (old_is_comp) {
-    dout(10) << __func__ << " ongoing" <<dendl;
-    return;
-  }
-
-  log.lock.lock();
-  File *log_file = log.writer->file.get();
-  FileWriter *new_log_writer = nullptr;
-  FileRef new_log = nullptr;
-  uint64_t new_log_jump_to = 0;
-  uint64_t old_log_jump_to = 0;
+  File *log_file = log_writer->file.get();
+  ceph_assert(!new_log);
+  ceph_assert(!new_log_writer);
 
+  // create a new log [writer] so that we know compaction is in progress
+  // (see _should_compact_log)
   new_log = ceph::make_ref<File>();
-  new_log->fnode.ino = 0;   // we use _flush_special to avoid log of the fnode
-
-  // Part 1.
-  // Prepare current log for jumping into it.
-  // 1. Allocate extent
-  // 2. Update op to log
-  // 3. Jump op to log
-  // During that, no one else can write to log, otherwise we risk jumping backwards.
-  // We need to sync log, because we are injecting discontinuity, and writer is not prepared for that.
+  new_log->fnode.ino = 0;   // so that _flush_range won't try to log the fnode
 
-  //signal _maybe_extend_log that expansion of log is temporary inacceptable
-  bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
-  ceph_assert(old_forbidden == false);
+  // 0. wait for any racing flushes to complete.  (We do not want to block
+  // in _flush_sync_log with jump_to set or else a racing thread might flush
+  // our entries and our jump_to update won't be correct.)
+  while (log_flushing) {
+    dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
+    log_cond.wait(l);
+  }
 
   vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
 
-  // 1.1 allocate new log space and jump to it.
+  // 1. allocate new log space and jump to it.
   old_log_jump_to = log_file->fnode.get_allocated();
-  uint64_t runway = log_file->fnode.get_allocated() - log.writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
            << " need 0x" << (old_log_jump_to + cct->_conf->bluefs_max_log_runway) << std::dec << dendl;
   int r = _allocate(vselector->select_prefer_bdev(log_file->vselector_hint),
@@ -2392,22 +2321,19 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
 
   // update the log file change and log a jump to the offset where we want to
   // write the new entries
-  log.t.op_file_update(log_file->fnode);
-  // jump to new position should mean next seq
-  log.t.op_jump(log.seq_live + 1, old_log_jump_to);
-  uint64_t seq_now = log.seq_live;
-  // we need to flush all bdev because we will be streaming all dirty files to log
-  // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
-  // then flush_bdev() will not be necessary
-  _flush_bdev();
-  _flush_and_sync_log_jump_D(old_log_jump_to, runway);
-
-  log.lock.unlock();
-  // out of jump section - now log can be used to write to
+  log_t.op_file_update(log_file->fnode);
+  log_t.op_jump(log_seq, old_log_jump_to);
+
+  flush_bdev();  // FIXME?
+
+  _flush_and_sync_log(l, 0, old_log_jump_to);
 
   // 2. prepare compacted log
   bluefs_transaction_t t;
-  compact_log_async_dump_metadata_NF(&t, seq_now);
+  //avoid record two times in log_t and _compact_log_dump_metadata.
+  log_t.clear();
+  _compact_log_dump_metadata(&t, 0);
+
   uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
                                     std::max(alloc_size[BDEV_DB],
                                              alloc_size[BDEV_SLOW]));
@@ -2415,8 +2341,7 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
   // conservative estimate for final encoded size
   new_log_jump_to = round_up_to(t.op_bl.length() + super.block_size * 2,
                                 max_alloc_size);
-  //newly constructed log head will jump to what we had before
-  t.op_jump(seq_now, new_log_jump_to);
+  t.op_jump(log_seq, new_log_jump_to);
 
   // allocate
   //FIXME: check if we want DB here?
@@ -2425,7 +2350,7 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
   ceph_assert(r == 0);
 
   // we might have some more ops in log_t due to _allocate call
-  // t.claim_ops(log_t); no we no longer track allocations in log
+  t.claim_ops(log_t);
 
   bufferlist bl;
   encode(t, bl);
@@ -2435,18 +2360,15 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
           << std::dec << dendl;
 
   new_log_writer = _create_writer(new_log);
-
   new_log_writer->append(bl);
-  new_log->lock.lock();
-  new_log_writer->lock.lock();
+
   // 3. flush
-  r = _flush_special(new_log_writer);
+  r = _flush(new_log_writer, true);
   ceph_assert(r == 0);
 
   // 4. wait
-  _flush_bdev(new_log_writer);
-  new_log_writer->lock.unlock();
-  new_log->lock.unlock();
+  _flush_bdev_safely(new_log_writer);
+
   // 5. update our log fnode
   // discard first old_log_jump_to extents
 
@@ -2487,8 +2409,8 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
   // swap the log files. New log file is the log file now.
   new_log->fnode.swap_extents(log_file->fnode);
 
-  log.writer->pos = log.writer->file->fnode.size =
-    log.writer->pos - old_log_jump_to + new_log_jump_to;
+  log_writer->pos = log_writer->file->fnode.size =
+    log_writer->pos - old_log_jump_to + new_log_jump_to;
 
   vselector->add_usage(log_file->vselector_hint, log_file->fnode);
 
@@ -2498,33 +2420,29 @@ void BlueFS::_compact_log_async_LD_NF_D() //also locks FW for new_writer
   ++super.version;
   _write_super(BDEV_DB);
 
-  _flush_bdev();
-
-  old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
-  ceph_assert(old_forbidden == true);
-  //to wake up if someone was in need of expanding log
-  log_cond.notify_all();
+  lock.unlock();
+  flush_bdev();
+  lock.lock();
 
   // 7. release old space
   dout(10) << __func__ << " release old log extents " << old_extents << dendl;
-  {
-    std::lock_guard dl(dirty.lock);
-    for (auto& r : old_extents) {
-      pending_release[r.bdev].insert(r.offset, r.length);
-    }
+  for (auto& r : old_extents) {
+    pending_release[r.bdev].insert(r.offset, r.length);
   }
 
   // delete the new log, remove from the dirty files list
   _close_writer(new_log_writer);
+  if (new_log->dirty_seq) {
+    ceph_assert(dirty_files.count(new_log->dirty_seq));
+    auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
+    dirty_files[new_log->dirty_seq].erase(it);
+  }
   new_log_writer = nullptr;
   new_log = nullptr;
   log_cond.notify_all();
 
   dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
   logger->inc(l_bluefs_log_compactions);
-
-  old_is_comp = atomic_exchange(&log_is_compacting, false);
-  ceph_assert(old_is_comp);
 }
 
 void BlueFS::_pad_bl(bufferlist& bl)
@@ -2537,108 +2455,76 @@ void BlueFS::_pad_bl(bufferlist& bl)
   }
 }
 
-/**
- * Adds file modifications from `dirty.files` to bluefs transactions
- * already stored in `log.t`. Writes them to disk and waits until are stable.
- * Guarantees that file modifications with `want_seq` are already stable on disk.
- * In addition may insert jump forward transaction to log write position `jump_to`.
- *
- * it should lock ability to:
- * 1) add to log.t
- * 2) modify dirty.files
- * 3) add to pending_release
- *
- * pending_release and log.t go with same lock
- */
 
-// Returns log seq that was live before advance.
-uint64_t BlueFS::_log_advance_seq()
+int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+                               uint64_t want_seq,
+                               uint64_t jump_to)
 {
-  ceph_assert(ceph_mutex_is_locked(dirty.lock));
-  ceph_assert(ceph_mutex_is_locked(log.lock));
-  //acquire new seq
-  // this will became seq_stable once we write
-  ceph_assert(dirty.seq_stable < dirty.seq_live);
-  ceph_assert(log.t.seq == log.seq_live);
-  uint64_t seq = log.seq_live;
-  log.t.uuid = super.uuid;
-
-  ++dirty.seq_live;
-  ++log.seq_live;
-  ceph_assert(dirty.seq_live == log.seq_live);
-  return seq;
-}
+  while (log_flushing) {
+    dout(10) << __func__ << " want_seq " << want_seq
+            << " log is currently flushing, waiting" << dendl;
+    ceph_assert(!jump_to);
+    log_cond.wait(l);
+  }
+  if (want_seq && want_seq <= log_seq_stable) {
+    dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+            << log_seq_stable << ", done" << dendl;
+    ceph_assert(!jump_to);
+    return 0;
+  }
+  if (log_t.empty() && dirty_files.empty()) {
+    dout(10) << __func__ << " want_seq " << want_seq
+            << " " << log_t << " not dirty, dirty_files empty, no-op" << dendl;
+    ceph_assert(!jump_to);
+    return 0;
+  }
 
+  vector<interval_set<uint64_t>> to_release(pending_release.size());
+  to_release.swap(pending_release);
 
-// Adds to log.t file modifications mentioned in `dirty.files`.
-// Note: some bluefs ops may have already been stored in log.t transaction.
-void BlueFS::_consume_dirty(uint64_t seq)
-{
-  ceph_assert(ceph_mutex_is_locked(dirty.lock));
-  ceph_assert(ceph_mutex_is_locked(log.lock));
+  uint64_t seq = log_t.seq = ++log_seq;
+  ceph_assert(want_seq == 0 || want_seq <= seq);
+  log_t.uuid = super.uuid;
 
   // log dirty files
-  // we just incremented log_seq. It is now illegal to add to dirty.files[log_seq]
-  auto lsi = dirty.files.find(seq);
-  if (lsi != dirty.files.end()) {
-    dout(20) << __func__ << " " << lsi->second.size() << " dirty.files" << dendl;
+  auto lsi = dirty_files.find(seq);
+  if (lsi != dirty_files.end()) {
+    dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
     for (auto &f : lsi->second) {
       dout(20) << __func__ << "   op_file_update " << f.fnode << dendl;
-      log.t.op_file_update(f.fnode);
+      log_t.op_file_update(f.fnode);
     }
   }
-}
 
-// Extends log if its free space is smaller then bluefs_min_log_runway.
-// Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
-int64_t BlueFS::_maybe_extend_log()
-{
-  ceph_assert(ceph_mutex_is_locked(log.lock));
+  dout(10) << __func__ << " " << log_t << dendl;
+  ceph_assert(!log_t.empty());
+
   // allocate some more space (before we run out)?
-  // BTW: this triggers `flush()` in the `page_aligned_appender` of `log.writer`.
-  int64_t runway = log.writer->file->fnode.get_allocated() -
-    log.writer->get_effective_write_pos();
+  // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
+  int64_t runway = log_writer->file->fnode.get_allocated() -
+    log_writer->get_effective_write_pos();
+  bool just_expanded_log = false;
   if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
     dout(10) << __func__ << " allocating more log runway (0x"
             << std::hex << runway << std::dec  << " remaining)" << dendl;
-    /*
-     * Usually, when we are low on space in log, we just allocate new extent,
-     * put update op(log) to log and we are fine.
-     * Problem - it interferes with log compaction:
-     * New log produced in compaction will include - as last op - jump into some offset (anchor) of current log.
-     * It is assumed that log region (anchor - end) will contain all changes made by bluefs since
-     * full state capture into new log.
-     * Putting log update into (anchor - end) region is illegal, because any update there must be compatible with
-     * both logs, but old log is different then new log.
-     *
-     * Possible solutions:
-     * - stall extending log until we finish compacting and switch log (CURRENT)
-     * - re-run compaction with more runway for old log
-     * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
-     */
-    if (log_forbidden_to_expand.load() == true) {
-      return -EWOULDBLOCK;
+    while (new_log_writer) {
+      dout(10) << __func__ << " waiting for async compaction" << dendl;
+      log_cond.wait(l);
     }
-    vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
+    vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
     int r = _allocate(
-      vselector->select_prefer_bdev(log.writer->file->vselector_hint),
+      vselector->select_prefer_bdev(log_writer->file->vselector_hint),
       cct->_conf->bluefs_max_log_runway,
-      &log.writer->file->fnode);
+      &log_writer->file->fnode);
     ceph_assert(r == 0);
-    vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode);
-    log.t.op_file_update(log.writer->file->fnode);
+    vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
+    log_t.op_file_update(log_writer->file->fnode);
+    just_expanded_log = true;
   }
-  return runway;
-}
-
-void BlueFS::_flush_and_sync_log_core(int64_t runway)
-{
-  ceph_assert(ceph_mutex_is_locked(log.lock));
-  dout(10) << __func__ << " " << log.t << dendl;
 
   bufferlist bl;
   bl.reserve(super.block_size);
-  encode(log.t, bl);
+  encode(log_t, bl);
   // pad to block boundary
   size_t realign = super.block_size - (bl.length() % super.block_size);
   if (realign && realign != super.block_size)
@@ -2646,35 +2532,41 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway)
 
   logger->inc(l_bluefs_logged_bytes, bl.length());
 
-  if (true) {
+  if (just_expanded_log) {
     ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
-                                        // transaction will not fit extents before growth -> data loss on _replay
   }
 
-  log.writer->append(bl);
+  log_writer->append(bl);
 
-  // prepare log for new transactions
-  log.t.clear();
-  log.t.seq = log.seq_live;
+  log_t.clear();
+  log_t.seq = 0;  // just so debug output is less confusing
+  log_flushing = true;
 
-  int r = _flush_special(log.writer);
+  int r = _flush(log_writer, true);
   ceph_assert(r == 0);
-}
 
-// Clears dirty.files up to (including) seq_stable.
-void BlueFS::_clear_dirty_set_stable_D(uint64_t seq)
-{
-  std::lock_guard dl(dirty.lock);
+  if (jump_to) {
+    dout(10) << __func__ << " jumping log offset from 0x" << std::hex
+            << log_writer->pos << " -> 0x" << jump_to << std::dec << dendl;
+    log_writer->pos = jump_to;
+    vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+    log_writer->file->fnode.size = jump_to;
+    vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
+  }
+
+  _flush_bdev_safely(log_writer);
+
+  log_flushing = false;
+  log_cond.notify_all();
 
   // clean dirty files
-  if (seq > dirty.seq_stable) {
-    dirty.seq_stable = seq;
-    dout(20) << __func__ << " seq_stable " << dirty.seq_stable << dendl;
-
-    // undirty all files that were already streamed to log
-    auto p = dirty.files.begin();
-    while (p != dirty.files.end()) {
-      if (p->first > dirty.seq_stable) {
+  if (seq > log_seq_stable) {
+    log_seq_stable = seq;
+    dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
+
+    auto p = dirty_files.begin();
+    while (p != dirty_files.end()) {
+      if (p->first > log_seq_stable) {
         dout(20) << __func__ << " done cleaning up dirty files" << dendl;
         break;
       }
@@ -2682,24 +2574,22 @@ void BlueFS::_clear_dirty_set_stable_D(uint64_t seq)
       auto l = p->second.begin();
       while (l != p->second.end()) {
         File *file = &*l;
-        ceph_assert(file->dirty_seq <= dirty.seq_stable);
+        ceph_assert(file->dirty_seq > 0);
+        ceph_assert(file->dirty_seq <= log_seq_stable);
         dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
-        file->dirty_seq = dirty.seq_stable;
+        file->dirty_seq = 0;
         p->second.erase(l++);
       }
 
       ceph_assert(p->second.empty());
-      dirty.files.erase(p++);
+      dirty_files.erase(p++);
     }
   } else {
-    dout(20) << __func__ << " seq_stable " << dirty.seq_stable
+    dout(20) << __func__ << " log_seq_stable " << log_seq_stable
              << " already >= out seq " << seq
              << ", we lost a race against another log flush, done" << dendl;
   }
-}
 
-void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release)
-{
   for (unsigned i = 0; i < to_release.size(); ++i) {
     if (!to_release[i].empty()) {
       /* OK, now we have the guarantee alloc[i] won't be null. */
@@ -2719,85 +2609,9 @@ void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_rel
       }
     }
   }
-}
-
-int BlueFS::_flush_and_sync_log_LD(uint64_t want_seq)
-{
-  int64_t available_runway;
-  do {
-    log.lock.lock();
-    dirty.lock.lock();
-    if (want_seq && want_seq <= dirty.seq_stable) {
-      dout(10) << __func__ << " want_seq " << want_seq << " <= seq_stable "
-              << dirty.seq_stable << ", done" << dendl;
-      dirty.lock.unlock();
-      log.lock.unlock();
-      return 0;
-    }
-
-    available_runway = _maybe_extend_log();
-    if (available_runway == -EWOULDBLOCK) {
-      // we are in need of adding runway, but we are during log-switch from compaction
-      dirty.lock.unlock();
-      //instead log.lock_unlock() do move ownership
-      std::unique_lock<ceph::mutex> ll(log.lock, std::adopt_lock);
-      while (log_forbidden_to_expand.load()) {
-       log_cond.wait(ll);
-      }
-    } else {
-      ceph_assert(available_runway >= 0);
-    }
-  } while (available_runway < 0);
-  
-  ceph_assert(want_seq == 0 || want_seq <= dirty.seq_live); // illegal to request seq that was not created yet
-  uint64_t seq =_log_advance_seq();
-  _consume_dirty(seq);
-  vector<interval_set<uint64_t>> to_release(pending_release.size());
-  to_release.swap(pending_release);
-  dirty.lock.unlock();
-
-  _flush_and_sync_log_core(available_runway);
-  _flush_bdev(log.writer);
-  //now log.lock is no longer needed
-  log.lock.unlock();
-
-  _clear_dirty_set_stable_D(seq);
-  _release_pending_allocations(to_release);
 
   _update_logger_stats();
-  return 0;
-}
-
-// Flushes log and immediately adjusts log_writer pos.
-int BlueFS::_flush_and_sync_log_jump_D(uint64_t jump_to,
-                                    int64_t available_runway)
-{
-  ceph_assert(ceph_mutex_is_locked(log.lock));
-
-  ceph_assert(jump_to);
-  // we synchronize writing to log, by lock to log.lock
 
-  dirty.lock.lock();
-  uint64_t seq =_log_advance_seq();
-  _consume_dirty(seq);
-  vector<interval_set<uint64_t>> to_release(pending_release.size());
-  to_release.swap(pending_release);
-  dirty.lock.unlock();
-  _flush_and_sync_log_core(available_runway);
-
-  dout(10) << __func__ << " jumping log offset from 0x" << std::hex
-          << log.writer->pos << " -> 0x" << jump_to << std::dec << dendl;
-  log.writer->pos = jump_to;
-  vselector->sub_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size);
-  log.writer->file->fnode.size = jump_to;
-  vselector->add_usage(log.writer->file->vselector_hint, log.writer->file->fnode.size);
-
-  _flush_bdev(log.writer);
-
-  _clear_dirty_set_stable_D(seq);
-  _release_pending_allocations(to_release);
-
-  _update_logger_stats();
   return 0;
 }
 
@@ -2807,7 +2621,6 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
   const unsigned length,
   const bluefs_super_t& super)
 {
-  ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1);
   ceph::bufferlist bl;
   if (partial) {
     tail_block.splice(0, tail_block.length(), &bl);
@@ -2844,43 +2657,35 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
   return bl;
 }
 
-int BlueFS::_signal_dirty_to_log_D(FileWriter *h)
+int BlueFS::_signal_dirty_to_log(FileWriter *h)
 {
-  ceph_assert(ceph_mutex_is_locked(h->lock));
-  std::lock_guard dl(dirty.lock);
   h->file->fnode.mtime = ceph_clock_now();
   ceph_assert(h->file->fnode.ino >= 1);
-  if (h->file->dirty_seq <= dirty.seq_stable) {
-    h->file->dirty_seq = dirty.seq_live;
-    dirty.files[h->file->dirty_seq].push_back(*h->file);
-    dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+  if (h->file->dirty_seq == 0) {
+    h->file->dirty_seq = log_seq + 1;
+    dirty_files[h->file->dirty_seq].push_back(*h->file);
+    dout(20) << __func__ << " dirty_seq = " << log_seq + 1
             << " (was clean)" << dendl;
   } else {
-    if (h->file->dirty_seq != dirty.seq_live) {
+    if (h->file->dirty_seq != log_seq + 1) {
       // need re-dirty, erase from list first
-      ceph_assert(dirty.files.count(h->file->dirty_seq));
-      auto it = dirty.files[h->file->dirty_seq].iterator_to(*h->file);
-      dirty.files[h->file->dirty_seq].erase(it);
-      h->file->dirty_seq = dirty.seq_live;
-      dirty.files[h->file->dirty_seq].push_back(*h->file);
-      dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+      ceph_assert(dirty_files.count(h->file->dirty_seq));
+      auto it = dirty_files[h->file->dirty_seq].iterator_to(*h->file);
+      dirty_files[h->file->dirty_seq].erase(it);
+      h->file->dirty_seq = log_seq + 1;
+      dirty_files[h->file->dirty_seq].push_back(*h->file);
+      dout(20) << __func__ << " dirty_seq = " << log_seq + 1
               << " (was " << h->file->dirty_seq << ")" << dendl;
     } else {
-      dout(20) << __func__ << " dirty_seq = " << dirty.seq_live
+      dout(20) << __func__ << " dirty_seq = " << log_seq + 1
               << " (unchanged, do nothing) " << dendl;
     }
   }
   return 0;
 }
 
-void BlueFS::flush_range/*WF*/(FileWriter *h, uint64_t offset, uint64_t length) {
-  std::unique_lock hl(h->lock);
-  _flush_range_F(h, offset, length);
-}
-
-int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
+int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
 {
-  ceph_assert(ceph_mutex_is_locked(h->lock));
   dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
           << " 0x" << offset << "~" << length << std::dec
           << " to " << h->file->fnode << dendl;
@@ -2891,8 +2696,11 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
 
   ceph_assert(h->file->num_readers.load() == 0);
 
-  bool buffered = cct->_conf->bluefs_buffered_io;
-  ceph_assert(h->file->fnode.ino > 1);
+  bool buffered;
+  if (h->file->fnode.ino == 1)
+    buffered = false;
+  else
+    buffered = cct->_conf->bluefs_buffered_io;
 
   if (offset + length <= h->pos)
     return 0;
@@ -2903,16 +2711,17 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
              << std::hex << offset << "~" << length << std::dec
              << dendl;
   }
-  std::lock_guard file_lock(h->file->lock);
   ceph_assert(offset <= h->file->fnode.size);
 
   uint64_t allocated = h->file->fnode.get_allocated();
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
   // do not bother to dirty the file if we are overwriting
   // previously allocated extents.
+
   if (allocated < offset + length) {
     // we should never run out of log space here; see the min runway check
     // in _flush_and_sync_log.
+    ceph_assert(h->file->fnode.ino != 1);
     int r = _allocate(vselector->select_prefer_bdev(h->file->vselector_hint),
                      offset + length - allocated,
                      &h->file->fnode);
@@ -2928,19 +2737,15 @@ int BlueFS::_flush_range_F(FileWriter *h, uint64_t offset, uint64_t length)
   }
   if (h->file->fnode.size < offset + length) {
     h->file->fnode.size = offset + length;
-    h->file->is_dirty = true;
+    if (h->file->fnode.ino > 1) {
+      // we do not need to dirty the log file (or it's compacting
+      // replacement) when the file size changes because replay is
+      // smart enough to discover it on its own.
+      h->file->is_dirty = true;
+    }
   }
-
   dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl;
-  return _flush_data(h, offset, length, buffered);
-}
 
-int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered)
-{
-  if (h->file->fnode.ino != 1) {
-    ceph_assert(ceph_mutex_is_locked(h->lock));
-    ceph_assert(ceph_mutex_is_locked(h->file->lock));
-  }
   uint64_t x_off = 0;
   auto p = h->file->fnode.seek(offset, &x_off);
   ceph_assert(p != h->file->fnode.extents.end());
@@ -3030,7 +2835,7 @@ void BlueFS::_claim_completed_aios(FileWriter *h, list<aio_t> *ls)
   dout(10) << __func__ << " got " << ls->size() << " aios" << dendl;
 }
 
-void BlueFS::_wait_for_aio(FileWriter *h)
+void BlueFS::wait_for_aio(FileWriter *h)
 {
   // NOTE: this is safe to call without a lock, as long as our reference is
   // stable.
@@ -3047,55 +2852,18 @@ void BlueFS::_wait_for_aio(FileWriter *h)
 }
 #endif
 
-void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)
-{
-  bool flushed_sum = false;
-  {
-    std::unique_lock hl(h->lock);
-    size_t max_size = 1ull << 30; // cap to 1GB
-    while (len > 0) {
-      bool need_flush = true;
-      auto l0 = h->get_buffer_length();
-      if (l0 < max_size) {
-       size_t l = std::min(len, max_size - l0);
-       h->append(buf, l);
-       buf += l;
-       len -= l;
-       need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
-      }
-      if (need_flush) {
-       bool flushed = false;
-       int r = _flush_F(h, true, &flushed);
-       ceph_assert(r == 0);
-       flushed_sum |= flushed;
-       // make sure we've made any progress with flush hence the
-       // loop doesn't iterate forever
-       ceph_assert(h->get_buffer_length() < max_size);
-      }
-    }
-  }
-  if (flushed_sum) {
-    _maybe_compact_log_LN_NF_LD_D();
-  }
-}
-
-void BlueFS::flush(FileWriter *h, bool force)
+int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
 {
   bool flushed = false;
-  int r;
-  {
-    std::unique_lock hl(h->lock);
-    r = _flush_F(h, force, &flushed);
-    ceph_assert(r == 0);
-  }
+  int r = _flush(h, force, &flushed);
   if (r == 0 && flushed) {
-    _maybe_compact_log_LN_NF_LD_D();
+    _maybe_compact_log(l);
   }
+  return r;
 }
 
-int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed)
+int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
 {
-  ceph_assert(ceph_mutex_is_locked(h->lock));
   uint64_t length = h->get_buffer_length();
   uint64_t offset = h->pos;
   if (flushed) {
@@ -3117,34 +2885,15 @@ int BlueFS::_flush_F(FileWriter *h, bool force, bool *flushed)
            << std::hex << offset << "~" << length << std::dec
           << " to " << h->file->fnode << dendl;
   ceph_assert(h->pos <= h->file->fnode.size);
-  int r = _flush_range_F(h, offset, length);
+  int r = _flush_range(h, offset, length);
   if (flushed) {
     *flushed = true;
   }
   return r;
 }
 
-// Flush for bluefs special files.
-// Does not add extents to h.
-// Does not mark h as dirty.
-// we do not need to dirty the log file (or it's compacting
-// replacement) when the file size changes because replay is
-// smart enough to discover it on its own.
-int BlueFS::_flush_special(FileWriter *h)
+int BlueFS::_truncate(FileWriter *h, uint64_t offset)
 {
-  ceph_assert(h->file->fnode.ino <= 1);
-  uint64_t length = h->get_buffer_length();
-  uint64_t offset = h->pos;
-  ceph_assert(length + offset <= h->file->fnode.get_allocated());
-  if (h->file->fnode.size < offset + length) {
-    h->file->fnode.size = offset + length;
-  }
-  return _flush_data(h, offset, length, false);
-}
-
-int BlueFS::truncate(FileWriter *h, uint64_t offset)
-{
-  std::lock_guard hl(h->lock);
   dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
            << " file " << h->file->fnode << dendl;
   if (h->file->deleted) {
@@ -3163,7 +2912,7 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset)
     ceph_abort_msg("actually this shouldn't happen");
   }
   if (h->get_buffer_length()) {
-    int r = _flush_F(h, true);
+    int r = _flush(h, true);
     if (r < 0)
       return r;
   }
@@ -3177,63 +2926,58 @@ int BlueFS::truncate(FileWriter *h, uint64_t offset)
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
   h->file->fnode.size = offset;
   vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
-  std::lock_guard ll(log.lock);
-  log.t.op_file_update(h->file->fnode);
+  log_t.op_file_update(h->file->fnode);
   return 0;
 }
 
-int BlueFS::fsync(FileWriter *h)
+int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
 {
-  std::unique_lock hl(h->lock);
-  uint64_t old_dirty_seq = 0;
-  {
-    dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
-    int r = _flush_F(h, true);
-    if (r < 0)
-      return r;
-    _flush_bdev(h);
-    if (h->file->is_dirty) {
-      _signal_dirty_to_log_D(h);
-      h->file->is_dirty = false;
-    }
-    {
-      std::lock_guard dl(dirty.lock);
-      if (dirty.seq_stable < h->file->dirty_seq) {
-       old_dirty_seq = h->file->dirty_seq;
-       dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
-                << ") on " << h->file->fnode << ", flushing log" << dendl;
-      }
-    }
+  dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
+  int r = _flush(h, true);
+  if (r < 0)
+     return r;
+  if (h->file->is_dirty) {
+    _signal_dirty_to_log(h);
+    h->file->is_dirty = false;
   }
+  uint64_t old_dirty_seq = h->file->dirty_seq;
+
+  _flush_bdev_safely(h);
+
   if (old_dirty_seq) {
-    _flush_and_sync_log_LD(old_dirty_seq);
+    uint64_t s = log_seq;
+    dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
+            << ") on " << h->file->fnode << ", flushing log" << dendl;
+    _flush_and_sync_log(l, old_dirty_seq);
+    ceph_assert(h->file->dirty_seq == 0 ||  // cleaned
+          h->file->dirty_seq > s);    // or redirtied by someone else
   }
-  _maybe_compact_log_LN_NF_LD_D();
   return 0;
 }
 
-// be careful - either h->file->lock or log.lock must be taken
-void BlueFS::_flush_bdev(FileWriter *h)
+void BlueFS::_flush_bdev_safely(FileWriter *h)
 {
-  if (h->file->fnode.ino != 1) {
-    ceph_assert(ceph_mutex_is_locked(h->lock));
-  } else {
-    ceph_assert(ceph_mutex_is_locked(log.lock));
-  }
   std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
   h->dirty_devs.fill(false);
 #ifdef HAVE_LIBAIO
   if (!cct->_conf->bluefs_sync_write) {
     list<aio_t> completed_ios;
     _claim_completed_aios(h, &completed_ios);
-    _wait_for_aio(h);
+    lock.unlock();
+    wait_for_aio(h);
     completed_ios.clear();
-  }
+    flush_bdev(flush_devs);
+    lock.lock();
+  } else
 #endif
-  _flush_bdev(flush_devs);
+  {
+    lock.unlock();
+    flush_bdev(flush_devs);
+    lock.lock();
+  }
 }
 
-void BlueFS::_flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
+void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
 {
   // NOTE: this is safe to call without a lock.
   dout(20) << __func__ << dendl;
@@ -3243,7 +2987,7 @@ void BlueFS::_flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
   }
 }
 
-void BlueFS::_flush_bdev()
+void BlueFS::flush_bdev()
 {
   // NOTE: this is safe to call without a lock.
   dout(20) << __func__ << dendl;
@@ -3364,10 +3108,8 @@ int BlueFS::_allocate(uint8_t id, uint64_t len,
   return 0;
 }
 
-int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
+int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard fl(f->lock);
   dout(10) << __func__ << " file " << f->fnode << " 0x"
           << std::hex << off << "~" << len << std::dec << dendl;
   if (f->deleted) {
@@ -3386,44 +3128,39 @@ int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
     vselector->add_usage(f->vselector_hint, f->fnode);
     if (r < 0)
       return r;
-    log.t.op_file_update(f->fnode);
+    log_t.op_file_update(f->fnode);
   }
   return 0;
 }
 
 void BlueFS::sync_metadata(bool avoid_compact)
 {
-  bool can_skip_flush;
-  {
-    std::lock_guard ll(log.lock);
-    std::lock_guard dl(dirty.lock);
-    can_skip_flush = log.t.empty() && dirty.files.empty();
-  }
-  if (can_skip_flush) {
+  std::unique_lock l(lock);
+  if (log_t.empty() && dirty_files.empty()) {
     dout(10) << __func__ << " - no pending log events" << dendl;
   } else {
     utime_t start;
     lgeneric_subdout(cct, bluefs, 10) << __func__;
     start = ceph_clock_now();
     *_dout <<  dendl;
-    _flush_bdev(); // FIXME?
-    _flush_and_sync_log_LD();
+    flush_bdev(); // FIXME?
+    _flush_and_sync_log(l);
     dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
   }
 
   if (!avoid_compact) {
-    _maybe_compact_log_LN_NF_LD_D();
+    _maybe_compact_log(l);
   }
 }
 
-void BlueFS::_maybe_compact_log_LN_NF_LD_D()
+void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
 {
   if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
-      _should_start_compact_log_L_N()) {
+      _should_compact_log()) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync_LN_LD();
+      _compact_log_sync();
     } else {
-      _compact_log_async_LD_NF_D();
+      _compact_log_async(l);
     }
   }
 }
@@ -3434,11 +3171,11 @@ int BlueFS::open_for_write(
   FileWriter **h,
   bool overwrite)
 {
-  std::unique_lock nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
   DirRef dir;
-  if (p == nodes.dir_map.end()) {
+  if (p == dir_map.end()) {
     // implicitly create the dir
     dout(20) << __func__ << "  dir " << dirname
             << " does not exist" << dendl;
@@ -3460,7 +3197,7 @@ int BlueFS::open_for_write(
     }
     file = ceph::make_ref<File>();
     file->fnode.ino = ++ino_last;
-    nodes.file_map[ino_last] = file;
+    file_map[ino_last] = file;
     dir->file_map[string{filename}] = file;
     ++file->refs;
     create = true;
@@ -3496,13 +3233,11 @@ int BlueFS::open_for_write(
   dout(20) << __func__ << " mapping " << dirname << "/" << filename
           << " vsel_hint " << file->vselector_hint
           << dendl;
-  nl.unlock();
-  {
-    std::lock_guard ll(log.lock);
-    log.t.op_file_update(file->fnode);
-    if (create)
-      log.t.op_dir_link(dirname, filename, file->fnode.ino);
-  }
+
+  log_t.op_file_update(file->fnode);
+  if (create)
+    log_t.op_dir_link(dirname, filename, file->fnode.ino);
+
   *h = _create_writer(file);
 
   if (boost::algorithm::ends_with(filename, ".log")) {
@@ -3532,7 +3267,7 @@ BlueFS::FileWriter *BlueFS::_create_writer(FileRef f)
   return w;
 }
 
-void BlueFS::_drain_writer(FileWriter *h)
+void BlueFS::_close_writer(FileWriter *h)
 {
   dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
   //h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
@@ -3548,31 +3283,18 @@ void BlueFS::_drain_writer(FileWriter *h)
   if (h->file->fnode.size >= (1ull << 30)) {
     dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl;
   }
-}
-
-void BlueFS::_close_writer(FileWriter *h)
-{
-  _drain_writer(h);
-  delete h;
-}
-void BlueFS::close_writer(FileWriter *h)
-{
-  {
-    std::lock_guard l(h->lock);
-    _drain_writer(h);
-  }
   delete h;
 }
 
 uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
 {
-  std::lock_guard l(h->lock);
+  std::lock_guard l(lock);
   return h->file->dirty_seq;
 }
 
 bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
 {
-  std::lock_guard l(h->lock);
+  std::lock_guard l(lock);
   return h->dirty_devs[dev];
 }
 
@@ -3582,11 +3304,11 @@ int BlueFS::open_for_read(
   FileReader **h,
   bool random)
 {
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << "/" << filename
           << (random ? " (random)":" (sequential)") << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  if (p == nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3611,12 +3333,11 @@ int BlueFS::rename(
   std::string_view old_dirname, std::string_view old_filename,
   std::string_view new_dirname, std::string_view new_filename)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << old_dirname << "/" << old_filename
           << " -> " << new_dirname << "/" << new_filename << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(old_dirname);
-  if (p == nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(old_dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << old_dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3630,8 +3351,8 @@ int BlueFS::rename(
   }
   FileRef file = q->second;
 
-  p = nodes.dir_map.find(new_dirname);
-  if (p == nodes.dir_map.end()) {
+  p = dir_map.find(new_dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << new_dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3642,8 +3363,8 @@ int BlueFS::rename(
             << ") file " << new_filename
             << " already exists, unlinking" << dendl;
     ceph_assert(q->second != file);
-    log.t.op_dir_unlink(new_dirname, new_filename);
-    _drop_link_D(q->second);
+    log_t.op_dir_unlink(new_dirname, new_filename);
+    _drop_link(q->second);
   }
 
   dout(10) << __func__ << " " << new_dirname << "/" << new_filename << " "
@@ -3652,33 +3373,31 @@ int BlueFS::rename(
   new_dir->file_map[string{new_filename}] = file;
   old_dir->file_map.erase(string{old_filename});
 
-  log.t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
-  log.t.op_dir_unlink(old_dirname, old_filename);
+  log_t.op_dir_link(new_dirname, new_filename, file->fnode.ino);
+  log_t.op_dir_unlink(old_dirname, old_filename);
   return 0;
 }
 
 int BlueFS::mkdir(std::string_view dirname)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  if (p != nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  if (p != dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " exists" << dendl;
     return -EEXIST;
   }
-  nodes.dir_map[string{dirname}] = ceph::make_ref<Dir>();
-  log.t.op_dir_create(dirname);
+  dir_map[string{dirname}] = ceph::make_ref<Dir>();
+  log_t.op_dir_create(dirname);
   return 0;
 }
 
 int BlueFS::rmdir(std::string_view dirname)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << dendl;
-  auto p = nodes.dir_map.find(dirname);
-  if (p == nodes.dir_map.end()) {
+  auto p = dir_map.find(dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " does not exist" << dendl;
     return -ENOENT;
   }
@@ -3687,16 +3406,16 @@ int BlueFS::rmdir(std::string_view dirname)
     dout(20) << __func__ << " dir " << dirname << " not empty" << dendl;
     return -ENOTEMPTY;
   }
-  nodes.dir_map.erase(string{dirname});
-  log.t.op_dir_remove(dirname);
+  dir_map.erase(string{dirname});
+  log_t.op_dir_remove(dirname);
   return 0;
 }
 
 bool BlueFS::dir_exists(std::string_view dirname)
 {
-  std::lock_guard nl(nodes.lock);
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  bool exists = p != nodes.dir_map.end();
+  std::lock_guard l(lock);
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  bool exists = p != dir_map.end();
   dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl;
   return exists;
 }
@@ -3704,10 +3423,10 @@ bool BlueFS::dir_exists(std::string_view dirname)
 int BlueFS::stat(std::string_view dirname, std::string_view filename,
                 uint64_t *size, utime_t *mtime)
 {
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  if (p == nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3732,11 +3451,10 @@ int BlueFS::stat(std::string_view dirname, std::string_view filename,
 int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
                      FileLock **plock)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  if (p == nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3750,11 +3468,11 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
     file = ceph::make_ref<File>();
     file->fnode.ino = ++ino_last;
     file->fnode.mtime = ceph_clock_now();
-    nodes.file_map[ino_last] = file;
+    file_map[ino_last] = file;
     dir->file_map[string{filename}] = file;
     ++file->refs;
-    log.t.op_file_update(file->fnode);
-    log.t.op_dir_link(dirname, filename, file->fnode.ino);
+    log_t.op_file_update(file->fnode);
+    log_t.op_dir_link(dirname, filename, file->fnode.ino);
   } else {
     file = q->second;
     if (file->locked) {
@@ -3771,7 +3489,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename,
 
 int BlueFS::unlock_file(FileLock *fl)
 {
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl;
   ceph_assert(fl->file->locked);
   fl->file->locked = false;
@@ -3785,18 +3503,18 @@ int BlueFS::readdir(std::string_view dirname, vector<string> *ls)
   if (!dirname.empty() && dirname.back() == '/') {
     dirname.remove_suffix(1);
   }
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << dendl;
   if (dirname.empty()) {
     // list dirs
-    ls->reserve(nodes.dir_map.size() + 2);
-    for (auto& q : nodes.dir_map) {
+    ls->reserve(dir_map.size() + 2);
+    for (auto& q : dir_map) {
       ls->push_back(q.first);
     }
   } else {
     // list files in dir
-    map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-    if (p == nodes.dir_map.end()) {
+    map<string,DirRef>::iterator p = dir_map.find(dirname);
+    if (p == dir_map.end()) {
       dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
       return -ENOENT;
     }
@@ -3813,11 +3531,10 @@ int BlueFS::readdir(std::string_view dirname, vector<string> *ls)
 
 int BlueFS::unlink(std::string_view dirname, std::string_view filename)
 {
-  std::lock_guard ll(log.lock);
-  std::lock_guard nl(nodes.lock);
+  std::lock_guard l(lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
-  map<string,DirRef>::iterator p = nodes.dir_map.find(dirname);
-  if (p == nodes.dir_map.end()) {
+  map<string,DirRef>::iterator p = dir_map.find(dirname);
+  if (p == dir_map.end()) {
     dout(20) << __func__ << " dir " << dirname << " not found" << dendl;
     return -ENOENT;
   }
@@ -3835,8 +3552,8 @@ int BlueFS::unlink(std::string_view dirname, std::string_view filename)
     return -EBUSY;
   }
   dir->file_map.erase(string{filename});
-  log.t.op_dir_unlink(dirname, filename);
-  _drop_link_D(file);
+  log_t.op_dir_unlink(dirname, filename);
+  _drop_link(file);
   return 0;
 }
 
@@ -3859,7 +3576,7 @@ bool BlueFS::wal_is_rotational()
   When we find it, we decode following bytes as extent.
   We read that whole extent and then check if merged with existing log part gives a proper bluefs transaction.
  */
-int BlueFS::_do_replay_recovery_read(FileReader *log_reader,
+int BlueFS::do_replay_recovery_read(FileReader *log_reader,
                                    size_t replay_pos,
                                    size_t read_offset,
                                    size_t read_len,
@@ -3914,7 +3631,7 @@ int BlueFS::_do_replay_recovery_read(FileReader *log_reader,
     dout(2) << __func__ << " processing " << get_device_name(dev) << dendl;
     interval_set<uint64_t> disk_regions;
     disk_regions.insert(0, bdev[dev]->get_size());
-    for (auto f : nodes.file_map) {
+    for (auto f : file_map) {
       auto& e = f.second->fnode.extents;
       for (auto& p : e) {
        if (p.bdev == dev) {
index 085b131d0dcc260fbe142032d3816a21ba1dccec..b285592b41df4c83ef9d0564126df5c2c8e6bd0c 100644 (file)
@@ -121,11 +121,6 @@ public:
     std::atomic_int num_reading;
 
     void* vselector_hint = nullptr;
-    /* lock protects fnode and other the parts that can be modified during read & write operations.
-       Does not protect values that are fixed
-       Does not need to be taken when doing one-time operations:
-       _replay, device_migrate_to_existing, device_migrate_to_new */
-    ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock");
 
   private:
     FRIEND_MAKE_REF(File);
@@ -309,6 +304,8 @@ public:
   };
 
 private:
+  ceph::mutex lock = ceph::make_mutex("BlueFS::lock");
+
   PerfCounters *logger = nullptr;
 
   uint64_t max_bytes[MAX_BDEV] = {0};
@@ -319,35 +316,26 @@ private:
   };
 
   // cache
-  struct {
-    ceph::mutex lock = ceph::make_mutex("BlueFS::nodes.lock");
-    mempool::bluefs::map<std::string, DirRef, std::less<>> dir_map; ///< dirname -> Dir
-    mempool::bluefs::unordered_map<uint64_t, FileRef> file_map;     ///< ino -> File
-  } nodes;
+  mempool::bluefs::map<std::string, DirRef, std::less<>> dir_map;          ///< dirname -> Dir
+  mempool::bluefs::unordered_map<uint64_t, FileRef> file_map; ///< ino -> File
+
+  // map of dirty files, files of same dirty_seq are grouped into list.
+  std::map<uint64_t, dirty_file_list_t> dirty_files;
 
   bluefs_super_t super;        ///< latest superblock (as last written)
   uint64_t ino_last = 0;       ///< last assigned ino (this one is in use)
+  uint64_t log_seq = 0;        ///< last used log seq (by current pending log_t)
+  uint64_t log_seq_stable = 0; ///< last stable/synced log seq
+  FileWriter *log_writer = 0;  ///< writer for the log
+  bluefs_transaction_t log_t;  ///< pending, unwritten log transaction
+  bool log_flushing = false;   ///< true while flushing the log
+  ceph::condition_variable log_cond;
+
+  uint64_t new_log_jump_to = 0;
+  uint64_t old_log_jump_to = 0;
+  FileRef new_log = nullptr;
+  FileWriter *new_log_writer = nullptr;
 
-  struct {
-    ceph::mutex lock = ceph::make_mutex("BlueFS::log.lock");
-    //uint64_t seq_stable = 0; //seq that is now stable on disk    AK - consider also mirroring this
-    uint64_t seq_live = 1;   //seq that log is currently writing to; mirrors dirty.seq_live
-    FileWriter *writer = 0;
-    bluefs_transaction_t t;
-  } log;
-
-  struct {
-    ceph::mutex lock = ceph::make_mutex("BlueFS::dirty.lock");
-    uint64_t seq_stable = 0; //seq that is now stable on disk
-    uint64_t seq_live = 1;   //seq that is ongoing and dirty files will be written to
-    // map of dirty files, files of same dirty_seq are grouped into list.
-    std::map<uint64_t, dirty_file_list_t> files;
-  } dirty;
-
-  ceph::condition_variable log_cond;                             ///< used for state control between log flush / log compaction
-  std::atomic<bool> log_is_compacting{false};                    ///< signals that bluefs log is already ongoing compaction
-  std::atomic<bool> log_forbidden_to_expand{false};              ///< used to signal that async compaction is in state
-                                                                 ///  that prohibits expansion of bluefs log
   /*
    * There are up to 3 block devices:
    *
@@ -361,11 +349,6 @@ private:
   std::vector<Allocator*> alloc;                   ///< allocators for bdevs
   std::vector<uint64_t> alloc_size;                ///< alloc size for each device
   std::vector<interval_set<uint64_t>> pending_release; ///< extents to release
-  // TODO: it should be examined what makes pending_release immune to
-  // eras in a way similar to dirty_files. Hints:
-  // 1) we have actually only 2 eras: log_seq and log_seq+1
-  // 2) we usually not remove extents from files. And when we do, we force log-syncing.
-
   //std::vector<interval_set<uint64_t>> block_unused_too_granular;
 
   BlockDevice::aio_callback_t discard_cb[3]; //discard callbacks for each dev
@@ -397,7 +380,7 @@ private:
 
 
   FileRef _get_file(uint64_t ino);
-  void _drop_link_D(FileRef f);
+  void _drop_link(FileRef f);
 
   unsigned _get_slow_device_id() {
     return bdev[BDEV_SLOW] ? BDEV_SLOW : BDEV_DB;
@@ -409,32 +392,22 @@ private:
                                 PExtentVector* extents);
 
   /* signal replay log to include h->file in nearest log flush */
-  int _signal_dirty_to_log_D(FileWriter *h);
-  int _flush_range_F(FileWriter *h, uint64_t offset, uint64_t length);
-  int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered);
-  int _flush_F(FileWriter *h, bool force, bool *flushed = nullptr);
-  int _flush_special(FileWriter *h);
-  int _fsync(FileWriter *h);
+  int _signal_dirty_to_log(FileWriter *h);
+  int _flush_range(FileWriter *h, uint64_t offset, uint64_t length);
+  int _flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l);
+  int _flush(FileWriter *h, bool force, bool *flushed = nullptr);
+  int _fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l);
 
 #ifdef HAVE_LIBAIO
   void _claim_completed_aios(FileWriter *h, std::list<aio_t> *ls);
-  void _wait_for_aio(FileWriter *h);  // safe to call without a lock
+  void wait_for_aio(FileWriter *h);  // safe to call without a lock
 #endif
 
-  int64_t _maybe_extend_log();
-  void _extend_log();
-  uint64_t _log_advance_seq();
-  void _consume_dirty(uint64_t seq);
-  void _clear_dirty_set_stable_D(uint64_t seq_stable);
-  void _release_pending_allocations(std::vector<interval_set<uint64_t>>& to_release);
-
-  void _flush_and_sync_log_core(int64_t available_runway);
-  int _flush_and_sync_log_jump_D(uint64_t jump_to,
-                              int64_t available_runway);
-  int _flush_and_sync_log_LD(uint64_t want_seq = 0);
-
-  uint64_t _estimate_log_size_N();
-  bool _should_start_compact_log_L_N();
+  int _flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
+                         uint64_t want_seq = 0,
+                         uint64_t jump_to = 0);
+  uint64_t _estimate_log_size();
+  bool _should_compact_log();
 
   enum {
     REMOVE_DB = 1,
@@ -442,15 +415,12 @@ private:
     RENAME_SLOW2DB = 4,
     RENAME_DB2SLOW = 8,
   };
-  void _compact_log_dump_metadata_N(bluefs_transaction_t *t,
-                                int flags);
-  void compact_log_async_dump_metadata_NF(bluefs_transaction_t *t,
-                                         uint64_t capture_before_seq);
+  void _compact_log_dump_metadata(bluefs_transaction_t *t,
+                                 int flags);
+  void _compact_log_sync();
+  void _compact_log_async(std::unique_lock<ceph::mutex>& l);
 
-  void _compact_log_sync_LN_LD();
-  void _compact_log_async_LD_NF_D();
-
-  void _rewrite_log_and_layout_sync_LN_LD(bool allocate_with_fallback,
+  void _rewrite_log_and_layout_sync(bool allocate_with_fallback,
                                    int super_dev,
                                    int log_dev,
                                    int new_log_dev,
@@ -459,9 +429,9 @@ private:
 
   //void _aio_finish(void *priv);
 
-  void _flush_bdev(FileWriter *h);
-  void _flush_bdev();  // this is safe to call without a lock
-  void _flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs);  // this is safe to call without a lock
+  void _flush_bdev_safely(FileWriter *h);
+  void flush_bdev();  // this is safe to call without a lock
+  void flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs);  // this is safe to call without a lock
 
   int _preallocate(FileRef f, uint64_t off, uint64_t len);
   int _truncate(FileWriter *h, uint64_t off);
@@ -478,6 +448,8 @@ private:
     uint64_t len,    ///< [in] this many bytes
     char *out);      ///< [out] optional: or copy it here
 
+  void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length);
+
   int _open_super();
   int _write_super(int dev);
   int _check_allocations(const bluefs_fnode_t& fnode,
@@ -490,7 +462,6 @@ private:
   int _replay(bool noop, bool to_stdout = false); ///< replay journal
 
   FileWriter *_create_writer(FileRef f);
-  void _drain_writer(FileWriter *h);
   void _close_writer(FileWriter *h);
 
   // always put the super in the second 4k block.  FIXME should this be
@@ -556,8 +527,10 @@ public:
     FileReader **h,
     bool random = false);
 
-  // data added after last fsync() is lost
-  void close_writer(FileWriter *h);
+  void close_writer(FileWriter *h) {
+    std::lock_guard l(lock);
+    _close_writer(h);
+  }
 
   int rename(std::string_view old_dir, std::string_view old_file,
             std::string_view new_dir, std::string_view new_file);
@@ -581,7 +554,7 @@ public:
   /// sync any uncommitted state to disk
   void sync_metadata(bool avoid_compact);
   /// test and compact log, if necessary
-  void _maybe_compact_log_LN_NF_LD_D();
+  void _maybe_compact_log(std::unique_lock<ceph::mutex>& l);
 
   void set_volume_selector(BlueFSVolumeSelector* s) {
     vselector.reset(s);
@@ -603,11 +576,42 @@ public:
   // handler for discard event
   void handle_discard(unsigned dev, interval_set<uint64_t>& to_release);
 
-  void flush(FileWriter *h, bool force = false);
+  void flush(FileWriter *h, bool force = false) {
+    std::unique_lock l(lock);
+    int r = _flush(h, force, l);
+    ceph_assert(r == 0);
+  }
 
-  void append_try_flush(FileWriter *h, const char* buf, size_t len);
-  void flush_range(FileWriter *h, uint64_t offset, uint64_t length);
-  int fsync(FileWriter *h);
+  void append_try_flush(FileWriter *h, const char* buf, size_t len) {
+    size_t max_size = 1ull << 30; // cap to 1GB
+    while (len > 0) {
+      bool need_flush = true;
+      auto l0 = h->get_buffer_length();
+      if (l0 < max_size) {
+       size_t l = std::min(len, max_size - l0);
+       h->append(buf, l);
+       buf += l;
+       len -= l;
+       need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
+      }
+      if (need_flush) {
+       flush(h, true);
+       // make sure we've made any progress with flush hence the
+       // loop doesn't iterate forever
+       ceph_assert(h->get_buffer_length() < max_size);
+      }
+    }
+  }
+  void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
+    std::lock_guard l(lock);
+    _flush_range(h, offset, length);
+  }
+  int fsync(FileWriter *h) {
+    std::unique_lock l(lock);
+    int r = _fsync(h, l);
+    _maybe_compact_log(l);
+    return r;
+  }
   int64_t read(FileReader *h, uint64_t offset, size_t len,
           ceph::buffer::list *outbl, char *out) {
     // no need to hold the global lock here; we only touch h and
@@ -622,14 +626,23 @@ public:
     // atomics and asserts).
     return _read_random(h, offset, len, out);
   }
-  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len);
-  int preallocate(FileRef f, uint64_t offset, uint64_t len);
-  int truncate(FileWriter *h, uint64_t offset);
-  int _do_replay_recovery_read(FileReader *log,
-                              size_t log_pos,
-                              size_t read_offset,
-                              size_t read_len,
-                              bufferlist* bl);
+  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
+    std::lock_guard l(lock);
+    _invalidate_cache(f, offset, len);
+  }
+  int preallocate(FileRef f, uint64_t offset, uint64_t len) {
+    std::lock_guard l(lock);
+    return _preallocate(f, offset, len);
+  }
+  int truncate(FileWriter *h, uint64_t offset) {
+    std::lock_guard l(lock);
+    return _truncate(h, offset);
+  }
+  int do_replay_recovery_read(FileReader *log,
+                             size_t log_pos,
+                             size_t read_offset,
+                             size_t read_len,
+                             bufferlist* bl);
 
   size_t probe_alloc_avail(int dev, uint64_t alloc_size);
 
@@ -695,22 +708,5 @@ public:
 
   void get_paths(const std::string& base, paths& res) const override;
 };
-/**
- * Directional graph of locks.
- * Vertices - Locks. Edges (directed) - locking progression.
- * Edge A->B exist if last taken lock was A and next taken lock is B.
- * 
- * Column represents last lock taken.
- * Row represents next lock taken.
- *
- *     <        | L | D | N | F | W
- * -------------|---|---|---|---|---
- * log        L |     <   <         
- * dirty      D |                   
- * nodes      N |             <     
- * File       F |                  
- * FileWriter W | <   <       <      
- * 
- * Claim: Deadlock is possible IFF graph contains cycles.
- */
+
 #endif