test/librados_test_stub/TestIoCtxImpl.h
noinst_LTLIBRARIES += librados_test_stub.la
+unittest_journal_SOURCES = \
+ test/journal/test_main.cc \
+ test/journal/test_Entry.cc \
+ test/journal/test_FutureImpl.cc \
+ test/journal/test_Journaler.cc \
+ test/journal/test_JournalMetadata.cc \
+ test/journal/test_JournalPlayer.cc \
+ test/journal/test_JournalRecorder.cc \
+ test/journal/test_ObjectPlayer.cc \
+ test/journal/test_ObjectRecorder.cc \
+ test/journal/RadosTestFixture.cc
+unittest_journal_CXXFLAGS = $(UNITTEST_CXXFLAGS)
+unittest_journal_LDADD = \
+ libjournal.la libcls_journal_client.la \
+ librados_test_stub.la librados_internal.la \
+ $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
+check_TESTPROGRAMS += unittest_journal
if WITH_RBD
ceph_smalliobenchrbd_SOURCES = \
test/system/st_rados_watch.h \
test/system/systest_runnable.h \
test/system/systest_settings.h \
- test/unit.h
+ test/unit.h \
+ test/journal/RadosTestFixture.h
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/journal/RadosTestFixture.h"
+#include "cls/journal/cls_journal_client.h"
+#include "include/stringify.h"
+
+RadosTestFixture::RadosTestFixture()
+ : m_timer_lock("m_timer_lock"), m_timer(NULL) {
+}
+
+void RadosTestFixture::SetUpTestCase() {
+ _pool_name = get_temp_pool_name();
+ ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados));
+}
+
+void RadosTestFixture::TearDownTestCase() {
+ ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados));
+}
+
+std::string RadosTestFixture::get_temp_oid() {
+ ++_oid_number;
+ return "oid" + stringify(_oid_number);
+}
+
+void RadosTestFixture::SetUp() {
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
+ m_timer = new SafeTimer(reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_timer_lock, false);
+ m_timer->init();
+}
+
+void RadosTestFixture::TearDown() {
+ {
+ Mutex::Locker locker(m_timer_lock);
+ m_timer->shutdown();
+ }
+ delete m_timer;
+}
+
+int RadosTestFixture::create(const std::string &oid, uint8_t order,
+ uint8_t splay_width) {
+ return cls::journal::client::create(m_ioctx, oid, order, splay_width);
+}
+
+int RadosTestFixture::append(const std::string &oid, const bufferlist &bl) {
+ librados::ObjectWriteOperation op;
+ op.append(bl);
+ return m_ioctx.operate(oid, &op);
+}
+
+int RadosTestFixture::client_register(const std::string &oid,
+ const std::string &id,
+ const std::string &description) {
+ return cls::journal::client::client_register(m_ioctx, oid, id, description);
+}
+
+int RadosTestFixture::client_commit(const std::string &oid,
+ const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position) {
+ librados::ObjectWriteOperation op;
+ cls::journal::client::client_commit(&op, id, commit_position);
+ return m_ioctx.operate(oid, &op);
+}
+
+bufferlist RadosTestFixture::create_payload(const std::string &payload) {
+ bufferlist bl;
+ bl.append(payload);
+ return bl;
+}
+
+std::string RadosTestFixture::_pool_name;
+librados::Rados RadosTestFixture::_rados;
+uint64_t RadosTestFixture::_oid_number = 0;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "test/librados/test.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "cls/journal/cls_journal_types.h"
+#include "gtest/gtest.h"
+
+class RadosTestFixture : public ::testing::Test {
+public:
+ static void SetUpTestCase();
+ static void TearDownTestCase();
+
+ static std::string get_temp_oid();
+
+ RadosTestFixture();
+ virtual void SetUp();
+ virtual void TearDown();
+
+ int create(const std::string &oid, uint8_t order, uint8_t splay_width);
+ int append(const std::string &oid, const bufferlist &bl);
+
+ int client_register(const std::string &oid, const std::string &id,
+ const std::string &description);
+ int client_commit(const std::string &oid, const std::string &id,
+ const cls::journal::ObjectSetPosition &commit_position);
+
+ bufferlist create_payload(const std::string &payload);
+
+ static std::string _pool_name;
+ static librados::Rados _rados;
+ static uint64_t _oid_number;
+
+ librados::IoCtx m_ioctx;
+
+ Mutex m_timer_lock;
+ SafeTimer *m_timer;
+};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Entry.h"
+#include "gtest/gtest.h"
+
+class TestEntry : public ::testing::Test {
+};
+
+TEST_F(TestEntry, DefaultConstructor) {
+ journal::Entry entry;
+ ASSERT_EQ(0U, entry.get_tid());
+ ASSERT_EQ("", entry.get_tag());
+
+ bufferlist data(entry.get_data());
+ bufferlist expected_data;
+ ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, Constructor) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry("tag", 123, data);
+
+ data.clear();
+ data = entry.get_data();
+
+ bufferlist expected_data;
+ expected_data.append("data");
+
+ ASSERT_EQ(123U, entry.get_tid());
+ ASSERT_EQ("tag", entry.get_tag());
+ ASSERT_TRUE(data.contents_equal(expected_data));
+}
+
+TEST_F(TestEntry, IsReadable) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry("tag", 123, data);
+
+ bufferlist full_bl;
+ ::encode(entry, full_bl);
+
+ uint32_t bytes_needed;
+ for (size_t i = 0; i < full_bl.length() - 1; ++i) {
+ bufferlist partial_bl;
+ if (i > 0) {
+ partial_bl.substr_of(full_bl, 0, i);
+ }
+ ASSERT_FALSE(journal::Entry::is_readable(partial_bl.begin(),
+ &bytes_needed));
+ ASSERT_GT(bytes_needed, 0U);
+ }
+ ASSERT_TRUE(journal::Entry::is_readable(full_bl.begin(), &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadPreamble) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry("tag", 123, data);
+
+ uint64_t stray_bytes = 0x1122334455667788;
+ bufferlist full_bl;
+ ::encode(stray_bytes, full_bl);
+ ::encode(entry, full_bl);
+
+ uint32_t bytes_needed;
+ bufferlist::iterator it = full_bl.begin();
+ ASSERT_FALSE(journal::Entry::is_readable(it, &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+
+ it.advance(sizeof(stray_bytes));
+ ASSERT_TRUE(journal::Entry::is_readable(it, &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+}
+
+TEST_F(TestEntry, IsReadableBadCRC) {
+ bufferlist data;
+ data.append("data");
+ journal::Entry entry("tag", 123, data);
+
+ bufferlist full_bl;
+ ::encode(entry, full_bl);
+
+ bufferlist bad_bl;
+ bad_bl.substr_of(full_bl, 0, full_bl.length() - 4);
+ ::encode(full_bl.crc32c(1), bad_bl);
+
+ uint32_t bytes_needed;
+ ASSERT_FALSE(journal::Entry::is_readable(bad_bl.begin(), &bytes_needed));
+ ASSERT_EQ(0U, bytes_needed);
+
+
+
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/FutureImpl.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "gtest/gtest.h"
+
+class TestFutureImpl : public ::testing::Test {
+public:
+
+ struct FlushHandler : public journal::FutureImpl::FlushHandler {
+ uint64_t refs;
+ uint64_t flushes;
+ FlushHandler() : refs(0), flushes(0) {}
+ virtual void get() {
+ ++refs;
+ }
+ virtual void put() {
+ assert(refs > 0);
+ --refs;
+ }
+ virtual void flush(const journal::FutureImplPtr &future) {
+ ++flushes;
+ }
+ };
+
+ journal::FutureImplPtr create_future(const std::string &tag, uint64_t tid,
+ const journal::FutureImplPtr &prev =
+ journal::FutureImplPtr()) {
+ journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+ future->init(prev);
+ return future;
+ }
+
+ void flush(const journal::FutureImplPtr &future) {
+ }
+
+ FlushHandler m_flush_handler;
+};
+
+TEST_F(TestFutureImpl, Attach) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ ASSERT_FALSE(future->attach(&m_flush_handler));
+ ASSERT_EQ(1U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, AttachWithPendingFlush) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ future->flush(NULL);
+
+ ASSERT_TRUE(future->attach(&m_flush_handler));
+ ASSERT_EQ(1U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, Detach) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ ASSERT_FALSE(future->attach(&m_flush_handler));
+ future->detach();
+ ASSERT_EQ(0U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, DetachImplicit) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ ASSERT_FALSE(future->attach(&m_flush_handler));
+ future.reset();
+ ASSERT_EQ(0U, m_flush_handler.refs);
+}
+
+TEST_F(TestFutureImpl, Flush) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ ASSERT_FALSE(future->attach(&m_flush_handler));
+
+ C_SaferCond cond;
+ future->flush(&cond);
+
+ ASSERT_EQ(1U, m_flush_handler.flushes);
+ future->safe(-EIO);
+ ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, FlushWithoutContext) {
+ journal::FutureImplPtr future = create_future("tag", 123);
+ ASSERT_FALSE(future->attach(&m_flush_handler));
+
+ future->flush(NULL);
+ ASSERT_EQ(1U, m_flush_handler.flushes);
+ future->safe(-EIO);
+ ASSERT_TRUE(future->is_complete());
+ ASSERT_EQ(-EIO, future->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushChain) {
+ journal::FutureImplPtr future1 = create_future("tag1", 123);
+ journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+ journal::FutureImplPtr future3 = create_future("tag2", 1, future2);
+ ASSERT_FALSE(future1->attach(&m_flush_handler));
+ ASSERT_FALSE(future2->attach(&m_flush_handler));
+ ASSERT_FALSE(future3->attach(&m_flush_handler));
+
+ C_SaferCond cond;
+ future3->flush(&cond);
+
+ ASSERT_EQ(3U, m_flush_handler.flushes);
+
+ future3->safe(0);
+ ASSERT_FALSE(future3->is_complete());
+
+ future1->safe(0);
+ ASSERT_FALSE(future3->is_complete());
+
+ future2->safe(-EIO);
+ ASSERT_TRUE(future3->is_complete());
+ ASSERT_EQ(-EIO, future3->get_return_value());
+ ASSERT_EQ(-EIO, cond.wait());
+ ASSERT_EQ(0, future1->get_return_value());
+}
+
+TEST_F(TestFutureImpl, FlushInProgress) {
+ journal::FutureImplPtr future1 = create_future("tag1", 123);
+ journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+ ASSERT_FALSE(future1->attach(&m_flush_handler));
+ ASSERT_FALSE(future2->attach(&m_flush_handler));
+
+ future1->set_flush_in_progress();
+ ASSERT_TRUE(future1->is_flush_in_progress());
+
+ future1->flush(NULL);
+ ASSERT_EQ(0U, m_flush_handler.flushes);
+}
+
+TEST_F(TestFutureImpl, FlushAlreadyComplete) {
+ journal::FutureImplPtr future = create_future("tag1", 123);
+ future->safe(-EIO);
+
+ C_SaferCond cond;
+ future->flush(&cond);
+ ASSERT_EQ(-EIO, cond.wait());
+}
+
+TEST_F(TestFutureImpl, Wait) {
+ journal::FutureImplPtr future = create_future("tag", 1);
+
+ C_SaferCond cond;
+ future->wait(&cond);
+ future->safe(-EEXIST);
+ ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, WaitAlreadyComplete) {
+ journal::FutureImplPtr future = create_future("tag", 1);
+ future->safe(-EEXIST);
+
+ C_SaferCond cond;
+ future->wait(&cond);
+ ASSERT_EQ(-EEXIST, cond.wait());
+}
+
+TEST_F(TestFutureImpl, SafePreservesError) {
+ journal::FutureImplPtr future1 = create_future("tag1", 123);
+ journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+
+ future1->safe(-EIO);
+ future2->safe(-EEXIST);
+ ASSERT_TRUE(future2->is_complete());
+ ASSERT_EQ(-EIO, future2->get_return_value());
+}
+
+TEST_F(TestFutureImpl, ConsistentPreservesError) {
+ journal::FutureImplPtr future1 = create_future("tag1", 123);
+ journal::FutureImplPtr future2 = create_future("tag1", 124, future1);
+
+ future2->safe(-EEXIST);
+ future1->safe(-EIO);
+ ASSERT_TRUE(future2->is_complete());
+ ASSERT_EQ(-EEXIST, future2->get_return_value());
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include <boost/assign/list_of.hpp>
+#include <map>
+
+class TestJournalMetadata : public RadosTestFixture {
+public:
+ TestJournalMetadata() : m_listener(this) {}
+
+ struct Listener : public journal::JournalMetadata::Listener {
+ TestJournalMetadata *test_fixture;
+ Mutex mutex;
+ Cond cond;
+ std::map<journal::JournalMetadata*, uint32_t> updates;
+
+ Listener(TestJournalMetadata *_test_fixture)
+ : test_fixture(_test_fixture), mutex("mutex") {}
+
+ virtual void handle_update(journal::JournalMetadata *metadata) {
+ Mutex::Locker locker(mutex);
+ ++updates[metadata];
+ cond.Signal();
+ }
+ };
+
+ journal::JournalMetadataPtr create_metadata(const std::string &oid,
+ const std::string &client_id) {
+ journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+ m_ioctx, oid, client_id));
+ metadata->add_listener(&m_listener);
+ return metadata;
+ }
+
+ bool wait_for_update(journal::JournalMetadataPtr metadata) {
+ Mutex::Locker locker(m_listener.mutex);
+ while (m_listener.updates[metadata.get()] == 0) {
+ if (m_listener.cond.WaitInterval(
+ reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_listener.mutex, utime_t(10, 0)) != 0) {
+ return false;
+ }
+ }
+ --m_listener.updates[metadata.get()];
+ return true;
+ }
+
+ Listener m_listener;
+};
+
+TEST_F(TestJournalMetadata, JournalDNE) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(-ENOENT, metadata1->init());
+}
+
+TEST_F(TestJournalMetadata, ClientDNE) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, metadata1->init());
+
+ journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2");
+ ASSERT_EQ(-ENOENT, metadata2->init());
+}
+
+TEST_F(TestJournalMetadata, SetCommitPositions) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, metadata1->init());
+
+ journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, metadata2->init());
+ ASSERT_TRUE(wait_for_update(metadata2));
+
+ journal::JournalMetadata::ObjectSetPosition commit_position;
+ journal::JournalMetadata::ObjectSetPosition read_commit_position;
+ metadata1->get_commit_position(&read_commit_position);
+ ASSERT_EQ(commit_position, read_commit_position);
+
+ journal::JournalMetadata::EntryPositions entry_positions;
+ entry_positions = boost::assign::list_of(
+ cls::journal::EntryPosition("tag1", 122));
+ commit_position = journal::JournalMetadata::ObjectSetPosition(1, entry_positions);
+
+ metadata1->set_commit_position(commit_position);
+ ASSERT_TRUE(wait_for_update(metadata2));
+
+ metadata2->get_commit_position(&read_commit_position);
+ ASSERT_EQ(commit_position, read_commit_position);
+}
+
+TEST_F(TestJournalMetadata, UpdateActiveObject) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid, 14, 2));
+ ASSERT_EQ(0, client_register(oid, "client1", ""));
+
+ journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ ASSERT_EQ(0, metadata1->init());
+ ASSERT_TRUE(wait_for_update(metadata1));
+
+ ASSERT_EQ(0U, metadata1->get_active_set());
+
+ metadata1->set_active_set(123);
+ ASSERT_TRUE(wait_for_update(metadata1));
+
+ ASSERT_EQ(123U, metadata1->get_active_set());
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalPlayer.h"
+#include "journal/Entry.h"
+#include "journal/JournalMetadata.h"
+#include "journal/ReplayHandler.h"
+#include "include/stringify.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
+#include <list>
+#include <boost/assign/list_of.hpp>
+
+class TestJournalPlayer : public RadosTestFixture {
+public:
+ typedef std::list<journal::Entry> Entries;
+
+ struct ReplayHandler : public journal::ReplayHandler {
+ Mutex lock;
+ Cond cond;
+ bool entries_available;
+ bool error_occurred;
+
+ ReplayHandler()
+ : lock("lock"), entries_available(false), error_occurred(false) {}
+
+ virtual bool filter_entry(const std::string &tag) {
+ return false;
+ }
+
+ virtual void handle_entries_available() {
+ Mutex::Locker locker(lock);
+ entries_available = true;
+ cond.Signal();
+ }
+
+ virtual void handle_error(int r) {
+ error_occurred = true;
+ }
+ };
+
+ int create(const std::string &oid) {
+ return RadosTestFixture::create(oid, 14, 2);
+ }
+
+ int client_register(const std::string &oid) {
+ return RadosTestFixture::client_register(oid, "client", "");
+ }
+
+ int client_commit(const std::string &oid,
+ journal::JournalPlayer::ObjectSetPosition position) {
+ return RadosTestFixture::client_commit(oid, "client", position);
+ }
+
+ journal::Entry create_entry(const std::string &tag, uint64_t tid) {
+ bufferlist payload_bl;
+ payload_bl.append("playload");
+ return journal::Entry(tag, tid, payload_bl);
+ }
+
+ journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+ journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+ m_ioctx, oid, "client"));
+ return metadata;
+ }
+
+ journal::JournalPlayerPtr create_player(const std::string &oid,
+ const journal::JournalMetadataPtr &metadata) {
+ journal::JournalPlayerPtr player(new journal::JournalPlayer(
+ m_ioctx, oid + ".", metadata, &m_replay_hander));
+ return player;
+ }
+
+ bool wait_for_entries(journal::JournalPlayerPtr player, uint32_t count,
+ Entries *entries) {
+ entries->clear();
+ while (entries->size() < count) {
+ journal::Entry entry;
+ journal::JournalPlayer::ObjectSetPosition object_set_position;
+ while (entries->size() < count &&
+ player->try_pop_front(&entry, &object_set_position)) {
+ entries->push_back(entry);
+ }
+ if (entries->size() == count) {
+ break;
+ }
+
+ Mutex::Locker locker(m_replay_hander.lock);
+ if (m_replay_hander.entries_available) {
+ m_replay_hander.entries_available = false;
+ } else if (m_replay_hander.cond.WaitInterval(
+ reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_replay_hander.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ return entries->size() == count;
+ }
+
+ int write_entry(const std::string &oid, uint64_t object_num,
+ const std::string &tag, uint64_t tid) {
+ bufferlist bl;
+ ::encode(create_entry(tag, tid), bl);
+ return append(oid + "." + stringify(object_num), bl);
+ }
+
+ ReplayHandler m_replay_hander;
+};
+
+TEST_F(TestJournalPlayer, Prefetch) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::EntryPositions positions;
+ positions = boost::assign::list_of(
+ cls::journal::EntryPosition("tag1", 122));
+ cls::journal::ObjectSetPosition commit_position(0, positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 125));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ Entries expected_entries;
+ expected_entries = boost::assign::list_of(
+ create_entry("tag1", 123))(
+ create_entry("tag1", 124))(
+ create_entry("tag1", 125));
+ ASSERT_EQ(expected_entries, entries);
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_EQ(125U, last_tid);
+}
+
+TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+
+ Entries expected_entries;
+ expected_entries = boost::assign::list_of(
+ create_entry("tag1", 122))(
+ create_entry("tag1", 123));
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::EntryPositions positions;
+ positions = boost::assign::list_of(
+ cls::journal::EntryPosition("tag1", 122))(
+ cls::journal::EntryPosition("tag2", 1));
+ cls::journal::ObjectSetPosition commit_position(0, positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag2", 1));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag2", 2));
+
+ player->prefetch();
+
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ uint64_t last_tid;
+ ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_EQ(124U, last_tid);
+ ASSERT_TRUE(metadata->get_last_allocated_tid("tag2", &last_tid));
+ ASSERT_EQ(2U, last_tid);
+}
+
+TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
+ std::string oid = get_temp_oid();
+
+ cls::journal::ObjectSetPosition commit_position;
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+
+ player->prefetch();
+ Entries entries;
+ ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+
+ journal::Entry entry;
+ cls::journal::ObjectSetPosition object_set_position;
+ ASSERT_FALSE(player->try_pop_front(&entry, &object_set_position));
+ ASSERT_TRUE(m_replay_hander.error_occurred);
+}
+
+TEST_F(TestJournalPlayer, PrefetchAndWatch) {
+ std::string oid = get_temp_oid();
+
+ journal::JournalPlayer::EntryPositions positions;
+ positions = boost::assign::list_of(
+ cls::journal::EntryPosition("tag1", 122));
+ cls::journal::ObjectSetPosition commit_position(0, positions);
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, commit_position));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalPlayerPtr player = create_player(oid, metadata);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+
+ player->prefetch_and_watch(0.25);
+
+ Entries entries;
+ ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ Entries expected_entries;
+ expected_entries = boost::assign::list_of(create_entry("tag1", 123));
+ ASSERT_EQ(expected_entries, entries);
+
+ ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+ ASSERT_TRUE(wait_for_entries(player, 1, &entries));
+
+ expected_entries = boost::assign::list_of(create_entry("tag1", 124));
+ ASSERT_EQ(expected_entries, entries);
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/JournalRecorder.h"
+#include "journal/JournalMetadata.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <list>
+
+class TestJournalRecorder : public RadosTestFixture {
+public:
+
+ virtual void TearDown() {
+ for (std::list<journal::JournalRecorder*>::iterator it = m_recorders.begin();
+ it != m_recorders.end(); ++it) {
+ delete *it;
+ }
+ RadosTestFixture::TearDown();
+ }
+
+ int client_register(const std::string &oid) {
+ return RadosTestFixture::client_register(oid, "client", "");
+ }
+
+ journal::JournalMetadataPtr create_metadata(const std::string &oid) {
+ journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
+ m_ioctx, oid, "client"));
+ return metadata;
+ }
+
+ journal::JournalRecorder *create_recorder(const std::string &oid,
+ const journal::JournalMetadataPtr &metadata) {
+ journal::JournalRecorder *recorder(new journal::JournalRecorder(
+ m_ioctx, oid + ".", metadata, 0, std::numeric_limits<uint32_t>::max(), 0));
+ m_recorders.push_back(recorder);
+ return recorder;
+ }
+
+ std::list<journal::JournalRecorder*> m_recorders;
+
+};
+
+TEST_F(TestJournalRecorder, Append) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append("tag1", create_payload("payload"));
+
+ C_SaferCond cond;
+ future1.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestJournalRecorder, AppendKnownOverflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+ recorder->append("tag1", create_payload(std::string(1 << 12, '1')));
+ journal::Future future2 = recorder->append("tag1", create_payload(std::string(1, '2')));
+
+ C_SaferCond cond;
+ future2.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, AppendDelayedOverflow) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0U, metadata->get_active_set());
+
+ journal::JournalRecorder *recorder1 = create_recorder(oid, metadata);
+ journal::JournalRecorder *recorder2 = create_recorder(oid, metadata);
+
+ recorder1->append("tag1", create_payload(std::string(1, '1')));
+ recorder2->append("tag2", create_payload(std::string(1 << 12, '2')));
+
+ journal::Future future = recorder2->append("tag1", create_payload(std::string(1, '3')));
+
+ C_SaferCond cond;
+ future.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ ASSERT_EQ(1U, metadata->get_active_set());
+}
+
+TEST_F(TestJournalRecorder, FutureFlush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
+ journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+
+ C_SaferCond cond;
+ future2.flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_TRUE(future1.is_complete());
+ ASSERT_TRUE(future2.is_complete());
+}
+
+TEST_F(TestJournalRecorder, Flush) {
+ std::string oid = get_temp_oid();
+ ASSERT_EQ(0, create(oid, 12, 2));
+ ASSERT_EQ(0, client_register(oid));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, metadata->init());
+
+ journal::JournalRecorder *recorder = create_recorder(oid, metadata);
+
+ journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
+ journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+
+ recorder->flush();
+
+ C_SaferCond cond;
+ future2.wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_TRUE(future1.is_complete());
+ ASSERT_TRUE(future2.is_complete());
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/Journaler.h"
+#include "include/stringify.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+
+class TestJournaler : public RadosTestFixture {
+public:
+
+ static const std::string CLIENT_ID;
+
+ static uint64_t get_temp_journal_id() {
+ return ++_journal_id;
+ }
+
+ virtual void SetUp() {
+ RadosTestFixture::SetUp();
+ m_journal_id = get_temp_journal_id();
+ m_journaler = new journal::Journaler(m_ioctx, m_ioctx, m_journal_id,
+ CLIENT_ID);
+ }
+
+ virtual void TearDown() {
+ delete m_journaler;
+ RadosTestFixture::TearDown();
+ }
+
+ int create_journal(uint8_t order, uint8_t splay_width) {
+ return m_journaler->create(order, splay_width);
+ }
+
+ int register_client(const std::string &client_id, const std::string &desc) {
+ journal::Journaler journaler(m_ioctx, m_ioctx, m_journal_id, client_id);
+ return journaler.register_client(desc);
+ }
+
+ static uint64_t _journal_id;
+
+ uint64_t m_journal_id;
+ journal::Journaler *m_journaler;
+};
+
+const std::string TestJournaler::CLIENT_ID = "client1";
+uint64_t TestJournaler::_journal_id = 0;
+
+TEST_F(TestJournaler, Create) {
+ ASSERT_EQ(0, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateDuplicate) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(-EEXIST, create_journal(12, 8));
+}
+
+TEST_F(TestJournaler, CreateInvalidParams) {
+ ASSERT_EQ(-EDOM, create_journal(1, 8));
+ ASSERT_EQ(-EDOM, create_journal(123, 8));
+ ASSERT_EQ(-EINVAL, create_journal(12, 0));
+}
+
+TEST_F(TestJournaler, Init) {
+ ASSERT_EQ(0, create_journal(12, 8));
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(0, m_journaler->init());
+}
+
+TEST_F(TestJournaler, InitDNE) {
+ ASSERT_EQ(-ENOENT, m_journaler->init());
+}
+
+TEST_F(TestJournaler, RegisterClientDuplicate) {
+ ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
+ ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectPlayer.h"
+#include "journal/Entry.h"
+#include "include/stringify.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <boost/assign/list_of.hpp>
+
+class TestObjectPlayer : public RadosTestFixture {
+public:
+ journal::ObjectPlayerPtr create_object(const std::string &oid,
+ uint8_t order) {
+ journal::ObjectPlayerPtr object(new journal::ObjectPlayer(
+ m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order));
+ return object;
+ }
+
+ std::string get_object_name(const std::string &oid) {
+ return oid + ".0";
+ }
+};
+
+TEST_F(TestObjectPlayer, Fetch) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+ entry1)(entry2);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchLarge) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123,
+ create_payload(std::string(8192 - 33, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(""));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 12);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+ entry1);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchDeDup) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 123, create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+ entry2);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchEmpty) {
+ std::string oid = get_temp_oid();
+
+ bufferlist bl;
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(-ENOENT, cond.wait());
+ ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, FetchError) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(-ENOENT, cond.wait());
+ ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, FetchCorrupt) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ::encode(create_payload("corruption"), bl);
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(-EINVAL, cond.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+ entry1)(entry2);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, FetchAppend) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond1;
+ object->fetch(&cond1);
+ ASSERT_EQ(0, cond1.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries = boost::assign::list_of(
+ entry1);
+ ASSERT_EQ(expected_entries, entries);
+
+ bl.clear();
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ C_SaferCond cond2;
+ object->fetch(&cond2);
+ ASSERT_EQ(0, cond2.wait());
+
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ expected_entries = boost::assign::list_of(entry1)(entry2);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, PopEntry) {
+ std::string oid = get_temp_oid();
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond;
+ object->fetch(&cond);
+ ASSERT_EQ(0, cond.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ journal::Entry entry;
+ object->front(&entry);
+ object->pop_front();
+ ASSERT_EQ(entry1, entry);
+ object->front(&entry);
+ object->pop_front();
+ ASSERT_EQ(entry2, entry);
+ ASSERT_TRUE(object->empty());
+}
+
+TEST_F(TestObjectPlayer, Watch) {
+ std::string oid = get_temp_oid();
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ C_SaferCond cond1;
+ object->watch(&cond1, 0.1);
+
+ journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+
+ bufferlist bl;
+ ::encode(entry1, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, cond1.wait());
+
+ journal::ObjectPlayer::Entries entries;
+ object->get_entries(&entries);
+ ASSERT_EQ(1U, entries.size());
+
+ journal::ObjectPlayer::Entries expected_entries;
+ expected_entries = boost::assign::list_of(entry1);
+ ASSERT_EQ(expected_entries, entries);
+
+ C_SaferCond cond2;
+ object->watch(&cond2, 0.1);
+
+ bl.clear();
+ ::encode(entry2, bl);
+ ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, cond2.wait());
+
+ object->get_entries(&entries);
+ ASSERT_EQ(2U, entries.size());
+
+ expected_entries = boost::assign::list_of(entry1)(entry2);
+ ASSERT_EQ(expected_entries, entries);
+}
+
+TEST_F(TestObjectPlayer, Unwatch) {
+ std::string oid = get_temp_oid();
+ journal::ObjectPlayerPtr object = create_object(oid, 14);
+
+ Mutex mutex("lock");
+ Cond cond;
+ bool done = false;
+ int rval = 0;
+ C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
+ object->watch(ctx, 0.1);
+ ASSERT_FALSE(done);
+ object->unwatch();
+}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/ObjectRecorder.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+#include "common/Timer.h"
+#include "gtest/gtest.h"
+#include "test/librados/test.h"
+#include "test/journal/RadosTestFixture.h"
+#include <limits>
+#include <boost/assign/list_of.hpp>
+
+class TestObjectRecorder : public RadosTestFixture {
+public:
+ TestObjectRecorder()
+ : m_flush_interval(std::numeric_limits<uint32_t>::max()),
+ m_flush_bytes(std::numeric_limits<uint64_t>::max()),
+ m_flush_age(600)
+ {
+ }
+
+ struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler {
+ Mutex lock;
+ Cond cond;
+ uint32_t overflows;
+
+ OverflowHandler() : lock("lock"), overflows(0) {}
+
+ virtual void overflow(journal::ObjectRecorder *object_recorder) {
+ Mutex::Locker locker(lock);
+ journal::AppendBuffers append_buffers;
+ object_recorder->claim_append_buffers(&append_buffers);
+
+ ++overflows;
+ cond.Signal();
+ }
+ };
+
+ uint32_t m_flush_interval;
+ uint64_t m_flush_bytes;
+ double m_flush_age;
+ OverflowHandler m_overflow_handler;
+
+ inline void set_flush_interval(uint32_t i) {
+ m_flush_interval = i;
+ }
+ inline void set_flush_bytes(uint64_t i) {
+ m_flush_bytes = i;
+ }
+ inline void set_flush_age(double i) {
+ m_flush_age = i;
+ }
+
+ journal::AppendBuffer create_append_buffer(const std::string &tag,
+ uint64_t tid,
+ const std::string &payload) {
+ journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+ future->init(journal::FutureImplPtr());
+
+ bufferlist bl;
+ bl.append(payload);
+ return std::make_pair(future, bl);
+ }
+
+ journal::ObjectRecorderPtr create_object(const std::string &oid,
+ uint8_t order) {
+ journal::ObjectRecorderPtr object(new journal::ObjectRecorder(
+ m_ioctx, oid, 0, *m_timer, m_timer_lock, &m_overflow_handler, order,
+ m_flush_interval, m_flush_bytes, m_flush_age));
+ return object;
+ }
+};
+
+TEST_F(TestObjectRecorder, Append) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ "payload");
+ append_buffers = boost::assign::list_of(append_buffer2);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(2U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->flush(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByCount) {
+ std::string oid = get_temp_oid();
+
+ set_flush_interval(2);
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ "payload");
+ append_buffers = boost::assign::list_of(append_buffer2);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByBytes) {
+ std::string oid = get_temp_oid();
+
+ set_flush_bytes(10);
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ "payload");
+ append_buffers = boost::assign::list_of(append_buffer2);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(0U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, AppendFlushByAge) {
+ std::string oid = get_temp_oid();
+
+ set_flush_age(0.1);
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ "payload");
+ append_buffers = boost::assign::list_of(append_buffer2);
+ ASSERT_FALSE(object->append(append_buffers));
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, AppendFilledObject) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object = create_object(oid, 12);
+
+ std::string payload(2048, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ payload);
+ append_buffers = boost::assign::list_of(append_buffer2);
+ ASSERT_TRUE(object->append(append_buffers));
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, Flush) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ object->flush();
+
+ C_SaferCond cond;
+ append_buffer1.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object->get_pending_appends());
+}
+
+TEST_F(TestObjectRecorder, FlushFuture) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+ "payload");
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer);
+ ASSERT_FALSE(object->append(append_buffers));
+ ASSERT_EQ(1U, object->get_pending_appends());
+
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ object->flush(append_buffer.first);
+ ASSERT_TRUE(append_buffer.first->is_flush_in_progress() ||
+ append_buffer.first->is_complete());
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, FlushDetachedFuture) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object = create_object(oid, 24);
+
+ journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+ "payload");
+
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer);
+
+ object->flush(append_buffer.first);
+ ASSERT_FALSE(append_buffer.first->is_flush_in_progress());
+ ASSERT_FALSE(object->append(append_buffers));
+
+ // should automatically flush once its attached to the object
+ C_SaferCond cond;
+ append_buffer.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+}
+
+TEST_F(TestObjectRecorder, Overflow) {
+ std::string oid = get_temp_oid();
+
+ journal::ObjectRecorderPtr object1 = create_object(oid, 12);
+ journal::ObjectRecorderPtr object2 = create_object(oid, 12);
+
+ std::string payload(2048, '1');
+ journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ payload);
+ journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ payload);
+ journal::AppendBuffers append_buffers;
+ append_buffers = boost::assign::list_of(append_buffer1)(append_buffer2);
+ ASSERT_TRUE(object1->append(append_buffers));
+
+ C_SaferCond cond;
+ append_buffer2.first->wait(&cond);
+ ASSERT_EQ(0, cond.wait());
+ ASSERT_EQ(0U, object1->get_pending_appends());
+
+ journal::AppendBuffer append_buffer3 = create_append_buffer("bar", 123,
+ payload);
+ append_buffers = boost::assign::list_of(append_buffer3);
+
+ ASSERT_FALSE(object2->append(append_buffers));
+ append_buffer3.first->flush(NULL);
+
+ bool overflowed = false;
+ {
+ Mutex::Locker locker(m_overflow_handler.lock);
+ while (m_overflow_handler.overflows == 0) {
+ if (m_overflow_handler.cond.WaitInterval(
+ reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_overflow_handler.lock, utime_t(10, 0)) != 0) {
+ break;
+ }
+ }
+ if (m_overflow_handler.overflows != 0) {
+ overflowed = true;
+ }
+ }
+
+ ASSERT_TRUE(overflowed);
+}
--- /dev/null
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "gtest/gtest.h"
+#include "common/ceph_argparse.h"
+#include "common/ceph_crypto.h"
+#include "common/config.h"
+#include "global/global_context.h"
+#include "global/global_init.h"
+#include <vector>
+
+int main(int argc, char **argv)
+{
+ ::testing::InitGoogleTest(&argc, argv);
+
+ std::vector<const char*> args;
+ argv_to_vec(argc, (const char **)argv, args);
+
+ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ g_conf->set_val("lockdep", "true");
+ common_init_finish(g_ceph_context);
+
+ int r = RUN_ALL_TESTS();
+ g_ceph_context->put();
+ ceph::crypto::shutdown();
+ return r;
+}