include include/Makefile.am
include librados/Makefile.am
include libradosstriper/Makefile.am
+include journal/Makefile.am
include librbd/Makefile.am
include rgw/Makefile.am
include cls/Makefile.am
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "include/encoding.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <strstream>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Entry: "
+
+namespace journal {
+
+namespace {
+
+const uint32_t HEADER_FIXED_SIZE = 17; /// preamble, version, tid
+
+} // anonymous namespace
+
+void Entry::encode(bufferlist &bl) const {
+ bufferlist data_bl;
+ ::encode(preamble, data_bl);
+ ::encode(static_cast<uint8_t>(1), data_bl);
+ ::encode(m_tid, data_bl);
+ assert(HEADER_FIXED_SIZE == data_bl.length());
+
+ ::encode(m_tag, data_bl);
+ ::encode(m_data, data_bl);
+
+ uint32_t crc = data_bl.crc32c(0);
+ bl.claim_append(data_bl);
+ ::encode(crc, bl);
+}
+
+void Entry::decode(bufferlist::iterator &iter) {
+ uint32_t start_offset = iter.get_off();
+ uint64_t bl_preamble;
+ ::decode(bl_preamble, iter);
+ if (bl_preamble != preamble) {
+ throw buffer::malformed_input("incorrect preamble: " +
+ stringify(bl_preamble));
+ }
+
+ uint8_t version;
+ ::decode(version, iter);
+ if (version != 1) {
+ throw buffer::malformed_input("unknown version: " + stringify(version));
+ }
+
+ ::decode(m_tid, iter);
+ ::decode(m_tag, iter);
+ ::decode(m_data, iter);
+ uint32_t end_offset = iter.get_off();
+
+ uint32_t crc;
+ ::decode(crc, iter);
+
+ bufferlist data_bl;
+ data_bl.substr_of(iter.get_bl(), start_offset, end_offset - start_offset);
+ uint32_t actual_crc = data_bl.crc32c(0);
+ if (crc != actual_crc) {
+ throw buffer::malformed_input("crc mismatch: " + stringify(crc) +
+ " != " + stringify(actual_crc));
+ }
+}
+
+void Entry::dump(Formatter *f) const {
+ f->dump_string("tag", m_tag);
+ f->dump_unsigned("tid", m_tid);
+
+ std::stringstream data;
+ m_data.hexdump(data);
+ f->dump_string("data", data.str());
+}
+
+bool Entry::is_readable(bufferlist::iterator iter, uint32_t *bytes_needed) {
+ uint32_t start_off = iter.get_off();
+ if (iter.get_remaining() < HEADER_FIXED_SIZE) {
+ *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+ return false;
+ }
+ uint64_t bl_preamble;
+ ::decode(bl_preamble, iter);
+ if (bl_preamble != preamble) {
+ *bytes_needed = 0;
+ return false;
+ }
+ iter.advance(HEADER_FIXED_SIZE - sizeof(bl_preamble));
+
+ if (iter.get_remaining() < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+ return false;
+ }
+ uint32_t tag_size;
+ ::decode(tag_size, iter);
+
+ if (iter.get_remaining() < tag_size) {
+ *bytes_needed = tag_size - iter.get_remaining();
+ return false;
+ }
+ iter.advance(tag_size);
+
+ if (iter.get_remaining() < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+ return false;
+ }
+ uint32_t data_size;
+ ::decode(data_size, iter);
+
+ if (iter.get_remaining() < data_size) {
+ *bytes_needed = data_size - iter.get_remaining();
+ return false;
+ }
+ iter.advance(data_size);
+ uint32_t end_off = iter.get_off();
+
+ if (iter.get_remaining() < sizeof(uint32_t)) {
+ *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+ return false;
+ }
+
+ bufferlist crc_bl;
+ crc_bl.substr_of(iter.get_bl(), start_off, end_off - start_off);
+
+ *bytes_needed = 0;
+ uint32_t crc;
+ ::decode(crc, iter);
+ if (crc != crc_bl.crc32c(0)) {
+ return false;
+ }
+ return true;
+}
+
+void Entry::generate_test_instances(std::list<Entry *> &o) {
+ o.push_back(new Entry("tag1", 123, bufferlist()));
+
+ bufferlist bl;
+ bl.append("data");
+ o.push_back(new Entry("tag2", 123, bl));
+}
+
+bool Entry::operator==(const Entry& rhs) const {
+ return (m_tag == rhs.m_tag && m_tid == rhs.m_tid &&
+ const_cast<bufferlist&>(m_data).contents_equal(
+ const_cast<bufferlist&>(rhs.m_data)));
+}
+
+std::ostream &operator<<(std::ostream &os, const Entry &entry) {
+ os << "Entry[tag=" << entry.get_tag() << ", tid=" << entry.get_tid() << ", "
+ << "data size=" << entry.get_data().length() << "]";
+ return os;
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_ENTRY_H
+#define CEPH_JOURNAL_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <string>
+
+namespace ceph {
+class Formatter;
+}
+
+namespace journal {
+
+class Entry {
+public:
+ Entry() : m_tid() {}
+ Entry(const std::string &tag, uint64_t tid, const bufferlist &data)
+ : m_tag(tag), m_tid(tid), m_data(data)
+ {
+ }
+
+ inline const std::string &get_tag() const {
+ return m_tag;
+ }
+ inline uint64_t get_tid() const {
+ return m_tid;
+ }
+ inline const bufferlist &get_data() const {
+ return m_data;
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &iter);
+ void dump(ceph::Formatter *f) const;
+
+ bool operator==(const Entry& rhs) const;
+
+ static bool is_readable(bufferlist::iterator iter, uint32_t *bytes_needed);
+ static void generate_test_instances(std::list<Entry *> &o);
+
+private:
+ static const uint64_t preamble = 0x3141592653589793;
+
+ std::string m_tag;
+ uint64_t m_tid;
+ bufferlist m_data;
+};
+
+std::ostream &operator<<(std::ostream &os, const Entry &entry);
+
+} // namespace journal
+
+using journal::operator<<;
+
+WRITE_CLASS_ENCODER(journal::Entry)
+
+#endif // CEPH_JOURNAL_ENTRY_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+
+namespace journal {
+
+void Future::flush(Context *on_safe) {
+ m_future_impl->flush(on_safe);
+}
+
+void Future::wait(Context *on_safe) {
+ m_future_impl->wait(on_safe);
+}
+
+bool Future::is_complete() const {
+ return m_future_impl->is_complete();
+}
+
+int Future::get_return_value() const {
+ return m_future_impl->get_return_value();
+}
+
+void intrusive_ptr_add_ref(FutureImpl *p) {
+ p->get();
+}
+
+void intrusive_ptr_release(FutureImpl *p) {
+ p->put();
+}
+
+std::ostream &operator<<(std::ostream &os, const Future &future) {
+ return os << *future.m_future_impl.get();
+}
+
+} // namespace journal
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_H
+#define CEPH_JOURNAL_FUTURE_H
+
+#include "include/int_types.h"
+#include <string>
+#include <iosfwd>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+
+class Future {
+public:
+ typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+ Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {}
+
+ void flush(Context *on_safe);
+ void wait(Context *on_safe);
+
+ bool is_complete() const;
+ int get_return_value() const;
+
+private:
+ friend std::ostream& operator<<(std::ostream&, const Future&);
+
+ FutureImplPtr m_future_impl;
+};
+
+void intrusive_ptr_add_ref(FutureImpl *p);
+void intrusive_ptr_release(FutureImpl *p);
+
+std::ostream &operator<<(std::ostream &os, const Future &future);
+
+} // namespace journal
+
+using journal::intrusive_ptr_add_ref;
+using journal::intrusive_ptr_release;
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "journal/Utils.h"
+
+namespace journal {
+
+FutureImpl::FutureImpl(const std::string &tag, uint64_t tid)
+ : RefCountedObject(NULL, 0), m_tag(tag), m_tid(tid),
+ m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
+ m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
+ m_consistent_ack(this) {
+}
+
+void FutureImpl::init(const FutureImplPtr &prev_future) {
+ // chain ourself to the prior future (if any) to that we known when the
+ // journal is consistent
+ if (prev_future) {
+ m_prev_future = prev_future;
+ m_prev_future->wait(&m_consistent_ack);
+ } else {
+ m_consistent_ack.complete(0);
+ }
+}
+
+void FutureImpl::flush(Context *on_safe) {
+ bool complete;
+ FlushHandlerPtr flush_handler;
+ {
+ Mutex::Locker locker(m_lock);
+ complete = (m_safe && m_consistent);
+ if (!complete) {
+ if (on_safe != NULL) {
+ m_contexts.push_back(on_safe);
+ }
+
+ if (m_flush_state == FLUSH_STATE_NONE) {
+ m_flush_state = FLUSH_STATE_REQUESTED;
+ flush_handler = m_flush_handler;
+
+ // walk the chain backwards up to <splay width> futures
+ if (m_prev_future) {
+ m_prev_future->flush();
+ }
+ }
+ }
+ }
+
+ if (complete && on_safe != NULL) {
+ on_safe->complete(m_return_value);
+ } else if (flush_handler) {
+ // attached to journal object -- instruct it to flush all entries through
+ // this one. possible to become detached while lock is released, so flush
+ // will be re-requested by the object if it doesn't own the future
+ flush_handler->flush(this);
+ }
+}
+
+void FutureImpl::wait(Context *on_safe) {
+ assert(on_safe != NULL);
+ {
+ Mutex::Locker locker(m_lock);
+ if (!m_safe || !m_consistent) {
+ m_contexts.push_back(on_safe);
+ return;
+ }
+ }
+ on_safe->complete(m_return_value);
+}
+
+bool FutureImpl::is_complete() const {
+ Mutex::Locker locker(m_lock);
+ return m_safe && m_consistent;
+}
+
+int FutureImpl::get_return_value() const {
+ Mutex::Locker locker(m_lock);
+ assert(m_safe && m_consistent);
+ return m_return_value;
+}
+
+bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
+ Mutex::Locker locker(m_lock);
+ assert(!m_flush_handler);
+ m_flush_handler = flush_handler;
+ return m_flush_state != FLUSH_STATE_NONE;
+}
+
+void FutureImpl::safe(int r) {
+ Mutex::Locker locker(m_lock);
+ assert(!m_safe);
+ m_safe = true;
+ if (m_return_value == 0) {
+ m_return_value = r;
+ }
+
+ m_flush_handler.reset();
+ if (m_consistent) {
+ finish();
+ }
+}
+
+void FutureImpl::consistent(int r) {
+ Mutex::Locker locker(m_lock);
+ assert(!m_consistent);
+ m_consistent = true;
+ m_prev_future.reset();
+ if (m_return_value == 0) {
+ m_return_value = r;
+ }
+
+ if (m_safe) {
+ finish();
+ }
+}
+
+void FutureImpl::finish() {
+ assert(m_lock.is_locked());
+ assert(m_safe && m_consistent);
+
+ Contexts contexts;
+ contexts.swap(m_contexts);
+ for (Contexts::iterator it = contexts.begin();
+ it != contexts.end(); ++it) {
+ (*it)->complete(m_return_value);
+ }
+}
+
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
+ os << "Future[tag=" << future.m_tag << ", tid=" << future.m_tid << "]";
+ return os;
+}
+
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
+ p->get();
+}
+
+void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
+ p->put();
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_IMPL_H
+#define CEPH_JOURNAL_FUTURE_IMPL_H
+
+#include "include/int_types.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/Future.h"
+#include <list>
+#include <boost/noncopyable.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+class FutureImpl : public RefCountedObject, boost::noncopyable {
+public:
+ struct FlushHandler {
+ virtual ~FlushHandler() {}
+ virtual void flush(const FutureImplPtr &future) = 0;
+ virtual void get() = 0;
+ virtual void put() = 0;
+ };
+ typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
+
+ FutureImpl(const std::string &tag, uint64_t tid);
+
+ void init(const FutureImplPtr &prev_future);
+
+ inline const std::string &get_tag() const {
+ return m_tag;
+ }
+ inline uint64_t get_tid() const {
+ return m_tid;
+ }
+
+ void flush(Context *on_safe = NULL);
+ void wait(Context *on_safe);
+
+ bool is_complete() const;
+ int get_return_value() const;
+
+ inline bool is_flush_in_progress() const {
+ Mutex::Locker locker(m_lock);
+ return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
+ }
+ inline void set_flush_in_progress() {
+ Mutex::Locker locker(m_lock);
+ m_flush_state = FLUSH_STATE_IN_PROGRESS;
+ }
+
+ bool attach(const FlushHandlerPtr &flush_handler);
+ inline void detach() {
+ Mutex::Locker locker(m_lock);
+ assert(m_flush_handler);
+ m_flush_handler.reset();
+ }
+ inline FlushHandlerPtr get_flush_handler() const {
+ Mutex::Locker locker(m_lock);
+ return m_flush_handler;
+ }
+
+ void safe(int r);
+
+private:
+ friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
+
+ typedef std::list<Context *> Contexts;
+
+ enum FlushState {
+ FLUSH_STATE_NONE,
+ FLUSH_STATE_REQUESTED,
+ FLUSH_STATE_IN_PROGRESS
+ };
+
+ struct C_ConsistentAck : public Context {
+ FutureImplPtr future;
+ C_ConsistentAck(FutureImpl *_future) : future(_future) {}
+ virtual void complete(int r) {
+ future->consistent(r);
+ future.reset();
+ }
+ virtual void finish(int r) {}
+ };
+
+ std::string m_tag;
+ uint64_t m_tid;
+
+ mutable Mutex m_lock;
+ FutureImplPtr m_prev_future;
+ bool m_safe;
+ bool m_consistent;
+ int m_return_value;
+
+ FlushHandlerPtr m_flush_handler;
+ FlushState m_flush_state;
+
+ C_ConsistentAck m_consistent_ack;
+ Contexts m_contexts;
+
+ void consistent(int r);
+ void finish();
+};
+
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
+void intrusive_ptr_release(FutureImpl::FlushHandler *p);
+
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future);
+
+} // namespace journal
+
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_IMPL_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalMetadata: "
+
+namespace journal {
+
+using namespace cls::journal;
+
+namespace {
+
+struct C_NotifyUpdate : public Context {
+ JournalMetadataPtr journal_metadata;
+
+ C_NotifyUpdate(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {}
+
+ virtual void finish(int r) {
+ if (r == 0) {
+ journal_metadata->notify_update();
+ }
+ }
+};
+
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+ Context *comp = reinterpret_cast<Context *>(arg);
+ comp->complete(rados_aio_get_return_value(c));
+}
+
+} // anonymous namespace
+
+JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
+ const std::string &oid,
+ const std::string &client_id)
+ : m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0),
+ m_splay_width(0), m_initialized(false), m_timer(NULL),
+ m_timer_lock("JournalMetadata::m_timer_lock"),
+ m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
+ m_update_notifications(0) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+JournalMetadata::~JournalMetadata() {
+ if (m_timer != NULL) {
+ Mutex::Locker locker(m_timer_lock);
+ m_timer->shutdown();
+ delete m_timer;
+ m_timer = NULL;
+ }
+
+ m_ioctx.unwatch2(m_watch_handle);
+ librados::Rados rados(m_ioctx);
+ rados.watch_flush();
+}
+
+int JournalMetadata::init() {
+ assert(!m_initialized);
+ m_initialized = true;
+
+ int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order,
+ &m_splay_width);
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ m_timer = new SafeTimer(m_cct, m_timer_lock, false);
+ m_timer->init();
+
+ r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to watch journal"
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ C_SaferCond cond;
+ refresh(&cond);
+ r = cond.wait();
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+int JournalMetadata::register_client(const std::string &description) {
+ assert(!m_client_id.empty());
+
+ int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
+ if (r < 0) {
+ lderr(m_cct) << "failed to register journal client '" << m_client_id
+ << "': " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ notify_update();
+ return 0;
+}
+
+int JournalMetadata::unregister_client() {
+ assert(!m_client_id.empty());
+
+ int r = client::client_unregister(m_ioctx, m_oid, m_client_id);
+ if (r < 0) {
+ lderr(m_cct) << "failed to unregister journal client '" << m_client_id
+ << "': " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ notify_update();
+ return 0;
+}
+
+void JournalMetadata::add_listener(Listener *listener) {
+ Mutex::Locker locker(m_lock);
+ while (m_update_notifications > 0) {
+ m_update_cond.Wait(m_lock);
+ }
+ m_listeners.push_back(listener);
+}
+
+void JournalMetadata::remove_listener(Listener *listener) {
+ Mutex::Locker locker(m_lock);
+ while (m_update_notifications > 0) {
+ m_update_cond.Wait(m_lock);
+ }
+ m_listeners.remove(listener);
+}
+
+void JournalMetadata::set_minimum_set(uint64_t object_set) {
+ Mutex::Locker locker(m_lock);
+ if (m_minimum_set >= object_set) {
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_minimum_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+
+ m_minimum_set = object_set;
+}
+
+void JournalMetadata::set_active_set(uint64_t object_set) {
+ Mutex::Locker locker(m_lock);
+ if (m_active_set >= object_set) {
+ return;
+ }
+
+ librados::ObjectWriteOperation op;
+ client::set_active_set(&op, object_set);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+
+ m_active_set = object_set;
+}
+
+void JournalMetadata::set_commit_position(
+ const ObjectSetPosition &commit_position) {
+ Mutex::Locker locker(m_lock);
+
+ librados::ObjectWriteOperation op;
+ client::client_commit(&op, m_client_id, commit_position);
+
+ C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+}
+
+void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
+ Mutex::Locker locker(m_lock);
+ uint64_t &allocated_tid = m_allocated_tids[tag];
+ if (allocated_tid <= tid) {
+ allocated_tid = tid + 1;
+ }
+}
+
+bool JournalMetadata::get_last_allocated_tid(const std::string &tag,
+ uint64_t *tid) const {
+ Mutex::Locker locker(m_lock);
+
+ AllocatedTids::const_iterator it = m_allocated_tids.find(tag);
+ if (it == m_allocated_tids.end()) {
+ return false;
+ }
+
+ assert(it->second > 0);
+ *tid = it->second - 1;
+ return true;
+}
+
+void JournalMetadata::refresh(Context *on_complete) {
+ ldout(m_cct, 10) << "refreshing journal metadata" << dendl;
+ C_Refresh *refresh = new C_Refresh(this, on_complete);
+ client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set,
+ &refresh->active_set,
+ &refresh->registered_clients, refresh);
+}
+
+void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
+ ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl;
+ if (r == 0) {
+ Mutex::Locker locker(m_lock);
+
+ Client client(m_client_id, "");
+ RegisteredClients::iterator it = refresh->registered_clients.find(client);
+ if (it != refresh->registered_clients.end()) {
+ m_minimum_set = refresh->minimum_set;
+ m_active_set = refresh->active_set;
+ m_registered_clients = refresh->registered_clients;
+ m_client = *it;
+
+ ++m_update_notifications;
+ m_lock.Unlock();
+ for (Listeners::iterator it = m_listeners.begin();
+ it != m_listeners.end(); ++it) {
+ (*it)->handle_update(this);
+ }
+ m_lock.Lock();
+ if (--m_update_notifications == 0) {
+ m_update_cond.Signal();
+ }
+ } else {
+ lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
+ r = -ENOENT;
+ }
+ }
+
+ if (refresh->on_finish != NULL) {
+ refresh->on_finish->complete(r);
+ }
+}
+
+void JournalMetadata::schedule_watch_reset() {
+ Mutex::Locker locker(m_timer_lock);
+ m_timer->add_event_after(0.1, new C_WatchReset(this));
+}
+
+void JournalMetadata::handle_watch_reset() {
+ int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+ if (r < 0) {
+ lderr(m_cct) << __func__ << ": failed to watch journal"
+ << cpp_strerror(r) << dendl;
+ schedule_watch_reset();
+ } else {
+ ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
+ refresh(NULL);
+ }
+}
+
+void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
+ ldout(m_cct, 10) << "journal header updated" << dendl;
+
+ bufferlist bl;
+ m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+
+ refresh(NULL);
+}
+
+void JournalMetadata::handle_watch_error(int err) {
+ lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
+ schedule_watch_reset();
+}
+
+void JournalMetadata::notify_update() {
+ ldout(m_cct, 10) << "notifying journal header update" << dendl;
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(NULL, NULL, NULL);
+
+ bufferlist bl;
+ int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
+ assert(r == 0);
+
+ comp->release();
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
+#define CEPH_JOURNAL_JOURNAL_METADATA_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <list>
+#include <map>
+#include <string>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalMetadata;
+typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
+
+class JournalMetadata : public RefCountedObject, boost::noncopyable {
+public:
+ typedef cls::journal::EntryPosition EntryPosition;
+ typedef cls::journal::EntryPositions EntryPositions;
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+ typedef cls::journal::Client Client;
+
+ typedef std::set<Client> RegisteredClients;
+
+ struct Listener {
+ virtual ~Listener() {};
+ virtual void handle_update(JournalMetadata *) = 0;
+ };
+
+ JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
+ const std::string &client_id);
+ ~JournalMetadata();
+
+ int init();
+
+ void add_listener(Listener *listener);
+ void remove_listener(Listener *listener);
+
+ int register_client(const std::string &description);
+ int unregister_client();
+
+ inline uint8_t get_order() const {
+ return m_order;
+ }
+ inline uint8_t get_splay_width() const {
+ return m_splay_width;
+ }
+
+ inline SafeTimer &get_timer() {
+ return *m_timer;
+ }
+ inline Mutex &get_timer_lock() {
+ return m_timer_lock;
+ }
+
+ void set_minimum_set(uint64_t object_set);
+ inline uint64_t get_minimum_set() const {
+ Mutex::Locker locker(m_lock);
+ return m_minimum_set;
+ }
+
+ void set_active_set(uint64_t object_set);
+ inline uint64_t get_active_set() const {
+ Mutex::Locker locker(m_lock);
+ return m_active_set;
+ }
+
+ void set_commit_position(const ObjectSetPosition &commit_position);
+ void get_commit_position(ObjectSetPosition *commit_position) const {
+ Mutex::Locker locker(m_lock);
+ *commit_position = m_client.commit_position;
+ }
+
+ inline uint64_t allocate_tid(const std::string &tag) {
+ Mutex::Locker locker(m_lock);
+ return m_allocated_tids[tag]++;
+ }
+ void reserve_tid(const std::string &tag, uint64_t tid);
+ bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
+
+ void notify_update();
+
+private:
+ typedef std::map<std::string, uint64_t> AllocatedTids;
+ typedef std::list<Listener*> Listeners;
+
+ struct C_WatchCtx : public librados::WatchCtx2 {
+ JournalMetadata *journal_metadata;
+
+ C_WatchCtx(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {}
+
+ virtual void handle_notify(uint64_t notify_id, uint64_t cookie,
+ uint64_t notifier_id, bufferlist& bl) {
+ journal_metadata->handle_watch_notify(notify_id, cookie);
+ }
+ virtual void handle_error(uint64_t cookie, int err) {
+ journal_metadata->handle_watch_error(err);
+ }
+ };
+ struct C_WatchReset : public Context {
+ JournalMetadataPtr journal_metadata;
+
+ C_WatchReset(JournalMetadata *_journal_metadata)
+ : journal_metadata(_journal_metadata) {}
+
+ virtual void finish(int r) {
+ journal_metadata->handle_watch_reset();
+ }
+ };
+
+ struct C_Refresh : public Context {
+ JournalMetadataPtr journal_metadata;
+ uint64_t minimum_set;
+ uint64_t active_set;
+ RegisteredClients registered_clients;
+ Context *on_finish;
+
+ C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish)
+ : journal_metadata(_journal_metadata), minimum_set(0), active_set(0),
+ on_finish(_on_finish) {}
+
+ virtual void finish(int r) {
+ journal_metadata->handle_refresh_complete(this, r);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_oid;
+ std::string m_client_id;
+
+ uint8_t m_order;
+ uint8_t m_splay_width;
+ bool m_initialized;
+
+ SafeTimer *m_timer;
+ Mutex m_timer_lock;
+
+ mutable Mutex m_lock;
+
+ Listeners m_listeners;
+
+ C_WatchCtx m_watch_ctx;
+ uint64_t m_watch_handle;
+
+ uint64_t m_minimum_set;
+ uint64_t m_active_set;
+ RegisteredClients m_registered_clients;
+ Client m_client;
+
+ AllocatedTids m_allocated_tids;
+
+ size_t m_update_notifications;
+ Cond m_update_cond;
+
+ void refresh(Context *on_finish);
+ void handle_refresh_complete(C_Refresh *refresh, int r);
+
+ void schedule_watch_reset();
+ void handle_watch_reset();
+ void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
+ void handle_watch_error(int err);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_METADATA_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/ReplayHandler.h"
+#include "journal/Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalPlayer: "
+
+namespace journal {
+
+JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ ReplayHandler *replay_handler)
+ : RefCountedObject(NULL, 0), m_cct(NULL),
+ m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
+ m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
+ m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
+ m_watch_interval(0), m_commit_object(0) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+ ObjectSetPosition commit_position;
+ m_journal_metadata->get_commit_position(&commit_position);
+ if (!commit_position.entry_positions.empty()) {
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ m_splay_offset = commit_position.object_number % splay_width;
+ m_commit_object = commit_position.object_number;
+ m_commit_tag = commit_position.entry_positions.front().tag;
+
+ for (size_t i=0; i<commit_position.entry_positions.size(); ++i) {
+ const EntryPosition &entry_position = commit_position.entry_positions[i];
+ m_commit_tids[entry_position.tag] = entry_position.tid;
+ }
+ }
+}
+
+void JournalPlayer::prefetch() {
+ m_lock.Lock();
+ assert(m_state == STATE_INIT);
+ m_state = STATE_PREFETCH;
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " "
+ << "objects" << dendl;
+
+ // prefetch starting from the last known commit set
+ C_PrefetchBatch *ctx = new C_PrefetchBatch(this);
+ uint64_t start_object = (m_commit_object / splay_width) * splay_width;
+ for (uint64_t object_number = start_object;
+ object_number < start_object + (2 * splay_width); ++object_number) {
+ ctx->add_fetch();
+ fetch(object_number, ctx);
+ }
+ m_lock.Unlock();
+
+ ctx->complete(0);
+}
+
+void JournalPlayer::prefetch_and_watch(double interval) {
+ {
+ Mutex::Locker locker(m_lock);
+ m_watch_enabled = true;
+ m_watch_interval = interval;
+ }
+ prefetch();
+}
+
+void JournalPlayer::unwatch() {
+ Mutex::Locker locker(m_lock);
+ if (m_watch_scheduled) {
+ ObjectPlayerPtr object_player = get_object_player();
+ assert(object_player);
+
+ object_player->unwatch();
+ m_watch_scheduled = false;
+ }
+}
+
+bool JournalPlayer::try_pop_front(Entry *entry,
+ ObjectSetPosition *object_set_position) {
+ Mutex::Locker locker(m_lock);
+ if (m_state != STATE_PLAYBACK) {
+ return false;
+ }
+
+ ObjectPlayerPtr object_player = get_object_player();
+ assert(object_player);
+
+ if (object_player->empty()) {
+ if (m_watch_enabled && !m_watch_scheduled) {
+ object_player->watch(&m_process_state, m_watch_interval);
+ m_watch_scheduled = true;
+ }
+ return false;
+ }
+
+ object_player->front(entry);
+ object_player->pop_front();
+
+ uint64_t last_tid;
+ if (m_journal_metadata->get_last_allocated_tid(entry->get_tag(), &last_tid) &&
+ entry->get_tid() != last_tid + 1) {
+ lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
+
+ m_state = STATE_ERROR;
+ m_lock.Unlock();
+ m_replay_handler->handle_error(-EINVAL);
+ m_lock.Lock();
+ return false;
+ }
+
+ // skip to next splay offset if we cannot apply the next entry in-sequence
+ if (!object_player->empty()) {
+ Entry peek_entry;
+ object_player->front(&peek_entry);
+ if (peek_entry.get_tag() == entry->get_tag() ||
+ (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(),
+ &last_tid) &&
+ last_tid +1 != peek_entry.get_tid())) {
+ advance_splay_object();
+ }
+ } else {
+ advance_splay_object();
+
+ ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
+ if (!next_set_object_player->empty()) {
+ remove_object_player(object_player, &m_process_state);
+ }
+ }
+
+ // TODO populate the object_set_position w/ current object number and
+ // unique entry tag mappings
+ m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
+ return true;
+}
+
+void JournalPlayer::process_state(int r) {
+ ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+ if (r == 0) {
+ Mutex::Locker locker(m_lock);
+ switch (m_state) {
+ case STATE_PREFETCH:
+ ldout(m_cct, 10) << "PREFETCH" << dendl;
+ r = process_prefetch();
+ break;
+ case STATE_PLAYBACK:
+ ldout(m_cct, 10) << "PLAYBACK" << dendl;
+ r = process_playback();
+ break;
+ case STATE_ERROR:
+ ldout(m_cct, 10) << "ERROR" << dendl;
+ break;
+ default:
+ lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl;
+ assert(false);
+ break;
+ }
+ }
+
+ if (r < 0) {
+ {
+ Mutex::Locker locker(m_lock);
+ m_state = STATE_ERROR;
+ }
+ m_replay_handler->handle_error(r);
+ }
+}
+
+int JournalPlayer::process_prefetch() {
+ ldout(m_cct, 10) << __func__ << dendl;
+ assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ assert(m_object_players.count(splay_offset) == 1);
+
+ ObjectPlayers &object_players = m_object_players[splay_offset];
+ assert(object_players.size() == 2);
+
+ ObjectPlayerPtr object_player = object_players.begin()->second;
+ assert(!object_player->is_fetch_in_progress());
+
+ ldout(m_cct, 15) << "seeking known commit position in "
+ << object_player->get_oid() << dendl;
+ Entry entry;
+ while (!object_player->empty()) {
+ object_player->front(&entry);
+ if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
+ ldout(m_cct, 10) << "located next uncommitted entry: " << entry
+ << dendl;
+ break;
+ }
+
+ ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
+ object_player->pop_front();
+ }
+
+ // if this object contains the commit position, our read should start with
+ // the next consistent journal entry in the sequence
+ if (object_player->get_object_number() == m_commit_object) {
+ if (object_player->empty()) {
+ advance_splay_object();
+ } else {
+ Entry entry;
+ object_player->front(&entry);
+ if (entry.get_tag() == m_commit_tag) {
+ advance_splay_object();
+ }
+ }
+ }
+
+ ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
+ if (object_player->empty() && !next_set_object_player->empty()) {
+ ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl;
+ remove_object_player(object_player, &m_process_state);
+ }
+ }
+
+ m_state = STATE_PLAYBACK;
+ ObjectPlayerPtr object_player = get_object_player();
+ if (!object_player->empty()) {
+ ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+ m_lock.Unlock();
+ m_replay_handler->handle_entries_available();
+ m_lock.Lock();
+ } else if (m_watch_enabled) {
+ object_player->watch(&m_process_state, m_watch_interval);
+ m_watch_scheduled = true;
+ }
+ return 0;
+}
+
+int JournalPlayer::process_playback() {
+ ldout(m_cct, 10) << __func__ << dendl;
+ assert(m_lock.is_locked());
+
+ m_watch_scheduled = false;
+
+ ObjectPlayerPtr object_player = get_object_player();
+ if (!object_player->empty()) {
+ ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+ m_lock.Unlock();
+ m_replay_handler->handle_entries_available();
+ m_lock.Lock();
+ }
+ return 0;
+}
+
+const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
+ assert(m_lock.is_locked());
+
+ assert(m_object_players.count(m_splay_offset) == 1);
+ SplayedObjectPlayers::const_iterator it = m_object_players.find(
+ m_splay_offset);
+ assert(it != m_object_players.end());
+
+ const ObjectPlayers &object_players = it->second;
+ assert(object_players.size() == 2);
+ return object_players;
+}
+
+ObjectPlayerPtr JournalPlayer::get_object_player() const {
+ assert(m_lock.is_locked());
+
+ const ObjectPlayers &object_players = get_object_players();
+ return object_players.begin()->second;
+}
+
+ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const {
+ assert(m_lock.is_locked());
+
+ const ObjectPlayers &object_players = get_object_players();
+ return object_players.rbegin()->second;
+}
+
+void JournalPlayer::advance_splay_object() {
+ assert(m_lock.is_locked());
+ ++m_splay_offset;
+ m_splay_offset %= m_journal_metadata->get_splay_width();
+ ldout(m_cct, 20) << __func__ << ": new offset "
+ << static_cast<uint32_t>(m_splay_offset) << dendl;
+}
+
+void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player,
+ Context *on_fetch) {
+ assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ ObjectPlayers &object_players = m_object_players[
+ object_player->get_object_number() % splay_width];
+ assert(!object_players.empty());
+ assert(object_players.begin()->second == object_player);
+ object_players.erase(object_players.begin());
+
+ fetch(object_player->get_object_number() + (2 * splay_width), on_fetch);
+}
+
+void JournalPlayer::fetch(uint64_t object_num, Context *ctx) {
+ assert(m_lock.is_locked());
+
+ std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+
+ ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
+ C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx);
+ ObjectPlayerPtr object_player(new ObjectPlayer(
+ m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
+ m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order()));
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ m_object_players[object_num % splay_width][object_num] = object_player;
+ object_player->fetch(fetch_ctx);
+}
+
+int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
+ std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+
+ ldout(m_cct, 10) << __func__ << ": fetched "
+ << utils::get_object_name(m_object_oid_prefix, object_num)
+ << ": r=" << r << dendl;
+ if (r < 0 && r != -ENOENT) {
+ return r;
+ }
+ return 0;
+}
+
+JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
+ : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
+ return_value(0) {
+ player->get();
+}
+
+void JournalPlayer::C_PrefetchBatch::add_fetch() {
+ Mutex::Locker locker(lock);
+ ++refs;
+}
+
+void JournalPlayer::C_PrefetchBatch::complete(int r) {
+ {
+ Mutex::Locker locker(lock);
+ if (r < 0 && return_value == 0) {
+ return_value = r;
+ }
+ --refs;
+ }
+
+ if (refs == 0) {
+ player->process_state(return_value);
+ player->put();
+ delete this;
+ }
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H
+#define CEPH_JOURNAL_JOURNAL_PLAYER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectPlayer.h"
+#include "cls/journal/cls_journal_types.h"
+#include <map>
+
+class SafeTimer;
+
+namespace journal {
+
+class ReplayHandler;
+class JournalPlayer;
+typedef boost::intrusive_ptr<JournalPlayer> JournalPlayerPtr;
+
+class JournalPlayer : public RefCountedObject {
+public:
+ typedef cls::journal::EntryPosition EntryPosition;
+ typedef cls::journal::EntryPositions EntryPositions;
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+ JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ ReplayHandler *replay_handler);
+
+ void prefetch();
+ void prefetch_and_watch(double interval);
+ void unwatch();
+
+ bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position);
+
+private:
+ typedef std::map<std::string, uint64_t> AllocatedTids;
+ typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
+ typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
+
+ enum State {
+ STATE_INIT,
+ STATE_PREFETCH,
+ STATE_PLAYBACK,
+ STATE_ERROR
+ };
+
+ struct C_ProcessState : public Context {
+ JournalPlayer *player;
+ C_ProcessState(JournalPlayer *p) : player(p) {}
+ virtual void complete(int r) {
+ player->process_state(r);
+ }
+ virtual void finish(int r) {}
+ };
+ struct C_PrefetchBatch : public Context {
+ JournalPlayer *player;
+ Mutex lock;
+ uint32_t refs;
+ int return_value;
+
+ C_PrefetchBatch(JournalPlayer *p);
+ void add_fetch();
+ virtual void complete(int r);
+ virtual void finish(int r) {}
+ };
+ struct C_Fetch : public Context {
+ JournalPlayer *player;
+ uint64_t object_num;
+ Context *on_fetch;
+ C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
+ : player(p), object_num(o), on_fetch(c) {
+ player->get();
+ }
+ virtual void finish(int r) {
+ r = player->handle_fetched(r, object_num);
+ on_fetch->complete(r);
+ player->put();
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+ JournalMetadataPtr m_journal_metadata;
+
+ ReplayHandler *m_replay_handler;
+
+ C_ProcessState m_process_state;
+
+ mutable Mutex m_lock;
+ State m_state;
+ uint8_t m_splay_offset;
+
+ bool m_watch_enabled;
+ bool m_watch_scheduled;
+ double m_watch_interval;
+
+ SplayedObjectPlayers m_object_players;
+ uint64_t m_commit_object;
+ std::string m_commit_tag;
+ AllocatedTids m_commit_tids;
+
+ void advance_splay_object();
+
+ const ObjectPlayers &get_object_players() const;
+ ObjectPlayerPtr get_object_player() const;
+ ObjectPlayerPtr get_next_set_object_player() const;
+ void remove_object_player(const ObjectPlayerPtr &object_player,
+ Context *on_fetch);
+
+ void process_state(int r);
+ int process_prefetch();
+ int process_playback();
+
+ void fetch(uint64_t object_num, Context *ctx);
+ int handle_fetched(int r, uint64_t object_num);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "journal/Entry.h"
+#include "journal/Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalRecorder: "
+
+namespace journal {
+
+JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ const JournalMetadataPtr& journal_metadata,
+ uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age)
+ : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+ m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
+ m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"),
+ m_current_set(m_journal_metadata->get_active_set()) {
+
+ Mutex::Locker locker(m_lock);
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+ uint64_t object_number = splay_offset + (m_current_set * splay_width);
+ m_object_ptrs[splay_offset] = create_object_recorder(object_number);
+ }
+
+ m_journal_metadata->add_listener(&m_listener);
+}
+
+JournalRecorder::~JournalRecorder() {
+ m_journal_metadata->remove_listener(&m_listener);
+}
+
+Future JournalRecorder::append(const std::string &tag,
+ const bufferlist &payload_bl) {
+ Mutex::Locker locker(m_lock);
+
+ uint64_t tid = m_journal_metadata->allocate_tid(tag);
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = tid % splay_width;
+
+ ObjectRecorderPtr object_ptr = get_object(splay_offset);
+ FutureImplPtr future(new FutureImpl(tag, tid));
+ future->init(m_prev_future);
+ m_prev_future = future;
+
+ bufferlist entry_bl;
+ ::encode(Entry(future->get_tag(), future->get_tid(), payload_bl), entry_bl);
+
+ AppendBuffers append_buffers;
+ append_buffers.push_back(std::make_pair(future, entry_bl));
+ bool object_full = object_ptr->append(append_buffers);
+
+ // TODO populate the object_set_position
+
+ if (object_full) {
+ ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
+ << dendl;
+ close_object_set(object_ptr->get_object_number() / splay_width);
+ }
+
+ return Future(future);
+}
+
+void JournalRecorder::flush() {
+ Mutex::Locker locker(m_lock);
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ it->second->flush();
+ }
+}
+
+ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
+ assert(m_lock.is_locked());
+
+ ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
+ assert(object_recoder != NULL);
+ return object_recoder;
+}
+
+void JournalRecorder::close_object_set(uint64_t object_set) {
+ assert(m_lock.is_locked());
+
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ if (object_set != m_current_set) {
+ return;
+ }
+
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ if (active_set < m_current_set + 1) {
+ m_journal_metadata->set_active_set(m_current_set + 1);
+ }
+ m_current_set = m_journal_metadata->get_active_set();
+
+ ldout(m_cct, 10) << __func__ << ": advancing to object set "
+ << m_current_set << dendl;
+
+ // object recorders will invoke overflow handler as they complete
+ // closing the object to ensure correct order of future appends
+ for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+ it != m_object_ptrs.end(); ++it) {
+ ObjectRecorderPtr object_recorder = it->second;
+ if (object_recorder != NULL &&
+ object_recorder->get_object_number() / splay_width == m_current_set) {
+ if (object_recorder->close_object()) {
+ // no in-flight ops, immediately create new recorder
+ create_next_object_recorder(object_recorder);
+ }
+ }
+ }
+}
+
+ObjectRecorderPtr JournalRecorder::create_object_recorder(
+ uint64_t object_number) {
+ ObjectRecorderPtr object_recorder(new ObjectRecorder(
+ m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
+ object_number, m_journal_metadata->get_timer(),
+ m_journal_metadata->get_timer_lock(), &m_overflow_handler,
+ m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
+ m_flush_age));
+ return object_recorder;
+}
+
+void JournalRecorder::create_next_object_recorder(
+ ObjectRecorderPtr object_recorder) {
+ assert(m_lock.is_locked());
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+
+ ObjectRecorderPtr new_object_recorder = create_object_recorder(
+ (m_current_set * splay_width) + splay_offset);
+
+ AppendBuffers append_buffers;
+ object_recorder->claim_append_buffers(&append_buffers);
+ new_object_recorder->append(append_buffers);
+
+ m_object_ptrs[splay_offset] = new_object_recorder;
+}
+
+void JournalRecorder::handle_update() {
+ Mutex::Locker locker(m_lock);
+
+ uint64_t active_set = m_journal_metadata->get_active_set();
+ if (active_set > m_current_set) {
+ close_object_set(m_current_set);
+ }
+}
+
+void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
+ ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+
+ Mutex::Locker locker(m_lock);
+
+ uint64_t object_number = object_recorder->get_object_number();
+ uint8_t splay_width = m_journal_metadata->get_splay_width();
+ uint8_t splay_offset = object_number % splay_width;
+ ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+ assert(active_object_recorder->get_object_number() == object_number);
+
+ close_object_set(object_number / splay_width);
+ create_next_object_recorder(active_object_recorder);
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
+#define CEPH_JOURNAL_JOURNAL_RECORDER_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectRecorder.h"
+#include <map>
+#include <string>
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalRecorder {
+public:
+ JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ const JournalMetadataPtr &journal_metadata,
+ uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age);
+ ~JournalRecorder();
+
+ Future append(const std::string &tag, const bufferlist &bl);
+ void flush();
+
+ ObjectRecorderPtr get_object(uint8_t splay_offset);
+
+private:
+ typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
+
+ struct Listener : public JournalMetadata::Listener {
+ JournalRecorder *journal_recorder;
+
+ Listener(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {}
+
+ virtual void handle_update(JournalMetadata *) {
+ journal_recorder->handle_update();
+ }
+ };
+
+ struct OverflowHandler : public ObjectRecorder::OverflowHandler {
+ JournalRecorder *journal_recorder;
+
+ OverflowHandler(JournalRecorder *_journal_recorder)
+ : journal_recorder(_journal_recorder) {}
+
+ virtual void overflow(ObjectRecorder *object_recorder) {
+ journal_recorder->handle_overflow(object_recorder);
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ CephContext *m_cct;
+ std::string m_object_oid_prefix;
+
+ JournalMetadataPtr m_journal_metadata;
+
+ uint32_t m_flush_interval;
+ uint64_t m_flush_bytes;
+ double m_flush_age;
+
+ Listener m_listener;
+ OverflowHandler m_overflow_handler;
+
+ Mutex m_lock;
+
+ uint64_t m_current_set;
+ ObjectRecorderPtrs m_object_ptrs;
+
+ FutureImplPtr m_prev_future;
+
+ void close_object_set(uint64_t object_set);
+ ObjectRecorderPtr create_object_recorder(uint64_t object_number);
+ void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+
+ void handle_update();
+ void handle_overflow(ObjectRecorder *object_recorder);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Journaler.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "journal/Entry.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/JournalPlayer.h"
+#include "journal/JournalRecorder.h"
+#include "journal/PayloadImpl.h"
+#include "journal/ReplayHandler.h"
+#include "cls/journal/cls_journal_client.h"
+#include "cls/journal/cls_journal_types.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Journaler: "
+
+namespace journal {
+
+namespace {
+
+static const std::string JOURNAL_HEADER_PREFIX = "journal.";
+static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
+
+} // anonymous namespace
+
+using namespace cls::journal;
+
+Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
+ uint64_t journal_id, const std::string &client_id)
+ : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL)
+{
+ m_header_ioctx.dup(header_ioctx);
+ m_data_ioctx.dup(data_ioctx);
+ m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
+
+ m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id);
+ m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + ".";
+
+ m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id);
+ m_metadata->get();
+}
+
+Journaler::~Journaler() {
+ if (m_metadata != NULL) {
+ m_metadata->put();
+ m_metadata = NULL;
+ }
+ assert(m_player == NULL);
+ assert(m_recorder == NULL);
+}
+
+int Journaler::init() {
+ return m_metadata->init();
+}
+
+int Journaler::create(uint8_t order, uint8_t splay_width) {
+ if (order > 64 || order < 12) {
+ lderr(m_cct) << "order must be in the range [12, 64]" << dendl;
+ return -EDOM;
+ }
+ if (splay_width == 0) {
+ return -EINVAL;
+ }
+
+ ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl;
+ int r = client::create(m_header_ioctx, m_header_oid, order, splay_width);
+ if (r < 0) {
+ lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ return 0;
+}
+
+int Journaler::register_client(const std::string &description) {
+ return m_metadata->register_client(description);
+}
+
+int Journaler::unregister_client() {
+ return m_metadata->unregister_client();
+}
+
+void Journaler::start_replay(ReplayHandler *replay_handler) {
+ create_player(replay_handler);
+ m_player->prefetch();
+}
+
+void Journaler::start_live_replay(ReplayHandler *replay_handler,
+ double interval) {
+ create_player(replay_handler);
+ m_player->prefetch_and_watch(interval);
+}
+
+bool Journaler::try_pop_front(Payload *payload) {
+ assert(m_player != NULL);
+
+ Entry entry;
+ ObjectSetPosition object_set_position;
+ if (!m_player->try_pop_front(&entry, &object_set_position)) {
+ return false;
+ }
+
+ *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position));
+ return true;
+}
+
+void Journaler::stop_replay() {
+ assert(m_player != NULL);
+ m_player->unwatch();
+ m_player->put();
+ m_player = NULL;
+}
+
+void Journaler::start_append() {
+ assert(m_recorder == NULL);
+
+ // TODO verify active object set >= current replay object set
+
+ // TODO configurable flush intervals
+ m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
+ m_metadata, 0, 0, 0);
+}
+
+void Journaler::stop_append() {
+ assert(m_recorder != NULL);
+ m_recorder->flush();
+ delete m_recorder;
+ m_recorder = NULL;
+}
+
+Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
+ return m_recorder->append(tag, payload_bl);
+}
+
+void Journaler::create_player(ReplayHandler *replay_handler) {
+ assert(m_player == NULL);
+ m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
+ replay_handler);
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNALER_H
+#define CEPH_JOURNAL_JOURNALER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "journal/Future.h"
+#include "journal/Payload.h"
+#include <string>
+#include <map>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalMetadata;
+class JournalPlayer;
+class JournalRecorder;
+class ReplayHandler;
+
+class Journaler {
+public:
+ Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
+ uint64_t journal_id, const std::string &client_id);
+ ~Journaler();
+
+ int init();
+
+ int create(uint8_t order, uint8_t splay_width);
+
+ int register_client(const std::string &description);
+ int unregister_client();
+
+ void start_replay(ReplayHandler *replay_handler);
+ void start_live_replay(ReplayHandler *replay_handler, double interval);
+ bool try_pop_front(Payload *payload);
+ void stop_replay();
+
+ void start_append();
+ Future append(const std::string &tag, const bufferlist &bl);
+ void stop_append();
+
+private:
+ librados::IoCtx m_header_ioctx;
+ librados::IoCtx m_data_ioctx;
+ CephContext *m_cct;
+ std::string m_client_id;
+
+ std::string m_header_oid;
+ std::string m_object_oid_prefix;
+
+ JournalMetadata *m_metadata;
+ JournalPlayer *m_player;
+ JournalRecorder *m_recorder;
+
+ void create_player(ReplayHandler *replay_handler);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNALER_H
--- /dev/null
+if ENABLE_CLIENT
+if WITH_RADOS
+
+libjournal_la_SOURCES = \
+ journal/Entry.cc \
+ journal/Future.cc \
+ journal/FutureImpl.cc \
+ journal/Journaler.cc \
+ journal/JournalMetadata.cc \
+ journal/JournalPlayer.cc \
+ journal/JournalRecorder.cc \
+ journal/ObjectPlayer.cc \
+ journal/ObjectRecorder.cc \
+ journal/Payload.cc \
+ journal/PayloadImpl.cc \
+ journal/Utils.cc
+
+noinst_LTLIBRARIES += libjournal.la
+noinst_HEADERS += \
+ journal/Entry.h \
+ journal/Future.h \
+ journal/FutureImpl.h \
+ journal/Journaler.h \
+ journal/JournalMetadata.h \
+ journal/JournalPlayer.h \
+ journal/JournalRecorder.h \
+ journal/ObjectPlayer.h \
+ journal/ObjectRecorder.h \
+ journal/Payload.h \
+ journal/PayloadImpl.h \
+ journal/ReplayHandler.h \
+ journal/Utils.h
+DENCODER_DEPS += libjournal.la
+
+endif # WITH_RADOS
+endif # ENABLE_CLIENT
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Utils.h"
+#include "common/Timer.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectPlayer: "
+
+namespace journal {
+
+namespace {
+
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+ Context *ctx = reinterpret_cast<Context *>(arg);
+ ctx->complete(rados_aio_get_return_value(c));
+}
+
+} // anonymous namespace
+
+ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
+ const std::string &object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer,
+ Mutex &timer_lock, uint8_t order)
+ : RefCountedObject(NULL, 0), m_object_num(object_num),
+ m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
+ m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_watch_interval(0), m_watch_task(NULL), m_watch_fetch(this),
+ m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
+ m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
+ m_watch_ctx_in_progress(false) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+ObjectPlayer::~ObjectPlayer() {
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_watch_ctx == NULL);
+ }
+}
+
+void ObjectPlayer::fetch(Context *on_finish) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_fetch_in_progress = true;
+
+ C_Fetch *context = new C_Fetch(this, on_finish);
+ librados::ObjectReadOperation op;
+ op.read(m_read_off, 2 << m_order, &context->read_bl, NULL);
+
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(context, rados_ctx_callback, NULL);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
+ assert(r == 0);
+ rados_completion->release();
+}
+
+void ObjectPlayer::watch(Context *on_fetch, double interval) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_watch_ctx == NULL);
+ m_watch_ctx = on_fetch;
+ }
+ {
+ Mutex::Locker locker(m_timer_lock);
+ m_watch_interval = interval;
+ }
+ schedule_watch();
+}
+
+void ObjectPlayer::unwatch() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ while (m_watch_ctx_in_progress) {
+ m_watch_ctx_cond.Wait(m_lock);
+ }
+ delete m_watch_ctx;
+ m_watch_ctx = NULL;
+ }
+ cancel_watch();
+}
+
+void ObjectPlayer::front(Entry *entry) const {
+ Mutex::Locker locker(m_lock);
+ assert(!m_entries.empty());
+ *entry = m_entries.front();
+}
+
+void ObjectPlayer::pop_front() {
+ Mutex::Locker locker(m_lock);
+ assert(!m_entries.empty());
+ m_entries.pop_front();
+}
+
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
+ << bl.length() << dendl;
+
+ m_fetch_in_progress = false;
+ if (r < 0) {
+ return r;
+ }
+ if (bl.length() == 0) {
+ return -ENOENT;
+ }
+
+ Mutex::Locker locker(m_lock);
+ m_read_bl.append(bl);
+
+ bool invalid = false;
+ uint32_t invalid_start_off = 0;
+
+ bufferlist::iterator iter(&m_read_bl, m_read_off);
+ while (!iter.end()) {
+ uint32_t bytes_needed;
+ if (!Entry::is_readable(iter, &bytes_needed)) {
+ if (bytes_needed != 0) {
+ invalid_start_off = iter.get_off();
+ invalid = true;
+ lderr(m_cct) << ": partial record at offset " << iter.get_off()
+ << dendl;
+ break;
+ }
+
+ if (!invalid) {
+ invalid_start_off = iter.get_off();
+ invalid = true;
+ lderr(m_cct) << ": detected corrupt journal entry at offset "
+ << invalid_start_off << dendl;
+ }
+ ++iter;
+ continue;
+ }
+
+ if (invalid) {
+ uint32_t invalid_end_off = iter.get_off();
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+ invalid = false;
+ }
+
+ Entry entry;
+ ::decode(entry, iter);
+ ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+ EntryKey entry_key(std::make_pair(entry.get_tag(), entry.get_tid()));
+ if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
+ m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
+ } else {
+ ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
+ *m_entry_keys[entry_key] = entry;
+ }
+ }
+
+ m_read_off = m_read_bl.length();
+ if (invalid) {
+ uint32_t invalid_end_off = m_read_bl.length();
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+ }
+
+ if (!m_invalid_ranges.empty()) {
+ r = -EINVAL;
+ }
+ return r;
+}
+
+void ObjectPlayer::schedule_watch() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
+ Mutex::Locker locker(m_timer_lock);
+ assert(m_watch_task == NULL);
+ m_watch_task = new C_WatchTask(this);
+ m_timer.add_event_after(m_watch_interval, m_watch_task);
+}
+
+void ObjectPlayer::cancel_watch() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
+ Mutex::Locker locker(m_timer_lock);
+ if (m_watch_task != NULL) {
+ m_timer.cancel_event(m_watch_task);
+ m_watch_task = NULL;
+ }
+}
+
+void ObjectPlayer::handle_watch_task() {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
+ {
+ Mutex::Locker locker(m_timer_lock);
+ m_watch_task = NULL;
+ }
+ fetch(&m_watch_fetch);
+}
+
+void ObjectPlayer::handle_watch_fetched(int r) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
+ << dendl;
+ if (r == -ENOENT) {
+ schedule_watch();
+ return;
+ }
+
+ Context *on_finish;
+ {
+ Mutex::Locker locker(m_lock);
+ m_watch_ctx_in_progress = true;
+ on_finish = m_watch_ctx;
+ m_watch_ctx = NULL;
+ }
+
+ if (on_finish != NULL) {
+ on_finish->complete(r);
+ }
+
+ {
+ Mutex::Locker locker(m_lock);
+ m_watch_ctx_in_progress = false;
+ m_watch_ctx_cond.Signal();
+ }
+}
+
+void ObjectPlayer::C_Fetch::finish(int r) {
+ r = object_player->handle_fetch_complete(r, read_bl);
+ on_finish->complete(r);
+ object_player->put();
+}
+
+void ObjectPlayer::C_WatchTask::finish(int r) {
+ object_player->handle_watch_task();
+}
+
+void ObjectPlayer::C_WatchFetch::finish(int r) {
+ object_player->handle_watch_fetched(r);
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
+#define CEPH_JOURNAL_OBJECT_PLAYER_H
+
+#include "include/Context.h"
+#include "include/hash_namespace.h"
+#include "include/interval_set.h"
+#include "include/rados/librados.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/Entry.h"
+#include <list>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/unordered_map.hpp>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectPlayer;
+typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
+
+class ObjectPlayer : public RefCountedObject {
+public:
+ typedef std::list<Entry> Entries;
+ typedef interval_set<uint64_t> InvalidRanges;
+
+ ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+ uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
+ uint8_t order);
+ ~ObjectPlayer();
+
+ inline const std::string &get_oid() const {
+ return m_oid;
+ }
+ inline uint64_t get_object_number() const {
+ return m_object_num;
+ }
+
+ void fetch(Context *on_finish);
+ void watch(Context *on_fetch, double interval);
+ void unwatch();
+
+ inline bool is_fetch_in_progress() const {
+ Mutex::Locker locker(m_lock);
+ return m_fetch_in_progress;
+ }
+
+ void front(Entry *entry) const;
+ void pop_front();
+ inline bool empty() const {
+ Mutex::Locker locker(m_lock);
+ return m_entries.empty();
+ }
+
+ inline void get_entries(Entries *entries) {
+ Mutex::Locker locker(m_lock);
+ *entries = m_entries;
+ }
+ inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
+ Mutex::Locker locker(m_lock);
+ *invalid_ranges = m_invalid_ranges;
+ }
+
+private:
+ typedef std::pair<std::string, uint64_t> EntryKey;
+ typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
+
+ struct C_Fetch : public Context {
+ ObjectPlayer *object_player;
+ Context *on_finish;
+ bufferlist read_bl;
+ C_Fetch(ObjectPlayer *o, Context *ctx)
+ : object_player(o), on_finish(ctx) {
+ object_player->get();
+ }
+ virtual void finish(int r);
+ };
+ struct C_WatchTask : public Context {
+ ObjectPlayer *object_player;
+ C_WatchTask(ObjectPlayer *o) : object_player(o) {
+ object_player->get();
+ }
+ virtual void finish(int r);
+ };
+ struct C_WatchFetch : public Context {
+ ObjectPlayer *object_player;
+ C_WatchFetch(ObjectPlayer *o) : object_player(o) {
+ }
+ virtual void complete(int r) {
+ finish(r);
+ object_player->put();
+ }
+ virtual void finish(int r);
+ };
+
+ librados::IoCtx m_ioctx;
+ uint64_t m_object_num;
+ std::string m_oid;
+ CephContext *m_cct;
+
+ SafeTimer &m_timer;
+ Mutex &m_timer_lock;
+
+ double m_fetch_interval;
+ uint8_t m_order;
+
+ double m_watch_interval;
+ Context *m_watch_task;
+ C_WatchFetch m_watch_fetch;
+
+ mutable Mutex m_lock;
+ bool m_fetch_in_progress;
+ bufferlist m_read_bl;
+ uint32_t m_read_off;
+
+ Entries m_entries;
+ EntryKeys m_entry_keys;
+ InvalidRanges m_invalid_ranges;
+
+ Context *m_watch_ctx;
+ Cond m_watch_ctx_cond;
+ bool m_watch_ctx_in_progress;
+
+ int handle_fetch_complete(int r, const bufferlist &bl);
+
+ void schedule_watch();
+ void cancel_watch();
+ void handle_watch_task();
+ void handle_watch_fetched(int r);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_PLAYER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "journal/Utils.h"
+#include "include/assert.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectRecorder: "
+
+using namespace cls::journal;
+
+namespace journal {
+
+namespace {
+
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+ Context *ctx = reinterpret_cast<Context *>(arg);
+ ctx->complete(rados_aio_get_return_value(c));
+}
+
+} // anonymous
+
+ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t object_number,
+ SafeTimer &timer, Mutex &timer_lock,
+ OverflowHandler *overflow_handler, uint8_t order,
+ uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age)
+ : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
+ m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
+ m_overflow_handler(overflow_handler), m_order(order),
+ m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
+ m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+ m_append_task(NULL),
+ m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
+ m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
+ m_object_closed(false) {
+ m_ioctx.dup(ioctx);
+ m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+ assert(m_overflow_handler != NULL);
+}
+
+ObjectRecorder::~ObjectRecorder() {
+ cancel_append_task();
+ assert(m_append_buffers.empty());
+ assert(m_in_flight_appends.empty());
+}
+
+bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
+ FutureImplPtr last_flushed_future;
+ {
+ Mutex::Locker locker(m_lock);
+ for (AppendBuffers::const_iterator iter = append_buffers.begin();
+ iter != append_buffers.end(); ++iter) {
+ if (append(*iter)) {
+ last_flushed_future = iter->first;
+ }
+ }
+ }
+
+ if (last_flushed_future) {
+ flush(last_flushed_future);
+ }
+ return (m_size + m_pending_bytes >= m_soft_max_size);
+}
+
+void ObjectRecorder::flush() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (flush_appends(true)) {
+ cancel_append_task();
+ }
+}
+
+void ObjectRecorder::flush(const FutureImplPtr &future) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
+ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (future->get_flush_handler().get() != &m_flush_handler) {
+ // if we don't own this future, re-issue the flush so that it hits the
+ // correct journal object owner
+ future->flush();
+ return;
+ } else if (future->is_flush_in_progress()) {
+ return;
+ }
+
+ assert(!m_object_closed);
+ AppendBuffers::iterator it;
+ for (it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) {
+ if (it->first == future) {
+ break;
+ }
+ }
+ assert(it != m_append_buffers.end());
+ ++it;
+
+ AppendBuffers flush_buffers;
+ flush_buffers.splice(flush_buffers.end(), m_append_buffers,
+ m_append_buffers.begin(), it);
+ send_appends(&flush_buffers);
+}
+
+void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_in_flight_appends.empty());
+ assert(m_object_closed || m_overflowed);
+ append_buffers->splice(append_buffers->end(), m_append_buffers,
+ m_append_buffers.begin(), m_append_buffers.end());
+}
+
+bool ObjectRecorder::close_object() {
+ ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+ Mutex::Locker locker(m_lock);
+ m_object_closed = true;
+ if (flush_appends(true)) {
+ cancel_append_task();
+ }
+ return m_in_flight_appends.empty();
+}
+
+void ObjectRecorder::handle_append_task() {
+ {
+ Mutex::Locker locker(m_lock);
+ flush_appends(true);
+ }
+
+ {
+ Mutex::Locker locker(m_timer_lock);
+ m_append_task = NULL;
+ put();
+ }
+}
+
+void ObjectRecorder::cancel_append_task() {
+ Mutex::Locker locker(m_timer_lock);
+ if (m_append_task != NULL) {
+ m_timer.cancel_event(m_append_task);
+ m_append_task = NULL;
+ put();
+ }
+}
+
+void ObjectRecorder::schedule_append_task() {
+ Mutex::Locker locker(m_timer_lock);
+ if (m_append_task == NULL && m_flush_age > 0) {
+ get();
+ m_append_task = new C_AppendTask(this);
+ m_timer.add_event_after(m_flush_age, m_append_task);
+ }
+}
+
+bool ObjectRecorder::append(const AppendBuffer &append_buffer) {
+ assert(m_lock.is_locked());
+
+ bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+ m_append_buffers.push_back(append_buffer);
+ m_pending_bytes += append_buffer.second.length();
+
+ if (flush_appends(false)) {
+ cancel_append_task();
+ } else {
+ schedule_append_task();
+ }
+ return flush_requested;
+}
+
+bool ObjectRecorder::flush_appends(bool force) {
+ assert(m_lock.is_locked());
+ if (m_object_closed || m_overflowed) {
+ return true;
+ }
+
+ if (m_append_buffers.empty() ||
+ (!force &&
+ m_size + m_pending_bytes < m_soft_max_size &&
+ (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
+ (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
+ return false;
+ }
+
+ m_pending_bytes = 0;
+ AppendBuffers append_buffers;
+ append_buffers.swap(m_append_buffers);
+ send_appends(&append_buffers);
+ return true;
+}
+
+void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
+ << ", r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+ if (iter == m_in_flight_appends.end()) {
+ // must have seen an overflow on a previous append op
+ assert(m_overflowed);
+ return;
+ } else if (r == -EOVERFLOW) {
+ m_overflowed = true;
+ append_overflowed(tid);
+ return;
+ }
+
+ assert(!m_overflowed || r != 0);
+ AppendBuffers &append_buffers = iter->second;
+ assert(!append_buffers.empty());
+
+ // Flag the associated futures as complete.
+ for (AppendBuffers::iterator buf_it = append_buffers.begin();
+ buf_it != append_buffers.end(); ++buf_it) {
+ ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
+ << dendl;
+ buf_it->first->safe(r);
+ }
+ m_in_flight_appends.erase(iter);
+
+ if (m_in_flight_appends.empty() && m_object_closed) {
+ // all remaining unsent appends should be redirected to new object
+ notify_overflow();
+ }
+}
+
+void ObjectRecorder::append_overflowed(uint64_t tid) {
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
+ << dendl;
+
+ assert(m_lock.is_locked());
+ assert(!m_in_flight_appends.empty());
+ assert(m_in_flight_appends.begin()->first == tid);
+
+ cancel_append_task();
+
+ InFlightAppends in_flight_appends;
+ in_flight_appends.swap(m_in_flight_appends);
+
+ AppendBuffers restart_append_buffers;
+ for (InFlightAppends::iterator it = in_flight_appends.begin();
+ it != in_flight_appends.end(); ++it) {
+ restart_append_buffers.insert(restart_append_buffers.end(),
+ it->second.begin(), it->second.end());
+ }
+
+ restart_append_buffers.splice(restart_append_buffers.end(),
+ m_append_buffers,
+ m_append_buffers.begin(),
+ m_append_buffers.end());
+ restart_append_buffers.swap(m_append_buffers);
+ notify_overflow();
+}
+
+void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+ assert(m_lock.is_locked());
+ assert(!append_buffers->empty());
+
+ uint64_t append_tid = m_append_tid++;
+ ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
+ << append_tid << dendl;
+ C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+
+ librados::ObjectWriteOperation op;
+ client::guard_append(&op, m_soft_max_size);
+
+ for (AppendBuffers::iterator it = append_buffers->begin();
+ it != append_buffers->end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
+ << dendl;
+ it->first->set_flush_in_progress();
+ op.append(it->second);
+ m_size += it->second.length();
+ }
+ m_in_flight_appends[append_tid].swap(*append_buffers);
+
+ librados::AioCompletion *rados_completion =
+ librados::Rados::aio_create_completion(append_flush, NULL,
+ rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
+ assert(r == 0);
+ rados_completion->release();
+}
+
+void ObjectRecorder::notify_overflow() {
+ assert(m_lock.is_locked());
+
+ for (AppendBuffers::const_iterator it = m_append_buffers.begin();
+ it != m_append_buffers.end(); ++it) {
+ ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
+ << dendl;
+ it->first->detach();
+ }
+
+ // TODO need to delay completion until after aio_notify completes
+ m_lock.Unlock();
+ m_overflow_handler->overflow(this);
+ m_lock.Lock();
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
+#define CEPH_JOURNAL_OBJECT_RECORDER_H
+
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/FutureImpl.h"
+#include <list>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectRecorder;
+typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
+
+typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
+typedef std::list<AppendBuffer> AppendBuffers;
+
+class ObjectRecorder : public RefCountedObject, boost::noncopyable {
+public:
+ struct OverflowHandler {
+ virtual ~OverflowHandler() {}
+ virtual void overflow(ObjectRecorder *object_recorder) = 0;
+ };
+
+ ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+ uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
+ OverflowHandler *overflow_handler, uint8_t order,
+ uint32_t flush_interval, uint64_t flush_bytes,
+ double flush_age);
+ ~ObjectRecorder();
+
+ inline uint64_t get_object_number() const {
+ return m_object_number;
+ }
+ inline const std::string &get_oid() const {
+ return m_oid;
+ }
+
+ bool append(const AppendBuffers &append_buffers);
+ void flush();
+ void flush(const FutureImplPtr &future);
+
+ void claim_append_buffers(AppendBuffers *append_buffers);
+ bool close_object();
+
+ inline CephContext *cct() const {
+ return m_cct;
+ }
+
+ inline size_t get_pending_appends() const {
+ Mutex::Locker locker(m_lock);
+ return m_append_buffers.size();
+ }
+
+private:
+ typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
+
+ struct FlushHandler : public FutureImpl::FlushHandler {
+ ObjectRecorder *object_recorder;
+ FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
+ virtual void get() {
+ object_recorder->get();
+ }
+ virtual void put() {
+ object_recorder->put();
+ }
+ virtual void flush(const FutureImplPtr &future) {
+ object_recorder->flush(future);
+ }
+ };
+ struct C_AppendTask : public Context {
+ ObjectRecorder *object_recorder;
+ C_AppendTask(ObjectRecorder *o) : object_recorder(o) {}
+ virtual void complete(int r) {
+ object_recorder->handle_append_task();
+ }
+ virtual void finish(int r) {}
+ };
+ struct C_AppendFlush : public Context {
+ ObjectRecorder *object_recorder;
+ uint64_t tid;
+ C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
+ : object_recorder(o), tid(_tid) {
+ object_recorder->get();
+ }
+ virtual void finish(int r) {
+ object_recorder->handle_append_flushed(tid, r);
+ object_recorder->put();
+ }
+ };
+
+ librados::IoCtx m_ioctx;
+ std::string m_oid;
+ uint64_t m_object_number;
+ CephContext *m_cct;
+
+ SafeTimer &m_timer;
+ Mutex &m_timer_lock;
+
+ OverflowHandler *m_overflow_handler;
+
+ uint8_t m_order;
+ uint64_t m_soft_max_size;
+
+ uint32_t m_flush_interval;
+ uint64_t m_flush_bytes;
+ double m_flush_age;
+
+ FlushHandler m_flush_handler;
+
+ C_AppendTask *m_append_task;
+
+ mutable Mutex m_lock;
+ AppendBuffers m_append_buffers;
+ uint64_t m_append_tid;
+ uint32_t m_pending_bytes;
+
+ InFlightAppends m_in_flight_appends;
+ uint64_t m_size;
+ bool m_overflowed;
+ bool m_object_closed;
+
+ bufferlist m_prefetch_bl;
+
+ void handle_append_task();
+ void cancel_append_task();
+ void schedule_append_task();
+
+ bool append(const AppendBuffer &append_buffer);
+ bool flush_appends(bool force);
+ void handle_append_flushed(uint64_t tid, int r);
+ void append_overflowed(uint64_t tid);
+ void send_appends(AppendBuffers *append_buffers);
+
+ void notify_overflow();
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_RECORDER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Payload.h"
+#include "journal/PayloadImpl.h"
+
+namespace journal {
+
+const bufferlist &Payload::get_data() const {
+ return m_payload_impl->get_data();
+}
+
+void intrusive_ptr_add_ref(PayloadImpl *p) {
+ p->get();
+}
+
+void intrusive_ptr_release(PayloadImpl *p) {
+ p->put();
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_PAYLOAD_H
+#define CEPH_JOURNAL_PAYLOAD_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace journal {
+
+class PayloadImpl;
+
+class Payload {
+public:
+ typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
+
+ Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {}
+
+ const bufferlist &get_data() const;
+
+private:
+ friend class Journaler;
+
+ inline PayloadImplPtr get_payload_impl() const {
+ return m_payload_impl;
+ }
+
+ PayloadImplPtr m_payload_impl;
+};
+
+void intrusive_ptr_add_ref(PayloadImpl *p);
+void intrusive_ptr_release(PayloadImpl *p);
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_PAYLOAD_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/PayloadImpl.h"
+
+namespace journal {
+
+PayloadImpl::PayloadImpl(const bufferlist &data,
+ const ObjectSetPosition &object_set_position)
+ : m_data(data), m_object_set_position(object_set_position) {
+}
+
+const bufferlist &PayloadImpl::get_data() const {
+ return m_data;
+}
+
+const PayloadImpl::ObjectSetPosition &
+PayloadImpl::get_object_set_position() const {
+ return m_object_set_position;
+}
+
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H
+#define CEPH_JOURNAL_PAYLOAD_IMPL_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include <boost/noncopyable.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+namespace journal {
+
+class PayloadImpl;
+typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
+
+class PayloadImpl : public RefCountedObject, boost::noncopyable {
+public:
+ typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+ PayloadImpl(const bufferlist &data,
+ const ObjectSetPosition &object_set_position);
+
+ const bufferlist &get_data() const;
+ const ObjectSetPosition &get_object_set_position() const;
+
+private:
+ bufferlist m_data;
+ ObjectSetPosition m_object_set_position;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_HANDLER_H
+#define CEPH_JOURNAL_REPLAY_HANDLER_H
+
+namespace journal {
+
+struct ReplayHandler {
+ virtual ~ReplayHandler() {}
+
+ virtual void handle_entries_available() = 0;
+ virtual void handle_error(int r) = 0;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_HANDLER_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Utils.h"
+#include "include/stringify.h"
+
+namespace journal {
+namespace utils {
+
+std::string get_object_name(const std::string &prefix, uint64_t number) {
+ return prefix + stringify(number);
+}
+
+std::string unique_lock_name(const std::string &name, void *address) {
+ return name + " (" + stringify(address) + ")";
+}
+
+} // namespace utils
+} // namespace journal
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_UTILS_H
+#define CEPH_JOURNAL_UTILS_H
+
+#include "include/int_types.h"
+#include <string>
+
+namespace journal {
+namespace utils {
+
+std::string get_object_name(const std::string &prefix, uint64_t number);
+
+std::string unique_lock_name(const std::string &name, void *address);
+
+} // namespace utils
+} // namespace journal
+
+#endif // CEPH_JOURNAL_UTILS_H