]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: new generic journal implementation
authorJason Dillaman <dillaman@redhat.com>
Tue, 9 Jun 2015 20:38:43 +0000 (16:38 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
The initial use-case for this journal is for librbd and its mirroring
feature. This journal is different from the current MDS journal in that
it organizes journal entries across journal objects w/o striping
individual entries.  It also allows multiple clients to read and write
from a single journal concurrently.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
27 files changed:
src/Makefile.am
src/journal/Entry.cc [new file with mode: 0644]
src/journal/Entry.h [new file with mode: 0644]
src/journal/Future.cc [new file with mode: 0644]
src/journal/Future.h [new file with mode: 0644]
src/journal/FutureImpl.cc [new file with mode: 0644]
src/journal/FutureImpl.h [new file with mode: 0644]
src/journal/JournalMetadata.cc [new file with mode: 0644]
src/journal/JournalMetadata.h [new file with mode: 0644]
src/journal/JournalPlayer.cc [new file with mode: 0644]
src/journal/JournalPlayer.h [new file with mode: 0644]
src/journal/JournalRecorder.cc [new file with mode: 0644]
src/journal/JournalRecorder.h [new file with mode: 0644]
src/journal/Journaler.cc [new file with mode: 0644]
src/journal/Journaler.h [new file with mode: 0644]
src/journal/Makefile.am [new file with mode: 0644]
src/journal/ObjectPlayer.cc [new file with mode: 0644]
src/journal/ObjectPlayer.h [new file with mode: 0644]
src/journal/ObjectRecorder.cc [new file with mode: 0644]
src/journal/ObjectRecorder.h [new file with mode: 0644]
src/journal/Payload.cc [new file with mode: 0644]
src/journal/Payload.h [new file with mode: 0644]
src/journal/PayloadImpl.cc [new file with mode: 0644]
src/journal/PayloadImpl.h [new file with mode: 0644]
src/journal/ReplayHandler.h [new file with mode: 0644]
src/journal/Utils.cc [new file with mode: 0644]
src/journal/Utils.h [new file with mode: 0644]

index 3bb6c19daf71d5b152831164bb212ddef88492de..9e31b041bcac4b3484da87f456fb40fe26bd16d6 100644 (file)
@@ -31,6 +31,7 @@ include messages/Makefile.am
 include include/Makefile.am
 include librados/Makefile.am
 include libradosstriper/Makefile.am
+include journal/Makefile.am
 include librbd/Makefile.am
 include rgw/Makefile.am
 include cls/Makefile.am
diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc
new file mode 100644 (file)
index 0000000..bd26ebe
--- /dev/null
@@ -0,0 +1,156 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "include/encoding.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <strstream>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Entry: "
+
+namespace journal {
+
+namespace {
+
+// Size in bytes of the fixed-width part of an encoded entry as written by
+// Entry::encode: 8 (preamble) + 1 (version) + 8 (tid).
+const uint32_t HEADER_FIXED_SIZE = 17; /// preamble, version, tid
+
+} // anonymous namespace
+
+// Appends the serialized entry to bl: fixed header (preamble, version, tid),
+// then tag and data, then a crc32c computed over all of the preceding bytes
+// of this entry.
+void Entry::encode(bufferlist &bl) const {
+  // stage the entry in a scratch buffer so that the checksum covers exactly
+  // the bytes of this entry, regardless of what bl already holds
+  bufferlist entry_bl;
+
+  // fixed-size header
+  const uint8_t version = 1;
+  ::encode(preamble, entry_bl);
+  ::encode(version, entry_bl);
+  ::encode(m_tid, entry_bl);
+  assert(entry_bl.length() == HEADER_FIXED_SIZE);
+
+  // variable-size body
+  ::encode(m_tag, entry_bl);
+  ::encode(m_data, entry_bl);
+
+  // checksum is taken before the staged bytes are handed over to bl
+  const uint32_t entry_crc = entry_bl.crc32c(0);
+  bl.claim_append(entry_bl);
+  ::encode(entry_crc, bl);
+}
+
+void Entry::decode(bufferlist::iterator &iter) {
+  uint32_t start_offset = iter.get_off();
+  uint64_t bl_preamble;
+  ::decode(bl_preamble, iter);
+  if (bl_preamble != preamble) {
+    throw buffer::malformed_input("incorrect preamble: " +
+                                  stringify(bl_preamble));
+  }
+
+  uint8_t version;
+  ::decode(version, iter);
+  if (version != 1) {
+    throw buffer::malformed_input("unknown version: " + stringify(version));
+  }
+
+  ::decode(m_tid, iter);
+  ::decode(m_tag, iter);
+  ::decode(m_data, iter);
+  uint32_t end_offset = iter.get_off();
+
+  uint32_t crc;
+  ::decode(crc, iter);
+
+  bufferlist data_bl;
+  data_bl.substr_of(iter.get_bl(), start_offset, end_offset - start_offset);
+  uint32_t actual_crc = data_bl.crc32c(0);
+  if (crc != actual_crc) {
+    throw buffer::malformed_input("crc mismatch: " + stringify(crc) +
+                                  " != " + stringify(actual_crc));
+  }
+}
+
+// Emits the entry's tag, tid and a hex dump of its data to the formatter.
+void Entry::dump(Formatter *f) const {
+  // render the payload first so all formatter calls sit together below
+  std::stringstream hex_stream;
+  m_data.hexdump(hex_stream);
+
+  f->dump_string("tag", m_tag);
+  f->dump_unsigned("tid", m_tid);
+  f->dump_string("data", hex_stream.str());
+}
+
+// Checks whether a complete, checksummed entry is available at the iterator's
+// current position, without decoding it into an Entry.  The iterator is taken
+// by value, so the caller's iterator is not advanced.
+//
+// Returns true (with *bytes_needed set to 0) when a whole entry is present and
+// its stored crc matches the computed one.  Returns false otherwise:
+//  - with *bytes_needed > 0 when the buffer ends before the field currently
+//    being examined; the value is the shortfall for that field only
+//  - with *bytes_needed == 0 on a preamble mismatch or a crc mismatch
+bool Entry::is_readable(bufferlist::iterator iter, uint32_t *bytes_needed) {
+  uint32_t start_off = iter.get_off();
+  if (iter.get_remaining() < HEADER_FIXED_SIZE) {
+    *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+    return false;
+  }
+  uint64_t bl_preamble;
+  ::decode(bl_preamble, iter);
+  if (bl_preamble != preamble) {
+    *bytes_needed = 0;
+    return false;
+  }
+  // skip the rest of the fixed header (version and tid) without inspecting it
+  iter.advance(HEADER_FIXED_SIZE - sizeof(bl_preamble));
+
+  // 32-bit length prefix of the tag, followed by the tag bytes
+  if (iter.get_remaining() < sizeof(uint32_t)) {
+    *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+    return false;
+  }
+  uint32_t tag_size;
+  ::decode(tag_size, iter);
+
+  if (iter.get_remaining() < tag_size) {
+    *bytes_needed = tag_size - iter.get_remaining();
+    return false;
+  }
+  iter.advance(tag_size);
+
+  // 32-bit length prefix of the data, followed by the data bytes
+  if (iter.get_remaining() < sizeof(uint32_t)) {
+    *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+    return false;
+  }
+  uint32_t data_size;
+  ::decode(data_size, iter);
+
+  if (iter.get_remaining() < data_size) {
+    *bytes_needed = data_size - iter.get_remaining();
+    return false;
+  }
+  iter.advance(data_size);
+  // [start_off, end_off) is the region covered by the trailing crc
+  uint32_t end_off = iter.get_off();
+
+  if (iter.get_remaining() < sizeof(uint32_t)) {
+    *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
+    return false;
+  }
+
+  bufferlist crc_bl;
+  crc_bl.substr_of(iter.get_bl(), start_off, end_off - start_off);
+
+  // all bytes are present from here on; only the checksum can still fail
+  *bytes_needed = 0;
+  uint32_t crc;
+  ::decode(crc, iter);
+  if (crc != crc_bl.crc32c(0)) {
+    return false;
+  }
+  return true;
+}
+
+// Appends two heap-allocated sample entries to o: one with empty data and one
+// with a small payload.  The list receives raw pointers to the new entries.
+void Entry::generate_test_instances(std::list<Entry *> &o) {
+  // entry without any payload
+  bufferlist empty_bl;
+  o.push_back(new Entry("tag1", 123, empty_bl));
+
+  // entry carrying a small payload
+  bufferlist payload_bl;
+  payload_bl.append("data");
+  o.push_back(new Entry("tag2", 123, payload_bl));
+}
+
+// Two entries are equal when tag, tid and the data contents all match.
+bool Entry::operator==(const Entry& rhs) const {
+  if (m_tag != rhs.m_tag || m_tid != rhs.m_tid) {
+    return false;
+  }
+
+  // contents_equal is invoked through non-const references, hence the casts
+  bufferlist &lhs_data = const_cast<bufferlist&>(m_data);
+  bufferlist &rhs_data = const_cast<bufferlist&>(rhs.m_data);
+  return lhs_data.contents_equal(rhs_data);
+}
+
+// Prints a short summary of the entry: its tag, tid and payload length.
+std::ostream &operator<<(std::ostream &os, const Entry &entry) {
+  os << "Entry[tag=" << entry.get_tag();
+  os << ", tid=" << entry.get_tid();
+  os << ", " << "data size=" << entry.get_data().length() << "]";
+  return os;
+}
+
+} // namespace journal
diff --git a/src/journal/Entry.h b/src/journal/Entry.h
new file mode 100644 (file)
index 0000000..9e85df4
--- /dev/null
@@ -0,0 +1,62 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_ENTRY_H
+#define CEPH_JOURNAL_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <string>
+
+namespace ceph {
+class Formatter;
+}
+
+namespace journal {
+
+// A single journal record: a tag, a tid and an opaque data payload, together
+// with the routines to serialize and validate its encoded form.
+class Entry {
+public:
+  // default entry: empty tag, tid of zero, empty data
+  Entry() : m_tid() {}
+  Entry(const std::string &tag, uint64_t tid, const bufferlist &data)
+    : m_tag(tag), m_tid(tid), m_data(data)
+  {
+  }
+
+  inline const std::string &get_tag() const {
+    return m_tag;
+  }
+  inline uint64_t get_tid() const {
+    return m_tid;
+  }
+  inline const bufferlist &get_data() const {
+    return m_data;
+  }
+
+  // serialization hooks used by WRITE_CLASS_ENCODER below
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &iter);
+  void dump(ceph::Formatter *f) const;
+
+  bool operator==(const Entry& rhs) const;
+
+  // reports whether a complete, valid entry is available at iter; when more
+  // bytes are required the shortfall is stored in *bytes_needed
+  static bool is_readable(bufferlist::iterator iter, uint32_t *bytes_needed);
+  static void generate_test_instances(std::list<Entry *> &o);
+
+private:
+  // marker written at the start of every encoded entry
+  static const uint64_t preamble = 0x3141592653589793;
+
+  std::string m_tag;
+  uint64_t m_tid;
+  bufferlist m_data;
+};
+
+std::ostream &operator<<(std::ostream &os, const Entry &entry);
+
+} // namespace journal
+
+using journal::operator<<;
+
+WRITE_CLASS_ENCODER(journal::Entry)
+
+#endif // CEPH_JOURNAL_ENTRY_H
diff --git a/src/journal/Future.cc b/src/journal/Future.cc
new file mode 100644 (file)
index 0000000..5b05106
--- /dev/null
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+
+namespace journal {
+
+// Forwards the flush request (and optional completion context) to the
+// underlying FutureImpl.
+void Future::flush(Context *on_safe) {
+  m_future_impl->flush(on_safe);
+}
+
+// Registers on_safe with the underlying FutureImpl; see FutureImpl::wait.
+void Future::wait(Context *on_safe) {
+  m_future_impl->wait(on_safe);
+}
+
+// Returns whether the underlying FutureImpl reports completion.
+bool Future::is_complete() const {
+  return m_future_impl->is_complete();
+}
+
+// Returns the result recorded by the underlying FutureImpl.
+int Future::get_return_value() const {
+  return m_future_impl->get_return_value();
+}
+
+// boost::intrusive_ptr hook: takes a reference on the FutureImpl.
+void intrusive_ptr_add_ref(FutureImpl *p) {
+  p->get();
+}
+
+// boost::intrusive_ptr hook: drops a reference on the FutureImpl.
+void intrusive_ptr_release(FutureImpl *p) {
+  p->put();
+}
+
+// Prints the future by delegating to the FutureImpl stream operator.
+std::ostream &operator<<(std::ostream &os, const Future &future) {
+  const FutureImpl &impl = *future.m_future_impl.get();
+  os << impl;
+  return os;
+}
+
+} // namespace journal
+
diff --git a/src/journal/Future.h b/src/journal/Future.h
new file mode 100644 (file)
index 0000000..3ded9f6
--- /dev/null
@@ -0,0 +1,48 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_H
+#define CEPH_JOURNAL_FUTURE_H
+
+#include "include/int_types.h"
+#include <string>
+#include <iosfwd>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+
+// Lightweight handle around a reference-counted FutureImpl.  Every operation
+// is forwarded to the implementation object.
+class Future {
+public:
+  typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+  Future(const FutureImplPtr &future_impl) : m_future_impl(future_impl) {}
+
+  // both calls hand on_safe to the implementation; see FutureImpl for the
+  // completion semantics
+  void flush(Context *on_safe);
+  void wait(Context *on_safe);
+
+  bool is_complete() const;
+  int get_return_value() const;
+
+private:
+  friend std::ostream& operator<<(std::ostream&, const Future&);
+
+  FutureImplPtr m_future_impl;
+};
+
+void intrusive_ptr_add_ref(FutureImpl *p);
+void intrusive_ptr_release(FutureImpl *p);
+
+std::ostream &operator<<(std::ostream &os, const Future &future);
+
+} // namespace journal
+
+using journal::intrusive_ptr_add_ref;
+using journal::intrusive_ptr_release;
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_H
diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc
new file mode 100644 (file)
index 0000000..3ca8688
--- /dev/null
@@ -0,0 +1,143 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "journal/Utils.h"
+
+namespace journal {
+
+// Creates a future that is neither safe nor consistent yet.  m_consistent_ack
+// is constructed from `this` and stores it in a FutureImplPtr, so the future
+// holds a reference to itself until the ack context has been completed.
+FutureImpl::FutureImpl(const std::string &tag, uint64_t tid)
+  : RefCountedObject(NULL, 0), m_tag(tag), m_tid(tid),
+    m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
+    m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
+    m_consistent_ack(this) {
+}
+
+// Links this future to the one preceding it.  Without a predecessor the
+// future is marked consistent right away.
+void FutureImpl::init(const FutureImplPtr &prev_future) {
+  // chain ourself to the prior future (if any) so that we know when the
+  // journal is consistent
+  if (prev_future) {
+    m_prev_future = prev_future;
+    m_prev_future->wait(&m_consistent_ack);
+  } else {
+    m_consistent_ack.complete(0);
+  }
+}
+
+// Requests that this future, and the futures chained before it, be flushed.
+// A non-NULL on_safe is completed with the return value once the future is
+// both safe and consistent -- immediately if that is already the case,
+// otherwise when finish() runs.
+void FutureImpl::flush(Context *on_safe) {
+  bool complete;
+  FlushHandlerPtr flush_handler;
+  {
+    Mutex::Locker locker(m_lock);
+    complete = (m_safe && m_consistent);
+    if (!complete) {
+      if (on_safe != NULL) {
+        m_contexts.push_back(on_safe);
+      }
+
+      // only the first flush request is propagated; later calls merely queue
+      // their context
+      if (m_flush_state == FLUSH_STATE_NONE) {
+        m_flush_state = FLUSH_STATE_REQUESTED;
+        flush_handler = m_flush_handler;
+
+        // walk the chain backwards up to <splay width> futures
+        if (m_prev_future) {
+          m_prev_future->flush();
+        }
+      }
+    }
+  }
+
+  // callbacks into other objects happen after m_lock has been released
+  if (complete && on_safe != NULL) {
+    on_safe->complete(m_return_value);
+  } else if (flush_handler) {
+    // attached to journal object -- instruct it to flush all entries through
+    // this one.  possible to become detached while lock is released, so flush
+    // will be re-requested by the object if it doesn't own the future
+    flush_handler->flush(this);
+  }
+}
+
+// Completes on_safe with the return value once the future is both safe and
+// consistent: right away if it already is, otherwise the context is queued.
+// on_safe must not be NULL.
+void FutureImpl::wait(Context *on_safe) {
+  assert(on_safe != NULL);
+
+  bool pending;
+  {
+    Mutex::Locker locker(m_lock);
+    pending = !(m_safe && m_consistent);
+    if (pending) {
+      m_contexts.push_back(on_safe);
+    }
+  }
+
+  // the immediate completion is issued after m_lock has been released
+  if (!pending) {
+    on_safe->complete(m_return_value);
+  }
+}
+
+// A future is complete once it is both safe and consistent.
+bool FutureImpl::is_complete() const {
+  Mutex::Locker locker(m_lock);
+  return m_safe && m_consistent;
+}
+
+// Returns the recorded result.  May only be called on a complete future; the
+// assert fires otherwise.
+int FutureImpl::get_return_value() const {
+  Mutex::Locker locker(m_lock);
+  assert(m_safe && m_consistent);
+  return m_return_value;
+}
+
+// Installs the flush handler; no handler may be attached already.  Returns
+// true when a flush has already been requested or started for this future.
+bool FutureImpl::attach(const FlushHandlerPtr &flush_handler) {
+  Mutex::Locker locker(m_lock);
+  assert(!m_flush_handler);
+
+  m_flush_handler = flush_handler;
+  const bool flush_pending = (m_flush_state != FLUSH_STATE_NONE);
+  return flush_pending;
+}
+
+// Marks the future as safe with result r.  May be called only once.
+void FutureImpl::safe(int r) {
+  Mutex::Locker locker(m_lock);
+  assert(!m_safe);
+  m_safe = true;
+  // keep the first non-zero result; a later value never overwrites it
+  if (m_return_value == 0) {
+    m_return_value = r;
+  }
+
+  m_flush_handler.reset();
+  // waiters are completed once the future is also consistent
+  if (m_consistent) {
+    finish();
+  }
+}
+
+// Marks the future as consistent with result r.  Invoked through
+// C_ConsistentAck; may be called only once.
+void FutureImpl::consistent(int r) {
+  Mutex::Locker locker(m_lock);
+  assert(!m_consistent);
+  m_consistent = true;
+  // the predecessor is no longer needed, so drop the reference to it
+  m_prev_future.reset();
+  // keep the first non-zero result; a later value never overwrites it
+  if (m_return_value == 0) {
+    m_return_value = r;
+  }
+
+  // waiters are completed once the future is also safe
+  if (m_safe) {
+    finish();
+  }
+}
+
+// Completes every context queued by flush()/wait() with the final return
+// value.  Must be called with m_lock held and only once the future is both
+// safe and consistent.  m_lock is released while the contexts run and is held
+// again when this returns.
+void FutureImpl::finish() {
+  assert(m_lock.is_locked());
+  assert(m_safe && m_consistent);
+
+  // detach the waiters and snapshot the result while still locked
+  Contexts contexts;
+  contexts.swap(m_contexts);
+  int r = m_return_value;
+
+  // run the callbacks without holding m_lock: a callback that queries this
+  // future (e.g. is_complete() or get_return_value()) takes m_lock itself and
+  // would otherwise block on the lock held by its own caller
+  m_lock.Unlock();
+  for (Contexts::iterator it = contexts.begin();
+       it != contexts.end(); ++it) {
+    (*it)->complete(r);
+  }
+  // callers release m_lock through their scoped Mutex::Locker
+  m_lock.Lock();
+}
+
+// Prints the tag and tid identifying the future.
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
+  return os << "Future[tag=" << future.m_tag << ", tid=" << future.m_tid
+            << "]";
+}
+
+// boost::intrusive_ptr hook: takes a reference on the flush handler.
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p) {
+  p->get();
+}
+
+// boost::intrusive_ptr hook: drops a reference on the flush handler.
+void intrusive_ptr_release(FutureImpl::FlushHandler *p) {
+  p->put();
+}
+
+} // namespace journal
diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h
new file mode 100644 (file)
index 0000000..eaf5feb
--- /dev/null
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_FUTURE_IMPL_H
+#define CEPH_JOURNAL_FUTURE_IMPL_H
+
+#include "include/int_types.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/Future.h"
+#include <list>
+#include <boost/noncopyable.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+class Context;
+
+namespace journal {
+
+class FutureImpl;
+typedef boost::intrusive_ptr<FutureImpl> FutureImplPtr;
+
+// Reference-counted state behind a Future.  A future completes once it has
+// been marked both "safe" (via safe()) and "consistent" (via the ack context
+// registered in init()); contexts queued through flush()/wait() are completed
+// at that point.
+class FutureImpl : public RefCountedObject, boost::noncopyable {
+public:
+  // Callback interface used to ask the owner of the future to flush it.
+  // Implementations supply their own reference counting via get()/put().
+  struct FlushHandler {
+    virtual ~FlushHandler() {}
+    virtual void flush(const FutureImplPtr &future) = 0;
+    virtual void get() = 0;
+    virtual void put() = 0;
+  };
+  typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
+
+  FutureImpl(const std::string &tag, uint64_t tid);
+
+  // chains this future behind prev_future (which may be empty)
+  void init(const FutureImplPtr &prev_future);
+
+  inline const std::string &get_tag() const {
+    return m_tag;
+  }
+  inline uint64_t get_tid() const {
+    return m_tid;
+  }
+
+  void flush(Context *on_safe = NULL);
+  void wait(Context *on_safe);
+
+  bool is_complete() const;
+  int get_return_value() const;
+
+  inline bool is_flush_in_progress() const {
+    Mutex::Locker locker(m_lock);
+    return (m_flush_state == FLUSH_STATE_IN_PROGRESS);
+  }
+  inline void set_flush_in_progress() {
+    Mutex::Locker locker(m_lock);
+    m_flush_state = FLUSH_STATE_IN_PROGRESS;
+  }
+
+  // returns true if a flush was already requested before the handler was
+  // attached
+  bool attach(const FlushHandlerPtr &flush_handler);
+  inline void detach() {
+    Mutex::Locker locker(m_lock);
+    assert(m_flush_handler);
+    m_flush_handler.reset();
+  }
+  inline FlushHandlerPtr get_flush_handler() const {
+    Mutex::Locker locker(m_lock);
+    return m_flush_handler;
+  }
+
+  // marks the future as safe with result r
+  void safe(int r);
+
+private:
+  friend std::ostream &operator<<(std::ostream &, const FutureImpl &);
+
+  typedef std::list<Context *> Contexts;
+
+  enum FlushState {
+    FLUSH_STATE_NONE,
+    FLUSH_STATE_REQUESTED,
+    FLUSH_STATE_IN_PROGRESS
+  };
+
+  // Context embedded in the future (m_consistent_ack) that marks it
+  // consistent.  complete() is overridden and does not delete the context; it
+  // releases the reference it holds on the future instead.
+  struct C_ConsistentAck : public Context {
+    FutureImplPtr future;
+    C_ConsistentAck(FutureImpl *_future) : future(_future) {}
+    virtual void complete(int r) {
+      future->consistent(r);
+      future.reset();
+    }
+    virtual void finish(int r) {}
+  };
+
+  std::string m_tag;
+  uint64_t m_tid;
+
+  mutable Mutex m_lock;
+  FutureImplPtr m_prev_future;
+  bool m_safe;
+  bool m_consistent;
+  int m_return_value;
+
+  FlushHandlerPtr m_flush_handler;
+  FlushState m_flush_state;
+
+  C_ConsistentAck m_consistent_ack;
+  // contexts waiting for the future to become safe and consistent
+  Contexts m_contexts;
+
+  void consistent(int r);
+  void finish();
+};
+
+void intrusive_ptr_add_ref(FutureImpl::FlushHandler *p);
+void intrusive_ptr_release(FutureImpl::FlushHandler *p);
+
+std::ostream &operator<<(std::ostream &os, const FutureImpl &future);
+
+} // namespace journal
+
+using journal::operator<<;
+
+#endif // CEPH_JOURNAL_FUTURE_IMPL_H
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
new file mode 100644 (file)
index 0000000..35cf217
--- /dev/null
@@ -0,0 +1,300 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "common/errno.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalMetadata: "
+
+namespace journal {
+
+using namespace cls::journal;
+
+namespace {
+
+// Completion context that triggers JournalMetadata::notify_update() when the
+// operation it was attached to finished with r == 0.  Holds a reference on
+// the metadata object for as long as the context exists.
+struct C_NotifyUpdate : public Context {
+  JournalMetadataPtr journal_metadata;
+
+  C_NotifyUpdate(JournalMetadata *_journal_metadata)
+    : journal_metadata(_journal_metadata) {}
+
+  virtual void finish(int r) {
+    if (r == 0) {
+      journal_metadata->notify_update();
+    }
+  }
+};
+
+// librados completion callback: arg is the Context passed when the completion
+// was created; it is completed with the operation's return value.
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+  Context *comp = reinterpret_cast<Context *>(arg);
+  comp->complete(rados_aio_get_return_value(c));
+}
+
+} // anonymous namespace
+
+JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
+                                 const std::string &oid,
+                                 const std::string &client_id)
+    : m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0),
+      m_splay_width(0), m_initialized(false),  m_timer(NULL),
+      m_timer_lock("JournalMetadata::m_timer_lock"),
+      m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
+      m_update_notifications(0) {
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+// Stops the timer (if init() created one), removes the watch on the journal
+// header and flushes outstanding watch callbacks.
+JournalMetadata::~JournalMetadata() {
+  if (m_timer != NULL) {
+    // the timer is shut down while holding the lock it was created with
+    Mutex::Locker locker(m_timer_lock);
+    m_timer->shutdown();
+    delete m_timer;
+    m_timer = NULL;
+  }
+
+  m_ioctx.unwatch2(m_watch_handle);
+  librados::Rados rados(m_ioctx);
+  rados.watch_flush();
+}
+
+int JournalMetadata::init() {
+  assert(!m_initialized);
+  m_initialized = true;
+
+  int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order,
+                                         &m_splay_width);
+  if (r < 0) {
+    lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: "
+                 << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  m_timer = new SafeTimer(m_cct, m_timer_lock, false);
+  m_timer->init();
+
+  r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+  if (r < 0) {
+    lderr(m_cct) << __func__ << ": failed to watch journal"
+                 << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  C_SaferCond cond;
+  refresh(&cond);
+  r = cond.wait();
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+// Registers this client (m_client_id) with the journal header and, on
+// success, sends an update notification on the header object.
+// Returns 0 on success or the negative error code of the registration.
+int JournalMetadata::register_client(const std::string &description) {
+  assert(!m_client_id.empty());
+
+  const int ret = client::client_register(m_ioctx, m_oid, m_client_id,
+                                          description);
+  if (ret >= 0) {
+    notify_update();
+    return 0;
+  }
+
+  lderr(m_cct) << "failed to register journal client '" << m_client_id
+               << "': " << cpp_strerror(ret) << dendl;
+  return ret;
+}
+
+// Removes this client (m_client_id) from the journal header and, on success,
+// sends an update notification on the header object.
+// Returns 0 on success or the negative error code of the removal.
+int JournalMetadata::unregister_client() {
+  assert(!m_client_id.empty());
+
+  const int ret = client::client_unregister(m_ioctx, m_oid, m_client_id);
+  if (ret >= 0) {
+    notify_update();
+    return 0;
+  }
+
+  lderr(m_cct) << "failed to unregister journal client '" << m_client_id
+               << "': " << cpp_strerror(ret) << dendl;
+  return ret;
+}
+
+// Adds a listener to be called back after each successful refresh.
+void JournalMetadata::add_listener(Listener *listener) {
+  Mutex::Locker locker(m_lock);
+  // handle_refresh_complete walks m_listeners with m_lock released; wait for
+  // any such walk to end before modifying the list
+  while (m_update_notifications > 0) {
+    m_update_cond.Wait(m_lock);
+  }
+  m_listeners.push_back(listener);
+}
+
+// Removes a previously added listener.
+void JournalMetadata::remove_listener(Listener *listener) {
+  Mutex::Locker locker(m_lock);
+  // handle_refresh_complete walks m_listeners with m_lock released; wait for
+  // any such walk to end before modifying the list
+  while (m_update_notifications > 0) {
+    m_update_cond.Wait(m_lock);
+  }
+  m_listeners.remove(listener);
+}
+
+// Advances the minimum object set.  Does nothing unless object_set is larger
+// than the cached value.  The write to the journal header is asynchronous;
+// the cached value is updated immediately without waiting for it.
+void JournalMetadata::set_minimum_set(uint64_t object_set) {
+  Mutex::Locker locker(m_lock);
+  if (m_minimum_set >= object_set) {
+    return;
+  }
+
+  librados::ObjectWriteOperation op;
+  client::set_minimum_set(&op, object_set);
+
+  // C_NotifyUpdate sends an update notification if the write returns 0
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  assert(r == 0);
+  comp->release();
+
+  m_minimum_set = object_set;
+}
+
+// Advances the active object set.  Does nothing unless object_set is larger
+// than the cached value.  The write to the journal header is asynchronous;
+// the cached value is updated immediately without waiting for it.
+void JournalMetadata::set_active_set(uint64_t object_set) {
+  Mutex::Locker locker(m_lock);
+  if (m_active_set >= object_set) {
+    return;
+  }
+
+  librados::ObjectWriteOperation op;
+  client::set_active_set(&op, object_set);
+
+  // C_NotifyUpdate sends an update notification if the write returns 0
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  assert(r == 0);
+  comp->release();
+
+  m_active_set = object_set;
+}
+
+// Asynchronously records commit_position for this client in the journal
+// header.  The locally cached client record is not modified here.
+void JournalMetadata::set_commit_position(
+    const ObjectSetPosition &commit_position) {
+  Mutex::Locker locker(m_lock);
+
+  librados::ObjectWriteOperation op;
+  client::client_commit(&op, m_client_id, commit_position);
+
+  // C_NotifyUpdate sends an update notification if the write returns 0
+  C_NotifyUpdate *ctx = new C_NotifyUpdate(this);
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(ctx, NULL, rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, comp, &op);
+  assert(r == 0);
+  comp->release();
+}
+
+// Ensures the next tid handed out for `tag` is greater than `tid`.
+void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
+  Mutex::Locker locker(m_lock);
+
+  // operator[] creates a zero-valued slot for a tag seen for the first time
+  uint64_t &next_tid = m_allocated_tids[tag];
+  if (tid >= next_tid) {
+    next_tid = tid + 1;
+  }
+}
+
+bool JournalMetadata::get_last_allocated_tid(const std::string &tag,
+                                             uint64_t *tid) const {
+  Mutex::Locker locker(m_lock);
+
+  AllocatedTids::const_iterator it = m_allocated_tids.find(tag);
+  if (it == m_allocated_tids.end()) {
+    return false;
+  }
+
+  assert(it->second > 0);
+  *tid = it->second - 1;
+  return true;
+}
+
+// Starts an asynchronous fetch of the mutable metadata.  The results are
+// stored in a C_Refresh, whose finish() hands them to
+// handle_refresh_complete(); on_complete may be NULL.
+void JournalMetadata::refresh(Context *on_complete) {
+  ldout(m_cct, 10) << "refreshing journal metadata" << dendl;
+  C_Refresh *refresh = new C_Refresh(this, on_complete);
+  client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set,
+                               &refresh->active_set,
+                               &refresh->registered_clients, refresh);
+}
+
+// Applies the result of a refresh.  On success, and if this client is among
+// the registered clients, the cached metadata is replaced and every listener
+// is notified.  If the client is missing, r becomes -ENOENT and the cache is
+// left alone.  Finally the refresh's on_finish context (if any) is completed
+// with r.
+void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
+  ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl;
+  if (r == 0) {
+    Mutex::Locker locker(m_lock);
+
+    // probe object used to look up this client by id
+    Client client(m_client_id, "");
+    RegisteredClients::iterator it = refresh->registered_clients.find(client);
+    if (it != refresh->registered_clients.end()) {
+      m_minimum_set = refresh->minimum_set;
+      m_active_set = refresh->active_set;
+      m_registered_clients = refresh->registered_clients;
+      m_client = *it;
+
+      // listeners are invoked without m_lock; the counter makes
+      // add_listener/remove_listener wait until the walk has finished
+      ++m_update_notifications;
+      m_lock.Unlock();
+      for (Listeners::iterator it = m_listeners.begin();
+           it != m_listeners.end(); ++it) {
+        (*it)->handle_update(this);
+      }
+      m_lock.Lock();
+      if (--m_update_notifications == 0) {
+        m_update_cond.Signal();
+      }
+    } else {
+      lderr(m_cct) << "failed to locate client: " << m_client_id << dendl;
+      r = -ENOENT;
+    }
+  }
+
+  if (refresh->on_finish != NULL) {
+    refresh->on_finish->complete(r);
+  }
+}
+
+// Queues a C_WatchReset on the timer, which calls handle_watch_reset() after
+// a delay of 0.1 (as passed to add_event_after).
+void JournalMetadata::schedule_watch_reset() {
+  Mutex::Locker locker(m_timer_lock);
+  m_timer->add_event_after(0.1, new C_WatchReset(this));
+}
+
+// Timer callback that tries to re-establish the watch on the journal header.
+// On failure another attempt is scheduled; on success the metadata is
+// refreshed.
+void JournalMetadata::handle_watch_reset() {
+  int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
+  if (r < 0) {
+    // separator added so the error string no longer runs into the message
+    lderr(m_cct) << __func__ << ": failed to watch journal: "
+                 << cpp_strerror(r) << dendl;
+    schedule_watch_reset();
+  } else {
+    ldout(m_cct, 10) << __func__ << ": reset journal watch" << dendl;
+    refresh(NULL);
+  }
+}
+
+// Watch callback for a notification on the journal header: acknowledges it
+// with an empty payload and starts a refresh of the cached metadata.
+void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
+  ldout(m_cct, 10) << "journal header updated" << dendl;
+
+  bufferlist bl;
+  m_ioctx.notify_ack(m_oid, notify_id, cookie, bl);
+
+  refresh(NULL);
+}
+
+// Watch callback for an error on the watch: logs it and schedules an attempt
+// to re-establish the watch.
+void JournalMetadata::handle_watch_error(int err) {
+  lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
+  schedule_watch_reset();
+}
+
+// Sends an asynchronous notify with an empty payload on the journal header
+// object.  The completion carries no callback and is released immediately,
+// so the outcome of the notify is not observed.
+void JournalMetadata::notify_update() {
+  ldout(m_cct, 10) << "notifying journal header update" << dendl;
+
+  librados::AioCompletion *comp =
+    librados::Rados::aio_create_completion(NULL, NULL, NULL);
+
+  bufferlist bl;
+  // NOTE(review): 5000 is presumably the notify timeout in milliseconds --
+  // confirm against the librados aio_notify documentation
+  int r = m_ioctx.aio_notify(m_oid, comp, bl, 5000, NULL);
+  assert(r == 0);
+
+  comp->release();
+}
+
+} // namespace journal
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
new file mode 100644 (file)
index 0000000..2bab687
--- /dev/null
@@ -0,0 +1,179 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_METADATA_H
+#define CEPH_JOURNAL_JOURNAL_METADATA_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <list>
+#include <map>
+#include <string>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalMetadata;
+typedef boost::intrusive_ptr<JournalMetadata> JournalMetadataPtr;
+
+// Reference-counted accessor for the metadata stored in a journal's header
+// object.  It caches the mutable metadata (minimum/active object set,
+// registered clients), keeps the cache current through a watch on the header
+// and notifies registered listeners after each successful refresh.
+class JournalMetadata : public RefCountedObject, boost::noncopyable {
+public:
+  typedef cls::journal::EntryPosition EntryPosition;
+  typedef cls::journal::EntryPositions EntryPositions;
+  typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+  typedef cls::journal::Client Client;
+
+  typedef std::set<Client> RegisteredClients;
+
+  // callback interface invoked after the cached metadata has been refreshed
+  struct Listener {
+    virtual ~Listener() {};
+    virtual void handle_update(JournalMetadata *) = 0;
+  };
+
+  JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
+                  const std::string &client_id);
+  ~JournalMetadata();
+
+  // loads the metadata and starts watching the header; call once
+  int init();
+
+  void add_listener(Listener *listener);
+  void remove_listener(Listener *listener);
+
+  int register_client(const std::string &description);
+  int unregister_client();
+
+  // immutable values loaded by init()
+  inline uint8_t get_order() const {
+    return m_order;
+  }
+  inline uint8_t get_splay_width() const {
+    return m_splay_width;
+  }
+
+  // the timer only exists after init() has created it
+  inline SafeTimer &get_timer() {
+    return *m_timer;
+  }
+  inline Mutex &get_timer_lock() {
+    return m_timer_lock;
+  }
+
+  void set_minimum_set(uint64_t object_set);
+  inline uint64_t get_minimum_set() const {
+    Mutex::Locker locker(m_lock);
+    return m_minimum_set;
+  }
+
+  void set_active_set(uint64_t object_set);
+  inline uint64_t get_active_set() const {
+    Mutex::Locker locker(m_lock);
+    return m_active_set;
+  }
+
+  void set_commit_position(const ObjectSetPosition &commit_position);
+  // returns the commit position from the cached record of this client
+  void get_commit_position(ObjectSetPosition *commit_position) const {
+    Mutex::Locker locker(m_lock);
+    *commit_position = m_client.commit_position;
+  }
+
+  // returns the next unused tid for the tag and advances the counter
+  inline uint64_t allocate_tid(const std::string &tag) {
+    Mutex::Locker locker(m_lock);
+    return m_allocated_tids[tag]++;
+  }
+  void reserve_tid(const std::string &tag, uint64_t tid);
+  bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
+
+  void notify_update();
+
+private:
+  // per tag: the next tid to hand out
+  typedef std::map<std::string, uint64_t> AllocatedTids;
+  typedef std::list<Listener*> Listeners;
+
+  // watch callbacks; stores a plain pointer back to the owning metadata
+  struct C_WatchCtx : public librados::WatchCtx2 {
+    JournalMetadata *journal_metadata;
+
+    C_WatchCtx(JournalMetadata *_journal_metadata)
+      : journal_metadata(_journal_metadata) {}
+
+    virtual void handle_notify(uint64_t notify_id, uint64_t cookie,
+                               uint64_t notifier_id, bufferlist& bl) {
+      journal_metadata->handle_watch_notify(notify_id, cookie);
+    }
+    virtual void handle_error(uint64_t cookie, int err) {
+      journal_metadata->handle_watch_error(err);
+    }
+  };
+  // timer event that retries establishing the watch
+  struct C_WatchReset : public Context {
+    JournalMetadataPtr journal_metadata;
+
+    C_WatchReset(JournalMetadata *_journal_metadata)
+      : journal_metadata(_journal_metadata) {}
+
+    virtual void finish(int r) {
+      journal_metadata->handle_watch_reset();
+    }
+  };
+
+  // receives the result of an asynchronous metadata fetch and forwards it to
+  // handle_refresh_complete; on_finish may be NULL
+  struct C_Refresh : public Context {
+    JournalMetadataPtr journal_metadata;
+    uint64_t minimum_set;
+    uint64_t active_set;
+    RegisteredClients registered_clients;
+    Context *on_finish;
+
+    C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish)
+      : journal_metadata(_journal_metadata), minimum_set(0), active_set(0),
+        on_finish(_on_finish) {}
+
+    virtual void finish(int r) {
+      journal_metadata->handle_refresh_complete(this, r);
+    }
+  };
+
+  librados::IoCtx m_ioctx;
+  CephContext *m_cct;
+  std::string m_oid;
+  std::string m_client_id;
+
+  uint8_t m_order;
+  uint8_t m_splay_width;
+  bool m_initialized;
+
+  SafeTimer *m_timer;
+  Mutex m_timer_lock;
+
+  // guards the cached metadata, the listener list and the tid map
+  mutable Mutex m_lock;
+
+  Listeners m_listeners;
+
+  C_WatchCtx m_watch_ctx;
+  uint64_t m_watch_handle;
+
+  uint64_t m_minimum_set;
+  uint64_t m_active_set;
+  RegisteredClients m_registered_clients;
+  Client m_client;
+
+  AllocatedTids m_allocated_tids;
+
+  // number of listener walks currently in progress, plus the condition used
+  // to wait for that number to drop to zero
+  size_t m_update_notifications;
+  Cond m_update_cond;
+
+  void refresh(Context *on_finish);
+  void handle_refresh_complete(C_Refresh *refresh, int r);
+
+  void schedule_watch_reset();
+  void handle_watch_reset();
+  void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
+  void handle_watch_error(int err);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_METADATA_H
diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc
new file mode 100644 (file)
index 0000000..ed70519
--- /dev/null
@@ -0,0 +1,358 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/ReplayHandler.h"
+#include "journal/Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalPlayer: "
+
+namespace journal {
+
+// Construct a player in STATE_INIT and seed its starting position from the
+// commit position recorded in the journal metadata.
+//
+// ioctx is duplicated, so the caller's handle is not retained.  replay_handler
+// is stored as a raw pointer and is invoked later for entry-available and
+// error notifications.
+JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
+                             const std::string &object_oid_prefix,
+                             const JournalMetadataPtr& journal_metadata,
+                             ReplayHandler *replay_handler)
+  : RefCountedObject(NULL, 0), m_cct(NULL),
+    m_object_oid_prefix(object_oid_prefix),
+    m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
+    m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
+    m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
+    m_watch_interval(0), m_commit_object(0) {
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+
+  ObjectSetPosition commit_position;
+  m_journal_metadata->get_commit_position(&commit_position);
+  // with no recorded entry positions the defaults above are kept
+  // (splay offset 0, commit object 0, empty commit tag / tids)
+  if (!commit_position.entry_positions.empty()) {
+    uint8_t splay_width = m_journal_metadata->get_splay_width();
+    m_splay_offset = commit_position.object_number % splay_width;
+    m_commit_object = commit_position.object_number;
+    // the first entry position supplies the commit tag
+    m_commit_tag = commit_position.entry_positions.front().tag;
+
+    // remember the committed tid for every tag in the commit position
+    for (size_t i=0; i<commit_position.entry_positions.size(); ++i) {
+      const EntryPosition &entry_position = commit_position.entry_positions[i];
+      m_commit_tids[entry_position.tag] = entry_position.tid;
+    }
+  }
+}
+
+// Move from STATE_INIT to STATE_PREFETCH and start fetching two object sets
+// (2 * splay_width objects), beginning with the first object of the set that
+// contains the commit object.  All fetches share one C_PrefetchBatch context.
+void JournalPlayer::prefetch() {
+  m_lock.Lock();
+  assert(m_state == STATE_INIT);
+  m_state = STATE_PREFETCH;
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  ldout(m_cct, 10) << __func__ << ": prefetching " << (2 * splay_width) << " "
+                   << "objects" << dendl;
+
+  // prefetch starting from the last known commit set
+  C_PrefetchBatch *ctx = new C_PrefetchBatch(this);
+  // round the commit object down to the first object of its set
+  uint64_t start_object = (m_commit_object / splay_width) * splay_width;
+  for (uint64_t object_number = start_object;
+       object_number < start_object + (2 * splay_width); ++object_number) {
+    // one batch reference per issued fetch
+    ctx->add_fetch();
+    fetch(object_number, ctx);
+  }
+  m_lock.Unlock();
+
+  // release the reference the batch was constructed with; this is done after
+  // dropping m_lock because completing the batch may call process_state(),
+  // which takes m_lock itself
+  ctx->complete(0);
+}
+
+// Enable live replay: remember the watch interval, then start the prefetch.
+void JournalPlayer::prefetch_and_watch(double interval) {
+  // prefetch() acquires m_lock on its own, so the watch settings are
+  // published and the lock released before calling it
+  m_lock.Lock();
+  m_watch_interval = interval;
+  m_watch_enabled = true;
+  m_lock.Unlock();
+
+  prefetch();
+}
+
+// Cancel a pending watch on the active object player, if one is scheduled.
+void JournalPlayer::unwatch() {
+  Mutex::Locker locker(m_lock);
+  if (!m_watch_scheduled) {
+    // nothing armed, nothing to cancel
+    return;
+  }
+
+  ObjectPlayerPtr active_player = get_object_player();
+  assert(active_player);
+
+  active_player->unwatch();
+  m_watch_scheduled = false;
+}
+
+bool JournalPlayer::try_pop_front(Entry *entry,
+                                  ObjectSetPosition *object_set_position) {
+  Mutex::Locker locker(m_lock);
+  if (m_state != STATE_PLAYBACK) {
+    return false;
+  }
+
+  ObjectPlayerPtr object_player = get_object_player();
+  assert(object_player);
+
+  if (object_player->empty()) {
+    if (m_watch_enabled && !m_watch_scheduled) {
+      object_player->watch(&m_process_state, m_watch_interval);
+      m_watch_scheduled = true;
+    }
+    return false;
+  }
+
+  object_player->front(entry);
+  object_player->pop_front();
+
+  uint64_t last_tid;
+  if (m_journal_metadata->get_last_allocated_tid(entry->get_tag(), &last_tid) &&
+      entry->get_tid() != last_tid + 1) {
+    lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
+
+    m_state = STATE_ERROR;
+    m_lock.Unlock();
+    m_replay_handler->handle_error(-EINVAL);
+    m_lock.Lock();
+    return false;
+  }
+
+  // skip to next splay offset if we cannot apply the next entry in-sequence
+  if (!object_player->empty()) {
+    Entry peek_entry;
+    object_player->front(&peek_entry);
+    if (peek_entry.get_tag() == entry->get_tag() ||
+        (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(),
+                                                    &last_tid) &&
+         last_tid +1 != peek_entry.get_tid())) {
+      advance_splay_object();
+    }
+    } else {
+    advance_splay_object();
+
+    ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
+    if (!next_set_object_player->empty()) {
+      remove_object_player(object_player, &m_process_state);
+    }
+  }
+
+  // TODO populate the object_set_position w/ current object number and
+  //      unique entry tag mappings
+  m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
+  return true;
+}
+
+// Drive the player state machine after an asynchronous step completes.
+//
+// r is the result of the completed step.  When it is zero the handler for the
+// current state runs under m_lock and may itself produce an error code.  Any
+// negative result moves the player to STATE_ERROR and is reported to the
+// replay handler.
+void JournalPlayer::process_state(int r) {
+  ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
+  if (r == 0) {
+    Mutex::Locker locker(m_lock);
+    switch (m_state) {
+    case STATE_PREFETCH:
+      ldout(m_cct, 10) << "PREFETCH" << dendl;
+      r = process_prefetch();
+      break;
+    case STATE_PLAYBACK:
+      ldout(m_cct, 10) << "PLAYBACK" << dendl;
+      r = process_playback();
+      break;
+    case STATE_ERROR:
+      // already failed: nothing further to process
+      ldout(m_cct, 10) << "ERROR" << dendl;
+      break;
+    default:
+      lderr(m_cct) << "UNEXPECTED STATE (" << m_state << ")" << dendl;
+      assert(false);
+      break;
+    }
+  }
+
+  if (r < 0) {
+    {
+      // the lock only covers the state change; the handler below is called
+      // without m_lock held
+      Mutex::Locker locker(m_lock);
+      m_state = STATE_ERROR;
+    }
+    m_replay_handler->handle_error(r);
+  }
+}
+
+int JournalPlayer::process_prefetch() {
+  ldout(m_cct, 10) << __func__ << dendl;
+  assert(m_lock.is_locked());
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    assert(m_object_players.count(splay_offset) == 1);
+
+    ObjectPlayers &object_players = m_object_players[splay_offset];
+    assert(object_players.size() == 2);
+
+    ObjectPlayerPtr object_player = object_players.begin()->second;
+    assert(!object_player->is_fetch_in_progress());
+
+    ldout(m_cct, 15) << "seeking known commit position in "
+                     << object_player->get_oid() << dendl;
+    Entry entry;
+    while (!object_player->empty()) {
+      object_player->front(&entry);
+      if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
+        ldout(m_cct, 10) << "located next uncommitted entry: " << entry
+                         << dendl;
+        break;
+      }
+
+      ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
+      object_player->pop_front();
+    }
+
+    // if this object contains the commit position, our read should start with
+    // the next consistent journal entry in the sequence
+    if (object_player->get_object_number() == m_commit_object) {
+      if (object_player->empty()) {
+        advance_splay_object();
+      } else {
+        Entry entry;
+        object_player->front(&entry);
+        if (entry.get_tag() == m_commit_tag) {
+          advance_splay_object();
+        }
+      }
+    }
+
+    ObjectPlayerPtr next_set_object_player = get_next_set_object_player();
+    if (object_player->empty() && !next_set_object_player->empty()) {
+      ldout(m_cct, 15) << object_player->get_oid() << " empty" << dendl;
+      remove_object_player(object_player, &m_process_state);
+    }
+  }
+
+  m_state = STATE_PLAYBACK;
+  ObjectPlayerPtr object_player = get_object_player();
+  if (!object_player->empty()) {
+    ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+    m_lock.Unlock();
+    m_replay_handler->handle_entries_available();
+    m_lock.Lock();
+  } else if (m_watch_enabled) {
+    object_player->watch(&m_process_state, m_watch_interval);
+    m_watch_scheduled = true;
+  }
+  return 0;
+}
+
+// Handle a completion while in STATE_PLAYBACK: clear the scheduled-watch flag
+// and, if the active object player has entries, notify the replay handler.
+// Must be called with m_lock held; always returns 0.
+int JournalPlayer::process_playback() {
+  ldout(m_cct, 10) << __func__ << dendl;
+  assert(m_lock.is_locked());
+
+  m_watch_scheduled = false;
+
+  ObjectPlayerPtr object_player = get_object_player();
+  if (!object_player->empty()) {
+    ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
+    // the handler is invoked without m_lock held
+    m_lock.Unlock();
+    m_replay_handler->handle_entries_available();
+    m_lock.Lock();
+  }
+  return 0;
+}
+
+// Return the object players registered for the active splay offset.
+// Requires m_lock; asserts that exactly two players exist for the offset.
+const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const {
+  assert(m_lock.is_locked());
+
+  assert(m_object_players.count(m_splay_offset) == 1);
+  SplayedObjectPlayers::const_iterator players_it =
+    m_object_players.find(m_splay_offset);
+  assert(players_it != m_object_players.end());
+
+  assert(players_it->second.size() == 2);
+  return players_it->second;
+}
+
+// Return the player with the lowest object number at the active splay offset.
+// Requires m_lock.
+ObjectPlayerPtr JournalPlayer::get_object_player() const {
+  assert(m_lock.is_locked());
+  return get_object_players().begin()->second;
+}
+
+// Return the player with the highest object number at the active splay
+// offset.  Requires m_lock.
+ObjectPlayerPtr JournalPlayer::get_next_set_object_player() const {
+  assert(m_lock.is_locked());
+  return get_object_players().rbegin()->second;
+}
+
+// Step the active splay offset forward by one, wrapping at the splay width.
+// Requires m_lock.
+void JournalPlayer::advance_splay_object() {
+  assert(m_lock.is_locked());
+
+  const uint8_t splay_width = m_journal_metadata->get_splay_width();
+  ++m_splay_offset;
+  m_splay_offset %= splay_width;
+
+  // widen for logging so the offset prints as a number, not a character
+  const uint32_t printable_offset = static_cast<uint32_t>(m_splay_offset);
+  ldout(m_cct, 20) << __func__ << ": new offset "
+                   << printable_offset << dendl;
+}
+
+// Drop object_player, which must be the first player at its splay offset, and
+// start fetching the object two sets further on at the same offset.
+// on_fetch is completed when that fetch finishes.  Requires m_lock.
+void JournalPlayer::remove_object_player(const ObjectPlayerPtr &object_player,
+                                         Context *on_fetch) {
+  assert(m_lock.is_locked());
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  ObjectPlayers &object_players = m_object_players[
+    object_player->get_object_number() % splay_width];
+  assert(!object_players.empty());
+  assert(object_players.begin()->second == object_player);
+  object_players.erase(object_players.begin());
+
+  // object_player is still referenced by the caller's pointer, so its object
+  // number remains readable after the erase
+  fetch(object_player->get_object_number() + (2 * splay_width), on_fetch);
+}
+
+// Create an ObjectPlayer for object_num, register it under its splay offset
+// and start its asynchronous fetch.  ctx is completed (via C_Fetch) with the
+// result filtered through handle_fetched().  Requires m_lock.
+void JournalPlayer::fetch(uint64_t object_num, Context *ctx) {
+  assert(m_lock.is_locked());
+
+  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+
+  ldout(m_cct, 10) << __func__ << ": " << oid << dendl;
+  C_Fetch *fetch_ctx = new C_Fetch(this, object_num, ctx);
+  ObjectPlayerPtr object_player(new ObjectPlayer(
+    m_ioctx, m_object_oid_prefix, object_num, m_journal_metadata->get_timer(),
+    m_journal_metadata->get_timer_lock(), m_journal_metadata->get_order()));
+
+  // players are keyed first by splay offset, then by object number
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  m_object_players[object_num % splay_width][object_num] = object_player;
+  object_player->fetch(fetch_ctx);
+}
+
+int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
+  std::string oid = utils::get_object_name(m_object_oid_prefix, object_num);
+
+  ldout(m_cct, 10) << __func__ << ": fetched "
+                   << utils::get_object_name(m_object_oid_prefix, object_num)
+                   << ": r=" << r << dendl;
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+  return 0;
+}
+
+// The batch starts with a single reference of its own (refs == 1) and takes a
+// reference on the player, which complete() releases when the batch finishes.
+JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
+  : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
+    return_value(0) {
+  player->get();
+}
+
+// Register one more outstanding fetch that must complete() before the batch
+// is finished.
+void JournalPlayer::C_PrefetchBatch::add_fetch() {
+  Mutex::Locker locker(lock);
+  ++refs;
+}
+
+// Record the completion of one batch member.  The first negative result is
+// kept as the batch result.  The caller that releases the last reference
+// drives the player state machine with that result, drops the player
+// reference taken by the constructor and destroys the batch.
+void JournalPlayer::C_PrefetchBatch::complete(int r) {
+  bool last_ref;
+  int result;
+  {
+    Mutex::Locker locker(lock);
+    if (r < 0 && return_value == 0) {
+      return_value = r;
+    }
+    // decide under the lock whether this caller finished the batch; reading
+    // refs after unlocking would let two concurrent callers both observe
+    // zero and both tear the batch down
+    last_ref = (--refs == 0);
+    result = return_value;
+  }
+
+  if (last_ref) {
+    player->process_state(result);
+    player->put();
+    delete this;
+  }
+}
+
+} // namespace journal
diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h
new file mode 100644 (file)
index 0000000..43d81f4
--- /dev/null
@@ -0,0 +1,127 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_PLAYER_H
+#define CEPH_JOURNAL_JOURNAL_PLAYER_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectPlayer.h"
+#include "cls/journal/cls_journal_types.h"
+#include <map>
+
+class SafeTimer;
+
+namespace journal {
+
+class ReplayHandler;
+class JournalPlayer;
+typedef boost::intrusive_ptr<JournalPlayer> JournalPlayerPtr;
+
+// Replays journal entries from the journal's data objects.  Objects are
+// grouped by splay offset (object number modulo the splay width) and the
+// player walks the offsets while popping entries.  Progress and failures are
+// reported through a ReplayHandler.
+class JournalPlayer : public RefCountedObject {
+public:
+  typedef cls::journal::EntryPosition EntryPosition;
+  typedef cls::journal::EntryPositions EntryPositions;
+  typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+  JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+                const JournalMetadataPtr& journal_metadata,
+                ReplayHandler *replay_handler);
+
+  // start fetching objects from the commit position; must be in STATE_INIT
+  void prefetch();
+  // as prefetch(), but also enables watching objects at the given interval
+  void prefetch_and_watch(double interval);
+  // cancel a scheduled watch, if any
+  void unwatch();
+
+  // pop the next entry; returns false if none can be returned right now
+  bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position);
+
+private:
+  // tag -> tid
+  typedef std::map<std::string, uint64_t> AllocatedTids;
+  // object number -> player
+  typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
+  // splay offset -> players at that offset
+  typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
+
+  enum State {
+    STATE_INIT,
+    STATE_PREFETCH,
+    STATE_PLAYBACK,
+    STATE_ERROR
+  };
+
+  // Forwards completions to JournalPlayer::process_state().  It overrides
+  // complete() rather than finish(); presumably so that this context, which
+  // is embedded in the player as m_process_state, is never self-deleted --
+  // confirm against Context::complete().
+  struct C_ProcessState : public Context {
+    JournalPlayer *player;
+    C_ProcessState(JournalPlayer *p) : player(p) {}
+    virtual void complete(int r) {
+      player->process_state(r);
+    }
+    virtual void finish(int r) {}
+  };
+  // Reference-counted completion shared by all fetches of a prefetch; the
+  // player state machine runs once the last reference completes.
+  struct C_PrefetchBatch : public Context {
+    JournalPlayer *player;
+    Mutex lock;        // used by add_fetch() and complete()
+    uint32_t refs;     // outstanding completions
+    int return_value;  // first negative result seen, else 0
+
+    C_PrefetchBatch(JournalPlayer *p);
+    void add_fetch();
+    virtual void complete(int r);
+    virtual void finish(int r) {}
+  };
+  // Completion for a single object fetch.  Holds a player reference for its
+  // lifetime and passes the filtered result on to on_fetch.
+  struct C_Fetch : public Context {
+    JournalPlayer *player;
+    uint64_t object_num;
+    Context *on_fetch;
+    C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
+      : player(p), object_num(o), on_fetch(c) {
+      player->get();
+    }
+    virtual void finish(int r) {
+      r = player->handle_fetched(r, object_num);
+      on_fetch->complete(r);
+      player->put();
+    }
+  };
+
+  librados::IoCtx m_ioctx;
+  CephContext *m_cct;
+  std::string m_object_oid_prefix;
+  JournalMetadataPtr m_journal_metadata;
+
+  ReplayHandler *m_replay_handler;
+
+  // context handed to object players for watch / fetch completions
+  C_ProcessState m_process_state;
+
+  mutable Mutex m_lock;
+  State m_state;
+  uint8_t m_splay_offset;   // splay offset entries are currently read from
+
+  bool m_watch_enabled;
+  bool m_watch_scheduled;
+  double m_watch_interval;
+
+  SplayedObjectPlayers m_object_players;
+  // commit position captured at construction time
+  uint64_t m_commit_object;
+  std::string m_commit_tag;
+  AllocatedTids m_commit_tids;
+
+  void advance_splay_object();
+
+  const ObjectPlayers &get_object_players() const;
+  ObjectPlayerPtr get_object_player() const;
+  ObjectPlayerPtr get_next_set_object_player() const;
+  void remove_object_player(const ObjectPlayerPtr &object_player,
+                            Context *on_fetch);
+
+  void process_state(int r);
+  int process_prefetch();
+  int process_playback();
+
+  void fetch(uint64_t object_num, Context *ctx);
+  int handle_fetched(int r, uint64_t object_num);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_PLAYER_H
diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc
new file mode 100644 (file)
index 0000000..9050775
--- /dev/null
@@ -0,0 +1,174 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "journal/Entry.h"
+#include "journal/Utils.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "JournalRecorder: "
+
+namespace journal {
+
+// Create one ObjectRecorder per splay offset for the journal's active object
+// set and register for metadata update notifications.
+//
+// ioctx is duplicated.  The flush_* values are passed through unchanged to
+// every ObjectRecorder that is created.
+JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
+                                 const std::string &object_oid_prefix,
+                                 const JournalMetadataPtr& journal_metadata,
+                                 uint32_t flush_interval, uint64_t flush_bytes,
+                                 double flush_age)
+  : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
+    m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
+    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
+    m_overflow_handler(this), m_lock("JournalerRecorder::m_lock"),
+    m_current_set(m_journal_metadata->get_active_set()) {
+
+  Mutex::Locker locker(m_lock);
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
+    // objects of a set are numbered consecutively from set * splay_width
+    uint64_t object_number = splay_offset + (m_current_set * splay_width);
+    m_object_ptrs[splay_offset] = create_object_recorder(object_number);
+  }
+
+  m_journal_metadata->add_listener(&m_listener);
+}
+
+// Unregister the metadata listener that the constructor added.
+JournalRecorder::~JournalRecorder() {
+  m_journal_metadata->remove_listener(&m_listener);
+}
+
+// Append a new entry for the given tag.
+//
+// A tid is allocated for the tag, the entry is encoded and handed to the
+// object recorder selected by (tid % splay_width).  Returns a Future for the
+// new entry; the future is initialized with the previously appended future.
+// If the recorder reports the object as full, its object set is closed.
+Future JournalRecorder::append(const std::string &tag,
+                               const bufferlist &payload_bl) {
+  Mutex::Locker locker(m_lock);
+
+  uint64_t tid = m_journal_metadata->allocate_tid(tag);
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint8_t splay_offset = tid % splay_width;
+
+  ObjectRecorderPtr object_ptr = get_object(splay_offset);
+  FutureImplPtr future(new FutureImpl(tag, tid));
+  // link the new future to the one appended just before it
+  future->init(m_prev_future);
+  m_prev_future = future;
+
+  bufferlist entry_bl;
+  ::encode(Entry(future->get_tag(), future->get_tid(), payload_bl), entry_bl);
+
+  AppendBuffers append_buffers;
+  append_buffers.push_back(std::make_pair(future, entry_bl));
+  bool object_full = object_ptr->append(append_buffers);
+
+  // TODO populate the object_set_position
+
+  if (object_full) {
+    ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
+                     << dendl;
+    close_object_set(object_ptr->get_object_number() / splay_width);
+  }
+
+  return Future(future);
+}
+
+// Flush every active object recorder while holding m_lock.
+void JournalRecorder::flush() {
+  Mutex::Locker locker(m_lock);
+
+  ObjectRecorderPtrs::iterator recorder_it = m_object_ptrs.begin();
+  while (recorder_it != m_object_ptrs.end()) {
+    recorder_it->second->flush();
+    ++recorder_it;
+  }
+}
+
+// Return the active recorder for the given splay offset.  Asserts that m_lock
+// is held and that a recorder exists for the offset.
+ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
+  assert(m_lock.is_locked());
+
+  ObjectRecorderPtr recorder = m_object_ptrs[splay_offset];
+  assert(recorder != NULL);
+  return recorder;
+}
+
+// Advance past object_set if it is still the current set.
+//
+// The journal's active set is bumped to at least m_current_set + 1 and
+// m_current_set then follows the metadata's active set.  Requests for any
+// other set are ignored.  Requires m_lock.
+void JournalRecorder::close_object_set(uint64_t object_set) {
+  assert(m_lock.is_locked());
+
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  if (object_set != m_current_set) {
+    // stale request: the set was already advanced
+    return;
+  }
+
+  uint64_t active_set = m_journal_metadata->get_active_set();
+  if (active_set < m_current_set + 1) {
+    m_journal_metadata->set_active_set(m_current_set + 1);
+  }
+  m_current_set = m_journal_metadata->get_active_set();
+
+  ldout(m_cct, 10) << __func__ << ": advancing to object set "
+                   << m_current_set << dendl;
+
+  // object recorders will invoke overflow handler as they complete
+  // closing the object to ensure correct order of future appends
+  // NOTE(review): m_current_set was advanced above, so this filter matches
+  // recorders whose object already belongs to the new set rather than the set
+  // being closed -- confirm whether object_set was the intended comparison
+  for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
+       it != m_object_ptrs.end(); ++it) {
+    ObjectRecorderPtr object_recorder = it->second;
+    if (object_recorder != NULL &&
+        object_recorder->get_object_number() / splay_width == m_current_set) {
+      if (object_recorder->close_object()) {
+        // no in-flight ops, immediately create new recorder
+        create_next_object_recorder(object_recorder);
+      }
+    }
+  }
+}
+
+// Build an ObjectRecorder for object_number, wired to this recorder's
+// overflow handler and configured with its flush settings.
+ObjectRecorderPtr JournalRecorder::create_object_recorder(
+    uint64_t object_number) {
+  return ObjectRecorderPtr(new ObjectRecorder(
+    m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
+    object_number, m_journal_metadata->get_timer(),
+    m_journal_metadata->get_timer_lock(), &m_overflow_handler,
+    m_journal_metadata->get_order(), m_flush_interval, m_flush_bytes,
+    m_flush_age));
+}
+
+// Replace object_recorder with a new recorder at the same splay offset in the
+// current object set.  Append buffers claimed from the old recorder are
+// appended to the new one before it becomes the active recorder for the
+// offset.  Requires m_lock.
+void JournalRecorder::create_next_object_recorder(
+    ObjectRecorderPtr object_recorder) {
+  assert(m_lock.is_locked());
+
+  uint64_t object_number = object_recorder->get_object_number();
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint8_t splay_offset = object_number % splay_width;
+
+  ObjectRecorderPtr new_object_recorder = create_object_recorder(
+     (m_current_set * splay_width) + splay_offset);
+
+  // move the old recorder's claimed buffers over to the new object
+  AppendBuffers append_buffers;
+  object_recorder->claim_append_buffers(&append_buffers);
+  new_object_recorder->append(append_buffers);
+
+  m_object_ptrs[splay_offset] = new_object_recorder;
+}
+
+// Metadata update callback: if the journal's active set has moved beyond the
+// set currently being written, close the current set.
+void JournalRecorder::handle_update() {
+  Mutex::Locker locker(m_lock);
+
+  if (m_journal_metadata->get_active_set() <= m_current_set) {
+    // still writing to the active set; nothing to do
+    return;
+  }
+  close_object_set(m_current_set);
+}
+
+// Overflow callback from an object recorder: close that recorder's object set
+// and replace the recorder at its splay offset with one for the current set.
+// The reporting recorder must be the active recorder for its offset.
+void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
+  ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
+
+  Mutex::Locker locker(m_lock);
+
+  uint64_t object_number = object_recorder->get_object_number();
+  uint8_t splay_width = m_journal_metadata->get_splay_width();
+  uint8_t splay_offset = object_number % splay_width;
+  ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
+  assert(active_object_recorder->get_object_number() == object_number);
+
+  close_object_set(object_number / splay_width);
+  create_next_object_recorder(active_object_recorder);
+}
+
+} // namespace journal
diff --git a/src/journal/JournalRecorder.h b/src/journal/JournalRecorder.h
new file mode 100644 (file)
index 0000000..0604d55
--- /dev/null
@@ -0,0 +1,89 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNAL_RECORDER_H
+#define CEPH_JOURNAL_JOURNAL_RECORDER_H
+
+#include "include/int_types.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "journal/Future.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ObjectRecorder.h"
+#include <map>
+#include <string>
+
+class SafeTimer;
+
+namespace journal {
+
+// Appends journal entries, distributing them over one ObjectRecorder per
+// splay offset and advancing to the next object set when an object fills up
+// or the journal's active set moves on.
+class JournalRecorder {
+public:
+  JournalRecorder(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+                  const JournalMetadataPtr &journal_metadata,
+                  uint32_t flush_interval, uint64_t flush_bytes,
+                  double flush_age);
+  ~JournalRecorder();
+
+  // append a new entry for the tag and return a future for it
+  Future append(const std::string &tag, const bufferlist &bl);
+  // flush all active object recorders
+  void flush();
+
+  // NOTE(review): public, yet the implementation asserts that the private
+  // m_lock is held -- confirm whether external callers are expected
+  ObjectRecorderPtr get_object(uint8_t splay_offset);
+
+private:
+  // splay offset -> active recorder
+  typedef std::map<uint8_t, ObjectRecorderPtr> ObjectRecorderPtrs;
+
+  // forwards metadata update notifications to handle_update()
+  struct Listener : public JournalMetadata::Listener {
+    JournalRecorder *journal_recorder;
+
+    Listener(JournalRecorder *_journal_recorder)
+      : journal_recorder(_journal_recorder) {}
+
+    virtual void handle_update(JournalMetadata *) {
+      journal_recorder->handle_update();
+    }
+  };
+
+  // forwards object overflow notifications to handle_overflow()
+  struct OverflowHandler : public ObjectRecorder::OverflowHandler {
+    JournalRecorder *journal_recorder;
+
+    OverflowHandler(JournalRecorder *_journal_recorder)
+      : journal_recorder(_journal_recorder) {}
+
+    virtual void overflow(ObjectRecorder *object_recorder) {
+      journal_recorder->handle_overflow(object_recorder);
+    }
+  };
+
+  librados::IoCtx m_ioctx;
+  CephContext *m_cct;
+  std::string m_object_oid_prefix;
+
+  // declared before m_current_set, whose initializer reads the active set
+  // from it
+  JournalMetadataPtr m_journal_metadata;
+
+  // flush settings handed to every ObjectRecorder
+  uint32_t m_flush_interval;
+  uint64_t m_flush_bytes;
+  double m_flush_age;
+
+  Listener m_listener;
+  OverflowHandler m_overflow_handler;
+
+  Mutex m_lock;
+
+  uint64_t m_current_set;            // object set currently being written
+  ObjectRecorderPtrs m_object_ptrs;
+
+  FutureImplPtr m_prev_future;       // future of the last appended entry
+
+  void close_object_set(uint64_t object_set);
+  ObjectRecorderPtr create_object_recorder(uint64_t object_number);
+  void create_next_object_recorder(ObjectRecorderPtr object_recorder);
+
+  void handle_update();
+  void handle_overflow(ObjectRecorder *object_recorder);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNAL_RECORDER_H
diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc
new file mode 100644 (file)
index 0000000..4613517
--- /dev/null
@@ -0,0 +1,144 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Journaler.h"
+#include "include/stringify.h"
+#include "common/errno.h"
+#include "journal/Entry.h"
+#include "journal/FutureImpl.h"
+#include "journal/JournalMetadata.h"
+#include "journal/JournalPlayer.h"
+#include "journal/JournalRecorder.h"
+#include "journal/PayloadImpl.h"
+#include "journal/ReplayHandler.h"
+#include "cls/journal/cls_journal_client.h"
+#include "cls/journal/cls_journal_types.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "Journaler: "
+
+namespace journal {
+
+namespace {
+
+// object name prefixes; the journal id is appended by the Journaler
+// constructor to form the header oid and the data object oid prefix
+static const std::string JOURNAL_HEADER_PREFIX = "journal.";
+static const std::string JOURNAL_OBJECT_PREFIX = "journal_data.";
+
+} // anonymous namespace
+
+using namespace cls::journal;
+
+// Derive the header and data object names from journal_id and create the
+// shared JournalMetadata.  Both io contexts are duplicated.  No player or
+// recorder exists until replay / append is started.
+Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
+                     uint64_t journal_id, const std::string &client_id)
+  : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL)
+{
+  m_header_ioctx.dup(header_ioctx);
+  m_data_ioctx.dup(data_ioctx);
+  m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
+
+  m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id);
+  m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + ".";
+
+  m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id);
+  // take this object's reference on the metadata; released in the destructor
+  m_metadata->get();
+}
+
+// Release the metadata reference.  Replay and append must have been stopped
+// beforehand (stop_replay() / stop_append()).
+Journaler::~Journaler() {
+  if (m_metadata != NULL) {
+    m_metadata->put();
+    m_metadata = NULL;
+  }
+  assert(m_player == NULL);
+  assert(m_recorder == NULL);
+}
+
+// Initialize the journal metadata; returns the result of
+// JournalMetadata::init().
+int Journaler::init() {
+  return m_metadata->init();
+}
+
+int Journaler::create(uint8_t order, uint8_t splay_width) {
+  if (order > 64 || order < 12) {
+    lderr(m_cct) << "order must be in the range [12, 64]" << dendl;
+    return -EDOM;
+  }
+  if (splay_width == 0) {
+    return -EINVAL;
+  }
+
+  ldout(m_cct, 5) << "creating new journal: " << m_header_oid << dendl;
+  int r = client::create(m_header_ioctx, m_header_oid, order, splay_width);
+  if (r < 0) {
+    lderr(m_cct) << "failed to create journal: " << cpp_strerror(r) << dendl;
+    return r;
+  }
+  return 0;
+}
+
+// Register this journal client with the given description; returns the
+// result of JournalMetadata::register_client().
+int Journaler::register_client(const std::string &description) {
+  return m_metadata->register_client(description);
+}
+
+// Unregister this journal client; returns the result of
+// JournalMetadata::unregister_client().
+int Journaler::unregister_client() {
+  return m_metadata->unregister_client();
+}
+
+// Create the player and start prefetching entries (no watching).
+// Asserts (in create_player) that no replay is already active.
+void Journaler::start_replay(ReplayHandler *replay_handler) {
+  create_player(replay_handler);
+  m_player->prefetch();
+}
+
+// Create the player and start prefetching with watching enabled; interval is
+// handed to the player as its watch interval.
+void Journaler::start_live_replay(ReplayHandler *replay_handler,
+                                  double interval) {
+  create_player(replay_handler);
+  m_player->prefetch_and_watch(interval);
+}
+
+// Pop the next replayed entry, if one is available.  On success *payload is
+// replaced with a payload wrapping the entry data and the object set position
+// filled in by the player.  Returns false when the player has no entry to
+// hand out.  Replay must have been started.
+bool Journaler::try_pop_front(Payload *payload) {
+  assert(m_player != NULL);
+
+  Entry entry;
+  ObjectSetPosition object_set_position;
+  if (!m_player->try_pop_front(&entry, &object_set_position)) {
+    return false;
+  }
+
+  *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position));
+  return true;
+}
+
+// Stop replay: cancel any scheduled watch, drop a reference on the player and
+// forget it.  Replay must have been started.
+void Journaler::stop_replay() {
+  assert(m_player != NULL);
+  m_player->unwatch();
+  m_player->put();
+  m_player = NULL;
+}
+
+// Create the recorder used by append().  Must not be called while a recorder
+// already exists.  All flush settings are currently passed as zero.
+void Journaler::start_append() {
+  assert(m_recorder == NULL);
+
+  // TODO verify active object set >= current replay object set
+
+  // TODO configurable flush intervals
+  m_recorder = new JournalRecorder(m_data_ioctx, m_object_oid_prefix,
+                                   m_metadata, 0, 0, 0);
+}
+
+// Flush and destroy the recorder.  start_append() must have been called.
+void Journaler::stop_append() {
+  assert(m_recorder != NULL);
+  m_recorder->flush();
+  delete m_recorder;
+  m_recorder = NULL;
+}
+
+// Append an entry for the given tag and return a future for it.
+// start_append() must have been called first.
+Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
+  // fail loudly, like the other entry points, instead of dereferencing a
+  // NULL recorder
+  assert(m_recorder != NULL);
+  return m_recorder->append(tag, payload_bl);
+}
+
+// Create the journal player used for replay.  Must not be called while a
+// player already exists.
+void Journaler::create_player(ReplayHandler *replay_handler) {
+  assert(m_player == NULL);
+  m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata,
+                               replay_handler);
+  // Take the reference that stop_replay() later releases with put().  The
+  // player passes an initial count of 0 to RefCountedObject (the same pattern
+  // the constructor follows for m_metadata), so without this the get()/put()
+  // pairs of the player's own fetch contexts could destroy it while this
+  // pointer is still in use.
+  m_player->get();
+}
+
+} // namespace journal
diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h
new file mode 100644 (file)
index 0000000..ff054db
--- /dev/null
@@ -0,0 +1,65 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_JOURNALER_H
+#define CEPH_JOURNAL_JOURNALER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/rados/librados.hpp"
+#include "journal/Future.h"
+#include "journal/Payload.h"
+#include <string>
+#include <map>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class JournalMetadata;
+class JournalPlayer;
+class JournalRecorder;
+class ReplayHandler;
+
+// Client-facing entry point of the journal library.  Owns the journal
+// metadata and, while active, one player (replay) and one recorder (append).
+class Journaler {
+public:
+  Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx,
+            uint64_t journal_id, const std::string &client_id);
+  ~Journaler();
+
+  int init();
+
+  // create the journal header; order in [12, 64], splay_width non-zero
+  int create(uint8_t order, uint8_t splay_width);
+
+  int register_client(const std::string &description);
+  int unregister_client();
+
+  // replay: start (optionally live, with a watch interval), pop entries,
+  // stop; stop_replay() must be called before destruction
+  void start_replay(ReplayHandler *replay_handler);
+  void start_live_replay(ReplayHandler *replay_handler, double interval);
+  bool try_pop_front(Payload *payload);
+  void stop_replay();
+
+  // append: start, append entries, stop; stop_append() must be called before
+  // destruction
+  void start_append();
+  Future append(const std::string &tag, const bufferlist &bl);
+  void stop_append();
+
+private:
+  librados::IoCtx m_header_ioctx;
+  librados::IoCtx m_data_ioctx;
+  CephContext *m_cct;
+  std::string m_client_id;
+
+  std::string m_header_oid;          // name of the journal header object
+  std::string m_object_oid_prefix;   // name prefix of the data objects
+
+  JournalMetadata *m_metadata;
+  JournalPlayer *m_player;           // non-NULL only while replay is active
+  JournalRecorder *m_recorder;       // non-NULL only while append is active
+
+  void create_player(ReplayHandler *replay_handler);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_JOURNALER_H
diff --git a/src/journal/Makefile.am b/src/journal/Makefile.am
new file mode 100644 (file)
index 0000000..8c8bc97
--- /dev/null
@@ -0,0 +1,36 @@
+if ENABLE_CLIENT
+if WITH_RADOS
+
+libjournal_la_SOURCES = \
+       journal/Entry.cc \
+       journal/Future.cc \
+       journal/FutureImpl.cc \
+       journal/Journaler.cc \
+       journal/JournalMetadata.cc \
+       journal/JournalPlayer.cc \
+       journal/JournalRecorder.cc \
+       journal/ObjectPlayer.cc \
+       journal/ObjectRecorder.cc \
+       journal/Payload.cc \
+       journal/PayloadImpl.cc \
+       journal/Utils.cc
+
+noinst_LTLIBRARIES += libjournal.la
+noinst_HEADERS += \
+       journal/Entry.h \
+       journal/Future.h \
+       journal/FutureImpl.h \
+       journal/Journaler.h \
+       journal/JournalMetadata.h \
+       journal/JournalPlayer.h \
+       journal/JournalRecorder.h \
+       journal/ObjectPlayer.h \
+       journal/ObjectRecorder.h \
+       journal/Payload.h \
+       journal/PayloadImpl.h \
+       journal/ReplayHandler.h \
+       journal/Utils.h
+DENCODER_DEPS += libjournal.la
+
+endif # WITH_RADOS
+endif # ENABLE_CLIENT
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
new file mode 100644 (file)
index 0000000..704937d
--- /dev/null
@@ -0,0 +1,244 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Utils.h"
+#include "common/Timer.h"
+#include <limits>
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectPlayer: "
+
+namespace journal {
+
+namespace {
+
+// librados completion trampoline: interprets the opaque callback argument as
+// a Context and completes it with the return value of the finished operation.
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+  const int r = rados_aio_get_return_value(c);
+  Context *on_complete = reinterpret_cast<Context *>(arg);
+  on_complete->complete(r);
+}
+
+} // anonymous namespace
+
+ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
+                           const std::string &object_oid_prefix,
+                           uint64_t object_num, SafeTimer &timer,
+                           Mutex &timer_lock, uint8_t order)
+  : RefCountedObject(NULL, 0), m_object_num(object_num),
+    m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
+    m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+    m_watch_interval(0), m_watch_task(NULL), m_watch_fetch(this),
+    m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
+    m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
+    m_watch_ctx_in_progress(false) {
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+}
+
+// The watch context must already have been consumed or released (see
+// unwatch()) by the time the player is destroyed.
+ObjectPlayer::~ObjectPlayer() {
+  Mutex::Locker l(m_lock);
+  assert(m_watch_ctx == NULL);
+}
+
+void ObjectPlayer::fetch(Context *on_finish) {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
+
+  Mutex::Locker locker(m_lock);
+  m_fetch_in_progress = true;
+
+  C_Fetch *context = new C_Fetch(this, on_finish);
+  librados::ObjectReadOperation op;
+  op.read(m_read_off, 2 << m_order, &context->read_bl, NULL);
+
+  librados::AioCompletion *rados_completion =
+    librados::Rados::aio_create_completion(context, rados_ctx_callback, NULL);
+  int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
+  assert(r == 0);
+  rados_completion->release();
+}
+
+// Registers `on_fetch` as the (single) watch context, records the polling
+// interval and schedules the first poll of the object.
+void ObjectPlayer::watch(Context *on_fetch, double interval) {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
+
+  m_lock.Lock();
+  assert(m_watch_ctx == NULL);
+  m_watch_ctx = on_fetch;
+  m_lock.Unlock();
+
+  // the interval is guarded by the timer lock, not by m_lock
+  m_timer_lock.Lock();
+  m_watch_interval = interval;
+  m_timer_lock.Unlock();
+
+  schedule_watch();
+}
+
+// Stops watching the object: waits for an in-flight watch callback to finish,
+// deletes the watch context if it has not been consumed, then cancels any
+// pending timer task.
+void ObjectPlayer::unwatch() {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
+
+  m_lock.Lock();
+  while (m_watch_ctx_in_progress) {
+    // handle_watch_fetched() signals the condition once the callback returns
+    m_watch_ctx_cond.Wait(m_lock);
+  }
+  delete m_watch_ctx;
+  m_watch_ctx = NULL;
+  m_lock.Unlock();
+
+  cancel_watch();
+}
+
+// Copies the first decoded entry into `*entry`; at least one entry must be
+// available.
+void ObjectPlayer::front(Entry *entry) const {
+  Mutex::Locker l(m_lock);
+  assert(!m_entries.empty());
+
+  const Entry &head = m_entries.front();
+  *entry = head;
+}
+
+// Removes the first decoded entry; at least one entry must be available.
+//
+// m_entry_keys maps (tag, tid) to an iterator into m_entries, so the key of
+// the removed entry is erased as well. Otherwise the map would keep a
+// dangling iterator that handle_fetch_complete() writes through when it sees
+// a duplicate of an already-consumed entry.
+void ObjectPlayer::pop_front() {
+  Mutex::Locker locker(m_lock);
+  assert(!m_entries.empty());
+
+  Entry &entry = m_entries.front();
+  EntryKey entry_key(std::make_pair(entry.get_tag(), entry.get_tid()));
+  m_entry_keys.erase(entry_key);
+  m_entries.pop_front();
+}
+
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
+                   << bl.length() << dendl;
+
+  m_fetch_in_progress = false;
+  if (r < 0) {
+    return r;
+  }
+  if (bl.length() == 0) {
+    return -ENOENT;
+  }
+
+  Mutex::Locker locker(m_lock);
+  m_read_bl.append(bl);
+
+  bool invalid = false;
+  uint32_t invalid_start_off = 0;
+
+  bufferlist::iterator iter(&m_read_bl, m_read_off);
+  while (!iter.end()) {
+    uint32_t bytes_needed;
+    if (!Entry::is_readable(iter, &bytes_needed)) {
+      if (bytes_needed != 0) {
+        invalid_start_off = iter.get_off();
+        invalid = true;
+        lderr(m_cct) << ": partial record at offset " << iter.get_off()
+                     << dendl;
+        break;
+      }
+
+      if (!invalid) {
+        invalid_start_off = iter.get_off();
+        invalid = true;
+        lderr(m_cct) << ": detected corrupt journal entry at offset "
+                     << invalid_start_off << dendl;
+      }
+      ++iter;
+      continue;
+    }
+
+    if (invalid) {
+      uint32_t invalid_end_off = iter.get_off();
+      lderr(m_cct) << ": corruption range [" << invalid_start_off
+                   << ", " << invalid_end_off << ")" << dendl;
+      m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+      invalid = false;
+    }
+
+    Entry entry;
+    ::decode(entry, iter);
+    ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+    EntryKey entry_key(std::make_pair(entry.get_tag(), entry.get_tid()));
+    if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
+      m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
+    } else {
+      ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
+      *m_entry_keys[entry_key] = entry;
+    }
+  }
+
+  m_read_off = m_read_bl.length();
+  if (invalid) {
+    uint32_t invalid_end_off = m_read_bl.length();
+    lderr(m_cct) << ": corruption range [" << invalid_start_off
+                 << ", " << invalid_end_off << ")" << dendl;
+    m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+  }
+
+  if (!m_invalid_ranges.empty()) {
+    r = -EINVAL;
+  }
+  return r;
+}
+
+// Arms the timer with a new C_WatchTask that fires after m_watch_interval;
+// no other watch task may be pending.
+void ObjectPlayer::schedule_watch() {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
+  Mutex::Locker timer_locker(m_timer_lock);
+  assert(m_watch_task == NULL);
+
+  C_WatchTask *task = new C_WatchTask(this);
+  m_watch_task = task;
+  m_timer.add_event_after(m_watch_interval, task);
+}
+
+// Cancels the pending watch timer task, if any.
+//
+// C_WatchTask takes a reference on the player when it is constructed; that
+// reference is normally released by C_WatchFetch::complete() once the poll
+// started by the task has finished. A cancelled task never gets that far, so
+// its reference is dropped here to avoid leaking the player.
+void ObjectPlayer::cancel_watch() {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
+  Mutex::Locker locker(m_timer_lock);
+  if (m_watch_task != NULL) {
+    // cancel_event() only reports success if the task was still queued; a
+    // task that is already running releases its reference on the normal path
+    const bool canceled = m_timer.cancel_event(m_watch_task);
+    m_watch_task = NULL;
+    if (canceled) {
+      put();
+    }
+  }
+}
+
+// Timer callback body: forgets the fired task and polls the object, routing
+// the fetch result to handle_watch_fetched() through m_watch_fetch.
+void ObjectPlayer::handle_watch_task() {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
+
+  m_timer_lock.Lock();
+  m_watch_task = NULL;
+  m_timer_lock.Unlock();
+
+  fetch(&m_watch_fetch);
+}
+
+void ObjectPlayer::handle_watch_fetched(int r) {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
+                   << dendl;
+  if (r == -ENOENT) {
+    schedule_watch();
+    return;
+  }
+
+  Context *on_finish;
+  {
+    Mutex::Locker locker(m_lock);
+    m_watch_ctx_in_progress = true;
+    on_finish = m_watch_ctx;
+    m_watch_ctx = NULL;
+  }
+
+  if (on_finish != NULL) {
+    on_finish->complete(r);
+  }
+
+  {
+    Mutex::Locker locker(m_lock);
+    m_watch_ctx_in_progress = false;
+    m_watch_ctx_cond.Signal();
+  }
+}
+
+// Read completion: lets the player decode the data, forwards the player's
+// result to the caller's context and releases the reference taken in the
+// C_Fetch constructor.
+void ObjectPlayer::C_Fetch::finish(int r) {
+  const int result = object_player->handle_fetch_complete(r, read_bl);
+  on_finish->complete(result);
+  object_player->put();
+}
+
+// Timer callback: forwards to the player's poll handler; `r` is not used.
+void ObjectPlayer::C_WatchTask::finish(int r) {
+  object_player->handle_watch_task();
+}
+
+// Poll completion: hands the fetch result `r` to the player.
+void ObjectPlayer::C_WatchFetch::finish(int r) {
+  object_player->handle_watch_fetched(r);
+}
+
+} // namespace journal
diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h
new file mode 100644 (file)
index 0000000..ff85575
--- /dev/null
@@ -0,0 +1,141 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
+#define CEPH_JOURNAL_OBJECT_PLAYER_H
+
+#include "include/Context.h"
+#include "include/hash_namespace.h"
+#include "include/interval_set.h"
+#include "include/rados/librados.hpp"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/Entry.h"
+#include <list>
+#include <string>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/unordered_map.hpp>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectPlayer;
+typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
+
+class ObjectPlayer : public RefCountedObject {
+public:
+  typedef std::list<Entry> Entries;
+  typedef interval_set<uint64_t> InvalidRanges;
+
+  ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
+               uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
+               uint8_t order);
+  ~ObjectPlayer();
+
+  inline const std::string &get_oid() const {
+    return m_oid;
+  }
+  inline uint64_t get_object_number() const {
+    return m_object_num;
+  }
+
+  void fetch(Context *on_finish);
+  void watch(Context *on_fetch, double interval);
+  void unwatch();
+
+  inline bool is_fetch_in_progress() const {
+    Mutex::Locker locker(m_lock);
+    return m_fetch_in_progress;
+  }
+
+  void front(Entry *entry) const;
+  void pop_front();
+  inline bool empty() const {
+    Mutex::Locker locker(m_lock);
+    return m_entries.empty();
+  }
+
+  inline void get_entries(Entries *entries) {
+    Mutex::Locker locker(m_lock);
+    *entries = m_entries;
+  }
+  inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
+    Mutex::Locker locker(m_lock);
+    *invalid_ranges = m_invalid_ranges;
+  }
+
+private:
+  typedef std::pair<std::string, uint64_t> EntryKey;
+  typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
+
+  struct C_Fetch : public Context {
+    ObjectPlayer *object_player;
+    Context *on_finish;
+    bufferlist read_bl;
+    C_Fetch(ObjectPlayer *o, Context *ctx)
+      : object_player(o), on_finish(ctx) {
+      object_player->get();
+    }
+    virtual void finish(int r);
+  };
+  struct C_WatchTask : public Context {
+    ObjectPlayer *object_player;
+    C_WatchTask(ObjectPlayer *o) : object_player(o) {
+      object_player->get();
+    }
+    virtual void finish(int r);
+  };
+  struct C_WatchFetch : public Context {
+    ObjectPlayer *object_player;
+    C_WatchFetch(ObjectPlayer *o) : object_player(o) {
+    }
+    virtual void complete(int r) {
+      finish(r);
+      object_player->put();
+    }
+    virtual void finish(int r);
+  };
+
+  librados::IoCtx m_ioctx;
+  uint64_t m_object_num;
+  std::string m_oid;
+  CephContext *m_cct;
+
+  SafeTimer &m_timer;
+  Mutex &m_timer_lock;
+
+  double m_fetch_interval;
+  uint8_t m_order;
+
+  double m_watch_interval;
+  Context *m_watch_task;
+  C_WatchFetch m_watch_fetch;
+
+  mutable Mutex m_lock;
+  bool m_fetch_in_progress;
+  bufferlist m_read_bl;
+  uint32_t m_read_off;
+
+  Entries m_entries;
+  EntryKeys m_entry_keys;
+  InvalidRanges m_invalid_ranges;
+
+  Context *m_watch_ctx;
+  Cond m_watch_ctx_cond;
+  bool m_watch_ctx_in_progress;
+
+  int handle_fetch_complete(int r, const bufferlist &bl);
+
+  void schedule_watch();
+  void cancel_watch();
+  void handle_watch_task();
+  void handle_watch_fetched(int r);
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_PLAYER_H
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
new file mode 100644 (file)
index 0000000..2677b28
--- /dev/null
@@ -0,0 +1,307 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "journal/Utils.h"
+#include "include/assert.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_client.h"
+
+#define dout_subsys ceph_subsys_journaler
+#undef dout_prefix
+#define dout_prefix *_dout << "ObjectRecorder: "
+
+using namespace cls::journal;
+
+namespace journal {
+
+namespace {
+
+// librados completion trampoline: interprets the opaque callback argument as
+// a Context and completes it with the return value of the finished operation.
+void rados_ctx_callback(rados_completion_t c, void *arg) {
+  const int r = rados_aio_get_return_value(c);
+  Context *on_complete = reinterpret_cast<Context *>(arg);
+  on_complete->complete(r);
+}
+
+} // anonymous
+
+ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+                               uint64_t object_number,
+                               SafeTimer &timer, Mutex &timer_lock,
+                               OverflowHandler *overflow_handler, uint8_t order,
+                               uint32_t flush_interval, uint64_t flush_bytes,
+                               double flush_age)
+  : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
+    m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock),
+    m_overflow_handler(overflow_handler), m_order(order),
+    m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
+    m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_flush_handler(this),
+    m_append_task(NULL),
+    m_lock(utils::unique_lock_name("ObjectRecorder::m_lock", this)),
+    m_append_tid(0), m_pending_bytes(0), m_size(0), m_overflowed(false),
+    m_object_closed(false) {
+  m_ioctx.dup(ioctx);
+  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+  assert(m_overflow_handler != NULL);
+}
+
+// Cancels any pending append timer task; by this point no buffered or
+// in-flight appends may remain.
+ObjectRecorder::~ObjectRecorder() {
+  cancel_append_task();
+  assert(m_append_buffers.empty());
+  assert(m_in_flight_appends.empty());
+}
+
+bool ObjectRecorder::append(const AppendBuffers &append_buffers) {
+  FutureImplPtr last_flushed_future;
+  {
+    Mutex::Locker locker(m_lock);
+    for (AppendBuffers::const_iterator iter = append_buffers.begin();
+         iter != append_buffers.end(); ++iter) {
+      if (append(*iter)) {
+        last_flushed_future = iter->first;
+      }
+    }
+  }
+
+  if (last_flushed_future) {
+    flush(last_flushed_future);
+  }
+  return (m_size + m_pending_bytes >= m_soft_max_size);
+}
+
+// Forces all buffered appends to be sent and, if flush_appends() reports
+// success, cancels the pending append timer task.
+void ObjectRecorder::flush() {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+  Mutex::Locker l(m_lock);
+  const bool flushed = flush_appends(true);
+  if (flushed) {
+    cancel_append_task();
+  }
+}
+
+void ObjectRecorder::flush(const FutureImplPtr &future) {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
+                   << dendl;
+
+  Mutex::Locker locker(m_lock);
+  if (future->get_flush_handler().get() != &m_flush_handler) {
+    // if we don't own this future, re-issue the flush so that it hits the
+    // correct journal object owner
+    future->flush();
+    return;
+  } else if (future->is_flush_in_progress()) {
+    return;
+  }
+
+  assert(!m_object_closed);
+  AppendBuffers::iterator it;
+  for (it = m_append_buffers.begin(); it != m_append_buffers.end(); ++it) {
+    if (it->first == future) {
+      break;
+    }
+  }
+  assert(it != m_append_buffers.end());
+  ++it;
+
+  AppendBuffers flush_buffers;
+  flush_buffers.splice(flush_buffers.end(), m_append_buffers,
+                       m_append_buffers.begin(), it);
+  send_appends(&flush_buffers);
+}
+
+// Moves every buffered (unsent) append to the end of `*append_buffers`. Only
+// valid once the object is closed or overflowed and nothing is in flight.
+void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+  Mutex::Locker l(m_lock);
+  assert(m_in_flight_appends.empty());
+  assert(m_object_closed || m_overflowed);
+
+  // whole-list splice: transfers all elements and leaves ours empty
+  append_buffers->splice(append_buffers->end(), m_append_buffers);
+}
+
+// Marks the object closed. Returns true if no appends are in flight at that
+// point.
+bool ObjectRecorder::close_object() {
+  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;
+
+  Mutex::Locker l(m_lock);
+  m_object_closed = true;
+
+  const bool flushed = flush_appends(true);
+  if (flushed) {
+    cancel_append_task();
+  }
+
+  const bool idle = m_in_flight_appends.empty();
+  return idle;
+}
+
+// Body of the append timer task: force-flushes the buffered appends, then
+// forgets the task and releases the reference taken when it was scheduled.
+void ObjectRecorder::handle_append_task() {
+  {
+    Mutex::Locker l(m_lock);
+    flush_appends(true);
+  }
+
+  {
+    // scoped locker (not explicit Lock/Unlock): it keeps its own reference
+    // to the timer lock and does not touch `this` after put()
+    Mutex::Locker timer_locker(m_timer_lock);
+    m_append_task = NULL;
+    put();
+  }
+}
+
+// Cancels the pending append timer task, if any, and releases the reference
+// that schedule_append_task() took for it.
+void ObjectRecorder::cancel_append_task() {
+  Mutex::Locker timer_locker(m_timer_lock);
+  if (m_append_task == NULL) {
+    return;
+  }
+
+  m_timer.cancel_event(m_append_task);
+  m_append_task = NULL;
+  put();
+}
+
+// Arms the append timer task to fire after m_flush_age, unless a task is
+// already pending or the flush age is not positive. A reference on the
+// recorder is taken for the task.
+void ObjectRecorder::schedule_append_task() {
+  Mutex::Locker timer_locker(m_timer_lock);
+  if (m_append_task != NULL || !(m_flush_age > 0)) {
+    return;
+  }
+
+  get();
+  m_append_task = new C_AppendTask(this);
+  m_timer.add_event_after(m_flush_age, m_append_task);
+}
+
+// Queues a single append buffer (m_lock must be held). The future is attached
+// to this recorder's flush handler and the buffer is counted as pending.
+// Returns whatever FutureImpl::attach() reported for the future.
+bool ObjectRecorder::append(const AppendBuffer &append_buffer) {
+  assert(m_lock.is_locked());
+
+  const bool flush_requested = append_buffer.first->attach(&m_flush_handler);
+  m_append_buffers.push_back(append_buffer);
+  m_pending_bytes += append_buffer.second.length();
+
+  // either the batch went out (no timer needed) or a timed flush is armed
+  const bool flushed = flush_appends(false);
+  if (!flushed) {
+    schedule_append_task();
+  } else {
+    cancel_append_task();
+  }
+  return flush_requested;
+}
+
+bool ObjectRecorder::flush_appends(bool force) {
+  assert(m_lock.is_locked());
+  if (m_object_closed || m_overflowed) {
+    return true;
+  }
+
+  if (m_append_buffers.empty() ||
+      (!force &&
+       m_size + m_pending_bytes < m_soft_max_size &&
+       (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
+       (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
+    return false;
+  }
+
+  m_pending_bytes = 0;
+  AppendBuffers append_buffers;
+  append_buffers.swap(m_append_buffers);
+  send_appends(&append_buffers);
+  return true;
+}
+
+void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
+                   << ", r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
+  if (iter == m_in_flight_appends.end()) {
+    // must have seen an overflow on a previous append op
+    assert(m_overflowed);
+    return;
+  } else if (r == -EOVERFLOW) {
+    m_overflowed = true;
+    append_overflowed(tid);
+    return;
+  }
+
+  assert(!m_overflowed || r != 0);
+  AppendBuffers &append_buffers = iter->second;
+  assert(!append_buffers.empty());
+
+  // Flag the associated futures as complete.
+  for (AppendBuffers::iterator buf_it = append_buffers.begin();
+       buf_it != append_buffers.end(); ++buf_it) {
+    ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
+                     << dendl;
+    buf_it->first->safe(r);
+  }
+  m_in_flight_appends.erase(iter);
+
+  if (m_in_flight_appends.empty() && m_object_closed) {
+    // all remaining unsent appends should be redirected to new object
+    notify_overflow();
+  }
+}
+
+void ObjectRecorder::append_overflowed(uint64_t tid) {
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
+                   << dendl;
+
+  assert(m_lock.is_locked());
+  assert(!m_in_flight_appends.empty());
+  assert(m_in_flight_appends.begin()->first == tid);
+
+  cancel_append_task();
+
+  InFlightAppends in_flight_appends;
+  in_flight_appends.swap(m_in_flight_appends);
+
+  AppendBuffers restart_append_buffers;
+  for (InFlightAppends::iterator it = in_flight_appends.begin();
+       it != in_flight_appends.end(); ++it) {
+    restart_append_buffers.insert(restart_append_buffers.end(),
+                                  it->second.begin(), it->second.end());
+  }
+
+  restart_append_buffers.splice(restart_append_buffers.end(),
+                                m_append_buffers,
+                                m_append_buffers.begin(),
+                                m_append_buffers.end());
+  restart_append_buffers.swap(m_append_buffers);
+  notify_overflow();
+}
+
+void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
+  assert(m_lock.is_locked());
+  assert(!append_buffers->empty());
+
+  uint64_t append_tid = m_append_tid++;
+  ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
+                   << append_tid << dendl;
+  C_AppendFlush *append_flush = new C_AppendFlush(this, append_tid);
+
+  librados::ObjectWriteOperation op;
+  client::guard_append(&op, m_soft_max_size);
+
+  for (AppendBuffers::iterator it = append_buffers->begin();
+       it != append_buffers->end(); ++it) {
+    ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
+                     << dendl;
+    it->first->set_flush_in_progress();
+    op.append(it->second);
+    m_size += it->second.length();
+  }
+  m_in_flight_appends[append_tid].swap(*append_buffers);
+
+  librados::AioCompletion *rados_completion =
+    librados::Rados::aio_create_completion(append_flush, NULL,
+                                           rados_ctx_callback);
+  int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
+  assert(r == 0);
+  rados_completion->release();
+}
+
+// Detaches the futures of all unsent buffers from this recorder and invokes
+// the overflow handler. m_lock must be held on entry; it is released around
+// the handler call and held again on return.
+void ObjectRecorder::notify_overflow() {
+  assert(m_lock.is_locked());
+
+  AppendBuffers::const_iterator buf_it = m_append_buffers.begin();
+  while (buf_it != m_append_buffers.end()) {
+    ldout(m_cct, 20) << __func__ << ": overflowed " << *buf_it->first
+                     << dendl;
+    buf_it->first->detach();
+    ++buf_it;
+  }
+
+  // TODO need to delay completion until after aio_notify completes
+  m_lock.Unlock();
+  m_overflow_handler->overflow(this);
+  m_lock.Lock();
+}
+
+} // namespace journal
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
new file mode 100644 (file)
index 0000000..d7d2ed1
--- /dev/null
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_OBJECT_RECORDER_H
+#define CEPH_JOURNAL_OBJECT_RECORDER_H
+
+#include "include/Context.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/RefCountedObj.h"
+#include "journal/FutureImpl.h"
+#include <list>
+#include <boost/intrusive_ptr.hpp>
+#include <boost/noncopyable.hpp>
+#include "include/assert.h"
+
+class SafeTimer;
+
+namespace journal {
+
+class ObjectRecorder;
+typedef boost::intrusive_ptr<ObjectRecorder> ObjectRecorderPtr;
+
+typedef std::pair<FutureImplPtr, bufferlist> AppendBuffer;
+typedef std::list<AppendBuffer> AppendBuffers;
+
+class ObjectRecorder : public RefCountedObject, boost::noncopyable {
+public:
+  struct OverflowHandler {
+    virtual ~OverflowHandler() {}
+    virtual void overflow(ObjectRecorder *object_recorder) = 0;
+  };
+
+  ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
+                 uint64_t object_number, SafeTimer &timer, Mutex &timer_lock,
+                 OverflowHandler *overflow_handler, uint8_t order,
+                 uint32_t flush_interval, uint64_t flush_bytes,
+                 double flush_age);
+  ~ObjectRecorder();
+
+  inline uint64_t get_object_number() const {
+    return m_object_number;
+  }
+  inline const std::string &get_oid() const {
+    return m_oid;
+  }
+
+  bool append(const AppendBuffers &append_buffers);
+  void flush();
+  void flush(const FutureImplPtr &future);
+
+  void claim_append_buffers(AppendBuffers *append_buffers);
+  bool close_object();
+
+  inline CephContext *cct() const {
+    return m_cct;
+  }
+
+  inline size_t get_pending_appends() const {
+    Mutex::Locker locker(m_lock);
+    return m_append_buffers.size();
+  }
+
+private:
+  typedef std::map<uint64_t, AppendBuffers> InFlightAppends;
+
+  struct FlushHandler : public FutureImpl::FlushHandler {
+    ObjectRecorder *object_recorder;
+    FlushHandler(ObjectRecorder *o) : object_recorder(o) {}
+    virtual void get() {
+      object_recorder->get();
+    }
+    virtual void put() {
+      object_recorder->put();
+    }
+    virtual void flush(const FutureImplPtr &future) {
+      object_recorder->flush(future);
+    }
+  };
+  struct C_AppendTask : public Context {
+    ObjectRecorder *object_recorder;
+    C_AppendTask(ObjectRecorder *o) : object_recorder(o) {}
+    virtual void complete(int r) {
+      object_recorder->handle_append_task();
+    }
+    virtual void finish(int r) {}
+  };
+  struct C_AppendFlush : public Context {
+    ObjectRecorder *object_recorder;
+    uint64_t tid;
+    C_AppendFlush(ObjectRecorder *o, uint64_t _tid)
+        : object_recorder(o), tid(_tid) {
+      object_recorder->get();
+    }
+    virtual void finish(int r) {
+      object_recorder->handle_append_flushed(tid, r);
+      object_recorder->put();
+    }
+  };
+
+  librados::IoCtx m_ioctx;
+  std::string m_oid;
+  uint64_t m_object_number;
+  CephContext *m_cct;
+
+  SafeTimer &m_timer;
+  Mutex &m_timer_lock;
+
+  OverflowHandler *m_overflow_handler;
+
+  uint8_t m_order;
+  uint64_t m_soft_max_size;
+
+  uint32_t m_flush_interval;
+  uint64_t m_flush_bytes;
+  double m_flush_age;
+
+  FlushHandler m_flush_handler;
+
+  C_AppendTask *m_append_task;
+
+  mutable Mutex m_lock;
+  AppendBuffers m_append_buffers;
+  uint64_t m_append_tid;
+  uint32_t m_pending_bytes;
+
+  InFlightAppends m_in_flight_appends;
+  uint64_t m_size;
+  bool m_overflowed;
+  bool m_object_closed;
+
+  bufferlist m_prefetch_bl;
+
+  void handle_append_task();
+  void cancel_append_task();
+  void schedule_append_task();
+
+  bool append(const AppendBuffer &append_buffer);
+  bool flush_appends(bool force);
+  void handle_append_flushed(uint64_t tid, int r);
+  void append_overflowed(uint64_t tid);
+  void send_appends(AppendBuffers *append_buffers);
+
+  void notify_overflow();
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_OBJECT_RECORDER_H
diff --git a/src/journal/Payload.cc b/src/journal/Payload.cc
new file mode 100644 (file)
index 0000000..8015528
--- /dev/null
@@ -0,0 +1,21 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Payload.h"
+#include "journal/PayloadImpl.h"
+
+namespace journal {
+
+// Returns the data held by the underlying PayloadImpl.
+const bufferlist &Payload::get_data() const {
+  return m_payload_impl->get_data();
+}
+
+// boost::intrusive_ptr hook: takes a reference on the PayloadImpl.
+void intrusive_ptr_add_ref(PayloadImpl *p) {
+  p->get();
+}
+
+// boost::intrusive_ptr hook: drops a reference on the PayloadImpl.
+void intrusive_ptr_release(PayloadImpl *p) {
+  p->put();
+}
+
+} // namespace journal
diff --git a/src/journal/Payload.h b/src/journal/Payload.h
new file mode 100644 (file)
index 0000000..a878111
--- /dev/null
@@ -0,0 +1,38 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_PAYLOAD_H
+#define CEPH_JOURNAL_PAYLOAD_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace journal {
+
+class PayloadImpl;
+
+// Lightweight handle around a reference-counted PayloadImpl. It exposes the
+// payload data publicly; the implementation pointer itself is only reachable
+// by Journaler.
+class Payload {
+public:
+  typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
+
+  Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {}
+
+  // Data of the underlying PayloadImpl.
+  const bufferlist &get_data() const;
+
+private:
+  friend class Journaler;
+
+  // Shares ownership of the implementation with the caller.
+  PayloadImplPtr get_payload_impl() const {
+    return m_payload_impl;
+  }
+
+  PayloadImplPtr m_payload_impl;
+};
+
+void intrusive_ptr_add_ref(PayloadImpl *p);
+void intrusive_ptr_release(PayloadImpl *p);
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_PAYLOAD_H
diff --git a/src/journal/PayloadImpl.cc b/src/journal/PayloadImpl.cc
new file mode 100644 (file)
index 0000000..6bb134f
--- /dev/null
@@ -0,0 +1,22 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/PayloadImpl.h"
+
+namespace journal {
+
+// Stores copies of the payload data and of the object set position it is
+// associated with.
+PayloadImpl::PayloadImpl(const bufferlist &data,
+                         const ObjectSetPosition &object_set_position)
+  : m_data(data), m_object_set_position(object_set_position) {
+}
+
+// Returns the stored payload data.
+const bufferlist &PayloadImpl::get_data() const {
+  return m_data;
+}
+
+// Returns the stored object set position.
+const PayloadImpl::ObjectSetPosition &
+PayloadImpl::get_object_set_position() const {
+  return m_object_set_position;
+}
+
+} // namespace journal
diff --git a/src/journal/PayloadImpl.h b/src/journal/PayloadImpl.h
new file mode 100644 (file)
index 0000000..9c4d0b5
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H
+#define CEPH_JOURNAL_PAYLOAD_IMPL_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "common/RefCountedObj.h"
+#include "cls/journal/cls_journal_types.h"
+#include <boost/noncopyable.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include "include/assert.h"
+
+namespace journal {
+
+class PayloadImpl;
+typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
+
+// Reference-counted, non-copyable holder of a payload's data together with
+// an associated object set position.
+class PayloadImpl : public RefCountedObject, boost::noncopyable {
+public:
+  typedef cls::journal::ObjectSetPosition ObjectSetPosition;
+
+  // Copies both arguments into the new object.
+  PayloadImpl(const bufferlist &data,
+              const ObjectSetPosition &object_set_position);
+
+  // Read-only access to the stored values.
+  const bufferlist &get_data() const;
+  const ObjectSetPosition &get_object_set_position() const;
+
+private:
+  bufferlist m_data;
+  ObjectSetPosition m_object_set_position;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H
diff --git a/src/journal/ReplayHandler.h b/src/journal/ReplayHandler.h
new file mode 100644 (file)
index 0000000..4712684
--- /dev/null
@@ -0,0 +1,18 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_HANDLER_H
+#define CEPH_JOURNAL_REPLAY_HANDLER_H
+
+namespace journal {
+
+// Callback interface passed to Journaler::start_replay() and
+// Journaler::start_live_replay().
+struct ReplayHandler  {
+  virtual ~ReplayHandler() {}
+
+  // presumably invoked when replayed entries can be popped -- confirm
+  // against JournalPlayer
+  virtual void handle_entries_available() = 0;
+  // presumably invoked with a negative errno when replay fails -- confirm
+  virtual void handle_error(int r) = 0;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_HANDLER_H
diff --git a/src/journal/Utils.cc b/src/journal/Utils.cc
new file mode 100644 (file)
index 0000000..eea6136
--- /dev/null
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Utils.h"
+#include "include/stringify.h"
+
+namespace journal {
+namespace utils {
+
+// Builds an object name by appending the stringified `number` to `prefix`.
+std::string get_object_name(const std::string &prefix, uint64_t number) {
+  std::string object_name(prefix);
+  object_name += stringify(number);
+  return object_name;
+}
+
+// Builds a lock name of the form "<name> (<address>)" so that each instance
+// gets a distinct name.
+std::string unique_lock_name(const std::string &name, void *address) {
+  std::string lock_name(name);
+  lock_name += " (";
+  lock_name += stringify(address);
+  lock_name += ")";
+  return lock_name;
+}
+
+} // namespace utils
+} // namespace journal
diff --git a/src/journal/Utils.h b/src/journal/Utils.h
new file mode 100644 (file)
index 0000000..1e449b1
--- /dev/null
@@ -0,0 +1,20 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_UTILS_H
+#define CEPH_JOURNAL_UTILS_H
+
+#include "include/int_types.h"
+#include <string>
+
+namespace journal {
+namespace utils {
+
+// Returns `prefix` followed by the stringified `number`.
+std::string get_object_name(const std::string &prefix, uint64_t number);
+
+// Returns "<name> (<address>)".
+std::string unique_lock_name(const std::string &name, void *address);
+
+} // namespace utils
+} // namespace journal
+
+#endif // CEPH_JOURNAL_UTILS_H