From: John Spray Date: Sat, 11 Feb 2017 14:55:52 +0000 (+0000) Subject: mds: add error handling in PurgeQueue X-Git-Tag: v12.0.1~140^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6d59f15e127d3d57b5baa93e92aee83a333e8e19;p=ceph.git mds: add error handling in PurgeQueue For decode errors, and for Journaler errors. Both are considered damage to the MDS rank, as with other per-rank data structures. Signed-off-by: John Spray --- diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index 9be0c08b0a9e..1e7b1ce2bbb5 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -68,7 +68,19 @@ MDSRank::MDSRank( state(MDSMap::STATE_BOOT), stopping(false), purge_queue(g_ceph_context, whoami_, - mdsmap_->get_metadata_pool(), objecter), + mdsmap_->get_metadata_pool(), objecter, + new FunctionContext( + [this](int r){ + // Purge Queue operates inside mds_lock when we're calling into + // it, and outside when in background, so must handle both cases. + if (mds_lock.is_locked_by_me()) { + damaged(); + } else { + damaged_unlocked(); + } + } + ) + ), progress_thread(this), dispatch_depth(0), hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_), mds_slow_req_count(0), diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc index 8c99a47215e2..478c8bd5a500 100644 --- a/src/mds/PurgeQueue.cc +++ b/src/mds/PurgeQueue.cc @@ -60,7 +60,8 @@ PurgeQueue::PurgeQueue( CephContext *cct_, mds_rank_t rank_, const int64_t metadata_pool_, - Objecter *objecter_) + Objecter *objecter_, + Context *on_error_) : cct(cct_), rank(rank_), @@ -73,11 +74,16 @@ PurgeQueue::PurgeQueue( journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool, CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer, &finisher), + on_error(on_error_), ops_in_flight(0), max_purge_ops(0), drain_initial(0), draining(false) { + assert(cct != nullptr); + assert(on_error != nullptr); + assert(objecter != nullptr); + journaler.set_write_error_handler(on_error); } PurgeQueue::~PurgeQueue() @@ -264,7 +270,13 @@ void PurgeQueue::_consume() dout(20) << " decoding entry" << dendl; PurgeItem item; bufferlist::iterator q = bl.begin(); - ::decode(item, q); + try { + ::decode(item, q); + } catch (const buffer::error &err) { + derr << "Decode error at read_pos=0x" << std::hex + << journaler.get_read_pos() << dendl; + on_error->complete(0); + } dout(20) << " executing item (0x" << std::hex << item.ino << std::dec << ")" << dendl; _execute_item(item, journaler.get_read_pos()); diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h index e86833d882e6..e572cb4fab8a 100644 --- a/src/mds/PurgeQueue.h +++ b/src/mds/PurgeQueue.h @@ -90,6 +90,8 @@ protected: Journaler journaler; + Context *on_error; + // Map of Journaler offset to PurgeItem std::map in_flight; @@ -118,6 +120,7 @@ protected: void execute_item_complete( uint64_t expire_to); + public: void init(); void shutdown(); @@ -164,7 +167,8 @@ public: CephContext *cct_, mds_rank_t rank_, const int64_t metadata_pool_, - Objecter *objecter_); + Objecter *objecter_, + Context *on_error); ~PurgeQueue(); };