state(MDSMap::STATE_BOOT),
stopping(false),
purge_queue(g_ceph_context, whoami_,
- mdsmap_->get_metadata_pool(), objecter),
+ mdsmap_->get_metadata_pool(), objecter,
+ new FunctionContext(
+ [this](int r){
+ // Purge Queue operates inside mds_lock when we're calling into
+ // it, and outside when in background, so must handle both cases.
+ if (mds_lock.is_locked_by_me()) {
+ damaged();
+ } else {
+ damaged_unlocked();
+ }
+ }
+ )
+ ),
progress_thread(this), dispatch_depth(0),
hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
mds_slow_req_count(0),
CephContext *cct_,
mds_rank_t rank_,
const int64_t metadata_pool_,
- Objecter *objecter_)
+ Objecter *objecter_,
+ Context *on_error_)
:
cct(cct_),
rank(rank_),
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
&finisher),
+ on_error(on_error_),
ops_in_flight(0),
max_purge_ops(0),
drain_initial(0),
draining(false)
{
+ assert(cct != nullptr);
+ assert(on_error != nullptr);
+ assert(objecter != nullptr);
+ journaler.set_write_error_handler(on_error);
}
PurgeQueue::~PurgeQueue()
dout(20) << " decoding entry" << dendl;
PurgeItem item;
bufferlist::iterator q = bl.begin();
- ::decode(item, q);
+ try {
+ ::decode(item, q);
+ } catch (const buffer::error &err) {
+ derr << "Decode error at read_pos=0x" << std::hex
+ << journaler.get_read_pos() << dendl;
+ on_error->complete(0);
+ }
dout(20) << " executing item (0x" << std::hex << item.ino
<< std::dec << ")" << dendl;
_execute_item(item, journaler.get_read_pos());
Journaler journaler;
+ Context *on_error;
+
// Map of Journaler offset to PurgeItem
std::map<uint64_t, PurgeItem> in_flight;
void execute_item_complete(
uint64_t expire_to);
+
public:
void init();
void shutdown();
CephContext *cct_,
mds_rank_t rank_,
const int64_t metadata_pool_,
- Objecter *objecter_);
+ Objecter *objecter_,
+ Context *on_error);
~PurgeQueue();
};