finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
- queue_next();
+ maybe_propose_pending();
}
}
}
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
- queue_next();
+ maybe_propose_pending();
}
}
assert(mon->is_leader());
assert(is_refresh());
- if (proposals.empty()) {
- return; // must have been updating previous
- }
-
- C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
- if (proposal->proposed) {
- dout(10) << __func__ << " proposal " << proposal << " took "
- << (ceph_clock_now(NULL) - proposal->proposal_time)
- << " to finish" << dendl;
- proposals.pop_front();
- proposal->complete(0);
- } else {
- // must have been updating previous.
- }
+ list<Context*> ls;
+ ls.swap(committing_finishers);
+ finish_contexts(g_ceph_context, ls);
}
void Paxos::finish_round()
state = STATE_ACTIVE;
}
-void Paxos::queue_next()
+void Paxos::maybe_propose_pending()
{
- dout(10) << __func__ << " state " << state
- << " proposals left " << proposals.size() << dendl;
+ dout(10) << __func__ << " state " << state << dendl;
if (should_trim()) {
trim();
}
- if (is_active() && !proposals.empty()) {
- propose_queued();
+ if (is_active() && pending_proposal) {
+ propose_pending();
}
}
dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
- MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ MonitorDBStore::TransactionRef t = get_pending_transaction();
for (version_t v = first_committed; v < end; ++v) {
dout(10) << "trim " << v << dendl;
t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
}
- dout(30) << __func__ << " transaction dump:\n";
- JSONFormatter f(true);
- t->dump(&f);
- f.flush(*_dout);
- *_dout << dendl;
-
- bufferlist bl;
- t->encode(bl);
-
trimming = true;
- queue_proposal(bl, new C_Trimmed(this));
+ queue_pending_finisher(new C_Trimmed(this));
}
/*
}
}
-void Paxos::shutdown() {
+void Paxos::shutdown()
+{
dout(10) << __func__ << " cancel all contexts" << dendl;
+
+ // discard pending transaction
+ pending_proposal.reset();
+
finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
- finish_contexts(g_ceph_context, proposals, -ECANCELED);
+ finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
+ finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
if (logger)
g_ceph_context->get_perfcounters_collection()->remove(logger);
delete logger;
cancel_events();
new_value.clear();
- finish_contexts(g_ceph_context, proposals, -EAGAIN);
+ // discard pending transaction
+ pending_proposal.reset();
+
+ finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
+ finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
logger->inc(l_paxos_start_leader);
// start a timer, in case the leader never manages to issue a lease
reset_lease_timeout();
+ // discard pending transaction
+ pending_proposal.reset();
+
// no chance to write now!
finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
- finish_contexts(g_ceph_context, proposals, -EAGAIN);
+ finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
+ finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
logger->inc(l_paxos_start_peon);
}
}
state = STATE_RECOVERING;
- finish_contexts(g_ceph_context, proposals, -EAGAIN);
+ // discard pending transaction
+ pending_proposal.reset();
+
+ finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
+ finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
is_lease_valid();
}
-void Paxos::list_proposals(ostream& out)
-{
- out << __func__ << " " << proposals.size() << " in queue:\n";
- list<Context*>::iterator p_it = proposals.begin();
- for (int i = 0; p_it != proposals.end(); ++p_it, ++i) {
- C_Proposal *p = (C_Proposal*) *p_it;
- out << "-- entry #" << i << "\n";
- out << *p << "\n";
- }
-}
-
-void Paxos::propose_queued()
+// Encode the pending transaction and start a paxos round for it.
+//
+// Preconditions (asserted): paxos is active and a pending_proposal exists.
+// The pending finishers become the committing finishers for this round.
+void Paxos::propose_pending()
{
assert(is_active());
- assert(!proposals.empty());
-
- C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
- assert(!proposal->proposed);
+ assert(pending_proposal);
cancel_events();
- dout(10) << __func__ << " " << (last_committed + 1)
- << " " << proposal->bl.length() << " bytes" << dendl;
- proposal->proposed = true;
- dout(30) << __func__ << " ";
- list_proposals(*_dout);
+ bufferlist bl;
+ pending_proposal->encode(bl);
+
+ dout(10) << __func__ << " " << (last_committed + 1)
+ << " " << bl.length() << " bytes" << dendl;
+ dout(30) << __func__ << " transaction dump:\n";
+ JSONFormatter f(true);
+ pending_proposal->dump(&f);
+ f.flush(*_dout);
*_dout << dendl;
+
+ // Drop the pending transaction only after it has been encoded AND dumped;
+ // resetting it before the dump above would dereference an empty pointer.
+ pending_proposal.reset();
+
+ committing_finishers.swap(pending_finishers);
state = STATE_UPDATING;
- begin(proposal->bl);
+ begin(bl);
}
-void Paxos::queue_proposal(bufferlist& bl, Context *onfinished)
+void Paxos::queue_pending_finisher(Context *onfinished)
{
- dout(5) << __func__ << " bl " << bl.length() << " bytes;"
- << " ctx = " << onfinished << dendl;
-
- proposals.push_back(new C_Proposal(onfinished, bl));
+ dout(5) << __func__ << " " << onfinished << dendl;
+ assert(onfinished);
+ pending_finishers.push_back(onfinished);
}
-bool Paxos::propose_new_value(bufferlist& bl, Context *onfinished)
+MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
{
assert(mon->is_leader());
+ if (!pending_proposal) {
+ pending_proposal.reset(new MonitorDBStore::Transaction);
+ assert(pending_finishers.empty());
+ }
+ return pending_proposal;
+}
- queue_proposal(bl, onfinished);
-
- if (!is_active()) {
- dout(5) << __func__ << " not active; proposal queued" << dendl;
+bool Paxos::trigger_propose()
+{
+ if (is_active()) {
+ dout(10) << __func__ << " active, proposing now" << dendl;
+ propose_pending();
return true;
+ } else {
+ dout(10) << __func__ << " not active, will propose later" << dendl;
+ return false;
}
-
- propose_queued();
-
- return true;
}
bool Paxos::is_consistent()
* fully committed.
*/
list<Context*> waiting_for_commit;
+
/**
+ * Pending proposal transaction
*
+ * This is the transaction that is under construction and pending
+ * proposal. We will add operations to it until we decide it is
+ * time to start a paxos round.
*/
- list<Context*> proposals;
+ MonitorDBStore::TransactionRef pending_proposal;
+
/**
- * @}
+ * Finishers for pending transaction
+ *
+ * These are waiting for updates in the pending proposal/transaction
+ * to be committed.
+ */
+ list<Context*> pending_finishers;
+
+ /**
+ * Finishers for committing transaction
+ *
+ * When the pending_proposal is submitted, pending_finishers move to
+ * this list. When it commits, these finishers are notified.
*/
+ list<Context*> committing_finishers;
/**
* @defgroup Paxos_h_sync_warns Synchronization warnings
void warn_on_future_time(utime_t t, entity_name_t from);
/**
- * Queue a new proposal by pushing it at the back of the queue; do not
- * propose it.
- *
- * @param bl The bufferlist to be proposed
- * @param onfinished The callback to be called once the proposal finishes
- */
- void queue_proposal(bufferlist& bl, Context *onfinished);
- /**
- * Begin proposing the Proposal at the front of the proposals queue.
+ * Begin proposing the pending_proposal.
*/
- void propose_queued();
+ void propose_pending();
/**
* refresh state from store
void commit_proposal();
void finish_round();
- void queue_next();
+
+ /**
+ * propose pending, if any
+ *
+ * This is called at the end of the round to check if there is another
+ * pending proposal ready to go.
+ */
+ void maybe_propose_pending();
public:
/**
}
/**
- * List all queued proposals
+ * Get a transaction to submit operations to propose against
*
- * @param out[out] Output Stream onto which we will output the list
- * of queued proposals.
+ * Apply operations to this transaction. It will eventually be proposed
+ * to paxos.
*/
- void list_proposals(ostream& out);
+ MonitorDBStore::TransactionRef get_pending_transaction();
+
/**
- * Propose a new value to the Leader.
+ * Queue a completion for the pending proposal
*
- * This function enables the submission of a new value to the Leader, which
- * will trigger a new proposal.
+ * This completion will get triggered when the pending proposal
+ * transaction commits.
+ */
+ void queue_pending_finisher(Context *onfinished);
+
+ /**
+ * (try to) trigger a proposal
*
- * @param bl A bufferlist holding the value to be proposed
- * @param onfinish A callback to be fired up once we finish the proposal
+ * Tell paxos that it should submit the pending proposal. Note that if it
+ * is not active (e.g., because it is already in the midst of committing
+ * something) that will be deferred (e.g., until the current round finishes).
*/
- bool propose_new_value(bufferlist& bl, Context *onfinished=0);
+ bool trigger_propose();
+
/**
* Add oncommit to the back of the list of callbacks waiting for us to
* finish committing.
}
/**
- * @note The value we propose is encoded in a bufferlist, passed to
- * Paxos::propose_new_value and it is obtained by calling a
- * function that must be implemented by the class implementing us.
- * I.e., the function encode_pending will be the one responsible
- * to encode whatever is pending on the implementation class into a
- * bufferlist, so we can then propose that as a value through Paxos.
+ * @note What we contribute to the pending Paxos transaction is
+ * obtained by calling a function that must be implemented by
+ * the class implementing us. I.e., the function
+ * encode_pending will be the one responsible to encode
+ * whatever is pending on the implementation class into a
+ * bufferlist, so we can then propose that as a value through
+ * Paxos.
*/
- MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
- bufferlist bl;
+ MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
if (should_stash_full())
encode_full(t);
t->put(get_service_name(), "format_version", format_version);
}
- dout(30) << __func__ << " transaction dump:\n";
- JSONFormatter f(true);
- t->dump(&f);
- f.flush(*_dout);
- *_dout << dendl;
-
- t->encode(bl);
-
// apply to paxos
proposing = true;
- paxos->propose_new_value(bl, new C_Committed(this));
+ paxos->queue_pending_finisher(new C_Committed(this));
+ paxos->trigger_propose();
}
bool PaxosService::should_stash_full()
}
dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
- MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
trim(t, get_first_committed(), trim_to);
put_first_committed(t, trim_to);
// let the service add any extra stuff
encode_trim_extra(t, trim_to);
- bufferlist bl;
- t->encode(bl);
- paxos->propose_new_value(bl, NULL);
+ paxos->trigger_propose();
}
void PaxosService::trim(MonitorDBStore::TransactionRef t,