#include <algorithm>
+#include <boost/program_options/value_semantic.hpp>
#include <cassert>
+#include <cctype>
#include <cstdlib>
#include <fcntl.h>
#include <sys/mman.h>
#include <condition_variable>
#include <cstdint>
#include <ctime>
+#include <fstream>
#include <filesystem>
#include <mutex>
#include <rados/buffer_fwd.h>
};
struct ParserContext {
- map<string, shared_ptr<string>> string_cache;
+ map<string, shared_ptr<string>> collection_cache;
+ map<string, shared_ptr<string>> object_cache;
+ map<string, shared_ptr<string>> who_cache;
vector<Op> ops;
char *start; // starts and ends in new line or eof
char *end;
cv.notify_one();
}
+
+/// Converts a textual timestamp into whole seconds since 1970-01-01 00:00:00.
+///
+/// Expected layout (only the first 19 characters are read):
+///   2024-05-10 12:06:24.792232+00:00
+///   0123456789012345678------------
+/// The fractional seconds and the trailing UTC offset are ignored on purpose:
+/// the date/time fields are interpreted as if they were already UTC.
+///
+/// Unlike a "365-day year, 30-day month" approximation, the day count below is
+/// exact for the proleptic Gregorian calendar, so results are strictly
+/// monotonic in time (e.g. Jan 31 23:00 sorts before Feb 1 01:00) and match
+/// real epoch seconds.
+///
+/// @param date timestamp text in the layout shown above.
+/// @return seconds since the epoch; dates before 1970 wrap around because the
+///         return type is unsigned, and out-of-range month/day values are not
+///         rejected.
+/// @throws std::out_of_range / std::invalid_argument propagated from
+///         std::string::substr and std::stoull when the text is too short or
+///         a field does not start with a number.
+uint64_t timestamp_parser(const std::string& date) {
+  const uint64_t year = std::stoull(date.substr(0, 4));
+  const uint64_t month = std::stoull(date.substr(5, 2));
+  const uint64_t day = std::stoull(date.substr(8, 2));
+  const uint64_t hour = std::stoull(date.substr(11, 2));
+  const uint64_t minute = std::stoull(date.substr(14, 2));
+  const uint64_t second = std::stoull(date.substr(17, 2));
+
+  // Days since 1970-01-01 using the well-known "days from civil" algorithm:
+  // the year is shifted to start on March 1st so the leap day is the last day
+  // of the shifted year, which makes the month -> day-of-year mapping linear.
+  const int64_t m = static_cast<int64_t>(month);
+  const int64_t d = static_cast<int64_t>(day);
+  const int64_t y = static_cast<int64_t>(year) - (m <= 2 ? 1 : 0);
+  const int64_t era = (y >= 0 ? y : y - 399) / 400;
+  const int64_t year_of_era = y - era * 400;                                      // [0, 399]
+  const int64_t day_of_year = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1;    // [0, 365]
+  const int64_t day_of_era =
+      year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;      // [0, 146096]
+  const int64_t days_since_epoch = era * 146097 + day_of_era - 719468;
+
+  uint64_t timestamp = static_cast<uint64_t>(days_since_epoch) * 24 * 60 * 60;
+  timestamp += hour * 60 * 60;
+  timestamp += minute * 60;
+  timestamp += second;
+  return timestamp;
+}
+
void parse_entry_point(shared_ptr<ParserContext> context) {
cout << fmt::format("Starting parser thread start={:p} end={:p}", context->start, context->end) << endl;
string date, time, who, type, range, object, collection;
MemoryInputStream fstream(context->start, context->end);
- const char* date_format_first_column = "%Y-%m-%d";
// we expect this input:
// 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
while (fstream >> date){
// cout << date << endl;
- tm t;
- char* res = strptime(date.c_str(), date_format_first_column, &t);
- if (res == nullptr) {
+ if (!(date.size() > 4 && isdigit(date[0]) && isdigit(date[1]) && isdigit(date[2]) && isdigit(date[3]) && date[4] == '-')) {
fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
continue;
+
}
fstream >> time >> who >> type >> range >> object >> collection;
date += " " + time;
// cout << date << endl;
// FIXME: this is wrong but it returns a reasonable bad timestamp :P
- const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
- res = strptime(date.c_str(), date_format_full, &t);
- time_t at = mktime(&t);
+ // const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
+ // res = strptime(date.c_str(), date_format_full, &t);
+ // time_t at = mktime(&t);
+ time_t at = timestamp_parser(date);
// cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
shared_ptr<string> who_ptr = make_shared<string>(who);
- auto who_it = string_cache.find(who);
- if (who_it == string_cache.end()) {
- string_cache.insert({ who, who_ptr });
+ auto who_it = context->who_cache.find(who);
+ if (who_it == context->who_cache.end()) {
+ context->who_cache.insert({ who, who_ptr });
} else {
who_ptr = who_it->second;
}
shared_ptr<string> object_ptr = make_shared<string>(object);
- auto object_it = string_cache.find(object);
- if (object_it == string_cache.end()) {
- string_cache.insert({ object, object_ptr });
+ auto object_it = context->object_cache.find(object);
+ if (object_it == context->object_cache.end()) {
+ context->object_cache.insert({ object, object_ptr });
} else {
object_ptr = object_it->second;
}
op_type ot;
- if (type == "write") {
- ot = Write;
- } else if (type == "writefull") {
- ot = WriteFull;
- } else if (type == "read") {
+ switch (type[0]) {
+ case 'r': {
ot = Read;
- } else if (type == "sparse-read") {
+ break;
+ }
+ case 's': {
ot = Read;
- } else if (type == "truncate") {
- ot = Truncate;
- } else if (type == "zero") {
+ break;
+ }
+ case 'z': {
ot = Zero;
- } else {
+ break;
+ }
+ case 't': {
+ ot = Truncate;
+ break;
+ }
+ case 'w': {
+ if (type.size() > 6) {
+ ot = WriteFull;
+ } else {
+ ot = Write;
+ }
+ break;
+ }
+ default: {
cout << "invalid type " << type << endl;
exit(1);
+ }
}
shared_ptr<string> collection_ptr = make_shared<string>(collection);
- auto collection_it = string_cache.find(collection);
- if (collection_it == string_cache.end()) {
- string_cache.insert({ collection, collection_ptr });
+ auto collection_it = context->collection_cache.find(collection);
+ if (collection_it == context->collection_cache.end()) {
+ context->collection_cache.insert({ collection, collection_ptr });
} else {
collection_ptr = collection_it->second;
}
string file("input.txt");
string ceph_conf_path("./ceph.conf");
string pool("test_pool");
+ string input_ir_output("");
+ bool skip_do_ops = false;
po::options_description po_options("Options");
po_options.add_options()
("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
("worker-threads", po::value<uint64_t>(&nworker_threads)->default_value(16), "Number of I/O worker threads")
("pool", po::value<string>(&pool)->default_value("test_pool"), "Pool to use for I/O")
+ ("optimized-input-path", po::value<string>(&input_ir_output)->default_value(""), "Create a new input file that is optimzed for parsing. If not empty it will create it")
+ ("skip-do-ops", po::bool_switch(&skip_do_ops)->default_value(false), "Skip doing operations")
;
po::options_description po_all("All options");
}
// reduce
for (auto context : parser_contexts) {
- string_cache.insert(context->string_cache.begin(), context->string_cache.end());
+ string_cache.insert(context->object_cache.begin(), context->object_cache.end());
+ string_cache.insert(context->collection_cache.begin(), context->collection_cache.end());
+ string_cache.insert(context->who_cache.begin(), context->who_cache.end());
ops.insert(ops.end(), context->ops.begin(), context->ops.end());
max_buffer_size = max(context->max_buffer_size, max_buffer_size);
- context->string_cache.clear();
+ context->object_cache.clear();
+ context->collection_cache.clear();
+ context->who_cache.clear();
context->ops.clear();
}
+ if (!input_ir_output.empty()) {
+ // Create an optimized file for parsing
+ ofstream output(input_ir_output, ios::out);
+ output << "input-optimized" << endl;
+ }
+
int ret = cluster.init2("client.admin", "ceph", 0);
if (ret < 0) {
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
std::cout << fmt::format("pool {} ready", pool) << std::endl;
+ if (skip_do_ops) {
+ return EXIT_SUCCESS;
+ }
// process ops
vector<thread> worker_threads;
for (int i = 0; i < nworker_threads; i++) {
}
cout << ops.size() << endl;
- return 0;
+ return EXIT_SUCCESS;
}