]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson: add Objecter, IoCtx and RadosClient
author Alexander Indenbaum <aindenba@redhat.com>
Sat, 21 Feb 2026 14:27:02 +0000 (16:27 +0200)
committer Alexander Indenbaum <aindenba@redhat.com>
Mon, 23 Feb 2026 19:39:05 +0000 (21:39 +0200)
Implement Crimson RADOS client stack:
- Objecter: OSDMap integration, PG mapping, OSD session, read/write/stat,
  discard, write_zeroes, compare_and_write, exec (cls calls), per-OSD write throttling
- IoCtx: pool-scoped read, write, stat, discard, write_zeroes, compare_and_write, exec
- RadosClient: connect, create_ioctx, wait_for_osdmap, shutdown

Tests:
- unittest-crimson-objecter: calc_target, IoCtx, API checks
- crimson-rados-demo: integration test exercising all ops

Signed-off-by: Alexander Indenbaum <aindenba@redhat.com>
16 files changed:
.gitignore
src/crimson/CMakeLists.txt
src/crimson/client/io_context.cc [new file with mode: 0644]
src/crimson/client/io_context.h [new file with mode: 0644]
src/crimson/client/rados_client.cc [new file with mode: 0644]
src/crimson/client/rados_client.h [new file with mode: 0644]
src/crimson/osd/CMakeLists.txt
src/crimson/osd/main_config_bootstrap_helpers.cc
src/crimson/osd/main_config_bootstrap_helpers.h
src/crimson/osdc/objecter.cc [new file with mode: 0644]
src/crimson/osdc/objecter.h [new file with mode: 0644]
src/crimson/tools/CMakeLists.txt
src/crimson/tools/rados_demo.cc [new file with mode: 0644]
src/messages/MOSDOp.h
src/test/crimson/CMakeLists.txt
src/test/crimson/test_objecter.cc [new file with mode: 0644]

index c74ad2efd69bc8347dd1500894617e6df8fc01ce..c60d3d79f41ac23f85658c4f9efe153595435e5b 100644 (file)
@@ -38,6 +38,9 @@ core
 /src/td
 /src/vstart_environment.sh
 
+# Local integration test scripts (not for commit)
+/src/nvmeof/gateway/run_crimson_rbd_demo_integration_test.sh
+
 # alpine abuild things
 .abuild/
 alpine/APKBUILD
index 83f4519ff51691c64e8a4d8440da8e80db80807c..d1236a41c95e36982c70f70376aa40cf76392747 100644 (file)
@@ -171,6 +171,11 @@ set(crimson_mgr_srcs
 set(crimson_mon_srcs
   mon/MonClient.cc
   ${PROJECT_SOURCE_DIR}/src/mon/MonSub.cc)
+set(crimson_osdc_srcs
+  osdc/objecter.cc)
+set(crimson_client_srcs
+  client/io_context.cc
+  client/rados_client.cc)
 set(crimson_net_srcs
   ${PROJECT_SOURCE_DIR}/src/msg/async/crypto_onwire.cc
   ${PROJECT_SOURCE_DIR}/src/msg/async/compression_onwire.cc
@@ -186,8 +191,10 @@ set(crimson_net_srcs
   net/chained_dispatchers.cc)
 add_library(crimson STATIC
   ${crimson_auth_srcs}
+  ${crimson_client_srcs}
   ${crimson_mgr_srcs}
   ${crimson_mon_srcs}
+  ${crimson_osdc_srcs}
   ${crimson_net_srcs})
 target_compile_options(crimson PUBLIC
   "-ftemplate-backtrace-limit=0")
diff --git a/src/crimson/client/io_context.cc b/src/crimson/client/io_context.cc
new file mode 100644 (file)
index 0000000..2b44333
--- /dev/null
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "crimson/client/io_context.h"
+
+#include "crimson/common/log.h"
+#include "crimson/osdc/objecter.h"
+#include "include/object.h"
+#include "osd/osd_types.h"
+
+namespace {
+  /// File-local accessor for the logger registered under ceph_subsys_client.
+  seastar::logger& logger()
+  {
+    auto& client_log = crimson::get_logger(ceph_subsys_client);
+    return client_log;
+  }
+}
+
+namespace crimson::client {
+
+IoCtx::IoCtx(crimson::osdc::Objecter& objecter,
+             int64_t pool_id,
+             snapid_t snap_seq)
+  : objecter(objecter), pool_id(pool_id), snap_seq(snap_seq)
+{}
+
+seastar::future<ceph::bufferlist> IoCtx::read(const std::string& oid,
+                                              uint64_t off,
+                                              uint64_t len)
+{
+  logger().debug("IoCtx::read oid={} off={} len={}", oid, off, len);
+  object_locator_t oloc(pool_id);
+  return objecter.read(object_t(oid), oloc, off, len, snap_seq);
+}
+
+/// Write the contents of `bl` to object `oid` at offset `off`.
+/// The buffer is moved into Objecter::write; its length is logged first.
+seastar::future<> IoCtx::write(const std::string& oid,
+                               uint64_t off,
+                               ceph::bufferlist&& bl)
+{
+  logger().debug("IoCtx::write oid={} off={} len={}", oid, off, bl.length());
+  object_locator_t locator(pool_id);
+  auto pending = objecter.write(object_t(oid), locator, off, std::move(bl),
+                                snap_seq);
+  return pending;
+}
+
+/// Stat object `oid`; the result pair is whatever Objecter::stat produces
+/// (a uint64_t and a ceph::real_time).
+seastar::future<std::pair<uint64_t, ceph::real_time>> IoCtx::stat(
+    const std::string& oid)
+{
+  logger().debug("IoCtx::stat oid={}", oid);
+  object_locator_t locator(pool_id);
+  auto pending = objecter.stat(object_t(oid), locator, snap_seq);
+  return pending;
+}
+
+seastar::future<> IoCtx::discard(const std::string& oid,
+                                  uint64_t off,
+                                  uint64_t len)
+{
+  logger().debug("IoCtx::discard oid={} off={} len={}", oid, off, len);
+  object_locator_t oloc(pool_id);
+  return objecter.discard(object_t(oid), oloc, off, len, snap_seq);
+}
+
+seastar::future<> IoCtx::write_zeroes(const std::string& oid,
+                                      uint64_t off,
+                                      uint64_t len)
+{
+  logger().debug("IoCtx::write_zeroes oid={} off={} len={}", oid, off, len);
+  object_locator_t oloc(pool_id);
+  return objecter.write_zeroes(object_t(oid), oloc, off, len, snap_seq);
+}
+
+seastar::future<> IoCtx::compare_and_write(const std::string& oid,
+                                           uint64_t cmp_off,
+                                           ceph::bufferlist&& cmp_bl,
+                                           uint64_t write_off,
+                                           ceph::bufferlist&& write_bl)
+{
+  logger().debug("IoCtx::compare_and_write oid={} cmp_off={} write_off={}",
+                 oid, cmp_off, write_off);
+  object_locator_t oloc(pool_id);
+  return objecter.compare_and_write(object_t(oid), oloc,
+                                    cmp_off, std::move(cmp_bl),
+                                    write_off, std::move(write_bl),
+                                    snap_seq);
+}
+
+seastar::future<ceph::bufferlist> IoCtx::exec(const std::string& oid,
+                                              std::string_view cname,
+                                              std::string_view method,
+                                              ceph::bufferlist&& indata)
+{
+  logger().debug("IoCtx::exec oid={} cls={}.{}", oid, cname, method);
+  object_locator_t oloc(pool_id);
+  return objecter.exec(object_t(oid), oloc, cname, method, std::move(indata),
+                      snap_seq);
+}
+
+} // namespace crimson::client
diff --git a/src/crimson/client/io_context.h b/src/crimson/client/io_context.h
new file mode 100644 (file)
index 0000000..2fa361d
--- /dev/null
@@ -0,0 +1,83 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+
+#include "include/buffer.h"
+#include "include/types.h"
+#include "common/ceph_time.h"
+
+namespace crimson::osdc {
+class Objecter;
+}
+
+namespace crimson::client {
+
+/**
+ * Pool-scoped I/O context.
+ *
+ * Every operation (read, write, stat, discard, write_zeroes,
+ * compare_and_write, exec) is delegated to the Objecter with an
+ * object_locator_t built from pool_id and with snap_seq passed as the
+ * snapshot id.
+ *
+ * The Objecter is held by reference, so it has to stay alive for as long
+ * as this IoCtx is used.
+ */
+class IoCtx {
+public:
+  IoCtx(crimson::osdc::Objecter& objecter,
+        int64_t pool_id,
+        snapid_t snap_seq = CEPH_NOSNAP);
+
+  seastar::future<ceph::bufferlist> read(const std::string& oid,
+                                         uint64_t off,
+                                         uint64_t len);
+
+  seastar::future<> write(const std::string& oid,
+                          uint64_t off,
+                          ceph::bufferlist&& bl);
+
+  seastar::future<std::pair<uint64_t, ceph::real_time>> stat(
+      const std::string& oid);
+
+  seastar::future<> discard(const std::string& oid,
+                            uint64_t off,
+                            uint64_t len);
+
+  seastar::future<> write_zeroes(const std::string& oid,
+                                 uint64_t off,
+                                 uint64_t len);
+
+  seastar::future<> compare_and_write(const std::string& oid,
+                                      uint64_t cmp_off,
+                                      ceph::bufferlist&& cmp_bl,
+                                      uint64_t write_off,
+                                      ceph::bufferlist&& write_bl);
+
+  /// Execute cls method `cname`.`method` on object `oid` with `indata` as input. Returns output bufferlist.
+  seastar::future<ceph::bufferlist> exec(const std::string& oid,
+                                        std::string_view cname,
+                                        std::string_view method,
+                                        ceph::bufferlist&& indata);
+
+  int64_t get_pool_id() const { return pool_id; }
+  snapid_t get_snap_seq() const { return snap_seq; }
+
+private:
+  crimson::osdc::Objecter& objecter;  // non-owning; every operation is forwarded here
+  int64_t pool_id;                    // pool used to build the object_locator_t
+  snapid_t snap_seq;                  // snapshot id handed to every Objecter call
+};
+
+} // namespace crimson::client
diff --git a/src/crimson/client/rados_client.cc b/src/crimson/client/rados_client.cc
new file mode 100644 (file)
index 0000000..2334524
--- /dev/null
@@ -0,0 +1,98 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "crimson/client/rados_client.h"
+
+#include <cerrno>
+#include <system_error>
+
+#include "crimson/common/log.h"
+#include "crimson/osdc/objecter.h"
+#include "osd/OSDMap.h"
+
+namespace {
+  /// File-local accessor for the logger registered under ceph_subsys_client.
+  seastar::logger& logger()
+  {
+    auto& client_log = crimson::get_logger(ceph_subsys_client);
+    return client_log;
+  }
+}
+
+namespace crimson::client {
+
+RadosClient::RadosClient(crimson::net::Messenger& msgr,
+                         crimson::mon::Client& monc)
+  : msgr(msgr),
+    monc(monc),
+    objecter(std::make_unique<crimson::osdc::Objecter>(msgr, monc))
+{}
+
+RadosClient::~RadosClient() = default;  // defined in this file, where Objecter is a complete type, so the unique_ptr<Objecter> member can be destroyed
+
+/// Expose the owned Objecter by reference.
+crimson::osdc::Objecter& RadosClient::get_objecter()
+{
+  auto& owned = *objecter;
+  return owned;
+}
+
+/// Log the call and start the Objecter; resolves when Objecter::start() does.
+seastar::future<> RadosClient::connect()
+{
+  logger().info("RadosClient::connect");
+  auto started = objecter->start();
+  return started;
+}
+
+seastar::future<IoCtx> RadosClient::create_ioctx(std::string_view pool_name)
+{
+  logger().debug("RadosClient::create_ioctx pool_name={}", pool_name);
+  return objecter->wait_for_osdmap()
+    .then([this, pool_name = std::string(pool_name)] {
+      return objecter->with_osdmap(
+        [pool_name](const OSDMap& o) {
+          return o.lookup_pg_pool_name(pool_name);
+        });
+    })
+    .then([this](int64_t pool_id) {
+      if (pool_id < 0) {
+        return seastar::make_exception_future<IoCtx>(
+          std::system_error(-pool_id, std::system_category(), "pool not found"));
+      }
+      logger().debug("RadosClient::create_ioctx pool_id={}", pool_id);
+      return seastar::make_ready_future<IoCtx>(IoCtx(*objecter, pool_id));
+    });
+}
+
+seastar::future<IoCtx> RadosClient::create_ioctx(int64_t pool_id)
+{
+  logger().debug("RadosClient::create_ioctx pool_id={}", pool_id);
+  return objecter->wait_for_osdmap()
+    .then([this, pool_id] {
+      return objecter->with_osdmap(
+        [pool_id](const OSDMap& o) {
+          return o.have_pg_pool(pool_id);
+        });
+    })
+    .then([this, pool_id](bool exists) {
+      if (!exists) {
+        return seastar::make_exception_future<IoCtx>(
+          std::system_error(ENOENT, std::system_category(), "pool not found"));
+      }
+      return seastar::make_ready_future<IoCtx>(IoCtx(*objecter, pool_id));
+    });
+}
+
+/// Log the call and stop the Objecter; resolves when Objecter::stop() does.
+seastar::future<> RadosClient::shutdown()
+{
+  logger().info("RadosClient::shutdown");
+  auto stopped = objecter->stop();
+  return stopped;
+}
+
+} // namespace crimson::client
diff --git a/src/crimson/client/rados_client.h b/src/crimson/client/rados_client.h
new file mode 100644 (file)
index 0000000..6265051
--- /dev/null
@@ -0,0 +1,92 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include <seastar/core/future.hh>
+
+#include "crimson/client/io_context.h"
+
+namespace crimson::mon {
+class Client;
+}
+
+namespace crimson::net {
+class Messenger;
+}
+
+namespace crimson::osdc {
+class Objecter;
+}
+
+namespace crimson::client {
+
+class IoCtx;
+
+/**
+ * RADOS client facade. Holds references to a Messenger and a MonClient and
+ * owns an Objecter built from them.
+ * Provides connect(), create_ioctx(pool_name), create_ioctx(pool_id),
+ * shutdown().
+ *
+ * Usage:
+ *   auto auth_handler = std::make_unique<DummyAuthHandler>();
+ *   auto msgr = crimson::net::Messenger::create(entity_name_t::CLIENT(), ...);
+ *   crimson::mon::Client monc{*msgr, *auth_handler};
+ *   RadosClient client(*msgr, monc);
+ *
+ *   msgr->set_auth_client(&monc);
+ *   msgr->start({&monc, &client.get_objecter()});
+ *   monc.start().get();
+ *   client.connect().get();
+ *
+ *   auto ioctx = client.create_ioctx("rbd").get();
+ *   co_await ioctx.write("obj", 0, std::move(bl));
+ *   auto data = co_await ioctx.read("obj", 0, 4096);
+ *
+ *   client.shutdown().get();
+ *
+ * NOTE(review): the example mixes .get() and co_await for brevity; pick the
+ * one that matches the calling context.
+ */
+class RadosClient {
+public:
+  RadosClient(crimson::net::Messenger& msgr,
+              crimson::mon::Client& monc);
+
+  ~RadosClient();
+
+  /// Objecter reference for adding to Messenger dispatchers before start.
+  crimson::osdc::Objecter& get_objecter();
+
+  /// Start Objecter (subscribe to osdmap). Call after msgr.start() and monc.start().
+  seastar::future<> connect();
+
+  /// Create pool-scoped IoCtx by pool name. Waits for OSDMap, then looks up the pool.
+  /// If the lookup result is negative, the future fails with std::system_error ("pool not found") carrying the negated result.
+  seastar::future<IoCtx> create_ioctx(std::string_view pool_name);
+
+  /// Create pool-scoped IoCtx by pool id. Waits for OSDMap and checks the pool exists; otherwise the future fails with std::system_error(ENOENT).
+  seastar::future<IoCtx> create_ioctx(int64_t pool_id);
+
+  /// Stop Objecter.
+  seastar::future<> shutdown();
+
+private:
+  crimson::net::Messenger& msgr;  // non-owning
+  crimson::mon::Client& monc;     // non-owning
+  std::unique_ptr<crimson::osdc::Objecter> objecter;  // owned; created in the constructor
+};
+
+} // namespace crimson::client
index 4fdcb4bba32ab5bc90448938c12dab6391129609..2910356db8b7550fbf132f8816dffc1b5c3b0b53 100644 (file)
@@ -1,10 +1,16 @@
+add_library(crimson-main-config-bootstrap STATIC
+  main_config_bootstrap_helpers.cc)
+target_link_libraries(crimson-main-config-bootstrap
+  crimson-common
+  crimson
+  legacy-option-headers)
+
 add_executable(crimson-osd
   backfill_state.cc
   ec_backend.cc
   heartbeat.cc
   lsan_suppressions.cc
   main.cc
-  main_config_bootstrap_helpers.cc
   osd.cc
   osd_meta.cc
   pg.cc
@@ -65,6 +71,7 @@ if(HAS_VTA)
     PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments)
 endif()
 target_link_libraries(crimson-osd
+  crimson-main-config-bootstrap
   legacy-option-headers
   crimson-admin
   crimson-common
index adf7d630b1392f589c5d5894bc6a91e2b1ba8066..2d7f647362732711861c21ffbc7b0f11b3e4b60e 100644 (file)
@@ -128,7 +128,7 @@ std::optional<std::string> get_option_value(const SeastarOption& option) {
 }
 
 static tl::expected<early_config_t, int>
-_get_early_config(int argc, const char *argv[])
+_get_early_config(int argc, const char *argv[], int entity_type)
 {
   early_config_t ret;
 
@@ -140,7 +140,7 @@ _get_early_config(int argc, const char *argv[])
 
   ret.init_params = ceph_argparse_early_args(
     early_args,
-    CEPH_ENTITY_TYPE_OSD,
+    entity_type,
     &ret.cluster_name,
     &ret.conf_file_list);
 
@@ -152,8 +152,8 @@ _get_early_config(int argc, const char *argv[])
   int r = app.run(
     sizeof(bootstrap_args) / sizeof(bootstrap_args[0]),
     const_cast<char**>(bootstrap_args),
-    [argc, argv, &ret, &early_args] {
-      return seastar::async([argc, argv, &ret, &early_args] {
+    [argc, argv, &ret, &early_args, entity_type] {
+      return seastar::async([argc, argv, &ret, &early_args, entity_type] {
        seastar::global_logger_registry().set_all_loggers_level(
          seastar::log_level::debug);
        sharded_conf().start(
@@ -216,20 +216,25 @@ _get_early_config(int argc, const char *argv[])
                          cpu_cores);
          } else {
            auto reactor_num = crimson::common::get_conf<uint64_t>("crimson_cpu_num");
-           if (!reactor_num) {
-             // We would like to avoid seastar using all available cores.
+           if (entity_type == CEPH_ENTITY_TYPE_CLIENT && !reactor_num) {
+             ret.early_args.emplace_back("--smp");
+             ret.early_args.emplace_back("1");
+             ret.early_args.emplace_back("--thread-affinity");
+             ret.early_args.emplace_back("0");
+             logger().info("get_early_config: client default --smp 1");
+           } else if (!reactor_num) {
              logger().error("get_early_config: crimson_cpu_set"
                             " or crimson_cpu_num must be set");
              ceph_abort();
+           } else {
+             std::string smp = fmt::format("{}", reactor_num);
+             ret.early_args.emplace_back("--smp");
+             ret.early_args.emplace_back(smp);
+             ret.early_args.emplace_back("--thread-affinity");
+             ret.early_args.emplace_back("0");
+             logger().info("get_early_config: set --thread-affinity 0 --smp {}",
+                           smp);
            }
-           std::string smp = fmt::format("{}", reactor_num);
-           ret.early_args.emplace_back("--smp");
-           ret.early_args.emplace_back(smp);
-           ret.early_args.emplace_back("--thread-affinity");
-           ret.early_args.emplace_back("0");
-           logger().info("get_early_config: set --thread-affinity 0 --smp {}",
-                         smp);
-
          }
        } else {
          logger().error("get_early_config: --cpuset can be "
@@ -301,7 +306,7 @@ get_early_config(int argc, const char *argv[])
     return tl::unexpected(-errno);
   } else if (worker == 0) { // child
     close(pipes[0]);
-    auto ret = _get_early_config(argc, argv);
+    auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_OSD);
     if (ret.has_value()) {
       bufferlist bl;
       ::encode(ret.value(), bl);
@@ -355,4 +360,86 @@ get_early_config(int argc, const char *argv[])
   }
 }
 
+tl::expected<early_config_t, int>
+get_early_config_client(int argc, const char *argv[])
+{
+  auto args = argv_to_vec(argc, argv);
+  if (args.empty()) {
+    std::cerr << argv[0] << ": -h or --help for usage" << std::endl;
+    exit(1);
+  }
+  if (ceph_argparse_need_usage(args)) {
+    std::cout << "usage: " << argv[0] << " [options] [--pool <pool>]" << std::endl;
+    generic_server_usage();
+    exit(0);
+  }
+  int pipes[2];
+  int r = pipe2(pipes, 0);
+  if (r < 0) {
+    std::cerr << "get_early_config_client: failed to create pipes: "
+             << -errno << std::endl;
+    return tl::unexpected(-errno);
+  }
+
+  pid_t worker = fork();
+  if (worker < 0) {
+    close(pipes[0]);
+    close(pipes[1]);
+    std::cerr << "get_early_config_client: failed to fork: "
+             << -errno << std::endl;
+    return tl::unexpected(-errno);
+  } else if (worker == 0) { // child
+    close(pipes[0]);
+    auto ret = _get_early_config(argc, argv, CEPH_ENTITY_TYPE_CLIENT);
+    if (ret.has_value()) {
+      bufferlist bl;
+      ::encode(ret.value(), bl);
+      r = bl.write_fd(pipes[1]);
+      close(pipes[1]);
+      if (r < 0) {
+       std::cerr << "get_early_config_client: child failed to write_fd: "
+                 << r << std::endl;
+       exit(-r);
+      } else {
+       exit(0);
+      }
+    } else {
+      std::cerr << "get_early_config_client: child failed: "
+               << -ret.error() << std::endl;
+      exit(-ret.error());
+    }
+    return tl::unexpected(-1);
+  } else { // parent
+    close(pipes[1]);
+
+    bufferlist bl;
+    early_config_t ret;
+    bool have_data = false;
+    while ((r = bl.read_fd(pipes[0], 1024)) > 0) {
+      have_data = true;
+    }
+    close(pipes[0]);
+
+    int status;
+    waitpid(worker, &status, 0);
+
+    if (!have_data && WIFEXITED(status) && WEXITSTATUS(status) == 0) {
+      exit(0);
+    }
+    if (r < 0) {
+      std::cerr << "get_early_config_client: parent failed to read from pipe: "
+               << r << std::endl;
+      return tl::unexpected(r);
+    }
+    try {
+      auto bliter = bl.cbegin();
+      ::decode(ret, bliter);
+      return ret;
+    } catch (...) {
+      std::cerr << "get_early_config_client: parent failed to decode" << std::endl;
+      return tl::unexpected(-EINVAL);
+    }
+  }
+}
+
 }
index 6e993a58cf007b72622d0e24ce07054d52c3d084..e05907d64c6d557a34f6841fc117d17e568c54dd 100644 (file)
@@ -88,6 +88,10 @@ struct early_config_t {
 tl::expected<early_config_t, int>
 get_early_config(int argc, const char *argv[]);
 
+/// Like get_early_config but for CEPH_ENTITY_TYPE_CLIENT (e.g. client.admin).
+tl::expected<early_config_t, int>
+get_early_config_client(int argc, const char *argv[]);
+
 }
 
 WRITE_CLASS_ENCODER(crimson::osd::early_config_t)
diff --git a/src/crimson/osdc/objecter.cc b/src/crimson/osdc/objecter.cc
new file mode 100644 (file)
index 0000000..955fc37
--- /dev/null
@@ -0,0 +1,759 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "objecter.h"
+
+#include <boost/intrusive_ptr.hpp>
+
+#include "crimson/common/log.h"
+#include "crimson/net/Connection.h"
+#include "crimson/net/Messenger.h"
+#include "messages/MOSDOp.h"
+#include "messages/MOSDOpReply.h"
+#include "messages/MOSDMap.h"
+
+#include "common/entity_name.h"
+#include "common/hobject.h"
+#include "crimson/mon/MonClient.h"
+#include "include/rados.h"
+
+namespace {
+  /// File-local accessor for the logger registered under ceph_subsys_objecter.
+  seastar::logger& logger()
+  {
+    auto& objecter_log = crimson::get_logger(ceph_subsys_objecter);
+    return objecter_log;
+  }
+}
+
+namespace crimson::osdc {
+
+Objecter::Objecter(crimson::net::Messenger& msgr,
+                   crimson::mon::Client& monc)
+  : msgr(msgr),
+    monc(monc),
+    osdmap(std::make_unique<OSDMap>())
+{}
+
+Objecter::~Objecter() = default;
+
+/// Mark the Objecter as started, create the promise that wait_for_osdmap()
+/// waits on, and subscribe to osdmap updates from the monitor.
+/// Asserts that start() has not been called already.
+seastar::future<> Objecter::start()
+{
+  logger().info("Objecter::start");
+  ceph_assert(!started);
+  started = true;
+  osdmap_ready.emplace();
+
+  // One-time "osdmap" subscription, renewed only when sub_want() reports a
+  // change; maybe_request_map() performs exactly that sequence.
+  return maybe_request_map();
+}
+
+/// Mark the Objecter as stopped and close the dispatch gate.
+/// Asserts that the Objecter was started.
+seastar::future<> Objecter::stop()
+{
+  logger().info("Objecter::stop");
+  ceph_assert(started);
+  started = false;
+  auto gate_closed = dispatch_gate.close();
+  return gate_closed;
+}
+
+std::optional<seastar::future<>>
+Objecter::ms_dispatch(crimson::net::ConnectionRef conn,
+                      MessageRef m)
+{
+  const int msg_type = m->get_type();
+
+  // Only handle messages from MON (OSDMap) or OSD (OpReply)
+  if (conn->peer_is_mon()) {
+    if (msg_type != CEPH_MSG_OSD_MAP) {
+      return std::nullopt;
+    }
+    auto mosdmap = boost::static_pointer_cast<MOSDMap>(m);
+    return seastar::with_gate(dispatch_gate, [this, mosdmap] {
+      return handle_osd_map(std::move(mosdmap));
+    });
+  }
+
+  if (conn->peer_is_osd()) {
+    if (msg_type != CEPH_MSG_OSD_OPREPLY) {
+      return std::nullopt;
+    }
+    auto reply = boost::static_pointer_cast<MOSDOpReply>(m);
+    return seastar::with_gate(dispatch_gate, [this, conn, reply] {
+      handle_osd_op_reply(conn, reply.get());
+      return seastar::now();
+    });
+  }
+
+  return std::nullopt;
+}
+
+seastar::future<> Objecter::handle_osd_map(Ref<MOSDMap> m)
+{
+  logger().debug("handle_osd_map: epochs [{},{}]",
+                m->get_first(), m->get_last());
+
+  if (m->fsid != monc.get_fsid()) {
+    logger().warn("handle_osd_map: fsid mismatch {} != {}",
+                  m->fsid, monc.get_fsid());
+    return seastar::now();
+  }
+
+  return seastar::with_lock(osdmap_mutex, [this, m = std::move(m)] {
+    seastar::future<> renew_fut = seastar::now();
+
+    if (m->get_last() <= osdmap->get_epoch()) {
+      logger().debug("handle_osd_map: ignoring stale epochs [{},{}] <= {}",
+                    m->get_first(), m->get_last(), osdmap->get_epoch());
+    } else {
+      if (osdmap->get_epoch()) {
+        // Apply incremental maps
+        for (epoch_t e = osdmap->get_epoch() + 1; e <= m->get_last(); ++e) {
+          if (osdmap->get_epoch() == e - 1 && m->incremental_maps.count(e)) {
+            logger().debug("handle_osd_map: applying incremental epoch {}", e);
+            OSDMap::Incremental inc(m->incremental_maps[e]);
+            osdmap->apply_incremental(inc);
+          } else if (m->maps.count(e)) {
+            logger().debug("handle_osd_map: applying full epoch {}", e);
+            auto new_osdmap = std::make_unique<OSDMap>();
+            new_osdmap->decode(m->maps[e]);
+            osdmap = std::move(new_osdmap);
+          }
+        }
+      } else {
+        // First map - need full
+        if (m->maps.count(m->get_last())) {
+          logger().debug("handle_osd_map: decoding initial full epoch {}",
+                        m->get_last());
+          osdmap->decode(m->maps[m->get_last()]);
+        } else {
+          logger().debug("handle_osd_map: need full map, requesting");
+          renew_fut = maybe_request_map();
+        }
+      }
+    }
+
+    monc.sub_got("osdmap", osdmap->get_epoch());
+
+    if (osdmap->get_epoch() > 0 && osdmap_ready && !osdmap_ready_fulfilled) {
+      osdmap_ready_fulfilled = true;
+      osdmap_ready->set_value();
+    }
+    return renew_fut;
+  });
+}
+
+void Objecter::handle_osd_op_reply(crimson::net::ConnectionRef conn,
+                                  MOSDOpReply* m)
+{
+  const ceph_tid_t tid = m->get_tid();
+  const int r = m->get_result();
+
+  if (!conn->peer_is_osd()) {
+    return;
+  }
+  const int osd_id = conn->get_peer_id();
+  auto it = sessions.find(osd_id);
+  if (it == sessions.end()) {
+    logger().warn("handle_osd_op_reply: tid={} from unknown osd.{}", tid, osd_id);
+    return;
+  }
+  auto op_it = it->second.ops.find(tid);
+  if (op_it == it->second.ops.end()) {
+    logger().debug("handle_osd_op_reply: tid={} not found (may be stale)", tid);
+    return;
+  }
+  auto completion = std::move(op_it->second);
+  it->second.ops.erase(op_it);
+
+  std::vector<OSDOp> ops;
+  m->claim_ops(ops);
+  completion(r, ops);
+}
+
+/// Resolve once an OSDMap with a non-zero epoch is available.
+///
+/// Returns immediately when the current map already has a non-zero epoch.
+/// Otherwise waits on the osdmap_ready promise created by start(). If
+/// start() has not been called there is no promise to wait on, so the
+/// future fails with std::system_error(ENOTCONN) instead of dereferencing
+/// an empty optional.
+seastar::future<> Objecter::wait_for_osdmap()
+{
+  return with_osdmap([](const OSDMap& o) { return o.get_epoch(); })
+    .then([this](epoch_t epoch) {
+      if (epoch > 0) {
+        return seastar::now();
+      }
+      if (!osdmap_ready) {
+        return seastar::make_exception_future<>(
+          std::system_error(ENOTCONN, std::generic_category(),
+                            "Objecter not started"));
+      }
+      return osdmap_ready->get_shared_future();
+    });
+}
+
+/// Register a one-time "osdmap" subscription with the monitor client and
+/// renew subscriptions only when sub_want() reports that something changed.
+seastar::future<> Objecter::maybe_request_map()
+{
+  const bool need_renew = monc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
+  if (!need_renew) {
+    return seastar::now();
+  }
+  return monc.renew_subs();
+}
+
+#ifdef UNIT_TESTS_BUILT
+/// Test hook: replace the OSDMap with `m` under osdmap_mutex, acknowledge
+/// the subscription, and fulfil osdmap_ready the first time the injected
+/// map has a non-zero epoch.
+seastar::future<> Objecter::inject_osdmap_for_test(std::unique_ptr<OSDMap> m)
+{
+  return seastar::with_lock(osdmap_mutex,
+                            [this, injected = std::move(m)]() mutable {
+    osdmap = std::move(injected);
+    const epoch_t epoch = osdmap->get_epoch();
+    monc.sub_got("osdmap", epoch);
+    const bool first_usable_map =
+      epoch > 0 && osdmap_ready && !osdmap_ready_fulfilled;
+    if (first_usable_map) {
+      osdmap_ready_fulfilled = true;
+      osdmap_ready->set_value();
+    }
+  });
+}
+#endif
+
+seastar::future<std::optional<OsdTarget>>
+Objecter::calc_target(const object_t& oid,
+                     const object_locator_t& oloc) const
+{
+  return seastar::with_shared(osdmap_mutex, [this, oid, oloc] {
+    if (!osdmap->get_epoch()) {
+      return std::optional<OsdTarget>{};  // no OSDMap yet
+    }
+
+    pg_t raw_pg;
+    const int ret = osdmap->object_locator_to_pg(oid, oloc, raw_pg);
+    if (ret != 0) {
+      return std::optional<OsdTarget>{};  // pool does not exist (e.g. -ENOENT)
+    }
+
+    const pg_t actual_pgid = osdmap->raw_pg_to_pg(raw_pg);
+
+    int up_primary = -1;
+    int acting_primary = -1;
+    std::vector<int> up;
+    std::vector<int> acting;
+    osdmap->pg_to_up_acting_osds(actual_pgid,
+                                 &up, &up_primary,
+                                 &acting, &acting_primary);
+
+    if (acting_primary < 0 || !osdmap->exists(acting_primary)) {
+      return std::optional<OsdTarget>{};
+    }
+
+    const entity_addr_t addr = osdmap->get_addrs(acting_primary).front();
+    return std::optional<OsdTarget>{OsdTarget{actual_pgid, raw_pg, acting_primary, addr}};
+  });
+}
+
+seastar::future<crimson::net::ConnectionRef>
+Objecter::get_or_connect(const entity_addr_t& addr, int osd_id)
+{
+  auto it = sessions.find(osd_id);
+  if (it != sessions.end() && it->second.conn && it->second.conn->is_connected()) {
+    return seastar::make_ready_future<crimson::net::ConnectionRef>(it->second.conn);
+  }
+
+  auto pend_it = pending_connects.find(osd_id);
+  if (pend_it != pending_connects.end()) {
+    return pend_it->second.get_shared_future();
+  }
+
+  crimson::net::ConnectionRef conn = msgr.connect(
+    addr, entity_name_t(CEPH_ENTITY_TYPE_OSD, osd_id));
+
+  if (conn->is_connected()) {
+    if (it == sessions.end()) {
+      sessions.emplace(osd_id, OsdSession{osd_id, addr, conn});
+    } else {
+      it->second.conn = conn;
+      it->second.addr = addr;
+    }
+    return seastar::make_ready_future<crimson::net::ConnectionRef>(conn);
+  }
+
+  if (it == sessions.end()) {
+    sessions.emplace(osd_id, OsdSession{osd_id, addr, conn});
+  } else {
+    it->second.conn = conn;
+    it->second.addr = addr;
+  }
+
+  auto [promise_it, inserted] = pending_connects.emplace(
+    osd_id, seastar::shared_promise<crimson::net::ConnectionRef>{});
+  ceph_assert(inserted);
+  return promise_it->second.get_shared_future();
+}
+
+/// Obtain a connection to the target's primary OSD and send `msg` on it.
+seastar::future<> Objecter::send_to_osd(const OsdTarget& target, MessageURef msg)
+{
+  auto connected = get_or_connect(target.primary_addr, target.primary_osd);
+  return std::move(connected).then(
+    [outgoing = std::move(msg)](crimson::net::ConnectionRef conn) mutable {
+      return conn->send(std::move(outgoing));
+    });
+}
+
+// Read `len` bytes at offset `off` from object `oid` (located via `oloc`)
+// at snapshot `snap`.
+//
+// Steps: map the object to a target with calc_target(), fetch the current
+// OSDMap epoch, build an MOSDOp carrying a single read op, register a
+// completion under a fresh tid in the primary OSD's session, and send.
+// The returned future resolves with the first op's outdata, or an empty
+// bufferlist when the completion is invoked with no ops. It fails with
+// std::system_error: ENOENT when calc_target() yields no target, or -r when
+// the completion is invoked with a negative result r.
+//
+// NOTE(review): if conn->send() fails, the completion stays registered in
+// the session's ops map; write() erases its entry in that situation.
+// Confirm whether the difference is intended.
+seastar::future<ceph::bufferlist> Objecter::read(const object_t& oid,
+                                                 const object_locator_t& oloc,
+                                                 uint64_t off, uint64_t len,
+                                                 snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    // oid/oloc are copied into the continuation, so the caller's references
+    // are not used after this call returns.
+    .then([this, oid, oloc, off, len, snap](std::optional<OsdTarget> target) {
+      if (!target) {
+        return seastar::make_exception_future<ceph::bufferlist>(
+          std::system_error(ENOENT, std::generic_category(), "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) {
+          const ceph_tid_t tid = next_tid++;
+          // An explicit locator hash (>= 0) wins; otherwise use raw_pg.ps().
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, flags, 0);
+          msg->set_snapid(snap);
+          msg->read(off, len);
+
+          seastar::promise<ceph::bufferlist> promise;
+          auto future = promise.get_future();
+          const int osd_id = target.primary_osd;
+
+          // OpCompletion is a std::function, so the promise is held through
+          // a shared_ptr to keep the callable copyable.
+          auto p = std::make_shared<seastar::promise<ceph::bufferlist>>(std::move(promise));
+          OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+            if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "read failed")));
+            } else if (!ops.empty()) {
+              p->set_value(std::move(ops[0].outdata));
+            } else {
+              p->set_value(ceph::bufferlist{});
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+                  crimson::net::ConnectionRef conn) mutable {
+              // Register the completion before sending so it is already in
+              // the ops map when a reply (or reset) is handled.
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              it->second.ops[tid] = std::move(c);
+              return conn->send(std::move(msg));
+            })
+            // After a successful send, the result is whatever the completion
+            // later puts into the promise.
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+// Write `bl` to object `oid` (located via `oloc`) at offset `off`.
+//
+// Steps: map the object to a target with calc_target(), fetch the current
+// OSDMap epoch, build an MOSDOp carrying a single write op, acquire
+// bl.length() bytes from the primary OSD session's write_throttle, register
+// a completion under a fresh tid, and send. The returned future resolves
+// when the completion is invoked with r >= 0. It fails with
+// std::system_error: ENOENT when calc_target() yields no target, or -r when
+// the completion is invoked with a negative result r.
+//
+// Throttle accounting: the bytes are released either by the completion or,
+// if the throttle/register/send step fails, by the handle_exception below.
+//
+// NOTE(review): handle_exception is chained after write_throttle.get(), so
+// it also runs when get() itself fails and then calls put(len) for bytes
+// that were never acquired. Confirm whether get() can fail.
+// NOTE(review): if the completion has already run (e.g. via a connection
+// reset) and send() then fails, put(len) would be called twice. Verify.
+seastar::future<> Objecter::write(const object_t& oid,
+                                  const object_locator_t& oloc,
+                                  uint64_t off, ceph::bufferlist&& bl,
+                                  snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    .then([this, oid, oloc, off, bl = std::move(bl), snap](std::optional<OsdTarget> target) mutable {
+      if (!target) {
+        return seastar::make_exception_future<>(
+          std::system_error(ENOENT, std::generic_category(),
+                            "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc, off, bl = std::move(bl), snap](epoch_t epoch) mutable {
+          const ceph_tid_t tid = next_tid++;
+          // An explicit locator hash (>= 0) wins; otherwise use raw_pg.ps().
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          // Captured up front: this is the amount charged to the throttle.
+          const size_t len = bl.length();
+          const int osd_id = target.primary_osd;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+          msg->set_snapid(snap);
+          msg->write(off, len, bl);
+          // MOSDOp::write puts data in Message::data, but OSD expects it in
+          // ops[0].indata for proper encode/decode. Move it there.
+          msg->ops[0].indata = std::move(msg->get_data());
+
+          seastar::promise<> promise;
+          auto future = promise.get_future();
+
+          // OpCompletion is a std::function, so the promise is held through
+          // a shared_ptr to keep the callable copyable.
+          auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+          OpCompletion completion = [this, osd_id, len, p](int r, std::vector<OSDOp>&) {
+            // Release the throttled bytes whether the op succeeded or not.
+            auto it = sessions.find(osd_id);
+            if (it != sessions.end()) {
+              it->second.write_throttle.put(len);
+            }
+            if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "write failed")));
+            } else {
+              p->set_value();
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, len, c = std::move(completion), msg = std::move(msg)](
+                  crimson::net::ConnectionRef conn) mutable {
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              return it->second.write_throttle.get(len).then(
+                [this, conn, tid, osd_id, c = std::move(c),
+                 msg = std::move(msg)]() mutable {
+                  // Look the session up again: the throttle wait may have
+                  // suspended, so the earlier iterator is not reused.
+                  auto it2 = sessions.find(osd_id);
+                  ceph_assert(it2 != sessions.end());
+                  it2->second.ops[tid] = std::move(c);
+                  return conn->send(std::move(msg));
+                }).handle_exception([this, tid, osd_id, len](std::exception_ptr e) {
+                  // Undo the registration and throttle charge, then
+                  // propagate the original failure.
+                  auto it2 = sessions.find(osd_id);
+                  if (it2 != sessions.end()) {
+                    it2->second.ops.erase(tid);
+                    it2->second.write_throttle.put(len);
+                  }
+                  return seastar::make_exception_future<>(e);
+                });
+            })
+            // After a successful send, the result is whatever the completion
+            // later puts into the promise.
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+// Zero the range [off, off + len) of object `oid` (located via `oloc`).
+//
+// Steps: map the object to a target with calc_target(), fetch the current
+// OSDMap epoch, build an MOSDOp carrying a single zero op, register a
+// completion under a fresh tid in the primary OSD's session, and send.
+// Unlike write(), this path does not go through the session's
+// write_throttle. The returned future resolves when the completion is
+// invoked with r >= 0. It fails with std::system_error: ENOENT when
+// calc_target() yields no target, or -r for a negative result r.
+//
+// NOTE(review): if conn->send() fails, the completion stays registered in
+// the session's ops map; write() erases its entry in that situation.
+// Confirm whether the difference is intended.
+seastar::future<> Objecter::discard(const object_t& oid,
+                                    const object_locator_t& oloc,
+                                    uint64_t off, uint64_t len,
+                                    snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    .then([this, oid, oloc, off, len, snap](std::optional<OsdTarget> target) {
+      if (!target) {
+        return seastar::make_exception_future<>(
+          std::system_error(ENOENT, std::generic_category(),
+                            "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc, off, len, snap](epoch_t epoch) {
+          const ceph_tid_t tid = next_tid++;
+          // An explicit locator hash (>= 0) wins; otherwise use raw_pg.ps().
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          const int osd_id = target.primary_osd;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+          msg->set_snapid(snap);
+          msg->zero(off, len);
+
+          seastar::promise<> promise;
+          auto future = promise.get_future();
+          // OpCompletion is a std::function, so the promise is held through
+          // a shared_ptr to keep the callable copyable.
+          auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+          OpCompletion completion = [p](int r, std::vector<OSDOp>&) {
+            if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "discard failed")));
+            } else {
+              p->set_value();
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+                  crimson::net::ConnectionRef conn) mutable {
+              // Register the completion before sending so it is already in
+              // the ops map when a reply (or reset) is handled.
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              it->second.ops[tid] = std::move(c);
+              return conn->send(std::move(msg));
+            })
+            // After a successful send, the result is whatever the completion
+            // later puts into the promise.
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+// Write zeroes over [off, off + len) of the object. This forwards to
+// discard() with the same arguments, so both entry points send the same
+// zero op and share discard()'s error behaviour.
+seastar::future<> Objecter::write_zeroes(const object_t& oid,
+                                         const object_locator_t& oloc,
+                                         uint64_t off, uint64_t len,
+                                         snapid_t snap)
+{
+  return discard(oid, oloc, off, len, snap);
+}
+
+seastar::future<> Objecter::compare_and_write(const object_t& oid,
+                                              const object_locator_t& oloc,
+                                              uint64_t cmp_off, ceph::bufferlist&& cmp_bl,
+                                              uint64_t write_off, ceph::bufferlist&& write_bl,
+                                              snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    .then([this, oid, oloc, cmp_off, cmp_bl = std::move(cmp_bl),
+           write_off, write_bl = std::move(write_bl), snap](std::optional<OsdTarget> target) mutable {
+      if (!target) {
+        return seastar::make_exception_future<>(
+          std::system_error(ENOENT, std::generic_category(),
+                            "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc,
+               cmp_off, cmp_bl = std::move(cmp_bl),
+               write_off, write_bl = std::move(write_bl), snap](epoch_t epoch) mutable {
+          const ceph_tid_t tid = next_tid++;
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          const size_t write_len = write_bl.length();
+          const int osd_id = target.primary_osd;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, 0, 0);
+          msg->set_snapid(snap);
+          msg->add_op_with_data(CEPH_OSD_OP_CMPEXT, cmp_off, cmp_bl.length(), std::move(cmp_bl));
+          msg->add_op_with_data(CEPH_OSD_OP_WRITE, write_off, write_len, std::move(write_bl));
+
+          seastar::promise<> promise;
+          auto future = promise.get_future();
+          auto p = std::make_shared<seastar::promise<>>(std::move(promise));
+          OpCompletion completion = [this, osd_id, write_len, p](int r, std::vector<OSDOp>&) {
+            auto it = sessions.find(osd_id);
+            if (it != sessions.end()) {
+              it->second.write_throttle.put(write_len);
+            }
+            if (r == -EILSEQ) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(EILSEQ, std::generic_category(), "compare-and-write mismatch")));
+            } else if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "compare-and-write failed")));
+            } else {
+              p->set_value();
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, write_len, c = std::move(completion), msg = std::move(msg)](
+                  crimson::net::ConnectionRef conn) mutable {
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              return it->second.write_throttle.get(write_len).then(
+                [this, conn, tid, osd_id, c = std::move(c),
+                 msg = std::move(msg)]() mutable {
+                  auto it2 = sessions.find(osd_id);
+                  ceph_assert(it2 != sessions.end());
+                  it2->second.ops[tid] = std::move(c);
+                  return conn->send(std::move(msg));
+                }).handle_exception([this, tid, osd_id, write_len](std::exception_ptr e) {
+                  auto it2 = sessions.find(osd_id);
+                  if (it2 != sessions.end()) {
+                    it2->second.ops.erase(tid);
+                    it2->second.write_throttle.put(write_len);
+                  }
+                  return seastar::make_exception_future<>(e);
+                });
+            })
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+seastar::future<ceph::bufferlist> Objecter::exec(const object_t& oid,
+                                                 const object_locator_t& oloc,
+                                                 std::string_view cname,
+                                                 std::string_view method,
+                                                 ceph::bufferlist&& indata,
+                                                 snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    .then([this, oid, oloc, cname, method, indata = std::move(indata), snap](
+            std::optional<OsdTarget> target) mutable {
+      if (!target) {
+        return seastar::make_exception_future<ceph::bufferlist>(
+          std::system_error(ENOENT, std::generic_category(),
+                            "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc, cname, method,
+               indata = std::move(indata), snap](epoch_t epoch) mutable {
+          const ceph_tid_t tid = next_tid++;
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid,
+                                                  epoch, flags, 0);
+          msg->set_snapid(snap);
+          msg->add_call(cname, method, indata);
+
+          seastar::promise<ceph::bufferlist> promise;
+          auto future = promise.get_future();
+          const int osd_id = target.primary_osd;
+
+          auto p = std::make_shared<seastar::promise<ceph::bufferlist>>(
+            std::move(promise));
+          OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+            if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "exec failed")));
+            } else if (!ops.empty()) {
+              p->set_value(std::move(ops[0].outdata));
+            } else {
+              p->set_value(ceph::bufferlist{});
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, c = std::move(completion),
+                   msg = std::move(msg)](crimson::net::ConnectionRef conn) mutable {
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              it->second.ops[tid] = std::move(c);
+              return conn->send(std::move(msg));
+            })
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+// Stat object `oid` (located via `oloc`) at snapshot `snap`, returning
+// (size, mtime).
+//
+// Steps: map the object to a target with calc_target(), fetch the current
+// OSDMap epoch, build an MOSDOp carrying a single stat op, register a
+// completion under a fresh tid in the primary OSD's session, and send.
+// The completion decodes a uint64_t size followed by a ceph::real_time
+// mtime from the first op's outdata.
+//
+// Failure modes of the returned future:
+//   - std::system_error(ENOENT) when calc_target() yields no target,
+//   - std::system_error(-r, "stat failed") for a negative result r,
+//   - std::runtime_error("stat reply empty") when there are no ops or the
+//     first op's outdata is empty,
+//   - std::runtime_error("stat reply decode failed") when decoding throws
+//     ceph::buffer::error.
+//
+// NOTE(review): if conn->send() fails, the completion stays registered in
+// the session's ops map; write() erases its entry in that situation.
+// Confirm whether the difference is intended.
+seastar::future<std::pair<uint64_t, ceph::real_time>>
+Objecter::stat(const object_t& oid,
+               const object_locator_t& oloc,
+               snapid_t snap)
+{
+  return calc_target(oid, oloc)
+    .then([this, oid, oloc, snap](std::optional<OsdTarget> target) {
+      if (!target) {
+        return seastar::make_exception_future<std::pair<uint64_t, ceph::real_time>>(
+          std::system_error(ENOENT, std::generic_category(),
+                            "pool or object mapping failed"));
+      }
+      return with_osdmap([](const OSDMap& m) { return m.get_epoch(); })
+        .then([this, target = *target, oid, oloc, snap](epoch_t epoch) {
+          const ceph_tid_t tid = next_tid++;
+          // An explicit locator hash (>= 0) wins; otherwise use raw_pg.ps().
+          const uint32_t hash = oloc.hash >= 0 ? static_cast<uint32_t>(oloc.hash)
+                                               : target.raw_pg.ps();
+          const hobject_t hobj(oid, oloc.key, snap, hash,
+                              oloc.get_pool(), oloc.nspace);
+          spg_t spgid(target.pgid);
+          const int flags = CEPH_OSD_FLAG_RETURNVEC;
+
+          auto msg = crimson::make_message<MOSDOp>(client_inc, tid, hobj, spgid, epoch, flags, 0);
+          msg->set_snapid(snap);
+          msg->stat();
+
+          seastar::promise<std::pair<uint64_t, ceph::real_time>> promise;
+          auto future = promise.get_future();
+          const int osd_id = target.primary_osd;
+
+          // OpCompletion is a std::function, so the promise is held through
+          // a shared_ptr to keep the callable copyable.
+          auto p = std::make_shared<seastar::promise<std::pair<uint64_t, ceph::real_time>>>(
+            std::move(promise));
+          OpCompletion completion = [p](int r, std::vector<OSDOp>& ops) {
+            if (r < 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::system_error(-r, std::generic_category(), "stat failed")));
+              return;
+            }
+            if (ops.empty() || ops[0].outdata.length() == 0) {
+              p->set_exception(std::make_exception_ptr(
+                std::runtime_error("stat reply empty")));
+              return;
+            }
+            // Decode order matters: size first, then mtime.
+            try {
+              auto p_iter = ops[0].outdata.cbegin();
+              uint64_t size;
+              ceph::real_time mtime;
+              ceph::decode(size, p_iter);
+              ceph::decode(mtime, p_iter);
+              p->set_value(std::make_pair(size, mtime));
+            } catch (const ceph::buffer::error&) {
+              p->set_exception(std::make_exception_ptr(
+                std::runtime_error("stat reply decode failed")));
+            }
+          };
+
+          return get_or_connect(target.primary_addr, osd_id)
+            .then([this, tid, osd_id, c = std::move(completion), msg = std::move(msg)](
+                  crimson::net::ConnectionRef conn) mutable {
+              // Register the completion before sending so it is already in
+              // the ops map when a reply (or reset) is handled.
+              auto it = sessions.find(osd_id);
+              ceph_assert(it != sessions.end());
+              it->second.ops[tid] = std::move(c);
+              return conn->send(std::move(msg));
+            })
+            // After a successful send, the result is whatever the completion
+            // later puts into the promise.
+            .then([f = std::move(future)]() mutable {
+              return std::move(f);
+            });
+        });
+    });
+}
+
+void Objecter::ms_handle_connect(crimson::net::ConnectionRef conn,
+                                seastar::shard_id prv_shard)
+{
+  if (!conn->peer_is_osd()) {
+    return;
+  }
+  const int osd_id = conn->get_peer_id();
+  logger().debug("ms_handle_connect: osd.{}", osd_id);
+
+  auto it = sessions.find(osd_id);
+  if (it != sessions.end()) {
+    it->second.conn = conn;
+    it->second.addr = conn->get_peer_addr();
+  } else {
+    sessions.emplace(osd_id,
+                    OsdSession{osd_id, conn->get_peer_addr(), conn});
+  }
+
+  auto pend = pending_connects.find(osd_id);
+  if (pend != pending_connects.end()) {
+    pend->second.set_value(conn);
+    pending_connects.erase(pend);
+  }
+}
+
+void Objecter::ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace)
+{
+  if (!conn->peer_is_osd()) {
+    return;
+  }
+  const int osd_id = conn->get_peer_id();
+  logger().debug("ms_handle_reset: osd.{} is_replace={}", osd_id, is_replace);
+
+  auto it = sessions.find(osd_id);
+  if (it != sessions.end()) {
+    if (it->second.conn.get() == conn.get()) {
+      it->second.conn = nullptr;
+    }
+    const auto ex = std::make_exception_ptr(
+      std::runtime_error(fmt::format("connection to osd.{} reset", osd_id)));
+    for (auto& [tid, completion] : it->second.ops) {
+      std::vector<OSDOp> empty_ops;
+      completion(-ECONNRESET, empty_ops);
+    }
+    it->second.ops.clear();
+  }
+
+  auto pend = pending_connects.find(osd_id);
+  if (pend != pending_connects.end()) {
+    pend->second.set_exception(std::make_exception_ptr(
+      std::runtime_error(fmt::format("connection to osd.{} reset", osd_id))));
+    pending_connects.erase(pend);
+  }
+}
+
+} // namespace crimson::osdc
diff --git a/src/crimson/osdc/objecter.h b/src/crimson/osdc/objecter.h
new file mode 100644 (file)
index 0000000..4b5103e
--- /dev/null
@@ -0,0 +1,216 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string_view>
+#include <unordered_map>
+#include <vector>
+
+#include <seastar/core/future.hh>
+#include <seastar/core/gate.hh>
+#include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_mutex.hh>
+
+#include "include/ceph_fs.h"
+#include "include/types.h"
+#include "include/utime.h"
+
+#include "common/ceph_time.h"
+#include "crimson/common/throttle.h"
+#include "crimson/common/type_helpers.h"
+#include "crimson/net/Dispatcher.h"
+#include "crimson/net/Fwd.h"
+
+#include "msg/MessageRef.h"
+#include "msg/msg_types.h"
+#include "messages/MOSDOp.h"
+#include "osd/OSDMap.h"
+
+class MOSDOpReply;
+class MOSDMap;
+
+namespace crimson::mon {
+class Client;
+}
+
+namespace crimson::osdc {
+
+/// Result of mapping an object to its target OSD.
+/// Produced by Objecter::calc_target() and consumed by the op methods, which
+/// pass primary_addr/primary_osd to get_or_connect().
+struct OsdTarget {
+  pg_t pgid;           ///< actual pg (after raw_pg_to_pg)
+  pg_t raw_pg;         ///< raw pg (for hobject_t hash - must match classic Objecter)
+  int primary_osd;     ///< id of the OSD that ops for this object are sent to
+  entity_addr_t primary_addr;  ///< address used to connect to primary_osd
+};
+
+/// Completion callback for op reply: (result, ops) -> void
+/// A negative result signals failure. On connection reset the callback is
+/// invoked with -ECONNRESET and an empty ops vector. Being a std::function,
+/// the stored callable must be copyable.
+using OpCompletion = std::function<void(int r, std::vector<OSDOp>&)>;
+
+/// Per-OSD session holding connection and in-flight ops
+struct OsdSession {
+  int osd_id = -1;
+  entity_addr_t addr;
+  /// Current connection to the OSD; set to nullptr when that connection is reset.
+  crimson::net::ConnectionRef conn;
+
+  /// In-flight ops by tid (read, write, stat, discard, compare_and_write, exec)
+  std::unordered_map<ceph_tid_t, OpCompletion> ops;
+
+  /// Throttle for write bytes in flight (used by write and compare_and_write)
+  crimson::common::Throttle write_throttle{64 * 1024 * 1024};  // 64 MiB default
+};
+
+/**
+ * Crimson Objecter - Seastar-native RADOS client objecter.
+ *
+ * - Registers as Dispatcher for MOSDOpReply (stub) and MOSDMap
+ * - Subscribes to osdmap via MonClient, receives and stores OSDMap
+ * - Exposes with_osdmap() for pool resolution and PG calculation
+ * - Exposes calc_target() for object -> PG -> primary OSD mapping
+ * - Exposes get_or_connect() and send_to_osd() for OSD connection and message send
+ *
+ * The object operations (read, write, stat, discard, write_zeroes,
+ * compare_and_write, exec) report errors by failing the returned future,
+ * not through a return code.
+ */
+class Objecter : public crimson::net::Dispatcher {
+public:
+  Objecter(crimson::net::Messenger& msgr,
+           crimson::mon::Client& monc);
+
+  ~Objecter() override;
+
+  seastar::future<> start();
+  seastar::future<> stop();
+
+  /// Invoke a callback with read access to the current OSDMap.
+  /// Returns a future with the result of the callback. OSDMap may be empty
+  /// (epoch 0) until the first map is received from the monitor.
+  /// The callback runs while osdmap_mutex is held in shared mode.
+  template<typename Func>
+  seastar::futurize_t<std::invoke_result_t<Func, const OSDMap&>>
+  with_osdmap(Func&& f) const {
+    return seastar::with_shared(
+      osdmap_mutex,
+      [this, f = std::forward<Func>(f)]() {
+        return seastar::futurize_invoke(f, static_cast<const OSDMap&>(*osdmap));
+      });
+  }
+
+  /// Map object + locator to target PG and primary OSD address.
+  /// Uses OSDMap::object_locator_to_pg, raw_pg_to_pg, pg_to_up_acting_osds.
+  /// Returns nullopt if pool does not exist or mapping fails.
+  seastar::future<std::optional<OsdTarget>> calc_target(const object_t& oid,
+                                                       const object_locator_t& oloc) const;
+
+  /// Get or create connection to OSD. Resolves when handshake completes.
+  /// The future fails if the connection is reset while the connect is pending.
+  seastar::future<crimson::net::ConnectionRef>
+  get_or_connect(const entity_addr_t& addr, int osd_id);
+
+  /// Send message to OSD (connects if needed). Requires OsdTarget from calc_target.
+  seastar::future<> send_to_osd(const OsdTarget& target, MessageURef msg);
+
+  /// Read object data. Returns bufferlist on success; on error the future
+  /// fails with std::system_error (e.g. ENOENT).
+  seastar::future<ceph::bufferlist> read(const object_t& oid,
+                                         const object_locator_t& oloc,
+                                         uint64_t off, uint64_t len,
+                                         snapid_t snap = CEPH_NOSNAP);
+
+  /// Write object data. Throttled per OSD.
+  /// On error the future fails with std::system_error.
+  seastar::future<> write(const object_t& oid,
+                         const object_locator_t& oloc,
+                         uint64_t off, ceph::bufferlist&& bl,
+                         snapid_t snap = CEPH_NOSNAP);
+
+  /// Stat object. Returns (size, mtime).
+  /// The future fails if the op fails, or if the reply is empty or cannot
+  /// be decoded.
+  seastar::future<std::pair<uint64_t, ceph::real_time>> stat(
+      const object_t& oid,
+      const object_locator_t& oloc,
+      snapid_t snap = CEPH_NOSNAP);
+
+  /// Discard (zero) object range. For RBD trim/UNMAP.
+  seastar::future<> discard(const object_t& oid,
+                           const object_locator_t& oloc,
+                           uint64_t off, uint64_t len,
+                           snapid_t snap = CEPH_NOSNAP);
+
+  /// Write zeroes to object range. For RBD write_zeroes.
+  /// Currently forwards to discard().
+  seastar::future<> write_zeroes(const object_t& oid,
+                                const object_locator_t& oloc,
+                                uint64_t off, uint64_t len,
+                                snapid_t snap = CEPH_NOSNAP);
+
+  /// Compare-and-write: compare at cmp_off with cmp_bl; if match, write
+  /// write_bl at write_off. On mismatch the future fails with
+  /// std::system_error carrying EILSEQ. Throttled per OSD by the size of
+  /// write_bl.
+  seastar::future<> compare_and_write(const object_t& oid,
+                                      const object_locator_t& oloc,
+                                      uint64_t cmp_off, ceph::bufferlist&& cmp_bl,
+                                      uint64_t write_off, ceph::bufferlist&& write_bl,
+                                      snapid_t snap = CEPH_NOSNAP);
+
+  /// Execute cls method on object. Returns output bufferlist.
+  /// For use by librbd_crimson (cls/rbd get_size, get_features, etc.).
+  seastar::future<ceph::bufferlist> exec(const object_t& oid,
+                                        const object_locator_t& oloc,
+                                        std::string_view cname,
+                                        std::string_view method,
+                                        ceph::bufferlist&& indata,
+                                        snapid_t snap = CEPH_NOSNAP);
+
+  /// Wait until OSDMap has been received (epoch > 0).
+  seastar::future<> wait_for_osdmap();
+
+  /// Set client incarnation for MOSDOp reqid. Call before first op to avoid
+  /// duplicate detection with previous sessions. Default 0.
+  void set_client_incarnation(int inc) { client_inc = inc; }
+
+#ifdef UNIT_TESTS_BUILT
+  /// Inject OSDMap for unit tests (bypasses monitor). Also fulfills wait_for_osdmap.
+  seastar::future<> inject_osdmap_for_test(std::unique_ptr<OSDMap> m);
+#endif
+
+  // Dispatcher interface
+  std::optional<seastar::future<>>
+  ms_dispatch(crimson::net::ConnectionRef conn,
+             MessageRef m) override;
+
+  /// Records the connection in the OSD's session and fulfills a pending
+  /// get_or_connect() for that OSD. Non-OSD peers are ignored.
+  void ms_handle_connect(crimson::net::ConnectionRef conn,
+                        seastar::shard_id prv_shard) override;
+  /// Fails the OSD's in-flight ops with -ECONNRESET and fails a pending
+  /// get_or_connect() for that OSD. Non-OSD peers are ignored.
+  void ms_handle_reset(crimson::net::ConnectionRef conn, bool is_replace) override;
+
+private:
+  crimson::net::Messenger& msgr;
+  crimson::mon::Client& monc;
+
+  std::unique_ptr<OSDMap> osdmap;
+  /// Guards osdmap; with_osdmap() takes it in shared mode.
+  mutable seastar::shared_mutex osdmap_mutex;
+
+  /// Per-OSD sessions, keyed by OSD id.
+  std::unordered_map<int, OsdSession> sessions;
+  /// Connects awaiting handshake, keyed by OSD id; resolved in
+  /// ms_handle_connect(), failed in ms_handle_reset().
+  std::unordered_map<int, seastar::shared_promise<crimson::net::ConnectionRef>>
+    pending_connects;
+
+  seastar::gate dispatch_gate;
+  bool started = false;
+  int client_inc = 0;
+  /// Next transaction id; each op takes the current value and increments it.
+  ceph_tid_t next_tid = 1;
+
+  std::optional<seastar::shared_promise<>> osdmap_ready;
+  bool osdmap_ready_fulfilled = false;
+
+  seastar::future<> handle_osd_map(Ref<MOSDMap> m);
+  void handle_osd_op_reply(crimson::net::ConnectionRef conn, MOSDOpReply* m);
+  seastar::future<> maybe_request_map();
+};
+
+} // namespace crimson::osdc
index 6898efa04a0870516b9e986a57c30dc193d9265f..ae7b493d42f4bb4c43dd31bf83ac81ce87c31b25 100644 (file)
@@ -19,6 +19,19 @@ install(TARGETS crimson-store-bench DESTINATION bin)
 add_executable(perf-crimson-msgr perf_crimson_msgr.cc)
 target_link_libraries(perf-crimson-msgr crimson)
 
+# crimson-rados-demo: minimal RADOS client demo built from rados_demo.cc.
+add_executable(crimson-rados-demo rados_demo.cc)
+if(HAS_VTA)
+  # Compile this one source file with -fno-var-tracking-assignments.
+  set_source_files_properties(rados_demo.cc
+    PROPERTIES COMPILE_FLAGS -fno-var-tracking-assignments)
+endif()
+target_link_libraries(crimson-rados-demo
+  crimson-main-config-bootstrap
+  crimson
+  crimson-common
+  legacy-option-headers
+  ${FMT_LIB})
+# Installed alongside the other tools in bin.
+install(TARGETS crimson-rados-demo DESTINATION bin)
+
 add_executable(perf-async-msgr perf_async_msgr.cc)
 target_link_libraries(perf-async-msgr ceph-common global ${ALLOC_LIBS})
 
diff --git a/src/crimson/tools/rados_demo.cc b/src/crimson/tools/rados_demo.cc
new file mode 100644 (file)
index 0000000..5452d8f
--- /dev/null
@@ -0,0 +1,187 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2026 IBM Corporation
+ *
+ * Minimal RADOS client demo using Crimson Objecter and RadosClient.
+ * Connects to the cluster, creates an IoCtx, and exercises: write, read,
+ * discard, write_zeroes, compare_and_write, exec (cls call). Integration
+ * test for Objecter extensions required by bdev_rbd.
+ *
+ * Usage: crimson-rados-demo -c /etc/ceph/ceph.conf -n client.admin --pool rbd
+ */
+
+#include <iostream>
+
+#include <seastar/core/app-template.hh>
+#include <seastar/core/print.hh>
+#include <seastar/util/closeable.hh>
+#include <seastar/util/defer.hh>
+
+#include "auth/KeyRing.h"
+#include "crimson/client/io_context.h"
+#include "crimson/client/rados_client.h"
+#include "crimson/osdc/objecter.h"
+#include "crimson/common/config_proxy.h"
+#include "crimson/common/fatal_signal.h"
+#include "crimson/common/log.h"
+#include "crimson/common/perf_counters_collection.h"
+#include "crimson/mon/MonClient.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/osd/main_config_bootstrap_helpers.h"
+#include "msg/msg_types.h"
+
+namespace bpo = boost::program_options;
+
+/// Logger of the "client" subsystem; used for all demo output.
+static seastar::logger& logger()
+{
+  return crimson::get_logger(ceph_subsys_client);
+}
+
+int main(int argc, const char* argv[])
+{
+  auto early_result = crimson::osd::get_early_config_client(argc, argv);
+  if (!early_result.has_value()) {
+    std::cerr << "get_early_config_client failed: " << early_result.error()
+              << std::endl;
+    return early_result.error();
+  }
+  auto& early_config = early_result.value();
+
+  seastar::app_template::config app_cfg;
+  app_cfg.name = "crimson-rados-demo";
+  app_cfg.auto_handle_sigint_sigterm = false;
+  seastar::app_template app(std::move(app_cfg));
+  app.add_options()
+    ("pool", bpo::value<std::string>()->default_value("rbd"),
+     "pool name for I/O demo")
+    ("obj", bpo::value<std::string>()->default_value("crimson_rados_demo_obj"),
+     "object name for write/read")
+    ("debug", "enable debug logging");
+
+  try {
+    return app.run(
+      early_config.get_early_args().size(),
+      const_cast<char**>(early_config.get_early_args().data()),
+      [&] {
+        auto& config = app.configuration();
+        auto config_proxy_args = early_config.ceph_args;
+        return seastar::async([config_proxy_args, &config, &early_config] {
+          try {
+            FatalSignal fatal_signal;
+            if (config.count("debug")) {
+              seastar::global_logger_registry().set_all_loggers_level(
+                seastar::log_level::debug);
+            }
+
+            crimson::common::sharded_conf().start(
+              early_config.init_params.name,
+              early_config.cluster_name).get();
+            crimson::common::local_conf().start().get();
+            auto stop_conf = seastar::deferred_stop(
+              crimson::common::sharded_conf());
+            crimson::common::sharded_perf_coll().start().get();
+            auto stop_perf = seastar::deferred_stop(
+              crimson::common::sharded_perf_coll());
+
+            crimson::common::local_conf().parse_config_files(
+              early_config.conf_file_list).get();
+            crimson::common::local_conf().parse_env().get();
+            crimson::common::local_conf().parse_argv(
+              config_proxy_args).get();
+
+            crimson::osd::populate_config_from_mon().get();
+
+            const auto pool_name = config["pool"].as<std::string>();
+            const auto obj_name = config["obj"].as<std::string>();
+
+            class DemoAuthHandler : public crimson::common::AuthHandler {
+            public:
+              void handle_authentication(const EntityName& name,
+                                        const AuthCapsInfo& caps) override {}
+            };
+            auto auth_handler = std::make_unique<DemoAuthHandler>();
+            auto msgr = crimson::net::Messenger::create(
+              entity_name_t(early_config.init_params.name.get_type(), -1),
+              "rados_demo",
+              crimson::osd::get_nonce(),
+              true);
+            crimson::mon::Client monc(*msgr, *auth_handler);
+            msgr->set_auth_client(&monc);
+            msgr->set_auth_server(&monc);
+
+            crimson::client::RadosClient rados(*msgr, monc);
+            crimson::net::dispatchers_t dispatchers;
+            dispatchers.push_back(&monc);
+            dispatchers.push_back(&rados.get_objecter());
+            msgr->start(dispatchers).get();
+            auto stop_msgr = seastar::defer([&] {
+              msgr->stop();
+              msgr->shutdown().get();
+            });
+
+            monc.start().get();
+            auto stop_monc = seastar::defer([&] { monc.stop().get(); });
+
+            rados.get_objecter().set_client_incarnation(
+              static_cast<int>(crimson::osd::get_nonce() & 0x7fffffff));
+            rados.connect().get();
+            auto ioctx = rados.create_ioctx(pool_name).get();
+
+            ceph::bufferlist bl;
+            bl.append_zero(4096);
+            ioctx.write(obj_name, 0, std::move(bl)).get();
+            logger().info("wrote {} bytes to {}/{}", 4096, pool_name, obj_name);
+
+            auto data = ioctx.read(obj_name, 0, 4096).get();
+            logger().info("read {} bytes from {}/{}", data.length(),
+                         pool_name, obj_name);
+
+            // Test discard (zero range) and write_zeroes
+            ioctx.discard(obj_name, 512, 1024).get();
+            logger().info("discarded bytes 512-1536");
+            ioctx.write_zeroes(obj_name, 2048, 512).get();
+            logger().info("write_zeroes bytes 2048-2560");
+
+            data = ioctx.read(obj_name, 0, 4096).get();
+            logger().info("read {} bytes after discard/write_zeroes",
+                         data.length());
+
+            // Test compare_and_write: write known content, then compare-and-write
+            ceph::bufferlist init_bl;
+            init_bl.append("ABCD", 4);
+            ioctx.write(obj_name, 0, std::move(init_bl)).get();
+            logger().info("wrote ABCD at 0 for compare_and_write test");
+            ceph::bufferlist cmp_bl;
+            cmp_bl.append("ABCD", 4);
+            ceph::bufferlist write_bl;
+            write_bl.append("WXYZ", 4);
+            ioctx.compare_and_write(obj_name, 0, std::move(cmp_bl), 0,
+                                   std::move(write_bl)).get();
+            logger().info("compare_and_write succeeded (ABCD -> WXYZ)");
+            data = ioctx.read(obj_name, 0, 4).get();
+            logger().info("read {} bytes after compare_and_write", data.length());
+
+            // Test exec: cls hello say_hello (returns "Hello, world!" with empty in)
+            ceph::bufferlist exec_in;
+            auto exec_out = ioctx.exec(obj_name, "hello", "say_hello",
+                                      std::move(exec_in)).get();
+            std::string exec_str(exec_out.c_str(), exec_out.length());
+            logger().info("exec hello say_hello returned: {}", exec_str);
+
+            rados.shutdown().get();
+            logger().info("crimson-rados-demo completed successfully");
+            return EXIT_SUCCESS;
+          } catch (const std::exception& e) {
+            logger().error("crimson-rados-demo failed: {}", e.what());
+            return EXIT_FAILURE;
+          }
+        });
+      });
+  } catch (const std::exception& e) {
+    std::cerr << "FATAL: " << e.what() << std::endl;
+    return EXIT_FAILURE;
+  }
+}
index 7736ac1e264929d3f934a10a019da61f044abafc..518dfefda9e5d9aaf152122064ea309426bb3496 100644 (file)
@@ -24,6 +24,7 @@
 #include "MOSDFastDispatchOp.h"
 #include "include/ceph_features.h"
 #include "include/ceph_fs.h" // for CEPH_MSG_OSD_OP
+#include "include/rados.h"   // for CEPH_OSD_OP_CALL, ceph_osd_op::cls
 #include "common/hobject.h"
 
 /*
@@ -230,6 +231,28 @@ public:
   void zero(uint64_t off, uint64_t len) {
     add_simple_op(CEPH_OSD_OP_ZERO, off, len);
   }
+  /// Append an extent op of type `o` covering [off, off+len) that carries
+  /// an input payload (e.g. the CMPEXT compare buffer). `bl` is moved into
+  /// the op's indata.
+  void add_op_with_data(int o, uint64_t off, uint64_t len, ceph::buffer::list&& bl) {
+    OSDOp data_op;
+    data_op.indata = std::move(bl);
+    data_op.op.op = o;
+    data_op.op.extent.length = len;
+    data_op.op.extent.offset = off;
+    ops.push_back(std::move(data_op));
+  }
+  /// Add cls call op (CEPH_OSD_OP_CALL). indata = cname + method + in.
+  void add_call(std::string_view cname, std::string_view method,
+                const ceph::buffer::list& indata) {
+    OSDOp osd_op;
+    osd_op.op.op = CEPH_OSD_OP_CALL;
+    osd_op.op.cls.class_len = static_cast<__u8>(std::min(cname.size(), size_t(255)));
+    osd_op.op.cls.method_len = static_cast<__u8>(std::min(method.size(), size_t(255)));
+    osd_op.op.cls.indata_len = indata.length();
+    osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
+    osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
+    osd_op.indata.append(indata);
+    ops.push_back(std::move(osd_op));
+  }
   void truncate(uint64_t off) {
     add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
   }
index 43028e8c92a4c8491875e90b627d7d4485ccc5fd..5c2c23849505007577722262b5d1e9970d0d498b 100644 (file)
@@ -150,3 +150,12 @@ add_ceph_test(unittest-crimson-pg-pool-shift
 target_link_libraries(unittest-crimson-pg-pool-shift
   crimson-common
   crimson::gtest)
+
+# unittest-crimson-objecter: unit tests built from test_objecter.cc.
+add_executable(unittest-crimson-objecter
+  test_objecter.cc)
+# Registered as a ceph test; the binary is run with `--memory 256M --smp 1`.
+add_ceph_test(unittest-crimson-objecter
+  ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest-crimson-objecter
+  --memory 256M --smp 1)
+target_link_libraries(unittest-crimson-objecter
+  crimson
+  crimson::gtest)
diff --git a/src/test/crimson/test_objecter.cc b/src/test/crimson/test_objecter.cc
new file mode 100644 (file)
index 0000000..473e750
--- /dev/null
@@ -0,0 +1,171 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 sts=2 expandtab
+
+#include "test/crimson/gtest_seastar.h"
+
+#include "common/ceph_context.h"
+#include "crimson/client/io_context.h"
+#include "crimson/client/rados_client.h"
+#include "crimson/common/auth_handler.h"
+#include "crimson/osdc/objecter.h"
+#include "crimson/net/Messenger.h"
+#include "crimson/mon/MonClient.h"
+#include "osd/OSDMap.h"
+#include "include/object.h"
+
+using namespace crimson;
+
+namespace {
+
+/// AuthHandler whose authentication callback does nothing; it exists only
+/// so the tests can construct a crimson::mon::Client.
+class DummyAuthHandler : public crimson::common::AuthHandler {
+public:
+  void handle_authentication(const EntityName& /*name*/,
+                             const AuthCapsInfo& /*caps*/) final
+  {}
+};
+
+std::unique_ptr<OSDMap> build_test_osdmap()
+{
+  auto cct = new CephContext(CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  auto osdmap = std::make_unique<OSDMap>();
+  uuid_d fsid;
+  const int r = osdmap->build_simple_with_pool(cct, 1, fsid, 1, 0, 0);
+  cct->put();
+  if (r != 0) {
+    return nullptr;
+  }
+
+  // Mark OSD 0 as up and in
+  OSDMap::Incremental inc(osdmap->get_epoch() + 1);
+  inc.fsid = osdmap->get_fsid();
+  entity_addrvec_t addrs;
+  addrs.v.push_back(entity_addr_t());
+  uuid_d uuid;
+  uuid.generate_random();
+  addrs.v[0].nonce = 0;
+  inc.new_state[0] = CEPH_OSD_EXISTS | CEPH_OSD_NEW;
+  inc.new_up_client[0] = addrs;
+  inc.new_up_cluster[0] = addrs;
+  inc.new_hb_back_up[0] = addrs;
+  inc.new_hb_front_up[0] = addrs;
+  inc.new_weight[0] = CEPH_OSD_IN;
+  inc.new_uuid[0] = uuid;
+  osdmap->apply_incremental(inc);
+
+  return osdmap;
+}
+
+} // namespace
+
+/// Fixture for the Objecter/IoCtx tests; adds nothing to seastar_test_suite_t.
+struct objecter_test_t : seastar_test_suite_t {};
+
+#ifdef UNIT_TESTS_BUILT
+TEST_F(objecter_test_t, calc_target_with_injected_osdmap)
+{
+  run_async([] {
+    auto osdmap = build_test_osdmap();
+    ASSERT_TRUE(osdmap) << "failed to build test OSDMap";
+
+    auto msgr = crimson::net::Messenger::create(
+      entity_name_t::CLIENT(0), "test", 0, true);
+    DummyAuthHandler auth_handler;
+    crimson::mon::Client monc(*msgr, auth_handler);
+    crimson::osdc::Objecter objecter(*msgr, monc);
+
+    objecter.inject_osdmap_for_test(std::move(osdmap)).get();
+
+    object_t oid("test-obj");
+    object_locator_t oloc(1);  // pool 1 (rbd from build_simple_with_pool)
+
+    auto target = objecter.calc_target(oid, oloc).get();
+    ASSERT_TRUE(target.has_value()) << "calc_target should succeed with valid pool";
+    EXPECT_GE(target->primary_osd, 0);
+    EXPECT_EQ(target->pgid.pool(), 1);
+  });
+}
+
+TEST_F(objecter_test_t, calc_target_nonexistent_pool)
+{
+  run_async([] {
+    auto osdmap = build_test_osdmap();
+    ASSERT_TRUE(osdmap);
+
+    auto msgr = crimson::net::Messenger::create(
+      entity_name_t::CLIENT(0), "test", 0, true);
+    DummyAuthHandler auth_handler;
+    crimson::mon::Client monc(*msgr, auth_handler);
+    crimson::osdc::Objecter objecter(*msgr, monc);
+
+    objecter.inject_osdmap_for_test(std::move(osdmap)).get();
+
+    object_t oid("test-obj");
+    object_locator_t oloc(999);  // non-existent pool
+
+    auto target = objecter.calc_target(oid, oloc).get();
+    EXPECT_FALSE(target.has_value()) << "calc_target should fail for nonexistent pool";
+  });
+}
+#endif
+
+/// An IoCtx reports the pool id and snap seq it was constructed with.
+TEST_F(objecter_test_t, iocontext_get_pool_id)
+{
+  run_async([] {
+    auto messenger = crimson::net::Messenger::create(
+      entity_name_t::CLIENT(0), "test", 0, true);
+    DummyAuthHandler auth;
+    crimson::mon::Client mon_client(*messenger, auth);
+    crimson::osdc::Objecter objecter(*messenger, mon_client);
+
+    crimson::client::IoCtx io(objecter, 42, CEPH_NOSNAP);
+    EXPECT_EQ(io.get_pool_id(), 42);
+    EXPECT_EQ(io.get_snap_seq(), CEPH_NOSNAP);
+  });
+}
+
+#ifdef UNIT_TESTS_BUILT
+/// Verify IoCtx has discard, write_zeroes, compare_and_write, exec APIs.
+/// Full I/O testing requires a real cluster (crimson-rados-demo with vstart,
+/// run_crimson_rados_demo_integration_test.sh).
+TEST_F(objecter_test_t, iocontext_discard_write_zeroes_api)
+{
+  run_async([] {
+    auto msgr = crimson::net::Messenger::create(
+      entity_name_t::CLIENT(0), "test", 0, true);
+    DummyAuthHandler auth_handler;
+    crimson::mon::Client monc(*msgr, auth_handler);
+    crimson::osdc::Objecter objecter(*msgr, monc);
+
+    crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP);
+
+    // Verify API compiles and returns seastar::future<>
+    static_assert(std::is_same_v<decltype(ioctx.discard("", 0, 0)),
+                                 seastar::future<>>);
+    static_assert(std::is_same_v<decltype(ioctx.write_zeroes("", 0, 0)),
+                                 seastar::future<>>);
+
+    ceph::bufferlist cmp_bl, write_bl;
+    cmp_bl.append_zero(4);
+    write_bl.append_zero(4);
+    static_assert(std::is_same_v<decltype(ioctx.compare_and_write(
+                                   "", 0, std::move(cmp_bl), 0, std::move(write_bl))),
+                                 seastar::future<>>);
+  });
+}
+
+/// Verify IoCtx has exec() API for cls calls.
+TEST_F(objecter_test_t, iocontext_exec_api)
+{
+  run_async([] {
+    auto msgr = crimson::net::Messenger::create(
+      entity_name_t::CLIENT(0), "test", 0, true);
+    DummyAuthHandler auth_handler;
+    crimson::mon::Client monc(*msgr, auth_handler);
+    crimson::osdc::Objecter objecter(*msgr, monc);
+
+    crimson::client::IoCtx ioctx(objecter, 1, CEPH_NOSNAP);
+
+    ceph::bufferlist indata;
+    static_assert(std::is_same_v<decltype(ioctx.exec("", "cls", "method",
+                                                    std::move(indata))),
+                                 seastar::future<ceph::bufferlist>>);
+  });
+}
+#endif