From: Jason Dillaman Date: Wed, 2 Mar 2016 15:09:07 +0000 (-0500) Subject: journal: properly handle tag transition X-Git-Tag: v10.1.0~187^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=21d959529c3542e3db4311a00384a03295cd3bde;p=ceph.git journal: properly handle tag transition Now that the tag concept has been re-used for delineating epochs for librbd, we need playback to properly handle the cases where the active playback tag abruptly ends and a newer tag is inserted in the first splay offset object. Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 0478cec91be9..664b4b001024 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -68,7 +68,9 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, // start replay after the last committed entry's object uint8_t splay_width = m_journal_metadata->get_splay_width(); - m_commit_object = commit_position.object_positions.front().object_number; + auto &active_position = commit_position.object_positions.front(); + m_active_tag_tid = active_position.tag_tid; + m_commit_object = active_position.object_number; m_splay_offset = m_commit_object % splay_width; for (auto &position : commit_position.object_positions) { uint8_t splay_offset = position.object_number % splay_width; @@ -134,13 +136,13 @@ void JournalPlayer::prefetch_and_watch(double interval) { } void JournalPlayer::unwatch() { + ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); m_watch_enabled = false; if (m_watch_scheduled) { - ObjectPlayerPtr object_player = get_object_player(); - assert(object_player); - - object_player->unwatch(); + for (auto &players : m_object_players) { + players.second.begin()->second->unwatch(); + } m_watch_scheduled = false; } } @@ -152,30 +154,38 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { return false; } - ObjectPlayerPtr object_player = get_object_player(); - assert(object_player); + if (!is_object_set_ready()) { + return false; + } - if (object_player->empty()) { - if (m_watch_enabled && !m_watch_scheduled) { - object_player->watch( - new C_Watch(this, object_player->get_object_number()), - m_watch_interval); - m_watch_scheduled = true; - } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) { + if (!verify_playback_ready()) { + if (!m_watch_enabled) { ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; m_journal_metadata->get_finisher().queue(new C_HandleComplete( m_replay_handler), 0); + } else if (!m_watch_scheduled) { + schedule_watch(); } return false; } + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player && !object_player->empty()); + object_player->front(entry); object_player->pop_front(); uint64_t last_entry_tid; - if (m_journal_metadata->get_last_allocated_entry_tid( - entry->get_tag_tid(), &last_entry_tid) && - entry->get_entry_tid() != last_entry_tid + 1) { + if (m_active_tag_tid && *m_active_tag_tid != entry->get_tag_tid()) { + lderr(m_cct) << "unexpected tag in journal entry: " << *entry << dendl; + + m_state = STATE_ERROR; + m_journal_metadata->get_finisher().queue(new C_HandleComplete( + m_replay_handler), -ENOMSG); + return false; + } else if (m_journal_metadata->get_last_allocated_entry_tid( + entry->get_tag_tid(), &last_entry_tid) && + entry->get_entry_tid() != last_entry_tid + 1) { lderr(m_cct) << "missing prior journal entry: " << *entry << dendl; m_state = STATE_ERROR; @@ -184,20 +194,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { return false; } - // skip to next splay offset if we cannot apply the next entry in-sequence - if (!object_player->empty()) { - Entry peek_entry; - object_player->front(&peek_entry); - if (peek_entry.get_tag_tid() == entry->get_tag_tid() || - (m_journal_metadata->get_last_allocated_entry_tid( - peek_entry.get_tag_tid(), &last_entry_tid) && - last_entry_tid + 1 != peek_entry.get_entry_tid())) { - advance_splay_object(); - } - } else { - advance_splay_object(); - remove_empty_object_player(object_player); - } + m_active_tag_tid = entry->get_tag_tid(); + advance_splay_object(); + remove_empty_object_player(object_player); m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(), entry->get_entry_tid()); @@ -210,8 +209,9 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) { void JournalPlayer::process_state(uint64_t object_number, int r) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << ", " << "r=" << r << dendl; + + assert(m_lock.is_locked()); if (r >= 0) { - Mutex::Locker locker(m_lock); switch (m_state) { case STATE_PREFETCH: ldout(m_cct, 10) << "PREFETCH" << dendl; @@ -232,11 +232,9 @@ void JournalPlayer::process_state(uint64_t object_number, int r) { } if (r < 0) { - { - Mutex::Locker locker(m_lock); - m_state = STATE_ERROR; - } - m_replay_handler->handle_complete(r); + m_state = STATE_ERROR; + m_journal_metadata->get_finisher().queue(new C_HandleComplete( + m_replay_handler), r); } } @@ -328,16 +326,14 @@ int JournalPlayer::process_prefetch(uint64_t object_number) { } m_state = STATE_PLAYBACK; - ObjectPlayerPtr object_player = get_object_player(); - if (!object_player->empty()) { + if (!is_object_set_ready()) { + ldout(m_cct, 10) << __func__ << ": waiting for full object set" << dendl; + } else if (verify_playback_ready()) { ldout(m_cct, 10) << __func__ << ": entries available" << dendl; m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( m_replay_handler), 0); } else if (m_watch_enabled) { - object_player->watch( - new C_Watch(this, object_player->get_object_number()), - m_watch_interval); - m_watch_scheduled = true; + schedule_watch(); } else { ldout(m_cct, 10) << __func__ << ": no uncommitted entries available" << dendl; @@ -351,18 +347,20 @@ int JournalPlayer::process_playback(uint64_t object_number) { ldout(m_cct, 10) << __func__ << ": object_num=" << object_number << dendl; assert(m_lock.is_locked()); - m_watch_scheduled = false; + if (!is_object_set_ready()) { + return 0; + } ObjectPlayerPtr object_player = get_object_player(); - if (object_player->get_object_number() == object_number) { + if (verify_playback_ready()) { + ldout(m_cct, 10) << __func__ << ": entries available" << dendl; + m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( + m_replay_handler), 0); + } else if (!m_watch_enabled && is_object_set_ready()) { uint8_t splay_width = m_journal_metadata->get_splay_width(); uint64_t active_set = m_journal_metadata->get_active_set(); uint64_t object_set = object_player->get_object_number() / splay_width; - if (!object_player->empty()) { - ldout(m_cct, 10) << __func__ << ": entries available" << dendl; - m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable( - m_replay_handler), 0); - } else if (object_set == active_set) { + if (object_set == active_set) { ldout(m_cct, 10) << __func__ << ": replay complete" << dendl; m_journal_metadata->get_finisher().queue(new C_HandleComplete( m_replay_handler), 0); @@ -371,6 +369,61 @@ int JournalPlayer::process_playback(uint64_t object_number) { return 0; } +bool JournalPlayer::is_object_set_ready() const { + assert(m_lock.is_locked()); + if (m_watch_scheduled) { + return false; + } + for (auto &players : m_object_players) { + if (players.second.begin()->second->is_fetch_in_progress()) { + return false; + } + } + return true; +} + +bool JournalPlayer::verify_playback_ready() { + assert(m_lock.is_locked()); + assert(is_object_set_ready()); + + ObjectPlayerPtr object_player = get_object_player(); + assert(object_player); + + // Verify is the active object player has another entry available + // in the sequence + Entry entry; + bool entry_available = false; + if (!object_player->empty()) { + entry_available = true; + object_player->front(&entry); + if (!m_active_tag_tid || entry.get_tag_tid() == *m_active_tag_tid) { + return true; + } + } + + // NOTE: replay currently does not check tag class to playback multiple tags + // from different classes (issue #14909). When a new tag is discovered, it + // is assumed that the previous tag was closed at the last replayable entry. + object_player = m_object_players.begin()->second.begin()->second; + if (!object_player->empty() && m_active_tag_tid) { + object_player->front(&entry); + if (entry.get_tag_tid() > *m_active_tag_tid && + entry.get_entry_tid() == 0) { + uint8_t splay_width = m_journal_metadata->get_splay_width(); + m_active_tag_tid = entry.get_tag_tid(); + m_splay_offset = object_player->get_object_number() / splay_width; + + ldout(m_cct, 20) << __func__ << ": new tag " << entry.get_tag_tid() << " " + << "detected, adjusting offset to " + << static_cast(m_splay_offset) << dendl; + return true; + } + } + + // if any entry is available, we can test if the sequence is corrupt + return entry_available; +} + const JournalPlayer::ObjectPlayers &JournalPlayer::get_object_players() const { assert(m_lock.is_locked()); @@ -405,6 +458,7 @@ void JournalPlayer::advance_splay_object() { bool JournalPlayer::remove_empty_object_player(const ObjectPlayerPtr &player) { assert(m_lock.is_locked()); + assert(!m_watch_scheduled); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint64_t object_set = player->get_object_number() / splay_width; @@ -447,11 +501,12 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { ldout(m_cct, 10) << __func__ << ": " << utils::get_object_name(m_object_oid_prefix, object_num) << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); if (r == -ENOENT) { r = 0; } if (r == 0) { - Mutex::Locker locker(m_lock); uint8_t splay_width = m_journal_metadata->get_splay_width(); uint8_t splay_offset = object_num % splay_width; assert(m_object_players.count(splay_offset) == 1); @@ -461,15 +516,46 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) { ObjectPlayerPtr object_player = object_players[object_num]; remove_empty_object_player(object_player); } - process_state(object_num, r); } -void JournalPlayer::handle_watch(uint64_t object_num, int r) { - ldout(m_cct, 10) << __func__ << ": " - << utils::get_object_name(m_object_oid_prefix, object_num) - << ": r=" << r << dendl; - process_state(object_num, r); +void JournalPlayer::schedule_watch() { + ldout(m_cct, 10) << __func__ << dendl; + assert(m_lock.is_locked()); + if (m_watch_scheduled) { + return; + } + + // poll first splay offset and active splay offset since + // new records should only appear in those two objects + C_Watch *ctx = new C_Watch(this); + ObjectPlayerPtr object_player = get_object_player(); + object_player->watch(ctx, m_watch_interval); + + uint8_t splay_width = m_journal_metadata->get_splay_width(); + if (object_player->get_object_number() % splay_width != 0) { + ++ctx->pending_fetches; + + object_player = m_object_players.begin()->second.begin()->second; + object_player->watch(ctx, m_watch_interval); + } + m_watch_scheduled = true; +} + +void JournalPlayer::handle_watch(int r) { + ldout(m_cct, 10) << __func__ << ": r=" << r << dendl; + + Mutex::Locker locker(m_lock); + m_watch_scheduled = false; + std::set object_numbers; + for (auto &players : m_object_players) { + object_numbers.insert( + players.second.begin()->second->get_object_number()); + } + + for (auto object_num : object_numbers) { + process_state(object_num, r); + } } } // namespace journal diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index a07728ec0a21..bdb93d7d107b 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -12,6 +12,8 @@ #include "journal/JournalMetadata.h" #include "journal/ObjectPlayer.h" #include "cls/journal/cls_journal_types.h" +#include +#include #include class SafeTimer; @@ -51,17 +53,6 @@ private: STATE_ERROR }; - struct C_Watch : public Context { - JournalPlayer *player; - uint64_t object_num; - - C_Watch(JournalPlayer *p, uint64_t o) : player(p), object_num(o) { - } - virtual void finish(int r) { - player->handle_watch(object_num, r); - } - }; - struct C_Fetch : public Context { JournalPlayer *player; uint64_t object_num; @@ -76,6 +67,34 @@ private: } }; + struct C_Watch : public Context { + JournalPlayer *player; + uint8_t pending_fetches = 1; + int ret_val = 0; + + C_Watch(JournalPlayer *player) : player(player) { + } + + virtual void complete(int r) override { + player->m_lock.Lock(); + if (ret_val == 0 && r < 0) { + ret_val = r; + } + + assert(pending_fetches > 0); + if (--pending_fetches == 0) { + player->m_lock.Unlock(); + Context::complete(ret_val); + } else { + player->m_lock.Unlock(); + } + } + + virtual void finish(int r) override { + player->handle_watch(r); + } + }; + librados::IoCtx m_ioctx; CephContext *m_cct; std::string m_object_oid_prefix; @@ -97,9 +116,12 @@ private: SplayedObjectPlayers m_object_players; uint64_t m_commit_object; SplayedObjectPositions m_commit_positions; + boost::optional m_active_tag_tid = boost::none; void advance_splay_object(); + bool is_object_set_ready() const; + bool verify_playback_ready(); const ObjectPlayers &get_object_players() const; ObjectPlayerPtr get_object_player() const; ObjectPlayerPtr get_next_set_object_player() const; @@ -111,7 +133,9 @@ private: void fetch(uint64_t object_num); void handle_fetched(uint64_t object_num, int r); - void handle_watch(uint64_t object_num, int r); + + void schedule_watch(); + void handle_watch(int r); }; } // namespace journal diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index 2aca0d73b47b..e890dfa2d90b 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -207,11 +207,10 @@ void ObjectPlayer::handle_watch_fetched(int r) { Mutex::Locker timer_locker(m_timer_lock); assert(m_watch_in_progress); if (r == -ENOENT) { - schedule_watch(); - } else { - on_finish = m_watch_ctx; - m_watch_ctx = NULL; + r = 0; } + on_finish = m_watch_ctx; + m_watch_ctx = NULL; } if (on_finish != NULL) { diff --git a/src/test/journal/test_JournalPlayer.cc b/src/test/journal/test_JournalPlayer.cc index 7612b4982f56..4eb5628e59b4 100644 --- a/src/test/journal/test_JournalPlayer.cc +++ b/src/test/journal/test_JournalPlayer.cc @@ -249,11 +249,12 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { journal::JournalPlayer::ObjectPositions positions; positions = { - cls::journal::ObjectPosition(0, 234, 122), - cls::journal::ObjectPosition(1, 345, 1)}; + cls::journal::ObjectPosition(2, 234, 122), + cls::journal::ObjectPosition(1, 234, 121), + cls::journal::ObjectPosition(0, 234, 120)}; cls::journal::ObjectSetPosition commit_position(positions); - ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, create(oid, 3)); ASSERT_EQ(0, client_register(oid)); ASSERT_EQ(0, client_commit(oid, commit_position)); @@ -263,13 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { journal::JournalPlayer *player = create_player(oid, metadata); ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 0)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); - ASSERT_EQ(0, write_entry(oid, 1, 345, 1)); - ASSERT_EQ(0, write_entry(oid, 0, 234, 122)); - ASSERT_EQ(0, write_entry(oid, 1, 234, 123)); - ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 2)); + ASSERT_EQ(0, write_entry(oid, 2, 234, 122)); + ASSERT_EQ(0, write_entry(oid, 0, 234, 123)); + ASSERT_EQ(0, write_entry(oid, 1, 234, 124)); + ASSERT_EQ(0, write_entry(oid, 0, 236, 0)); // new tag allocated player->prefetch(); @@ -280,8 +279,8 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) { uint64_t last_tid; ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid)); ASSERT_EQ(124U, last_tid); - ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid)); - ASSERT_EQ(2U, last_tid); + ASSERT_TRUE(metadata->get_last_allocated_entry_tid(236, &last_tid)); + ASSERT_EQ(0U, last_tid); } TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { @@ -299,13 +298,41 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) { journal::JournalPlayer *player = create_player(oid, metadata); ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); - ASSERT_EQ(0, write_entry(oid, 0, 345, 0)); ASSERT_EQ(0, write_entry(oid, 1, 234, 121)); ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); player->prefetch(); Entries entries; - ASSERT_TRUE(wait_for_entries(player, 3, &entries)); + ASSERT_TRUE(wait_for_entries(player, 2, &entries)); + + journal::Entry entry; + uint64_t commit_tid; + ASSERT_FALSE(player->try_pop_front(&entry, &commit_tid)); + ASSERT_TRUE(wait_for_complete(player)); + ASSERT_EQ(-ENOMSG, m_replay_hander.complete_result); +} + +TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) { + std::string oid = get_temp_oid(); + + cls::journal::ObjectSetPosition commit_position; + + ASSERT_EQ(0, create(oid)); + ASSERT_EQ(0, client_register(oid)); + ASSERT_EQ(0, client_commit(oid, commit_position)); + + journal::JournalMetadataPtr metadata = create_metadata(oid); + ASSERT_EQ(0, init_metadata(metadata)); + + journal::JournalPlayer *player = create_player(oid, metadata); + + ASSERT_EQ(0, write_entry(oid, 0, 234, 120)); + ASSERT_EQ(0, write_entry(oid, 1, 235, 121)); + ASSERT_EQ(0, write_entry(oid, 0, 234, 124)); + + player->prefetch(); + Entries entries; + ASSERT_TRUE(wait_for_entries(player, 1, &entries)); journal::Entry entry; uint64_t commit_tid; diff --git a/src/test/journal/test_ObjectPlayer.cc b/src/test/journal/test_ObjectPlayer.cc index 586f97094a6d..6103ee6b67f0 100644 --- a/src/test/journal/test_ObjectPlayer.cc +++ b/src/test/journal/test_ObjectPlayer.cc @@ -267,7 +267,7 @@ TEST_F(TestObjectPlayer, Unwatch) { bool done = false; int rval = 0; C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval); - object->watch(ctx, 0.1); + object->watch(ctx, 600); usleep(200000); ASSERT_FALSE(done);