return Future(future);
}
-void JournalRecorder::flush() {
+void JournalRecorder::flush(Context *on_safe) {
Mutex::Locker locker(m_lock);
+
+ C_Flush *ctx = new C_Flush(on_safe, m_object_ptrs.size());
for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
it != m_object_ptrs.end(); ++it) {
- it->second->flush();
+ it->second->flush(ctx);
}
}
#define CEPH_JOURNAL_JOURNAL_RECORDER_H
#include "include/int_types.h"
+#include "include/atomic.h"
+#include "include/Context.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
~JournalRecorder();
Future append(const std::string &tag, const bufferlist &bl);
- void flush();
+ void flush(Context *on_safe);
ObjectRecorderPtr get_object(uint8_t splay_offset);
}
};
+ struct C_Flush : public Context {
+ Context *on_finish;
+ atomic_t pending_flushes;
+ int ret_val;
+
+ C_Flush(Context *_on_finish, size_t _pending_flushes)
+ : on_finish(_on_finish), pending_flushes(_pending_flushes), ret_val(0) {
+ }
+
+ virtual void complete(int r) {
+ if (r < 0 && ret_val == 0) {
+ ret_val = r;
+ }
+ if (pending_flushes.dec() == 0) {
+ on_finish->complete(ret_val);
+ delete this;
+ }
+ }
+ virtual void finish(int r) {
+ }
+ };
+
librados::IoCtx m_ioctx;
CephContext *m_cct;
std::string m_object_oid_prefix;
}
void Journaler::flush(Context *on_safe) {
- // TODO pass ctx
- m_recorder->flush();
+ m_recorder->flush(on_safe);
}
void Journaler::create_player(ReplayHandler *replay_handler) {
// vim: ts=8 sw=2 smarttab
#include "journal/ObjectRecorder.h"
+#include "journal/Future.h"
#include "journal/Utils.h"
#include "include/assert.h"
#include "common/Timer.h"
return (m_size + m_pending_bytes >= m_soft_max_size);
}
-void ObjectRecorder::flush() {
+void ObjectRecorder::flush(Context *on_safe) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
- Mutex::Locker locker(m_lock);
- if (flush_appends(true)) {
- cancel_append_task();
+ Future future;
+ {
+ Mutex::Locker locker(m_lock);
+
+ // attach the flush to the most recent append
+ if (!m_append_buffers.empty()) {
+ future = Future(m_append_buffers.rbegin()->first);
+
+ flush_appends(true);
+ cancel_append_task();
+ } else if (!m_in_flight_appends.empty()) {
+ AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
+ assert(!append_buffers.empty());
+ future = Future(append_buffers.rbegin()->first);
+ }
+ }
+
+ if (future.is_valid()) {
+ future.flush(on_safe);
+ } else {
+ on_safe->complete(0);
}
+
}
void ObjectRecorder::flush(const FutureImplPtr &future) {
}
bool append(const AppendBuffers &append_buffers);
- void flush();
+ void flush(Context *on_safe);
void flush(const FutureImplPtr &future);
void claim_append_buffers(AppendBuffers *append_buffers);