From: Adam Crume Date: Tue, 19 Aug 2014 23:04:40 +0000 (-0700) Subject: rbd_replay: Add unit test for batch_unreachable_from X-Git-Tag: v0.86~231^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=552c4b4479aa5c2a81df7bdf317f1884ed6c0a6c;p=ceph.git rbd_replay: Add unit test for batch_unreachable_from This requires a fair amount of code reorganization, since the types in the function signature were previously not in a header file. Signed-off-by: Adam Crume --- diff --git a/src/rbd_replay/Makefile.am b/src/rbd_replay/Makefile.am index 9420afdbab4a..f1f406f24350 100644 --- a/src/rbd_replay/Makefile.am +++ b/src/rbd_replay/Makefile.am @@ -14,6 +14,7 @@ noinst_HEADERS += rbd_replay/BoundedBuffer.hpp \ rbd_replay/actions.hpp \ rbd_replay/Deser.hpp \ rbd_replay/ImageNameMap.hpp \ + rbd_replay/ios.hpp \ rbd_replay/PendingIO.hpp \ rbd_replay/rbd_loc.hpp \ rbd_replay/rbd_replay_debug.hpp \ @@ -31,12 +32,19 @@ if LINUX bin_PROGRAMS += rbd-replay endif #LINUX -# TODO: See if we need any new dependencies +librbd_replay_ios_la_SOURCES = rbd_replay/ios.cc +librbd_replay_ios_la_LIBADD = $(LIBRBD) \ + $(LIBRADOS) \ + $(CEPH_GLOBAL) \ + librbd_replay.la +noinst_LTLIBRARIES += librbd_replay_ios.la + rbd_replay_prep_SOURCES = rbd_replay/rbd-replay-prep.cc rbd_replay_prep_LDADD = $(LIBRBD) \ $(LIBRADOS) \ $(CEPH_GLOBAL) \ librbd_replay.la \ + librbd_replay_ios.la \ -lbabeltrace \ -lbabeltrace-ctf \ -lboost_date_time diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc new file mode 100644 index 000000000000..ccc560f85c81 --- /dev/null +++ b/src/rbd_replay/ios.cc @@ -0,0 +1,273 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. + +#include "ios.hpp" + +using namespace std; +using namespace rbd_replay; + +bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) { + return p1->start_time() < p2->start_time(); +} + +static uint64_t min_time(const map& s) { + if (s.empty()) { + return 0; + } + return s.begin()->second->start_time(); +} + +static uint64_t max_time(const map& s) { + if (s.empty()) { + return 0; + } + map::const_iterator itr(s.end()); + --itr; + return itr->second->start_time(); +} + +void IO::add_dependencies(const io_set_t& deps) { + io_set_t base(m_dependencies); + for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { + ptr dep(*itr); + for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) { + base.insert(*itr2); + } + } + batch_unreachable_from(deps, base, &m_dependencies); +} + +void IO::write_debug_base(ostream& out, string type) const { + out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; + bool first = true; + for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { + if (first) { + first = false; + } else { + out << ", "; + } + out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time; + } + out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors(); +} + + +void IO::write_to(Ser& out, io_type iotype) const { + out.write_uint8_t(iotype); + out.write_uint32_t(m_ionum); + out.write_uint64_t(m_thread_id); + out.write_uint32_t(m_num_successors); + out.write_uint32_t(num_completion_successors()); + out.write_uint32_t(m_dependencies.size()); + vector deps; + for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { + deps.push_back(*itr); + } + sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time); + for (vector::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) { + out.write_uint32_t((*itr)->m_ionum); + out.write_uint64_t(m_start_time - (*itr)->m_start_time); + } +} + +IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) { + assert(!m_completion.lock()); + IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id)); + m_completion = completion; + completion->m_dependencies.insert(shared_from_this()); + return completion; +} + + +// TODO: Add unit tests +// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable' +void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) { + if (deps.empty()) { + return; + } + + map searching_for; + for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { + searching_for[(*itr)->ionum()] = *itr; + } + + map boundary; + for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) { + boundary[(*itr)->ionum()] = *itr; + } + + // The boundary horizon is the maximum timestamp of IOs in the boundary. + // This monotonically decreases, because dependencies (which are added to the set) + // have earlier timestamp than the dependent IOs (which were just removed from the set). + uint64_t boundary_horizon = max_time(boundary); + + for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { + if (boundary_horizon >= itr->second->start_time()) { + break; + } + unreachable->insert(itr->second); + searching_for.erase(itr++); + } + if (searching_for.empty()) { + return; + } + + // The searching horizon is the minimum timestamp of IOs in the searching set. + // This monotonically increases, because elements are only removed from the set. + uint64_t searching_horizon = min_time(searching_for); + + while (!boundary.empty()) { + // Take an IO from the end, which has the highest timestamp. + // This reduces the boundary horizon as early as possible, + // which means we can short cut as soon as possible. + map >::iterator b_itr(boundary.end()); + --b_itr; + boost::shared_ptr io(b_itr->second); + boundary.erase(b_itr); + + for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) { + IO::ptr dep(*itr); + assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum()); + io_map_t::iterator p = searching_for.find(dep->ionum()); + if (p != searching_for.end()) { + searching_for.erase(p); + if (dep->start_time() == searching_horizon) { + searching_horizon = min_time(searching_for); + if (searching_horizon == 0) { + return; + } + } + } + boundary[dep->ionum()] = dep; + } + + boundary_horizon = max_time(boundary); + if (boundary_horizon != 0) { + // Anything we're searching for that has a timestamp greater than the + // boundary horizon will never be found, since the boundary horizon + // falls monotonically. + for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { + if (boundary_horizon >= itr->second->start_time()) { + break; + } + unreachable->insert(itr->second); + searching_for.erase(itr++); + } + searching_horizon = min_time(searching_for); + if (searching_horizon == 0) { + return; + } + } + } + + // Anything we're still searching for has not been found. + for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) { + unreachable->insert(itr->second); + } +} + +ostream& operator<<(ostream& out, IO::ptr io) { + io->write_debug(out); + return out; +} + +void StartThreadIO::write_to(Ser& out) const { + IO::write_to(out, IO_START_THREAD); +} + +void StartThreadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "start thread"); +} + +void StopThreadIO::write_to(Ser& out) const { + IO::write_to(out, IO_STOP_THREAD); +} + +void StopThreadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "stop thread"); +} + +void ReadIO::write_to(Ser& out) const { + IO::write_to(out, IO_READ); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_offset); + out.write_uint64_t(m_length); +} + +void ReadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "read"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void WriteIO::write_to(Ser& out) const { + IO::write_to(out, IO_WRITE); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_offset); + out.write_uint64_t(m_length); +} + +void WriteIO::write_debug(std::ostream& out) const { + write_debug_base(out, "write"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void AioReadIO::write_to(Ser& out) const { + IO::write_to(out, IO_ASYNC_READ); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_offset); + out.write_uint64_t(m_length); +} + +void AioReadIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio read"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void AioWriteIO::write_to(Ser& out) const { + IO::write_to(out, IO_ASYNC_WRITE); + out.write_uint64_t(m_imagectx); + out.write_uint64_t(m_offset); + out.write_uint64_t(m_length); +} + +void AioWriteIO::write_debug(std::ostream& out) const { + write_debug_base(out, "aio write"); + out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; +} + +void OpenImageIO::write_to(Ser& out) const { + IO::write_to(out, IO_OPEN_IMAGE); + out.write_uint64_t(m_imagectx); + out.write_string(m_name); + out.write_string(m_snap_name); + out.write_bool(m_readonly); +} + +void OpenImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "open image"); + out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; +} + +void CloseImageIO::write_to(Ser& out) const { + IO::write_to(out, IO_CLOSE_IMAGE); + out.write_uint64_t(m_imagectx); +} + +void CloseImageIO::write_debug(std::ostream& out) const { + write_debug_base(out, "close image"); + out << ", imagectx=" << m_imagectx; +} diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp new file mode 100644 index 000000000000..ad9ff2a5c992 --- /dev/null +++ b/src/rbd_replay/ios.hpp @@ -0,0 +1,334 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Adam Crume + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef _INCLUDED_RBD_REPLAY_IOS_HPP +#define _INCLUDED_RBD_REPLAY_IOS_HPP + +// This code assumes that IO IDs and timestamps are related monotonically. +// In other words, (a.id < b.id) == (a.timestamp < b.timestamp) for all IOs a and b. + +#include +#include +#include +#include +#include +#include "actions.hpp" +#include "Ser.hpp" + + +namespace rbd_replay { + +class IO; + +typedef std::set > io_set_t; + +typedef std::map > io_map_t; + +void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable); + +class IO : public boost::enable_shared_from_this { +public: + typedef boost::shared_ptr ptr; + + typedef boost::weak_ptr weak_ptr; + + IO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + ptr prev) + : m_ionum(ionum), + m_start_time(start_time), + m_dependencies(io_set_t()), + m_completion(weak_ptr()), + m_num_successors(0), + m_thread_id(thread_id), + m_prev(prev) { + } + + virtual ~IO() { + } + + uint64_t start_time() const { + return m_start_time; + } + + io_set_t& dependencies() { + return m_dependencies; + } + + const io_set_t& dependencies() const { + return m_dependencies; + } + + void add_dependencies(const io_set_t& deps); + + uint64_t num_completion_successors() const { + ptr c(m_completion.lock()); + return c ? c->m_num_successors : 0; + } + + void write_to(Ser& out, io_type iotype) const; + + virtual void write_to(Ser& out) const = 0; + + virtual bool is_completion() const { + return false; + } + + void set_ionum(action_id_t ionum) { + m_ionum = ionum; + } + + action_id_t ionum() const { + return m_ionum; + } + + ptr prev() const { + return m_prev; + } + + void set_num_successors(uint32_t n) { + m_num_successors = n; + } + + uint32_t num_successors() const { + return m_num_successors; + } + + void write_debug_base(std::ostream& out, std::string iotype) const; + + virtual void write_debug(std::ostream& out) const = 0; + + // The result must be stored somewhere, or else m_completion will expire + ptr create_completion(uint64_t start_time, thread_id_t thread_id); + +private: + action_id_t m_ionum; + uint64_t m_start_time; + io_set_t m_dependencies; + boost::weak_ptr m_completion; + uint32_t m_num_successors; + thread_id_t m_thread_id; + ptr m_prev; +}; + +std::ostream& operator<<(std::ostream& out, IO::ptr io); + + +class StartThreadIO : public IO { +public: + StartThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; +}; + +class StopThreadIO : public IO { +public: + StopThreadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; +}; + +class ReadIO : public IO { +public: + ReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class WriteIO : public IO { +public: + WriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class AioReadIO : public IO { +public: + AioReadIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class AioWriteIO : public IO { +public: + AioWriteIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + uint64_t offset, + uint64_t length) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_offset(offset), + m_length(length) { + } + + void write_to(Ser& out) const; + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; + uint64_t m_offset; + uint64_t m_length; +}; + +class OpenImageIO : public IO { +public: + OpenImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx, + const std::string& name, + const std::string& snap_name, + bool readonly) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx), + m_name(name), + m_snap_name(snap_name), + m_readonly(readonly) { + } + + void write_to(Ser& out) const; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; + std::string m_name; + std::string m_snap_name; + bool m_readonly; +}; + +class CloseImageIO : public IO { +public: + CloseImageIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id, + IO::ptr prev, + imagectx_id_t imagectx) + : IO(ionum, start_time, thread_id, prev), + m_imagectx(imagectx) { + } + + void write_to(Ser& out) const; + + imagectx_id_t imagectx() const { + return m_imagectx; + } + + void write_debug(std::ostream& out) const; + +private: + imagectx_id_t m_imagectx; +}; + +class CompletionIO : public IO { +public: + CompletionIO(action_id_t ionum, + uint64_t start_time, + thread_id_t thread_id) + : IO(ionum, start_time, thread_id, IO::ptr()) { + } + + void write_to(Ser& out) const { + } + + bool is_completion() const { + return true; + } + + void write_debug(std::ostream& out) const { + write_debug_base(out, "completion"); + } +}; + +bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2); + +} + +#endif diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc index 9e789a0a4924..63a5fda55ac4 100644 --- a/src/rbd_replay/rbd-replay-prep.cc +++ b/src/rbd_replay/rbd-replay-prep.cc @@ -21,125 +21,13 @@ #include #include #include -#include #include #include -#include "actions.hpp" -#include "Ser.hpp" +#include "ios.hpp" using namespace std; using namespace rbd_replay; -// Allows us to easily expose all the functions to make debugging easier. -#define STATIC static - -class IO; - -typedef set > io_set_t; - -typedef map > io_map_t; - -STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable); - -class IO : public boost::enable_shared_from_this { -public: - typedef boost::shared_ptr ptr; - - typedef boost::weak_ptr weak_ptr; - - IO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - ptr prev) - : m_ionum(ionum), - m_start_time(start_time), - m_dependencies(io_set_t()), - m_completion(weak_ptr()), - m_num_successors(0), - m_thread_id(thread_id), - m_prev(prev) { - } - - virtual ~IO() { - } - - uint64_t start_time() const { - return m_start_time; - } - - io_set_t& dependencies() { - return m_dependencies; - } - - const io_set_t& dependencies() const { - return m_dependencies; - } - - void add_dependencies(const io_set_t& deps) { - io_set_t base(m_dependencies); - for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { - ptr dep(*itr); - for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) { - base.insert(*itr2); - } - } - batch_unreachable_from(deps, base, &m_dependencies); - } - - uint64_t num_completion_successors() const { - ptr c(m_completion.lock()); - return c ? c->m_num_successors : 0; - } - - void write_to(Ser& out, io_type iotype) const; - - virtual void write_to(Ser& out) const = 0; - - virtual bool is_completion() const { - return false; - } - - void set_ionum(action_id_t ionum) { - m_ionum = ionum; - } - - action_id_t ionum() const { - return m_ionum; - } - - ptr prev() const { - return m_prev; - } - - void set_num_successors(uint32_t n) { - m_num_successors = n; - } - - uint32_t num_successors() const { - return m_num_successors; - } - - void write_debug_base(ostream& out, string iotype); - - virtual void write_debug(ostream& out) = 0; - - // The result must be stored somewhere, or else m_completion will expire - ptr create_completion(uint64_t start_time, thread_id_t thread_id); - -private: - action_id_t m_ionum; - uint64_t m_start_time; - io_set_t m_dependencies; - boost::weak_ptr m_completion; - uint32_t m_num_successors; - thread_id_t m_thread_id; - ptr m_prev; -}; - -ostream& operator<<(ostream& out, IO::ptr io) { - io->write_debug(out); - return out; -} class Thread { public: @@ -197,413 +85,11 @@ private: uint64_t m_max_ts; }; -void IO::write_debug_base(ostream& out, string type) { - out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; - bool first = true; - for (io_set_t::iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { - if (first) { - first = false; - } else { - out << ", "; - } - out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time; - } - out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors(); -} - -class StartThreadIO : public IO { -public: - StartThreadIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_START_THREAD); - } - - void write_debug(ostream& out) { - write_debug_base(out, "start thread"); - } -}; - -class StopThreadIO : public IO { -public: - StopThreadIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_STOP_THREAD); - } - - void write_debug(ostream& out) { - write_debug_base(out, "stop thread"); - } -}; - -class ReadIO : public IO { -public: - ReadIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx, - uint64_t offset, - uint64_t length) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx), - m_offset(offset), - m_length(length) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_READ); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); - } - - void write_debug(ostream& out) { - write_debug_base(out, "read"); - out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; - } - -private: - imagectx_id_t m_imagectx; - uint64_t m_offset; - uint64_t m_length; -}; - -class WriteIO : public IO { -public: - WriteIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx, - uint64_t offset, - uint64_t length) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx), - m_offset(offset), - m_length(length) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_WRITE); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); - } - - void write_debug(ostream& out) { - write_debug_base(out, "write"); - out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; - } - -private: - imagectx_id_t m_imagectx; - uint64_t m_offset; - uint64_t m_length; -}; - -class AioReadIO : public IO { -public: - AioReadIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx, - uint64_t offset, - uint64_t length) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx), - m_offset(offset), - m_length(length) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_ASYNC_READ); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); - } - - void write_debug(ostream& out) { - write_debug_base(out, "aio read"); - out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; - } -private: - imagectx_id_t m_imagectx; - uint64_t m_offset; - uint64_t m_length; -}; - -class AioWriteIO : public IO { -public: - AioWriteIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx, - uint64_t offset, - uint64_t length) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx), - m_offset(offset), - m_length(length) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_ASYNC_WRITE); - out.write_uint64_t(m_imagectx); - out.write_uint64_t(m_offset); - out.write_uint64_t(m_length); - } - - void write_debug(ostream& out) { - write_debug_base(out, "aio write"); - out << ", imagectx=" << m_imagectx << ", offset=" << m_offset << ", length=" << m_length << "]"; - } - -private: - imagectx_id_t m_imagectx; - uint64_t m_offset; - uint64_t m_length; -}; - -class OpenImageIO : public IO { -public: - OpenImageIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx, - const string& name, - const string& snap_name, - bool readonly) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx), - m_name(name), - m_snap_name(snap_name), - m_readonly(readonly) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_OPEN_IMAGE); - out.write_uint64_t(m_imagectx); - out.write_string(m_name); - out.write_string(m_snap_name); - out.write_bool(m_readonly); - } - - imagectx_id_t imagectx() const { - return m_imagectx; - } - - void write_debug(ostream& out) { - write_debug_base(out, "open image"); - out << ", imagectx=" << m_imagectx << ", name='" << m_name << "', snap_name='" << m_snap_name << "', readonly=" << m_readonly; - } - -private: - imagectx_id_t m_imagectx; - string m_name; - string m_snap_name; - bool m_readonly; -}; - -class CloseImageIO : public IO { -public: - CloseImageIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id, - IO::ptr prev, - imagectx_id_t imagectx) - : IO(ionum, start_time, thread_id, prev), - m_imagectx(imagectx) { - } - - void write_to(Ser& out) const { - IO::write_to(out, IO_CLOSE_IMAGE); - out.write_uint64_t(m_imagectx); - } - - imagectx_id_t imagectx() const { - return m_imagectx; - } - - void write_debug(ostream& out) { - write_debug_base(out, "close image"); - out << ", imagectx=" << m_imagectx; - } - -private: - imagectx_id_t m_imagectx; -}; - -class CompletionIO : public IO { -public: - CompletionIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { - } - - void write_to(Ser& out) const { - } - - bool is_completion() const { - return true; - } - - void write_debug(ostream& out) { - write_debug_base(out, "completion"); - } -}; - -STATIC bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) { - return p1->start_time() < p2->start_time(); -} - -void IO::write_to(Ser& out, io_type iotype) const { - out.write_uint8_t(iotype); - out.write_uint32_t(m_ionum); - out.write_uint64_t(m_thread_id); - out.write_uint32_t(m_num_successors); - out.write_uint32_t(num_completion_successors()); - out.write_uint32_t(m_dependencies.size()); - vector deps; - for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { - deps.push_back(*itr); - } - sort(deps.begin(), deps.end(), compare_io_ptrs_by_start_time); - for (vector::const_iterator itr = deps.begin(), end = deps.end(); itr != end; ++itr) { - out.write_uint32_t((*itr)->m_ionum); - out.write_uint64_t(m_start_time - (*itr)->m_start_time); - } -} - -IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) { - assert(!m_completion.lock()); - IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id)); - m_completion = completion; - completion->m_dependencies.insert(shared_from_this()); - return completion; -} - -STATIC uint64_t min_time(const map& s) { - if (s.empty()) { - return 0; - } - return s.begin()->second->start_time(); -} - -STATIC uint64_t max_time(const map& s) { - if (s.empty()) { - return 0; - } - map::const_iterator itr(s.end()); - --itr; - return itr->second->start_time(); -} - -// TODO: Add unit tests -// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable' -STATIC void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) { - if (deps.empty()) { - return; - } - - map searching_for; - for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { - searching_for[(*itr)->ionum()] = *itr; - } - - map boundary; - for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) { - boundary[(*itr)->ionum()] = *itr; - } - - // The boundary horizon is the maximum timestamp of IOs in the boundary. - // This monotonically decreases, because dependencies (which are added to the set) - // have earlier timestamp than the dependent IOs (which were just removed from the set). - uint64_t boundary_horizon = max_time(boundary); - - for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { - if (boundary_horizon >= itr->second->start_time()) { - break; - } - unreachable->insert(itr->second); - searching_for.erase(itr++); - } - if (searching_for.empty()) { - return; - } - - // The searching horizon is the minimum timestamp of IOs in the searching set. - // This monotonically increases, because elements are only removed from the set. - uint64_t searching_horizon = min_time(searching_for); - - while (!boundary.empty()) { - // Take an IO from the end, which has the highest timestamp. - // This reduces the boundary horizon as early as possible, - // which means we can short cut as soon as possible. - map >::iterator b_itr(boundary.end()); - --b_itr; - boost::shared_ptr io(b_itr->second); - boundary.erase(b_itr); - - for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) { - IO::ptr dep(*itr); - assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum()); - io_map_t::iterator p = searching_for.find(dep->ionum()); - if (p != searching_for.end()) { - searching_for.erase(p); - if (dep->start_time() == searching_horizon) { - searching_horizon = min_time(searching_for); - if (searching_horizon == 0) { - return; - } - } - } - boundary[dep->ionum()] = dep; - } - - boundary_horizon = max_time(boundary); - if (boundary_horizon != 0) { - // Anything we're searching for that has a timestamp greater than the - // boundary horizon will never be found, since the boundary horizon - // falls monotonically. - for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { - if (boundary_horizon >= itr->second->start_time()) { - break; - } - unreachable->insert(itr->second); - searching_for.erase(itr++); - } - searching_horizon = min_time(searching_for); - if (searching_horizon == 0) { - return; - } - } - } - - // Anything we're still searching for has not been found. - for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) { - unreachable->insert(itr->second); - } -} - -STATIC void usage(string prog) { +static void usage(string prog) { cout << "Usage: " << prog << " [ --window ] " << endl; } -__attribute__((noreturn)) STATIC void usage_exit(string prog, string msg) { +__attribute__((noreturn)) static void usage_exit(string prog, string msg) { cerr << msg << endl; usage(prog); exit(1); diff --git a/src/test/Makefile.am b/src/test/Makefile.am index 6ba23f9ca08b..ba994921594e 100644 --- a/src/test/Makefile.am +++ b/src/test/Makefile.am @@ -583,6 +583,7 @@ unittest_rbd_replay_LDADD = $(LIBRBD) \ $(LIBRADOS) \ $(CEPH_GLOBAL) \ librbd_replay.la \ + librbd_replay_ios.la \ $(UNITTEST_LDADD) unittest_rbd_replay_CXXFLAGS = $(UNITTEST_CXXFLAGS) check_PROGRAMS += unittest_rbd_replay diff --git a/src/test/test_rbd_replay.cc b/src/test/test_rbd_replay.cc index d05074af231b..eb3c8e34543c 100644 --- a/src/test/test_rbd_replay.cc +++ b/src/test/test_rbd_replay.cc @@ -16,14 +16,15 @@ #include "gtest/gtest.h" #include #include +#include #include "rbd_replay/Deser.hpp" #include "rbd_replay/ImageNameMap.hpp" +#include "rbd_replay/ios.hpp" #include "rbd_replay/rbd_loc.hpp" #include "rbd_replay/Ser.hpp" -using rbd_replay::ImageNameMap; -using rbd_replay::rbd_loc; +using namespace rbd_replay; std::ostream& operator<<(std::ostream& o, const rbd_loc& name) { return o << "('" << name.pool << "', '" << name.image << "', '" << name.snap << "')"; @@ -167,3 +168,58 @@ TEST(RBDReplay, rbd_loc_parse) { EXPECT_FALSE(m.parse("a/b/c")); EXPECT_FALSE(m.parse("a@b/c")); } + +static IO::ptr mkio(action_id_t ionum, ...) { + IO::ptr io(new StartThreadIO(ionum, ionum, 0)); + + va_list ap; + va_start(ap, ionum); + while (true) { + IO::ptr* dep = va_arg(ap, IO::ptr*); + if (!dep) { + break; + } + io->dependencies().insert(*dep); + } + va_end(ap); + + return io; +} + +TEST(RBDReplay, batch_unreachable_from) { + io_set_t deps; + io_set_t base; + io_set_t unreachable; + IO::ptr io1(mkio(1, NULL)); + IO::ptr io2(mkio(2, &io1, NULL)); + IO::ptr io3(mkio(3, &io2, NULL)); + IO::ptr io4(mkio(4, &io1, NULL)); + IO::ptr io5(mkio(5, &io2, &io4, NULL)); + IO::ptr io6(mkio(6, &io3, &io5, NULL)); + IO::ptr io7(mkio(7, &io4, NULL)); + IO::ptr io8(mkio(8, &io5, &io7, NULL)); + IO::ptr io9(mkio(9, &io6, &io8, NULL)); + // 1 (deps) <-- 2 (deps) <-- 3 (deps) + // ^ ^ ^ + // | | | + // 4 <--------- 5 (base) <-- 6 (deps) + // ^ ^ ^ + // | | | + // 7 <--------- 8 <--------- 9 + deps.insert(io1); + deps.insert(io2); + deps.insert(io3); + deps.insert(io6); + base.insert(io5); + // Anything in 'deps' which is not reachable from 'base' is added to 'unreachable' + batch_unreachable_from(deps, base, &unreachable); + EXPECT_EQ(0, unreachable.count(io1)); + EXPECT_EQ(0, unreachable.count(io2)); + EXPECT_EQ(1, unreachable.count(io3)); + EXPECT_EQ(0, unreachable.count(io4)); + EXPECT_EQ(0, unreachable.count(io5)); + EXPECT_EQ(1, unreachable.count(io6)); + EXPECT_EQ(0, unreachable.count(io7)); + EXPECT_EQ(0, unreachable.count(io8)); + EXPECT_EQ(0, unreachable.count(io9)); +}