/src/td
/src/vstart_environment.sh
+# Local integration test scripts (not for commit)
+/src/nvmeof/gateway/run_crimson_rbd_demo_integration_test.sh
+
# alpine abuild things
.abuild/
alpine/APKBUILD
set(crimson_mon_srcs
mon/MonClient.cc
${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
+set(crimson_osdc_srcs
+ osdc/objecter.cc)
+set(crimson_client_srcs
+ client/io_context.cc
+ client/rados_client.cc)
set(crimson_net_srcs
${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc
${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc
net/chained_dispatchers.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
+ ${crimson_client_srcs}
${crimson_mgr_srcs}
${crimson_mon_srcs}
+ ${crimson_osdc_srcs}
${crimson_net_srcs})
target_compile_options(crimson PUBLIC
"-ftemplate-backtrace-limit=0")
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "crimson/client/io_context.h"
+
+#include "crimson/common/log.h"
+#include "crimson/osdc/objecter.h"
+#include "include/object.h"
+#include "osd/osd_types.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_client);
+ }
+}
+
+namespace crimson::client {
+
+IoCtx::IoCtx(crimson::osdc::Objecter& objecter,
+ int64_t pool_id,
+ snapid_t snap_seq)
+ : objecter(objecter), pool_id(pool_id), snap_seq(snap_seq)
+{}
+
+seastar::future<ceph::bufferlist> IoCtx::read(const std::string& oid,
+ uint64_t off,
+ uint64_t len)
+{
+ logger().debug("IoCtx::read oid={} off={} len={}", oid, off, len);
+ object_locator_t oloc(pool_id);
+ return objecter.read(object_t(oid), oloc, off, len, snap_seq);
+}
+
+seastar::future<> IoCtx::write(const std::string& oid,
+ uint64_t off,
+ ceph::bufferlist&& bl)
+{
+ logger().debug("IoCtx::write oid={} off={} len={}", oid, off, bl.length());
+ object_locator_t oloc(pool_id);
+ return objecter.write(object_t(oid), oloc, off, std::move(bl), snap_seq);
+}
+
+seastar::future<std::pair<uint64_t, ceph::real_time>> IoCtx::stat(
+ const std::string& oid)
+{
+ logger().debug("IoCtx::stat oid={}", oid);
+ object_locator_t oloc(pool_id);
+ return objecter.stat(object_t(oid), oloc, snap_seq);
+}
+
+seastar::future<> IoCtx::discard(const std::string& oid,
+ uint64_t off,
+ uint64_t len)
+{
+ logger().debug("IoCtx::discard oid={} off={} len={}", oid, off, len);
+ object_locator_t oloc(pool_id);
+ return objecter.discard(object_t(oid), oloc, off, len, snap_seq);
+}
+
+seastar::future<> IoCtx::write_zeroes(const std::string& oid,
+ uint64_t off,
+ uint64_t len)
+{
+ logger().debug("IoCtx::write_zeroes oid={} off={} len={}", oid, off, len);
+ object_locator_t oloc(pool_id);
+ return objecter.write_zeroes(object_t(oid), oloc, off, len, snap_seq);
+}
+
+seastar::future<> IoCtx::compare_and_write(const std::string& oid,
+ uint64_t cmp_off,
+ ceph::bufferlist&& cmp_bl,
+ uint64_t write_off,
+ ceph::bufferlist&& write_bl)
+{
+ logger().debug("IoCtx::compare_and_write oid={} cmp_off={} write_off={}",
+ oid, cmp_off, write_off);
+ object_locator_t oloc(pool_id);
+ return objecter.compare_and_write(object_t(oid), oloc,
+ cmp_off, std::move(cmp_bl),
+ write_off, std::move(write_bl),
+ snap_seq);
+}
+
+seastar::future<ceph::bufferlist> IoCtx::exec(const std::string& oid,
+ std::string_view cname,
+ std::string_view method,
+ ceph::bufferlist&& indata)
+{
+ logger().debug("IoCtx::exec oid={} cls={}.{}", oid, cname, method);
+ object_locator_t oloc(pool_id);
+ return objecter.exec(object_t(oid), oloc, cname, method, std::move(indata),
+ snap_seq);
+}
+
+} // namespace crimson::client
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+
+#include "include/buffer.h"
+#include "include/types.h"
+#include "common/ceph_time.h"
+
+namespace crimson::osdc {
+class Objecter;
+}
+
+namespace crimson::client {
+
+/**
+ * Pool-scoped I/O context. Delegates read, write, stat to Objecter
+ * with object_locator_t built from pool_id.
+ */
+class IoCtx {
+public:
+ IoCtx(crimson::osdc::Objecter& objecter,
+ int64_t pool_id,
+ snapid_t snap_seq = CEPH_NOSNAP);
+
+ seastar::future<ceph::bufferlist> read(const std::string& oid,
+ uint64_t off,
+ uint64_t len);
+
+ seastar::future<> write(const std::string& oid,
+ uint64_t off,
+ ceph::bufferlist&& bl);
+
+ seastar::future<std::pair<uint64_t, ceph::real_time>> stat(
+ const std::string& oid);
+
+ seastar::future<> discard(const std::string& oid,
+ uint64_t off,
+ uint64_t len);
+
+ seastar::future<> write_zeroes(const std::string& oid,
+ uint64_t off,
+ uint64_t len);
+
+ seastar::future<> compare_and_write(const std::string& oid,
+ uint64_t cmp_off,
+ ceph::bufferlist&& cmp_bl,
+ uint64_t write_off,
+ ceph::bufferlist&& write_bl);
+
+ /// Execute cls method on object. Returns output bufferlist.
+ seastar::future<ceph::bufferlist> exec(const std::string& oid,
+ std::string_view cname,
+ std::string_view method,
+ ceph::bufferlist&& indata);
+
+ int64_t get_pool_id() const { return pool_id; }
+ snapid_t get_snap_seq() const { return snap_seq; }
+
+private:
+ crimson::osdc::Objecter& objecter;
+ int64_t pool_id;
+ snapid_t snap_seq;
+};
+
+} // namespace crimson::client
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "crimson/client/rados_client.h"
+
+#include <cerrno>
+#include <system_error>
+
+#include "crimson/common/log.h"
+#include "crimson/osdc/objecter.h"
+#include "osd/OSDMap.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_client);
+ }
+}
+
+namespace crimson::client {
+
+RadosClient::RadosClient(crimson::net::Messenger& msgr,
+ crimson::mon::Client& monc)
+ : msgr(msgr),
+ monc(monc),
+ objecter(std::make_unique<crimson::osdc::Objecter>(msgr, monc))
+{}
+
+RadosClient::~RadosClient() = default;
+
+crimson::osdc::Objecter& RadosClient::get_objecter()
+{
+ return *objecter;
+}
+
+seastar::future<> RadosClient::connect()
+{
+ logger().info("RadosClient::connect");
+ return objecter->start();
+}
+
+seastar::future<IoCtx> RadosClient::create_ioctx(std::string_view pool_name)
+{
+ logger().debug("RadosClient::create_ioctx pool_name={}", pool_name);
+ return objecter->wait_for_osdmap()
+ .then([this, pool_name = std::string(pool_name)] {
+ return objecter->with_osdmap(
+ [pool_name](const OSDMap& o) {
+ return o.lookup_pg_pool_name(pool_name);
+ });
+ })
+ .then([this](int64_t pool_id) {
+ if (pool_id < 0) {
+ return seastar::make_exception_future<IoCtx>(
+ std::system_error(-pool_id, std::system_category(), "pool not found"));
+ }
+ logger().debug("RadosClient::create_ioctx pool_id={}", pool_id);
+ return seastar::make_ready_future<IoCtx>(IoCtx(*objecter, pool_id));
+ });
+}
+
+seastar::future<IoCtx> RadosClient::create_ioctx(int64_t pool_id)
+{
+ logger().debug("RadosClient::create_ioctx pool_id={}", pool_id);
+ return objecter->wait_for_osdmap()
+ .then([this, pool_id] {
+ return objecter->with_osdmap(
+ [pool_id](const OSDMap& o) {
+ return o.have_pg_pool(pool_id);
+ });
+ })
+ .then([this, pool_id](bool exists) {
+ if (!exists) {
+ return seastar::make_exception_future<IoCtx>(
+ std::system_error(ENOENT, std::system_category(), "pool not found"));
+ }
+ return seastar::make_ready_future<IoCtx>(IoCtx(*objecter, pool_id));
+ });
+}
+
+seastar::future<> RadosClient::shutdown()
+{
+ logger().info("RadosClient::shutdown");
+ return objecter->stop();
+}
+
+} // namespace crimson::client
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+
+#include "crimson/client/io_context.h"
+
+namespace crimson::mon {
+class Client;
+}
+
+namespace crimson::net {
+class Messenger;
+}
+
+namespace crimson::osdc {
+class Objecter;
+}
+
+namespace crimson::client {
+
+class IoCtx;
+
+/**
+ * RADOS client facade. Holds Messenger, MonClient, Objecter.
+ * Provides connect(), create_ioctx(pool_name), shutdown().
+ *
+ * Usage:
+ * auto auth_handler = std::make_unique<DummyAuthHandler>();
+ * auto msgr = crimson::net::Messenger::create(entity_name_t::CLIENT(), ...);
+ * crimson::mon::Client monc{*msgr, *auth_handler};
+ * RadosClient client(*msgr, monc);
+ *
+ * msgr->set_auth_client(&monc);
+ * msgr->start({&monc, &client.get_objecter()});
+ * monc.start().get();
+ * client.connect().get();
+ *
+ * auto ioctx = client.create_ioctx("rbd").get();
+ * co_await ioctx.write("obj", 0, std::move(bl));
+ * auto data = co_await ioctx.read("obj", 0, 4096);
+ *
+ * client.shutdown().get();
+ */
+class RadosClient {
+public:
+ RadosClient(crimson::net::Messenger& msgr,
+ crimson::mon::Client& monc);
+
+ ~RadosClient();
+
+ /// Objecter reference for adding to Messenger dispatchers before start.
+ crimson::osdc::Objecter& get_objecter();
+
+ /// Start Objecter (subscribe to osdmap). Call after msgr.start() and monc.start().
+ seastar::future<> connect();
+
+ /// Create pool-scoped IoCtx by pool name. Waits for OSDMap, then looks up pool.
+ /// Fails with -ENOENT if pool not found.
+ seastar::future<IoCtx> create_ioctx(std::string_view pool_name);
+
+ /// Create pool-scoped IoCtx by pool id. Validates pool exists in OSDMap.
+ seastar::future<IoCtx> create_ioctx(int64_t pool_id);
+
+ /// Stop Objecter.
+ seastar::future<> shutdown();
+
+private:
+ crimson::net::Messenger& msgr;
+ crimson::mon::Client& monc;
+ std::unique_ptr<crimson::osdc::Objecter> objecter;
+};
+
+} // namespace crimson::client
+add_library(crimson-main-config-bootstrap STATIC
+ main_config_bootstrap_helpers.cc)
+target_link_libraries(crimson-main-config-bootstrap
+ crimson-common
+ crimson
+ legacy-option-headers)
+
add_executable(crimson-osd
backfill_state.cc
ec_backend.cc
heartbeat.cc
lsan_suppressions.cc
main.cc
- main_config_bootstrap_helpers.cc
osd.cc
osd_meta.cc
pg.cc
PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments)
endif()
target_link_libraries(crimson-osd
+ crimson-main-config-bootstrap
legacy-option-headers
crimson-admin
crimson-common
}
static tl::expected<early_config_t, int>
-_get_early_config(int argc, const char *argv[])
+_get_early_config(int argc, const char *argv[], int entity_type)
{
early_config_t ret;
ret.init_params = ceph_argparse_early_args(
early_args,
- CEPH_ENTITY_TYPE_OSD,
+ entity_type,
&ret.cluster_name,
&ret.conf_file_list);
int r = app.run(
sizeof(bootstrap_args) / sizeof(bootstrap_args[0]),
const_cast<char**>(bootstrap_args),
- [argc, argv, &ret, &early_args] {
- return seastar::async([argc, argv, &ret, &early_args] {
+ [argc, argv, &ret, &early_args, entity_type] {
+ return seastar::async([argc, argv, &ret, &early_args, entity_type] {
seastar::global_logger_registry().set_all_loggers_level(
seastar::log_level::debug);
sharded_conf().start(
cpu_cores);
} else {
auto reactor_num = crimson::common::get_conf<uint64_t>("crimson_cpu_num");
- if (!reactor_num) {
- // We would like to avoid seastar using all available cores.
+ if (entity_type == CEPH_ENTITY_TYPE_CLIENT && !reactor_num) {
+ ret.early_args.emplace_back("--smp");
+ ret.early_args.emplace_back("1");
+ ret.early_args.emplace_back("--thread-affinity");
+ ret.early_args.emplace_back("0");
+ logger().info("get_early_config: client default --smp 1");
+ } else if (!reactor_num) {
logger().error("get_early_config: crimson_cpu_set"
" or crimson_cpu_num must be set");
ceph_abort();
+ } else {
+ std::string smp = fmt::format("{}", reactor_num);
+ ret.early_args.emplace_back("--smp");
+ ret.early_args.emplace_back(smp);
+ ret.early_args.emplace_back("--thread-affinity");
+ ret.early_args.emplace_back("0");
+ logger().info("get_early_config: set --thread-affinity 0 --smp {}",
+ smp);
}
- std::string smp = fmt::format("{}", reactor_num);
- ret.early_args.emplace_back("--smp");
- ret.early_args.emplace_back(smp);
- ret.early_args.emplace_back("--thread-affinity");
- ret.early_args.emplace_back("0");
- logger().info("get_early_config: set --thread-affinity 0 --smp {}",
- smp);
-
}
} else {
logger().error("get_early_config: --cpuset can be "
return tl::unexpected(-errno);
} else if (worker == 0) { // child
close(pipes[0]);
- auto ret = _get_early_config(argc, argv);
+ auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_OSD);
if (ret.has_value()) {
bufferlist bl;
::encode(ret.value(), bl);
}
}
+tl::expected<early_config_t, int>
+get_early_config_client(int argc, const char *argv[])
+{
+ auto args = argv_to_vec(argc, argv);
+ if (args.empty()) {
+ std::cerr << argv[0] << ": -h or --help for usage" << std::endl;
+ exit(1);
+ }
+ if (ceph_argparse_need_usage(args)) {
+ std::cout << "usage: " << argv[0] << " [options] [--pool <pool>]" << std::endl;
+ generic_server_usage();
+ exit(0);
+ }
+ int pipes[2];
+ int r = pipe2(pipes, 0);
+ if (r < 0) {
+ std::cerr << "get_early_config_client: failed to create pipes: "
+ << -errno << std::endl;
+ return tl::unexpected(-errno);
+ }
+
+ pid_t worker = fork();
+ if (worker < 0) {
+ close(pipes[0]);
+ close(pipes[1]);
+ std::cerr << "get_early_config_client: failed to fork: "
+ << -errno << std::endl;
+ return tl::unexpected(-errno);
+ } else if (worker == 0) { // child
+ close(pipes[0]);
+ auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_CLIENT);
+ if (ret.has_value()) {
+ bufferlist bl;
+ ::encode(ret.value(), bl);
+ r = bl.write_fd(pipes[1]);
+ close(pipes[1]);
+ if (r < 0) {
+ std::cerr << "get_early_config_client: child failed to write_fd: "
+ << r << std::endl;
+ exit(-r);
+ } else {
+ exit(0);
+ }
+ } else {
+ std::cerr << "get_early_config_client: child failed: "
+ << -ret.error() << std::endl;
+ exit(-ret.error());
+ }
+ return tl::unexpected(-1);
+ } else { // parent
+ close(pipes[1]);
+
+ bufferlist bl;
+ early_config_t ret;
+ bool have_data = false;
+ while ((r = bl.read_fd(pipes[0], 1024)) > 0) {
+ have_data = true;
+ }
+ close(pipes[0]);
+
+ int status;
+ waitpid(worker, &status, 0);
+
+ if (!have_data && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
+ exit(0);
+ }
+ if (r < 0) {
+ std::cerr << "get_early_config_client: parent failed to read from pipe: "
+ << r << std::endl;
+ return tl::unexpected(r);
+ }
+ try {
+ auto bliter = bl.cbegin();
+ ::decode(ret, bliter);
+ return ret;
+ } catch (...) {
+ std::cerr << "get_early_config_client: parent failed to decode" << std::endl;
+ return tl::unexpected(-EINVAL);
+ }
+ }
+}
+
}
tl::expected<early_config_t, int>
get_early_config(int argc, const char *argv[]);
+/// Like get_early_config but for CEPH_ENTITY_TYPE_CLIENT (e.g. client.admin).
+tl::expected<early_config_t, int>
+get_early_config_client(int argc, const char *argv[]);
+
}
WRITE_CLASS_ENCODER(crimson::osd::early_config_t)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "objecter.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDMap.h"
+
+#include "common/entity_name.h"
+#include "common/hobject.h"
+#include "crimson/mon/MonClient.h"
+#include "include/rados.h"
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_objecter);
+ }
+}
+
+namespace crimson::osdc {
+
+Objecter::Objecter(crimson::net::Messenger& msgr,
+ crimson::mon::Client& monc)
+ : msgr(msgr),
+ monc(monc),
+ osdmap(std::make_unique<OSDMap>())
+{}
+
+Objecter::~Objecter() {}
+
+seastar::future<> Objecter::start()
+{
+ logger().info("Objecter::start");
+ ceph_assert(!started);
+ started = true;
+ osdmap_ready.emplace();
+
+ // Subscribe to osdmap updates from the monitor
+ if (monc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME)) {
+ return monc.renew_subs();
+ }
+ return seastar::now();
+}
+
+seastar::future<> Objecter::stop()
+{
+ logger().info("Objecter::stop");
+ ceph_assert(started);
+ started = false;
+ return dispatch_gate.close();
+}
+
+std::optional<seastar::future<>>
+Objecter::ms_dispatch(crimson::net::ConnectionRef conn,
+ MessageRef m)
+{
+ const int msg_type = m->get_type();
+
+ // Only handle messages from MON (OSDMap) or OSD (OpReply)
+ if (conn->peer_is_mon()) {
+ if (msg_type != CEPH_MSG_OSD_MAP) {
+ return std::nullopt;
+ }
+ auto mosdmap = boost::static_pointer_cast<MOSDMap>(m);
+ return seastar::with_gate(dispatch_gate, [this, mosdmap] {
+ return handle_osd_map(std::move(mosdmap));
+ });
+ }
+
+ if (conn->peer_is_osd()) {
+ if (msg_type != CEPH_MSG_OSD_OPREPLY) {
+ return std::nullopt;
+ }
+ auto reply = boost::static_pointer_cast<MOSDOpReply>(m);
+ return seastar::with_gate(dispatch_gate, [this, conn, reply] {
+ handle_osd_op_reply(conn, reply.get());
+ return seastar::now();
+ });
+ }
+
+ return std::nullopt;
+}
+
+seastar::future<> Objecter::handle_osd_map(Ref<MOSDMap> m)
+{
+ logger().debug("handle_osd_map: epochs [{},{}]",
+ m->get_first(), m->get_last());
+
+ if (m->fsid != monc.get_fsid()) {
+ logger().warn("handle_osd_map: fsid mismatch {} != {}",
+ m->fsid, monc.get_fsid());
+ return seastar::now();
+ }
+
+ return seastar::with_lock(osdmap_mutex, [this, m = std::move(m)] {
+ seastar::future<> renew_fut = seastar::now();
+
+ if (m->get_last() <= osdmap->get_epoch()) {
+ logger().debug("handle_osd_map: ignoring stale epochs [{},{}] <= {}",
+ m->get_first(), m->get_last(), osdmap->get_epoch());
+ } else {
+ if (osdmap->get_epoch()) {
+ // Apply incremental maps
+ for (epoch_t e = osdmap->get_epoch() + 1; e <= m->get_last(); ++e) {
+ if (osdmap->get_epoch() == e - 1 && m->incremental_maps.count(e)) {
+ logger().debug("handle_osd_map: applying incremental epoch {}", e);
+ OSDMap::Incremental inc(m->incremental_maps[e]);
+ osdmap->apply_incremental(inc);
+ } else if (m->maps.count(e)) {
+ logger().debug("handle_osd_map: applying full epoch {}", e);
+ auto new_osdmap = std::make_unique<OSDMap>();
+ new_osdmap->decode(m->maps[e]);
+ osdmap = std::move(new_osdmap);
+ }
+ }
+ } else {
+ // First map - need full
+ if (m->maps.count(m->get_last())) {
+ logger().debug("handle_osd_map: decoding initial full epoch {}",
+ m->get_last());
+ osdmap->decode(m->maps[m->get_last()]);
+ } else {
+ logger().debug("handle_osd_map: need full map, requesting");
+ renew_fut = maybe_request_map();
+ }
+ }
+ }
+
+ monc.sub_got("osdmap", osdmap->get_epoch());
+
+ if (osdmap->get_epoch() > 0 && osdmap_ready && !osdmap_ready_fulfilled) {
+ osdmap_ready_fulfilled = true;
+ osdmap_ready->set_value();
+ }
+ return renew_fut;
+ });
+}
+
+void Objecter::handle_osd_op_reply(crimson::net::ConnectionRef conn,
+ MOSDOpReply* m)
+{
+ const ceph_tid_t tid = m->get_tid();
+ const int r = m->get_result();
+
+ if (!conn->peer_is_osd()) {
+ return;
+ }
+ const int osd_id = conn->get_peer_id();
+ auto it = sessions.find(osd_id);
+ if (it == sessions.end()) {
+ logger().warn("handle_osd_op_reply: tid={} from unknown osd.{}", tid, osd_id);
+ return;
+ }
+ auto op_it = it->second.ops.find(tid);
+ if (op_it == it->second.ops.end()) {
+ logger().debug("handle_osd_op_reply: tid={} not found (may be stale)", tid);
+ return;
+ }
+ auto completion = std::move(op_it->second);
+ it->second.ops.erase(op_it);
+
+ std::vector<OSDOp> ops;
+ m->claim_ops(ops);
+ completion(r, ops);
+}
+
+seastar::future<> Objecter::wait_for_osdmap()
+{
+ return with_osdmap([](const OSDMap& o) { return o.get_epoch(); })
+ .then([this](epoch_t epoch) {
+ if (epoch > 0) {
+ return seastar::now();
+ }
+ return osdmap_ready->get_shared_future();
+ });
+}
+
+seastar::future<> Objecter::maybe_request_map()
+{
+ if (monc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME)) {
+ return monc.renew_subs();
+ }
+ return seastar::now();
+}
+
+#ifdef UNIT_TESTS_BUILT
+seastar::future<> Objecter::inject_osdmap_for_test(std::unique_ptr<OSDMap> m)
+{
+ return seastar::with_lock(osdmap_mutex, [this, m = std::move(m)]() mutable {
+ osdmap = std::move(m);
+ monc.sub_got("osdmap", osdmap->get_epoch());
+ if (osdmap->get_epoch() > 0 && osdmap_ready && !osdmap_ready_fulfilled) {
+ osdmap_ready_fulfilled = true;
+ osdmap_ready->set_value();
+ }
+ });
+}
+#endif
+
+seastar::future<std::optional<OsdTarget>>
+Objecter::calc_target(const object_t& oid,
+ const object_locator_t& oloc) const
+{
+ return seastar::with_shared(osdmap_mutex, [this, oid, oloc] {
+ if (!osdmap->get_epoch()) {
+ return std::optional<OsdTarget>{}; // no OSDMap yet
+ }
+
+ pg_t raw_pg;
+ const int ret = osdmap->object_locator_to_pg(oid, oloc, raw_pg);
+ if (ret != 0) {
+ return std::optional<OsdTarget>{}; // pool does not exist (e.g. -ENOENT)
+ }
+
+ const pg_t actual_pgid = osdmap->raw_pg_to_pg(raw_pg);
+
+ int up_primary = -1;
+ int acting_primary = -1;
+ std::vector<int> up;
+ std::vector<int> acting;
+ osdmap->pg_to_up_acting_osds(actual_pgid,
+ &up, &up_primary,
+ &acting, &acting_primary);
+
+ if (acting_primary < 0 || !osdmap->exists(acting_primary)) {
+ return std::optional<OsdTarget>{};
+ }
+
+ const entity_addr_t addr = osdmap->get_addrs(acting_primary).front();
+ return std::optional<OsdTarget>{OsdTarget{actual_pgid, raw_pg, acting_primary, addr}};
+ });
+}
+
+seastar::future<crimson::net::ConnectionRef>
+Objecter::get_or_connect(const entity_addr_t& addr, int osd_id)
+{
+ auto it = sessions.find(osd_id);
+ if (it != sessions.end() && it->second.conn && it->second.conn->is_connected()) {
+ return seastar::make_ready_future<crimson::net::ConnectionRef>(it->second.conn);
+ }
+
+ auto pend_it = pending_connects.find(osd_id);
+ if (pend_it != pending_connects.end()) {
+ return pend_it->second.get_shared_future();
+ }
+
+ crimson::net::ConnectionRef conn = msgr.connect(
+ addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, osd_id));
+
+ if (conn->is_connected()) {
+ if (it == sessions.end()) {
+ sessions.emplace(osd_id, OsdSession{osd_id, addr, conn});
+ } else {
+ it->second.conn = conn;
+ it->second.addr = addr;
+ }
+ return seastar::make_ready_future<crimson::net::ConnectionRef>(conn);
+ }
+
+ if (it == sessions.end()) {
+ sessions.emplace(osd_id, OsdSession{osd_id, addr, conn});
+ } else {
+ it->second.conn = conn;
+ it->second.addr = addr;
+ }
+
+ auto [promise_it, inserted] = pending_connects.emplace(
+ osd_id, seastar::shared_promise<crimson::net::ConnectionRef>{});
+ ceph_assert(inserted);
+ return promise_it->second.get_shared_future();
+}
+
+seastar::future<> Objecter::send_to_osd(const OsdTarget& target, MessageURef msg)
+{
+ return get_or_connect(target.primary_addr, target.primary_osd)
+ .then([msg = std::move(msg)](crimson::net::ConnectionRef conn) mutable {
+ return conn->send(std::move(msg));
+ });
+}
+
+seastar::future<ceph::bufferlist> Objecter::read(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, off, len, snap](std::optional<OsdTarget> target) {
+ if (!target) {
+ return seastar::make_exception_future<ceph::bufferlist>(
+ std::system_error(ENOENT, std::generic_category(), "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, flags, 0);
+ msg->set_snapid(snap);
+ msg->read(off, len);
+
+ seastar::promise<ceph::bufferlist> promise;
+ auto future = promise.get_future();
+ const int osd_id = target.primary_osd;
+
+ auto p = std::make_shared<seastar::promise<ceph::bufferlist>>(std::move(promise));
+ OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+ if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "read failed")));
+ } else if (!ops.empty()) {
+ p->set_value(std::move(ops[0].outdata));
+ } else {
+ p->set_value(ceph::bufferlist{});
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+ crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ it->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+seastar::future<> Objecter::write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, ceph::bufferlist&& bl,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, off, bl = std::move(bl), snap](std::optional<OsdTarget> target) mutable {
+ if (!target) {
+ return seastar::make_exception_future<>(
+ std::system_error(ENOENT, std::generic_category(),
+ "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc, off, bl = std::move(bl), snap](epoch_t epoch) mutable {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const size_t len = bl.length();
+ const int osd_id = target.primary_osd;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+ msg->set_snapid(snap);
+ msg->write(off, len, bl);
+ // MOSDOp::write puts data in Message::data, but OSD expects it in
+ // ops[0].indata for proper encode/decode. Move it there.
+ msg->ops[0].indata = std::move(msg->get_data());
+
+ seastar::promise<> promise;
+ auto future = promise.get_future();
+
+ auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+ OpCompletion completion = [this, osd_id, len, p](int r, std::vector<OSDOp>&) {
+ auto it = sessions.find(osd_id);
+ if (it != sessions.end()) {
+ it->second.write_throttle.put(len);
+ }
+ if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "write failed")));
+ } else {
+ p->set_value();
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, len, c = std::move(completion), msg = std::move(msg)](
+ crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ return it->second.write_throttle.get(len).then(
+ [this, conn, tid, osd_id, c = std::move(c),
+ msg = std::move(msg)]() mutable {
+ auto it2 = sessions.find(osd_id);
+ ceph_assert(it2 != sessions.end());
+ it2->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ }).handle_exception([this, tid, osd_id, len](std::exception_ptr e) {
+ auto it2 = sessions.find(osd_id);
+ if (it2 != sessions.end()) {
+ it2->second.ops.erase(tid);
+ it2->second.write_throttle.put(len);
+ }
+ return seastar::make_exception_future<>(e);
+ });
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+seastar::future<> Objecter::discard(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, off, len, snap](std::optional<OsdTarget> target) {
+ if (!target) {
+ return seastar::make_exception_future<>(
+ std::system_error(ENOENT, std::generic_category(),
+ "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const int osd_id = target.primary_osd;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+ msg->set_snapid(snap);
+ msg->zero(off, len);
+
+ seastar::promise<> promise;
+ auto future = promise.get_future();
+ auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+ OpCompletion completion = [p](int r, std::vector<OSDOp>&) {
+ if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "discard failed")));
+ } else {
+ p->set_value();
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+ crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ it->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+seastar::future<> Objecter::write_zeroes(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap)
+{
+ return discard(oid, oloc, off, len, snap);
+}
+
+seastar::future<> Objecter::compare_and_write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t cmp_off, ceph::bufferlist&& cmp_bl,
+ uint64_t write_off, ceph::bufferlist&& write_bl,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, cmp_off, cmp_bl = std::move(cmp_bl),
+ write_off, write_bl = std::move(write_bl), snap](std::optional<OsdTarget> target) mutable {
+ if (!target) {
+ return seastar::make_exception_future<>(
+ std::system_error(ENOENT, std::generic_category(),
+ "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc,
+ cmp_off, cmp_bl = std::move(cmp_bl),
+ write_off, write_bl = std::move(write_bl), snap](epoch_t epoch) mutable {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const size_t write_len = write_bl.length();
+ const int osd_id = target.primary_osd;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+ msg->set_snapid(snap);
+ msg->add_op_with_data(CEPH_OSD_OP_CMPEXT, cmp_off, cmp_bl.length(), std::move(cmp_bl));
+ msg->add_op_with_data(CEPH_OSD_OP_WRITE, write_off, write_len, std::move(write_bl));
+
+ seastar::promise<> promise;
+ auto future = promise.get_future();
+ auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+ OpCompletion completion = [this, osd_id, write_len, p](int r, std::vector<OSDOp>&) {
+ auto it = sessions.find(osd_id);
+ if (it != sessions.end()) {
+ it->second.write_throttle.put(write_len);
+ }
+ if (r == -EILSEQ) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(EILSEQ, std::generic_category(), "compare-and-write mismatch")));
+ } else if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "compare-and-write failed")));
+ } else {
+ p->set_value();
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, write_len, c = std::move(completion), msg = std::move(msg)](
+ crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ return it->second.write_throttle.get(write_len).then(
+ [this, conn, tid, osd_id, c = std::move(c),
+ msg = std::move(msg)]() mutable {
+ auto it2 = sessions.find(osd_id);
+ ceph_assert(it2 != sessions.end());
+ it2->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ }).handle_exception([this, tid, osd_id, write_len](std::exception_ptr e) {
+ auto it2 = sessions.find(osd_id);
+ if (it2 != sessions.end()) {
+ it2->second.ops.erase(tid);
+ it2->second.write_throttle.put(write_len);
+ }
+ return seastar::make_exception_future<>(e);
+ });
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+seastar::future<ceph::bufferlist> Objecter::exec(const object_t& oid,
+ const object_locator_t& oloc,
+ std::string_view cname,
+ std::string_view method,
+ ceph::bufferlist&& indata,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, cname, method, indata = std::move(indata), snap](
+ std::optional<OsdTarget> target) mutable {
+ if (!target) {
+ return seastar::make_exception_future<ceph::bufferlist>(
+ std::system_error(ENOENT, std::generic_category(),
+ "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc, cname, method,
+ indata = std::move(indata), snap](epoch_t epoch) mutable {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid,
+ epoch, flags, 0);
+ msg->set_snapid(snap);
+ msg->add_call(cname, method, indata);
+
+ seastar::promise<ceph::bufferlist> promise;
+ auto future = promise.get_future();
+ const int osd_id = target.primary_osd;
+
+ auto p = std::make_shared<seastar::promise<ceph::bufferlist>>(
+ std::move(promise));
+ OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+ if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "exec failed")));
+ } else if (!ops.empty()) {
+ p->set_value(std::move(ops[0].outdata));
+ } else {
+ p->set_value(ceph::bufferlist{});
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, c = std::move(completion),
+ msg = std::move(msg)](crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ it->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+seastar::future<std::pair<uint64_t, ceph::real_time>>
+Objecter::stat(const object_t& oid,
+ const object_locator_t& oloc,
+ snapid_t snap)
+{
+ return calc_target(oid, oloc)
+ .then([this, oid, oloc, snap](std::optional<OsdTarget> target) {
+ if (!target) {
+ return seastar::make_exception_future<std::pair<uint64_t, ceph::real_time>>(
+ std::system_error(ENOENT, std::generic_category(),
+ "pool or object mapping failed"));
+ }
+ return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+ .then([this, target = *target, oid, oloc, snap](epoch_t epoch) {
+ const ceph_tid_t tid = next_tid++;
+ const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+ : target.raw_pg.ps();
+ const hobject_t hobj(oid, oloc.key, snap, hash,
+ oloc.get_pool(), oloc.nspace);
+ spg_t spgid(target.pgid);
+ const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+ auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, flags, 0);
+ msg->set_snapid(snap);
+ msg->stat();
+
+ seastar::promise<std::pair<uint64_t, ceph::real_time>> promise;
+ auto future = promise.get_future();
+ const int osd_id = target.primary_osd;
+
+ auto p = std::make_shared<seastar::promise<std::pair<uint64_t, ceph::real_time>>>(
+ std::move(promise));
+ OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+ if (r < 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::system_error(-r, std::generic_category(), "stat failed")));
+ return;
+ }
+ if (ops.empty() || ops[0].outdata.length() == 0) {
+ p->set_exception(std::make_exception_ptr(
+ std::runtime_error("stat reply empty")));
+ return;
+ }
+ try {
+ auto p_iter = ops[0].outdata.cbegin();
+ uint64_t size;
+ ceph::real_time mtime;
+ ceph::decode(size, p_iter);
+ ceph::decode(mtime, p_iter);
+ p->set_value(std::make_pair(size, mtime));
+ } catch (const ceph::buffer::error&) {
+ p->set_exception(std::make_exception_ptr(
+ std::runtime_error("stat reply decode failed")));
+ }
+ };
+
+ return get_or_connect(target.primary_addr, osd_id)
+ .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+ crimson::net::ConnectionRef conn) mutable {
+ auto it = sessions.find(osd_id);
+ ceph_assert(it != sessions.end());
+ it->second.ops[tid] = std::move(c);
+ return conn->send(std::move(msg));
+ })
+ .then([f = std::move(future)]() mutable {
+ return std::move(f);
+ });
+ });
+ });
+}
+
+void Objecter::ms_handle_connect(crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard)
+{
+ if (!conn->peer_is_osd()) {
+ return;
+ }
+ const int osd_id = conn->get_peer_id();
+ logger().debug("ms_handle_connect: osd.{}", osd_id);
+
+ auto it = sessions.find(osd_id);
+ if (it != sessions.end()) {
+ it->second.conn = conn;
+ it->second.addr = conn->get_peer_addr();
+ } else {
+ sessions.emplace(osd_id,
+ OsdSession{osd_id, conn->get_peer_addr(), conn});
+ }
+
+ auto pend = pending_connects.find(osd_id);
+ if (pend != pending_connects.end()) {
+ pend->second.set_value(conn);
+ pending_connects.erase(pend);
+ }
+}
+
+void Objecter::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+{
+ if (!conn->peer_is_osd()) {
+ return;
+ }
+ const int osd_id = conn->get_peer_id();
+ logger().debug("ms_handle_reset: osd.{} is_replace={}", osd_id, is_replace);
+
+ auto it = sessions.find(osd_id);
+ if (it != sessions.end()) {
+ if (it->second.conn.get() == conn.get()) {
+ it->second.conn = nullptr;
+ }
+ const auto ex = std::make_exception_ptr(
+ std::runtime_error(fmt::format("connection to osd.{} reset", osd_id)));
+ for (auto& [tid, completion] : it->second.ops) {
+ std::vector<OSDOp> empty_ops;
+ completion(-ECONNRESET, empty_ops);
+ }
+ it->second.ops.clear();
+ }
+
+ auto pend = pending_connects.find(osd_id);
+ if (pend != pending_connects.end()) {
+ pend->second.set_exception(std::make_exception_ptr(
+ std::runtime_error(fmt::format("connection to osd.{} reset", osd_id))));
+ pending_connects.erase(pend);
+ }
+}
+
+} // namespace crimson::osdc
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string_view>
+#include <unordered_map>
+#include <vector>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_mutex.hh>
+
+#include "include/ceph_fs.h"
+#include "include/types.h"
+#include "include/utime.h"
+
+#include "common/ceph_time.h"
+#include "crimson/common/throttle.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+#include "msg/MessageRef.h"
+#include "msg/msg_types.h"
+#include "messages/MOSDOp.h"
+#include "osd/OSDMap.h"
+
+class MOSDOpReply;
+class MOSDMap;
+
+namespace crimson::mon {
+class Client;
+}
+
+namespace crimson::osdc {
+
+/// Result of mapping an object to its target OSD
+struct OsdTarget {
+ pg_t pgid; ///< actual pg (after raw_pg_to_pg)
+ pg_t raw_pg; ///< raw pg (for hobject_t hash - must match classic Objecter)
+ int primary_osd;
+ entity_addr_t primary_addr;
+};
+
+/// Completion callback for op reply: (result, ops) -> void
+using OpCompletion = std::function<void(int r, std::vector<OSDOp>&)>;
+
+/// Per-OSD session holding connection and in-flight ops
+struct OsdSession {
+ int osd_id = -1;
+ entity_addr_t addr;
+ crimson::net::ConnectionRef conn;
+
+ /// In-flight ops by tid (read, write, stat)
+ std::unordered_map<ceph_tid_t, OpCompletion> ops;
+
+ /// Throttle for write bytes in flight
+ crimson::common::Throttle write_throttle{64 * 1024 * 1024}; // 64 MiB default
+};
+
+/**
+ * Crimson Objecter - Seastar-native RADOS client objecter.
+ *
+ * - Registers as Dispatcher for MOSDOpReply (stub) and MOSDMap
+ * - Subscribes to osdmap via MonClient, receives and stores OSDMap
+ * - Exposes with_osdmap() for pool resolution and PG calculation
+ * - Exposes calc_target() for object -> PG -> primary OSD mapping
+ * - Exposes get_or_connect() and send_to_osd() for OSD connection and message send
+ */
+class Objecter : public crimson::net::Dispatcher {
+public:
+ Objecter(crimson::net::Messenger& msgr,
+ crimson::mon::Client& monc);
+
+ ~Objecter() override;
+
+ seastar::future<> start();
+ seastar::future<> stop();
+
+ /// Invoke a callback with read access to the current OSDMap.
+ /// Returns a future with the result of the callback. OSDMap may be empty
+ /// (epoch 0) until the first map is received from the monitor.
+ template<typename Func>
+ seastar::futurize_t<std::invoke_result_t<Func, const OSDMap&>>
+ with_osdmap(Func&& f) const {
+ return seastar::with_shared(
+ osdmap_mutex,
+ [this, f = std::forward<Func>(f)]() {
+ return seastar::futurize_invoke(f, static_cast<const OSDMap&>(*osdmap));
+ });
+ }
+
+ /// Map object + locator to target PG and primary OSD address.
+ /// Uses OSDMap::object_locator_to_pg, raw_pg_to_pg, pg_to_up_acting_osds.
+ /// Returns nullopt if pool does not exist or mapping fails.
+ seastar::future<std::optional<OsdTarget>> calc_target(const object_t& oid,
+ const object_locator_t& oloc) const;
+
+ /// Get or create connection to OSD. Resolves when handshake completes.
+ seastar::future<crimson::net::ConnectionRef>
+ get_or_connect(const entity_addr_t& addr, int osd_id);
+
+ /// Send message to OSD (connects if needed). Requires OsdTarget from calc_target.
+ seastar::future<> send_to_osd(const OsdTarget& target, MessageURef msg);
+
+ /// Read object data. Returns bufferlist on success; completes with
+ /// exception on error (e.g. -ENOENT).
+ seastar::future<ceph::bufferlist> read(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Write object data. Throttled per OSD.
+ seastar::future<> write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, ceph::bufferlist&& bl,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Stat object. Returns (size, mtime).
+ seastar::future<std::pair<uint64_t, ceph::real_time>> stat(
+ const object_t& oid,
+ const object_locator_t& oloc,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Discard (zero) object range. For RBD trim/UNMAP.
+ seastar::future<> discard(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Write zeroes to object range. For RBD write_zeroes.
+ seastar::future<> write_zeroes(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t off, uint64_t len,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Compare-and-write: compare at cmp_off with cmp_bl; if match, write
+ /// write_bl at write_off. Returns -EILSEQ on mismatch.
+ seastar::future<> compare_and_write(const object_t& oid,
+ const object_locator_t& oloc,
+ uint64_t cmp_off, ceph::bufferlist&& cmp_bl,
+ uint64_t write_off, ceph::bufferlist&& write_bl,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Execute cls method on object. Returns output bufferlist.
+ /// For use by librbd_crimson (cls/rbd get_size, get_features, etc.).
+ seastar::future<ceph::bufferlist> exec(const object_t& oid,
+ const object_locator_t& oloc,
+ std::string_view cname,
+ std::string_view method,
+ ceph::bufferlist&& indata,
+ snapid_t snap = CEPH_NOSNAP);
+
+ /// Wait until OSDMap has been received (epoch > 0).
+ seastar::future<> wait_for_osdmap();
+
+ /// Set client incarnation for MOSDOp reqid. Call before first op to avoid
+ /// duplicate detection with previous sessions. Default 0.
+ void set_client_incarnation(int inc) { client_inc = inc; }
+
+#ifdef UNIT_TESTS_BUILT
+ /// Inject OSDMap for unit tests (bypasses monitor). Also fulfills wait_for_osdmap.
+ seastar::future<> inject_osdmap_for_test(std::unique_ptr<OSDMap> m);
+#endif
+
+ // Dispatcher interface
+ std::optional<seastar::future<>>
+ ms_dispatch(crimson::net::ConnectionRef conn,
+ MessageRef m) override;
+
+ void ms_handle_connect(crimson::net::ConnectionRef conn,
+ seastar::shard_id prv_shard) override;
+ void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+
+private:
+ crimson::net::Messenger& msgr;
+ crimson::mon::Client& monc;
+
+ std::unique_ptr<OSDMap> osdmap;
+ mutable seastar::shared_mutex osdmap_mutex;
+
+ std::unordered_map<int, OsdSession> sessions;
+ std::unordered_map<int, seastar::shared_promise<crimson::net::ConnectionRef>>
+ pending_connects;
+
+ seastar::gate dispatch_gate;
+ bool started = false;
+ int client_inc = 0;
+ ceph_tid_t next_tid = 1;
+
+ std::optional<seastar::shared_promise<>> osdmap_ready;
+ bool osdmap_ready_fulfilled = false;
+
+ seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+ void handle_osd_op_reply(crimson::net::ConnectionRef conn, MOSDOpReply* m);
+ seastar::future<> maybe_request_map();
+};
+
+} // namespace crimson::osdc
add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
target_link_libraries(perf-crimson-msgr crimson)
+add_executable(crimson-rados-demo rados_demo.cc)
+if(HAS_VTA)
+ set_source_files_properties(rados_demo.cc
+ PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments)
+endif()
+target_link_libraries(crimson-rados-demo
+ crimson-main-config-bootstrap
+ crimson
+ crimson-common
+ legacy-option-headers
+ ${FMT_LIB})
+install(TARGETS crimson-rados-demo DESTINATION bin)
+
add_executable(perf-async-msgr perf_async_msgr.cc)
target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * Minimal RADOS client demo using Crimson Objecter and RadosClient.
+ * Connects to cluster, creates IoCtx, and exercises: write, read,
+ * discard, write_zeroes, compare_and_write. Integration test for
+ * Objecter extensions required by bdev_rbd.
+ *
+ * Usage: crimson-rados-demo -c /etc/ceph/ceph.conf -n client.admin --pool rbd
+ */
+
+#include <iostream>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/print.hh>
+#include <seastar/util/closeable.hh>
+#include <seastar/util/defer.hh>
+
+#include "auth/KeyRing.h"
+#include "crimson/client/io_context.h"
+#include "crimson/client/rados_client.h"
+#include "crimson/osdc/objecter.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/fatal_signal.h"
+#include "crimson/common/log.h"
+#include "crimson/common/perf_counters_collection.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/osd/main_config_bootstrap_helpers.h"
+#include "msg/msg_types.h"
+
+namespace bpo = boost::program_options;
+
+static seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_client);
+}
+
+int main(int argc, const char* argv[])
+{
+ auto early_result = crimson::osd::get_early_config_client(argc, argv);
+ if (!early_result.has_value()) {
+ std::cerr << "get_early_config_client failed: " << early_result.error()
+ << std::endl;
+ return early_result.error();
+ }
+ auto& early_config = early_result.value();
+
+ seastar::app_template::config app_cfg;
+ app_cfg.name = "crimson-rados-demo";
+ app_cfg.auto_handle_sigint_sigterm = false;
+ seastar::app_template app(std::move(app_cfg));
+ app.add_options()
+ ("pool", bpo::value<std::string>()->default_value("rbd"),
+ "pool name for I/O demo")
+ ("obj", bpo::value<std::string>()->default_value("crimson_rados_demo_obj"),
+ "object name for write/read")
+ ("debug", "enable debug logging");
+
+ try {
+ return app.run(
+ early_config.get_early_args().size(),
+ const_cast<char**>(early_config.get_early_args().data()),
+ [&] {
+ auto& config = app.configuration();
+ auto config_proxy_args = early_config.ceph_args;
+ return seastar::async([config_proxy_args, &config, &early_config] {
+ try {
+ FatalSignal fatal_signal;
+ if (config.count("debug")) {
+ seastar::global_logger_registry().set_all_loggers_level(
+ seastar::log_level::debug);
+ }
+
+ crimson::common::sharded_conf().start(
+ early_config.init_params.name,
+ early_config.cluster_name).get();
+ crimson::common::local_conf().start().get();
+ auto stop_conf = seastar::deferred_stop(
+ crimson::common::sharded_conf());
+ crimson::common::sharded_perf_coll().start().get();
+ auto stop_perf = seastar::deferred_stop(
+ crimson::common::sharded_perf_coll());
+
+ crimson::common::local_conf().parse_config_files(
+ early_config.conf_file_list).get();
+ crimson::common::local_conf().parse_env().get();
+ crimson::common::local_conf().parse_argv(
+ config_proxy_args).get();
+
+ crimson::osd::populate_config_from_mon().get();
+
+ const auto pool_name = config["pool"].as<std::string>();
+ const auto obj_name = config["obj"].as<std::string>();
+
+ class DemoAuthHandler : public crimson::common::AuthHandler {
+ public:
+ void handle_authentication(const EntityName& name,
+ const AuthCapsInfo& caps) override {}
+ };
+ auto auth_handler = std::make_unique<DemoAuthHandler>();
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t(early_config.init_params.name.get_type(), -1),
+ "rados_demo",
+ crimson::osd::get_nonce(),
+ true);
+ crimson::mon::Client monc(*msgr, *auth_handler);
+ msgr->set_auth_client(&monc);
+ msgr->set_auth_server(&monc);
+
+ crimson::client::RadosClient rados(*msgr, monc);
+ crimson::net::dispatchers_t dispatchers;
+ dispatchers.push_back(&monc);
+ dispatchers.push_back(&rados.get_objecter());
+ msgr->start(dispatchers).get();
+ auto stop_msgr = seastar::defer([&] {
+ msgr->stop();
+ msgr->shutdown().get();
+ });
+
+ monc.start().get();
+ auto stop_monc = seastar::defer([&] { monc.stop().get(); });
+
+ rados.get_objecter().set_client_incarnation(
+ static_cast<int>(crimson::osd::get_nonce() & 0x7fffffff));
+ rados.connect().get();
+ auto ioctx = rados.create_ioctx(pool_name).get();
+
+ ceph::bufferlist bl;
+ bl.append_zero(4096);
+ ioctx.write(obj_name, 0, std::move(bl)).get();
+ logger().info("wrote {} bytes to {}/{}", 4096, pool_name, obj_name);
+
+ auto data = ioctx.read(obj_name, 0, 4096).get();
+ logger().info("read {} bytes from {}/{}", data.length(),
+ pool_name, obj_name);
+
+ // Test discard (zero range) and write_zeroes
+ ioctx.discard(obj_name, 512, 1024).get();
+ logger().info("discarded bytes 512-1536");
+ ioctx.write_zeroes(obj_name, 2048, 512).get();
+ logger().info("write_zeroes bytes 2048-2560");
+
+ data = ioctx.read(obj_name, 0, 4096).get();
+ logger().info("read {} bytes after discard/write_zeroes",
+ data.length());
+
+ // Test compare_and_write: write known content, then compare-and-write
+ ceph::bufferlist init_bl;
+ init_bl.append("ABCD", 4);
+ ioctx.write(obj_name, 0, std::move(init_bl)).get();
+ logger().info("wrote ABCD at 0 for compare_and_write test");
+ ceph::bufferlist cmp_bl;
+ cmp_bl.append("ABCD", 4);
+ ceph::bufferlist write_bl;
+ write_bl.append("WXYZ", 4);
+ ioctx.compare_and_write(obj_name, 0, std::move(cmp_bl), 0,
+ std::move(write_bl)).get();
+ logger().info("compare_and_write succeeded (ABCD -> WXYZ)");
+ data = ioctx.read(obj_name, 0, 4).get();
+ logger().info("read {} bytes after compare_and_write", data.length());
+
+ // Test exec: cls hello say_hello (returns "Hello, world!" with empty in)
+ ceph::bufferlist exec_in;
+ auto exec_out = ioctx.exec(obj_name, "hello", "say_hello",
+ std::move(exec_in)).get();
+ std::string exec_str(exec_out.c_str(), exec_out.length());
+ logger().info("exec hello say_hello returned: {}", exec_str);
+
+ rados.shutdown().get();
+ logger().info("crimson-rados-demo completed successfully");
+ return EXIT_SUCCESS;
+ } catch (const std::exception& e) {
+ logger().error("crimson-rados-demo failed: {}", e.what());
+ return EXIT_FAILURE;
+ }
+ });
+ });
+ } catch (const std::exception& e) {
+ std::cerr << "FATAL: " << e.what() << std::endl;
+ return EXIT_FAILURE;
+ }
+}
#include "MOSDFastDispatchOp.h"
#include "include/ceph_features.h"
#include "include/ceph_fs.h" // for CEPH_MSG_OSD_OP
+#include "include/rados.h" // for CEPH_OSD_OP_CALL, ceph_osd_op::cls
#include "common/hobject.h"
/*
void zero(uint64_t off, uint64_t len) {
add_simple_op(CEPH_OSD_OP_ZERO, off, len);
}
+ /// Add op with data payload (e.g. CMPEXT compare buffer).
+ void add_op_with_data(int o, uint64_t off, uint64_t len, ceph::buffer::list&& bl) {
+ OSDOp osd_op;
+ osd_op.op.op = o;
+ osd_op.op.extent.offset = off;
+ osd_op.op.extent.length = len;
+ osd_op.indata = std::move(bl);
+ ops.push_back(std::move(osd_op));
+ }
+ /// Add cls call op (CEPH_OSD_OP_CALL). indata = cname + method + in.
+ void add_call(std::string_view cname, std::string_view method,
+ const ceph::buffer::list& indata) {
+ OSDOp osd_op;
+ osd_op.op.op = CEPH_OSD_OP_CALL;
+ osd_op.op.cls.class_len = static_cast<__u8>(std::min(cname.size(), size_t(255)));
+ osd_op.op.cls.method_len = static_cast<__u8>(std::min(method.size(), size_t(255)));
+ osd_op.op.cls.indata_len = indata.length();
+ osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
+ osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
+ osd_op.indata.append(indata);
+ ops.push_back(std::move(osd_op));
+ }
void truncate(uint64_t off) {
add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
}
target_link_libraries(unittest-crimson-pg-pool-shift
crimson-common
crimson::gtest)
+
+add_executable(unittest-crimson-objecter
+ test_objecter.cc)
+add_ceph_test(unittest-crimson-objecter
+ ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest-crimson-objecter
+ --memory 256M --smp 1)
+target_link_libraries(unittest-crimson-objecter
+ crimson
+ crimson::gtest)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include "common/ceph_context.h"
+#include "crimson/client/io_context.h"
+#include "crimson/client/rados_client.h"
+#include "crimson/common/auth_handler.h"
+#include "crimson/osdc/objecter.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/mon/MonClient.h"
+#include "osd/OSDMap.h"
+#include "include/object.h"
+
+using namespace crimson;
+
+namespace {
+
+class DummyAuthHandler : public crimson::common::AuthHandler {
+public:
+ void handle_authentication(const EntityName&, const AuthCapsInfo&) final {}
+};
+
+std::unique_ptr<OSDMap> build_test_osdmap()
+{
+ auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+ auto osdmap = std::make_unique<OSDMap>();
+ uuid_d fsid;
+ const int r = osdmap->build_simple_with_pool(cct, 1, fsid, 1, 0, 0);
+ cct->put();
+ if (r != 0) {
+ return nullptr;
+ }
+
+ // Mark OSD 0 as up and in
+ OSDMap::Incremental inc(osdmap->get_epoch() + 1);
+ inc.fsid = osdmap->get_fsid();
+ entity_addrvec_t addrs;
+ addrs.v.push_back(entity_addr_t());
+ uuid_d uuid;
+ uuid.generate_random();
+ addrs.v[0].nonce = 0;
+ inc.new_state[0] = CEPH_OSD_EXISTS | CEPH_OSD_NEW;
+ inc.new_up_client[0] = addrs;
+ inc.new_up_cluster[0] = addrs;
+ inc.new_hb_back_up[0] = addrs;
+ inc.new_hb_front_up[0] = addrs;
+ inc.new_weight[0] = CEPH_OSD_IN;
+ inc.new_uuid[0] = uuid;
+ osdmap->apply_incremental(inc);
+
+ return osdmap;
+}
+
+} // namespace
+
+struct objecter_test_t : public seastar_test_suite_t {};
+
+#ifdef UNIT_TESTS_BUILT
+TEST_F(objecter_test_t, calc_target_with_injected_osdmap)
+{
+ run_async([] {
+ auto osdmap = build_test_osdmap();
+ ASSERT_TRUE(osdmap) << "failed to build test OSDMap";
+
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t::CLIENT(0), "test", 0, true);
+ DummyAuthHandler auth_handler;
+ crimson::mon::Client monc(*msgr, auth_handler);
+ crimson::osdc::Objecter objecter(*msgr, monc);
+
+ objecter.inject_osdmap_for_test(std::move(osdmap)).get();
+
+ object_t oid("test-obj");
+ object_locator_t oloc(1); // pool 1 (rbd from build_simple_with_pool)
+
+ auto target = objecter.calc_target(oid, oloc).get();
+ ASSERT_TRUE(target.has_value()) << "calc_target should succeed with valid pool";
+ EXPECT_GE(target->primary_osd, 0);
+ EXPECT_EQ(target->pgid.pool(), 1);
+ });
+}
+
+TEST_F(objecter_test_t, calc_target_nonexistent_pool)
+{
+ run_async([] {
+ auto osdmap = build_test_osdmap();
+ ASSERT_TRUE(osdmap);
+
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t::CLIENT(0), "test", 0, true);
+ DummyAuthHandler auth_handler;
+ crimson::mon::Client monc(*msgr, auth_handler);
+ crimson::osdc::Objecter objecter(*msgr, monc);
+
+ objecter.inject_osdmap_for_test(std::move(osdmap)).get();
+
+ object_t oid("test-obj");
+ object_locator_t oloc(999); // non-existent pool
+
+ auto target = objecter.calc_target(oid, oloc).get();
+ EXPECT_FALSE(target.has_value()) << "calc_target should fail for nonexistent pool";
+ });
+}
+#endif
+
+TEST_F(objecter_test_t, iocontext_get_pool_id)
+{
+ run_async([] {
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t::CLIENT(0), "test", 0, true);
+ DummyAuthHandler auth_handler;
+ crimson::mon::Client monc(*msgr, auth_handler);
+ crimson::osdc::Objecter objecter(*msgr, monc);
+
+ crimson::client::IoCtx ioctx(objecter, 42, CEPH_NOSNAP);
+ EXPECT_EQ(ioctx.get_pool_id(), 42);
+ EXPECT_EQ(ioctx.get_snap_seq(), CEPH_NOSNAP);
+ });
+}
+
+#ifdef UNIT_TESTS_BUILT
+/// Verify IoCtx has discard, write_zeroes, compare_and_write, exec APIs.
+/// Full I/O testing requires a real cluster (crimson-rados-demo with vstart,
+/// run_crimson_rados_demo_integration_test.sh).
+TEST_F(objecter_test_t, iocontext_discard_write_zeroes_api)
+{
+ run_async([] {
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t::CLIENT(0), "test", 0, true);
+ DummyAuthHandler auth_handler;
+ crimson::mon::Client monc(*msgr, auth_handler);
+ crimson::osdc::Objecter objecter(*msgr, monc);
+
+ crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP);
+
+ // Verify API compiles and returns seastar::future<>
+ static_assert(std::is_same_v<decltype(ioctx.discard("", 0, 0)),
+ seastar::future<>>);
+ static_assert(std::is_same_v<decltype(ioctx.write_zeroes("", 0, 0)),
+ seastar::future<>>);
+
+ ceph::bufferlist cmp_bl, write_bl;
+ cmp_bl.append_zero(4);
+ write_bl.append_zero(4);
+ static_assert(std::is_same_v<decltype(ioctx.compare_and_write(
+ "", 0, std::move(cmp_bl), 0, std::move(write_bl))),
+ seastar::future<>>);
+ });
+}
+
+/// Verify IoCtx has exec() API for cls calls.
+TEST_F(objecter_test_t, iocontext_exec_api)
+{
+ run_async([] {
+ auto msgr = crimson::net::Messenger::create(
+ entity_name_t::CLIENT(0), "test", 0, true);
+ DummyAuthHandler auth_handler;
+ crimson::mon::Client monc(*msgr, auth_handler);
+ crimson::osdc::Objecter objecter(*msgr, monc);
+
+ crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP);
+
+ ceph::bufferlist indata;
+ static_assert(std::is_same_v<decltype(ioctx.exec("", "cls", "method",
+ std::move(indata))),
+ seastar::future<ceph::bufferlist>>);
+ });
+}
+#endif