]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-replay: added version control to trace output file
authorJason Dillaman <dillaman@redhat.com>
Thu, 8 Oct 2015 17:21:29 +0000 (13:21 -0400)
committerJason Dillaman <dillaman@redhat.com>
Thu, 15 Oct 2015 18:44:21 +0000 (14:44 -0400)
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
(cherry picked from commit 3ecdae8388d69123b937a40ce614a0b795a757f1)

16 files changed:
src/rbd_replay/ActionTypes.cc [new file with mode: 0644]
src/rbd_replay/ActionTypes.h [new file with mode: 0644]
src/rbd_replay/BufferReader.cc [new file with mode: 0644]
src/rbd_replay/BufferReader.h [new file with mode: 0644]
src/rbd_replay/Deser.cc [deleted file]
src/rbd_replay/Deser.hpp [deleted file]
src/rbd_replay/Makefile.am
src/rbd_replay/Replayer.cc
src/rbd_replay/Replayer.hpp
src/rbd_replay/Ser.cc [deleted file]
src/rbd_replay/Ser.hpp [deleted file]
src/rbd_replay/actions.cc
src/rbd_replay/actions.hpp
src/rbd_replay/ios.cc
src/rbd_replay/ios.hpp
src/rbd_replay/rbd-replay-prep.cc

diff --git a/src/rbd_replay/ActionTypes.cc b/src/rbd_replay/ActionTypes.cc
new file mode 100644 (file)
index 0000000..36ed3ca
--- /dev/null
@@ -0,0 +1,354 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/ActionTypes.h"
+#include "include/assert.h"
+#include "include/byteorder.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include <boost/variant.hpp>
+
+namespace rbd_replay {
+namespace action {
+
+namespace {
+
+bool byte_swap_required(__u8 version) {
+#if defined(CEPH_LITTLE_ENDIAN)
+  return (version == 0);
+#else
+  return false;
+#endif
+}
+
+void decode_big_endian_string(std::string &str, bufferlist::iterator &it) {
+#if defined(CEPH_LITTLE_ENDIAN)
+  uint32_t length;
+  ::decode(length, it);
+  length = swab32(length);
+  str.clear();
+  it.copy(length, str);
+#else
+  assert(false);
+#endif
+}
+
+class EncodeVisitor : public boost::static_visitor<void> {
+public:
+  EncodeVisitor(bufferlist &bl) : m_bl(bl) {
+  }
+
+  template <typename Action>
+  inline void operator()(const Action &action) const {
+    ::encode(static_cast<uint8_t>(Action::ACTION_TYPE), m_bl);
+    action.encode(m_bl);
+  }
+private:
+  bufferlist &m_bl;
+};
+
+class DecodeVisitor : public boost::static_visitor<void> {
+public:
+  DecodeVisitor(__u8 version, bufferlist::iterator &iter)
+    : m_version(version), m_iter(iter) {
+  }
+
+  template <typename Action>
+  inline void operator()(Action &action) const {
+    action.decode(m_version, m_iter);
+  }
+private:
+  __u8 m_version;
+  bufferlist::iterator &m_iter;
+};
+
+class DumpVisitor : public boost::static_visitor<void> {
+public:
+  DumpVisitor(Formatter *formatter) : m_formatter(formatter) {}
+
+  template <typename Action>
+  inline void operator()(const Action &action) const {
+    ActionType action_type = Action::ACTION_TYPE;
+    m_formatter->dump_string("action_type", stringify(action_type));
+    action.dump(m_formatter);
+  }
+private:
+  ceph::Formatter *m_formatter;
+};
+
+} // anonymous namespace
+
+void Dependency::encode(bufferlist &bl) const {
+  ::encode(id, bl);
+  ::encode(time_delta, bl);
+}
+
+void Dependency::decode(bufferlist::iterator &it) {
+  decode(1, it);
+}
+
+void Dependency::decode(__u8 version, bufferlist::iterator &it) {
+  ::decode(id, it);
+  ::decode(time_delta, it);
+  if (byte_swap_required(version)) {
+    id = swab32(id);
+    time_delta = swab64(time_delta);
+  }
+}
+
+void Dependency::dump(Formatter *f) const {
+  f->dump_unsigned("id", id);
+  f->dump_unsigned("time_delta", time_delta);
+}
+
+void Dependency::generate_test_instances(std::list<Dependency *> &o) {
+  o.push_back(new Dependency());
+  o.push_back(new Dependency(1, 123456789));
+}
+
+void ActionBase::encode(bufferlist &bl) const {
+  ::encode(id, bl);
+  ::encode(thread_id, bl);
+  ::encode(dependencies, bl);
+}
+
+void ActionBase::decode(__u8 version, bufferlist::iterator &it) {
+  ::decode(id, it);
+  ::decode(thread_id, it);
+  if (version == 0) {
+    uint32_t num_successors;
+    ::decode(num_successors, it);
+
+    uint32_t num_completion_successors;
+    ::decode(num_completion_successors, it);
+  }
+
+  if (byte_swap_required(version)) {
+    id = swab32(id);
+    thread_id = swab64(thread_id);
+
+    uint32_t dep_count;
+    ::decode(dep_count, it);
+    dep_count = swab32(dep_count);
+    dependencies.resize(dep_count);
+    for (uint32_t i = 0; i < dep_count; ++i) {
+      dependencies[i].decode(0, it);
+    }
+  } else {
+    ::decode(dependencies, it);
+  }
+}
+
+void ActionBase::dump(Formatter *f) const {
+  f->dump_unsigned("id", id);
+  f->dump_unsigned("thread_id", thread_id);
+  f->open_array_section("dependencies");
+  for (size_t i = 0; i < dependencies.size(); ++i) {
+    f->open_object_section("dependency");
+    dependencies[i].dump(f);
+    f->close_section();
+  }
+  f->close_section();
+}
+
+void ImageActionBase::encode(bufferlist &bl) const {
+  ActionBase::encode(bl);
+  ::encode(imagectx_id, bl);
+}
+
+void ImageActionBase::decode(__u8 version, bufferlist::iterator &it) {
+  ActionBase::decode(version, it);
+  ::decode(imagectx_id, it);
+  if (byte_swap_required(version)) {
+    imagectx_id = swab64(imagectx_id);
+  }
+}
+
+void ImageActionBase::dump(Formatter *f) const {
+  ActionBase::dump(f);
+  f->dump_unsigned("imagectx_id", imagectx_id);
+}
+
+void IoActionBase::encode(bufferlist &bl) const {
+  ImageActionBase::encode(bl);
+  ::encode(offset, bl);
+  ::encode(length, bl);
+}
+
+void IoActionBase::decode(__u8 version, bufferlist::iterator &it) {
+  ImageActionBase::decode(version, it);
+  ::decode(offset, it);
+  ::decode(length, it);
+  if (byte_swap_required(version)) {
+    offset = swab64(offset);
+    length = swab64(length);
+  }
+}
+
+void IoActionBase::dump(Formatter *f) const {
+  ImageActionBase::dump(f);
+  f->dump_unsigned("offset", offset);
+  f->dump_unsigned("length", length);
+}
+
+void OpenImageAction::encode(bufferlist &bl) const {
+  ImageActionBase::encode(bl);
+  ::encode(name, bl);
+  ::encode(snap_name, bl);
+  ::encode(read_only, bl);
+}
+
+void OpenImageAction::decode(__u8 version, bufferlist::iterator &it) {
+  ImageActionBase::decode(version, it);
+  if (byte_swap_required(version)) {
+    decode_big_endian_string(name, it);
+    decode_big_endian_string(snap_name, it);
+  } else {
+    ::decode(name, it);
+    ::decode(snap_name, it);
+  }
+  ::decode(read_only, it);
+}
+
+void OpenImageAction::dump(Formatter *f) const {
+  ImageActionBase::dump(f);
+  f->dump_string("name", name);
+  f->dump_string("snap_name", snap_name);
+  f->dump_bool("read_only", read_only);
+}
+
+void UnknownAction::encode(bufferlist &bl) const {
+  assert(false);
+}
+
+void UnknownAction::decode(__u8 version, bufferlist::iterator &it) {
+}
+
+void UnknownAction::dump(Formatter *f) const {
+}
+
+void ActionEntry::encode(bufferlist &bl) const {
+  ENCODE_START(1, 1, bl);
+  boost::apply_visitor(EncodeVisitor(bl), action);
+  ENCODE_FINISH(bl);
+}
+
+void ActionEntry::decode(bufferlist::iterator &it) {
+  DECODE_START(1, it);
+  decode(struct_v, it);
+  DECODE_FINISH(it);
+}
+
+void ActionEntry::decode_unversioned(bufferlist::iterator &it) {
+  decode(0, it);
+}
+
+void ActionEntry::decode(__u8 version, bufferlist::iterator &it) {
+  uint8_t action_type;
+  ::decode(action_type, it);
+
+  // select the correct action variant based upon the action_type
+  switch (action_type) {
+  case ACTION_TYPE_START_THREAD:
+    action = StartThreadAction();
+    break;
+  case ACTION_TYPE_STOP_THREAD:
+    action = StopThreadAction();
+    break;
+  case ACTION_TYPE_READ:
+    action = ReadAction();
+    break;
+  case ACTION_TYPE_WRITE:
+    action = WriteAction();
+    break;
+  case ACTION_TYPE_AIO_READ:
+    action = AioReadAction();
+    break;
+  case ACTION_TYPE_AIO_WRITE:
+    action = AioWriteAction();
+    break;
+  case ACTION_TYPE_OPEN_IMAGE:
+    action = OpenImageAction();
+    break;
+  case ACTION_TYPE_CLOSE_IMAGE:
+    action = CloseImageAction();
+    break;
+  }
+
+  boost::apply_visitor(DecodeVisitor(version, it), action);
+}
+
+void ActionEntry::dump(Formatter *f) const {
+  boost::apply_visitor(DumpVisitor(f), action);
+}
+
+void ActionEntry::generate_test_instances(std::list<ActionEntry *> &o) {
+  Dependencies dependencies;
+  dependencies.push_back(Dependency(3, 123456789));
+  dependencies.push_back(Dependency(4, 234567890));
+
+  o.push_back(new ActionEntry(StartThreadAction()));
+  o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies)));
+  o.push_back(new ActionEntry(StopThreadAction()));
+  o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies)));
+
+  o.push_back(new ActionEntry(ReadAction()));
+  o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5)));
+  o.push_back(new ActionEntry(WriteAction()));
+  o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4,
+                                          5)));
+  o.push_back(new ActionEntry(AioReadAction()));
+  o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4,
+                                            5)));
+  o.push_back(new ActionEntry(AioWriteAction()));
+  o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4,
+                                             5)));
+
+  o.push_back(new ActionEntry(OpenImageAction()));
+  o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3,
+                                              "image_name", "snap_name",
+                                              true)));
+  o.push_back(new ActionEntry(CloseImageAction()));
+  o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3)));
+}
+
+} // namespace action
+} // namespace rbd_replay
+
+std::ostream &operator<<(std::ostream &out,
+                         const rbd_replay::action::ActionType &type) {
+  using namespace rbd_replay::action;
+
+  switch (type) {
+  case ACTION_TYPE_START_THREAD:
+    out << "StartThread";
+    break;
+  case ACTION_TYPE_STOP_THREAD:
+    out << "StopThread";
+    break;
+  case ACTION_TYPE_READ:
+    out << "Read";
+    break;
+  case ACTION_TYPE_WRITE:
+    out << "Write";
+    break;
+  case ACTION_TYPE_AIO_READ:
+    out << "AioRead";
+    break;
+  case ACTION_TYPE_AIO_WRITE:
+    out << "AioWrite";
+    break;
+  case ACTION_TYPE_OPEN_IMAGE:
+    out << "OpenImage";
+    break;
+  case ACTION_TYPE_CLOSE_IMAGE:
+    out << "CloseImage";
+    break;
+  default:
+    out << "Unknown (" << static_cast<uint32_t>(type) << ")";
+    break;
+  }
+  return out;
+}
+
diff --git a/src/rbd_replay/ActionTypes.h b/src/rbd_replay/ActionTypes.h
new file mode 100644 (file)
index 0000000..63ef34e
--- /dev/null
@@ -0,0 +1,277 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H
+#define CEPH_RBD_REPLAY_ACTION_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <list>
+#include <string>
+#include <vector>
+#include <boost/variant/variant.hpp>
+
+namespace ceph { class Formatter; }
+
+namespace rbd_replay {
+namespace action {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+/// Even IDs are normal actions, odd IDs are completions.
+typedef uint32_t action_id_t;
+
+static const std::string BANNER("rbd-replay-trace");
+
+/**
+ * Dependencies link actions to earlier actions or completions.
+ * If an action has a dependency \c d then it waits until \c d.time_delta
+ * nanoseconds after the action or completion with ID \c d.id has fired.
+ */
+struct Dependency {
+  /// ID of the action or completion to wait for.
+  action_id_t id;
+
+  /// Nanoseconds of delay to wait until after the action or completion fires.
+  uint64_t time_delta;
+
+  /**
+   * @param id ID of the action or completion to wait for.
+   * @param time_delta Nanoseconds of delay to wait after the action or
+   *                   completion fires.
+   */
+  Dependency() : id(0), time_delta(0) {
+  }
+  Dependency(action_id_t id, uint64_t time_delta)
+    : id(id), time_delta(time_delta) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &it);
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+
+  static void generate_test_instances(std::list<Dependency *> &o);
+};
+
+WRITE_CLASS_ENCODER(Dependency);
+
+typedef std::vector<Dependency> Dependencies;
+
+enum ActionType {
+  ACTION_TYPE_START_THREAD = 0,
+  ACTION_TYPE_STOP_THREAD  = 1,
+  ACTION_TYPE_READ         = 2,
+  ACTION_TYPE_WRITE        = 3,
+  ACTION_TYPE_AIO_READ     = 4,
+  ACTION_TYPE_AIO_WRITE    = 5,
+  ACTION_TYPE_OPEN_IMAGE   = 6,
+  ACTION_TYPE_CLOSE_IMAGE  = 7
+};
+
+struct ActionBase {
+  action_id_t id;
+  thread_id_t thread_id;
+  Dependencies dependencies;
+
+  ActionBase() : id(0), thread_id(0) {
+  }
+  ActionBase(action_id_t id, thread_id_t thread_id,
+             const Dependencies &dependencies)
+    : id(id), thread_id(thread_id), dependencies(dependencies) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+};
+
+struct StartThreadAction : public ActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD;
+
+  StartThreadAction() {
+  }
+  StartThreadAction(action_id_t id, thread_id_t thread_id,
+                    const Dependencies &dependencies)
+    : ActionBase(id, thread_id, dependencies) {
+  }
+};
+
+struct StopThreadAction : public ActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD;
+
+  StopThreadAction() {
+  }
+  StopThreadAction(action_id_t id, thread_id_t thread_id,
+                   const Dependencies &dependencies)
+    : ActionBase(id, thread_id, dependencies) {
+  }
+};
+
+struct ImageActionBase : public ActionBase {
+  imagectx_id_t imagectx_id;
+
+  ImageActionBase() : imagectx_id(0) {
+  }
+  ImageActionBase(action_id_t id, thread_id_t thread_id,
+                  const Dependencies &dependencies, imagectx_id_t imagectx_id)
+    : ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+};
+
+struct IoActionBase : public ImageActionBase {
+  uint64_t offset;
+  uint64_t length;
+
+  IoActionBase() : offset(0), length(0) {
+  }
+  IoActionBase(action_id_t id, thread_id_t thread_id,
+               const Dependencies &dependencies, imagectx_id_t imagectx_id,
+               uint64_t offset, uint64_t length)
+    : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+      offset(offset), length(length) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+};
+
+struct ReadAction : public IoActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_READ;
+
+  ReadAction() {
+  }
+  ReadAction(action_id_t id, thread_id_t thread_id,
+             const Dependencies &dependencies, imagectx_id_t imagectx_id,
+             uint64_t offset, uint64_t length)
+    : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+  }
+};
+
+struct WriteAction : public IoActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE;
+
+  WriteAction() {
+  }
+  WriteAction(action_id_t id, thread_id_t thread_id,
+              const Dependencies &dependencies, imagectx_id_t imagectx_id,
+              uint64_t offset, uint64_t length)
+    : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+  }
+};
+
+struct AioReadAction : public IoActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ;
+
+  AioReadAction() {
+  }
+  AioReadAction(action_id_t id, thread_id_t thread_id,
+                const Dependencies &dependencies, imagectx_id_t imagectx_id,
+                uint64_t offset, uint64_t length)
+    : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+  }
+};
+
+struct AioWriteAction : public IoActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE;
+
+  AioWriteAction() {
+  }
+  AioWriteAction(action_id_t id, thread_id_t thread_id,
+                 const Dependencies &dependencies, imagectx_id_t imagectx_id,
+                 uint64_t offset, uint64_t length)
+    : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+  }
+};
+
+struct OpenImageAction : public ImageActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE;
+
+  std::string name;
+  std::string snap_name;
+  bool read_only;
+
+  OpenImageAction() : read_only(false) {
+  }
+  OpenImageAction(action_id_t id, thread_id_t thread_id,
+                  const Dependencies &dependencies, imagectx_id_t imagectx_id,
+                  const std::string &name, const std::string &snap_name,
+                  bool read_only)
+    : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+      name(name), snap_name(snap_name), read_only(read_only) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+};
+
+struct CloseImageAction : public ImageActionBase {
+  static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE;
+
+  CloseImageAction() {
+  }
+  CloseImageAction(action_id_t id, thread_id_t thread_id,
+                   const Dependencies &dependencies, imagectx_id_t imagectx_id)
+    : ImageActionBase(id, thread_id, dependencies, imagectx_id) {
+  }
+};
+
+struct UnknownAction {
+  static const ActionType ACTION_TYPE = static_cast<ActionType>(-1);
+
+  void encode(bufferlist &bl) const;
+  void decode(__u8 version, bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+};
+
+typedef boost::variant<StartThreadAction,
+                       StopThreadAction,
+                       ReadAction,
+                       WriteAction,
+                       AioReadAction,
+                       AioWriteAction,
+                       OpenImageAction,
+                       CloseImageAction,
+                       UnknownAction> Action;
+
+class ActionEntry {
+public:
+  Action action;
+
+  ActionEntry() : action(UnknownAction()) {
+  }
+  ActionEntry(const Action &action) : action(action) {
+  }
+
+  void encode(bufferlist &bl) const;
+  void decode(bufferlist::iterator &it);
+  void decode_unversioned(bufferlist::iterator &it);
+  void dump(Formatter *f) const;
+
+  static void generate_test_instances(std::list<ActionEntry *> &o);
+
+private:
+  void decode(__u8 version, bufferlist::iterator &it);
+};
+
+WRITE_CLASS_ENCODER(ActionEntry);
+
+} // namespace action
+} // namespace rbd_replay
+
+std::ostream &operator<<(std::ostream &out,
+                         const rbd_replay::action::ActionType &type);
+
+using rbd_replay::action::decode;
+using rbd_replay::action::encode;
+
+#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H
diff --git a/src/rbd_replay/BufferReader.cc b/src/rbd_replay/BufferReader.cc
new file mode 100644 (file)
index 0000000..4d1b604
--- /dev/null
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/BufferReader.h"
+#include "include/assert.h"
+#include "include/intarith.h"
+#include "include/page.h"
+
+namespace rbd_replay {
+
+BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes)
+  : m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes),
+    m_bl_it(m_bl.begin()) {
+  assert(m_min_bytes <= m_max_bytes);
+}
+
+int BufferReader::fetch(bufferlist::iterator **it) {
+  if (m_bl_it.get_remaining() < m_min_bytes) {
+    ssize_t bytes_to_read = ROUND_UP_TO(m_max_bytes - m_bl_it.get_remaining(),
+                                        CEPH_PAGE_SIZE);
+    while (bytes_to_read > 0) {
+      int r = m_bl.read_fd(m_fd, CEPH_PAGE_SIZE);
+      if (r < 0) {
+        return r;
+      }
+      assert(r <= bytes_to_read);
+      bytes_to_read -= r;
+    }
+  }
+
+  *it = &m_bl_it;
+  return 0;
+}
+
+} // namespace rbd_replay
diff --git a/src/rbd_replay/BufferReader.h b/src/rbd_replay/BufferReader.h
new file mode 100644 (file)
index 0000000..95b1533
--- /dev/null
@@ -0,0 +1,33 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H
+#define CEPH_RBD_REPLAY_BUFFER_READER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace rbd_replay {
+
+class BufferReader {
+public:
+  static const size_t DEFAULT_MIN_BYTES = 1<<20;
+  static const size_t DEFAULT_MAX_BYTES = 1<<22;
+
+  BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES,
+               size_t max_bytes = DEFAULT_MAX_BYTES);
+
+  int fetch(bufferlist::iterator **it);
+
+private:
+  int m_fd;
+  size_t m_min_bytes;
+  size_t m_max_bytes;
+  bufferlist m_bl;
+  bufferlist::iterator m_bl_it;
+
+};
+
+} // namespace rbd_replay
+
+#endif // CEPH_RBD_REPLAY_BUFFER_READER_H
diff --git a/src/rbd_replay/Deser.cc b/src/rbd_replay/Deser.cc
deleted file mode 100644 (file)
index 986a18c..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "Deser.hpp"
-#include <arpa/inet.h>
-#include <cstdlib>
-#include <endian.h>
-
-
-rbd_replay::Deser::Deser(std::istream &in)
-  : m_in(in) {
-}
-
-uint8_t rbd_replay::Deser::read_uint8_t() {
-  uint8_t data;
-  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
-  return data;
-}
-
-uint16_t rbd_replay::Deser::read_uint16_t() {
-  uint16_t data;
-  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
-  return ntohs(data);
-}
-
-uint32_t rbd_replay::Deser::read_uint32_t() {
-  uint32_t data;
-  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
-  return ntohl(data);
-}
-
-uint64_t rbd_replay::Deser::read_uint64_t() {
-  uint64_t data;
-  m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-  data = (static_cast<uint64_t>(ntohl(data)) << 32 | ntohl(data >> 32));
-#endif
-  return data;
-}
-
-std::string rbd_replay::Deser::read_string() {
-  uint32_t length = read_uint32_t();
-  char* data = reinterpret_cast<char*>(malloc(length));
-  m_in.read(data, length);
-  std::string s(data, length);
-  free(data);
-  return s;
-}
-
-bool rbd_replay::Deser::read_bool() {
-  return read_uint8_t() != 0;
-}
-
-bool rbd_replay::Deser::eof() {
-  return m_in.eof();
-}
diff --git a/src/rbd_replay/Deser.hpp b/src/rbd_replay/Deser.hpp
deleted file mode 100644 (file)
index b466ace..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP
-#define _INCLUDED_RBD_REPLAY_DESER_HPP
-
-#include <iostream>
-#include <stdint.h>
-
-namespace rbd_replay {
-
-/**
-   Helper for deserializing data in an architecture-indepdendent way.
-   Everything is read big-endian.
-   @see Ser
-*/
-class Deser {
-public:
-  Deser(std::istream &in);
-
-  uint8_t read_uint8_t();
-
-  uint16_t read_uint16_t();
-
-  uint32_t read_uint32_t();
-
-  uint64_t read_uint64_t();
-
-  std::string read_string();
-
-  bool read_bool();
-
-  bool eof();
-
-private:
-  std::istream &m_in;
-};
-
-}
-
-#endif
index fa101b7423fdc8f6adb74d3eb0279cb42120c477..23a8e9152e089d934bbd78d1e291850347e40287 100644 (file)
@@ -2,35 +2,46 @@ if ENABLE_CLIENT
 if WITH_RADOS
 if WITH_RBD
 
+librbd_replay_types_la_SOURCES = \
+       rbd_replay/ActionTypes.cc
+noinst_HEADERS += \
+        rbd_replay/ActionTypes.h
+noinst_LTLIBRARIES += librbd_replay_types.la
+DENCODER_DEPS += librbd_replay_types.la
+
 # librbd_replay_la exists only to help with unit tests
-librbd_replay_la_SOURCES = rbd_replay/actions.cc \
-       rbd_replay/Deser.cc \
+librbd_replay_la_SOURCES = \
+       rbd_replay/actions.cc \
+       rbd_replay/BufferReader.cc \
        rbd_replay/ImageNameMap.cc \
        rbd_replay/PendingIO.cc \
        rbd_replay/rbd_loc.cc \
-       rbd_replay/Replayer.cc \
-       rbd_replay/Ser.cc
-librbd_replay_la_LIBADD = $(LIBRBD) \
+       rbd_replay/Replayer.cc
+librbd_replay_la_LIBADD = \
+       $(LIBRBD) \
        $(LIBRADOS) \
        $(CEPH_GLOBAL)
 noinst_LTLIBRARIES += librbd_replay.la
-noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
+noinst_HEADERS += \
        rbd_replay/actions.hpp \
-       rbd_replay/Deser.hpp \
+       rbd_replay/BoundedBuffer.hpp \
+       rbd_replay/BufferReader.h \
        rbd_replay/ImageNameMap.hpp \
        rbd_replay/ios.hpp \
        rbd_replay/PendingIO.hpp \
        rbd_replay/rbd_loc.hpp \
        rbd_replay/rbd_replay_debug.hpp \
-       rbd_replay/Replayer.hpp \
-       rbd_replay/Ser.hpp
-
+       rbd_replay/Replayer.hpp
 
-rbd_replay_SOURCES = rbd_replay/rbd-replay.cc
-rbd_replay_LDADD = $(LIBRBD) \
+rbd_replay_SOURCES = \
+       rbd_replay/rbd-replay.cc
+rbd_replay_LDADD = \
+       librbd_replay.la \
+       librbd_replay_types.la \
+       $(LIBRBD) \
        $(LIBRADOS) \
        $(CEPH_GLOBAL) \
-       librbd_replay.la
+       $(LIBCOMMON)
 
 if LINUX
 bin_PROGRAMS += rbd-replay
@@ -43,12 +54,16 @@ librbd_replay_ios_la_LIBADD = $(LIBRBD) \
        librbd_replay.la
 noinst_LTLIBRARIES += librbd_replay_ios.la
 
-rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
-rbd_replay_prep_LDADD = $(LIBRBD) \
-       $(LIBRADOS) \
-       $(CEPH_GLOBAL) \
+rbd_replay_prep_SOURCES = \
+       rbd_replay/rbd-replay-prep.cc
+rbd_replay_prep_LDADD = \
        librbd_replay.la \
        librbd_replay_ios.la \
+       librbd_replay_types.la \
+       $(LIBRBD) \
+       $(LIBRADOS) \
+       $(CEPH_GLOBAL) \
+       $(LIBCOMMON) \
        -lbabeltrace \
        -lbabeltrace-ctf \
        -lboost_date_time
index 9deb0ffb6e4a2b0d1ecb16903baf68f6c13feb1d..d62aedb6b7769fa0f7428d59667cf7bc1fb199ff 100644 (file)
 
 #include "Replayer.hpp"
 #include "common/errno.h"
+#include "rbd_replay/ActionTypes.h"
+#include "rbd_replay/BufferReader.h"
 #include <boost/foreach.hpp>
 #include <boost/thread/thread.hpp>
+#include <boost/scope_exit.hpp>
 #include <fstream>
 #include "global/global_context.h"
 #include "rbd_replay_debug.hpp"
 using namespace std;
 using namespace rbd_replay;
 
+namespace {
+
+bool is_versioned_replay(BufferReader &buffer_reader) {
+  bufferlist::iterator *it;
+  int r = buffer_reader.fetch(&it);
+  if (r < 0) {
+    return false;
+  }
+
+  if (it->get_remaining() < action::BANNER.size()) {
+    return false;
+  }
+
+  std::string banner;
+  it->copy(action::BANNER.size(), banner);
+  bool versioned = (banner == action::BANNER);
+  if (!versioned) {
+    it->seek(0);
+  }
+  return versioned;
+}
+
+} // anonymous namespace
 
 Worker::Worker(Replayer &replayer)
   : m_replayer(replayer),
@@ -174,18 +200,45 @@ void Replayer::run(const std::string& replay_file) {
       m_rbd = new librbd::RBD();
       map<thread_id_t, Worker*> workers;
 
-      ifstream input(replay_file.c_str(), ios::in | ios::binary);
-      if (!input.is_open()) {
-       cerr << "Failed to open " << replay_file << std::endl;
-       exit(1);
+      int fd = open(replay_file.c_str(), O_RDONLY);
+      if (fd < 0) {
+        std::cerr << "Failed to open " << replay_file << ": "
+                  << cpp_strerror(errno) << std::endl;
+        exit(1);
       }
+      BOOST_SCOPE_EXIT( (fd) ) {
+        close(fd);
+      } BOOST_SCOPE_EXIT_END;
 
-      Deser deser(input);
+      BufferReader buffer_reader(fd);
+      bool versioned = is_versioned_replay(buffer_reader);
       while (true) {
-       Action::ptr action = Action::read_from(deser);
+        action::ActionEntry action_entry;
+        try {
+          bufferlist::iterator *it;
+          int r = buffer_reader.fetch(&it);
+          if (r < 0) {
+            std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
+                      << std::endl;
+            exit(-r);
+          }
+
+          if (versioned) {
+            action_entry.decode(*it);
+          } else {
+            action_entry.decode_unversioned(*it);
+          }
+        } catch (const buffer::error &err) {
+          std::cerr << "Failed to decode trace action" << std::endl;
+          exit(1);
+        }
+
+       Action::ptr action = Action::construct(action_entry);
        if (!action) {
-         break;
+          // unknown / unsupported action
+         continue;
        }
+
        if (action->is_start_thread()) {
          Worker *worker = new Worker(*this);
          workers[action->thread_id()] = worker;
@@ -259,9 +312,9 @@ bool Replayer::is_action_complete(action_id_t id) {
   return tracker.actions.count(id) > 0;
 }
 
-void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
+void Replayer::wait_for_actions(const action::Dependencies &deps) {
   boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
-  BOOST_FOREACH(const dependency_d &dep, deps) {
+  BOOST_FOREACH(const action::Dependency &dep, deps) {
     dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
     boost::system_time start_time(boost::get_system_time());
     action_tracker_d &tracker = tracker_for(dep.id);
index 538e7fddd4df31d1207282b6dc17095b0625c018..acad7258fccd4c8ff7c21a52310c88699343b00f 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/shared_mutex.hpp>
+#include "rbd_replay/ActionTypes.h"
 #include "BoundedBuffer.hpp"
 #include "ImageNameMap.hpp"
 #include "PendingIO.hpp"
@@ -100,7 +101,7 @@ public:
 
   bool is_action_complete(action_id_t id);
 
-  void wait_for_actions(const std::vector<dependency_d> &deps);
+  void wait_for_actions(const action::Dependencies &deps);
 
   std::string pool_name() const;
 
diff --git a/src/rbd_replay/Ser.cc b/src/rbd_replay/Ser.cc
deleted file mode 100644 (file)
index 97a63cd..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "Ser.hpp"
-#include <arpa/inet.h>
-#include <cstdlib>
-#include <endian.h>
-
-
-rbd_replay::Ser::Ser(std::ostream &out)
-  : m_out(out) {
-}
-
-void rbd_replay::Ser::write_uint8_t(uint8_t data) {
-  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint16_t(uint16_t data) {
-  data = htons(data);
-  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint32_t(uint32_t data) {
-  data = htonl(data);
-  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint64_t(uint64_t data) {
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-  data = (static_cast<uint64_t>(htonl(data)) << 32 | htonl(data >> 32));
-#endif
-  m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_string(const std::string& data) {
-  write_uint32_t(data.length());
-  m_out.write(data.data(), data.length());
-}
-
-void rbd_replay::Ser::write_bool(bool data) {
-  write_uint8_t(data ? 1 : 0);
-}
diff --git a/src/rbd_replay/Ser.hpp b/src/rbd_replay/Ser.hpp
deleted file mode 100644 (file)
index 130465d..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef _INCLUDED_RBD_REPLAY_SER_HPP
-#define _INCLUDED_RBD_REPLAY_SER_HPP
-
-#include <iostream>
-#include <stdint.h>
-
-namespace rbd_replay {
-
-/**
-   Helper for serializing data in an architecture-indepdendent way.
-   Everything is written big-endian.
-   @see Deser
-*/
-class Ser {
-public:
-  Ser(std::ostream &out);
-
-  void write_uint8_t(uint8_t);
-
-  void write_uint16_t(uint16_t);
-
-  void write_uint32_t(uint32_t);
-
-  void write_uint64_t(uint64_t);
-
-  void write_string(const std::string&);
-
-  void write_bool(bool b);
-
-private:
-  std::ostream &m_out;
-};
-
-}
-
-#endif
index 0c327b653edf4e8eaf6f260069065f7b965e1eb9..7726d08706dbc40849d3726c38d23a2886758d4f 100644 (file)
 using namespace rbd_replay;
 using namespace std;
 
+namespace {
 
-Action::Action(action_id_t id,
-               thread_id_t thread_id,
-               std::vector<dependency_d> &predecessors)
-  : m_id(id),
-    m_thread_id(thread_id),
-    m_predecessors(predecessors) {
-    }
-
-Action::~Action() {
+std::string create_fake_data() {
+  char data[1 << 20]; // 1 MB
+  for (unsigned int i = 0; i < sizeof(data); i++) {
+    data[i] = (char) i;
+  }
+  return std::string(data, sizeof(data));
 }
 
-Action::ptr Action::read_from(Deser &d) {
-  uint8_t type = d.read_uint8_t();
-  if (d.eof()) {
-    return Action::ptr();
-  }
-  uint32_t ionum = d.read_uint32_t();
-  uint64_t thread_id = d.read_uint64_t();
-  d.read_uint32_t(); // unused
-  d.read_uint32_t(); // unused
-  uint32_t num_dependencies = d.read_uint32_t();
-  vector<dependency_d> deps;
-  for (unsigned int i = 0; i < num_dependencies; i++) {
-    uint32_t dep_id = d.read_uint32_t();
-    uint64_t time_delta = d.read_uint64_t();
-    deps.push_back(dependency_d(dep_id, time_delta));
+struct ConstructVisitor : public boost::static_visitor<Action::ptr> {
+  inline Action::ptr operator()(const action::StartThreadAction &action) const {
+    return Action::ptr(new StartThreadAction(action));
   }
-  DummyAction dummy(ionum, thread_id, deps);
-  switch (type) {
-  case IO_START_THREAD:
-    return StartThreadAction::read_from(dummy, d);
-  case IO_STOP_THREAD:
-    return StopThreadAction::read_from(dummy, d);
-  case IO_READ:
-    return ReadAction::read_from(dummy, d);
-  case IO_WRITE:
-    return WriteAction::read_from(dummy, d);
-  case IO_ASYNC_READ:
-    return AioReadAction::read_from(dummy, d);
-  case IO_ASYNC_WRITE:
-    return AioWriteAction::read_from(dummy, d);
-  case IO_OPEN_IMAGE:
-    return OpenImageAction::read_from(dummy, d);
-  case IO_CLOSE_IMAGE:
-    return CloseImageAction::read_from(dummy, d);
-  default:
-    cerr << "Invalid action type: " << type << std::endl;
-    exit(1);
+
+  inline Action::ptr operator()(const action::StopThreadAction &action) const{
+    return Action::ptr(new StopThreadAction(action));
   }
-}
 
-std::ostream& Action::dump_action_fields(std::ostream& o) const {
-  o << "id=" << m_id << ", thread_id=" << m_thread_id << ", predecessors=[";
-  bool first = true;
-  BOOST_FOREACH(const dependency_d &d, m_predecessors) {
-    if (!first) {
-      o << ",";
-    }
-    o << d.id;
-    first = false;
+  inline Action::ptr operator()(const action::ReadAction &action) const {
+    return Action::ptr(new ReadAction(action));
   }
-  return o << "]";
-}
 
-std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
-  return a.dump(o);
-}
+  inline Action::ptr operator()(const action::AioReadAction &action) const {
+    return Action::ptr(new AioReadAction(action));
+  }
 
+  inline Action::ptr operator()(const action::WriteAction &action) const {
+    return Action::ptr(new WriteAction(action));
+  }
 
-std::ostream& DummyAction::dump(std::ostream& o) const {
-  o << "DummyAction[";
-  dump_action_fields(o);
-  return o << "]";
-}
+  inline Action::ptr operator()(const action::AioWriteAction &action) const {
+    return Action::ptr(new AioWriteAction(action));
+  }
 
+  inline Action::ptr operator()(const action::OpenImageAction &action) const {
+    return Action::ptr(new OpenImageAction(action));
+  }
 
-StartThreadAction::StartThreadAction(Action &src)
-  : Action(src) {
-}
+  inline Action::ptr operator()(const action::CloseImageAction &action) const {
+    return Action::ptr(new CloseImageAction(action));
+  }
 
-void StartThreadAction::perform(ActionCtx &ctx) {
-  cerr << "StartThreadAction should never actually be performed" << std::endl;
-  exit(1);
-}
+  inline Action::ptr operator()(const action::UnknownAction &action) const {
+    return Action::ptr();
+  }
+};
 
-bool StartThreadAction::is_start_thread() {
-  return true;
-}
+} // anonymous namespace
 
-Action::ptr StartThreadAction::read_from(Action &src, Deser &d) {
-  return Action::ptr(new StartThreadAction(src));
+std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
+  return a.dump(o);
 }
 
-std::ostream& StartThreadAction::dump(std::ostream& o) const {
-  o << "StartThreadAction[";
-  dump_action_fields(o);
-  return o << "]";
+Action::ptr Action::construct(const action::ActionEntry &action_entry) {
+  return boost::apply_visitor(ConstructVisitor(), action_entry.action);
 }
 
-
-StopThreadAction::StopThreadAction(Action &src)
-  : Action(src) {
+void StartThreadAction::perform(ActionCtx &ctx) {
+  cerr << "StartThreadAction should never actually be performed" << std::endl;
+  exit(1);
 }
 
 void StopThreadAction::perform(ActionCtx &ctx) {
@@ -132,116 +90,33 @@ void StopThreadAction::perform(ActionCtx &ctx) {
   ctx.stop();
 }
 
-Action::ptr StopThreadAction::read_from(Action &src, Deser &d) {
-  return Action::ptr(new StopThreadAction(src));
-}
-
-std::ostream& StopThreadAction::dump(std::ostream& o) const {
-  o << "StopThreadAction[";
-  dump_action_fields(o);
-  return o << "]";
-}
-
-
-AioReadAction::AioReadAction(const Action &src,
-                             imagectx_id_t imagectx_id,
-                             uint64_t offset,
-                             uint64_t length)
-  : Action(src),
-    m_imagectx_id(imagectx_id),
-    m_offset(offset),
-    m_length(length) {
-    }
-
-Action::ptr AioReadAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  uint64_t offset = d.read_uint64_t();
-  uint64_t length = d.read_uint64_t();
-  return Action::ptr(new AioReadAction(src, imagectx_id, offset, length));
-}
-
 void AioReadAction::perform(ActionCtx &worker) {
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
-  librbd::Image *image = worker.get_image(m_imagectx_id);
+  librbd::Image *image = worker.get_image(m_action.imagectx_id);
   assert(image);
   PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
   worker.add_pending(io);
-  int r = image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion());
+  int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
   assertf(r >= 0, "id = %d, r = %d", id(), r);
 }
 
-std::ostream& AioReadAction::dump(std::ostream& o) const {
-  o << "AioReadAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-ReadAction::ReadAction(const Action &src,
-                       imagectx_id_t imagectx_id,
-                       uint64_t offset,
-                       uint64_t length)
-  : Action(src),
-    m_imagectx_id(imagectx_id),
-    m_offset(offset),
-    m_length(length) {
-    }
-
-Action::ptr ReadAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  uint64_t offset = d.read_uint64_t();
-  uint64_t length = d.read_uint64_t();
-  return Action::ptr(new ReadAction(src, imagectx_id, offset, length));
-}
-
 void ReadAction::perform(ActionCtx &worker) {
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
-  librbd::Image *image = worker.get_image(m_imagectx_id);
+  librbd::Image *image = worker.get_image(m_action.imagectx_id);
   PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
   worker.add_pending(io);
-  ssize_t r = image->read(m_offset, m_length, io->bufferlist());
+  ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist());
   assertf(r >= 0, "id = %d, r = %d", id(), r);
   worker.remove_pending(io);
 }
 
-std::ostream& ReadAction::dump(std::ostream& o) const {
-  o << "ReadAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-AioWriteAction::AioWriteAction(const Action &src,
-                               imagectx_id_t imagectx_id,
-                               uint64_t offset,
-                               uint64_t length)
-  : Action(src),
-    m_imagectx_id(imagectx_id),
-    m_offset(offset),
-    m_length(length) {
-    }
-
-Action::ptr AioWriteAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  uint64_t offset = d.read_uint64_t();
-  uint64_t length = d.read_uint64_t();
-  return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length));
-}
-
-static std::string create_fake_data() {
-  char data[1 << 20]; // 1 MB
-  for (unsigned int i = 0; i < sizeof(data); i++) {
-    data[i] = (char) i;
-  }
-  return std::string(data, sizeof(data));
-}
 
 void AioWriteAction::perform(ActionCtx &worker) {
   static const std::string fake_data(create_fake_data());
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
-  librbd::Image *image = worker.get_image(m_imagectx_id);
+  librbd::Image *image = worker.get_image(m_action.imagectx_id);
   PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
-  uint64_t remaining = m_length;
+  uint64_t remaining = m_action.length;
   while (remaining > 0) {
     uint64_t n = std::min(remaining, (uint64_t)fake_data.length());
     io->bufferlist().append(fake_data.data(), n);
@@ -251,126 +126,52 @@ void AioWriteAction::perform(ActionCtx &worker) {
   if (worker.readonly()) {
     worker.remove_pending(io);
   } else {
-    int r = image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion());
+    int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
     assertf(r >= 0, "id = %d, r = %d", id(), r);
   }
 }
 
-std::ostream& AioWriteAction::dump(std::ostream& o) const {
-  o << "AioWriteAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-WriteAction::WriteAction(const Action &src,
-                         imagectx_id_t imagectx_id,
-                         uint64_t offset,
-                         uint64_t length)
-  : Action(src),
-    m_imagectx_id(imagectx_id),
-    m_offset(offset),
-    m_length(length) {
-    }
-
-Action::ptr WriteAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  uint64_t offset = d.read_uint64_t();
-  uint64_t length = d.read_uint64_t();
-  return Action::ptr(new WriteAction(src, imagectx_id, offset, length));
-}
-
 void WriteAction::perform(ActionCtx &worker) {
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
-  librbd::Image *image = worker.get_image(m_imagectx_id);
+  librbd::Image *image = worker.get_image(m_action.imagectx_id);
   PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
   worker.add_pending(io);
-  io->bufferlist().append_zero(m_length);
+  io->bufferlist().append_zero(m_action.length);
   if (!worker.readonly()) {
-    ssize_t r = image->write(m_offset, m_length, io->bufferlist());
+    ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist());
     assertf(r >= 0, "id = %d, r = %d", id(), r);
   }
   worker.remove_pending(io);
 }
 
-std::ostream& WriteAction::dump(std::ostream& o) const {
-  o << "WriteAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-OpenImageAction::OpenImageAction(Action &src,
-                                 imagectx_id_t imagectx_id,
-                                 string name,
-                                 string snap_name,
-                                 bool readonly)
-  : Action(src),
-    m_imagectx_id(imagectx_id),
-    m_name(name),
-    m_snap_name(snap_name),
-    m_readonly(readonly) {
-    }
-
-Action::ptr OpenImageAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  string name = d.read_string();
-  string snap_name = d.read_string();
-  bool readonly = d.read_bool();
-  return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly));
-}
-
 void OpenImageAction::perform(ActionCtx &worker) {
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
   PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
   worker.add_pending(io);
   librbd::Image *image = new librbd::Image();
   librbd::RBD *rbd = worker.rbd();
-  rbd_loc name(worker.map_image_name(m_name, m_snap_name));
+  rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name));
   int r;
-  if (m_readonly || worker.readonly()) {
+  if (m_action.read_only || worker.readonly()) {
     r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
   } else {
     r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
   }
   if (r) {
-    cerr << "Unable to open image '" << m_name
-        << "' with snap '" << m_snap_name
+    cerr << "Unable to open image '" << m_action.name
+        << "' with snap '" << m_action.snap_name
         << "' (mapped to '" << name.str()
-        << "') and readonly " << m_readonly
+        << "') and readonly " << m_action.read_only
         << ": (" << -r << ") " << strerror(-r) << std::endl;
     exit(1);
   }
-  worker.put_image(m_imagectx_id, image);
+  worker.put_image(m_action.imagectx_id, image);
   worker.remove_pending(io);
 }
 
-std::ostream& OpenImageAction::dump(std::ostream& o) const {
-  o << "OpenImageAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly << "]";
-}
-
-
-CloseImageAction::CloseImageAction(Action &src,
-                                   imagectx_id_t imagectx_id)
-  : Action(src),
-    m_imagectx_id(imagectx_id) {
-    }
-
-Action::ptr CloseImageAction::read_from(Action &src, Deser &d) {
-  imagectx_id_t imagectx_id = d.read_uint64_t();
-  return Action::ptr(new CloseImageAction(src, imagectx_id));
-}
-
 void CloseImageAction::perform(ActionCtx &worker) {
   dout(ACTION_LEVEL) << "Performing " << *this << dendl;
-  worker.erase_image(m_imagectx_id);
+  worker.erase_image(m_action.imagectx_id);
   worker.set_action_complete(pending_io_id());
 }
 
-std::ostream& CloseImageAction::dump(std::ostream& o) const {
-  o << "CloseImageAction[";
-  dump_action_fields(o);
-  return o << ", imagectx_id=" << m_imagectx_id << "]";
-}
index e9522dbf99fcdbdb53dac2b771203e673375c274..ea46a883362e62ec7badd833d9f2c2bfdade7547 100644 (file)
 
 #include <boost/shared_ptr.hpp>
 #include "include/rbd/librbd.hpp"
-#include "Deser.hpp"
+#include "common/Formatter.h"
+#include "rbd_replay/ActionTypes.h"
 #include "rbd_loc.hpp"
+#include <iostream>
 
 // Stupid Doxygen requires this or else the typedef docs don't appear anywhere.
 /// @file rbd_replay/actions.hpp
@@ -31,44 +33,8 @@ typedef uint64_t thread_id_t;
 /// Even IDs are normal actions, odd IDs are completions.
 typedef uint32_t action_id_t;
 
-/**
-   Dependencies link actions to earlier actions or completions.
-   If an action has a dependency \c d then it waits until \c d.time_delta nanoseconds after the action or completion with ID \c d.id has fired.
-*/
-struct dependency_d {
-  /// ID of the action or completion to wait for.
-  action_id_t id;
-
-  /// Nanoseconds of delay to wait until after the action or completion fires.
-  uint64_t time_delta;
-
-  /**
-     @param id ID of the action or completion to wait for.
-     @param time_delta Nanoseconds of delay to wait after the action or completion fires.
-   */
-  dependency_d(action_id_t id,
-              uint64_t time_delta)
-    : id(id),
-      time_delta(time_delta) {
-  }
-};
-
-// These are written to files, so don't change existing assignments.
-enum io_type {
-  IO_START_THREAD,
-  IO_STOP_THREAD,
-  IO_READ,
-  IO_WRITE,
-  IO_ASYNC_READ,
-  IO_ASYNC_WRITE,
-  IO_OPEN_IMAGE,
-  IO_CLOSE_IMAGE,
-};
-
-
 class PendingIO;
 
-
 /**
    %Context through which an Action interacts with its environment.
  */
@@ -131,17 +97,14 @@ class Action {
 public:
   typedef boost::shared_ptr<Action> ptr;
 
-  Action(action_id_t id,
-        thread_id_t thread_id,
-        std::vector<dependency_d> &predecessors);
-
-  virtual ~Action();
+  virtual ~Action() {
+  }
 
   virtual void perform(ActionCtx &ctx) = 0;
 
   /// Returns the ID of the completion corresponding to this action.
   action_id_t pending_io_id() {
-    return m_id + 1;
+    return id() + 1;
   }
 
   // There's probably a better way to do this, but oh well.
@@ -149,202 +112,172 @@ public:
     return false;
   }
 
-  action_id_t id() const {
-    return m_id;
-  }
+  virtual action_id_t id() const = 0;
+  virtual thread_id_t thread_id() const = 0;
+  virtual const action::Dependencies& predecessors() const = 0;
+
+  virtual std::ostream& dump(std::ostream& o) const = 0;
+
+  static ptr construct(const action::ActionEntry &action_entry);
+};
 
-  thread_id_t thread_id() const {
-    return m_thread_id;
+template <typename ActionType>
+class TypedAction : public Action {
+public:
+  TypedAction(const ActionType &action) : m_action(action) {
   }
 
-  const std::vector<dependency_d>& predecessors() const {
-    return m_predecessors;
+  virtual action_id_t id() const {
+    return m_action.id;
   }
 
-  /// Reads and constructs an action from the replay file.
-  static ptr read_from(Deser &d);
+  virtual thread_id_t thread_id() const {
+    return m_action.thread_id;
+  }
 
-protected:
-  std::ostream& dump_action_fields(std::ostream& o) const;
+  virtual const action::Dependencies& predecessors() const {
+    return m_action.dependencies;
+  }
 
-private:
-  friend std::ostream& operator<<(std::ostream&, const Action&);
+  virtual std::ostream& dump(std::ostream& o) const {
+    o << get_action_name() << ": ";
+    ceph::JSONFormatter formatter(false);
+    formatter.open_object_section("");
+    m_action.dump(&formatter);
+    formatter.close_section();
+    formatter.flush(o);
+    return o;
+  }
 
-  virtual std::ostream& dump(std::ostream& o) const = 0;
+protected:
+  const ActionType m_action;
 
-  const action_id_t m_id;
-  const thread_id_t m_thread_id;
-  const std::vector<dependency_d> m_predecessors;
+  virtual const char *get_action_name() const = 0;
 };
 
 /// Writes human-readable debug information about the action to the stream.
 /// @related Action
 std::ostream& operator<<(std::ostream& o, const Action& a);
 
-
-/**
-   Placeholder for partially-constructed actions.
-   Does nothing, and does not appear in the replay file.
- */
-class DummyAction : public Action {
+class StartThreadAction : public TypedAction<action::StartThreadAction> {
 public:
-  DummyAction(action_id_t id,
-             thread_id_t thread_id,
-             std::vector<dependency_d> &predecessors)
-    : Action(id, thread_id, predecessors) {
+  explicit StartThreadAction(const action::StartThreadAction &action)
+    : TypedAction<action::StartThreadAction>(action) {
   }
 
-  void perform(ActionCtx &ctx) {
+  virtual bool is_start_thread() {
+    return true;
   }
+  virtual void perform(ActionCtx &ctx);
 
-private:
-  std::ostream& dump(std::ostream& o) const;
-};
-
-
-class StopThreadAction : public Action {
-public:
-  explicit StopThreadAction(Action &src);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
-
-private:
-  std::ostream& dump(std::ostream& o) const;
+protected:
+  virtual const char *get_action_name() const {
+    return "StartThreadAction";
+  }
 };
 
-
-class AioReadAction : public Action {
+class StopThreadAction : public TypedAction<action::StopThreadAction> {
 public:
-  AioReadAction(const Action &src,
-               imagectx_id_t imagectx_id,
-               uint64_t offset,
-               uint64_t length);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  explicit StopThreadAction(const action::StopThreadAction &action)
+    : TypedAction<action::StopThreadAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
-  uint64_t m_offset;
-  uint64_t m_length;
+protected:
+  virtual const char *get_action_name() const {
+    return "StartThreadAction";
+  }
 };
 
 
-class ReadAction : public Action {
+class AioReadAction : public TypedAction<action::AioReadAction> {
 public:
-  ReadAction(const Action &src,
-            imagectx_id_t imagectx_id,
-            uint64_t offset,
-            uint64_t length);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  AioReadAction(const action::AioReadAction &action)
+    : TypedAction<action::AioReadAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
-  uint64_t m_offset;
-  uint64_t m_length;
+protected:
+  virtual const char *get_action_name() const {
+    return "AioReadAction";
+  }
 };
 
 
-class AioWriteAction : public Action {
+class ReadAction : public TypedAction<action::ReadAction> {
 public:
-  AioWriteAction(const Action &src,
-                imagectx_id_t imagectx_id,
-                uint64_t offset,
-                uint64_t length);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  ReadAction(const action::ReadAction &action)
+    : TypedAction<action::ReadAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
-  uint64_t m_offset;
-  uint64_t m_length;
+protected:
+  virtual const char *get_action_name() const {
+    return "ReadAction";
+  }
 };
 
 
-class WriteAction : public Action {
+class AioWriteAction : public TypedAction<action::AioWriteAction> {
 public:
-  WriteAction(const Action &src,
-             imagectx_id_t imagectx_id,
-             uint64_t offset,
-             uint64_t length);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  AioWriteAction(const action::AioWriteAction &action)
+    : TypedAction<action::AioWriteAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
-  uint64_t m_offset;
-  uint64_t m_length;
+protected:
+  virtual const char *get_action_name() const {
+    return "AioWriteAction";
+  }
 };
 
 
-class OpenImageAction : public Action {
+class WriteAction : public TypedAction<action::WriteAction> {
 public:
-  OpenImageAction(Action &src,
-                 imagectx_id_t imagectx_id,
-                 std::string name,
-                 std::string snap_name,
-                 bool readonly);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  WriteAction(const action::WriteAction &action)
+    : TypedAction<action::WriteAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
-  std::string m_name;
-  std::string m_snap_name;
-  bool m_readonly;
+protected:
+  virtual const char *get_action_name() const {
+    return "WriteAction";
+  }
 };
 
 
-class CloseImageAction : public Action {
+class OpenImageAction : public TypedAction<action::OpenImageAction> {
 public:
-  CloseImageAction(Action &src,
-                  imagectx_id_t imagectx_id);
-
-  void perform(ActionCtx &ctx);
-
-  static Action::ptr read_from(Action &src, Deser &d);
+  OpenImageAction(const action::OpenImageAction &action)
+    : TypedAction<action::OpenImageAction>(action) {
+  }
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+  virtual void perform(ActionCtx &ctx);
 
-  imagectx_id_t m_imagectx_id;
+protected:
+  virtual const char *get_action_name() const {
+    return "OpenImageAction";
+  }
 };
 
 
-class StartThreadAction : public Action {
+class CloseImageAction : public TypedAction<action::CloseImageAction> {
 public:
-  explicit StartThreadAction(Action &src);
-
-  void perform(ActionCtx &ctx);
-
-  bool is_start_thread();
+  CloseImageAction(const action::CloseImageAction &action)
+    : TypedAction<action::CloseImageAction>(action) {
+  }
 
-  static Action::ptr read_from(Action &src, Deser &d);
+  virtual void perform(ActionCtx &ctx);
 
-private:
-  std::ostream& dump(std::ostream& o) const;
+protected:
+  virtual const char *get_action_name() const {
+    return "CloseImageAction";
+  }
 };
 
 }
index 21a68019ececd82b0999bea38ed1b27add4363a6..7437bed8897846fc6786e566bfdd90a0ac9d9f59 100644 (file)
 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
 
 #include "ios.hpp"
+#include "rbd_replay/ActionTypes.h"
 
 using namespace std;
 using namespace rbd_replay;
 
-bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
-  return p1->start_time() < p2->start_time();
+namespace {
+
+bool compare_dependencies_by_start_time(const action::Dependency &lhs,
+                                        const action::Dependency &rhs) {
+  return lhs.time_delta < rhs.time_delta;
+}
+
+action::Dependencies convert_dependencies(uint64_t start_time,
+                                          const io_set_t &deps) {
+  action::Dependencies action_deps;
+  action_deps.reserve(deps.size());
+  for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) {
+    boost::shared_ptr<IO> io = *it;
+    uint64_t time_delta = 0;
+    if (start_time >= io->start_time()) {
+      time_delta = start_time - io->start_time();
+    }
+    action_deps.push_back(action::Dependency(io->ionum(), time_delta));
+  }
+  std::sort(action_deps.begin(), action_deps.end(),
+            compare_dependencies_by_start_time);
+  return action_deps;
 }
 
+} // anonymous namespace
+
 void IO::write_debug_base(ostream& out, string type) const {
   out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
   bool first = true;
@@ -39,51 +62,36 @@ void IO::write_debug_base(ostream& out, string type) const {
 }
 
 
-void IO::write_to(Ser& out, io_type iotype) const {
-  // TODO break compatibility now to add version (and yank unused fields)?
-  out.write_uint8_t(iotype);
-  out.write_uint32_t(m_ionum);
-  out.write_uint64_t(m_thread_id);
-  out.write_uint32_t(0);
-  out.write_uint32_t(0);
-  out.write_uint32_t(m_dependencies.size());
-  vector<IO::ptr> deps;
-  for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
-    deps.push_back(*itr);
-  }
-  sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
-  for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
-    out.write_uint32_t((*itr)->m_ionum);
-    out.write_uint64_t(m_start_time - (*itr)->m_start_time);
-  }
-}
-
 ostream& operator<<(ostream& out, IO::ptr io) {
   io->write_debug(out);
   return out;
 }
 
-void StartThreadIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_START_THREAD);
+void StartThreadIO::encode(bufferlist &bl) const {
+  action::Action action((action::StartThreadAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+  ::encode(action, bl);
 }
 
 void StartThreadIO::write_debug(std::ostream& out) const {
   write_debug_base(out, "start thread");
 }
 
-void StopThreadIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_STOP_THREAD);
+void StopThreadIO::encode(bufferlist &bl) const {
+  action::Action action((action::StopThreadAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+  ::encode(action, bl);
 }
 
 void StopThreadIO::write_debug(std::ostream& out) const {
   write_debug_base(out, "stop thread");
 }
 
-void ReadIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_READ);
-  out.write_uint64_t(m_imagectx);
-  out.write_uint64_t(m_offset);
-  out.write_uint64_t(m_length);
+void ReadIO::encode(bufferlist &bl) const {
+  action::Action action((action::ReadAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx, m_offset, m_length)));
+  ::encode(action, bl);
 }
 
 void ReadIO::write_debug(std::ostream& out) const {
@@ -91,11 +99,11 @@ void ReadIO::write_debug(std::ostream& out) const {
   out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
 }
 
-void WriteIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_WRITE);
-  out.write_uint64_t(m_imagectx);
-  out.write_uint64_t(m_offset);
-  out.write_uint64_t(m_length);
+void WriteIO::encode(bufferlist &bl) const {
+  action::Action action((action::WriteAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx, m_offset, m_length)));
+  ::encode(action, bl);
 }
 
 void WriteIO::write_debug(std::ostream& out) const {
@@ -103,11 +111,11 @@ void WriteIO::write_debug(std::ostream& out) const {
   out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
 }
 
-void AioReadIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_ASYNC_READ);
-  out.write_uint64_t(m_imagectx);
-  out.write_uint64_t(m_offset);
-  out.write_uint64_t(m_length);
+void AioReadIO::encode(bufferlist &bl) const {
+  action::Action action((action::AioReadAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx, m_offset, m_length)));
+  ::encode(action, bl);
 }
 
 void AioReadIO::write_debug(std::ostream& out) const {
@@ -115,11 +123,11 @@ void AioReadIO::write_debug(std::ostream& out) const {
   out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
 }
 
-void AioWriteIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_ASYNC_WRITE);
-  out.write_uint64_t(m_imagectx);
-  out.write_uint64_t(m_offset);
-  out.write_uint64_t(m_length);
+void AioWriteIO::encode(bufferlist &bl) const {
+  action::Action action((action::AioWriteAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx, m_offset, m_length)));
+  ::encode(action, bl);
 }
 
 void AioWriteIO::write_debug(std::ostream& out) const {
@@ -127,12 +135,11 @@ void AioWriteIO::write_debug(std::ostream& out) const {
   out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
 }
 
-void OpenImageIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_OPEN_IMAGE);
-  out.write_uint64_t(m_imagectx);
-  out.write_string(m_name);
-  out.write_string(m_snap_name);
-  out.write_bool(m_readonly);
+void OpenImageIO::encode(bufferlist &bl) const {
+  action::Action action((action::OpenImageAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx, m_name, m_snap_name, m_readonly)));
+  ::encode(action, bl);
 }
 
 void OpenImageIO::write_debug(std::ostream& out) const {
@@ -140,9 +147,11 @@ void OpenImageIO::write_debug(std::ostream& out) const {
   out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
 }
 
-void CloseImageIO::write_to(Ser& out) const {
-  IO::write_to(out, IO_CLOSE_IMAGE);
-  out.write_uint64_t(m_imagectx);
+void CloseImageIO::encode(bufferlist &bl) const {
+  action::Action action((action::CloseImageAction(
+    ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+    m_imagectx)));
+  ::encode(action, bl);
 }
 
 void CloseImageIO::write_debug(std::ostream& out) const {
index 73e6f7674592a179f3a1d8396ed8f3b520961147..17559331c7a53d7a48dc6cdf57d2382b10703a29 100644 (file)
@@ -18,6 +18,7 @@
 // This code assumes that IO IDs and timestamps are related monotonically.
 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
 
+#include "include/buffer.h"
 #include <boost/enable_shared_from_this.hpp>
 #include <boost/shared_ptr.hpp>
 #include <iostream>
@@ -25,7 +26,6 @@
 #include <set>
 #include <vector>
 #include "actions.hpp"
-#include "Ser.hpp"
 
 
 namespace rbd_replay {
@@ -77,7 +77,7 @@ public:
     return m_dependencies;
   }
 
-  virtual void write_to(Ser& out) const = 0;
+  virtual void encode(bufferlist &bl) const = 0;
 
   void set_ionum(action_id_t ionum) {
     m_ionum = ionum;
@@ -87,11 +87,13 @@ public:
     return m_ionum;
   }
 
+  thread_id_t thread_id() const {
+    return m_thread_id;
+  }
+
   virtual void write_debug(std::ostream& out) const = 0;
 
 protected:
-  void write_to(Ser& out, io_type iotype) const;
-
   void write_debug_base(std::ostream& out, std::string iotype) const;
 
 private:
@@ -115,7 +117,7 @@ public:
     : IO(ionum, start_time, thread_id, io_set_t()) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 };
@@ -129,7 +131,7 @@ public:
     : IO(ionum, start_time, thread_id, deps) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 };
@@ -149,7 +151,7 @@ public:
       m_length(length) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 
@@ -174,7 +176,7 @@ public:
       m_length(length) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 
@@ -199,7 +201,7 @@ public:
       m_length(length) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 
@@ -224,7 +226,7 @@ public:
       m_length(length) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   void write_debug(std::ostream& out) const;
 
@@ -251,7 +253,7 @@ public:
       m_readonly(readonly) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   imagectx_id_t imagectx() const {
     return m_imagectx;
@@ -277,7 +279,7 @@ public:
       m_imagectx(imagectx) {
   }
 
-  void write_to(Ser& out) const;
+  virtual void encode(bufferlist &bl) const;
 
   imagectx_id_t imagectx() const {
     return m_imagectx;
@@ -289,9 +291,6 @@ private:
   imagectx_id_t m_imagectx;
 };
 
-/// @related IO
-bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
-
 }
 
 #endif
index 94042a5191eecd959b5826edeb93b860b2ba4e09..61cff592cf2759d7dfa49183f468ab821cf89557 100644 (file)
 // This code assumes that IO IDs and timestamps are related monotonically.
 // In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
 
+#include "common/errno.h"
+#include "rbd_replay/ActionTypes.h"
 #include <babeltrace/babeltrace.h>
 #include <babeltrace/ctf/events.h>
 #include <babeltrace/ctf/iterator.h>
+#include <sys/types.h>
+#include <fcntl.h>
 #include <cstdlib>
 #include <string>
 #include <assert.h>
 #include <fstream>
 #include <set>
 #include <boost/thread/thread.hpp>
+#include <boost/scope_exit.hpp>
 #include "ios.hpp"
 
 using namespace std;
@@ -193,12 +198,14 @@ public:
 
     struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
 
-    ofstream myfile;
-    myfile.open(output_file_name.c_str(), ios::out | ios::binary | ios::trunc);
-    ASSERT_EXIT(!myfile.fail(), "Error opening output file " <<
-                                output_file_name);
+    int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
+    ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
+                         ": " << cpp_strerror(errno));
+    BOOST_SCOPE_EXIT( (fd) ) {
+      close(fd);
+    } BOOST_SCOPE_EXIT_END;
 
-    Ser ser(myfile);
+    write_banner(fd);
 
     uint64_t trace_start = 0;
     bool first = true;
@@ -219,7 +226,7 @@ public:
 
       IO::ptrs ptrs;
       process_event(ts, evt, &ptrs);
-      serialize_events(ser, ptrs);
+      serialize_events(fd, ptrs);
 
       int r = bt_iter_next(bt_itr);
       ASSERT_EXIT(r == 0, "Error advancing event iterator");
@@ -227,15 +234,26 @@ public:
 
     bt_ctf_iter_destroy(itr);
 
-    insert_thread_stops(ser);
-    myfile.close();
+    insert_thread_stops(fd);
   }
 
 private:
-  void serialize_events(Ser &ser, const IO::ptrs &ptrs) {
+  void write_banner(int fd) {
+    bufferlist bl;
+    bl.append(rbd_replay::action::BANNER);
+    int r = bl.write_fd(fd);
+    ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+  }
+
+  void serialize_events(int fd, const IO::ptrs &ptrs) {
     for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
       IO::ptr io(*it);
-      io->write_to(ser);
+
+      bufferlist bl;
+      io->encode(bl);
+
+      int r = bl.write_fd(fd);
+      ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
 
       if (m_verbose) {
         io->write_debug(std::cout);
@@ -244,7 +262,7 @@ private:
     }
   }
 
-  void insert_thread_stops(Ser &ser) {
+  void insert_thread_stops(int fd) {
     IO::ptrs ios;
     for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
          end = m_threads.end(); itr != end; ++itr) {
@@ -253,7 +271,7 @@ private:
                                              thread->id(),
                                              m_recent_completions)));
     }
-    serialize_events(ser, ios);
+    serialize_events(fd, ios);
   }
 
   void process_event(uint64_t ts, struct bt_ctf_event *evt,