From: Sage Weil Date: Fri, 24 Jan 2014 22:35:41 +0000 (-0800) Subject: osd/ReplicatedPG: basic flush and evict agent functionality X-Git-Tag: v0.78~166^2~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=da9ed08ea358ede8dc887f1e8fd2495b625b3d23;p=ceph.git osd/ReplicatedPG: basic flush and evict agent functionality This is very basic flush and evict functionality for the tiering agent. The flush policy is very simple: if we are above the threshold and the object is dirty, and not super young, flush it. This is not too braindead of a policy (although we could clearly do something smarter). The evict policy is pretty simple: evict the object if it is clean and we are over our full threshold. If we are in the middle mode, try to estimate how cold the object is based on an accumulated histogram of objects we have examined so far, and decide to evict based on our position in that histogram relative to our "effort" level. Caveats: * the histograms are not refreshed * we aren't taking temperature into consideration yet, although some of the infrastructure is there. Signed-off-by: Sage Weil --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 89d8846f0c6..cca1881f100 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -388,6 +388,7 @@ OPTION(osd_backfill_retry_interval, OPT_DOUBLE, 10.0) // max agent flush ops OPTION(osd_agent_max_ops, OPT_INT, 4) +OPTION(osd_agent_min_evict_effort, OPT_FLOAT, .05) OPTION(osd_uuid, OPT_UUID, uuid_d()) OPTION(osd_data, OPT_STR, "/var/lib/ceph/osd/$cluster-$id") diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index 5110a6aa113..8b85ca681c6 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -33,6 +33,7 @@ noinst_HEADERS += \ osd/ReplicatedPG.h \ osd/PGBackend.h \ osd/ReplicatedBackend.h \ + osd/TierAgentState.h \ osd/Watch.h \ osd/osd_types.h diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 4c1d909f00e..14242b74c2d 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -5940,6 +5940,7 @@ void PG::RecoveryState::Active::exit() pg->state_clear(PG_STATE_REPLAY); utime_t dur = ceph_clock_now(pg->cct) - enter_time; pg->osd->recoverystate_perf->tinc(rs_active_latency, dur); + pg->agent_stop(); } /*------ReplicaActive-----*/ diff --git a/src/osd/PG.h b/src/osd/PG.h index a6fb1a6fb74..3eaebf0c7e3 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1930,6 +1930,7 @@ public: virtual void get_watchers(std::list&) = 0; virtual void agent_work(int max) = 0; + virtual void agent_stop() = 0; }; ostream& operator<<(ostream& out, const PG& pg); diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index a6afed2e0c6..b43f2a9915e 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1189,6 +1189,9 @@ void ReplicatedPG::do_op(OpRequestRef op) hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) { hit_set_persist(); } + + if (agent_state) + agent_choose_mode(); } if ((m->get_flags() & CEPH_OSD_FLAG_IGNORE_CACHE) == 0 && @@ -8709,6 +8712,7 @@ void ReplicatedPG::on_activate() } hit_set_setup(); + agent_setup(); } void ReplicatedPG::on_change(ObjectStore::Transaction *t) @@ -8794,6 +8798,7 @@ void ReplicatedPG::on_pool_change() { dout(10) << __func__ << dendl; hit_set_setup(); + agent_setup(); } // clear state. called on recovery completion AND cancellation. @@ -10023,6 +10028,10 @@ void ReplicatedPG::hit_set_persist() info.hit_set.current_info.end); dout(20) << __func__ << " archive " << oid << dendl; reset = true; + + if (agent_state) + agent_state->add_hit_set(info.hit_set.current_info.begin, hit_set); + } else { // persist snapshot of current hitset ::encode(*hit_set, bl); @@ -10149,6 +10158,8 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) repop->ctx->op_t->remove(oid); repop->ctx->log.back().mod_desc.mark_unrollbackable(); } + if (agent_state) + agent_state->remove_oldest_hit_set(); info.hit_set.history.pop_front(); struct stat st; @@ -10160,6 +10171,364 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max) } +// ======================================= +// cache agent + +void ReplicatedPG::agent_setup() +{ + assert(is_locked()); + if (!is_primary() || + pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) { + agent_clear(); + return; + } + if (!agent_state) { + dout(10) << __func__ << " allocated new state" << dendl; + agent_state.reset(new TierAgentState); + } else { + dout(10) << __func__ << " keeping existing state" << dendl; + } + + agent_choose_mode(); +} + +void ReplicatedPG::agent_clear() +{ + agent_stop(); + agent_state.reset(NULL); +} + +void ReplicatedPG::agent_work(int start_max) +{ + lock(); + if (!agent_state) { + dout(10) << __func__ << " no agent state, stopping" << dendl; + unlock(); + return; + } + + if (agent_state->is_idle()) { + dout(10) << __func__ << " idle, stopping" << dendl; + unlock(); + return; + } + + dout(10) << __func__ + << " max " << start_max + << ", flush " << agent_state->get_flush_mode_name() + << ", evict " << agent_state->get_evict_mode_name() + << ", pos " << agent_state->position + << dendl; + + int ls_min = 1; + int ls_max = 10; // FIXME? + + // list some objects. this conveniently lists clones (oldest to + // newest) before heads... the same order we want to flush in. + // + // NOTE: do not flush the Sequencer. we will assume that the + // listing we get back is imprecise. + vector ls; + hobject_t next; + int r = pgbackend->objects_list_partial(agent_state->position, ls_min, ls_max, + 0 /* no filtering by snapid */, + &ls, &next); + assert(r >= 0); + dout(20) << __func__ << " got " << ls.size() << " objects" << dendl; + int started = 0; + for (vector::iterator p = ls.begin(); + p != ls.end(); + ++p) { + ObjectContextRef obc = get_object_context(*p, false, NULL); + if (!obc) { + // we didn't flush; we may miss something here. + dout(20) << __func__ << " no obc for " << *p << ", skipping" << dendl; + continue; + } + if (!obc->obs.exists) { + dout(20) << __func__ << " " << obc->obs.oi.soid << " dne, skipping" + << dendl; + continue; + } + if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) { + dout(20) << __func__ << " scrubbing, skipping " << obc->obs.oi << dendl; + continue; + } + if (agent_state->flush_mode != TierAgentState::FLUSH_MODE_IDLE && + agent_maybe_flush(obc)) + ++started; + if (agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE && + agent_maybe_evict(obc)) + ++started; + if (started >= start_max) + break; + } + + if (next.is_max()) + agent_state->position = hobject_t(); + else + agent_state->position = next; + dout(20) << __func__ << " final position " << agent_state->position << dendl; + agent_choose_mode(); + unlock(); +} + +struct C_AgentFlushStartStop : public Context { + ReplicatedPGRef pg; + hobject_t oid; + C_AgentFlushStartStop(ReplicatedPG *p, hobject_t o) : pg(p), oid(o) { + pg->osd->agent_start_op(oid); + } + void finish(int r) { + pg->osd->agent_finish_op(oid); + } +}; + +bool ReplicatedPG::agent_maybe_flush(ObjectContextRef& obc) +{ + if (!obc->obs.oi.is_dirty()) { + dout(20) << __func__ << " skip (clean) " << obc->obs.oi << dendl; + return false; + } + + utime_t now = ceph_clock_now(NULL); + if (obc->obs.oi.mtime + utime_t(pool.info.cache_min_flush_age, 0) > now) { + dout(20) << __func__ << " skip (too young) " << obc->obs.oi << dendl; + return false; + } + + if (osd->agent_is_active_oid(obc->obs.oi.soid)) { + dout(20) << __func__ << " skip (flushing) " << obc->obs.oi << dendl; + return false; + } + + dout(10) << __func__ << " flushing " << obc->obs.oi << dendl; + + // FIXME: flush anything dirty, regardless of what distribution of + // ages we expect. + + vector ops; + tid_t rep_tid = osd->get_tid(); + osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid); + OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, + &obc->obs, obc->ssc, this); + ctx->op_t = pgbackend->get_transaction(); + ctx->obc = obc; + ctx->mtime = ceph_clock_now(cct); + ctx->at_version = get_next_version(); + ctx->on_finish = new C_AgentFlushStartStop(this, obc->obs.oi.soid); + + start_flush(ctx, false); + return true; +} + +bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc) +{ + const hobject_t& soid = obc->obs.oi.soid; + if (obc->obs.oi.is_dirty()) { + dout(20) << __func__ << " skip (dirty) " << obc->obs.oi << dendl; + return false; + } + if (!obc->obs.oi.watchers.empty()) { + dout(20) << __func__ << " skip (watchers) " << obc->obs.oi << dendl; + return false; + } + + if (soid.snap == CEPH_NOSNAP) { + int result = _verify_no_head_clones(soid, obc->ssc->snapset); + if (result < 0) { + dout(20) << __func__ << " skip (clones) " << obc->obs.oi << dendl; + return false; + } + } + + if (agent_state->evict_mode != TierAgentState::EVICT_MODE_FULL) { + // is this object old and/or cold enough? + int atime = -1, temp = 0; + agent_estimate_atime_temp(soid, &atime, NULL /*FIXME &temp*/); + + unsigned atime_upper = 0, atime_lower = 0; + if (atime < 0 && obc->obs.oi.mtime != utime_t()) + atime = ceph_clock_now(NULL).sec() - obc->obs.oi.mtime; + if (atime < 0) + atime = pool.info.hit_set_period * pool.info.hit_set_count; // "infinite" + if (atime >= 0) { + agent_state->atime_hist.add(atime); + agent_state->atime_hist.get_position_micro(atime, &atime_lower, + &atime_upper); + } + + unsigned temp_upper = 0, temp_lower = 0; + /* + // FIXME: bound atime based on creation time? + agent_state->temp_hist.add(atime); + agent_state->temp_hist.get_position_micro(temp, &temp_lower, &temp_upper); + */ + + dout(20) << __func__ + << " atime " << atime + << " pos " << atime_lower << "-" << atime_upper + << ", temp " << temp + << " pos " << temp_lower << "-" << temp_upper + << ", evict_effort " << agent_state->evict_effort + << dendl; + dout(30) << "agent_state:\n"; + Formatter *f = new_formatter(""); + f->open_object_section("agent_state"); + agent_state->dump(f); + f->close_section(); + f->flush(*_dout); + delete f; + *_dout << dendl; + + // FIXME: ignore temperature for now. + + // KISS: if [lower,upper] spans our target effort, evict it. + if (atime_lower >= agent_state->evict_effort) + return false; + } + + dout(10) << __func__ << " evicting " << obc->obs.oi << dendl; + RepGather *repop = simple_repop_create(obc); + OpContext *ctx = repop->ctx; + ctx->at_version = get_next_version(); + int r = _delete_head(ctx, true); + assert(r == 0); + finish_ctx(ctx, pg_log_entry_t::DELETE); + simple_repop_submit(repop); + return true; +} + +void ReplicatedPG::agent_stop() +{ + dout(20) << __func__ << dendl; + if (agent_state && !agent_state->is_idle()) { + agent_state->evict_mode = TierAgentState::EVICT_MODE_IDLE; + agent_state->flush_mode = TierAgentState::FLUSH_MODE_IDLE; + osd->agent_disable_pg(this); + } +} + +bool ReplicatedPG::agent_choose_mode() +{ + // get dirty, full ratios + uint64_t dirty_micro = 0; + uint64_t full_micro = 0; + if (pool.info.target_max_bytes) { + uint64_t avg_size = info.stats.stats.sum.num_bytes / + info.stats.stats.sum.num_objects; + dirty_micro = + info.stats.stats.sum.num_objects_dirty * avg_size * 1000000 / + (pool.info.target_max_bytes / pool.info.get_pg_num_divisor(info.pgid)); + full_micro = + info.stats.stats.sum.num_bytes * 1000000 / + (pool.info.target_max_bytes / pool.info.get_pg_num_divisor(info.pgid)); + } + if (pool.info.target_max_objects) { + uint64_t dirty_objects_micro = + info.stats.stats.sum.num_objects_dirty * 1000000 / + (pool.info.target_max_objects / pool.info.get_pg_num_divisor(info.pgid)); + if (dirty_objects_micro > dirty_micro) + dirty_micro = dirty_objects_micro; + uint64_t full_objects_micro = + info.stats.stats.sum.num_objects * 1000000 / + (pool.info.target_max_objects / pool.info.get_pg_num_divisor(info.pgid)); + if (full_objects_micro > full_micro) + full_micro = full_objects_micro; + } + dout(20) << __func__ << " dirty " << ((float)dirty_micro / 1000000.0) + << " full " << ((float)full_micro / 1000000.0) + << dendl; + + // flush mode + TierAgentState::flush_mode_t flush_mode = TierAgentState::FLUSH_MODE_IDLE; + if (dirty_micro > pool.info.cache_target_dirty_ratio_micro) + flush_mode = TierAgentState::FLUSH_MODE_ACTIVE; + + // evict mode + TierAgentState::evict_mode_t evict_mode = TierAgentState::EVICT_MODE_IDLE; + unsigned evict_effort = 0; + + if (full_micro > 1000000 || + pool.info.cache_target_full_ratio_micro >= 1000000) { + // evict anything clean + evict_mode = TierAgentState::EVICT_MODE_FULL; + evict_effort = 1000000; + } else if (full_micro > pool.info.cache_target_full_ratio_micro) { + // set effort in [0..1] range based on where we are between + evict_mode = TierAgentState::EVICT_MODE_SOME; + uint64_t over = full_micro - pool.info.cache_target_full_ratio_micro; + uint64_t span = 1000000 - pool.info.cache_target_full_ratio_micro; + evict_effort = MIN(over * 1000000 / span, + (unsigned)(1000000.0 * g_conf->osd_agent_min_evict_effort)); + } + + bool changed = false; + if (flush_mode != agent_state->flush_mode) { + dout(5) << __func__ << " flush_mode " + << TierAgentState::get_flush_mode_name(agent_state->flush_mode) + << " -> " + << TierAgentState::get_flush_mode_name(flush_mode) + << dendl; + agent_state->flush_mode = flush_mode; + changed = true; + } + if (evict_mode != agent_state->evict_mode) { + dout(5) << __func__ << " evict_mode " + << TierAgentState::get_evict_mode_name(agent_state->evict_mode) + << " -> " + << TierAgentState::get_evict_mode_name(evict_mode) + << dendl; + agent_state->evict_mode = evict_mode; + changed = true; + } + if (evict_effort != agent_state->evict_effort) { + dout(5) << __func__ << " evict_effort " + << ((float)agent_state->evict_effort / 1000000.0) + << " -> " + << ((float)evict_effort / 1000000.0) + << dendl; + agent_state->evict_effort = evict_effort; + } + if (changed) { + if (agent_state->is_idle()) + osd->agent_disable_pg(this); + else + osd->agent_enable_pg(this); + } + return changed; +} + +void ReplicatedPG::agent_estimate_atime_temp(const hobject_t& oid, + int *atime, int *temp) +{ + assert(hit_set); + *atime = -1; + if (temp) + *temp = 0; + if (hit_set->contains(oid)) { + *atime = 0; + if (temp) + ++(*temp); + else + return; + } + time_t now = ceph_clock_now(NULL).sec(); + for (map::iterator p = agent_state->hit_set_map.begin(); + p != agent_state->hit_set_map.end(); + ++p) { + if (p->second->contains(oid)) { + if (*atime < 0) + *atime = now - p->first; + if (temp) + ++(*temp); + else + return; + } + } +} + + // ========================================================================================== // SCRUB diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 02bd1450aa5..60faac8d9ab 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -28,6 +28,7 @@ #include "PG.h" #include "Watch.h" #include "OpRequest.h" +#include "TierAgentState.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" @@ -677,8 +678,32 @@ protected: hobject_t get_hit_set_current_object(utime_t stamp); hobject_t get_hit_set_archive_object(utime_t start, utime_t end); + // agent + boost::scoped_ptr agent_state; + friend class C_AgentFlushStartStop; + void agent_setup(); ///< initialize agent state + void agent_work(int max); ///< entry point to do some agent work + bool agent_maybe_flush(ObjectContextRef& obc); ///< maybe flush + bool agent_maybe_evict(ObjectContextRef& obc); ///< maybe evict + + /// estimate object atime and temperature + /// + /// @param oid [in] object name + /// @param atime [out] seconds since last access (lower bound) + /// @param temperature [out] relative temperature (# hitset bins we appear in) + void agent_estimate_atime_temp(const hobject_t& oid, + int *atime, int *temperature); + + /// stop the agent + void agent_stop(); + + /// clear agent state + void agent_clear(); + + bool agent_choose_mode(); ///< choose (new) agent mode(s) + /// true if we can send an ondisk/commit for v bool already_complete(eversion_t v) { for (xlist::iterator i = repop_queue.begin(); @@ -1238,8 +1263,6 @@ public: int getattrs_maybe_cache( ObjectContextRef obc, map *out); - - void agent_work(int max) { /* placeholder */ } }; inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) diff --git a/src/osd/TierAgentState.h b/src/osd/TierAgentState.h new file mode 100644 index 00000000000..615633847b8 --- /dev/null +++ b/src/osd/TierAgentState.h @@ -0,0 +1,105 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_OSD_TIERAGENT_H +#define CEPH_OSD_TIERAGENT_H + +struct TierAgentState { + /// current position iterating across pool + hobject_t position; + + /// histogram of ages we've encountered + pow2_hist_t atime_hist; + pow2_hist_t temp_hist; + + /// past HitSet(s) (not current) + map hit_set_map; + + /// a few recent things we've seen that are clean + list recent_clean; + + enum flush_mode_t { + FLUSH_MODE_IDLE, // nothing to flush + FLUSH_MODE_ACTIVE, // flush what we can to bring down dirty count + } flush_mode; ///< current flush behavior + static const char *get_flush_mode_name(flush_mode_t m) { + switch (m) { + case FLUSH_MODE_IDLE: return "idle"; + case FLUSH_MODE_ACTIVE: return "active"; + default: assert(0 == "bad flush mode"); + } + } + const char *get_flush_mode_name() const { + return get_flush_mode_name(flush_mode); + } + + enum evict_mode_t { + EVICT_MODE_IDLE, // no need to evict anything + EVICT_MODE_SOME, // evict some things as we are near the target + EVICT_MODE_FULL, // evict anything + } evict_mode; ///< current evict behavior + static const char *get_evict_mode_name(evict_mode_t m) { + switch (m) { + case EVICT_MODE_IDLE: return "idle"; + case EVICT_MODE_SOME: return "some"; + case EVICT_MODE_FULL: return "full"; + default: assert(0 == "bad evict mode"); + } + } + const char *get_evict_mode_name() const { + return get_evict_mode_name(evict_mode); + } + + /// approximate ratio of objects (assuming they are uniformly + /// distributed) that i should aim to evict. + unsigned evict_effort; + + TierAgentState() + : flush_mode(FLUSH_MODE_IDLE), + evict_mode(EVICT_MODE_IDLE), + evict_effort(0) + {} + + /// false if we have any work to do + bool is_idle() const { + return + flush_mode == FLUSH_MODE_IDLE && + evict_mode == EVICT_MODE_IDLE; + } + + /// add archived HitSet + void add_hit_set(time_t start, HitSetRef hs) { + hit_set_map.insert(make_pair(start, hs)); + } + + /// remove old/trimmed HitSet + void remove_oldest_hit_set() { + if (!hit_set_map.empty()) + hit_set_map.erase(hit_set_map.begin()); + } + + void dump(Formatter *f) const { + f->dump_string("flush_mode", get_flush_mode_name()); + f->dump_string("evict_mode", get_evict_mode_name()); + f->dump_unsigned("evict_effort", evict_effort); + f->dump_stream("position") << position; + f->open_object_section("atime_hist"); + atime_hist.dump(f); + f->close_section(); + f->open_object_section("temp_hist"); + temp_hist.dump(f); + f->close_section(); + } +}; + +#endif diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 5db79572c9f..3e32b16a90b 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -2521,7 +2521,8 @@ public: if (get_write_lock()) { return true; } // else - waiters.push_back(op); + if (op) + waiters.push_back(op); return false; } bool get_write_lock() {