]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
mon: accumulate a single pending transaction and propose it all at once
authorSage Weil <sage@redhat.com>
Tue, 13 Jan 2015 22:51:04 +0000 (14:51 -0800)
committerSage Weil <sage@redhat.com>
Tue, 13 Jan 2015 22:51:04 +0000 (14:51 -0800)
Previous we would queue lots of distinct encoded Transactions from various
callers, usually one per PaxosService.  These would be sent through paxos
one at a time.

If there is a completed transaction there is no reason to delay; it is
more efficient to push it through immediately.  Since we will propose
anything pending right when we finish, there is minimal opportunity for
other work to get done.

Instead, accumulate everything in a single MonitorDBStore::Transaction and
propose all pending changes all at once.  Encode at propose time and
expose the Transaction to the callers so they can add their changes.

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/ConfigKeyService.cc
src/mon/Monitor.cc
src/mon/Paxos.cc
src/mon/Paxos.h
src/mon/PaxosService.cc

index 9b1eacb035faa4bac3014f6221cbab36ae1a16de..97126ed0d1f0edd34da8b176da64e3fa8e7073aa 100644 (file)
@@ -48,19 +48,21 @@ int ConfigKeyService::store_get(string key, bufferlist &bl)
 void ConfigKeyService::store_put(string key, bufferlist &bl, Context *cb)
 {
   bufferlist proposal_bl;
-  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
   t->put(STORE_PREFIX, key, bl);
-  t->encode(proposal_bl);
-  paxos->propose_new_value(proposal_bl, cb);
+  if (cb)
+    paxos->queue_pending_finisher(cb);
+  paxos->trigger_propose();
 }
 
 void ConfigKeyService::store_delete(string key, Context *cb)
 {
   bufferlist proposal_bl;
-  MonitorDBStore::Transaction t;
-  t.erase(STORE_PREFIX, key);
-  t.encode(proposal_bl);
-  paxos->propose_new_value(proposal_bl, cb);
+  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
+  t->erase(STORE_PREFIX, key);
+  if (cb)
+    paxos->queue_pending_finisher(cb);
+  paxos->trigger_propose();
 }
 
 bool ConfigKeyService::store_exists(string key)
index 78e2061fe6bebf4d74d09f0423ac2bd896aee1b9..5d9932a47a48ffeb2e4e3ec2b1fdf12545fd99d4 100644 (file)
@@ -4144,11 +4144,9 @@ void Monitor::tick()
 
   if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
     // this is only necessary on upgraded clusters.
-    MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+    MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
     prepare_new_fingerprint(t);
-    bufferlist tbl;
-    t->encode(tbl);
-    paxos->propose_new_value(tbl, new C_NoopContext);
+    paxos->trigger_propose();
   }
 
   new_tick();
index 6f3a544199012da1816d8fe00a44c88c7b32ae51..2bd0893d21042561357ae8bed0fbe64de2e6fd75 100644 (file)
@@ -573,7 +573,7 @@ void Paxos::handle_last(MMonPaxos *last)
          finish_contexts(g_ceph_context, waiting_for_readable);
          finish_contexts(g_ceph_context, waiting_for_writeable);
 
-          queue_next();
+          maybe_propose_pending();
        }
       }
     }
@@ -923,7 +923,7 @@ void Paxos::commit_finish()
     finish_contexts(g_ceph_context, waiting_for_readable);
     finish_contexts(g_ceph_context, waiting_for_writeable);
 
-    queue_next();
+    maybe_propose_pending();
   }
 }
 
@@ -1039,20 +1039,9 @@ void Paxos::commit_proposal()
   assert(mon->is_leader());
   assert(is_refresh());
 
-  if (proposals.empty()) {
-    return;  // must have been updating previous
-  }
-
-  C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
-  if (proposal->proposed) {
-    dout(10) << __func__ << " proposal " << proposal << " took "
-            << (ceph_clock_now(NULL) - proposal->proposal_time)
-            << " to finish" << dendl;
-    proposals.pop_front();
-    proposal->complete(0);
-  } else {
-    // must have been updating previous.
-  }
+  list<Context*> ls;
+  ls.swap(committing_finishers);
+  finish_contexts(g_ceph_context, ls);
 }
 
 void Paxos::finish_round()
@@ -1064,17 +1053,16 @@ void Paxos::finish_round()
   state = STATE_ACTIVE;
 }
 
-void Paxos::queue_next()
+void Paxos::maybe_propose_pending()
 {
-  dout(10) << __func__ << " state " << state
-          << " proposals left " << proposals.size() << dendl;
+  dout(10) << __func__ << " state " << state << dendl;
 
   if (should_trim()) {
     trim();
   }
 
-  if (is_active() && !proposals.empty()) {
-    propose_queued();
+  if (is_active() && pending_proposal) {
+    propose_pending();
   }
 }
 
@@ -1212,7 +1200,7 @@ void Paxos::trim()
 
   dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
 
-  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+  MonitorDBStore::TransactionRef t = get_pending_transaction();
 
   for (version_t v = first_committed; v < end; ++v) {
     dout(10) << "trim " << v << dendl;
@@ -1224,17 +1212,8 @@ void Paxos::trim()
     t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
   }
 
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  bufferlist bl;
-  t->encode(bl);
-
   trimming = true;
-  queue_proposal(bl, new C_Trimmed(this));
+  queue_pending_finisher(new C_Trimmed(this));
 }
 
 /*
@@ -1298,13 +1277,19 @@ void Paxos::cancel_events()
   }
 }
 
-void Paxos::shutdown() {
+void Paxos::shutdown()
+{
   dout(10) << __func__ << " cancel all contexts" << dendl;
+
+  // discard pending transaction
+  pending_proposal.reset();
+
   finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
   finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
   finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
   finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
-  finish_contexts(g_ceph_context, proposals, -ECANCELED);
+  finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
+  finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
   if (logger)
     g_ceph_context->get_perfcounters_collection()->remove(logger);
   delete logger;
@@ -1315,7 +1300,11 @@ void Paxos::leader_init()
   cancel_events();
   new_value.clear();
 
-  finish_contexts(g_ceph_context, proposals, -EAGAIN);
+  // discard pending transaction
+  pending_proposal.reset();
+
+  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
+  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
 
   logger->inc(l_paxos_start_leader);
 
@@ -1342,10 +1331,14 @@ void Paxos::peon_init()
   // start a timer, in case the leader never manages to issue a lease
   reset_lease_timeout();
 
+  // discard pending transaction
+  pending_proposal.reset();
+
   // no chance to write now!
   finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
   finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
-  finish_contexts(g_ceph_context, proposals, -EAGAIN);
+  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
+  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
 
   logger->inc(l_paxos_start_peon);
 }
@@ -1365,7 +1358,11 @@ void Paxos::restart()
   }
   state = STATE_RECOVERING;
 
-  finish_contexts(g_ceph_context, proposals, -EAGAIN);
+  // discard pending transaction
+  pending_proposal.reset();
+
+  finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
+  finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
   finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
   finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
 
@@ -1484,60 +1481,57 @@ bool Paxos::is_writeable()
     is_lease_valid();
 }
 
-void Paxos::list_proposals(ostream& out)
-{
-  out << __func__ << " " << proposals.size() << " in queue:\n";
-  list<Context*>::iterator p_it = proposals.begin();
-  for (int i = 0; p_it != proposals.end(); ++p_it, ++i) {
-    C_Proposal *p = (C_Proposal*) *p_it;
-    out << "-- entry #" << i << "\n";
-    out << *p << "\n";
-  }
-}
-
-void Paxos::propose_queued()
+void Paxos::propose_pending()
 {
   assert(is_active());
-  assert(!proposals.empty());
-
-  C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
-  assert(!proposal->proposed);
+  assert(pending_proposal);
 
   cancel_events();
-  dout(10) << __func__ << " " << (last_committed + 1)
-         << " " << proposal->bl.length() << " bytes" << dendl;
-  proposal->proposed = true;
 
-  dout(30) << __func__ << " ";
-  list_proposals(*_dout);
+  bufferlist bl;
+  pending_proposal->encode(bl);
+  pending_proposal.reset();
+
+  dout(10) << __func__ << " " << (last_committed + 1)
+          << " " << bl.length() << " bytes" << dendl;
+  dout(30) << __func__ << " transaction dump:\n";
+  JSONFormatter f(true);
+  pending_proposal->dump(&f);
+  f.flush(*_dout);
   *_dout << dendl;
 
+  committing_finishers.swap(pending_finishers);
   state = STATE_UPDATING;
-  begin(proposal->bl);
+  begin(bl);
 }
 
-void Paxos::queue_proposal(bufferlist& bl, Context *onfinished)
+void Paxos::queue_pending_finisher(Context *onfinished)
 {
-  dout(5) << __func__ << " bl " << bl.length() << " bytes;"
-         << " ctx = " << onfinished << dendl;
-
-  proposals.push_back(new C_Proposal(onfinished, bl));
+  dout(5) << __func__ << " " << onfinished << dendl;
+  assert(onfinished);
+  pending_finishers.push_back(onfinished);
 }
 
-bool Paxos::propose_new_value(bufferlist& bl, Context *onfinished)
+MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
 {
   assert(mon->is_leader());
+  if (!pending_proposal) {
+    pending_proposal.reset(new MonitorDBStore::Transaction);
+    assert(pending_finishers.empty());
+  }
+  return pending_proposal;
+}
 
-  queue_proposal(bl, onfinished);
-
-  if (!is_active()) {
-    dout(5) << __func__ << " not active; proposal queued" << dendl; 
+bool Paxos::trigger_propose()
+{
+  if (is_active()) {
+    dout(10) << __func__ << " active, proposing now" << dendl;
+    propose_pending();
     return true;
+  } else {
+    dout(10) << __func__ << " not active, will propose later" << dendl;
+    return false;
   }
-
-  propose_queued();
-  
-  return true;
 }
 
 bool Paxos::is_consistent()
index 8025f06a080c1c12c787f05df75fb1ed5d5c944a..1bb710b4ccdaa87a532c6c346099512e46521534 100644 (file)
@@ -575,13 +575,31 @@ private:
    *         fully committed.
    */
   list<Context*> waiting_for_commit;
+
   /**
+   * Pending proposal transaction
    *
+   * This is the transaction that is under construction and pending
+   * proposal.  We will add operations to it until we decide it is
+   * time to start a paxos round.
    */
-  list<Context*> proposals;
+  MonitorDBStore::TransactionRef pending_proposal;
+
   /**
-   * @}
+   * Finishers for pending transaction
+   *
+   * These are waiting for updates in the pending proposal/transaction
+   * to be committed.
+   */
+  list<Context*> pending_finishers;
+
+  /**
+   * Finishers for committing transaction
+   *
+   * When the pending_proposal is submitted, pending_finishers move to
+   * this list.  When it commits, these finishers are notified.
    */
+  list<Context*> committing_finishers;
 
   /**
    * @defgroup Paxos_h_sync_warns Synchronization warnings
@@ -1046,17 +1064,9 @@ private:
   void warn_on_future_time(utime_t t, entity_name_t from);
 
   /**
-   * Queue a new proposal by pushing it at the back of the queue; do not
-   * propose it.
-   *
-   * @param bl The bufferlist to be proposed
-   * @param onfinished The callback to be called once the proposal finishes
-   */
-  void queue_proposal(bufferlist& bl, Context *onfinished);
-  /**
-   * Begin proposing the Proposal at the front of the proposals queue.
+   * Begin proposing the pending_proposal.
    */
-  void propose_queued();
+  void propose_pending();
 
   /**
    * refresh state from store
@@ -1070,7 +1080,14 @@ private:
 
   void commit_proposal();
   void finish_round();
-  void queue_next();
+
+  /**
+   * propose pending, if any
+   *
+   * This is called at the end of the round to check if there is another
+   * pending proposal ready to go.
+   */
+  void maybe_propose_pending();
 
 public:
   /**
@@ -1335,22 +1352,30 @@ public:
   }
 
   /**
-   * List all queued proposals
+   * Get a transaction to submit operations to propose against
    *
-   * @param out[out] Output Stream onto which we will output the list
-   *                of queued proposals.
+   * Apply operations to this transaction.  It will eventually be proposed
+   * to paxos.
    */
-  void list_proposals(ostream& out);
+  MonitorDBStore::TransactionRef get_pending_transaction();
+
   /**
-   * Propose a new value to the Leader.
+   * Queue a completion for the pending proposal
    *
-   * This function enables the submission of a new value to the Leader, which
-   * will trigger a new proposal.
+   * This completion will get triggered when the pending proposal
+   * transaction commits.
+   */
+  void queue_pending_finisher(Context *onfinished);
+
+  /**
+   * (try to) trigger a proposal
    *
-   * @param bl A bufferlist holding the value to be proposed
-   * @param onfinish A callback to be fired up once we finish the proposal
+   * Tell paxos that it should submit the pending proposal.  Note that if it
+   * is not active (e.g., because it is already in the midst of committing
+   * something) that will be deferred (e.g., until the current round finishes).
    */
-  bool propose_new_value(bufferlist& bl, Context *onfinished=0);
+  bool trigger_propose();
+
   /**
    * Add oncommit to the back of the list of callbacks waiting for us to
    * finish committing.
index 92257209565f821521ba7fb3725fa4c46ef86da9..8d06b0bed85acbd061e252d084f6419cc47e69df 100644 (file)
@@ -181,15 +181,15 @@ void PaxosService::propose_pending()
   }
 
   /**
-   * @note The value we propose is encoded in a bufferlist, passed to 
-   *      Paxos::propose_new_value and it is obtained by calling a 
-   *      function that must be implemented by the class implementing us.
-   *      I.e., the function encode_pending will be the one responsible
-   *      to encode whatever is pending on the implementation class into a
-   *      bufferlist, so we can then propose that as a value through Paxos.
+   * @note What we contirbute to the pending Paxos transaction is
+   *      obtained by calling a function that must be implemented by
+   *      the class implementing us.  I.e., the function
+   *      encode_pending will be the one responsible to encode
+   *      whatever is pending on the implementation class into a
+   *      bufferlist, so we can then propose that as a value through
+   *      Paxos.
    */
-  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
-  bufferlist bl;
+  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
 
   if (should_stash_full())
     encode_full(t);
@@ -201,17 +201,10 @@ void PaxosService::propose_pending()
     t->put(get_service_name(), "format_version", format_version);
   }
 
-  dout(30) << __func__ << " transaction dump:\n";
-  JSONFormatter f(true);
-  t->dump(&f);
-  f.flush(*_dout);
-  *_dout << dendl;
-
-  t->encode(bl);
-
   // apply to paxos
   proposing = true;
-  paxos->propose_new_value(bl, new C_Committed(this));
+  paxos->queue_pending_finisher(new C_Committed(this));
+  paxos->trigger_propose();
 }
 
 bool PaxosService::should_stash_full()
@@ -351,16 +344,14 @@ void PaxosService::maybe_trim()
   }
 
   dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
-  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+  MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
   trim(t, get_first_committed(), trim_to);
   put_first_committed(t, trim_to);
 
   // let the service add any extra stuff
   encode_trim_extra(t, trim_to);
 
-  bufferlist bl;
-  t->encode(bl);
-  paxos->propose_new_value(bl, NULL);
+  paxos->trigger_propose();
 }
 
 void PaxosService::trim(MonitorDBStore::TransactionRef t,