Buffer *b = &*i;
assert(b->is_clean());
dout(20) << __func__ << " rm " << *b << dendl;
- b->space->_rm_buffer(b);
+ b->space->_rm_buffer(this, b);
}
// onodes
// adjust evict size before buffer goes invalid
to_evict_bytes -= b->length;
evicted += b->length;
- b->space->_rm_buffer(b);
+ b->space->_rm_buffer(this, b);
}
if (evicted > 0) {
Buffer *b = &*buffer_warm_out.rbegin();
assert(b->is_empty());
dout(20) << __func__ << " buffer_warm_out rm " << *b << dendl;
- b->space->_rm_buffer(b);
+ b->space->_rm_buffer(this, b);
}
}
#undef dout_prefix
#define dout_prefix *_dout << "bluestore.BufferSpace(" << this << " in " << cache << ") "
-void BlueStore::BufferSpace::_clear()
+void BlueStore::BufferSpace::_clear(Cache* cache)
{
// note: we already hold cache->lock
ldout(cache->cct, 10) << __func__ << dendl;
while (!buffer_map.empty()) {
- _rm_buffer(buffer_map.begin());
+ _rm_buffer(cache, buffer_map.begin());
}
}
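One subtlety in this hunk: the dout_prefix defined just above still streams `cache`, and with the BufferSpace::cache member removed that name now binds to the function parameter, so each method keeps the parameter name `cache` for the log prefix to keep resolving. A toy sketch of that textual name binding, with illustrative types rather than the Ceph dout machinery:

#include <iostream>

// Toy stand-in for the dout machinery; not the Ceph macros.
#define LOG_PREFIX std::cout << "bluestore.BufferSpace(in " << cache << ") "

struct Cache {};

struct BufferSpaceLike {
  // The parameter is deliberately named 'cache': the prefix macro expands
  // textually at each log site, so the name it mentions binds to whatever
  // 'cache' is visible there -- previously the member, now the parameter.
  void clear(Cache* cache) {
    LOG_PREFIX << "clear" << std::endl;
  }
};

int main() {
  Cache c;
  BufferSpaceLike bs;
  bs.clear(&c);
}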
-int BlueStore::BufferSpace::_discard(uint32_t offset, uint32_t length)
+int BlueStore::BufferSpace::_discard(Cache* cache, uint32_t offset, uint32_t length)
{
// note: we already hold cache->lock
ldout(cache->cct, 20) << __func__ << std::hex << " 0x" << offset << "~" << length
if (b->data.length()) {
bufferlist bl;
bl.substr_of(b->data, b->length - tail, tail);
- _add_buffer(new Buffer(this, b->state, b->seq, end, bl), 0, b);
+ _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
} else {
- _add_buffer(new Buffer(this, b->state, b->seq, end, tail), 0, b);
+ _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail), 0, b);
}
if (!b->is_writing()) {
cache->_adjust_buffer_size(b, front - (int64_t)b->length);
}
if (b->end() <= end) {
// drop entire buffer
- _rm_buffer(i++);
+ _rm_buffer(cache, i++);
continue;
}
// drop front
if (b->data.length()) {
bufferlist bl;
bl.substr_of(b->data, b->length - keep, keep);
- _add_buffer(new Buffer(this, b->state, b->seq, end, bl), 0, b);
+ _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
} else {
- _add_buffer(new Buffer(this, b->state, b->seq, end, keep), 0, b);
+ _add_buffer(cache, new Buffer(this, b->state, b->seq, end, keep), 0, b);
}
- _rm_buffer(i);
+ _rm_buffer(cache, i);
cache->_audit("discard end 2");
break;
}
}
void BlueStore::BufferSpace::read(
+ Cache* cache,
uint32_t offset, uint32_t length,
BlueStore::ready_regions_t& res,
interval_set<uint32_t>& res_intervals)
cache->logger->inc(l_bluestore_buffer_miss_bytes, miss_bytes);
}
-void BlueStore::BufferSpace::finish_write(uint64_t seq)
+void BlueStore::BufferSpace::finish_write(Cache* cache, uint64_t seq)
{
std::lock_guard<std::recursive_mutex> l(cache->lock);
cache->_audit("finish_write end");
}
-void BlueStore::BufferSpace::split(size_t pos, BlueStore::BufferSpace &r)
+void BlueStore::BufferSpace::split(Cache* cache, size_t pos, BlueStore::BufferSpace &r)
{
std::lock_guard<std::recursive_mutex> lk(cache->lock);
- assert(r.cache == cache);
if (buffer_map.empty())
return;
if (p->second->data.length()) {
bufferlist bl;
bl.substr_of(p->second->data, left, right);
- r._add_buffer(new Buffer(&r, p->second->state, p->second->seq, 0, bl),
+ r._add_buffer(cache, new Buffer(&r, p->second->state, p->second->seq, 0, bl),
0, p->second.get());
} else {
- r._add_buffer(new Buffer(&r, p->second->state, p->second->seq, 0, right),
+ r._add_buffer(cache, new Buffer(&r, p->second->state, p->second->seq, 0, right),
0, p->second.get());
}
cache->_adjust_buffer_size(p->second.get(), -right);
assert(p->second->end() > pos);
ldout(cache->cct, 30) << __func__ << " move " << *p->second << dendl;
if (p->second->data.length()) {
- r._add_buffer(new Buffer(&r, p->second->state, p->second->seq,
+ r._add_buffer(cache, new Buffer(&r, p->second->state, p->second->seq,
p->second->offset - pos, p->second->data),
0, p->second.get());
} else {
- r._add_buffer(new Buffer(&r, p->second->state, p->second->seq,
+ r._add_buffer(cache, new Buffer(&r, p->second->state, p->second->seq,
p->second->offset - pos, p->second->length),
0, p->second.get());
}
if (p == buffer_map.begin()) {
- _rm_buffer(p);
+ _rm_buffer(cache, p);
break;
} else {
- _rm_buffer(p--);
+ _rm_buffer(cache, p--);
}
}
assert(writing.empty());
return out << ")";
}
-BlueStore::SharedBlob::SharedBlob(uint64_t i, Cache *c)
- : sbid(i),
- bc(c)
+BlueStore::SharedBlob::SharedBlob(uint64_t i, Collection *_coll)
+ : sbid(i), coll(_coll)
{
assert(sbid > 0);
+ if (get_cache()) {
+ get_cache()->add_blob();
+ }
}
BlueStore::SharedBlob::~SharedBlob()
{
- if (bc.cache) { // the dummy instances have a nullptr
- std::lock_guard<std::recursive_mutex> l(bc.cache->lock);
- bc._clear();
+ if (get_cache()) { // the dummy instances have a nullptr
+ std::lock_guard<std::recursive_mutex> l(get_cache()->lock);
+ bc._clear(get_cache());
+ get_cache()->rm_blob();
}
}
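Note that the add_blob()/rm_blob() accounting that previously lived in BufferSpace's constructor and destructor (see the header hunk below) now happens here in SharedBlob, and only when a real cache is attached. A minimal sketch of the resulting pairing, using illustrative stand-in types rather than BlueStore's:

#include <cassert>

// Illustrative counter standing in for Cache::add_blob()/rm_blob().
struct CacheCounter {
  int num_blobs = 0;
  void add_blob() { ++num_blobs; }
  void rm_blob() { --num_blobs; }
};

// Mirrors the new placement: the owner of the buffer cache, not the
// buffer cache itself, maintains the per-cache blob count; dummy
// instances (cache == nullptr) are simply skipped.
struct SharedBlobLike {
  CacheCounter* cache;
  explicit SharedBlobLike(CacheCounter* c) : cache(c) {
    if (cache) cache->add_blob();
  }
  ~SharedBlobLike() {
    if (cache) cache->rm_blob();
  }
};

int main() {
  CacheCounter counter;
  {
    SharedBlobLike sb(&counter);
    assert(counter.num_blobs == 1);
  }
  assert(counter.num_blobs == 0);

  SharedBlobLike dummy(nullptr);   // dummy instances never touch a cache
  assert(counter.num_blobs == 0);
}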
void BlueStore::SharedBlob::put()
{
if (--nref == 0) {
- ldout(bc.cache->cct, 20) << __func__ << " " << this
- << " removing self from set " << parent_set
- << dendl;
- if (parent_set) {
- if (parent_set->remove(this)) {
+ ldout(coll->store->cct, 20) << __func__ << " " << this
+ << " removing self from set " << get_parent()
+ << dendl;
+ if (get_parent()) {
+ if (get_parent()->remove(this)) {
delete this;
} else {
- ldout(bc.cache->cct, 20)
+ ldout(coll->store->cct, 20)
<< __func__ << " " << this << " lost race to remove myself from set"
<< dendl;
}
assert(discard == all_invalid); // in case of compressed blob all
// or none pextents are invalid.
if (discard) {
- shared_blob->bc.discard(0, blob.get_compressed_payload_original_length());
+ shared_blob->bc.discard(shared_blob->get_cache(), 0, blob.get_compressed_payload_original_length());
}
} else {
size_t pos = 0;
for (auto e : blob.extents) {
if (!e.is_valid()) {
- shared_blob->bc.discard(pos, e.length);
+ shared_blob->bc.discard(shared_blob->get_cache(), pos, e.length);
}
pos += e.length;
}
lb.csum_data = bufferptr(old.c_str(), pos);
}
- shared_blob->bc.split(blob_offset, r->shared_blob->bc);
+ shared_blob->bc.split(shared_blob->get_cache(), blob_offset, r->shared_blob->bc);
dout(10) << __func__ << " 0x" << std::hex << blob_offset << std::dec
<< " finish " << *this << dendl;
assert(!b->shared_blob);
const bluestore_blob_t& blob = b->get_blob();
if (!blob.is_shared()) {
- b->shared_blob = new SharedBlob(cache);
+ b->shared_blob = new SharedBlob(this);
return;
}
ldout(store->cct, 10) << __func__ << " sbid 0x" << std::hex << sbid
<< std::dec << " had " << *b->shared_blob << dendl;
} else {
- b->shared_blob = new SharedBlob(sbid, cache);
- shared_blob_set.add(b->shared_blob.get());
+ b->shared_blob = new SharedBlob(sbid, this);
+ shared_blob_set.add(this, b->shared_blob.get());
ldout(store->cct, 10) << __func__ << " sbid 0x" << std::hex << sbid
<< std::dec << " opened " << *b->shared_blob
<< dendl;
// update shared blob
b->shared_blob->loaded = true; // we are new and therefore up to date
b->shared_blob->sbid = sbid;
- shared_blob_set.add(b->shared_blob.get());
+ shared_blob_set.add(this, b->shared_blob.get());
for (auto p : blob.extents) {
if (p.is_valid()) {
b->shared_blob->shared_blob.ref_map.get(p.offset, p.length);
ready_regions_t cache_res;
interval_set<uint32_t> cache_interval;
- bptr->shared_blob->bc.read(b_off, b_len, cache_res, cache_interval);
+ bptr->shared_blob->bc.read(bptr->shared_blob->get_cache(), b_off, b_len, cache_res, cache_interval);
dout(20) << __func__ << " blob " << *bptr << std::hex
<< " need 0x" << b_off << "~" << b_len
<< " cache has 0x" << cache_interval
if (r < 0)
return r;
if (buffered) {
- bptr->shared_blob->bc.did_read(0, raw_bl);
+ bptr->shared_blob->bc.did_read(bptr->shared_blob->get_cache(), 0, raw_bl);
}
for (auto& i : b2r_it->second) {
ready_regions[i.logical_offset].substr_of(
return -EIO;
}
if (buffered) {
- bptr->shared_blob->bc.did_read(r_off, bl);
+ bptr->shared_blob->bc.did_read(bptr->shared_blob->get_cache(), r_off, bl);
}
// prune and keep result
txc->log_state_latency(logger, l_bluestore_state_io_done_lat);
txc->state = TransContext::STATE_KV_QUEUED;
for (auto& sb : txc->shared_blobs_written) {
- sb->bc.finish_write(txc->seq);
+ sb->bc.finish_write(sb->get_cache(), txc->seq);
}
txc->shared_blobs_written.clear();
if (cct->_conf->bluestore_sync_submit_transaction &&
dout(log_level) << __func__ << " csum: " << std::hex << v << std::dec
<< dendl;
}
- std::lock_guard<std::recursive_mutex> l(e.blob->shared_blob->bc.cache->lock);
+ std::lock_guard<std::recursive_mutex> l(e.blob->shared_blob->get_cache()->lock);
for (auto& i : e.blob->shared_blob->bc.buffer_map) {
dout(log_level) << __func__ << " 0x" << std::hex << i.first
<< "~" << i.second->length << std::dec
<< " " << *i.second << dendl;
}
}
}
mempool::bluestore_meta_other::map<uint32_t, std::unique_ptr<Buffer>>
buffer_map;
- Cache *cache;
// we use a bare intrusive list here instead of std::map because
// it uses less memory and we expect this to be very small (very
// few IOs in flight to the same Blob at the same time).
state_list_t writing; ///< writing buffers, sorted by seq, ascending
- BufferSpace(Cache *c) : cache(c) {
- if (cache) {
- cache->add_blob();
- }
- }
~BufferSpace() {
assert(buffer_map.empty());
assert(writing.empty());
- if (cache) {
- cache->rm_blob();
- }
}
- void _add_buffer(Buffer *b, int level, Buffer *near) {
+ void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
cache->_audit("_add_buffer start");
buffer_map[b->offset].reset(b);
if (b->is_writing()) {
}
cache->_audit("_add_buffer end");
}
- void _rm_buffer(Buffer *b) {
- _rm_buffer(buffer_map.find(b->offset));
+ void _rm_buffer(Cache* cache, Buffer *b) {
+ _rm_buffer(cache, buffer_map.find(b->offset));
}
- void _rm_buffer(map<uint32_t,std::unique_ptr<Buffer>>::iterator p) {
+ void _rm_buffer(Cache* cache, map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
assert(p != buffer_map.end());
cache->_audit("_rm_buffer start");
if (p->second->is_writing()) {
}
// must be called under protection of the Cache lock
- void _clear();
+ void _clear(Cache* cache);
// return value is the highest cache_private of a trimmed buffer, or 0.
- int discard(uint32_t offset, uint32_t length) {
+ int discard(Cache* cache, uint32_t offset, uint32_t length) {
std::lock_guard<std::recursive_mutex> l(cache->lock);
- return _discard(offset, length);
+ return _discard(cache, offset, length);
}
- int _discard(uint32_t offset, uint32_t length);
+ int _discard(Cache* cache, uint32_t offset, uint32_t length);
- void write(uint64_t seq, uint32_t offset, bufferlist& bl, unsigned flags) {
+ void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl, unsigned flags) {
std::lock_guard<std::recursive_mutex> l(cache->lock);
Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
flags);
- b->cache_private = _discard(offset, bl.length());
- _add_buffer(b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
+ b->cache_private = _discard(cache, offset, bl.length());
+ _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
}
- void finish_write(uint64_t seq);
- void did_read(uint32_t offset, bufferlist& bl) {
+ void finish_write(Cache* cache, uint64_t seq);
+ void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
std::lock_guard<std::recursive_mutex> l(cache->lock);
Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
- b->cache_private = _discard(offset, bl.length());
- _add_buffer(b, 1, nullptr);
+ b->cache_private = _discard(cache, offset, bl.length());
+ _add_buffer(cache, b, 1, nullptr);
}
- void read(uint32_t offset, uint32_t length,
+ void read(Cache* cache, uint32_t offset, uint32_t length,
BlueStore::ready_regions_t& res,
interval_set<uint32_t>& res_intervals);
- void truncate(uint32_t offset) {
- discard(offset, (uint32_t)-1 - offset);
+ void truncate(Cache* cache, uint32_t offset) {
+ discard(cache, offset, (uint32_t)-1 - offset);
}
- void split(size_t pos, BufferSpace &r);
+ void split(Cache* cache, size_t pos, BufferSpace &r);
- void dump(Formatter *f) const {
+ void dump(Cache* cache, Formatter *f) const {
std::lock_guard<std::recursive_mutex> l(cache->lock);
f->open_array_section("buffers");
for (auto& i : buffer_map) {
};
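With the Cache back-pointer gone from BufferSpace, the locking contract travels with the arguments: the public entry points (discard, write, did_read, truncate, dump) lock the cache they are handed, while the underscore variants assume the caller already holds that same cache's lock. A simplified sketch of that split, with stand-in types rather than the real ones:

#include <cstdint>
#include <map>
#include <mutex>

// Illustrative shapes only; the real types live in os/bluestore/BlueStore.h.
struct Cache {
  std::recursive_mutex lock;
};

struct BufferSpaceLike {
  std::map<uint32_t, uint32_t> buffer_map;   // offset -> length stand-in

  // Underscore variant: assumes the caller already holds cache->lock.
  int _discard(Cache* cache, uint32_t offset, uint32_t length) {
    (void)cache;
    (void)length;
    buffer_map.erase(offset);                // grossly simplified
    return 0;
  }
  // Public wrapper: takes the lock itself, mirroring the header above.
  int discard(Cache* cache, uint32_t offset, uint32_t length) {
    std::lock_guard<std::recursive_mutex> l(cache->lock);
    return _discard(cache, offset, length);
  }
};

int main() {
  Cache cache;
  BufferSpaceLike bc;
  bc.discard(&cache, 0, 4096);               // locks internally

  std::lock_guard<std::recursive_mutex> l(cache.lock);
  bc._discard(&cache, 0, 4096);              // caller holds the lock
}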
struct SharedBlobSet;
+ struct Collection;
/// in-memory shared blob state (incl cached buffers)
struct SharedBlob {
// these are defined/set if the blob is marked 'shared'
uint64_t sbid = 0; ///< shared blob id
- SharedBlobSet *parent_set = 0; ///< containing SharedBlobSet
-
+ Collection* coll = nullptr; ///< owning collection
BufferSpace bc; ///< buffer cache
- SharedBlob(Cache *c) : bc(c) {}
- SharedBlob(uint64_t i, Cache *c);
+ SharedBlob(Collection *_coll) : coll(_coll) {
+ if (get_cache()) {
+ get_cache()->add_blob();
+ }
+ }
+ SharedBlob(uint64_t i, Collection *_coll);
~SharedBlob();
friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
rjhash<uint32_t> h;
return h(e.sbid);
}
+ inline Cache* get_cache() {
+ return coll ? coll->cache : nullptr;
+ }
+ inline SharedBlobSet* get_parent() {
+ return coll ? &(coll->shared_blob_set) : nullptr;
+ }
};
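Call sites that previously reached through bc.cache or parent_set now go through these accessors, as in finish_write() and put() above. A hedged usage sketch; the helper functions here are hypothetical and only exercise the declarations shown in this patch, assuming a build inside the Ceph tree:

#include "os/bluestore/BlueStore.h"   // assumes the Ceph source tree

// Hypothetical helpers, not BlueStore code.
void finish_writes(BlueStore::SharedBlob* sb, uint64_t seq)
{
  if (BlueStore::Cache* cache = sb->get_cache()) {  // nullptr for dummy instances
    sb->bc.finish_write(cache, seq);                // was sb->bc.finish_write(seq)
  }
}

bool detach_from_set(BlueStore::SharedBlob* sb)
{
  // was: sb->parent_set; now derived from the owning Collection
  BlueStore::SharedBlobSet* parent = sb->get_parent();
  return parent ? parent->remove(sb) : false;
}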
typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
return p->second;
}
- void add(SharedBlob *sb) {
+ void add(Collection* coll, SharedBlob *sb) {
std::lock_guard<std::mutex> l(lock);
sb_map[sb->sbid] = sb;
- sb->parent_set = this;
+ sb->coll = coll;
}
bool remove(SharedBlob *sb) {
std::lock_guard<std::mutex> l(lock);
if (sb->nref == 0) {
- assert(sb->parent_set == this);
+ assert(sb->get_parent() == this);
sb_map.erase(sb->sbid);
return true;
}
std::lock_guard<std::mutex> l(lock);
return sb_map.empty();
}
-
- void violently_clear() {
- std::lock_guard<std::mutex> l(lock);
- for (auto& p : sb_map) {
- p.second->parent_set = nullptr;
- }
- sb_map.clear();
- }
};
//#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
}
bool can_split() const {
- std::lock_guard<std::recursive_mutex> l(shared_blob->bc.cache->lock);
+ std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
// splitting a BufferSpace writing list is too hard; don't try.
return shared_blob->bc.writing.empty() && get_blob().can_split();
}
}
~Extent() {
if (blob) {
- blob->shared_blob->bc.cache->rm_extent();
+ blob->shared_blob->get_cache()->rm_extent();
}
}
void assign_blob(const BlobRef& b) {
assert(!blob);
blob = b;
- blob->shared_blob->bc.cache->add_extent();
+ blob->shared_blob->get_cache()->add_extent();
}
// comparators for intrusive_set
BlobRef new_blob() {
BlobRef b = new Blob(store->cct);
- b->shared_blob = new SharedBlob(cache);
+ b->shared_blob = new SharedBlob(this);
return b;
}
uint64_t offset,
bufferlist& bl,
unsigned flags) {
- b->shared_blob->bc.write(txc->seq, offset, bl, flags);
+ b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl, flags);
txc->shared_blobs_written.insert(b->shared_blob);
}
TEST(Blob, split)
{
+ BlueStore store(g_ceph_context, "");
BlueStore::Cache *cache = BlueStore::Cache::create(
g_ceph_context, "lru", NULL);
+ BlueStore::Collection coll(&store, cache, coll_t());
{
BlueStore::Blob L(g_ceph_context), R(g_ceph_context);
- L.shared_blob = new BlueStore::SharedBlob(cache);
+ L.shared_blob = new BlueStore::SharedBlob(&coll);
L.shared_blob->get(); // hack to avoid dtor from running
- R.shared_blob = new BlueStore::SharedBlob(cache);
+ R.shared_blob = new BlueStore::SharedBlob(&coll);
R.shared_blob->get(); // hack to avoid dtor from running
L.dirty_blob().extents.emplace_back(bluestore_pextent_t(0x2000, 0x2000));
L.dirty_blob().init_csum(Checksummer::CSUM_CRC32C, 12, 0x2000);
}
{
BlueStore::Blob L(g_ceph_context), R(g_ceph_context);
- L.shared_blob = new BlueStore::SharedBlob(cache);
+ L.shared_blob = new BlueStore::SharedBlob(&coll);
L.shared_blob->get(); // hack to avoid dtor from running
- R.shared_blob = new BlueStore::SharedBlob(cache);
+ R.shared_blob = new BlueStore::SharedBlob(&coll);
R.shared_blob->get(); // hack to avoid dtor from running
L.dirty_blob().extents.emplace_back(bluestore_pextent_t(0x2000, 0x1000));
L.dirty_blob().extents.emplace_back(bluestore_pextent_t(0x12000, 0x1000));
TEST(ExtentMap, find_lextent)
{
+ BlueStore store(g_ceph_context, "");
BlueStore::LRUCache cache(g_ceph_context);
BlueStore::ExtentMap em(g_ceph_context, nullptr);
BlueStore::BlobRef br(new BlueStore::Blob(g_ceph_context));
- br->shared_blob = new BlueStore::SharedBlob(&cache);
+ BlueStore::Collection coll(&store, &cache, coll_t());
+ br->shared_blob = new BlueStore::SharedBlob(&coll);
ASSERT_EQ(em.extent_map.end(), em.find_lextent(0));
ASSERT_EQ(em.extent_map.end(), em.find_lextent(100));
TEST(ExtentMap, seek_lextent)
{
+ BlueStore store(g_ceph_context, "");
BlueStore::LRUCache cache(g_ceph_context);
BlueStore::ExtentMap em(g_ceph_context, nullptr);
BlueStore::BlobRef br(new BlueStore::Blob(g_ceph_context));
- br->shared_blob = new BlueStore::SharedBlob(&cache);
+ BlueStore::Collection coll(&store, &cache, coll_t());
+ br->shared_blob = new BlueStore::SharedBlob(&coll);
ASSERT_EQ(em.extent_map.end(), em.seek_lextent(0));
ASSERT_EQ(em.extent_map.end(), em.seek_lextent(100));
TEST(ExtentMap, has_any_lextents)
{
+ BlueStore store(g_ceph_context, "");
BlueStore::LRUCache cache(g_ceph_context);
BlueStore::ExtentMap em(g_ceph_context, nullptr);
BlueStore::BlobRef b(new BlueStore::Blob(g_ceph_context));
- b->shared_blob = new BlueStore::SharedBlob(&cache);
+ BlueStore::Collection coll(&store, &cache, coll_t());
+ b->shared_blob = new BlueStore::SharedBlob(&coll);
ASSERT_FALSE(em.has_any_lextents(0, 0));
ASSERT_FALSE(em.has_any_lextents(0, 1000));
TEST(ExtentMap, compress_extent_map)
{
+ BlueStore store(g_ceph_context, "");
BlueStore::LRUCache cache(g_ceph_context);
BlueStore::ExtentMap em(g_ceph_context, nullptr);
BlueStore::BlobRef b1(new BlueStore::Blob(g_ceph_context));
BlueStore::BlobRef b2(new BlueStore::Blob(g_ceph_context));
BlueStore::BlobRef b3(new BlueStore::Blob(g_ceph_context));
- b1->shared_blob = new BlueStore::SharedBlob(&cache);
- b2->shared_blob = new BlueStore::SharedBlob(&cache);
- b3->shared_blob = new BlueStore::SharedBlob(&cache);
+ BlueStore::Collection coll(&store, &cache, coll_t());
+ b1->shared_blob = new BlueStore::SharedBlob(&coll);
+ b2->shared_blob = new BlueStore::SharedBlob(&coll);
+ b3->shared_blob = new BlueStore::SharedBlob(&coll);
em.extent_map.insert(*new BlueStore::Extent(0, 0, 100, b1));
em.extent_map.insert(*new BlueStore::Extent(100, 0, 100, b2));