MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
Dispatcher(m->cct),
mds_lock("MDS::mds_lock"),
+ stopping(false),
timer(m->cct, mds_lock),
hb(NULL),
beacon(m->cct, mc, n),
{
assert(signum == SIGINT || signum == SIGTERM);
derr << "*** got signal " << sys_siglist[signum] << " ***" << dendl;
- mds_lock.Lock();
- suicide();
- mds_lock.Unlock();
+ {
+ Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return;
+ }
+ suicide();
+ }
}
void MDS::damaged()
void MDS::suicide(bool fast)
{
assert(mds_lock.is_locked());
+ // It should never be possible to suicide to get called twice, because
+ // anyone picking up mds_lock checks if stopping is true and drops
+ // out if it is.
+ assert(stopping == false);
+ stopping = true;
+
set_want_state(MDSMap::STATE_DNE); // whatever.
if (!fast && !mdsmap->is_dne_gid(mds_gid_t(monc->get_global_id()))) {
dout(0) << "respawn execv " << orig_argv[0]
<< " failed with " << cpp_strerror(errno) << dendl;
- suicide(true);
+
+ // We have to assert out here, because suicide() returns, and callers
+ // to respawn expect it never to return.
+ assert(0);
}
void MDS::handle_write_error(int err)
bool MDS::ms_dispatch(Message *m)
{
- bool ret;
- mds_lock.Lock();
+ bool ret = false;
+
+ Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return false;
+ }
heartbeat_reset();
ret = _dispatch(m, true);
dec_dispatch_depth();
}
- mds_lock.Unlock();
+
return ret;
}
return false;
Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return false;
+ }
dout(5) << "ms_handle_reset on " << con->get_peer_addr() << dendl;
if (want_state == CEPH_MDS_STATE_DNE)
return false;
return;
Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return;
+ }
+
dout(5) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl;
if (want_state == CEPH_MDS_STATE_DNE)
return;
bool& is_valid, CryptoKey& session_key)
{
Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return false;
+ }
if (want_state == CEPH_MDS_STATE_DNE)
return false;
void MDS::ms_handle_accept(Connection *con)
{
Mutex::Locker l(mds_lock);
+ if (stopping) {
+ return;
+ }
+
Session *s = static_cast<Session *>(con->get_priv());
dout(10) << "ms_handle_accept " << con->get_peer_addr() << " con " << con << " session " << s << dendl;
if (s) {
{
Mutex::Locker l(mds->mds_lock);
while (true) {
- while (!stopping &&
+ while (!mds->stopping &&
mds->finished_queue.empty() &&
(mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
cond.Wait(mds->mds_lock);
}
- if (stopping) {
+ if (mds->stopping) {
break;
}
void MDS::ProgressThread::shutdown()
{
assert(mds->mds_lock.is_locked_by_me());
+ assert(mds->stopping);
- stopping = true;
- cond.Signal();
- mds->mds_lock.Unlock();
- if (is_started())
- join();
- mds->mds_lock.Lock();
+ if (am_self()) {
+ // Stopping is set, we will fall out of our main loop naturally
+ } else {
+ // Kick the thread to notice mds->stopping, and join it
+ cond.Signal();
+ mds->mds_lock.Unlock();
+ if (is_started())
+ join();
+ mds->mds_lock.Lock();
+ }
}
/**
class MDS : public Dispatcher, public md_config_obs_t {
public:
+
+ /* Global MDS lock: every time someone takes this, they must
+ * also check the `stopping` flag. If stopping is true, you
+ * must either do nothing and immediately drop the lock, or
+ * never drop the lock again (i.e. call respawn()) */
Mutex mds_lock;
+ bool stopping;
+
SafeTimer timer;
private:
private:
class ProgressThread : public Thread {
MDS *mds;
- bool stopping;
Cond cond;
public:
- ProgressThread(MDS *mds_) : mds(mds_), stopping(false) {}
+ ProgressThread(MDS *mds_) : mds(mds_) {}
void * entry();
void shutdown();
void signal() {cond.Signal();}
/**
* Terminate this daemon process.
*
+ * This function will return, but once it does so the calling thread
+ * must do no more work as all subsystems will have been shut down.
+ *
* @param fast: if true, do not send a message to the mon before shutting
* down
*/