From: John Spray <john.spray@redhat.com>
Date: Fri, 23 Dec 2016 18:00:17 +0000 (+0000)
Subject: mds: implement PurgeQueue throttling
X-Git-Tag: v12.0.1~140^2~18
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f2fb1874ca29560676a09d69801b4db0ae4d8394;p=ceph.git

mds: implement PurgeQueue throttling

Signed-off-by: John Spray <john.spray@redhat.com>
---
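This commit throttles purging on two axes: the number of PurgeItems
executing at once (mds_max_purge_files) and the number of RADOS ops
those items are expected to issue (max_purge_ops, derived from the PG
count in update_op_limit()). A minimal standalone sketch of the
admission rule that PurgeQueue::can_consume() implements below, using
plain types and illustrative names rather than the real Ceph classes:

#include <cstdint>
#include <iostream>

struct Throttle {
  uint64_t ops_in_flight = 0;    // RADOS ops currently issued
  uint64_t max_purge_ops = 0;    // dynamic limit derived from PG count
  uint64_t files_in_flight = 0;  // PurgeItems currently executing
  uint64_t max_purge_files = 0;  // mds_max_purge_files
};

// True if one more PurgeItem may start executing.
bool can_consume(const Throttle &t) {
  // Starvation guard: with nothing in flight, always admit one item so
  // a low op limit can never block all progress; max_purge_files == 0
  // is the administrator's explicit pause switch.
  if (t.files_in_flight == 0 && t.max_purge_files > 0)
    return true;
  if (t.ops_in_flight >= t.max_purge_ops)
    return false;                                // over the op budget
  return t.files_in_flight < t.max_purge_files;  // item budget
}

int main() {
  Throttle t;
  t.max_purge_ops = 64;
  t.max_purge_files = 32;
  std::cout << can_consume(t) << "\n";  // 1: idle queue always admits
  t.files_in_flight = 1;
  t.ops_in_flight = 64;
  std::cout << can_consume(t) << "\n";  // 0: op budget exhausted
}

Note that the starvation guard lets ops_in_flight overshoot
max_purge_ops for a single oversized item; the op-limit branch then
holds further items back until completions bring the count down.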
diff --git a/src/mds/MDSDaemon.cc b/src/mds/MDSDaemon.cc
index 69b76f3e6f6b..ff5890b56b30 100644
--- a/src/mds/MDSDaemon.cc
+++ b/src/mds/MDSDaemon.cc
@@ -352,9 +352,10 @@ const char** MDSDaemon::get_tracked_conf_keys() const
     "clog_to_syslog",
     "clog_to_syslog_facility",
     "clog_to_syslog_level",
-    // StrayManager
+    // PurgeQueue
     "mds_max_purge_ops",
     "mds_max_purge_ops_per_pg",
+    "mds_max_purge_files",
     "clog_to_graylog",
     "clog_to_graylog_host",
     "clog_to_graylog_port",
diff --git a/src/mds/PurgeQueue.cc b/src/mds/PurgeQueue.cc
index 5c4ba15ab3fb..7e57eabc7fa8 100644
--- a/src/mds/PurgeQueue.cc
+++ b/src/mds/PurgeQueue.cc
@@ -56,8 +56,6 @@ void PurgeItem::decode(bufferlist::iterator &p)
 
 // TODO: implement purge queue creation on startup
 // if we are on a filesystem created before purge queues existed
-// TODO: ensure that a deactivating MDS rank blocks
-// on complete drain of this queue before finishing
 // TODO: when we're deactivating, lift all limits on
 // how many OSD ops we're allowed to emit at a time to
 // race through the queue as fast as we can.
@@ -79,7 +77,9 @@ PurgeQueue::PurgeQueue(
     objecter(objecter_),
     journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
               CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
-              &finisher)
+              &finisher),
+    ops_in_flight(0),
+    max_purge_ops(0)
 {
 }
 
@@ -94,10 +94,9 @@ void PurgeQueue::create_logger()
 {
   PerfCountersBuilder pcb(g_ceph_context, "purge_queue", l_pq_first, l_pq_last);
-  //pcb.add_u64(l_mdc_num_strays_purging, "num_strays_purging", "Stray dentries purging");
-  // pcb.add_u64(l_mdc_num_purge_ops, "num_purge_ops", "Purge operations");
+  pcb.add_u64(l_pq_executing_ops, "pq_executing_ops", "Purge queue ops in flight");
   pcb.add_u64(l_pq_executing, "pq_executing", "Purge queue tasks in flight");
-  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed");
+  pcb.add_u64_counter(l_pq_executed, "pq_executed", "Purge queue tasks executed", "purg");
 
   logger.reset(pcb.create_perf_counters());
   g_ceph_context->get_perfcounters_collection()->add(logger.get());
@@ -168,74 +167,65 @@ void PurgeQueue::push(const PurgeItem &pi, Context *completion)
 
   // Note that flush calls are not 1:1 with IOs, Journaler
   // does its own batching.  So we just call every time.
-  // FIXME: *actually* as soon as we call _consume it will
-  // do a flush via _issue_read, so we really are doing one
-  // write per event.  Avoid this by avoiding doing the journaler
-  // read (see "if we could consume this PurgeItem immediately...")
   journaler.flush(completion);
 
   // Maybe go ahead and do something with it right away
   _consume();
-
-  // TODO: if we could consume this PurgeItem immediately, and
-  // Journaler does not have any outstanding prefetches, then
-  // short circuit its read by advancing read_pos to write_pos
-  // and passing the PurgeItem straight into _execute_item
 }
 
-#if 0
-uint32_t StrayManager::_calculate_ops_required(CInode *in, bool trunc)
+uint32_t PurgeQueue::_calculate_ops(const PurgeItem &item) const
 {
   uint32_t ops_required = 0;
-  if (in->is_dir()) {
+  if (item.action == PurgeItem::PURGE_DIR) {
     // Directory, count dirfrags to be deleted
     std::list<frag_t> ls;
-    if (!in->dirfragtree.is_leaf(frag_t())) {
-      in->dirfragtree.get_leaves(ls);
+    if (!item.fragtree.is_leaf(frag_t())) {
+      item.fragtree.get_leaves(ls);
     }
     // One for the root, plus any leaves
     ops_required = 1 + ls.size();
   } else {
     // File, work out concurrent Filer::purge deletes
-    const uint64_t to = MAX(in->inode.max_size_ever,
-                            MAX(in->inode.size, in->inode.get_max_size()));
+    const uint64_t num = (item.size > 0) ?
+      Striper::get_num_objects(item.layout, item.size) : 1;
 
-    const uint64_t num = (to > 0) ? Striper::get_num_objects(in->inode.layout, to) : 1;
     ops_required = MIN(num, g_conf->filer_max_purge_ops);
 
     // Account for removing (or zeroing) backtrace
     ops_required += 1;
 
     // Account for deletions for old pools
-    if (!trunc) {
-      ops_required += in->get_projected_inode()->old_pools.size();
+    if (item.action != PurgeItem::TRUNCATE_FILE) {
+      ops_required += item.old_pools.size();
     }
   }
 
   return ops_required;
 }
-#endif
 
 bool PurgeQueue::can_consume()
 {
-#if 0
-  // Calculate how much of the ops allowance is available, allowing
-  // for the case where the limit is currently being exceeded.
-  uint32_t ops_avail;
-  if (ops_in_flight <= max_purge_ops) {
-    ops_avail = max_purge_ops - ops_in_flight;
-  } else {
-    ops_avail = 0;
+  dout(20) << ops_in_flight << "/" << max_purge_ops << " ops, "
+           << in_flight.size() << "/" << g_conf->mds_max_purge_files
+           << " files" << dendl;
+
+  if (in_flight.size() == 0 && cct->_conf->mds_max_purge_files > 0) {
+    // Always permit consumption if nothing is in flight, so that the ops
+    // limit can never be so low as to forbid all progress (unless
+    // administrator has deliberately paused purging by setting max
+    // purge files to zero).
+    return true;
   }
-  dout(10) << __func__ << ": allocating allowance "
-           << ops_required << " to " << ops_in_flight << " in flight" << dendl;
-
-  logger->set(l_mdc_num_purge_ops, ops_in_flight);
-#endif
+  if (ops_in_flight >= max_purge_ops) {
+    dout(20) << "Throttling on op limit " << ops_in_flight << "/"
+             << max_purge_ops << dendl;
+    return false;
+  }
 
-  // TODO: enforce limits (currently just allowing one in flight)
-  if (in_flight.size() > cct->_conf->mds_max_purge_files) {
+  if (in_flight.size() >= cct->_conf->mds_max_purge_files) {
+    dout(20) << "Throttling on item limit " << in_flight.size()
+             << "/" << cct->_conf->mds_max_purge_files << dendl;
     return false;
   } else {
     return true;
@@ -246,41 +236,38 @@ void PurgeQueue::_consume()
 {
   assert(lock.is_locked_by_me());
 
-  // Because we are the writer and the reader of the journal
-  // via the same Journaler instance, we never need to reread_head
-
-  if (!can_consume()) {
-    dout(10) << " cannot consume right now" << dendl;
-
-    return;
-  }
+  while(can_consume()) {
+    if (!journaler.is_readable()) {
+      dout(10) << " not readable right now" << dendl;
+      // Because we are the writer and the reader of the journal
+      // via the same Journaler instance, we never need to reread_head
+      if (!journaler.have_waiter()) {
+        journaler.wait_for_readable(new FunctionContext([this](int r) {
+          Mutex::Locker l(lock);
+          if (r == 0) {
+            _consume();
+          }
+        }));
+      }
 
-  if (!journaler.is_readable()) {
-    dout(10) << " not readable right now" << dendl;
-    if (!journaler.have_waiter()) {
-      journaler.wait_for_readable(new FunctionContext([this](int r) {
-        Mutex::Locker l(lock);
-        if (r == 0) {
-          _consume();
-        }
-      }));
+      return;
     }
 
-    return;
+    // The journaler is readable: consume an entry
+    bufferlist bl;
+    bool readable = journaler.try_read_entry(bl);
+    assert(readable);  // we checked earlier
+
+    dout(20) << " decoding entry" << dendl;
+    PurgeItem item;
+    bufferlist::iterator q = bl.begin();
+    ::decode(item, q);
+    dout(20) << " executing item (0x" << std::hex << item.ino
+             << std::dec << ")" << dendl;
+    _execute_item(item, journaler.get_read_pos());
   }
 
-  // The journaler is readable: consume an entry
-  bufferlist bl;
-  bool readable = journaler.try_read_entry(bl);
-  assert(readable);  // we checked earlier
-
-  dout(20) << " decoding entry" << dendl;
-  PurgeItem item;
-  bufferlist::iterator q = bl.begin();
-  ::decode(item, q);
-  dout(20) << " executing item (0x" << std::hex << item.ino
-           << std::dec << ")" << dendl;
-  _execute_item(item, journaler.get_read_pos());
+  dout(10) << " cannot consume right now" << dendl;
 }
 
@@ -291,6 +278,8 @@ void PurgeQueue::_execute_item(
 
   in_flight[expire_to] = item;
   logger->set(l_pq_executing, in_flight.size());
+  ops_in_flight += _calculate_ops(item);
+  logger->set(l_pq_executing_ops, ops_in_flight);
 
   SnapContext nullsnapc;
@@ -387,19 +376,12 @@ void PurgeQueue::execute_item_complete(
   dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
            << std::dec << dendl;
 
+  ops_in_flight -= _calculate_ops(iter->second);
+  logger->set(l_pq_executing_ops, ops_in_flight);
+
   in_flight.erase(iter);
   logger->set(l_pq_executing, in_flight.size());
 
-#if 0
-  // Release resources
-  dout(10) << __func__ << ": decrementing op allowance "
-           << ops_allowance << " from " << ops_in_flight << " in flight" << dendl;
-  assert(ops_in_flight >= ops_allowance);
-  ops_in_flight -= ops_allowance;
-  logger->set(l_mdc_num_purge_ops, ops_in_flight);
-  files_purging -= 1;
-#endif
-
   logger->inc(l_pq_executed);
 
   _consume();
@@ -443,6 +425,25 @@ void PurgeQueue::handle_conf_change(const struct md_config_t *conf,
   if (changed.count("mds_max_purge_ops")
       || changed.count("mds_max_purge_ops_per_pg")) {
     update_op_limit(mds_map);
+  } else if (changed.count("mds_max_purge_files")) {
+    Mutex::Locker l(lock);
+
+    if (in_flight.empty()) {
+      // We might have gone from zero to a finite limit, so
+      // might need to kick off consume.
+      dout(4) << "maybe start work again (max_purge_files="
+              << conf->mds_max_purge_files << ")" << dendl;
+      finisher.queue(new FunctionContext([this](int r){
+        Mutex::Locker l(lock);
+        _consume();
+      }));
+    }
   }
 }
 
+bool PurgeQueue::is_idle() const
+{
+  return in_flight.empty() && (
+      journaler.get_read_pos() == journaler.get_write_pos());
+}
+
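The rewritten _consume() above replaces the old single-shot function
with a loop: keep decoding and executing journal entries while
can_consume() admits more, and park at most one wait_for_readable()
callback when nothing is readable. A compressed standalone sketch of
that control flow, with a deque standing in for the Journaler and a
plain counter for the throttle (illustrative names, not the Ceph API):

#include <deque>
#include <iostream>

struct Consumer {
  std::deque<int> journal;      // pending entries (stand-in for Journaler)
  bool have_waiter = false;     // mirrors journaler.have_waiter()
  unsigned in_flight = 0;
  unsigned max_in_flight = 2;   // stand-in for the real throttle

  bool can_consume() const { return in_flight < max_in_flight; }

  void consume() {
    while (can_consume()) {
      if (journal.empty()) {
        have_waiter = true;     // park at most one wake-up, then bail out
        return;
      }
      // Readable and under the limit: execute one entry, then loop.
      int ino = journal.front();
      journal.pop_front();
      ++in_flight;
      std::cout << "executing 0x" << std::hex << ino << std::dec << "\n";
    }
    // Throttle closed; a completion re-enters consume().
  }

  void execute_complete() {     // analogous to execute_item_complete()
    --in_flight;
    consume();
  }

  void push(int ino) {          // analogous to PurgeQueue::push()
    journal.push_back(ino);
    have_waiter = false;        // the write wakes the reader
    consume();
  }
};

int main() {
  Consumer c;
  c.push(0x101);
  c.push(0x102);
  c.push(0x103);         // throttle full: stays queued in the journal
  c.execute_complete();  // completion re-opens the throttle -> runs 0x103
}

The single-waiter check mirrors journaler.have_waiter(): push() calls
_consume() on every submission, so parking more than one wake-up per
dry spell would be redundant.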
diff --git a/src/mds/PurgeQueue.h b/src/mds/PurgeQueue.h
index c66a3b0a3766..b235536745ba 100644
--- a/src/mds/PurgeQueue.h
+++ b/src/mds/PurgeQueue.h
@@ -55,14 +55,9 @@ WRITE_CLASS_ENCODER(PurgeItem)
 
 enum {
   l_pq_first = 3500,
-  // >> TODO populate from PurgeQueue
-  // How many stray dentries are currently enqueued for purge
-  //l_mdc_num_strays_purging,
-  // How many purge RADOS ops might currently be in flight?
-  //l_mdc_num_purge_ops,
-  // << TODO
 
   // How many items have been finished by PurgeQueue
+  l_pq_executing_ops,
   l_pq_executing,
   l_pq_executed,
   l_pq_last
@@ -100,11 +95,11 @@ protected:
 
   // Throttled allowances
   uint64_t ops_in_flight;
-  uint64_t files_purging;
 
   // Dynamic op limit per MDS based on PG count
   uint64_t max_purge_ops;
 
+  uint32_t _calculate_ops(const PurgeItem &item) const;
   bool can_consume();
@@ -132,6 +127,10 @@ public:
   // to the queue (there is no callback for when it is executed)
   void push(const PurgeItem &pi, Context *completion);
 
+  // If the on-disk queue is empty and we are not currently processing
+  // anything.
+  bool is_idle() const;
+
   void update_op_limit(const MDSMap &mds_map);
 
   void handle_conf_change(const struct md_config_t *conf,
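The throttle's unit of accounting is _calculate_ops(): the same
estimate is charged to ops_in_flight in _execute_item() and credited
back in execute_item_complete(), so no per-item allowance needs to be
stored, which is what lets the old ops_allowance/files_purging
bookkeeping go away. A standalone sketch of the estimate; the real
code counts leaf dirfrags via the fragtree and file objects via
Striper::get_num_objects() on the layout, so the plain ceil-division
here is a simplification with illustrative names:

#include <algorithm>
#include <cstdint>
#include <iostream>

enum class Action { PurgeDir, PurgeFile, TruncateFile };

struct Item {
  Action action = Action::PurgeFile;
  uint64_t size = 0;               // file size in bytes
  uint64_t object_size = 4 << 20;  // bytes per RADOS object, from the layout
  uint64_t dir_leaves = 0;         // leaf dirfrags, if a directory
  uint64_t old_pools = 0;          // old data pools still holding objects
};

uint64_t calculate_ops(const Item &item, uint64_t filer_max_purge_ops) {
  if (item.action == Action::PurgeDir) {
    return 1 + item.dir_leaves;    // one op for the root dirfrag, one per leaf
  }
  // Concurrent deletes Filer::purge would issue, capped by the filer limit.
  const uint64_t objects = item.size > 0
      ? (item.size + item.object_size - 1) / item.object_size
      : 1;
  uint64_t ops = std::min(objects, filer_max_purge_ops);
  ops += 1;                        // removing (or zeroing) the backtrace
  if (item.action != Action::TruncateFile) {
    ops += item.old_pools;         // deletions in any old data pools
  }
  return ops;
}

int main() {
  Item f;
  f.size = 100ull << 20;  // 100 MiB in 4 MiB objects -> 25 concurrent deletes
  std::cout << calculate_ops(f, 10) << "\n";  // 11: capped at 10, +1 backtrace
}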