]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
journal: simplified commit position tracking
authorJason Dillaman <dillaman@redhat.com>
Fri, 17 Jul 2015 15:26:54 +0000 (11:26 -0400)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Nov 2015 01:42:42 +0000 (20:42 -0500)
Now the journal player and recorder will allocate a tid to represent
the associated journal entry.  The order of these allocations are
tracked so that the commit position can be moved only when all prior
commits are safely on disk.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
18 files changed:
src/journal/Future.h
src/journal/FutureImpl.cc
src/journal/FutureImpl.h
src/journal/JournalMetadata.cc
src/journal/JournalMetadata.h
src/journal/JournalPlayer.cc
src/journal/JournalPlayer.h
src/journal/JournalRecorder.cc
src/journal/JournalTrimmer.cc
src/journal/JournalTrimmer.h
src/journal/Journaler.cc
src/journal/Journaler.h
src/journal/Makefile.am
src/journal/Payload.cc [deleted file]
src/journal/Payload.h [deleted file]
src/journal/PayloadImpl.cc [deleted file]
src/journal/PayloadImpl.h [deleted file]
src/journal/ReplayEntry.h [new file with mode: 0644]

index 88ad436ef2fba80217f2d2fe8138ed39b2b333ec..dbc7d4d687766edf3bb392478c8e893cd542946a 100644 (file)
@@ -34,8 +34,13 @@ public:
   int get_return_value() const;
 
 private:
+  friend class Journaler;
   friend std::ostream& operator<<(std::ostream&, const Future&);
 
+  inline FutureImplPtr get_future_impl() const {
+    return m_future_impl;
+  }
+
   FutureImplPtr m_future_impl;
 };
 
index 4b966f0a8a82f2f35cf778030fab79916b700544..59494a17f4497dcd55f2f43859970900372867dc 100644 (file)
@@ -7,8 +7,10 @@
 
 namespace journal {
 
-FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid)
+FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+                       uint64_t commit_tid)
   : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid),
+    m_commit_tid(commit_tid),
     m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
     m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
     m_consistent_ack(this) {
index 69ea4461e149f47b28869d80b3a35d7e6de546c3..d936805acdc0c3ad5f737bb41bbd14689712331b 100644 (file)
@@ -31,7 +31,8 @@ public:
   };
   typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
 
-  FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid);
+  FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+             uint64_t commit_tid);
 
   void init(const FutureImplPtr &prev_future);
 
@@ -41,6 +42,9 @@ public:
   inline uint64_t get_tid() const {
     return m_tid;
   }
+  inline uint64_t get_commit_tid() const {
+    return m_commit_tid;
+  }
 
   void flush(Context *on_safe = NULL);
   void wait(Context *on_safe);
@@ -94,6 +98,7 @@ private:
   Finisher &m_finisher;
   std::string m_tag;
   uint64_t m_tid;
+  uint64_t m_commit_tid;
 
   mutable Mutex m_lock;
   FutureImplPtr m_prev_future;
index 56c0db32e3f83b7786d5655b4297bc8d4b360cff..bd486f5cae90656205af077a1d2796dc446e2ba5 100644 (file)
@@ -7,6 +7,7 @@
 #include "common/Finisher.h"
 #include "common/Timer.h"
 #include "cls/journal/cls_journal_client.h"
+#include <set>
 
 #define dout_subsys ceph_subsys_journaler
 #undef dout_prefix
@@ -24,9 +25,10 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
       m_client_id(client_id), m_commit_interval(commit_interval), m_order(0),
       m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL),
       m_timer_lock("JournalMetadata::m_timer_lock"),
-      m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
-      m_minimum_set(0), m_active_set(0), m_update_notifications(0),
-      m_commit_position_pending(false), m_commit_position_ctx(NULL) {
+      m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this),
+      m_watch_handle(0), m_minimum_set(0), m_active_set(0),
+      m_update_notifications(0), m_commit_position_ctx(NULL),
+      m_commit_position_task_ctx(NULL) {
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
@@ -178,6 +180,20 @@ void JournalMetadata::set_active_set(uint64_t object_set) {
   m_active_set = object_set;
 }
 
+void JournalMetadata::flush_commit_position() {
+  {
+    Mutex::Locker locker(m_lock);
+    if (m_commit_position_task_ctx == NULL) {
+      return;
+    }
+
+    Mutex::Locker timer_locker(m_timer_lock);
+    m_timer->cancel_event(m_commit_position_task_ctx);
+    m_commit_position_task_ctx = NULL;
+  }
+  handle_commit_position_task();
+}
+
 void JournalMetadata::set_commit_position(
     const ObjectSetPosition &commit_position, Context *on_safe) {
   assert(on_safe != NULL);
@@ -281,9 +297,9 @@ void JournalMetadata::schedule_commit_task() {
   assert(m_lock.is_locked());
 
   Mutex::Locker timer_locker(m_timer_lock);
-  if (!m_commit_position_pending) {
-    m_commit_position_pending = true;
-    m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this));
+  if (m_commit_position_task_ctx == NULL) {
+    m_commit_position_task_ctx = new C_CommitPositionTask(this);
+    m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx);
   }
 }
 
@@ -335,6 +351,77 @@ void JournalMetadata::handle_watch_error(int err) {
   schedule_watch_reset();
 }
 
+uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
+                                              const std::string &tag,
+                                              uint64_t tid) {
+  Mutex::Locker locker(m_lock);
+  uint64_t commit_tid = ++m_commit_tid;
+  m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag, tid);
+
+  ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
+                   << "object_num=" << object_num << ", "
+                   << "tag=" << tag << ", tid=" << tid << "]" << dendl;
+  return commit_tid;
+}
+
+bool JournalMetadata::committed(uint64_t commit_tid,
+                                ObjectSetPosition *object_set_position) {
+  ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
+
+  Mutex::Locker locker(m_lock);
+  {
+    CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
+    assert(it != m_pending_commit_tids.end());
+
+    CommitEntry &commit_entry = it->second;
+    commit_entry.committed = true;
+  }
+
+  if (!m_commit_position.entry_positions.empty()) {
+    *object_set_position = m_commit_position;
+  } else {
+    *object_set_position = m_client.commit_position;
+  }
+
+  bool update_commit_position = false;
+  while (!m_pending_commit_tids.empty()) {
+    CommitTids::iterator it = m_pending_commit_tids.begin();
+    CommitEntry &commit_entry = it->second;
+    if (!commit_entry.committed) {
+      break;
+    }
+
+    object_set_position->object_number = commit_entry.object_num;
+    if (!object_set_position->entry_positions.empty() &&
+        object_set_position->entry_positions.front().tag == commit_entry.tag) {
+      object_set_position->entry_positions.front() = EntryPosition(
+        commit_entry.tag, commit_entry.tid);
+    } else {
+      object_set_position->entry_positions.push_front(EntryPosition(
+        commit_entry.tag, commit_entry.tid));
+    }
+    m_pending_commit_tids.erase(it);
+    update_commit_position = true;
+  }
+
+  if (update_commit_position) {
+    // prune the position to have unique tags in commit-order
+    std::set<std::string> in_use_tags;
+    EntryPositions::iterator it = object_set_position->entry_positions.begin();
+    while (it != object_set_position->entry_positions.end()) {
+      if (!in_use_tags.insert(it->tag).second) {
+        it = object_set_position->entry_positions.erase(it);
+      } else {
+        ++it;
+      }
+    }
+
+    ldout(m_cct, 20) << "updated object set position: " << *object_set_position
+                     << dendl;
+  }
+  return update_commit_position;
+}
+
 void JournalMetadata::notify_update() {
   ldout(m_cct, 10) << "notifying journal header update" << dendl;
 
index 89ddcf1296b09753af52867533df3ddb6758aa95..0a933b019563c93184c2ca4d15e9f870b2a9bd91 100644 (file)
@@ -87,6 +87,7 @@ public:
     return m_active_set;
   }
 
+  void flush_commit_position();
   void set_commit_position(const ObjectSetPosition &commit_position,
                            Context *on_safe);
   void get_commit_position(ObjectSetPosition *commit_position) const {
@@ -106,6 +107,10 @@ public:
   void reserve_tid(const std::string &tag, uint64_t tid);
   bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
 
+  uint64_t allocate_commit_tid(uint64_t object_num, const std::string &tag,
+                               uint64_t tid);
+  bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position);
+
   void notify_update();
   void async_notify_update();
 
@@ -113,6 +118,20 @@ private:
   typedef std::map<std::string, uint64_t> AllocatedTids;
   typedef std::list<Listener*> Listeners;
 
+  struct CommitEntry {
+    uint64_t object_num;
+    std::string tag;
+    uint64_t tid;
+    bool committed;
+
+    CommitEntry() : object_num(0), tid(0), committed(false) {
+    }
+    CommitEntry(uint64_t _object_num, const std::string &_tag, uint64_t _tid)
+      : object_num(_object_num), tag(_tag), tid(_tid), committed(false) {
+    }
+  };
+  typedef std::map<uint64_t, CommitEntry> CommitTids;
+
   struct C_WatchCtx : public librados::WatchCtx2 {
     JournalMetadata *journal_metadata;
 
@@ -186,8 +205,8 @@ private:
     virtual void finish(int r) {
       journal_metadata->handle_immutable_metadata(r, on_finish);
     }
-
   };
+
   struct C_Refresh : public Context {
     JournalMetadata* journal_metadata;
     uint64_t minimum_set;
@@ -225,6 +244,9 @@ private:
 
   mutable Mutex m_lock;
 
+  uint64_t m_commit_tid;
+  CommitTids m_pending_commit_tids;
+
   Listeners m_listeners;
 
   C_WatchCtx m_watch_ctx;
@@ -240,9 +262,9 @@ private:
   size_t m_update_notifications;
   Cond m_update_cond;
 
-  bool m_commit_position_pending;
   ObjectSetPosition m_commit_position;
   Context *m_commit_position_ctx;
+  Context *m_commit_position_task_ctx;
 
   AsyncOpTracker m_async_op_tracker;
 
index 421a38b121e11616d9188d4cc0a81753bb22a90f..db92590b434b238e70801e2682c12b2c106dfd65 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "journal/JournalPlayer.h"
 #include "common/Finisher.h"
+#include "journal/Entry.h"
 #include "journal/ReplayHandler.h"
 #include "journal/Utils.h"
 
@@ -67,8 +68,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx,
     m_commit_object = commit_position.object_number;
     m_commit_tag = commit_position.entry_positions.front().tag;
 
-    for (size_t i=0; i<commit_position.entry_positions.size(); ++i) {
-      const EntryPosition &entry_position = commit_position.entry_positions[i];
+    for (EntryPositions::const_iterator it =
+           commit_position.entry_positions.begin();
+         it != commit_position.entry_positions.end(); ++it) {
+      const EntryPosition &entry_position = *it;
       m_commit_tids[entry_position.tag] = entry_position.tid;
     }
   }
@@ -121,8 +124,7 @@ void JournalPlayer::unwatch() {
   }
 }
 
-bool JournalPlayer::try_pop_front(Entry *entry,
-                                  ObjectSetPosition *object_set_position) {
+bool JournalPlayer::try_pop_front(Entry *entry, uint64_t *commit_tid) {
   Mutex::Locker locker(m_lock);
   if (m_state != STATE_PLAYBACK) {
     return false;
@@ -175,9 +177,9 @@ bool JournalPlayer::try_pop_front(Entry *entry,
     }
   }
 
-  // TODO populate the object_set_position w/ current object number and
-  //      unique entry tag mappings
   m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
+  *commit_tid = m_journal_metadata->allocate_commit_tid(
+    object_player->get_object_number(), entry->get_tag(), entry->get_tid());
   return true;
 }
 
index 613c9e305b612ea0e0ede85f32671525e537d98e..7d4855960d71859a5ff5931e825aefee719dd008 100644 (file)
@@ -18,6 +18,7 @@ class SafeTimer;
 
 namespace journal {
 
+class Entry;
 class ReplayHandler;
 
 class JournalPlayer {
@@ -35,7 +36,7 @@ public:
   void prefetch_and_watch(double interval);
   void unwatch();
 
-  bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position);
+  bool try_pop_front(Entry *entry, uint64_t *commit_tid);
 
 private:
   typedef std::map<std::string, uint64_t> AllocatedTids;
index a921af26c7ef1437b162259e4435e651c1ea5356..4fb7765c59f55c7c2ddb18e82fc7564dc6aa24ad 100644 (file)
@@ -48,8 +48,10 @@ Future JournalRecorder::append(const std::string &tag,
   uint8_t splay_offset = tid % splay_width;
 
   ObjectRecorderPtr object_ptr = get_object(splay_offset);
+  uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
+    object_ptr->get_object_number(), tag, tid);
   FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
-                                      tag, tid));
+                                      tag, tid, commit_tid));
   future->init(m_prev_future);
   m_prev_future = future;
 
@@ -60,8 +62,6 @@ Future JournalRecorder::append(const std::string &tag,
   append_buffers.push_back(std::make_pair(future, entry_bl));
   bool object_full = object_ptr->append(append_buffers);
 
-  // TODO populate the object_set_position
-
   if (object_full) {
     ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                      << dendl;
index cdb517b9b50f2f4c42c1a7f297809d0c1d027e5b..2af67a8298393ba5e90c42353230ebb37ae9760f 100644 (file)
@@ -25,6 +25,7 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
 }
 
 JournalTrimmer::~JournalTrimmer() {
+  m_journal_metadata->flush_commit_position();
   m_async_op_tracker.wait_for_ops();
 }
 
@@ -53,10 +54,13 @@ int JournalTrimmer::remove_objects() {
   return ctx.wait();
 }
 
-void JournalTrimmer::update_commit_position(
-    const ObjectSetPosition &object_set_position) {
-  ldout(m_cct, 20) << __func__ << ": pos=" << object_set_position
-                   << dendl;
+void JournalTrimmer::committed(uint64_t commit_tid) {
+  ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl;
+
+  ObjectSetPosition object_set_position;
+  if (!m_journal_metadata->committed(commit_tid, &object_set_position)) {
+    return;
+  }
 
   {
     Mutex::Locker locker(m_lock);
index 1ae994da57dc5153b3b3238ba22d2a4cb48c370c..937340733d703f18aa3fdef7f2e5528bdd030322 100644 (file)
@@ -23,7 +23,7 @@ public:
   ~JournalTrimmer();
 
   int remove_objects();
-  void update_commit_position(const ObjectSetPosition &object_set_position);
+  void committed(uint64_t commit_tid);
 
 private:
   struct C_CommitPositionSafe : public Context {
index b7ca392fc97f4806b6e396a639d2bde897cd4378..eaa4ce15d9cf87dc06f5dcac8683ac0b9f85898f 100644 (file)
@@ -10,7 +10,7 @@
 #include "journal/JournalPlayer.h"
 #include "journal/JournalRecorder.h"
 #include "journal/JournalTrimmer.h"
-#include "journal/PayloadImpl.h"
+#include "journal/ReplayEntry.h"
 #include "journal/ReplayHandler.h"
 #include "cls/journal/cls_journal_client.h"
 #include "cls/journal/cls_journal_types.h"
@@ -131,16 +131,16 @@ void Journaler::start_live_replay(ReplayHandler *replay_handler,
   m_player->prefetch_and_watch(interval);
 }
 
-bool Journaler::try_pop_front(Payload *payload) {
+bool Journaler::try_pop_front(ReplayEntry *replay_entry) {
   assert(m_player != NULL);
 
   Entry entry;
-  ObjectSetPosition object_set_position;
-  if (!m_player->try_pop_front(&entry, &object_set_position)) {
+  uint64_t commit_tid;
+  if (!m_player->try_pop_front(&entry, &commit_tid)) {
     return false;
   }
 
-  *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position));
+  *replay_entry = ReplayEntry(entry.get_data(), commit_tid);
   return true;
 }
 
@@ -151,9 +151,13 @@ void Journaler::stop_replay() {
   m_player = NULL;
 }
 
-void Journaler::update_commit_position(const Payload &payload) {
-  PayloadImplPtr payload_impl = payload.get_payload_impl();
-  m_trimmer->update_commit_position(payload_impl->get_object_set_position());
+void Journaler::committed(const ReplayEntry &replay_entry) {
+  m_trimmer->committed(replay_entry.get_commit_tid());
+}
+
+void Journaler::committed(const Future &future) {
+  FutureImplPtr future_impl = future.get_future_impl();
+  m_trimmer->committed(future_impl->get_commit_tid());
 }
 
 void Journaler::start_append() {
index f27038d5cf030cf5b325265a7fe3511ca0780744..4b4959a879831382da67b1827ece4ea28752602f 100644 (file)
@@ -8,7 +8,6 @@
 #include "include/buffer.h"
 #include "include/rados/librados.hpp"
 #include "journal/Future.h"
-#include "journal/Payload.h"
 #include <string>
 #include <map>
 #include "include/assert.h"
@@ -22,6 +21,7 @@ class JournalMetadata;
 class JournalPlayer;
 class JournalRecorder;
 class JournalTrimmer;
+class ReplayEntry;
 class ReplayHandler;
 
 class Journaler {
@@ -40,16 +40,17 @@ public:
 
   void start_replay(ReplayHandler *replay_handler);
   void start_live_replay(ReplayHandler *replay_handler, double interval);
-  bool try_pop_front(Payload *payload);
+  bool try_pop_front(ReplayEntry *replay_entry);
   void stop_replay();
 
-  void update_commit_position(const Payload &payload);
-
   void start_append();
   Future append(const std::string &tag, const bufferlist &bl);
   void flush(Context *on_safe);
   void stop_append(Context *on_safe);
 
+  void committed(const ReplayEntry &replay_entry);
+  void committed(const Future &future);
+
 private:
   librados::IoCtx m_header_ioctx;
   librados::IoCtx m_data_ioctx;
index 99b84b148769c8f65e28f52e3efad0e5c392ff38..4f0cd18c01466b42ff469a0af8b295df7014be1f 100644 (file)
@@ -13,8 +13,6 @@ libjournal_la_SOURCES = \
        journal/JournalTrimmer.cc \
        journal/ObjectPlayer.cc \
        journal/ObjectRecorder.cc \
-       journal/Payload.cc \
-       journal/PayloadImpl.cc \
        journal/Utils.cc
 
 noinst_LTLIBRARIES += libjournal.la
@@ -30,8 +28,7 @@ noinst_HEADERS += \
        journal/JournalTrimmer.h \
        journal/ObjectPlayer.h \
        journal/ObjectRecorder.h \
-       journal/Payload.h \
-       journal/PayloadImpl.h \
+       journal/ReplayEntry.h \
        journal/ReplayHandler.h \
        journal/Utils.h
 DENCODER_DEPS += libjournal.la
diff --git a/src/journal/Payload.cc b/src/journal/Payload.cc
deleted file mode 100644 (file)
index 8015528..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "journal/Payload.h"
-#include "journal/PayloadImpl.h"
-
-namespace journal {
-
-const bufferlist &Payload::get_data() const {
-  return m_payload_impl->get_data();
-}
-
-void intrusive_ptr_add_ref(PayloadImpl *p) {
-  p->get();
-}
-
-void intrusive_ptr_release(PayloadImpl *p) {
-  p->put();
-}
-
-} // namespace journal
diff --git a/src/journal/Payload.h b/src/journal/Payload.h
deleted file mode 100644 (file)
index 70f774a..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_JOURNAL_PAYLOAD_H
-#define CEPH_JOURNAL_PAYLOAD_H
-
-#include "include/int_types.h"
-#include "include/buffer.h"
-#include <boost/intrusive_ptr.hpp>
-
-namespace journal {
-
-class PayloadImpl;
-
-class Payload {
-public:
-  typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
-
-  Payload() {}
-  Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {}
-
-  inline bool is_valid() const {
-    return m_payload_impl;
-  }
-
-  const bufferlist &get_data() const;
-
-private:
-  friend class Journaler;
-
-  inline PayloadImplPtr get_payload_impl() const {
-    return m_payload_impl;
-  }
-
-  PayloadImplPtr m_payload_impl;
-};
-
-void intrusive_ptr_add_ref(PayloadImpl *p);
-void intrusive_ptr_release(PayloadImpl *p);
-
-} // namespace journal
-
-#endif // CEPH_JOURNAL_PAYLOAD_H
diff --git a/src/journal/PayloadImpl.cc b/src/journal/PayloadImpl.cc
deleted file mode 100644 (file)
index 6bb134f..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#include "journal/PayloadImpl.h"
-
-namespace journal {
-
-PayloadImpl::PayloadImpl(const bufferlist &data,
-                         const ObjectSetPosition &object_set_position)
-  : m_data(data), m_object_set_position(object_set_position) {
-}
-
-const bufferlist &PayloadImpl::get_data() const {
-  return m_data;
-}
-
-const PayloadImpl::ObjectSetPosition &
-PayloadImpl::get_object_set_position() const {
-  return m_object_set_position;
-}
-
-} // namespace journal
diff --git a/src/journal/PayloadImpl.h b/src/journal/PayloadImpl.h
deleted file mode 100644 (file)
index 9c4d0b5..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-
-#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H
-#define CEPH_JOURNAL_PAYLOAD_IMPL_H
-
-#include "include/int_types.h"
-#include "include/buffer.h"
-#include "common/RefCountedObj.h"
-#include "cls/journal/cls_journal_types.h"
-#include <boost/noncopyable.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include "include/assert.h"
-
-namespace journal {
-
-class PayloadImpl;
-typedef boost::intrusive_ptr<PayloadImpl> PayloadImplPtr;
-
-class PayloadImpl : public RefCountedObject, boost::noncopyable {
-public:
-  typedef cls::journal::ObjectSetPosition ObjectSetPosition;
-
-  PayloadImpl(const bufferlist &data,
-              const ObjectSetPosition &object_set_position);
-
-  const bufferlist &get_data() const;
-  const ObjectSetPosition &get_object_set_position() const;
-
-private:
-  bufferlist m_data;
-  ObjectSetPosition m_object_set_position;
-};
-
-} // namespace journal
-
-#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H
diff --git a/src/journal/ReplayEntry.h b/src/journal/ReplayEntry.h
new file mode 100644 (file)
index 0000000..4dd3ba4
--- /dev/null
@@ -0,0 +1,34 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_JOURNAL_REPLAY_ENTRY_H
+#define CEPH_JOURNAL_REPLAY_ENTRY_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace journal {
+
+class ReplayEntry {
+public:
+  ReplayEntry() : m_commit_tid(0) {
+  }
+  ReplayEntry(const bufferlist &data, uint64_t commit_tid)
+    : m_data(data), m_commit_tid(commit_tid) {
+  }
+
+  inline const bufferlist &get_data() const {
+    return m_data;
+  }
+  inline uint64_t get_commit_tid() const {
+    return m_commit_tid;
+  }
+
+private:
+  bufferlist m_data;
+  uint64_t m_commit_tid;
+};
+
+} // namespace journal
+
+#endif // CEPH_JOURNAL_REPLAY_ENTRY_H