// vim: ts=8 sw=2 smarttab
#include "journal/JournalRecorder.h"
+#include "common/Finisher.h"
#include "journal/Entry.h"
#include "journal/Utils.h"
namespace journal {
+namespace {
+
+struct C_Flush : public Context {
+ JournalMetadataPtr journal_metadata;
+ Context *on_finish;
+ atomic_t pending_flushes;
+ int ret_val;
+
+ C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
+ size_t _pending_flushes)
+ : journal_metadata(_journal_metadata), on_finish(_on_finish),
+ pending_flushes(_pending_flushes), ret_val(0) {
+ }
+
+ virtual void complete(int r) {
+ if (r < 0 && ret_val == 0) {
+ ret_val = r;
+ }
+ if (pending_flushes.dec() == 0) {
+ // ensure all prior callback have been flushed as well
+ journal_metadata->get_finisher().queue(on_finish, ret_val);
+ delete this;
+ }
+ }
+ virtual void finish(int r) {
+ }
+};
+
+} // anonymous namespace
+
JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
const JournalMetadataPtr& journal_metadata,
{
Mutex::Locker locker(m_lock);
- ctx = new C_Flush(on_safe, m_object_ptrs.size());
+ ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
it->second->flush(ctx);
}
}
- ctx->unblock();
+ // avoid holding the lock in case there is nothing to flush
+ ctx->complete(0);
}
ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
}
};
- struct C_Flush : public Context {
- Context *on_finish;
- atomic_t pending_flushes;
- int ret_val;
-
- C_Flush(Context *_on_finish, size_t _pending_flushes)
- : on_finish(_on_finish), pending_flushes(_pending_flushes + 1),
- ret_val(0) {
- }
-
- void unblock() {
- complete(0);
- }
- virtual void complete(int r) {
- if (r < 0 && ret_val == 0) {
- ret_val = r;
- }
- if (pending_flushes.dec() == 0) {
- on_finish->complete(ret_val);
- delete this;
- }
- }
- virtual void finish(int r) {
- }
- };
-
librados::IoCtx m_ioctx;
CephContext *m_cct;
std::string m_object_oid_prefix;