uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_tag_tid(tag_tid), m_entry_tid(entry_tid),
m_commit_tid(commit_tid),
- m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
+ m_lock("FutureImpl::m_lock", false, false), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
}
}
void FutureImpl::flush(Context *on_safe) {
+
bool complete;
- FlushHandlerPtr flush_handler;
+ FlushHandlers flush_handlers;
+ FutureImplPtr prev_future;
{
Mutex::Locker locker(m_lock);
complete = (m_safe && m_consistent);
if (!complete) {
- if (on_safe != NULL) {
+ if (on_safe != nullptr) {
m_contexts.push_back(on_safe);
}
- if (m_flush_state == FLUSH_STATE_NONE) {
- m_flush_state = FLUSH_STATE_REQUESTED;
- flush_handler = m_flush_handler;
-
- // walk the chain backwards up to <splay width> futures
- if (m_prev_future) {
- m_prev_future->flush();
- }
- }
+ prev_future = prepare_flush(&flush_handlers);
}
}
+ // instruct prior futures to flush as well
+ while (prev_future) {
+ Mutex::Locker locker(prev_future->m_lock);
+ prev_future = prev_future->prepare_flush(&flush_handlers);
+ }
+
if (complete && on_safe != NULL) {
on_safe->complete(m_return_value);
- } else if (flush_handler) {
+ } else if (!flush_handlers.empty()) {
// attached to journal object -- instruct it to flush all entries through
// this one. possible to become detached while lock is released, so flush
// will be re-requested by the object if it doesn't own the future
- flush_handler->flush(this);
+ for (auto &pair : flush_handlers) {
+ pair.first->flush(pair.second);
+ }
+ }
+}
+
+FutureImplPtr FutureImpl::prepare_flush(FlushHandlers *flush_handlers) {
+ assert(m_lock.is_locked());
+
+ if (m_flush_state == FLUSH_STATE_NONE) {
+ m_flush_state = FLUSH_STATE_REQUESTED;
+
+ if (m_flush_handler && flush_handlers->count(m_flush_handler) == 0) {
+ flush_handlers->insert({m_flush_handler, this});
+ }
}
+ return m_prev_future;
}
void FutureImpl::wait(Context *on_safe) {
#include "common/RefCountedObj.h"
#include "journal/Future.h"
#include <list>
+#include <map>
#include <boost/noncopyable.hpp>
#include <boost/intrusive_ptr.hpp>
#include "include/assert.h"
private:
friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
+ typedef std::map<FlushHandlerPtr, FutureImplPtr> FlushHandlers;
typedef std::list<Context *> Contexts;
enum FlushState {
C_ConsistentAck m_consistent_ack;
Contexts m_contexts;
+ FutureImplPtr prepare_flush(FlushHandlers *flush_handlers);
+
void consistent(int r);
void finish_unlock();
};
future1);
journal::FutureImplPtr future3 = create_future(235, 1, 458,
future2);
+
+ FlushHandler flush_handler;
ASSERT_FALSE(future1->attach(&m_flush_handler));
- ASSERT_FALSE(future2->attach(&m_flush_handler));
+ ASSERT_FALSE(future2->attach(&flush_handler));
ASSERT_FALSE(future3->attach(&m_flush_handler));
C_SaferCond cond;
future3->flush(&cond);
- ASSERT_EQ(3U, m_flush_handler.flushes);
+ ASSERT_EQ(1U, m_flush_handler.flushes);
+ ASSERT_EQ(1U, flush_handler.flushes);
future3->safe(0);
ASSERT_FALSE(future3->is_complete());