hit_set_start_stamp + pool.info.hit_set_period <= m->get_recv_stamp()) {
hit_set_persist();
}
+
+ if (agent_state)
+ agent_choose_mode();
}
if ((m->get_flags() & CEPH_OSD_FLAG_IGNORE_CACHE) == 0 &&
}
hit_set_setup();
+ agent_setup();
}
void ReplicatedPG::on_change(ObjectStore::Transaction *t)
{
dout(10) << __func__ << dendl;
hit_set_setup();
+ agent_setup();
}
// clear state. called on recovery completion AND cancellation.
info.hit_set.current_info.end);
dout(20) << __func__ << " archive " << oid << dendl;
reset = true;
+
+ if (agent_state)
+ agent_state->add_hit_set(info.hit_set.current_info.begin, hit_set);
+
} else {
// persist snapshot of current hitset
::encode(*hit_set, bl);
repop->ctx->op_t->remove(oid);
repop->ctx->log.back().mod_desc.mark_unrollbackable();
}
+ if (agent_state)
+ agent_state->remove_oldest_hit_set();
info.hit_set.history.pop_front();
struct stat st;
}
+// =======================================
+// cache agent
+
+void ReplicatedPG::agent_setup()
+{
+ assert(is_locked());
+ if (!is_primary() ||
+ pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE) {
+ agent_clear();
+ return;
+ }
+ if (!agent_state) {
+ dout(10) << __func__ << " allocated new state" << dendl;
+ agent_state.reset(new TierAgentState);
+ } else {
+ dout(10) << __func__ << " keeping existing state" << dendl;
+ }
+
+ agent_choose_mode();
+}
+
+void ReplicatedPG::agent_clear()
+{
+ agent_stop();
+ agent_state.reset(NULL);
+}
+
+void ReplicatedPG::agent_work(int start_max)
+{
+ lock();
+ if (!agent_state) {
+ dout(10) << __func__ << " no agent state, stopping" << dendl;
+ unlock();
+ return;
+ }
+
+ if (agent_state->is_idle()) {
+ dout(10) << __func__ << " idle, stopping" << dendl;
+ unlock();
+ return;
+ }
+
+ dout(10) << __func__
+ << " max " << start_max
+ << ", flush " << agent_state->get_flush_mode_name()
+ << ", evict " << agent_state->get_evict_mode_name()
+ << ", pos " << agent_state->position
+ << dendl;
+
+ int ls_min = 1;
+ int ls_max = 10; // FIXME?
+
+ // list some objects. this conveniently lists clones (oldest to
+ // newest) before heads... the same order we want to flush in.
+ //
+ // NOTE: do not flush the Sequencer. we will assume that the
+ // listing we get back is imprecise.
+ vector<hobject_t> ls;
+ hobject_t next;
+ int r = pgbackend->objects_list_partial(agent_state->position, ls_min, ls_max,
+ 0 /* no filtering by snapid */,
+ &ls, &next);
+ assert(r >= 0);
+ dout(20) << __func__ << " got " << ls.size() << " objects" << dendl;
+ int started = 0;
+ for (vector<hobject_t>::iterator p = ls.begin();
+ p != ls.end();
+ ++p) {
+ ObjectContextRef obc = get_object_context(*p, false, NULL);
+ if (!obc) {
+ // we didn't flush; we may miss something here.
+ dout(20) << __func__ << " no obc for " << *p << ", skipping" << dendl;
+ continue;
+ }
+ if (!obc->obs.exists) {
+ dout(20) << __func__ << " " << obc->obs.oi.soid << " dne, skipping"
+ << dendl;
+ continue;
+ }
+ if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+ dout(20) << __func__ << " scrubbing, skipping " << obc->obs.oi << dendl;
+ continue;
+ }
+ if (agent_state->flush_mode != TierAgentState::FLUSH_MODE_IDLE &&
+ agent_maybe_flush(obc))
+ ++started;
+ if (agent_state->evict_mode != TierAgentState::EVICT_MODE_IDLE &&
+ agent_maybe_evict(obc))
+ ++started;
+ if (started >= start_max)
+ break;
+ }
+
+ if (next.is_max())
+ agent_state->position = hobject_t();
+ else
+ agent_state->position = next;
+ dout(20) << __func__ << " final position " << agent_state->position << dendl;
+ agent_choose_mode();
+ unlock();
+}
+
+struct C_AgentFlushStartStop : public Context {
+ ReplicatedPGRef pg;
+ hobject_t oid;
+ C_AgentFlushStartStop(ReplicatedPG *p, hobject_t o) : pg(p), oid(o) {
+ pg->osd->agent_start_op(oid);
+ }
+ void finish(int r) {
+ pg->osd->agent_finish_op(oid);
+ }
+};
+
+bool ReplicatedPG::agent_maybe_flush(ObjectContextRef& obc)
+{
+ if (!obc->obs.oi.is_dirty()) {
+ dout(20) << __func__ << " skip (clean) " << obc->obs.oi << dendl;
+ return false;
+ }
+
+ utime_t now = ceph_clock_now(NULL);
+ if (obc->obs.oi.mtime + utime_t(pool.info.cache_min_flush_age, 0) > now) {
+ dout(20) << __func__ << " skip (too young) " << obc->obs.oi << dendl;
+ return false;
+ }
+
+ if (osd->agent_is_active_oid(obc->obs.oi.soid)) {
+ dout(20) << __func__ << " skip (flushing) " << obc->obs.oi << dendl;
+ return false;
+ }
+
+ dout(10) << __func__ << " flushing " << obc->obs.oi << dendl;
+
+ // FIXME: flush anything dirty, regardless of what distribution of
+ // ages we expect.
+
+ vector<OSDOp> ops;
+ tid_t rep_tid = osd->get_tid();
+ osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+ OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
+ &obc->obs, obc->ssc, this);
+ ctx->op_t = pgbackend->get_transaction();
+ ctx->obc = obc;
+ ctx->mtime = ceph_clock_now(cct);
+ ctx->at_version = get_next_version();
+ ctx->on_finish = new C_AgentFlushStartStop(this, obc->obs.oi.soid);
+
+ start_flush(ctx, false);
+ return true;
+}
+
+bool ReplicatedPG::agent_maybe_evict(ObjectContextRef& obc)
+{
+ const hobject_t& soid = obc->obs.oi.soid;
+ if (obc->obs.oi.is_dirty()) {
+ dout(20) << __func__ << " skip (dirty) " << obc->obs.oi << dendl;
+ return false;
+ }
+ if (!obc->obs.oi.watchers.empty()) {
+ dout(20) << __func__ << " skip (watchers) " << obc->obs.oi << dendl;
+ return false;
+ }
+
+ if (soid.snap == CEPH_NOSNAP) {
+ int result = _verify_no_head_clones(soid, obc->ssc->snapset);
+ if (result < 0) {
+ dout(20) << __func__ << " skip (clones) " << obc->obs.oi << dendl;
+ return false;
+ }
+ }
+
+ if (agent_state->evict_mode != TierAgentState::EVICT_MODE_FULL) {
+ // is this object old and/or cold enough?
+ int atime = -1, temp = 0;
+ agent_estimate_atime_temp(soid, &atime, NULL /*FIXME &temp*/);
+
+ unsigned atime_upper = 0, atime_lower = 0;
+ if (atime < 0 && obc->obs.oi.mtime != utime_t())
+ atime = ceph_clock_now(NULL).sec() - obc->obs.oi.mtime;
+ if (atime < 0)
+ atime = pool.info.hit_set_period * pool.info.hit_set_count; // "infinite"
+ if (atime >= 0) {
+ agent_state->atime_hist.add(atime);
+ agent_state->atime_hist.get_position_micro(atime, &atime_lower,
+ &atime_upper);
+ }
+
+ unsigned temp_upper = 0, temp_lower = 0;
+ /*
+ // FIXME: bound atime based on creation time?
+ agent_state->temp_hist.add(atime);
+ agent_state->temp_hist.get_position_micro(temp, &temp_lower, &temp_upper);
+ */
+
+ dout(20) << __func__
+ << " atime " << atime
+ << " pos " << atime_lower << "-" << atime_upper
+ << ", temp " << temp
+ << " pos " << temp_lower << "-" << temp_upper
+ << ", evict_effort " << agent_state->evict_effort
+ << dendl;
+ dout(30) << "agent_state:\n";
+ Formatter *f = new_formatter("");
+ f->open_object_section("agent_state");
+ agent_state->dump(f);
+ f->close_section();
+ f->flush(*_dout);
+ delete f;
+ *_dout << dendl;
+
+ // FIXME: ignore temperature for now.
+
+ // KISS: if [lower,upper] spans our target effort, evict it.
+ if (atime_lower >= agent_state->evict_effort)
+ return false;
+ }
+
+ dout(10) << __func__ << " evicting " << obc->obs.oi << dendl;
+ RepGather *repop = simple_repop_create(obc);
+ OpContext *ctx = repop->ctx;
+ ctx->at_version = get_next_version();
+ int r = _delete_head(ctx, true);
+ assert(r == 0);
+ finish_ctx(ctx, pg_log_entry_t::DELETE);
+ simple_repop_submit(repop);
+ return true;
+}
+
+void ReplicatedPG::agent_stop()
+{
+ dout(20) << __func__ << dendl;
+ if (agent_state && !agent_state->is_idle()) {
+ agent_state->evict_mode = TierAgentState::EVICT_MODE_IDLE;
+ agent_state->flush_mode = TierAgentState::FLUSH_MODE_IDLE;
+ osd->agent_disable_pg(this);
+ }
+}
+
+bool ReplicatedPG::agent_choose_mode()
+{
+ // get dirty, full ratios
+ uint64_t dirty_micro = 0;
+ uint64_t full_micro = 0;
+ if (pool.info.target_max_bytes) {
+ uint64_t avg_size = info.stats.stats.sum.num_bytes /
+ info.stats.stats.sum.num_objects;
+ dirty_micro =
+ info.stats.stats.sum.num_objects_dirty * avg_size * 1000000 /
+ (pool.info.target_max_bytes / pool.info.get_pg_num_divisor(info.pgid));
+ full_micro =
+ info.stats.stats.sum.num_bytes * 1000000 /
+ (pool.info.target_max_bytes / pool.info.get_pg_num_divisor(info.pgid));
+ }
+ if (pool.info.target_max_objects) {
+ uint64_t dirty_objects_micro =
+ info.stats.stats.sum.num_objects_dirty * 1000000 /
+ (pool.info.target_max_objects / pool.info.get_pg_num_divisor(info.pgid));
+ if (dirty_objects_micro > dirty_micro)
+ dirty_micro = dirty_objects_micro;
+ uint64_t full_objects_micro =
+ info.stats.stats.sum.num_objects * 1000000 /
+ (pool.info.target_max_objects / pool.info.get_pg_num_divisor(info.pgid));
+ if (full_objects_micro > full_micro)
+ full_micro = full_objects_micro;
+ }
+ dout(20) << __func__ << " dirty " << ((float)dirty_micro / 1000000.0)
+ << " full " << ((float)full_micro / 1000000.0)
+ << dendl;
+
+ // flush mode
+ TierAgentState::flush_mode_t flush_mode = TierAgentState::FLUSH_MODE_IDLE;
+ if (dirty_micro > pool.info.cache_target_dirty_ratio_micro)
+ flush_mode = TierAgentState::FLUSH_MODE_ACTIVE;
+
+ // evict mode
+ TierAgentState::evict_mode_t evict_mode = TierAgentState::EVICT_MODE_IDLE;
+ unsigned evict_effort = 0;
+
+ if (full_micro > 1000000 ||
+ pool.info.cache_target_full_ratio_micro >= 1000000) {
+ // evict anything clean
+ evict_mode = TierAgentState::EVICT_MODE_FULL;
+ evict_effort = 1000000;
+ } else if (full_micro > pool.info.cache_target_full_ratio_micro) {
+ // set effort in [0..1] range based on where we are between
+ evict_mode = TierAgentState::EVICT_MODE_SOME;
+ uint64_t over = full_micro - pool.info.cache_target_full_ratio_micro;
+ uint64_t span = 1000000 - pool.info.cache_target_full_ratio_micro;
+ evict_effort = MIN(over * 1000000 / span,
+ (unsigned)(1000000.0 * g_conf->osd_agent_min_evict_effort));
+ }
+
+ bool changed = false;
+ if (flush_mode != agent_state->flush_mode) {
+ dout(5) << __func__ << " flush_mode "
+ << TierAgentState::get_flush_mode_name(agent_state->flush_mode)
+ << " -> "
+ << TierAgentState::get_flush_mode_name(flush_mode)
+ << dendl;
+ agent_state->flush_mode = flush_mode;
+ changed = true;
+ }
+ if (evict_mode != agent_state->evict_mode) {
+ dout(5) << __func__ << " evict_mode "
+ << TierAgentState::get_evict_mode_name(agent_state->evict_mode)
+ << " -> "
+ << TierAgentState::get_evict_mode_name(evict_mode)
+ << dendl;
+ agent_state->evict_mode = evict_mode;
+ changed = true;
+ }
+ if (evict_effort != agent_state->evict_effort) {
+ dout(5) << __func__ << " evict_effort "
+ << ((float)agent_state->evict_effort / 1000000.0)
+ << " -> "
+ << ((float)evict_effort / 1000000.0)
+ << dendl;
+ agent_state->evict_effort = evict_effort;
+ }
+ if (changed) {
+ if (agent_state->is_idle())
+ osd->agent_disable_pg(this);
+ else
+ osd->agent_enable_pg(this);
+ }
+ return changed;
+}
+
+void ReplicatedPG::agent_estimate_atime_temp(const hobject_t& oid,
+ int *atime, int *temp)
+{
+ assert(hit_set);
+ *atime = -1;
+ if (temp)
+ *temp = 0;
+ if (hit_set->contains(oid)) {
+ *atime = 0;
+ if (temp)
+ ++(*temp);
+ else
+ return;
+ }
+ time_t now = ceph_clock_now(NULL).sec();
+ for (map<time_t,HitSetRef>::iterator p = agent_state->hit_set_map.begin();
+ p != agent_state->hit_set_map.end();
+ ++p) {
+ if (p->second->contains(oid)) {
+ if (*atime < 0)
+ *atime = now - p->first;
+ if (temp)
+ ++(*temp);
+ else
+ return;
+ }
+ }
+}
+
+
// ==========================================================================================
// SCRUB
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Sage Weil <sage@inktank.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_OSD_TIERAGENT_H
+#define CEPH_OSD_TIERAGENT_H
+
+struct TierAgentState {
+ /// current position iterating across pool
+ hobject_t position;
+
+ /// histogram of ages we've encountered
+ pow2_hist_t atime_hist;
+ pow2_hist_t temp_hist;
+
+ /// past HitSet(s) (not current)
+ map<time_t,HitSetRef> hit_set_map;
+
+ /// a few recent things we've seen that are clean
+ list<hobject_t> recent_clean;
+
+ enum flush_mode_t {
+ FLUSH_MODE_IDLE, // nothing to flush
+ FLUSH_MODE_ACTIVE, // flush what we can to bring down dirty count
+ } flush_mode; ///< current flush behavior
+ static const char *get_flush_mode_name(flush_mode_t m) {
+ switch (m) {
+ case FLUSH_MODE_IDLE: return "idle";
+ case FLUSH_MODE_ACTIVE: return "active";
+ default: assert(0 == "bad flush mode");
+ }
+ }
+ const char *get_flush_mode_name() const {
+ return get_flush_mode_name(flush_mode);
+ }
+
+ enum evict_mode_t {
+ EVICT_MODE_IDLE, // no need to evict anything
+ EVICT_MODE_SOME, // evict some things as we are near the target
+ EVICT_MODE_FULL, // evict anything
+ } evict_mode; ///< current evict behavior
+ static const char *get_evict_mode_name(evict_mode_t m) {
+ switch (m) {
+ case EVICT_MODE_IDLE: return "idle";
+ case EVICT_MODE_SOME: return "some";
+ case EVICT_MODE_FULL: return "full";
+ default: assert(0 == "bad evict mode");
+ }
+ }
+ const char *get_evict_mode_name() const {
+ return get_evict_mode_name(evict_mode);
+ }
+
+ /// approximate ratio of objects (assuming they are uniformly
+ /// distributed) that i should aim to evict.
+ unsigned evict_effort;
+
+ TierAgentState()
+ : flush_mode(FLUSH_MODE_IDLE),
+ evict_mode(EVICT_MODE_IDLE),
+ evict_effort(0)
+ {}
+
+ /// false if we have any work to do
+ bool is_idle() const {
+ return
+ flush_mode == FLUSH_MODE_IDLE &&
+ evict_mode == EVICT_MODE_IDLE;
+ }
+
+ /// add archived HitSet
+ void add_hit_set(time_t start, HitSetRef hs) {
+ hit_set_map.insert(make_pair(start, hs));
+ }
+
+ /// remove old/trimmed HitSet
+ void remove_oldest_hit_set() {
+ if (!hit_set_map.empty())
+ hit_set_map.erase(hit_set_map.begin());
+ }
+
+ void dump(Formatter *f) const {
+ f->dump_string("flush_mode", get_flush_mode_name());
+ f->dump_string("evict_mode", get_evict_mode_name());
+ f->dump_unsigned("evict_effort", evict_effort);
+ f->dump_stream("position") << position;
+ f->open_object_section("atime_hist");
+ atime_hist.dump(f);
+ f->close_section();
+ f->open_object_section("temp_hist");
+ temp_hist.dump(f);
+ f->close_section();
+ }
+};
+
+#endif