]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: player shutdown is now handled asynchronously
authorJason Dillaman <dillaman@redhat.com>
Tue, 24 May 2016 16:06:26 +0000 (12:06 -0400)
committerJason Dillaman <dillaman@redhat.com>
Wed, 25 May 2016 12:19:17 +0000 (08:19 -0400)
Fixes: http://tracker.ceph.com/issues/15949
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/journal/AsyncOpTracker.cc
src/journal/AsyncOpTracker.h
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/Journaler.cc
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/journal/Utils.h
src/test/journal/test_JournalPlayer.cc
src/test/journal/test_ObjectPlayer.cc

index 8c24088681e32f2e3fc850b877912b48c28ceb11..13a55fa7a47cce6e8863ce00f24a644a7c4a8b5e 100644 (file)
@@ -22,10 +22,18 @@ void AsyncOpTracker::start_op() {
 }
 
 void AsyncOpTracker::finish_op() {
-  Mutex::Locker locker(m_lock);
-  assert(m_pending_ops > 0);
-  if (--m_pending_ops == 0) {
-    m_cond.Signal();
+  Context *on_finish = nullptr;
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_pending_ops > 0);
+    if (--m_pending_ops == 0) {
+      m_cond.Signal();
+      std::swap(on_finish, m_on_finish);
+    }
+  }
+
+  if (on_finish != nullptr) {
+    on_finish->complete(0);
   }
 }
 
@@ -36,4 +44,21 @@ void AsyncOpTracker::wait_for_ops() {
   }
 }
 
+void AsyncOpTracker::wait_for_ops(Context *on_finish) {
+  {
+    Mutex::Locker locker(m_lock);
+    assert(m_on_finish == nullptr);
+    if (m_pending_ops > 0) {
+      m_on_finish = on_finish;
+      return;
+    }
+  }
+  on_finish->complete(0);
+}
+
+bool AsyncOpTracker::empty() {
+  Mutex::Locker locker(m_lock);
+  return (m_pending_ops == 0);
+}
+
 } // namespace journal
index cec332f8471bad585a9fa0ee1b19b6e51663f96f..a88cd453fe912c648d98964a52c96c44c7e03d6f 100644 (file)
@@ -8,6 +8,8 @@
 #include "common/Cond.h"
 #include "common/Mutex.h"
 
+struct Context;
+
 namespace journal {
 
 class AsyncOpTracker {
@@ -19,11 +21,15 @@ public:
   void finish_op();
 
   void wait_for_ops();
+  void wait_for_ops(Context *on_finish);
+
+  bool empty();
 
 private:
   Mutex m_lock;
   Cond m_cond;
   uint32_t m_pending_ops;
+  Context *m_on_finish = nullptr;
 
 };
 
index a79b2d484f773190ad51f0b27cf11ccf414def55..28905a2ed1304df65b4fad77ea9fe836bfa63f5f 100644 (file)
@@ -79,9 +79,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
 }
 
 JournalPlayer::~JournalPlayer() {
-  m_async_op_tracker.wait_for_ops();
+  assert(m_async_op_tracker.empty());
   {
     Mutex::Locker locker(m_lock);
+    assert(m_shut_down);
     assert(m_fetch_object_numbers.empty());
     assert(!m_watch_scheduled);
   }
@@ -140,16 +141,32 @@ void JournalPlayer::prefetch_and_watch(double interval) {
   prefetch();
 }
 
-void JournalPlayer::unwatch() {
+void JournalPlayer::shut_down(Context *on_finish) {
   ldout(m_cct, 20) << __func__ << dendl;
   Mutex::Locker locker(m_lock);
+
+  assert(!m_shut_down);
+  m_shut_down = true;
   m_watch_enabled = false;
+
+  on_finish = utils::create_async_context_callback(
+      m_journal_metadata, on_finish);
+
   if (m_watch_scheduled) {
-    for (auto &players : m_object_players) {
-      players.second.begin()->second->unwatch();
+    ObjectPlayerPtr object_player = get_object_player();
+    switch (m_watch_step) {
+    case WATCH_STEP_FETCH_FIRST:
+      object_player = m_object_players.begin()->second.begin()->second;
+      // fallthrough
+    case WATCH_STEP_FETCH_CURRENT:
+      object_player->unwatch();
+      break;
+    case WATCH_STEP_ASSERT_ACTIVE:
+      break;
     }
-    m_watch_scheduled = false;
   }
+
+  m_async_op_tracker.wait_for_ops(on_finish);
 }
 
 bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
@@ -623,6 +640,10 @@ void JournalPlayer::handle_fetched(uint64_t object_num, int r) {
   assert(m_fetch_object_numbers.count(object_num) == 1);
   m_fetch_object_numbers.erase(object_num);
 
+  if (m_shut_down) {
+    return;
+  }
+
   if (r == -ENOENT) {
     r = 0;
   }
@@ -647,6 +668,8 @@ void JournalPlayer::schedule_watch() {
     // by an incomplete tag sequence
     ldout(m_cct, 20) << __func__ << ": asserting active tag="
                      << *m_active_tag_tid << dendl;
+
+    m_async_op_tracker.start_op();
     FunctionContext *ctx = new FunctionContext([this](int r) {
         handle_watch_assert_active(r);
       });
@@ -654,9 +677,9 @@ void JournalPlayer::schedule_watch() {
     return;
   }
 
+  ObjectPlayerPtr object_player;
   double watch_interval = m_watch_interval;
 
-  ObjectPlayerPtr object_player = get_object_player();
   switch (m_watch_step) {
   case WATCH_STEP_FETCH_CURRENT:
     {
@@ -684,21 +707,22 @@ void JournalPlayer::schedule_watch() {
 
   ldout(m_cct, 20) << __func__ << ": scheduling watch on "
                    << object_player->get_oid() << dendl;
-  C_Watch *ctx = new C_Watch(this, object_player->get_object_number());
+  Context *ctx = utils::create_async_context_callback(
+    m_journal_metadata, new C_Watch(this, object_player->get_object_number()));
   object_player->watch(ctx, watch_interval);
 }
 
 void JournalPlayer::handle_watch(uint64_t object_num, int r) {
   ldout(m_cct, 10) << __func__ << ": r=" << r << dendl;
-  if (r == -ECANCELED) {
-    // unwatch of object player(s)
-    return;
-  }
-
   Mutex::Locker locker(m_lock);
   assert(m_watch_scheduled);
   m_watch_scheduled = false;
 
+  if (m_shut_down || r == -ECANCELED) {
+    // unwatch of object player(s)
+    return;
+  }
+
   ObjectPlayerPtr object_player = get_object_player(object_num);
   if (r == 0 && object_player->empty()) {
     // possibly need to prune this empty object player if we've
@@ -737,7 +761,10 @@ void JournalPlayer::handle_watch_assert_active(int r) {
   }
 
   m_watch_step = WATCH_STEP_FETCH_CURRENT;
-  schedule_watch();
+  if (!m_shut_down && m_watch_enabled) {
+    schedule_watch();
+  }
+  m_async_op_tracker.finish_op();
 }
 
 void JournalPlayer::notify_entries_available() {
index eb156b3e711f54d27bb18d724bcf43931a9b1156..f50258296c9fb1dcc40b7b4ce3c3fbaaa1d0b960 100644 (file)
@@ -36,7 +36,7 @@ public:
 
   void prefetch();
   void prefetch_and_watch(double interval);
-  void unwatch();
+  void shut_down(Context *on_finish);
 
   bool try_pop_front(Entry *entry, uint64_t *commit_tid);
 
@@ -79,6 +79,10 @@ private:
     uint64_t object_num;
     C_Watch(JournalPlayer *player, uint64_t object_num)
       : player(player), object_num(object_num) {
+      player->m_async_op_tracker.start_op();
+    }
+    virtual ~C_Watch() {
+      player->m_async_op_tracker.finish_op();
     }
 
     virtual void finish(int r) override {
@@ -105,6 +109,7 @@ private:
   WatchStep m_watch_step = WATCH_STEP_FETCH_CURRENT;
   bool m_watch_prune_active_tag = false;
 
+  bool m_shut_down = false;
   bool m_handler_notified = false;
 
   ObjectNumbers m_fetch_object_numbers;
index 2a02f60af3c56448d053e98d51a87543851517a5..d1e80ea8b6173c0dd8db954c2016f907ad2834e1 100644 (file)
@@ -317,7 +317,12 @@ bool Journaler::try_pop_front(ReplayEntry *replay_entry,
 
 void Journaler::stop_replay() {
   assert(m_player != NULL);
-  m_player->unwatch();
+
+  // TODO
+  C_SaferCond ctx;
+  m_player->shut_down(&ctx);
+  ctx.wait();
+
   delete m_player;
   m_player = NULL;
 }
index 2c2f3e33a40b8e1d3134613c3a57a465c2174141..f86e3ef93702e62402f0b09485aee8619c5e8db5 100644 (file)
@@ -21,8 +21,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
     m_watch_interval(0), m_watch_task(NULL),
     m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
-    m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
-    m_watch_in_progress(false) {
+    m_fetch_in_progress(false), m_read_off(0) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
@@ -32,8 +31,7 @@ ObjectPlayer::~ObjectPlayer() {
     Mutex::Locker timer_locker(m_timer_lock);
     Mutex::Locker locker(m_lock);
     assert(!m_fetch_in_progress);
-    assert(!m_watch_in_progress);
-    assert(m_watch_ctx == NULL);
+    assert(m_watch_ctx == nullptr);
   }
 }
 
@@ -62,13 +60,10 @@ void ObjectPlayer::watch(Context *on_fetch, double interval) {
   Mutex::Locker timer_locker(m_timer_lock);
   m_watch_interval = interval;
 
-  assert(m_watch_ctx == NULL);
+  assert(m_watch_ctx == nullptr);
   m_watch_ctx = on_fetch;
 
-  // watch callback might lead to re-scheduled watch
-  if (!m_watch_in_progress) {
-    schedule_watch();
-  }
+  schedule_watch();
 }
 
 void ObjectPlayer::unwatch() {
@@ -76,13 +71,14 @@ void ObjectPlayer::unwatch() {
   Context *watch_ctx = nullptr;
   {
     Mutex::Locker timer_locker(m_timer_lock);
+    assert(!m_unwatched);
+    m_unwatched = true;
 
-    cancel_watch();
+    if (!cancel_watch()) {
+      return;
+    }
 
     std::swap(watch_ctx, m_watch_ctx);
-    while (m_watch_in_progress) {
-      m_watch_in_progress_cond.Wait(m_timer_lock);
-    }
   }
 
   if (watch_ctx != nullptr) {
@@ -190,24 +186,27 @@ void ObjectPlayer::schedule_watch() {
   m_timer.add_event_after(m_watch_interval, m_watch_task);
 }
 
-void ObjectPlayer::cancel_watch() {
+bool ObjectPlayer::cancel_watch() {
   assert(m_timer_lock.is_locked());
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
-  if (m_watch_task != NULL) {
-    m_timer.cancel_event(m_watch_task);
-    m_watch_task = NULL;
+  if (m_watch_task != nullptr) {
+    bool canceled = m_timer.cancel_event(m_watch_task);
+    assert(canceled);
+
+    m_watch_task = nullptr;
+    return true;
   }
+  return false;
 }
 
 void ObjectPlayer::handle_watch_task() {
   assert(m_timer_lock.is_locked());
 
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
-  assert(m_watch_ctx != NULL);
+  assert(m_watch_ctx != nullptr);
+  assert(m_watch_task != nullptr);
 
-  assert(!m_watch_in_progress);
-  m_watch_in_progress = true;
-  m_watch_task = NULL;
+  m_watch_task = nullptr;
   fetch(new C_WatchFetch(this));
 }
 
@@ -215,38 +214,31 @@ void ObjectPlayer::handle_watch_fetched(int r) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
                    << dendl;
 
-  Context *on_finish = nullptr;
+  Context *watch_ctx = nullptr;
   {
     Mutex::Locker timer_locker(m_timer_lock);
-    assert(m_watch_in_progress);
     if (r == -ENOENT) {
       r = 0;
     } else {
       m_refetch_required = true;
     }
-    std::swap(on_finish, m_watch_ctx);
-  }
-
-  if (on_finish != nullptr) {
-    on_finish->complete(r);
-  }
-
-  {
-    Mutex::Locker locker(m_timer_lock);
-    assert(m_watch_in_progress);
+    std::swap(watch_ctx, m_watch_ctx);
 
-    // callback might have attempted to re-schedule the watch -- complete now
-    if (m_watch_ctx != nullptr) {
-      schedule_watch();
+    if (m_unwatched) {
+      m_unwatched = false;
+      r = -ECANCELED;
     }
+  }
 
-    m_watch_in_progress = false;
-    m_watch_in_progress_cond.Signal();
+  if (watch_ctx != nullptr) {
+    watch_ctx->complete(r);
   }
 }
 
 void ObjectPlayer::C_Fetch::finish(int r) {
   r = object_player->handle_fetch_complete(r, read_bl);
+  object_player.reset();
+
   on_finish->complete(r);
 }
 
index 73fa7d132b289877d71bf5ea3bd920478e7fbfa0..d0809cec8fe26d350ce2cb53aab6bd25c37d01ab 100644 (file)
@@ -117,16 +117,15 @@ private:
   EntryKeys m_entry_keys;
   InvalidRanges m_invalid_ranges;
 
-  Context *m_watch_ctx;
-  Cond m_watch_in_progress_cond;
-  bool m_watch_in_progress;
+  Context *m_watch_ctx = nullptr;
 
+  bool m_unwatched = false;
   bool m_refetch_required = true;
 
   int handle_fetch_complete(int r, const bufferlist &bl);
 
   void schedule_watch();
-  void cancel_watch();
+  bool cancel_watch();
   void handle_watch_task();
   void handle_watch_fetched(int r);
 };
index e29f359acaedac3490099779deb20996e91cd03a..b0cee75ae828e2bdd11323c55147c959303df6fc 100644 (file)
@@ -5,12 +5,30 @@
 #define CEPH_JOURNAL_UTILS_H
 
 #include "include/int_types.h"
+#include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include <string>
 
 namespace journal {
 namespace utils {
 
+namespace detail {
+
+template <typename M>
+struct C_AsyncCallback : public Context {
+  M journal_metadata;
+  Context *on_finish;
+
+  C_AsyncCallback(M journal_metadata, Context *on_finish)
+    : journal_metadata(journal_metadata), on_finish(on_finish) {
+  }
+  virtual void finish(int r) {
+    journal_metadata->queue(on_finish, r);
+  }
+};
+
+} // namespace detail
+
 template <typename T, void(T::*MF)(int)>
 void rados_state_callback(rados_completion_t c, void *arg) {
   T *obj = reinterpret_cast<T*>(arg);
@@ -24,6 +42,12 @@ std::string unique_lock_name(const std::string &name, void *address);
 
 void rados_ctx_callback(rados_completion_t c, void *arg);
 
+template <typename M>
+Context *create_async_context_callback(M journal_metadata, Context *on_finish) {
+  // use async callback to acquire a clean lock context
+  return new detail::C_AsyncCallback<M>(journal_metadata, on_finish);
+}
+
 } // namespace utils
 } // namespace journal
 
index d106ac4e28d983a71e52bd11bb08ff13df12ef33..000f13ba4d87e15395a9d907fc82b3c44f8fb765 100644 (file)
@@ -11,6 +11,7 @@
 #include "gtest/gtest.h"
 #include "test/journal/RadosTestFixture.h"
 #include <list>
+#include <boost/scope_exit.hpp>
 
 class TestJournalPlayer : public RadosTestFixture {
 public:
@@ -142,6 +143,11 @@ TEST_F(TestJournalPlayer, Prefetch) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -183,6 +189,11 @@ TEST_F(TestJournalPlayer, PrefetchSkip) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -213,6 +224,11 @@ TEST_F(TestJournalPlayer, PrefetchWithoutCommit) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -248,6 +264,11 @@ TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@@ -282,6 +303,11 @@ TEST_F(TestJournalPlayer, PrefetchCorruptSequence) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
@@ -311,6 +337,11 @@ TEST_F(TestJournalPlayer, PrefetchMissingSequence) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, metadata->set_active_set(1));
   ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
@@ -356,6 +387,11 @@ TEST_F(TestJournalPlayer, PrefetchLargeMissingSequence) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, metadata->set_active_set(2));
   ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -387,6 +423,11 @@ TEST_F(TestJournalPlayer, PrefetchBlockedNewTag) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
   ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@@ -421,6 +462,11 @@ TEST_F(TestJournalPlayer, PrefetchStaleEntries) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
   ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@@ -452,6 +498,11 @@ TEST_F(TestJournalPlayer, PrefetchUnexpectedTag) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
   ASSERT_EQ(0, write_entry(oid, 1, 235, 121));
@@ -484,6 +535,11 @@ TEST_F(TestJournalPlayer, PrefetchAndWatch) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
 
@@ -518,6 +574,11 @@ TEST_F(TestJournalPlayer, PrefetchSkippedObject) {
   ASSERT_EQ(0, metadata->set_active_set(2));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
   ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
@@ -565,6 +626,11 @@ TEST_F(TestJournalPlayer, ImbalancedJournal) {
   metadata->set_minimum_set(2);
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 8, 300, 0));
   ASSERT_EQ(0, write_entry(oid, 8, 301, 0));
@@ -607,6 +673,11 @@ TEST_F(TestJournalPlayer, LiveReplayLaggyAppend) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
   ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
@@ -652,6 +723,11 @@ TEST_F(TestJournalPlayer, LiveReplayMissingSequence) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 0, 2, 852));
   ASSERT_EQ(0, write_entry(oid, 0, 2, 856));
@@ -702,6 +778,11 @@ TEST_F(TestJournalPlayer, LiveReplayLargeMissingSequence) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, metadata->set_active_set(2));
   ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -733,6 +814,11 @@ TEST_F(TestJournalPlayer, LiveReplayBlockedNewTag) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   C_SaferCond ctx1;
   cls::journal::Tag tag1;
@@ -787,6 +873,11 @@ TEST_F(TestJournalPlayer, LiveReplayStaleEntries) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, write_entry(oid, 1, 0, 1));
   ASSERT_EQ(0, write_entry(oid, 1, 0, 3));
@@ -818,6 +909,11 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
   ASSERT_EQ(0, init_metadata(metadata));
 
   journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
 
   ASSERT_EQ(0, metadata->set_active_set(1));
   ASSERT_EQ(0, write_entry(oid, 0, 0, 0));
@@ -844,3 +940,41 @@ TEST_F(TestJournalPlayer, LiveReplayRefetchRemoveEmpty) {
   ASSERT_EQ(expected_entries, entries);
 }
 
+TEST_F(TestJournalPlayer, PrefechShutDown) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, {}));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
+  player->prefetch();
+}
+
+TEST_F(TestJournalPlayer, LiveReplayShutDown) {
+  std::string oid = get_temp_oid();
+
+  ASSERT_EQ(0, create(oid));
+  ASSERT_EQ(0, client_register(oid));
+  ASSERT_EQ(0, client_commit(oid, {}));
+
+  journal::JournalMetadataPtr metadata = create_metadata(oid);
+  ASSERT_EQ(0, init_metadata(metadata));
+
+  journal::JournalPlayer *player = create_player(oid, metadata);
+  BOOST_SCOPE_EXIT_ALL( (player) ) {
+    C_SaferCond unwatch_ctx;
+    player->shut_down(&unwatch_ctx);
+    ASSERT_EQ(0, unwatch_ctx.wait());
+  };
+  player->prefetch_and_watch(0.25);
+}
+
index 6103ee6b67f01eb6930c79d3aae1950cf7c0ae86..67c35a1d12c641c05d4f0026022d4566f7dd322c 100644 (file)
@@ -262,14 +262,11 @@ TEST_F(TestObjectPlayer, Unwatch) {
   std::string oid = get_temp_oid();
   journal::ObjectPlayerPtr object = create_object(oid, 14);
 
-  Mutex mutex("lock");
-  Cond cond;
-  bool done = false;
-  int rval = 0;
-  C_SafeCond *ctx = new C_SafeCond(&mutex, &cond, &done, &rval);
-  object->watch(ctx, 600);
+  C_SaferCond watch_ctx;
+  object->watch(&watch_ctx, 600);
 
   usleep(200000);
-  ASSERT_FALSE(done);
+
   object->unwatch();
+  ASSERT_EQ(-ECANCELED, watch_ctx.wait());
 }