return false;
}
- // =======================================================
-
+// =======================================================
+
+// DeferredBatch
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.DeferredBatch(" << this << ") "
+
+// Stage one deferred write of [offset, offset+length) for txn 'seq' into
+// this batch.  Io already queued in the batch that overlaps this range is
+// superseded: _discard() trims/drops it first, so iomap always holds
+// disjoint extents whose payload is the most recently prepared data.
+void BlueStore::DeferredBatch::prepare_write(
+ CephContext *cct,
+ uint64_t seq, uint64_t offset, uint64_t length,
+ bufferlist::const_iterator& blp)
+{
+ _discard(cct, offset, length);
+ auto i = iomap.insert(make_pair(offset, deferred_io()));
+ assert(i.second); // this should be a new insertion
+ i.first->second.seq = seq;
+ // consume 'length' bytes from the caller's iterator into our own copy
+ blp.copy(length, i.first->second.bl);
+ dout(20) << __func__ << " seq " << seq
+ << " 0x" << std::hex << offset << "~" << length
+ << " crc " << i.first->second.bl.crc32c(-1)
+ << std::dec << dendl;
+ // per-seq byte accounting; _discard already debited any bytes of older
+ // seqs that this write overwrote (invariant checked by _audit)
+ seq_bytes[seq] += length;
+#ifdef DEBUG_DEFERRED
+ _audit(cct);
+#endif
+}
+
+// Remove any io queued in this batch that overlaps [offset, offset+length).
+// Partially-overlapped extents are trimmed (a head is kept in place, a tail
+// is re-inserted keyed at offset+length); fully-covered extents are erased.
+// seq_bytes is debited by exactly the number of bytes discarded so the
+// per-seq accounting stays consistent (checked by _audit).
+void BlueStore::DeferredBatch::_discard(
+ CephContext *cct, uint64_t offset, uint64_t length)
+{
+ generic_dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
+ << std::dec << dendl;
+ auto p = iomap.lower_bound(offset);
+ if (p != iomap.begin()) {
+ --p;
+ auto end = p->first + p->second.bl.length();
+ if (end > offset) {
+ // the predecessor extent straddles 'offset': keep its head
+ bufferlist head;
+ head.substr_of(p->second.bl, 0, offset - p->first);
+ dout(20) << __func__ << " keep head " << p->second.seq
+ << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+ << " -> 0x" << head.length() << std::dec << dendl;
+ auto i = seq_bytes.find(p->second.seq);
+ if (end > offset + length) {
+ // it also extends past the discard range: keep the tail too,
+ // re-keyed at offset+length
+ bufferlist tail;
+ tail.substr_of(p->second.bl, offset + length - p->first,
+ end - (offset + length));
+ dout(20) << __func__ << " keep tail " << p->second.seq
+ << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+ << " -> 0x" << tail.length() << std::dec << dendl;
+ auto &n = iomap[offset + length];
+ n.bl.swap(tail);
+ n.seq = p->second.seq;
+ i->second -= length; // exactly the middle chunk was dropped
+ } else {
+ i->second -= end - offset; // dropped everything from 'offset' on
+ }
+ p->second.bl.swap(head);
+ }
+ ++p;
+ }
+ while (p != iomap.end()) {
+ if (p->first >= offset + length) {
+ break;
+ }
+ auto i = seq_bytes.find(p->second.seq);
+ auto end = p->first + p->second.bl.length();
+ if (end > offset + length) {
+ // extent extends past the discard range: keep its tail, re-keyed at
+ // offset+length (the next loop iteration sees it and breaks)
+ unsigned drop_front = offset + length - p->first;
+ unsigned keep_tail = end - (offset + length);
+ dout(20) << __func__ << " truncate front " << p->second.seq
+ << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+ << " drop_front 0x" << drop_front << " keep_tail 0x" << keep_tail
+ << " to 0x" << (offset + length) << "~" << keep_tail
+ << std::dec << dendl;
+ auto &s = iomap[offset + length];
+ s.seq = p->second.seq;
+ s.bl.substr_of(p->second.bl, drop_front, keep_tail);
+ i->second -= drop_front;
+ } else {
+ // extent fully covered by the discard range
+ dout(20) << __func__ << " drop " << p->second.seq
+ << " 0x" << std::hex << p->first << "~" << p->second.bl.length()
+ << std::dec << dendl;
+ i->second -= p->second.bl.length();
+ }
+ p = iomap.erase(p);
+ }
+}
+
+// DEBUG_DEFERRED-only invariant check: iomap extents must be sorted and
+// non-overlapping, and the bytes they carry per seq must match seq_bytes
+// exactly (including seqs whose remaining byte count has dropped to 0).
+void BlueStore::DeferredBatch::_audit(CephContext *cct)
+{
+ map<uint64_t,int> sb;
+ for (auto p : seq_bytes) {
+ sb[p.first] = 0; // make sure we have the same set of keys
+ }
+ uint64_t pos = 0;
+ for (auto& p : iomap) {
+ assert(p.first >= pos); // no overlap with the previous extent
+ sb[p.second.seq] += p.second.bl.length();
+ pos = p.first + p.second.bl.length();
+ }
+ assert(sb == seq_bytes);
+}
+
+
// Collection
#undef dout_prefix
static void aio_cb(void *priv, void *priv2)
{
BlueStore *store = static_cast<BlueStore*>(priv);
- store->_txc_aio_finish(priv2);
+ // priv2's low bit tags the completion type (set in DeferredBatch's ctor):
+ // set -> an OpSequencer* for a deferred-write batch (mask the bit off
+ // before use); clear -> a plain TransContext*.
+ if ((unsigned long long)priv2 & 0x1ull) {
+ store->deferred_aio_finish((void*)((unsigned long long)priv2 & ~1ull));
+ } else {
+ store->txc_aio_finish(priv2);
+ }
}
BlueStore::BlueStore(CephContext *cct, const string& path)
txc->state = TransContext::STATE_FINISHING;
break;
- case TransContext::STATE_DEFERRED_AIO_WAIT:
- txc->log_state_latency(logger, l_bluestore_state_deferred_aio_wait_lat);
- _deferred_finish(txc);
- return;
-
case TransContext::STATE_DEFERRED_CLEANUP:
txc->log_state_latency(logger, l_bluestore_state_deferred_cleanup_lat);
txc->state = TransContext::STATE_FINISHING;
{
// submit anything pending
std::lock_guard<std::mutex> l(deferred_lock);
- if (!osr->deferred_pending.empty()) {
- _deferred_try_submit(osr);
+ if (osr->deferred_pending) {
+ _deferred_submit(osr);
}
}
{
dout(20) << __func__ << " wake" << dendl;
} else {
deque<TransContext*> kv_submitting;
- deque<TransContext*> deferred_done, deferred_stable;
+ deque<DeferredBatch*> deferred_done, deferred_stable;
dout(20) << __func__ << " committing " << kv_queue.size()
<< " submitting " << kv_queue_unsubmitted.size()
<< " deferred done " << deferred_done_queue.size()
if (num_aios) {
force_flush = true;
} else if (kv_committing.empty() && kv_submitting.empty() &&
- deferred_stable.empty()) {
+ deferred_stable.empty()) {
force_flush = true; // there's nothing else to commit!
} else if (deferred_aggressive) {
force_flush = true;
}
// cleanup sync deferred keys
- for (auto txc : deferred_stable) {
- bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
- if (!wt.released.empty()) {
- // kraken replay compat only
- txc->released = wt.released;
- dout(10) << __func__ << " deferred txn has released " << txc->released
- << " (we just upgraded from kraken) on " << txc << dendl;
- _txc_finalize_kv(txc, synct);
+ for (auto b : deferred_stable) {
+ for (auto& txc : b->txcs) {
+ bluestore_deferred_transaction_t& wt = *txc.deferred_txn;
+ if (!wt.released.empty()) {
+ // kraken replay compat only
+ txc.released = wt.released;
+ dout(10) << __func__ << " deferred txn has released "
+ << txc.released
+ << " (we just upgraded from kraken) on " << &txc << dendl;
+ _txc_finalize_kv(&txc, synct);
+ }
+ // cleanup the deferred
+ string key;
+ get_deferred_key(wt.seq, &key);
+ synct->rm_single_key(PREFIX_DEFERRED, key);
}
- // cleanup the deferred
- string key;
- get_deferred_key(wt.seq, &key);
- synct->rm_single_key(PREFIX_DEFERRED, key);
}
// submit synct synchronously (block and wait for it to commit)
_txc_state_proc(txc);
kv_committing.pop_front();
}
- while (!deferred_stable.empty()) {
- TransContext *txc = deferred_stable.front();
- _txc_state_proc(txc);
- deferred_stable.pop_front();
+ for (auto b : deferred_stable) {
+ auto p = b->txcs.begin();
+ while (p != b->txcs.end()) {
+ TransContext *txc = &*p;
+ p = b->txcs.erase(p); // unlink here because
+ _txc_state_proc(txc); // this may destroy txc
+ }
+ delete b;
}
if (!deferred_aggressive) {
void BlueStore::_deferred_queue(TransContext *txc)
{
- dout(20) << __func__ << " txc " << txc << " on " << txc->osr << dendl;
+ dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
std::lock_guard<std::mutex> l(deferred_lock);
- if (txc->osr->deferred_pending.empty() &&
- txc->osr->deferred_running.empty()) {
+ if (!txc->osr->deferred_pending &&
+ !txc->osr->deferred_running) {
deferred_queue.push_back(*txc->osr);
}
- txc->osr->deferred_pending.push_back(*txc);
+ if (!txc->osr->deferred_pending) {
+ txc->osr->deferred_pending = new DeferredBatch(cct, txc->osr.get());
+ }
++deferred_queue_size;
+ txc->osr->deferred_pending->txcs.push_back(*txc);
+ bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
+ for (auto opi = wt.ops.begin(); opi != wt.ops.end(); ++opi) {
+ const auto& op = *opi;
+ assert(op.op == bluestore_deferred_op_t::OP_WRITE);
+ bufferlist::const_iterator p = op.data.begin();
+ for (auto e : op.extents) {
+ txc->osr->deferred_pending->prepare_write(
+ cct, wt.seq, e.offset, e.length, p);
+ }
+ }
if (deferred_aggressive &&
- txc->osr->deferred_running.empty()) {
- _deferred_try_submit(txc->osr.get());
+ !txc->osr->deferred_running) {
+ _deferred_submit(txc->osr.get());
}
}
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
for (auto& osr : deferred_queue) {
- if (osr.deferred_running.empty()) {
- _deferred_try_submit(&osr);
+ if (!osr.deferred_running) {
+ _deferred_submit(&osr);
}
}
}
-void BlueStore::_deferred_try_submit(OpSequencer *osr)
+// Submit osr's pending batch: move deferred_pending to deferred_running and
+// issue the batch's io.  Every call site in this patch holds deferred_lock
+// when calling in.
+void BlueStore::_deferred_submit(OpSequencer *osr)
{
- dout(10) << __func__ << " osr " << osr << " " << osr->deferred_pending.size()
- << " pending " << dendl;
- assert(!osr->deferred_pending.empty());
- assert(osr->deferred_running.empty());
+ dout(10) << __func__ << " osr " << osr
+ << " " << osr->deferred_pending->iomap.size() << " ios pending "
+ << dendl;
+ assert(osr->deferred_pending);
+ assert(!osr->deferred_running);
- deferred_queue_size -= osr->deferred_pending.size();
+ auto b = osr->deferred_pending;
+ // deferred_queue_size counts deferred txn seqs, one per seq_bytes entry
+ deferred_queue_size -= b->seq_bytes.size();
assert(deferred_queue_size >= 0);
- osr->deferred_running.swap(osr->deferred_pending);
- // attach all IO to the last in the batch
- TransContext *last = &osr->deferred_running.back();
+ osr->deferred_running = osr->deferred_pending;
+ osr->deferred_pending = nullptr;
- // reverse order
- for (auto i = osr->deferred_running.rbegin();
- i != osr->deferred_running.rend();
- ++i) {
- TransContext *txc = &*i;
- bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
- dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
- txc->log_state_latency(logger, l_bluestore_state_deferred_queued_lat);
- txc->state = TransContext::STATE_DEFERRED_AIO_WAIT;
- for (auto opi = wt.ops.rbegin(); opi != wt.ops.rend(); ++opi) {
- assert(opi->op == bluestore_deferred_op_t::OP_WRITE);
- const auto& op = *opi;
- uint64_t bl_offset = 0;
- for (auto e : op.extents) {
- interval_set<uint64_t> cur;
- cur.insert(e.offset, e.length);
- interval_set<uint64_t> overlap;
- overlap.intersection_of(cur, osr->deferred_blocks);
- cur.subtract(overlap);
- dout(20) << __func__ << " txc " << txc << " " << e << std::hex
- << " overlap 0x" << overlap << " new 0x" << cur
- << " from bl_offset 0x" << bl_offset << std::dec << dendl;
- for (auto j = cur.begin(); j != cur.end(); ++j) {
- bufferlist bl;
- bl.substr_of(op.data, bl_offset + j.get_start() - e.offset,
- j.get_len());
- if (!g_conf->bluestore_debug_omit_block_device_write) {
- logger->inc(l_bluestore_deferred_write_ops);
- logger->inc(l_bluestore_deferred_write_bytes, bl.length());
- int r = bdev->aio_write(j.get_start(), bl, &last->ioc, false);
- assert(r == 0);
- }
- txc->osr->deferred_blocks.insert(j.get_start(), j.get_len());
+ // Queue one aio_write per contiguous run: walk the (sorted, disjoint)
+ // iomap appending adjacent extents into 'bl'; flush a write whenever a
+ // gap is found (i->first != pos) or the map is exhausted.
+ uint64_t start = 0, pos = 0;
+ bufferlist bl;
+ auto i = b->iomap.begin();
+ while (true) {
+ if (i == b->iomap.end() || i->first != pos) {
+ if (bl.length()) {
+ dout(20) << __func__ << " write 0x" << std::hex
+ << start << "~" << bl.length()
+ << " crc " << bl.crc32c(-1) << std::dec << dendl;
+ if (!g_conf->bluestore_debug_omit_block_device_write) {
+ logger->inc(l_bluestore_deferred_write_ops);
+ logger->inc(l_bluestore_deferred_write_bytes, bl.length());
+ int r = bdev->aio_write(start, bl, &b->ioc, false);
+ assert(r == 0);
}
- bl_offset += e.length;
}
+ if (i == b->iomap.end()) {
+ break;
+ }
+ // begin a new run at this extent
+ start = 0;
+ pos = i->first;
+ bl.clear();
+ }
+ dout(20) << __func__ << " seq " << i->second.seq << " 0x"
+ << std::hex << pos << "~" << i->second.bl.length() << std::dec
+ << dendl;
+ if (!bl.length()) {
+ start = pos;
}
+ pos += i->second.bl.length();
+ bl.claim_append(i->second.bl);
+ ++i;
}
- osr->deferred_txc = last;
- dout(20) << __func__ << " osr " << osr << " deferred_blocks 0x" << std::hex
- << osr->deferred_blocks << std::dec << dendl;
- _txc_aio_submit(last);
+ // completion arrives via aio_cb -> deferred_aio_finish (b->ioc's priv
+ // carries the osr with its low bit set; see DeferredBatch's ctor)
+ bdev->aio_submit(&b->ioc);
}
-int BlueStore::_deferred_finish(TransContext *txc)
+// Completion for a batch's aios (dispatched from aio_cb via the tagged ioc
+// priv pointer).  Marks every txc in the batch DEFERRED_CLEANUP, hands the
+// batch to the kv thread via deferred_done_queue, detaches it from the osr,
+// and may immediately submit the next pending batch in aggressive mode.
+void BlueStore::_deferred_aio_finish(OpSequencer *osr)
{
- bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
- dout(20) << __func__ << " txc " << txc << " seq " << wt.seq << dendl;
+ dout(10) << __func__ << " osr " << osr << dendl;
+ assert(osr->deferred_running);
+
+ DeferredBatch *b = osr->deferred_running;
+ {
+ // qlock then kv_lock, taken before (not nested with) deferred_lock below
+ std::lock_guard<std::mutex> l2(osr->qlock);
+ std::lock_guard<std::mutex> l(kv_lock);
+ for (auto& i : b->txcs) {
+ TransContext *txc = &i;
+ txc->state = TransContext::STATE_DEFERRED_CLEANUP;
+ txc->osr->qcond.notify_all();
+ // release throttle now that the deferred io is durable on disk
+ throttle_deferred_bytes.put(txc->cost);
+ }
+ // kv thread deletes b once the batch's kv cleanup is stable
+ deferred_done_queue.emplace_back(b);
+ }
- OpSequencer::deferred_queue_t finished;
{
std::lock_guard<std::mutex> l(deferred_lock);
- assert(txc->osr->deferred_txc == txc);
- txc->osr->deferred_blocks.clear();
- finished.swap(txc->osr->deferred_running);
- if (txc->osr->deferred_pending.empty()) {
- auto q = deferred_queue.iterator_to(*txc->osr);
+ assert(osr->deferred_running == b);
+ osr->deferred_running = nullptr;
+ if (!osr->deferred_pending) {
+ // nothing queued behind us; drop the osr from the deferred queue
+ auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
} else if (deferred_aggressive) {
- _deferred_try_submit(txc->osr.get());
+ _deferred_submit(osr);
}
}
- std::lock_guard<std::mutex> l2(txc->osr->qlock);
- std::lock_guard<std::mutex> l(kv_lock);
- for (auto& i : finished) {
- TransContext *txc = &i;
- txc->state = TransContext::STATE_DEFERRED_CLEANUP;
- txc->osr->qcond.notify_all();
- throttle_deferred_bytes.put(txc->cost);
- deferred_done_queue.push_back(txc);
- }
- finished.clear();
-
// in the normal case, do not bother waking up the kv thread; it will
// catch us on the next commit anyway.
if (deferred_aggressive) {
+ std::lock_guard<std::mutex> l(kv_lock);
kv_cond.notify_one();
}
- return 0;
}
int BlueStore::_deferred_replay()
class BlueFS;
//#define DEBUG_CACHE
+//#define DEBUG_DEFERRED
enum {
l_bluestore_first = 732430,
STATE_KV_QUEUED, // queued for kv_sync_thread submission
STATE_KV_SUBMITTED, // submitted to kv; not yet synced
STATE_KV_DONE,
- STATE_DEFERRED_QUEUED, // in deferred_queue
- STATE_DEFERRED_AIO_WAIT, // aio in flight, waiting for completion
+ STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
STATE_DEFERRED_CLEANUP, // remove deferred kv record
STATE_DEFERRED_DONE,
STATE_FINISHING,
case STATE_KV_SUBMITTED: return "kv_submitted";
case STATE_KV_DONE: return "kv_done";
case STATE_DEFERRED_QUEUED: return "deferred_queued";
- case STATE_DEFERRED_AIO_WAIT: return "deferred_aio_wait";
case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
case STATE_DEFERRED_DONE: return "deferred_done";
case STATE_FINISHING: return "finishing";
case l_bluestore_state_kv_committing_lat: return "kv_committing";
case l_bluestore_state_kv_done_lat: return "kv_done";
case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
- case l_bluestore_state_deferred_aio_wait_lat: return "deferred_aio_wait";
case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
case l_bluestore_state_finishing_lat: return "finishing";
case l_bluestore_state_done_lat: return "done";
}
};
+ typedef boost::intrusive::list<
+ TransContext,
+ boost::intrusive::member_hook<
+ TransContext,
+ boost::intrusive::list_member_hook<>,
+ &TransContext::deferred_queue_item> > deferred_queue_t;
+
+ /// A batch of deferred writes for one OpSequencer, submitted as a unit.
+ /// Writes that overlap earlier io in the same batch supersede it (see
+ /// prepare_write/_discard), so iomap always holds disjoint extents keyed
+ /// by disk offset.
+ struct DeferredBatch {
+ struct deferred_io {
+ bufferlist bl; ///< data
+ uint64_t seq; ///< deferred transaction seq
+ };
+ map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
+ deferred_queue_t txcs; ///< txcs in this batch
+ IOContext ioc; ///< our aios
+ /// bytes of pending io for each deferred seq (may be 0)
+ map<uint64_t,int> seq_bytes;
+
+ void _discard(CephContext *cct, uint64_t offset, uint64_t length);
+ void _audit(CephContext *cct);
+
+ /// ioc's priv is the owning osr with its low bit set so aio_cb can
+ /// tell deferred completions apart from plain txc completions
+ DeferredBatch(CephContext *cct, OpSequencer *osr)
+ : ioc(cct, (void*)((unsigned long long)osr | 1ull)) {}
+
+ /// prepare a write
+ void prepare_write(CephContext *cct,
+ uint64_t seq, uint64_t offset, uint64_t length,
+ bufferlist::const_iterator& p);
+ };
+
class OpSequencer : public Sequencer_impl {
public:
std::mutex qlock;
&TransContext::sequencer_item> > q_list_t;
q_list_t q; ///< transactions
- typedef boost::intrusive::list<
- TransContext,
- boost::intrusive::member_hook<
- TransContext,
- boost::intrusive::list_member_hook<>,
- &TransContext::deferred_queue_item> > deferred_queue_t;
- deferred_queue_t deferred_pending; ///< waiting
- deferred_queue_t deferred_running; ///< in flight ios
- interval_set<uint64_t> deferred_blocks; ///< blocks in flight
- TransContext *deferred_txc; ///< txc carrying this batch
-
boost::intrusive::list_member_hook<> deferred_osr_queue_item;
+ DeferredBatch *deferred_running = nullptr;
+ DeferredBatch *deferred_pending = nullptr;
+
Sequencer *parent;
BlueStore *store;
deque<TransContext*> kv_queue; ///< ready, already submitted
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
- deque<TransContext*> deferred_done_queue; ///< deferred ios done
- deque<TransContext*> deferred_stable_queue; ///< deferred ios done + stable
+ deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
+ deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
PerfCounters *logger = nullptr;
void _txc_state_proc(TransContext *txc);
void _txc_aio_submit(TransContext *txc);
public:
- void _txc_aio_finish(void *p) {
+ void txc_aio_finish(void *p) {
_txc_state_proc(static_cast<TransContext*>(p));
}
private:
}
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
+public:
+ void deferred_aio_finish(void *priv) {
+ _deferred_aio_finish(static_cast<OpSequencer*>(priv));
+ }
+private:
void _deferred_queue(TransContext *txc);
void deferred_try_submit() {
std::lock_guard<std::mutex> l(deferred_lock);
_deferred_try_submit();
}
void _deferred_try_submit();
- void _deferred_try_submit(OpSequencer *osr);
- int _deferred_finish(TransContext *txc);
+ void _deferred_submit(OpSequencer *osr);
+ void _deferred_aio_finish(OpSequencer *osr);
int _deferred_replay();
public: