int get_return_value() const;
private:
+ friend class Journaler;
friend std::ostream& operator<<(std::ostream&, const Future&);
+ inline FutureImplPtr get_future_impl() const {
+ return m_future_impl;
+ }
+
FutureImplPtr m_future_impl;
};
namespace journal {
-FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid)
+FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+ uint64_t commit_tid)
: RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid),
+ m_commit_tid(commit_tid),
m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
};
typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
- FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid);
+ FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+ uint64_t commit_tid);
void init(const FutureImplPtr &prev_future);
inline uint64_t get_tid() const {
return m_tid;
}
+ inline uint64_t get_commit_tid() const {
+ return m_commit_tid;
+ }
void flush(Context *on_safe = NULL);
void wait(Context *on_safe);
Finisher &m_finisher;
std::string m_tag;
uint64_t m_tid;
+ uint64_t m_commit_tid;
mutable Mutex m_lock;
FutureImplPtr m_prev_future;
#include "common/Finisher.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"
+#include <set>
#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL),
m_timer_lock("JournalMetadata::m_timer_lock"),
- m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
- m_minimum_set(0), m_active_set(0), m_update_notifications(0),
- m_commit_position_pending(false), m_commit_position_ctx(NULL) {
+ m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+ m_watch_handle(0), m_minimum_set(0), m_active_set(0),
+ m_update_notifications(0), m_commit_position_ctx(NULL),
+ m_commit_position_task_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
m_active_set = object_set;
}
+void JournalMetadata::flush_commit_position() {
+ {
+ Mutex::Locker locker(m_lock);
+ if (m_commit_position_task_ctx == NULL) {
+ return;
+ }
+
+ Mutex::Locker timer_locker(m_timer_lock);
+ m_timer->cancel_event(m_commit_position_task_ctx);
+ m_commit_position_task_ctx = NULL;
+ }
+ handle_commit_position_task();
+}
+
void JournalMetadata::set_commit_position(
const ObjectSetPosition &commit_position, Context *on_safe) {
assert(on_safe != NULL);
assert(m_lock.is_locked());
Mutex::Locker timer_locker(m_timer_lock);
- if (!m_commit_position_pending) {
- m_commit_position_pending = true;
- m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this));
+ if (m_commit_position_task_ctx == NULL) {
+ m_commit_position_task_ctx = new C_CommitPositionTask(this);
+ m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
}
}
schedule_watch_reset();
}
+uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
+ const std::string &tag,
+ uint64_t tid) {
+ Mutex::Locker locker(m_lock);
+ uint64_t commit_tid = ++m_commit_tid;
+ m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag, tid);
+
+ ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
+ << "object_num=" << object_num << ", "
+ << "tag=" << tag << ", tid=" << tid << "]" << dendl;
+ return commit_tid;
+}
+
+bool JournalMetadata::committed(uint64_t commit_tid,
+ ObjectSetPosition *object_set_position) {
+ ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ {
+ CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
+ assert(it != m_pending_commit_tids.end());
+
+ CommitEntry &commit_entry = it->second;
+ commit_entry.committed = true;
+ }
+
+ if (!m_commit_position.entry_positions.empty()) {
+ *object_set_position = m_commit_position;
+ } else {
+ *object_set_position = m_client.commit_position;
+ }
+
+ bool update_commit_position = false;
+ while (!m_pending_commit_tids.empty()) {
+ CommitTids::iterator it = m_pending_commit_tids.begin();
+ CommitEntry &commit_entry = it->second;
+ if (!commit_entry.committed) {
+ break;
+ }
+
+ object_set_position->object_number = commit_entry.object_num;
+ if (!object_set_position->entry_positions.empty() &&
+ object_set_position->entry_positions.front().tag == commit_entry.tag) {
+ object_set_position->entry_positions.front() = EntryPosition(
+ commit_entry.tag, commit_entry.tid);
+ } else {
+ object_set_position->entry_positions.push_front(EntryPosition(
+ commit_entry.tag, commit_entry.tid));
+ }
+ m_pending_commit_tids.erase(it);
+ update_commit_position = true;
+ }
+
+ if (update_commit_position) {
+ // prune the position to have unique tags in commit-order
+ std::set<std::string> in_use_tags;
+ EntryPositions::iterator it = object_set_position->entry_positions.begin();
+ while (it != object_set_position->entry_positions.end()) {
+ if (!in_use_tags.insert(it->tag).second) {
+ it = object_set_position->entry_positions.erase(it);
+ } else {
+ ++it;
+ }
+ }
+
+ ldout(m_cct, 20) << "updated object set position: " << *object_set_position
+ << dendl;
+ }
+ return update_commit_position;
+}
+
void JournalMetadata::notify_update() {
ldout(m_cct, 10) << "notifying journal header update" << dendl;
return m_active_set;
}
+ void flush_commit_position();
void set_commit_position(const ObjectSetPosition &commit_position,
Context *on_safe);
void get_commit_position(ObjectSetPosition *commit_position) const {
void reserve_tid(const std::string &tag, uint64_t tid);
bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
+ uint64_t allocate_commit_tid(uint64_t object_num, const std::string &tag,
+ uint64_t tid);
+ bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position);
+
void notify_update();
void async_notify_update();
typedef std::map<std::string, uint64_t> AllocatedTids;
typedef std::list<Listener*> Listeners;
+ struct CommitEntry {
+ uint64_t object_num;
+ std::string tag;
+ uint64_t tid;
+ bool committed;
+
+ CommitEntry() : object_num(0), tid(0), committed(false) {
+ }
+ CommitEntry(uint64_t _object_num, const std::string &_tag, uint64_t _tid)
+ : object_num(_object_num), tag(_tag), tid(_tid), committed(false) {
+ }
+ };
+ typedef std::map<uint64_t, CommitEntry> CommitTids;
+
struct C_WatchCtx : public librados::WatchCtx2 {
JournalMetadata *journal_metadata;
virtual void finish(int r) {
journal_metadata->handle_immutable_metadata(r, on_finish);
}
-
};
+
struct C_Refresh : public Context {
JournalMetadata* journal_metadata;
uint64_t minimum_set;
mutable Mutex m_lock;
+ uint64_t m_commit_tid;
+ CommitTids m_pending_commit_tids;
+
Listeners m_listeners;
C_WatchCtx m_watch_ctx;
size_t m_update_notifications;
Cond m_update_cond;
- bool m_commit_position_pending;
ObjectSetPosition m_commit_position;
Context *m_commit_position_ctx;
+ Context *m_commit_position_task_ctx;
AsyncOpTracker m_async_op_tracker;
#include "journal/JournalPlayer.h"
#include "common/Finisher.h"
+#include "journal/Entry.h"
#include "journal/ReplayHandler.h"
#include "journal/Utils.h"
m_commit_object = commit_position.object_number;
m_commit_tag = commit_position.entry_positions.front().tag;
- for (size_t i=0; i<commit_position.entry_positions.size(); ++i) {
- const EntryPosition &entry_position = commit_position.entry_positions[i];
+ for (EntryPositions::const_iterator it =
+ commit_position.entry_positions.begin();
+ it != commit_position.entry_positions.end(); ++it) {
+ const EntryPosition &entry_position = *it;
m_commit_tids[entry_position.tag] = entry_position.tid;
}
}
}
}
-bool JournalPlayer::try_pop_front(Entry *entry,
- ObjectSetPosition *object_set_position) {
+bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
Mutex::Locker locker(m_lock);
if (m_state != STATE_PLAYBACK) {
return false;
}
}
- // TODO populate the object_set_position w/ current object number and
- // unique entry tag mappings
m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
+ *commit_tid = m_journal_metadata->allocate_commit_tid(
+ object_player->get_object_number(), entry->get_tag(), entry->get_tid());
return true;
}
namespace journal {
+class Entry;
class ReplayHandler;
class JournalPlayer {
void prefetch_and_watch(double interval);
void unwatch();
- bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position);
+ bool try_pop_front(Entry *entry, uint64_t *commit_tid);
private:
typedef std::map<std::string, uint64_t> AllocatedTids;
uint8_t splay_offset = tid % splay_width;
ObjectRecorderPtr object_ptr = get_object(splay_offset);
+ uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
+ object_ptr->get_object_number(), tag, tid);
FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
- tag, tid));
+ tag, tid, commit_tid));
future->init(m_prev_future);
m_prev_future = future;
append_buffers.push_back(std::make_pair(future, entry_bl));
bool object_full = object_ptr->append(append_buffers);
- // TODO populate the object_set_position
-
if (object_full) {
ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
<< dendl;
}
JournalTrimmer::~JournalTrimmer() {
+ m_journal_metadata->flush_commit_position();
m_async_op_tracker.wait_for_ops();
}
return ctx.wait();
}
-void JournalTrimmer::update_commit_position(
- const ObjectSetPosition &object_set_position) {
- ldout(m_cct, 20) << __func__ << ": pos=" << object_set_position
- << dendl;
+void JournalTrimmer::committed(uint64_t commit_tid) {
+ ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl;
+
+ ObjectSetPosition object_set_position;
+ if (!m_journal_metadata->committed(commit_tid, &object_set_position)) {
+ return;
+ }
{
Mutex::Locker locker(m_lock);
~JournalTrimmer();
int remove_objects();
- void update_commit_position(const ObjectSetPosition &object_set_position);
+ void committed(uint64_t commit_tid);
private:
struct C_CommitPositionSafe : public Context {
#include "journal/JournalPlayer.h"
#include "journal/JournalRecorder.h"
#include "journal/JournalTrimmer.h"
-#include "journal/PayloadImpl.h"
+#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "cls/journal/cls_journal_client.h"
#include "cls/journal/cls_journal_types.h"
m_player->prefetch_and_watch(interval);
}
-bool Journaler::try_pop_front(Payload *payload) {
+bool Journaler::try_pop_front(ReplayEntry *replay_entry) {
assert(m_player != NULL);
Entry entry;
- ObjectSetPosition object_set_position;
- if (!m_player->try_pop_front(&entry, &object_set_position)) {
+ uint64_t commit_tid;
+ if (!m_player->try_pop_front(&entry, &commit_tid)) {
return false;
}
- *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position));
+ *replay_entry = ReplayEntry(entry.get_data(), commit_tid);
return true;
}
m_player = NULL;
}
-void Journaler::update_commit_position(const Payload &payload) {
- PayloadImplPtr payload_impl = payload.get_payload_impl();
- m_trimmer->update_commit_position(payload_impl->get_object_set_position());
+void Journaler::committed(const ReplayEntry &replay_entry) {
+ m_trimmer->committed(replay_entry.get_commit_tid());
+}
+
+void Journaler::committed(const Future &future) {
+ FutureImplPtr future_impl = future.get_future_impl();
+ m_trimmer->committed(future_impl->get_commit_tid());
}
void Journaler::start_append() {
#include "include/buffer.h"
#include "include/rados/librados.hpp"
#include "journal/Future.h"
-#include "journal/Payload.h"
#include <string>
#include <map>
#include "include/assert.h"
class JournalPlayer;
class JournalRecorder;
class JournalTrimmer;
+class ReplayEntry;
class ReplayHandler;
class Journaler {
void start_replay(ReplayHandler *replay_handler);
void start_live_replay(ReplayHandler *replay_handler, double interval);
- bool try_pop_front(Payload *payload);
+ bool try_pop_front(ReplayEntry *replay_entry);
void stop_replay();
- void update_commit_position(const Payload &payload);
-
void start_append();
Future append(const std::string &tag, const bufferlist &bl);
void flush(Context *on_safe);
void stop_append(Context *on_safe);
+ void committed(const ReplayEntry &replay_entry);
+ void committed(const Future &future);
+
private:
librados::IoCtx m_header_ioctx;
librados::IoCtx m_data_ioctx;
journal/JournalTrimmer.cc \
journal/ObjectPlayer.cc \
journal/ObjectRecorder.cc \
- journal/Payload.cc \
- journal/PayloadImpl.cc \
journal/Utils.cc
noinst_LTLIBRARIES += libjournal.la
journal/JournalTrimmer.h \
journal/ObjectPlayer.h \
journal/ObjectRecorder.h \
- journal/Payload.h \
- journal/PayloadImpl.h \
+ journal/ReplayEntry.h \
journal/ReplayHandler.h \
journal/Utils.h
DENCODER_DEPS += libjournal.la
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "journal/Payload.h"
-#include "journal/PayloadImpl.h"
-
-namespace journal {
-
-const bufferlist &Payload::get_data() const {
- return m_payload_impl->get_data();
-}
-
-void intrusive_ptr_add_ref(PayloadImpl *p) {
- p->get();
-}
-
-void intrusive_ptr_release(PayloadImpl *p) {
- p->put();
-}
-
-} // namespace journal
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_JOURNAL_PAYLOAD_H
-#define CEPH_JOURNAL_PAYLOAD_H
-
-#include "include/int_types.h"
-#include "include/buffer.h"
-#include <boost/intrusive_ptr.hpp>
-
-namespace journal {
-
-class PayloadImpl;
-
-class Payload {
-public:
- typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
-
- Payload() {}
- Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {}
-
- inline bool is_valid() const {
- return m_payload_impl;
- }
-
- const bufferlist &get_data() const;
-
-private:
- friend class Journaler;
-
- inline PayloadImplPtr get_payload_impl() const {
- return m_payload_impl;
- }
-
- PayloadImplPtr m_payload_impl;
-};
-
-void intrusive_ptr_add_ref(PayloadImpl *p);
-void intrusive_ptr_release(PayloadImpl *p);
-
-} // namespace journal
-
-#endif // CEPH_JOURNAL_PAYLOAD_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "journal/PayloadImpl.h"
-
-namespace journal {
-
-PayloadImpl::PayloadImpl(const bufferlist &data,
- const ObjectSetPosition &object_set_position)
- : m_data(data), m_object_set_position(object_set_position) {
-}
-
-const bufferlist &PayloadImpl::get_data() const {
- return m_data;
-}
-
-const PayloadImpl::ObjectSetPosition &
-PayloadImpl::get_object_set_position() const {
- return m_object_set_position;
-}
-
-} // namespace journal
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H
-#define CEPH_JOURNAL_PAYLOAD_IMPL_H
-
-#include "include/int_types.h"
-#include "include/buffer.h"
-#include "common/RefCountedObj.h"
-#include "cls/journal/cls_journal_types.h"
-#include <boost/noncopyable.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include "include/assert.h"
-
-namespace journal {
-
-class PayloadImpl;
-typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
-
-class PayloadImpl : public RefCountedObject, boost::noncopyable {
-public:
- typedef cls::journal::ObjectSetPosition ObjectSetPosition;
-
- PayloadImpl(const bufferlist &data,
- const ObjectSetPosition &object_set_position);
-
- const bufferlist &get_data() const;
- const ObjectSetPosition &get_object_set_position() const;
-
-private:
- bufferlist m_data;
- ObjectSetPosition m_object_set_position;
-};
-
-} // namespace journal
-
-#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_ENTRY_H
+#define CEPH_JOURNAL_REPLAY_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace journal {
+
+class ReplayEntry {
+public:
+ ReplayEntry() : m_commit_tid(0) {
+ }
+ ReplayEntry(const bufferlist &data, uint64_t commit_tid)
+ : m_data(data), m_commit_tid(commit_tid) {
+ }
+
+ inline const bufferlist &get_data() const {
+ return m_data;
+ }
+ inline uint64_t get_commit_tid() const {
+ return m_commit_tid;
+ }
+
+private:
+ bufferlist m_data;
+ uint64_t m_commit_tid;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_ENTRY_H