--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/ActionTypes.h"
+#include "include/assert.h"
+#include "include/byteorder.h"
+#include "include/stringify.h"
+#include "common/Formatter.h"
+#include <iostream>
+#include <boost/variant.hpp>
+
+namespace rbd_replay {
+namespace action {
+
+namespace {
+
+bool byte_swap_required(__u8 version) {
+#if defined(CEPH_LITTLE_ENDIAN)
+ return (version == 0);
+#else
+ return false;
+#endif
+}
+
+void decode_big_endian_string(std::string &str, bufferlist::iterator &it) {
+#if defined(CEPH_LITTLE_ENDIAN)
+ uint32_t length;
+ ::decode(length, it);
+ length = swab32(length);
+ str.clear();
+ it.copy(length, str);
+#else
+ assert(false);
+#endif
+}
+
+class EncodeVisitor : public boost::static_visitor<void> {
+public:
+ EncodeVisitor(bufferlist &bl) : m_bl(bl) {
+ }
+
+ template <typename Action>
+ inline void operator()(const Action &action) const {
+ ::encode(static_cast<uint8_t>(Action::ACTION_TYPE), m_bl);
+ action.encode(m_bl);
+ }
+private:
+ bufferlist &m_bl;
+};
+
+class DecodeVisitor : public boost::static_visitor<void> {
+public:
+ DecodeVisitor(__u8 version, bufferlist::iterator &iter)
+ : m_version(version), m_iter(iter) {
+ }
+
+ template <typename Action>
+ inline void operator()(Action &action) const {
+ action.decode(m_version, m_iter);
+ }
+private:
+ __u8 m_version;
+ bufferlist::iterator &m_iter;
+};
+
+class DumpVisitor : public boost::static_visitor<void> {
+public:
+ DumpVisitor(Formatter *formatter) : m_formatter(formatter) {}
+
+ template <typename Action>
+ inline void operator()(const Action &action) const {
+ ActionType action_type = Action::ACTION_TYPE;
+ m_formatter->dump_string("action_type", stringify(action_type));
+ action.dump(m_formatter);
+ }
+private:
+ ceph::Formatter *m_formatter;
+};
+
+} // anonymous namespace
+
+void Dependency::encode(bufferlist &bl) const {
+ ::encode(id, bl);
+ ::encode(time_delta, bl);
+}
+
+void Dependency::decode(bufferlist::iterator &it) {
+ decode(1, it);
+}
+
+void Dependency::decode(__u8 version, bufferlist::iterator &it) {
+ ::decode(id, it);
+ ::decode(time_delta, it);
+ if (byte_swap_required(version)) {
+ id = swab32(id);
+ time_delta = swab64(time_delta);
+ }
+}
+
+void Dependency::dump(Formatter *f) const {
+ f->dump_unsigned("id", id);
+ f->dump_unsigned("time_delta", time_delta);
+}
+
+void Dependency::generate_test_instances(std::list<Dependency *> &o) {
+ o.push_back(new Dependency());
+ o.push_back(new Dependency(1, 123456789));
+}
+
+void ActionBase::encode(bufferlist &bl) const {
+ ::encode(id, bl);
+ ::encode(thread_id, bl);
+ ::encode(dependencies, bl);
+}
+
+void ActionBase::decode(__u8 version, bufferlist::iterator &it) {
+ ::decode(id, it);
+ ::decode(thread_id, it);
+ if (version == 0) {
+ uint32_t num_successors;
+ ::decode(num_successors, it);
+
+ uint32_t num_completion_successors;
+ ::decode(num_completion_successors, it);
+ }
+
+ if (byte_swap_required(version)) {
+ id = swab32(id);
+ thread_id = swab64(thread_id);
+
+ uint32_t dep_count;
+ ::decode(dep_count, it);
+ dep_count = swab32(dep_count);
+ dependencies.resize(dep_count);
+ for (uint32_t i = 0; i < dep_count; ++i) {
+ dependencies[i].decode(0, it);
+ }
+ } else {
+ ::decode(dependencies, it);
+ }
+}
+
+void ActionBase::dump(Formatter *f) const {
+ f->dump_unsigned("id", id);
+ f->dump_unsigned("thread_id", thread_id);
+ f->open_array_section("dependencies");
+ for (size_t i = 0; i < dependencies.size(); ++i) {
+ f->open_object_section("dependency");
+ dependencies[i].dump(f);
+ f->close_section();
+ }
+ f->close_section();
+}
+
+void ImageActionBase::encode(bufferlist &bl) const {
+ ActionBase::encode(bl);
+ ::encode(imagectx_id, bl);
+}
+
+void ImageActionBase::decode(__u8 version, bufferlist::iterator &it) {
+ ActionBase::decode(version, it);
+ ::decode(imagectx_id, it);
+ if (byte_swap_required(version)) {
+ imagectx_id = swab64(imagectx_id);
+ }
+}
+
+void ImageActionBase::dump(Formatter *f) const {
+ ActionBase::dump(f);
+ f->dump_unsigned("imagectx_id", imagectx_id);
+}
+
+void IoActionBase::encode(bufferlist &bl) const {
+ ImageActionBase::encode(bl);
+ ::encode(offset, bl);
+ ::encode(length, bl);
+}
+
+void IoActionBase::decode(__u8 version, bufferlist::iterator &it) {
+ ImageActionBase::decode(version, it);
+ ::decode(offset, it);
+ ::decode(length, it);
+ if (byte_swap_required(version)) {
+ offset = swab64(offset);
+ length = swab64(length);
+ }
+}
+
+void IoActionBase::dump(Formatter *f) const {
+ ImageActionBase::dump(f);
+ f->dump_unsigned("offset", offset);
+ f->dump_unsigned("length", length);
+}
+
+void OpenImageAction::encode(bufferlist &bl) const {
+ ImageActionBase::encode(bl);
+ ::encode(name, bl);
+ ::encode(snap_name, bl);
+ ::encode(read_only, bl);
+}
+
+void OpenImageAction::decode(__u8 version, bufferlist::iterator &it) {
+ ImageActionBase::decode(version, it);
+ if (byte_swap_required(version)) {
+ decode_big_endian_string(name, it);
+ decode_big_endian_string(snap_name, it);
+ } else {
+ ::decode(name, it);
+ ::decode(snap_name, it);
+ }
+ ::decode(read_only, it);
+}
+
+void OpenImageAction::dump(Formatter *f) const {
+ ImageActionBase::dump(f);
+ f->dump_string("name", name);
+ f->dump_string("snap_name", snap_name);
+ f->dump_bool("read_only", read_only);
+}
+
+void UnknownAction::encode(bufferlist &bl) const {
+ assert(false);
+}
+
+void UnknownAction::decode(__u8 version, bufferlist::iterator &it) {
+}
+
+void UnknownAction::dump(Formatter *f) const {
+}
+
+void ActionEntry::encode(bufferlist &bl) const {
+ ENCODE_START(1, 1, bl);
+ boost::apply_visitor(EncodeVisitor(bl), action);
+ ENCODE_FINISH(bl);
+}
+
+void ActionEntry::decode(bufferlist::iterator &it) {
+ DECODE_START(1, it);
+ decode(struct_v, it);
+ DECODE_FINISH(it);
+}
+
+void ActionEntry::decode_unversioned(bufferlist::iterator &it) {
+ decode(0, it);
+}
+
+void ActionEntry::decode(__u8 version, bufferlist::iterator &it) {
+ uint8_t action_type;
+ ::decode(action_type, it);
+
+ // select the correct action variant based upon the action_type
+ switch (action_type) {
+ case ACTION_TYPE_START_THREAD:
+ action = StartThreadAction();
+ break;
+ case ACTION_TYPE_STOP_THREAD:
+ action = StopThreadAction();
+ break;
+ case ACTION_TYPE_READ:
+ action = ReadAction();
+ break;
+ case ACTION_TYPE_WRITE:
+ action = WriteAction();
+ break;
+ case ACTION_TYPE_AIO_READ:
+ action = AioReadAction();
+ break;
+ case ACTION_TYPE_AIO_WRITE:
+ action = AioWriteAction();
+ break;
+ case ACTION_TYPE_OPEN_IMAGE:
+ action = OpenImageAction();
+ break;
+ case ACTION_TYPE_CLOSE_IMAGE:
+ action = CloseImageAction();
+ break;
+ }
+
+ boost::apply_visitor(DecodeVisitor(version, it), action);
+}
+
+void ActionEntry::dump(Formatter *f) const {
+ boost::apply_visitor(DumpVisitor(f), action);
+}
+
+void ActionEntry::generate_test_instances(std::list<ActionEntry *> &o) {
+ Dependencies dependencies;
+ dependencies.push_back(Dependency(3, 123456789));
+ dependencies.push_back(Dependency(4, 234567890));
+
+ o.push_back(new ActionEntry(StartThreadAction()));
+ o.push_back(new ActionEntry(StartThreadAction(1, 123456789, dependencies)));
+ o.push_back(new ActionEntry(StopThreadAction()));
+ o.push_back(new ActionEntry(StopThreadAction(1, 123456789, dependencies)));
+
+ o.push_back(new ActionEntry(ReadAction()));
+ o.push_back(new ActionEntry(ReadAction(1, 123456789, dependencies, 3, 4, 5)));
+ o.push_back(new ActionEntry(WriteAction()));
+ o.push_back(new ActionEntry(WriteAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(AioReadAction()));
+ o.push_back(new ActionEntry(AioReadAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+ o.push_back(new ActionEntry(AioWriteAction()));
+ o.push_back(new ActionEntry(AioWriteAction(1, 123456789, dependencies, 3, 4,
+ 5)));
+
+ o.push_back(new ActionEntry(OpenImageAction()));
+ o.push_back(new ActionEntry(OpenImageAction(1, 123456789, dependencies, 3,
+ "image_name", "snap_name",
+ true)));
+ o.push_back(new ActionEntry(CloseImageAction()));
+ o.push_back(new ActionEntry(CloseImageAction(1, 123456789, dependencies, 3)));
+}
+
+} // namespace action
+} // namespace rbd_replay
+
+std::ostream &operator<<(std::ostream &out,
+ const rbd_replay::action::ActionType &type) {
+ using namespace rbd_replay::action;
+
+ switch (type) {
+ case ACTION_TYPE_START_THREAD:
+ out << "StartThread";
+ break;
+ case ACTION_TYPE_STOP_THREAD:
+ out << "StopThread";
+ break;
+ case ACTION_TYPE_READ:
+ out << "Read";
+ break;
+ case ACTION_TYPE_WRITE:
+ out << "Write";
+ break;
+ case ACTION_TYPE_AIO_READ:
+ out << "AioRead";
+ break;
+ case ACTION_TYPE_AIO_WRITE:
+ out << "AioWrite";
+ break;
+ case ACTION_TYPE_OPEN_IMAGE:
+ out << "OpenImage";
+ break;
+ case ACTION_TYPE_CLOSE_IMAGE:
+ out << "CloseImage";
+ break;
+ default:
+ out << "Unknown (" << static_cast<uint32_t>(type) << ")";
+ break;
+ }
+ return out;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_ACTION_TYPES_H
+#define CEPH_RBD_REPLAY_ACTION_TYPES_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include <iosfwd>
+#include <list>
+#include <string>
+#include <vector>
+#include <boost/variant/variant.hpp>
+
+namespace ceph { class Formatter; }
+
+namespace rbd_replay {
+namespace action {
+
+typedef uint64_t imagectx_id_t;
+typedef uint64_t thread_id_t;
+
+/// Even IDs are normal actions, odd IDs are completions.
+typedef uint32_t action_id_t;
+
+static const std::string BANNER("rbd-replay-trace");
+
+/**
+ * Dependencies link actions to earlier actions or completions.
+ * If an action has a dependency \c d then it waits until \c d.time_delta
+ * nanoseconds after the action or completion with ID \c d.id has fired.
+ */
+struct Dependency {
+ /// ID of the action or completion to wait for.
+ action_id_t id;
+
+ /// Nanoseconds of delay to wait until after the action or completion fires.
+ uint64_t time_delta;
+
+ /**
+ * @param id ID of the action or completion to wait for.
+ * @param time_delta Nanoseconds of delay to wait after the action or
+ * completion fires.
+ */
+ Dependency() : id(0), time_delta(0) {
+ }
+ Dependency(action_id_t id, uint64_t time_delta)
+ : id(id), time_delta(time_delta) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &it);
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<Dependency *> &o);
+};
+
+WRITE_CLASS_ENCODER(Dependency);
+
+typedef std::vector<Dependency> Dependencies;
+
+enum ActionType {
+ ACTION_TYPE_START_THREAD = 0,
+ ACTION_TYPE_STOP_THREAD = 1,
+ ACTION_TYPE_READ = 2,
+ ACTION_TYPE_WRITE = 3,
+ ACTION_TYPE_AIO_READ = 4,
+ ACTION_TYPE_AIO_WRITE = 5,
+ ACTION_TYPE_OPEN_IMAGE = 6,
+ ACTION_TYPE_CLOSE_IMAGE = 7
+};
+
+struct ActionBase {
+ action_id_t id;
+ thread_id_t thread_id;
+ Dependencies dependencies;
+
+ ActionBase() : id(0), thread_id(0) {
+ }
+ ActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : id(id), thread_id(thread_id), dependencies(dependencies) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct StartThreadAction : public ActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_START_THREAD;
+
+ StartThreadAction() {
+ }
+ StartThreadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : ActionBase(id, thread_id, dependencies) {
+ }
+};
+
+struct StopThreadAction : public ActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_STOP_THREAD;
+
+ StopThreadAction() {
+ }
+ StopThreadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies)
+ : ActionBase(id, thread_id, dependencies) {
+ }
+};
+
+struct ImageActionBase : public ActionBase {
+ imagectx_id_t imagectx_id;
+
+ ImageActionBase() : imagectx_id(0) {
+ }
+ ImageActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id)
+ : ActionBase(id, thread_id, dependencies), imagectx_id(imagectx_id) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct IoActionBase : public ImageActionBase {
+ uint64_t offset;
+ uint64_t length;
+
+ IoActionBase() : offset(0), length(0) {
+ }
+ IoActionBase(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+ offset(offset), length(length) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct ReadAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_READ;
+
+ ReadAction() {
+ }
+ ReadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct WriteAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_WRITE;
+
+ WriteAction() {
+ }
+ WriteAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct AioReadAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_READ;
+
+ AioReadAction() {
+ }
+ AioReadAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct AioWriteAction : public IoActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_AIO_WRITE;
+
+ AioWriteAction() {
+ }
+ AioWriteAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ uint64_t offset, uint64_t length)
+ : IoActionBase(id, thread_id, dependencies, imagectx_id, offset, length) {
+ }
+};
+
+struct OpenImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_OPEN_IMAGE;
+
+ std::string name;
+ std::string snap_name;
+ bool read_only;
+
+ OpenImageAction() : read_only(false) {
+ }
+ OpenImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id,
+ const std::string &name, const std::string &snap_name,
+ bool read_only)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id),
+ name(name), snap_name(snap_name), read_only(read_only) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+};
+
+struct CloseImageAction : public ImageActionBase {
+ static const ActionType ACTION_TYPE = ACTION_TYPE_CLOSE_IMAGE;
+
+ CloseImageAction() {
+ }
+ CloseImageAction(action_id_t id, thread_id_t thread_id,
+ const Dependencies &dependencies, imagectx_id_t imagectx_id)
+ : ImageActionBase(id, thread_id, dependencies, imagectx_id) {
+ }
+};
+
+struct UnknownAction {
+ static const ActionType ACTION_TYPE = static_cast<ActionType>(-1);
+
+ void encode(bufferlist &bl) const;
+ void decode(__u8 version, bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+};
+
+typedef boost::variant<StartThreadAction,
+ StopThreadAction,
+ ReadAction,
+ WriteAction,
+ AioReadAction,
+ AioWriteAction,
+ OpenImageAction,
+ CloseImageAction,
+ UnknownAction> Action;
+
+class ActionEntry {
+public:
+ Action action;
+
+ ActionEntry() : action(UnknownAction()) {
+ }
+ ActionEntry(const Action &action) : action(action) {
+ }
+
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &it);
+ void decode_unversioned(bufferlist::iterator &it);
+ void dump(Formatter *f) const;
+
+ static void generate_test_instances(std::list<ActionEntry *> &o);
+
+private:
+ void decode(__u8 version, bufferlist::iterator &it);
+};
+
+WRITE_CLASS_ENCODER(ActionEntry);
+
+} // namespace action
+} // namespace rbd_replay
+
+std::ostream &operator<<(std::ostream &out,
+ const rbd_replay::action::ActionType &type);
+
+using rbd_replay::action::decode;
+using rbd_replay::action::encode;
+
+#endif // CEPH_RBD_REPLAY_ACTION_TYPES_H
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rbd_replay/BufferReader.h"
+#include "include/assert.h"
+#include "include/intarith.h"
+#include "include/page.h"
+
+namespace rbd_replay {
+
+BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes)
+ : m_fd(fd), m_min_bytes(min_bytes), m_max_bytes(max_bytes),
+ m_bl_it(m_bl.begin()) {
+ assert(m_min_bytes <= m_max_bytes);
+}
+
+int BufferReader::fetch(bufferlist::iterator **it) {
+ if (m_bl_it.get_remaining() < m_min_bytes) {
+ ssize_t bytes_to_read = ROUND_UP_TO(m_max_bytes - m_bl_it.get_remaining(),
+ CEPH_PAGE_SIZE);
+ while (bytes_to_read > 0) {
+ int r = m_bl.read_fd(m_fd, CEPH_PAGE_SIZE);
+ if (r < 0) {
+ return r;
+ }
+ assert(r <= bytes_to_read);
+ bytes_to_read -= r;
+ }
+ }
+
+ *it = &m_bl_it;
+ return 0;
+}
+
+} // namespace rbd_replay
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_RBD_REPLAY_BUFFER_READER_H
+#define CEPH_RBD_REPLAY_BUFFER_READER_H
+
+#include "include/int_types.h"
+#include "include/buffer.h"
+
+namespace rbd_replay {
+
+class BufferReader {
+public:
+ static const size_t DEFAULT_MIN_BYTES = 1<<20;
+ static const size_t DEFAULT_MAX_BYTES = 1<<22;
+
+ BufferReader(int fd, size_t min_bytes = DEFAULT_MIN_BYTES,
+ size_t max_bytes = DEFAULT_MAX_BYTES);
+
+ int fetch(bufferlist::iterator **it);
+
+private:
+ int m_fd;
+ size_t m_min_bytes;
+ size_t m_max_bytes;
+ bufferlist m_bl;
+ bufferlist::iterator m_bl_it;
+
+};
+
+} // namespace rbd_replay
+
+#endif // CEPH_RBD_REPLAY_BUFFER_READER_H
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "Deser.hpp"
-#include <arpa/inet.h>
-#include <cstdlib>
-#include <endian.h>
-
-
-rbd_replay::Deser::Deser(std::istream &in)
- : m_in(in) {
-}
-
-uint8_t rbd_replay::Deser::read_uint8_t() {
- uint8_t data;
- m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
- return data;
-}
-
-uint16_t rbd_replay::Deser::read_uint16_t() {
- uint16_t data;
- m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
- return ntohs(data);
-}
-
-uint32_t rbd_replay::Deser::read_uint32_t() {
- uint32_t data;
- m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
- return ntohl(data);
-}
-
-uint64_t rbd_replay::Deser::read_uint64_t() {
- uint64_t data;
- m_in.read(reinterpret_cast<char*>(&data), sizeof(data));
-#if __BYTE_ORDER == __LITTLE_ENDIAN
- data = (static_cast<uint64_t>(ntohl(data)) << 32 | ntohl(data >> 32));
-#endif
- return data;
-}
-
-std::string rbd_replay::Deser::read_string() {
- uint32_t length = read_uint32_t();
- char* data = reinterpret_cast<char*>(malloc(length));
- m_in.read(data, length);
- std::string s(data, length);
- free(data);
- return s;
-}
-
-bool rbd_replay::Deser::read_bool() {
- return read_uint8_t() != 0;
-}
-
-bool rbd_replay::Deser::eof() {
- return m_in.eof();
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef _INCLUDED_RBD_REPLAY_DESER_HPP
-#define _INCLUDED_RBD_REPLAY_DESER_HPP
-
-#include <iostream>
-#include <stdint.h>
-
-namespace rbd_replay {
-
-/**
- Helper for deserializing data in an architecture-indepdendent way.
- Everything is read big-endian.
- @see Ser
-*/
-class Deser {
-public:
- Deser(std::istream &in);
-
- uint8_t read_uint8_t();
-
- uint16_t read_uint16_t();
-
- uint32_t read_uint32_t();
-
- uint64_t read_uint64_t();
-
- std::string read_string();
-
- bool read_bool();
-
- bool eof();
-
-private:
- std::istream &m_in;
-};
-
-}
-
-#endif
if WITH_RADOS
if WITH_RBD
+librbd_replay_types_la_SOURCES = \
+ rbd_replay/ActionTypes.cc
+noinst_HEADERS += \
+ rbd_replay/ActionTypes.h
+noinst_LTLIBRARIES += librbd_replay_types.la
+DENCODER_DEPS += librbd_replay_types.la
+
# librbd_replay_la exists only to help with unit tests
-librbd_replay_la_SOURCES = rbd_replay/actions.cc \
- rbd_replay/Deser.cc \
+librbd_replay_la_SOURCES = \
+ rbd_replay/actions.cc \
+ rbd_replay/BufferReader.cc \
rbd_replay/ImageNameMap.cc \
rbd_replay/PendingIO.cc \
rbd_replay/rbd_loc.cc \
- rbd_replay/Replayer.cc \
- rbd_replay/Ser.cc
-librbd_replay_la_LIBADD = $(LIBRBD) \
+ rbd_replay/Replayer.cc
+librbd_replay_la_LIBADD = \
+ $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL)
noinst_LTLIBRARIES += librbd_replay.la
-noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \
+noinst_HEADERS += \
rbd_replay/actions.hpp \
- rbd_replay/Deser.hpp \
+ rbd_replay/BoundedBuffer.hpp \
+ rbd_replay/BufferReader.h \
rbd_replay/ImageNameMap.hpp \
rbd_replay/ios.hpp \
rbd_replay/PendingIO.hpp \
rbd_replay/rbd_loc.hpp \
rbd_replay/rbd_replay_debug.hpp \
- rbd_replay/Replayer.hpp \
- rbd_replay/Ser.hpp
-
+ rbd_replay/Replayer.hpp
-rbd_replay_SOURCES = rbd_replay/rbd-replay.cc
-rbd_replay_LDADD = $(LIBRBD) \
+rbd_replay_SOURCES = \
+ rbd_replay/rbd-replay.cc
+rbd_replay_LDADD = \
+ librbd_replay.la \
+ librbd_replay_types.la \
+ $(LIBRBD) \
$(LIBRADOS) \
$(CEPH_GLOBAL) \
- librbd_replay.la
+ $(LIBCOMMON)
if LINUX
bin_PROGRAMS += rbd-replay
librbd_replay.la
noinst_LTLIBRARIES += librbd_replay_ios.la
-rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc
-rbd_replay_prep_LDADD = $(LIBRBD) \
- $(LIBRADOS) \
- $(CEPH_GLOBAL) \
+rbd_replay_prep_SOURCES = \
+ rbd_replay/rbd-replay-prep.cc
+rbd_replay_prep_LDADD = \
librbd_replay.la \
librbd_replay_ios.la \
+ librbd_replay_types.la \
+ $(LIBRBD) \
+ $(LIBRADOS) \
+ $(CEPH_GLOBAL) \
+ $(LIBCOMMON) \
-lbabeltrace \
-lbabeltrace-ctf \
-lboost_date_time
#include "Replayer.hpp"
#include "common/errno.h"
+#include "rbd_replay/ActionTypes.h"
+#include "rbd_replay/BufferReader.h"
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
+#include <boost/scope_exit.hpp>
#include <fstream>
#include "global/global_context.h"
#include "rbd_replay_debug.hpp"
using namespace std;
using namespace rbd_replay;
+namespace {
+
+bool is_versioned_replay(BufferReader &buffer_reader) {
+ bufferlist::iterator *it;
+ int r = buffer_reader.fetch(&it);
+ if (r < 0) {
+ return false;
+ }
+
+ if (it->get_remaining() < action::BANNER.size()) {
+ return false;
+ }
+
+ std::string banner;
+ it->copy(action::BANNER.size(), banner);
+ bool versioned = (banner == action::BANNER);
+ if (!versioned) {
+ it->seek(0);
+ }
+ return versioned;
+}
+
+} // anonymous namespace
Worker::Worker(Replayer &replayer)
: m_replayer(replayer),
m_rbd = new librbd::RBD();
map<thread_id_t, Worker*> workers;
- ifstream input(replay_file.c_str(), ios::in | ios::binary);
- if (!input.is_open()) {
- cerr << "Failed to open " << replay_file << std::endl;
- exit(1);
+ int fd = open(replay_file.c_str(), O_RDONLY);
+ if (fd < 0) {
+ std::cerr << "Failed to open " << replay_file << ": "
+ << cpp_strerror(errno) << std::endl;
+ exit(1);
}
+ BOOST_SCOPE_EXIT( (fd) ) {
+ close(fd);
+ } BOOST_SCOPE_EXIT_END;
- Deser deser(input);
+ BufferReader buffer_reader(fd);
+ bool versioned = is_versioned_replay(buffer_reader);
while (true) {
- Action::ptr action = Action::read_from(deser);
+ action::ActionEntry action_entry;
+ try {
+ bufferlist::iterator *it;
+ int r = buffer_reader.fetch(&it);
+ if (r < 0) {
+ std::cerr << "Failed to read from trace file: " << cpp_strerror(r)
+ << std::endl;
+ exit(-r);
+ }
+
+ if (versioned) {
+ action_entry.decode(*it);
+ } else {
+ action_entry.decode_unversioned(*it);
+ }
+ } catch (const buffer::error &err) {
+ std::cerr << "Failed to decode trace action" << std::endl;
+ exit(1);
+ }
+
+ Action::ptr action = Action::construct(action_entry);
if (!action) {
- break;
+ // unknown / unsupported action
+ continue;
}
+
if (action->is_start_thread()) {
Worker *worker = new Worker(*this);
workers[action->thread_id()] = worker;
return tracker.actions.count(id) > 0;
}
-void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
+void Replayer::wait_for_actions(const action::Dependencies &deps) {
boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
- BOOST_FOREACH(const dependency_d &dep, deps) {
+ BOOST_FOREACH(const action::Dependency &dep, deps) {
dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
action_tracker_d &tracker = tracker_for(dep.id);
#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>
+#include "rbd_replay/ActionTypes.h"
#include "BoundedBuffer.hpp"
#include "ImageNameMap.hpp"
#include "PendingIO.hpp"
bool is_action_complete(action_id_t id);
- void wait_for_actions(const std::vector<dependency_d> &deps);
+ void wait_for_actions(const action::Dependencies &deps);
std::string pool_name() const;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "Ser.hpp"
-#include <arpa/inet.h>
-#include <cstdlib>
-#include <endian.h>
-
-
-rbd_replay::Ser::Ser(std::ostream &out)
- : m_out(out) {
-}
-
-void rbd_replay::Ser::write_uint8_t(uint8_t data) {
- m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint16_t(uint16_t data) {
- data = htons(data);
- m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint32_t(uint32_t data) {
- data = htonl(data);
- m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_uint64_t(uint64_t data) {
-#if __BYTE_ORDER == __LITTLE_ENDIAN
- data = (static_cast<uint64_t>(htonl(data)) << 32 | htonl(data >> 32));
-#endif
- m_out.write(reinterpret_cast<char*>(&data), sizeof(data));
-}
-
-void rbd_replay::Ser::write_string(const std::string& data) {
- write_uint32_t(data.length());
- m_out.write(data.data(), data.length());
-}
-
-void rbd_replay::Ser::write_bool(bool data) {
- write_uint8_t(data ? 1 : 0);
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef _INCLUDED_RBD_REPLAY_SER_HPP
-#define _INCLUDED_RBD_REPLAY_SER_HPP
-
-#include <iostream>
-#include <stdint.h>
-
-namespace rbd_replay {
-
-/**
- Helper for serializing data in an architecture-indepdendent way.
- Everything is written big-endian.
- @see Deser
-*/
-class Ser {
-public:
- Ser(std::ostream &out);
-
- void write_uint8_t(uint8_t);
-
- void write_uint16_t(uint16_t);
-
- void write_uint32_t(uint32_t);
-
- void write_uint64_t(uint64_t);
-
- void write_string(const std::string&);
-
- void write_bool(bool b);
-
-private:
- std::ostream &m_out;
-};
-
-}
-
-#endif
using namespace rbd_replay;
using namespace std;
+namespace {
-Action::Action(action_id_t id,
- thread_id_t thread_id,
- std::vector<dependency_d> &predecessors)
- : m_id(id),
- m_thread_id(thread_id),
- m_predecessors(predecessors) {
- }
-
-Action::~Action() {
+std::string create_fake_data() {
+ char data[1 << 20]; // 1 MB
+ for (unsigned int i = 0; i < sizeof(data); i++) {
+ data[i] = (char) i;
+ }
+ return std::string(data, sizeof(data));
}
-Action::ptr Action::read_from(Deser &d) {
- uint8_t type = d.read_uint8_t();
- if (d.eof()) {
- return Action::ptr();
- }
- uint32_t ionum = d.read_uint32_t();
- uint64_t thread_id = d.read_uint64_t();
- d.read_uint32_t(); // unused
- d.read_uint32_t(); // unused
- uint32_t num_dependencies = d.read_uint32_t();
- vector<dependency_d> deps;
- for (unsigned int i = 0; i < num_dependencies; i++) {
- uint32_t dep_id = d.read_uint32_t();
- uint64_t time_delta = d.read_uint64_t();
- deps.push_back(dependency_d(dep_id, time_delta));
+struct ConstructVisitor : public boost::static_visitor<Action::ptr> {
+ inline Action::ptr operator()(const action::StartThreadAction &action) const {
+ return Action::ptr(new StartThreadAction(action));
}
- DummyAction dummy(ionum, thread_id, deps);
- switch (type) {
- case IO_START_THREAD:
- return StartThreadAction::read_from(dummy, d);
- case IO_STOP_THREAD:
- return StopThreadAction::read_from(dummy, d);
- case IO_READ:
- return ReadAction::read_from(dummy, d);
- case IO_WRITE:
- return WriteAction::read_from(dummy, d);
- case IO_ASYNC_READ:
- return AioReadAction::read_from(dummy, d);
- case IO_ASYNC_WRITE:
- return AioWriteAction::read_from(dummy, d);
- case IO_OPEN_IMAGE:
- return OpenImageAction::read_from(dummy, d);
- case IO_CLOSE_IMAGE:
- return CloseImageAction::read_from(dummy, d);
- default:
- cerr << "Invalid action type: " << type << std::endl;
- exit(1);
+
+ inline Action::ptr operator()(const action::StopThreadAction &action) const{
+ return Action::ptr(new StopThreadAction(action));
}
-}
-std::ostream& Action::dump_action_fields(std::ostream& o) const {
- o << "id=" << m_id << ", thread_id=" << m_thread_id << ", predecessors=[";
- bool first = true;
- BOOST_FOREACH(const dependency_d &d, m_predecessors) {
- if (!first) {
- o << ",";
- }
- o << d.id;
- first = false;
+ inline Action::ptr operator()(const action::ReadAction &action) const {
+ return Action::ptr(new ReadAction(action));
}
- return o << "]";
-}
-std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
- return a.dump(o);
-}
+ inline Action::ptr operator()(const action::AioReadAction &action) const {
+ return Action::ptr(new AioReadAction(action));
+ }
+ inline Action::ptr operator()(const action::WriteAction &action) const {
+ return Action::ptr(new WriteAction(action));
+ }
-std::ostream& DummyAction::dump(std::ostream& o) const {
- o << "DummyAction[";
- dump_action_fields(o);
- return o << "]";
-}
+ inline Action::ptr operator()(const action::AioWriteAction &action) const {
+ return Action::ptr(new AioWriteAction(action));
+ }
+ inline Action::ptr operator()(const action::OpenImageAction &action) const {
+ return Action::ptr(new OpenImageAction(action));
+ }
-StartThreadAction::StartThreadAction(Action &src)
- : Action(src) {
-}
+ inline Action::ptr operator()(const action::CloseImageAction &action) const {
+ return Action::ptr(new CloseImageAction(action));
+ }
-void StartThreadAction::perform(ActionCtx &ctx) {
- cerr << "StartThreadAction should never actually be performed" << std::endl;
- exit(1);
-}
+ inline Action::ptr operator()(const action::UnknownAction &action) const {
+ return Action::ptr();
+ }
+};
-bool StartThreadAction::is_start_thread() {
- return true;
-}
+} // anonymous namespace
-Action::ptr StartThreadAction::read_from(Action &src, Deser &d) {
- return Action::ptr(new StartThreadAction(src));
+std::ostream& rbd_replay::operator<<(std::ostream& o, const Action& a) {
+ return a.dump(o);
}
-std::ostream& StartThreadAction::dump(std::ostream& o) const {
- o << "StartThreadAction[";
- dump_action_fields(o);
- return o << "]";
+Action::ptr Action::construct(const action::ActionEntry &action_entry) {
+ return boost::apply_visitor(ConstructVisitor(), action_entry.action);
}
-
-StopThreadAction::StopThreadAction(Action &src)
- : Action(src) {
+void StartThreadAction::perform(ActionCtx &ctx) {
+ cerr << "StartThreadAction should never actually be performed" << std::endl;
+ exit(1);
}
void StopThreadAction::perform(ActionCtx &ctx) {
ctx.stop();
}
-Action::ptr StopThreadAction::read_from(Action &src, Deser &d) {
- return Action::ptr(new StopThreadAction(src));
-}
-
-std::ostream& StopThreadAction::dump(std::ostream& o) const {
- o << "StopThreadAction[";
- dump_action_fields(o);
- return o << "]";
-}
-
-
-AioReadAction::AioReadAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length)
- : Action(src),
- m_imagectx_id(imagectx_id),
- m_offset(offset),
- m_length(length) {
- }
-
-Action::ptr AioReadAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- uint64_t offset = d.read_uint64_t();
- uint64_t length = d.read_uint64_t();
- return Action::ptr(new AioReadAction(src, imagectx_id, offset, length));
-}
-
void AioReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
- librbd::Image *image = worker.get_image(m_imagectx_id);
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
assert(image);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
- int r = image->aio_read(m_offset, m_length, io->bufferlist(), &io->completion());
+ int r = image->aio_read(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
-std::ostream& AioReadAction::dump(std::ostream& o) const {
- o << "AioReadAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-ReadAction::ReadAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length)
- : Action(src),
- m_imagectx_id(imagectx_id),
- m_offset(offset),
- m_length(length) {
- }
-
-Action::ptr ReadAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- uint64_t offset = d.read_uint64_t();
- uint64_t length = d.read_uint64_t();
- return Action::ptr(new ReadAction(src, imagectx_id, offset, length));
-}
-
void ReadAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
- librbd::Image *image = worker.get_image(m_imagectx_id);
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
- ssize_t r = image->read(m_offset, m_length, io->bufferlist());
+ ssize_t r = image->read(m_action.offset, m_action.length, io->bufferlist());
assertf(r >= 0, "id = %d, r = %d", id(), r);
worker.remove_pending(io);
}
-std::ostream& ReadAction::dump(std::ostream& o) const {
- o << "ReadAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-AioWriteAction::AioWriteAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length)
- : Action(src),
- m_imagectx_id(imagectx_id),
- m_offset(offset),
- m_length(length) {
- }
-
-Action::ptr AioWriteAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- uint64_t offset = d.read_uint64_t();
- uint64_t length = d.read_uint64_t();
- return Action::ptr(new AioWriteAction(src, imagectx_id, offset, length));
-}
-
-static std::string create_fake_data() {
- char data[1 << 20]; // 1 MB
- for (unsigned int i = 0; i < sizeof(data); i++) {
- data[i] = (char) i;
- }
- return std::string(data, sizeof(data));
-}
void AioWriteAction::perform(ActionCtx &worker) {
static const std::string fake_data(create_fake_data());
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
- librbd::Image *image = worker.get_image(m_imagectx_id);
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
- uint64_t remaining = m_length;
+ uint64_t remaining = m_action.length;
while (remaining > 0) {
uint64_t n = std::min(remaining, (uint64_t)fake_data.length());
io->bufferlist().append(fake_data.data(), n);
if (worker.readonly()) {
worker.remove_pending(io);
} else {
- int r = image->aio_write(m_offset, m_length, io->bufferlist(), &io->completion());
+ int r = image->aio_write(m_action.offset, m_action.length, io->bufferlist(), &io->completion());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
}
-std::ostream& AioWriteAction::dump(std::ostream& o) const {
- o << "AioWriteAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-WriteAction::WriteAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length)
- : Action(src),
- m_imagectx_id(imagectx_id),
- m_offset(offset),
- m_length(length) {
- }
-
-Action::ptr WriteAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- uint64_t offset = d.read_uint64_t();
- uint64_t length = d.read_uint64_t();
- return Action::ptr(new WriteAction(src, imagectx_id, offset, length));
-}
-
void WriteAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
- librbd::Image *image = worker.get_image(m_imagectx_id);
+ librbd::Image *image = worker.get_image(m_action.imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
- io->bufferlist().append_zero(m_length);
+ io->bufferlist().append_zero(m_action.length);
if (!worker.readonly()) {
- ssize_t r = image->write(m_offset, m_length, io->bufferlist());
+ ssize_t r = image->write(m_action.offset, m_action.length, io->bufferlist());
assertf(r >= 0, "id = %d, r = %d", id(), r);
}
worker.remove_pending(io);
}
-std::ostream& WriteAction::dump(std::ostream& o) const {
- o << "WriteAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << ", offset=" << m_offset << ", length=" << m_length << "]";
-}
-
-
-OpenImageAction::OpenImageAction(Action &src,
- imagectx_id_t imagectx_id,
- string name,
- string snap_name,
- bool readonly)
- : Action(src),
- m_imagectx_id(imagectx_id),
- m_name(name),
- m_snap_name(snap_name),
- m_readonly(readonly) {
- }
-
-Action::ptr OpenImageAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- string name = d.read_string();
- string snap_name = d.read_string();
- bool readonly = d.read_bool();
- return Action::ptr(new OpenImageAction(src, imagectx_id, name, snap_name, readonly));
-}
-
void OpenImageAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
librbd::Image *image = new librbd::Image();
librbd::RBD *rbd = worker.rbd();
- rbd_loc name(worker.map_image_name(m_name, m_snap_name));
+ rbd_loc name(worker.map_image_name(m_action.name, m_action.snap_name));
int r;
- if (m_readonly || worker.readonly()) {
+ if (m_action.read_only || worker.readonly()) {
r = rbd->open_read_only(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
} else {
r = rbd->open(*worker.ioctx(), *image, name.image.c_str(), name.snap.c_str());
}
if (r) {
- cerr << "Unable to open image '" << m_name
- << "' with snap '" << m_snap_name
+ cerr << "Unable to open image '" << m_action.name
+ << "' with snap '" << m_action.snap_name
<< "' (mapped to '" << name.str()
- << "') and readonly " << m_readonly
+ << "') and readonly " << m_action.read_only
<< ": (" << -r << ") " << strerror(-r) << std::endl;
exit(1);
}
- worker.put_image(m_imagectx_id, image);
+ worker.put_image(m_action.imagectx_id, image);
worker.remove_pending(io);
}
-std::ostream& OpenImageAction::dump(std::ostream& o) const {
- o << "OpenImageAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly << "]";
-}
-
-
-CloseImageAction::CloseImageAction(Action &src,
- imagectx_id_t imagectx_id)
- : Action(src),
- m_imagectx_id(imagectx_id) {
- }
-
-Action::ptr CloseImageAction::read_from(Action &src, Deser &d) {
- imagectx_id_t imagectx_id = d.read_uint64_t();
- return Action::ptr(new CloseImageAction(src, imagectx_id));
-}
-
void CloseImageAction::perform(ActionCtx &worker) {
dout(ACTION_LEVEL) << "Performing " << *this << dendl;
- worker.erase_image(m_imagectx_id);
+ worker.erase_image(m_action.imagectx_id);
worker.set_action_complete(pending_io_id());
}
-std::ostream& CloseImageAction::dump(std::ostream& o) const {
- o << "CloseImageAction[";
- dump_action_fields(o);
- return o << ", imagectx_id=" << m_imagectx_id << "]";
-}
#include <boost/shared_ptr.hpp>
#include "include/rbd/librbd.hpp"
-#include "Deser.hpp"
+#include "common/Formatter.h"
+#include "rbd_replay/ActionTypes.h"
#include "rbd_loc.hpp"
+#include <iostream>
// Stupid Doxygen requires this or else the typedef docs don't appear anywhere.
/// @file rbd_replay/actions.hpp
/// Even IDs are normal actions, odd IDs are completions.
typedef uint32_t action_id_t;
-/**
- Dependencies link actions to earlier actions or completions.
- If an action has a dependency \c d then it waits until \c d.time_delta nanoseconds after the action or completion with ID \c d.id has fired.
-*/
-struct dependency_d {
- /// ID of the action or completion to wait for.
- action_id_t id;
-
- /// Nanoseconds of delay to wait until after the action or completion fires.
- uint64_t time_delta;
-
- /**
- @param id ID of the action or completion to wait for.
- @param time_delta Nanoseconds of delay to wait after the action or completion fires.
- */
- dependency_d(action_id_t id,
- uint64_t time_delta)
- : id(id),
- time_delta(time_delta) {
- }
-};
-
-// These are written to files, so don't change existing assignments.
-enum io_type {
- IO_START_THREAD,
- IO_STOP_THREAD,
- IO_READ,
- IO_WRITE,
- IO_ASYNC_READ,
- IO_ASYNC_WRITE,
- IO_OPEN_IMAGE,
- IO_CLOSE_IMAGE,
-};
-
-
class PendingIO;
-
/**
%Context through which an Action interacts with its environment.
*/
public:
typedef boost::shared_ptr<Action> ptr;
- Action(action_id_t id,
- thread_id_t thread_id,
- std::vector<dependency_d> &predecessors);
-
- virtual ~Action();
+ virtual ~Action() {
+ }
virtual void perform(ActionCtx &ctx) = 0;
/// Returns the ID of the completion corresponding to this action.
action_id_t pending_io_id() {
- return m_id + 1;
+ return id() + 1;
}
// There's probably a better way to do this, but oh well.
return false;
}
- action_id_t id() const {
- return m_id;
- }
+ virtual action_id_t id() const = 0;
+ virtual thread_id_t thread_id() const = 0;
+ virtual const action::Dependencies& predecessors() const = 0;
+
+ virtual std::ostream& dump(std::ostream& o) const = 0;
+
+ static ptr construct(const action::ActionEntry &action_entry);
+};
- thread_id_t thread_id() const {
- return m_thread_id;
+template <typename ActionType>
+class TypedAction : public Action {
+public:
+ TypedAction(const ActionType &action) : m_action(action) {
}
- const std::vector<dependency_d>& predecessors() const {
- return m_predecessors;
+ virtual action_id_t id() const {
+ return m_action.id;
}
- /// Reads and constructs an action from the replay file.
- static ptr read_from(Deser &d);
+ virtual thread_id_t thread_id() const {
+ return m_action.thread_id;
+ }
-protected:
- std::ostream& dump_action_fields(std::ostream& o) const;
+ virtual const action::Dependencies& predecessors() const {
+ return m_action.dependencies;
+ }
-private:
- friend std::ostream& operator<<(std::ostream&, const Action&);
+ virtual std::ostream& dump(std::ostream& o) const {
+ o << get_action_name() << ": ";
+ ceph::JSONFormatter formatter(false);
+ formatter.open_object_section("");
+ m_action.dump(&formatter);
+ formatter.close_section();
+ formatter.flush(o);
+ return o;
+ }
- virtual std::ostream& dump(std::ostream& o) const = 0;
+protected:
+ const ActionType m_action;
- const action_id_t m_id;
- const thread_id_t m_thread_id;
- const std::vector<dependency_d> m_predecessors;
+ virtual const char *get_action_name() const = 0;
};
/// Writes human-readable debug information about the action to the stream.
/// @related Action
std::ostream& operator<<(std::ostream& o, const Action& a);
-
-/**
- Placeholder for partially-constructed actions.
- Does nothing, and does not appear in the replay file.
- */
-class DummyAction : public Action {
+class StartThreadAction : public TypedAction<action::StartThreadAction> {
public:
- DummyAction(action_id_t id,
- thread_id_t thread_id,
- std::vector<dependency_d> &predecessors)
- : Action(id, thread_id, predecessors) {
+ explicit StartThreadAction(const action::StartThreadAction &action)
+ : TypedAction<action::StartThreadAction>(action) {
}
- void perform(ActionCtx &ctx) {
+ virtual bool is_start_thread() {
+ return true;
}
+ virtual void perform(ActionCtx &ctx);
-private:
- std::ostream& dump(std::ostream& o) const;
-};
-
-
-class StopThreadAction : public Action {
-public:
- explicit StopThreadAction(Action &src);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
-
-private:
- std::ostream& dump(std::ostream& o) const;
+protected:
+ virtual const char *get_action_name() const {
+ return "StartThreadAction";
+ }
};
-
-class AioReadAction : public Action {
+class StopThreadAction : public TypedAction<action::StopThreadAction> {
public:
- AioReadAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ explicit StopThreadAction(const action::StopThreadAction &action)
+ : TypedAction<action::StopThreadAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
- uint64_t m_offset;
- uint64_t m_length;
+protected:
+ virtual const char *get_action_name() const {
+ return "StartThreadAction";
+ }
};
-class ReadAction : public Action {
+class AioReadAction : public TypedAction<action::AioReadAction> {
public:
- ReadAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ AioReadAction(const action::AioReadAction &action)
+ : TypedAction<action::AioReadAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
- uint64_t m_offset;
- uint64_t m_length;
+protected:
+ virtual const char *get_action_name() const {
+ return "AioReadAction";
+ }
};
-class AioWriteAction : public Action {
+class ReadAction : public TypedAction<action::ReadAction> {
public:
- AioWriteAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ ReadAction(const action::ReadAction &action)
+ : TypedAction<action::ReadAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
- uint64_t m_offset;
- uint64_t m_length;
+protected:
+ virtual const char *get_action_name() const {
+ return "ReadAction";
+ }
};
-class WriteAction : public Action {
+class AioWriteAction : public TypedAction<action::AioWriteAction> {
public:
- WriteAction(const Action &src,
- imagectx_id_t imagectx_id,
- uint64_t offset,
- uint64_t length);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ AioWriteAction(const action::AioWriteAction &action)
+ : TypedAction<action::AioWriteAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
- uint64_t m_offset;
- uint64_t m_length;
+protected:
+ virtual const char *get_action_name() const {
+ return "AioWriteAction";
+ }
};
-class OpenImageAction : public Action {
+class WriteAction : public TypedAction<action::WriteAction> {
public:
- OpenImageAction(Action &src,
- imagectx_id_t imagectx_id,
- std::string name,
- std::string snap_name,
- bool readonly);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ WriteAction(const action::WriteAction &action)
+ : TypedAction<action::WriteAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
- std::string m_name;
- std::string m_snap_name;
- bool m_readonly;
+protected:
+ virtual const char *get_action_name() const {
+ return "WriteAction";
+ }
};
-class CloseImageAction : public Action {
+class OpenImageAction : public TypedAction<action::OpenImageAction> {
public:
- CloseImageAction(Action &src,
- imagectx_id_t imagectx_id);
-
- void perform(ActionCtx &ctx);
-
- static Action::ptr read_from(Action &src, Deser &d);
+ OpenImageAction(const action::OpenImageAction &action)
+ : TypedAction<action::OpenImageAction>(action) {
+ }
-private:
- std::ostream& dump(std::ostream& o) const;
+ virtual void perform(ActionCtx &ctx);
- imagectx_id_t m_imagectx_id;
+protected:
+ virtual const char *get_action_name() const {
+ return "OpenImageAction";
+ }
};
-class StartThreadAction : public Action {
+class CloseImageAction : public TypedAction<action::CloseImageAction> {
public:
- explicit StartThreadAction(Action &src);
-
- void perform(ActionCtx &ctx);
-
- bool is_start_thread();
+ CloseImageAction(const action::CloseImageAction &action)
+ : TypedAction<action::CloseImageAction>(action) {
+ }
- static Action::ptr read_from(Action &src, Deser &d);
+ virtual void perform(ActionCtx &ctx);
-private:
- std::ostream& dump(std::ostream& o) const;
+protected:
+ virtual const char *get_action_name() const {
+ return "CloseImageAction";
+ }
};
}
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
#include "ios.hpp"
+#include "rbd_replay/ActionTypes.h"
using namespace std;
using namespace rbd_replay;
-bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
- return p1->start_time() < p2->start_time();
+namespace {
+
+bool compare_dependencies_by_start_time(const action::Dependency &lhs,
+ const action::Dependency &rhs) {
+ return lhs.time_delta < rhs.time_delta;
+}
+
+action::Dependencies convert_dependencies(uint64_t start_time,
+ const io_set_t &deps) {
+ action::Dependencies action_deps;
+ action_deps.reserve(deps.size());
+ for (io_set_t::const_iterator it = deps.begin(); it != deps.end(); ++it) {
+ boost::shared_ptr<IO> io = *it;
+ uint64_t time_delta = 0;
+ if (start_time >= io->start_time()) {
+ time_delta = start_time - io->start_time();
+ }
+ action_deps.push_back(action::Dependency(io->ionum(), time_delta));
+ }
+ std::sort(action_deps.begin(), action_deps.end(),
+ compare_dependencies_by_start_time);
+ return action_deps;
}
+} // anonymous namespace
+
void IO::write_debug_base(ostream& out, string type) const {
out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
bool first = true;
}
-void IO::write_to(Ser& out, io_type iotype) const {
- // TODO break compatibility now to add version (and yank unused fields)?
- out.write_uint8_t(iotype);
- out.write_uint32_t(m_ionum);
- out.write_uint64_t(m_thread_id);
- out.write_uint32_t(0);
- out.write_uint32_t(0);
- out.write_uint32_t(m_dependencies.size());
- vector<IO::ptr> deps;
- for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
- deps.push_back(*itr);
- }
- sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time);
- for (vector<IO::ptr>::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) {
- out.write_uint32_t((*itr)->m_ionum);
- out.write_uint64_t(m_start_time - (*itr)->m_start_time);
- }
-}
-
ostream& operator<<(ostream& out, IO::ptr io) {
io->write_debug(out);
return out;
}
-void StartThreadIO::write_to(Ser& out) const {
- IO::write_to(out, IO_START_THREAD);
+void StartThreadIO::encode(bufferlist &bl) const {
+ action::Action action((action::StartThreadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+ ::encode(action, bl);
}
void StartThreadIO::write_debug(std::ostream& out) const {
write_debug_base(out, "start thread");
}
-void StopThreadIO::write_to(Ser& out) const {
- IO::write_to(out, IO_STOP_THREAD);
+void StopThreadIO::encode(bufferlist &bl) const {
+ action::Action action((action::StopThreadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()))));
+ ::encode(action, bl);
}
void StopThreadIO::write_debug(std::ostream& out) const {
write_debug_base(out, "stop thread");
}
-void ReadIO::write_to(Ser& out) const {
- IO::write_to(out, IO_READ);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
+void ReadIO::encode(bufferlist &bl) const {
+ action::Action action((action::ReadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ ::encode(action, bl);
}
void ReadIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
-void WriteIO::write_to(Ser& out) const {
- IO::write_to(out, IO_WRITE);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
+void WriteIO::encode(bufferlist &bl) const {
+ action::Action action((action::WriteAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ ::encode(action, bl);
}
void WriteIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
-void AioReadIO::write_to(Ser& out) const {
- IO::write_to(out, IO_ASYNC_READ);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
+void AioReadIO::encode(bufferlist &bl) const {
+ action::Action action((action::AioReadAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ ::encode(action, bl);
}
void AioReadIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
-void AioWriteIO::write_to(Ser& out) const {
- IO::write_to(out, IO_ASYNC_WRITE);
- out.write_uint64_t(m_imagectx);
- out.write_uint64_t(m_offset);
- out.write_uint64_t(m_length);
+void AioWriteIO::encode(bufferlist &bl) const {
+ action::Action action((action::AioWriteAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_offset, m_length)));
+ ::encode(action, bl);
}
void AioWriteIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]";
}
-void OpenImageIO::write_to(Ser& out) const {
- IO::write_to(out, IO_OPEN_IMAGE);
- out.write_uint64_t(m_imagectx);
- out.write_string(m_name);
- out.write_string(m_snap_name);
- out.write_bool(m_readonly);
+void OpenImageIO::encode(bufferlist &bl) const {
+ action::Action action((action::OpenImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx, m_name, m_snap_name, m_readonly)));
+ ::encode(action, bl);
}
void OpenImageIO::write_debug(std::ostream& out) const {
out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly;
}
-void CloseImageIO::write_to(Ser& out) const {
- IO::write_to(out, IO_CLOSE_IMAGE);
- out.write_uint64_t(m_imagectx);
+void CloseImageIO::encode(bufferlist &bl) const {
+ action::Action action((action::CloseImageAction(
+ ionum(), thread_id(), convert_dependencies(start_time(), dependencies()),
+ m_imagectx)));
+ ::encode(action, bl);
}
void CloseImageIO::write_debug(std::ostream& out) const {
// This code assumes that IO IDs and timestamps are related monotonically.
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+#include "include/buffer.h"
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <iostream>
#include <set>
#include <vector>
#include "actions.hpp"
-#include "Ser.hpp"
namespace rbd_replay {
return m_dependencies;
}
- virtual void write_to(Ser& out) const = 0;
+ virtual void encode(bufferlist &bl) const = 0;
void set_ionum(action_id_t ionum) {
m_ionum = ionum;
return m_ionum;
}
+ thread_id_t thread_id() const {
+ return m_thread_id;
+ }
+
virtual void write_debug(std::ostream& out) const = 0;
protected:
- void write_to(Ser& out, io_type iotype) const;
-
void write_debug_base(std::ostream& out, std::string iotype) const;
private:
: IO(ionum, start_time, thread_id, io_set_t()) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
};
: IO(ionum, start_time, thread_id, deps) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
};
m_length(length) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
m_length(length) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
m_length(length) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
m_length(length) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
void write_debug(std::ostream& out) const;
m_readonly(readonly) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
imagectx_id_t imagectx() const {
return m_imagectx;
m_imagectx(imagectx) {
}
- void write_to(Ser& out) const;
+ virtual void encode(bufferlist &bl) const;
imagectx_id_t imagectx() const {
return m_imagectx;
imagectx_id_t m_imagectx;
};
-/// @related IO
-bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
-
}
#endif
// This code assumes that IO IDs and timestamps are related monotonically.
// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b.
+#include "common/errno.h"
+#include "rbd_replay/ActionTypes.h"
#include <babeltrace/babeltrace.h>
#include <babeltrace/ctf/events.h>
#include <babeltrace/ctf/iterator.h>
+#include <sys/types.h>
+#include <fcntl.h>
#include <cstdlib>
#include <string>
#include <assert.h>
#include <fstream>
#include <set>
#include <boost/thread/thread.hpp>
+#include <boost/scope_exit.hpp>
#include "ios.hpp"
using namespace std;
struct bt_iter *bt_itr = bt_ctf_get_iter(itr);
- ofstream myfile;
- myfile.open(output_file_name.c_str(), ios::out | ios::binary | ios::trunc);
- ASSERT_EXIT(!myfile.fail(), "Error opening output file " <<
- output_file_name);
+ int fd = open(output_file_name.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
+ ASSERT_EXIT(fd >= 0, "Error opening output file " << output_file_name <<
+ ": " << cpp_strerror(errno));
+ BOOST_SCOPE_EXIT( (fd) ) {
+ close(fd);
+ } BOOST_SCOPE_EXIT_END;
- Ser ser(myfile);
+ write_banner(fd);
uint64_t trace_start = 0;
bool first = true;
IO::ptrs ptrs;
process_event(ts, evt, &ptrs);
- serialize_events(ser, ptrs);
+ serialize_events(fd, ptrs);
int r = bt_iter_next(bt_itr);
ASSERT_EXIT(r == 0, "Error advancing event iterator");
bt_ctf_iter_destroy(itr);
- insert_thread_stops(ser);
- myfile.close();
+ insert_thread_stops(fd);
}
private:
- void serialize_events(Ser &ser, const IO::ptrs &ptrs) {
+ void write_banner(int fd) {
+ bufferlist bl;
+ bl.append(rbd_replay::action::BANNER);
+ int r = bl.write_fd(fd);
+ ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
+ }
+
+ void serialize_events(int fd, const IO::ptrs &ptrs) {
for (IO::ptrs::const_iterator it = ptrs.begin(); it != ptrs.end(); ++it) {
IO::ptr io(*it);
- io->write_to(ser);
+
+ bufferlist bl;
+ io->encode(bl);
+
+ int r = bl.write_fd(fd);
+ ASSERT_EXIT(r >= 0, "Error writing to output file: " << cpp_strerror(r));
if (m_verbose) {
io->write_debug(std::cout);
}
}
- void insert_thread_stops(Ser &ser) {
+ void insert_thread_stops(int fd) {
IO::ptrs ios;
for (map<thread_id_t, Thread::ptr>::const_iterator itr = m_threads.begin(),
end = m_threads.end(); itr != end; ++itr) {
thread->id(),
m_recent_completions)));
}
- serialize_events(ser, ios);
+ serialize_events(fd, ios);
}
void process_event(uint64_t ts, struct bt_ctf_event *evt,