]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Journaler: add 'stopping' check to various finish callbacks
authorYan, Zheng <zyan@redhat.com>
Mon, 11 Dec 2017 00:43:57 +0000 (08:43 +0800)
committerYan, Zheng <zyan@redhat.com>
Mon, 11 Dec 2017 01:12:04 +0000 (09:12 +0800)
These callbacks are executed by finisher. When they are being executed,
Journaler can be in stopping state.

Fixes: http://tracker.ceph.com/issues/22360
Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
src/osdc/Journaler.cc

index 0896e70112d5b048d6807a6fae0f151dc365d39d..a573cffd26278c3ee447e1cedb8595dc0885ce2c 100644 (file)
@@ -213,6 +213,10 @@ void Journaler::_reread_head(Context *onfinish)
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 {
   lock_guard l(lock);
+  if (stopping) {
+    finish->complete(-EAGAIN);
+    return;
+  }
 
   //read on-disk header into
   assert(bl.length() || r < 0 );
@@ -241,6 +245,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 void Journaler::_finish_read_head(int r, bufferlist& bl)
 {
   lock_guard l(lock);
+  if (stopping)
+    return;
 
   assert(state == STATE_READHEAD);
 
@@ -331,6 +337,10 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
                                C_OnFinisher *onfinish)
 {
   lock_guard l(lock);
+  if (stopping) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
   assert(new_end >= write_pos || r < 0);
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end
@@ -344,6 +354,8 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end,
 void Journaler::_finish_probe_end(int r, uint64_t end)
 {
   lock_guard l(lock);
+  if (stopping)
+    return;
 
   assert(state == STATE_PROBING);
   if (r < 0) { // error in probing
@@ -396,6 +408,10 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish)
 {
   // Expect to be called back from finish_reread_head, which already takes lock
   // lock is locked
+  if (stopping) {
+    onfinish->complete(-EAGAIN);
+    return;
+  }
 
   assert(!r); //if we get an error, we're boned
   _reprobe(onfinish);
@@ -587,6 +603,8 @@ uint64_t Journaler::append_entry(bufferlist& bl)
 
 void Journaler::_do_flush(unsigned amount)
 {
+  if (stopping)
+    return;
   if (write_pos == flush_pos)
     return;
   assert(write_pos > flush_pos);
@@ -709,6 +727,10 @@ void Journaler::_wait_for_flush(Context *onsafe)
 void Journaler::flush(Context *onsafe)
 {
   lock_guard l(lock);
+  if (stopping) {
+    onsafe->complete(-EAGAIN);
+    return;
+  }
   _flush(wrap_finisher(onsafe));
 }
 
@@ -1013,6 +1035,9 @@ void Journaler::_issue_read(uint64_t len)
 
 void Journaler::_prefetch()
 {
+  if (stopping)
+    return;
+
   ldout(cct, 10) << "_prefetch" << dendl;
   // prefetch
   uint64_t pf;
@@ -1158,6 +1183,10 @@ void Journaler::erase(Context *completion)
 void Journaler::_finish_erase(int data_result, C_OnFinisher *completion)
 {
   lock_guard l(lock);
+  if (stopping) {
+    completion->complete(-EAGAIN);
+    return;
+  }
 
   if (data_result == 0) {
     // Async delete the journal header
@@ -1265,6 +1294,9 @@ void Journaler::trim()
 
 void Journaler::_trim()
 {
+  if (stopping)
+    return;
+
   assert(!readonly);
   uint64_t period = get_layout_period();
   uint64_t trim_to = last_committed.expire_pos;
@@ -1526,7 +1558,9 @@ void Journaler::shutdown()
     f->complete(-EAGAIN);
   }
 
-  finish_contexts(cct, waitfor_recover, -ESHUTDOWN);
+  list<Context*> ls;
+  ls.swap(waitfor_recover);
+  finish_contexts(cct, ls, -ESHUTDOWN);
 
   std::map<uint64_t, std::list<Context*> >::iterator i;
   for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {