From: Alexander Indenbaum Date: Sat, 21 Feb 2026 14:27:02 +0000 (+0200) Subject: crimson: add Objecter, IoCtx and RadosClient X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1ce79f53269b101e592b88d188706553bcb0f0cb;p=ceph-ci.git crimson: add Objecter, IoCtx and RadosClient Implement Crimson RADOS client stack: - Objecter: OSDMap integration, PG mapping, OSD session, read/write/stat, discard, write_zeroes, compare_and_write, exec (cls calls), per-OSD write throttling - IoCtx: pool-scoped read, write, stat, discard, write_zeroes, compare_and_write, exec - RadosClient: connect, create_ioctx, wait_for_osdmap, shutdown Tests: - unittest-crimson-objecter: calc_target, IoCtx, API checks - crimson-rados-demo: integration test exercising all ops Signed-off-by: Alexander Indenbaum --- diff --git a/.gitignore b/.gitignore index c74ad2efd69..c60d3d79f41 100644 --- a/.gitignore +++ b/.gitignore @@ -38,6 +38,9 @@ core /src/td /src/vstart_environment.sh +# Local integration test scripts (not for commit) +/src/nvmeof/gateway/run_crimson_rbd_demo_integration_test.sh + # alpine abuild things .abuild/ alpine/APKBUILD diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 83f4519ff51..d1236a41c95 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -171,6 +171,11 @@ set(crimson_mgr_srcs set(crimson_mon_srcs mon/MonClient.cc ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc) +set(crimson_osdc_srcs + osdc/objecter.cc) +set(crimson_client_srcs + client/io_context.cc + client/rados_client.cc) set(crimson_net_srcs ${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc @@ -186,8 +191,10 @@ set(crimson_net_srcs net/chained_dispatchers.cc) add_library(crimson STATIC ${crimson_auth_srcs} + ${crimson_client_srcs} ${crimson_mgr_srcs} ${crimson_mon_srcs} + ${crimson_osdc_srcs} ${crimson_net_srcs}) target_compile_options(crimson PUBLIC "-ftemplate-backtrace-limit=0") diff --git a/src/crimson/client/io_context.cc b/src/crimson/client/io_context.cc new file mode 100644 index 00000000000..2b4433302ce --- /dev/null +++ b/src/crimson/client/io_context.cc @@ -0,0 +1,107 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "crimson/client/io_context.h" + +#include "crimson/common/log.h" +#include "crimson/osdc/objecter.h" +#include "include/object.h" +#include "osd/osd_types.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_client); + } +} + +namespace crimson::client { + +IoCtx::IoCtx(crimson::osdc::Objecter& objecter, + int64_t pool_id, + snapid_t snap_seq) + : objecter(objecter), pool_id(pool_id), snap_seq(snap_seq) +{} + +seastar::future IoCtx::read(const std::string& oid, + uint64_t off, + uint64_t len) +{ + logger().debug("IoCtx::read oid={} off={} len={}", oid, off, len); + object_locator_t oloc(pool_id); + return objecter.read(object_t(oid), oloc, off, len, snap_seq); +} + +seastar::future<> IoCtx::write(const std::string& oid, + uint64_t off, + ceph::bufferlist&& bl) +{ + logger().debug("IoCtx::write oid={} off={} len={}", oid, off, bl.length()); + object_locator_t oloc(pool_id); + return objecter.write(object_t(oid), oloc, off, std::move(bl), snap_seq); +} + +seastar::future> IoCtx::stat( + const std::string& oid) +{ + logger().debug("IoCtx::stat oid={}", oid); + object_locator_t oloc(pool_id); + return objecter.stat(object_t(oid), oloc, snap_seq); +} + +seastar::future<> IoCtx::discard(const std::string& oid, + uint64_t off, + uint64_t len) +{ + logger().debug("IoCtx::discard oid={} off={} len={}", oid, off, len); + object_locator_t oloc(pool_id); + return objecter.discard(object_t(oid), oloc, off, len, snap_seq); +} + +seastar::future<> IoCtx::write_zeroes(const std::string& oid, + uint64_t off, + uint64_t len) +{ + logger().debug("IoCtx::write_zeroes oid={} off={} len={}", oid, off, len); + object_locator_t oloc(pool_id); + return objecter.write_zeroes(object_t(oid), oloc, off, len, snap_seq); +} + +seastar::future<> IoCtx::compare_and_write(const std::string& oid, + uint64_t cmp_off, + ceph::bufferlist&& cmp_bl, + uint64_t write_off, + ceph::bufferlist&& write_bl) +{ + logger().debug("IoCtx::compare_and_write oid={} cmp_off={} write_off={}", + oid, cmp_off, write_off); + object_locator_t oloc(pool_id); + return objecter.compare_and_write(object_t(oid), oloc, + cmp_off, std::move(cmp_bl), + write_off, std::move(write_bl), + snap_seq); +} + +seastar::future IoCtx::exec(const std::string& oid, + std::string_view cname, + std::string_view method, + ceph::bufferlist&& indata) +{ + logger().debug("IoCtx::exec oid={} cls={}.{}", oid, cname, method); + object_locator_t oloc(pool_id); + return objecter.exec(object_t(oid), oloc, cname, method, std::move(indata), + snap_seq); +} + +} // namespace crimson::client diff --git a/src/crimson/client/io_context.h b/src/crimson/client/io_context.h new file mode 100644 index 00000000000..2fa361d2332 --- /dev/null +++ b/src/crimson/client/io_context.h @@ -0,0 +1,83 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include + +#include + +#include "include/buffer.h" +#include "include/types.h" +#include "common/ceph_time.h" + +namespace crimson::osdc { +class Objecter; +} + +namespace crimson::client { + +/** + * Pool-scoped I/O context. Delegates read, write, stat to Objecter + * with object_locator_t built from pool_id. + */ +class IoCtx { +public: + IoCtx(crimson::osdc::Objecter& objecter, + int64_t pool_id, + snapid_t snap_seq = CEPH_NOSNAP); + + seastar::future read(const std::string& oid, + uint64_t off, + uint64_t len); + + seastar::future<> write(const std::string& oid, + uint64_t off, + ceph::bufferlist&& bl); + + seastar::future> stat( + const std::string& oid); + + seastar::future<> discard(const std::string& oid, + uint64_t off, + uint64_t len); + + seastar::future<> write_zeroes(const std::string& oid, + uint64_t off, + uint64_t len); + + seastar::future<> compare_and_write(const std::string& oid, + uint64_t cmp_off, + ceph::bufferlist&& cmp_bl, + uint64_t write_off, + ceph::bufferlist&& write_bl); + + /// Execute cls method on object. Returns output bufferlist. + seastar::future exec(const std::string& oid, + std::string_view cname, + std::string_view method, + ceph::bufferlist&& indata); + + int64_t get_pool_id() const { return pool_id; } + snapid_t get_snap_seq() const { return snap_seq; } + +private: + crimson::osdc::Objecter& objecter; + int64_t pool_id; + snapid_t snap_seq; +}; + +} // namespace crimson::client diff --git a/src/crimson/client/rados_client.cc b/src/crimson/client/rados_client.cc new file mode 100644 index 00000000000..2334524449c --- /dev/null +++ b/src/crimson/client/rados_client.cc @@ -0,0 +1,98 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "crimson/client/rados_client.h" + +#include +#include + +#include "crimson/common/log.h" +#include "crimson/osdc/objecter.h" +#include "osd/OSDMap.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_client); + } +} + +namespace crimson::client { + +RadosClient::RadosClient(crimson::net::Messenger& msgr, + crimson::mon::Client& monc) + : msgr(msgr), + monc(monc), + objecter(std::make_unique(msgr, monc)) +{} + +RadosClient::~RadosClient() = default; + +crimson::osdc::Objecter& RadosClient::get_objecter() +{ + return *objecter; +} + +seastar::future<> RadosClient::connect() +{ + logger().info("RadosClient::connect"); + return objecter->start(); +} + +seastar::future RadosClient::create_ioctx(std::string_view pool_name) +{ + logger().debug("RadosClient::create_ioctx pool_name={}", pool_name); + return objecter->wait_for_osdmap() + .then([this, pool_name = std::string(pool_name)] { + return objecter->with_osdmap( + [pool_name](const OSDMap& o) { + return o.lookup_pg_pool_name(pool_name); + }); + }) + .then([this](int64_t pool_id) { + if (pool_id < 0) { + return seastar::make_exception_future( + std::system_error(-pool_id, std::system_category(), "pool not found")); + } + logger().debug("RadosClient::create_ioctx pool_id={}", pool_id); + return seastar::make_ready_future(IoCtx(*objecter, pool_id)); + }); +} + +seastar::future RadosClient::create_ioctx(int64_t pool_id) +{ + logger().debug("RadosClient::create_ioctx pool_id={}", pool_id); + return objecter->wait_for_osdmap() + .then([this, pool_id] { + return objecter->with_osdmap( + [pool_id](const OSDMap& o) { + return o.have_pg_pool(pool_id); + }); + }) + .then([this, pool_id](bool exists) { + if (!exists) { + return seastar::make_exception_future( + std::system_error(ENOENT, std::system_category(), "pool not found")); + } + return seastar::make_ready_future(IoCtx(*objecter, pool_id)); + }); +} + +seastar::future<> RadosClient::shutdown() +{ + logger().info("RadosClient::shutdown"); + return objecter->stop(); +} + +} // namespace crimson::client diff --git a/src/crimson/client/rados_client.h b/src/crimson/client/rados_client.h new file mode 100644 index 00000000000..626505180df --- /dev/null +++ b/src/crimson/client/rados_client.h @@ -0,0 +1,92 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include + +#include + +#include "crimson/client/io_context.h" + +namespace crimson::mon { +class Client; +} + +namespace crimson::net { +class Messenger; +} + +namespace crimson::osdc { +class Objecter; +} + +namespace crimson::client { + +class IoCtx; + +/** + * RADOS client facade. Holds Messenger, MonClient, Objecter. + * Provides connect(), create_ioctx(pool_name), shutdown(). + * + * Usage: + * auto auth_handler = std::make_unique(); + * auto msgr = crimson::net::Messenger::create(entity_name_t::CLIENT(), ...); + * crimson::mon::Client monc{*msgr, *auth_handler}; + * RadosClient client(*msgr, monc); + * + * msgr->set_auth_client(&monc); + * msgr->start({&monc, &client.get_objecter()}); + * monc.start().get(); + * client.connect().get(); + * + * auto ioctx = client.create_ioctx("rbd").get(); + * co_await ioctx.write("obj", 0, std::move(bl)); + * auto data = co_await ioctx.read("obj", 0, 4096); + * + * client.shutdown().get(); + */ +class RadosClient { +public: + RadosClient(crimson::net::Messenger& msgr, + crimson::mon::Client& monc); + + ~RadosClient(); + + /// Objecter reference for adding to Messenger dispatchers before start. + crimson::osdc::Objecter& get_objecter(); + + /// Start Objecter (subscribe to osdmap). Call after msgr.start() and monc.start(). + seastar::future<> connect(); + + /// Create pool-scoped IoCtx by pool name. Waits for OSDMap, then looks up pool. + /// Fails with -ENOENT if pool not found. + seastar::future create_ioctx(std::string_view pool_name); + + /// Create pool-scoped IoCtx by pool id. Validates pool exists in OSDMap. + seastar::future create_ioctx(int64_t pool_id); + + /// Stop Objecter. + seastar::future<> shutdown(); + +private: + crimson::net::Messenger& msgr; + crimson::mon::Client& monc; + std::unique_ptr objecter; +}; + +} // namespace crimson::client diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 4fdcb4bba32..2910356db8b 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -1,10 +1,16 @@ +add_library(crimson-main-config-bootstrap STATIC + main_config_bootstrap_helpers.cc) +target_link_libraries(crimson-main-config-bootstrap + crimson-common + crimson + legacy-option-headers) + add_executable(crimson-osd backfill_state.cc ec_backend.cc heartbeat.cc lsan_suppressions.cc main.cc - main_config_bootstrap_helpers.cc osd.cc osd_meta.cc pg.cc @@ -65,6 +71,7 @@ if(HAS_VTA) PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments) endif() target_link_libraries(crimson-osd + crimson-main-config-bootstrap legacy-option-headers crimson-admin crimson-common diff --git a/src/crimson/osd/main_config_bootstrap_helpers.cc b/src/crimson/osd/main_config_bootstrap_helpers.cc index adf7d630b13..2d7f6473627 100644 --- a/src/crimson/osd/main_config_bootstrap_helpers.cc +++ b/src/crimson/osd/main_config_bootstrap_helpers.cc @@ -128,7 +128,7 @@ std::optional get_option_value(const SeastarOption& option) { } static tl::expected -_get_early_config(int argc, const char *argv[]) +_get_early_config(int argc, const char *argv[], int entity_type) { early_config_t ret; @@ -140,7 +140,7 @@ _get_early_config(int argc, const char *argv[]) ret.init_params = ceph_argparse_early_args( early_args, - CEPH_ENTITY_TYPE_OSD, + entity_type, &ret.cluster_name, &ret.conf_file_list); @@ -152,8 +152,8 @@ _get_early_config(int argc, const char *argv[]) int r = app.run( sizeof(bootstrap_args) / sizeof(bootstrap_args[0]), const_cast(bootstrap_args), - [argc, argv, &ret, &early_args] { - return seastar::async([argc, argv, &ret, &early_args] { + [argc, argv, &ret, &early_args, entity_type] { + return seastar::async([argc, argv, &ret, &early_args, entity_type] { seastar::global_logger_registry().set_all_loggers_level( seastar::log_level::debug); sharded_conf().start( @@ -216,20 +216,25 @@ _get_early_config(int argc, const char *argv[]) cpu_cores); } else { auto reactor_num = crimson::common::get_conf("crimson_cpu_num"); - if (!reactor_num) { - // We would like to avoid seastar using all available cores. + if (entity_type == CEPH_ENTITY_TYPE_CLIENT && !reactor_num) { + ret.early_args.emplace_back("--smp"); + ret.early_args.emplace_back("1"); + ret.early_args.emplace_back("--thread-affinity"); + ret.early_args.emplace_back("0"); + logger().info("get_early_config: client default --smp 1"); + } else if (!reactor_num) { logger().error("get_early_config: crimson_cpu_set" " or crimson_cpu_num must be set"); ceph_abort(); + } else { + std::string smp = fmt::format("{}", reactor_num); + ret.early_args.emplace_back("--smp"); + ret.early_args.emplace_back(smp); + ret.early_args.emplace_back("--thread-affinity"); + ret.early_args.emplace_back("0"); + logger().info("get_early_config: set --thread-affinity 0 --smp {}", + smp); } - std::string smp = fmt::format("{}", reactor_num); - ret.early_args.emplace_back("--smp"); - ret.early_args.emplace_back(smp); - ret.early_args.emplace_back("--thread-affinity"); - ret.early_args.emplace_back("0"); - logger().info("get_early_config: set --thread-affinity 0 --smp {}", - smp); - } } else { logger().error("get_early_config: --cpuset can be " @@ -301,7 +306,7 @@ get_early_config(int argc, const char *argv[]) return tl::unexpected(-errno); } else if (worker == 0) { // child close(pipes[0]); - auto ret = _get_early_config(argc, argv); + auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_OSD); if (ret.has_value()) { bufferlist bl; ::encode(ret.value(), bl); @@ -355,4 +360,86 @@ get_early_config(int argc, const char *argv[]) } } +tl::expected +get_early_config_client(int argc, const char *argv[]) +{ + auto args = argv_to_vec(argc, argv); + if (args.empty()) { + std::cerr << argv[0] << ": -h or --help for usage" << std::endl; + exit(1); + } + if (ceph_argparse_need_usage(args)) { + std::cout << "usage: " << argv[0] << " [options] [--pool ]" << std::endl; + generic_server_usage(); + exit(0); + } + int pipes[2]; + int r = pipe2(pipes, 0); + if (r < 0) { + std::cerr << "get_early_config_client: failed to create pipes: " + << -errno << std::endl; + return tl::unexpected(-errno); + } + + pid_t worker = fork(); + if (worker < 0) { + close(pipes[0]); + close(pipes[1]); + std::cerr << "get_early_config_client: failed to fork: " + << -errno << std::endl; + return tl::unexpected(-errno); + } else if (worker == 0) { // child + close(pipes[0]); + auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_CLIENT); + if (ret.has_value()) { + bufferlist bl; + ::encode(ret.value(), bl); + r = bl.write_fd(pipes[1]); + close(pipes[1]); + if (r < 0) { + std::cerr << "get_early_config_client: child failed to write_fd: " + << r << std::endl; + exit(-r); + } else { + exit(0); + } + } else { + std::cerr << "get_early_config_client: child failed: " + << -ret.error() << std::endl; + exit(-ret.error()); + } + return tl::unexpected(-1); + } else { // parent + close(pipes[1]); + + bufferlist bl; + early_config_t ret; + bool have_data = false; + while ((r = bl.read_fd(pipes[0], 1024)) > 0) { + have_data = true; + } + close(pipes[0]); + + int status; + waitpid(worker, &status, 0); + + if (!have_data && WIFEXITED(status) && WEXITSTATUS(status) == 0) { + exit(0); + } + if (r < 0) { + std::cerr << "get_early_config_client: parent failed to read from pipe: " + << r << std::endl; + return tl::unexpected(r); + } + try { + auto bliter = bl.cbegin(); + ::decode(ret, bliter); + return ret; + } catch (...) { + std::cerr << "get_early_config_client: parent failed to decode" << std::endl; + return tl::unexpected(-EINVAL); + } + } +} + } diff --git a/src/crimson/osd/main_config_bootstrap_helpers.h b/src/crimson/osd/main_config_bootstrap_helpers.h index 6e993a58cf0..e05907d64c6 100644 --- a/src/crimson/osd/main_config_bootstrap_helpers.h +++ b/src/crimson/osd/main_config_bootstrap_helpers.h @@ -88,6 +88,10 @@ struct early_config_t { tl::expected get_early_config(int argc, const char *argv[]); +/// Like get_early_config but for CEPH_ENTITY_TYPE_CLIENT (e.g. client.admin). +tl::expected +get_early_config_client(int argc, const char *argv[]); + } WRITE_CLASS_ENCODER(crimson::osd::early_config_t) diff --git a/src/crimson/osdc/objecter.cc b/src/crimson/osdc/objecter.cc new file mode 100644 index 00000000000..955fc37c551 --- /dev/null +++ b/src/crimson/osdc/objecter.cc @@ -0,0 +1,759 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "objecter.h" + +#include + +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Messenger.h" +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDMap.h" + +#include "common/entity_name.h" +#include "common/hobject.h" +#include "crimson/mon/MonClient.h" +#include "include/rados.h" + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_objecter); + } +} + +namespace crimson::osdc { + +Objecter::Objecter(crimson::net::Messenger& msgr, + crimson::mon::Client& monc) + : msgr(msgr), + monc(monc), + osdmap(std::make_unique()) +{} + +Objecter::~Objecter() {} + +seastar::future<> Objecter::start() +{ + logger().info("Objecter::start"); + ceph_assert(!started); + started = true; + osdmap_ready.emplace(); + + // Subscribe to osdmap updates from the monitor + if (monc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME)) { + return monc.renew_subs(); + } + return seastar::now(); +} + +seastar::future<> Objecter::stop() +{ + logger().info("Objecter::stop"); + ceph_assert(started); + started = false; + return dispatch_gate.close(); +} + +std::optional> +Objecter::ms_dispatch(crimson::net::ConnectionRef conn, + MessageRef m) +{ + const int msg_type = m->get_type(); + + // Only handle messages from MON (OSDMap) or OSD (OpReply) + if (conn->peer_is_mon()) { + if (msg_type != CEPH_MSG_OSD_MAP) { + return std::nullopt; + } + auto mosdmap = boost::static_pointer_cast(m); + return seastar::with_gate(dispatch_gate, [this, mosdmap] { + return handle_osd_map(std::move(mosdmap)); + }); + } + + if (conn->peer_is_osd()) { + if (msg_type != CEPH_MSG_OSD_OPREPLY) { + return std::nullopt; + } + auto reply = boost::static_pointer_cast(m); + return seastar::with_gate(dispatch_gate, [this, conn, reply] { + handle_osd_op_reply(conn, reply.get()); + return seastar::now(); + }); + } + + return std::nullopt; +} + +seastar::future<> Objecter::handle_osd_map(Ref m) +{ + logger().debug("handle_osd_map: epochs [{},{}]", + m->get_first(), m->get_last()); + + if (m->fsid != monc.get_fsid()) { + logger().warn("handle_osd_map: fsid mismatch {} != {}", + m->fsid, monc.get_fsid()); + return seastar::now(); + } + + return seastar::with_lock(osdmap_mutex, [this, m = std::move(m)] { + seastar::future<> renew_fut = seastar::now(); + + if (m->get_last() <= osdmap->get_epoch()) { + logger().debug("handle_osd_map: ignoring stale epochs [{},{}] <= {}", + m->get_first(), m->get_last(), osdmap->get_epoch()); + } else { + if (osdmap->get_epoch()) { + // Apply incremental maps + for (epoch_t e = osdmap->get_epoch() + 1; e <= m->get_last(); ++e) { + if (osdmap->get_epoch() == e - 1 && m->incremental_maps.count(e)) { + logger().debug("handle_osd_map: applying incremental epoch {}", e); + OSDMap::Incremental inc(m->incremental_maps[e]); + osdmap->apply_incremental(inc); + } else if (m->maps.count(e)) { + logger().debug("handle_osd_map: applying full epoch {}", e); + auto new_osdmap = std::make_unique(); + new_osdmap->decode(m->maps[e]); + osdmap = std::move(new_osdmap); + } + } + } else { + // First map - need full + if (m->maps.count(m->get_last())) { + logger().debug("handle_osd_map: decoding initial full epoch {}", + m->get_last()); + osdmap->decode(m->maps[m->get_last()]); + } else { + logger().debug("handle_osd_map: need full map, requesting"); + renew_fut = maybe_request_map(); + } + } + } + + monc.sub_got("osdmap", osdmap->get_epoch()); + + if (osdmap->get_epoch() > 0 && osdmap_ready && !osdmap_ready_fulfilled) { + osdmap_ready_fulfilled = true; + osdmap_ready->set_value(); + } + return renew_fut; + }); +} + +void Objecter::handle_osd_op_reply(crimson::net::ConnectionRef conn, + MOSDOpReply* m) +{ + const ceph_tid_t tid = m->get_tid(); + const int r = m->get_result(); + + if (!conn->peer_is_osd()) { + return; + } + const int osd_id = conn->get_peer_id(); + auto it = sessions.find(osd_id); + if (it == sessions.end()) { + logger().warn("handle_osd_op_reply: tid={} from unknown osd.{}", tid, osd_id); + return; + } + auto op_it = it->second.ops.find(tid); + if (op_it == it->second.ops.end()) { + logger().debug("handle_osd_op_reply: tid={} not found (may be stale)", tid); + return; + } + auto completion = std::move(op_it->second); + it->second.ops.erase(op_it); + + std::vector ops; + m->claim_ops(ops); + completion(r, ops); +} + +seastar::future<> Objecter::wait_for_osdmap() +{ + return with_osdmap([](const OSDMap& o) { return o.get_epoch(); }) + .then([this](epoch_t epoch) { + if (epoch > 0) { + return seastar::now(); + } + return osdmap_ready->get_shared_future(); + }); +} + +seastar::future<> Objecter::maybe_request_map() +{ + if (monc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME)) { + return monc.renew_subs(); + } + return seastar::now(); +} + +#ifdef UNIT_TESTS_BUILT +seastar::future<> Objecter::inject_osdmap_for_test(std::unique_ptr m) +{ + return seastar::with_lock(osdmap_mutex, [this, m = std::move(m)]() mutable { + osdmap = std::move(m); + monc.sub_got("osdmap", osdmap->get_epoch()); + if (osdmap->get_epoch() > 0 && osdmap_ready && !osdmap_ready_fulfilled) { + osdmap_ready_fulfilled = true; + osdmap_ready->set_value(); + } + }); +} +#endif + +seastar::future> +Objecter::calc_target(const object_t& oid, + const object_locator_t& oloc) const +{ + return seastar::with_shared(osdmap_mutex, [this, oid, oloc] { + if (!osdmap->get_epoch()) { + return std::optional{}; // no OSDMap yet + } + + pg_t raw_pg; + const int ret = osdmap->object_locator_to_pg(oid, oloc, raw_pg); + if (ret != 0) { + return std::optional{}; // pool does not exist (e.g. -ENOENT) + } + + const pg_t actual_pgid = osdmap->raw_pg_to_pg(raw_pg); + + int up_primary = -1; + int acting_primary = -1; + std::vector up; + std::vector acting; + osdmap->pg_to_up_acting_osds(actual_pgid, + &up, &up_primary, + &acting, &acting_primary); + + if (acting_primary < 0 || !osdmap->exists(acting_primary)) { + return std::optional{}; + } + + const entity_addr_t addr = osdmap->get_addrs(acting_primary).front(); + return std::optional{OsdTarget{actual_pgid, raw_pg, acting_primary, addr}}; + }); +} + +seastar::future +Objecter::get_or_connect(const entity_addr_t& addr, int osd_id) +{ + auto it = sessions.find(osd_id); + if (it != sessions.end() && it->second.conn && it->second.conn->is_connected()) { + return seastar::make_ready_future(it->second.conn); + } + + auto pend_it = pending_connects.find(osd_id); + if (pend_it != pending_connects.end()) { + return pend_it->second.get_shared_future(); + } + + crimson::net::ConnectionRef conn = msgr.connect( + addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, osd_id)); + + if (conn->is_connected()) { + if (it == sessions.end()) { + sessions.emplace(osd_id, OsdSession{osd_id, addr, conn}); + } else { + it->second.conn = conn; + it->second.addr = addr; + } + return seastar::make_ready_future(conn); + } + + if (it == sessions.end()) { + sessions.emplace(osd_id, OsdSession{osd_id, addr, conn}); + } else { + it->second.conn = conn; + it->second.addr = addr; + } + + auto [promise_it, inserted] = pending_connects.emplace( + osd_id, seastar::shared_promise{}); + ceph_assert(inserted); + return promise_it->second.get_shared_future(); +} + +seastar::future<> Objecter::send_to_osd(const OsdTarget& target, MessageURef msg) +{ + return get_or_connect(target.primary_addr, target.primary_osd) + .then([msg = std::move(msg)](crimson::net::ConnectionRef conn) mutable { + return conn->send(std::move(msg)); + }); +} + +seastar::future Objecter::read(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, off, len, snap](std::optional target) { + if (!target) { + return seastar::make_exception_future( + std::system_error(ENOENT, std::generic_category(), "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const int flags = CEPH_OSD_FLAG_RETURNVEC; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, epoch, flags, 0); + msg->set_snapid(snap); + msg->read(off, len); + + seastar::promise promise; + auto future = promise.get_future(); + const int osd_id = target.primary_osd; + + auto p = std::make_shared>(std::move(promise)); + OpCompletion completion = [p](int r, std::vector& ops) { + if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "read failed"))); + } else if (!ops.empty()) { + p->set_value(std::move(ops[0].outdata)); + } else { + p->set_value(ceph::bufferlist{}); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)]( + crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + it->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +seastar::future<> Objecter::write(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, ceph::bufferlist&& bl, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, off, bl = std::move(bl), snap](std::optional target) mutable { + if (!target) { + return seastar::make_exception_future<>( + std::system_error(ENOENT, std::generic_category(), + "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, off, bl = std::move(bl), snap](epoch_t epoch) mutable { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const size_t len = bl.length(); + const int osd_id = target.primary_osd; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, epoch, 0, 0); + msg->set_snapid(snap); + msg->write(off, len, bl); + // MOSDOp::write puts data in Message::data, but OSD expects it in + // ops[0].indata for proper encode/decode. Move it there. + msg->ops[0].indata = std::move(msg->get_data()); + + seastar::promise<> promise; + auto future = promise.get_future(); + + auto p = std::make_shared>(std::move(promise)); + OpCompletion completion = [this, osd_id, len, p](int r, std::vector&) { + auto it = sessions.find(osd_id); + if (it != sessions.end()) { + it->second.write_throttle.put(len); + } + if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "write failed"))); + } else { + p->set_value(); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, len, c = std::move(completion), msg = std::move(msg)]( + crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + return it->second.write_throttle.get(len).then( + [this, conn, tid, osd_id, c = std::move(c), + msg = std::move(msg)]() mutable { + auto it2 = sessions.find(osd_id); + ceph_assert(it2 != sessions.end()); + it2->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }).handle_exception([this, tid, osd_id, len](std::exception_ptr e) { + auto it2 = sessions.find(osd_id); + if (it2 != sessions.end()) { + it2->second.ops.erase(tid); + it2->second.write_throttle.put(len); + } + return seastar::make_exception_future<>(e); + }); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +seastar::future<> Objecter::discard(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, off, len, snap](std::optional target) { + if (!target) { + return seastar::make_exception_future<>( + std::system_error(ENOENT, std::generic_category(), + "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const int osd_id = target.primary_osd; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, epoch, 0, 0); + msg->set_snapid(snap); + msg->zero(off, len); + + seastar::promise<> promise; + auto future = promise.get_future(); + auto p = std::make_shared>(std::move(promise)); + OpCompletion completion = [p](int r, std::vector&) { + if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "discard failed"))); + } else { + p->set_value(); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)]( + crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + it->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +seastar::future<> Objecter::write_zeroes(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap) +{ + return discard(oid, oloc, off, len, snap); +} + +seastar::future<> Objecter::compare_and_write(const object_t& oid, + const object_locator_t& oloc, + uint64_t cmp_off, ceph::bufferlist&& cmp_bl, + uint64_t write_off, ceph::bufferlist&& write_bl, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, cmp_off, cmp_bl = std::move(cmp_bl), + write_off, write_bl = std::move(write_bl), snap](std::optional target) mutable { + if (!target) { + return seastar::make_exception_future<>( + std::system_error(ENOENT, std::generic_category(), + "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, + cmp_off, cmp_bl = std::move(cmp_bl), + write_off, write_bl = std::move(write_bl), snap](epoch_t epoch) mutable { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const size_t write_len = write_bl.length(); + const int osd_id = target.primary_osd; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, epoch, 0, 0); + msg->set_snapid(snap); + msg->add_op_with_data(CEPH_OSD_OP_CMPEXT, cmp_off, cmp_bl.length(), std::move(cmp_bl)); + msg->add_op_with_data(CEPH_OSD_OP_WRITE, write_off, write_len, std::move(write_bl)); + + seastar::promise<> promise; + auto future = promise.get_future(); + auto p = std::make_shared>(std::move(promise)); + OpCompletion completion = [this, osd_id, write_len, p](int r, std::vector&) { + auto it = sessions.find(osd_id); + if (it != sessions.end()) { + it->second.write_throttle.put(write_len); + } + if (r == -EILSEQ) { + p->set_exception(std::make_exception_ptr( + std::system_error(EILSEQ, std::generic_category(), "compare-and-write mismatch"))); + } else if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "compare-and-write failed"))); + } else { + p->set_value(); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, write_len, c = std::move(completion), msg = std::move(msg)]( + crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + return it->second.write_throttle.get(write_len).then( + [this, conn, tid, osd_id, c = std::move(c), + msg = std::move(msg)]() mutable { + auto it2 = sessions.find(osd_id); + ceph_assert(it2 != sessions.end()); + it2->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }).handle_exception([this, tid, osd_id, write_len](std::exception_ptr e) { + auto it2 = sessions.find(osd_id); + if (it2 != sessions.end()) { + it2->second.ops.erase(tid); + it2->second.write_throttle.put(write_len); + } + return seastar::make_exception_future<>(e); + }); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +seastar::future Objecter::exec(const object_t& oid, + const object_locator_t& oloc, + std::string_view cname, + std::string_view method, + ceph::bufferlist&& indata, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, cname, method, indata = std::move(indata), snap]( + std::optional target) mutable { + if (!target) { + return seastar::make_exception_future( + std::system_error(ENOENT, std::generic_category(), + "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, cname, method, + indata = std::move(indata), snap](epoch_t epoch) mutable { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const int flags = CEPH_OSD_FLAG_RETURNVEC; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, + epoch, flags, 0); + msg->set_snapid(snap); + msg->add_call(cname, method, indata); + + seastar::promise promise; + auto future = promise.get_future(); + const int osd_id = target.primary_osd; + + auto p = std::make_shared>( + std::move(promise)); + OpCompletion completion = [p](int r, std::vector& ops) { + if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "exec failed"))); + } else if (!ops.empty()) { + p->set_value(std::move(ops[0].outdata)); + } else { + p->set_value(ceph::bufferlist{}); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, c = std::move(completion), + msg = std::move(msg)](crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + it->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +seastar::future> +Objecter::stat(const object_t& oid, + const object_locator_t& oloc, + snapid_t snap) +{ + return calc_target(oid, oloc) + .then([this, oid, oloc, snap](std::optional target) { + if (!target) { + return seastar::make_exception_future>( + std::system_error(ENOENT, std::generic_category(), + "pool or object mapping failed")); + } + return with_osdmap([](const OSDMap& m) { return m.get_epoch(); }) + .then([this, target = *target, oid, oloc, snap](epoch_t epoch) { + const ceph_tid_t tid = next_tid++; + const uint32_t hash = oloc.hash >= 0 ? static_cast(oloc.hash) + : target.raw_pg.ps(); + const hobject_t hobj(oid, oloc.key, snap, hash, + oloc.get_pool(), oloc.nspace); + spg_t spgid(target.pgid); + const int flags = CEPH_OSD_FLAG_RETURNVEC; + + auto msg = crimson::make_message(client_inc, tid, hobj, spgid, epoch, flags, 0); + msg->set_snapid(snap); + msg->stat(); + + seastar::promise> promise; + auto future = promise.get_future(); + const int osd_id = target.primary_osd; + + auto p = std::make_shared>>( + std::move(promise)); + OpCompletion completion = [p](int r, std::vector& ops) { + if (r < 0) { + p->set_exception(std::make_exception_ptr( + std::system_error(-r, std::generic_category(), "stat failed"))); + return; + } + if (ops.empty() || ops[0].outdata.length() == 0) { + p->set_exception(std::make_exception_ptr( + std::runtime_error("stat reply empty"))); + return; + } + try { + auto p_iter = ops[0].outdata.cbegin(); + uint64_t size; + ceph::real_time mtime; + ceph::decode(size, p_iter); + ceph::decode(mtime, p_iter); + p->set_value(std::make_pair(size, mtime)); + } catch (const ceph::buffer::error&) { + p->set_exception(std::make_exception_ptr( + std::runtime_error("stat reply decode failed"))); + } + }; + + return get_or_connect(target.primary_addr, osd_id) + .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)]( + crimson::net::ConnectionRef conn) mutable { + auto it = sessions.find(osd_id); + ceph_assert(it != sessions.end()); + it->second.ops[tid] = std::move(c); + return conn->send(std::move(msg)); + }) + .then([f = std::move(future)]() mutable { + return std::move(f); + }); + }); + }); +} + +void Objecter::ms_handle_connect(crimson::net::ConnectionRef conn, + seastar::shard_id prv_shard) +{ + if (!conn->peer_is_osd()) { + return; + } + const int osd_id = conn->get_peer_id(); + logger().debug("ms_handle_connect: osd.{}", osd_id); + + auto it = sessions.find(osd_id); + if (it != sessions.end()) { + it->second.conn = conn; + it->second.addr = conn->get_peer_addr(); + } else { + sessions.emplace(osd_id, + OsdSession{osd_id, conn->get_peer_addr(), conn}); + } + + auto pend = pending_connects.find(osd_id); + if (pend != pending_connects.end()) { + pend->second.set_value(conn); + pending_connects.erase(pend); + } +} + +void Objecter::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) +{ + if (!conn->peer_is_osd()) { + return; + } + const int osd_id = conn->get_peer_id(); + logger().debug("ms_handle_reset: osd.{} is_replace={}", osd_id, is_replace); + + auto it = sessions.find(osd_id); + if (it != sessions.end()) { + if (it->second.conn.get() == conn.get()) { + it->second.conn = nullptr; + } + const auto ex = std::make_exception_ptr( + std::runtime_error(fmt::format("connection to osd.{} reset", osd_id))); + for (auto& [tid, completion] : it->second.ops) { + std::vector empty_ops; + completion(-ECONNRESET, empty_ops); + } + it->second.ops.clear(); + } + + auto pend = pending_connects.find(osd_id); + if (pend != pending_connects.end()) { + pend->second.set_exception(std::make_exception_ptr( + std::runtime_error(fmt::format("connection to osd.{} reset", osd_id)))); + pending_connects.erase(pend); + } +} + +} // namespace crimson::osdc diff --git a/src/crimson/osdc/objecter.h b/src/crimson/osdc/objecter.h new file mode 100644 index 00000000000..4b5103e56f3 --- /dev/null +++ b/src/crimson/osdc/objecter.h @@ -0,0 +1,216 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "include/ceph_fs.h" +#include "include/types.h" +#include "include/utime.h" + +#include "common/ceph_time.h" +#include "crimson/common/throttle.h" +#include "crimson/common/type_helpers.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/Fwd.h" + +#include "msg/MessageRef.h" +#include "msg/msg_types.h" +#include "messages/MOSDOp.h" +#include "osd/OSDMap.h" + +class MOSDOpReply; +class MOSDMap; + +namespace crimson::mon { +class Client; +} + +namespace crimson::osdc { + +/// Result of mapping an object to its target OSD +struct OsdTarget { + pg_t pgid; ///< actual pg (after raw_pg_to_pg) + pg_t raw_pg; ///< raw pg (for hobject_t hash - must match classic Objecter) + int primary_osd; + entity_addr_t primary_addr; +}; + +/// Completion callback for op reply: (result, ops) -> void +using OpCompletion = std::function&)>; + +/// Per-OSD session holding connection and in-flight ops +struct OsdSession { + int osd_id = -1; + entity_addr_t addr; + crimson::net::ConnectionRef conn; + + /// In-flight ops by tid (read, write, stat) + std::unordered_map ops; + + /// Throttle for write bytes in flight + crimson::common::Throttle write_throttle{64 * 1024 * 1024}; // 64 MiB default +}; + +/** + * Crimson Objecter - Seastar-native RADOS client objecter. + * + * - Registers as Dispatcher for MOSDOpReply (stub) and MOSDMap + * - Subscribes to osdmap via MonClient, receives and stores OSDMap + * - Exposes with_osdmap() for pool resolution and PG calculation + * - Exposes calc_target() for object -> PG -> primary OSD mapping + * - Exposes get_or_connect() and send_to_osd() for OSD connection and message send + */ +class Objecter : public crimson::net::Dispatcher { +public: + Objecter(crimson::net::Messenger& msgr, + crimson::mon::Client& monc); + + ~Objecter() override; + + seastar::future<> start(); + seastar::future<> stop(); + + /// Invoke a callback with read access to the current OSDMap. + /// Returns a future with the result of the callback. OSDMap may be empty + /// (epoch 0) until the first map is received from the monitor. + template + seastar::futurize_t> + with_osdmap(Func&& f) const { + return seastar::with_shared( + osdmap_mutex, + [this, f = std::forward(f)]() { + return seastar::futurize_invoke(f, static_cast(*osdmap)); + }); + } + + /// Map object + locator to target PG and primary OSD address. + /// Uses OSDMap::object_locator_to_pg, raw_pg_to_pg, pg_to_up_acting_osds. + /// Returns nullopt if pool does not exist or mapping fails. + seastar::future> calc_target(const object_t& oid, + const object_locator_t& oloc) const; + + /// Get or create connection to OSD. Resolves when handshake completes. + seastar::future + get_or_connect(const entity_addr_t& addr, int osd_id); + + /// Send message to OSD (connects if needed). Requires OsdTarget from calc_target. + seastar::future<> send_to_osd(const OsdTarget& target, MessageURef msg); + + /// Read object data. Returns bufferlist on success; completes with + /// exception on error (e.g. -ENOENT). + seastar::future read(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap = CEPH_NOSNAP); + + /// Write object data. Throttled per OSD. + seastar::future<> write(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, ceph::bufferlist&& bl, + snapid_t snap = CEPH_NOSNAP); + + /// Stat object. Returns (size, mtime). + seastar::future> stat( + const object_t& oid, + const object_locator_t& oloc, + snapid_t snap = CEPH_NOSNAP); + + /// Discard (zero) object range. For RBD trim/UNMAP. + seastar::future<> discard(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap = CEPH_NOSNAP); + + /// Write zeroes to object range. For RBD write_zeroes. + seastar::future<> write_zeroes(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, + snapid_t snap = CEPH_NOSNAP); + + /// Compare-and-write: compare at cmp_off with cmp_bl; if match, write + /// write_bl at write_off. Returns -EILSEQ on mismatch. + seastar::future<> compare_and_write(const object_t& oid, + const object_locator_t& oloc, + uint64_t cmp_off, ceph::bufferlist&& cmp_bl, + uint64_t write_off, ceph::bufferlist&& write_bl, + snapid_t snap = CEPH_NOSNAP); + + /// Execute cls method on object. Returns output bufferlist. + /// For use by librbd_crimson (cls/rbd get_size, get_features, etc.). + seastar::future exec(const object_t& oid, + const object_locator_t& oloc, + std::string_view cname, + std::string_view method, + ceph::bufferlist&& indata, + snapid_t snap = CEPH_NOSNAP); + + /// Wait until OSDMap has been received (epoch > 0). + seastar::future<> wait_for_osdmap(); + + /// Set client incarnation for MOSDOp reqid. Call before first op to avoid + /// duplicate detection with previous sessions. Default 0. + void set_client_incarnation(int inc) { client_inc = inc; } + +#ifdef UNIT_TESTS_BUILT + /// Inject OSDMap for unit tests (bypasses monitor). Also fulfills wait_for_osdmap. + seastar::future<> inject_osdmap_for_test(std::unique_ptr m); +#endif + + // Dispatcher interface + std::optional> + ms_dispatch(crimson::net::ConnectionRef conn, + MessageRef m) override; + + void ms_handle_connect(crimson::net::ConnectionRef conn, + seastar::shard_id prv_shard) override; + void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override; + +private: + crimson::net::Messenger& msgr; + crimson::mon::Client& monc; + + std::unique_ptr osdmap; + mutable seastar::shared_mutex osdmap_mutex; + + std::unordered_map sessions; + std::unordered_map> + pending_connects; + + seastar::gate dispatch_gate; + bool started = false; + int client_inc = 0; + ceph_tid_t next_tid = 1; + + std::optional> osdmap_ready; + bool osdmap_ready_fulfilled = false; + + seastar::future<> handle_osd_map(Ref m); + void handle_osd_op_reply(crimson::net::ConnectionRef conn, MOSDOpReply* m); + seastar::future<> maybe_request_map(); +}; + +} // namespace crimson::osdc diff --git a/src/crimson/tools/CMakeLists.txt b/src/crimson/tools/CMakeLists.txt index 6898efa04a0..ae7b493d42f 100644 --- a/src/crimson/tools/CMakeLists.txt +++ b/src/crimson/tools/CMakeLists.txt @@ -19,6 +19,19 @@ install(TARGETS crimson-store-bench DESTINATION bin) add_executable(perf-crimson-msgr perf_crimson_msgr.cc) target_link_libraries(perf-crimson-msgr crimson) +add_executable(crimson-rados-demo rados_demo.cc) +if(HAS_VTA) + set_source_files_properties(rados_demo.cc + PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments) +endif() +target_link_libraries(crimson-rados-demo + crimson-main-config-bootstrap + crimson + crimson-common + legacy-option-headers + ${FMT_LIB}) +install(TARGETS crimson-rados-demo DESTINATION bin) + add_executable(perf-async-msgr perf_async_msgr.cc) target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS}) diff --git a/src/crimson/tools/rados_demo.cc b/src/crimson/tools/rados_demo.cc new file mode 100644 index 00000000000..5452d8fb65c --- /dev/null +++ b/src/crimson/tools/rados_demo.cc @@ -0,0 +1,187 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2026 IBM Corporation + * + * Minimal RADOS client demo using Crimson Objecter and RadosClient. + * Connects to cluster, creates IoCtx, and exercises: write, read, + * discard, write_zeroes, compare_and_write. Integration test for + * Objecter extensions required by bdev_rbd. + * + * Usage: crimson-rados-demo -c /etc/ceph/ceph.conf -n client.admin --pool rbd + */ + +#include + +#include +#include +#include +#include + +#include "auth/KeyRing.h" +#include "crimson/client/io_context.h" +#include "crimson/client/rados_client.h" +#include "crimson/osdc/objecter.h" +#include "crimson/common/config_proxy.h" +#include "crimson/common/fatal_signal.h" +#include "crimson/common/log.h" +#include "crimson/common/perf_counters_collection.h" +#include "crimson/mon/MonClient.h" +#include "crimson/net/Messenger.h" +#include "crimson/osd/main_config_bootstrap_helpers.h" +#include "msg/msg_types.h" + +namespace bpo = boost::program_options; + +static seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_client); +} + +int main(int argc, const char* argv[]) +{ + auto early_result = crimson::osd::get_early_config_client(argc, argv); + if (!early_result.has_value()) { + std::cerr << "get_early_config_client failed: " << early_result.error() + << std::endl; + return early_result.error(); + } + auto& early_config = early_result.value(); + + seastar::app_template::config app_cfg; + app_cfg.name = "crimson-rados-demo"; + app_cfg.auto_handle_sigint_sigterm = false; + seastar::app_template app(std::move(app_cfg)); + app.add_options() + ("pool", bpo::value()->default_value("rbd"), + "pool name for I/O demo") + ("obj", bpo::value()->default_value("crimson_rados_demo_obj"), + "object name for write/read") + ("debug", "enable debug logging"); + + try { + return app.run( + early_config.get_early_args().size(), + const_cast(early_config.get_early_args().data()), + [&] { + auto& config = app.configuration(); + auto config_proxy_args = early_config.ceph_args; + return seastar::async([config_proxy_args, &config, &early_config] { + try { + FatalSignal fatal_signal; + if (config.count("debug")) { + seastar::global_logger_registry().set_all_loggers_level( + seastar::log_level::debug); + } + + crimson::common::sharded_conf().start( + early_config.init_params.name, + early_config.cluster_name).get(); + crimson::common::local_conf().start().get(); + auto stop_conf = seastar::deferred_stop( + crimson::common::sharded_conf()); + crimson::common::sharded_perf_coll().start().get(); + auto stop_perf = seastar::deferred_stop( + crimson::common::sharded_perf_coll()); + + crimson::common::local_conf().parse_config_files( + early_config.conf_file_list).get(); + crimson::common::local_conf().parse_env().get(); + crimson::common::local_conf().parse_argv( + config_proxy_args).get(); + + crimson::osd::populate_config_from_mon().get(); + + const auto pool_name = config["pool"].as(); + const auto obj_name = config["obj"].as(); + + class DemoAuthHandler : public crimson::common::AuthHandler { + public: + void handle_authentication(const EntityName& name, + const AuthCapsInfo& caps) override {} + }; + auto auth_handler = std::make_unique(); + auto msgr = crimson::net::Messenger::create( + entity_name_t(early_config.init_params.name.get_type(), -1), + "rados_demo", + crimson::osd::get_nonce(), + true); + crimson::mon::Client monc(*msgr, *auth_handler); + msgr->set_auth_client(&monc); + msgr->set_auth_server(&monc); + + crimson::client::RadosClient rados(*msgr, monc); + crimson::net::dispatchers_t dispatchers; + dispatchers.push_back(&monc); + dispatchers.push_back(&rados.get_objecter()); + msgr->start(dispatchers).get(); + auto stop_msgr = seastar::defer([&] { + msgr->stop(); + msgr->shutdown().get(); + }); + + monc.start().get(); + auto stop_monc = seastar::defer([&] { monc.stop().get(); }); + + rados.get_objecter().set_client_incarnation( + static_cast(crimson::osd::get_nonce() & 0x7fffffff)); + rados.connect().get(); + auto ioctx = rados.create_ioctx(pool_name).get(); + + ceph::bufferlist bl; + bl.append_zero(4096); + ioctx.write(obj_name, 0, std::move(bl)).get(); + logger().info("wrote {} bytes to {}/{}", 4096, pool_name, obj_name); + + auto data = ioctx.read(obj_name, 0, 4096).get(); + logger().info("read {} bytes from {}/{}", data.length(), + pool_name, obj_name); + + // Test discard (zero range) and write_zeroes + ioctx.discard(obj_name, 512, 1024).get(); + logger().info("discarded bytes 512-1536"); + ioctx.write_zeroes(obj_name, 2048, 512).get(); + logger().info("write_zeroes bytes 2048-2560"); + + data = ioctx.read(obj_name, 0, 4096).get(); + logger().info("read {} bytes after discard/write_zeroes", + data.length()); + + // Test compare_and_write: write known content, then compare-and-write + ceph::bufferlist init_bl; + init_bl.append("ABCD", 4); + ioctx.write(obj_name, 0, std::move(init_bl)).get(); + logger().info("wrote ABCD at 0 for compare_and_write test"); + ceph::bufferlist cmp_bl; + cmp_bl.append("ABCD", 4); + ceph::bufferlist write_bl; + write_bl.append("WXYZ", 4); + ioctx.compare_and_write(obj_name, 0, std::move(cmp_bl), 0, + std::move(write_bl)).get(); + logger().info("compare_and_write succeeded (ABCD -> WXYZ)"); + data = ioctx.read(obj_name, 0, 4).get(); + logger().info("read {} bytes after compare_and_write", data.length()); + + // Test exec: cls hello say_hello (returns "Hello, world!" with empty in) + ceph::bufferlist exec_in; + auto exec_out = ioctx.exec(obj_name, "hello", "say_hello", + std::move(exec_in)).get(); + std::string exec_str(exec_out.c_str(), exec_out.length()); + logger().info("exec hello say_hello returned: {}", exec_str); + + rados.shutdown().get(); + logger().info("crimson-rados-demo completed successfully"); + return EXIT_SUCCESS; + } catch (const std::exception& e) { + logger().error("crimson-rados-demo failed: {}", e.what()); + return EXIT_FAILURE; + } + }); + }); + } catch (const std::exception& e) { + std::cerr << "FATAL: " << e.what() << std::endl; + return EXIT_FAILURE; + } +} diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 7736ac1e264..518dfefda9e 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -24,6 +24,7 @@ #include "MOSDFastDispatchOp.h" #include "include/ceph_features.h" #include "include/ceph_fs.h" // for CEPH_MSG_OSD_OP +#include "include/rados.h" // for CEPH_OSD_OP_CALL, ceph_osd_op::cls #include "common/hobject.h" /* @@ -230,6 +231,28 @@ public: void zero(uint64_t off, uint64_t len) { add_simple_op(CEPH_OSD_OP_ZERO, off, len); } + /// Add op with data payload (e.g. CMPEXT compare buffer). + void add_op_with_data(int o, uint64_t off, uint64_t len, ceph::buffer::list&& bl) { + OSDOp osd_op; + osd_op.op.op = o; + osd_op.op.extent.offset = off; + osd_op.op.extent.length = len; + osd_op.indata = std::move(bl); + ops.push_back(std::move(osd_op)); + } + /// Add cls call op (CEPH_OSD_OP_CALL). indata = cname + method + in. + void add_call(std::string_view cname, std::string_view method, + const ceph::buffer::list& indata) { + OSDOp osd_op; + osd_op.op.op = CEPH_OSD_OP_CALL; + osd_op.op.cls.class_len = static_cast<__u8>(std::min(cname.size(), size_t(255))); + osd_op.op.cls.method_len = static_cast<__u8>(std::min(method.size(), size_t(255))); + osd_op.op.cls.indata_len = indata.length(); + osd_op.indata.append(cname.data(), osd_op.op.cls.class_len); + osd_op.indata.append(method.data(), osd_op.op.cls.method_len); + osd_op.indata.append(indata); + ops.push_back(std::move(osd_op)); + } void truncate(uint64_t off) { add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0); } diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 43028e8c92a..5c2c2384950 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -150,3 +150,12 @@ add_ceph_test(unittest-crimson-pg-pool-shift target_link_libraries(unittest-crimson-pg-pool-shift crimson-common crimson::gtest) + +add_executable(unittest-crimson-objecter + test_objecter.cc) +add_ceph_test(unittest-crimson-objecter + ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest-crimson-objecter + --memory 256M --smp 1) +target_link_libraries(unittest-crimson-objecter + crimson + crimson::gtest) diff --git a/src/test/crimson/test_objecter.cc b/src/test/crimson/test_objecter.cc new file mode 100644 index 00000000000..473e7503270 --- /dev/null +++ b/src/test/crimson/test_objecter.cc @@ -0,0 +1,171 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 sts=2 expandtab + +#include "test/crimson/gtest_seastar.h" + +#include "common/ceph_context.h" +#include "crimson/client/io_context.h" +#include "crimson/client/rados_client.h" +#include "crimson/common/auth_handler.h" +#include "crimson/osdc/objecter.h" +#include "crimson/net/Messenger.h" +#include "crimson/mon/MonClient.h" +#include "osd/OSDMap.h" +#include "include/object.h" + +using namespace crimson; + +namespace { + +class DummyAuthHandler : public crimson::common::AuthHandler { +public: + void handle_authentication(const EntityName&, const AuthCapsInfo&) final {} +}; + +std::unique_ptr build_test_osdmap() +{ + auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + auto osdmap = std::make_unique(); + uuid_d fsid; + const int r = osdmap->build_simple_with_pool(cct, 1, fsid, 1, 0, 0); + cct->put(); + if (r != 0) { + return nullptr; + } + + // Mark OSD 0 as up and in + OSDMap::Incremental inc(osdmap->get_epoch() + 1); + inc.fsid = osdmap->get_fsid(); + entity_addrvec_t addrs; + addrs.v.push_back(entity_addr_t()); + uuid_d uuid; + uuid.generate_random(); + addrs.v[0].nonce = 0; + inc.new_state[0] = CEPH_OSD_EXISTS | CEPH_OSD_NEW; + inc.new_up_client[0] = addrs; + inc.new_up_cluster[0] = addrs; + inc.new_hb_back_up[0] = addrs; + inc.new_hb_front_up[0] = addrs; + inc.new_weight[0] = CEPH_OSD_IN; + inc.new_uuid[0] = uuid; + osdmap->apply_incremental(inc); + + return osdmap; +} + +} // namespace + +struct objecter_test_t : public seastar_test_suite_t {}; + +#ifdef UNIT_TESTS_BUILT +TEST_F(objecter_test_t, calc_target_with_injected_osdmap) +{ + run_async([] { + auto osdmap = build_test_osdmap(); + ASSERT_TRUE(osdmap) << "failed to build test OSDMap"; + + auto msgr = crimson::net::Messenger::create( + entity_name_t::CLIENT(0), "test", 0, true); + DummyAuthHandler auth_handler; + crimson::mon::Client monc(*msgr, auth_handler); + crimson::osdc::Objecter objecter(*msgr, monc); + + objecter.inject_osdmap_for_test(std::move(osdmap)).get(); + + object_t oid("test-obj"); + object_locator_t oloc(1); // pool 1 (rbd from build_simple_with_pool) + + auto target = objecter.calc_target(oid, oloc).get(); + ASSERT_TRUE(target.has_value()) << "calc_target should succeed with valid pool"; + EXPECT_GE(target->primary_osd, 0); + EXPECT_EQ(target->pgid.pool(), 1); + }); +} + +TEST_F(objecter_test_t, calc_target_nonexistent_pool) +{ + run_async([] { + auto osdmap = build_test_osdmap(); + ASSERT_TRUE(osdmap); + + auto msgr = crimson::net::Messenger::create( + entity_name_t::CLIENT(0), "test", 0, true); + DummyAuthHandler auth_handler; + crimson::mon::Client monc(*msgr, auth_handler); + crimson::osdc::Objecter objecter(*msgr, monc); + + objecter.inject_osdmap_for_test(std::move(osdmap)).get(); + + object_t oid("test-obj"); + object_locator_t oloc(999); // non-existent pool + + auto target = objecter.calc_target(oid, oloc).get(); + EXPECT_FALSE(target.has_value()) << "calc_target should fail for nonexistent pool"; + }); +} +#endif + +TEST_F(objecter_test_t, iocontext_get_pool_id) +{ + run_async([] { + auto msgr = crimson::net::Messenger::create( + entity_name_t::CLIENT(0), "test", 0, true); + DummyAuthHandler auth_handler; + crimson::mon::Client monc(*msgr, auth_handler); + crimson::osdc::Objecter objecter(*msgr, monc); + + crimson::client::IoCtx ioctx(objecter, 42, CEPH_NOSNAP); + EXPECT_EQ(ioctx.get_pool_id(), 42); + EXPECT_EQ(ioctx.get_snap_seq(), CEPH_NOSNAP); + }); +} + +#ifdef UNIT_TESTS_BUILT +/// Verify IoCtx has discard, write_zeroes, compare_and_write, exec APIs. +/// Full I/O testing requires a real cluster (crimson-rados-demo with vstart, +/// run_crimson_rados_demo_integration_test.sh). +TEST_F(objecter_test_t, iocontext_discard_write_zeroes_api) +{ + run_async([] { + auto msgr = crimson::net::Messenger::create( + entity_name_t::CLIENT(0), "test", 0, true); + DummyAuthHandler auth_handler; + crimson::mon::Client monc(*msgr, auth_handler); + crimson::osdc::Objecter objecter(*msgr, monc); + + crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP); + + // Verify API compiles and returns seastar::future<> + static_assert(std::is_same_v>); + static_assert(std::is_same_v>); + + ceph::bufferlist cmp_bl, write_bl; + cmp_bl.append_zero(4); + write_bl.append_zero(4); + static_assert(std::is_same_v>); + }); +} + +/// Verify IoCtx has exec() API for cls calls. +TEST_F(objecter_test_t, iocontext_exec_api) +{ + run_async([] { + auto msgr = crimson::net::Messenger::create( + entity_name_t::CLIENT(0), "test", 0, true); + DummyAuthHandler auth_handler; + crimson::mon::Client monc(*msgr, auth_handler); + crimson::osdc::Objecter objecter(*msgr, monc); + + crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP); + + ceph::bufferlist indata; + static_assert(std::is_same_v>); + }); +} +#endif