enum op_type {
Write,
- Read
+ WriteFull,
+ Read,
+ Truncate,
+ Zero
};
struct Op {
shared_ptr<string> who;
librados::AioCompletion *completion;
bufferlist read_bl;
-
+
Op(
time_t at,
op_type type,
uint64_t length,
shared_ptr<string> object,
shared_ptr<string> collection,
- shared_ptr<string> who
+ shared_ptr<string> who
) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {}
-
+
};
void gen_buffer(bufferlist& bl, uint64_t size) {
Op *op = static_cast<Op*>(arg);
// Process the completed operation here
std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;
-
+
delete op->completion;
op->completion = nullptr;
if (op->type == Read) {
- op->read_bl.clear();
+ op->read_bl.clear();
}
{
uint64_t io_depth = 64;
string file;
std::filesystem::path ceph_conf_path;
-
+
if (argc < 3) {
cout << fmt::format("usage: ops_replayer file ceph.conf") << endl;
}
file = argv[1];
ceph_conf_path = argv[2];
cout << file << endl;
-
-
-
+
+
+
string date, time, who, type, range, object, collection;
ifstream fstream(file, ifstream::in);
const char* date_format_first_column = "%Y-%m-%d";
continue;
}
fstream >> time >> who >> type >> range >> object >> collection;
-
+
date += " " + time;
cout << date << endl;
// FIXME: this is wrong but it returns a reasonable bad timestamp :P
const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
res = strptime(date.c_str(), date_format_full, &t);
time_t at = mktime(&t);
-
+
cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
-
+
shared_ptr<string> who_ptr = make_shared<string>(who);
auto who_it = string_cache.find(who);
if (who_it == string_cache.end()) {
} else {
who_ptr = who_it->second;
}
-
+
shared_ptr<string> object_ptr = make_shared<string>(object);
auto object_it = string_cache.find(object);
if (object_it == string_cache.end()) {
} else {
object_ptr = object_it->second;
}
-
+
+ op_type ot;
+ if (type == "write") {
+ ot = Write;
+ } else if (type == "writefull") {
+ ot = WriteFull;
+ } else if (type == "read") {
+ ot = Read;
+ } else if (type == "sparse-read") {
+ ot = Read;
+ } else if (type == "truncate") {
+ ot = Truncate;
+ } else if (type == "zero") {
+ ot = Zero;
+ } else {
+ cout << "invalid type " << type << endl;
+ exit(1);
+ }
+
shared_ptr<string> collection_ptr = make_shared<string>(collection);
auto collection_it = string_cache.find(collection);
if (collection_it == string_cache.end()) {
} else {
collection_ptr = collection_it->second;
}
-
+
uint64_t offset = 0, length = 0;
stringstream range_stream(range);
string offset_str, length_str;
getline(range_stream, offset_str, '~');
- getline(range_stream, length_str, '~');
offset = stoll(offset_str);
- length = stoll(length_str);
-
+
+ if (ot != Truncate) {
+ // Truncate doesn't only has one number
+ getline(range_stream, length_str, '~');
+ length = stoll(length_str);
+ }
+
max_buffer_size = max(length, max_buffer_size);
-
- op_type ot = type == "write" ? Write : Read;
+
ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
}
-
+
int ret = cluster.init2("client.admin", "ceph", 0);
if (ret < 0) {
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
return EXIT_FAILURE;
}
std::cout << "cluster init ready" << std::endl;
-
+
ret = cluster.conf_read_file(ceph_conf_path.c_str());
if (ret < 0) {
std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl;
exit(EXIT_FAILURE);
}
std::cout << "test-pool ready" << std::endl;
-
-
+
+
// process ops
// Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
bufferlist bl;
gen_buffer(bl, max_buffer_size);
-
+
for (auto &op : ops) {
{
std::unique_lock<std::mutex> lock(in_flight_mutex);
cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
-
+
}
cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
}
break;
}
+ case WriteFull: {
+ int ret = io.aio_write_full(*op.object, op.completion, bl);
+ if (ret != 0) {
+ cout << fmt::format("Error writing full ecode={}", ret) << endl;;
+ }
+ break;
+ }
case Read: {
bufferlist read;
int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
}
break;
}
+ case Truncate: {
+ librados::ObjectWriteOperation write_operation;
+ write_operation.truncate(op.offset);
+ int ret = io.aio_operate(*op.object, op.completion, &write_operation);
+ if (ret != 0) {
+ cout << fmt::format("Error truncating ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ case Zero: {
+ librados::ObjectWriteOperation write_operation;
+ write_operation.zero(op.offset, op.length);
+ int ret = io.aio_operate(*op.object, op.completion, &write_operation);
+ if (ret != 0) {
+ cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
+ }
+ break;
+ }
}
in_flight_ops++;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)
-
+
cout << ops.size() << endl;
return 0;
}