From: John Spray Date: Thu, 1 Dec 2016 20:22:43 +0000 (+0000) Subject: mds: use a persistent queue for purging deleted files X-Git-Tag: v12.0.1~140^2~23 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8ebf7d95a9071de24bb1e56a6423c505169cb4de;p=ceph-ci.git mds: use a persistent queue for purging deleted files To avoid creating stray directories of unbounded size and all the associated pain, use a more appropriate datastructure to store a FIFO of inodes that need purging. Fixes: http://tracker.ceph.com/issues/11950 Signed-off-by: John Spray --- diff --git a/src/mds/CMakeLists.txt b/src/mds/CMakeLists.txt index 72d370992a6..b7f7d0c5115 100644 --- a/src/mds/CMakeLists.txt +++ b/src/mds/CMakeLists.txt @@ -11,6 +11,7 @@ set(mds_srcs MDCache.cc RecoveryQueue.cc StrayManager.cc + PurgeQueue.cc Locker.cc Migrator.cc MDBalancer.cc diff --git a/src/mds/MDCache.cc b/src/mds/MDCache.cc index 37c42f40ddb..385b12e2e19 100644 --- a/src/mds/MDCache.cc +++ b/src/mds/MDCache.cc @@ -461,6 +461,8 @@ void MDCache::create_mydir_hierarchy(MDSGather *gather) mydir->commit(0, gather->new_sub()); myin->store(gather->new_sub()); + + stray_manager.purge_queue.create(new C_IO_Wrapper(mds, gather->new_sub())); } struct C_MDC_CreateSystemFile : public MDCacheLogContext { @@ -584,8 +586,15 @@ void MDCache::open_root_inode(MDSInternalContextBase *c) void MDCache::open_mydir_inode(MDSInternalContextBase *c) { + MDSGatherBuilder gather(g_ceph_context); + CInode *in = create_system_inode(MDS_INO_MDSDIR(mds->get_nodeid()), S_IFDIR|0755); // initially inaccurate! - in->fetch(c); + in->fetch(gather.new_sub()); + + stray_manager.purge_queue.open(new C_IO_Wrapper(mds, gather.new_sub())); + + gather.set_finisher(c); + gather.activate(); } void MDCache::open_root() diff --git a/src/mds/MDCache.h b/src/mds/MDCache.h index 3e49b4020e9..a8e174cd781 100644 --- a/src/mds/MDCache.h +++ b/src/mds/MDCache.h @@ -73,7 +73,7 @@ enum { l_mdc_first = 3000, // How many inodes currently in stray dentries l_mdc_num_strays, - // How many stray dentries are currently being purged + // How many stray dentries are currently enqueued for purge l_mdc_num_strays_purging, // How many stray dentries are currently delayed for purge due to refs l_mdc_num_strays_delayed, diff --git a/src/mds/MDSRank.cc b/src/mds/MDSRank.cc index e7da0aec4aa..f87cb40ee60 100644 --- a/src/mds/MDSRank.cc +++ b/src/mds/MDSRank.cc @@ -159,6 +159,8 @@ void MDSRankDispatcher::init() progress_thread.create("mds_rank_progr"); + mdcache->stray_manager.purge_queue.init(); + finisher->start(); } @@ -239,6 +241,8 @@ void MDSRankDispatcher::shutdown() // shut down cache mdcache->shutdown(); + mdcache->stray_manager.purge_queue.shutdown(); + if (objecter->initialized.read()) objecter->shutdown(); diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc new file mode 100644 index 00000000000..56f5e20fe29 --- /dev/null +++ b/src/mds/PurgeQueue.cc @@ -0,0 +1,284 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/debug.h" +#include "mds/mdstypes.h" +#include "mds/CInode.h" + +#include "PurgeQueue.h" + + +#define dout_context cct +#define dout_subsys ceph_subsys_mds +#undef dout_prefix +#define dout_prefix _prefix(_dout, rank) << __func__ << ": " +static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) { + return *_dout << "mds." << rank << ".purge_queue "; +} + +void PurgeItem::encode(bufferlist &bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(ino, bl); + ::encode(size, bl); + ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2); + ::encode(old_pools, bl); + ::encode(snapc, bl); + ENCODE_FINISH(bl); +} + +void PurgeItem::decode(bufferlist::iterator &p) +{ + DECODE_START(1, p); + ::decode(ino, p); + ::decode(size, p); + ::decode(layout, p); + ::decode(old_pools, p); + ::decode(snapc, p); + DECODE_FINISH(p); +} + +// TODO: implement purge queue creation on startup +// if we are on a filesystem created before purge queues existed +// TODO: ensure that a deactivating MDS rank blocks +// on complete drain of this queue before finishing +// TODO: when we're deactivating, lift all limits on +// how many OSD ops we're allowed to emit at a time to +// race through the queue as fast as we can. +// TODO: populate logger here to gather latency stat? +// ...and a stat for the size of the queue, if we can +// somehow track that? Could do an initial pass through +// the whole queue to count the items at startup? +// TODO: there is absolutely no reason to consume an inode number +// for this. Shoudl just give objects a string name with a rank +// suffix, like we do for MDSTables. Requires a little refactor +// of Journaler. +PurgeQueue::PurgeQueue( + CephContext *cct_, + mds_rank_t rank_, + const int64_t metadata_pool_, + Objecter *objecter_) + : + cct(cct_), + rank(rank_), + lock("PurgeQueue"), + metadata_pool(metadata_pool_), + finisher(cct, "PurgeQueue", "PQ_Finisher"), + timer(cct, lock), + filer(objecter_, &finisher), + objecter(objecter_), + journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool, + CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer, + &finisher) +{ +} + +void PurgeQueue::init() +{ + Mutex::Locker l(lock); + + finisher.start(); + timer.init(); +} + +void PurgeQueue::shutdown() +{ + Mutex::Locker l(lock); + + journaler.shutdown(); + timer.shutdown(); + finisher.stop(); +} + +void PurgeQueue::open(Context *completion) +{ + dout(4) << "opening" << dendl; + + Mutex::Locker l(lock); + + journaler.recover(new FunctionContext([this, completion](int r){ + Mutex::Locker l(lock); + dout(4) << "open complete" << dendl; + if (r == 0) { + journaler.set_writeable(); + } + completion->complete(r); + })); +} + +void PurgeQueue::create(Context *fin) +{ + dout(4) << "creating" << dendl; + Mutex::Locker l(lock); + + file_layout_t layout = file_layout_t::get_default(); + layout.pool_id = metadata_pool; + journaler.set_writeable(); + journaler.create(&layout, JOURNAL_FORMAT_RESILIENT); + journaler.write_head(fin); +} + +void PurgeQueue::push(const PurgeItem &pi, Context *completion) +{ + dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl; + Mutex::Locker l(lock); + + // Callers should have waited for open() before using us + assert(!journaler.is_readonly()); + + bufferlist bl; + + ::encode(pi, bl); + journaler.append_entry(bl); + + // Note that flush calls are not 1:1 with IOs, Journaler + // does its own batching. So we just call every time. + // FIXME: *actually* as soon as we call _consume it will + // do a flush via _issue_read, so we really are doing one + // write per event. Avoid this by avoiding doing the journaler + // read (see "if we could consume this PurgeItem immediately...") + journaler.flush(completion); + + // Maybe go ahead and do something with it right away + _consume(); + + // TODO: if we could consume this PurgeItem immediately, and + // Journaler does not have any outstanding prefetches, then + // short circuit its read by advancing read_pos to write_pos + // and passing the PurgeItem straight into _execute_item +} + +bool PurgeQueue::can_consume() +{ + // TODO: enforce limits (currently just allowing one in flight) + if (in_flight.size() > 0) { + return false; + } else { + return true; + } +} + +void PurgeQueue::_consume() +{ + assert(lock.is_locked_by_me()); + + // Because we are the writer and the reader of the journal + // via the same Journaler instance, we never need to reread_head + + if (!can_consume()) { + dout(10) << " cannot consume right now" << dendl; + + return; + } + + if (!journaler.is_readable()) { + dout(10) << " not readable right now" << dendl; + if (!journaler.have_waiter()) { + journaler.wait_for_readable(new FunctionContext([this](int r) { + Mutex::Locker l(lock); + if (r == 0) { + _consume(); + } + })); + } + + return; + } + + // The journaler is readable: consume an entry + bufferlist bl; + bool readable = journaler.try_read_entry(bl); + assert(readable); // we checked earlier + + dout(20) << " decoding entry" << dendl; + PurgeItem item; + bufferlist::iterator q = bl.begin(); + ::decode(item, q); + dout(20) << " executing item (0x" << std::hex << item.ino + << std::dec << ")" << dendl; + _execute_item(item, journaler.get_read_pos()); +} + +void PurgeQueue::_execute_item( + const PurgeItem &item, + uint64_t expire_to) +{ + assert(lock.is_locked_by_me()); + + in_flight[expire_to] = item; + + // TODO: handle things other than normal file purges + // (directories, snapshot truncates) + C_GatherBuilder gather(cct); + if (item.size > 0) { + uint64_t num = Striper::get_num_objects(item.layout, item.size); + dout(10) << " 0~" << item.size << " objects 0~" << num + << " snapc " << item.snapc << " on " << item.ino << dendl; + filer.purge_range(item.ino, &item.layout, item.snapc, + 0, num, ceph::real_clock::now(), 0, + gather.new_sub()); + } + + // remove the backtrace object if it was not purged + object_t oid = CInode::get_object_name(item.ino, frag_t(), ""); + if (!gather.has_subs() || !item.layout.pool_ns.empty()) { + object_locator_t oloc(item.layout.pool_id); + dout(10) << " remove backtrace object " << oid + << " pool " << oloc.pool << " snapc " << item.snapc << dendl; + objecter->remove(oid, oloc, item.snapc, + ceph::real_clock::now(), 0, + NULL, gather.new_sub()); + } + + // remove old backtrace objects + for (const auto &p : item.old_pools) { + object_locator_t oloc(p); + dout(10) << " remove backtrace object " << oid + << " old pool " << p << " snapc " << item.snapc << dendl; + objecter->remove(oid, oloc, item.snapc, + ceph::real_clock::now(), 0, + NULL, gather.new_sub()); + } + assert(gather.has_subs()); + + gather.set_finisher(new FunctionContext([this, expire_to](int r){ + execute_item_complete(expire_to); + })); + gather.activate(); +} + +void PurgeQueue::execute_item_complete( + uint64_t expire_to) +{ + dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl; + Mutex::Locker l(lock); + assert(in_flight.count(expire_to) == 1); + + auto iter = in_flight.find(expire_to); + assert(iter != in_flight.end()); + if (iter == in_flight.begin()) { + // This was the lowest journal position in flight, so we can now + // safely expire the journal up to here. + journaler.set_expire_pos(expire_to); + journaler.trim(); + } + + dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino + << std::dec << dendl; + + in_flight.erase(iter); + + _consume(); +} + diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h new file mode 100644 index 00000000000..e94759a8728 --- /dev/null +++ b/src/mds/PurgeQueue.h @@ -0,0 +1,101 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef PURGE_QUEUE_H_ +#define PURGE_QUEUE_H_ + +#include "include/compact_set.h" +#include "osdc/Journaler.h" + + +class PurgeItem +{ +public: + inodeno_t ino; + uint64_t size; + file_layout_t layout; + compact_set old_pools; + SnapContext snapc; + + PurgeItem() + : ino(0), size(0) + {} + + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &p); +}; +WRITE_CLASS_ENCODER(PurgeItem) + +/** + * Note that this class does not take a reference to MDSRank: we are + * independent of all the metadata structures and do not need to + * take mds_lock for anything. + */ +class PurgeQueue +{ +protected: + CephContext *cct; + const mds_rank_t rank; + Mutex lock; + + int64_t metadata_pool; + + // Don't use the MDSDaemon's Finisher and Timer, because this class + // operates outside of MDSDaemon::mds_lock + Finisher finisher; + SafeTimer timer; + Filer filer; + Objecter *objecter; + Journaler journaler; + + std::map in_flight; + + //PerfCounters *logger; + + bool can_consume(); + + void _consume(); + + void _execute_item( + const PurgeItem &item, + uint64_t expire_to); + void execute_item_complete( + uint64_t expire_to); + +public: + void init(); + void shutdown(); + + // Write an empty queue, use this during MDS rank creation + void create(Context *completion); + + // Read the Journaler header for an existing queue and start consuming + void open(Context *completion); + + // Submit one entry to the work queue. Call back when it is persisted + // to the queue (there is no callback for when it is executed) + void push(const PurgeItem &pi, Context *completion); + + PurgeQueue( + CephContext *cct_, + mds_rank_t rank_, + const int64_t metadata_pool_, + Objecter *objecter_); + ~PurgeQueue() + {} +}; + + +#endif + diff --git a/src/mds/StrayManager.cc b/src/mds/StrayManager.cc index de2955a5534..e16d2f9932e 100644 --- a/src/mds/StrayManager.cc +++ b/src/mds/StrayManager.cc @@ -138,45 +138,27 @@ void StrayManager::purge(CDentry *dn, uint32_t op_allowance) assert(in->last == CEPH_NOSNAP); } + uint64_t to = 0; if (in->is_file()) { - uint64_t to = in->inode.get_max_size(); + to = in->inode.get_max_size(); to = MAX(in->inode.size, to); // when truncating a file, the filer does not delete stripe objects that are // truncated to zero. so we need to purge stripe objects up to the max size // the file has ever been. to = MAX(in->inode.max_size_ever, to); - if (to > 0) { - uint64_t num = Striper::get_num_objects(in->inode.layout, to); - dout(10) << __func__ << " 0~" << to << " objects 0~" << num - << " snapc " << snapc << " on " << *in << dendl; - filer.purge_range(in->inode.ino, &in->inode.layout, *snapc, - 0, num, ceph::real_clock::now(), 0, - gather.new_sub()); - } } inode_t *pi = in->get_projected_inode(); - object_t oid = CInode::get_object_name(pi->ino, frag_t(), ""); - // remove the backtrace object if it was not purged - if (!gather.has_subs() || !pi->layout.pool_ns.empty()) { - object_locator_t oloc(pi->layout.pool_id); - dout(10) << __func__ << " remove backtrace object " << oid - << " pool " << oloc.pool << " snapc " << snapc << dendl; - mds->objecter->remove(oid, oloc, *snapc, - ceph::real_clock::now(), 0, - gather.new_sub()); - } - // remove old backtrace objects - for (compact_set::iterator p = pi->old_pools.begin(); - p != pi->old_pools.end(); - ++p) { - object_locator_t oloc(*p); - dout(10) << __func__ << " remove backtrace object " << oid - << " old pool " << *p << " snapc " << snapc << dendl; - mds->objecter->remove(oid, oloc, *snapc, - ceph::real_clock::now(), 0, - gather.new_sub()); - } + + PurgeItem item; + item.ino = in->inode.ino; + item.size = to; + item.layout = pi->layout; + item.old_pools = pi->old_pools; + item.snapc = *snapc; + + purge_queue.push(item, gather.new_sub()); + assert(gather.has_subs()); gather.activate(); } @@ -862,7 +844,9 @@ StrayManager::StrayManager(MDSRank *mds) ops_in_flight(0), files_purging(0), max_purge_ops(0), num_strays(0), num_strays_purging(0), num_strays_delayed(0), - filer(mds->objecter, mds->finisher) + filer(mds->objecter, mds->finisher), + purge_queue(g_ceph_context, mds->get_nodeid(), + mds->mdsmap->get_metadata_pool(), mds->objecter) { assert(mds != NULL); } diff --git a/src/mds/StrayManager.h b/src/mds/StrayManager.h index d58ebfbbd73..65017798e8b 100644 --- a/src/mds/StrayManager.h +++ b/src/mds/StrayManager.h @@ -17,6 +17,7 @@ #include "include/elist.h" #include #include "osdc/Filer.h" +#include "mds/PurgeQueue.h" class MDSRank; class PerfCounters; @@ -72,6 +73,8 @@ class StrayManager Filer filer; + PurgeQueue purge_queue; + void truncate(CDentry *dn, uint32_t op_allowance); /** @@ -95,6 +98,13 @@ class StrayManager */ void _truncate_stray_logged(CDentry *dn, LogSegment *ls); + // FIXME: doing this to let MDCache call through into purgequeue + // for initialization and teardown + // >> + friend class MDCache; + friend class MDSRankDispatcher; + // << + friend class StrayManagerIOContext; friend class StrayManagerLogContext; friend class StrayManagerContext; diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 9a6779307d6..ed7067fc25b 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -52,6 +52,7 @@ #define MDS_INO_LOG_OFFSET (2*MAX_MDS) #define MDS_INO_LOG_BACKUP_OFFSET (3*MAX_MDS) #define MDS_INO_LOG_POINTER_OFFSET (4*MAX_MDS) +#define MDS_INO_PURGE_QUEUE (5*MAX_MDS) #define MDS_INO_SYSTEM_BASE ((6*MAX_MDS) + (MAX_MDS * NUM_STRAY))