From: Patrick Donnelly
Date: Wed, 21 Jan 2026 17:25:31 +0000 (-0500)
Subject: tools/cephfs: add new cephfs-tool
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a6245eec07d3a7525fbe293da2f7c2ae4cf5c349;p=ceph.git

tools/cephfs: add new cephfs-tool

This patch introduces `cephfs-tool`, a new standalone C++ utility designed
to interact directly with `libcephfs`. While the tool is architected to
support various subcommands in the future, the initial implementation
focuses on a `bench` command to measure library performance. This allows
developers and administrators to benchmark the userspace library in
isolation from FUSE or kernel client overheads.

Key features include:

* Multi-threaded read/write throughput benchmarking.
* Configurable block sizes, file counts, and fsync intervals.
* Detailed statistical reporting (Mean, Std Dev, Min/Max) for throughput
  and IOPS.
* Support for specific CephFS user/group impersonation (UID/GID) via
  `ceph_mount_perms_set`.

As an example test on a "trial" sepia machine against the new LRC, I used a
command like:

pdonnell@trial154:~$ env CEPH_ARGS="--log-to-stderr=false --log-to-file=false --log-file=/tmp/bench.log" ./cephfs-tool -c ~/ceph.conf -k ~/keyring -i scratch --filesystem scratch bench --root-path=/pdonnell --files 256 --size=$(( 128 * 2 ** 20 )) --threads=8 --iterations 3

Benchmark Configuration:
 Threads: 8 | Iterations: 3
 Files: 256 | Size: 134217728
 Filesystem: scratch
 Root: /pdonnell
 Subdirectory: bench_run_d942
 UID: -1
 GID: -1

--- Iteration 1 of 3 ---
Starting Write Phase...
 Write: 2761.97 MB/s, 21.5779 files/s (11.864s)
Starting Read Phase...
 Read: 2684.36 MB/s, 20.9716 files/s (12.207s)

--- Iteration 2 of 3 ---
Starting Write Phase...
 Write: 2698.51 MB/s, 21.0821 files/s (12.143s)
Starting Read Phase...
 Read: 2682.16 MB/s, 20.9544 files/s (12.217s)

--- Iteration 3 of 3 ---
Starting Write Phase...
 Write: 2720.69 MB/s, 21.2554 files/s (12.044s)
Starting Read Phase...
 Read: 2695.18 MB/s, 21.0561 files/s (12.158s)

*** Final Report ***

Write Throughput Statistics (3 runs):
 Mean: 2727.06 MB/s
 Std Dev: 26.2954 MB/s
 Min: 2698.51 MB/s
 Max: 2761.97 MB/s

Read Throughput Statistics (3 runs):
 Mean: 2687.24 MB/s
 Std Dev: 5.68904 MB/s
 Min: 2682.16 MB/s
 Max: 2695.18 MB/s

File Creates Statistics (3 runs):
 Mean: 21.3051 files/s
 Std Dev: 0.205433 files/s
 Min: 21.0821 files/s
 Max: 21.5779 files/s

File Reads (Opens) Statistics (3 runs):
 Mean: 20.994 files/s
 Std Dev: 0.0444456 files/s
 Min: 20.9544 files/s
 Max: 21.0561 files/s

Cleaning up...

For a 25Gb NIC, this is just about saturating the sticker bandwidth with a
single shared mount and 8 threads. For a per-thread mount:

pdonnell@trial154:~$ env CEPH_ARGS="--log-to-stderr=false --log-to-file=false --log-file=/tmp/bench.log" ./cephfs-tool -c ~/ceph.conf -k ~/keyring -i scratch --filesystem scratch bench --root-path=/pdonnell --files 256 --size=$(( 128 * 2 ** 20 )) --threads=8 --iterations 3 --per-thread-mount

Benchmark Configuration:
 Threads: 8 | Iterations: 3
 Files: 256 | Size: 134217728
 Filesystem: scratch
 Root: /pdonnell
 Subdirectory: bench_run_9d1c
 UID: -1
 GID: -1

--- Iteration 1 of 3 ---
Starting Write Phase...
 Write: 2691.2 MB/s, 21.025 files/s (12.176s)
Starting Read Phase...
 Read: 2486.76 MB/s, 19.4278 files/s (13.177s)

--- Iteration 2 of 3 ---
Starting Write Phase...
 Write: 2688.77 MB/s, 21.006 files/s (12.187s)
Starting Read Phase...
 Read: 2496.42 MB/s, 19.5033 files/s (13.126s)

--- Iteration 3 of 3 ---
Starting Write Phase...
 Write: 2692.08 MB/s, 21.0319 files/s (12.172s)
Starting Read Phase...
 Read: 2488.27 MB/s, 19.4396 files/s (13.169s)

*** Final Report ***

Write Throughput Statistics (3 runs):
 Mean: 2690.68 MB/s
 Std Dev: 1.40086 MB/s
 Min: 2688.77 MB/s
 Max: 2692.08 MB/s

Read Throughput Statistics (3 runs):
 Mean: 2490.48 MB/s
 Std Dev: 4.24374 MB/s
 Min: 2486.76 MB/s
 Max: 2496.42 MB/s

File Creates Statistics (3 runs):
 Mean: 21.0209 files/s
 Std Dev: 0.0109442 files/s
 Min: 21.006 files/s
 Max: 21.0319 files/s

File Reads (Opens) Statistics (3 runs):
 Mean: 19.4569 files/s
 Std Dev: 0.0331542 files/s
 Min: 19.4278 files/s
 Max: 19.5033 files/s

Cleaning up...

Or to measure file create performance:

pdonnell@trial154:~$ env CEPH_ARGS="--log-to-stderr=false --log-to-file=false --log-file=/tmp/bench.log" ./cephfs-tool -c ~/ceph.conf -k ~/keyring -i scratch --filesystem scratch bench --root-path=/pdonnell --files=$(( 2 ** 16 )) --size=$(( 0 * 2 ** 20 )) --threads=8 --iterations 3

Benchmark Configuration:
 Threads: 8 | Iterations: 3
 Files: 65536 | Size: 0
 Filesystem: scratch
 Root: /pdonnell
 Subdirectory: bench_run_d435
 UID: -1
 GID: -1

--- Iteration 1 of 3 ---
Starting Write Phase...
 Write: 3974.77 files/s (16.488s)
Starting Read Phase...
 Read: 14537.7 files/s (4.508s)
Cleaning up for next iteration...

--- Iteration 2 of 3 ---
Starting Write Phase...
 Write: 4167.1 files/s (15.727s)
Starting Read Phase...
 Read: 13636.3 files/s (4.806s)
Cleaning up for next iteration...

--- Iteration 3 of 3 ---
Starting Write Phase...
 Write: 3863.7 files/s (16.962s)
Starting Read Phase...
 Read: 14972.8 files/s (4.377s)

*** Final Report ***

File Creates Statistics (3 runs):
 Mean: 4001.86 files/s
 Std Dev: 125.337 files/s
 Min: 3863.7 files/s
 Max: 4167.1 files/s

File Reads (Opens) Statistics (3 runs):
 Mean: 14382.3 files/s
 Std Dev: 556.594 files/s
 Min: 13636.3 files/s
 Max: 14972.8 files/s

Cleaning up...
Here is the current help text:

Usage: cephfs-tool [general-options] <command> [command-options]

Commands:
  bench        Run IO benchmark

Allowed options:

General Options:
  -h [ --help ]             Produce help message
  -c [ --conf ] arg         Ceph config file path
  -i [ --id ] arg (=admin)  Client ID
  -k [ --keyring ] arg      Path to keyring file
  --filesystem arg          CephFS filesystem name to mount
  --uid arg (=-1)           User ID to mount as
  --gid arg (=-1)           Group ID to mount as

Benchmark Options (used with 'bench' command):
  --threads arg (=1)             Number of threads
  --iterations arg (=1)          Number of iterations
  --files arg (=100)             Total number of files
  --size arg (=4MB)              File size (e.g. 4MB, 0 for creates only)
  --block-size arg (=4MB)        IO block size (e.g. 1MB)
  --fsync-every arg (=0)         Call fsync every N bytes
  --prefix arg (=benchmark_)     Filename prefix
  --dir-prefix arg (=bench_run_) Directory prefix
  --root-path arg (=/)           Root path in CephFS
  --per-thread-mount             Use separate mount per thread
  --no-cleanup                   Disable cleanup of files

AI-Assisted: significant portions of this code were AI-generated through
dozens of iterative prompts.
Signed-off-by: Patrick Donnelly
---

diff --git a/ceph.spec.in b/ceph.spec.in
index d4cabb21cfb9..df7b3050ca25 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -1785,6 +1785,7 @@ exit 0
 %{_bindir}/cephfs-data-scan
 %{_bindir}/cephfs-journal-tool
 %{_bindir}/cephfs-table-tool
+%{_bindir}/cephfs-tool
 %{_bindir}/crushdiff
 %{_bindir}/rados
 %{_bindir}/radosgw-admin
diff --git a/debian/ceph-common.install b/debian/ceph-common.install
index 68b1aa9d06fc..cc5632e805f6 100755
--- a/debian/ceph-common.install
+++ b/debian/ceph-common.install
@@ -15,6 +15,7 @@ usr/bin/ceph-syn
 usr/bin/cephfs-data-scan
 usr/bin/cephfs-journal-tool
 usr/bin/cephfs-table-tool
+usr/bin/cephfs-tool
 usr/bin/crushdiff
 usr/bin/rados
 usr/bin/radosgw-admin
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
index fbec0c9ba3fd..0047c771fecb 100644
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -632,7 +632,7 @@ if (WITH_BLUESTORE)
   add_dependencies(tests ceph_test_bluefs)
 endif()
 if (WITH_CEPHFS)
-  add_dependencies(tests ceph-mds)
+  add_dependencies(tests ceph-mds cephfs-tool)
 endif()
 if(WITH_MGR)
   add_dependencies(tests ceph-mgr)
diff --git a/src/tools/cephfs/CMakeLists.txt b/src/tools/cephfs/CMakeLists.txt
index ea36a1645eca..3a97b05caa99 100644
--- a/src/tools/cephfs/CMakeLists.txt
+++ b/src/tools/cephfs/CMakeLists.txt
@@ -48,10 +48,18 @@ target_link_libraries(cephfs-data-scan ceph-common librados cephfs mds osdc glob
   cls_cephfs_client
   ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} Boost::filesystem)

+set(cephfs_tool_srcs
+  cephfs-tool.cc)
+add_executable(cephfs-tool ${cephfs_tool_srcs})
+target_include_directories(cephfs-tool PRIVATE ${PROJECT_SOURCE_DIR}/src/include)
+target_link_libraries(cephfs-tool ceph-common librados cephfs osdc global uring::uring
+  ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} Boost::filesystem)
+
 install(TARGETS
   cephfs-journal-tool
   cephfs-table-tool
   cephfs-data-scan
+  cephfs-tool
   DESTINATION bin)

 option(WITH_CEPHFS_SHELL "install cephfs-shell" OFF)
diff --git
a/src/tools/cephfs/cephfs-tool.cc b/src/tools/cephfs/cephfs-tool.cc
new file mode 100644
index 000000000000..9d38c88ada61
--- /dev/null
+++ b/src/tools/cephfs/cephfs-tool.cc
@@ -0,0 +1,766 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corp.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License version 2.1, as published by
+ * the Free Software Foundation. See file COPYING.
+ */
+
+/*
+ * cephfs-tool - Standalone tool to interact with cephfs
+ * Can be built outside Ceph: g++ --std=c++20 -D_FILE_OFFSET_BITS=64 -O3 -o cephfs-tool cephfs-tool.cc -lcephfs -lpthread -lboost_program_options
+ */
+
+#include <algorithm>
+#include <atomic>
+#include <cctype>
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <cstring>
+#include <fcntl.h>
+#include <iostream>
+#include <limits>
+#include <numeric>
+#include <random>
+#include <span>
+#include <sstream>
+#include <string>
+#include <thread>
+#include <vector>
+
+// Standard Public Ceph API
+#include <cephfs/libcephfs.h>
+
+// Boost Program Options
+#include <boost/program_options.hpp>
+
+using std::cerr;
+using std::cout;
+using std::endl;
+using std::string;
+using std::vector;
+using std::chrono::steady_clock;
+using std::chrono::duration_cast;
+using std::chrono::milliseconds;
+namespace po = boost::program_options;
+
+// --- Helper: Parse sizes like "4MB", "1G" ---
+uint64_t parse_size(const string& val) {
+  if (val.empty()) {
+    return 0;
+  }
+  size_t end_pos = 0;
+  uint64_t num = 0;
+  try {
+    num = std::stoull(val, &end_pos);
+  } catch (...) {
+    return 0;
+  }
+
+  if (end_pos < val.length()) {
+    char suffix = std::toupper(val[end_pos]);
+    switch (suffix) {
+    case 'K':
+      num *= 1024;
+      break;
+    case 'M':
+      num *= 1024ULL * 1024;
+      break;
+    case 'G':
+      num *= 1024ULL * 1024 * 1024;
+      break;
+    case 'T':
+      num *= 1024ULL * 1024 * 1024 * 1024;
+      break;
+    }
+  }
+  return num;
+}
+
+struct RandomHelper {
+  // Thread-local engine to prevent locking contention and re-seeding overhead
+  static std::mt19937& get_engine() {
+    static thread_local std::mt19937 engine(std::random_device{}());
+    return engine;
+  }
+
+  static void fill_buffer(std::span<std::byte> buf) {
+    auto& gen = get_engine();
+    std::uniform_int_distribution<> dis(0, 255);
+    for (size_t i = 0; i < buf.size(); ++i) {
+      buf[i] = static_cast<std::byte>(dis(gen));
+    }
+  }
+
+  // Generates a random hex suffix (e.g., for unique directories)
+  static std::string generate_hex_suffix() {
+    auto& gen = get_engine();
+    std::uniform_int_distribution<> dis(0, 0xFFFF);
+    std::stringstream ss;
+    ss << std::hex << dis(gen);
+    return ss.str();
+  }
+};
+
+// Configuration for the benchmark
+struct BenchConfig {
+  int num_threads;
+  int iterations;
+  int num_files;
+  uint64_t file_size;
+  uint64_t block_size;
+  uint64_t fsync_every_bytes;
+  bool per_thread_mount;
+  string prefix;
+  string mount_root;
+  bool cleanup;
+  string dir_prefix;
+  string ceph_conf;
+  string userid;
+  string keyring;
+  string filesystem;
+  string subdir;
+  int uid;
+  int gid;
+};
+
+struct ThreadStats {
+  uint64_t bytes_transferred = 0;
+  uint64_t ops = 0;   // read/write calls
+  uint64_t files = 0; // files successfully opened/closed
+  int errors = 0;
+};
+
+// --- Setup Helper (Updated to use stream for output) ---
+int setup_mount(struct ceph_mount_info **cmount, const BenchConfig& config, std::ostream& out_stream) {
+  if (int rc = ceph_create(cmount, config.userid.empty() ? NULL : config.userid.c_str()); rc < 0) {
+    out_stream << "Failed to create ceph instance: " << strerror(-rc) << endl;
+    return rc;
+  }
+
+  auto cleanup_on_fail = [&](int rc) {
+    ceph_shutdown(*cmount);
+    *cmount = nullptr;
+    return rc;
+  };
+
+  // 1. Read Config File (sets defaults)
+  if (!config.ceph_conf.empty()) {
+    if (int rc = ceph_conf_read_file(*cmount, config.ceph_conf.c_str()); rc < 0) {
+      out_stream << "Failed to read ceph config file '" << config.ceph_conf << "': " << strerror(-rc) << endl;
+      return cleanup_on_fail(rc);
+    }
+  } else {
+    if (int rc = ceph_conf_read_file(*cmount, NULL); rc < 0) { // Search default locations
+      out_stream << "Failed to read default ceph config: " << strerror(-rc) << endl;
+      return cleanup_on_fail(rc);
+    }
+  }
+
+  // 2. Apply Keyring Override (if provided)
+  if (!config.keyring.empty()) {
+    if (int rc = ceph_conf_set(*cmount, "keyring", config.keyring.c_str()); rc < 0) {
+      out_stream << "Failed to set keyring option: " << strerror(-rc) << endl;
+      return cleanup_on_fail(rc);
+    }
+  }
+
+  // 3. Apply Filesystem Selection (if provided)
+  if (!config.filesystem.empty()) {
+    if (int rc = ceph_conf_set(*cmount, "client_mds_namespace", config.filesystem.c_str()); rc < 0) {
+      out_stream << "Failed to set filesystem (client_mds_namespace): " << strerror(-rc) << endl;
+      return cleanup_on_fail(rc);
+    }
+  }
+
+  if (int rc = ceph_init(*cmount); rc < 0) {
+    out_stream << "Failed to initialize ceph client: " << strerror(-rc) << endl;
+    return cleanup_on_fail(rc);
+  }
+
+  // 4. Apply UID/GID Permissions (if provided)
+  if (config.uid != -1 || config.gid != -1) {
+    UserPerm *perms = ceph_userperm_new(config.uid, config.gid, 0, NULL);
+    if (!perms) {
+      out_stream << "Failed to allocate user permissions struct." << endl;
+      return cleanup_on_fail(-ENOMEM);
+    }
+
+    int rc = ceph_mount_perms_set(*cmount, perms);
+    ceph_userperm_destroy(perms); // Cleanup perms object after setting
+
+    if (rc != 0) {
+      out_stream << "Failed to set mount permissions (uid=" << config.uid
+                 << ", gid=" << config.gid << "): " << strerror(-rc) << endl;
+      return cleanup_on_fail(rc);
+    }
+  }
+
+  // 5. Mount
+  if (int rc = ceph_mount(*cmount, config.mount_root.c_str()); rc < 0) {
+    out_stream << "Failed to mount at '" << config.mount_root << "': " << strerror(-rc) << endl;
+    return cleanup_on_fail(rc);
+  }
+
+  return 0;
+}
+
+// Worker function for Write phase
+void bench_write_worker(int thread_id,
+                        int files_to_write,
+                        BenchConfig config,
+                        struct ceph_mount_info *shared_cmount,
+                        ThreadStats &stats,
+                        std::atomic<bool>& stop_signal,
+                        std::stringstream& ss) {
+
+  struct ceph_mount_info *cmount = shared_cmount;
+
+  if (config.per_thread_mount) {
+    if (int rc = setup_mount(&cmount, config, ss); rc < 0) {
+      ss << "Thread " << thread_id << " mount failed: " << strerror(-rc) << std::endl;
+      stats.errors++;
+      stop_signal = true; // Signal other threads to stop
+      return;
+    }
+  }
+
+  auto buffer = std::vector<char>(config.block_size);
+  RandomHelper::fill_buffer(std::as_writable_bytes(std::span(buffer)));
+  for (int i = 0; i < files_to_write; ++i) {
+    if (stop_signal) {
+      break; // Check if we should stop
+    }
+
+    string fname = config.subdir + "/" + config.prefix + std::to_string(thread_id) + "_" + std::to_string(i);
+
+    // O_CREAT ensures we measure creation overhead
+    int fd = ceph_open(cmount, fname.c_str(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
+    if (fd < 0) {
+      ss << "Thread " << thread_id << " open failed " << fname << ": " << strerror(-fd) << std::endl;
+      stats.errors++;
+      stop_signal = true;
+      break;
+    }
+
+    uint64_t written = 0;
+    uint64_t last_sync = 0;
+    bool write_error = false;
+
+    while (written < config.file_size) {
+      if (stop_signal) {
+        break;
+      }
+
+      uint64_t to_write = std::min(config.block_size, config.file_size - written);
+      if (int rc = ceph_write(cmount, fd, buffer.data(), to_write, -1); rc < 0) {
+        ss << "Thread " << thread_id << " write error: " << strerror(-rc) << std::endl;
+        stats.errors++;
+        stop_signal = true;
+        write_error = true;
+        break;
+      } else {
+        written += rc;
+        stats.bytes_transferred += rc;
+        stats.ops++;
+      }
+
+      if (config.fsync_every_bytes > 0 && (written - last_sync) >= config.fsync_every_bytes) {
+        if (int rc = ceph_fsync(cmount, fd, 0); rc < 0) {
+          ss << "Thread " << thread_id << " fsync error: " << strerror(-rc) << std::endl;
+          stats.errors++;
+          stop_signal = true;
+          write_error = true;
+          break;
+        }
+        last_sync = written;
+      }
+    }
+
+    if (!write_error && !stop_signal) {
+      if (int rc = ceph_close(cmount, fd); rc < 0) {
+        ss << "Thread " << thread_id << " close error " << fname << ": " << strerror(-rc) << std::endl;
+        stats.errors++;
+        stop_signal = true;
+        break;
+      }
+      stats.files++;
+    } else {
+      // Attempt close on error, ignore result
+      ceph_close(cmount, fd);
+      break;
+    }
+  }
+
+  if (config.per_thread_mount) {
+    if (int rc = ceph_unmount(cmount); rc < 0) {
+      ss << "Thread " << thread_id << " unmount failed: " << strerror(-rc) << std::endl;
+      // Not critical enough to stop others, but log it
+    }
+    ceph_shutdown(cmount);
+  }
+}
+
+// Worker function for Read phase
+void bench_read_worker(int thread_id,
+                       int files_to_read,
+                       BenchConfig config,
+                       struct ceph_mount_info *shared_cmount,
+                       ThreadStats &stats,
+                       std::atomic<bool>& stop_signal,
+                       std::stringstream& ss) {
+
+  struct ceph_mount_info *cmount = shared_cmount;
+
+  if (config.per_thread_mount) {
+    if (int rc = setup_mount(&cmount, config, ss); rc < 0) {
+      stats.errors++;
+      stop_signal = true;
+      return;
+    }
+  }
+
+  std::vector<char> buffer(config.block_size);
+
+  for (int i = 0; i < files_to_read; ++i) {
+    if (stop_signal) {
+      break;
+    }
+
+    string fname = config.subdir + "/" + config.prefix + std::to_string(thread_id) + "_" + std::to_string(i);
+
+    int fd = ceph_open(cmount, fname.c_str(), O_RDONLY, 0);
+    if (fd < 0) {
+      ss << "Thread " << thread_id << " open failed " << fname << ": " << strerror(-fd) << std::endl;
+      stats.errors++;
+      stop_signal = true;
+      break;
+    }
+
+    uint64_t total_read = 0;
+    while (total_read < config.file_size) {
+      if (stop_signal) {
+        break;
+      }
+
+      int rc = ceph_read(cmount, fd, buffer.data(), config.block_size, -1);
+      if (rc < 0) {
+        ss << "Thread " << thread_id << " read error: " << strerror(-rc) << std::endl;
+        stats.errors++;
+        stop_signal = true;
+        break;
+      }
+      if (rc == 0) {
+        break; // EOF
+      }
+
+      total_read += rc;
+      stats.bytes_transferred += rc;
+      stats.ops++;
+    }
+
+    if (int rc = ceph_close(cmount, fd); rc < 0) {
+      ss << "Thread " << thread_id << " close error " << fname << ": " << strerror(-rc) << std::endl;
+      stats.errors++;
+      stop_signal = true;
+      break;
+    }
+    stats.files++;
+  }
+
+  if (config.per_thread_mount) {
+    if (int rc = ceph_unmount(cmount); rc < 0) {
+      ss << "Thread " << thread_id << " unmount failed: " << strerror(-rc) << std::endl;
+    }
+    ceph_shutdown(cmount);
+  }
+}
+
+// Worker function for Cleanup (Unlink) phase
+void bench_cleanup_worker(int thread_id,
+                          int files_to_clean,
+                          BenchConfig config,
+                          struct ceph_mount_info *shared_cmount,
+                          std::atomic<bool>& stop_signal,
+                          std::stringstream& ss) {
+
+  struct ceph_mount_info *cmount = shared_cmount;
+
+  if (config.per_thread_mount) {
+    if (int rc = setup_mount(&cmount, config, ss); rc < 0) {
+      ss << "Thread " << thread_id << " cleanup mount failed: " << strerror(-rc) << std::endl;
+      stop_signal = true;
+      return;
+    }
+  }
+
+  for (int i = 0; i < files_to_clean; ++i) {
+    // If stop signal is raised (e.g. fatal error elsewhere), stop processing
+    if (stop_signal) break;
+
+    string fname = config.subdir + "/" + config.prefix + std::to_string(thread_id) + "_" + std::to_string(i);
+
+    if (int rc = ceph_unlink(cmount, fname.c_str()); rc < 0) {
+      // Ignore ENOENT (file already gone), but report others
+      if (rc != -ENOENT) {
+        ss << "Thread " << thread_id << " unlink error " << fname << ": " << strerror(-rc) << std::endl;
+        // don't stop cleanup
+      }
+    }
+  }
+
+  if (config.per_thread_mount) {
+    if (int rc = ceph_unmount(cmount); rc < 0) {
+      ss << "Thread " << thread_id << " unmount failed: " << strerror(-rc) << std::endl;
+    }
+    ceph_shutdown(cmount);
+  }
+}
+
+void print_statistics(const string& type, const vector<double>& rates, const string& unit) {
+  if (rates.empty()) {
+    return;
+  }
+  double sum = std::accumulate(rates.begin(), rates.end(), 0.0);
+  double mean = sum / rates.size();
+  double sq_sum = std::inner_product(rates.begin(), rates.end(), rates.begin(), 0.0);
+  double stdev = std::sqrt(sq_sum / rates.size() - mean * mean);
+  double min_val = *std::min_element(rates.begin(), rates.end());
+  double max_val = *std::max_element(rates.begin(), rates.end());
+
+  cout << "\n" << type << " Statistics (" << rates.size() << " runs):" << std::endl;
+  cout << " Mean: " << mean << " " << unit << std::endl;
+  cout << " Std Dev: " << stdev << " " << unit << std::endl;
+  cout << " Min: " << min_val << " " << unit << std::endl;
+  cout << " Max: " << max_val << " " << unit << std::endl;
+}
+
+// Helper to check for errors and print them
+bool check_and_report_errors(const std::atomic<bool>& stop_signal,
+                             const std::vector<std::stringstream>& outputs) {
+  bool has_output = false;
+  for (const auto& ss : outputs) {
+    if (ss.rdbuf()->in_avail() > 0) has_output = true;
+  }
+
+  if (stop_signal || has_output) {
+    if (stop_signal) {
+      cerr << "\n*** ERRORS ENCOUNTERED ***" << endl;
+    } else {
+      cerr << "\n*** WARNINGS/LOGS ***" << endl;
+    }
+    for (const auto& ss : outputs) {
+      string msg = ss.str();
+      if (!msg.empty()) {
+        cerr << msg;
+      }
+    }
+    return stop_signal; // Return true if it was a critical stop
+  }
+  return false;
+}
+
+int do_bench(BenchConfig& config) {
+  if (config.block_size > std::numeric_limits<int>::max()) {
+    cerr << "Error: block-size cannot exceed 2GB due to API limitations." << endl;
+    return 1;
+  }
+
+  // Create Main Mount
+  struct ceph_mount_info *shared_cmount = NULL;
+  if (int rc = setup_mount(&shared_cmount, config, cerr); rc < 0) {
+    cerr << "Failed to create/mount global handle. (Is ceph.conf valid?): " << strerror(-rc) << std::endl;
+    return 1;
+  }
+
+  // Setup bench directory
+  config.subdir = config.dir_prefix + RandomHelper::generate_hex_suffix();
+
+  cout << "Benchmark Configuration:" << std::endl;
+  cout << " Threads: " << config.num_threads << " | Iterations: " << config.iterations << std::endl;
+  cout << " Files: " << config.num_files << " | Size: " << config.file_size << std::endl;
+  cout << " Filesystem: " << (config.filesystem.empty() ? "(default)" : config.filesystem) << std::endl;
+  cout << " Root: " << config.mount_root << std::endl;
+  cout << " Subdirectory: " << config.subdir << std::endl;
+  cout << " UID: " << config.uid << std::endl;
+  cout << " GID: " << config.gid << std::endl;
+
+  if (int rc = ceph_mkdir(shared_cmount, config.subdir.c_str(), 0755); rc < 0) {
+    cerr << "Failed to create bench directory '" << config.subdir << "': " << strerror(-rc) << std::endl;
+    return 1;
+  }
+
+  int files_per_thread = config.num_files / config.num_threads;
+  int remainder = config.num_files % config.num_threads;
+
+  std::vector<double> write_mbps;
+  std::vector<double> write_fps;
+  std::vector<double> read_mbps;
+  std::vector<double> read_fps;
+
+  std::atomic<bool> stop_signal{false};
+
+  for (int iter = 1; iter <= config.iterations; ++iter) {
+    cout << "\n--- Iteration " << iter << " of " << config.iterations << " ---" << std::endl;
+
+    // --- WRITE PHASE ---
+    cout << "Starting Write Phase..." << std::endl;
+    std::vector<std::thread> threads;
+    std::vector<ThreadStats> write_stats(config.num_threads);
+    auto thread_outputs = std::vector<std::stringstream>(config.num_threads);
+
+    auto start_time = steady_clock::now();
+
+    for (int i = 0; i < config.num_threads; ++i) {
+      int f_count = files_per_thread + (i < remainder ? 1 : 0);
+      struct ceph_mount_info *worker_mount = config.per_thread_mount ? NULL : shared_cmount;
+      threads.emplace_back(bench_write_worker, i, f_count, config, worker_mount,
+                           std::ref(write_stats[i]), std::ref(stop_signal), std::ref(thread_outputs[i]));
+    }
+    for (auto& t : threads) {
+      t.join();
+    }
+
+    if (check_and_report_errors(stop_signal, thread_outputs)) {
+      if (int rc = ceph_unmount(shared_cmount); rc < 0) {
+        cerr << "Unmount error: " << strerror(-rc) << endl;
+      }
+      ceph_shutdown(shared_cmount);
+      return 1;
+    }
+
+    auto end_time = steady_clock::now();
+
+    uint64_t total_write_bytes = 0;
+    uint64_t total_files = 0;
+    for (const auto& s : write_stats) {
+      total_write_bytes += s.bytes_transferred;
+      total_files += s.files;
+    }
+
+    double elapsed_sec = duration_cast<milliseconds>(end_time - start_time).count() / 1000.0;
+    double w_rate = (double)total_write_bytes / 1024.0 / 1024.0 / elapsed_sec;
+    double w_fps = (double)total_files / elapsed_sec;
+
+    write_mbps.push_back(w_rate);
+    write_fps.push_back(w_fps);
+
+    cout << " Write: ";
+    if (config.file_size > 0) {
+      cout << w_rate << " MB/s, ";
+    }
+    cout << w_fps << " files/s (" << elapsed_sec << "s)" << std::endl;
+
+    // --- REMOUNT / CACHE CLEAR ---
+    if (!config.per_thread_mount) {
+      if (int rc = ceph_unmount(shared_cmount); rc < 0) {
+        cerr << "Unmount failed during cache clear: " << strerror(-rc) << endl;
+        return 1;
+      }
+      ceph_shutdown(shared_cmount);
+      if (int rc = setup_mount(&shared_cmount, config, cerr); rc < 0) {
+        cerr << "Failed to create/mount global handle. (Is ceph.conf valid?): " << strerror(-rc) << std::endl;
+        return 1;
+      }
+    }
+
+    // --- READ PHASE ---
+    cout << "Starting Read Phase..." << std::endl;
+    threads.clear();
+    std::vector<ThreadStats> read_stats(config.num_threads);
+    thread_outputs = std::vector<std::stringstream>(config.num_threads);
+
+    start_time = steady_clock::now();
+
+    for (int i = 0; i < config.num_threads; ++i) {
+      int f_count = files_per_thread + (i < remainder ? 1 : 0);
+      struct ceph_mount_info *worker_mount = config.per_thread_mount ? NULL : shared_cmount;
+      threads.emplace_back(bench_read_worker, i, f_count, config, worker_mount,
+                           std::ref(read_stats[i]), std::ref(stop_signal), std::ref(thread_outputs[i]));
+    }
+    for (auto& t : threads) {
+      t.join();
+    }
+
+    if (check_and_report_errors(stop_signal, thread_outputs)) {
+      if (int rc = ceph_unmount(shared_cmount); rc < 0) {
+        cerr << "Unmount error: " << strerror(-rc) << endl;
+      }
+      ceph_shutdown(shared_cmount);
+      return 1;
+    }
+
+    end_time = steady_clock::now();
+
+    uint64_t total_read_bytes = 0;
+    total_files = 0;
+    for (const auto& s : read_stats) {
+      total_read_bytes += s.bytes_transferred;
+      total_files += s.files;
+    }
+
+    elapsed_sec = duration_cast<milliseconds>(end_time - start_time).count() / 1000.0;
+    double r_rate = (double)total_read_bytes / 1024.0 / 1024.0 / elapsed_sec;
+    double r_fps = (double)total_files / elapsed_sec;
+
+    read_mbps.push_back(r_rate);
+    read_fps.push_back(r_fps);
+
+    cout << " Read: ";
+    if (config.file_size > 0) {
+      cout << r_rate << " MB/s, ";
+    }
+    cout << r_fps << " files/s (" << elapsed_sec << "s)" << std::endl;
+
+    // Cleanup for next iteration
+    if (iter < config.iterations) {
+      cout << "Cleaning up for next iteration..." << std::endl;
+      threads.clear();
+      thread_outputs = std::vector<std::stringstream>(config.num_threads);
+      stop_signal = false;
+
+      for (int i = 0; i < config.num_threads; ++i) {
+        int f_count = files_per_thread + (i < remainder ? 1 : 0);
+        struct ceph_mount_info *worker_mount = config.per_thread_mount ? NULL : shared_cmount;
+        threads.emplace_back(bench_cleanup_worker, i, f_count, config, worker_mount,
+                             std::ref(stop_signal), std::ref(thread_outputs[i]));
+      }
+      for (auto& t : threads) {
+        t.join();
+      }
+      // Report errors
+      if (check_and_report_errors(stop_signal, thread_outputs)) {
+        return 1;
+      }
+    }
+  }
+
+  cout << std::endl << std::endl << "*** Final Report ***" << std::endl;
+
+  // Statistics Output
+  if (config.file_size > 0) {
+    print_statistics("Write Throughput", write_mbps, "MB/s");
+    print_statistics("Read Throughput", read_mbps, "MB/s");
+  }
+  print_statistics("File Creates", write_fps, "files/s");
+  print_statistics("File Reads (Opens)", read_fps, "files/s");
+
+  if (config.cleanup) {
+    cout << "\nCleaning up..." << std::endl;
+    std::vector<std::thread> threads;
+    auto thread_outputs = std::vector<std::stringstream>(config.num_threads);
+    stop_signal = false;
+
+    for (int i = 0; i < config.num_threads; ++i) {
+      int f_count = files_per_thread + (i < remainder ? 1 : 0);
+      struct ceph_mount_info *worker_mount = config.per_thread_mount ? NULL : shared_cmount;
+      threads.emplace_back(bench_cleanup_worker, i, f_count, config, worker_mount,
+                           std::ref(stop_signal), std::ref(thread_outputs[i]));
+    }
+    for (auto& t : threads) {
+      t.join();
+    }
+
+    // Check for warnings/errors but proceed to rmdir
+    check_and_report_errors(stop_signal, thread_outputs);
+
+    if (int rc = ceph_rmdir(shared_cmount, config.subdir.c_str()); rc < 0) {
+      cerr << "Warning: Failed to cleanup (rmdir) " << config.subdir << ": " << strerror(-rc) << endl;
+    }
+  }
+
+  if (int rc = ceph_unmount(shared_cmount); rc < 0) {
+    cerr << "Final unmount failed: " << strerror(-rc) << endl;
+  }
+  ceph_shutdown(shared_cmount);
+  return 0;
+}
+
+int main(int argc, char **argv) {
+  BenchConfig config;
+  string size_str, block_size_str, fsync_str;
+  bool no_cleanup = false;
+  string subcommand;
+
+  // Group 1: General Options
+  po::options_description general("General Options");
+  general.add_options()
+    ("help,h", "Produce help message")
+    ("conf,c", po::value(&config.ceph_conf), "Ceph config file path")
+    ("id,i", po::value(&config.userid)->default_value("admin"), "Client ID")
+    ("keyring,k", po::value(&config.keyring), "Path to keyring file")
+    ("filesystem,fs", po::value(&config.filesystem), "CephFS filesystem name to mount")
+    ("uid", po::value(&config.uid)->default_value(-1), "User ID to mount as")
+    ("gid", po::value(&config.gid)->default_value(-1), "Group ID to mount as");
+
+  // Group 2: Benchmark Options
+  po::options_description bench("Benchmark Options (used with 'bench' command)");
+  bench.add_options()
+    ("threads", po::value(&config.num_threads)->default_value(1), "Number of threads")
+    ("iterations", po::value(&config.iterations)->default_value(1), "Number of iterations")
+    ("files", po::value(&config.num_files)->default_value(100), "Total number of files")
+    ("size", po::value(&size_str)->default_value("4MB"), "File size (e.g. 4MB, 0 for creates only)")
+    ("block-size", po::value(&block_size_str)->default_value("4MB"), "IO block size (e.g. 1MB)")
+    ("fsync-every", po::value(&fsync_str)->default_value("0"), "Call fsync every N bytes")
+    ("prefix", po::value(&config.prefix)->default_value("benchmark_"), "Filename prefix")
+    ("dir-prefix", po::value(&config.dir_prefix)->default_value("bench_run_"), "Directory prefix")
+    ("root-path", po::value(&config.mount_root)->default_value("/"), "Root path in CephFS")
+    ("per-thread-mount", po::bool_switch(&config.per_thread_mount), "Use separate mount per thread")
+    ("no-cleanup", po::bool_switch(&no_cleanup), "Disable cleanup of files");
+
+  // Hidden positional option for the sub-command
+  po::options_description hidden("Hidden options");
+  hidden.add_options()
+    ("subcommand", po::value(&subcommand), "Command to execute");
+
+  // Visible options for help output
+  po::options_description visible("Allowed options");
+  visible.add(general).add(bench);
+
+  // All options for parsing
+  po::options_description all("");
+  all.add(visible).add(hidden);
+
+  // Positional mapping
+  po::positional_options_description p;
+  p.add("subcommand", 1); // Only take the first positional arg as command
+
+  po::variables_map vm;
+  try {
+    po::store(po::command_line_parser(argc, argv).options(all).positional(p).run(), vm);
+    po::notify(vm);
+  } catch(const std::exception& e) {
+    cerr << "Error parsing options: " << e.what() << endl;
+    return 1;
+  }
+
+  if (vm.count("help")) {
+    cout << "Usage: cephfs-tool [general-options] <command> [command-options]\n\n";
+    cout << "Commands:\n";
+    cout << "  bench        Run IO benchmark\n\n";
+    cout << visible << "\n";
+    return 0;
+  }
+
+  if (subcommand == "help" || subcommand.empty()) {
+    cerr << "Error: No command specified.\n";
+    cerr << "Usage: cephfs-tool [options] <command>\n";
+    cerr << "Try --help for more information.\n";
+    return 1;
+  }
+
+  if (subcommand == "bench") {
+    // Inside do_bench or main, after parsing options:
+    config.cleanup = !no_cleanup;
+    config.file_size = parse_size(size_str);
+    config.block_size = parse_size(block_size_str);
+    config.fsync_every_bytes = parse_size(fsync_str);
+    return do_bench(config);
+  } else {
+    cerr << "Unknown command: " << subcommand << "\n";
+    return 1;
+  }
+}