]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: fix issues discovered via valgrind
authorJason Dillaman <dillaman@redhat.com>
Thu, 16 Jul 2015 18:41:49 +0000 (14:41 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:42 +0000 (20:42 -0500)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
15 files changed:
src/journal/AsyncOpTracker.cc [new file with mode: 0644]
src/journal/AsyncOpTracker.h [new file with mode: 0644]
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/JournalTrimmer.cc
src/journal/JournalTrimmer.h
src/journal/Journaler.cc
src/journal/Makefile.am
src/journal/ObjectPlayer.cc
src/journal/ObjectPlayer.h
src/journal/ObjectRecorder.cc
src/journal/ObjectRecorder.h
src/journal/ReplayHandler.h

diff --git a/src/journal/AsyncOpTracker.cc b/src/journal/AsyncOpTracker.cc
new file mode 100644 (file)
index 0000000..8c24088
--- /dev/null
@@ -0,0 +1,39 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "journal/AsyncOpTracker.h"
+#include "journal/Utils.h"
+#include "include/assert.h"
+
+namespace journal {
+
+AsyncOpTracker::AsyncOpTracker()
+  : m_lock(utils::unique_lock_name("AsyncOpTracker::m_lock", this)),
+    m_pending_ops(0) {
+}
+
+AsyncOpTracker::~AsyncOpTracker() {
+  wait_for_ops();
+}
+
+void AsyncOpTracker::start_op() {
+  Mutex::Locker locker(m_lock);
+  ++m_pending_ops;
+}
+
+void AsyncOpTracker::finish_op() {
+  Mutex::Locker locker(m_lock);
+  assert(m_pending_ops > 0);
+  if (--m_pending_ops == 0) {
+    m_cond.Signal();
+  }
+}
+
+void AsyncOpTracker::wait_for_ops() {
+  Mutex::Locker locker(m_lock);
+  while (m_pending_ops > 0) {
+    m_cond.Wait(m_lock);
+  }
+}
+
+} // namespace journal
diff --git a/src/journal/AsyncOpTracker.h b/src/journal/AsyncOpTracker.h
new file mode 100644 (file)
index 0000000..cec332f
--- /dev/null
@@ -0,0 +1,32 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_ASYNC_OP_TRACKER_H
+#define CEPH_JOURNAL_ASYNC_OP_TRACKER_H
+
+#include "include/int_types.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
+
+namespace journal {
+
+class AsyncOpTracker {
+public:
+  AsyncOpTracker();
+  ~AsyncOpTracker();
+
+  void start_op();
+  void finish_op();
+
+  void wait_for_ops();
+
+private:
+  Mutex m_lock;
+  Cond m_cond;
+  uint32_t m_pending_ops;
+
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_ASYNC_OP_TRACKER_H
index 51c4cef803191178543ec563943a46cdb88ae1f0..56c0db32e3f83b7786d5655b4297bc8d4b360cff 100644 (file)
@@ -20,33 +20,21 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
                                  const std::string &oid,
                                  const std::string &client_id,
                                  double commit_interval)
-    : m_cct(NULL), m_oid(oid), m_client_id(client_id),
-      m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
-      m_initialized(false), m_finisher(NULL), m_timer(NULL),
+    : RefCountedObject(NULL, 0), m_cct(NULL), m_oid(oid),
+      m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
+      m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL),
       m_timer_lock("JournalMetadata::m_timer_lock"),
       m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
-      m_update_notifications(0), m_commit_position_pending(false),
-      m_commit_position_ctx(NULL) {
+      m_minimum_set(0), m_active_set(0), m_update_notifications(0),
+      m_commit_position_pending(false), m_commit_position_ctx(NULL) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
 
 JournalMetadata::~JournalMetadata() {
-  if (m_timer != NULL) {
-    Mutex::Locker locker(m_timer_lock);
-    m_timer->shutdown();
-    delete m_timer;
-    m_timer = NULL;
+  if (m_initialized) {
+    shutdown();
   }
-  if (m_finisher != NULL) {
-    m_finisher->stop();
-    delete m_finisher;
-    m_finisher = NULL;
-  }
-
-  m_ioctx.unwatch2(m_watch_handle);
-  librados::Rados rados(m_ioctx);
-  rados.watch_flush();
 }
 
 void JournalMetadata::init(Context *on_init) {
@@ -56,7 +44,7 @@ void JournalMetadata::init(Context *on_init) {
   m_finisher = new Finisher(m_cct);
   m_finisher->start();
 
-  m_timer = new SafeTimer(m_cct, m_timer_lock, false);
+  m_timer = new SafeTimer(m_cct, m_timer_lock, true);
   m_timer->init();
 
   int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx);
@@ -72,6 +60,34 @@ void JournalMetadata::init(Context *on_init) {
                                  ctx);
 }
 
+void JournalMetadata::shutdown() {
+  assert(m_initialized);
+  m_initialized = false;
+
+  if (m_timer != NULL) {
+    Mutex::Locker locker(m_timer_lock);
+    m_timer->shutdown();
+    delete m_timer;
+    m_timer = NULL;
+  }
+
+  if (m_finisher != NULL) {
+    m_finisher->stop();
+    delete m_finisher;
+    m_finisher = NULL;
+  }
+
+  if (m_watch_handle != 0) {
+    m_ioctx.unwatch2(m_watch_handle);
+    librados::Rados rados(m_ioctx);
+    rados.watch_flush();
+    m_watch_handle = 0;
+  }
+
+  m_async_op_tracker.wait_for_ops();
+  m_ioctx.aio_flush();
+}
+
 int JournalMetadata::register_client(const std::string &description) {
   ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
   int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
index ce621c5e15794e3e15991a6b35513880be17ec20..89ddcf1296b09753af52867533df3ddb6758aa95 100644 (file)
@@ -7,9 +7,11 @@
 #include "include/int_types.h"
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
+#include "common/Cond.h"
 #include "common/Mutex.h"
 #include "common/RefCountedObj.h"
 #include "cls/journal/cls_journal_types.h"
+#include "journal/AsyncOpTracker.h"
 #include <boost/intrusive_ptr.hpp>
 #include <boost/noncopyable.hpp>
 #include <list>
@@ -44,6 +46,7 @@ public:
   ~JournalMetadata();
 
   void init(Context *on_init);
+  void shutdown();
 
   void add_listener(Listener *listener);
   void remove_listener(Listener *listener);
@@ -148,12 +151,16 @@ private:
   };
 
   struct C_NotifyUpdate : public Context {
-    JournalMetadataPtr journal_metadata;
+    JournalMetadata* journal_metadata;
     Context *on_safe;
 
     C_NotifyUpdate(JournalMetadata *_journal_metadata, Context *_on_safe = NULL)
-      : journal_metadata(_journal_metadata), on_safe(_on_safe) {}
-
+      : journal_metadata(_journal_metadata), on_safe(_on_safe) {
+      journal_metadata->m_async_op_tracker.start_op();
+    }
+    virtual ~C_NotifyUpdate() {
+      journal_metadata->m_async_op_tracker.finish_op();
+    }
     virtual void finish(int r) {
       if (r == 0) {
         journal_metadata->async_notify_update();
@@ -165,20 +172,24 @@ private:
   };
 
   struct C_ImmutableMetadata : public Context {
-    JournalMetadataPtr journal_metadata;
+    JournalMetadata* journal_metadata;
     Context *on_finish;
 
     C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish)
       : journal_metadata(_journal_metadata), on_finish(_on_finish) {
+      Mutex::Locker locker(journal_metadata->m_lock);
+      journal_metadata->m_async_op_tracker.start_op();
+    }
+    virtual ~C_ImmutableMetadata() {
+      journal_metadata->m_async_op_tracker.finish_op();
     }
-
     virtual void finish(int r) {
       journal_metadata->handle_immutable_metadata(r, on_finish);
     }
 
   };
   struct C_Refresh : public Context {
-    JournalMetadataPtr journal_metadata;
+    JournalMetadata* journal_metadata;
     uint64_t minimum_set;
     uint64_t active_set;
     RegisteredClients registered_clients;
@@ -186,8 +197,13 @@ private:
 
     C_Refresh(JournalMetadata *_journal_metadata, Context *_on_finish)
       : journal_metadata(_journal_metadata), minimum_set(0), active_set(0),
-        on_finish(_on_finish) {}
-
+        on_finish(_on_finish) {
+      Mutex::Locker locker(journal_metadata->m_lock);
+      journal_metadata->m_async_op_tracker.start_op();
+    }
+    virtual ~C_Refresh() {
+      journal_metadata->m_async_op_tracker.finish_op();
+    }
     virtual void finish(int r) {
       journal_metadata->handle_refresh_complete(this, r);
     }
@@ -228,6 +244,8 @@ private:
   ObjectSetPosition m_commit_position;
   Context *m_commit_position_ctx;
 
+  AsyncOpTracker m_async_op_tracker;
+
   void handle_immutable_metadata(int r, Context *on_init);
 
   void refresh(Context *on_finish);
index ef11420d74defc98a26f8501738024ddcf275783..421a38b121e11616d9188d4cc0a81753bb22a90f 100644 (file)
@@ -14,18 +14,36 @@ namespace journal {
 
 namespace {
 
-struct C_HandleComplete: public Context {
+struct C_HandleComplete : public Context {
   ReplayHandler *replay_handler;
 
   C_HandleComplete(ReplayHandler *_replay_handler)
     : replay_handler(_replay_handler) {
+    replay_handler->get();
+  }
+  virtual ~C_HandleComplete() {
+    replay_handler->put();
   }
-
   virtual void finish(int r) {
     replay_handler->handle_complete(r);
   }
 };
 
+struct C_HandleEntriesAvailable : public Context {
+  ReplayHandler *replay_handler;
+
+  C_HandleEntriesAvailable(ReplayHandler *_replay_handler)
+      : replay_handler(_replay_handler) {
+    replay_handler->get();
+  }
+  virtual ~C_HandleEntriesAvailable() {
+    replay_handler->put();
+  }
+  virtual void finish(int r) {
+    replay_handler->handle_entries_available();
+  }
+};
+
 } // anonymous namespace
 
 JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
@@ -37,6 +55,7 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
     m_process_state(this), m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT),
     m_splay_offset(0), m_watch_enabled(false), m_watch_scheduled(false),
     m_watch_interval(0), m_commit_object(0) {
+  m_replay_handler->get();
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
 
@@ -55,6 +74,11 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
   }
 }
 
+JournalPlayer::~JournalPlayer() {
+  m_async_op_tracker.wait_for_ops();
+  m_replay_handler->put();
+}
+
 void JournalPlayer::prefetch() {
   m_lock.Lock();
   assert(m_state == STATE_INIT);
@@ -244,9 +268,8 @@ int JournalPlayer::process_prefetch() {
   ObjectPlayerPtr object_player = get_object_player();
   if (!object_player->empty()) {
     ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-    m_lock.Unlock();
-    m_replay_handler->handle_entries_available();
-    m_lock.Lock();
+    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+      m_replay_handler), 0);
   } else if (m_watch_enabled) {
     object_player->watch(&m_process_state, m_watch_interval);
     m_watch_scheduled = true;
@@ -268,9 +291,8 @@ int JournalPlayer::process_playback() {
   ObjectPlayerPtr object_player = get_object_player();
   if (!object_player->empty()) {
     ldout(m_cct, 10) << __func__ << ": entries available" << dendl;
-    m_lock.Unlock();
-    m_replay_handler->handle_entries_available();
-    m_lock.Lock();
+    m_journal_metadata->get_finisher().queue(new C_HandleEntriesAvailable(
+      m_replay_handler), 0);
   }
   return 0;
 }
@@ -355,7 +377,7 @@ int JournalPlayer::handle_fetched(int r, uint64_t object_num) {
 JournalPlayer::C_PrefetchBatch::C_PrefetchBatch(JournalPlayer *p)
   : player(p), lock("JournalPlayer::C_PrefetchBatch::lock"), refs(1),
     return_value(0) {
-  player->get();
+  player->m_async_op_tracker.start_op();
 }
 
 void JournalPlayer::C_PrefetchBatch::add_fetch() {
@@ -374,7 +396,6 @@ void JournalPlayer::C_PrefetchBatch::complete(int r) {
 
   if (refs == 0) {
     player->process_state(return_value);
-    player->put();
     delete this;
   }
 }
index 43d81f4c9e12bbba9f68af588aafef957f902779..613c9e305b612ea0e0ede85f32671525e537d98e 100644 (file)
@@ -8,7 +8,7 @@
 #include "include/Context.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
-#include "common/RefCountedObj.h"
+#include "journal/AsyncOpTracker.h"
 #include "journal/JournalMetadata.h"
 #include "journal/ObjectPlayer.h"
 #include "cls/journal/cls_journal_types.h"
@@ -19,10 +19,8 @@ class SafeTimer;
 namespace journal {
 
 class ReplayHandler;
-class JournalPlayer;
-typedef boost::intrusive_ptr<JournalPlayer> JournalPlayerPtr;
 
-class JournalPlayer : public RefCountedObject {
+class JournalPlayer {
 public:
   typedef cls::journal::EntryPosition EntryPosition;
   typedef cls::journal::EntryPositions EntryPositions;
@@ -31,6 +29,7 @@ public:
   JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
                 const JournalMetadataPtr& journal_metadata,
                 ReplayHandler *replay_handler);
+  ~JournalPlayer();
 
   void prefetch();
   void prefetch_and_watch(double interval);
@@ -58,6 +57,7 @@ private:
     }
     virtual void finish(int r) {}
   };
+
   struct C_PrefetchBatch : public Context {
     JournalPlayer *player;
     Mutex lock;
@@ -65,22 +65,28 @@ private:
     int return_value;
 
     C_PrefetchBatch(JournalPlayer *p);
+    virtual ~C_PrefetchBatch() {
+      player->m_async_op_tracker.finish_op();
+    }
     void add_fetch();
     virtual void complete(int r);
     virtual void finish(int r) {}
   };
+
   struct C_Fetch : public Context {
     JournalPlayer *player;
     uint64_t object_num;
     Context *on_fetch;
     C_Fetch(JournalPlayer *p, uint64_t o, Context *c)
       : player(p), object_num(o), on_fetch(c) {
-      player->get();
+      player->m_async_op_tracker.start_op();
+    }
+    virtual ~C_Fetch() {
+      player->m_async_op_tracker.finish_op();
     }
     virtual void finish(int r) {
       r = player->handle_fetched(r, object_num);
       on_fetch->complete(r);
-      player->put();
     }
   };
 
@@ -93,6 +99,8 @@ private:
 
   C_ProcessState m_process_state;
 
+  AsyncOpTracker m_async_op_tracker;
+
   mutable Mutex m_lock;
   State m_state;
   uint8_t m_splay_offset;
index 2b34873e5a27224550cf62b9856a765fc91c406a..cdb517b9b50f2f4c42c1a7f297809d0c1d027e5b 100644 (file)
@@ -5,6 +5,7 @@
 #include "journal/Utils.h"
 #include "common/Cond.h"
 #include "common/errno.h"
+#include "common/Finisher.h"
 #include <limits>
 
 #define dout_subsys ceph_subsys_journaler
@@ -18,19 +19,18 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
                                const JournalMetadataPtr &journal_metadata)
     : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
       m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
-      m_pending_ops(0), m_remove_set_pending(false), m_remove_set(0),
-      m_remove_set_ctx(NULL) {
+      m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
 }
 
 JournalTrimmer::~JournalTrimmer() {
-  wait_for_ops();
+  m_async_op_tracker.wait_for_ops();
 }
 
 int JournalTrimmer::remove_objects() {
   ldout(m_cct, 20) << __func__ << dendl;
-  wait_for_ops();
+  m_async_op_tracker.wait_for_ops();
 
   C_SaferCond ctx;
   {
@@ -60,33 +60,13 @@ void JournalTrimmer::update_commit_position(
 
   {
     Mutex::Locker locker(m_lock);
-    start_op();
+    m_async_op_tracker.start_op();
   }
 
   Context *ctx = new C_CommitPositionSafe(this, object_set_position);
   m_journal_metadata->set_commit_position(object_set_position, ctx);
 }
 
-void JournalTrimmer::start_op() {
-  assert(m_lock.is_locked());
-  ++m_pending_ops;
-}
-
-void JournalTrimmer::finish_op() {
-  assert(m_lock.is_locked());
-  assert(m_pending_ops > 0);
-  if (--m_pending_ops == 0) {
-    m_pending_ops_cond.Signal();
-  }
-}
-
-void JournalTrimmer::wait_for_ops() {
-  Mutex::Locker locker(m_lock);
-  while (m_pending_ops > 0) {
-    m_pending_ops_cond.Wait(m_lock);
-  }
-}
-
 void JournalTrimmer::trim_objects(uint64_t minimum_set) {
   assert(m_lock.is_locked());
 
@@ -108,7 +88,7 @@ void JournalTrimmer::trim_objects(uint64_t minimum_set) {
 void JournalTrimmer::remove_set(uint64_t object_set) {
   assert(m_lock.is_locked());
 
-  start_op();
+  m_async_op_tracker.start_op();
   uint8_t splay_width = m_journal_metadata->get_splay_width();
   C_RemoveSet *ctx = new C_RemoveSet(this, object_set, splay_width);
 
@@ -128,7 +108,6 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
     assert(r == 0);
     comp->release();
   }
-  ctx->complete(-ENOENT);
 }
 
 void JournalTrimmer::handle_commit_position_safe(
@@ -164,7 +143,7 @@ void JournalTrimmer::handle_commit_position_safe(
       trim_objects(object_set_position.object_number / splay_width);
     }
   }
-  finish_op();
+  m_async_op_tracker.finish_op();
 }
 
 void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
@@ -192,7 +171,7 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
     ldout(m_cct, 20) << "completing remove set context" << dendl;
     m_remove_set_ctx->complete(r);
   }
-  finish_op();
+  m_async_op_tracker.finish_op();
 }
 
 JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
@@ -200,7 +179,7 @@ JournalTrimmer::C_RemoveSet::C_RemoveSet(JournalTrimmer *_journal_trimmer,
                                          uint8_t _splay_width)
   : journal_trimmer(_journal_trimmer), object_set(_object_set),
     lock(utils::unique_lock_name("C_RemoveSet::lock", this)),
-    refs(_splay_width + 1), return_value(-ENOENT) {
+    refs(_splay_width), return_value(-ENOENT) {
 }
 
 void JournalTrimmer::C_RemoveSet::complete(int r) {
index 1ed6486a8305c29314210062563e8785f36a7a7e..1ae994da57dc5153b3b3238ba22d2a4cb48c370c 100644 (file)
@@ -8,6 +8,7 @@
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
 #include "common/Mutex.h"
+#include "journal/AsyncOpTracker.h"
 #include "journal/JournalMetadata.h"
 #include "cls/journal/cls_journal_types.h"
 
@@ -34,9 +35,6 @@ private:
       : journal_trimmer(_journal_trimmer),
         object_set_position(_object_set_position) {}
 
-    virtual void complete(int r) {
-      finish(r);
-    }
     virtual void finish(int r) {
       journal_trimmer->handle_commit_position_safe(r, object_set_position);
     }
@@ -62,19 +60,14 @@ private:
 
   JournalMetadataPtr m_journal_metadata;
 
-  Mutex m_lock;
+  AsyncOpTracker m_async_op_tracker;
 
-  size_t m_pending_ops;
-  Cond m_pending_ops_cond;
+  Mutex m_lock;
 
   bool m_remove_set_pending;
   uint64_t m_remove_set;
   Context *m_remove_set_ctx;
 
-  void start_op();
-  void finish_op();
-  void wait_for_ops();
-
   void trim_objects(uint64_t minimum_set);
   void remove_set(uint64_t object_set);
 
index c61676bc0095fdfec354a35aabb3919bbce847fb..b7ca392fc97f4806b6e396a639d2bde897cd4378 100644 (file)
@@ -147,7 +147,7 @@ bool Journaler::try_pop_front(Payload *payload) {
 void Journaler::stop_replay() {
   assert(m_player != NULL);
   m_player->unwatch();
-  m_player->put();
+  delete m_player;
   m_player = NULL;
 }
 
index af85525080ddc6602b03f3478303563b148388d8..99b84b148769c8f65e28f52e3efad0e5c392ff38 100644 (file)
@@ -2,6 +2,7 @@ if ENABLE_CLIENT
 if WITH_RADOS
 
 libjournal_la_SOURCES = \
+       journal/AsyncOpTracker.cc \
        journal/Entry.cc \
        journal/Future.cc \
        journal/FutureImpl.cc \
@@ -18,6 +19,7 @@ libjournal_la_SOURCES = \
 
 noinst_LTLIBRARIES += libjournal.la
 noinst_HEADERS += \
+       journal/AsyncOpTracker.h \
        journal/Entry.h \
        journal/Future.h \
        journal/FutureImpl.h \
index 0a5b0568640dad8312a72eea506b616cf4df832e..ed5798198d127e3f2ca03bdcd18a150a455aecf1 100644 (file)
@@ -19,10 +19,10 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
   : RefCountedObject(NULL, 0), m_object_num(object_num),
     m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
     m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
-    m_watch_interval(0), m_watch_task(NULL), m_watch_fetch(this),
+    m_watch_interval(0), m_watch_task(NULL),
     m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
     m_fetch_in_progress(false), m_read_off(0), m_watch_ctx(NULL),
-    m_watch_ctx_in_progress(false) {
+    m_watch_in_progress(false) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
@@ -30,6 +30,7 @@ ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
 ObjectPlayer::~ObjectPlayer() {
   {
     Mutex::Locker locker(m_lock);
+    assert(!m_fetch_in_progress);
     assert(m_watch_ctx == NULL);
   }
 }
@@ -54,29 +55,33 @@ void ObjectPlayer::fetch(Context *on_finish) {
 
 void ObjectPlayer::watch(Context *on_fetch, double interval) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
-  {
-    Mutex::Locker locker(m_lock);
-    assert(m_watch_ctx == NULL);
-    m_watch_ctx = on_fetch;
-  }
-  {
-    Mutex::Locker locker(m_timer_lock);
-    m_watch_interval = interval;
-  }
+
+  Mutex::Locker timer_locker(m_timer_lock);
+  m_watch_interval = interval;
+
+  Mutex::Locker locker(m_lock);
+  assert(m_watch_ctx == NULL);
+  m_watch_ctx = on_fetch;
+
   schedule_watch();
 }
 
 void ObjectPlayer::unwatch() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
-  {
-    Mutex::Locker locker(m_lock);
-    while (m_watch_ctx_in_progress) {
-      m_watch_ctx_cond.Wait(m_lock);
-    }
-    delete m_watch_ctx;
-    m_watch_ctx = NULL;
-  }
+  Mutex::Locker timer_locker(m_timer_lock);
+  Mutex::Locker locker(m_lock);
+
   cancel_watch();
+
+  Context *ctx = m_watch_ctx;
+  m_watch_ctx = NULL;
+
+  m_timer_lock.Unlock();
+  while (m_watch_in_progress) {
+    m_watch_in_progress_cond.Wait(m_lock);
+  }
+  m_timer_lock.Lock();
+  delete ctx;
 }
 
 void ObjectPlayer::front(Entry *entry) const {
@@ -167,16 +172,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
 }
 
 void ObjectPlayer::schedule_watch() {
+  assert(m_timer_lock.is_locked());
+  assert(m_lock.is_locked());
+  if (m_watch_ctx == NULL) {
+    return;
+  }
+
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
-  Mutex::Locker locker(m_timer_lock);
   assert(m_watch_task == NULL);
   m_watch_task = new C_WatchTask(this);
   m_timer.add_event_after(m_watch_interval, m_watch_task);
 }
 
 void ObjectPlayer::cancel_watch() {
+  assert(m_timer_lock.is_locked());
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
-  Mutex::Locker locker(m_timer_lock);
   if (m_watch_task != NULL) {
     m_timer.cancel_event(m_watch_task);
     m_watch_task = NULL;
@@ -184,28 +194,34 @@ void ObjectPlayer::cancel_watch() {
 }
 
 void ObjectPlayer::handle_watch_task() {
+  assert(m_timer_lock.is_locked());
+
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
   {
-    Mutex::Locker locker(m_timer_lock);
+    Mutex::Locker locker(m_lock);
+    assert(m_watch_ctx != NULL);
+
+    m_watch_in_progress = true;
     m_watch_task = NULL;
   }
-  fetch(&m_watch_fetch);
+  fetch(new C_WatchFetch(this));
 }
 
 void ObjectPlayer::handle_watch_fetched(int r) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
                    << dendl;
-  if (r == -ENOENT) {
-    schedule_watch();
-    return;
-  }
 
-  Context *on_finish;
+  Context *on_finish = NULL;
   {
+    Mutex::Locker timer_locker(m_timer_lock);
     Mutex::Locker locker(m_lock);
-    m_watch_ctx_in_progress = true;
-    on_finish = m_watch_ctx;
-    m_watch_ctx = NULL;
+    assert(m_watch_in_progress);
+    if (r == -ENOENT) {
+      schedule_watch();
+    } else {
+      on_finish = m_watch_ctx;
+      m_watch_ctx = NULL;
+    }
   }
 
   if (on_finish != NULL) {
@@ -214,15 +230,14 @@ void ObjectPlayer::handle_watch_fetched(int r) {
 
   {
     Mutex::Locker locker(m_lock);
-    m_watch_ctx_in_progress = false;
-    m_watch_ctx_cond.Signal();
+    m_watch_in_progress = false;
+    m_watch_in_progress_cond.Signal();
   }
 }
 
 void ObjectPlayer::C_Fetch::finish(int r) {
   r = object_player->handle_fetch_complete(r, read_bl);
   on_finish->complete(r);
-  object_player->put();
 }
 
 void ObjectPlayer::C_WatchTask::finish(int r) {
index ff85575f304d44cfe604ad53a8fa721d2d1b4b82..5c00ba1c056228ed5cd1cad33118765a50a1616b 100644 (file)
@@ -73,30 +73,24 @@ private:
   typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
 
   struct C_Fetch : public Context {
-    ObjectPlayer *object_player;
+    ObjectPlayerPtr object_player;
     Context *on_finish;
     bufferlist read_bl;
     C_Fetch(ObjectPlayer *o, Context *ctx)
       : object_player(o), on_finish(ctx) {
-      object_player->get();
     }
     virtual void finish(int r);
   };
   struct C_WatchTask : public Context {
-    ObjectPlayer *object_player;
+    ObjectPlayerPtr object_player;
     C_WatchTask(ObjectPlayer *o) : object_player(o) {
-      object_player->get();
     }
     virtual void finish(int r);
   };
   struct C_WatchFetch : public Context {
-    ObjectPlayer *object_player;
+    ObjectPlayerPtr object_player;
     C_WatchFetch(ObjectPlayer *o) : object_player(o) {
     }
-    virtual void complete(int r) {
-      finish(r);
-      object_player->put();
-    }
     virtual void finish(int r);
   };
 
@@ -113,7 +107,6 @@ private:
 
   double m_watch_interval;
   Context *m_watch_task;
-  C_WatchFetch m_watch_fetch;
 
   mutable Mutex m_lock;
   bool m_fetch_in_progress;
@@ -125,8 +118,8 @@ private:
   InvalidRanges m_invalid_ranges;
 
   Context *m_watch_ctx;
-  Cond m_watch_ctx_cond;
-  bool m_watch_ctx_in_progress;
+  Cond m_watch_in_progress_cond;
+  bool m_watch_in_progress;
 
   int handle_fetch_complete(int r, const bufferlist &bl);
 
index af16ccdb00a24853d0225fcba6f0fc5cdee6ac9b..cf96b9417ecdb3b2f07fa443f4249d7aa75fdd1f 100644 (file)
@@ -37,7 +37,7 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
 }
 
 ObjectRecorder::~ObjectRecorder() {
-  cancel_append_task();
+  assert(m_append_task == NULL);
   assert(m_append_buffers.empty());
   assert(m_in_flight_appends.empty());
 }
@@ -72,12 +72,12 @@ void ObjectRecorder::flush(Context *on_safe) {
       future = Future(m_append_buffers.rbegin()->first);
 
       flush_appends(true);
-      cancel_append_task();
     } else if (!m_in_flight_appends.empty()) {
       AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
       assert(!append_buffers.empty());
       future = Future(append_buffers.rbegin()->first);
     }
+    cancel_append_task();
   }
 
   if (future.is_valid()) {
@@ -85,7 +85,6 @@ void ObjectRecorder::flush(Context *on_safe) {
   } else {
     on_safe->complete(0);
   }
-
 }
 
 void ObjectRecorder::flush(const FutureImplPtr &future) {
@@ -140,16 +139,11 @@ bool ObjectRecorder::close_object() {
 }
 
 void ObjectRecorder::handle_append_task() {
-  {
-    Mutex::Locker locker(m_lock);
-    flush_appends(true);
-  }
+  assert(m_timer_lock.is_locked());
+  m_append_task = NULL;
 
-  {
-    Mutex::Locker locker(m_timer_lock);
-    m_append_task = NULL;
-    put();
-  }
+  Mutex::Locker locker(m_lock);
+  flush_appends(true);
 }
 
 void ObjectRecorder::cancel_append_task() {
@@ -157,14 +151,12 @@ void ObjectRecorder::cancel_append_task() {
   if (m_append_task != NULL) {
     m_timer.cancel_event(m_append_task);
     m_append_task = NULL;
-    put();
   }
 }
 
 void ObjectRecorder::schedule_append_task() {
   Mutex::Locker locker(m_timer_lock);
   if (m_append_task == NULL && m_flush_age > 0) {
-    get();
     m_append_task = new C_AppendTask(this);
     m_timer.add_event_after(m_flush_age, m_append_task);
   }
index cc6425c23b81d1340a31477c56b25bfa482a4942..566c41fd780b12ae2d891b12f74d7b2e66c86f72 100644 (file)
@@ -79,11 +79,11 @@ private:
   };
   struct C_AppendTask : public Context {
     ObjectRecorder *object_recorder;
-    C_AppendTask(ObjectRecorder *o) : object_recorder(o) {}
-    virtual void complete(int r) {
+    C_AppendTask(ObjectRecorder *o) : object_recorder(o) {
+    }
+    virtual void finish(int r) {
       object_recorder->handle_append_task();
     }
-    virtual void finish(int r) {}
   };
   struct C_AppendFlush : public Context {
     ObjectRecorder *object_recorder;
index 208350461fe47e29b656391d3805aa14a7317764..e61240d8c1f1134208c46247dabdf2f9917151f0 100644 (file)
@@ -9,6 +9,9 @@ namespace journal {
 struct ReplayHandler  {
   virtual ~ReplayHandler() {}
 
+  virtual void get() = 0;
+  virtual void put() = 0;
+
   virtual void handle_entries_available() = 0;
   virtual void handle_complete(int r) = 0;
 };