OPTION(bluestore_fsck_on_mkfs, OPT_BOOL, true)
OPTION(bluestore_fsck_on_mkfs_deep, OPT_BOOL, false)
OPTION(bluestore_sync_submit_transaction, OPT_BOOL, false) // submit kv txn in queueing thread (not kv_sync_thread)
-OPTION(bluestore_sync_deferred_apply, OPT_BOOL, true) // perform initial wal work synchronously (possibly in combination with aio so we only *queue* ios)
-OPTION(bluestore_deferred_threads, OPT_INT, 4)
-OPTION(bluestore_deferred_thread_timeout, OPT_INT, 30)
-OPTION(bluestore_deferred_thread_suicide_timeout, OPT_INT, 120)
OPTION(bluestore_max_ops, OPT_U64, 512)
OPTION(bluestore_max_bytes, OPT_U64, 64*1024*1024)
OPTION(bluestore_deferred_max_ops, OPT_U64, 512)
OPTION(bluestore_debug_inject_read_err, OPT_BOOL, false)
OPTION(bluestore_debug_randomize_serial_transaction, OPT_INT, 0)
OPTION(bluestore_debug_omit_block_device_write, OPT_BOOL, false)
-OPTION(bluestore_inject_deferred_apply_delay, OPT_FLOAT, 0)
OPTION(bluestore_shard_finishers, OPT_BOOL, false)
OPTION(kstore_max_ops, OPT_U64, 512)
throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
cct->_conf->bluestore_deferred_max_bytes),
- deferred_tp(cct,
- "BlueStore::deferred_tp",
- "tp_deferred",
- cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads,
- "bluestore_deferred_threads"),
- deferred_wq(this,
- cct->_conf->bluestore_deferred_thread_timeout,
- cct->_conf->bluestore_deferred_thread_suicide_timeout,
- &deferred_tp),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
logger(NULL),
debug_read_error_lock("BlueStore::debug_read_error_lock"),
csum_type(Checksummer::CSUM_CRC32C),
- sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply),
mempool_thread(this)
{
_init_logger();
throttle_deferred_bytes(cct, "bluestore_deferred_max_bytes",
cct->_conf->bluestore_max_bytes +
cct->_conf->bluestore_deferred_max_bytes),
- deferred_tp(cct,
- "BlueStore::deferred_tp",
- "tp_deferred",
- cct->_conf->bluestore_sync_deferred_apply ? 0 : cct->_conf->bluestore_deferred_threads,
- "bluestore_deferred_threads"),
- deferred_wq(this,
- cct->_conf->bluestore_deferred_thread_timeout,
- cct->_conf->bluestore_deferred_thread_suicide_timeout,
- &deferred_tp),
m_finisher_num(1),
kv_sync_thread(this),
kv_stop(false),
csum_type(Checksummer::CSUM_CRC32C),
min_alloc_size(_min_alloc_size),
min_alloc_size_order(ctz(_min_alloc_size)),
- sync_deferred_apply(cct->_conf->bluestore_sync_deferred_apply),
mempool_thread(this)
{
_init_logger();
"Average kv_done state latency");
b.add_time_avg(l_bluestore_state_deferred_queued_lat, "state_deferred_queued_lat",
"Average deferred_queued state latency");
- b.add_time_avg(l_bluestore_state_deferred_applying_lat, "state_deferred_applying_lat",
- "Average deferred_applying state latency");
b.add_time_avg(l_bluestore_state_deferred_aio_wait_lat, "state_deferred_aio_wait_lat",
"Average aio_wait state latency");
b.add_time_avg(l_bluestore_state_deferred_cleanup_lat, "state_deferred_cleanup_lat",
for (auto f : finishers) {
f->start();
}
- deferred_tp.start();
kv_sync_thread.create("bstore_kv_sync");
r = _deferred_replay();
out_stop:
_kv_stop();
- deferred_wq.drain();
- deferred_tp.stop();
for (auto f : finishers) {
f->wait_for_empty();
f->stop();
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
- dout(20) << __func__ << " draining deferred_wq" << dendl;
- deferred_wq.drain();
- dout(20) << __func__ << " stopping deferred_tp" << dendl;
- deferred_tp.stop();
for (auto f : finishers) {
dout(20) << __func__ << " draining finisher" << dendl;
f->wait_for_empty();
txc->log_state_latency(logger, l_bluestore_state_kv_done_lat);
if (txc->deferred_txn) {
txc->state = TransContext::STATE_DEFERRED_QUEUED;
- if (sync_deferred_apply) {
- _deferred_apply(txc);
- } else {
- deferred_wq.queue(txc);
- }
+ _deferred_queue(txc);
return;
}
txc->state = TransContext::STATE_FINISHING;
break;
- case TransContext::STATE_DEFERRED_APPLYING:
- txc->log_state_latency(logger, l_bluestore_state_deferred_applying_lat);
- if (txc->ioc.has_pending_aios()) {
- txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
- _txc_aio_submit(txc);
- return;
- }
- // ** fall-thru **
-
case TransContext::STATE_DEFERRED_AIO_WAIT:
txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
_deferred_finish(txc);
dout(10) << __func__ << " finish" << dendl;
}
-bluestore_deferred_op_t *BlueStore::_get_deferred_op(TransContext *txc, OnodeRef o)
+bluestore_deferred_op_t *BlueStore::_get_deferred_op(
+ TransContext *txc, OnodeRef o)
{
if (!txc->deferred_txn) {
txc->deferred_txn = new bluestore_deferred_transaction_t;
return &txc->deferred_txn->ops.back();
}
-int BlueStore::_deferred_apply(TransContext *txc)
+void BlueStore::_deferred_queue(TransContext *txc)
{
- bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
- dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
- txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
- txc->state = TransContext::STATE_DEFERRED_APPLYING;
-
- if (cct->_conf->bluestore_inject_deferred_apply_delay) {
- dout(20) << __func__ << " bluestore_inject_deferred_apply_delay "
- << cct->_conf->bluestore_inject_deferred_apply_delay
- << dendl;
- utime_t t;
- t.set_from_double(cct->_conf->bluestore_inject_deferred_apply_delay);
- t.sleep();
- dout(20) << __func__ << " finished sleep" << dendl;
+ dout(20) << __func__ << " txc " << txc << " on " << txc->osr << dendl;
+ std::lock_guard<std::mutex> l(deferred_lock);
+ if (txc->osr->deferred_pending.empty() &&
+ txc->osr->deferred_running.empty()) {
+ deferred_queue.push_back(*txc->osr);
+ }
+ txc->osr->deferred_pending.push_back(*txc);
+ if (txc->osr->deferred_running.empty()) {
+ _deferred_try_submit(txc->osr.get());
}
+}
- assert(txc->ioc.pending_aios.empty());
- for (list<bluestore_deferred_op_t>::iterator p = wt.ops.begin();
- p != wt.ops.end();
- ++p) {
- int r = _do_deferred_op(txc, *p);
- assert(r == 0);
+void BlueStore::_deferred_try_submit()
+{
+ dout(20) << __func__ << " " << deferred_queue.size() << " osrs" << dendl;
+ for (auto& osr : deferred_queue) {
+ if (osr.deferred_running.empty()) {
+ _deferred_try_submit(&osr);
+ }
}
+}
- _txc_state_proc(txc);
- return 0;
+void BlueStore::_deferred_try_submit(OpSequencer *osr)
+{
+ dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size()
+ << " pending " << dendl;
+ assert(osr->deferred_running.empty());
+ osr->deferred_pending.swap(osr->deferred_running);
+
+ // attach all IO to the last in the batch
+ TransContext *last = &osr->deferred_running.back();
+
+  // reverse order, so the newest write to any given block wins
+ for (auto i = osr->deferred_running.rbegin();
+ i != osr->deferred_running.rend();
+ ++i) {
+ TransContext *txc = &*i;
+ bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
+ dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
+ txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
+ txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
+ for (auto opi = wt.ops.rbegin(); opi != wt.ops.rend(); ++opi) {
+ const auto& op = *opi;
+ uint64_t bl_offset = 0;
+ for (auto e : op.extents) {
+ interval_set<uint64_t> cur;
+ cur.insert(e.offset, e.length);
+ interval_set<uint64_t> overlap;
+ overlap.intersection_of(cur, osr->deferred_blocks);
+ cur.subtract(overlap);
+ dout(20) << __func__ << " txc " << txc << " " << e << std::hex
+ << " overlap 0x" << overlap << " new 0x" << cur
+ << " from bl_offset 0x" << bl_offset << std::dec << dendl;
+ for (auto j = cur.begin(); j != cur.end(); ++j) {
+ bufferlist bl;
+ bl.substr_of(op.data, bl_offset + j.get_start() - e.offset,
+ j.get_len());
+ if (!g_conf->bluestore_debug_omit_block_device_write) {
+ int r = bdev->aio_write(j.get_start(), bl, &last->ioc, false);
+ assert(r == 0);
+ }
+ txc->osr->deferred_blocks.insert(j.get_start(), j.get_len());
+ }
+ bl_offset += e.length;
+ }
+ }
+ }
+ osr->deferred_txc = last;
+ dout(20) << " osr " << osr << " deferred_blocks 0x" << std::hex
+ << osr->deferred_blocks << std::dec << dendl;
+ _txc_aio_submit(last);
}
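
The reverse walk above is the crux of the batching: iterating newest-to-oldest and subtracting ranges already present in deferred_blocks means that when several txcs in one batch touch the same block, only the newest bytes are issued and older overlaps are silently dropped. Below is a minimal standalone sketch of that newest-wins rule, using a simplified byte-granularity stand-in for Ceph's interval_set (every name in it is illustrative, not BlueStore API):

    // dedup_sketch.cc -- illustrative only; a simplified stand-in for
    // Ceph's interval_set<uint64_t>, tracking bytes claimed per batch.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    struct Intervals {
      std::map<uint64_t, uint64_t> m;  // offset -> length
      bool covered(uint64_t off) const {
        auto it = m.upper_bound(off);
        if (it == m.begin()) return false;
        --it;
        return off < it->first + it->second;
      }
      void insert(uint64_t off, uint64_t len) { m[off] = len; }
    };

    int main() {
      struct Write { uint64_t off, len; std::string tag; };
      // Three deferred writes queued oldest-first, like deferred_running.
      std::vector<Write> batch = {{0, 8, "old"}, {4, 8, "mid"}, {8, 8, "new"}};
      Intervals claimed;  // plays the role of osr->deferred_blocks
      // Walk newest-to-oldest, as _deferred_try_submit does; skip any byte
      // a newer txc already claimed.
      for (auto i = batch.rbegin(); i != batch.rend(); ++i) {
        for (uint64_t b = i->off; b < i->off + i->len; ++b) {
          if (claimed.covered(b)) continue;
          claimed.insert(b, 1);
          std::cout << "byte " << b << " <- " << i->tag << "\n";
        }
      }
    }

Bytes 8-15 come out tagged "new", 4-7 "mid", and only 0-3 survive from "old"; the real code applies the same rule per extent, slicing the deferred payload with bufferlist::substr_of instead of looping over bytes.
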
int BlueStore::_deferred_finish(TransContext *txc)
{
bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
- dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl;
+ dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
+
+ OpSequencer::deferred_queue_t finished;
+ {
+ std::lock_guard<std::mutex> l(deferred_lock);
+ assert(txc->osr->deferred_txc == txc);
+ txc->osr->deferred_blocks.clear();
+ finished.swap(txc->osr->deferred_running);
+ if (!txc->osr->deferred_pending.empty()) {
+ _deferred_try_submit(txc->osr.get());
+ } else {
+ auto q = deferred_queue.iterator_to(*txc->osr);
+ deferred_queue.erase(q);
+ }
+ }
+
std::lock_guard<std::mutex> l2(txc->osr->qlock);
std::lock_guard<std::mutex> l(kv_lock);
- txc->state = TransContext::STATE_DEFERRED_CLEANUP;
- txc->osr->qcond.notify_all();
- throttle_deferred_ops.put(txc->ops);
- throttle_deferred_bytes.put(txc->bytes);
- deferred_cleanup_queue.push_back(txc);
+ for (auto& i : finished) {
+ TransContext *txc = &i;
+ txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+ txc->osr->qcond.notify_all();
+ throttle_deferred_ops.put(txc->ops);
+ throttle_deferred_bytes.put(txc->bytes);
+ deferred_cleanup_queue.push_back(txc);
+ }
+ finished.clear();
kv_cond.notify_one();
return 0;
}
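
The completion side keeps the same per-sequencer invariant as the submit side: at most one batch in flight, with everything queued meanwhile coalescing into the next batch. A toy model of that lifecycle (hypothetical names; the deferred_lock serialization, throttling, and actual IO are all omitted):

    // batch_model.cc -- toy model of the one-batch-in-flight rule per osr.
    #include <cassert>
    #include <deque>
    #include <iostream>

    struct Sequencer {
      std::deque<int> pending;  // stands in for deferred_pending
      std::deque<int> running;  // stands in for deferred_running

      void queue(int txc) {               // cf. _deferred_queue
        pending.push_back(txc);
        if (running.empty())
          submit();
      }
      void submit() {                     // cf. _deferred_try_submit
        assert(running.empty());
        running.swap(pending);            // whole pending set becomes one batch
        std::cout << "submit batch of " << running.size() << "\n";
      }
      void finish() {                     // cf. _deferred_finish (aio done)
        std::cout << "finish batch of " << running.size() << "\n";
        running.clear();
        if (!pending.empty())
          submit();                       // queued txcs form the next batch
      }
    };

    int main() {
      Sequencer osr;
      osr.queue(1);   // nothing running: batch {1} submits at once
      osr.queue(2);   // parked in pending behind the in-flight batch
      osr.queue(3);
      osr.finish();   // {1} completes; {2,3} submit as a single batch
      osr.finish();   // {2,3} complete; both queues empty
    }
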
int r = bdev->aio_write(e.offset, bl, &txc->ioc, false);
assert(r == 0);
}
+ txc->osr->deferred_blocks.insert(e.offset, e.length);
}
}
break;
if (!g_conf->bluestore_debug_omit_block_device_write) {
if (b_len <= prefer_deferred_size) {
- dout(20) << __func__ << " defering small 0x" << std::hex
+ dout(20) << __func__ << " deferring small 0x" << std::hex
<< b_len << std::dec << " unused write via deferred" << dendl;
bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
op->op = bluestore_deferred_op_t::OP_WRITE;
// queue io
if (!g_conf->bluestore_debug_omit_block_device_write) {
if (l->length() <= prefer_deferred_size) {
- dout(20) << __func__ << " defering small 0x" << std::hex
+ dout(20) << __func__ << " deferring small 0x" << std::hex
<< l->length() << std::dec << " write via deferred" << dendl;
bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
op->op = bluestore_deferred_op_t::OP_WRITE;
l_bluestore_state_kv_committing_lat,
l_bluestore_state_kv_done_lat,
l_bluestore_state_deferred_queued_lat,
- l_bluestore_state_deferred_applying_lat,
l_bluestore_state_deferred_aio_wait_lat,
l_bluestore_state_deferred_cleanup_lat,
l_bluestore_state_finishing_lat,
STATE_KV_QUEUED, // queued for kv_sync_thread submission
STATE_KV_SUBMITTED, // submitted to kv; not yet synced
STATE_KV_DONE,
- STATE_DEFERRED_QUEUED,
- STATE_DEFERRED_APPLYING,
- STATE_DEFERRED_AIO_WAIT,
+ STATE_DEFERRED_QUEUED, // in deferred_queue
+ STATE_DEFERRED_AIO_WAIT, // aio in flight, waiting for completion
STATE_DEFERRED_CLEANUP, // remove deferred kv record
STATE_DEFERRED_DONE,
STATE_FINISHING,
case STATE_KV_SUBMITTED: return "kv_submitted";
case STATE_KV_DONE: return "kv_done";
case STATE_DEFERRED_QUEUED: return "deferred_queued";
- case STATE_DEFERRED_APPLYING: return "deferred_applying";
case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait";
case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
case STATE_DEFERRED_DONE: return "deferred_done";
case l_bluestore_state_kv_committing_lat: return "kv_committing";
case l_bluestore_state_kv_done_lat: return "kv_done";
case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
- case l_bluestore_state_deferred_applying_lat: return "deferred_applying";
case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
case l_bluestore_state_finishing_lat: return "finishing";
TransContext,
boost::intrusive::list_member_hook<>,
&TransContext::deferred_queue_item> > deferred_queue_t;
- deferred_queue_t deferred_q; ///< transactions
+    deferred_queue_t deferred_pending;      ///< txcs waiting to be batched
+    deferred_queue_t deferred_running;      ///< txcs whose batch is in flight
+    interval_set<uint64_t> deferred_blocks; ///< blocks with aio in flight
+    TransContext *deferred_txc = nullptr;   ///< txc carrying the batch's aios
boost::intrusive::list_member_hook<> deferred_osr_queue_item;
Sequencer *parent;
BlueStore *store;
- std::mutex deferred_apply_mutex;
-
uint64_t last_seq = 0;
std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
}
};
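
Both deferred_pending/deferred_running above and the deferred_osr_queue_t typedef below hang on boost::intrusive member hooks: the link lives inside the object, so enqueueing allocates nothing and a known-linked element can be erased in O(1) via iterator_to, which is exactly how _deferred_finish drops an idle osr from deferred_queue. A small self-contained illustration (the names in it are invented for the example):

    // intrusive_sketch.cc -- member-hook intrusive list, O(1) unlink.
    #include <boost/intrusive/list.hpp>
    #include <cassert>

    struct Item {
      int id;
      boost::intrusive::list_member_hook<> hook;  // link stored in the object
    };

    typedef boost::intrusive::list<
      Item,
      boost::intrusive::member_hook<
        Item,
        boost::intrusive::list_member_hook<>,
        &Item::hook> > item_list_t;

    int main() {
      Item a{1}, b{2};
      item_list_t q;
      q.push_back(a);       // no allocation; a.hook is now linked
      q.push_back(b);
      // Unlink a known-linked element without searching the list.
      q.erase(item_list_t::s_iterator_to(a));
      assert(q.front().id == 2);
      q.clear();            // unlink remaining hooks before destruction
    }
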
- class DeferredWQ : public ThreadPool::WorkQueue<TransContext> {
- // We need to order DEFERRED items within each Sequencer. To do that,
- // queue each txc under osr, and queue the osr's here. When we
- // dequeue an txc, requeue the osr if there are more pending, and
- // do it at the end of the list so that the next thread does not
- // get a conflicted txc. Hold an osr mutex while doing the deferred to
- // preserve the ordering.
- public:
- typedef boost::intrusive::list<
+ typedef boost::intrusive::list<
+ OpSequencer,
+ boost::intrusive::member_hook<
OpSequencer,
- boost::intrusive::member_hook<
- OpSequencer,
- boost::intrusive::list_member_hook<>,
- &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
-
- private:
- BlueStore *store;
- deferred_osr_queue_t deferred_queue;
-
- public:
- DeferredWQ(BlueStore *s, time_t ti, time_t sti, ThreadPool *tp)
- : ThreadPool::WorkQueue<TransContext>("BlueStore::DeferredWQ", ti, sti,
- tp),
- store(s) {
- }
- bool _empty() {
- return deferred_queue.empty();
- }
- bool _enqueue(TransContext *i) {
- if (i->osr->deferred_q.empty()) {
- deferred_queue.push_back(*i->osr);
- }
- i->osr->deferred_q.push_back(*i);
- return true;
- }
- void _dequeue(TransContext *p) {
- assert(0 == "not needed, not implemented");
- }
- TransContext *_dequeue() {
- if (deferred_queue.empty())
- return NULL;
- OpSequencer *osr = &deferred_queue.front();
- TransContext *i = &osr->deferred_q.front();
- osr->deferred_q.pop_front();
- deferred_queue.pop_front();
- if (!osr->deferred_q.empty()) {
- // requeue at the end to minimize contention
- deferred_queue.push_back(*i->osr);
- }
-
- // preserve deferred ordering for this sequencer by taking the lock
- // while still holding the queue lock
- i->osr->deferred_apply_mutex.lock();
- return i;
- }
- void _process(TransContext *i, ThreadPool::TPHandle &) override {
- store->_deferred_apply(i);
- i->osr->deferred_apply_mutex.unlock();
- }
- void _clear() {
- assert(deferred_queue.empty());
- }
-
- void flush() {
- drain();
- }
- };
+ boost::intrusive::list_member_hook<>,
+ &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
struct KVSyncThread : public Thread {
BlueStore *store;
std::mutex deferred_lock;
std::atomic<uint64_t> deferred_seq = {0};
- ThreadPool deferred_tp;
- DeferredWQ deferred_wq;
+  deferred_osr_queue_t deferred_queue; ///< osrs with deferred io pending
int m_finisher_num;
vector<Finisher*> finishers;
uint64_t max_alloc_size = 0; ///< maximum allocation unit (power of 2)
- bool sync_deferred_apply; ///< see config option bluestore_sync_deferred_apply
-
std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
CompressorRef compressor;
std::atomic<uint64_t> comp_min_blob_size = {0};
}
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
- int _deferred_apply(TransContext *txc);
+ void _deferred_queue(TransContext *txc);
+ void _deferred_try_submit();
+ void _deferred_try_submit(OpSequencer *osr);
int _deferred_finish(TransContext *txc);
int _do_deferred_op(TransContext *txc, bluestore_deferred_op_t& wo);
int _deferred_replay();
}
}
+TEST_P(StoreTest, MultiSmallWriteSameBlock) {
+ ObjectStore::Sequencer osr("test");
+ int r;
+ coll_t cid;
+ ghobject_t a(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+ {
+ ObjectStore::Transaction t;
+ t.create_collection(cid, 0);
+ cerr << "Creating collection " << cid << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+ bufferlist bl;
+ bl.append("short");
+ C_SaferCond c, d;
+  // touch the same blocks within one transaction, across the tls pair, and in pipelined txns
+ {
+ ObjectStore::Transaction t, u;
+ t.write(cid, a, 0, 5, bl, 0);
+ t.write(cid, a, 5, 5, bl, 0);
+ t.write(cid, a, 4094, 5, bl, 0);
+ t.write(cid, a, 9000, 5, bl, 0);
+ u.write(cid, a, 10, 5, bl, 0);
+ u.write(cid, a, 7000, 5, bl, 0);
+ vector<ObjectStore::Transaction> v = {t, u};
+ store->queue_transactions(&osr, v, nullptr, &c);
+ }
+ {
+ ObjectStore::Transaction t, u;
+ t.write(cid, a, 40, 5, bl, 0);
+ t.write(cid, a, 45, 5, bl, 0);
+ t.write(cid, a, 4094, 5, bl, 0);
+ t.write(cid, a, 6000, 5, bl, 0);
+ u.write(cid, a, 610, 5, bl, 0);
+ u.write(cid, a, 11000, 5, bl, 0);
+ vector<ObjectStore::Transaction> v = {t, u};
+ store->queue_transactions(&osr, v, nullptr, &d);
+ }
+ c.wait();
+ d.wait();
+ {
+ bufferlist bl2;
+ r = store->read(cid, a, 0, 16000, bl2);
+ ASSERT_GE(r, 0);
+ }
+ {
+ ObjectStore::Transaction t;
+ t.remove(cid, a);
+ t.remove_collection(cid);
+ cerr << "Cleaning" << std::endl;
+ r = apply_transaction(store, &osr, std::move(t));
+ ASSERT_EQ(r, 0);
+ }
+}
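
This case targets the batching path directly: as long as the 5-byte writes fall under bluestore_prefer_deferred_size they become deferred ops, t and u land in one deferred batch per queue_transactions call, and the second, still-pipelined submission has to park in deferred_pending behind the running batch, so repeated offsets like 4094 exercise the deferred_blocks overlap logic. The trailing read only asserts success. Assuming the usual store_test harness, it should be runnable on its own via the objectstore gtest binary with a --gtest_filter matching MultiSmallWriteSameBlock.
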
+
TEST_P(StoreTest, SmallSkipFront) {
ObjectStore::Sequencer osr("test");
int r;