]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: use provided work queue and timer
authorJason Dillaman <dillaman@redhat.com>
Thu, 3 Mar 2016 04:44:20 +0000 (23:44 -0500)
committerJason Dillaman <dillaman@redhat.com>
Tue, 8 Mar 2016 12:11:51 +0000 (07:11 -0500)
This avoids the need to open two threads per journaler.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
14 files changed:
src/journal/FutureImpl.cc
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalPlayer.cc
src/journal/JournalRecorder.cc
src/journal/JournalTrimmer.cc
src/journal/Journaler.cc
src/journal/Journaler.h
src/test/journal/RadosTestFixture.cc
src/test/journal/RadosTestFixture.h
src/test/journal/test_JournalMetadata.cc
src/test/journal/test_JournalTrimmer.cc
src/test/journal/test_Journaler.cc
src/test/journal/test_ObjectRecorder.cc

index 0345c09cff18e5965099fd4e210f764ecd69ded5..11eda4456616c2f5f506b49b6dfefdec29cc2dce 100644 (file)
@@ -2,7 +2,6 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/FutureImpl.h"
-#include "common/Finisher.h"
 #include "journal/JournalMetadata.h"
 #include "journal/Utils.h"
 
@@ -52,7 +51,7 @@ void FutureImpl::flush(Context *on_safe) {
   }
 
   if (complete && on_safe != NULL) {
-    m_journal_metadata->get_finisher().queue(on_safe, m_return_value);
+    m_journal_metadata->queue(on_safe, m_return_value);
   } else if (flush_handler) {
     // attached to journal object -- instruct it to flush all entries through
     // this one.  possible to become detached while lock is released, so flush
@@ -70,7 +69,7 @@ void FutureImpl::wait(Context *on_safe) {
       return;
     }
   }
-  m_journal_metadata->get_finisher().queue(on_safe, m_return_value);
+  m_journal_metadata->queue(on_safe, m_return_value);
 }
 
 bool FutureImpl::is_complete() const {
index 5652ba6f5ce3058f12bf1fba9f19e43bacb73cfd..bf7ac2808864dd1e13309a794ac31a229693f1dc 100644 (file)
@@ -4,7 +4,6 @@
 #include "journal/JournalMetadata.h"
 #include "journal/Utils.h"
 #include "common/errno.h"
-#include "common/Finisher.h"
 #include "common/Timer.h"
 #include "cls/journal/cls_journal_client.h"
 #include <functional>
@@ -231,14 +230,15 @@ struct C_FlushCommitPosition : public Context {
 
 } // anonymous namespace
 
-JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
+JournalMetadata::JournalMetadata(ContextWQ *work_queue, SafeTimer *timer,
+                                 Mutex *timer_lock, librados::IoCtx &ioctx,
                                  const std::string &oid,
                                  const std::string &client_id,
                                  double commit_interval)
     : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
       m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
-      m_splay_width(0), m_pool_id(-1), m_initialized(false), m_finisher(NULL),
-      m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"),
+      m_splay_width(0), m_pool_id(-1), m_initialized(false),
+      m_work_queue(work_queue), m_timer(timer), m_timer_lock(timer_lock),
       m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
       m_watch_handle(0), m_minimum_set(0), m_active_set(0),
       m_update_notifications(0), m_commit_position_ctx(NULL),
@@ -249,7 +249,7 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
 
 JournalMetadata::~JournalMetadata() {
   if (m_initialized) {
-    shutdown();
+    shut_down();
   }
 }
 
@@ -257,12 +257,6 @@ void JournalMetadata::init(Context *on_init) {
   assert(!m_initialized);
   m_initialized = true;
 
-  m_finisher = new Finisher(m_cct);
-  m_finisher->start();
-
-  m_timer = new SafeTimer(m_cct, m_timer_lock, true);
-  m_timer->init();
-
   int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
   if (r < 0) {
     lderr(m_cct) << __func__ << ": failed to watch journal"
@@ -275,7 +269,7 @@ void JournalMetadata::init(Context *on_init) {
   get_immutable_metadata(&m_order, &m_splay_width, &m_pool_id, ctx);
 }
 
-void JournalMetadata::shutdown() {
+void JournalMetadata::shut_down() {
 
   ldout(m_cct, 20) << __func__ << dendl;
 
@@ -292,24 +286,10 @@ void JournalMetadata::shutdown() {
 
   flush_commit_position();
 
-  if (m_timer != NULL) {
-    Mutex::Locker locker(m_timer_lock);
-    m_timer->shutdown();
-    delete m_timer;
-    m_timer = NULL;
-  }
-
-  if (m_finisher != NULL) {
-    m_finisher->stop();
-    delete m_finisher;
-    m_finisher = NULL;
-  }
-
   librados::Rados rados(m_ioctx);
   rados.watch_flush();
 
   m_async_op_tracker.wait_for_ops();
-  m_ioctx.aio_flush();
 }
 
 void JournalMetadata::get_immutable_metadata(uint8_t *order,
@@ -458,7 +438,7 @@ void JournalMetadata::set_active_set(uint64_t object_set) {
 void JournalMetadata::flush_commit_position() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker timer_locker(*m_timer_lock);
   Mutex::Locker locker(m_lock);
   if (m_commit_position_ctx == nullptr) {
     return;
@@ -471,12 +451,12 @@ void JournalMetadata::flush_commit_position() {
 void JournalMetadata::flush_commit_position(Context *on_safe) {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker timer_locker(*m_timer_lock);
   Mutex::Locker locker(m_lock);
   if (m_commit_position_ctx == nullptr) {
     // nothing to flush
     if (on_safe != nullptr) {
-      m_finisher->queue(on_safe, 0);
+      m_work_queue->queue(on_safe, 0);
     }
     return;
   }
@@ -567,7 +547,7 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
 void JournalMetadata::cancel_commit_task() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  assert(m_timer_lock.is_locked());
+  assert(m_timer_lock->is_locked());
   assert(m_lock.is_locked());
   assert(m_commit_position_ctx != nullptr);
   assert(m_commit_position_task_ctx != nullptr);
@@ -579,7 +559,7 @@ void JournalMetadata::cancel_commit_task() {
 void JournalMetadata::schedule_commit_task() {
   ldout(m_cct, 20) << __func__ << dendl;
 
-  assert(m_timer_lock.is_locked());
+  assert(m_timer_lock->is_locked());
   assert(m_lock.is_locked());
   assert(m_commit_position_ctx != nullptr);
   if (m_commit_position_task_ctx == NULL) {
@@ -589,7 +569,7 @@ void JournalMetadata::schedule_commit_task() {
 }
 
 void JournalMetadata::handle_commit_position_task() {
-  assert(m_timer_lock.is_locked());
+  assert(m_timer_lock->is_locked());
   assert(m_lock.is_locked());
   ldout(m_cct, 20) << __func__ << dendl;
 
@@ -610,12 +590,12 @@ void JournalMetadata::handle_commit_position_task() {
 }
 
 void JournalMetadata::schedule_watch_reset() {
-  assert(m_timer_lock.is_locked());
+  assert(m_timer_lock->is_locked());
   m_timer->add_event_after(0.1, new C_WatchReset(this));
 }
 
 void JournalMetadata::handle_watch_reset() {
-  assert(m_timer_lock.is_locked());
+  assert(m_timer_lock->is_locked());
   if (!m_initialized) {
     return;
   }
@@ -642,7 +622,7 @@ void JournalMetadata::handle_watch_notify(uint64_t notify_id, uint64_t cookie) {
 
 void JournalMetadata::handle_watch_error(int err) {
   lderr(m_cct) << "journal watch error: " << cpp_strerror(err) << dendl;
-  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker timer_locker(*m_timer_lock);
   Mutex::Locker locker(m_lock);
 
   // release old watch on error
@@ -679,7 +659,7 @@ void JournalMetadata::committed(uint64_t commit_tid,
   ObjectSetPosition commit_position;
   Context *stale_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(m_timer_lock);
+    Mutex::Locker timer_locker(*m_timer_lock);
     Mutex::Locker locker(m_lock);
     assert(commit_tid > m_commit_position_tid);
 
index 6d0e7272a996952261dc0278053fb6cf28882b2e..2c590f2e760ce76b3adec61db940e92647e0f22f 100644 (file)
@@ -10,6 +10,7 @@
 #include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
 #include "cls/journal/cls_journal_types.h"
 #include "journal/AsyncOpTracker.h"
 #include <boost/intrusive_ptr.hpp>
@@ -21,7 +22,6 @@
 #include <string>
 #include "include/assert.h"
 
-class Finisher;
 class SafeTimer;
 
 namespace journal {
@@ -46,12 +46,13 @@ public:
     virtual void handle_update(JournalMetadata *) = 0;
   };
 
-  JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
+  JournalMetadata(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+                  librados::IoCtx &ioctx, const std::string &oid,
                   const std::string &client_id, double commit_interval);
   ~JournalMetadata();
 
   void init(Context *on_init);
-  void shutdown();
+  void shut_down();
 
   void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
                              int64_t *pool_id, Context *on_finish);
@@ -84,15 +85,15 @@ public:
     return m_pool_id;
   }
 
-  inline Finisher &get_finisher() {
-    return *m_finisher;
+  inline void queue(Context *on_finish, int r) {
+    m_work_queue->queue(on_finish, r);
   }
 
   inline SafeTimer &get_timer() {
     return *m_timer;
   }
   inline Mutex &get_timer_lock() {
-    return m_timer_lock;
+    return *m_timer_lock;
   }
 
   void set_minimum_set(uint64_t object_set);
@@ -283,9 +284,9 @@ private:
   int64_t m_pool_id;
   bool m_initialized;
 
-  Finisher *m_finisher;
+  ContextWQ *m_work_queue;
   SafeTimer *m_timer;
-  Mutex m_timer_lock;
+  Mutex *m_timer_lock;
 
   mutable Mutex m_lock;
 
index 0478cec91be98d64f6fe37592618cd517d9dee96..3559e240fb65adba0a0d4f6a6836dea9a18cf573 100644 (file)
@@ -2,7 +2,6 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/JournalPlayer.h"
-#include "common/Finisher.h"
 #include "journal/Entry.h"
 #include "journal/ReplayHandler.h"
 #include "journal/Utils.h"
@@ -163,7 +162,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
       m_watch_scheduled = true;
     } else if (!m_watch_enabled && !object_player->is_fetch_in_progress()) {
       ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+      m_journal_metadata->queue(new C_HandleComplete(
         m_replay_handler), 0);
     }
     return false;
@@ -179,7 +178,7 @@ bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
     lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
 
     m_state = STATE_ERROR;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+    m_journal_metadata->queue(new C_HandleComplete(
       m_replay_handler), -ENOMSG);
     return false;
   }
@@ -331,7 +330,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   ObjectPlayerPtr object_player = get_object_player();
   if (!object_player->empty()) {
     ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+    m_journal_metadata->queue(new C_HandleEntriesAvailable(
       m_replay_handler), 0);
   } else if (m_watch_enabled) {
     object_player->watch(
@@ -341,7 +340,7 @@ int JournalPlayer::process_prefetch(uint64_t object_number) {
   } else {
     ldout(m_cct, 10) << __func__ << ": no uncommitted entries available"
                      << dendl;
-    m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+    m_journal_metadata->queue(new C_HandleComplete(
       m_replay_handler), 0);
   }
   return 0;
@@ -360,11 +359,11 @@ int JournalPlayer::process_playback(uint64_t object_number) {
     uint64_t object_set = object_player->get_object_number() / splay_width;
     if (!object_player->empty()) {
       ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+      m_journal_metadata->queue(new C_HandleEntriesAvailable(
         m_replay_handler), 0);
     } else if (object_set == active_set) {
       ldout(m_cct, 10) << __func__ << ": replay complete" << dendl;
-      m_journal_metadata->get_finisher().queue(new C_HandleComplete(
+      m_journal_metadata->queue(new C_HandleComplete(
         m_replay_handler), 0);
     }
   }
index 485c1f2988cd52002281c908f874c82dbb9c609a..065f692821771a7122775e16d437dccfc194d008 100644 (file)
@@ -2,7 +2,6 @@
 // vim: ts=8 sw=2 smarttab
 
 #include "journal/JournalRecorder.h"
-#include "common/Finisher.h"
 #include "journal/Entry.h"
 #include "journal/Utils.h"
 
@@ -32,7 +31,7 @@ struct C_Flush : public Context {
     }
     if (pending_flushes.dec() == 0) {
       // ensure all prior callback have been flushed as well
-      journal_metadata->get_finisher().queue(on_finish, ret_val);
+      journal_metadata->queue(on_finish, ret_val);
       delete this;
     }
   }
index f00925303c01d77a38a884056cd44394a96a41de..68ba5f42ab4e195ad6f7dde6bd497ad04b8f8670 100644 (file)
@@ -5,7 +5,6 @@
 #include "journal/Utils.h"
 #include "common/Cond.h"
 #include "common/errno.h"
-#include "common/Finisher.h"
 #include <limits>
 
 #define dout_subsys ceph_subsys_journaler
index a6220ef1012afbd91d4115492c2c7982e3de6476..81425d59ab5c06ff69d66299377070aa91b354eb 100644 (file)
@@ -4,6 +4,8 @@
 #include "journal/Journaler.h"
 #include "include/stringify.h"
 #include "common/errno.h"
+#include "common/Timer.h"
+#include "common/WorkQueue.h"
 #include "journal/Entry.h"
 #include "journal/FutureImpl.h"
 #include "journal/JournalMetadata.h"
@@ -51,31 +53,74 @@ std::string Journaler::object_oid_prefix(int pool_id,
   return JOURNAL_OBJECT_PREFIX + stringify(pool_id) + "." + journal_id + ".";
 }
 
+Journaler::Threads::Threads(CephContext *cct)
+    : timer_lock("Journaler::timer_lock") {
+  thread_pool = new ThreadPool(cct, "Journaler::thread_pool", "tp_journal", 1);
+  thread_pool->start();
+
+  work_queue = new ContextWQ("Journaler::work_queue", 60, thread_pool);
+
+  timer = new SafeTimer(cct, timer_lock, true);
+  timer->init();
+}
+
+Journaler::Threads::~Threads() {
+  {
+    Mutex::Locker timer_locker(timer_lock);
+    timer->shutdown();
+  }
+  delete timer;
+
+  work_queue->drain();
+  delete work_queue;
+
+  thread_pool->stop();
+  delete thread_pool;
+}
+
 Journaler::Journaler(librados::IoCtx &header_ioctx,
+                     const std::string &journal_id,
+                     const std::string &client_id, double commit_interval)
+    : m_threads(new Threads(reinterpret_cast<CephContext*>(header_ioctx.cct()))),
+      m_client_id(client_id) {
+  set_up(m_threads->work_queue, m_threads->timer, &m_threads->timer_lock,
+         header_ioctx, journal_id, commit_interval);
+}
+
+Journaler::Journaler(ContextWQ *work_queue, SafeTimer *timer,
+                     Mutex *timer_lock, librados::IoCtx &header_ioctx,
                     const std::string &journal_id,
                     const std::string &client_id, double commit_interval)
-  : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL),
-    m_trimmer(NULL)
-{
+    : m_client_id(client_id) {
+  set_up(work_queue, timer, timer_lock, header_ioctx, journal_id,
+         commit_interval);
+}
+
+void Journaler::set_up(ContextWQ *work_queue, SafeTimer *timer,
+                       Mutex *timer_lock, librados::IoCtx &header_ioctx,
+                       const std::string &journal_id, double commit_interval) {
   m_header_ioctx.dup(header_ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_header_ioctx.cct());
 
   m_header_oid = header_oid(journal_id);
   m_object_oid_prefix = object_oid_prefix(m_header_ioctx.get_id(), journal_id);
 
-  m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id,
+  m_metadata = new JournalMetadata(work_queue, timer, timer_lock,
+                                   m_header_ioctx, m_header_oid, m_client_id,
                                    commit_interval);
   m_metadata->get();
 }
 
 Journaler::~Journaler() {
-  if (m_metadata != NULL) {
+  if (m_metadata != nullptr) {
     m_metadata->put();
-    m_metadata = NULL;
+    m_metadata = nullptr;
   }
   delete m_trimmer;
-  assert(m_player == NULL);
-  assert(m_recorder == NULL);
+  assert(m_player == nullptr);
+  assert(m_recorder == nullptr);
+
+  delete m_threads;
 }
 
 int Journaler::exists(bool *header_exists) const {
@@ -116,8 +161,8 @@ int Journaler::init_complete() {
   return 0;
 }
 
-void Journaler::shutdown() {
-  m_metadata->shutdown();
+void Journaler::shut_down() {
+  m_metadata->shut_down();
 }
 
 void Journaler::get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
@@ -152,7 +197,7 @@ int Journaler::create(uint8_t order, uint8_t splay_width, int64_t pool_id) {
 }
 
 int Journaler::remove(bool force) {
-  m_metadata->shutdown();
+  m_metadata->shut_down();
 
   ldout(m_cct, 5) << "removing journal: " << m_header_oid << dendl;
   int r = m_trimmer->remove_objects(force);
index 2e9ba85bb20836c2d65b19584acc97df3434ea58..d0be73ab17d384581b5463294a7466fd97791f80 100644 (file)
@@ -15,7 +15,9 @@
 #include <string>
 #include "include/assert.h"
 
+class ContextWQ;
 class SafeTimer;
+class ThreadPool;
 
 namespace journal {
 
@@ -28,6 +30,17 @@ class ReplayHandler;
 
 class Journaler {
 public:
+  struct Threads {
+    Threads(CephContext *cct);
+    ~Threads();
+
+    ThreadPool *thread_pool = nullptr;
+    ContextWQ *work_queue = nullptr;
+
+    SafeTimer *timer = nullptr;
+    Mutex timer_lock;
+  };
+
   typedef std::list<cls::journal::Tag> Tags;
   typedef std::set<cls::journal::Client> RegisteredClients;
 
@@ -37,6 +50,9 @@ public:
 
   Journaler(librados::IoCtx &header_ioctx, const std::string &journal_id,
            const std::string &client_id, double commit_interval);
+  Journaler(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+            librados::IoCtx &header_ioctx, const std::string &journal_id,
+           const std::string &client_id, double commit_interval);
   ~Journaler();
 
   int exists(bool *header_exists) const;
@@ -44,7 +60,7 @@ public:
   int remove(bool force);
 
   void init(Context *on_init);
-  void shutdown();
+  void shut_down();
 
   void get_immutable_metadata(uint8_t *order, uint8_t *splay_width,
                              int64_t *pool_id, Context *on_finish);
@@ -95,6 +111,8 @@ private:
     }
   };
 
+  Threads *m_threads = nullptr;
+
   mutable librados::IoCtx m_header_ioctx;
   librados::IoCtx m_data_ioctx;
   CephContext *m_cct;
@@ -103,10 +121,14 @@ private:
   std::string m_header_oid;
   std::string m_object_oid_prefix;
 
-  JournalMetadata *m_metadata;
-  JournalPlayer *m_player;
-  JournalRecorder *m_recorder;
-  JournalTrimmer *m_trimmer;
+  JournalMetadata *m_metadata = nullptr;
+  JournalPlayer *m_player = nullptr;
+  JournalRecorder *m_recorder = nullptr;
+  JournalTrimmer *m_trimmer = nullptr;
+
+  void set_up(ContextWQ *work_queue, SafeTimer *timer, Mutex *timer_lock,
+              librados::IoCtx &header_ioctx, const std::string &journal_id,
+              double commit_interval);
 
   int init_complete();
   void create_player(ReplayHandler *replay_handler);
index 82f946c4d79eabac062eda7e75a6753ba9e71187..6fa8759545961d0d00e3db11d7951221b96cf3fb 100644 (file)
@@ -4,6 +4,7 @@
 #include "test/journal/RadosTestFixture.h"
 #include "cls/journal/cls_journal_client.h"
 #include "include/stringify.h"
+#include "common/WorkQueue.h"
 
 RadosTestFixture::RadosTestFixture()
   : m_timer_lock("m_timer_lock"), m_timer(NULL), m_listener(this) {
@@ -12,10 +13,18 @@ RadosTestFixture::RadosTestFixture()
 void RadosTestFixture::SetUpTestCase() {
   _pool_name = get_temp_pool_name();
   ASSERT_EQ("", create_one_pool_pp(_pool_name, _rados));
+
+  CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+  _thread_pool = new ThreadPool(cct, "RadosTestFixture::_thread_pool",
+                                 "tp_test", 1);
+  _thread_pool->start();
 }
 
 void RadosTestFixture::TearDownTestCase() {
   ASSERT_EQ(0, destroy_one_pool_pp(_pool_name, _rados));
+
+  _thread_pool->stop();
+  delete _thread_pool;
 }
 
 std::string RadosTestFixture::get_temp_oid() {
@@ -25,8 +34,12 @@ std::string RadosTestFixture::get_temp_oid() {
 
 void RadosTestFixture::SetUp() {
   ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
-  m_timer = new SafeTimer(reinterpret_cast<CephContext*>(m_ioctx.cct()),
-                          m_timer_lock, true);
+
+  CephContext* cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
+  m_work_queue = new ContextWQ("RadosTestFixture::m_work_queue", 60,
+                               _thread_pool);
+
+  m_timer = new SafeTimer(cct, m_timer_lock, true);
   m_timer->init();
 }
 
@@ -36,6 +49,9 @@ void RadosTestFixture::TearDown() {
     m_timer->shutdown();
   }
   delete m_timer;
+
+  m_work_queue->drain();
+  delete m_work_queue;
 }
 
 int RadosTestFixture::create(const std::string &oid, uint8_t order,
@@ -47,7 +63,8 @@ journal::JournalMetadataPtr RadosTestFixture::create_metadata(
     const std::string &oid, const std::string &client_id,
     double commit_internal) {
   journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
-    m_ioctx, oid, client_id, commit_internal));
+    m_work_queue, m_timer, &m_timer_lock, m_ioctx, oid, client_id,
+    commit_internal));
   return metadata;
 }
 
@@ -101,3 +118,4 @@ bool RadosTestFixture::wait_for_update(journal::JournalMetadataPtr metadata) {
 std::string RadosTestFixture::_pool_name;
 librados::Rados RadosTestFixture::_rados;
 uint64_t RadosTestFixture::_oid_number = 0;
+ThreadPool *RadosTestFixture::_thread_pool = nullptr;
index 0635032524400391bad9e0c192f99d68b449b7ec..3415b0b6f02e0441d46cad474d940e73bca9f750 100644 (file)
@@ -8,6 +8,8 @@
 #include "cls/journal/cls_journal_types.h"
 #include "gtest/gtest.h"
 
+class ThreadPool;
+
 class RadosTestFixture : public ::testing::Test {
 public:
   static void SetUpTestCase();
@@ -56,9 +58,12 @@ public:
   static std::string _pool_name;
   static librados::Rados _rados;
   static uint64_t _oid_number;
+  static ThreadPool *_thread_pool;
 
   librados::IoCtx m_ioctx;
 
+  ContextWQ *m_work_queue;
+
   Mutex m_timer_lock;
   SafeTimer *m_timer;
 
index ec96fc01e9dd839afc8af602edda33e754fec7b5..850263d888042f7ce70f0e43fdf1c92d6b17c2ea 100644 (file)
@@ -14,6 +14,8 @@ public:
          it != m_metadata_list.end(); ++it) {
       (*it)->remove_listener(&m_listener);
     }
+    m_metadata_list.clear();
+
     RadosTestFixture::TearDown();
   }
 
index 9c55aa4f5d2ee57b5f5b7ae67dc6544e6a3107bc..9a9291fb418dc582c9f6725bc17f933edad3f2e5 100644 (file)
@@ -16,6 +16,8 @@ public:
          it != m_metadata_list.end(); ++it) {
       (*it)->remove_listener(&m_listener);
     }
+    m_metadata_list.clear();
+
     for (std::list<journal::JournalTrimmer*>::iterator it = m_trimmers.begin();
          it != m_trimmers.end(); ++it) {
       delete *it;
index 2c33f052ddedc1c75f871f75697ed4f3c313f242..df029a5a4773a79c86620a7a48591a32d1a11845 100644 (file)
@@ -20,7 +20,8 @@ public:
   virtual void SetUp() {
     RadosTestFixture::SetUp();
     m_journal_id = get_temp_journal_id();
-    m_journaler = new journal::Journaler(m_ioctx, m_journal_id, CLIENT_ID, 5);
+    m_journaler = new journal::Journaler(m_work_queue, m_timer, &m_timer_lock,
+                                         m_ioctx, m_journal_id, CLIENT_ID, 5);
   }
 
   virtual void TearDown() {
@@ -39,7 +40,8 @@ public:
   }
 
   int register_client(const std::string &client_id, const std::string &desc) {
-    journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+    journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+                                 m_ioctx, m_journal_id, client_id, 5);
     bufferlist data;
     data.append(desc);
     C_SaferCond cond;
@@ -48,7 +50,8 @@ public:
   }
 
   int update_client(const std::string &client_id, const std::string &desc) {
-    journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+    journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+                                 m_ioctx, m_journal_id, client_id, 5);
     bufferlist data;
     data.append(desc);
     C_SaferCond cond;
@@ -57,7 +60,8 @@ public:
   }
 
   int unregister_client(const std::string &client_id) {
-    journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
+    journal::Journaler journaler(m_work_queue, m_timer, &m_timer_lock,
+                                 m_ioctx, m_journal_id, client_id, 5);
     C_SaferCond cond;
     journaler.unregister_client(&cond);
     return cond.wait();
index a04b3a842677b4795ecf1eef4339264668cd2462..65d74b6e58afb8594c08844bbf91f3920ee5b4f5 100644 (file)
@@ -52,6 +52,8 @@ public:
       (*it)->flush(&cond);
       cond.wait();
     }
+    m_object_recorders.clear();
+
     RadosTestFixture::TearDown();
   }