void Journaler::recover(Context *onread)
{
Mutex::Locker l(lock);
+ if (stopping) {
+ onread->complete(-EAGAIN);
+ return;
+ }
ldout(cct, 1) << "recover start" << dendl;
assert(state != STATE_ACTIVE);
waitfor_recover.push_back(onread);
if (state != STATE_UNDEF) {
- ldout(cct, 1) << "recover - already recoverying" << dendl;
+ ldout(cct, 1) << "recover - already recovering" << dendl;
return;
}
void Journaler::wait_for_flush(Context *onsafe)
{
Mutex::Locker l(lock);
+ if (stopping) {
+ onsafe->complete(-EAGAIN);
+ return;
+ }
_wait_for_flush(onsafe);
}
void Journaler::wait_for_readable(Context *onreadable)
{
Mutex::Locker l(lock);
+ if (stopping) {
+ onreadable->complete(-EAGAIN);
+ return;
+ }
assert(on_readable == 0);
if (!readable) {
return NULL;
}
}
+
+void Journaler::shutdown()
+{
+ Mutex::Locker l(lock);
+
+ ldout(cct, 1) << __func__ << dendl;
+
+ readable = false;
+ stopping = true;
+
+ // Kick out anyone reading from journal
+ error = -EAGAIN;
+ if (on_readable) {
+ C_OnFinisher *f = on_readable;
+ on_readable = 0;
+ f->complete(-EAGAIN);
+ }
+
+ finish_contexts(cct, waitfor_recover, 0);
+
+ std::map<uint64_t, std::list<Context*> >::iterator i;
+ for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
+ finish_contexts(cct, i->second, -EAGAIN);
+ }
+ waitfor_safe.clear();
+}
+
read_pos(0), requested_pos(0), received_pos(0),
fetch_len(0), temp_fetch_len(0),
on_readable(0), on_write_error(NULL), called_write_error(false),
- expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false)
+ expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
+ stopping(false)
{
memset(&layout, 0, sizeof(layout));
}
void set_write_error_handler(Context *c);
+ /**
+ * Cause any ongoing waits to error out with -EAGAIN, set error
+ * to -EAGAIN.
+ */
+ void shutdown();
+protected:
+ bool stopping;
+public:
+
// Synchronous getters
// ===================
// TODO: need some locks on reads for true safety