From 6056f8c45c99bd37cb18933a37cc238c7e9a7c7d Mon Sep 17 00:00:00 2001 From: Jason Dillaman Date: Fri, 13 May 2016 14:49:07 -0400 Subject: [PATCH] journal: re-fetch active object before advancing set during replay During a live replay, it's possible that an append and and overflow into the next object could race with the live playback of the same object. Re-fetch an "empty" object at least once before advancing to next set to ensure all records have been read. Fixes: http://tracker.ceph.com/issues/15665 Signed-off-by: Jason Dillaman --- src/journal/JournalPlayer.cc | 17 +++++----- src/journal/JournalPlayer.h | 1 - src/journal/ObjectPlayer.cc | 2 ++ src/journal/ObjectPlayer.h | 9 ++++++ src/test/journal/test_JournalPlayer.cc | 45 ++++++++++++++++++++++++++ 5 files changed, 64 insertions(+), 10 deletions(-) diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 2f1d96df15c68..d0d62694a9af3 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -394,13 +394,6 @@ bool JournalPlayer::verify_playback_ready() { } } - // if we just advanced to this object, make sure we have the latest - // set of data before advancing to a new tag - if (m_watch_enabled && m_watch_required) { - m_watch_required = false; - return false; - } - // NOTE: replay currently does not check tag class to playback multiple tags // from different classes (issue #14909). When a new tag is discovered, it // is assumed that the previous tag was closed at the last replayable entry. @@ -452,7 +445,6 @@ void JournalPlayer::advance_splay_object() { assert(m_lock.is_locked()); ++m_splay_offset; m_splay_offset %= m_journal_metadata->get_splay_width(); - m_watch_required = true; ldout(m_cct, 20) << __func__ << ": new offset " << static_cast(m_splay_offset) << dendl; } @@ -466,9 +458,16 @@ bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { uint64_t active_set = m_journal_metadata->get_active_set(); if (!player->empty() || object_set == active_set) { return false; + } else if (m_watch_enabled && object_set < active_set && + player->refetch_required()) { + ldout(m_cct, 20) << __func__ << ": refetching " << player->get_oid() + << dendl; + player->clear_refetch_required(); + return false; } - ldout(m_cct, 15) << player->get_oid() << " empty" << dendl; + ldout(m_cct, 15) << __func__ << ": " << player->get_oid() << " empty" + << dendl; ObjectPlayers &object_players = m_object_players[ player->get_object_number() % splay_width]; assert(!object_players.empty()); diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 80a9ff7c1f089..85a8b28da25f5 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -114,7 +114,6 @@ private: bool m_watch_enabled; bool m_watch_scheduled; double m_watch_interval; - bool m_watch_required = false; bool m_handler_notified = false; diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 894b56f5da2da..2c2f3e33a40b8 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -221,6 +221,8 @@ void ObjectPlayer::handle_watch_fetched(int r) { assert(m_watch_in_progress); if (r == -ENOENT) { r = 0; + } else { + m_refetch_required = true; } std::swap(on_finish, m_watch_ctx); } diff --git a/src/journal/ObjectPlayer.h b/src/journal/ObjectPlayer.h index f68ee3741d827..73fa7d132b289 100644 --- a/src/journal/ObjectPlayer.h +++ b/src/journal/ObjectPlayer.h @@ -62,6 +62,13 @@ public: *invalid_ranges = m_invalid_ranges; } + inline bool refetch_required() const { + return m_refetch_required; + } + inline void clear_refetch_required() { + m_refetch_required = false; + } + private: typedef std::pair EntryKey; typedef boost::unordered_map EntryKeys; @@ -114,6 +121,8 @@ private: Cond m_watch_in_progress_cond; bool m_watch_in_progress; + bool m_refetch_required = true; + int handle_fetch_complete(int r, const bufferlist &bl); void schedule_watch(); diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index f989dd536b7a0..7ad92be3e2ca0 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -452,3 +452,48 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) { ASSERT_TRUE(metadata->get_last_allocated_entry_tid(301, &last_tid)); ASSERT_EQ(3U, last_tid); } + +TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 0, 0, 0)); + ASSERT_EQ(0, write_entry(oid, 1, 0, 1)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 2)); + ASSERT_EQ(0, write_entry(oid, 0, 0, 4)); + ASSERT_EQ(0, write_entry(oid, 3, 0, 5)); // laggy entry 0/3 in object 1 + metadata->set_active_set(1); + player->prefetch_and_watch(0.25); + + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + Entries expected_entries = { + create_entry(0, 0), + create_entry(0, 1), + create_entry(0, 2)}; + ASSERT_EQ(expected_entries, entries); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + + ASSERT_EQ(0, write_entry(oid, 1, 0, 3)); + ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + + expected_entries = { + create_entry(0, 3), + create_entry(0, 4), + create_entry(0, 5)}; + ASSERT_EQ(expected_entries, entries); +} -- 2.39.5