}
}
-BlueStore::OnodeRef BlueStore::_fsck_check_objects_shallow(
- FSCKDepth depth,
+BlueStore::OnodeRef BlueStore::fsck_check_objects_shallow(
+ BlueStore::FSCKDepth depth,
+ bool need_per_pool_stats,
+ int64_t pool_id,
BlueStore::CollectionRef c,
const ghobject_t& oid,
- KeyValueDB::Iterator it,
+ const string& key,
+ const bufferlist& value,
mempool::bluestore_fsck::list<string>& expecting_shards,
- store_statfs_t* expected_statfs,
map<BlobRef, bluestore_blob_t::unused_t>* referenced,
const BlueStore::FSCK_ObjectCtx& ctx)
{
auto& num_sharded_objects = ctx.num_sharded_objects;
auto& num_spanning_blobs = ctx.num_spanning_blobs;
auto used_blocks = ctx.used_blocks;
+ auto sb_info_lock = ctx.sb_info_lock;
auto& sb_info = ctx.sb_info;
auto repairer = ctx.repairer;
+ store_statfs_t* res_statfs = need_per_pool_stats ?
+ &ctx.expected_pool_statfs[pool_id] :
+ &ctx.expected_store_statfs;
dout(10) << __func__ << " " << oid << dendl;
- store_statfs_t onode_statfs;
OnodeRef o;
- o.reset(Onode::decode(c, oid, it->key(), it->value()));
+ o.reset(Onode::decode(c, oid, key, value));
++num_objects;
num_spanning_blobs += o->extent_map.spanning_blob_map.size();
++errors;
}
pos = l.logical_offset + l.length;
- onode_statfs.data_stored += l.length;
+ res_statfs->data_stored += l.length;
ceph_assert(l.blob);
const bluestore_blob_t& blob = l.blob->get_blob();
++errors;
}
if (blob.is_compressed()) {
- onode_statfs.data_compressed += blob.get_compressed_payload_length();
- onode_statfs.data_compressed_original +=
+ res_statfs->data_compressed += blob.get_compressed_payload_length();
+ res_statfs->data_compressed_original +=
i.first->get_referenced_bytes();
}
if (blob.is_shared()) {
<< dendl;
++errors;
}
+ // the lock below is optional; it is only provided when shallow fsck may run multithreaded
+ if (sb_info_lock) {
+ sb_info_lock->lock();
+ }
sb_info_t& sbi = sb_info[i.first->shared_blob->get_sbid()];
ceph_assert(sbi.cid == coll_t() || sbi.cid == c->cid);
ceph_assert(sbi.pool_id == INT64_MIN ||
sbi.ref_map.get(e.offset, e.length);
}
}
+ if (sb_info_lock) {
+ sb_info_lock->unlock();
+ }
} else if (depth != FSCK_SHALLOW) {
ceph_assert(used_blocks);
errors += _fsck_check_extents(c->cid, oid, blob.get_extents(),
*used_blocks,
fm->get_alloc_size(),
repairer,
- onode_statfs,
+ *res_statfs,
depth);
} else {
errors += _fsck_sum_extents(
blob.get_extents(),
blob.is_compressed(),
- onode_statfs);
+ *res_statfs);
}
} // for (auto& i : ref_map)
- expected_statfs->add(onode_statfs);
return o;
}
+#include "common/WorkQueue.h"
+
+class ShallowFSCKThreadPool : public ThreadPool
+{
+public:
+ ShallowFSCKThreadPool(CephContext* cct_, std::string nm, std::string tn, int n) :
+ ThreadPool(cct_, nm, tn, n) {
+ }
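+ // custom worker loop: round-robins over the registered work queues and
+ // processes dequeued items directly; coordination happens through the
+ // per-batch atomics in FSCKWorkQueue below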
+ void worker(ThreadPool::WorkThread* wt) override {
+ int next_wq = 0;
+ while (!_stop) {
+ next_wq %= work_queues.size();
+ WorkQueue_ *wq = work_queues[next_wq++];
+
+ void* item = wq->_void_dequeue();
+ if (item) {
+ processing++;
+ TPHandle tp_handle(cct, nullptr, wq->timeout_interval, wq->suicide_interval);
+ wq->_void_process(item, tp_handle);
+ processing--;
+ }
+ }
+ }
+ template <size_t BatchLen>
+ struct FSCKWorkQueue : public ThreadPool::WorkQueue_
+ {
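+ // a single queued object: its pool/collection identity plus the raw
+ // onode key/value to be decoded by fsck_check_objects_shallow()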
+ struct Entry {
+ int64_t pool_id;
+ BlueStore::CollectionRef c;
+ ghobject_t oid;
+ string key;
+ bufferlist value;
+ };
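+ // a fixed-size group of entries processed by one worker invocation;
+ // fsck counters and statfs are accumulated per batch and merged into
+ // the shared context by finalize()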
+ struct Batch {
+ std::atomic<size_t> running = { 0 };
+ size_t entry_count = 0;
+ std::array<Entry, BatchLen> entries;
+
+ int64_t errors = 0;
+ int64_t warnings = 0;
+ uint64_t num_objects = 0;
+ uint64_t num_extents = 0;
+ uint64_t num_blobs = 0;
+ uint64_t num_sharded_objects = 0;
+ uint64_t num_spanning_blobs = 0;
+ store_statfs_t expected_store_statfs;
+ BlueStore::per_pool_statfs expected_pool_statfs;
+ };
+
+ size_t batchCount;
+ BlueStore* store = nullptr;
+ bool need_per_pool_stats;
+
+ mempool::bluestore_fsck::list<string>* expecting_shards = nullptr;
+ ceph::mutex* sb_info_lock = nullptr;
+ BlueStore::sb_info_map_t* sb_info = nullptr;
+ BlueStoreRepairer* repairer = nullptr;
+
+ Batch* batches = nullptr;
+ size_t last_batch_pos = 0;
+ bool batch_acquired = false;
+
+ FSCKWorkQueue(std::string n,
+ size_t _batchCount,
+ BlueStore* _store,
+ bool _need_per_pool_stats,
+ mempool::bluestore_fsck::list<string>& _expecting_shards,
+ ceph::mutex* _sb_info_lock,
+ BlueStore::sb_info_map_t& _sb_info,
+ BlueStoreRepairer* _repairer) :
+ WorkQueue_(n, time_t(), time_t()),
+ batchCount(_batchCount),
+ store(_store),
+ need_per_pool_stats(_need_per_pool_stats),
+ expecting_shards(&_expecting_shards),
+ sb_info_lock(_sb_info_lock),
+ sb_info(&_sb_info),
+ repairer(_repairer)
+ {
+ batches = new Batch[batchCount];
+ }
+ ~FSCKWorkQueue() {
+ delete[] batches;
+ }
+
+ /// Remove all work items from the queue.
+ void _clear() override {
+ //do nothing
+ }
+ /// Check whether there is anything to do.
+ bool _empty() override {
+ ceph_assert(false);
+ }
+
+ /// Get the next work item to process.
+ void* _void_dequeue() override {
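+ // start probing at a random batch, presumably to spread contention
+ // between workers; a batch is claimed when its 'running' counter goes
+ // 0 -> 1 and it has pending entries, otherwise the claim is rolled back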
+ size_t pos = rand() % batchCount;
+ size_t pos0 = pos;
+ do {
+ auto& batch = batches[pos];
+ if (batch.running.fetch_add(1) == 0) {
+ if (batch.entry_count) {
+ return &batch;
+ }
+ }
+ batch.running--;
+ pos++;
+ pos %= batchCount;
+ } while (pos != pos0);
+ return nullptr;
+ }
+ /** @brief Process the work item.
+ * This function will be called several times in parallel
+ * and must therefore be thread-safe. */
+ void _void_process(void* item, TPHandle& handle) override {
+ Batch* batch = (Batch*)item;
+
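+ // build an fsck context over this batch's private counters; the shared
+ // sb_info map is protected by sb_info_lock inside fsck_check_objects_shallow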
+ BlueStore::FSCK_ObjectCtx ctx(
+ batch->errors,
+ batch->warnings,
+ batch->num_objects,
+ batch->num_extents,
+ batch->num_blobs,
+ batch->num_sharded_objects,
+ batch->num_spanning_blobs,
+ nullptr, // used_blocks
+ nullptr, // used_omap_head;
+ nullptr, // used_per_pool_omap_head;
+ nullptr, // used_pgmeta_omap_head;
+ sb_info_lock,
+ *sb_info,
+ batch->expected_store_statfs,
+ batch->expected_pool_statfs,
+ repairer);
+
+ for (size_t i = 0; i < batch->entry_count; i++) {
+ auto& entry = batch->entries[i];
+
+ store->fsck_check_objects_shallow(
+ BlueStore::FSCK_SHALLOW,
+ need_per_pool_stats,
+ entry.pool_id,
+ entry.c,
+ entry.oid,
+ entry.key,
+ entry.value,
+ *expecting_shards,
+ nullptr, // referenced
+ ctx);
+ }
+ batch->entry_count = 0;
+ batch->running--;
+ }
+ /** @brief Synchronously finish processing a work item.
+ * This function is called after _void_process with the global thread pool lock held,
+ * so at most one copy will execute simultaneously for a given thread pool.
+ * It can be used for non-thread-safe finalization. */
+ void _void_process_finish(void*) override {
+ ceph_assert(false);
+ }
+
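+ // called from the main fsck iteration: appends an object to the currently
+ // claimed batch, claiming a new one when needed and releasing a batch to
+ // the workers once it is full; returns false when no batch could be
+ // claimed, in which case the caller checks the object itself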
+ bool queue(
+ int64_t pool_id,
+ BlueStore::CollectionRef c,
+ const ghobject_t& oid,
+ const string& key,
+ const bufferlist& value) {
+ bool res = false;
+ size_t pos0 = last_batch_pos;
+ if (!batch_acquired) {
+ do {
+ auto& batch = batches[last_batch_pos];
+ if (batch.running.fetch_add(1) == 0) {
+ if (batch.entry_count < BatchLen) {
+ batch_acquired = true;
+ break;
+ }
+ }
+ batch.running.fetch_sub(1);
+ last_batch_pos++;
+ last_batch_pos %= batchCount;
+ } while (last_batch_pos != pos0);
+ }
+ if (batch_acquired) {
+ auto& batch = batches[last_batch_pos];
+ ceph_assert(batch.running);
+ ceph_assert(batch.entry_count < BatchLen);
+
+ auto& entry = batch.entries[batch.entry_count];
+ entry.pool_id = pool_id;
+ entry.c = c;
+ entry.oid = oid;
+ entry.key = key;
+ entry.value = value;
+
+ ++batch.entry_count;
+ if (batch.entry_count == BatchLen) {
+ batch_acquired = false;
+ batch.running.fetch_sub(1);
+ last_batch_pos++;
+ last_batch_pos %= batchCount;
+ }
+ res = true;
+ }
+ return res;
+ }
+
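+ // releases any still-claimed batch, stops the thread pool, processes
+ // leftover partially filled batches in the caller's thread, and merges
+ // per-batch counters and statfs into the provided context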
+ void finalize(ThreadPool& tp,
+ BlueStore::FSCK_ObjectCtx& ctx) {
+ if (batch_acquired) {
+ auto& batch = batches[last_batch_pos];
+ ceph_assert(batch.running);
+ batch.running.fetch_sub(1);
+ }
+ tp.stop();
+
+ for (size_t i = 0; i < batchCount; i++) {
+ auto& batch = batches[i];
+
+ //process leftovers if any
+ if (batch.entry_count) {
+ TPHandle tp_handle(store->cct,
+ nullptr,
+ timeout_interval,
+ suicide_interval);
+ ceph_assert(batch.running == 0);
+
+ batch.running++; // just to be on par with the regular worker call path
+ _void_process(&batch, tp_handle);
+ }
+ ceph_assert(batch.entry_count == 0);
+
+ ctx.errors += batch.errors;
+ ctx.warnings += batch.warnings;
+ ctx.num_objects += batch.num_objects;
+ ctx.num_extents += batch.num_extents;
+ ctx.num_blobs += batch.num_blobs;
+ ctx.num_sharded_objects += batch.num_sharded_objects;
+ ctx.num_spanning_blobs += batch.num_spanning_blobs;
+ ctx.expected_store_statfs.add(batch.expected_store_statfs);
+
+ for (auto it = batch.expected_pool_statfs.begin();
+ it != batch.expected_pool_statfs.end();
+ it++) {
+ ctx.expected_pool_statfs[it->first].add(it->second);
+ }
+ }
+ }
+ };
+};
+
void BlueStore::_fsck_check_objects(FSCKDepth depth,
bool need_per_pool_stats,
- const BlueStore::FSCK_ObjectCtx& ctx)
+ BlueStore::FSCK_ObjectCtx& ctx)
{
+ // the sb_info lock below is not needed in non-shallow mode,
+ // since there is no multithreading in that case
+ if (depth != FSCK_SHALLOW) {
+ ctx.sb_info_lock = nullptr;
+ }
+
auto& errors = ctx.errors;
- auto* used_omap_head = ctx.used_omap_head;
- auto* used_pgmeta_omap_head = ctx.used_pgmeta_omap_head;
+ auto used_omap_head = ctx.used_omap_head;
+ auto used_pgmeta_omap_head = ctx.used_pgmeta_omap_head;
+ auto sb_info_lock = ctx.sb_info_lock;
+ auto sb_info = ctx.sb_info;
+ auto repairer = ctx.repairer;
uint64_t_btree_t used_nids;
+ size_t processed_myself = 0;
+
auto it = db->get_iterator(PREFIX_OBJ);
+ mempool::bluestore_fsck::list<string> expecting_shards;
if (it) {
- //fill global if not overriden below
- store_statfs_t* expected_statfs = &ctx.expected_store_statfs;
+ const size_t thread_count = cct->_conf->bluestore_fsck_quick_fix_threads;
+ typedef ShallowFSCKThreadPool::FSCKWorkQueue<256> WQ;
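+ // each batch holds up to 256 objects; allocate 32 batches per worker
+ // thread (or 32 in total when threading is disabled)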
+ std::unique_ptr<WQ> wq(
+ new WQ(
+ "FSCKWorkQueue",
+ (thread_count ? : 1) * 32,
+ this,
+ need_per_pool_stats,
+ expecting_shards,
+ sb_info_lock,
+ sb_info,
+ repairer));
+
+ ShallowFSCKThreadPool thread_pool(cct, "ShallowFSCKThreadPool", "ShallowFSCK", thread_count);
+
+ thread_pool.add_work_queue(wq.get());
+ if (depth == FSCK_SHALLOW && thread_count > 0) {
+ // not the best place for this check, but make sure the sb_info lock
+ // is present before starting the workers
+ ceph_assert(sb_info_lock);
+ thread_pool.start();
+ }
+ // per-object statfs is accumulated into global or per-pool counters
+ // inside fsck_check_objects_shallow, depending on need_per_pool_stats
CollectionRef c;
+ int64_t pool_id = -1;
spg_t pgid;
- mempool::bluestore_fsck::list<string> expecting_shards;
for (it->lower_bound(string()); it->valid(); it->next()) {
dout(30) << __func__ << " key "
<< pretty_binary_string(it->key()) << dendl;
++errors;
continue;
}
- auto pool_id = c->cid.is_pg(&pgid) ? pgid.pool() : META_POOL_ID;
+ pool_id = c->cid.is_pg(&pgid) ? pgid.pool() : META_POOL_ID;
dout(20) << __func__ << " collection " << c->cid << " " << c->cnode
<< dendl;
- if (need_per_pool_stats) {
- expected_statfs = &ctx.expected_pool_statfs[pool_id];
- }
dout(20) << __func__ << " collection " << c->cid << " " << c->cnode
<< dendl;
}
bool queued = false;
- /*if (depth == FSCK_SHALLOW) {
- queued = _fsck_queue_check_object(
+ if (depth == FSCK_SHALLOW && thread_count > 0) {
+ queued = wq->queue(
+ pool_id,
c,
oid,
- it,
- );
- }*/
+ it->key(),
+ it->value());
+ }
OnodeRef o;
map<BlobRef, bluestore_blob_t::unused_t> referenced;
if (!queued) {
- o = _fsck_check_objects_shallow(
+ ++processed_myself;
+
+ o = fsck_check_objects_shallow(
depth,
+ need_per_pool_stats,
+ pool_id,
c,
oid,
- it,
+ it->key(),
+ it->value(),
expecting_shards,
- expected_statfs,
- &referenced,
+ &referenced,
ctx);
}
} // deep
} //if (depth != FSCK_SHALLOW)
} // for (it->lower_bound(string()); it->valid(); it->next())
+ if (depth == FSCK_SHALLOW && thread_count > 0) {
+ wq->finalize(thread_pool, ctx);
+ if (processed_myself) {
+ // maybe more threads are needed?
+ dout(0) << __func__ << " partial offload"
+ << ", done myself " << processed_myself
+ << " of " << ctx.num_objects
+ << "objects, threads " << thread_count
+ << dendl;
+ }
+ }
} // if (it)
}
/**
uint64_t num_shared_blobs = 0;
uint64_t num_sharded_objects = 0;
BlueStoreRepairer repairer;
- //store_statfs_t* expected_statfs = nullptr;
// in deep mode we need R/W write access to be able to replay deferred ops
bool read_only = !(repair || depth == FSCK_DEEP);
goto out_scan;
}
// walk PREFIX_OBJ
- dout(1) << __func__ << " walking object keyspace" << dendl;
- _fsck_check_objects(depth,
- need_per_pool_stats,
- BlueStore::FSCK_ObjectCtx(
+ {
+ dout(1) << __func__ << " walking object keyspace" << dendl;
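+ // guards the shared sb_info map when shallow fsck is offloaded to
+ // worker threads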
+ ceph::mutex sb_info_lock = ceph::make_mutex("BlueStore::fsck::sbinfo_lock");
+ BlueStore::FSCK_ObjectCtx ctx(
errors,
warnings,
num_objects,
&used_omap_head,
nullptr,
&used_pgmeta_omap_head,
+ &sb_info_lock,
sb_info,
expected_store_statfs,
expected_pool_statfs,
- repair ? &repairer : nullptr));
+ repair ? &repairer : nullptr);
+ _fsck_check_objects(depth,
+ need_per_pool_stats,
+ ctx);
+ }
dout(1) << __func__ << " checking shared_blobs" << dendl;
it = db->get_iterator(PREFIX_SHARED_BLOB);