]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
[BlueStore]: [Remove Allocations from RocksDB] 39871/head
authorGabriel BenHanokh <benhanokh@gmail.com>
Thu, 14 Jan 2021 06:59:35 +0000 (08:59 +0200)
committerGabriel BenHanokh <benhanokh@gmail.com>
Wed, 11 Aug 2021 13:53:09 +0000 (16:53 +0300)
Currently BlueStore keeps its allocation info inside RocksDB.
BlueStore is committing all allocation information (alloc/release) into RocksDB (column-family B) before the client Write is performed causing a delay in write path and adding significant load to the CPU/Memory/Disk.
Committing all state into RocksDB allows Ceph to survive failures without losing the allocation state.

The new code skips the RocksDB updates on allocation time and instead perform a full desatge of the allocator object with all the OSD allocation state in a single step during umount().
This results with an 25% increase in IOPS and reduced latency in small random-write workloads, but exposes the system to losing allocation info in failure cases where we don't call umount.
We added code to perform a full allocation-map rebuild from information stored inside the ONode which is used in failure cases.
When we perform a graceful shutdown there is no need for recovery and we simply read the allocation-map from a flat file where the allocation-map was stored during umount() (in fact this mode is faster and shaves few seconds from boot time since reading a flat file is faster than iterating over RocksDB)

Open Issues:

There is a bug in the src/stop.sh script killing ceph without invoking umount() which means anyone using it will always invoke the recovery path.
Adam Kupczyk is fixing this issue in a separate PR.
A simple workaround is to add a call to 'killall -15 ceph-osd' before calling src/stop.sh

Fast-Shutdown and Ceph Suicide (done when the system underperforms) stop the system without a proper drain and a call to umount.
This will trigger a full recovery which can be long( 3 minutes in my testing, but your your mileage may vary).
We plan on adding a follow up PR doing the following in Fast-Shutdown and Ceph Suicide:

Block the OSD queues from accepting any new request
Delete all items in queue which we didn't start yet
Drain all in-flight tasks
call umount (and destage the allocation-map)
If drain didn't complete within a predefined time-limit (say 3 minutes) -> kill the OSD
Signed-off-by: Gabriel Benhanokh <gbenhano@redhat.com>
create allocator from on-disk onodes and BlueFS inodes
change allocator + add stat counters + report illegal physical-extents
compare allocator after rebuild from ONodes
prevent collection from being open twice
removed FSCK repo check for null-fm
Bug-Fix: don't add BlueFS allocation to shared allocator
add configuration option to commit to No-Column-B
Only invalidate allocation file after opening rocksdb in read-write mode
fix tests not to expect failure in cases unapplicable to null-allocator
accept non-existing allocation file and don't fail the invaladtion as it could happen legally
don't commit to null-fm when db is opened in repair-mode
add a reverse mechanism from null_fm to real_fm (using RocksDB)
Using Ceph encode/decode, adding more info to header/trailer, add crc protection
Code cleanup

some changes requested by Adam (cleanup and style changes)

Signed-off-by: Gabriel Benhanokh <gbenhano@redhat.com>
doc/man/8/ceph-bluestore-tool.rst
src/common/options/global.yaml.in
src/os/bluestore/BitmapFreelistManager.cc
src/os/bluestore/BlueFS.cc
src/os/bluestore/BlueFS.h
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h
src/os/bluestore/FreelistManager.h
src/os/bluestore/bluestore_tool.cc
src/test/objectstore/store_test.cc

index 8c92275dafc06a892d10da9c45d6123e6ce40504..c6f198496db8fbe171afd8ca80736f413545936a 100644 (file)
@@ -17,6 +17,9 @@ Synopsis
   [ --log-file | -l *filename* ]
   [ --deep ]
 | **ceph-bluestore-tool** fsck|repair --path *osd path* [ --deep ]
+| **ceph-bluestore-tool** qfsck       --path *osd path*
+| **ceph-bluestore-tool** allocmap    --path *osd path*
+| **ceph-bluestore-tool** restore_cfb --path *osd path*
 | **ceph-bluestore-tool** show-label --dev *device* ...
 | **ceph-bluestore-tool** prime-osd-dir --dev *device* --path *osd path*
 | **ceph-bluestore-tool** bluefs-export --path *osd path* --out-dir *dir*
@@ -49,6 +52,19 @@ Commands
 
    Run a consistency check *and* repair any errors we can.
 
+:command:`qfsck`
+
+   run consistency check on BlueStore metadata comparing allocator data (from RocksDB CFB when exists and if not uses allocation-file) with ONodes state.
+
+:command:`allocmap`
+
+   performs the same check done by qfsck and then stores a new allocation-file (command is disabled by default and requires a special build)
+
+:command:`restore_cfb`
+
+   Reverses changes done by the new NCB code (either through ceph restart or when running allocmap command) and restores RocksDB B Column-Family (allocator-map).
+
+
 :command:`bluefs-export`
 
    Export the contents of BlueFS (i.e., RocksDB files) to an output directory.
index 78d7030808976d2ff52b5664339e77e7471fe4e7..09a2fdd02b4093cc1f6930f52de286f0f7f3b234 100644 (file)
@@ -4782,6 +4782,12 @@ options:
   desc: Run fsck at umount
   default: false
   with_legacy: true
+- name: bluestore_allocation_from_file
+  type: bool
+  level: dev
+  desc: Remove allocation info from RocksDB and store the info in a new allocation file
+  default: true
+  with_legacy: true
 - name: bluestore_fsck_on_umount_deep
   type: bool
   level: dev
index edc489b6851b15c19283d1adedb0e4ce54850d30..2c8971296268473b368a789d0179dd0bd589cd1d 100644 (file)
@@ -486,7 +486,9 @@ void BitmapFreelistManager::allocate(
 {
   dout(10) << __func__ << " 0x" << std::hex << offset << "~" << length
           << std::dec << dendl;
-  _xor(offset, length, txn);
+  if (!is_null_manager()) {
+    _xor(offset, length, txn);
+  }
 }
 
 void BitmapFreelistManager::release(
@@ -495,7 +497,9 @@ void BitmapFreelistManager::release(
 {
   dout(10) << __func__ << " 0x" << std::hex << offset << "~" << length
           << std::dec << dendl;
-  _xor(offset, length, txn);
+  if (!is_null_manager()) {
+    _xor(offset, length, txn);
+  }
 }
 
 void BitmapFreelistManager::_xor(
index 3a653f425d043392016acae6c17c1aad8f0c2e94..589c54b6fc48d0acbf9ce393e3b41c4a2a253d18 100644 (file)
@@ -3099,11 +3099,13 @@ int BlueFS::_allocate(uint8_t id, uint64_t len,
               << ", fragmentation " << alloc[id]->get_fragmentation()
               << ", allocated 0x" << (alloc_len > 0 ? alloc_len : 0)
              << std::dec << dendl;
+    } else {
+      dout(20) << __func__ << " alloc-id not set on index="<< (int)id << " unable to allocate 0x" << std::hex << need
+              << " on bdev " << (int)id << std::dec << dendl;
     }
-
     if (id != BDEV_SLOW) {
       dout(20) << __func__ << " fallback to bdev "
-               << (int)id + 1
+              << (int)id + 1
               << dendl;
       return _allocate(id + 1, len, node);
     } else {
index aefe083c993fb344c138f87e88396ddda6dfe6e3..6f92965e4effd027a52d938ecbd43c17b3aa2df6 100644 (file)
@@ -206,7 +206,7 @@ public:
     }
 
     // note: BlueRocksEnv uses this append exclusively, so it's safe
-    // to use buffer_appender exclusively here (e.g., it's notion of
+    // to use buffer_appender exclusively here (e.g., its notion of
     // offset will remain accurate).
     void append(const char *buf, size_t len) {
       uint64_t l0 = get_buffer_length();
@@ -214,6 +214,11 @@ public:
       buffer_appender.append(buf, len);
     }
 
+    void append(const byte *buf, size_t len) {
+      // allow callers to use byte type instead of char* as we simply pass byte array
+      append((const char*)buf, len);
+    }
+
     // note: used internally only, for ino 1 or 0.
     void append(ceph::buffer::list& bl) {
       uint64_t l0 = get_buffer_length();
index 219908da64874d6fc4d933de32aa6253ee0fb27e..615cba6cdda4470b836ff239905e148d0ac731e5 100644 (file)
@@ -5107,7 +5107,7 @@ void BlueStore::_close_path()
 }
 
 int BlueStore::_write_bdev_label(CephContext *cct,
-                                string path, bluestore_bdev_label_t label)
+                                const string &path, bluestore_bdev_label_t label)
 {
   dout(10) << __func__ << " path " << path << " label " << label << dendl;
   bufferlist bl;
@@ -5142,7 +5142,7 @@ out:
   return r;
 }
 
-int BlueStore::_read_bdev_label(CephContext* cct, string path,
+int BlueStore::_read_bdev_label(CephContext* cct, const string &path,
                                bluestore_bdev_label_t *label)
 {
   dout(10) << __func__ << dendl;
@@ -5310,13 +5310,30 @@ void BlueStore::_close_bdev()
   bdev = NULL;
 }
 
-int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only)
+int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_restore)
 {
   int r;
 
+  dout(5) << __func__ << "::NCB::freelist_type=" << freelist_type << dendl;
   ceph_assert(fm == NULL);
+  // fm_restore means we are transitioning from null-fm to bitmap-fm
+  ceph_assert(!fm_restore || (freelist_type != "null"));
+  // fm restore must pass in a valid transaction
+  ceph_assert(!fm_restore || (t != nullptr));
+
+  // When allocation-info is stored in a single file we set freelist_type to "null"
+  bool set_null_freemap = false;
+  if (freelist_type == "null") {
+    // use BitmapFreelistManager with the null option to stop allocations from going to RocksDB
+    // we will store the allocation info in a single file during umount()
+    freelist_type = "bitmap";
+    set_null_freemap = true;
+  }
   fm = FreelistManager::create(cct, freelist_type, PREFIX_ALLOC);
   ceph_assert(fm);
+  if (set_null_freemap) {
+    fm->set_null_manager();
+  }
   if (t) {
     // create mode. initialize freespace
     dout(20) << __func__ << " initializing freespace" << dendl;
@@ -5336,13 +5353,18 @@ int BlueStore::_open_fm(KeyValueDB::Transaction t, bool read_only)
     }
 #endif
     fm->create(bdev->get_size(), alloc_size, t);
-
-    // allocate superblock reserved space.  note that we do not mark
-    // bluefs space as allocated in the freelist; we instead rely on
-    // bluefs doing that itself.
     auto reserved = _get_ondisk_reserved();
-    fm->allocate(0, reserved, t);
-
+    if (fm_restore) {
+      // we need to allocate the full space in restore case
+      // as later we will add free-space marked in the allocator file
+      fm->allocate(0, bdev->get_size(), t);
+    } else {
+      // allocate superblock reserved space.  note that we do not mark
+      // bluefs space as allocated in the freelist; we instead rely on
+      // bluefs doing that itself.
+      fm->allocate(0, reserved, t);
+    }
+    // debug code - not needed for NULL FM
     if (cct->_conf->bluestore_debug_prefill > 0) {
       uint64_t end = bdev->get_size() - reserved;
       dout(1) << __func__ << " pre-fragmenting freespace, using "
@@ -5488,18 +5510,47 @@ int BlueStore::_init_alloc()
 #endif
   
   uint64_t num = 0, bytes = 0;
+  utime_t start_time = ceph_clock_now();
+  if (!fm->is_null_manager()) {
+    // This is the original path - loading allocation map from RocksDB and feeding into the allocator
+    dout(5) << __func__ << "::NCB::loading allocation from FM -> shared_alloc" << dendl;
+    // initialize from freelist
+    fm->enumerate_reset();
+    uint64_t offset, length;
+    while (fm->enumerate_next(db, &offset, &length)) {
+      shared_alloc.a->init_add_free(offset, length);
+      ++num;
+      bytes += length;
+    }
+    fm->enumerate_reset();
+
+    utime_t duration = ceph_clock_now() - start_time;
+    dout(5) << __func__ << "::num_entries=" << num << " free_size=" << bytes << " alloc_size=" <<
+      shared_alloc.a->get_capacity() - bytes << " time=" << duration << " seconds" << dendl;
+  } else {
+    // This is the new path reading the allocation map from a flat bluefs file and feeding them into the allocator
 
-  dout(1) << __func__ << " opening allocation metadata" << dendl;
-  // initialize from freelist
-  fm->enumerate_reset();
-  uint64_t offset, length;
-  while (fm->enumerate_next(db, &offset, &length)) {
-    shared_alloc.a->init_add_free(offset, length);
-    ++num;
-    bytes += length;
-  }
-  fm->enumerate_reset();
+    if (!cct->_conf->bluestore_allocation_from_file) {
+      derr << __func__ << "::NCB::cct->_conf->bluestore_allocation_from_file is set to FALSE with an active NULL-FM" << dendl;
+      derr << __func__ << "::NCB::Please change the value of bluestore_allocation_from_file to TRUE in your ceph.conf file" << dendl;
+      return -ENOTSUP; // Operation not supported
+    }
 
+    if (restore_allocator(shared_alloc.a, &num, &bytes) == 0) {
+      dout(5) << __func__ << "::NCB::restore_allocator() completed successfully shared_alloc.a=" << shared_alloc.a << dendl;
+    } else {
+      // This must mean that we had an unplanned shutdown and didn't manage to destage the allocator
+      dout(1) << __func__ << "::NCB::restore_allocator() failed!" << dendl;
+      dout(1) << __func__ << "::NCB::Run Full Recovery from ONodes (might take a while) ..." << dendl;
+      // if failed must recover from on-disk ONode internal state
+      if (read_allocation_from_drive_on_startup() != 0) {
+       derr << __func__ << "::NCB::Failed Recovery" << dendl;
+       derr << __func__ << "::NCB::Ceph-OSD won't start, make sure your drives are connected and readable" << dendl;
+       derr << __func__ << "::NCB::If no HW fault is found, please report failure and consider redeploying OSD" << dendl;
+       return -ENOTRECOVERABLE;
+      }
+    }
+  }
   dout(1) << __func__
           << " loaded " << byte_u_t(bytes) << " in " << num << " extents"
           << std::hex
@@ -5882,8 +5933,7 @@ int BlueStore::_is_bluefs(bool create, bool* ret)
 */
 int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
 {
-  dout(0) << __func__ << " read-only:" << read_only
-          << " repair:" << to_repair << dendl;
+  dout(5) << __func__ << "::NCB::read_only=" << read_only << ", to_repair=" << to_repair << dendl;
   {
     string type;
     int r = read_meta("type", &type);
@@ -5918,6 +5968,9 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
   if (r < 0)
     goto out_fsid;
 
+  // GBH: can probably skip open_db step in REad-Only mode when operating in NULL-FM mode
+  // (might need to open if failed to restore from file)
+
   // open in read-only first to read FM list and init allocator
   // as they might be needed for some BlueFS procedures
   r = _open_db(false, false, true);
@@ -5944,11 +5997,31 @@ int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
   // And now it's time to do that
   //
   _close_db(true);
-
   r = _open_db(false, to_repair, read_only);
   if (r < 0) {
     goto out_alloc;
   }
+
+  // when function is called in repair mode (to_repair=true) we skip db->open()/create()
+  // we can't change bluestore allocation so no need to invlidate allocation-file
+  if (fm->is_null_manager() && !read_only && !to_repair) {
+    // Now that we load the allocation map we need to invalidate the file as new allocation won't be reflected
+    // Changes to the allocation map (alloc/release) are not updated inline and will only be stored on umount()
+    // This means that we should not use the existing file on failure case (unplanned shutdown) and must resort
+    //  to recovery from RocksDB::ONodes
+    r = invalidate_allocation_file_on_bluefs();
+    if (r != 0) {
+      derr << __func__ << "::NCB::invalidate_allocation_file_on_bluefs() failed!" << dendl;
+      goto out_alloc;
+    }
+  }
+
+  // when function is called in repair mode (to_repair=true) we skip db->open()/create()
+  if (!read_only && !to_repair && cct->_conf->bluestore_allocation_from_file) {
+    dout(5) << __func__ << "::NCB::Commit to Null-Manager" << dendl;
+    commit_to_null_manager();
+  }
+
   return 0;
 
 out_alloc:
@@ -6222,9 +6295,14 @@ void BlueStore::_dump_alloc_on_failure()
 
 int BlueStore::_open_collections()
 {
+  if (!coll_map.empty()) {
+    // could be opened from another path
+    dout(20) << __func__ << "::NCB::collections are already opened, nothing to do" << dendl;
+    return 0;
+  }
+
   dout(10) << __func__ << dendl;
   collections_had_errors = false;
-  ceph_assert(coll_map.empty());
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
   for (it->upper_bound(string());
        it->valid();
@@ -6664,7 +6742,7 @@ int BlueStore::add_new_bluefs_device(int id, const string& dev_path)
     derr << __func__ << " bluefs isn't configured, can't add new device " << dendl;
     return -EIO;
   }
-
+  dout(5) << __func__ << "::NCB::calling open_db_and_around(read-only)" << dendl;
   r = _open_db_and_around(true);
   if (r < 0) {
     return r;
@@ -6715,7 +6793,6 @@ int BlueStore::add_new_bluefs_device(int id, const string& dev_path)
     bluefs_layout.shared_bdev = BlueFS::BDEV_SLOW;
     bluefs_layout.dedicated_db = true;
   }
-
   bluefs->umount();
   bluefs->mount();
 
@@ -6986,6 +7063,7 @@ int BlueStore::expand_devices(ostream& out)
     // mount in read/write to sync expansion changes
     r = _mount();
     ceph_assert(r == 0);
+    dout(5) << __func__ << "::NCB::calling umount()" << dendl;
     umount();
   } else {
     _close_db_and_around(true);
@@ -7024,10 +7102,10 @@ void BlueStore::set_cache_shards(unsigned num)
 
 int BlueStore::_mount()
 {
-  dout(1) << __func__ << " path " << path << dendl;
-
+  dout(5) << __func__ << "NCB:: path " << path << dendl;
   _kv_only = false;
   if (cct->_conf->bluestore_fsck_on_mount) {
+    dout(5) << __func__ << "::NCB::calling fsck()" << dendl;
     int rc = fsck(cct->_conf->bluestore_fsck_on_mount_deep);
     if (rc < 0)
       return rc;
@@ -7044,6 +7122,7 @@ int BlueStore::_mount()
     return -EINVAL;
   }
 
+  dout(5) << __func__ << "::NCB::calling open_db_and_around(read/write)" << dendl;
   int r = _open_db_and_around(false);
   if (r < 0) {
     return r;
@@ -7059,6 +7138,7 @@ int BlueStore::_mount()
     return r;
   }
 
+  // The recovery process for allocation-map needs to open collection early
   r = _open_collections();
   if (r < 0) {
     return r;
@@ -7105,6 +7185,7 @@ int BlueStore::_mount()
     auto was_per_pool_omap = per_pool_omap;
 
     dout(1) << __func__ << " quick-fix on mount" << dendl;
+    dout(5) << __func__ << "::NCB::calling fsck_on_open(FSCK_SHALLOW)" << dendl;
     _fsck_on_open(FSCK_SHALLOW, true);
 
     //reread statfs
@@ -7124,12 +7205,15 @@ int BlueStore::_mount()
 
 int BlueStore::umount()
 {
+  dout(5) << __func__ << "::NCB::entered" << dendl;
   ceph_assert(_kv_only || mounted);
-  dout(1) << __func__ << dendl;
-
+  bool was_mounted = mounted;
   _osr_drain_all();
 
   mounted = false;
+
+  ceph_assert(shared_alloc.a);
+
   if (!_kv_only) {
     mempool_thread.shutdown();
 #ifdef HAVE_LIBZBD
@@ -7142,11 +7226,23 @@ int BlueStore::umount()
     _kv_stop();
     _shutdown_cache();
     dout(20) << __func__ << " closing" << dendl;
+  }
 
+  // GBH - Vault the allocation state
+  dout(5) << "NCB::BlueStore::umount->store_allocation_state_on_bluestore() " << dendl;
+  if (was_mounted && fm->is_null_manager()) {
+    int ret = store_allocator(shared_alloc.a);
+    if (ret != 0) {
+      derr << __func__ << "::NCB::store_allocator() failed (continue with bitmapFreelistManager)" << dendl;
+      return ret;
+    }
+    dout(5) << __func__ << "::NCB::store_allocator() completed successfully" << dendl;
   }
+
   _close_db_and_around(false);
 
   if (cct->_conf->bluestore_fsck_on_umount) {
+    dout(5) << __func__ << "::NCB::calling fsck()" << dendl;
     int rc = fsck(cct->_conf->bluestore_fsck_on_umount_deep);
     if (rc < 0)
       return rc;
@@ -7198,6 +7294,7 @@ int _fsck_sum_extents(
   return 0;
 }
 
+
 int BlueStore::_fsck_check_extents(
   const coll_t& cid,
   const ghobject_t& oid,
@@ -7220,6 +7317,7 @@ int BlueStore::_fsck_check_extents(
     }
     if (depth != FSCK_SHALLOW) {
       bool already = false;
+      //dout(1) << __func__ << "::NCB::FSCK<" << e.offset << "," << e.length << ">" << dendl;
       apply_for_bitset_range(
         e.offset, e.length, granularity, used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset &bs) {
@@ -7229,7 +7327,7 @@ int BlueStore::_fsck_check_extents(
                pos * min_alloc_size, min_alloc_size, !already);
            }
             if (!already) {
-              derr << "fsck error: " << oid << " extent " << e
+              derr << __func__ << "::fsck error: " << oid << " extent " << e
                   << " or a subset is already allocated (misreferenced)" << dendl;
              ++errors;
              already = true;
@@ -7720,7 +7818,6 @@ public:
           nullptr, // referenced
           ctx);
       }
-      //std::cout << "processed " << batch << std::endl;
       batch->entry_count = 0;
       batch->running--;
     }
@@ -8070,7 +8167,6 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
 
       if (!queued) {
         ++processed_myself;
-
          o = fsck_check_objects_shallow(
           depth,
           pool_id,
@@ -8223,7 +8319,8 @@ Detection stage (in processing order):
 */
 int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
 {
-  dout(1) << __func__
+  dout(5) << __func__ << "::NCB::depth=" << depth << ", repair="<< repair << dendl;
+  dout(5) << __func__
     << (repair ? " repair" : " check")
     << (depth == FSCK_DEEP ? " (deep)" :
       depth == FSCK_SHALLOW ? " (shallow)" : " (regular)")
@@ -8231,7 +8328,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
 
   // in deep mode we need R/W write access to be able to replay deferred ops
   bool read_only = !(repair || depth == FSCK_DEEP);
-
+  dout(5) << __func__ << "::NCB::calling open_db_and_around()" << dendl;
   int r = _open_db_and_around(read_only);
   if (r < 0) {
     return r;
@@ -8247,6 +8344,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
     }
   }
 
+  // NullFreelistManager needs to open collection early
   r = _open_collections();
   if (r < 0) {
     return r;
@@ -8264,6 +8362,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
     r = _deferred_replay();
     _kv_stop();
   }
+
   if (r < 0) {
     return r;
   }
@@ -8272,6 +8371,7 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
 
 int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
 {
+  dout(5) << __func__ << "::NCB::entered" << dendl;
   dout(1) << __func__
          << " <<<START>>>"
          << (repair ? " repair" : " check")
@@ -8313,6 +8413,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
     int r = bluefs->get_block_extents(bluefs_layout.shared_bdev, &bluefs_extents);
     ceph_assert(r == 0);
     for (auto [start, len] : bluefs_extents) {
+      //dout(0) << __func__ << "::NCB::BFS extent <"<< start << ", " << len << ">" << dendl;
       apply_for_bitset_range(start, len, alloc_size, used_blocks,
         [&](uint64_t pos, mempool_dynamic_bitset& bs) {
           ceph_assert(pos < bs.size());
@@ -8589,6 +8690,7 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
              continue;
            }
            PExtentVector exts;
+           dout(5) << __func__ << "::NCB::(F)shared_alloc.a=" << shared_alloc.a << ", length=" << e->length << dendl;
            int64_t alloc_len =
               shared_alloc.a->allocate(e->length, min_alloc_size,
                                       0, 0, &exts);
@@ -8856,7 +8958,8 @@ int BlueStore::_fsck_on_open(BlueStore::FSCKDepth depth, bool repair)
     }
 
     dout(1) << __func__ << " checking freelist vs allocated" << dendl;
-    {
+    // skip freelist vs allocated compare when we have Null fm
+    if (!fm->is_null_manager()) {
       fm->enumerate_reset();
       uint64_t offset, length;
       while (fm->enumerate_next(db, &offset, &length)) {
@@ -8975,12 +9078,17 @@ void BlueStore::inject_broken_shared_blob_key(const string& key,
 
 void BlueStore::inject_leaked(uint64_t len)
 {
-  KeyValueDB::Transaction txn;
-  txn = db->get_transaction();
-
   PExtentVector exts;
   int64_t alloc_len = shared_alloc.a->allocate(len, min_alloc_size,
                                           min_alloc_size * 256, 0, &exts);
+
+  if (fm->is_null_manager()) {
+    return;
+  }
+
+  KeyValueDB::Transaction txn;
+  txn = db->get_transaction();
+
   ceph_assert(alloc_len >= (int64_t)len);
   for (auto& p : exts) {
     fm->allocate(p.offset, p.length, txn);
@@ -8990,6 +9098,8 @@ void BlueStore::inject_leaked(uint64_t len)
 
 void BlueStore::inject_false_free(coll_t cid, ghobject_t oid)
 {
+  ceph_assert(!fm->is_null_manager());
+
   KeyValueDB::Transaction txn;
   OnodeRef o;
   CollectionRef c = _get_collection(cid);
@@ -11115,12 +11225,11 @@ int BlueStore::_open_super_meta()
     db->get(PREFIX_SUPER, "freelist_type", &bl);
     if (bl.length()) {
       freelist_type = std::string(bl.c_str(), bl.length());
-      dout(1) << __func__ << " freelist_type " << freelist_type << dendl;
     } else {
       ceph_abort_msg("Not Support extent freelist manager");
     }
+    dout(5) << __func__ << "::NCB::freelist_type=" << freelist_type << dendl;
   }
-
   // ondisk format
   int32_t compat_ondisk_format = 0;
   {
@@ -11645,42 +11754,45 @@ void BlueStore::_txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t)
           << " released 0x" << txc->released
           << std::dec << dendl;
 
-  // We have to handle the case where we allocate *and* deallocate the
-  // same region in this transaction.  The freelist doesn't like that.
-  // (Actually, the only thing that cares is the BitmapFreelistManager
-  // debug check. But that's important.)
-  interval_set<uint64_t> tmp_allocated, tmp_released;
-  interval_set<uint64_t> *pallocated = &txc->allocated;
-  interval_set<uint64_t> *preleased = &txc->released;
-  if (!txc->allocated.empty() && !txc->released.empty()) {
-    interval_set<uint64_t> overlap;
-    overlap.intersection_of(txc->allocated, txc->released);
-    if (!overlap.empty()) {
-      tmp_allocated = txc->allocated;
-      tmp_allocated.subtract(overlap);
-      tmp_released = txc->released;
-      tmp_released.subtract(overlap);
-      dout(20) << __func__ << "  overlap 0x" << std::hex << overlap
-              << ", new allocated 0x" << tmp_allocated
-              << " released 0x" << tmp_released << std::dec
-              << dendl;
-      pallocated = &tmp_allocated;
-      preleased = &tmp_released;
+  if (!fm->is_null_manager())
+  {
+    // We have to handle the case where we allocate *and* deallocate the
+    // same region in this transaction.  The freelist doesn't like that.
+    // (Actually, the only thing that cares is the BitmapFreelistManager
+    // debug check. But that's important.)
+    interval_set<uint64_t> tmp_allocated, tmp_released;
+    interval_set<uint64_t> *pallocated = &txc->allocated;
+    interval_set<uint64_t> *preleased = &txc->released;
+    if (!txc->allocated.empty() && !txc->released.empty()) {
+      interval_set<uint64_t> overlap;
+      overlap.intersection_of(txc->allocated, txc->released);
+      if (!overlap.empty()) {
+       tmp_allocated = txc->allocated;
+       tmp_allocated.subtract(overlap);
+       tmp_released = txc->released;
+       tmp_released.subtract(overlap);
+       dout(20) << __func__ << "  overlap 0x" << std::hex << overlap
+                << ", new allocated 0x" << tmp_allocated
+                << " released 0x" << tmp_released << std::dec
+                << dendl;
+       pallocated = &tmp_allocated;
+       preleased = &tmp_released;
+      }
     }
-  }
 
-  // update freelist with non-overlap sets
-  for (interval_set<uint64_t>::iterator p = pallocated->begin();
-       p != pallocated->end();
-       ++p) {
-    fm->allocate(p.get_start(), p.get_len(), t);
-  }
-  for (interval_set<uint64_t>::iterator p = preleased->begin();
-       p != preleased->end();
-       ++p) {
-    dout(20) << __func__ << " release 0x" << std::hex << p.get_start()
-            << "~" << p.get_len() << std::dec << dendl;
-    fm->release(p.get_start(), p.get_len(), t);
+    // update freelist with non-overlap sets
+    for (interval_set<uint64_t>::iterator p = pallocated->begin();
+        p != pallocated->end();
+        ++p) {
+      fm->allocate(p.get_start(), p.get_len(), t);
+    }
+    for (interval_set<uint64_t>::iterator p = preleased->begin();
+        p != preleased->end();
+        ++p) {
+      dout(20) << __func__ << " release 0x" << std::hex << p.get_start()
+              << "~" << p.get_len() << std::dec << dendl;
+      fm->release(p.get_start(), p.get_len(), t);
+    }
   }
 
 #ifdef HAVE_LIBZBD
@@ -13709,6 +13821,11 @@ void BlueStore::_do_write_small(
   return;
 }
 
+bool BlueStore::has_null_fm()
+{
+  return fm->is_null_manager();
+}
+
 bool BlueStore::BigDeferredWriteContext::can_defer(
     BlueStore::extent_map_t::iterator ep,
     uint64_t prefer_deferred_size,
@@ -14166,6 +14283,7 @@ int BlueStore::_do_alloc_write(
     need, min_alloc_size, need,
     0, &prealloc);
   if (prealloc_left < 0 || prealloc_left < (int64_t)need) {
+    dout(5) << __func__ << "::NCB::failed allocation of " << need << " bytes!! shared_alloc.a=" << shared_alloc.a << dendl;
     derr << __func__ << " failed to allocate 0x" << std::hex << need
          << " allocated 0x " << (prealloc_left < 0 ? 0 : prealloc_left)
          << " min_alloc_size 0x" << min_alloc_size
@@ -14174,6 +14292,7 @@ int BlueStore::_do_alloc_write(
     if (prealloc.size()) {
       shared_alloc.a->release(prealloc);
     }
+    dout(5) << __func__ << "::NCB::(2)shared_alloc.a=" << shared_alloc.a << dendl;
     return -ENOSPC;
   }
   _collect_allocation_stats(need, min_alloc_size, prealloc);
@@ -16329,6 +16448,8 @@ bool BlueStoreRepairer::fix_leaked(KeyValueDB *db,
                                   uint64_t offset, uint64_t len)
 {
   std::lock_guard l(lock);
+  ceph_assert(!fm->is_null_manager());
+
   if (!fix_fm_leaked_txn) {
     fix_fm_leaked_txn = db->get_transaction();
   }
@@ -16341,6 +16462,8 @@ bool BlueStoreRepairer::fix_false_free(KeyValueDB *db,
                                       uint64_t offset, uint64_t len)
 {
   std::lock_guard l(lock);
+  ceph_assert(!fm->is_null_manager());
+
   if (!fix_fm_false_free_txn) {
     fix_fm_false_free_txn = db->get_transaction();
   }
@@ -16577,3 +16700,1446 @@ void RocksDBBlueFSVolumeSelector::dump(ostream& sout) {
 }
 
 // =======================================================
+
+//================================================================================================================
+// BlueStore is committing all allocation information (alloc/release) into RocksDB before the client Write is performed.
+// This cause a delay in write path and add significant load to the CPU/Memory/Disk.
+// The reason for the RocksDB updates is that it allows Ceph to survive any failure without losing the allocation state.
+//
+// We changed the code skiping RocksDB updates on allocation time and instead performing a full desatge of the allocator object
+// with all the OSD allocation state in a single step during umount().
+// This change leads to a 25% increase in IOPS and reduced latency in small random-write workload, but exposes the system
+// to losing allocation info in failure cases where we don't call umount.
+// We add code to perform a full allocation-map rebuild from information stored inside the ONode which is used in failure cases.
+// When we perform a graceful shutdown there is no need for recovery and we simply read the allocation-map from a flat file
+// where we store the allocation-map during umount().
+//================================================================================================================
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore::NCB::" << __func__ << "::"
+
+static const std::string allocator_dir    = "ALLOCATOR_NCB_DIR";
+static const std::string allocator_file   = "ALLOCATOR_NCB_FILE";
+static uint32_t    s_format_version = 0x01; // support future changes to allocator-map file
+static uint32_t    s_serial         = 0x01;
+
+#if 1
+#define CEPHTOH_32 le32toh
+#define CEPHTOH_64 le64toh
+#define HTOCEPH_32 htole32
+#define HTOCEPH_64 htole64
+#else
+// help debug the encode/decode by forcing alien format
+#define CEPHTOH_32 be32toh
+#define CEPHTOH_64 be64toh
+#define HTOCEPH_32 htobe32
+#define HTOCEPH_64 htobe64
+#endif
+
+// 48 Bytes header for on-disk alloator image
+const uint64_t ALLOCATOR_IMAGE_VALID_SIGNATURE = 0x1FACE0FF;
+struct allocator_image_header {
+  uint32_t format_version;     // 0x00
+  uint32_t valid_signature;    // 0x04
+  utime_t  timestamp;          // 0x08
+  uint32_t serial;             // 0x10
+  uint32_t pad[0x7];           // 0x14
+
+  allocator_image_header() {
+    memset((char*)this, 0, sizeof(allocator_image_header));
+  }
+
+  // create header in CEPH format
+  allocator_image_header(utime_t timestamp, uint32_t format_version, uint32_t serial) {
+    this->format_version  = format_version;
+    this->timestamp       = timestamp;
+    this->valid_signature = ALLOCATOR_IMAGE_VALID_SIGNATURE;
+    this->serial          = serial;
+    memset(this->pad, 0, sizeof(this->pad));
+  }
+
+  friend std::ostream& operator<<(std::ostream& out, const allocator_image_header& header) {
+    out << "format_version  = " << header.format_version << std::endl;
+    out << "valid_signature = " << header.valid_signature << "/" << ALLOCATOR_IMAGE_VALID_SIGNATURE << std::endl;
+    out << "timestamp       = " << header.timestamp << std::endl;
+    out << "serial          = " << header.serial << std::endl;
+    for (unsigned i = 0; i < sizeof(header.pad)/sizeof(uint32_t); i++) {
+      if (header.pad[i]) {
+       out << "header.pad[" << i << "] = " << header.pad[i] << std::endl;
+      }
+    }
+    return out;
+  }
+
+  DENC(allocator_image_header, v, p) {
+    denc(v.format_version, p);
+    denc(v.valid_signature, p);
+    denc(v.timestamp.tv.tv_sec, p);
+    denc(v.timestamp.tv.tv_nsec, p);
+    denc(v.serial, p);
+    for (auto& pad: v.pad) {
+      denc(pad, p);
+    }
+  }
+
+
+  int verify(CephContext* cct, const std::string &path) {
+    if (valid_signature == ALLOCATOR_IMAGE_VALID_SIGNATURE) {
+      for (unsigned i = 0; i < (sizeof(pad) / sizeof(uint32_t)); i++) {
+       if (this->pad[i]) {
+         derr << "Illegal Header - pad[" << i << "]="<< pad[i] << dendl;
+         return -1;
+       }
+      }
+      return 0;
+    }
+    else {
+      derr << "Illegal Header - signature="<< valid_signature << "(" << ALLOCATOR_IMAGE_VALID_SIGNATURE << ")" << dendl;
+      return -1;
+    }
+  }
+};
+WRITE_CLASS_DENC(allocator_image_header)
+
+struct extent_t {
+  uint64_t offset;
+  uint64_t length;
+
+  //extent_t(uint64_t _offset, uint64_t _length) : offset(_offset), length(_length) {}
+};
+
+// 56 Bytes trailer for on-disk alloator image
+struct allocator_image_trailer {
+  extent_t null_extent;         // 0x00
+
+  uint32_t format_version;     // 0x10
+  uint32_t valid_signature;    // 0x14
+
+  utime_t  timestamp;          // 0x18
+
+  uint32_t serial;             // 0x20
+  uint32_t pad;                // 0x24
+  uint64_t entries_count;      // 0x28
+  uint64_t allocation_size;    // 0x30
+
+  // trailer is created in CEPH format
+  allocator_image_trailer(utime_t timestamp, uint32_t format_version, uint32_t serial, uint64_t entries_count, uint64_t allocation_size) {
+    memset((char*)&(this->null_extent), 0, sizeof(this->null_extent));
+    this->format_version  = format_version;
+    this->valid_signature = ALLOCATOR_IMAGE_VALID_SIGNATURE;
+    this->timestamp       = timestamp;
+    this->serial          = serial;
+    this->pad             = 0;
+    this->entries_count   = entries_count;
+    this->allocation_size = allocation_size;
+  }
+
+  allocator_image_trailer() {
+    memset((char*)this, 0, sizeof(allocator_image_trailer));
+  }
+
+  friend std::ostream& operator<<(std::ostream& out, const allocator_image_trailer& trailer) {
+    if (trailer.null_extent.offset || trailer.null_extent.length) {
+      out << "trailer.null_extent.offset = " << trailer.null_extent.offset << std::endl;
+      out << "trailer.null_extent.length = " << trailer.null_extent.length << std::endl;
+    }
+    out << "format_version  = " << trailer.format_version << std::endl;
+    out << "valid_signature = " << trailer.valid_signature << "/" << ALLOCATOR_IMAGE_VALID_SIGNATURE << std::endl;
+    out << "timestamp       = " << trailer.timestamp << std::endl;
+    out << "serial          = " << trailer.serial << std::endl;
+    if (trailer.pad) {
+      out << "trailer.pad= " << trailer.pad << std::endl;
+    }
+    out << "entries_count   = " << trailer.entries_count   << std::endl;
+    out << "allocation_size = " << trailer.allocation_size << std::endl;
+    return out;
+  }
+
+  int verify(CephContext* cct, const std::string &path, const allocator_image_header *p_header, uint64_t entries_count, uint64_t allocation_size) {
+    if (valid_signature == ALLOCATOR_IMAGE_VALID_SIGNATURE) {
+
+      // trailer must starts with null extents (both fields set to zero) [no need to convert formats for zero)
+      if (null_extent.offset || null_extent.length) {
+       derr << "illegal trailer - null_extent = [" << null_extent.offset << "," << null_extent.length << "]"<< dendl;
+       return -1;
+      }
+
+      if (serial != p_header->serial) {
+       derr << "Illegal trailer: header->serial(" << p_header->serial << ") != trailer->serial(" << serial << ")" << dendl;
+       return -1;
+      }
+
+      if (format_version != p_header->format_version) {
+       derr << "Illegal trailer: header->format_version(" << p_header->format_version
+            << ") != trailer->format_version(" << format_version << ")" << dendl;
+       return -1;
+      }
+
+      if (timestamp != p_header->timestamp) {
+       derr << "Illegal trailer: header->timestamp(" << p_header->timestamp
+            << ") != trailer->timestamp(" << timestamp << ")" << dendl;
+       return -1;
+      }
+
+      if (this->entries_count != entries_count) {
+       derr << "Illegal trailer: entries_count(" << entries_count << ") != trailer->entries_count("
+            << this->entries_count << ")" << dendl;
+       return -1;
+      }
+
+      if (this->allocation_size != allocation_size) {
+       derr << "Illegal trailer: allocation_size(" << allocation_size << ") != trailer->allocation_size("
+            << this->allocation_size << ")" << dendl;
+       return -1;
+      }
+
+      if (pad) {
+       derr << "Illegal Trailer - pad="<< pad << dendl;
+       return -1;
+      }
+
+      // if arrived here -> trailer is valid !!
+      return 0;
+    } else {
+      derr << "Illegal Trailer - signature="<< valid_signature << "(" << ALLOCATOR_IMAGE_VALID_SIGNATURE << ")" << dendl;
+      return -1;
+    }
+  }
+
+  DENC(allocator_image_trailer, v, p) {
+    denc(v.null_extent.offset, p);
+    denc(v.null_extent.length, p);
+    denc(v.format_version, p);
+    denc(v.valid_signature, p);
+    denc(v.timestamp.tv.tv_sec, p);
+    denc(v.timestamp.tv.tv_nsec, p);
+    denc(v.serial, p);
+    denc(v.pad, p);
+    denc(v.entries_count, p);
+    denc(v.allocation_size, p);
+  }
+};
+WRITE_CLASS_DENC(allocator_image_trailer)
+
+
+//-------------------------------------------------------------------------------------
+// invalidate old allocation file if exists so will go directly to recovery after failure
+// we can safely ignore non-existing file
+int BlueStore::invalidate_allocation_file_on_bluefs()
+{
+  BlueFS::FileWriter *p_handle = nullptr;
+  if (!bluefs->dir_exists(allocator_dir)) {
+    dout(5) << "allocator_dir(" << allocator_dir << ") doesn't exist" << dendl;
+    // nothing to do -> return
+    return 0;
+  }
+
+  int ret = bluefs->stat(allocator_dir, allocator_file, nullptr, nullptr);
+  if (ret != 0) {
+    dout(5) << "allocator_file(" << allocator_file << ") doesn't exist" << dendl;
+    // nothing to do -> return
+    return 0;
+  }
+
+
+  ret = bluefs->open_for_write(allocator_dir, allocator_file, &p_handle, true);
+  if (ret != 0) {
+    derr << "Failed open_for_write with error-code " << ret << dendl;
+    return -1;
+  }
+
+  dout(5) << "invalidate using bluefs->truncate(p_handle, 0)" << dendl;
+  ret = bluefs->truncate(p_handle, 0);
+  if (ret != 0) {
+    derr << "Failed truncate with error-code " << ret << dendl;
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  bluefs->fsync(p_handle);
+  bluefs->close_writer(p_handle);
+
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+// load bluefs extents into bluefs_extents_vec
+int load_bluefs_extents(BlueFS                *bluefs,
+                       bluefs_layout_t       *bluefs_layout,
+                       CephContext*           cct,
+                       const std::string     &path,
+                       std::vector<extent_t> &bluefs_extents_vec,
+                       uint64_t               min_alloc_size)
+{
+  if (! bluefs) {
+    dout(5) <<  "No BlueFS device found!!" << dendl;
+    return 0;
+  }
+
+  interval_set<uint64_t> bluefs_extents;
+  int ret = bluefs->get_block_extents(bluefs_layout->shared_bdev, &bluefs_extents);
+  if (ret < 0) {
+    derr  <<  "failed bluefs->get_block_extents()!!" << dendl;
+    return ret;
+  }
+
+  for (auto itr = bluefs_extents.begin(); itr != bluefs_extents.end(); itr++) {
+    extent_t e = { .offset = itr.get_start(), .length = itr.get_len() };
+    bluefs_extents_vec.push_back(e);
+  }
+
+  dout(5) <<  "BlueFS extent_count=" << bluefs_extents_vec.size() << dendl;
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+int BlueStore::copy_allocator(Allocator* src_alloc, Allocator* dest_alloc, uint64_t* p_num_entries)
+{
+  *p_num_entries = 0;
+  auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    (*p_num_entries)++;
+  };
+  src_alloc->dump(count_entries);
+
+  dout(5) << "count num_entries=" << *p_num_entries << dendl;
+
+  // add 16K extra entries in case new allocation happened
+  (*p_num_entries) += 16*1024;
+  unique_ptr<extent_t[]> arr;
+  try {
+    arr = make_unique<extent_t[]>(*p_num_entries);
+  } catch (std::bad_alloc&) {
+    derr << "****Failed dynamic allocation, num_entries=" << *p_num_entries << dendl;
+    return -1;
+  }
+
+  uint64_t idx         = 0;
+  bool     null_extent = false;
+  auto copy_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    if (extent_length > 0) {
+      if (idx < *p_num_entries) {
+       arr[idx] = {extent_offset, extent_length};
+      }
+      idx++;
+    }
+    else {
+      null_extent = true;
+      derr << "zero length extent!!! offset=" << extent_offset << ", index=" << idx << dendl;
+    }
+  };
+  src_alloc->dump(copy_entries);
+
+  dout(5) << "copy num_entries=" << idx << dendl;
+  if (idx > *p_num_entries) {
+    derr << "****spillover, num_entries=" << *p_num_entries << ", spillover=" << (idx - *p_num_entries) << dendl;
+    return -1;
+  }
+
+  if (null_extent) {
+    derr << "null entries were found!" << dendl;
+    return -1;
+  }
+
+  *p_num_entries = idx;
+
+  for (idx = 0; idx < *p_num_entries; idx++) {
+    const extent_t *p_extent = &arr[idx];
+    dest_alloc->init_add_free(p_extent->offset, p_extent->length);
+  }
+
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+static uint32_t flush_extent_buffer_with_crc(BlueFS::FileWriter *p_handle, const char* buffer, const char *p_curr, uint32_t crc)
+{
+  std::ptrdiff_t length = p_curr - buffer;
+  p_handle->append(buffer, length);
+
+  crc = ceph_crc32c(crc, (const uint8_t*)buffer, length);
+  uint32_t encoded_crc = HTOCEPH_32(crc);
+  p_handle->append((byte*)&encoded_crc, sizeof(encoded_crc));
+
+  return crc;
+}
+
+const unsigned MAX_EXTENTS_IN_BUFFER = 4 * 1024; // 4K extents = 64KB of data
+// write the allocator to a flat bluefs file - 4K extents at a time
+//-----------------------------------------------------------------------------------
+int BlueStore::store_allocator(Allocator* src_allocator)
+{
+  utime_t  start_time = ceph_clock_now();
+  int ret = 0;
+
+  // create dir if doesn't exist already
+  if (!bluefs->dir_exists(allocator_dir) ) {
+    ret = bluefs->mkdir(allocator_dir);
+    if (ret != 0) {
+      derr << "Failed mkdir with error-code " << ret << dendl;
+      return -1;
+    }
+  }
+
+  // reuse previous file-allocation if exists
+  ret = bluefs->stat(allocator_dir, allocator_file, nullptr, nullptr);
+  bool overwrite_file = (ret == 0);
+  //derr <<  __func__ << "bluefs->open_for_write(" << overwrite_file << ")" << dendl;
+  BlueFS::FileWriter *p_handle = nullptr;
+  ret = bluefs->open_for_write(allocator_dir, allocator_file, &p_handle, overwrite_file);
+  if (ret != 0) {
+    derr <<  __func__ << "Failed open_for_write with error-code " << ret << dendl;
+    return -1;
+  }
+
+  uint64_t file_size = p_handle->file->fnode.size;
+  uint64_t allocated = p_handle->file->fnode.get_allocated();
+  dout(5) << "file_size=" << file_size << ", allocated=" << allocated << dendl;
+
+  unique_ptr<Allocator> allocator(clone_allocator_without_bluefs(src_allocator));
+  if (!allocator) {
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  // store all extents (except for the bluefs extents we removed) in a single flat file
+  utime_t                 timestamp = ceph_clock_now();
+  uint32_t                crc       = -1;
+  {
+    allocator_image_header  header(timestamp, s_format_version, s_serial);
+    bufferlist              header_bl;
+    //dout(5) << " header = \n" << header << dendl;
+    encode(header, header_bl);
+    crc = header_bl.crc32c(crc);
+    encode(crc, header_bl);
+    p_handle->append(header_bl);
+  }
+
+  crc = -1;                                     // reset crc
+  extent_t        buffer[MAX_EXTENTS_IN_BUFFER]; // 64KB
+  extent_t       *p_curr          = buffer;
+  const extent_t *p_end           = buffer + MAX_EXTENTS_IN_BUFFER;
+  uint64_t        extent_count    = 0;
+  uint64_t        allocation_size = 0;
+  auto iterated_allocation = [&](uint64_t extent_offset, uint64_t extent_length) {
+    if (extent_length == 0) {
+      derr <<  __func__ << "" << extent_count << "::[" << extent_offset << "," << extent_length << "]" << dendl;
+      ret = -1;
+      return;
+    }
+    //dout(5) <<  "" << extent_count << "[" << extent_offset << "," << extent_length << "]" << dendl;
+    p_curr->offset = HTOCEPH_64(extent_offset);
+    p_curr->length = HTOCEPH_64(extent_length);
+    extent_count++;
+    allocation_size += extent_length;
+    p_curr++;
+
+    if (p_curr == p_end) {
+      crc = flush_extent_buffer_with_crc(p_handle, (const char*)buffer, (const char*)p_curr, crc);
+      //dout(5) <<  " extent_count=" << extent_count << ", crc=" << crc << dendl;
+      p_curr = buffer; // recycle the buffer
+    }
+  };
+  allocator->dump(iterated_allocation);
+  // if got null extent -> fail the operation
+  if (ret != 0) {
+    derr << "Illegal extent, fail store operation" << dendl;
+    derr << "invalidate using bluefs->truncate(p_handle, 0)" << dendl;
+    bluefs->truncate(p_handle, 0);
+    bluefs->close_writer(p_handle);
+    return -1;
+  }
+
+  // if we got any leftovers -> add crc and append to file
+  if (p_curr > buffer) {
+    crc = flush_extent_buffer_with_crc(p_handle, (const char*)buffer, (const char*)p_curr, crc);
+    //dout(5) <<  " extent_count=" << extent_count << ", crc=" << crc << dendl;
+  }
+
+  {
+    allocator_image_trailer trailer(timestamp, s_format_version, s_serial, extent_count, allocation_size);
+    bufferlist trailer_bl;
+    //dout(5) << "trailer=\n" << trailer << dendl;
+    encode(trailer, trailer_bl);
+    uint32_t crc = -1;
+    crc = trailer_bl.crc32c(crc);
+    encode(crc, trailer_bl);
+    p_handle->append(trailer_bl);
+  }
+
+  bluefs->fsync(p_handle);
+  bluefs->truncate(p_handle, p_handle->pos);
+  bluefs->fsync(p_handle);
+
+  utime_t duration = ceph_clock_now() - start_time;
+  dout(5) <<"WRITE-extent_count=" << extent_count << ", file_size=" << p_handle->file->fnode.size << dendl;
+  dout(5) <<"p_handle->pos=" << p_handle->pos << " WRITE-duration=" << duration << " seconds" << dendl;
+
+  bluefs->close_writer(p_handle);
+  return 0;
+}
+
+//-----------------------------------------------------------------------------------
+Allocator* BlueStore::create_bitmap_allocator(uint64_t bdev_size) {
+  // create allocator
+  uint64_t alloc_size = min_alloc_size;
+  Allocator* alloc = Allocator::create(cct, "bitmap", bdev_size, alloc_size, "recovery");
+  if (alloc) {
+    return alloc;
+  } else {
+    derr << "Failed Allocator Creation" << dendl;
+    return nullptr;
+  }
+
+}
+
+//-----------------------------------------------------------------------------------
+size_t calc_allocator_image_header_size()
+{
+  utime_t                 timestamp = ceph_clock_now();
+  allocator_image_header  header(timestamp, s_format_version, s_serial);
+  bufferlist              header_bl;
+  encode(header, header_bl);
+  uint32_t crc = -1;
+  crc = header_bl.crc32c(crc);
+  encode(crc, header_bl);
+
+  return header_bl.length();
+}
+
+//-----------------------------------------------------------------------------------
+int calc_allocator_image_trailer_size()
+{
+  utime_t                 timestamp       = ceph_clock_now();
+  uint64_t                extent_count    = -1;
+  uint64_t                allocation_size = -1;
+  uint32_t                crc             = -1;
+  bufferlist              trailer_bl;
+  allocator_image_trailer trailer(timestamp, s_format_version, s_serial, extent_count, allocation_size);
+
+  encode(trailer, trailer_bl);
+  crc = trailer_bl.crc32c(crc);
+  encode(crc, trailer_bl);
+  return trailer_bl.length();
+}
+
+//-----------------------------------------------------------------------------------
+int BlueStore::restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes)
+{
+  utime_t start_time = ceph_clock_now();
+  BlueFS::FileReader *p_temp_handle = nullptr;
+  int ret = bluefs->open_for_read(allocator_dir, allocator_file, &p_temp_handle, false);
+  if (ret != 0) {
+    derr << "Failed open_for_read with error-code " << ret << dendl;
+    return -1;
+  }
+  unique_ptr<BlueFS::FileReader> p_handle(p_temp_handle);
+  uint64_t read_alloc_size = 0;
+  uint64_t file_size = p_handle->file->fnode.size;
+  dout(5) << "file_size=" << file_size << ",sizeof(extent_t)=" << sizeof(extent_t) << dendl;
+
+  // make sure we were able to store a valid copy
+  if (file_size == 0) {
+    derr << "No Valid allocation info on disk (empty file)" << dendl;
+    return -1;
+  }
+
+  // first read the header
+  size_t                 offset = 0;
+  allocator_image_header header;
+  int                    header_size = calc_allocator_image_header_size();
+  {
+    bufferlist header_bl,temp_bl;
+    int        read_bytes = bluefs->read(p_handle.get(), offset, header_size, &temp_bl, nullptr);
+    if (read_bytes != header_size) {
+      derr << "Failed bluefs->read() for header::read_bytes=" << read_bytes << ", req_bytes=" << header_size << dendl;
+      return -1;
+    }
+
+    offset += read_bytes;
+
+    header_bl.claim_append(temp_bl);
+    auto p = header_bl.cbegin();
+    decode(header, p);
+    //dout(5) << " header = \n" << header << dendl;
+    if (header.verify(cct, path) != 0 ) {
+      derr << "header = \n" << header << dendl;
+      return -1;
+    }
+
+    uint32_t crc_calc = -1, crc;
+    crc_calc = header_bl.cbegin().crc32c(p.get_off(), crc_calc); //crc from begin to current pos
+    decode(crc, p);
+    if (crc != crc_calc) {
+      derr << "crc mismatch!!! crc=" << crc << ", crc_calc=" << crc_calc << dendl;
+      derr << "header = \n" << header << dendl;
+      return -1;
+    }
+
+    // increment version for next store
+    s_serial = header.serial + 1;
+  }
+
+  // then read the payload (extents list) using a recycled buffer
+  extent_t        buffer[MAX_EXTENTS_IN_BUFFER]; // 64KB
+  uint32_t        crc                = -1;
+  int             trailer_size       = calc_allocator_image_trailer_size();
+  uint64_t        extent_count       = 0;
+  uint64_t        extents_bytes_left = file_size - (header_size + trailer_size + sizeof(crc));
+  while (extents_bytes_left) {
+    int req_bytes  = std::min(extents_bytes_left, sizeof(buffer));
+    int read_bytes = bluefs->read(p_handle.get(), offset, req_bytes, nullptr, (char*)buffer);
+    //dout(5) << " bluefs->read()::read_bytes=" << read_bytes << ", req_bytes=" << req_bytes << dendl;
+    if (read_bytes != req_bytes) {
+      derr << "Failed bluefs->read()::read_bytes=" << read_bytes << ", req_bytes=" << req_bytes << dendl;
+      return -1;
+    }
+
+    //dout(5) <<  "extents_bytes_left=" << extents_bytes_left << ", offset=" << offset << ", extent_count=" << extent_count << dendl;
+    offset             += read_bytes;
+    extents_bytes_left -= read_bytes;
+
+    const unsigned  num_extent_in_buffer = read_bytes/sizeof(extent_t);
+    const extent_t *p_end                = buffer + num_extent_in_buffer;
+    for (const extent_t *p_ext = buffer; p_ext < p_end; p_ext++) {
+      uint64_t offset = CEPHTOH_64(p_ext->offset);
+      uint64_t length = CEPHTOH_64(p_ext->length);
+      //dout(5) <<  "" << extent_count << "::[" << offset << "," << length << "]" << dendl;
+      read_alloc_size += length;
+
+      if (length > 0) {
+       allocator->init_add_free(offset, length);
+       extent_count ++;
+      } else {
+       derr << "extent with zero length at idx=" << extent_count << dendl;
+       return -1;
+      }
+    }
+
+    uint32_t calc_crc = ceph_crc32c(crc, (const uint8_t*)buffer, read_bytes);
+    read_bytes        = bluefs->read(p_handle.get(), offset, sizeof(crc), nullptr, (char*)&crc);
+    //dout(5) <<  "read-crc::read_bytes=" << read_bytes << ", offset=" << offset << dendl; 
+    if (read_bytes == sizeof(crc) ) {
+      crc     = CEPHTOH_32(crc);
+      if (crc != calc_crc) {
+       derr << "data crc mismatch!!! crc=" << crc << ", calc_crc=" << calc_crc << dendl;
+       derr << "extents_bytes_left=" << extents_bytes_left << ", offset=" << offset << ", extent_count=" << extent_count << dendl;
+       return -1;
+      }
+
+      offset += read_bytes;
+      if (extents_bytes_left) {
+       extents_bytes_left -= read_bytes;
+      }
+    } else {
+      derr << "Failed bluefs->read() for crc::read_bytes=" << read_bytes << ", req_bytes=" << sizeof(crc) << dendl;
+      return -1;
+    }
+
+  }
+
+  // finally, read teh trailer and verify it is in good shape and that we got all the extents
+  {
+    bufferlist trailer_bl,temp_bl;
+    int        read_bytes = bluefs->read(p_handle.get(), offset, trailer_size, &temp_bl, nullptr);
+    if (read_bytes != trailer_size) {
+      derr << "Failed bluefs->read() for trailer::read_bytes=" << read_bytes << ", req_bytes=" << trailer_size << dendl;
+      return -1;
+    }
+    offset += read_bytes;
+
+    trailer_bl.claim_append(temp_bl);
+    uint32_t crc_calc = -1;
+    uint32_t crc;
+    allocator_image_trailer trailer;
+    auto p = trailer_bl.cbegin();
+    decode(trailer, p);
+    if (trailer.verify(cct, path, &header, extent_count, read_alloc_size) != 0 ) {
+      derr << "trailer=\n" << trailer << dendl;
+      return -1;
+    }
+
+    crc_calc = trailer_bl.cbegin().crc32c(p.get_off(), crc_calc); //crc from begin to current pos
+    decode(crc, p);
+    if (crc != crc_calc) {
+      derr << "trailer crc mismatch!::crc=" << crc << ", crc_calc=" << crc_calc << dendl;
+      derr << "trailer=\n" << trailer << dendl;
+      return -1;
+    }
+  }
+
+  utime_t duration = ceph_clock_now() - start_time;
+  dout(5) << "READ--extent_count=" << extent_count << ", read_alloc_size=  "
+           << read_alloc_size << ", file_size=" << file_size << dendl;
+  dout(5) << "READ duration=" << duration << " seconds, s_serial=" << s_serial << dendl;
+  *num   = extent_count;
+  *bytes = read_alloc_size;
+  return 0;
+}
+
+//-------------------------------------------------------------------------
+void BlueStore::ExtentMap::provide_shard_info_to_onode(bufferlist v, uint32_t shard_id)
+{
+  auto cct  = onode->c->store->cct;
+  auto path = onode->c->store->path;
+  if (shard_id < shards.size()) {
+    auto p = &shards[shard_id];
+    if (!p->loaded) {
+      dout(30) << "opening shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
+      p->extents = decode_some(v);
+      p->loaded = true;
+      dout(20) << "open shard 0x" << std::hex << p->shard_info->offset << std::dec << dendl;
+      ceph_assert(p->dirty == false);
+      ceph_assert(v.length() == p->shard_info->bytes);
+    }
+  } else {
+    derr << "illegal shard-id=" << shard_id << " shards.size()=" << shards.size() << dendl;
+    ceph_assert(shard_id < shards.size());
+  }
+}
+
+//---------------------------------------------------------
+// Process all physical extents from a given Onode (including all its shards)
+void BlueStore::read_allocation_from_single_onode(
+  Allocator*           allocator,
+  BlueStore::OnodeRef& onode_ref,
+  read_alloc_stats_t&  stats)
+{
+  // create a map holding all physical-extents of this Onode to prevent duplication from being added twice and more
+  std::unordered_map<uint64_t, uint32_t> lcl_extnt_map;
+  unsigned blobs_count = 0;
+  uint64_t pos = 0;
+
+  stats.spanning_blob_count += onode_ref->extent_map.spanning_blob_map.size();
+  // first iterate over all logical-extents
+  for (struct Extent& l_extent : onode_ref->extent_map.extent_map) {
+    ceph_assert(l_extent.logical_offset >= pos);
+
+    pos = l_extent.logical_offset + l_extent.length;
+    ceph_assert(l_extent.blob);
+    const bluestore_blob_t& blob         = l_extent.blob->get_blob();
+    const PExtentVector&    p_extent_vec = blob.get_extents();
+    blobs_count++;
+    if (blob.is_compressed()) {
+      stats.compressed_blob_count++;
+    }
+
+    // process all physical extent in this blob
+    for (auto p_extent = p_extent_vec.begin(); p_extent != p_extent_vec.end(); p_extent++) {
+      auto offset = p_extent->offset;
+      auto length = p_extent->length;
+
+      // Offset of -1 means that the extent was removed (and it is only a place holder) and can be safely skipped
+      if (offset == (uint64_t)-1) {
+       stats.skipped_illegal_extent++;
+       continue;
+      }
+
+      // skip repeating extents
+      auto lcl_itr = lcl_extnt_map.find(offset);
+      if (lcl_itr != lcl_extnt_map.end()) {
+       // repeated extents must have the same length!
+
+       // --Note--
+       // This asserts triggers because of a corruption which was hidden until now
+       // It was not introduced by this PR (we merely report it now)
+       // Don't shoot me I'm only the messenger :-)
+       ceph_assert(lcl_extnt_map[offset] == length);
+       stats.skipped_repeated_extent++;
+       ceph_assert(blobs_count > 0);
+      } else {
+       lcl_extnt_map[offset] = length;
+       allocator->init_rm_free(offset, length);
+       stats.extent_count++;
+      }
+    }
+  }
+
+  if (blobs_count < MAX_BLOBS_IN_ONODE) {
+    stats.blobs_in_onode[blobs_count]++;
+  } else {
+    // store all counts higher than MAX_BLOBS_IN_ONODE in a single bucket at offset zero
+    //std::cout << "***BCF::Blob-count=" << blobs_count << std::endl;
+    stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]++;
+  }
+}
+
+//-------------------------------------------------------------------------
+int BlueStore::read_allocation_from_onodes(Allocator* allocator, read_alloc_stats_t& stats)
+{
+  // finally add all space take by user data
+  auto it = db->get_iterator(PREFIX_OBJ, KeyValueDB::ITERATOR_NOCACHE);
+  if (!it) {
+    // TBD - find a better error code
+    derr << "failed db->get_iterator(PREFIX_OBJ)" << dendl;
+    return -1;
+  }
+
+  CollectionRef       collection_ref;
+  spg_t               pgid;
+  BlueStore::OnodeRef onode_ref;
+  bool                has_open_onode = false;
+  uint32_t            shard_id       = 0;
+  uint64_t            kv_count       = 0;
+  uint64_t            count_interval = 1'000'000;
+  // iterate over all ONodes stored in RocksDB
+  for (it->lower_bound(string()); it->valid(); it->next(), kv_count++) {
+    // trace an even after every million processed objects (typically every 5-10 seconds)
+    if (kv_count && (kv_count % count_interval == 0) ) {
+      dout(5) << "processed objects count = " << kv_count << dendl;
+    }
+
+    // Shards - Code
+    // add the extents from the shards to the main Obj
+    if (is_extent_shard_key(it->key())) {
+      // shards must follow a valid main object
+      if (has_open_onode) {
+       // shards keys must start with the main object key
+       if (it->key().find(onode_ref->key) == 0) {
+         // shards count can't exceed declared shard-count in the main-object
+         if (shard_id < onode_ref->extent_map.shards.size()) {
+           onode_ref->extent_map.provide_shard_info_to_onode(it->value(), shard_id);
+           stats.shard_count++;
+           shard_id++;
+         } else {
+           derr << "illegal shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+           derr << "shard->key=" << pretty_binary_string(it->key()) << dendl;
+           ceph_assert(shard_id < onode_ref->extent_map.shards.size());
+         }
+       } else {
+         derr << "illegal shard-key::onode->key=" << pretty_binary_string(onode_ref->key) << " shard->key=" << pretty_binary_string(it->key()) << dendl;
+         ceph_assert(it->key().find(onode_ref->key) == 0);
+       }
+      } else {
+       derr << "error::shard without main objects for key=" << pretty_binary_string(it->key()) << dendl;
+       ceph_assert(has_open_onode);
+      }
+
+    } else {
+      // Main Object Code
+
+      if (has_open_onode) {
+       // make sure we got all shards of this object
+       if (shard_id == onode_ref->extent_map.shards.size()) {
+         // We completed an Onode Object -> pass it to be processed
+         read_allocation_from_single_onode(allocator, onode_ref, stats);
+       } else {
+         derr << "Missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+         ceph_assert(shard_id == onode_ref->extent_map.shards.size());
+       }
+      } else {
+       // We opened a new Object
+       has_open_onode =  true;
+      }
+
+      // The main Obj is always first in RocksDB so we can start with shard_id set to zero
+      shard_id = 0;
+      stats.onode_count++;
+      ghobject_t oid;
+      int ret = get_key_object(it->key(), &oid);
+      if (ret < 0) {
+       derr << "bad object key " << pretty_binary_string(it->key()) << dendl;
+       ceph_assert(ret == 0);
+       continue;
+      }
+
+      // fill collection_ref if doesn't exist yet
+      // We process all the obejcts in a given collection and then move to the next collection
+      // This means we only search once for every given collection
+      if (!collection_ref                                     ||
+         oid.shard_id                != pgid.shard           ||
+         oid.hobj.get_logical_pool() != (int64_t)pgid.pool() ||
+         !collection_ref->contains(oid)) {
+       stats.collection_search++;
+       collection_ref = nullptr;
+
+       for (auto& p : coll_map) {
+         if (p.second->contains(oid)) {
+           collection_ref = p.second;
+           break;
+         }
+       }
+
+       if (!collection_ref) {
+         derr << "stray object " << oid << " not owned by any collection" << dendl;
+         ceph_assert(collection_ref);
+         continue;
+       }
+
+       collection_ref->cid.is_pg(&pgid);
+      }
+
+      //std::cout << "[" << stats.onode_count << "] oid="<< oid << " key=" << pretty_binary_string(it->key()) << std::endl;
+      onode_ref.reset(BlueStore::Onode::decode(collection_ref, oid, it->key(), it->value()));
+    }
+  }
+
+  // process the last object
+  if (has_open_onode) {
+    // make sure we got all shards of this object
+    if (shard_id == onode_ref->extent_map.shards.size()) {
+      // We completed an Onode Object -> pass it to be processed
+      read_allocation_from_single_onode(allocator, onode_ref, stats);
+    } else {
+      derr << "Last Object is missing shards! shard_id=" << shard_id << ", shards.size()=" << onode_ref->extent_map.shards.size() << dendl;
+      ceph_assert(shard_id == onode_ref->extent_map.shards.size());
+    }
+  }
+  dout(5) << "onode_count=" << stats.onode_count << " ,shard_count=" << stats.shard_count << dendl;
+
+  return 0;
+}
+
+//---------------------------------------------------------
+int BlueStore::reconstruct_allocations(Allocator* allocator, read_alloc_stats_t &stats)
+{
+  uint64_t memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
+  uint64_t bdev_size = bdev->get_size();
+  dout(5) << "memory_target=" << memory_target << ", bdev_size=" << bdev_size << dendl;
+
+  // start by marking the full device space as allocated and then remove each extent we find
+  dout(5) << "init_add_free(0, " << bdev_size << ")" << dendl;
+  allocator->init_add_free(0, bdev_size);
+
+  // first add space used by superblock
+  auto super_length = std::max<uint64_t>(min_alloc_size, SUPER_RESERVED);
+  dout(5) << "init_rm_free(0, " << super_length << ")" << dendl;
+  allocator->init_rm_free(0, super_length);
+  stats.extent_count++;
+
+  dout(5) << "calling read_allocation_from_onodes()" << dendl;
+  // then add all space taken by Objects
+  int ret = read_allocation_from_onodes(allocator, stats);
+  if (ret < 0) {
+    derr << "failed read_allocation_from_onodes()" << dendl;
+    return ret;
+  }
+
+  return 0;
+}
+
+//---------------------------------------------------------
+int BlueStore::read_allocation_from_drive_on_startup()
+{
+  int ret = 0;
+  dout(5) << "Start Allocation Recovery from ONodes ..." << dendl;
+
+  ret = _open_collections();
+  if (ret < 0) {
+    return ret;
+  }
+
+  read_alloc_stats_t stats = {};
+  utime_t    start = ceph_clock_now();
+  Allocator *allocator = create_bitmap_allocator(bdev->get_size());
+  if (allocator == nullptr) {
+    derr << "****failed create_bitmap_allocator()" << dendl;
+    return -1;
+  }
+
+  ret = reconstruct_allocations(allocator, stats);
+  if (ret != 0) {
+    delete allocator;
+    return ret;
+  }
+
+  uint64_t num_entries = 0;
+  dout(5) << " calling copy_allocator(bitmap_allocator -> shared_alloc.a)" << dendl;  
+  copy_allocator(allocator, shared_alloc.a, &num_entries);
+  delete allocator;
+  utime_t duration = ceph_clock_now() - start;
+  dout(5) << " <<<FINISH>>> in " << duration << " seconds, num_entries=" << num_entries << dendl;
+  dout(5) << "num_entries=" << num_entries << ", extent_count=" << stats.extent_count << dendl;  
+  dout(5) << "Allocation Recovery was completed" << dendl;
+  return ret;
+}
+
+
+
+
+// Only used for debugging purposes - we build a secondary allocator from the Onodes and compare it to the existing one
+// Not meant to be run by customers
+#ifdef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+
+#include <stdlib.h>
+#include <algorithm>
+//---------------------------------------------------------
+int cmpfunc (const void * a, const void * b)
+{
+  if ( ((extent_t*)a)->offset > ((extent_t*)b)->offset ) {
+    return 1;
+  }
+  else if( ((extent_t*)a)->offset < ((extent_t*)b)->offset ) {
+    return -1;
+  }
+  else {
+    return 0;
+  }
+}
+
+// compare the allocator built from Onodes with the system allocator (CF-B)
+//---------------------------------------------------------
+int BlueStore::compare_allocators(Allocator* alloc1, Allocator* alloc2, uint64_t req_extent_count, uint64_t memory_target)
+{
+  uint64_t allocation_size = std::min((req_extent_count) * sizeof(extent_t), memory_target / 3);
+  uint64_t extent_count    = allocation_size/sizeof(extent_t);
+  dout(5) << "req_extent_count=" << req_extent_count << ", granted extent_count="<< extent_count << dendl;
+
+  unique_ptr<extent_t[]> arr1;
+  unique_ptr<extent_t[]> arr2;
+  try {
+    arr1 = make_unique<extent_t[]>(extent_count);
+    arr2 = make_unique<extent_t[]>(extent_count);
+  } catch (std::bad_alloc&) {
+    derr << "****Failed dynamic allocation, extent_count=" << extent_count << dendl;
+    return -1;
+  }
+
+  // copy the extents from the allocators into simple array and then compare them
+  uint64_t size1 = 0, size2 = 0;
+  uint64_t idx1  = 0, idx2  = 0;
+  auto iterated_mapper1 = [&](uint64_t offset, uint64_t length) {
+    //std::cout << "[" << idx1 << "]<" << offset << "," << length << ">" << std::endl;
+    size1 += length;
+    if (idx1 < extent_count) {
+      arr1[idx1++] = {offset, length};
+    }
+    else if (idx1 == extent_count) {
+      derr << "(2)compare_allocators:: spillover"  << dendl;
+      idx1 ++;
+    }
+
+  };
+
+  auto iterated_mapper2 = [&](uint64_t offset, uint64_t length) {
+    //std::cout << "[" << idx2 << "]<" << offset << "," << length << ">" << std::endl;
+    size2 += length;
+    if (idx2 < extent_count) {
+      arr2[idx2++] = {offset, length};
+    }
+    else if (idx2 == extent_count) {
+      derr << "(2)compare_allocators:: spillover"  << dendl;
+      idx2 ++;
+    }
+  };
+
+  alloc1->dump(iterated_mapper1);
+  //std::cout << __func__ << "alloc1->dump()::entry_count=" << idx1 << " size=" << size1 << std::endl;
+
+  alloc2->dump(iterated_mapper2);
+  //std::cout << __func__ << "::alloc2->dump()::entry_count=" << idx2 << " size=" << size2 << std::endl;
+
+  qsort(arr1.get(), std::min(idx1, extent_count), sizeof(extent_t), cmpfunc);
+  qsort(arr2.get(), std::min(idx2, extent_count), sizeof(extent_t), cmpfunc);
+
+  if (idx1 == idx2) {
+    idx1 = idx2 = std::min(idx1, extent_count);
+    if (memcmp(arr1.get(), arr2.get(), sizeof(extent_t) * idx2) == 0) {
+      return 0;
+    }
+    derr << "Failed memcmp(arr1, arr2, sizeof(extent_t)*idx2)"  << dendl;
+    for (uint64_t i = 0; i < idx1; i++) {
+      if (memcmp(arr1.get()+i, arr2.get()+i, sizeof(extent_t)) != 0) {
+       derr << "!!!![" << i << "] arr1::<" << arr1[i].offset << "," << arr1[i].length << ">" << dendl;
+       derr << "!!!![" << i << "] arr2::<" << arr2[i].offset << "," << arr2[i].length << ">" << dendl;
+       return -1;
+      }
+    }
+    return 0;
+  } else {
+    derr << "mismatch:: idx1=" << idx1 << " idx2=" << idx2 << dendl;
+    std::cout << "==================================================================="  << std::endl;
+    for (uint64_t i = 0; i < idx1; i++) {
+      std::cout << "arr1[" << i << "]<" << arr1[i].offset << "," << arr1[i].length << "> " << std::endl;
+    }
+
+    std::cout << "==================================================================="  << std::endl;
+    for (uint64_t i = 0; i < idx2; i++) {
+      std::cout << "arr2[" << i << "]<" << arr2[i].offset << "," << arr2[i].length << "> " << std::endl;
+    }
+    return -1;
+  }
+}
+
+//---------------------------------------------------------
+int BlueStore::add_existing_bluefs_allocation(Allocator* allocator, read_alloc_stats_t &stats)
+{
+  // then add space used by bluefs to store rocksdb
+  unsigned extent_count = 0;
+  if (bluefs) {
+    interval_set<uint64_t> bluefs_extents;
+    int ret = bluefs->get_block_extents(bluefs_layout.shared_bdev, &bluefs_extents);
+    if (ret < 0) {
+      return ret;
+    }
+    for (auto itr = bluefs_extents.begin(); itr != bluefs_extents.end(); extent_count++, itr++) {
+      //dout(5) << "BlueFS[" << extent_count << "] <" << itr.get_start() << "," << itr.get_len() << ">" << dendl;
+      allocator->init_rm_free(itr.get_start(), itr.get_len());
+      stats.extent_count++;
+    }
+  }
+
+  dout(5) << "bluefs extent_count=" << extent_count << dendl;
+  return 0;
+}
+
+//---------------------------------------------------------
+int BlueStore::read_allocation_from_drive_for_bluestore_tool(bool test_store_and_restore)
+{
+  dout(5) << "test_store_and_restore=" << test_store_and_restore << dendl;
+  int ret = 0;
+  uint64_t memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
+  dout(5) << "calling open_db_and_around()" << dendl;
+  ret = _open_db_and_around(true, false/*, true*/);
+  if (ret < 0) {
+    return ret;
+  }
+
+  ret = _open_collections();
+  if (ret < 0) {
+    _close_db_and_around(false); return ret;
+  }
+
+  read_alloc_stats_t stats = {};
+  uint64_t bdev_size = bdev->get_size();
+  Allocator* allocator = create_bitmap_allocator(bdev_size);
+  if (allocator) {
+    dout(5) << "bitmap-allocator=" << allocator << dendl;
+  } else {
+    return -1;
+  }
+  dout(5) << " calling reconstruct_allocations()" << dendl;
+  utime_t    start = ceph_clock_now();
+  ret = reconstruct_allocations(allocator, stats);
+  if (ret != 0) {
+    _close_db_and_around(false); return ret;
+  }
+
+  // add allocation space used by the bluefs itself
+  ret = add_existing_bluefs_allocation(allocator, stats);
+  if (ret < 0) {
+    _close_db_and_around(false); return ret;
+  }
+
+  utime_t duration = ceph_clock_now() - start;
+  stats.insert_count = 0;
+  auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    stats.insert_count++;
+  };
+  allocator->dump(count_entries);
+
+  dout(5) << "\n" << " <<<FINISH>>> in " << duration << " seconds; insert_count=" << stats.insert_count << dendl;
+  dout(5) << "\n" << " <<<FINISH>>> in " << duration << " seconds; extent_count=" << stats.extent_count << dendl;
+
+
+  dout(5) << "calling compare_allocator(shared_alloc.a) insert_count=" << stats.insert_count << dendl;
+  ret = compare_allocators(allocator, shared_alloc.a, stats.insert_count, memory_target);
+  if (ret == 0) {
+    dout(5) << "SUCCESS!!! compare(allocator, shared_alloc.a)" << dendl;
+  } else {
+    derr << "**** FAILURE compare(allocator, shared_alloc.a)::ret=" << ret << dendl;
+  }
+
+  if (test_store_and_restore) {
+    dout(5) << "calling store_allocator(shared_alloc.a)" << dendl;
+    store_allocator(shared_alloc.a);
+    Allocator* alloc2 = create_bitmap_allocator(bdev_size);
+    if (alloc2) {
+      dout(5) << "bitmap-allocator=" << alloc2 << dendl;
+      dout(5) << "calling restore_allocator()" << dendl;
+      uint64_t num, bytes;
+      int ret = restore_allocator(alloc2, &num, &bytes);
+      if (ret == 0) {
+       // add allocation space used by the bluefs itself
+       ret = add_existing_bluefs_allocation(alloc2, stats);
+       if (ret < 0) {
+         _close_db_and_around(false); return ret;
+       }
+       // verify that we can store and restore allocator to/from drive
+       ret = compare_allocators(alloc2, shared_alloc.a, stats.insert_count, memory_target);
+       if (ret == 0) {
+         dout(5) << "SUCCESS!!! compare(alloc2, shared_alloc.a)" << dendl;
+       } else {
+         derr << "**** FAILURE compare(alloc2, shared_alloc.a)::ret=" << ret << dendl;
+       }
+      } else {
+       derr << "******Failed restore_allocator******\n" << dendl;
+      }
+      delete alloc2;
+    } else {
+      derr << "Failed allcoator2 create" << dendl;
+    }
+  }
+
+  std::cout << "<<<FINISH>>> in " << duration << " seconds; insert_count=" << stats.insert_count << "\n\n" << std::endl;
+  std::cout << stats << std::endl;
+
+  //out_db:
+  delete allocator;
+  _shutdown_cache();
+  _close_db_and_around(false);
+  return ret;
+}
+
+//---------------------------------------------------------
+int BlueStore::db_cleanup(int ret)
+{
+  _shutdown_cache();
+  _close_db_and_around(false);
+  return ret;
+}
+
+//---------------------------------------------------------
+Allocator* BlueStore::clone_allocator_without_bluefs(Allocator *src_allocator)
+{
+  uint64_t   bdev_size = bdev->get_size();
+  Allocator* allocator = create_bitmap_allocator(bdev_size);
+  if (allocator) {
+    dout(5) << "bitmap-allocator=" << allocator << dendl;
+  } else {
+    derr << "****failed create_bitmap_allocator()" << dendl;
+    return nullptr;
+  }
+
+  uint64_t num_entries = 0;
+  dout(5) << "calling copy_allocator(shared_alloc.a -> bitmap_allocator)" << dendl;  
+  copy_allocator(src_allocator, allocator, &num_entries);
+
+  // BlueFS stores its internal allocation outside RocksDB (FM) so we should not destage them to the allcoator-file
+  // we are going to hide bluefs allocation during allocator-destage as they are stored elsewhere
+  {
+    std::vector<extent_t> bluefs_extents_vec;
+    // load current bluefs internal allocation into a vector
+    load_bluefs_extents(bluefs, &bluefs_layout, cct, path, bluefs_extents_vec, min_alloc_size);
+    // then remove them from the shared allocator before dumping it to disk (bluefs stored them internally)
+    for (auto itr = bluefs_extents_vec.begin(); itr != bluefs_extents_vec.end(); ++itr) {
+      allocator->init_add_free(itr->offset, itr->length);
+    }
+  }
+
+  return allocator;
+}
+
+//---------------------------------------------------------
+static void clear_allocation_objects_from_rocksdb(KeyValueDB *db, CephContext *cct, const std::string &path)
+{
+  dout(5) << "t->rmkeys_by_prefix(PREFIX_ALLOC_BITMAP)" << dendl;
+  KeyValueDB::Transaction t = db->get_transaction();
+  t->rmkeys_by_prefix(PREFIX_ALLOC_BITMAP);
+  db->submit_transaction_sync(t);
+}
+
+//---------------------------------------------------------
+void BlueStore::copy_allocator_content_to_fm(Allocator *allocator, FreelistManager *real_fm)
+{
+  unsigned max_txn = 1024;
+  dout(5) << "max_transaction_submit=" << max_txn << dendl;
+  uint64_t size = 0, idx = 0;
+  KeyValueDB::Transaction txn = db->get_transaction();
+  auto iterated_insert = [&](uint64_t offset, uint64_t length) {
+    size += length;
+    real_fm->release(offset, length, txn);
+    if ((++idx % max_txn) == 0) {
+      db->submit_transaction_sync(txn);
+      txn = db->get_transaction();
+    }
+  };
+  allocator->dump(iterated_insert);
+  if (idx % max_txn != 0) {
+    db->submit_transaction_sync(txn);
+  }
+  dout(5) << "size=" << size << ", num extents=" << idx  << dendl;
+}
+
+//---------------------------------------------------------
+Allocator* BlueStore::initialize_allocator_from_freelist(FreelistManager *real_fm)
+{
+  dout(5) << "real_fm->enumerate_next" << dendl;
+  Allocator* allocator2 = create_bitmap_allocator(bdev->get_size());
+  if (allocator2) {
+    dout(5) << "bitmap-allocator=" << allocator2 << dendl;
+  } else {
+    return nullptr;
+  }
+
+  uint64_t size2 = 0, idx2 = 0;
+  real_fm->enumerate_reset();
+  uint64_t offset, length;
+  while (real_fm->enumerate_next(db, &offset, &length)) {
+    allocator2->init_add_free(offset, length);
+    ++idx2;
+    size2 += length;
+  }
+  real_fm->enumerate_reset();
+
+  dout(5) << "size2=" << size2 << ", num2=" << idx2 << dendl;
+  return allocator2;
+}
+
+//---------------------------------------------------------
+// close the active fm and open it in a new mode like makefs()
+// but make sure to mark the full device space as allocated
+// later we will mark all exetents from the allocator as free
+int BlueStore::reset_fm_for_restore()
+{
+  dout(5) << "<<==>> fm->clear_null_manager()" << dendl;
+  fm->shutdown();
+  delete fm;
+  fm = nullptr;
+  freelist_type = "bitmap";
+  KeyValueDB::Transaction t = db->get_transaction();
+  // call _open_fm() with fm_restore set to TRUE
+  // this will mark the full device space as allocated (and not just the reserved space)
+  _open_fm(t, true, true);
+  if (fm == nullptr) {
+    derr << "Failed _open_fm()" << dendl;
+    return -1;
+  }
+  db->submit_transaction_sync(t);
+  ceph_assert(!fm->is_null_manager());
+  dout(5) << "fm was reactivated in full mode" << dendl;
+  return 0;
+}
+
+
+//---------------------------------------------------------
+// create a temp allocator filled with allocation state from the fm
+// and compare it to the base allocator passed in
+int BlueStore::verify_rocksdb_allocations(Allocator *allocator)
+{
+  dout(5) << "verify that shared_alloc content is identical to FM" << dendl;
+  // initialize from freelist
+  Allocator* temp_allocator = initialize_allocator_from_freelist(fm);
+  if (temp_allocator == nullptr) {
+    return -1;
+  }
+
+  uint64_t insert_count = 0;
+  auto count_entries = [&](uint64_t extent_offset, uint64_t extent_length) {
+    insert_count++;
+  };
+  temp_allocator->dump(count_entries);
+  uint64_t memory_target = cct->_conf.get_val<Option::size_t>("osd_memory_target");
+  int ret = compare_allocators(allocator, temp_allocator, insert_count, memory_target);
+
+  delete temp_allocator;
+
+  if (ret == 0) {
+    dout(5) << "SUCCESS!!! compare(allocator, temp_allocator)" << dendl;
+    return 0;
+  } else {
+    derr << "**** FAILURE compare(allocator, temp_allocator)::ret=" << ret << dendl;
+    return -1;
+  }
+}
+
+//---------------------------------------------------------
+// convert back the system from null-allocator to using rocksdb to store allocation
+int BlueStore::push_allocation_to_rocksdb()
+{
+  if (cct->_conf->bluestore_allocation_from_file) {
+    derr << "cct->_conf->bluestore_allocation_from_file must be cleared first" << dendl;
+    derr << "please change default to false in ceph.conf file>" << dendl;
+    return -1;
+  }
+
+  dout(5) << "calling open_db_and_around() in read/write mode" << dendl;
+  int ret = _open_db_and_around(false);
+  if (ret < 0) {
+    return ret;
+  }
+
+  if (!fm->is_null_manager()) {
+    derr << "This is not a NULL-MANAGER -> nothing to do..." << dendl;
+    return db_cleanup(0);
+  }
+
+  // start by creating a clone copy of the shared-allocator
+  unique_ptr<Allocator> allocator(clone_allocator_without_bluefs(shared_alloc.a));
+  if (!allocator) {
+    return db_cleanup(-1);
+  }
+
+  // remove all objects of PREFIX_ALLOC_BITMAP from RocksDB to guarantee a clean start
+  clear_allocation_objects_from_rocksdb(db, cct, path);
+
+  // then open fm in new mode with the full devie marked as alloctaed
+  if (reset_fm_for_restore() != 0) {
+    return db_cleanup(-1);
+  }
+
+  // push the free-space from the allocator (shared-alloc without bfs) to rocksdb
+  copy_allocator_content_to_fm(allocator.get(), fm);
+
+  // compare the allocator info with the info stored in the fm/rocksdb
+  if (verify_rocksdb_allocations(allocator.get()) == 0) {
+    // all is good -> we can commit to rocksdb allocator
+    commit_to_real_manager();
+  } else {
+    return db_cleanup(-1);
+  }
+
+  // can't be too paranoid :-)
+  dout(5) << "Running full scale verification..." << dendl;
+  // close db/fm/allocator and start fresh
+  db_cleanup(0);
+  dout(5) << "calling open_db_and_around() in read-only mode" << dendl;
+  ret = _open_db_and_around(true);
+  if (ret < 0) {
+    return db_cleanup(ret);
+  }
+  ceph_assert(!fm->is_null_manager());
+  ceph_assert(verify_rocksdb_allocations(allocator.get()) == 0);
+
+  return db_cleanup(ret);
+}
+
+#endif // CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+
+//-------------------------------------------------------------------------------------
+static int commit_freelist_type(KeyValueDB *db, const std::string& freelist_type, CephContext *cct, const std::string &path)
+{
+  // When freelist_type to "bitmap" we will store allocation in RocksDB
+  // When allocation-info is stored in a single file we set freelist_type to "null"
+  // This will direct the startup code to read allocation from file and not RocksDB
+  KeyValueDB::Transaction t = db->get_transaction();
+  if (t == nullptr) {
+    derr << "db->get_transaction() failed!!!" << dendl;
+    return -1;
+  }
+
+  bufferlist bl;
+  bl.append(freelist_type);
+  t->set(PREFIX_SUPER, "freelist_type", bl);
+
+  return db->submit_transaction_sync(t);
+}
+
+//-------------------------------------------------------------------------------------
+int BlueStore::commit_to_null_manager()
+{
+  dout(5) << "Set FreelistManager to NULL FM..." << dendl;
+  fm->set_null_manager();
+  freelist_type = "null";
+#if 1
+  return commit_freelist_type(db, freelist_type, cct, path);
+#else
+  // should check how long this step take on a big configuration as deletes are expensive
+  if (commit_freelist_type(db, freelist_type, cct, path) == 0) {
+    // remove all objects of PREFIX_ALLOC_BITMAP from RocksDB to guarantee a clean start
+    clear_allocation_objects_from_rocksdb(db, cct, path);
+  }
+#endif
+}
+
+
+//-------------------------------------------------------------------------------------
+int BlueStore::commit_to_real_manager()
+{
+  dout(5) << "Set FreelistManager to Real FM..." << dendl;
+  ceph_assert(!fm->is_null_manager());
+  freelist_type = "bitmap";
+  return commit_freelist_type(db, freelist_type, cct, path);
+}
+
+//================================================================================================================
+//================================================================================================================
index a73b3b853359bfbab5b90f17b63b8d9bb18e643a..a2a1750dfd123a8025bc2f3d3c06e69075e76ff6 100644 (file)
@@ -59,7 +59,6 @@
 class Allocator;
 class FreelistManager;
 class BlueStoreRepairer;
-
 //#define DEBUG_CACHE
 //#define DEBUG_DEFERRED
 
@@ -67,7 +66,7 @@ class BlueStoreRepairer;
 
 // constants for Buffer::optimize()
 #define MAX_BUFFER_SLOP_RATIO_DEN  8  // so actually 1/N
-
+#define CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
 
 enum {
   l_bluestore_first = 732430,
@@ -952,6 +951,8 @@ public:
 
     /// split a blob (and referring extents)
     BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
+
+    void provide_shard_info_to_onode(bufferlist v, uint32_t shard_id);
   };
 
   /// Compressed Blob Garbage collector
@@ -2030,6 +2031,7 @@ public:
     bool apply_defer();
   };
 
+  bool has_null_fm();
   // --------------------------------------------------------
   // members
 private:
@@ -2383,7 +2385,7 @@ private:
               bool to_repair_db=false,
               bool read_only = false);
   void _close_db(bool read_only);
-  int _open_fm(KeyValueDB::Transaction t, bool read_only);
+  int _open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_restore = false);
   void _close_fm();
   int _write_out_fm_meta(uint64_t target_size);
   int _create_alloc();
@@ -2411,8 +2413,8 @@ public:
   }
 
   static int _write_bdev_label(CephContext* cct,
-                              std::string path, bluestore_bdev_label_t label);
-  static int _read_bdev_label(CephContext* cct, std::string path,
+                              const std::string &path, bluestore_bdev_label_t label);
+  static int _read_bdev_label(CephContext* cct, const std::string &path,
                              bluestore_bdev_label_t *label);
 private:
   int _check_or_set_bdev_label(std::string path, uint64_t size, std::string desc,
@@ -3447,8 +3449,96 @@ public:
     mempool::bluestore_fsck::list<std::string>* expecting_shards,
     std::map<BlobRef, bluestore_blob_t::unused_t>* referenced,
     const BlueStore::FSCK_ObjectCtx& ctx);
-
+#ifdef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+  int  push_allocation_to_rocksdb();
+  int  read_allocation_from_drive_for_bluestore_tool(bool test_store_and_restore);
+  int  read_allocation_from_drive_for_fsck() { return read_allocation_from_drive_for_bluestore_tool(false); }
+#endif
 private:
+#define MAX_BLOBS_IN_ONODE 128
+  struct  read_alloc_stats_t {
+    //read_alloc_stats_t() { memset(&this, 0, sizeof(read_alloc_stats_t)); }
+    uint32_t onode_count             = 0;
+    uint32_t shard_count             = 0;
+
+    uint32_t skipped_repeated_extent = 0;
+    uint32_t skipped_illegal_extent  = 0;
+
+    uint32_t collection_search       = 0;
+    uint32_t pad_limit_count         = 0;
+
+    uint64_t compressed_blob_count   = 0;
+    uint64_t spanning_blob_count     = 0;
+    uint64_t insert_count            = 0;
+    uint64_t extent_count            = 0;
+
+    uint64_t saved_inplace_count     = 0;
+    uint32_t merge_insert_count      = 0;
+    uint32_t merge_inplace_count     = 0;
+
+    std::array<uint32_t, MAX_BLOBS_IN_ONODE+1>blobs_in_onode = {};
+    //uint32_t blobs_in_onode[MAX_BLOBS_IN_ONODE+1];
+  };
+
+  friend std::ostream& operator<<(std::ostream& out, const read_alloc_stats_t& stats) {
+    out << "==========================================================" << std::endl;
+    out << "NCB::onode_count             = " ;out.width(10);out << stats.onode_count << std::endl
+       << "NCB::shard_count             = " ;out.width(10);out << stats.shard_count << std::endl
+       << "NCB::compressed_blob_count   = " ;out.width(10);out << stats.compressed_blob_count << std::endl
+       << "NCB::spanning_blob_count     = " ;out.width(10);out << stats.spanning_blob_count << std::endl
+       << "NCB::collection search       = " ;out.width(10);out << stats.collection_search << std::endl
+       << "NCB::skipped_repeated_extent = " ;out.width(10);out << stats.skipped_repeated_extent << std::endl
+       << "NCB::skipped_illegal_extent  = " ;out.width(10);out << stats.skipped_illegal_extent << std::endl
+       << "NCB::extent_count            = " ;out.width(10);out << stats.extent_count << std::endl
+       << "NCB::insert_count            = " ;out.width(10);out << stats.insert_count << std::endl;
+
+    if (stats.merge_insert_count) {
+      out << "NCB::merge_insert_count      = " ;out.width(10);out << stats.merge_insert_count  << std::endl;
+    }
+    if (stats.merge_inplace_count ) {
+      out << "NCB::merge_inplace_count     = " ;out.width(10);out << stats.merge_inplace_count << std::endl;
+      out << "NCB::saved_inplace_count     = " ;out.width(10);out << stats.saved_inplace_count << std::endl;
+      out << "NCB::saved inplace per call  = " ;out.width(10);out << stats.saved_inplace_count/stats.merge_inplace_count << std::endl;
+    }
+    out << "==========================================================" << std::endl;
+
+    for (unsigned i = 0; i < MAX_BLOBS_IN_ONODE; i++ ) {
+      if (stats.blobs_in_onode[i]) {
+       out << "NCB::We had " ;out.width(9); out << stats.blobs_in_onode[i]
+           << " ONodes with "; out.width(3); out << i << " blobs" << std::endl;
+      }
+    }
+
+    if (stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]) {
+      out << "NCB::We had " ;out.width(9);out << stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]
+         << " ONodes with more than " << MAX_BLOBS_IN_ONODE << " blobs" << std::endl;
+    }
+    return out;
+  }
+
+  int  compare_allocators(Allocator* alloc1, Allocator* alloc2, uint64_t req_extent_count, uint64_t memory_target);
+  Allocator* create_bitmap_allocator(uint64_t bdev_size);
+  int  add_existing_bluefs_allocation(Allocator* allocator, read_alloc_stats_t& stats);
+  int  allocator_add_restored_entries(Allocator *allocator, const void *buff, unsigned extent_count, uint64_t *p_read_alloc_size,
+                                     uint64_t  *p_extent_count, const void *v_header, BlueFS::FileReader *p_handle, uint64_t offset);
+
+  int  copy_allocator(Allocator* src_alloc, Allocator *dest_alloc, uint64_t* p_num_entries);
+  int  store_allocator(Allocator* allocator);
+  int  invalidate_allocation_file_on_bluefs();
+  int  restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes);
+  int  read_allocation_from_drive_on_startup();
+  int  reconstruct_allocations(Allocator* allocator, read_alloc_stats_t &stats);
+  int  read_allocation_from_onodes(Allocator* allocator, read_alloc_stats_t& stats);
+  int  commit_to_null_manager();
+  int  commit_to_real_manager();
+  int  db_cleanup(int ret);
+  int  reset_fm_for_restore();
+  int  verify_rocksdb_allocations(Allocator *allocator);
+  Allocator* clone_allocator_without_bluefs(Allocator *src_allocator);
+  Allocator* initialize_allocator_from_freelist(FreelistManager *real_fm);
+  void copy_allocator_content_to_fm(Allocator *allocator, FreelistManager *real_fm);
+  void read_allocation_from_single_onode(Allocator* allocator, BlueStore::OnodeRef& onode_ref, read_alloc_stats_t&  stats);
+
   void _fsck_check_object_omap(FSCKDepth depth,
     OnodeRef& o,
     const BlueStore::FSCK_ObjectCtx& ctx);
@@ -3615,6 +3705,7 @@ public:
       return false;
     }
   };
+
 public:
   void fix_per_pool_omap(KeyValueDB *db, int);
   bool remove_key(KeyValueDB *db, const std::string& prefix, const std::string& key);
index 23be5148e2e032463473e2e4492b5c7e85a733f1..18ca45f85649af3ebfef1adf3d137f9ffd949de9 100644 (file)
 #include "bluestore_types.h"
 
 class FreelistManager {
+  bool         null_manager = false;
 public:
   CephContext* cct;
-  FreelistManager(CephContext* cct) : cct(cct) {}
+  explicit FreelistManager(CephContext* cct) : cct(cct) {}
   virtual ~FreelistManager() {}
 
   static FreelistManager *create(
@@ -50,6 +51,13 @@ public:
 
   virtual void get_meta(uint64_t target_size,
     std::vector<std::pair<string, string>>*) const = 0;
+
+  void set_null_manager() {
+    null_manager = true;
+  }
+  bool is_null_manager() {
+    return null_manager;
+  }
 };
 
 
index 178b6f2c7a808ac2fad054e61978ca41d362804e..ee03dc1a02cd899c7e11589695bf9994bd4df637 100644 (file)
@@ -307,6 +307,9 @@ int main(int argc, char **argv)
   po_positional.add_options()
     ("command", po::value<string>(&action),
         "fsck, "
+        "qfsck, "
+        "allocmap, "
+        "restore_cfb, "
         "repair, "
         "quick-fix, "
         "bluefs-export, "
@@ -417,7 +420,7 @@ int main(int argc, char **argv)
     }
   }
 
-  if (action == "fsck" || action == "repair" || action == "quick-fix") {
+  if (action == "fsck" || action == "repair" || action == "quick-fix" || action == "allocmap" || action == "qfsck" || action == "restore_cfb") {
     if (path.empty()) {
       cerr << "must specify bluestore path" << std::endl;
       exit(EXIT_FAILURE);
@@ -537,7 +540,58 @@ int main(int argc, char **argv)
     }
   }
 
-  if (action == "fsck" ||
+  if (action == "restore_cfb") {
+#ifndef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+    cerr << action << " bluestore.restore_cfb is not supported!!! " << std::endl;
+    exit(EXIT_FAILURE);
+#else
+    cout << action << " bluestore.restore_cfb" << std::endl;
+    validate_path(cct.get(), path, false);
+    BlueStore bluestore(cct.get(), path);
+    int r = bluestore.push_allocation_to_rocksdb();
+    if (r < 0) {
+      cerr << action << " failed: " << cpp_strerror(r) << std::endl;
+      exit(EXIT_FAILURE);
+    } else {
+      cout << action << " success" << std::endl;
+    }
+#endif
+  }
+  else if (action == "allocmap") {
+#ifndef CEPH_BLUESTORE_TOOL_ENABLE_ALLOCMAP
+    cerr << action << " bluestore.allocmap is not supported!!! " << std::endl;
+    exit(EXIT_FAILURE);
+#else
+    cout << action << " bluestore.allocmap" << std::endl;
+    validate_path(cct.get(), path, false);
+    BlueStore bluestore(cct.get(), path);
+    int r = bluestore.read_allocation_from_drive_for_bluestore_tool(true);
+    if (r < 0) {
+      cerr << action << " failed: " << cpp_strerror(r) << std::endl;
+      exit(EXIT_FAILURE);
+    } else {
+      cout << action << " success" << std::endl;
+    }
+#endif
+  }
+  else if( action == "qfsck" ) {
+#ifndef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
+    cerr << action << " bluestore.qfsck is not supported!!! " << std::endl;
+    exit(EXIT_FAILURE);
+#else
+    cout << action << " bluestore.quick-fsck" << std::endl;
+    validate_path(cct.get(), path, false);
+    BlueStore bluestore(cct.get(), path);
+    int r = bluestore.read_allocation_from_drive_for_fsck();
+    if (r < 0) {
+      cerr << action << " failed: " << cpp_strerror(r) << std::endl;
+      exit(EXIT_FAILURE);
+    } else {
+      cout << action << " success" << std::endl;
+    }
+#endif
+  }
+  else if (action == "fsck" ||
       action == "repair" ||
       action == "quick-fix") {
     validate_path(cct.get(), path, false);
index c77929b0c22ea18d7273b74201fdbfc9b48413c6..deb841c510f6940a4fe042746530e372c0fae536 100644 (file)
@@ -8206,24 +8206,36 @@ TEST_P(StoreTestSpecificAUSize, BluestoreRepairTest) {
   }
 
   bstore->umount();
+  bool err_was_injected = false;
   //////////// leaked pextent fix ////////////
   cerr << "fix leaked pextents" << std::endl;
   ASSERT_EQ(bstore->fsck(false), 0);
   ASSERT_EQ(bstore->repair(false), 0);
   bstore->mount();
-  bstore->inject_leaked(0x30000);
+  if (!bstore->has_null_fm()) {
+    bstore->inject_leaked(0x30000);
+    err_was_injected = true;
+  }
+
   bstore->umount();
-  ASSERT_EQ(bstore->fsck(false), 1);
+  if (err_was_injected) {
+    ASSERT_EQ(bstore->fsck(false), 1);
+  }
   ASSERT_EQ(bstore->repair(false), 0);
   ASSERT_EQ(bstore->fsck(false), 0);
 
   //////////// false free fix ////////////
   cerr << "fix false free pextents" << std::endl;
   bstore->mount();
-  bstore->inject_false_free(cid, hoid);
+  if (!bstore->has_null_fm()) {
+    bstore->inject_false_free(cid, hoid);
+    err_was_injected = true;
+  }
   bstore->umount();
-  ASSERT_EQ(bstore->fsck(false), 2);
-  ASSERT_EQ(bstore->repair(false), 0);
+  if (err_was_injected) {
+    ASSERT_EQ(bstore->fsck(false), 2);
+    ASSERT_EQ(bstore->repair(false), 0);
+  }
   ASSERT_EQ(bstore->fsck(false), 0);
 
   //////////// verify invalid statfs ///////////
@@ -8266,9 +8278,9 @@ TEST_P(StoreTestSpecificAUSize, BluestoreRepairTest) {
   bstore->inject_misreference(cid, hoid, cid, hoid_dup, 0);
   bstore->inject_misreference(cid, hoid, cid, hoid_dup, (offs_base * repeats) / 2);
   bstore->inject_misreference(cid, hoid, cid, hoid_dup, offs_base * (repeats -1) );
-  
+  int expected_errors = bstore->has_null_fm() ? 3 : 6;
   bstore->umount();
-  ASSERT_EQ(bstore->fsck(false), 6);
+  ASSERT_EQ(bstore->fsck(false), expected_errors);
   ASSERT_EQ(bstore->repair(false), 0);
 
   ASSERT_EQ(bstore->fsck(true), 0);