rbd_replay/actions.hpp \
rbd_replay/Deser.hpp \
rbd_replay/PendingIO.hpp \
+ rbd_replay/rbd_replay_debug.hpp \
rbd_replay/Replayer.hpp
if LINUX
bin_PROGRAMS += rbd-replay
*/
#include "PendingIO.hpp"
+#include "rbd_replay_debug.hpp"
using namespace std;
}
void PendingIO::completed(librbd::completion_t cb) {
- cout << "Completed pending IO #" << m_id << endl;
+ dout(ACTION_LEVEL) << "Completed pending IO #" << m_id << dendl;
m_worker.remove_pending(shared_from_this());
}
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
#include <fstream>
+#include "global/global_context.h"
+#include "rbd_replay_debug.hpp"
using namespace std;
}
void Worker::run() {
- cout << "Worker thread started" << endl;
+ dout(THREAD_LEVEL) << "Worker thread started" << dendl;
while (!m_done) {
Action::ptr action;
m_buffer.pop_back(&action);
m_pending_ios_empty.wait(lock);
}
}
- cout << "Worker thread stopped" << endl;
+ dout(THREAD_LEVEL) << "Worker thread stopped" << dendl;
}
Replayer::Replayer() {
}
-void Replayer::run(const std::string conf_file, const std::string replay_file) {
- cout << "IO thread started" << endl;
+void Replayer::run(const std::string replay_file) {
{
librados::Rados rados;
rados.init(NULL);
- int r = rados.conf_read_file(conf_file.c_str());
+ int r = rados.init_with_context(g_ceph_context);
if (r) {
- cerr << "Unable to read conf file: " << r << endl;
+ cerr << "Unable to read conf file: " << r << std::endl;
goto out;
}
r = rados.connect();
if (r) {
- cerr << "Unable to connect to Rados: " << r << endl;
+ cerr << "Unable to connect to Rados: " << r << std::endl;
goto out;
}
m_ioctx = new librados::IoCtx();
const char* pool_name = "rbd";
r = rados.ioctx_create(pool_name, *m_ioctx);
if (r) {
- cerr << "Unable to create IoCtx: " << r << endl;
+ cerr << "Unable to create IoCtx: " << r << std::endl;
goto out2;
}
m_rbd = new librbd::RBD();
ifstream input(replay_file.c_str(), ios::in | ios::binary);
if (!input.is_open()) {
- cerr << "Unable to open " << replay_file << endl;
+ cerr << "Unable to open " << replay_file << std::endl;
exit(1);
}
}
}
- cout << "Waiting for workers to die" << endl;
+ dout(THREAD_LEVEL) << "Waiting for workers to die" << dendl;
pair<thread_id_t, Worker*> w;
BOOST_FOREACH(w, workers) {
w.second->join();
m_ioctx = NULL;
}
out:
- cout << "IO thread stopped" << endl;
+ ;
}
}
void Replayer::set_action_complete(action_id_t id) {
- cout << "ActionTracker::set_complete(" << id << ")" << endl;
+ dout(DEPGRAPH_LEVEL) << "ActionTracker::set_complete(" << id << ")" << dendl;
boost::system_time now(boost::get_system_time());
boost::unique_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
assert(m_actions_complete.count(id) == 0);
void Replayer::wait_for_actions(const vector<dependency_d> &deps) {
boost::posix_time::ptime release_time(boost::posix_time::neg_infin);
BOOST_FOREACH(const dependency_d &dep, deps) {
- cout << "Waiting for " << dep.id << endl;
+ dout(DEPGRAPH_LEVEL) << "Waiting for " << dep.id << dendl;
boost::system_time start_time(boost::get_system_time());
boost::shared_lock<boost::shared_mutex> lock(m_actions_complete_mutex);
while (!_is_action_complete(dep.id)) {
//m_actions_complete_condition.wait(lock);
m_actions_complete_condition.timed_wait(lock, boost::posix_time::seconds(1));
- cout << "Still waiting for " << dep.id << endl;
+ dout(DEPGRAPH_LEVEL) << "Still waiting for " << dep.id << dendl;
}
boost::system_time action_completed_time(m_actions_complete[dep.id]);
lock.unlock();
boost::system_time end_time(boost::get_system_time());
long long micros = (end_time - start_time).total_microseconds();
- cout << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << endl;
+ dout(DEPGRAPH_LEVEL) << "Finished waiting for " << dep.id << " after " << micros << " microseconds" << dendl;
// Apparently the nanoseconds constructor is optional:
// http://www.boost.org/doc/libs/1_46_0/doc/html/date_time/details.html#compile_options
boost::system_time sub_release_time(action_completed_time + boost::posix_time::microseconds(dep.time_delta * m_latency_multiplier / 1000));
}
}
if (release_time > boost::get_system_time()) {
- cout << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << endl;
+ dout(SLEEP_LEVEL) << "Sleeping for " << (release_time - boost::get_system_time()).total_microseconds() << " microseconds" << dendl;
boost::this_thread::sleep(release_time);
}
}
public:
Replayer();
- void run(const std::string conf_file, const std::string replay_file);
+ void run(const std::string replay_file);
librbd::RBD* get_rbd() {
return m_rbd;
#include "actions.hpp"
#include <cstdlib>
#include "PendingIO.hpp"
+#include "rbd_replay_debug.hpp"
using namespace rbd_replay;
case 7:
return CloseImageAction::read_from(dummy, d);
default:
- cerr << "Invalid action type: " << type << endl;
+ cerr << "Invalid action type: " << type << std::endl;
exit(1);
}
}
}
void StartThreadAction::perform(ActionCtx &ctx) {
- cerr << "StartThreadAction should never actually be performed" << endl;
+ cerr << "StartThreadAction should never actually be performed" << std::endl;
exit(1);
}
}
void StopThreadAction::perform(ActionCtx &ctx) {
- cout << "Performing stop thread action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing stop thread action #" << id() << dendl;
ctx.stop();
}
}
void AioReadAction::perform(ActionCtx &worker) {
- cout << "Performing AIO read action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing AIO read action #" << id() << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
assert(image);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
}
void ReadAction::perform(ActionCtx &worker) {
- cout << "Performing read action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing read action #" << id() << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
}
void AioWriteAction::perform(ActionCtx &worker) {
- cout << "Performing AIO write action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing AIO write action #" << id() << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
io->bufferlist().append_zero(m_length);
}
void WriteAction::perform(ActionCtx &worker) {
- cout << "Performing write action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing write action #" << id() << dendl;
librbd::Image *image = worker.get_image(m_imagectx_id);
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
}
void OpenImageAction::perform(ActionCtx &worker) {
- cout << "Performing open image action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing open image action #" << id() << dendl;
PendingIO::ptr io(new PendingIO(pending_io_id(), worker));
worker.add_pending(io);
librbd::Image *image = new librbd::Image();
r = rbd->open(*worker.ioctx(), *image, m_name.c_str(), m_snap_name.c_str());
}
if (r) {
- cerr << "Unable to open image '" << m_name << "' with snap '" << m_snap_name << "' and readonly " << m_readonly << ": " << strerror(-r) << endl;
+ cerr << "Unable to open image '" << m_name << "' with snap '" << m_snap_name << "' and readonly " << m_readonly << ": " << strerror(-r) << std::endl;
exit(1);
}
worker.put_image(m_imagectx_id, image);
}
void CloseImageAction::perform(ActionCtx &worker) {
- cout << "Performing close image action #" << id() << endl;
+ dout(ACTION_LEVEL) << "Performing close image action #" << id() << dendl;
worker.erase_image(m_imagectx_id);
worker.set_action_complete(pending_io_id());
}
#include <vector>
#include "common/ceph_argparse.h"
+#include "global/global_init.h"
#include "Replayer.hpp"
+#include "rbd_replay_debug.hpp"
using namespace std;
}
static void usage(const char* program) {
- cout << "Usage: " << program << " --conf=<config_file> <replay_file>" << endl;
+ cout << "Usage: " << program << " --conf=<config_file> <replay_file>" << std::endl;
}
int main(int argc, const char **argv) {
argv_to_vec(argc, argv, args);
env_to_vec(args);
+ global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
std::vector<const char*>::iterator i;
- string conf;
float latency_multiplier = 1;
std::string val;
std::ostringstream err;
for (i = args.begin(); i != args.end(); ) {
if (ceph_argparse_double_dash(args, i)) {
break;
- } else if (ceph_argparse_witharg(args, i, &val, "-c", "--conf", (char*)NULL)) {
- conf = val;
} else if (ceph_argparse_withfloat(args, i, &latency_multiplier, &err, "--latency-multiplier",
(char*)NULL)) {
if (!err.str().empty()) {
usage(argv[0]);
return 0;
} else if (get_remainder(*i, "-")) {
- cerr << "Unrecognized argument: " << *i << endl;
+ cerr << "Unrecognized argument: " << *i << std::endl;
return 1;
} else {
++i;
}
}
- if (conf.empty()) {
- cerr << "No config file specified. Use -c or --conf." << endl;
- return 1;
- }
+ common_init_finish(g_ceph_context);
string replay_file;
if (!args.empty()) {
}
if (replay_file.empty()) {
- cerr << "No replay file specified." << endl;
+ cerr << "No replay file specified." << std::endl;
return 1;
}
Replayer replayer;
replayer.set_latency_multiplier(latency_multiplier);
- replayer.run(conf, replay_file);
+ replayer.run(replay_file);
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 Adam Crume <adamcrume@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef _INCLUDED_RBD_REPLAY_DEBUG_H
+#define _INCLUDED_RBD_REPLAY_DEBUG_H
+
+#include "common/debug.h"
+#include "include/assert.h"
+
+namespace rbd_replay {
+
+static const int ACTION_LEVEL = 11;
+static const int DEPGRAPH_LEVEL = 12;
+static const int SLEEP_LEVEL = 13;
+static const int THREAD_LEVEL = 10;
+
+}
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "rbd_replay: "
+
+#endif