return bl;
}
+int RadosTestFixture::init_metadata(journal::JournalMetadataPtr metadata) {
+ C_SaferCond cond;
+ metadata->init(&cond);
+ return cond.wait();
+}
+
bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) {
Mutex::Locker locker(m_listener.mutex);
while (m_listener.updates[metadata.get()] == 0) {
}
};
+ int init_metadata(journal::JournalMetadataPtr metadata);
+
bool wait_for_update(journal::JournalMetadataPtr metadata);
static std::string _pool_name;
#include "journal/FutureImpl.h"
#include "common/Cond.h"
+#include "common/Finisher.h"
#include "common/Mutex.h"
#include "gtest/gtest.h"
+#include "test/journal/RadosTestFixture.h"
-class TestFutureImpl : public ::testing::Test {
+class TestFutureImpl : public RadosTestFixture {
public:
+ TestFutureImpl() : m_finisher(NULL) {
+ }
+ ~TestFutureImpl() {
+ m_finisher->stop();
+ delete m_finisher;
+ }
+
struct FlushHandler : public journal::FutureImpl::FlushHandler {
uint64_t refs;
uint64_t flushes;
}
};
+ void SetUp() {
+ RadosTestFixture::SetUp();
+ m_finisher = new Finisher(reinterpret_cast<CephContext*>(m_ioctx.cct()));
+ m_finisher->start();
+ }
+
journal::FutureImplPtr create_future(const std::string &tag, uint64_t tid,
const journal::FutureImplPtr &prev =
journal::FutureImplPtr()) {
- journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+ journal::FutureImplPtr future(new journal::FutureImpl(*m_finisher,
+ tag, tid));
future->init(prev);
return future;
}
void flush(const journal::FutureImplPtr &future) {
}
+ Finisher *m_finisher;
+
FlushHandler m_flush_handler;
};
std::string oid = get_temp_oid();
journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
- ASSERT_EQ(-ENOENT, metadata1->init());
+ ASSERT_EQ(-ENOENT, init_metadata(metadata1));
}
TEST_F(TestJournalMetadata, ClientDNE) {
ASSERT_EQ(0, client_register(oid, "client1", ""));
journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
- ASSERT_EQ(0, metadata1->init());
+ ASSERT_EQ(0, init_metadata(metadata1));
journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client2");
- ASSERT_EQ(-ENOENT, metadata2->init());
+ ASSERT_EQ(-ENOENT, init_metadata(metadata2));
}
TEST_F(TestJournalMetadata, SetCommitPositions) {
ASSERT_EQ(0, client_register(oid, "client1", ""));
journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
- ASSERT_EQ(0, metadata1->init());
+ ASSERT_EQ(0, init_metadata(metadata1));
journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
- ASSERT_EQ(0, metadata2->init());
+ ASSERT_EQ(0, init_metadata(metadata2));
ASSERT_TRUE(wait_for_update(metadata2));
journal::JournalMetadata::ObjectSetPosition commit_position;
ASSERT_EQ(0, client_register(oid, "client1", ""));
journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
- ASSERT_EQ(0, metadata1->init());
+ ASSERT_EQ(0, init_metadata(metadata1));
ASSERT_TRUE(wait_for_update(metadata1));
ASSERT_EQ(0U, metadata1->get_active_set());
Mutex lock;
Cond cond;
bool entries_available;
- bool error_occurred;
+ bool complete;
+ int complete_result;
ReplayHandler()
- : lock("lock"), entries_available(false), error_occurred(false) {}
+ : lock("lock"), entries_available(false), complete(false),
+ complete_result(0) {}
virtual bool filter_entry(const std::string &tag) {
return false;
cond.Signal();
}
- virtual void handle_error(int r) {
- error_occurred = true;
+ virtual void handle_complete(int r) {
+ Mutex::Locker locker(lock);
+ complete = true;
+ complete_result = r;
+ cond.Signal();
}
};
return entries->size() == count;
}
+ bool wait_for_complete(journal::JournalPlayerPtr player) {
+ journal::Entry entry;
+ journal::JournalPlayer::ObjectSetPosition object_set_position;
+ player->try_pop_front(&entry, &object_set_position);
+
+ Mutex::Locker locker(m_replay_hander.lock);
+ while (!m_replay_hander.complete) {
+ if (m_replay_hander.cond.WaitInterval(
+ reinterpret_cast<CephContext*>(m_ioctx.cct()),
+ m_replay_hander.lock, utime_t(10, 0)) != 0) {
+ return false;
+ }
+ }
+ m_replay_hander.complete = false;
+ return true;
+ }
+
int write_entry(const std::string &oid, uint64_t object_num,
const std::string &tag, uint64_t tid) {
bufferlist bl;
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayerPtr player = create_player(oid, metadata);
Entries entries;
ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(wait_for_complete(player));
Entries expected_entries;
expected_entries = boost::assign::list_of(
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayerPtr player = create_player(oid, metadata);
Entries entries;
ASSERT_TRUE(wait_for_entries(player, 2, &entries));
+ ASSERT_TRUE(wait_for_complete(player));
Entries expected_entries;
expected_entries = boost::assign::list_of(
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayerPtr player = create_player(oid, metadata);
Entries entries;
ASSERT_TRUE(wait_for_entries(player, 3, &entries));
+ ASSERT_TRUE(wait_for_complete(player));
uint64_t last_tid;
ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayerPtr player = create_player(oid, metadata);
journal::Entry entry;
cls::journal::ObjectSetPosition object_set_position;
ASSERT_FALSE(player->try_pop_front(&entry, &object_set_position));
- ASSERT_TRUE(m_replay_hander.error_occurred);
+ ASSERT_TRUE(wait_for_complete(player));
+ ASSERT_NE(0, m_replay_hander.complete_result);
}
TEST_F(TestJournalPlayer, PrefetchAndWatch) {
ASSERT_EQ(0, client_commit(oid, commit_position));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayerPtr player = create_player(oid, metadata);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_EQ(0U, metadata->get_active_set());
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_EQ(0U, metadata->get_active_set());
journal::JournalRecorder *recorder1 = create_recorder(oid, metadata);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
- recorder->flush();
+ C_SaferCond cond1;
+ recorder->flush(&cond1);
+ ASSERT_EQ(0, cond1.wait());
- C_SaferCond cond;
- future2.wait(&cond);
- ASSERT_EQ(0, cond.wait());
+ C_SaferCond cond2;
+ future2.wait(&cond2);
+ ASSERT_EQ(0, cond2.wait());
ASSERT_TRUE(future1.is_complete());
ASSERT_TRUE(future2.is_complete());
}
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
metadata->set_active_set(10);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata1 = create_metadata(oid);
- ASSERT_EQ(0, metadata1->init());
+ ASSERT_EQ(0, init_metadata(metadata1));
ASSERT_TRUE(wait_for_update(metadata1));
metadata1->set_active_set(10);
ASSERT_TRUE(wait_for_update(metadata1));
journal::JournalMetadataPtr metadata2 = create_metadata(oid);
- ASSERT_EQ(0, metadata2->init());
+ ASSERT_EQ(0, init_metadata(metadata2));
ASSERT_EQ(0, append(oid + ".0", create_payload("payload")));
ASSERT_EQ(0, append(oid + ".2", create_payload("payload")));
ASSERT_EQ(0, client_register(oid, "client2", "slow client"));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
metadata->set_active_set(10);
ASSERT_EQ(0, client_register(oid));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
metadata->set_active_set(10);
ASSERT_EQ(0, client_register(oid, "client2", "other client"));
journal::JournalMetadataPtr metadata = create_metadata(oid);
- ASSERT_EQ(0, metadata->init());
+ ASSERT_EQ(0, init_metadata(metadata));
ASSERT_TRUE(wait_for_update(metadata));
journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
#include "gtest/gtest.h"
#include "test/librados/test.h"
#include "test/journal/RadosTestFixture.h"
+#include "include/stringify.h"
class TestJournaler : public RadosTestFixture {
public:
static const std::string CLIENT_ID;
- static uint64_t get_temp_journal_id() {
- return ++_journal_id;
+ static std::string get_temp_journal_id() {
+ return stringify(++_journal_id);
}
virtual void SetUp() {
return m_journaler->create(order, splay_width);
}
+ int init_journaler() {
+ C_SaferCond cond;
+ m_journaler->init(&cond);
+ return cond.wait();
+ }
+
int register_client(const std::string &client_id, const std::string &desc) {
journal::Journaler journaler(m_ioctx, m_ioctx, m_journal_id, client_id);
return journaler.register_client(desc);
static uint64_t _journal_id;
- uint64_t m_journal_id;
+ std::string m_journal_id;
journal::Journaler *m_journaler;
};
TEST_F(TestJournaler, Init) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
- ASSERT_EQ(0, m_journaler->init());
+ ASSERT_EQ(0, init_journaler());
}
TEST_F(TestJournaler, InitDNE) {
- ASSERT_EQ(-ENOENT, m_journaler->init());
+ ASSERT_EQ(-ENOENT, init_journaler());
}
TEST_F(TestJournaler, RegisterClientDuplicate) {
C_SaferCond cond;
object->fetch(&cond);
- ASSERT_EQ(0, cond.wait());
+ ASSERT_LE(0, cond.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
C_SaferCond cond;
object->fetch(&cond);
- ASSERT_EQ(0, cond.wait());
+ ASSERT_LE(0, cond.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
C_SaferCond cond;
object->fetch(&cond);
- ASSERT_EQ(0, cond.wait());
+ ASSERT_LE(0, cond.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
C_SaferCond cond1;
object->fetch(&cond1);
- ASSERT_EQ(0, cond1.wait());
+ ASSERT_LE(0, cond1.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
C_SaferCond cond2;
object->fetch(&cond2);
- ASSERT_EQ(0, cond2.wait());
+ ASSERT_LE(0, cond2.wait());
object->get_entries(&entries);
ASSERT_EQ(2U, entries.size());
C_SaferCond cond;
object->fetch(&cond);
- ASSERT_EQ(0, cond.wait());
+ ASSERT_LE(0, cond.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
bufferlist bl;
::encode(entry1, bl);
ASSERT_EQ(0, append(get_object_name(oid), bl));
- ASSERT_EQ(0, cond1.wait());
+ ASSERT_LE(0, cond1.wait());
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
bl.clear();
::encode(entry2, bl);
ASSERT_EQ(0, append(get_object_name(oid), bl));
- ASSERT_EQ(0, cond2.wait());
+ ASSERT_LE(0, cond2.wait());
object->get_entries(&entries);
ASSERT_EQ(2U, entries.size());
#include "journal/ObjectRecorder.h"
#include "common/Cond.h"
+#include "common/Finisher.h"
#include "common/Mutex.h"
#include "common/Timer.h"
#include "gtest/gtest.h"
TestObjectRecorder()
: m_flush_interval(std::numeric_limits<uint32_t>::max()),
m_flush_bytes(std::numeric_limits<uint64_t>::max()),
- m_flush_age(600)
+ m_flush_age(600),
+ m_finisher(NULL)
{
}
+ ~TestObjectRecorder() {
+ m_finisher->stop();
+ delete m_finisher;
+ }
+
struct OverflowHandler : public journal::ObjectRecorder::OverflowHandler {
Mutex lock;
Cond cond;
double m_flush_age;
OverflowHandler m_overflow_handler;
+ Finisher *m_finisher;
+
+ void SetUp() {
+ RadosTestFixture::SetUp();
+ m_finisher = new Finisher(reinterpret_cast<CephContext*>(m_ioctx.cct()));
+ m_finisher->start();
+ }
+
inline void set_flush_interval(uint32_t i) {
m_flush_interval = i;
}
journal::AppendBuffer create_append_buffer(const std::string &tag,
uint64_t tid,
const std::string &payload) {
- journal::FutureImplPtr future(new journal::FutureImpl(tag, tid));
+ journal::FutureImplPtr future(new journal::FutureImpl(*m_finisher,
+ tag, tid));
future->init(journal::FutureImplPtr());
bufferlist bl;
ASSERT_FALSE(object->append(append_buffers));
ASSERT_EQ(1U, object->get_pending_appends());
- object->flush();
+ C_SaferCond cond1;
+ object->flush(&cond1);
+ ASSERT_EQ(0, cond1.wait());
- C_SaferCond cond;
- append_buffer1.first->wait(&cond);
- ASSERT_EQ(0, cond.wait());
+ C_SaferCond cond2;
+ append_buffer1.first->wait(&cond2);
+ ASSERT_EQ(0, cond2.wait());
ASSERT_EQ(0U, object->get_pending_appends());
}