#include "journal/Future.h"
#include "journal/FutureImpl.h"
+#include "include/assert.h"
namespace journal {
}
void Future::wait(Context *on_safe) {
+ assert(on_safe != NULL);
m_future_impl->wait(on_safe);
}
// vim: ts=8 sw=2 smarttab
#include "journal/FutureImpl.h"
+#include "common/Finisher.h"
#include "journal/Utils.h"
namespace journal {
-FutureImpl::FutureImpl(const std::string &tag, uint64_t tid)
- : RefCountedObject(NULL, 0), m_tag(tag), m_tid(tid),
+FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid)
+ : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid),
m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
}
if (complete && on_safe != NULL) {
- on_safe->complete(m_return_value);
+ m_finisher.queue(on_safe, m_return_value);
} else if (flush_handler) {
// attached to journal object -- instruct it to flush all entries through
// this one. possible to become detached while lock is released, so flush
return;
}
}
- on_safe->complete(m_return_value);
+ m_finisher.queue(on_safe, m_return_value);
}
bool FutureImpl::is_complete() const {
#include "include/assert.h"
class Context;
+class Finisher;
namespace journal {
};
typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
- FutureImpl(const std::string &tag, uint64_t tid);
+ FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid);
void init(const FutureImplPtr &prev_future);
virtual void finish(int r) {}
};
+ Finisher &m_finisher;
std::string m_tag;
uint64_t m_tid;
#include "journal/JournalMetadata.h"
#include "journal/Utils.h"
#include "common/errno.h"
+#include "common/Finisher.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"
double commit_interval)
: m_cct(NULL), m_oid(oid), m_client_id(client_id),
m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
- m_initialized(false), m_timer(NULL),
+ m_initialized(false), m_finisher(NULL), m_timer(NULL),
m_timer_lock("JournalMetadata::m_timer_lock"),
m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
m_update_notifications(0), m_commit_position_pending(false),
delete m_timer;
m_timer = NULL;
}
+ if (m_finisher != NULL) {
+ m_finisher->stop();
+ delete m_finisher;
+ m_finisher = NULL;
+ }
m_ioctx.unwatch2(m_watch_handle);
librados::Rados rados(m_ioctx);
assert(!m_initialized);
m_initialized = true;
+ m_finisher = new Finisher(m_cct);
+ m_finisher->start();
+
m_timer = new SafeTimer(m_cct, m_timer_lock, false);
m_timer->init();
#include <string>
#include "include/assert.h"
+class Finisher;
class SafeTimer;
namespace journal {
return m_splay_width;
}
+ inline Finisher &get_finisher() {
+ return *m_finisher;
+ }
+
inline SafeTimer &get_timer() {
return *m_timer;
}
uint8_t m_splay_width;
bool m_initialized;
+ Finisher *m_finisher;
SafeTimer *m_timer;
Mutex m_timer_lock;
uint8_t splay_offset = tid % splay_width;
ObjectRecorderPtr object_ptr = get_object(splay_offset);
- FutureImplPtr future(new FutureImpl(tag, tid));
+ FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
+ tag, tid));
future->init(m_prev_future);
m_prev_future = future;