]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
tests: initial unit tests for new generic journal library
authorJason Dillaman <dillaman@redhat.com>
Wed, 10 Jun 2015 01:37:39 +0000 (21:37 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:41 +0000 (20:42 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
13 files changed:
src/test/Makefile-client.am
src/test/Makefile.am
src/test/journal/RadosTestFixture.cc [new file with mode: 0644]
src/test/journal/RadosTestFixture.h [new file with mode: 0644]
src/test/journal/test_Entry.cc [new file with mode: 0644]
src/test/journal/test_FutureImpl.cc [new file with mode: 0644]
src/test/journal/test_JournalMetadata.cc [new file with mode: 0644]
src/test/journal/test_JournalPlayer.cc [new file with mode: 0644]
src/test/journal/test_JournalRecorder.cc [new file with mode: 0644]
src/test/journal/test_Journaler.cc [new file with mode: 0644]
src/test/journal/test_ObjectPlayer.cc [new file with mode: 0644]
src/test/journal/test_ObjectRecorder.cc [new file with mode: 0644]
src/test/journal/test_main.cc [new file with mode: 0644]

index 41b816fc2849032374db3e350be62082d374f171..e64ff62c387977b094e7188e46198df77e9c2766 100644 (file)
@@ -300,6 +300,23 @@ noinst_HEADERS += \
        test/librados_test_stub/TestIoCtxImpl.h
 noinst_LTLIBRARIES += librados_test_stub.la
 
+unittest_journal_SOURCES = \
+       test/journal/test_main.cc \
+        test/journal/test_Entry.cc \
+       test/journal/test_FutureImpl.cc \
+       test/journal/test_Journaler.cc \
+       test/journal/test_JournalMetadata.cc \
+       test/journal/test_JournalPlayer.cc \
+       test/journal/test_JournalRecorder.cc \
+       test/journal/test_ObjectPlayer.cc \
+       test/journal/test_ObjectRecorder.cc \
+       test/journal/RadosTestFixture.cc
+unittest_journal_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+unittest_journal_LDADD = \
+       libjournal.la libcls_journal_client.la \
+       librados_test_stub.la librados_internal.la \
+       $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
+check_TESTPROGRAMS += unittest_journal
 
 if WITH_RBD
 ceph_smalliobenchrbd_SOURCES = \
index 9b60debfd8daa12f58cb23f80c73b22ff63f1735..7bbab2891206abc64f7da3235f4eacd6ff16e0d6 100644 (file)
@@ -467,4 +467,5 @@ noinst_HEADERS += \
        test/system/st_rados_watch.h \
        test/system/systest_runnable.h \
        test/system/systest_settings.h \
-       test/unit.h
+       test/unit.h \
+       test/journal/RadosTestFixture.h
diff --git a/src/test/journal/RadosTestFixture.cc b/src/test/journal/RadosTestFixture.cc
new file mode 100644 (file)
index 0000000..382fdea
--- /dev/null
@@ -0,0 +1,74 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/journal/RadosTestFixture.h"
+#include "cls/journal/cls_journal_client.h"
+#include "include/stringify.h"
+
+RadosTestFixture::RadosTestFixture()
+  : m_timer_lock("m_timer_lock"), m_timer(NULL) {
+}
+
+void RadosTestFixture::SetUpTestCase() {
+  _pool_name = get_temp_pool_name();
+  ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados));
+}
+
+void RadosTestFixture::TearDownTestCase() {
+  ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados));
+}
+
+std::string RadosTestFixture::get_temp_oid() {
+  ++_oid_number;
+  return "oid" + stringify(_oid_number);
+}
+
+void RadosTestFixture::SetUp() {
+  ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
+  m_timer = new SafeTimer(reinterpret_cast<CephContext*>(m_ioctx.cct()),
+                          m_timer_lock, false);
+  m_timer->init();
+}
+
+void RadosTestFixture::TearDown() {
+  {
+    Mutex::Locker locker(m_timer_lock);
+    m_timer->shutdown();
+  }
+  delete m_timer;
+}
+
+int RadosTestFixture::create(const std::string &oid, uint8_t order,
+                             uint8_t splay_width) {
+  return cls::journal::client::create(m_ioctx, oid, order, splay_width);
+}
+
+int RadosTestFixture::append(const std::string &oid, const bufferlist &bl) {
+  librados::ObjectWriteOperation op;
+  op.append(bl);
+  return m_ioctx.operate(oid, &op);
+}
+
+int RadosTestFixture::client_register(const std::string &oid,
+                                      const std::string &id,
+                                      const std::string &description) {
+  return cls::journal::client::client_register(m_ioctx, oid, id, description);
+}
+
+int RadosTestFixture::client_commit(const std::string &oid,
+                                    const std::string &id,
+                                    const cls::journal::ObjectSetPosition &commit_position) {
+  librados::ObjectWriteOperation op;
+  cls::journal::client::client_commit(&op, id, commit_position);
+  return m_ioctx.operate(oid, &op);
+}
+
+bufferlist RadosTestFixture::create_payload(const std::string &payload) {
+  bufferlist bl;
+  bl.append(payload);
+  return bl;
+}
+
+std::string RadosTestFixture::_pool_name;
+librados::Rados RadosTestFixture::_rados;
+uint64_t RadosTestFixture::_oid_number = 0;
diff --git a/src/test/journal/RadosTestFixture.h b/src/test/journal/RadosTestFixture.h
new file mode 100644 (file)
index 0000000..a58bdc2
--- /dev/null
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librados/test.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_types.h"
+#include "gtest/gtest.h"
+
+class RadosTestFixture : public ::testing::Test {
+public:
+  static void SetUpTestCase();
+  static void TearDownTestCase();
+
+  static std::string get_temp_oid();
+
+  RadosTestFixture();
+  virtual void SetUp();
+  virtual void TearDown();
+
+  int create(const std::string &oid, uint8_t order, uint8_t splay_width);
+  int append(const std::string &oid, const bufferlist &bl);
+
+  int client_register(const std::string &oid, const std::string &id,
+                      const std::string &description);
+  int client_commit(const std::string &oid, const std::string &id,
+                    const cls::journal::ObjectSetPosition &commit_position);
+
+  bufferlist create_payload(const std::string &payload);
+
+  static std::string _pool_name;
+  static librados::Rados _rados;
+  static uint64_t _oid_number;
+
+  librados::IoCtx m_ioctx;
+
+  Mutex m_timer_lock;
+  SafeTimer *m_timer;
+};
diff --git a/src/test/journal/test_Entry.cc b/src/test/journal/test_Entry.cc
new file mode 100644 (file)
index 0000000..e042978
--- /dev/null
@@ -0,0 +1,96 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "gtest/gtest.h"
+
+class TestEntry : public ::testing::Test {
+};
+
+TEST_F(TestEntry, DefaultConstructor) {
+  journal::Entry entry;
+  ASSERT_EQ(0U, entry.get_tid());
+  ASSERT_EQ("", entry.get_tag());
+
+  bufferlist data(entry.get_data());
+  bufferlist expected_data;
+  ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, Constructor) {
+  bufferlist data;
+  data.append("data");
+  journal::Entry entry("tag", 123, data);
+
+  data.clear();
+  data = entry.get_data();
+
+  bufferlist expected_data;
+  expected_data.append("data");
+
+  ASSERT_EQ(123U, entry.get_tid());
+  ASSERT_EQ("tag", entry.get_tag());
+  ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, IsReadable) {
+  bufferlist data;
+  data.append("data");
+  journal::Entry entry("tag", 123, data);
+
+  bufferlist full_bl;
+  ::encode(entry, full_bl);
+
+  uint32_t bytes_needed;
+  for (size_t i = 0; i < full_bl.length() - 1; ++i) {
+    bufferlist partial_bl;
+    if (i > 0) {
+      partial_bl.substr_of(full_bl, 0, i);
+    }
+    ASSERT_FALSE(journal::Entry::is_readable(partial_bl.begin(),
+                                             &bytes_needed));
+    ASSERT_GT(bytes_needed, 0U);
+  }
+  ASSERT_TRUE(journal::Entry::is_readable(full_bl.begin(), &bytes_needed));
+  ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadPreamble) {
+  bufferlist data;
+  data.append("data");
+  journal::Entry entry("tag", 123, data);
+
+  uint64_t stray_bytes = 0x1122334455667788;
+  bufferlist full_bl;
+  ::encode(stray_bytes, full_bl);
+  ::encode(entry, full_bl);
+
+  uint32_t bytes_needed;
+  bufferlist::iterator it = full_bl.begin();
+  ASSERT_FALSE(journal::Entry::is_readable(it, &bytes_needed));
+  ASSERT_EQ(0U, bytes_needed);
+
+  it.advance(sizeof(stray_bytes));
+  ASSERT_TRUE(journal::Entry::is_readable(it, &bytes_needed));
+  ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadCRC) {
+  bufferlist data;
+  data.append("data");
+  journal::Entry entry("tag", 123, data);
+
+  bufferlist full_bl;
+  ::encode(entry, full_bl);
+
+  bufferlist bad_bl;
+  bad_bl.substr_of(full_bl, 0, full_bl.length() - 4);
+  ::encode(full_bl.crc32c(1), bad_bl);
+
+  uint32_t bytes_needed;
+  ASSERT_FALSE(journal::Entry::is_readable(bad_bl.begin(), &bytes_needed));
+  ASSERT_EQ(0U, bytes_needed);
+
+
+
+}
diff --git a/src/test/journal/test_FutureImpl.cc b/src/test/journal/test_FutureImpl.cc
new file mode 100644 (file)
index 0000000..021e8ad
--- /dev/null
@@ -0,0 +1,177 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "gtest/gtest.h"
+
+class TestFutureImpl : public ::testing::Test {
+public:
+
+  struct FlushHandler : public journal::FutureImpl::FlushHandler {
+    uint64_t refs;
+    uint64_t flushes;
+    FlushHandler() : refs(0), flushes(0) {}
+    virtual void get() {
+      ++refs;
+    }
+    virtual void put() {
+      assert(refs > 0);
+      --refs;
+    }
+    virtual void flush(const journal::FutureImplPtr &future) {
+      ++flushes;
+    }
+  };
+
+  journal::FutureImplPtr create_future(const std::string &tag, uint64_t tid,
+                                       const journal::FutureImplPtr &prev =
+                                         journal::FutureImplPtr()) {
+    journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+    future->init(prev);
+    return future;
+  }
+
+  void flush(const journal::FutureImplPtr &future) {
+  }
+
+  FlushHandler m_flush_handler;
+};
+
+TEST_F(TestFutureImpl, Attach) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  ASSERT_FALSE(future->attach(&m_flush_handler));
+  ASSERT_EQ(1U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, AttachWithPendingFlush) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  future->flush(NULL);
+
+  ASSERT_TRUE(future->attach(&m_flush_handler));
+  ASSERT_EQ(1U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, Detach) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  ASSERT_FALSE(future->attach(&m_flush_handler));
+  future->detach();
+  ASSERT_EQ(0U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, DetachImplicit) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  ASSERT_FALSE(future->attach(&m_flush_handler));
+  future.reset();
+  ASSERT_EQ(0U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, Flush) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  ASSERT_FALSE(future->attach(&m_flush_handler));
+
+  C_SaferCond cond;
+  future->flush(&cond);
+
+  ASSERT_EQ(1U, m_flush_handler.flushes);
+  future->safe(-EIO);
+  ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, FlushWithoutContext) {
+  journal::FutureImplPtr future = create_future("tag", 123);
+  ASSERT_FALSE(future->attach(&m_flush_handler));
+
+  future->flush(NULL);
+  ASSERT_EQ(1U, m_flush_handler.flushes);
+  future->safe(-EIO);
+  ASSERT_TRUE(future->is_complete());
+  ASSERT_EQ(-EIO, future->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushChain) {
+  journal::FutureImplPtr future1 = create_future("tag1", 123);
+  journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+  journal::FutureImplPtr future3 = create_future("tag2", 1, future2);
+  ASSERT_FALSE(future1->attach(&m_flush_handler));
+  ASSERT_FALSE(future2->attach(&m_flush_handler));
+  ASSERT_FALSE(future3->attach(&m_flush_handler));
+
+  C_SaferCond cond;
+  future3->flush(&cond);
+
+  ASSERT_EQ(3U, m_flush_handler.flushes);
+
+  future3->safe(0);
+  ASSERT_FALSE(future3->is_complete());
+
+  future1->safe(0);
+  ASSERT_FALSE(future3->is_complete());
+
+  future2->safe(-EIO);
+  ASSERT_TRUE(future3->is_complete());
+  ASSERT_EQ(-EIO, future3->get_return_value());
+  ASSERT_EQ(-EIO, cond.wait());
+  ASSERT_EQ(0, future1->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushInProgress) {
+  journal::FutureImplPtr future1 = create_future("tag1", 123);
+  journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+  ASSERT_FALSE(future1->attach(&m_flush_handler));
+  ASSERT_FALSE(future2->attach(&m_flush_handler));
+
+  future1->set_flush_in_progress();
+  ASSERT_TRUE(future1->is_flush_in_progress());
+
+  future1->flush(NULL);
+  ASSERT_EQ(0U, m_flush_handler.flushes);
+}
+
+TEST_F(TestFutureImpl, FlushAlreadyComplete) {
+  journal::FutureImplPtr future = create_future("tag1", 123);
+  future->safe(-EIO);
+
+  C_SaferCond cond;
+  future->flush(&cond);
+  ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, Wait) {
+  journal::FutureImplPtr future = create_future("tag", 1);
+
+  C_SaferCond cond;
+  future->wait(&cond);
+  future->safe(-EEXIST);
+  ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, WaitAlreadyComplete) {
+  journal::FutureImplPtr future = create_future("tag", 1);
+  future->safe(-EEXIST);
+
+  C_SaferCond cond;
+  future->wait(&cond);
+  ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, SafePreservesError) {
+  journal::FutureImplPtr future1 = create_future("tag1", 123);
+  journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+
+  future1->safe(-EIO);
+  future2->safe(-EEXIST);
+  ASSERT_TRUE(future2->is_complete());
+  ASSERT_EQ(-EIO, future2->get_return_value());
+}
+
+TEST_F(TestFutureImpl, ConsistentPreservesError) {
+  journal::FutureImplPtr future1 = create_future("tag1", 123);
+  journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+
+  future2->safe(-EEXIST);
+  future1->safe(-EIO);
+  ASSERT_TRUE(future2->is_complete());
+  ASSERT_EQ(-EEXIST, future2->get_return_value());
+}
diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc
new file mode 100644 (file)
index 0000000..78652c4
--- /dev/null
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include <boost/assign/list_of.hpp>
+#include <map>
+
+class TestJournalMetadata : public RadosTestFixture {
+public:
+  TestJournalMetadata() : m_listener(this) {}
+
+  struct Listener : public journal::JournalMetadata::Listener {
+    TestJournalMetadata *test_fixture;
+    Mutex mutex;
+    Cond cond;
+    std::map<journal::JournalMetadata*, uint32_t> updates;
+
+    Listener(TestJournalMetadata *_test_fixture)
+      : test_fixture(_test_fixture), mutex("mutex") {}
+
+    virtual void handle_update(journal::JournalMetadata *metadata) {
+      Mutex::Locker locker(mutex);
+      ++updates[metadata];
+      cond.Signal();
+    }
+  };
+
+  journal::JournalMetadataPtr create_metadata(const std::string &oid,
+                                              const std::string &client_id) {
+    journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+      m_ioctx, oid, client_id));
+    metadata->add_listener(&m_listener);
+    return metadata;
+  }
+
+  bool wait_for_update(journal::JournalMetadataPtr metadata) {
+    Mutex::Locker locker(m_listener.mutex);
+    while (m_listener.updates[metadata.get()] == 0) {
+      if (m_listener.cond.WaitInterval(
+            reinterpret_cast<CephContext*>(m_ioctx.cct()),
+            m_listener.mutex, utime_t(10, 0)) != 0) {
+        return false;
+      }
+    }
+    --m_listener.updates[metadata.get()];
+    return true;
+  }
+
+  Listener m_listener;
+};
+
+TEST_F(TestJournalMetadata, JournalDNE) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+  ASSERT_EQ(-ENOENT, metadata1->init());
+}
+
+TEST_F(TestJournalMetadata, ClientDNE) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid, 14, 2));
+  ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+  ASSERT_EQ(0, metadata1->init());
+
+  journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2");
+  ASSERT_EQ(-ENOENT, metadata2->init());
+}
+
+TEST_F(TestJournalMetadata, SetCommitPositions) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid, 14, 2));
+  ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+  ASSERT_EQ(0, metadata1->init());
+
+  journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
+  ASSERT_EQ(0, metadata2->init());
+  ASSERT_TRUE(wait_for_update(metadata2));
+
+  journal::JournalMetadata::ObjectSetPosition commit_position;
+  journal::JournalMetadata::ObjectSetPosition read_commit_position;
+  metadata1->get_commit_position(&read_commit_position);
+  ASSERT_EQ(commit_position, read_commit_position);
+
+  journal::JournalMetadata::EntryPositions entry_positions;
+  entry_positions = boost::assign::list_of(
+    cls::journal::EntryPosition("tag1", 122));
+  commit_position = journal::JournalMetadata::ObjectSetPosition(1, entry_positions);
+
+  metadata1->set_commit_position(commit_position);
+  ASSERT_TRUE(wait_for_update(metadata2));
+
+  metadata2->get_commit_position(&read_commit_position);
+  ASSERT_EQ(commit_position, read_commit_position);
+}
+
+TEST_F(TestJournalMetadata, UpdateActiveObject) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid, 14, 2));
+  ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+  journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+  ASSERT_EQ(0, metadata1->init());
+  ASSERT_TRUE(wait_for_update(metadata1));
+
+  ASSERT_EQ(0U, metadata1->get_active_set());
+
+  metadata1->set_active_set(123);
+  ASSERT_TRUE(wait_for_update(metadata1));
+
+  ASSERT_EQ(123U, metadata1->get_active_set());
+}
diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc
new file mode 100644 (file)
index 0000000..555a943
--- /dev/null
@@ -0,0 +1,282 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/Entry.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ReplayHandler.h"
+#include "include/stringify.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
+#include <list>
+#include <boost/assign/list_of.hpp>
+
+class TestJournalPlayer : public RadosTestFixture {
+public:
+  typedef std::list<journal::Entry> Entries;
+
+  struct ReplayHandler : public journal::ReplayHandler {
+    Mutex lock;
+    Cond cond;
+    bool entries_available;
+    bool error_occurred;
+
+    ReplayHandler()
+      : lock("lock"), entries_available(false), error_occurred(false) {}
+
+    virtual bool filter_entry(const std::string &tag) {
+      return false;
+    }
+
+    virtual void handle_entries_available() {
+      Mutex::Locker locker(lock);
+      entries_available = true;
+      cond.Signal();
+    }
+
+    virtual void handle_error(int r) {
+      error_occurred = true;
+    }
+  };
+
+  int create(const std::string &oid) {
+    return RadosTestFixture::create(oid, 14, 2);
+  }
+
+  int client_register(const std::string &oid) {
+    return RadosTestFixture::client_register(oid, "client", "");
+  }
+
+  int client_commit(const std::string &oid,
+                    journal::JournalPlayer::ObjectSetPosition position) {
+    return RadosTestFixture::client_commit(oid, "client", position);
+  }
+
+  journal::Entry create_entry(const std::string &tag, uint64_t tid) {
+    bufferlist payload_bl;
+    payload_bl.append("playload");
+    return journal::Entry(tag, tid, payload_bl);
+  }
+
+  journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+    journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+      m_ioctx, oid, "client"));
+    return metadata;
+  }
+
+  journal::JournalPlayerPtr create_player(const std::string &oid,
+                                          const journal::JournalMetadataPtr &metadata) {
+    journal::JournalPlayerPtr player(new journal::JournalPlayer(
+      m_ioctx, oid + ".", metadata, &m_replay_hander));
+    return player;
+  }
+
+  bool wait_for_entries(journal::JournalPlayerPtr player, uint32_t count,
+                        Entries *entries) {
+    entries->clear();
+    while (entries->size() < count) {
+      journal::Entry entry;
+      journal::JournalPlayer::ObjectSetPosition object_set_position;
+      while (entries->size() < count &&
+             player->try_pop_front(&entry, &object_set_position)) {
+        entries->push_back(entry);
+      }
+      if (entries->size() == count) {
+        break;
+      }
+
+      Mutex::Locker locker(m_replay_hander.lock);
+      if (m_replay_hander.entries_available) {
+        m_replay_hander.entries_available = false;
+      } else if (m_replay_hander.cond.WaitInterval(
+          reinterpret_cast<CephContext*>(m_ioctx.cct()),
+          m_replay_hander.lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+    return entries->size() == count;
+  }
+
+  int write_entry(const std::string &oid, uint64_t object_num,
+                  const std::string &tag, uint64_t tid) {
+    bufferlist bl;
+    ::encode(create_entry(tag, tid), bl);
+    return append(oid + "." + stringify(object_num), bl);
+  }
+
+  ReplayHandler m_replay_hander;
+};
+
+TEST_F(TestJournalPlayer, Prefetch) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalPlayer::EntryPositions positions;
+  positions = boost::assign::list_of(
+    cls::journal::EntryPosition("tag1", 122));
+  cls::journal::ObjectSetPosition commit_position(0, positions);
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 125));
+
+  player->prefetch();
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  Entries expected_entries;
+  expected_entries = boost::assign::list_of(
+    create_entry("tag1", 123))(
+    create_entry("tag1", 124))(
+    create_entry("tag1", 125));
+  ASSERT_EQ(expected_entries, entries);
+
+  uint64_t last_tid;
+  ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+  ASSERT_EQ(125U, last_tid);
+}
+
+TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+
+  player->prefetch();
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+
+  Entries expected_entries;
+  expected_entries = boost::assign::list_of(
+    create_entry("tag1", 122))(
+    create_entry("tag1", 123));
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalPlayer::EntryPositions positions;
+  positions = boost::assign::list_of(
+    cls::journal::EntryPosition("tag1", 122))(
+    cls::journal::EntryPosition("tag2", 1));
+  cls::journal::ObjectSetPosition commit_position(0, positions);
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag2", 1));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag2", 2));
+
+  player->prefetch();
+
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  uint64_t last_tid;
+  ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+  ASSERT_EQ(124U, last_tid);
+  ASSERT_TRUE(metadata->get_last_allocated_tid("tag2", &last_tid));
+  ASSERT_EQ(2U, last_tid);
+}
+
+TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
+  std::string oid = get_temp_oid();
+
+  cls::journal::ObjectSetPosition commit_position;
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+
+  player->prefetch();
+  Entries entries;
+  ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+  journal::Entry entry;
+  cls::journal::ObjectSetPosition object_set_position;
+  ASSERT_FALSE(player->try_pop_front(&entry, &object_set_position));
+  ASSERT_TRUE(m_replay_hander.error_occurred);
+}
+
+TEST_F(TestJournalPlayer, PrefetchAndWatch) {
+  std::string oid = get_temp_oid();
+
+  journal::JournalPlayer::EntryPositions positions;
+  positions = boost::assign::list_of(
+    cls::journal::EntryPosition("tag1", 122));
+  cls::journal::ObjectSetPosition commit_position(0, positions);
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, commit_position));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+
+  player->prefetch_and_watch(0.25);
+
+  Entries entries;
+  ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+  Entries expected_entries;
+  expected_entries = boost::assign::list_of(create_entry("tag1", 123));
+  ASSERT_EQ(expected_entries, entries);
+
+  ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+  ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+  expected_entries = boost::assign::list_of(create_entry("tag1", 124));
+  ASSERT_EQ(expected_entries, entries);
+}
diff --git a/src/test/journal/test_JournalRecorder.cc b/src/test/journal/test_JournalRecorder.cc
new file mode 100644 (file)
index 0000000..e8488b6
--- /dev/null
@@ -0,0 +1,146 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <list>
+
+class TestJournalRecorder : public RadosTestFixture {
+public:
+
+  virtual void TearDown() {
+    for (std::list<journal::JournalRecorder*>::iterator it = m_recorders.begin();
+         it != m_recorders.end(); ++it) {
+      delete *it;
+    }
+    RadosTestFixture::TearDown();
+  }
+
+  int client_register(const std::string &oid) {
+    return RadosTestFixture::client_register(oid, "client", "");
+  }
+
+  journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+    journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+      m_ioctx, oid, "client"));
+    return metadata;
+  }
+
+  journal::JournalRecorder *create_recorder(const std::string &oid,
+                                            const journal::JournalMetadataPtr &metadata) {
+    journal::JournalRecorder *recorder(new journal::JournalRecorder(
+      m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(), 0));
+    m_recorders.push_back(recorder);
+    return recorder;
+  }
+
+  std::list<journal::JournalRecorder*> m_recorders;
+
+};
+
+TEST_F(TestJournalRecorder, Append) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid, 12, 2));
+  ASSERT_EQ(0, client_register(oid));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+  journal::Future future1 = recorder->append("tag1", create_payload("payload"));
+
+  C_SaferCond cond;
+  future1.flush(&cond);
+  ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestJournalRecorder, AppendKnownOverflow) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid, 12, 2));
+  ASSERT_EQ(0, client_register(oid));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+  ASSERT_EQ(0U, metadata->get_active_set());
+
+  journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+  recorder->append("tag1", create_payload(std::string(1 << 12, '1')));
+  journal::Future future2 = recorder->append("tag1", create_payload(std::string(1, '2')));
+
+  C_SaferCond cond;
+  future2.flush(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, AppendDelayedOverflow) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid, 12, 2));
+  ASSERT_EQ(0, client_register(oid));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+  ASSERT_EQ(0U, metadata->get_active_set());
+
+  journal::JournalRecorder *recorder1 = create_recorder(oid, metadata);
+  journal::JournalRecorder *recorder2 = create_recorder(oid, metadata);
+
+  recorder1->append("tag1", create_payload(std::string(1, '1')));
+  recorder2->append("tag2", create_payload(std::string(1 << 12, '2')));
+
+  journal::Future future = recorder2->append("tag1", create_payload(std::string(1, '3')));
+
+  C_SaferCond cond;
+  future.flush(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, FutureFlush) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid, 12, 2));
+  ASSERT_EQ(0, client_register(oid));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+  journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
+  journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+
+  C_SaferCond cond;
+  future2.flush(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_TRUE(future1.is_complete());
+  ASSERT_TRUE(future2.is_complete());
+}
+
+TEST_F(TestJournalRecorder, Flush) {
+  std::string oid = get_temp_oid();
+  ASSERT_EQ(0, create(oid, 12, 2));
+  ASSERT_EQ(0, client_register(oid));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, metadata->init());
+
+  journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+  journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
+  journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+
+  recorder->flush();
+
+  C_SaferCond cond;
+  future2.wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_TRUE(future1.is_complete());
+  ASSERT_TRUE(future2.is_complete());
+}
+
diff --git a/src/test/journal/test_Journaler.cc b/src/test/journal/test_Journaler.cc
new file mode 100644 (file)
index 0000000..1086c72
--- /dev/null
@@ -0,0 +1,78 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Journaler.h"
+#include "include/stringify.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+
+class TestJournaler : public RadosTestFixture {
+public:
+
+  static const std::string CLIENT_ID;
+
+  static uint64_t get_temp_journal_id() {
+    return ++_journal_id;
+  }
+
+  virtual void SetUp() {
+    RadosTestFixture::SetUp();
+    m_journal_id = get_temp_journal_id();
+    m_journaler = new journal::Journaler(m_ioctx, m_ioctx, m_journal_id,
+                                         CLIENT_ID);
+  }
+
+  virtual void TearDown() {
+    delete m_journaler;
+    RadosTestFixture::TearDown();
+  }
+
+  int create_journal(uint8_t order, uint8_t splay_width) {
+    return m_journaler->create(order, splay_width);
+  }
+
+  int register_client(const std::string &client_id, const std::string &desc) {
+    journal::Journaler journaler(m_ioctx, m_ioctx, m_journal_id, client_id);
+    return journaler.register_client(desc);
+  }
+
+  static uint64_t _journal_id;
+
+  uint64_t m_journal_id;
+  journal::Journaler *m_journaler;
+};
+
+const std::string TestJournaler::CLIENT_ID = "client1";
+uint64_t TestJournaler::_journal_id = 0;
+
+TEST_F(TestJournaler, Create) {
+  ASSERT_EQ(0, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateDuplicate) {
+  ASSERT_EQ(0, create_journal(12, 8));
+  ASSERT_EQ(-EEXIST, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateInvalidParams) {
+  ASSERT_EQ(-EDOM, create_journal(1, 8));
+  ASSERT_EQ(-EDOM, create_journal(123, 8));
+  ASSERT_EQ(-EINVAL, create_journal(12, 0));
+}
+
+TEST_F(TestJournaler, Init) {
+  ASSERT_EQ(0, create_journal(12, 8));
+  ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+  ASSERT_EQ(0, m_journaler->init());
+}
+
+TEST_F(TestJournaler, InitDNE) {
+  ASSERT_EQ(-ENOENT, m_journaler->init());
+}
+
+TEST_F(TestJournaler, RegisterClientDuplicate) {
+  ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+  ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
+}
+
diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc
new file mode 100644 (file)
index 0000000..2a32d00
--- /dev/null
@@ -0,0 +1,279 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Entry.h"
+#include "include/stringify.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <boost/assign/list_of.hpp>
+
+class TestObjectPlayer : public RadosTestFixture {
+public:
+  journal::ObjectPlayerPtr create_object(const std::string &oid,
+                                         uint8_t order) {
+    journal::ObjectPlayerPtr object(new journal::ObjectPlayer(
+      m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order));
+    return object;
+  }
+
+  std::string get_object_name(const std::string &oid) {
+    return oid + ".0";
+  }
+};
+
+TEST_F(TestObjectPlayer, Fetch) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(2U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+    entry1)(entry2);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchLarge) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123,
+                        create_payload(std::string(8192 - 33, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(""));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 12);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(1U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+    entry1);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchDeDup) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 123, create_payload(std::string(24, '2')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(1U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+    entry2);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchEmpty) {
+  std::string oid = get_temp_oid();
+
+  bufferlist bl;
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(-ENOENT, cond.wait());
+  ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, FetchError) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(-ENOENT, cond.wait());
+  ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, FetchCorrupt) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ::encode(create_payload("corruption"), bl);
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(-EINVAL, cond.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(2U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+    entry1)(entry2);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchAppend) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond1;
+  object->fetch(&cond1);
+  ASSERT_EQ(0, cond1.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(1U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+    entry1);
+  ASSERT_EQ(expected_entries, entries);
+
+  bl.clear();
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  C_SaferCond cond2;
+  object->fetch(&cond2);
+  ASSERT_EQ(0, cond2.wait());
+
+  object->get_entries(&entries);
+  ASSERT_EQ(2U, entries.size());
+
+  expected_entries = boost::assign::list_of(entry1)(entry2);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, PopEntry) {
+  std::string oid = get_temp_oid();
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond;
+  object->fetch(&cond);
+  ASSERT_EQ(0, cond.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(2U, entries.size());
+
+  journal::Entry entry;
+  object->front(&entry);
+  object->pop_front();
+  ASSERT_EQ(entry1, entry);
+  object->front(&entry);
+  object->pop_front();
+  ASSERT_EQ(entry2, entry);
+  ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, Watch) {
+  std::string oid = get_temp_oid();
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  C_SaferCond cond1;
+  object->watch(&cond1, 0.1);
+
+  journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+  journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+  bufferlist bl;
+  ::encode(entry1, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, cond1.wait());
+
+  journal::ObjectPlayer::Entries entries;
+  object->get_entries(&entries);
+  ASSERT_EQ(1U, entries.size());
+
+  journal::ObjectPlayer::Entries expected_entries;
+  expected_entries = boost::assign::list_of(entry1);
+  ASSERT_EQ(expected_entries, entries);
+
+  C_SaferCond cond2;
+  object->watch(&cond2, 0.1);
+
+  bl.clear();
+  ::encode(entry2, bl);
+  ASSERT_EQ(0, append(get_object_name(oid), bl));
+  ASSERT_EQ(0, cond2.wait());
+
+  object->get_entries(&entries);
+  ASSERT_EQ(2U, entries.size());
+
+  expected_entries = boost::assign::list_of(entry1)(entry2);
+  ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, Unwatch) {
+  std::string oid = get_temp_oid();
+  journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+  Mutex mutex("lock");
+  Cond cond;
+  bool done = false;
+  int rval = 0;
+  C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
+  object->watch(ctx, 0.1);
+  ASSERT_FALSE(done);
+  object->unwatch();
+}
diff --git a/src/test/journal/test_ObjectRecorder.cc b/src/test/journal/test_ObjectRecorder.cc
new file mode 100644 (file)
index 0000000..8897206
--- /dev/null
@@ -0,0 +1,297 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <boost/assign/list_of.hpp>
+
+class TestObjectRecorder : public RadosTestFixture {
+public:
+  TestObjectRecorder()
+    : m_flush_interval(std::numeric_limits<uint32_t>::max()),
+      m_flush_bytes(std::numeric_limits<uint64_t>::max()),
+      m_flush_age(600)
+  {
+  }
+
+  struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler {
+    Mutex lock;
+    Cond cond;
+    uint32_t overflows;
+
+    OverflowHandler() : lock("lock"), overflows(0) {}
+
+    virtual void overflow(journal::ObjectRecorder *object_recorder) {
+      Mutex::Locker locker(lock);
+      journal::AppendBuffers append_buffers;
+      object_recorder->claim_append_buffers(&append_buffers);
+
+      ++overflows;
+      cond.Signal();
+    }
+  };
+
+  uint32_t m_flush_interval;
+  uint64_t m_flush_bytes;
+  double m_flush_age;
+  OverflowHandler m_overflow_handler;
+
+  inline void set_flush_interval(uint32_t i) {
+    m_flush_interval = i;
+  }
+  inline void set_flush_bytes(uint64_t i) {
+    m_flush_bytes = i;
+  }
+  inline void set_flush_age(double i) {
+    m_flush_age = i;
+  }
+
+  journal::AppendBuffer create_append_buffer(const std::string &tag,
+                                             uint64_t tid,
+                                             const std::string &payload) {
+    journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+    future->init(journal::FutureImplPtr());
+
+    bufferlist bl;
+    bl.append(payload);
+    return std::make_pair(future, bl);
+  }
+
+  journal::ObjectRecorderPtr create_object(const std::string &oid,
+                                           uint8_t order) {
+    journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
+      m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_overflow_handler, order,
+      m_flush_interval, m_flush_bytes, m_flush_age));
+    return object;
+  }
+};
+
+TEST_F(TestObjectRecorder, Append) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                             "payload");
+  append_buffers = boost::assign::list_of(append_buffer2);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(2U, object->get_pending_appends());
+
+  C_SaferCond cond;
+  append_buffer2.first->flush(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByCount) {
+  std::string oid = get_temp_oid();
+
+  set_flush_interval(2);
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                             "payload");
+  append_buffers = boost::assign::list_of(append_buffer2);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(0U, object->get_pending_appends());
+
+  C_SaferCond cond;
+  append_buffer2.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByBytes) {
+  std::string oid = get_temp_oid();
+
+  set_flush_bytes(10);
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                             "payload");
+  append_buffers = boost::assign::list_of(append_buffer2);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(0U, object->get_pending_appends());
+
+  C_SaferCond cond;
+  append_buffer2.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByAge) {
+  std::string oid = get_temp_oid();
+
+  set_flush_age(0.1);
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                             "payload");
+  append_buffers = boost::assign::list_of(append_buffer2);
+  ASSERT_FALSE(object->append(append_buffers));
+
+  C_SaferCond cond;
+  append_buffer2.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFilledObject) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object = create_object(oid, 12);
+
+  std::string payload(2048, '1');
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                              payload);
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                              payload);
+  append_buffers = boost::assign::list_of(append_buffer2);
+  ASSERT_TRUE(object->append(append_buffers));
+
+  C_SaferCond cond;
+  append_buffer2.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Flush) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  object->flush();
+
+  C_SaferCond cond;
+  append_buffer1.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, FlushFuture) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+                                                             "payload");
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer);
+  ASSERT_FALSE(object->append(append_buffers));
+  ASSERT_EQ(1U, object->get_pending_appends());
+
+  C_SaferCond cond;
+  append_buffer.first->wait(&cond);
+  object->flush(append_buffer.first);
+  ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
+              append_buffer.first->is_complete());
+  ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, FlushDetachedFuture) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+  journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+                                                             "payload");
+
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer);
+
+  object->flush(append_buffer.first);
+  ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
+  ASSERT_FALSE(object->append(append_buffers));
+
+  // should automatically flush once its attached to the object
+  C_SaferCond cond;
+  append_buffer.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, Overflow) {
+  std::string oid = get_temp_oid();
+
+  journal::ObjectRecorderPtr object1 = create_object(oid, 12);
+  journal::ObjectRecorderPtr object2 = create_object(oid, 12);
+
+  std::string payload(2048, '1');
+  journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+                                                              payload);
+  journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+                                                              payload);
+  journal::AppendBuffers append_buffers;
+  append_buffers = boost::assign::list_of(append_buffer1)(append_buffer2);
+  ASSERT_TRUE(object1->append(append_buffers));
+
+  C_SaferCond cond;
+  append_buffer2.first->wait(&cond);
+  ASSERT_EQ(0, cond.wait());
+  ASSERT_EQ(0U, object1->get_pending_appends());
+
+  journal::AppendBuffer append_buffer3 = create_append_buffer("bar", 123,
+                                                              payload);
+  append_buffers = boost::assign::list_of(append_buffer3);
+
+  ASSERT_FALSE(object2->append(append_buffers));
+  append_buffer3.first->flush(NULL);
+
+  bool overflowed = false;
+  {
+    Mutex::Locker locker(m_overflow_handler.lock);
+    while (m_overflow_handler.overflows == 0) {
+      if (m_overflow_handler.cond.WaitInterval(
+            reinterpret_cast<CephContext*>(m_ioctx.cct()),
+            m_overflow_handler.lock, utime_t(10, 0)) != 0) {
+        break;
+      }
+    }
+    if (m_overflow_handler.overflows != 0) {
+      overflowed = true;
+    }
+  }
+
+  ASSERT_TRUE(overflowed);
+}
diff --git a/src/test/journal/test_main.cc b/src/test/journal/test_main.cc
new file mode 100644 (file)
index 0000000..3323b60
--- /dev/null
@@ -0,0 +1,27 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "gtest/gtest.h"
+#include "common/ceph_argparse.h"
+#include "common/ceph_crypto.h"
+#include "common/config.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include <vector>
+
+int main(int argc, char **argv)
+{
+  ::testing::InitGoogleTest(&argc, argv);
+
+  std::vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  g_conf->set_val("lockdep", "true");
+  common_init_finish(g_ceph_context);
+
+  int r = RUN_ALL_TESTS();
+  g_ceph_context->put();
+  ceph::crypto::shutdown();
+  return r;
+}