TransContext *txc = new TransContext(osr);
txc->t = db->get_transaction();
osr->queue_new(txc);
- dout(20) << __func__ << " osr " << osr << " = " << txc << dendl;
+ dout(20) << __func__ << " osr " << osr << " = " << txc
+ << " seq " << txc->seq << dendl;
return txc;
}
//assert(txc->osr->qlock.is_locked()); // see _txc_finish_io
txc->log_state_latency(logger, l_bluestore_state_io_done_lat);
txc->state = TransContext::STATE_KV_QUEUED;
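+ // the data IO for this txc has completed; mark the buffers it wrote clean
+ for (auto& o : txc->onodes) {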
+ o->bc.finish_write(txc->seq);
+ }
if (!g_conf->bluestore_sync_transaction) {
if (g_conf->bluestore_sync_submit_transaction) {
_txc_finalize_kv(txc, txc->t);
dout(log_level) << __func__ << " overlay_refs " << o->onode.overlay_refs
<< dendl;
}
+ if (!o->bc.empty()) {
+ dout(log_level) << __func__ << " buffer_cache size 0x" << std::hex
+ << o->bc.size << std::dec << dendl;
+ for (auto& i : o->bc.buffer_map) {
+ dout(log_level) << __func__ << " 0x" << std::hex << i.first << "~0x"
+ << i.second->length << std::dec
+ << " seq " << i.second->seq
+ << " " << Buffer::get_state_name(i.second->state)
+ << dendl;
+ }
+ }
if (o->tail_bl.length()) {
dout(log_level) << __func__ << " tail offset 0x" << std::hex << o->tail_offset
<< " len 0x" << o->tail_bl.length() << std::dec
wctx.buffered = true;
}
+ // write in buffer cache
+ o->bc.write(txc->seq, offset, bl);
+
bufferlist::iterator p = bl.begin();
if (offset / min_alloc_size == (end - 1) / min_alloc_size &&
(length != min_alloc_size)) {
o->clear_tail();
}
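+ // invalidate any cached buffers over the range being zeroed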
+ o->bc.discard(offset, length);
+
WriteContext wctx;
o->onode.punch_hole(offset, length, &wctx.lex_old);
_wctx_finish(txc, c, o, &wctx);
// they may touch.
o->flush();
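+ // drop cached buffers at and beyond the new object size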
+ o->bc.truncate(offset);
+
WriteContext wctx;
o->onode.punch_hole(offset, o->onode.size, &wctx.lex_old);
_wctx_finish(txc, c, o, &wctx);
class TransContext;
+ /// cached buffer
+ struct Buffer {
+ enum {
+ STATE_UNDEF = 0,
+ STATE_CLEAN,
+ STATE_WRITING,
+ STATE_READING,
+ };
+ static const char *get_state_name(int s) {
+ switch (s) {
+ case STATE_UNDEF: return "undef";
+ case STATE_CLEAN: return "clean";
+ case STATE_WRITING: return "writing";
+ case STATE_READING: return "reading";
+ default: return "???";
+ }
+ }
+
+ unsigned state; ///< STATE_*
+ uint64_t seq;
+ uint64_t offset, length;
+ bufferlist data;
+
+ boost::intrusive::list_member_hook<> onode_lru_item;
+
+ Buffer(unsigned s, uint64_t q, uint64_t o, uint64_t l)
+ : state(s), seq(q), offset(o), length(l) {}
+ Buffer(unsigned s, uint64_t q, uint64_t o, bufferlist& b)
+ : state(s), seq(q), offset(o), length(b.length()), data(b) {}
+
+ bool is_clean() const {
+ return state == STATE_CLEAN;
+ }
+ bool is_writing() const {
+ return state == STATE_WRITING;
+ }
+ bool is_reading() const {
+ return state == STATE_READING;
+ }
+
+ uint64_t end() const {
+ return offset + length;
+ }
+
+ void truncate(uint64_t newlen) {
+ assert(newlen < length);  // only ever used to shrink a buffer
+ if (data.length()) {
+ bufferlist t;
+ t.substr_of(data, 0, newlen);
+ data.claim(t);
+ }
+ length = newlen;
+ }
+
+ void dump(Formatter *f) const {
+ f->dump_string("state", get_state_name(state));
+ f->dump_unsigned("seq", seq);
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+ f->dump_unsigned("data_length", data.length());
+ }
+ };
+
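+ /// all cached buffers for one onode, indexed by offset and kept in LRU order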
+ struct BufferSpace {
+ typedef boost::intrusive::list<
+ Buffer,
+ boost::intrusive::member_hook<
+ Buffer,
+ boost::intrusive::list_member_hook<>,
+ &Buffer::onode_lru_item> > lru_list_t;
+
+ map<uint64_t,std::unique_ptr<Buffer>> buffer_map;
+ lru_list_t lru;
+ uint64_t size = 0;
+
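+ /// take ownership of a new buffer: index it by offset, put it at the head
+ /// of the lru, and add its length to 'size'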
+ void _add_buffer(Buffer *b) {
+ buffer_map[b->offset].reset(b);
+ lru.push_front(*b);
+ size += b->length;
+ }
+ void _rm_buffer(map<uint64_t,std::unique_ptr<Buffer>>::iterator p) {
+ size -= p->second->length;
+ lru.erase(lru.iterator_to(*p->second));
+ buffer_map.erase(p);
+ }
+
+ /// move to top of lru
+ void _touch_buffer(Buffer *b) {
+ lru_list_t::iterator p = lru.iterator_to(*b);
+ lru.erase(p);
+ lru.push_front(*b);
+ }
+
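+ /// first buffer that might overlap 'offset' (it may begin before it)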
+ map<uint64_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
+ uint64_t offset) {
+ auto i = buffer_map.lower_bound(offset);
+ if (i != buffer_map.begin()) {
+ --i;
+ if (i->first + i->second->length <= offset)
+ ++i;
+ }
+ return i;
+ }
+
+ bool empty() const {
+ return buffer_map.empty();
+ }
+
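+ /// invalidate [offset, offset+length), trimming or splitting any
+ /// overlapping buffers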
+ void discard(uint64_t offset, uint64_t length) {
+ auto i = _data_lower_bound(offset);
+ uint64_t end = offset + length;
+ while (i != buffer_map.end()) {
+ Buffer *b = i->second.get();
+ if (b->offset >= offset + length) {
+ break;
+ }
+ if (b->offset < offset) {
+ uint64_t front = offset - b->offset;
+ if (b->offset + b->length > offset + length) {
+ // drop middle (split)
+ uint64_t tail = b->offset + b->length - (offset + length);
+ if (b->data.length()) {
+ bufferlist bl;
+ bl.substr_of(b->data, b->length - tail, tail);
+ _add_buffer(new Buffer(b->state, b->seq, end, bl));
+ } else {
+ _add_buffer(new Buffer(b->state, b->seq, end, tail));
+ }
+ // b shrinks to 'front' below; truncate() does not adjust 'size' itself
+ size -= b->length - front;
+ b->truncate(front);
+ return;
+ } else {
+ // drop tail
+ size -= b->length - front;
+ b->truncate(front);
+ ++i;
+ continue;
+ }
+ }
+ if (b->end() <= end) {
+ // drop entire buffer
+ _rm_buffer(i++);
+ continue;
+ }
+ // drop front: replace b with a new buffer holding only the kept tail;
+ // _rm_buffer/_add_buffer below keep 'size' consistent
+ uint64_t keep = b->end() - end;
+ if (b->data.length()) {
+ bufferlist bl;
+ bl.substr_of(b->data, b->length - keep, keep);
+ _add_buffer(new Buffer(b->state, b->seq, end, bl));
+ _rm_buffer(i);
+ } else {
+ _add_buffer(new Buffer(b->state, b->seq, end, keep));
+ _rm_buffer(i);
+ }
+ return;
+ }
+ }
+
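+ /// cache an in-flight write; the buffer stays in STATE_WRITING until
+ /// finish_write(seq) is called for its transaction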
+ void write(uint64_t seq, uint64_t offset, bufferlist& bl) {
+ discard(offset, bl.length());
+ _add_buffer(new Buffer(Buffer::STATE_WRITING, seq, offset, bl));
+ }
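+ /// writes tagged with sequence <= 'seq' have completed their IO; mark
+ /// those buffers clean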
+ void finish_write(uint64_t seq) {
+ // fixme: be more efficient... intrusive_list just for writing, perhaps?
+ for (auto i = buffer_map.begin(); i != buffer_map.end(); ++i) {
+ if (i->second->is_writing() &&
+ i->second->seq <= seq) {
+ i->second->state = Buffer::STATE_CLEAN;
+ }
+ }
+ }
+
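+ /// drop all cached data at or beyond 'offset'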
+ void truncate(uint64_t offset) {
+ discard(offset, (uint64_t)-1 - offset);
+ }
+
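+ /// evict clean buffers, least recently used first, until the total cached
+ /// size is <= 'keep' bytes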
+ void trim(uint64_t keep) {
+ if (lru.empty())
+ return;
+ // start at the least recently used buffer (back of the lru list)
+ lru_list_t::iterator i = lru.end();
+ --i;
+ while (size > keep) {
+ Buffer *b = &*i;
+ if (b->is_clean()) {
+ auto p = buffer_map.find(b->offset);
+ // step toward the front before erasing so 'i' stays valid
+ bool at_front = (i == lru.begin());
+ if (!at_front)
+ --i;
+ _rm_buffer(p);
+ if (at_front)
+ break;
+ } else {
+ // still writing/reading; skip it and keep scanning toward the front
+ if (i == lru.begin())
+ break;
+ --i;
+ }
+ }
+ }
+
+ void dump(Formatter *f) const {
+ f->dump_unsigned("size", size);
+ f->open_array_section("buffers");
+ for (auto& i : buffer_map) {
+ f->open_object_section("buffer");
+ assert(i.first == i.second->offset);
+ i.second->dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ }
+ };
+
/// an in-memory extent-map, shared by a group of objects (w/ same hash value)
struct BnodeSet;
std::condition_variable flush_cond; ///< wait here for unapplied txns
set<TransContext*> flush_txns; ///< committing or wal txns
+ BufferSpace bc; ///< cached object data
+
uint64_t tail_offset = 0;
uint64_t tail_txc_seq = 0;
bufferlist tail_bl;