]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Journaler: add a shutdown() method
authorJohn Spray <john.spray@redhat.com>
Tue, 16 Jun 2015 13:25:33 +0000 (14:25 +0100)
committerJohn Spray <john.spray@redhat.com>
Thu, 25 Jun 2015 15:19:24 +0000 (16:19 +0100)
Because consumers may be blocked on the on_readable
condition, it is necessary for Journaler to have
an explicit shutdown method that fires the completion

Signed-off-by: John Spray <john.spray@redhat.com>
src/osdc/Journaler.cc
src/osdc/Journaler.h

index a723c164d74bbe7907dc657803dcc39e4dcb8ceb..3daea9400a51d9271c8f91a55c99832ae0034954 100644 (file)
@@ -141,6 +141,10 @@ public:
 void Journaler::recover(Context *onread) 
 {
   Mutex::Locker l(lock);
+  if (stopping) {
+    onread->complete(-EAGAIN);
+    return;
+  }
 
   ldout(cct, 1) << "recover start" << dendl;
   assert(state != STATE_ACTIVE);
@@ -150,7 +154,7 @@ void Journaler::recover(Context *onread)
     waitfor_recover.push_back(onread);
   
   if (state != STATE_UNDEF) {
-    ldout(cct, 1) << "recover - already recoverying" << dendl;
+    ldout(cct, 1) << "recover - already recovering" << dendl;
     return;
   }
 
@@ -634,6 +638,10 @@ void Journaler::_do_flush(unsigned amount)
 void Journaler::wait_for_flush(Context *onsafe)
 {
   Mutex::Locker l(lock);
+  if (stopping) {
+    onsafe->complete(-EAGAIN);
+    return;
+  }
   _wait_for_flush(onsafe);
 }
 
@@ -1122,6 +1130,10 @@ bool Journaler::try_read_entry(bufferlist& bl)
 void Journaler::wait_for_readable(Context *onreadable)
 {
   Mutex::Locker l(lock);
+  if (stopping) {
+    onreadable->complete(-EAGAIN);
+    return;
+  }
 
   assert(on_readable == 0);
   if (!readable) {
@@ -1395,3 +1407,30 @@ C_OnFinisher *Journaler::wrap_finisher(Context *c)
     return NULL;
   }
 }
+
+void Journaler::shutdown()
+{
+  Mutex::Locker l(lock);
+
+  ldout(cct, 1) << __func__ << dendl;
+
+  readable = false;
+  stopping = true;
+
+  // Kick out anyone reading from journal
+  error = -EAGAIN;
+  if (on_readable) {
+    C_OnFinisher *f = on_readable;
+    on_readable = 0;
+    f->complete(-EAGAIN);
+  }
+
+  finish_contexts(cct, waitfor_recover, 0);
+
+  std::map<uint64_t, std::list<Context*> >::iterator i;
+  for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {
+    finish_contexts(cct, i->second, -EAGAIN);
+  }
+  waitfor_safe.clear();
+}
+
index fead2af5d040a6b655f3391c828142f58e15bfe8..97e2822a4396c81b31e1cc8e229b150f49609e9b 100644 (file)
@@ -387,7 +387,8 @@ public:
     read_pos(0), requested_pos(0), received_pos(0),
     fetch_len(0), temp_fetch_len(0),
     on_readable(0), on_write_error(NULL), called_write_error(false),
-    expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false)
+    expire_pos(0), trimming_pos(0), trimmed_pos(0), readable(false),
+    stopping(false)
   {
     memset(&layout, 0, sizeof(layout));
   }
@@ -468,6 +469,15 @@ public:
 
   void set_write_error_handler(Context *c);
 
+  /**
+   * Cause any ongoing waits to error out with -EAGAIN, set error
+   * to -EAGAIN.
+   */
+  void shutdown();
+protected:
+  bool stopping;
+public:
+
   // Synchronous getters
   // ===================
   // TODO: need some locks on reads for true safety