]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: Paxos: trim through Paxos
authorJoao Eduardo Luis <joao.luis@inktank.com>
Wed, 4 Jul 2012 10:47:03 +0000 (11:47 +0100)
committerJoao Eduardo Luis <joao.luis@inktank.com>
Thu, 21 Feb 2013 18:02:22 +0000 (18:02 +0000)
Instead of directly modifying the store whenever we want to trim our Paxos
state, we should do it through Paxos, proposing the trim to the quorum and
commit it once accepted.

This enforces three major invariants that we will be able to leverage later
on during the store synchronization:

 1) The Leader will set the pace for trimming across the system. No one
    will trim their state unless they are committing the value proposed by
    the Leader;

 2) Following (1), the monitors in the quorum will trim at the same time.
    There will be no diverging states due to trimming on different monitors.

 3) Each trim will be kept as a transaction in the Paxos' store allowing
    us to obtain a consistent state during synchronization, by shipping
    the Paxos versions to the other monitor and applying them. We could
    incur in an inconsistent state if the trim happened without
    constraints, without being logged; by going through Paxos this concern
    is no longer relevant.

The trimming itself may be triggered each time a proposal finishes, which
is the time at which we know we have committed a new version on the store.

It shall be triggered iff we are sure we have enough versions on the store
to fill the gap of any monitor that might become alive and still hasn't
drifted enough to require synchronization. Roughly speaking, we will check
if the number of available versions is higher than 'paxos_max_join_drift'.

Furthermore, we added a new option, 'paxos_trim_tolerance', so we are able
to avoid trimming every single time the above condition is met -- which
would happen every time we trimmed a version, and then proposed a new one,
and then we would trim it again, etc. So, just tolerate a couple of commits
before trimming again.

Finally, we added support to enable/disable trimming, which will be
essential during the store synchronization process.

Signed-off-by: Joao Eduardo Luis <joao.luis@inktank.com>
14 files changed:
src/common/config_opts.h
src/mon/AuthMonitor.cc
src/mon/AuthMonitor.h
src/mon/LogMonitor.cc
src/mon/LogMonitor.h
src/mon/MDSMonitor.h
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/mon/PGMonitor.cc
src/mon/PGMonitor.h
src/mon/Paxos.cc
src/mon/Paxos.h
src/mon/PaxosService.cc
src/mon/PaxosService.h

index 5e0449e3606ea0b24e49729cc71d99c6d55f39a2..7686983af886f3f42f69e0f3091b74b87d2b5ed4 100644 (file)
@@ -158,6 +158,8 @@ OPTION(mon_max_log_entries_per_event, OPT_INT, 4096)
 OPTION(paxos_max_join_drift, OPT_INT, 10)       // max paxos iterations before we must first slurp
 OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0)  // gather updates for this long before proposing a map update
 OPTION(paxos_min_wait, OPT_DOUBLE, 0.05)  // min time to gather updates for after period of inactivity
+OPTION(paxos_trim_tolerance, OPT_INT, 3) // number of extra proposals tolerated before trimming
+OPTION(paxos_trim_disabled_max_versions, OPT_INT, 100) // maximum amount of versions we shall allow passing by without trimming
 OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
 OPTION(auth_cluster_required, OPT_STR, "cephx")   // required of mon, mds, osd daemons
 OPTION(auth_service_required, OPT_STR, "cephx")   // required by daemons of clients
index 7e93dff88d1c796b2dd6bf2861cfff62e3f21de4..92bbb0a30ea380f0437b440652719a5baf28426b 100644 (file)
@@ -205,10 +205,6 @@ void AuthMonitor::update_from_paxos()
   ::encode(mon->key_server, bl);
   paxos->stash_latest(version, bl);
   */
-  unsigned max = g_conf->paxos_max_join_drift * 2;
-  if (mon->is_leader() &&
-      version > max)
-    trim_to(version - max);
 }
 
 void AuthMonitor::increase_max_global_id()
@@ -265,6 +261,14 @@ void AuthMonitor::encode_pending(MonitorDBStore::Transaction *t)
     dout(10) << __func__ << " key server has no secrets; do not put them in tx" << dendl;
 }
 
+void AuthMonitor::update_trim()
+{
+  unsigned max = g_conf->paxos_max_join_drift * 2;
+  version_t version = get_version();
+  if (mon->is_leader() && (version > max))
+    set_trim_to(version - max);
+}
+
 bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
index f84e42bef985cf6216620e869294e4344bd85e8c..f6e3e3c8f995c085da0e2f08ee0d64b3872797e5 100644 (file)
@@ -134,6 +134,7 @@ private:
   uint64_t assign_global_id(MAuth *m, bool should_increase_max);
   // propose pending update to peers
   void encode_pending(MonitorDBStore::Transaction *t);
+  void update_trim();
 
   bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
   bool prepare_update(PaxosServiceMessage *m);
index 9ff7f50949d48a3ef590486a46a8eecd9864f151..fcb67d1c30381a51896252ebad08e465f84f7593 100644 (file)
@@ -164,11 +164,6 @@ void LogMonitor::update_from_paxos()
     }
   }
 
-  // trim
-  unsigned max = g_conf->mon_max_log_epochs;
-  if (mon->is_leader() && version > max)
-    trim_to(version - max);
-
   check_subs();
 }
 
@@ -211,6 +206,14 @@ void LogMonitor::encode_pending(MonitorDBStore::Transaction *t)
   put_version_latest_full(t, version);
 }
 
+void LogMonitor::update_trim()
+{
+  unsigned max = g_conf->mon_max_log_epochs;
+  version_t version = get_version();
+  if (mon->is_leader() && version > max)
+    set_trim_to(version - max);
+}
+
 bool LogMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
index 489b0d72bb804b85fdbf0c25312703df5453f5e8..60215dec6152240521d9b31d341b7ba396f21a81 100644 (file)
@@ -38,6 +38,7 @@ private:
   void create_pending();  // prepare a new pending
   // propose pending update to peers
   void encode_pending(MonitorDBStore::Transaction *t);
+  void update_trim();
   bool preprocess_query(PaxosServiceMessage *m);  // true if processed.
   bool prepare_update(PaxosServiceMessage *m);
 
index e8ccebea1fee8916a27830386bb7cc7df199c8f1..e55d0097cb77d12d4195a0876a848451a01cf0af 100644 (file)
@@ -76,6 +76,9 @@ class MDSMonitor : public PaxosService {
   void create_pending(); 
   void encode_pending(MonitorDBStore::Transaction *t);
 
+  bool should_trim() { return false; }
+  void encode_trim(MonitorDBStore::Transaction *t) { }
+
   void update_logger();
 
   void _updated(MMDSBeacon *m);
index a9f13ae34849ff356c2838423c773d20b3b82a86..e02060863d51805ee5e5c243991cca97b90bad2f 100644 (file)
@@ -503,6 +503,30 @@ void OSDMonitor::share_map_with_random_osd()
   mon->messenger->send_message(m, s->inst);
 }
 
+void OSDMonitor::update_trim()
+{
+  if (mon->pgmon()->is_readable() &&
+      mon->pgmon()->pg_map.creating_pgs.empty()) {
+    epoch_t floor = mon->pgmon()->pg_map.calc_min_last_epoch_clean();
+    dout(10) << " min_last_epoch_clean " << floor << dendl;
+    unsigned min = g_conf->mon_min_osdmap_epochs;
+    if (floor + min > get_version()) {
+      if (min < get_version())
+       floor = get_version() - min;
+      else
+       floor = 0;
+    }
+    if (floor > get_first_committed())
+      if (get_trim_to() < floor)
+       set_trim_to(floor);
+  }
+}
+
+bool OSDMonitor::should_trim()
+{
+  update_trim();
+  return (get_trim_to() > 0);
+}
 
 // -------------
 
@@ -1614,24 +1638,11 @@ void OSDMonitor::tick()
 #endif
   // ---------------
 
+  update_trim();
+
   if (do_propose ||
       !pending_inc.new_pg_temp.empty())  // also propose if we adjusted pg_temp
     propose_pending();
-
-  if (mon->pgmon()->is_readable() &&
-      mon->pgmon()->pg_map.creating_pgs.empty()) {
-    epoch_t floor = mon->pgmon()->pg_map.calc_min_last_epoch_clean();
-    dout(10) << " min_last_epoch_clean " << floor << dendl;
-    unsigned min = g_conf->mon_min_osdmap_epochs;
-    if (floor + min > get_version()) {
-      if (min < get_version())
-       floor = get_version() - min;
-      else
-       floor = 0;
-    }
-    if (floor > get_first_committed())
-      trim_to(floor); // we are now responsible for trimming our own versions.
-  }    
 }
 
 void OSDMonitor::handle_osd_timeouts(const utime_t &now,
index 7d327e4df90f8a5527258283012d8bb570d390ea..db4d14c51145fa004cb82a7ff9d2b2ca1f558686 100644 (file)
@@ -151,6 +151,9 @@ private:
   bool prepare_update(PaxosServiceMessage *m);
   bool should_propose(double &delay);
 
+  void update_trim();
+  bool should_trim();
+
   bool can_mark_down(int o);
   bool can_mark_up(int o);
   bool can_mark_out(int o);
index 056bd98f6457dc65785e84285f58ddcbce22857c..3cf969d554595d85be0c38b5b602cf42cc767449 100644 (file)
@@ -208,9 +208,7 @@ void PGMonitor::update_from_paxos()
   }
   */
 
-  unsigned max = g_conf->mon_max_pgmap_epochs;
-  if (mon->is_leader() && (version > max))
-    trim_to(version - max);
+  update_trim();
 
   send_pg_creates();
 
@@ -271,6 +269,15 @@ void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
   put_version_latest_full(t, version);
 }
 
+void PGMonitor::update_trim()
+{
+  unsigned max = g_conf->mon_max_pgmap_epochs;
+  version_t version = get_version();
+  if (mon->is_leader() && (version > max))
+    set_trim_to(version - max);
+}
+
+
 bool PGMonitor::preprocess_query(PaxosServiceMessage *m)
 {
   dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
index 626a590be7cbc0d0941fd5499579dcc5dac296d6..1affefbc2bf05e6fc76f97bcdda97676c2b14fc3 100644 (file)
@@ -54,6 +54,7 @@ private:
   void handle_osd_timeouts();
   void create_pending();  // prepare a new pending
   // propose pending update to peers
+  void update_trim();
   void encode_pending(MonitorDBStore::Transaction *t);
   void update_logger();
 
index 1658a8da1b72e4499a78c45f54c0767f1597087c..e964c128bccfb2e9ada12f12af9ad365f1079448 100644 (file)
@@ -767,6 +767,9 @@ void Paxos::finish_proposal()
     return;
   }
 
+  if (should_trim()) {
+    trim();
+  }
 
   if (is_active() && (proposals.size() > 0)) {
     propose_queued();
@@ -818,9 +821,6 @@ void Paxos::handle_lease(MMonPaxos *lease)
   lease_timeout_event = new C_LeaseTimeout(this);
   mon->timer.add_event_after(g_conf->mon_lease_ack_timeout, lease_timeout_event);
 
-  // trim?
-  trim_to(lease->first_committed);
-  
   // kick waiters
   finish_contexts(g_ceph_context, waiting_for_active);
   if (is_readable())
@@ -846,6 +846,8 @@ void Paxos::handle_lease_ack(MMonPaxos *ack)
               << " -- got everyone" << dendl;
       mon->timer.cancel_event(lease_ack_timeout_event);
       lease_ack_timeout_event = 0;
+
+
     } else {
       dout(10) << "handle_lease_ack from " << ack->get_source() 
               << " -- still need "
@@ -892,7 +894,7 @@ void Paxos::lease_renew_timeout()
  * trim old states
  */
 
-void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first, bool force)
+void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first)
 {
   dout(10) << "trim_to " << first << " (was " << first_committed << ")"
           << dendl;
@@ -908,11 +910,11 @@ void Paxos::trim_to(MonitorDBStore::Transaction *t, version_t first, bool force)
   t->put(get_name(), "first_committed", first_committed);
 }
 
-void Paxos::trim_to(version_t first, bool force)
+void Paxos::trim_to(version_t first)
 {
   MonitorDBStore::Transaction t;
   
-  trim_to(&t, first, force);
+  trim_to(&t, first);
 
   if (!t.empty()) {
     JSONFormatter f(true);
@@ -920,10 +922,25 @@ void Paxos::trim_to(version_t first, bool force)
     dout(30) << __func__ << " transaction dump:\n";
     f.flush(*_dout);
     *_dout << dendl;
-    get_store()->apply_transaction(t);
+
+    bufferlist bl;
+    t.encode(bl);
+
+    going_to_trim = true;
+    queue_proposal(bl, new C_Trimmed(this));
   }
 }
 
+void Paxos::trim_enable() {
+  trim_disabled_version = 0;
+  // We may not be the leader when we reach this function. We sure must
+  // have been the leader at some point, but we may have been demoted and
+  // we really should reset 'trim_disabled_version' if that was the case.
+  // So, make sure we only trim() iff we are the leader.
+  if (mon->is_leader() && should_trim())
+    trim();
+}
+
 /*
  * return a globally unique, monotonically increasing proposal number
  */
index f9bf09798f58b3415f0df3b03bbe389ce51139d1..a2be3907836ee01642f3aa4b8df4e8f510b89fcc 100644 (file)
@@ -532,6 +532,17 @@ private:
    */
 
   bool going_to_bootstrap;
+  /**
+   * Should be true if we have proposed to trim, or are in the middle of
+   * trimming; false otherwise.
+   */
+  bool going_to_trim;
+  /**
+   * If we have disabled trimming our state, this variable should have a
+   * value greater than zero, corresponding to the version we had at the time
+   * we disabled the trim.
+   */
+  version_t trim_disabled_version;
 
   /**
    * @defgroup Paxos_h_callbacks Callback classes.
@@ -607,6 +618,14 @@ private:
     }
   };
 
+  class C_Trimmed : public Context {
+    Paxos *paxos;
+  public:
+    C_Trimmed(Paxos *p) : paxos(p) { }
+    void finish(int r) {
+      paxos->going_to_trim = false;
+    }
+  };
   /**
    *
    */
@@ -997,7 +1016,9 @@ public:
                   lease_timeout_event(0),
                   accept_timeout_event(0),
                   clock_drift_warned(0),
-                  going_to_bootstrap(false) { }
+                  going_to_bootstrap(false),
+                  going_to_trim(false),
+                  trim_disabled_version(0) { }
 
   const string get_name() const {
     return paxos_name;
@@ -1111,19 +1132,66 @@ public:
    * Erase old states from stable storage.
    *
    * @param first The version we are trimming to
-   * @param force If specified, we may even erase the latest stashed version
-   *             iif @p first is higher than that version.
    */
-  void trim_to(version_t first, bool force = false);
+  void trim_to(version_t first);
   /**
    * Erase old states from stable storage.
    *
    * @param t A transaction
    * @param first The version we are trimming to
-   * @param force If specified, we may even erase the latest stashed version
-   *             iif @p first is higher than that version.
    */
-  void trim_to(MonitorDBStore::Transaction *t, version_t first, bool force=false);
+  void trim_to(MonitorDBStore::Transaction *t, version_t first);
+  /**
+   * Trim the Paxos state as much as we can.
+   */
+  void trim() {
+    assert(should_trim());
+    version_t trim_to_version = get_version() - g_conf->paxos_max_join_drift;
+    trim_to(trim_to_version);
+  }
+  /**
+   * Disable trimming
+   *
+   * This is required by the Monitor's store synchronization mechanisms
+   * to guarantee a consistent store state.
+   */
+  void trim_disable() {
+    if (!trim_disabled_version)
+      trim_disabled_version = get_version();
+  }
+  /**
+   * Enable trimming
+   */
+  void trim_enable();
+  /**
+   * Check if trimming has been disabled
+   *
+   * @returns true if trim has been disabled; false otherwise.
+   */
+  bool is_trim_disabled() { return (trim_disabled_version > 0); }
+  /**
+   * Check if we should trim.
+   *
+   * If trimming is disabled, we must take that into consideration and only
+   * return true if we are positively sure that we should trim soon.
+   *
+   * @returns true if we should trim; false otherwise.
+   */
+  bool should_trim() {
+    int available_versions = (get_version() - get_first_committed());
+    int maximum_versions =
+      (g_conf->paxos_max_join_drift + g_conf->paxos_trim_tolerance);
+
+    if (going_to_trim || (available_versions <= maximum_versions))
+      return false;
+
+    if (trim_disabled_version > 0) {
+      int disabled_versions = (get_version() - trim_disabled_version);
+      if (disabled_versions < g_conf->paxos_trim_disabled_max_versions)
+       return false;
+    }
+    return true;
+  }
  
   /**
    * @defgroup Paxos_h_slurping_funcs Slurping-related functions
index e5c4f2129c48dc846841247f684222034687d565..d5d14c01ed063fbe1cb6d9913eb7caec46e9e441 100644 (file)
@@ -125,6 +125,12 @@ void PaxosService::propose_pending()
    */
   MonitorDBStore::Transaction t;
   bufferlist bl;
+
+  if (should_trim()) {
+    encode_trim(&t);
+    set_trim_to(0);
+  }
+
   encode_pending(&t);
   have_pending = false;
 
@@ -263,35 +269,32 @@ void PaxosService::wakeup_proposing_waiters()
   finish_contexts(g_ceph_context, waiting_for_finished_proposal);
 }
 
-void PaxosService::trim_to(version_t first, bool force)
+void PaxosService::encode_trim(MonitorDBStore::Transaction *t)
 {
   version_t first_committed = get_first_committed();
   version_t latest_full = get_version("full", "latest");
+  version_t trim_to = get_trim_to();
 
   string latest_key = mon->store->combine_strings("full", latest_full);
   bool has_full = mon->store->exists(get_service_name(), latest_key);
 
-  dout(10) << __func__ << " " << first << " (was " << first_committed << ")"
+  dout(10) << __func__ << " " << trim_to << " (was " << first_committed << ")"
           << ", latest full " << latest_full << dendl;
 
-  if (first_committed >= first)
+  if (first_committed >= trim_to)
     return;
 
-  MonitorDBStore::Transaction t;
-  while ((first_committed < first)
-      && (force || (first_committed < latest_full))) {
+  while ((first_committed < trim_to)) {
     dout(20) << __func__ << first_committed << dendl;
-    t.erase(get_service_name(), first_committed);
+    t->erase(get_service_name(), first_committed);
 
     if (has_full) {
       latest_key = mon->store->combine_strings("full", first_committed);
-      if (mon->store->exists(get_service_name(), latest_key))
-       t.erase(get_service_name(), latest_key);
+      t->erase(get_service_name(), latest_key);
     }
 
     first_committed++;
   }
-  put_first_committed(&t, first_committed);
-  mon->store->apply_transaction(t);
+  put_first_committed(t, first_committed);
 }
 
index 4a787b539b9c13d91f069c2ed95ade6a086acf6d..e1de779bac0de237d1981eb321800ea7456adb92 100644 (file)
@@ -76,6 +76,12 @@ class PaxosService {
    * then have_pending should be true; otherwise, false.
    */
   bool have_pending; 
+  /**
+   * The version to trim to. If zero, we assume there is no version to be
+   * trimmed; otherwise, we assume we should trim to the version held by
+   * this variable.
+   */
+  version_t trim_version;
 
 protected:
   /**
@@ -175,6 +181,7 @@ public:
   PaxosService(Monitor *mn, Paxos *p, string name) 
     : mon(mn), paxos(p), service_name(name),
       service_version(0), proposal_timer(0), have_pending(false),
+      trim_version(0),
       last_committed_name("last_committed"),
       first_committed_name("first_committed"),
       last_accepted_name("last_accepted"),
@@ -553,6 +560,10 @@ public:
    */
   void wakeup_proposing_waiters();
 
+  /**
+   * @defgroup PaxosService_h_Trim
+   * @{
+   */
   /**
    * Trim our log. This implies getting rid of versions on the k/v store.
    * Services implementing us don't have to implement this function if they
@@ -568,7 +579,49 @@ public:
    *             expected behavior is that, when 'true', we will remove all
    *             the log versions even if we don't have a full map in store.
    */
-  void trim_to(version_t first, bool force = false);
+  virtual void encode_trim(MonitorDBStore::Transaction *t);
+  /**
+   * Check if we should trim.
+   *
+   * We define this function here, because we assume that as long as we know of
+   * a version to trim, we should trim. However, any implementation should feel
+   * free to define its own version of this function if deemed necessary.
+   *
+   * @returns true if we should trim; false otherwise.
+   */
+  virtual bool should_trim() {
+    update_trim();
+    return (get_trim_to() > 0);
+  }
+  /**
+   * Update our trim status. We do nothing here, because there is no
+   * straightforward way to update the trim version, since that's service
+   * specific. However, we do not force services to implement it, since there
+   * a couple of services that do not trim anything at all, and we don't want
+   * to shove this function down their throats if they are never going to use
+   * it anyway.
+   */
+  virtual void update_trim() { }
+  /**
+   * Set the trim version variable to @p ver
+   *
+   * @param ver The version to trim to.
+   */
+  void set_trim_to(version_t ver) {
+    trim_version = ver;
+  }
+  /**
+   * Get the version we should trim to.
+   *
+   * @returns the version we should trim to; if we return zero, it should be
+   *         assumed that there's no version to trim to.
+   */
+  version_t get_trim_to() {
+    return trim_version;
+  }
+  /**
+   * @}
+   */
 
 
   /**