}
JournalPlayer::~JournalPlayer() {
- m_async_op_tracker.wait_for_ops();
+ assert(m_async_op_tracker.empty());
{
Mutex::Locker locker(m_lock);
+ assert(m_shut_down);
assert(m_fetch_object_numbers.empty());
assert(!m_watch_scheduled);
}
prefetch();
}
-void JournalPlayer::unwatch() {
+void JournalPlayer::shut_down(Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
Mutex::Locker locker(m_lock);
+
+ assert(!m_shut_down);
+ m_shut_down = true;
m_watch_enabled = false;
+
+ on_finish = utils::create_async_context_callback(
+ m_journal_metadata, on_finish);
+
if (m_watch_scheduled) {
- for (auto &players : m_object_players) {
- players.second.begin()->second->unwatch();
+ ObjectPlayerPtr object_player = get_object_player();
+ switch (m_watch_step) {
+ case WATCH_STEP_FETCH_FIRST:
+ object_player = m_object_players.begin()->second.begin()->second;
+ // fallthrough
+ case WATCH_STEP_FETCH_CURRENT:
+ object_player->unwatch();
+ break;
+ case WATCH_STEP_ASSERT_ACTIVE:
+ break;
}
- m_watch_scheduled = false;
}
+
+ m_async_op_tracker.wait_for_ops(on_finish);
}
bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
assert(m_fetch_object_numbers.count(object_num) == 1);
m_fetch_object_numbers.erase(object_num);
+ if (m_shut_down) {
+ return;
+ }
+
if (r == -ENOENT) {
r = 0;
}
// by an incomplete tag sequence
ldout(m_cct, 20) << __func__ << ": asserting active tag="
<< *m_active_tag_tid << dendl;
+
+ m_async_op_tracker.start_op();
FunctionContext *ctx = new FunctionContext([this](int r) {
handle_watch_assert_active(r);
});
return;
}
+ ObjectPlayerPtr object_player;
double watch_interval = m_watch_interval;
- ObjectPlayerPtr object_player = get_object_player();
switch (m_watch_step) {
case WATCH_STEP_FETCH_CURRENT:
{
ldout(m_cct, 20) << __func__ << ": scheduling watch on "
<< object_player->get_oid() << dendl;
- C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
+ Context *ctx = utils::create_async_context_callback(
+ m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
object_player->watch(ctx, watch_interval);
}
void JournalPlayer::handle_watch(uint64_t object_num, int r) {
ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
- if (r == -ECANCELED) {
- // unwatch of object player(s)
- return;
- }
-
Mutex::Locker locker(m_lock);
assert(m_watch_scheduled);
m_watch_scheduled = false;
+ if (m_shut_down || r == -ECANCELED) {
+ // unwatch of object player(s)
+ return;
+ }
+
ObjectPlayerPtr object_player = get_object_player(object_num);
if (r == 0 && object_player->empty()) {
// possibly need to prune this empty object player if we've
}
m_watch_step = WATCH_STEP_FETCH_CURRENT;
- schedule_watch();
+ if (!m_shut_down && m_watch_enabled) {
+ schedule_watch();
+ }
+ m_async_op_tracker.finish_op();
}
void JournalPlayer::notify_entries_available() {
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_watch_interval(0), m_watch_task(NULL),
m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
- m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
- m_watch_in_progress(false) {
+ m_fetch_in_progress(false), m_read_off(0) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
Mutex::Locker timer_locker(m_timer_lock);
Mutex::Locker locker(m_lock);
assert(!m_fetch_in_progress);
- assert(!m_watch_in_progress);
- assert(m_watch_ctx == NULL);
+ assert(m_watch_ctx == nullptr);
}
}
Mutex::Locker timer_locker(m_timer_lock);
m_watch_interval = interval;
- assert(m_watch_ctx == NULL);
+ assert(m_watch_ctx == nullptr);
m_watch_ctx = on_fetch;
- // watch callback might lead to re-scheduled watch
- if (!m_watch_in_progress) {
- schedule_watch();
- }
+ schedule_watch();
}
void ObjectPlayer::unwatch() {
Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
+ assert(!m_unwatched);
+ m_unwatched = true;
- cancel_watch();
+ if (!cancel_watch()) {
+ return;
+ }
std::swap(watch_ctx, m_watch_ctx);
- while (m_watch_in_progress) {
- m_watch_in_progress_cond.Wait(m_timer_lock);
- }
}
if (watch_ctx != nullptr) {
m_timer.add_event_after(m_watch_interval, m_watch_task);
}
-void ObjectPlayer::cancel_watch() {
+bool ObjectPlayer::cancel_watch() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
- if (m_watch_task != NULL) {
- m_timer.cancel_event(m_watch_task);
- m_watch_task = NULL;
+ if (m_watch_task != nullptr) {
+ bool canceled = m_timer.cancel_event(m_watch_task);
+ assert(canceled);
+
+ m_watch_task = nullptr;
+ return true;
}
+ return false;
}
void ObjectPlayer::handle_watch_task() {
assert(m_timer_lock.is_locked());
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
- assert(m_watch_ctx != NULL);
+ assert(m_watch_ctx != nullptr);
+ assert(m_watch_task != nullptr);
- assert(!m_watch_in_progress);
- m_watch_in_progress = true;
- m_watch_task = NULL;
+ m_watch_task = nullptr;
fetch(new C_WatchFetch(this));
}
ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
<< dendl;
- Context *on_finish = nullptr;
+ Context *watch_ctx = nullptr;
{
Mutex::Locker timer_locker(m_timer_lock);
- assert(m_watch_in_progress);
if (r == -ENOENT) {
r = 0;
} else {
m_refetch_required = true;
}
- std::swap(on_finish, m_watch_ctx);
- }
-
- if (on_finish != nullptr) {
- on_finish->complete(r);
- }
-
- {
- Mutex::Locker locker(m_timer_lock);
- assert(m_watch_in_progress);
+ std::swap(watch_ctx, m_watch_ctx);
- // callback might have attempted to re-schedule the watch -- complete now
- if (m_watch_ctx != nullptr) {
- schedule_watch();
+ if (m_unwatched) {
+ m_unwatched = false;
+ r = -ECANCELED;
}
+ }
- m_watch_in_progress = false;
- m_watch_in_progress_cond.Signal();
+ if (watch_ctx != nullptr) {
+ watch_ctx->complete(r);
}
}
void ObjectPlayer::C_Fetch::finish(int r) {
r = object_player->handle_fetch_complete(r, read_bl);
+ object_player.reset();
+
on_finish->complete(r);
}
#include "gtest/gtest.h"
#include "test/journal/RadosTestFixture.h"
#include <list>
+#include <boost/scope_exit.hpp>
class TestJournalPlayer : public RadosTestFixture {
public:
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, metadata->set_active_set(2));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
metadata->set_minimum_set(2);
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 8, 300, 0));
ASSERT_EQ(0, write_entry(oid, 8, 301, 0));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(2));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
C_SaferCond ctx1;
cls::journal::Tag tag1;
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
ASSERT_EQ(0, init_metadata(metadata));
journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
ASSERT_EQ(0, metadata->set_active_set(1));
ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
ASSERT_EQ(expected_entries, entries);
}
+TEST_F(TestJournalPlayer, PrefechShutDown) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch();
+}
+
+TEST_F(TestJournalPlayer, LiveReplayShutDown) {
+ std::string oid = get_temp_oid();
+
+ ASSERT_EQ(0, create(oid));
+ ASSERT_EQ(0, client_register(oid));
+ ASSERT_EQ(0, client_commit(oid, {}));
+
+ journal::JournalMetadataPtr metadata = create_metadata(oid);
+ ASSERT_EQ(0, init_metadata(metadata));
+
+ journal::JournalPlayer *player = create_player(oid, metadata);
+ BOOST_SCOPE_EXIT_ALL( (player) ) {
+ C_SaferCond unwatch_ctx;
+ player->shut_down(&unwatch_ctx);
+ ASSERT_EQ(0, unwatch_ctx.wait());
+ };
+ player->prefetch_and_watch(0.25);
+}
+