OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first slurp
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
OPTION(paxos_min_wait, OPT_DOUBLE, 0.05) // min time to gather updates for after period of inactivity
+OPTION(paxos_trim_tolerance, OPT_INT, 3) // number of versions beyond paxos_max_join_drift tolerated before Paxos trims
+OPTION(paxos_trim_disabled_max_versions, OPT_INT, 100) // maximum number of versions allowed to go by while trimming is disabled before we trim anyway
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons
OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients
::encode(mon->key_server, bl);
paxos->stash_latest(version, bl);
*/
- unsigned max = g_conf->paxos_max_join_drift * 2;
- if (mon->is_leader() &&
- version > max)
- trim_to(version - max);
}
void AuthMonitor::increase_max_global_id()
dout(10) << __func__ << " key server has no secrets; do not put them in tx" << dendl;
}
+void AuthMonitor::update_trim()
+{
+  // Keep twice the join-drift window worth of versions around.
+  const unsigned keep = g_conf->paxos_max_join_drift * 2;
+  const version_t latest = get_version();
+
+  // Only the leader picks a trim target.
+  if (!mon->is_leader())
+    return;
+  // Nothing to trim until we hold more than 'keep' versions.
+  if (latest <= keep)
+    return;
+
+  set_trim_to(latest - keep);
+}
+
bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
uint64_t assign_global_id(MAuth *m, bool should_increase_max);
// propose pending update to peers
void encode_pending(MonitorDBStore::Transaction *t);
+ void update_trim(); // leader only: set the trim target to (version - 2 * paxos_max_join_drift)
bool preprocess_query(PaxosServiceMessage *m); // true if processed.
bool prepare_update(PaxosServiceMessage *m);
}
}
- // trim
- unsigned max = g_conf->mon_max_log_epochs;
- if (mon->is_leader() && version > max)
- trim_to(version - max);
-
check_subs();
}
put_version_latest_full(t, version);
}
+void LogMonitor::update_trim()
+{
+  // Number of log epochs we want to keep in the store.
+  const unsigned keep = g_conf->mon_max_log_epochs;
+  const version_t latest = get_version();
+
+  // Only the leader, and only once we hold more than 'keep' versions,
+  // gets to move the trim target.
+  const bool can_trim = mon->is_leader() && (latest > keep);
+  if (can_trim) {
+    set_trim_to(latest - keep);
+  }
+}
+
bool LogMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
void create_pending(); // prepare a new pending
// propose pending update to peers
void encode_pending(MonitorDBStore::Transaction *t);
+ void update_trim(); // leader only: set the trim target to (version - mon_max_log_epochs)
bool preprocess_query(PaxosServiceMessage *m); // true if processed.
bool prepare_update(PaxosServiceMessage *m);
void create_pending();
void encode_pending(MonitorDBStore::Transaction *t);
+  // Trimming is never requested here: the answer is always "no".
+  bool should_trim() {
+    return false;
+  }
+  // No-op: adds nothing to the transaction.
+  void encode_trim(MonitorDBStore::Transaction *t) {
+  }
+
void update_logger();
void _updated(MMDSBeacon *m);
mon->messenger->send_message(m, s->inst);
}
+void OSDMonitor::update_trim()
+{
+  // Leave the trim target alone unless the pg monitor is readable and
+  // no pgs are still being created.
+  if (!mon->pgmon()->is_readable() ||
+      !mon->pgmon()->pg_map.creating_pgs.empty())
+    return;
+
+  epoch_t floor = mon->pgmon()->pg_map.calc_min_last_epoch_clean();
+  dout(10) << " min_last_epoch_clean " << floor << dendl;
+
+  // Always keep at least mon_min_osdmap_epochs epochs behind the current
+  // version; pull the floor back (possibly down to zero) if it is too close.
+  const unsigned keep = g_conf->mon_min_osdmap_epochs;
+  const version_t latest = get_version();
+  if (floor + keep > latest)
+    floor = (keep < latest) ? (latest - keep) : 0;
+
+  // The trim target only ever moves forward, and only past what we have
+  // already trimmed.
+  if (floor > get_first_committed() && get_trim_to() < floor)
+    set_trim_to(floor);
+}
+
+bool OSDMonitor::should_trim()
+{
+  // Refresh the trim target first, then report whether one is set.
+  update_trim();
+  const version_t target = get_trim_to();
+  return target > 0;
+}
// -------------
#endif
// ---------------
+ update_trim();
+
if (do_propose ||
!pending_inc.new_pg_temp.empty()) // also propose if we adjusted pg_temp
propose_pending();
-
- if (mon->pgmon()->is_readable() &&
- mon->pgmon()->pg_map.creating_pgs.empty()) {
- epoch_t floor = mon->pgmon()->pg_map.calc_min_last_epoch_clean();
- dout(10) << " min_last_epoch_clean " << floor << dendl;
- unsigned min = g_conf->mon_min_osdmap_epochs;
- if (floor + min > get_version()) {
- if (min < get_version())
- floor = get_version() - min;
- else
- floor = 0;
- }
- if (floor > get_first_committed())
- trim_to(floor); // we are now responsible for trimming our own versions.
- }
}
void OSDMonitor::handle_osd_timeouts(const utime_t &now,
bool prepare_update(PaxosServiceMessage *m);
bool should_propose(double &delay);
+ void update_trim(); // raise the trim target towards the pg map's min last-clean epoch, keeping mon_min_osdmap_epochs
+ bool should_trim(); // refresh the trim target, then report whether it is non-zero
+
bool can_mark_down(int o);
bool can_mark_up(int o);
bool can_mark_out(int o);
}
*/
- unsigned max = g_conf->mon_max_pgmap_epochs;
- if (mon->is_leader() && (version > max))
- trim_to(version - max);
+ update_trim();
send_pg_creates();
put_version_latest_full(t, version);
}
+void PGMonitor::update_trim()
+{
+  // How many pgmap epochs we are willing to keep.
+  const unsigned keep = g_conf->mon_max_pgmap_epochs;
+  const version_t latest = get_version();
+
+  // Non-leaders never pick a trim target.
+  if (!mon->is_leader())
+    return;
+
+  if (latest > keep) {
+    set_trim_to(latest - keep);
+  }
+}
+
+
bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
{
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
void handle_osd_timeouts();
void create_pending(); // prepare a new pending
// propose pending update to peers
+ void update_trim(); // leader only: set the trim target to (version - mon_max_pgmap_epochs)
void encode_pending(MonitorDBStore::Transaction *t);
void update_logger();
return;
}
+ if (should_trim()) {
+ trim();
+ }
if (is_active() && (proposals.size() > 0)) {
propose_queued();
lease_timeout_event = new C_LeaseTimeout(this);
mon->timer.add_event_after(g_conf->mon_lease_ack_timeout, lease_timeout_event);
- // trim?
- trim_to(lease->first_committed);
-
// kick waiters
finish_contexts(g_ceph_context, waiting_for_active);
if (is_readable())
<< " -- got everyone" << dendl;
mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
+
+
} else {
dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- still need "
* trim old states
*/
-void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first, bool force)
+void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
{
dout(10) << "trim_to " << first << " (was " << first_committed << ")"
<< dendl;
t->put(get_name(), "first_committed", first_committed);
}
-void Paxos::trim_to(version_t first, bool force)
+void Paxos::trim_to(version_t first)
{
MonitorDBStore::Transaction t;
- trim_to(&t, first, force);
+ trim_to(&t, first);
if (!t.empty()) {
JSONFormatter f(true);
dout(30) << __func__ << " transaction dump:\n";
f.flush(*_dout);
*_dout << dendl;
- get_store()->apply_transaction(t);
+
+ bufferlist bl;
+ t.encode(bl); // serialize the trim transaction so it can be queued as a proposal
+
+ going_to_trim = true; // reset to false by C_Trimmed::finish(); should_trim() returns false while set
+ queue_proposal(bl, new C_Trimmed(this)); // the trim is now queued as a proposal instead of being applied to the store here
}
}
+void Paxos::trim_enable() {
+  // Always clear the marker, leader or not: we must have been the leader
+  // when trimming got disabled, but we may have been demoted since, and a
+  // stale 'trim_disabled_version' must not survive that.
+  trim_disabled_version = 0;
+
+  // Actually trimming, however, is something only the leader may start.
+  if (!mon->is_leader())
+    return;
+
+  if (should_trim()) {
+    trim();
+  }
+}
+
/*
* return a globally unique, monotonically increasing proposal number
*/
*/
bool going_to_bootstrap;
+ /**
+ * Should be true if we have proposed to trim, or are in the middle of
+ * trimming; false otherwise.
+ *
+ * Set by trim_to(version_t) when it queues the trim proposal and reset by
+ * C_Trimmed::finish(). While it is true, should_trim() returns false.
+ */
+ bool going_to_trim;
+ /**
+ * If we have disabled trimming our state, this variable should have a
+ * value greater than zero, corresponding to the version we had at the time
+ * we disabled the trim.
+ *
+ * Recorded by trim_disable() (only if not already set) and reset to zero
+ * by trim_enable(). A value of zero means trimming is not disabled.
+ */
+ version_t trim_disabled_version;
/**
* @defgroup Paxos_h_callbacks Callback classes.
}
};
+  /**
+   * Callback attached to a queued trim proposal. When it fires, it clears
+   * the owning Paxos instance's 'going_to_trim' flag.
+   */
+  class C_Trimmed : public Context {
+    Paxos *owner;
+
+  public:
+    C_Trimmed(Paxos *p)
+      : owner(p)
+    { }
+
+    void finish(int r) {
+      owner->going_to_trim = false;
+    }
+  };
/**
*
*/
lease_timeout_event(0),
accept_timeout_event(0),
clock_drift_warned(0),
- going_to_bootstrap(false) { }
+ going_to_bootstrap(false),
+ going_to_trim(false),
+ trim_disabled_version(0) { }
const string get_name() const {
return paxos_name;
* Erase old states from stable storage.
*
* @param first The version we are trimming to
- * @param force If specified, we may even erase the latest stashed version
- * iif @p first is higher than that version.
*/
- void trim_to(version_t first, bool force = false);
+ void trim_to(version_t first);
/**
* Erase old states from stable storage.
*
* @param t A transaction
* @param first The version we are trimming to
- * @param force If specified, we may even erase the latest stashed version
- * iif @p first is higher than that version.
*/
- void trim_to(MonitorDBStore::Transaction *t, version_t first, bool force=false);
+ void trim_to(MonitorDBStore::Transaction *t, version_t first);
+  /**
+   * Trim the Paxos state as far as we are allowed to.
+   *
+   * Trims to (current version - paxos_max_join_drift). Callers are expected
+   * to have checked should_trim() beforehand; we assert on it.
+   */
+  void trim() {
+    assert(should_trim());
+    const version_t keep = g_conf->paxos_max_join_drift;
+    trim_to(get_version() - keep);
+  }
+  /**
+   * Disable trimming.
+   *
+   * The Monitor's store synchronization mechanisms rely on this to
+   * guarantee a consistent store state.
+   *
+   * We remember the version at which trimming was first disabled; calling
+   * this again while already disabled leaves that version untouched.
+   */
+  void trim_disable() {
+    if (trim_disabled_version != 0)
+      return;  // already disabled; keep the version recorded first
+    // NOTE(review): if get_version() is zero here, the marker stays zero and
+    // trimming is still considered enabled -- confirm that is acceptable.
+    trim_disabled_version = get_version();
+  }
+ /**
+ * Enable trimming
+ *
+ * Resets trim_disabled_version to zero and, if we are the leader and
+ * should_trim() holds, trims right away.
+ */
+ void trim_enable();
+  /**
+   * Tell whether trimming is currently disabled.
+   *
+   * @returns true if a version greater than zero is recorded in
+   *          trim_disabled_version; false otherwise.
+   */
+  bool is_trim_disabled() {
+    const bool disabled = (0 < trim_disabled_version);
+    return disabled;
+  }
+  /**
+   * Check if we should trim.
+   *
+   * We never trim while a trim is already proposed or underway
+   * (going_to_trim), nor while we hold no more than
+   * (paxos_max_join_drift + paxos_trim_tolerance) versions.
+   *
+   * If trimming is disabled, we must take that into consideration and only
+   * return true once at least paxos_trim_disabled_max_versions versions have
+   * gone by since the version at which trimming was disabled.
+   *
+   * @returns true if we should trim; false otherwise.
+   */
+  bool should_trim() {
+    // A trim has already been proposed or is underway.
+    if (going_to_trim)
+      return false;
+
+    // version_t is not guaranteed to fit in an int, so do the version
+    // arithmetic in a wide signed type rather than narrowing to int.
+    const long long available_versions =
+      static_cast<long long>(get_version() - get_first_committed());
+    const long long maximum_versions =
+      static_cast<long long>(g_conf->paxos_max_join_drift) +
+      g_conf->paxos_trim_tolerance;
+    if (available_versions <= maximum_versions)
+      return false;
+
+    if (trim_disabled_version > 0) {
+      // Trimming is disabled: hold off until enough versions have gone by.
+      const long long disabled_versions =
+        static_cast<long long>(get_version() - trim_disabled_version);
+      if (disabled_versions < g_conf->paxos_trim_disabled_max_versions)
+        return false;
+    }
+    return true;
+  }
/**
* @defgroup Paxos_h_slurping_funcs Slurping-related functions
*/
MonitorDBStore::Transaction t;
bufferlist bl;
+
+ if (should_trim()) {
+ encode_trim(&t);
+ set_trim_to(0);
+ }
+
encode_pending(&t);
have_pending = false;
finish_contexts(g_ceph_context, waiting_for_finished_proposal);
}
-void PaxosService::trim_to(version_t first, bool force)
+void PaxosService::encode_trim(MonitorDBStore::Transaction *t) // adds the erases for versions below get_trim_to() to the caller's transaction
{
version_t first_committed = get_first_committed();
version_t latest_full = get_version("full", "latest");
+ version_t trim_to = get_trim_to(); // the target now comes from the service's trim_version instead of a parameter
string latest_key = mon->store->combine_strings("full", latest_full);
bool has_full = mon->store->exists(get_service_name(), latest_key);
- dout(10) << __func__ << " " << first << " (was " << first_committed << ")"
+ dout(10) << __func__ << " " << trim_to << " (was " << first_committed << ")"
<< ", latest full " << latest_full << dendl;
- if (first_committed >= first)
+ if (first_committed >= trim_to) // nothing below the target is left to trim
return;
- MonitorDBStore::Transaction t;
- while ((first_committed < first)
- && (force || (first_committed < latest_full))) {
+ while ((first_committed < trim_to)) { // no longer stops at latest_full as the removed condition did
dout(20) << __func__ << first_committed << dendl;
- t.erase(get_service_name(), first_committed);
+ t->erase(get_service_name(), first_committed); // drop this version's entry
if (has_full) {
latest_key = mon->store->combine_strings("full", first_committed);
- if (mon->store->exists(get_service_name(), latest_key))
- t.erase(get_service_name(), latest_key);
+ t->erase(get_service_name(), latest_key); // erased unconditionally; the removed code checked exists() first
}
first_committed++;
}
- put_first_committed(&t, first_committed);
- mon->store->apply_transaction(t);
+ put_first_committed(t, first_committed); // recorded in the caller's transaction; nothing is applied to the store here
}
* then have_pending should be true; otherwise, false.
*/
bool have_pending;
+ /**
+ * The version to trim to. If zero, we assume there is no version to be
+ * trimmed; otherwise, we assume we should trim to the version held by
+ * this variable.
+ *
+ * Written through set_trim_to() and read through get_trim_to().
+ */
+ version_t trim_version;
protected:
/**
PaxosService(Monitor *mn, Paxos *p, string name)
: mon(mn), paxos(p), service_name(name),
service_version(0), proposal_timer(0), have_pending(false),
+ trim_version(0),
last_committed_name("last_committed"),
first_committed_name("first_committed"),
last_accepted_name("last_accepted"),
*/
void wakeup_proposing_waiters();
+ /**
+ * @defgroup PaxosService_h_Trim
+ * @{
+ */
/**
* Trim our log. This implies getting rid of versions on the k/v store.
* Services implementing us don't have to implement this function if they
* expected behavior is that, when 'true', we will remove all
* the log versions even if we don't have a full map in store.
*/
- void trim_to(version_t first, bool force = false);
+ virtual void encode_trim(MonitorDBStore::Transaction *t);
+  /**
+   * Decide whether we have something to trim.
+   *
+   * The default policy lives here: refresh the trim version via
+   * update_trim() and, as long as a version to trim to is known (non-zero),
+   * say yes. Services are free to override this with their own policy.
+   *
+   * @returns true if we should trim; false otherwise.
+   */
+  virtual bool should_trim() {
+    update_trim();
+    const version_t target = get_trim_to();
+    return target > 0;
+  }
+  /**
+   * Hook for refreshing the version we want to trim to.
+   *
+   * The default implementation does nothing, because choosing that version
+   * is service specific and there is no sensible generic way to do it.
+   * Services are not forced to implement it either: there are a couple of
+   * services that never trim anything at all, and they should not have to
+   * carry a function they will never use.
+   */
+  virtual void update_trim() {
+  }
+  /**
+   * Record @p ver as the version to trim to.
+   *
+   * @param ver The version to trim to; zero means there is nothing to trim.
+   */
+  void set_trim_to(version_t ver) {
+    this->trim_version = ver;
+  }
+  /**
+   * Fetch the version we should trim to.
+   *
+   * @returns the currently recorded trim version; zero should be taken to
+   *          mean that there is no version to trim to.
+   */
+  version_t get_trim_to() {
+    const version_t target = this->trim_version;
+    return target;
+  }
+ /**
+ * @}
+ */
/**