uint16_t cache_private, int level,
Buffer *near)
{
+ ldout(cache->cct, 20) << __func__ << "? " << b << dendl;
cache->_audit("_add_buffer start");
ceph_assert(!b->set_item.is_linked());
- buffer_map.insert(*b);
- b->cache_private = cache_private;
+ bool add_to_map = true;
if (b->is_writing()) {
+ ceph_assert(b->txc);
// we might get already cached data for which resetting mempool is inppropriate
// hence calling try_assign_to_mempool
- b->data.try_assign_to_mempool(mempool::mempool_bluestore_writing);
- cache->get_writings().add_writing(&onode, b);
- } else {
+ if (b->txc->add_writing(&onode, b->offset, b->length)) {
+ b->data.try_assign_to_mempool(mempool::mempool_bluestore_writing);
+ } else if (b->flags & Buffer::FLAG_NOCACHE) {
+ //txc is being finished and hence it hasn't added us to writing list.
+ // And we don't need to cache this buffer.
+ // So we delete it.
+ ldout(cache->cct, 20) << __func__ <<
+ " not added to writing, releasing " << b
+ << dendl;
+ delete b;
+ b = nullptr;
+ add_to_map = false;
+ } else {
+ //txc is being finished and hence it hasn't added us to writing list.
+ // So we can cache it
+ b->state = Buffer::STATE_CLEAN;
+ b->txc = nullptr;
+ b->maybe_rebuild();
+ }
+ }
+ if (add_to_map) {
+ ldout(cache->cct, 20) << __func__ << " added " << b << dendl;
b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
- cache->_add(b, level, near);
+ b->cache_private = cache_private;
+ buffer_map.insert(*b);
+ if (!b->is_writing()) {
+ cache->_add(b, level, near);
+ }
}
- cache->_audit("_add_buffer end");
+ cache->_audit("_add_buffer end");
}
void BlueStore::BufferSpace::__rm_buffer(BufferCacheShard* cache,
if (!b->is_writing()) {
cache->_rm(b);
}
+ ldout(cache->cct, 20) << __func__ << " erasing " << b << dendl;
__erase_from_map(b);
cache->_audit("_rm_buffer end");
}
__erase_from_map(b);
} else {
b->state = Buffer::STATE_CLEAN;
+ b->txc = nullptr;
b->maybe_rebuild();
b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
cache->_add(b, 1, nullptr);
return out;
}
-// Writings
-void BlueStore::Writings::finish_writing(TransContext* txc)
+// TransContext
+bool BlueStore::TransContext::add_writing(Onode* o, uint32_t off, uint32_t len)
{
- // We might get a race when onode cloning/overwriting put more
- // WriteEntries with the same txc while finish_writing() is in progress
- // (and after finished list has been already filled).
- // So we might want to repeat processing for these new onodes.
- // The proposed processing scheme below is apparently valid:
- // there should be no new entries after empty 'finished' container
- // processing.
- //
- write_list_t finished;
- do {
- finished.clear();
- {
- std::lock_guard l(lock);
- auto it = txc_to_buf.find(txc);
- if (it != txc_to_buf.end()) {
- finished.swap(it->second);
- txc_to_buf.erase(it);
- }
- }
- for (auto& e : finished) {
- e.onode->finish_write(txc, e.offset, e.length);
- }
+ std::lock_guard l(writings_lock);
- // If 'finished' is not empty - new entries could appear in txc_to_buf
- // after the last swap. Hence we need to reiterate. And this could even
- // need multiple iterations.
- // But if 'finished' list is empty - no more entries matching the txc
- // would appear (previous onode::finish_write() calls would be logical
- // "walls" as they acquires relevant onode cache's locks).
- // So we're safe to exit the loop.
- //
- } while (!finished.empty());
+ // Need to indicate non-initial observers that we're done.
+ if (were_writings && writings.empty()) {
+ return false;
+ }
+ writings.emplace_back(o, off, len);
+ were_writings = true;
+ return true;
+}
+
+void BlueStore::TransContext::finish_writing()
+{
+ write_list_t finished;
+ {
+ std::lock_guard l(writings_lock);
+ finished.swap(writings);
+ }
+ for (auto& e : finished) {
+ e.onode->finish_write(this, e.offset, e.length);
+ }
}
// OnodeSpace
dout(20) << __func__ << " " << txc << " onodes " << txc->onodes << dendl;
ceph_assert(txc->get_state() == TransContext::STATE_FINISHING);
- writings.finish_writing(txc);
+ txc->finish_writing();
while (!txc->removed_collections.empty()) {
_queue_reap_collection(txc->removed_collections.front());
for (auto i : onode_cache_shards) {
ceph_assert(i->empty());
}
- ceph_assert(writings.empty());
ceph_assert(Buffer::total == 0);
}
}
};
struct BufferCacheShard;
- class Writings;
/// map logical extent range (object) onto buffers
struct BufferSpace {
friend std::ostream& operator<<(std::ostream& out, const BufferSpace& bc);
};
- class Writings {
- struct WriteEntry {
- Onode* onode;
- uint32_t offset;
- uint32_t length;
- WriteEntry(Onode* _o, Buffer* b)
- : onode(_o), offset(b->offset), length(b->length) {}
- };
-
- using write_list_t = mempool::bluestore_writing::list<WriteEntry>;
-
- ceph::mutex lock = ceph::make_mutex("BlueStore::Writings::lock");
-
- // TransContext -> list of <onode, offset, length>
- mempool::bluestore_writing::map<void*, write_list_t> txc_to_buf;
-
- public:
- ~Writings() {
- // sanity
- ceph_assert(txc_to_buf.empty());
- }
- void add_writing(Onode* o, Buffer* b) {
- std::lock_guard l(lock);
- txc_to_buf[b->txc].emplace_back(o, b);
- }
- void finish_writing(TransContext* txc);
-
- bool empty() {
- std::lock_guard l(lock);
- return txc_to_buf.empty();
- }
- } writings;
-
struct SharedBlobSet;
/// in-memory shared blob state (incl cached buffers)
std::atomic<uint64_t> num_extents = {0};
std::atomic<uint64_t> num_blobs = {0};
uint64_t buffer_bytes = 0;
- Writings& writings;
public:
BufferCacheShard(BlueStore* store)
- : CacheShard(store->cct), writings(store->writings) {
+ : CacheShard(store->cct) {
}
virtual ~BufferCacheShard() {
ceph_assert(num_blobs == 0);
virtual void _touch(Buffer *b) = 0;
virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
- Writings& get_writings() { return writings; }
uint64_t _get_bytes() {
return buffer_bytes;
}
ZTracer::Trace trace;
#endif
+ ceph::mutex writings_lock = ceph::make_mutex("BlueStore::TransContextWritings::lock");
+ struct WriteObserverEntry {
+ Onode* onode;
+ uint32_t offset;
+ uint32_t length;
+ WriteObserverEntry(Onode* _o, uint32_t off, uint32_t len)
+ : onode(_o), offset(off), length(len) {}
+ };
+ using write_list_t = mempool::bluestore_writing::list<WriteObserverEntry>;
+ write_list_t writings;
+ bool were_writings = false;
+
+ bool add_writing(Onode* o, uint32_t off, uint32_t len);
+ void finish_writing();
+
explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
std::list<Context*> *on_commits)
: ch(c),