git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore: parallelize quick-fix running.
author Igor Fedotov <ifedotov@suse.com>
Sun, 8 Sep 2019 22:37:13 +0000 (01:37 +0300)
committer Igor Fedotov <ifedotov@suse.com>
Mon, 18 Nov 2019 09:14:25 +0000 (12:14 +0300)
Signed-off-by: Igor Fedotov <ifedotov@suse.com>
(cherry picked from commit c432f2268db60eb701fd9b17618537513f5d4eea)

 Conflicts:
src/os/bluestore/BlueStore.cc
        - trivial

src/common/legacy_config_opts.h
src/common/options.cc
src/os/bluestore/BlueStore.cc
src/os/bluestore/BlueStore.h

index a7c8907bf2d7a3ce216dea7fe55ca6e313b0f0b1..d9ea52b2928745c7b2d1f4709457d2b109f6b037 100644 (file)
@@ -1050,6 +1050,7 @@ OPTION(bluestore_fsck_on_mkfs, OPT_BOOL)
 OPTION(bluestore_fsck_on_mkfs_deep, OPT_BOOL)
 OPTION(bluestore_sync_submit_transaction, OPT_BOOL) // submit kv txn in queueing thread (not kv_sync_thread)
 OPTION(bluestore_fsck_read_bytes_cap, OPT_U64)
+OPTION(bluestore_fsck_quick_fix_threads, OPT_INT)
 OPTION(bluestore_throttle_bytes, OPT_U64)
 OPTION(bluestore_throttle_deferred_bytes, OPT_U64)
 OPTION(bluestore_throttle_cost_per_io_hdd, OPT_U64)
index f22fe1a89da4238ec5926325330abc859bc33a2d..1b63c5516b0e317403568028b3b935acd3197245 100644 (file)
@@ -4759,6 +4759,10 @@ std::vector<Option> get_global_options() {
     .set_flag(Option::FLAG_RUNTIME)
     .set_description("Maximum bytes read at once by deep fsck"),
 
+    Option("bluestore_fsck_quick_fix_threads", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+      .set_default(2)
+      .set_description("Number of additional threads to perform quick-fix (shallow fsck) command"),
+
     Option("bluestore_throttle_bytes", Option::TYPE_SIZE, Option::LEVEL_ADVANCED)
     .set_default(64_M)
     .set_flag(Option::FLAG_RUNTIME)
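
The new bluestore_fsck_quick_fix_threads option (default 2) controls how many extra worker threads the shallow fsck may use: 0 keeps the legacy single-threaded scan, while any positive value starts that many threads on top of the scanning thread. Below is a minimal sketch of how the value maps onto the machinery added in BlueStore.cc further down; QuickFixPlan and plan_quick_fix are illustrative names, not Ceph symbols.

#include <cstddef>

struct QuickFixPlan {
  std::size_t worker_threads;  // threads started in ShallowFSCKThreadPool
  std::size_t batch_count;     // 256-entry batches allocated by FSCKWorkQueue
};

QuickFixPlan plan_quick_fix(long quick_fix_threads /* config value */) {
  std::size_t n = quick_fix_threads > 0
    ? static_cast<std::size_t>(quick_fix_threads) : 0;
  // mirrors the "(thread_count ? : 1) * 32" sizing used when the queue is built
  return { n, (n ? n : 1) * 32 };
}
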
index d8e5313eaa27f9a0d3ba095a6a34dcea78bdf203..6dd8384a1c16841e9efce50281b985ad68c0569f 100644 (file)
@@ -6841,13 +6841,15 @@ void BlueStore::_fsck_check_pool_statfs(
   }
 }
 
-BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
-  FSCKDepth depth,
+BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
+  BlueStore::FSCKDepth depth,
+  bool need_per_pool_stats,
+  int64_t pool_id,
   BlueStore::CollectionRef c,
   const ghobject_t& oid,
-  KeyValueDB::Iterator it,
+  const string& key,
+  const bufferlist& value,
   mempool::bluestore_fsck::list<string>& expecting_shards,
-  store_statfs_t* expected_statfs,
   map<BlobRef, bluestore_blob_t::unused_t>* referenced,
   const BlueStore::FSCK_ObjectCtx& ctx)
 {
@@ -6858,14 +6860,17 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
   auto& num_sharded_objects = ctx.num_sharded_objects;
   auto& num_spanning_blobs = ctx.num_spanning_blobs;
   auto used_blocks = ctx.used_blocks;
+  auto sb_info_lock = ctx.sb_info_lock;
   auto& sb_info = ctx.sb_info;
   auto repairer = ctx.repairer;
 
+  store_statfs_t* res_statfs = need_per_pool_stats ?
+    &ctx.expected_pool_statfs[pool_id] :
+    &ctx.expected_store_statfs;
 
   dout(10) << __func__ << "  " << oid << dendl;
-  store_statfs_t onode_statfs;
   OnodeRef o;
-  o.reset(Onode::decode(c, oid, it->key(), it->value()));
+  o.reset(Onode::decode(c, oid, key, value));
   ++num_objects;
 
   num_spanning_blobs += o->extent_map.spanning_blob_map.size();
@@ -6913,7 +6918,7 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
       ++errors;
     }
     pos = l.logical_offset + l.length;
-    onode_statfs.data_stored += l.length;
+    res_statfs->data_stored += l.length;
     ceph_assert(l.blob);
     const bluestore_blob_t& blob = l.blob->get_blob();
 
@@ -6963,8 +6968,8 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
       ++errors;
     }
     if (blob.is_compressed()) {
-      onode_statfs.data_compressed += blob.get_compressed_payload_length();
-      onode_statfs.data_compressed_original +=
+      res_statfs->data_compressed += blob.get_compressed_payload_length();
+      res_statfs->data_compressed_original +=
         i.first->get_referenced_bytes();
     }
     if (blob.is_shared()) {
@@ -6980,6 +6985,10 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
           << dendl;
         ++errors;
       }
+      // the below lock is optional and provided in multithreading mode only
+      if (sb_info_lock) {
+        sb_info_lock->lock();
+      }
       sb_info_t& sbi = sb_info[i.first->shared_blob->get_sbid()];
       ceph_assert(sbi.cid == coll_t() || sbi.cid == c->cid);
       ceph_assert(sbi.pool_id == INT64_MIN ||
@@ -6994,6 +7003,9 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
           sbi.ref_map.get(e.offset, e.length);
         }
       }
+      if (sb_info_lock) {
+        sb_info_lock->unlock();
+      }
     } else if (depth != FSCK_SHALLOW) {
       ceph_assert(used_blocks);
       errors += _fsck_check_extents(c->cid, oid, blob.get_extents(),
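
The two hunks above make the shared-blob bookkeeping (sb_info) safe for concurrent workers: the mutex is only supplied in multi-threaded shallow mode, and in all other runs the pointer stays null so no locking happens. A minimal RAII rendering of the same optional-lock idea, assuming std::mutex and an illustrative MaybeLock type (the patch itself locks and unlocks manually around the sb_info access):

#include <mutex>

struct MaybeLock {
  explicit MaybeLock(std::mutex* m) : m_(m) { if (m_) m_->lock(); }
  ~MaybeLock() { if (m_) m_->unlock(); }
  MaybeLock(const MaybeLock&) = delete;
  MaybeLock& operator=(const MaybeLock&) = delete;
private:
  std::mutex* m_;             // null => single-threaded run, nothing to lock
};

// usage sketch:
//   MaybeLock guard(sb_info_lock);    // sb_info_lock may be nullptr
//   sb_info_t& sbi = sb_info[sbid];   // map shared by all fsck workers
//   ...update ref counts...
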
@@ -7001,37 +7013,320 @@ BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
         *used_blocks,
         fm->get_alloc_size(),
         repairer,
-        onode_statfs,
+        *res_statfs,
         depth);
     } else {
       errors += _fsck_sum_extents(
         blob.get_extents(),
         blob.is_compressed(),
-        onode_statfs);
+        *res_statfs);
     }
   } // for (auto& i : ref_map)
-  expected_statfs->add(onode_statfs);
   return o;
 }
 
+#include "common/WorkQueue.h"
+
+class ShallowFSCKThreadPool : public ThreadPool
+{
+public:
+  ShallowFSCKThreadPool(CephContext* cct_, std::string nm, std::string tn, int n) :
+    ThreadPool(cct_, nm, tn, n) {
+  }
+  void worker(ThreadPool::WorkThread* wt) override {
+    int next_wq = 0;
+    while (!_stop) {
+      next_wq %= work_queues.size();
+      WorkQueue_ *wq = work_queues[next_wq++];
+
+      void* item = wq->_void_dequeue();
+      if (item) {
+        processing++;
+        TPHandle tp_handle(cct, nullptr, wq->timeout_interval, wq->suicide_interval);
+        wq->_void_process(item, tp_handle);
+        processing--;
+      }
+    }
+  }
+  template <size_t BatchLen>
+  struct FSCKWorkQueue : public ThreadPool::WorkQueue_
+  {
+    struct Entry {
+      int64_t pool_id;
+      BlueStore::CollectionRef c;
+      ghobject_t oid;
+      string key;
+      bufferlist value;
+    };
+    struct Batch {
+      std::atomic<size_t> running = { 0 };
+      size_t entry_count = 0;
+      std::array<Entry, BatchLen> entries;
+
+      int64_t errors = 0;
+      int64_t warnings = 0;
+      uint64_t num_objects = 0;
+      uint64_t num_extents = 0;
+      uint64_t num_blobs = 0;
+      uint64_t num_sharded_objects = 0;
+      uint64_t num_spanning_blobs = 0;
+      store_statfs_t expected_store_statfs;
+      BlueStore::per_pool_statfs expected_pool_statfs;
+    };
+
+    size_t batchCount;
+    BlueStore* store = nullptr;
+    bool need_per_pool_stats;
+
+    mempool::bluestore_fsck::list<string>* expecting_shards = nullptr;
+    ceph::mutex* sb_info_lock = nullptr;
+    BlueStore::sb_info_map_t* sb_info = nullptr;
+    BlueStoreRepairer* repairer = nullptr;
+
+    Batch* batches = nullptr;
+    size_t last_batch_pos = 0;
+    bool batch_acquired = false;
+
+    FSCKWorkQueue(std::string n,
+                  size_t _batchCount,
+                  BlueStore* _store,
+                  bool _need_per_pool_stats,
+                  mempool::bluestore_fsck::list<string>& _expecting_shards,
+                  ceph::mutex* _sb_info_lock,
+                  BlueStore::sb_info_map_t& _sb_info,
+                  BlueStoreRepairer* _repairer) :
+      WorkQueue_(n, time_t(), time_t()),
+      batchCount(_batchCount),
+      store(_store),
+      need_per_pool_stats(_need_per_pool_stats),
+      expecting_shards(&_expecting_shards),
+      sb_info_lock(_sb_info_lock),
+      sb_info(&_sb_info),
+      repairer(_repairer)
+    {
+      batches = new Batch[batchCount];
+    }
+    ~FSCKWorkQueue() {
+      delete[] batches;
+    }
+
+    /// Remove all work items from the queue.
+    void _clear() override {
+      //do nothing
+    }
+    /// Check whether there is anything to do.
+    bool _empty() override {
+      ceph_assert(false);
+    }
+
+    /// Get the next work item to process.
+    void* _void_dequeue() override {
+      size_t pos = rand() % batchCount;
+      size_t pos0 = pos;
+      do {
+        auto& batch = batches[pos];
+        if (batch.running.fetch_add(1) == 0) {
+          if (batch.entry_count) {
+            return &batch;
+          }
+        }
+        batch.running--;
+        pos++;
+        pos %= batchCount;
+      } while (pos != pos0);
+      return nullptr;
+    }
+    /** @brief Process the work item.
+     * This function will be called several times in parallel
+     * and must therefore be thread-safe. */
+    void _void_process(void* item, TPHandle& handle) override {
+      Batch* batch = (Batch*)item;
+
+      BlueStore::FSCK_ObjectCtx ctx(
+        batch->errors,
+        batch->warnings,
+        batch->num_objects,
+        batch->num_extents,
+        batch->num_blobs,
+        batch->num_sharded_objects,
+        batch->num_spanning_blobs,
+        nullptr, // used_blocks
+        nullptr, // used_omap_head;
+        nullptr, // used_per_pool_omap_head;
+        nullptr, // used_pgmeta_omap_head;
+        sb_info_lock,
+        *sb_info,
+        batch->expected_store_statfs,
+        batch->expected_pool_statfs,
+        repairer);
+
+      for (size_t i = 0; i < batch->entry_count; i++) {
+        auto& entry = batch->entries[i];
+
+        store->fsck_check_objects_shallow(
+          BlueStore::FSCK_SHALLOW,
+          need_per_pool_stats,
+          entry.pool_id,
+          entry.c,
+          entry.oid,
+          entry.key,
+          entry.value,
+          *expecting_shards,
+          nullptr, // referenced
+          ctx);
+      }
+      //std::cout << "processed " << batch << std::endl;
+      batch->entry_count = 0;
+      batch->running--;
+    }
+    /** @brief Synchronously finish processing a work item.
+     * This function is called after _void_process with the global thread pool lock held,
+     * so at most one copy will execute simultaneously for a given thread pool.
+     * It can be used for non-thread-safe finalization. */
+    void _void_process_finish(void*) override {
+      ceph_assert(false);
+    }
+
+    bool queue(
+      int64_t pool_id,
+      BlueStore::CollectionRef c,
+      const ghobject_t& oid,
+      const string& key,
+      const bufferlist& value) {
+      bool res = false;
+      size_t pos0 = last_batch_pos;
+      if (!batch_acquired) {
+        do {
+          auto& batch = batches[last_batch_pos];
+          if (batch.running.fetch_add(1) == 0) {
+            if (batch.entry_count < BatchLen) {
+              batch_acquired = true;
+              break;
+            }
+          }
+          batch.running.fetch_sub(1);
+          last_batch_pos++;
+          last_batch_pos %= batchCount;
+        } while (last_batch_pos != pos0);
+      }
+      if (batch_acquired) {
+        auto& batch = batches[last_batch_pos];
+        ceph_assert(batch.running);
+        ceph_assert(batch.entry_count < BatchLen);
+
+        auto& entry = batch.entries[batch.entry_count];
+        entry.pool_id = pool_id;
+        entry.c = c;
+        entry.oid = oid;
+        entry.key = key;
+        entry.value = value;
+
+        ++batch.entry_count;
+        if (batch.entry_count == BatchLen) {
+          batch_acquired = false;
+          batch.running.fetch_sub(1);
+          last_batch_pos++;
+          last_batch_pos %= batchCount;
+        }
+        res = true;
+      }
+      return res;
+    }
+
+    void finalize(ThreadPool& tp,
+                  BlueStore::FSCK_ObjectCtx& ctx) {
+      if (batch_acquired) {
+        auto& batch = batches[last_batch_pos];
+        ceph_assert(batch.running);
+        batch.running.fetch_sub(1);
+      }
+      tp.stop();
+
+      for (size_t i = 0; i < batchCount; i++) {
+        auto& batch = batches[i];
+
+        //process leftovers if any
+        if (batch.entry_count) {
+          TPHandle tp_handle(store->cct,
+            nullptr,
+            timeout_interval,
+            suicide_interval);
+          ceph_assert(batch.running == 0);
+
+          batch.running++; // just to be on-par with the regular call
+          _void_process(&batch, tp_handle);
+        }
+        ceph_assert(batch.entry_count == 0);
+
+        ctx.errors += batch.errors;
+        ctx.warnings += batch.warnings;
+        ctx.num_objects += batch.num_objects;
+        ctx.num_extents += batch.num_extents;
+        ctx.num_blobs += batch.num_blobs;
+        ctx.num_sharded_objects += batch.num_sharded_objects;
+        ctx.num_spanning_blobs += batch.num_spanning_blobs;
+        ctx.expected_store_statfs.add(batch.expected_store_statfs);
+
+        for (auto it = batch.expected_pool_statfs.begin();
+          it != batch.expected_pool_statfs.end();
+          it++) {
+          ctx.expected_pool_statfs[it->first].add(it->second);
+        }
+      }
+    }
+  };
+};
+
 void BlueStore::_fsck_check_objects(FSCKDepth depth,
   bool need_per_pool_stats,
-  const BlueStore::FSCK_ObjectCtx& ctx)
+  BlueStore::FSCK_ObjectCtx& ctx)
 {
+  //no need for the below lock when in non-shallow mode as
+  // there is no multithreading in this case
+  if (depth != FSCK_SHALLOW) {
+    ctx.sb_info_lock = nullptr;
+  }
+
   auto& errors = ctx.errors;
-  auto* used_omap_head = ctx.used_omap_head;
-  auto* used_pgmeta_omap_head = ctx.used_pgmeta_omap_head;
+  auto used_omap_head = ctx.used_omap_head;
+  auto used_pgmeta_omap_head = ctx.used_pgmeta_omap_head;
+  auto sb_info_lock = ctx.sb_info_lock;
+  auto sb_info = ctx.sb_info;
+  auto repairer = ctx.repairer;
 
   uint64_t_btree_t used_nids;
 
+  size_t processed_myself = 0;
+
   auto it = db->get_iterator(PREFIX_OBJ);
+  mempool::bluestore_fsck::list<string> expecting_shards;
   if (it) {
-    //fill global if not overriden below
-    store_statfs_t* expected_statfs = &ctx.expected_store_statfs;
+    const size_t thread_count = cct->_conf->bluestore_fsck_quick_fix_threads;
+    typedef ShallowFSCKThreadPool::FSCKWorkQueue<256> WQ;
+    std::unique_ptr<WQ> wq(
+      new WQ(
+        "FSCKWorkQueue",
+        (thread_count ? : 1) * 32,
+        this,
+        need_per_pool_stats,
+        expecting_shards,
+        sb_info_lock,
+        sb_info,
+        repairer));
+
+    ShallowFSCKThreadPool thread_pool(cct, "ShallowFSCKThreadPool", "ShallowFSCK", thread_count);
+
+    thread_pool.add_work_queue(wq.get());
+    if (depth == FSCK_SHALLOW && thread_count > 0) {
+      //not the best place but let's check anyway
+      ceph_assert(sb_info_lock);
+      thread_pool.start();
+    }
 
+    //fill global if not overridden below
     CollectionRef c;
+    int64_t pool_id = -1;
     spg_t pgid;
-    mempool::bluestore_fsck::list<string> expecting_shards;
     for (it->lower_bound(string()); it->valid(); it->next()) {
       dout(30) << __func__ << " key "
         << pretty_binary_string(it->key()) << dendl;
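
The FSCKWorkQueue added above hands objects to the workers in fixed-size batches rather than one key at a time, and each batch's atomic running counter doubles as a try-lock: fetch_add(1) == 0 means the caller is the sole owner, any other value means back off and probe the next slot. A stand-alone sketch of that dequeue logic; Batch and try_dequeue here are simplified stand-ins, not the patch's types:

#include <array>
#include <atomic>
#include <cstddef>

struct Batch {
  std::atomic<std::size_t> running{0};  // 0 == free, >0 == claimed
  std::size_t entry_count = 0;          // filled by the producer thread
};

// Probe the batch array starting at 'start'; return a claimed batch or nullptr.
template <std::size_t N>
Batch* try_dequeue(std::array<Batch, N>& batches, std::size_t start) {
  std::size_t pos = start % N;
  const std::size_t pos0 = pos;
  do {
    Batch& b = batches[pos];
    if (b.running.fetch_add(1) == 0 && b.entry_count > 0) {
      return &b;                        // caller processes it, then drops 'running'
    }
    b.running.fetch_sub(1);             // empty or owned by someone else: release
    pos = (pos + 1) % N;
  } while (pos != pos0);
  return nullptr;                       // nothing claimable right now
}
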
@@ -7104,12 +7399,9 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
           ++errors;
           continue;
         }
-        auto pool_id = c->cid.is_pg(&pgid) ? pgid.pool() : META_POOL_ID;
+        pool_id = c->cid.is_pg(&pgid) ? pgid.pool() : META_POOL_ID;
         dout(20) << __func__ << "  collection " << c->cid << " " << c->cnode
           << dendl;
-        if (need_per_pool_stats) {
-          expected_statfs = &ctx.expected_pool_statfs[pool_id];
-        }
 
         dout(20) << __func__ << "  collection " << c->cid << " " << c->cnode
           << dendl;
@@ -7126,25 +7418,30 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
       }
 
       bool queued = false;
-      /*if (depth == FSCK_SHALLOW) {
-        queued = _fsck_queue_check_object(
+      if (depth == FSCK_SHALLOW && thread_count > 0) {
+        queued = wq->queue(
+          pool_id,
           c,
           oid,
-          it,
-        );
-      }*/
+          it->key(),
+          it->value());
+      }
       OnodeRef o;
       map<BlobRef, bluestore_blob_t::unused_t> referenced;
 
       if (!queued) {
-         o = _fsck_check_objects_shallow(
+        ++processed_myself;
+
+         o = fsck_check_objects_shallow(
           depth,
+          need_per_pool_stats,
+          pool_id,
           c,
           oid,
-          it,
+          it->key(),
+          it->value(),
           expecting_shards,
-          expected_statfs,
-           &referenced,
+          &referenced,
           ctx);
       }
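
In shallow mode the scanning thread first tries to push each onode into the work queue; when queue() cannot claim a batch with free room it returns false and the object is checked inline, counted in processed_myself. A compact sketch of that offload-or-inline loop with placeholder types (Item, Queue and check_inline are not BlueStore names):

#include <cstddef>
#include <vector>

template <typename Item, typename Queue, typename Fn>
std::size_t scan_objects(Queue& wq, const std::vector<Item>& items, Fn check_inline) {
  std::size_t processed_myself = 0;
  for (const auto& item : items) {
    if (!wq.queue(item)) {   // all batches busy or full: do the work ourselves
      check_inline(item);
      ++processed_myself;    // reported at the end as "partial offload"
    }
  }
  return processed_myself;
}
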
 
@@ -7240,6 +7537,17 @@ void BlueStore::_fsck_check_objects(FSCKDepth depth,
         } // deep
       } //if (depth != FSCK_SHALLOW)
     } // for (it->lower_bound(string()); it->valid(); it->next())
+    if (depth == FSCK_SHALLOW && thread_count > 0) {
+      wq->finalize(thread_pool, ctx);
+      if (processed_myself) {
+        // maybe more threads are needed?
+        dout(0) << __func__ << " partial offload"
+                << ", done myself " << processed_myself
+                << " of " << ctx.num_objects
+                << " objects, threads " << thread_count
+                << dendl;
+      }
+    }
   } // if (it)
 }
 /**
@@ -7308,7 +7616,6 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
   uint64_t num_shared_blobs = 0;
   uint64_t num_sharded_objects = 0;
   BlueStoreRepairer repairer;
-  //store_statfs_t* expected_statfs = nullptr;
   // in deep mode we need R/W write access to be able to replay deferred ops
   bool read_only = !(repair || depth == FSCK_DEEP);
 
@@ -7437,10 +7744,10 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
     goto out_scan;
   }
   // walk PREFIX_OBJ
-  dout(1) << __func__ << " walking object keyspace" << dendl;
-  _fsck_check_objects(depth,
-    need_per_pool_stats,
-    BlueStore::FSCK_ObjectCtx(
+  {
+    dout(1) << __func__ << " walking object keyspace" << dendl;
+    ceph::mutex sb_info_lock =  ceph::make_mutex("BlueStore::fsck::sbinfo_lock");
+    BlueStore::FSCK_ObjectCtx ctx(
       errors,
       warnings,
       num_objects,
@@ -7452,10 +7759,15 @@ int BlueStore::_fsck(BlueStore::FSCKDepth depth, bool repair)
       &used_omap_head,
       nullptr,
       &used_pgmeta_omap_head,
+      &sb_info_lock,
       sb_info,
       expected_store_statfs,
       expected_pool_statfs,
-      repair ? &repairer : nullptr));
+      repair ? &repairer : nullptr);
+    _fsck_check_objects(depth,
+      need_per_pool_stats,
+      ctx);
+  }
 
   dout(1) << __func__ << " checking shared_blobs" << dendl;
   it = db->get_iterator(PREFIX_SHARED_BLOB);
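
Once the key scan finishes, finalize() stops the pool, processes any half-filled batches on the calling thread, and only then folds each batch's privately accumulated counters into the shared FSCK_ObjectCtx, so the hot path needs no locking for statistics. The sketch below shows that merge step with simplified stand-in types (BatchTotals is illustrative):

#include <cstdint>
#include <map>
#include <vector>

struct BatchTotals {
  int64_t errors = 0;
  uint64_t num_objects = 0;
  std::map<int64_t, uint64_t> per_pool_stored;  // stands in for per_pool_statfs
};

void merge_batches(const std::vector<BatchTotals>& batches, BatchTotals& totals) {
  for (const auto& b : batches) {
    totals.errors += b.errors;
    totals.num_objects += b.num_objects;
    for (const auto& [pool, bytes] : b.per_pool_stored)
      totals.per_pool_stored[pool] += bytes;   // per-pool sums, like expected_pool_statfs
  }
}
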
index 01e36aefc26f0f4f4330780a2f812f1fd272356c..3ba2950197198dd19034377c88636c739ecfbb8f 100644 (file)
@@ -2332,13 +2332,13 @@ public:
   using  per_pool_statfs =
     mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
 
-private:
   enum FSCKDepth {
     FSCK_REGULAR,
     FSCK_DEEP,
     FSCK_SHALLOW
   };
 
+private:
   int _fsck_check_extents(
     const coll_t& cid,
     const ghobject_t& oid,
@@ -3063,6 +3063,7 @@ private:
   };
   size_t available_freespace(uint64_t alloc_size) override;
 
+public:
   struct sb_info_t {
     coll_t cid;
     int64_t pool_id = INT64_MIN;
@@ -3078,7 +3079,6 @@ private:
     mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
 
   typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
-
   struct FSCK_ObjectCtx {
     int64_t& errors;
     int64_t& warnings;
@@ -3092,6 +3092,8 @@ private:
     uint64_t_btree_t* used_omap_head;
     uint64_t_btree_t* used_per_pool_omap_head;
     uint64_t_btree_t* used_pgmeta_omap_head;
+
+    ceph::mutex* sb_info_lock;
     sb_info_map_t& sb_info;
 
     store_statfs_t& expected_store_statfs;
@@ -3109,6 +3111,7 @@ private:
                    uint64_t_btree_t* _used_omap_head,
                    uint64_t_btree_t* _used_per_pool_omap_head,
                    uint64_t_btree_t* _used_pgmeta_omap_head,
+                   ceph::mutex* _sb_info_lock,
                    sb_info_map_t& _sb_info,
                    store_statfs_t& _store_statfs,
                    per_pool_statfs& _pool_statfs,
@@ -3124,25 +3127,30 @@ private:
       used_omap_head(_used_omap_head),
       used_per_pool_omap_head(_used_per_pool_omap_head),
       used_pgmeta_omap_head(_used_pgmeta_omap_head),
+      sb_info_lock(_sb_info_lock),
       sb_info(_sb_info),
       expected_store_statfs(_store_statfs),
       expected_pool_statfs(_pool_statfs),
       repairer(_repairer) {
     }
   };
-  OnodeRef _fsck_check_objects_shallow(
+
+  OnodeRef fsck_check_objects_shallow(
     FSCKDepth depth,
+    bool need_per_pool_stats,
+    int64_t pool_id,
     CollectionRef c,
     const ghobject_t& oid,
-    KeyValueDB::Iterator it,
+    const string& key,
+    const bufferlist& value,
     mempool::bluestore_fsck::list<string>& expecting_shards,
-    store_statfs_t* expected_statfs,
     map<BlobRef, bluestore_blob_t::unused_t>* referenced,
     const BlueStore::FSCK_ObjectCtx& ctx);
 
+private:
   void _fsck_check_objects(FSCKDepth depth,
     bool need_per_pool_stats,
-    const FSCK_ObjectCtx& ctx);
+    FSCK_ObjectCtx& ctx);
 };
 
 inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {