int BlueStore::_deferred_replay()
{
dout(10) << __func__ << " start" << dendl;
- OpSequencerRef osr = new OpSequencer(cct);
+ OpSequencerRef osr = new OpSequencer(cct, this);
int count = 0;
KeyValueDB::Iterator it = db->get_iterator(PREFIX_DEFERRED);
for (it->lower_bound(string()); it->valid(); it->next(), ++count) {
osr = static_cast<OpSequencer *>(posr->p.get());
dout(10) << __func__ << " existing " << osr << " " << *osr << dendl;
} else {
- osr = new OpSequencer(cct);
+ osr = new OpSequencer(cct, this);
osr->parent = posr;
posr->p = osr;
dout(10) << __func__ << " new " << osr << " " << *osr << dendl;
boost::intrusive::list_member_hook<> deferred_osr_queue_item;
Sequencer *parent;
+ BlueStore *store;
std::mutex deferred_apply_mutex;
std::atomic_int kv_submitted_waiters = {0};
- OpSequencer(CephContext* cct)
+ OpSequencer(CephContext* cct, BlueStore *store)
//set the qlock to PTHREAD_MUTEX_RECURSIVE mode
: Sequencer_impl(cct),
- parent(NULL) {
+ parent(NULL), store(store) {
+ store->register_osr(this);
}
~OpSequencer() {
assert(q.empty());
+ store->unregister_osr(this);
}
void queue_new(TransContext *txc) {
vector<Cache*> cache_shards;
+ std::mutex osr_lock; ///< protect osd_set
+ std::set<OpSequencer*> osr_set; ///< set of all OpSequencers
+
std::atomic<uint64_t> nid_last = {0};
std::atomic<uint64_t> nid_max = {0};
std::atomic<uint64_t> blobid_last = {0};
f->close_section();
}
+ void register_osr(OpSequencer *osr) {
+ std::lock_guard<std::mutex> l(osr_lock);
+ osr_set.insert(osr);
+ }
+ void unregister_osr(OpSequencer *osr) {
+ std::lock_guard<std::mutex> l(osr_lock);
+ osr_set.erase(osr);
+ }
+
public:
int statfs(struct store_statfs_t *buf) override;