*/
struct Sequencer_impl : public RefCountedObject {
CephContext* cct;
+
+ // block until any previous transactions are visible. specifically,
+ // collection_list and collection_empty need to reflect prior operations.
virtual void flush() = 0;
+ // called when we are done with the impl. the impl may have a different
+ // (longer) lifecycle than the Sequencer.
+ virtual void discard() {}
+
/**
* Async flush_commit
*
Sequencer_implRef p;
explicit Sequencer(string n)
- : name(n), shard_hint(spg_t()), p(NULL) {}
+ : name(n), shard_hint(spg_t()), p(NULL) {
+ }
~Sequencer() {
+ if (p)
+ p->discard(); // tell impl we are done with it
}
/// return a unique string identifier for this sequencer
dout(1) << __func__ << dendl;
_osr_drain_all();
+ _osr_discard_all();
+ assert(osr_set.empty()); // nobody should be creating sequencers!
mempool_thread.shutdown();
{
dout(10) << __func__ << dendl;
- // WARNING: we make a (somewhat sloppy) assumption here that
- // no OpSequencers will be created or destroyed for the duration
- // of this method.
- set<OpSequencer*> s;
+ set<OpSequencerRef> s;
{
std::lock_guard<std::mutex> l(osr_lock);
s = osr_set;
}
for (auto osr : s) {
- dout(20) << __func__ << " flush " << osr << dendl;
+ dout(20) << __func__ << " drain " << osr << dendl;
osr->drain();
}
dout(10) << __func__ << " done" << dendl;
}
+// Tell every live OpSequencer that the store is done with it so each one
+// unregisters itself (see OpSequencer::discard()).  Called from shutdown
+// after _osr_drain_all().
+void BlueStore::_osr_discard_all()
+{
+  // NOTE(review): osr_set is printed here without osr_lock held -- assumed
+  // safe only because shutdown has already drained and no new sequencers
+  // are being created; confirm.
+  dout(10) << __func__ << " " << osr_set << dendl;
+  // Snapshot the set under osr_lock, then call discard() outside the lock:
+  // discard() takes register_lock and calls unregister_osr(), which
+  // presumably mutates osr_set under osr_lock itself (TODO confirm), so
+  // holding osr_lock across the calls would self-deadlock.
+  set<OpSequencerRef> s;
+  {
+    std::lock_guard<std::mutex> l(osr_lock);
+    s = osr_set;
+  }
+  for (auto osr : s) {
+    osr->discard();
+  }
+}
+
void BlueStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
_txc_state_proc(txc);
}
dout(20) << __func__ << " draining osr" << dendl;
- osr->drain();
+ _osr_drain_all();
+ osr->discard();
dout(10) << __func__ << " completed " << count << " events" << dendl;
return 0;
}
std::atomic_int kv_submitted_waiters = {0};
+ std::mutex register_lock;
+ bool registered = true;
+
  OpSequencer(CephContext* cct, BlueStore *store)
    //set the qlock to PTHREAD_MUTEX_RECURSIVE mode
    : Sequencer_impl(cct),
      parent(NULL), store(store) {
+    // serialize registration with discard(), which flips 'registered'
+    // under the same lock
+    std::lock_guard<std::mutex> l(register_lock);
    store->register_osr(this);
  }
  ~OpSequencer() {
    assert(q.empty());
-    store->unregister_osr(this);
+    // unregister via discard(); harmless if the owning Sequencer already
+    // discarded us explicitly -- discard() is idempotent (see below)
+    discard();
+  }
+
+  // Unregister from the store at most once.  Both ~OpSequencer() and
+  // Sequencer::~Sequencer() may call this; register_lock + 'registered'
+  // make the second call a no-op.
+  void discard() override {
+    std::lock_guard<std::mutex> l(register_lock);
+    if (registered) {
+      registered = false;
+      store->unregister_osr(this);
+    }
+  }
void queue_new(TransContext *txc) {
vector<Cache*> cache_shards;
-  std::mutex osr_lock;              ///< protect osd_set
-  std::set<OpSequencer*> osr_set;   ///< set of all OpSequencers
+  std::mutex osr_lock;              ///< protect osr_set
+  std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
std::atomic<uint64_t> nid_last = {0};
std::atomic<uint64_t> nid_max = {0};
void _osr_reap_done(OpSequencer *osr);
void _osr_drain_all();
+ void _osr_discard_all();
void _kv_sync_thread();
void _kv_stop() {