// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
+
+ // delay committing op event to ensure consistent replay
+ assert(m_op_futures.count(op_tid) == 0);
+ m_op_futures[op_tid] = future;
}
on_safe = create_async_context_callback(m_image_ctx, on_safe);
- future.flush(new C_OpEventSafe(this, op_tid, future, on_safe));
+ future.flush(on_safe);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": "
bufferlist bl;
::encode(event_entry, bl);
- Future future;
+ Future op_start_future;
+ Future op_finish_future;
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
+ // ready to commit op event
+ auto it = m_op_futures.find(op_tid);
+ assert(it != m_op_futures.end());
+ op_start_future = it->second;
+ m_op_futures.erase(it);
+
// TODO: use allocated tag_id
- future = m_journaler->append(0, bl);
+ op_finish_future = m_journaler->append(0, bl);
}
- future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
+ op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
+ op_finish_future));
}
template <typename I>
}
template <typename I>
-void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
- Context *on_safe) {
+void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
+ const Future &op_start_future,
+ const Future &op_finish_future) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
lderr(cct) << "failed to commit op event: " << cpp_strerror(r) << dendl;
}
- m_journaler->committed(future);
- if (on_safe != nullptr) {
- on_safe->complete(r);
- }
+ m_journaler->committed(op_start_future);
+ m_journaler->committed(op_finish_future);
+
+ // reduce the replay window after committing an op event
+ m_journaler->flush_commit_position(nullptr);
}
template <typename I>
#include "include/atomic.h"
#include "include/Context.h"
#include "include/interval_set.h"
-#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
#include <iosfwd>
#include <list>
#include <string>
+#include <unordered_map>
class Context;
namespace journal {
}
};
- typedef ceph::unordered_map<uint64_t, Event> Events;
+ typedef std::unordered_map<uint64_t, Event> Events;
+ typedef std::unordered_map<uint64_t, Future> TidToFutures;
struct C_IOEventSafe : public Context {
Journal *journal;
struct C_OpEventSafe : public Context {
Journal *journal;
uint64_t tid;
- Future future;
- Context *on_safe;
+ Future op_start_future;
+ Future op_finish_future;
- C_OpEventSafe(Journal *journal, uint64_t tid, const Future &future,
- Context *on_safe)
- : journal(journal), tid(tid), future(future), on_safe(on_safe) {
+ C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
+ const Future &op_finish_future)
+ : journal(journal), tid(tid), op_start_future(op_start_future),
+ op_finish_future(op_finish_future) {
}
virtual void finish(int r) {
- journal->handle_op_event_safe(r, tid, future, on_safe);
+ journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
}
};
Events m_events;
atomic_t m_op_tid;
+ TidToFutures m_op_futures;
bool m_blocking_writes;
void handle_journal_destroyed(int r);
void handle_io_event_safe(int r, uint64_t tid);
- void handle_op_event_safe(int r, uint64_t tid, const Future &future,
- Context *on_safe);
+ void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
+ const Future &op_finish_future);
void stop_recording();