* opens both DB and dependant super_meta, FreelistManager and allocator
* in the proper order
*/
-int BlueStore::_open_db_and_around(
- bool read_only,
- bool to_repair,
- bool apply_deferred,
- bool remove_deferred)
-{
- dout(5) << __func__ << "read_only=" << read_only
- << ", to_repair=" << to_repair
- << ", deferred=" << (apply_deferred?"apply;":"noapply;")
- << (remove_deferred?"remove":"noremove") << dendl;
- ceph_assert(remove_deferred == false || apply_deferred == true);
- ceph_assert(read_only == false || remove_deferred == false);
- std::vector<std::string> keys_to_remove;
+int BlueStore::_open_db_and_around(bool read_only, bool to_repair)
+{
+ dout(5) << __func__ << "::NCB::read_only=" << read_only << ", to_repair=" << to_repair << dendl;
{
string type;
int r = read_meta("type", &type);
if (bdev_label_multi) {
_main_bdev_label_try_reserve();
}
- // This is the place where we can apply deferred writes
- // without risk of some interaction with RocksDB allocating.
- if (apply_deferred) {
- _deferred_replay(remove_deferred ? &keys_to_remove : nullptr);
- }
// Re-open in the proper mode(s).
_post_init_alloc();
}
- if (remove_deferred && !keys_to_remove.empty()) {
- KeyValueDB::Transaction deferred_keys_remove_txn = db->get_transaction();
- for (auto& s : keys_to_remove) {
- deferred_keys_remove_txn->rmkey(PREFIX_DEFERRED, s);
- }
- db->submit_transaction_sync(deferred_keys_remove_txn);
- }
-
// when function is called in repair mode (to_repair=true) we skip db->open()/create()
// we can't change bluestore allocation so no need to invlidate allocation-file
if (fm->is_null_manager() && !read_only && !to_repair) {
}
});
- r = _deferred_replay(nullptr);
+ r = _deferred_replay();
if (r < 0) {
return r;
}
return -EINVAL;
}
- int r = _open_db_and_around(false, false, true, true);
+ dout(5) << __func__ << "::NCB::calling open_db_and_around(read/write)" << dendl;
+ int r = _open_db_and_around(false);
if (r < 0) {
return r;
}
}
});
+ r = _deferred_replay();
+ if (r < 0) {
+ return r;
+ }
+
mempool_thread.init();
if ((!per_pool_stat_collection || per_pool_omap != OMAP_PER_PG) &&
// in deep mode we need R/W write access to be able to replay deferred ops
const bool read_only = !(repair || depth == FSCK_DEEP);
- int r = _open_db_and_around(read_only, false, !read_only, !read_only);
+ int r = _open_db_and_around(read_only);
if (r < 0) {
return r;
}
mempool_thread.shutdown();
_shutdown_cache();
});
+ // we need finisher and kv_{sync,finalize}_thread *just* for replay
+ // enable in repair or deep mode modes only
+ if (!read_only) {
+ _kv_start();
+ r = _deferred_replay();
+ _kv_stop();
+ }
+
+ if (r < 0) {
+ return r;
+ }
return _fsck_on_open(depth, repair);
}
deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
std::unique_lock l{kv_lock};
ceph_assert(!kv_sync_started);
- ceph_assert(!db_was_opened_read_only);
kv_sync_started = true;
kv_cond.notify_all();
}
}
-int BlueStore::_deferred_replay(std::vector<std::string>* keys_to_remove)
+int BlueStore::_deferred_replay()
{
dout(10) << __func__ << " start" << dendl;
int count = 0;
}
if (tracepoint_debug_deferred_replay_start) tracepoint_debug_deferred_replay_start();
IOContext ioctx(cct, nullptr);
+ KeyValueDB::Transaction t = db->get_transaction();
KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
- for (it->lower_bound(string()); it->valid(); /*iterator update outside*/) {
+ for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
dout(20) << __func__ << " replay " << pretty_binary_string(it->key())
<< dendl;
- if (keys_to_remove) {
- keys_to_remove->push_back(it->key());
- }
- bluestore_deferred_transaction_t deferred_txn;
+ t->rmkey(PREFIX_DEFERRED, it->key());
+ bluestore_deferred_transaction_t *deferred_txn =
+ new bluestore_deferred_transaction_t;
bufferlist bl = it->value();
auto p = bl.cbegin();
try {
- decode(deferred_txn, p);
-
- bool has_some = _eliminate_outdated_deferred(&deferred_txn, bluefs_extents);
- if (has_some) {
- if (tracepoint_debug_deferred_replay_track) tracepoint_debug_deferred_replay_track(deferred_txn);
- for (auto& op: deferred_txn.ops) {
- for (auto& e : op.extents) {
- bufferlist t;
- op.data.splice(0, e.length, &t);
- bdev->aio_write(e.offset, t, &ioctx, false);
- }
- }
- }
+ decode(*deferred_txn, p);
} catch (ceph::buffer::error& e) {
derr << __func__ << " failed to decode deferred txn "
<< pretty_binary_string(it->key()) << dendl;
+ delete deferred_txn;
r = -EIO;
+ goto out;
}
- // update loop iteration here, so we can inject action
- ++count;
- it->next();
- if (ioctx.num_pending.load() > 100 || !it->valid()) {
- dout(20) << __func__ << "submitting IO batch" << dendl;
- bdev->aio_submit(&ioctx);
- ioctx.aio_wait();
- dout(20) << __func__ << "wait done" << dendl;
- ioctx.release_running_aios();
+ bool has_some = _eliminate_outdated_deferred(deferred_txn, bluefs_extents);
+ if (has_some) {
+ if (tracepoint_debug_deferred_replay_track) tracepoint_debug_deferred_replay_track(*deferred_txn);
+ for (auto& op: deferred_txn->ops) {
+ for (auto& e : op.extents) {
+ bufferlist t;
+ op.data.splice(0, e.length, &t);
+ bdev->aio_write(e.offset, t, &ioctx, false);
+ }
+ }
+ } else {
+ delete deferred_txn;
}
}
+ out:
+ bdev->aio_submit(&ioctx);
+ dout(20) << __func__ << "waiting to complete IO" << dendl;
+ ioctx.aio_wait();
+ dout(20) << __func__ << "wait done" << dendl;
+ if (!db_was_opened_read_only) {
+ db->submit_transaction_sync(t);
+ dout(20) << __func__ << "removed L keys" << dendl;
+ } else {
+ dout(10) << __func__ << "DB read-pnly, skipped L keys removal" << dendl;
+ }
if (tracepoint_debug_deferred_replay_end) tracepoint_debug_deferred_replay_end();
dout(10) << __func__ << " completed " << count << " events" << dendl;
return r;