dout(1) << __func__ << dendl;
_osr_drain_all();
- _osr_discard_all();
- assert(osr_set.empty()); // nobody should be creating sequencers!
+ _osr_unregister_all();
mempool_thread.shutdown();
txc->state = TransContext::STATE_DONE;
}
- _osr_reap_done(osr.get());
+ bool empty = _osr_reap_done(osr.get());
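+  // if the parent Sequencer is already gone and we just drained the
+  // last txc, nothing else will touch this osr again; unregister it here.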
+ if (empty && osr->zombie) {
+ dout(10) << __func__ << " reaping empty zombie osr " << osr << dendl;
+ osr->_unregister();
+ }
}
void BlueStore::_txc_release_alloc(TransContext *txc)
{
txc->released.clear();
}
-void BlueStore::_osr_reap_done(OpSequencer *osr)
+bool BlueStore::_osr_reap_done(OpSequencer *osr)
{
CollectionRef c;
-
+ bool empty = false;
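+  // becomes true if we drain the queue completely; the caller uses the
+  // return value to decide whether a zombie osr can be reaped.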
{
std::lock_guard<std::mutex> l(osr->qlock);
dout(20) << __func__ << " osr " << osr << dendl;
delete txc;
osr->qcond.notify_all();
}
- if (osr->q.empty())
+ if (osr->q.empty()) {
dout(20) << __func__ << " osr " << osr << " q now empty" << dendl;
+ empty = true;
+ }
}
if (c) {
c->trim_cache();
}
+
+ return empty;
}
void BlueStore::_osr_drain_all()
{
  set<OpSequencerRef> s;
  {
    std::lock_guard<std::mutex> l(osr_lock);
s = osr_set;
}
+ dout(20) << __func__ << " osr_set " << s << dendl;
deferred_aggressive = true;
{
// submit anything pending
std::lock_guard<std::mutex> l(deferred_lock);
- // include deferred osrs in our wait list; these may have been
- // deregistered already!
- for (auto& osr : deferred_queue) {
- s.insert(&osr);
- }
_deferred_try_submit();
}
{
dout(10) << __func__ << " done" << dendl;
}
-void BlueStore::_osr_discard_all()
+void BlueStore::_osr_unregister_all()
{
- dout(10) << __func__ << " " << osr_set << dendl;
set<OpSequencerRef> s;
{
std::lock_guard<std::mutex> l(osr_lock);
s = osr_set;
}
+ dout(10) << __func__ << " " << s << dendl;
for (auto osr : s) {
- osr->discard();
+ osr->_unregister();
+
+ if (!osr->zombie) {
+ // break link from Sequencer to us so that this OpSequencer
+ // instance can die with this mount/umount cycle. note that
+ // we assume umount() will not race against ~Sequencer.
+ assert(osr->parent);
+ osr->parent->p.reset();
+ }
+ }
+ // nobody should be creating sequencers during umount either.
+ {
+ std::lock_guard<std::mutex> l(osr_lock);
+ assert(osr_set.empty());
}
}
std::atomic_int kv_submitted_waiters = {0};
- std::mutex register_lock;
- bool registered = true;
+ std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
+ std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
OpSequencer(CephContext* cct, BlueStore *store)
: Sequencer_impl(cct),
parent(NULL), store(store) {
- std::lock_guard<std::mutex> l(register_lock);
store->register_osr(this);
}
~OpSequencer() {
assert(q.empty());
- discard();
+ _unregister();
}
void discard() override {
- std::lock_guard<std::mutex> l(register_lock);
+    // Note that we may have txc's in flight when the parent Sequencer
+    // goes away.  Reflect this with zombie==registered==true and let
+    // _txc_finish or _osr_unregister_all clean up later.
+ assert(!zombie);
+ zombie = true;
+ parent = nullptr;
+ bool empty;
+ {
+ std::lock_guard<std::mutex> l(qlock);
+ empty = q.empty();
+ }
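+    // no txc's in flight, so _txc_finish will never reap us; unregister now.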
+ if (empty) {
+ _unregister();
+ }
+ }
+
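+  // safe to call more than once; once registered is cleared this is a no-op.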
+ void _unregister() {
if (registered) {
- registered = false;
store->unregister_osr(this);
+ registered = false;
}
}
void _txc_finish(TransContext *txc);
void _txc_release_alloc(TransContext *txc);
- void _osr_reap_done(OpSequencer *osr);
+ bool _osr_reap_done(OpSequencer *osr);
void _osr_drain_all();
- void _osr_discard_all();
+ void _osr_unregister_all();
void _kv_sync_thread();
void _kv_stop() {