#include <algorithm>
#include <cassert>
#include <fcntl.h>
+#include <ranges>
#include <string_view>
#include <sys/mman.h>
#include <sys/stat.h>
}
}
+void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) {
+
+ bufferlist bl;
+ gen_buffer(bl, max_buffer_size);
+ hash<string> hasher;
+
+ cout << "starting thread " << io_depth << endl;
+ for (auto &op : ops) {
+ {
+ std::unique_lock<std::mutex> lock(in_flight_mutex);
+ cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
+
+ }
+ size_t key = hasher(*op.who) % nworker_threads;
+ if (key != id) {
+ continue;
+ }
+ // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
+ op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
+ switch (op.type) {
+ case Write: {
+ int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset);
+ if (ret != 0) {
+ cout << fmt::format("Error writing ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ case WriteFull: {
+ int ret = io->aio_write_full(*op.object, op.completion, bl);
+ if (ret != 0) {
+ cout << fmt::format("Error writing full ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ case Read: {
+ bufferlist read;
+ int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
+ if (ret != 0) {
+ cout << fmt::format("Error reading ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ case Truncate: {
+ librados::ObjectWriteOperation write_operation;
+ write_operation.truncate(op.offset);
+ int ret = io->aio_operate(*op.object, op.completion, &write_operation);
+ if (ret != 0) {
+ cout << fmt::format("Error truncating ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ case Zero: {
+ librados::ObjectWriteOperation write_operation;
+ write_operation.zero(op.offset, op.length);
+ int ret = io->aio_operate(*op.object, op.completion, &write_operation);
+ if (ret != 0) {
+ cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
+ }
+ break;
+ }
+ }
+ in_flight_ops++;
+ }
+}
+
int main(int argc, char** argv) {
vector<Op> ops;
librados::Rados cluster;
librados::IoCtx io;
uint64_t max_buffer_size = 0;
- uint64_t io_depth = 64;
+ uint64_t io_depth = 8;
string file;
std::filesystem::path ceph_conf_path;
context->ops.clear();
}
-
-
int ret = cluster.init2("client.admin", "ceph", 0);
if (ret < 0) {
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
// process ops
// Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
- bufferlist bl;
- gen_buffer(bl, max_buffer_size);
-
- for (auto &op : ops) {
- {
- std::unique_lock<std::mutex> lock(in_flight_mutex);
- cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
-
- }
- // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
- op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
- switch (op.type) {
- case Write: {
- int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset);
- if (ret != 0) {
- cout << fmt::format("Error writing ecode={}", ret) << endl;;
- }
- break;
- }
- case WriteFull: {
- int ret = io.aio_write_full(*op.object, op.completion, bl);
- if (ret != 0) {
- cout << fmt::format("Error writing full ecode={}", ret) << endl;;
- }
- break;
- }
- case Read: {
- bufferlist read;
- int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
- if (ret != 0) {
- cout << fmt::format("Error reading ecode={}", ret) << endl;;
- }
- break;
- }
- case Truncate: {
- librados::ObjectWriteOperation write_operation;
- write_operation.truncate(op.offset);
- int ret = io.aio_operate(*op.object, op.completion, &write_operation);
- if (ret != 0) {
- cout << fmt::format("Error truncating ecode={}", ret) << endl;;
- }
- break;
- }
- case Zero: {
- librados::ObjectWriteOperation write_operation;
- write_operation.zero(op.offset, op.length);
- int ret = io.aio_operate(*op.object, op.completion, &write_operation);
- if (ret != 0) {
- cout << fmt::format("Error zeroing ecode={}", ret) << endl;;
- }
- break;
- }
- }
- in_flight_ops++;
+ vector<thread> worker_threads;
+ uint64_t nworker_threads = 16;
+ for (int i = 0; i < nworker_threads; i++) {
+ worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
+ }
+ for (auto& worker : worker_threads) {
+ worker.join();
}
while (in_flight_ops > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));