const string PREFIX_COLL = "C"; // collection name -> cnode_t
const string PREFIX_OBJ = "O"; // object name -> onode_t
const string PREFIX_OMAP = "M"; // u64 + keyname -> value
-const string PREFIX_WAL = "L"; // id -> wal_transaction_t
+const string PREFIX_DEFERRED = "L"; // id -> deferred_transaction_t
const string PREFIX_ALLOC = "B"; // u64 offset -> u64 length (freelist)
const string PREFIX_SHARED_BLOB = "X"; // u64 offset -> shared_blob_t
out->push_back('~');
}
-static void get_wal_key(uint64_t seq, string *out)
+static void get_deferred_key(uint64_t seq, string *out)
{
_key_encode_u64(seq, out);
}
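Because the sequence number is the entire key under PREFIX_DEFERRED, replay order depends only on how _key_encode_u64 lays out the bytes. Below is a minimal standalone sketch of the property being relied on, assuming (as the other integer-keyed prefixes in this file do) a fixed-width big-endian encoding; sketch_encode_u64 is an illustrative stand-in, not the real helper.

#include <cassert>
#include <cstdint>
#include <string>

// Illustrative stand-in for _key_encode_u64: eight big-endian bytes, so
// lexicographic comparison of keys matches numeric comparison of seqs.
static void sketch_encode_u64(uint64_t v, std::string *out) {
  for (int shift = 56; shift >= 0; shift -= 8)
    out->push_back(static_cast<char>((v >> shift) & 0xff));
}

int main() {
  std::string a, b;
  sketch_encode_u64(9, &a);   // older deferred txn
  sketch_encode_u64(10, &b);  // newer deferred txn
  assert(a < b);              // iteration from lower_bound("") sees 9 first
  return 0;
}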
coll_lock("BlueStore::coll_lock"),
throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
- throttle_wal_ops(cct, "bluestore_wal_max_ops",
+ throttle_deferred_ops(cct, "bluestore_deferred_max_ops",
cct->_conf->bluestore_max_ops +
- cct->_conf->bluestore_wal_max_ops),
- throttle_wal_bytes(cct, "bluestore_wal_max_bytes",
+ cct->_conf->bluestore_deferred_max_ops),
+ throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
- cct->_conf->bluestore_wal_max_bytes),
- wal_tp(cct,
- "BlueStore::wal_tp",
- "tp_wal",
- cct->_conf->bluestore_sync_wal_apply ? 0 : cct->_conf->bluestore_wal_threads,
- "bluestore_wal_threads"),
- wal_wq(this,
- cct->_conf->bluestore_wal_thread_timeout,
- cct->_conf->bluestore_wal_thread_suicide_timeout,
- &wal_tp),
+ cct->_conf->bluestore_deferred_max_bytes),
+ deferred_tp(cct,
+ "BlueStore::deferred_tp",
+ "tp_deferred",
+ cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads,
+ "bluestore_deferred_threads"),
+ deferred_wq(this,
+ cct->_conf->bluestore_deferred_thread_timeout,
+ cct->_conf->bluestore_deferred_thread_suicide_timeout,
+ &deferred_tp),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
logger(NULL),
debug_read_error_lock("BlueStore::debug_read_error_lock"),
csum_type(Checksummer::CSUM_CRC32C),
- sync_wal_apply(cct->_conf->bluestore_sync_wal_apply),
+ sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply),
mempool_thread(this)
{
_init_logger();
coll_lock("BlueStore::coll_lock"),
throttle_ops(cct, "bluestore_max_ops", cct->_conf->bluestore_max_ops),
throttle_bytes(cct, "bluestore_max_bytes", cct->_conf->bluestore_max_bytes),
- throttle_wal_ops(cct, "bluestore_wal_max_ops",
+ throttle_deferred_ops(cct, "bluestore_deferred_max_ops",
cct->_conf->bluestore_max_ops +
- cct->_conf->bluestore_wal_max_ops),
- throttle_wal_bytes(cct, "bluestore_wal_max_bytes",
+ cct->_conf->bluestore_deferred_max_ops),
+ throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
- cct->_conf->bluestore_wal_max_bytes),
- wal_tp(cct,
- "BlueStore::wal_tp",
- "tp_wal",
- cct->_conf->bluestore_sync_wal_apply ? 0 : cct->_conf->bluestore_wal_threads,
- "bluestore_wal_threads"),
- wal_wq(this,
- cct->_conf->bluestore_wal_thread_timeout,
- cct->_conf->bluestore_wal_thread_suicide_timeout,
- &wal_tp),
+ cct->_conf->bluestore_deferred_max_bytes),
+ deferred_tp(cct,
+ "BlueStore::deferred_tp",
+ "tp_deferred",
+ cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads,
+ "bluestore_deferred_threads"),
+ deferred_wq(this,
+ cct->_conf->bluestore_deferred_thread_timeout,
+ cct->_conf->bluestore_deferred_thread_suicide_timeout,
+ &deferred_tp),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
csum_type(Checksummer::CSUM_CRC32C),
min_alloc_size(_min_alloc_size),
min_alloc_size_order(ctz(_min_alloc_size)),
- sync_wal_apply(cct->_conf->bluestore_sync_wal_apply),
+ sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply),
mempool_thread(this)
{
_init_logger();
"bluestore_compression_min_blob_size",
"bluestore_compression_max_blob_size",
"bluestore_max_alloc_size",
- "bluestore_prefer_wal_size",
+ "bluestore_prefer_deferred_size",
NULL
};
return KEYS;
changed.count("bluestore_compression_max_blob_size")) {
_set_compression();
}
- if (changed.count("bluestore_prefer_wal_size") ||
+ if (changed.count("bluestore_prefer_deferred_size") ||
changed.count("bluestore_max_alloc_size")) {
if (bdev) {
// only after startup
"Average kv_commiting state latency");
b.add_time_avg(l_bluestore_state_kv_done_lat, "state_kv_done_lat",
"Average kv_done state latency");
- b.add_time_avg(l_bluestore_state_wal_queued_lat, "state_wal_queued_lat",
- "Average wal_queued state latency");
- b.add_time_avg(l_bluestore_state_wal_applying_lat, "state_wal_applying_lat",
- "Average wal_applying state latency");
- b.add_time_avg(l_bluestore_state_wal_aio_wait_lat, "state_wal_aio_wait_lat",
+ b.add_time_avg(l_bluestore_state_deferred_queued_lat, "state_deferred_queued_lat",
+ "Average deferred_queued state latency");
+ b.add_time_avg(l_bluestore_state_deferred_applying_lat, "state_deferred_applying_lat",
+ "Average deferred_applying state latency");
+ b.add_time_avg(l_bluestore_state_deferred_aio_wait_lat, "state_deferred_aio_wait_lat",
"Average aio_wait state latency");
- b.add_time_avg(l_bluestore_state_wal_cleanup_lat, "state_wal_cleanup_lat",
+ b.add_time_avg(l_bluestore_state_deferred_cleanup_lat, "state_deferred_cleanup_lat",
"Average cleanup state latency");
b.add_time_avg(l_bluestore_state_finishing_lat, "state_finishing_lat",
"Average finishing state latency");
"Sum for compress ops rejected due to low net gain of space");
b.add_u64(l_bluestore_write_pad_bytes, "write_pad_bytes",
"Sum for write-op padded bytes");
- b.add_u64(l_bluestore_wal_write_ops, "wal_write_ops",
- "Sum for wal write op");
- b.add_u64(l_bluestore_wal_write_bytes, "wal_write_bytes",
- "Sum for wal write bytes");
+ b.add_u64(l_bluestore_deferred_write_ops, "deferred_write_ops",
+ "Sum for deferred write op");
+ b.add_u64(l_bluestore_deferred_write_bytes, "deferred_write_bytes",
+ "Sum for deferred write bytes");
b.add_u64(l_bluestore_write_penalty_read_ops, "write_penalty_read_ops",
"Sum for write penalty read ops");
b.add_u64(l_bluestore_allocated, "bluestore_allocated",
"Small writes into existing or sparse small blobs (bytes)");
b.add_u64(l_bluestore_write_small_unused, "bluestore_write_small_unused",
"Small writes into unused portion of existing blob");
- b.add_u64(l_bluestore_write_small_wal, "bluestore_write_small_wal",
- "Small overwrites using WAL");
+ b.add_u64(l_bluestore_write_small_deferred, "bluestore_write_small_deferred",
+ "Small overwrites using deferred");
b.add_u64(l_bluestore_write_small_pre_read, "bluestore_write_small_pre_read",
"Small writes that required we read some data (possibly cached) to "
"fill out the block");
"Current ops in queue");
b.add_u64(l_bluestore_cur_bytes_in_queue, "bluestore_cur_bytes_in_queue",
"Current bytes in queue");
- b.add_u64(l_bluestore_cur_ops_in_wal_queue, "bluestore_cur_ops_in_wal_queue",
- "Current wal ops in wal queue");
- b.add_u64(l_bluestore_cur_bytes_in_wal_queue, "bluestore_cur_bytes_in_wal_queue",
- "Current wal bytes in wal queue");
+ b.add_u64(l_bluestore_cur_ops_in_deferred_queue, "bluestore_cur_ops_in_deferred_queue",
+ "Current deferred ops in queue");
+ b.add_u64(l_bluestore_cur_bytes_in_deferred_queue, "bluestore_cur_bytes_in_deferred_queue",
+ "Current deferred bytes in queue");
b.add_u64(l_bluestore_txc, "bluestore_txc", "Transactions committed");
b.add_u64(l_bluestore_onode_reshard, "bluestore_onode_reshard",
max_alloc_size = cct->_conf->bluestore_max_alloc_size;
- if (cct->_conf->bluestore_prefer_wal_size) {
- prefer_wal_size = cct->_conf->bluestore_prefer_wal_size;
+ if (cct->_conf->bluestore_prefer_deferred_size) {
+ prefer_deferred_size = cct->_conf->bluestore_prefer_deferred_size;
} else {
assert(bdev);
if (bdev->is_rotational()) {
- prefer_wal_size = cct->_conf->bluestore_prefer_wal_size_hdd;
+ prefer_deferred_size = cct->_conf->bluestore_prefer_deferred_size_hdd;
} else {
- prefer_wal_size = cct->_conf->bluestore_prefer_wal_size_ssd;
+ prefer_deferred_size = cct->_conf->bluestore_prefer_deferred_size_ssd;
}
}
for (auto f : finishers) {
f->start();
}
- wal_tp.start();
+ deferred_tp.start();
kv_sync_thread.create("bstore_kv_sync");
- r = _wal_replay();
+ r = _deferred_replay();
if (r < 0)
goto out_stop;
out_stop:
_kv_stop();
- wal_wq.drain();
- wal_tp.stop();
+ deferred_wq.drain();
+ deferred_tp.stop();
for (auto f : finishers) {
f->wait_for_empty();
f->stop();
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
- dout(20) << __func__ << " draining wal_wq" << dendl;
- wal_wq.drain();
- dout(20) << __func__ << " stopping wal_tp" << dendl;
- wal_tp.stop();
+ dout(20) << __func__ << " draining deferred_wq" << dendl;
+ deferred_wq.drain();
+ dout(20) << __func__ << " stopping deferred_tp" << dendl;
+ deferred_tp.stop();
for (auto f : finishers) {
dout(20) << __func__ << " draining finisher" << dendl;
f->wait_for_empty();
}
}
- dout(1) << __func__ << " checking wal events" << dendl;
- it = db->get_iterator(PREFIX_WAL);
+ dout(1) << __func__ << " checking deferred events" << dendl;
+ it = db->get_iterator(PREFIX_DEFERRED);
if (it) {
for (it->lower_bound(string()); it->valid(); it->next()) {
bufferlist bl = it->value();
bufferlist::iterator p = bl.begin();
- bluestore_wal_transaction_t wt;
+ bluestore_deferred_transaction_t wt;
try {
::decode(wt, p);
} catch (buffer::error& e) {
- derr << __func__ << " failed to decode wal txn "
+ derr << __func__ << " failed to decode deferred txn "
<< pretty_binary_string(it->key()) << dendl;
r = -EIO;
goto out_scan;
}
- dout(20) << __func__ << " wal " << wt.seq
+ dout(20) << __func__ << " deferred " << wt.seq
<< " ops " << wt.ops.size()
<< " released 0x" << std::hex << wt.released << std::dec << dendl;
for (auto e = wt.released.begin(); e != wt.released.end(); ++e) {
apply(
- e.get_start(), e.get_len(), block_size, used_blocks, "wal",
+ e.get_start(), e.get_len(), block_size, used_blocks, "deferred",
[&](uint64_t pos, boost::dynamic_bitset<> &bs) {
bs.set(pos);
}
case TransContext::STATE_KV_DONE:
txc->log_state_latency(logger, l_bluestore_state_kv_done_lat);
- if (txc->wal_txn) {
- txc->state = TransContext::STATE_WAL_QUEUED;
- if (sync_wal_apply) {
- _wal_apply(txc);
+ if (txc->deferred_txn) {
+ txc->state = TransContext::STATE_DEFERRED_QUEUED;
+ if (sync_deferred_apply) {
+ _deferred_apply(txc);
} else {
- wal_wq.queue(txc);
+ deferred_wq.queue(txc);
}
return;
}
txc->state = TransContext::STATE_FINISHING;
break;
- case TransContext::STATE_WAL_APPLYING:
- txc->log_state_latency(logger, l_bluestore_state_wal_applying_lat);
+ case TransContext::STATE_DEFERRED_APPLYING:
+ txc->log_state_latency(logger, l_bluestore_state_deferred_applying_lat);
if (txc->ioc.has_pending_aios()) {
- txc->state = TransContext::STATE_WAL_AIO_WAIT;
+ txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
_txc_aio_submit(txc);
return;
}
// ** fall-thru **
- case TransContext::STATE_WAL_AIO_WAIT:
- txc->log_state_latency(logger, l_bluestore_state_wal_aio_wait_lat);
- _wal_finish(txc);
+ case TransContext::STATE_DEFERRED_AIO_WAIT:
+ txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
+ _deferred_finish(txc);
return;
- case TransContext::STATE_WAL_CLEANUP:
- txc->log_state_latency(logger, l_bluestore_state_wal_cleanup_lat);
+ case TransContext::STATE_DEFERRED_CLEANUP:
+ txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
txc->state = TransContext::STATE_FINISHING;
// ** fall-thru **
txc->removed_collections.pop_front();
}
- _op_queue_release_wal_throttle(txc);
+ _op_queue_release_deferred_throttle(txc);
OpSequencerRef osr = txc->osr;
{
std::unique_lock<std::mutex> l(kv_lock);
while (true) {
assert(kv_committing.empty());
- if (kv_queue.empty() && wal_cleanup_queue.empty()) {
+ if (kv_queue.empty() && deferred_cleanup_queue.empty()) {
if (kv_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
dout(20) << __func__ << " wake" << dendl;
} else {
deque<TransContext*> kv_submitting;
- deque<TransContext*> wal_cleaning;
+ deque<TransContext*> deferred_cleaning;
dout(20) << __func__ << " committing " << kv_queue.size()
<< " submitting " << kv_queue_unsubmitted.size()
- << " cleaning " << wal_cleanup_queue.size() << dendl;
+ << " cleaning " << deferred_cleanup_queue.size() << dendl;
kv_committing.swap(kv_queue);
kv_submitting.swap(kv_queue_unsubmitted);
- wal_cleaning.swap(wal_cleanup_queue);
+ deferred_cleaning.swap(deferred_cleanup_queue);
utime_t start = ceph_clock_now();
l.unlock();
dout(30) << __func__ << " committing txc " << kv_committing << dendl;
dout(30) << __func__ << " submitting txc " << kv_submitting << dendl;
- dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
+ dout(30) << __func__ << " deferred_cleaning txc " << deferred_cleaning << dendl;
// flush/barrier on block device
bdev->flush();
}
}
- // cleanup sync wal keys
- for (std::deque<TransContext *>::iterator it = wal_cleaning.begin();
- it != wal_cleaning.end();
+ // cleanup sync deferred keys
+ for (std::deque<TransContext *>::iterator it = deferred_cleaning.begin();
+ it != deferred_cleaning.end();
++it) {
- bluestore_wal_transaction_t& wt =*(*it)->wal_txn;
+ bluestore_deferred_transaction_t& wt =*(*it)->deferred_txn;
// kv metadata updates
_txc_finalize_kv(*it, synct);
- // cleanup the wal
+ // clean up the deferred record
string key;
- get_wal_key(wt.seq, &key);
- synct->rm_single_key(PREFIX_WAL, key);
+ get_deferred_key(wt.seq, &key);
+ synct->rm_single_key(PREFIX_DEFERRED, key);
}
// submit synct synchronously (block and wait for it to commit)
utime_t finish = ceph_clock_now();
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
- << " cleaned " << wal_cleaning.size()
+ << " cleaned " << deferred_cleaning.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
_txc_state_proc(txc);
kv_committing.pop_front();
}
- while (!wal_cleaning.empty()) {
- TransContext *txc = wal_cleaning.front();
+ while (!deferred_cleaning.empty()) {
+ TransContext *txc = deferred_cleaning.front();
_txc_release_alloc(txc);
_txc_state_proc(txc);
- wal_cleaning.pop_front();
+ deferred_cleaning.pop_front();
}
// this is as good a place as any ...
dout(10) << __func__ << " finish" << dendl;
}
-bluestore_wal_op_t *BlueStore::_get_wal_op(TransContext *txc, OnodeRef o)
+bluestore_deferred_op_t *BlueStore::_get_deferred_op(TransContext *txc, OnodeRef o)
{
- if (!txc->wal_txn) {
- txc->wal_txn = new bluestore_wal_transaction_t;
+ if (!txc->deferred_txn) {
+ txc->deferred_txn = new bluestore_deferred_transaction_t;
}
- txc->wal_txn->ops.push_back(bluestore_wal_op_t());
- return &txc->wal_txn->ops.back();
+ txc->deferred_txn->ops.push_back(bluestore_deferred_op_t());
+ return &txc->deferred_txn->ops.back();
}
-int BlueStore::_wal_apply(TransContext *txc)
+int BlueStore::_deferred_apply(TransContext *txc)
{
- bluestore_wal_transaction_t& wt = *txc->wal_txn;
+ bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
- txc->log_state_latency(logger, l_bluestore_state_wal_queued_lat);
- txc->state = TransContext::STATE_WAL_APPLYING;
+ txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
+ txc->state = TransContext::STATE_DEFERRED_APPLYING;
- if (cct->_conf->bluestore_inject_wal_apply_delay) {
- dout(20) << __func__ << " bluestore_inject_wal_apply_delay "
- << cct->_conf->bluestore_inject_wal_apply_delay
+ if (cct->_conf->bluestore_inject_deferred_apply_delay) {
+ dout(20) << __func__ << " bluestore_inject_deferred_apply_delay "
+ << cct->_conf->bluestore_inject_deferred_apply_delay
<< dendl;
utime_t t;
- t.set_from_double(cct->_conf->bluestore_inject_wal_apply_delay);
+ t.set_from_double(cct->_conf->bluestore_inject_deferred_apply_delay);
t.sleep();
dout(20) << __func__ << " finished sleep" << dendl;
}
assert(txc->ioc.pending_aios.empty());
- for (list<bluestore_wal_op_t>::iterator p = wt.ops.begin();
+ for (list<bluestore_deferred_op_t>::iterator p = wt.ops.begin();
p != wt.ops.end();
++p) {
- int r = _do_wal_op(txc, *p);
+ int r = _do_deferred_op(txc, *p);
assert(r == 0);
}
return 0;
}
-int BlueStore::_wal_finish(TransContext *txc)
+int BlueStore::_deferred_finish(TransContext *txc)
{
- bluestore_wal_transaction_t& wt = *txc->wal_txn;
+ bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
// move released back to txc
- txc->wal_txn->released.swap(txc->released);
- assert(txc->wal_txn->released.empty());
+ txc->deferred_txn->released.swap(txc->released);
+ assert(txc->deferred_txn->released.empty());
std::lock_guard<std::mutex> l2(txc->osr->qlock);
std::lock_guard<std::mutex> l(kv_lock);
- txc->state = TransContext::STATE_WAL_CLEANUP;
+ txc->state = TransContext::STATE_DEFERRED_CLEANUP;
txc->osr->qcond.notify_all();
- wal_cleanup_queue.push_back(txc);
+ deferred_cleanup_queue.push_back(txc);
kv_cond.notify_one();
return 0;
}
-int BlueStore::_do_wal_op(TransContext *txc, bluestore_wal_op_t& wo)
+int BlueStore::_do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo)
{
switch (wo.op) {
- case bluestore_wal_op_t::OP_WRITE:
+ case bluestore_deferred_op_t::OP_WRITE:
{
dout(20) << __func__ << " write " << wo.extents << dendl;
- logger->inc(l_bluestore_wal_write_ops);
- logger->inc(l_bluestore_wal_write_bytes, wo.data.length());
+ logger->inc(l_bluestore_deferred_write_ops);
+ logger->inc(l_bluestore_deferred_write_bytes, wo.data.length());
bufferlist::iterator p = wo.data.begin();
for (auto& e : wo.extents) {
bufferlist bl;
break;
default:
- assert(0 == "unrecognized wal op");
+ assert(0 == "unrecognized deferred op");
}
return 0;
}
-int BlueStore::_wal_replay()
+int BlueStore::_deferred_replay()
{
dout(10) << __func__ << " start" << dendl;
OpSequencerRef osr = new OpSequencer(cct);
int count = 0;
- KeyValueDB::Iterator it = db->get_iterator(PREFIX_WAL);
+ KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
dout(20) << __func__ << " replay " << pretty_binary_string(it->key())
<< dendl;
- bluestore_wal_transaction_t *wal_txn = new bluestore_wal_transaction_t;
+ bluestore_deferred_transaction_t *deferred_txn = new bluestore_deferred_transaction_t;
bufferlist bl = it->value();
bufferlist::iterator p = bl.begin();
try {
- ::decode(*wal_txn, p);
+ ::decode(*deferred_txn, p);
} catch (buffer::error& e) {
- derr << __func__ << " failed to decode wal txn "
+ derr << __func__ << " failed to decode deferred txn "
<< pretty_binary_string(it->key()) << dendl;
- delete wal_txn;
+ delete deferred_txn;
return -EIO;
}
TransContext *txc = _txc_create(osr.get());
- txc->wal_txn = wal_txn;
+ txc->deferred_txn = deferred_txn;
txc->state = TransContext::STATE_KV_DONE;
_txc_state_proc(txc);
}
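Replay does not special-case recovery: each surviving PREFIX_DEFERRED record is decoded and attached to a fresh TransContext in STATE_KV_DONE, so the normal state machine re-applies the writes and then removes the record; because deferred ops are plain block overwrites, re-applying one that already reached the disk before a crash is harmless. Below is a minimal standalone sketch of that scan-and-reinject loop with simplified, illustrative types (DeferredTxn, DeferredLog, replay are stand-ins, not BlueStore's API).

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-ins for a decoded deferred record and the kv prefix.
struct DeferredTxn { uint64_t seq; std::vector<std::string> ops; };
using DeferredLog = std::map<std::string, DeferredTxn>;  // key -> record

static void replay(const DeferredLog &log) {
  for (const auto &kv : log) {          // map iterates keys in order
    const DeferredTxn &txn = kv.second;
    std::cout << "replaying seq " << txn.seq << " with "
              << txn.ops.size() << " op(s)\n";
    // a real implementation would apply each op to the block device and
    // delete the record in the next kv commit
  }
}

int main() {
  DeferredLog log;
  log["seq-0001"] = {1, {"write@0x1000"}};
  log["seq-0002"] = {2, {"write@0x8000"}};
  replay(log);  // prints seq 1 then seq 2
  return 0;
}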
_txc_write_nodes(txc, txc->t);
- // journal wal items
- if (txc->wal_txn) {
- // move releases to after wal
- txc->wal_txn->released.swap(txc->released);
+ // journal deferred items
+ if (txc->deferred_txn) {
+ // move releases to after deferred
+ txc->deferred_txn->released.swap(txc->released);
assert(txc->released.empty());
- txc->wal_txn->seq = ++wal_seq;
+ txc->deferred_txn->seq = ++deferred_seq;
bufferlist bl;
- ::encode(*txc->wal_txn, bl);
+ ::encode(*txc->deferred_txn, bl);
string key;
- get_wal_key(txc->wal_txn->seq, &key);
- txc->t->set(PREFIX_WAL, key, bl);
+ get_deferred_key(txc->deferred_txn->seq, &key);
+ txc->t->set(PREFIX_DEFERRED, key, bl);
}
if (handle)
handle->suspend_tp_timeout();
_op_queue_reserve_throttle(txc);
- _op_queue_reserve_wal_throttle(txc);
+ _op_queue_reserve_deferred_throttle(txc);
if (handle)
handle->reset_tp_timeout();
logger->set(l_bluestore_cur_bytes_in_queue, throttle_bytes.get_current());
}
-void BlueStore::_op_queue_reserve_wal_throttle(TransContext *txc)
+void BlueStore::_op_queue_reserve_deferred_throttle(TransContext *txc)
{
- throttle_wal_ops.get(txc->ops);
- throttle_wal_bytes.get(txc->bytes);
+ throttle_deferred_ops.get(txc->ops);
+ throttle_deferred_bytes.get(txc->bytes);
- logger->set(l_bluestore_cur_ops_in_wal_queue, throttle_wal_ops.get_current());
- logger->set(l_bluestore_cur_bytes_in_wal_queue, throttle_wal_bytes.get_current());
+ logger->set(l_bluestore_cur_ops_in_deferred_queue, throttle_deferred_ops.get_current());
+ logger->set(l_bluestore_cur_bytes_in_deferred_queue, throttle_deferred_bytes.get_current());
}
-void BlueStore::_op_queue_release_wal_throttle(TransContext *txc)
+void BlueStore::_op_queue_release_deferred_throttle(TransContext *txc)
{
- throttle_wal_ops.put(txc->ops);
- throttle_wal_bytes.put(txc->bytes);
+ throttle_deferred_ops.put(txc->ops);
+ throttle_deferred_bytes.put(txc->bytes);
- logger->set(l_bluestore_cur_ops_in_wal_queue, throttle_wal_ops.get_current());
- logger->set(l_bluestore_cur_bytes_in_wal_queue, throttle_wal_bytes.get_current());
+ logger->set(l_bluestore_cur_ops_in_deferred_queue, throttle_deferred_ops.get_current());
+ logger->set(l_bluestore_cur_bytes_in_deferred_queue, throttle_deferred_bytes.get_current());
}
void BlueStore::_txc_aio_submit(TransContext *txc)
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
if (!g_conf->bluestore_debug_omit_block_device_write) {
- if (b_len <= prefer_wal_size) {
+ if (b_len <= prefer_deferred_size) {
dout(20) << __func__ << " defering small 0x" << std::hex
- << b_len << std::dec << " unused write via wal" << dendl;
- bluestore_wal_op_t *op = _get_wal_op(txc, o);
- op->op = bluestore_wal_op_t::OP_WRITE;
+ << b_len << std::dec << " unused write via deferred" << dendl;
+ bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
+ op->op = bluestore_deferred_op_t::OP_WRITE;
b->get_blob().map(
b_off, b_len,
[&](uint64_t offset, uint64_t length) {
logger->inc(l_bluestore_write_small_pre_read);
}
- // chunk-aligned wal overwrite?
+ // chunk-aligned deferred overwrite?
if (b->get_blob().get_ondisk_length() >= b_off + b_len &&
b_off % chunk_size == 0 &&
b_len % chunk_size == 0 &&
b->get_blob().is_allocated(b_off, b_len)) {
- bluestore_wal_op_t *op = _get_wal_op(txc, o);
- op->op = bluestore_wal_op_t::OP_WRITE;
+ bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
+ op->op = bluestore_deferred_op_t::OP_WRITE;
_buffer_cache_write(txc, b, b_off, padded,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
b->dirty_blob().calc_csum(b_off, padded);
}
op->data.claim(padded);
- dout(20) << __func__ << " wal write 0x" << std::hex << b_off << "~"
+ dout(20) << __func__ << " deferred write 0x" << std::hex << b_off << "~"
<< b_len << std::dec << " of mutable " << *b
<< " at " << op->extents << dendl;
Extent *le = o->extent_map.set_lextent(offset, offset - bstart, length,
b->dirty_blob().mark_used(le->blob_offset, le->length);
txc->statfs_delta.stored() += le->length;
dout(20) << __func__ << " lex " << *le << dendl;
- logger->inc(l_bluestore_write_small_wal);
+ logger->inc(l_bluestore_write_small_deferred);
return;
}
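Both small-write branches above route through the same decision: if the affected span fits within prefer_deferred_size (and, for overwrites, is chunk-aligned and already allocated), the data is recorded as a bluestore_deferred_op_t and journaled with the kv commit rather than issued as direct aio. Below is a minimal standalone sketch of that size-based routing, with illustrative names (DeferredOp, WritePlan, plan_write are stand-ins, not BlueStore's API).

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct DeferredOp { uint64_t off; std::string data; };

struct WritePlan {
  std::vector<DeferredOp> deferred;  // journaled with kv commit, applied later
  std::vector<DeferredOp> direct;    // submitted as aio immediately
};

// Route a write based on the prefer_deferred_size threshold.
static void plan_write(WritePlan &plan, uint64_t off, std::string data,
                       uint64_t prefer_deferred_size) {
  if (data.size() <= prefer_deferred_size)
    plan.deferred.push_back({off, std::move(data)});  // small: defer
  else
    plan.direct.push_back({off, std::move(data)});    // large: write now
}

int main() {
  WritePlan plan;
  plan_write(plan, 0x0, std::string(4096, 'a'), 32768);        // deferred
  plan_write(plan, 0x10000, std::string(131072, 'b'), 32768);  // direct
  std::cout << plan.deferred.size() << " deferred, "
            << plan.direct.size() << " direct\n";
  return 0;
}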
// queue io
if (!g_conf->bluestore_debug_omit_block_device_write) {
- if (l->length() <= prefer_wal_size) {
+ if (l->length() <= prefer_deferred_size) {
dout(20) << __func__ << " defering small 0x" << std::hex
- << l->length() << std::dec << " write via wal" << dendl;
- bluestore_wal_op_t *op = _get_wal_op(txc, o);
- op->op = bluestore_wal_op_t::OP_WRITE;
+ << l->length() << std::dec << " write via deferred" << dendl;
+ bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
+ op->op = bluestore_deferred_op_t::OP_WRITE;
b->get_blob().map(
b_off, l->length(),
[&](uint64_t offset, uint64_t length) {
_dump_onode(o);
- // ensure any wal IO has completed before we truncate off any extents
+ // ensure any deferred IO has completed before we truncate off any extents
// they may touch.
o->flush();
return 0;
if (offset < o->onode.size) {
- // ensure any wal IO has completed before we truncate off any extents
+ // ensure any deferred IO has completed before we truncate off any extents
// they may touch.
o->flush();
uint64_t num_super = 0;
uint64_t num_coll = 0;
uint64_t num_omap = 0;
- uint64_t num_wal = 0;
+ uint64_t num_deferred = 0;
uint64_t num_alloc = 0;
uint64_t num_stat = 0;
uint64_t num_others = 0;
} else if (key.first == PREFIX_OMAP) {
hist.update_hist_entry(hist.key_hist, PREFIX_OMAP, key_size, value_size);
num_omap++;
- } else if (key.first == PREFIX_WAL) {
- hist.update_hist_entry(hist.key_hist, PREFIX_WAL, key_size, value_size);
- num_wal++;
+ } else if (key.first == PREFIX_DEFERRED) {
+ hist.update_hist_entry(hist.key_hist, PREFIX_DEFERRED, key_size, value_size);
+ num_deferred++;
} else if (key.first == PREFIX_ALLOC || key.first == "b" ) {
hist.update_hist_entry(hist.key_hist, PREFIX_ALLOC, key_size, value_size);
num_alloc++;
f->dump_unsigned("num_super", num_super);
f->dump_unsigned("num_coll", num_coll);
f->dump_unsigned("num_omap", num_omap);
- f->dump_unsigned("num_wal", num_wal);
+ f->dump_unsigned("num_deferred", num_deferred);
f->dump_unsigned("num_alloc", num_alloc);
f->dump_unsigned("num_stat", num_stat);
f->dump_unsigned("num_shared_shards", num_shared_shards);
l_bluestore_state_kv_queued_lat,
l_bluestore_state_kv_committing_lat,
l_bluestore_state_kv_done_lat,
- l_bluestore_state_wal_queued_lat,
- l_bluestore_state_wal_applying_lat,
- l_bluestore_state_wal_aio_wait_lat,
- l_bluestore_state_wal_cleanup_lat,
+ l_bluestore_state_deferred_queued_lat,
+ l_bluestore_state_deferred_applying_lat,
+ l_bluestore_state_deferred_aio_wait_lat,
+ l_bluestore_state_deferred_cleanup_lat,
l_bluestore_state_finishing_lat,
l_bluestore_state_done_lat,
l_bluestore_submit_lat,
l_bluestore_compress_success_count,
l_bluestore_compress_rejected_count,
l_bluestore_write_pad_bytes,
- l_bluestore_wal_write_ops,
- l_bluestore_wal_write_bytes,
+ l_bluestore_deferred_write_ops,
+ l_bluestore_deferred_write_bytes,
l_bluestore_write_penalty_read_ops,
l_bluestore_allocated,
l_bluestore_stored,
l_bluestore_write_small,
l_bluestore_write_small_bytes,
l_bluestore_write_small_unused,
- l_bluestore_write_small_wal,
+ l_bluestore_write_small_deferred,
l_bluestore_write_small_pre_read,
l_bluestore_write_small_new,
l_bluestore_cur_ops_in_queue,
l_bluestore_cur_bytes_in_queue,
- l_bluestore_cur_ops_in_wal_queue,
- l_bluestore_cur_bytes_in_wal_queue,
+ l_bluestore_cur_ops_in_deferred_queue,
+ l_bluestore_cur_bytes_in_deferred_queue,
l_bluestore_txc,
l_bluestore_onode_reshard,
std::atomic<int> flushing_count = {0};
std::mutex flush_lock; ///< protect flush_txns
std::condition_variable flush_cond; ///< wait here for unapplied txns
- set<TransContext*> flush_txns; ///< committing or wal txns
+ set<TransContext*> flush_txns; ///< committing or deferred txns
Onode(Collection *c, const ghobject_t& o,
const mempool::bluestore_meta_other::string& k)
STATE_KV_QUEUED, // queued for kv_sync_thread submission
STATE_KV_SUBMITTED, // submitted to kv; not yet synced
STATE_KV_DONE,
- STATE_WAL_QUEUED,
- STATE_WAL_APPLYING,
- STATE_WAL_AIO_WAIT,
- STATE_WAL_CLEANUP, // remove wal kv record
- STATE_WAL_DONE,
+ STATE_DEFERRED_QUEUED,
+ STATE_DEFERRED_APPLYING,
+ STATE_DEFERRED_AIO_WAIT,
+ STATE_DEFERRED_CLEANUP, // remove deferred kv record
+ STATE_DEFERRED_DONE,
STATE_FINISHING,
STATE_DONE,
} state_t;
- state_t state;
+ state_t state = STATE_PREPARE;
const char *get_state_name() {
switch (state) {
case STATE_KV_QUEUED: return "kv_queued";
case STATE_KV_SUBMITTED: return "kv_submitted";
case STATE_KV_DONE: return "kv_done";
- case STATE_WAL_QUEUED: return "wal_queued";
- case STATE_WAL_APPLYING: return "wal_applying";
- case STATE_WAL_AIO_WAIT: return "wal_aio_wait";
- case STATE_WAL_CLEANUP: return "wal_cleanup";
- case STATE_WAL_DONE: return "wal_done";
+ case STATE_DEFERRED_QUEUED: return "deferred_queued";
+ case STATE_DEFERRED_APPLYING: return "deferred_applying";
+ case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait";
+ case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
+ case STATE_DEFERRED_DONE: return "deferred_done";
case STATE_FINISHING: return "finishing";
case STATE_DONE: return "done";
}
case l_bluestore_state_kv_queued_lat: return "kv_queued";
case l_bluestore_state_kv_committing_lat: return "kv_committing";
case l_bluestore_state_kv_done_lat: return "kv_done";
- case l_bluestore_state_wal_queued_lat: return "wal_queued";
- case l_bluestore_state_wal_applying_lat: return "wal_applying";
- case l_bluestore_state_wal_aio_wait_lat: return "wal_aio_wait";
- case l_bluestore_state_wal_cleanup_lat: return "wal_cleanup";
+ case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
+ case l_bluestore_state_deferred_applying_lat: return "deferred_applying";
+ case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
+ case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
case l_bluestore_state_finishing_lat: return "finishing";
case l_bluestore_state_done_lat: return "done";
}
OpSequencerRef osr;
boost::intrusive::list_member_hook<> sequencer_item;
- uint64_t ops, bytes;
+ uint64_t ops = 0, bytes = 0;
set<OnodeRef> onodes; ///< these need to be updated/written
set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
KeyValueDB::Transaction t; ///< then we will commit this
- Context *oncommit; ///< signal on commit
- Context *onreadable; ///< signal on readable
- Context *onreadable_sync; ///< signal on readable
+ Context *oncommit = nullptr; ///< signal on commit
+ Context *onreadable = nullptr; ///< signal on readable
+ Context *onreadable_sync = nullptr; ///< signal on readable
list<Context*> oncommits; ///< more commit completions
list<CollectionRef> removed_collections; ///< colls we removed
- boost::intrusive::list_member_hook<> wal_queue_item;
- bluestore_wal_transaction_t *wal_txn; ///< wal transaction (if any)
+ boost::intrusive::list_member_hook<> deferred_queue_item;
+ bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< deferred transaction (if any)
interval_set<uint64_t> allocated, released;
struct volatile_statfs{
uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
explicit TransContext(CephContext* cct, OpSequencer *o)
- : state(STATE_PREPARE),
- osr(o),
- ops(0),
- bytes(0),
- oncommit(NULL),
- onreadable(NULL),
- onreadable_sync(NULL),
- wal_txn(NULL),
+ : osr(o),
ioc(cct, this),
start(ceph_clock_now()) {
- last_stamp = start;
+ last_stamp = start;
}
~TransContext() {
- delete wal_txn;
+ delete deferred_txn;
}
void write_onode(OnodeRef &o) {
boost::intrusive::member_hook<
TransContext,
boost::intrusive::list_member_hook<>,
- &TransContext::wal_queue_item> > wal_queue_t;
- wal_queue_t wal_q; ///< transactions
+ &TransContext::deferred_queue_item> > deferred_queue_t;
+ deferred_queue_t deferred_q; ///< transactions
- boost::intrusive::list_member_hook<> wal_osr_queue_item;
+ boost::intrusive::list_member_hook<> deferred_osr_queue_item;
Sequencer *parent;
- std::mutex wal_apply_mutex;
+ std::mutex deferred_apply_mutex;
uint64_t last_seq = 0;
}
};
- class WALWQ : public ThreadPool::WorkQueue<TransContext> {
- // We need to order WAL items within each Sequencer. To do that,
+ class DeferredWQ : public ThreadPool::WorkQueue<TransContext> {
+ // We need to order deferred items within each Sequencer. To do that,
// queue each txc under osr, and queue the osr's here. When we
// dequeue an txc, requeue the osr if there are more pending, and
// do it at the end of the list so that the next thread does not
- // get a conflicted txc. Hold an osr mutex while doing the wal to
+ // get a conflicted txc. Hold an osr mutex while applying the deferred work to
// preserve the ordering.
public:
typedef boost::intrusive::list<
boost::intrusive::member_hook<
OpSequencer,
boost::intrusive::list_member_hook<>,
- &OpSequencer::wal_osr_queue_item> > wal_osr_queue_t;
+ &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
private:
BlueStore *store;
- wal_osr_queue_t wal_queue;
+ deferred_osr_queue_t deferred_queue;
public:
- WALWQ(BlueStore *s, time_t ti, time_t sti, ThreadPool *tp)
- : ThreadPool::WorkQueue<TransContext>("BlueStore::WALWQ", ti, sti, tp),
+ DeferredWQ(BlueStore *s, time_t ti, time_t sti, ThreadPool *tp)
+ : ThreadPool::WorkQueue<TransContext>("BlueStore::DeferredWQ", ti, sti,
+ tp),
store(s) {
}
bool _empty() {
- return wal_queue.empty();
+ return deferred_queue.empty();
}
bool _enqueue(TransContext *i) {
- if (i->osr->wal_q.empty()) {
- wal_queue.push_back(*i->osr);
+ if (i->osr->deferred_q.empty()) {
+ deferred_queue.push_back(*i->osr);
}
- i->osr->wal_q.push_back(*i);
+ i->osr->deferred_q.push_back(*i);
return true;
}
void _dequeue(TransContext *p) {
assert(0 == "not needed, not implemented");
}
TransContext *_dequeue() {
- if (wal_queue.empty())
+ if (deferred_queue.empty())
return NULL;
- OpSequencer *osr = &wal_queue.front();
- TransContext *i = &osr->wal_q.front();
- osr->wal_q.pop_front();
- wal_queue.pop_front();
- if (!osr->wal_q.empty()) {
+ OpSequencer *osr = &deferred_queue.front();
+ TransContext *i = &osr->deferred_q.front();
+ osr->deferred_q.pop_front();
+ deferred_queue.pop_front();
+ if (!osr->deferred_q.empty()) {
// requeue at the end to minimize contention
- wal_queue.push_back(*i->osr);
+ deferred_queue.push_back(*i->osr);
}
- // preserve wal ordering for this sequencer by taking the lock
+ // preserve deferred ordering for this sequencer by taking the lock
// while still holding the queue lock
- i->osr->wal_apply_mutex.lock();
+ i->osr->deferred_apply_mutex.lock();
return i;
}
void _process(TransContext *i, ThreadPool::TPHandle &) override {
- store->_wal_apply(i);
- i->osr->wal_apply_mutex.unlock();
+ store->_deferred_apply(i);
+ i->osr->deferred_apply_mutex.unlock();
}
void _clear() {
- assert(wal_queue.empty());
+ assert(deferred_queue.empty());
}
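The ordering comment at the top of DeferredWQ is the crux of this work queue: txcs queue under their OpSequencer, sequencers queue globally, _dequeue() requeues a still-busy sequencer at the tail and takes its apply mutex before returning, and _process() releases that mutex once the txc is applied, so deferred writes for one sequencer never interleave. Below is a minimal standalone sketch of that handoff with illustrative names (Txc, Seq, DeferredQueue are stand-ins, not BlueStore's classes); both steps run on one thread here for simplicity.

#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <utility>

struct Txc { std::string name; };

struct Seq {
  std::deque<Txc*> q;       // pending txcs for this sequencer, in order
  std::mutex apply_mutex;   // serializes apply for this sequencer
};

struct DeferredQueue {
  std::deque<Seq*> seqs;    // sequencers with pending work
  std::mutex lock;          // protects both queue levels

  void enqueue(Seq *s, Txc *t) {
    std::lock_guard<std::mutex> l(lock);
    if (s->q.empty())
      seqs.push_back(s);    // first pending txc makes the sequencer visible
    s->q.push_back(t);
  }

  // Returns {seq, txc} with seq->apply_mutex already held, or {null, null}.
  std::pair<Seq*, Txc*> dequeue() {
    std::lock_guard<std::mutex> l(lock);
    if (seqs.empty())
      return {nullptr, nullptr};
    Seq *s = seqs.front();
    seqs.pop_front();
    Txc *t = s->q.front();
    s->q.pop_front();
    if (!s->q.empty())
      seqs.push_back(s);    // requeue at the tail to spread contention
    s->apply_mutex.lock();  // held until process() finishes
    return {s, t};
  }

  void process(Seq *s, Txc *t) {
    std::cout << "applying " << t->name << "\n";
    s->apply_mutex.unlock();
  }
};

int main() {
  DeferredQueue dq;
  Seq s;
  Txc a{"txc-1"}, b{"txc-2"};
  dq.enqueue(&s, &a);
  dq.enqueue(&s, &b);
  for (;;) {
    std::pair<Seq*, Txc*> next = dq.dequeue();
    if (!next.second)
      break;
    dq.process(next.first, next.second);  // prints txc-1 then txc-2
  }
  return 0;
}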
void flush() {
std::atomic<uint64_t> blobid_max = {0};
Throttle throttle_ops, throttle_bytes; ///< submit to commit
- Throttle throttle_wal_ops, throttle_wal_bytes; ///< submit to wal complete
+ Throttle throttle_deferred_ops, throttle_deferred_bytes; ///< submit to deferred completion
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
- std::mutex wal_lock;
- std::atomic<uint64_t> wal_seq = {0};
- ThreadPool wal_tp;
- WALWQ wal_wq;
+ std::mutex deferred_lock;
+ std::atomic<uint64_t> deferred_seq = {0};
+ ThreadPool deferred_tp;
+ DeferredWQ deferred_wq;
int m_finisher_num;
vector<Finisher*> finishers;
deque<TransContext*> kv_queue; ///< ready, already submitted
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
- deque<TransContext*> wal_cleanup_queue; ///< wal done, ready for cleanup
+ deque<TransContext*> deferred_cleanup_queue; ///< deferred done, ready for cleanup
PerfCounters *logger;
uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
size_t min_alloc_size_order = 0; ///< bits for min_alloc_size
- uint64_t prefer_wal_size = 0; ///< size threshold for forced wal writes
+ uint64_t prefer_deferred_size = 0; ///< size threshold for forced deferred writes
uint64_t max_alloc_size = 0; ///< maximum allocation unit (power of 2)
- bool sync_wal_apply; ///< see config option bluestore_sync_wal_apply
+ bool sync_deferred_apply; ///< see config option bluestore_sync_deferred_apply
std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
CompressorRef compressor;
}
}
- bluestore_wal_op_t *_get_wal_op(TransContext *txc, OnodeRef o);
- int _wal_apply(TransContext *txc);
- int _wal_finish(TransContext *txc);
- int _do_wal_op(TransContext *txc, bluestore_wal_op_t& wo);
- int _wal_replay();
+ bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
+ int _deferred_apply(TransContext *txc);
+ int _deferred_finish(TransContext *txc);
+ int _do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo);
+ int _deferred_replay();
int _fsck_check_extents(
const ghobject_t& oid,
void _op_queue_reserve_throttle(TransContext *txc);
void _op_queue_release_throttle(TransContext *txc);
- void _op_queue_reserve_wal_throttle(TransContext *txc);
- void _op_queue_release_wal_throttle(TransContext *txc);
+ void _op_queue_reserve_deferred_throttle(TransContext *txc);
+ void _op_queue_release_deferred_throttle(TransContext *txc);
};
inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {