#include <fstream>
#include <filesystem>
#include <mutex>
-#include <rados/buffer_fwd.h>
-#include <rados/librados.hpp>
+#include "include/rados/buffer_fwd.h"
+#include "include/rados/librados.hpp"
#include <atomic>
#include <fmt/format.h>
#include <map>
}
}
+
+// Ordering predicate for sorting ops: returns true when `lhs.at` compares
+// less than `rhs.at`, false otherwise. It is passed to sort() as the
+// comparator, so it returns bool (the natural type of the `<` result)
+// rather than int; callers that read the result as an int still see 0 or 1.
+// NOTE(review): `at` is presumably the op's timestamp -- confirm against Op.
+bool op_comparison_by_date(const Op& lhs, const Op& rhs) {
+  return lhs.at < rhs.at;
+}
+
// Writes the supplied option descriptions to standard output, followed by a
// newline and a stream flush.
void usage(po::options_description &desc) {
  cout << desc;
  cout << std::endl;
}
po::options_description po_options("Options");
po_options.add_options()
("help,h", "produce help message")
- (",i", po::value<string>(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
+ ("input-files,i", po::value<vector<string>>()->multitoken(), "List of input files (output of op_scraper.py). Multiple files will be merged and sorted by time order")
("ceph-conf", po::value<string>(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
po::store( parsed, vm);
po::notify(vm);
+
if (vm.count("help")) {
usage(po_all);
exit(EXIT_SUCCESS);
}
-
- // Parse input file
- vector<std::thread> parser_threads;
- vector<shared_ptr<ParserContext>> parser_contexts;
- int fd = open(file.c_str(), O_RDONLY);
- if (fd == -1) {
- cout << "Error opening file" << endl;
- }
- struct stat file_stat;
- fstat(fd, &file_stat);
- char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
- if (mapped_buffer == nullptr) {
- cout << "error mapping buffer" << endl;
- }
- uint64_t start_offset = 0;
- uint64_t step_size = file_stat.st_size / nparser_threads;
- for (int i = 0; i < nparser_threads; i++) {
- char* end = mapped_buffer + start_offset + step_size;
- while(*end != '\n') {
- end--;
+
+ assert(vm.count("input-files") > 0);
+
+ vector<string> input_files = vm["input-files"].as<vector<string>>();
+
+ vector<shared_ptr<ParserContext>> complete_parser_contexts; // list of ALL parser contexts so that shared_ptrs do not die.
+ for (auto &file : input_files) {
+ // Parse input file
+ vector<std::thread> parser_threads;
+ vector<shared_ptr<ParserContext>> parser_contexts;
+ cout << fmt::format("parsing file {}", file) << endl;
+ int fd = open(file.c_str(), O_RDONLY);
+ if (fd == -1) {
+ cout << "Error opening file" << endl;
+ exit(EXIT_FAILURE);
}
- if (i == nparser_threads - 1) {
- end = mapped_buffer + file_stat.st_size;
+ struct stat file_stat;
+ fstat(fd, &file_stat);
+ char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
+ if (mapped_buffer == nullptr) {
+ cout << "error mapping buffer" << endl;
+ exit(EXIT_FAILURE);
}
- shared_ptr<ParserContext> context = make_shared<ParserContext>();
- context->start = mapped_buffer + start_offset;
- context->end = end;
- context->max_buffer_size = 0;
- parser_contexts.push_back(context);
- parser_threads.push_back(std::thread(parse_entry_point, context));
- start_offset += (end - mapped_buffer - start_offset);
- }
- for (auto& t : parser_threads) {
- t.join();
+ uint64_t start_offset = 0;
+ uint64_t step_size = file_stat.st_size / nparser_threads;
+ for (int i = 0; i < nparser_threads; i++) {
+ char* end = mapped_buffer + start_offset + step_size;
+ while(*end != '\n') {
+ end--;
+ }
+ if (i == nparser_threads - 1) {
+ end = mapped_buffer + file_stat.st_size;
+ }
+ shared_ptr<ParserContext> context = make_shared<ParserContext>();
+ context->start = mapped_buffer + start_offset;
+ context->end = end;
+ context->max_buffer_size = 0;
+ parser_contexts.push_back(context);
+ parser_threads.push_back(std::thread(parse_entry_point, context));
+ start_offset += (end - mapped_buffer - start_offset);
+ }
+ for (auto& t : parser_threads) {
+ t.join();
+ }
+ // reduce
+ for (auto context : parser_contexts) {
+ ops.insert(ops.end(), context->ops.begin(), context->ops.end());
+ max_buffer_size = max(context->max_buffer_size, max_buffer_size);
+ // context->ops.clear();
+ }
+ munmap(mapped_buffer, file_stat.st_size);
+ complete_parser_contexts.insert(complete_parser_contexts.end(), parser_contexts.begin(), parser_contexts.end());
}
- // reduce
- for (auto context : parser_contexts) {
- ops.insert(ops.end(), context->ops.begin(), context->ops.end());
- max_buffer_size = max(context->max_buffer_size, max_buffer_size);
- // context->ops.clear();
+
+ cout << "Sorting ops by date..." << endl;
+ sort(ops.begin(), ops.end(), op_comparison_by_date);
+ cout << "Sorting ops by date done" << endl;
+
+ if (skip_do_ops) {
+ return EXIT_SUCCESS;
}
-
-
+
int ret = cluster.init2("client.admin", "ceph", 0);
if (ret < 0) {
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
std::cout << fmt::format("pool {} ready", pool) << std::endl;
- if (skip_do_ops) {
- return EXIT_SUCCESS;
- }
// process ops
vector<thread> worker_threads;
for (int i = 0; i < nworker_threads; i++) {