git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: add error handling in PurgeQueue
author: John Spray <john.spray@redhat.com>
Sat, 11 Feb 2017 14:55:52 +0000 (14:55 +0000)
committer: John Spray <john.spray@redhat.com>
Wed, 8 Mar 2017 10:27:00 +0000 (10:27 +0000)
Handle both decode errors and Journaler errors.
Both are considered damage to the MDS rank, as
with other per-rank data structures.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/MDSRank.cc
src/mds/PurgeQueue.cc
src/mds/PurgeQueue.h

index 9be0c08b0a9e8194436d395fd072f4a431087e10..1e7b1ce2bbb54019c5c867797cd3be43435f18b3 100644 (file)
@@ -68,7 +68,19 @@ MDSRank::MDSRank(
     state(MDSMap::STATE_BOOT),
     stopping(false),
     purge_queue(g_ceph_context, whoami_,
-        mdsmap_->get_metadata_pool(), objecter),
+      mdsmap_->get_metadata_pool(), objecter,
+      new FunctionContext(
+          [this](int r){
+          // Purge Queue operates inside mds_lock when we're calling into
+          // it, and outside when in background, so must handle both cases.
+          if (mds_lock.is_locked_by_me()) {
+            damaged();
+          } else {
+            damaged_unlocked();
+          }
+        }
+      )
+    ),
     progress_thread(this), dispatch_depth(0),
     hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
     mds_slow_req_count(0),
index 8c99a47215e2111afbc33d4191fa5e4a7d90d6a2..478c8bd5a500710285ed27a1f06b69fc69b45c6b 100644 (file)
@@ -60,7 +60,8 @@ PurgeQueue::PurgeQueue(
       CephContext *cct_,
       mds_rank_t rank_,
       const int64_t metadata_pool_,
-      Objecter *objecter_)
+      Objecter *objecter_,
+      Context *on_error_)
   :
     cct(cct_),
     rank(rank_),
@@ -73,11 +74,16 @@ PurgeQueue::PurgeQueue(
     journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
       CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
       &finisher),
+    on_error(on_error_),
     ops_in_flight(0),
     max_purge_ops(0),
     drain_initial(0),
     draining(false)
 {
+  assert(cct != nullptr);
+  assert(on_error != nullptr);
+  assert(objecter != nullptr);
+  journaler.set_write_error_handler(on_error);
 }
 
 PurgeQueue::~PurgeQueue()
@@ -264,7 +270,13 @@ void PurgeQueue::_consume()
     dout(20) << " decoding entry" << dendl;
     PurgeItem item;
     bufferlist::iterator q = bl.begin();
-    ::decode(item, q);
+    try {
+      ::decode(item, q);
+    } catch (const buffer::error &err) {
+      derr << "Decode error at read_pos=0x" << std::hex
+           << journaler.get_read_pos() << dendl;
+      on_error->complete(0);
+    }
     dout(20) << " executing item (0x" << std::hex << item.ino
              << std::dec << ")" << dendl;
     _execute_item(item, journaler.get_read_pos());
index e86833d882e674f0c9a1785972347e67f9d3c080..e572cb4fab8abf24609440b4f6c4c89cf4018baf 100644 (file)
@@ -90,6 +90,8 @@ protected:
 
   Journaler journaler;
 
+  Context *on_error;
+
   // Map of Journaler offset to PurgeItem
   std::map<uint64_t, PurgeItem> in_flight;
 
@@ -118,6 +120,7 @@ protected:
   void execute_item_complete(
       uint64_t expire_to);
 
+
 public:
   void init();
   void shutdown();
@@ -164,7 +167,8 @@ public:
       CephContext *cct_,
       mds_rank_t rank_,
       const int64_t metadata_pool_,
-      Objecter *objecter_);
+      Objecter *objecter_,
+      Context *on_error);
   ~PurgeQueue();
 };