#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_real.hpp>
+#include "common/dout.h"
#include "include/cpp-btree/btree_set.h"
#include "BlueStore.h"
dout(LogLevelV) << __func__ << " csum: " << std::hex << v << std::dec
<< dendl;
}
- std::lock_guard l(e.blob->get_cache()->lock);
- for (auto& i : e.blob->get_bc().buffer_map) {
- dout(LogLevelV) << __func__ << " 0x" << std::hex << i.first
- << "~" << i.second.length << std::dec
- << " " << i.second << dendl;
- }
}
}
<< " len " << p->second.length() << dendl;
}
_dump_extent_map<LogLevelV>(cct, o.extent_map);
+
+ for (auto& i : o.bc.buffer_map) {
+ dout(LogLevelV) << __func__ << " 0x" << std::hex << i.first << "~"
+ << i.second.length << std::dec << " " << i.second
+ << dendl;
+ }
}
template <int LogLevelV>
#undef dout_prefix
#define dout_prefix *_dout << "bluestore.BufferSpace(" << this << " in " << cache << ") "
-void BlueStore::BufferSpace::_clear(BufferCacheShard* cache)
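+/*
+  Copy Buffers that are still in the writing queue into the target onode's
+  BufferSpace and record them in txc->buffers_written, so _finish_write()
+  is eventually called for the copies as well.
+  returns:
+    true if something copied
+    false if nothing copied
+*/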
+bool BlueStore::BufferSpace::_dup_writing(TransContext* txc, BufferCacheShard* cache, OnodeRef onode)
{
- // note: we already hold cache->lock
- ldout(cache->cct, 20) << __func__ << dendl;
- while (!buffer_map.empty()) {
- _rm_buffer(cache, buffer_map.begin());
+ bool copied = false;
+ if (!writing.empty()) {
+ copied = true;
+ for (auto it = writing.begin(); it != writing.end(); ++it) {
+ Buffer& b = *it;
+ Buffer to_b(&onode->bc, b.state, b.seq, b.offset, b.data, b.flags);
+ BufferSpace& to = onode->bc;
+ ceph_assert(to_b.is_writing());
+ to._add_buffer(cache, &to, std::move(to_b), b.cache_private, 0, nullptr);
+ txc->buffers_written.insert({onode.get(), b.offset, b.seq});
+ }
}
+ return copied;
}
+
int BlueStore::BufferSpace::_discard(BufferCacheShard* cache, uint32_t offset, uint32_t length)
{
// note: we already hold cache->lock
- ldout(cache->cct, 20) << __func__ << std::hex << " 0x" << offset << "~" << length
- << std::dec << dendl;
+ ldout(cache->cct, 20) << __func__ << std::hex << " 0x" << offset << "~"
+ << length << std::dec << dendl;
int cache_private = 0;
cache->_audit("discard start");
auto i = _data_lower_bound(offset);
if (b->offset >= end) {
break;
}
+ // If this cached buffer does not overlap the range being discarded at all
+ // (the union of the two ranges is longer than their combined lengths),
+ // stop scanning.
+ if (std::max(b->end(), offset + length) - std::min(b->offset, offset) >
+ length + b->length) {
+ break;
+ }
if (b->cache_private > cache_private) {
cache_private = b->cache_private;
}
for (auto i = _data_lower_bound(offset);
i != buffer_map.end() && offset < end && i->first < end; ++i) {
Buffer *b = &i->second;
- ceph_assert(b->end() > offset);
+ if (b->end() <= offset) {
+ break;
+ }
+
+ // If no overlap is found between what we want to read and what we have
+ // cached, then forget about continuing
+ if (std::max(b->end(), offset + length) - std::min(b->offset, offset) >
+ length + b->length) {
+ break;
+ }
bool val = false;
if (flags & BYPASS_CLEAN_CACHE)
cache->logger->inc(l_bluestore_buffer_miss_bytes, miss_bytes);
}
-void BlueStore::BufferSpace::_finish_write(BufferCacheShard* cache, uint64_t seq)
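+// called once per (onode, offset, seq) entry recorded in
+// TransContext::buffers_written, when the transaction reaches STATE_FINISHING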
+void BlueStore::BufferSpace::_finish_write(BufferCacheShard* cache, uint32_t offset, uint64_t seq)
{
auto i = writing.begin();
while (i != writing.end()) {
cache->_audit("finish_write end");
}
-/*
- copy Buffers that are in writing queue
- returns:
- true if something copied
- false if nothing copied
-*/
-bool BlueStore::BufferSpace::_dup_writing(BufferCacheShard* cache, BufferSpace* to)
-{
- bool copied = false;
- if (!writing.empty()) {
- copied = true;
- for (auto it = writing.begin(); it != writing.end(); ++it) {
- Buffer& b = *it;
- ceph_assert(b.is_writing());
- to->_add_buffer(cache, to,
- Buffer(to, b.state, b.seq, b.offset, b.data, b.flags), 0,
- 0, nullptr);
- }
- }
- return copied;
-}
-
void BlueStore::BufferSpace::split(BufferCacheShard* cache, size_t pos, BlueStore::BufferSpace &r)
{
std::lock_guard lk(cache->lock);
if (coll_cache != get_cache()) {
goto again;
}
- bc._clear(coll_cache);
coll_cache->rm_blob();
}
- SharedBlob* sb = shared_blob.get();
- ceph_assert(sb || (!sb && bc.buffer_map.empty()));
}
void BlueStore::Blob::dump(Formatter* f) const
return out;
}
-void BlueStore::Blob::discard_unallocated(Collection *coll)
+void BlueStore::Blob::discard_unallocated(Collection *coll, Onode* onode, uint32_t logical_offset)
{
if (get_blob().is_shared()) {
return;
ceph_assert(discard == all_invalid); // in case of compressed blob all
// or none pextents are invalid.
if (discard) {
- dirty_bc().discard(get_cache(), 0,
+ onode->bc.discard(onode->c->cache, logical_offset,
get_blob().get_logical_length());
}
} else {
size_t pos = 0;
for (auto e : get_blob().get_extents()) {
if (!e.is_valid()) {
- dout(20) << __func__ << " 0x" << std::hex << pos
+ dout(20) << __func__ << " 0x" << std::hex << pos
<< "~" << e.length
- << std::dec << dendl;
- dirty_bc().discard(get_cache(), pos, e.length);
+ << std::dec << dendl;
+ onode->bc.discard(onode->c->cache, logical_offset + pos, e.length);
}
pos += e.length;
}
#undef dout_context
#define dout_context cct
-// Cut Buffers that are not covered by extents.
-// It happens when we punch hole in Blob, but not refill with new data.
-// Normally it is not a problem (other then wasted memory),
-// but when 2 Blobs are merged Buffers might collide.
-// Todo: in future cut Buffers when we delete extents from Blobs,
-// and get rid of this function.
-void BlueStore::Blob::discard_unused_buffers(CephContext* cct, BufferCacheShard* cache)
-{
- dout(25) << __func__ << " input " << *this << " bc=" << bc << dendl;
- const PExtentVector& extents = get_blob().get_extents();
- uint32_t epos = 0;
- auto e = extents.begin();
- while(e != extents.end()) {
- if (!e->is_valid()) {
- bc._discard(cache, epos, e->length);
- }
- epos += e->length;
- ++e;
- }
- ceph_assert(epos <= blob.get_logical_length());
- // Preferably, we would trim up to blob.get_logical_length(),
- // but we copied writing buffers (see _dup_writing) before blob logical_length is fixed.
- bc._discard(cache, epos, OBJECT_MAX_SIZE - epos);
- dout(25) << __func__ << " output bc=" << bc << dendl;
-}
-
void BlueStore::Blob::dup(const Blob& from, bool copy_used_in_blob)
{
set_shared_blob(from.shared_blob);
// now apply freshly merged tmp_extents into dst blob
dst_blob.dirty_extents().swap(tmp_extents);
- // move BufferSpace buffers
- while(!src->bc.buffer_map.empty()) {
- auto buf = src->bc.buffer_map.extract(src->bc.buffer_map.cbegin());
- buf.mapped().space = &dst->bc;
- if (dst->bc.buffer_map.count(buf.key()) == 0) {
- dst->bc.buffer_map.emplace(buf.key(), std::move(buf.mapped()));
- }
- }
- // move BufferSpace writing
- auto wrt_dst_it = dst->bc.writing.begin();
- while(!src->bc.writing.empty()) {
- Buffer& buf = src->bc.writing.front();
- src->bc.writing.pop_front();
- while (wrt_dst_it != dst->bc.writing.end() && wrt_dst_it->seq < buf.seq) {
- ++wrt_dst_it;
- }
- dst->bc.writing.insert(wrt_dst_it, buf);
- }
dout(20) << __func__ << " result=" << *dst << dendl;
return dst_blob.get_logical_length();
}
#undef dout_context
#define dout_context collection->store->cct
-void BlueStore::Blob::finish_write(uint64_t seq)
-{
- while (true) {
- auto coll = get_collection();
- BufferCacheShard *cache = coll->cache;
- std::lock_guard l(cache->lock);
- if (coll->cache != cache) {
- dout(20) << __func__
- << " raced with sb cache update, was " << cache
- << ", now " << coll->cache << ", retrying"
- << dendl;
- continue;
- }
- bc._finish_write(cache, seq);
- break;
- }
-}
-
void BlueStore::Blob::split(Collection *coll, uint32_t blob_offset, Blob *r)
{
dout(10) << __func__ << " 0x" << std::hex << blob_offset << std::dec
&(r->used_in_blob));
lb.split(blob_offset, rb);
- dirty_bc().split(get_cache(), blob_offset, r->dirty_bc());
dout(10) << __func__ << " 0x" << std::hex << blob_offset << std::dec
<< " finish " << *this << dendl;
if (e.logical_offset >= end) {
break;
}
- dout(25) << __func__ << " src " << e
- << " bc=" << e.blob->bc << dendl;
- const bluestore_blob_t& blob = e.blob->get_blob();
+ dout(25) << __func__ << " src " << e << " bc=" << onoderef->bc << dendl;
+ const bluestore_blob_t &blob = e.blob->get_blob();
// make sure it is shared
if (!blob.is_shared()) {
dirty_range_begin = std::min<uint32_t>(dirty_range_begin, e.blob_start());
// first try to find a shared blob nearby
// that can accomodate extra extents
- uint32_t blob_width; //to signal when extents end
- dout(20) << __func__ << std::hex
- << " e.blob_start=" << e.blob_start()
- << " e.logical_offset=" << e.logical_offset
- << std::dec << dendl;
- Blob* b = blob.is_compressed() ? nullptr :
- find_mergable_companion(e.blob.get(), e.blob_start(), blob_width, candidates);
+ uint32_t blob_width; // to signal when extents end
+ dout(20) << __func__ << std::hex << " e.blob_start=" << e.blob_start()
+ << " e.logical_offset=" << e.logical_offset << std::dec << dendl;
+ Blob *b = blob.is_compressed()
+ ? nullptr
+ : find_mergable_companion(e.blob.get(), e.blob_start(),
+ blob_width, candidates);
if (b) {
- dout(20) << __func__ << " merging to: " << *b << " bc=" << b->bc << dendl;
- e.blob->discard_unused_buffers(store->cct, c->cache);
- b->discard_unused_buffers(store->cct, c->cache);
- uint32_t b_logical_length = b->merge_blob(store->cct, e.blob.get());
- for (auto p : blob.get_extents()) {
- if (p.is_valid()) {
- b->get_shared_blob()->get_ref(p.offset, p.length);
- }
- }
- // reblob extents might erase e
- dirty_range_end = std::max<uint32_t>(dirty_range_end, e.blob_start() + b_logical_length);
- uint32_t goto_logical_offset = e.logical_offset + e.length;
- reblob_extents(e.blob_start(), e.blob_start() + blob_width,
+ dout(20) << __func__ << " merging to: " << *b << " bc=" << onode->bc
+ << dendl;
+ uint32_t b_logical_length = b->merge_blob(store->cct, e.blob.get());
+ for (auto p : blob.get_extents()) {
+ if (p.is_valid()) {
+ b->get_shared_blob()->get_ref(p.offset, p.length);
+ }
+ }
+ // reblob extents might erase e
+ dirty_range_end = std::max<uint32_t>(dirty_range_end, e.blob_start() + b_logical_length);
+ uint32_t goto_logical_offset = e.logical_offset + e.length;
+ reblob_extents(e.blob_start(), e.blob_start() + blob_width,
e.blob, b);
- ep = seek_lextent(goto_logical_offset);
- dout(20) << __func__ << " merged: " << *b << dendl;
+ ep = seek_lextent(goto_logical_offset);
+ dout(20) << __func__ << " merged: " << *b << dendl;
} else {
- // no candidate, has to convert to shared
- c->make_blob_shared(store->_assign_blobid(txc), e.blob);
- ceph_assert(e.logical_end() > 0);
- dirty_range_end = std::max<uint32_t>(dirty_range_end, e.logical_end());
- ++ep;
+ // no candidate, has to convert to shared
+ c->make_blob_shared(store->_assign_blobid(txc), e.blob);
+ ceph_assert(e.logical_end() > 0);
+ dirty_range_end = std::max<uint32_t>(dirty_range_end, e.logical_end());
+ ++ep;
}
} else {
c->load_shared_blob(e.blob->get_shared_blob());
e.blob->last_encoded_id = n;
id_to_blob[n] = cb;
e.blob->dup(*cb);
- // By default do not copy buffers to clones, and let them read data by themselves.
- // The exception are 'writing' buffers, which are not yet stable on device.
- bool some_copied = e.blob->bc._dup_writing(cb->get_cache(), &cb->bc);
- if (some_copied) {
- // Pretend we just wrote those buffers;
- // we need to get _finish_write called, so we can clear then from writing list.
- // Otherwise it will be stuck until someone does write-op on clone.
- txc->blobs_written.insert(cb);
- }
// bump the extent refs on the copied blob's extents
for (auto p : blob.get_extents()) {
txc->statfs_delta.compressed_original() += ne->length;
if (blob_duped) {
txc->statfs_delta.compressed() +=
- cb->get_blob().get_compressed_payload_length();
+ cb->get_blob().get_compressed_payload_length();
}
}
dout(20) << __func__ << " dst " << *ne << dendl;
++n;
}
+ // By default do not copy buffers to clones, and let them read data by
+ // themselves. The exception is 'writing' buffers, which are not yet
+ // stable on device.
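+ // _dup_writing also records the copied buffers in txc->buffers_written,
+ // so _finish_write() is still called for them on the clone.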
+ oldo->bc._dup_writing(txc, onode->c->cache, newo);
+
if (src_dirty) {
oldo->extent_map.dirty_range(dirty_range_begin,
dirty_range_end - dirty_range_begin);
cb->dirty_blob().set_flag(bluestore_blob_t::FLAG_SHARED);
cb->set_shared_blob(e.blob->get_shared_blob());
}
- // By default do not copy buffers to clones, and let them read data by themselves.
- // The exception are 'writing' buffers, which are not yet stable on device.
- bool some_copied = e.blob->bc._dup_writing(cb->get_cache(), &cb->bc);
- if (some_copied) {
- // Pretend we just wrote those buffers;
- // we need to get _finish_write called, so we can clear then from writing list.
- // Otherwise it will be stuck until someone does write-op on the clone.
- txc->blobs_written.insert(cb);
- }
txc->write_shared_blob(e.blob->get_shared_blob());
dout(20) << __func__ << " new " << *cb << dendl;
dout(20) << __func__ << " dst " << *ne << dendl;
++n;
}
+ // By default do not copy buffers to clones, and let them read data by
+ // themselves. The exception is 'writing' buffers, which are not yet
+ // stable on device.
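+ // The copies are recorded in txc->buffers_written so that _finish_write()
+ // is called for them once the transaction commits.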
+ oldo->bc._dup_writing(txc, onode->c->cache, newo);
+
if (src_dirty) {
dirty_range(dirty_range_begin, dirty_range_end - dirty_range_begin);
txc->write_onode(oldo);
}
if (e->blob_escapes_range(shard_start, shard_end - shard_start)) {
- if (!e->blob->is_spanning()) {
- // We have two options: (1) split the blob into pieces at the
- // shard boundaries (and adjust extents accordingly), or (2)
- // mark it spanning. We prefer to cut the blob if we can. Note that
- // we may have to split it multiple times--potentially at every
- // shard boundary.
- bool must_span = false;
- BlobRef b = e->blob;
- if (b->can_split()) {
- uint32_t bstart = e->blob_start();
- uint32_t bend = e->blob_end();
- for (const auto& sh : shards) {
- if (bstart < sh.shard_info->offset &&
- bend > sh.shard_info->offset) {
- uint32_t blob_offset = sh.shard_info->offset - bstart;
- if (b->can_split_at(blob_offset)) {
- dout(20) << __func__ << " splitting blob, bstart 0x"
- << std::hex << bstart << " blob_offset 0x"
- << blob_offset << std::dec << " " << *b << dendl;
- b = split_blob(b, blob_offset, sh.shard_info->offset);
- // switch b to the new right-hand side, in case it
- // *also* has to get split.
- bstart += blob_offset;
- onode->c->store->logger->inc(l_bluestore_blob_split);
- } else {
- must_span = true;
- break;
- }
- }
- }
- } else {
- must_span = true;
- }
- if (must_span) {
+ if (!e->blob->is_spanning()) {
+ // We have two options: (1) split the blob into pieces at the
+ // shard boundaries (and adjust extents accordingly), or (2)
+ // mark it spanning. We prefer to cut the blob if we can. Note that
+ // we may have to split it multiple times--potentially at every
+ // shard boundary.
+ bool must_span = false;
+ BlobRef b = e->blob;
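+ // splitting a BufferSpace writing list is too hard (see can_split());
+ // if this onode still has buffers in flight, mark the blob spanning instead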
+ if (onode->bc.writing.empty() && b->can_split()) {
+ uint32_t bstart = e->blob_start();
+ uint32_t bend = e->blob_end();
+ for (const auto& sh : shards) {
+ if (bstart < sh.shard_info->offset &&
+ bend > sh.shard_info->offset) {
+ uint32_t blob_offset = sh.shard_info->offset - bstart;
+ if (b->can_split_at(blob_offset)) {
+ dout(20) << __func__ << " splitting blob, bstart 0x"
+ << std::hex << bstart << " blob_offset 0x"
+ << blob_offset << std::dec << " " << *b << dendl;
+ b = split_blob(b, blob_offset, sh.shard_info->offset);
+ // switch b to the new right-hand side, in case it
+ // *also* has to get split.
+ bstart += blob_offset;
+ onode->c->store->logger->inc(l_bluestore_blob_split);
+ } else {
+ must_span = true;
+ break;
+ }
+ }
+ }
+ } else {
+ must_span = true;
+ }
+ if (must_span) {
auto bid = allocate_spanning_blob_id();
b->id = bid;
spanning_blob_map[b->id] = b;
// may not be faulted in)
auto rehome_blob = [&](Blob* b) {
- for (auto& i : b->bc.buffer_map) {
- if (!i.second.is_writing()) {
- ldout(store->cct, 1) << __func__ << " moving " << i.second
- << dendl;
- dest->cache->_move(cache, &i.second);
- } else {
- ldout(store->cct, 1) << __func__ << " not moving " << i.second
- << dendl;
- }
- }
cache->rm_blob();
dest->cache->add_blob();
SharedBlob* sb = b->get_shared_blob().get();
};
for (auto& e : o->extent_map.extent_map) {
- e.blob->last_encoded_id = -1;
+ e.blob->last_encoded_id = -1;
}
for (auto& b : o->extent_map.spanning_blob_map) {
- b.second->last_encoded_id = -1;
+ b.second->last_encoded_id = -1;
+ }
+ // Move the onode's clean cached buffers to the destination cache shard;
+ // buffers still in 'writing' state stay on the current shard until their
+ // write completes.
+ for (auto &i : o->bc.buffer_map) {
+ if (!i.second.is_writing()) {
+ ldout(store->cct, 1)
+ << __func__ << " moving " << i.second << dendl;
+ dest->cache->_move(cache, &i.second);
+ } else {
+ ldout(store->cct, 1)
+ << __func__ << " not moving " << i.second << dendl;
+ }
}
for (auto& e : o->extent_map.extent_map) {
cache->rm_extent();
ready_regions_t cache_res;
interval_set<uint32_t> cache_interval;
- bptr->dirty_bc().read(
- bptr->get_cache(), b_off, b_len, cache_res, cache_interval,
+ o->bc.read(
+ o->c->cache, pos, b_len, cache_res, cache_interval,
read_cache_policy);
dout(20) << __func__ << " blob " << *bptr << std::hex
- << " need 0x" << b_off << "~" << b_len
+ << " need 0x" << pos << "~" << b_len
<< " cache has 0x" << cache_interval
<< std::dec << dendl;
while (b_len > 0) {
unsigned l;
if (pc != cache_res.end() &&
- pc->first == b_off) {
+ pc->first == pos) {
l = pc->second.length();
ready_regions[pos] = std::move(pc->second);
dout(30) << __func__ << " use cache 0x" << std::hex << pos << ": 0x"
- << b_off << "~" << l << std::dec << dendl;
+ << pos << "~" << l << std::dec << dendl;
++pc;
} else {
l = b_len;
if (pc != cache_res.end()) {
- ceph_assert(pc->first > b_off);
- l = pc->first - b_off;
+ ceph_assert(pc->first > pos);
+ l = pc->first - pos;
}
dout(30) << __func__ << " will read 0x" << std::hex << pos << ": 0x"
<< b_off << "~" << l << std::dec << dendl;
if (bptr->get_blob().is_compressed()) {
ceph_assert(p != compressed_blob_bls.end());
bufferlist& compressed_bl = *p++;
- if (_verify_csum(o, &bptr->get_blob(), 0, compressed_bl,
- r2r.front().regs.front().logical_offset) < 0) {
+ uint32_t offset = r2r.front().regs.front().logical_offset;
+ uint32_t length = r2r.front().regs.front().length;
+ if (_verify_csum(o, &bptr->get_blob(), 0, compressed_bl, offset) < 0) {
*csum_error = true;
return -EIO;
}
if (r < 0)
return r;
if (buffered) {
- bptr->dirty_bc().did_read(bptr->get_cache(), 0,
- raw_bl);
+ bufferlist region_buffer;
+ // todo bad offset
+ region_buffer.substr_of(raw_bl, offset, length);
+ // need offset before padding
+ o->bc.did_read(o->c->cache, offset,
+ length, std::move(region_buffer));
}
for (auto& req : r2r) {
for (auto& r : req.regs) {
}
} else {
for (auto& req : r2r) {
- if (_verify_csum(o, &bptr->get_blob(), req.r_off, req.bl,
- req.regs.front().logical_offset) < 0) {
+ uint64_t offset = req.regs.front().logical_offset;
+ if (_verify_csum(o, &bptr->get_blob(), req.r_off, req.bl, offset) < 0) {
*csum_error = true;
return -EIO;
}
- if (buffered) {
- bptr->dirty_bc().did_read(bptr->get_cache(),
- req.r_off, req.bl);
- }
// prune and keep result
for (const auto& r : req.regs) {
+ if (buffered) {
+ bufferlist region_buffer;
+ region_buffer.substr_of(req.bl, r.front, r.length);
+ // need offset before padding
+ o->bc.did_read(o->c->cache, r.logical_offset, r.length, std::move(region_buffer));
+ }
ready_regions[r.logical_offset].substr_of(req.bl, r.front, r.length);
}
}
dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
ceph_assert(txc->get_state() == TransContext::STATE_FINISHING);
- for (auto& sb : txc->blobs_written) {
- sb->finish_write(txc->seq);
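+ // the writes recorded in buffers_written have reached the device;
+ // let the buffer cache finish (clean or drop) those buffers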
+ for (auto& [onode, offset, seq] : txc->buffers_written) {
+ onode->bc._finish_write(onode->c->cache, offset, seq);
}
- txc->blobs_written.clear();
+ txc->buffers_written.clear();
+
while (!txc->removed_collections.empty()) {
_queue_reap_collection(txc->removed_collections.front());
txc->removed_collections.pop_front();
uint64_t b_off = offset - head_pad - bstart;
uint64_t b_len = length + head_pad + tail_pad;
- // direct write into unused blocks of an existing mutable blob?
- if ((b_off % chunk_size == 0 && b_len % chunk_size == 0) &&
- b->get_blob().get_ondisk_length() >= b_off + b_len &&
- b->get_blob().is_unused(b_off, b_len) &&
- b->get_blob().is_allocated(b_off, b_len)) {
- _apply_padding(head_pad, tail_pad, bl);
-
- dout(20) << __func__ << " write to unused 0x" << std::hex
- << b_off << "~" << b_len
- << " pad 0x" << head_pad << " + 0x" << tail_pad
- << std::dec << " of mutable " << *b << dendl;
- _buffer_cache_write(txc, b, b_off, bl,
- wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
-
- if (!g_conf()->bluestore_debug_omit_block_device_write) {
- if (b_len < prefer_deferred_size) {
- dout(20) << __func__ << " deferring small 0x" << std::hex
+ // direct write into unused blocks of an existing mutable blob?
+ if ((b_off % chunk_size == 0 && b_len % chunk_size == 0) &&
+ b->get_blob().get_ondisk_length() >= b_off + b_len &&
+ b->get_blob().is_unused(b_off, b_len) &&
+ b->get_blob().is_allocated(b_off, b_len)) {
+ // cache the un-padded user data at its logical offset before
+ // _apply_padding mutates bl
+ _buffer_cache_write(txc, b, o, offset, bl,
+ wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
+ _apply_padding(head_pad, tail_pad, bl);
+
+ dout(20) << __func__ << " write to unused 0x" << std::hex << b_off
+ << "~" << b_len << " pad 0x" << head_pad << " + 0x"
+ << tail_pad << std::dec << " of mutable " << *b << dendl;
+
+ if (!g_conf()->bluestore_debug_omit_block_device_write) {
+ if (b_len < prefer_deferred_size) {
+ dout(20) << __func__ << " deferring small 0x" << std::hex
<< b_len << std::dec << " unused write via deferred" << dendl;
- bluestore_deferred_op_t *op = _get_deferred_op(txc, bl.length());
- op->op = bluestore_deferred_op_t::OP_WRITE;
- b->get_blob().map(
+ bluestore_deferred_op_t *op = _get_deferred_op(txc, bl.length());
+ op->op = bluestore_deferred_op_t::OP_WRITE;
+ b->get_blob().map(
b_off, b_len,
- [&](uint64_t offset, uint64_t length) {
- op->extents.emplace_back(bluestore_pextent_t(offset, length));
- return 0;
- });
- op->data = bl;
- } else {
- b->get_blob().map_bl(
- b_off, bl,
+ [&](uint64_t offset, uint64_t length) {
+ op->extents.emplace_back(bluestore_pextent_t(offset, length));
+ return 0;
+ });
+ op->data = bl;
+ } else {
+ b->get_blob().map_bl(
+ b_off, bl,
[&](uint64_t offset, bufferlist& t) {
- bdev->aio_write(offset, t,
+ bdev->aio_write(offset, t,
&txc->ioc, wctx->buffered);
- });
- }
- }
- b->dirty_blob().calc_csum(b_off, bl);
- dout(20) << __func__ << " lex old " << *ep << dendl;
- Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length,
+ });
+ }
+ }
+ b->dirty_blob().calc_csum(b_off, bl);
+ dout(20) << __func__ << " lex old " << *ep << dendl;
+ Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length,
b,
&wctx->old_extents);
b->dirty_blob().mark_used(le->blob_offset, le->length);
}
logger->inc(l_bluestore_write_small_pre_read);
- _buffer_cache_write(txc, b, b_off, bl,
- wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
+ bufferlist without_padding;
+ without_padding.substr_of(bl, head_pad, bl.length() - head_pad);
+ _buffer_cache_write(txc, b, o, offset - head_read, std::move(without_padding),
+ wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
- b->dirty_blob().calc_csum(b_off, bl);
+ b->dirty_blob().calc_csum(b_off, bl);
- if (!g_conf()->bluestore_debug_omit_block_device_write) {
- bluestore_deferred_op_t *op = _get_deferred_op(txc, bl.length());
- op->op = bluestore_deferred_op_t::OP_WRITE;
- int r = b->get_blob().map(
- b_off, b_len,
+ if (!g_conf()->bluestore_debug_omit_block_device_write) {
+ bluestore_deferred_op_t *op = _get_deferred_op(txc, bl.length());
+ op->op = bluestore_deferred_op_t::OP_WRITE;
+ int r = b->get_blob().map(
+ b_off, b_len,
[&](uint64_t offset, uint64_t length) {
- op->extents.emplace_back(bluestore_pextent_t(offset, length));
- return 0;
- });
- ceph_assert(r == 0);
- op->data = std::move(bl);
- dout(20) << __func__ << " deferred write 0x" << std::hex << b_off << "~"
+ op->extents.emplace_back(bluestore_pextent_t(offset, length));
+ return 0;
+ });
+ ceph_assert(r == 0);
+ op->data = std::move(bl);
+ dout(20) << __func__ << " deferred write 0x" << std::hex << b_off << "~"
<< b_len << std::dec << " of mutable " << *b
<< " at " << op->extents << dendl;
- }
+ }
- Extent *le = o->extent_map.set_lextent(c, offset, offset - bstart, length,
+ Extent *le = o->extent_map.set_lextent(c, offset, offset - bstart, length,
b, &wctx->old_extents);
- b->dirty_blob().mark_used(le->blob_offset, le->length);
- txc->statfs_delta.stored() += le->length;
- dout(20) << __func__ << " lex " << *le << dendl;
- return;
- }
- // try to reuse blob if we can
- if (b->can_reuse_blob(min_alloc_size,
+ b->dirty_blob().mark_used(le->blob_offset, le->length);
+ txc->statfs_delta.stored() += le->length;
+ dout(20) << __func__ << " lex " << *le << dendl;
+ return;
+ }
+ // try to reuse blob if we can
+ if (b->can_reuse_blob(min_alloc_size,
max_bsize,
offset0 - bstart,
&alloc_len)) {
logger->inc(l_bluestore_write_penalty_read_ops);
}
auto& b0 = dctx.blob_ref;
- _buffer_cache_write(txc, b0, dctx.b_off, bl,
+ _buffer_cache_write(txc, b0, o, dctx.off - dctx.head_read, bl,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
b0->dirty_blob().calc_csum(dctx.b_off, bl);
wi.b->dirty_blob().mark_used(le->blob_offset, le->length);
txc->statfs_delta.stored() += le->length;
dout(20) << __func__ << " lex " << *le << dendl;
- _buffer_cache_write(txc, wi.b, b_off, wi.bl,
+ bufferlist without_pad;
+ without_pad.substr_of(wi.bl, wi.b_off0 - wi.b_off, wi.length0);
+ _buffer_cache_write(txc, wi.b, o, wi.logical_offset, std::move(without_pad),
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
// queue io
const bluestore_blob_t& blob = b->get_blob();
if (blob.is_compressed()) {
if (lo.blob_empty) {
- txc->statfs_delta.compressed() -= blob.get_compressed_payload_length();
+ txc->statfs_delta.compressed() -= blob.get_compressed_payload_length();
}
txc->statfs_delta.compressed_original() -= lo.e.length;
}
// longer allocated. Note that this will leave behind edge bits
// that are no longer referenced but not deallocated (until they
// age out of the cache naturally).
- b->discard_unallocated(c.get());
+ // b->discard_unallocated(c.get(), o.get(), lo.e.logical_offset);
for (auto e : r) {
dout(20) << __func__ << " release " << e << dendl;
txc->released.insert(e.offset, e.length);
WriteContext wctx;
if (offset < o->onode.size) {
uint64_t length = o->onode.size - offset;
+ o->bc.discard(o->c->cache, offset, length);
o->extent_map.fault_range(db, offset, length);
o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
o->extent_map.dirty_range(offset, length);
<< " 0x" << dstoff << "~" << length << std::dec << dendl;
oldo->extent_map.fault_range(db, srcoff, length);
newo->extent_map.fault_range(db, dstoff, length);
+ // it is possible the onode had previous buffers written
+ newo->bc.discard(c->cache, dstoff, length);
_dump_onode<30>(cct, *oldo);
_dump_onode<30>(cct, *newo);
struct BufferSpace;
struct Collection;
+ struct Onode;
typedef boost::intrusive_ptr<Collection> CollectionRef;
+ typedef boost::intrusive_ptr<Onode> OnodeRef;
struct AioContext {
virtual void aio_finish(BlueStore *store) = 0;
unsigned f = 0)
: space(space), state(s), flags(f), seq(q), offset(o),
length(b.length()), data(b) {}
+ Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, ceph::buffer::list&& b,
+ unsigned f = 0)
+ : space(space), state(s), flags(f), seq(q), offset(o),
+ length(b.length()), data(std::move(b)) {}
Buffer(Buffer &&other) {
std::swap(space, other.space);
}
int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
+ void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, ceph::buffer::list&& bl,
+ unsigned flags) {
+ std::lock_guard l(cache->lock);
+ // discard any overlapping buffers before bl is moved into the new Buffer
+ uint16_t cache_private = _discard(cache, offset, bl.length());
+ _add_buffer(cache, this, Buffer(this, Buffer::STATE_WRITING, seq, offset, std::move(bl),
+ flags), cache_private, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
+ cache->_trim();
+ }
void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, ceph::buffer::list& bl,
unsigned flags) {
std::lock_guard l(cache->lock);
nullptr);
cache->_trim();
}
- void _finish_write(BufferCacheShard* cache, uint64_t seq);
- void did_read(BufferCacheShard* cache, uint32_t offset, ceph::buffer::list& bl) {
+ void _finish_write(BufferCacheShard* cache, uint32_t offset, uint64_t seq);
+ void did_read(BufferCacheShard* cache, uint32_t offset, uint32_t length, ceph::buffer::list&& bl) {
std::lock_guard l(cache->lock);
uint16_t cache_private = _discard(cache, offset, bl.length());
_add_buffer(
discard(cache, offset, (uint32_t)-1 - offset);
}
- bool _dup_writing(BufferCacheShard* cache, BufferSpace* to);
+ bool _dup_writing(TransContext* txc, BufferCacheShard* cache, OnodeRef onode);
void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
void dump(BufferCacheShard* cache, ceph::Formatter *f) const {
ceph_assert(get_cache());
}
Blob(CollectionRef collection) : collection(collection) {}
- BufferSpace bc;
private:
SharedBlobRef shared_blob; ///< shared blob state (if any)
mutable bluestore_blob_t blob; ///< decoded blob metadata
bool can_split() {
std::lock_guard l(get_cache()->lock);
// splitting a BufferSpace writing list is too hard; don't try.
- return get_bc().writing.empty() &&
- used_in_blob.can_split() &&
+ return used_in_blob.can_split() &&
get_blob().can_split();
}
#endif
return blob;
}
- /// clear buffers from unused sections
- void discard_unused_buffers(CephContext* cct, BufferCacheShard* cache);
-
- inline const BufferSpace& get_bc() const {
- return bc;
- }
- inline BufferSpace& dirty_bc() {
- return bc;
- }
/// discard buffers for unallocated regions
- void discard_unallocated(Collection *coll);
+ void discard_unallocated(Collection *coll, BlueStore::Onode *onode, uint32_t logical_offset);
/// get logical references
void get_ref(Collection *coll, uint32_t offset, uint32_t length);
/// put logical references, and get back any released extents
bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
PExtentVector *r);
- // update caches to reflect content up to seq
- void finish_write(uint64_t seq);
/// split the blob
void split(Collection *coll, uint32_t blob_offset, Blob *o);
boost::intrusive::list_member_hook<>,
&OldExtent::old_extent_item> > old_extent_map_t;
- struct Onode;
/// a sharded extent map, mapping offsets to lextents to blobs
struct ExtentMap {
Onode *onode;
extent_map_t extent_map; ///< map of Extents to Blobs
blob_map_t spanning_blob_map; ///< blobs that span shards
- typedef boost::intrusive_ptr<Onode> OnodeRef;
struct Shard {
bluestore_onode_t::shard_info *shard_info = nullptr;
/// (it can be pinned and hence physically out
/// of it at the moment though)
ExtentMap extent_map;
+ BufferSpace bc; ///< buffer cache, indexed by the onode's logical offsets
// track txc's that have not been committed to kv store (and whose
// effects cannot be read via the kvdb read methods)
private:
void _decode(const ceph::buffer::list& v);
};
- typedef boost::intrusive_ptr<Onode> OnodeRef;
/// A generic Cache Shard
struct CacheShard {
std::set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
std::set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
- std::set<BlobRef> blobs_written; ///< update these on io completion
+ std::set<std::tuple<Onode*, uint32_t, uint64_t>> buffers_written; ///< buffers to finish on io completion: (onode, offset, seq)
+
KeyValueDB::Transaction t; ///< then we will commit this
std::list<Context*> oncommits; ///< more commit completions
std::list<CollectionRef> removed_collections; ///< colls we removed
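+ /// write bl into the onode-level buffer cache at the given logical offset
+ /// and record it in txc->buffers_written so _finish_write() is called for
+ /// it when the transaction finishes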
void _buffer_cache_write(
TransContext *txc,
- BlobRef b,
- uint64_t offset,
+ BlobRef blob,
+ OnodeRef onode,
+ uint32_t offset,
+ ceph::buffer::list&& bl,
+ unsigned flags) {
+ onode->bc.write(onode->c->cache, txc->seq, offset, std::move(bl),
+ flags);
+ txc->buffers_written.insert({onode.get(), offset, txc->seq});
+ }
+
+ void _buffer_cache_write(
+ TransContext *txc,
+ BlobRef blob,
+ OnodeRef onode,
+ uint32_t offset,
ceph::buffer::list& bl,
unsigned flags) {
- b->dirty_bc().write(b->get_cache(), txc->seq, offset, bl,
+ onode->bc.write(onode->c->cache, txc->seq, offset, bl,
flags);
- txc->blobs_written.insert(b);
+ txc->buffers_written.insert({onode.get(), offset, txc->seq});
}
int _collection_list(